
Android VoIP: how the call audio data stream is set up and transmitted

2017-03-30 12:17
In the previous posts I described how a SIP call is established and the basic code flow behind it. The real goal, however, is voice calling over SIP, so this post analyzes how those audio streams are set up and transmitted.

The two Java classes involved in carrying the audio are AudioStream and AudioGroup. Let's start with AudioStream. It extends RtpStream and represents an audio stream transported over RTP; the class contains a static initializer:

static {

        System.loadLibrary("rtp_jni");

    }

When an object of this class is first created, the virtual machine checks whether the class has already been loaded; if not, it loads it and runs the static initializer. While loading the native library, System.loadLibrary() ends up invoking the library's JNI_OnLoad() — that part is handled inside the runtime itself, and readers who are curious can look at the Dalvik or ART sources.

The library's implementation lives under frameworks/opt/net/voip/src/jni/rtp in the Android source tree.

The RtpStream constructor looks like this:

RtpStream(InetAddress address) throws SocketException {

        mLocalPort = create(address.getHostAddress());

        mLocalAddress = address;

    }

create() is a native method, so let's go straight to its JNI implementation in RtpStream.cpp:

jint create(JNIEnv *env, jobject thiz, jstring jAddress)

{

    env->SetIntField(thiz, gSocket, -1);

    sockaddr_storage ss;

    if (parse(env, jAddress, 0, &ss) < 0) {

        // Exception already thrown.

        return -1;

    }

    int socket = ::socket(ss.ss_family, SOCK_DGRAM, 0);

    socklen_t len = sizeof(ss);

    if (socket == -1 || bind(socket, (sockaddr *)&ss, sizeof(ss)) != 0 ||

        getsockname(socket, (sockaddr *)&ss, &len) != 0) {

        jniThrowException(env, "java/net/SocketException", strerror(errno));

        ::close(socket);

        return -1;

    }

    uint16_t *p = (ss.ss_family == AF_INET) ?

        &((sockaddr_in *)&ss)->sin_port : &((sockaddr_in6 *)&ss)->sin6_port;

    uint16_t port = ntohs(*p);

    if ((port & 1) == 0) {

        env->SetIntField(thiz, gSocket, socket);

        return port;

    }

    ::close(socket);

    socket = ::socket(ss.ss_family, SOCK_DGRAM, 0);

    if (socket != -1) {

        uint16_t delta = port << 1;

        ++port;

        for (int i = 0; i < 1000; ++i) {

            do {

                port += delta;

            } while (port < 1024);

            *p = htons(port);

            if (bind(socket, (sockaddr *)&ss, sizeof(ss))== 0) {

                env->SetIntField(thiz, gSocket, socket);

                return port;

            }

        }

    }

    jniThrowException(env, "java/net/SocketException", strerror(errno));

    ::close(socket);

    return -1;

}

The jAddress parameter of create() is passed down from the Java RtpStream and holds a local IP address. create() opens a UDP socket, binds it to an available port, and keeps retrying until it lands on an even port (by RTP convention the even port carries RTP, leaving the next odd port free for RTCP). The resulting socket descriptor is stored in the Java RtpStream's mSocket field.
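As a quick check from the Java side, the even port picked by create() is visible through RtpStream.getLocalPort(). A minimal sketch (the local address is hypothetical; the constructor throws SocketException):

    AudioStream probe = new AudioStream(InetAddress.getByName("192.168.1.10"));
    int rtpPort = probe.getLocalPort(); // always even: by RTP convention the even port carries RTP,
                                        // leaving the next odd port free for RTCP
    probe.release();                    // closes the underlying socket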

In an earlier post I described that placing a VoIP call goes through SipAudioCall's makeCall(), which creates an AudioStream. Its counterpart on the other end, answerCall(), creates an AudioStream as well when the callee picks up, so with these two sockets the call runs full duplex.

public void makeCall(SipProfile peerProfile, SipSession sipSession,

            int timeout) throws SipException {

        if (DBG) log("makeCall: " + peerProfile + " session=" + sipSession + " timeout=" + timeout);

        if (!SipManager.isVoipSupported(mContext)) {

            throw new SipException("VOIP API is not supported");

        }

        synchronized (this) {

            mSipSession = sipSession;

            try {

                mAudioStream = new AudioStream(InetAddress.getByName(

                        getLocalIp()));

                sipSession.setListener(createListener());

                sipSession.makeCall(peerProfile, createOffer().encode(),

                        timeout);

            } catch (IOException e) {

                loge("makeCall:", e);

                throw new SipException("makeCall()", e);

            }

        }
    }

Also, once the SIP session is established, SipAudioCall.Listener's onCallEstablished() callback fires, meaning the session is up and the two sides can start talking; it is from there that SipAudioCall's startAudio() is called to begin the audio transfer.
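For context, a typical application-side listener looks roughly like the following minimal sketch (error handling omitted; only the callbacks relevant here are shown):

    SipAudioCall.Listener listener = new SipAudioCall.Listener() {
        @Override
        public void onCallEstablished(SipAudioCall call) {
            call.startAudio();          // kicks off the RTP audio path analyzed below
            call.setSpeakerMode(false); // route to the earpiece; true would use the speakerphone
        }

        @Override
        public void onCallEnded(SipAudioCall call) {
            call.close();
        }
    };

startAudio() itself is a thin wrapper that delegates to startAudioInternal():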

public void startAudio() {

        try {

            startAudioInternal();

        } catch (UnknownHostException e) {

            onError(SipErrorCode.PEER_NOT_REACHABLE, e.getMessage());

        } catch (Throwable e) {

            onError(SipErrorCode.CLIENT_ERROR, e.getMessage());

        }

    }

private synchronized void startAudioInternal() throws UnknownHostException {

        if (DBG) loge("startAudioInternal: mPeerSd=" + mPeerSd);

        if (mPeerSd == null) {

            throw new IllegalStateException("mPeerSd = null");

        }

        stopCall(DONT_RELEASE_SOCKET);

        mInCall = true;

        // Run exact the same logic in createAnswer() to setup mAudioStream.

        SimpleSessionDescription offer =

                new SimpleSessionDescription(mPeerSd);

        AudioStream stream = mAudioStream;

        AudioCodec codec = null;

        for (Media media : offer.getMedia()) {

            if ((codec == null) && (media.getPort() > 0)

                    && "audio".equals(media.getType())

                    && "RTP/AVP".equals(media.getProtocol())) {

                // Find the first audio codec we supported.

                for (int type : media.getRtpPayloadTypes()) {

                    codec = AudioCodec.getCodec(

                            type, media.getRtpmap(type), media.getFmtp(type));

                    if (codec != null) {

                        break;

                    }

                }

                if (codec != null) {

                    // Associate with the remote host.

                    String address = media.getAddress();

                    if (address == null) {

                        address = offer.getAddress();

                    }

                    stream.associate(InetAddress.getByName(address),

                            media.getPort());

                    stream.setDtmfType(-1);

                    stream.setCodec(codec);

                    // Check if DTMF is supported in the same media.

                    for (int type : media.getRtpPayloadTypes()) {

                        String rtpmap = media.getRtpmap(type);

                        if ((type != codec.type) && (rtpmap != null)

                                && rtpmap.startsWith("telephone-event")) {

                            stream.setDtmfType(type);

                        }

                    }

                    // Handle recvonly and sendonly.

                    if (mHold) {

                        stream.setMode(RtpStream.MODE_NORMAL);

                    } else if (media.getAttribute("recvonly") != null) {

                        stream.setMode(RtpStream.MODE_SEND_ONLY);

                    } else if(media.getAttribute("sendonly") != null) {

                        stream.setMode(RtpStream.MODE_RECEIVE_ONLY);

                    } else if(offer.getAttribute("recvonly") != null) {

                        stream.setMode(RtpStream.MODE_SEND_ONLY);

                    } else if(offer.getAttribute("sendonly") != null) {

                        stream.setMode(RtpStream.MODE_RECEIVE_ONLY);

                    } else {

                        stream.setMode(RtpStream.MODE_NORMAL);

                    }

                    break;

                }

            }

        }

        if (codec == null) {

            throw new IllegalStateException("Reject SDP: no suitable codecs");

        }

        if (isWifiOn()) grabWifiHighPerfLock();

        // AudioGroup logic:

        AudioGroup audioGroup = getAudioGroup();

        if (mHold) {

            // don't create an AudioGroup here; doing so will fail if

            // there's another AudioGroup out there that's active

        } else {

            if (audioGroup == null) audioGroup = new AudioGroup();

            stream.join(audioGroup);

        }

        setAudioGroupMode();

    }

This method performs several key operations:

(1) Call stopCall(DONT_RELEASE_SOCKET) to stop any previous audio stream without closing the socket, so the same socket can carry the new stream.

(2) SimpleSessionDescription offer = new SimpleSessionDescription(mPeerSd); builds a SimpleSessionDescription from mPeerSd. This class manages and parses SDP (Session Description Protocol) content, and mPeerSd — handed over earlier through SipAudioCall.Listener's onCallEstablished(), as described previously — carries the peer's complete SDP message: the offered audio codecs, whether DTMF is supported, the destination IP address, and so on.

(3) The for (Media media : offer.getMedia()) loop walks the offered media sections and builds an AudioCodec object for the first supported audio codec; an example of the kind of offer it parses is shown below.
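For reference, an audio offer of the kind this loop parses looks roughly like this (illustrative values only):

    v=0
    o=- 0 0 IN IP4 192.168.1.20
    s=-
    c=IN IP4 203.0.113.5
    t=0 0
    m=audio 49170 RTP/AVP 8 101
    a=rtpmap:8 PCMA/8000
    a=rtpmap:101 telephone-event/8000
    a=sendrecv

With such an offer the loop would pick payload type 8 (PCMA) as the codec, payload type 101 as the DTMF telephone-event type, and associate the stream with 203.0.113.5:49170.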

(4) AudioGroup audioGroup = getAudioGroup(); fetches the AudioGroup; the first time around this returns null.

(5)

        if (mHold) {

            // don't create an AudioGroup here; doing so will fail if

            // there's another AudioGroup out there that's active

        } else {

            if (audioGroup == null) audioGroup = new AudioGroup();

            stream.join(audioGroup);

        }

mHold == true means the call is being put on hold; in the normal case we take the else branch, which creates an AudioGroup (if one doesn't exist yet) and calls stream.join(audioGroup) — a method I'll analyze in detail further down.
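Stripped of the SDP parsing, the essential calls startAudioInternal() makes against the android.net.rtp API boil down to a sketch like this (addresses, port and payload type are hypothetical):

    AudioStream stream = new AudioStream(InetAddress.getByName("192.168.1.10"));
    stream.associate(InetAddress.getByName("203.0.113.5"), 49170); // peer address/port from the SDP
    stream.setCodec(AudioCodec.PCMA);       // first codec both sides support
    stream.setDtmfType(101);                // telephone-event payload type, if offered
    stream.setMode(RtpStream.MODE_NORMAL);

    AudioGroup group = new AudioGroup();
    group.setMode(AudioGroup.MODE_NORMAL);
    stream.join(group);                     // this is where the native side spins up its threads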

(6) Finally, setAudioGroupMode() is called:

private void setAudioGroupMode() {

        AudioGroup audioGroup = getAudioGroup();

        if (DBG) log("setAudioGroupMode: audioGroup=" + audioGroup);

        if (audioGroup != null) {

            if (mHold) {

                audioGroup.setMode(AudioGroup.MODE_ON_HOLD);

            } else if (mMuted) {

                audioGroup.setMode(AudioGroup.MODE_MUTED);

            } else if (isSpeakerOn()) {

                audioGroup.setMode(AudioGroup.MODE_ECHO_SUPPRESSION);

            } else {

                audioGroup.setMode(AudioGroup.MODE_NORMAL);

            }

        }

    }

This sets the working mode of the audio group. An AudioGroup can be thought of as the mixer of all the AudioStreams joined to it — for example mixing the mic and speaker paths. AudioGroup.MODE_MUTED turns the mic off, and AudioGroup.MODE_ON_HOLD turns both mic and speaker off; the common case ends up as audioGroup.setMode(AudioGroup.MODE_NORMAL).
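Application code normally does not set these modes directly; the SipAudioCall methods drive them through setAudioGroupMode() shown above. Roughly, as a sketch (call is the SipAudioCall from the listener example earlier):

    call.setSpeakerMode(true);  // isSpeakerOn() -> AudioGroup.MODE_ECHO_SUPPRESSION
    call.toggleMute();          // mMuted        -> AudioGroup.MODE_MUTED
    call.holdCall(30);          // mHold         -> AudioGroup.MODE_ON_HOLD (timeout in seconds)

The Java-level setMode() that all of these paths end up in is: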

public void setMode(int mode) {

        if (mode < 0 || mode > MODE_LAST) {

            throw new IllegalArgumentException("Invalid mode");

        }

        synchronized (this) {

            nativeSetMode(mode);

            mMode = mode;

        }

    }

nativeSetMode() is also a native method; we'll come back to it later.

Now back to stream.join(audioGroup) from step (5). stream here is the AudioStream created earlier, and its join() method looks like this:

public void join(AudioGroup group) {

        synchronized (this) {

            if (mGroup == group) {

                return;

            }

            if (mGroup != null) {

                mGroup.remove(this);

                mGroup = null;

            }

            if (group != null) {

                group.add(this);

                mGroup = group;

            }

        }

    }

join() in turn calls AudioGroup's add() method:

    synchronized void add(AudioStream stream) {

        if (!mStreams.containsKey(stream)) {

            try {

                AudioCodec codec = stream.getCodec();

                String codecSpec = String.format(Locale.US, "%d %s %s", codec.type,

                        codec.rtpmap, codec.fmtp);

                int id = nativeAdd(stream.getMode(), stream.getSocket(),

                        stream.getRemoteAddress().getHostAddress(),

                        stream.getRemotePort(), codecSpec, stream.getDtmfType());

                mStreams.put(stream, id);

            } catch (NullPointerException e) {

                throw new IllegalStateException(e);

            }

        }

    }

    private native int nativeAdd(int mode, int socket, String remoteAddress,

            int remotePort, String codecSpec, int dtmfType);

As the code above shows, this ultimately calls the native nativeAdd() method, whose implementation is the add() function in AudioGroup.cpp.
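Before diving into it, note the codecSpec string built by the Java add() above. A sketch of what it contains for PCMA:

    AudioCodec codec = AudioCodec.PCMA;            // type = 8, rtpmap = "PCMA/8000", fmtp = null
    String codecSpec = String.format(Locale.US, "%d %s %s",
            codec.type, codec.rtpmap, codec.fmtp); // -> "8 PCMA/8000 null"

The native side recovers the payload type, codec name and sample rate from this string with sscanf("%d %15[^/]%*c%d"), giving codecType = 8, codecName = "PCMA", sampleRate = 8000. With that in mind, here is the native add():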

int add(JNIEnv *env, jobject thiz, jint mode,

    jint socket, jstring jRemoteAddress, jint remotePort,

    jstring jCodecSpec, jint dtmfType)

{

    AudioCodec *codec = NULL;

    AudioStream *stream = NULL;

    AudioGroup *group = NULL;

    // Sanity check.

    sockaddr_storage remote;

    if (parse(env, jRemoteAddress, remotePort, &remote) < 0) {

        // Exception already thrown.

        return 0;

    }

    if (!jCodecSpec) {

        jniThrowNullPointerException(env, "codecSpec");

        return 0;

    }

    const char *codecSpec = env->GetStringUTFChars(jCodecSpec, NULL);

    if (!codecSpec) {

        // Exception already thrown.

        return 0;

    }

    socket = dup(socket);

    if (socket == -1) {

        jniThrowException(env, "java/lang/IllegalStateException",

            "cannot get stream socket");

        return 0;

    }

    // Create audio codec.

    int codecType = -1;

    char codecName[16];

    int sampleRate = -1;

    sscanf(codecSpec, "%d %15[^/]%*c%d", &codecType, codecName, &sampleRate);

    codec = newAudioCodec(codecName);

    int sampleCount = (codec ? codec->set(sampleRate, codecSpec) : -1);

    env->ReleaseStringUTFChars(jCodecSpec, codecSpec);

    if (sampleCount <= 0) {

        jniThrowException(env, "java/lang/IllegalStateException",

            "cannot initialize audio codec");

        goto error;

    }

    // Create audio stream.

    stream = new AudioStream;

    if (!stream->set(mode, socket, &remote, codec, sampleRate, sampleCount,

        codecType, dtmfType)) {

        jniThrowException(env, "java/lang/IllegalStateException",

            "cannot initialize audio stream");

        goto error;

    }

    socket = -1;

    codec = NULL;

    // Create audio group.

    group = (AudioGroup *)env->GetIntField(thiz, gNative);

    if (!group) {

        int mode = env->GetIntField(thiz, gMode);

        group = new AudioGroup;

        if (!group->set(8000, 256) || !group->setMode(mode)) {

            jniThrowException(env, "java/lang/IllegalStateException",

                "cannot initialize audio group");

            goto error;

        }

    }

    // Add audio stream into audio group.

    if (!group->add(stream)) {

        jniThrowException(env, "java/lang/IllegalStateException",

            "cannot add audio stream");

        goto error;

    }

    // Succeed.

    env->SetIntField(thiz, gNative, (int)group);

    return (int)stream;

error:

    delete group;

    delete stream;

    delete codec;

    close(socket);

    env->SetIntField(thiz, gNative, 0);

    return 0;

}

struct AudioCodecType {

    const char *name;

    AudioCodec *(*create)();

} gAudioCodecTypes[] = {

    {"PCMA", newAlawCodec},

    {"PCMU", newUlawCodec},

    {"GSM", newGsmCodec},

    {"AMR", newAmrCodec},

    {"GSM-EFR", newGsmEfrCodec},

    {NULL, NULL},

};

AudioCodec *newAudioCodec(const char *codecName)

{

    AudioCodecType *type = gAudioCodecTypes;

    while (type->name != NULL) {

        if (strcasecmp(codecName, type->name) == 0) {

            AudioCodec *codec = type->create();

            codec->name = type->name;

            return codec;

        }

        ++type;

    }

    return NULL;

}

This add() function performs several main operations:

(1) Call newAudioCodec() to create an AudioCodec. A different object is created depending on the codec name — a factory method pattern. For example, if the negotiated codec is AMR, newAmrCodec() is called to create an AmrCodec object.

(2) Create a new native AudioStream object and call its set() method to record the socket, remote address, codec and other parameters; later, group->add() links this stream into the list headed by mChain.

(3) Instantiate a native AudioGroup — this happens only the first time — and call its set() and setMode() methods. Let's look at the definitions of these two methods.

bool AudioGroup::set(int sampleRate, int sampleCount)

{

    mEventQueue = epoll_create(2);

    if (mEventQueue == -1) {

        ALOGE("epoll_create: %s", strerror(errno));

        return false;

    }

    mSampleRate = sampleRate;

    mSampleCount = sampleCount;

    // Create device socket.

    int pair[2];

    // Create a socket pair: whatever one end sends, the other end receives, and both ends can send and receive.

    if (socketpair(AF_UNIX, SOCK_DGRAM, 0, pair)) {

        ALOGE("socketpair: %s", strerror(errno));

        return false;

    }

    mDeviceSocket = pair[0]; // keep this end; the DeviceThread will use it later

    // Create device stream.

    mChain = new AudioStream;

    if (!mChain->set(AudioStream::NORMAL, pair[1], NULL, NULL,

        sampleRate, sampleCount, -1, -1)) {

        close(pair[1]);

        ALOGE("cannot initialize device stream");

        return false;

    }

    // Give device socket a reasonable timeout.

    timeval tv;

    tv.tv_sec = 0;

    tv.tv_usec = 1000 * sampleCount / sampleRate * 500;

    if (setsockopt(pair[0], SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {

        ALOGE("setsockopt: %s", strerror(errno));

        return false;

    }

    // Add device stream into event queue.

    epoll_event event;

    event.events = EPOLLIN;

    event.data.ptr = mChain;

    // Then epoll_ctl() adds the other end of the pair to the set of monitored I/O descriptors

    if (epoll_ctl(mEventQueue, EPOLL_CTL_ADD, pair[1], &event)) {

        ALOGE("epoll_ctl: %s", strerror(errno));

        return false;

    }

    // Anything else?

    ALOGD("stream[%d] joins group[%d]", pair[1], pair[0]);

    return true;

}

Note that this method runs on the calling (main) thread.

bool AudioGroup::setMode(int mode)

{

    if (mode < 0 || mode > LAST_MODE) {

        return false;

    }

    // FIXME: temporary code to overcome echo and mic gain issues on herring and tuna boards.

    // Must be modified/removed when the root cause of the issue is fixed in the hardware or

    // driver

    char value[PROPERTY_VALUE_MAX];

    property_get("ro.product.board", value, "");

    if (mode == NORMAL &&

            (!strcmp(value, "herring") || !strcmp(value, "tuna"))) {

        mode = ECHO_SUPPRESSION;

    }

// The check below returns early when the mode is unchanged; we only continue when the mode actually switches (or is set for the first time)

    if (mMode == mode) { 

        return true;

    }

// Ask the previously started device thread to exit and wait for it

    mDeviceThread->requestExitAndWait();

    ALOGD("group[%d] switches from mode %d to %d", mDeviceSocket, mMode, mode);

    mMode = mode;

// ON_HOLD means the call is on hold, so no new device thread is started; otherwise start one

    return (mode == ON_HOLD) || mDeviceThread->start();

}

At the end of this method, mDeviceThread->start() launches a new DeviceThread.

As analyzed earlier, placing a call invokes setMode() on the Java AudioGroup, which goes through JNI to the native AudioGroup::setMode(), and that is what starts this worker thread. What the thread actually does is covered later.

(4) Call group->add(stream). The corresponding code is:

bool AudioGroup::add(AudioStream *stream)

{

    mNetworkThread->requestExitAndWait(); // ask the network thread to exit and wait for it

    epoll_event event;

    event.events = EPOLLIN;

    event.data.ptr = stream;

    // Add the stream's socket descriptor to the mEventQueue epoll I/O watch list.

    if (epoll_ctl(mEventQueue, EPOLL_CTL_ADD, stream->mSocket, &event)) {

        ALOGE("epoll_ctl: %s", strerror(errno));

        return false;

    }

    // Insert this stream into the singly linked list headed by mChain.

    stream->mNext = mChain->mNext;

    mChain->mNext = stream;

    // (Re)start the mNetworkThread thread

    if (!mNetworkThread->start()) {

        // Only take over the stream when succeeded.

        mChain->mNext = stream->mNext;

        return false;

    }

    ALOGD("stream[%d] joins group[%d]", stream->mSocket, mDeviceSocket);

    return true;

}

Based on the analysis above, two worker threads get started: a NetworkThread and a DeviceThread. Between them, mic samples captured by the DeviceThread are pushed through the socket pair to the NetworkThread, which mixes, encodes, and sends them to the peer as RTP; in the opposite direction the NetworkThread decodes incoming RTP into per-stream jitter buffers, mixes them, and pushes PCM back through the socket pair for the DeviceThread to play out.

Before analyzing the loop functions of these two threads, let's first look at two AudioStream methods:

void AudioStream::decode(int tick)

{

    char c;

    if (mMode == SEND_ONLY) {

        recv(mSocket, &c, 1, MSG_DONTWAIT);

        return;

    }

    // Make sure mBufferHead and mBufferTail are reasonable.

    if ((unsigned int)(tick + BUFFER_SIZE - mBufferHead) > BUFFER_SIZE * 2) {

        mBufferHead = tick - HISTORY_SIZE;

        mBufferTail = mBufferHead;

    }

    if (tick - mBufferHead > HISTORY_SIZE) {

        // Throw away outdated samples.

        mBufferHead = tick - HISTORY_SIZE;

        if (mBufferTail - mBufferHead < 0) {

            mBufferTail = mBufferHead;

        }

    }

    // Adjust the jitter buffer if the latency keeps larger than the threshold

    // in the measurement period.

    int score = mBufferTail - tick - MEASURE_BASE;

    if (mLatencyScore > score || mLatencyScore <= 0) {

        mLatencyScore = score;

        mLatencyTimer = tick;

    } else if (tick - mLatencyTimer >= MEASURE_PERIOD) {

        ALOGV("stream[%d] reduces latency of %dms", mSocket, mLatencyScore);

        mBufferTail -= mLatencyScore;

        mLatencyScore = -1;

    }

    int count = (BUFFER_SIZE - (mBufferTail - mBufferHead)) * mSampleRate;

    if (count < mSampleCount) {

        // Buffer overflow. Drop the packet.

        ALOGV("stream[%d] buffer overflow", mSocket);

        recv(mSocket, &c, 1, MSG_DONTWAIT);

        return;

    }

    // Receive the packet and decode it.

    int16_t samples[count];

    if (!mCodec) {

        // Special case for device stream.

        count = recv(mSocket, samples, sizeof(samples),

            MSG_TRUNC | MSG_DONTWAIT) >> 1;

    } else {

        __attribute__((aligned(4))) uint8_t buffer[2048];

        sockaddr_storage remote;

        socklen_t addrlen = sizeof(remote);

// Receive data from the network: the call is full duplex, so the peer's encoded audio arrives here as RTP/UDP packets

        int length = recvfrom(mSocket, buffer, sizeof(buffer),

            MSG_TRUNC | MSG_DONTWAIT, (sockaddr *)&remote, &addrlen);

        // Do we need to check SSRC, sequence, and timestamp? They are not

        // reliable but at least they can be used to identify duplicates?

        if (length < 12 || length > (int)sizeof(buffer) ||

            (ntohl(*(uint32_t *)buffer) & 0xC07F0000) != mCodecMagic) {

            ALOGV("stream[%d] malformed packet", mSocket);

            return;

        }

        int offset = 12 + ((buffer[0] & 0x0F) << 2);

        if ((buffer[0] & 0x10) != 0) {

            offset += 4 + (ntohs(*(uint16_t *)&buffer[offset + 2]) << 2);

        }

        if ((buffer[0] & 0x20) != 0) {

            length -= buffer[length - 1];

        }

        length -= offset;

        if (length >= 0) {

    // Decode the received payload with the concrete AudioCodec's decode() method.

            length = mCodec->decode(samples, count, &buffer[offset], length);

        }

        if (length > 0 && mFixRemote) {

            mRemote = remote;

            mFixRemote = false;

        }

        count = length;

    }

    if (count <= 0) {

        ALOGV("stream[%d] decoder error", mSocket);

        return;

    }

    if (tick - mBufferTail > 0) {

        // Buffer underrun. Reset the jitter buffer.

        ALOGV("stream[%d] buffer underrun", mSocket);

        if (mBufferTail - mBufferHead <= 0) {

            mBufferHead = tick + mInterval;

            mBufferTail = mBufferHead;

        } else {

            int tail = (tick + mInterval) * mSampleRate;

            for (int i = mBufferTail * mSampleRate; i - tail < 0; ++i) {

                mBuffer[i & mBufferMask] = 0;

            }

            mBufferTail = tick + mInterval;

        }

    }

    // Append to the jitter buffer.

    int tail = mBufferTail * mSampleRate;

    // The decoded samples are stored in this AudioStream's mBuffer (the jitter buffer).

    for (int i = 0; i < count; ++i) {

        mBuffer[tail & mBufferMask] = samples[i];

        ++tail;

    }

    mBufferTail += mInterval;

}

The key parts of decode() are annotated in the comments above; now let's look at encode().

void AudioStream::encode(int tick, AudioStream *chain)

{

    if (tick - mTick >= mInterval) {

        // We just missed the train. Pretend that packets in between are lost.

        int skipped = (tick - mTick) / mInterval;

        mTick += skipped * mInterval;

        mSequence += skipped;

        mTimestamp += skipped * mSampleCount;

        ALOGV("stream[%d] skips %d packets", mSocket, skipped);

    }

    tick = mTick;

    mTick += mInterval;

    ++mSequence;

    mTimestamp += mSampleCount;

    // If there is an ongoing DTMF event, send it to the network peer now

    if (mMode != RECEIVE_ONLY && mDtmfEvent != -1) {

        int duration = mTimestamp - mDtmfStart;

        // Make sure duration is reasonable.

        if (duration >= 0 && duration < mSampleRate * DTMF_PERIOD) {

            duration += mSampleCount;

            int32_t buffer[4] = {

                htonl(mDtmfMagic | mSequence),

                htonl(mDtmfStart),

                mSsrc,

                htonl(mDtmfEvent | duration),

            };

            if (duration >= mSampleRate * DTMF_PERIOD) {

                buffer[3] |= htonl(1 << 23);

                mDtmfEvent = -1;

            }

            sendto(mSocket, buffer, sizeof(buffer), MSG_DONTWAIT,

                (sockaddr *)&mRemote, sizeof(mRemote));

            return;

        }

        mDtmfEvent = -1;

    }

    int32_t buffer[mSampleCount + 3];

    bool data = false;

    if (mMode != RECEIVE_ONLY) {

        // Mix all other streams.

        memset(buffer, 0, sizeof(buffer));

        while (chain) {

            if (chain != this) {

// Sum up (mix) the audio from every other stream in the chain that has the same sample rate

                data |= chain->mix(buffer, tick - mInterval, tick, mSampleRate);

            }

            chain = chain->mNext;

        }

    }

    int16_t samples[mSampleCount];

    if (data) {

        // Saturate into 16 bits.

        for (int i = 0; i < mSampleCount; ++i) {

            int32_t sample = buffer[i];

            if (sample < -32768) {

                sample = -32768;

            }

            if (sample > 32767) {

                sample = 32767;

            }

            samples[i] = sample;

        }

    } else {

        if ((mTick ^ mKeepAlive) >> 10 == 0) {

            return;

        }

        mKeepAlive = mTick;

        memset(samples, 0, sizeof(samples));

        if (mMode != RECEIVE_ONLY) {

            ALOGV("stream[%d] no data", mSocket);

        }

    }

    if (!mCodec) {

        // Special case for device stream.

        send(mSocket, samples, sizeof(samples), MSG_DONTWAIT);

        return;

    }

    // Cook the packet and send it out.

    buffer[0] = htonl(mCodecMagic | mSequence);

    buffer[1] = htonl(mTimestamp);

    buffer[2] = mSsrc;

    // Encode the PCM samples with the concrete AudioCodec's encode() method

    int length = mCodec->encode(&buffer[3], samples);

    if (length <= 0) {

        ALOGV("stream[%d] encoder error", mSocket);

        return;

    }

// Send the encoded packet (12-byte RTP header plus payload) to the network peer

    sendto(mSocket, buffer, length + 12, MSG_DONTWAIT, (sockaddr *)&mRemote,

        sizeof(mRemote));

}

Now let's analyze the NetworkThread's loop function:

bool AudioGroup::NetworkThread::threadLoop()

{

    AudioStream *chain = mGroup->mChain;

    int tick = elapsedRealtime();

    int deadline = tick + 10;

    int count = 0;

// Walk the chain and, for every AudioStream whose tick is due, call encode() to encode the audio and send it to the peer.

    for (AudioStream *stream = chain; stream; stream = stream->mNext) {

        if (tick - stream->mTick >= 0) {

            stream->encode(tick, chain);

        }

        if (deadline - stream->mTick > 0) {

            deadline = stream->mTick;

        }

        ++count;

    }

    int event = mGroup->mDtmfEvent;

    if (event != -1) {

        for (AudioStream *stream = chain; stream; stream = stream->mNext) {

            stream->sendDtmf(event);

        }

        mGroup->mDtmfEvent = -1;

    }

    deadline -= tick;

    if (deadline < 1) {

        deadline = 1;

    }

    epoll_event events[count];

    // Note: mGroup->mEventQueue watches the sockets of all AudioStreams as well as one end of the socket pair created earlier.
    // When epoll_wait() returns, either the deadline expired or an I/O event arrived, i.e. a network packet has been received.

    count = epoll_wait(mGroup->mEventQueue, events, count, deadline);

    if (count == -1) {

        ALOGE("epoll_wait: %s", strerror(errno));

        return false;

    }

    // Call decode() on every stream that has data pending

    for (int i = 0; i < count; ++i) {

        ((AudioStream *)events[i].data.ptr)->decode(tick);

    }

    return true;

}

So this thread is the one that handles network packets. Now let's look at the DeviceThread's loop function (only the key code is kept):

bool AudioGroup::DeviceThread::threadLoop()

{

    int mode = mGroup->mMode;

    int sampleRate = mGroup->mSampleRate;

    int sampleCount = mGroup->mSampleCount;

    int deviceSocket = mGroup->mDeviceSocket;

    // ... code omitted ...

    // Initialize AudioTrack and AudioRecord.

    sp<AudioTrack> track = new AudioTrack();

    sp<AudioRecord> record = new AudioRecord();

    

    // AudioTrack is used to play the audio stream and AudioRecord to capture the mic. Their set() methods talk to AudioPolicyService and AudioFlinger over binder;
    // AudioFlinger creates the corresponding thread and Track object, and the Track allocates shared memory to pass audio data across processes. How set() works is not analyzed here; interested readers can explore it themselves.

    if (track->set(AUDIO_STREAM_VOICE_CALL, sampleRate, AUDIO_FORMAT_PCM_16_BIT,

                AUDIO_CHANNEL_OUT_MONO, output, AUDIO_OUTPUT_FLAG_NONE, NULL /*callback_t*/,

                NULL /*user*/, 0 /*notificationFrames*/, 0 /*sharedBuffer*/,

                false /*threadCanCallJava*/, 0 /*sessionId*/,

                AudioTrack::TRANSFER_OBTAIN) != NO_ERROR ||

            record->set(AUDIO_SOURCE_VOICE_COMMUNICATION, sampleRate, AUDIO_FORMAT_PCM_16_BIT,

                AUDIO_CHANNEL_IN_MONO, input, NULL /*callback_t*/, NULL /*user*/,

                0 /*notificationFrames*/, false /*threadCanCallJava*/, 0 /*sessionId*/,

                AudioRecord::TRANSFER_OBTAIN) != NO_ERROR) {

        ALOGE("cannot initialize audio device");

        return false;

    }

    ALOGD("latency: output %d, input %d", track->latency(), record->latency());

    // Give device socket a reasonable buffer size.

    setsockopt(deviceSocket, SOL_SOCKET, SO_RCVBUF, &output, sizeof(output));

    setsockopt(deviceSocket, SOL_SOCKET, SO_SNDBUF, &output, sizeof(output));

    // ... code omitted ...

   

    // Start AudioRecord before AudioTrack. This prevents AudioTrack from being

    // disabled due to buffer underrun while waiting for AudioRecord.

    if (mode != MUTED) {

        record->start();

        int16_t one;

        // FIXME this may not work any more

        record->read(&one, sizeof(one));

    }

    track->start();

    while (!exitPending()) {

        int16_t output[sampleCount];

// PCM mixed from the network streams arrives over the device socket pair and is stored in the output array

        if (recv(deviceSocket, output, sizeof(output), 0) <= 0) {

            memset(output, 0, sizeof(output));

        }

        int16_t input[sampleCount];

        int toWrite = sampleCount;

        int toRead = (mode == MUTED) ? 0 : sampleCount;

        int chances = 100;

        while (--chances > 0 && (toWrite > 0 || toRead > 0)) {

            if (toWrite > 0) {

                AudioTrack::Buffer buffer;

                buffer.frameCount = toWrite;

                // There is data to play: obtain an AudioTrack buffer and copy the PCM into it, which drives the output device (e.g. the speaker)

                status_t status = track->obtainBuffer(&buffer, 1);

                if (status == NO_ERROR) {

                    int offset = sampleCount - toWrite;

                    memcpy(buffer.i8, &output[offset], buffer.size);

                    toWrite -= buffer.frameCount;

                    track->releaseBuffer(&buffer);

                } else if (status != TIMED_OUT && status != WOULD_BLOCK) {

                    ALOGE("cannot write to AudioTrack");

                    goto exit;

                }

            }

 

            // There is still mic data to read
            if (toRead > 0) {

                AudioRecord::Buffer buffer;

                buffer.frameCount = toRead;

                // Read the mic capture via AudioRecord::obtainBuffer(); its internal flow is not analyzed here.

                status_t status = record->obtainBuffer(&buffer, 1);

                if (status == NO_ERROR) {

                    int offset = sampleCount - toRead;

 
                    // The captured samples are copied into the input array

                    memcpy(&input[offset], buffer.i8, buffer.size);

                    toRead -= buffer.frameCount;

                    record->releaseBuffer(&buffer);

                } else if (status != TIMED_OUT && status != WOULD_BLOCK) {

                    ALOGE("cannot read from AudioRecord");

                    goto exit;

                }

            }
        } // closes while (--chances > 0 && (toWrite > 0 || toRead > 0))

        if (chances <= 0) {

            ALOGW("device loop timeout");

            while (recv(deviceSocket, &c, 1, MSG_DONTWAIT) == 1);

        }

        if (mode != MUTED) {

            if (echo != NULL) {

                ALOGV("echo->run()");

                echo->run(output, input);

            }

            /* Note: this send() does NOT send data to the network peer. It writes the mic PCM into deviceSocket, one end of the
               socket pair, which wakes up the NetworkThread's epoll_wait(); NetworkThread's threadLoop() then encodes the data
               and sends it to the other end of the network. */

            send(deviceSocket, input, sizeof(input), MSG_DONTWAIT);

        }

    }

exit:

    delete echo;

    delete aec;

    return true;

}