An analysis of the audio data flow in Android VoIP calls
2017-03-30 12:17
In the previous posts I described how a SIP call is set up and walked through the basic code flow. The real goal, however, is to carry voice over that SIP session, so this post analyzes how the audio streams are established and transported.
The two Java classes involved in audio transport are AudioStream and AudioGroup. Let's start with AudioStream. It extends RtpStream and represents an audio stream carried over RTP. The class defines a static initializer:
static {
System.loadLibrary("rtp_jni");
}
When an object of this class is first created, the VM checks whether the class has already been loaded; if not, it loads the class and runs its static initializer. While System.loadLibrary() loads the shared library, the VM invokes the library's JNI_OnLoad() function; that mechanism lives inside the VM itself, and readers who are curious can look at the Dalvik or ART sources.
The native code of this library is located under frameworks/opt/net/voip/src/jni/rtp in the Android source tree.
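Before tracing into the native code, here is a minimal sketch of how an application drives the public android.net.rtp API end to end. It is illustrative only: the addresses, port and the surrounding class are placeholders, and error handling is omitted.
import android.content.Context;
import android.media.AudioManager;
import android.net.rtp.AudioCodec;
import android.net.rtp.AudioGroup;
import android.net.rtp.AudioStream;
import android.net.rtp.RtpStream;
import java.net.InetAddress;

public class RtpCallSketch {
    /** Opens a local RTP socket, points it at the peer and starts the media engine. */
    public static AudioGroup startRtpAudio(Context context, InetAddress localAddress,
            InetAddress remoteAddress, int remoteRtpPort) throws Exception {
        AudioManager am = (AudioManager) context.getSystemService(Context.AUDIO_SERVICE);
        am.setMode(AudioManager.MODE_IN_COMMUNICATION);

        AudioStream stream = new AudioStream(localAddress); // binds a local UDP port (native create())
        stream.setCodec(AudioCodec.PCMU);                   // G.711 u-law, RTP payload type 0
        stream.setMode(RtpStream.MODE_NORMAL);              // send and receive
        stream.associate(remoteAddress, remoteRtpPort);     // where encoded packets will be sent

        AudioGroup group = new AudioGroup();                // owns the native device/network threads
        group.setMode(AudioGroup.MODE_NORMAL);
        stream.join(group);                                 // nativeAdd(): media starts flowing
        return group;
    }
}
SipAudioCall performs essentially these steps on our behalf; the rest of this post follows them down into the JNI layer.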
RtpStream's constructor looks like this:
RtpStream(InetAddress address) throws SocketException {
mLocalPort = create(address.getHostAddress());
mLocalAddress = address;
}
create() is a native method, so let's jump straight to its JNI implementation in RtpStream.cpp:
jint create(JNIEnv *env, jobject thiz, jstring jAddress)
{
env->SetIntField(thiz, gSocket, -1);
sockaddr_storage ss;
if (parse(env, jAddress, 0, &ss) < 0) {
// Exception already thrown.
return -1;
}
int socket = ::socket(ss.ss_family, SOCK_DGRAM, 0);
socklen_t len = sizeof(ss);
if (socket == -1 || bind(socket, (sockaddr *)&ss, sizeof(ss)) != 0 ||
getsockname(socket, (sockaddr *)&ss, &len) != 0) {
jniThrowException(env, "java/net/SocketException", strerror(errno));
::close(socket);
return -1;
}
uint16_t *p = (ss.ss_family == AF_INET) ?
&((sockaddr_in *)&ss)->sin_port : &((sockaddr_in6 *)&ss)->sin6_port;
uint16_t port = ntohs(*p);
if ((port & 1) == 0) {
env->SetIntField(thiz, gSocket, socket);
return port;
}
::close(socket);
socket = ::socket(ss.ss_family, SOCK_DGRAM, 0);
if (socket != -1) {
uint16_t delta = port << 1;
++port;
for (int i = 0; i < 1000; ++i) {
do {
port += delta;
} while (port < 1024);
*p = htons(port);
if (bind(socket, (sockaddr *)&ss, sizeof(ss))== 0) {
env->SetIntField(thiz, gSocket, socket);
return port;
}
}
}
jniThrowException(env, "java/net/SocketException", strerror(errno));
::close(socket);
return -1;
}
The jAddress parameter is the local IP address passed down from the Java-layer RtpStream. create() opens a UDP socket and binds it to an available port, preferring an even-numbered one; if the first ephemeral port turns out to be odd, the socket is closed and even ports are probed instead. The resulting socket descriptor is stored back into the Java RtpStream's mSocket field.
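The even-port preference deserves a note: RFC 3550 conventionally places RTP on an even UDP port and RTCP on the next odd one, which is why create() keeps an even ephemeral port but retries otherwise. A rough Java equivalent of that intent follows (a sketch only, not the framework code; the native loop probes with a stride derived from the first port rather than sequentially):
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;

final class EvenPortSocket {
    /** Binds a UDP socket to an even local port, as RTP convention prefers. */
    static DatagramSocket open(InetAddress local) throws SocketException {
        DatagramSocket s = new DatagramSocket(new InetSocketAddress(local, 0)); // ephemeral port
        if ((s.getLocalPort() & 1) == 0) {
            return s;                                        // already even: keep it
        }
        int firstOdd = s.getLocalPort();
        s.close();
        for (int port = firstOdd + 1; port < 65536; port += 2) {  // probe even ports only
            try {
                return new DatagramSocket(new InetSocketAddress(local, port));
            } catch (SocketException inUse) {
                // taken, try the next even port
            }
        }
        throw new SocketException("no even UDP port available");
    }
}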
In an earlier post I described that placing a VoIP call goes through SipAudioCall's makeCall() method, which creates an AudioStream object. Correspondingly, the other end answers the call with answerCall(), which also creates an AudioStream; with these two sockets the parties can communicate in full duplex.
public void makeCall(SipProfile peerProfile, SipSession sipSession,
int timeout) throws SipException {
if (DBG) log("makeCall: " + peerProfile + " session=" + sipSession + " timeout=" + timeout);
if (!SipManager.isVoipSupported(mContext)) {
throw new SipException("VOIP API is not supported");
}
synchronized (this) {
mSipSession = sipSession;
try {
mAudioStream = new AudioStream(InetAddress.getByName(
getLocalIp()));
sipSession.setListener(createListener());
sipSession.makeCall(peerProfile, createOffer().encode(),
timeout);
} catch (IOException e) {
loge("makeCall:", e);
throw new SipException("makeCall()", e);
}
}
In addition, once the SIP session is established, SipAudioCall.Listener's onCallEstablished() callback fires, meaning the session is up and the call can begin; that callback invokes SipAudioCall.startAudio() to start the audio transfer.
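For reference, here is a hedged sketch of what the application side typically looks like (the listener methods come from the public SipAudioCall API; the bodies are illustrative):
import android.net.sip.SipAudioCall;

class CallListenerSketch {
    static final SipAudioCall.Listener LISTENER = new SipAudioCall.Listener() {
        @Override
        public void onCallEstablished(SipAudioCall call) {
            call.setSpeakerMode(false);  // route audio to the earpiece
            call.startAudio();           // start the RTP audio path analyzed below
        }

        @Override
        public void onCallEnded(SipAudioCall call) {
            call.close();                // releases the AudioStream socket and the AudioGroup
        }
    };
}
startAudio() itself is only a thin error-handling wrapper around startAudioInternal():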
public void startAudio() {
try {
startAudioInternal();
} catch (UnknownHostException e) {
onError(SipErrorCode.PEER_NOT_REACHABLE, e.getMessage());
} catch (Throwable e) {
onError(SipErrorCode.CLIENT_ERROR, e.getMessage());
}
}
private synchronized void startAudioInternal() throws UnknownHostException {
if (DBG) loge("startAudioInternal: mPeerSd=" + mPeerSd);
if (mPeerSd == null) {
throw new IllegalStateException("mPeerSd = null");
}
stopCall(DONT_RELEASE_SOCKET);
mInCall = true;
// Run exact the same logic in createAnswer() to setup mAudioStream.
SimpleSessionDescription offer =
new SimpleSessionDescription(mPeerSd);
AudioStream stream = mAudioStream;
AudioCodec codec = null;
for (Media media : offer.getMedia()) {
if ((codec == null) && (media.getPort() > 0)
&& "audio".equals(media.getType())
&& "RTP/AVP".equals(media.getProtocol())) {
// Find the first audio codec we supported.
for (int type : media.getRtpPayloadTypes()) {
codec = AudioCodec.getCodec(
type, media.getRtpmap(type), media.getFmtp(type));
if (codec != null) {
break;
}
}
if (codec != null) {
// Associate with the remote host.
String address = media.getAddress();
if (address == null) {
address = offer.getAddress();
}
stream.associate(InetAddress.getByName(address),
media.getPort());
stream.setDtmfType(-1);
stream.setCodec(codec);
// Check if DTMF is supported in the same media.
for (int type : media.getRtpPayloadTypes()) {
String rtpmap = media.getRtpmap(type);
if ((type != codec.type) && (rtpmap != null)
&& rtpmap.startsWith("telephone-event")) {
stream.setDtmfType(type);
}
}
// Handle recvonly and sendonly.
if (mHold) {
stream.setMode(RtpStream.MODE_NORMAL);
} else if (media.getAttribute("recvonly") != null) {
stream.setMode(RtpStream.MODE_SEND_ONLY);
} else if(media.getAttribute("sendonly") != null) {
stream.setMode(RtpStream.MODE_RECEIVE_ONLY);
} else if(offer.getAttribute("recvonly") != null) {
stream.setMode(RtpStream.MODE_SEND_ONLY);
} else if(offer.getAttribute("sendonly") != null) {
stream.setMode(RtpStream.MODE_RECEIVE_ONLY);
} else {
stream.setMode(RtpStream.MODE_NORMAL);
}
break;
}
}
}
if (codec == null) {
throw new IllegalStateException("Reject SDP: no suitable codecs");
}
if (isWifiOn()) grabWifiHighPerfLock();
// AudioGroup logic:
AudioGroup audioGroup = getAudioGroup();
if (mHold) {
// don't create an AudioGroup here; doing so will fail if
// there's another AudioGroup out there that's active
} else {
if (audioGroup == null) audioGroup = new AudioGroup();
stream.join(audioGroup);
}
setAudioGroupMode();
}
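To make the parsing loop above concrete, here is a purely illustrative audio offer of the kind mPeerSd may carry (addresses, ports and payload numbers are made up; the exact contents depend on the peer):
v=0
o=- 0 0 IN IP4 192.168.1.20
s=-
c=IN IP4 192.168.1.20
t=0 0
m=audio 49170 RTP/AVP 0 8 101
a=rtpmap:0 PCMU/8000
a=rtpmap:8 PCMA/8000
a=rtpmap:101 telephone-event/8000
a=sendrecv
Against such an offer the loop would pick PCMU (the first supported payload type), record 101 as the DTMF telephone-event type, associate the stream with 192.168.1.20:49170, and leave the mode at MODE_NORMAL since neither sendonly nor recvonly is present.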
The method performs several main operations:
(1) It calls stopCall(DONT_RELEASE_SOCKET) to stop any previous audio stream while keeping the socket open, so the same socket can carry the new stream.
(2) SimpleSessionDescription offer = new SimpleSessionDescription(mPeerSd) wraps the peer's session description. SimpleSessionDescription is a helper for working with SDP (Session Description Protocol) content; here it parses mPeerSd, whose value was delivered earlier through SipAudioCall.Listener's onCallEstablished(). It carries everything the peer put into its SDP, such as the offered audio codecs, whether DTMF telephone-events are supported, and the peer's IP address and port (compare the illustrative offer above).
(3) The for (Media media : offer.getMedia()) loop walks the media sections of the offer and constructs an AudioCodec object, i.e. the audio encoding that will actually be used.
(4) AudioGroup audioGroup = getAudioGroup() fetches the AudioGroup for this call; on the first call it returns null.
(5)
if (mHold) {
// don't create an AudioGroup here; doing so will fail if
// there's another AudioGroup out there that's active
} else {
if (audioGroup == null) audioGroup = new AudioGroup();
stream.join(audioGroup);
}
mHold being true means the call is on hold; in the normal case we take the else branch, which creates an AudioGroup if one does not exist yet and calls stream.join(audioGroup). join() is analyzed in detail below.
(6)
private void setAudioGroupMode() {
AudioGroup audioGroup = getAudioGroup();
if (DBG) log("setAudioGroupMode: audioGroup=" + audioGroup);
if (audioGroup != null) {
if (mHold) {
audioGroup.setMode(AudioGroup.MODE_ON_HOLD);
} else if (mMuted) {
audioGroup.setMode(AudioGroup.MODE_MUTED);
} else if (isSpeakerOn()) {
audioGroup.setMode(AudioGroup.MODE_ECHO_SUPPRESSION);
} else {
audioGroup.setMode(AudioGroup.MODE_NORMAL);
}
}
}
This sets the working mode of the whole audio group. An AudioGroup can be thought of as the mixer that combines all of its AudioStreams, for example the mic and speaker paths. AudioGroup.MODE_MUTED switches the mic off, and AudioGroup.MODE_ON_HOLD switches off both mic and speaker; in the common case the call ends up at audioGroup.setMode(AudioGroup.MODE_NORMAL). The Java-side setMode() is:
public void setMode(int mode) {
if (mode < 0 || mode > MODE_LAST) {
throw new IllegalArgumentException("Invalid mode");
}
synchronized (this) {
nativeSetMode(mode);
mMode = mode;
}
}
nativeSetMode() is again a native method; it is analyzed later.
Now back to the stream.join(audioGroup) call from step (5). stream is the AudioStream created earlier, and its join() method looks like this:
public void join(AudioGroup group) {
synchronized (this) {
if (mGroup == group) {
return;
}
if (mGroup != null) {
mGroup.remove(this);
mGroup = null;
}
if (group != null) {
group.add(this);
mGroup = group;
}
}
}
join() in turn calls AudioGroup's add() method:
synchronized void add(AudioStream stream) {
if (!mStreams.containsKey(stream)) {
try {
AudioCodec codec = stream.getCodec();
String codecSpec = String.format(Locale.US, "%d %s %s", codec.type,
codec.rtpmap, codec.fmtp);
int id = nativeAdd(stream.getMode(), stream.getSocket(),
stream.getRemoteAddress().getHostAddress(),
stream.getRemotePort(), codecSpec, stream.getDtmfType());
mStreams.put(stream, id);
} catch (NullPointerException e) {
throw new IllegalStateException(e);
}
}
}
private native int nativeAdd(int mode, int socket, String remoteAddress,
int remotePort, String codecSpec, int dtmfType);
As the code shows, everything ends up in nativeAdd(), whose implementation lives in the JNI layer: the add() function in AudioGroup.cpp. The codecSpec string handed down has the form "<payload type> <rtpmap> <fmtp>"; for G.711 u-law, for instance, it would look something like "0 PCMU/8000 null".
int add(JNIEnv *env, jobject thiz, jint mode,
jint socket, jstring jRemoteAddress, jint remotePort,
jstring jCodecSpec, jint dtmfType)
{
AudioCodec *codec = NULL;
AudioStream *stream = NULL;
AudioGroup *group = NULL;
// Sanity check.
sockaddr_storage remote;
if (parse(env, jRemoteAddress, remotePort, &remote) < 0) {
// Exception already thrown.
return 0;
}
if (!jCodecSpec) {
jniThrowNullPointerException(env, "codecSpec");
return 0;
}
const char *codecSpec = env->GetStringUTFChars(jCodecSpec, NULL);
if (!codecSpec) {
// Exception already thrown.
return 0;
}
socket = dup(socket);
if (socket == -1) {
jniThrowException(env, "java/lang/IllegalStateException",
"cannot get stream socket");
return 0;
}
// Create audio codec.
int codecType = -1;
char codecName[16];
int sampleRate = -1;
sscanf(codecSpec, "%d %15[^/]%*c%d", &codecType, codecName, &sampleRate);
codec = newAudioCodec(codecName);
int sampleCount = (codec ? codec->set(sampleRate, codecSpec) : -1);
env->ReleaseStringUTFChars(jCodecSpec, codecSpec);
if (sampleCount <= 0) {
jniThrowException(env, "java/lang/IllegalStateException",
"cannot initialize audio codec");
goto error;
}
// Create audio stream.
stream = new AudioStream;
if (!stream->set(mode, socket, &remote, codec, sampleRate, sampleCount,
codecType, dtmfType)) {
jniThrowException(env, "java/lang/IllegalStateException",
"cannot initialize audio stream");
goto error;
}
socket = -1;
codec = NULL;
// Create audio group.
group = (AudioGroup *)env->GetIntField(thiz, gNative);
if (!group) {
int mode = env->GetIntField(thiz, gMode);
group = new AudioGroup;
if (!group->set(8000, 256) || !group->setMode(mode)) {
jniThrowException(env, "java/lang/IllegalStateException",
"cannot initialize audio group");
goto error;
}
}
// Add audio stream into audio group.
if (!group->add(stream)) {
jniThrowException(env, "java/lang/IllegalStateException",
"cannot add audio stream");
goto error;
}
// Succeed.
env->SetIntField(thiz, gNative, (int)group);
return (int)stream;
error:
delete group;
delete stream;
delete codec;
close(socket);
env->SetIntField(thiz, gNative, 0);
return 0;
}
struct AudioCodecType {
const char *name;
AudioCodec *(*create)();
} gAudioCodecTypes[] = {
{"PCMA", newAlawCodec},
{"PCMU", newUlawCodec},
{"GSM", newGsmCodec},
{"AMR", newAmrCodec},
{"GSM-EFR", newGsmEfrCodec},
{NULL, NULL},
};
AudioCodec *newAudioCodec(const char *codecName)
{
AudioCodecType *type = gAudioCodecTypes;
while (type->name != NULL) {
if (strcasecmp(codecName, type->name) == 0) {
AudioCodec *codec = type->create();
codec->name = type->name;
return codec;
}
++type;
}
return NULL;
}
The add() function does several things:
(1) It calls newAudioCodec() to create a new AudioCodec, choosing the concrete subclass by codec name, a classic factory function. For example, if the negotiated codec is AMR, newAmrCodec() is called and an AmrCodec object is created.
(2) It creates a new native AudioStream and calls its set() method to record the socket, the remote address, the codec and the timing parameters; this stream is later linked into the list headed by the group's mChain member.
(3) It instantiates an AudioGroup, but only the first time one is needed for this Java object, and calls the group's set() and setMode() methods. Let's look at how these two methods are defined.
bool AudioGroup::set(int sampleRate, int sampleCount)
{
mEventQueue = epoll_create(2);
if (mEventQueue == -1) {
ALOGE("epoll_create: %s", strerror(errno));
return false;
}
mSampleRate = sampleRate;
mSampleCount = sampleCount;
// Create device socket.
int pair[2];
//Create a connected socket pair: data written on one end is received on the other, and both ends can send and receive.
if (socketpair(AF_UNIX, SOCK_DGRAM, 0, pair)) {
ALOGE("socketpair: %s", strerror(errno));
return false;
}
mDeviceSocket = pair[0]; //keep one end of the pair; the DeviceThread reads and writes it later
// Create device stream.
mChain = new AudioStream;
if (!mChain->set(AudioStream::NORMAL, pair[1], NULL, NULL,
sampleRate, sampleCount, -1, -1)) {
close(pair[1]);
ALOGE("cannot initialize device stream");
return false;
}
// Give device socket a reasonable timeout.
timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 1000 * sampleCount / sampleRate * 500;
if (setsockopt(pair[0], SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {
ALOGE("setsockopt: %s", strerror(errno));
return false;
}
// Add device stream into event queue.
epoll_event event;
event.events = EPOLLIN;
event.data.ptr = mChain;
//register the other end of the pair with epoll so the network loop wakes up when data arrives
if (epoll_ctl(mEventQueue, EPOLL_CTL_ADD, pair[1], &event)) {
ALOGE("epoll_ctl: %s", strerror(errno));
return false;
}
// Anything else?
ALOGD("stream[%d] joins group[%d]", pair[1], pair[0]);
return true;
}
Note that set() runs on the caller's thread (here effectively the main thread), not on one of the worker threads.
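The socketpair-plus-epoll arrangement is the core trick here: the DeviceThread and the NetworkThread exchange PCM frames through the pair, and epoll lets one loop wait on the device end and on every RTP socket at once. Java has no socketpair() or epoll, but the same pattern can be sketched with NIO's Pipe and Selector (an analogue for illustration only, not what the framework does):
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class DevicePipeSketch {
    public static void main(String[] args) throws Exception {
        Pipe pipe = Pipe.open();                    // stands in for socketpair()
        Selector selector = Selector.open();        // stands in for epoll_create()
        pipe.source().configureBlocking(false);
        pipe.source().register(selector, SelectionKey.OP_READ);  // like epoll_ctl(ADD)

        // "DeviceThread": pushes one 20 ms frame of 8 kHz 16-bit silence into its end.
        new Thread(() -> {
            try {
                pipe.sink().write(ByteBuffer.allocate(320));
            } catch (Exception ignored) {
            }
        }).start();

        // "NetworkThread": sleeps in select() until the other end writes, then drains it.
        if (selector.select(1000) > 0) {
            ByteBuffer frame = ByteBuffer.allocate(2048);
            int n = pipe.source().read(frame);
            System.out.println("woken up with " + n + " bytes to encode and send");
            selector.selectedKeys().clear();
        }
    }
}
The second of the two methods, AudioGroup::setMode(), follows: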
bool AudioGroup::setMode(int mode)
{
if (mode < 0 || mode > LAST_MODE) {
return false;
}
// FIXME: temporary code to overcome echo and mic gain issues on herring and tuna boards.
// Must be modified/removed when the root cause of the issue is fixed in the hardware or
// driver
char value[PROPERTY_VALUE_MAX];
property_get("ro.product.board", value, "");
if (mode == NORMAL &&
(!strcmp(value, "herring") || !strcmp(value, "tuna"))) {
mode = ECHO_SUPPRESSION;
}
//only when the mode actually changes (or is being set for the first time) do we get past this check
if (mMode == mode) {
return true;
}
//ask the previously started DeviceThread (if any) to exit and wait for it
mDeviceThread->requestExitAndWait();
ALOGD("group[%d] switches from mode %d to %d", mDeviceSocket, mMode, mode);
mMode = mode;
//if the new mode is ON_HOLD the call is on hold, so no new worker thread is started
return (mode == ON_HOLD) || mDeviceThread->start();
}
At the end of this method, mDeviceThread->start() launches a new DeviceThread.
As analyzed earlier, placing a call invokes setMode() on the Java AudioGroup, which calls down through JNI into the native AudioGroup::setMode(); that is how this worker thread gets started. What it actually does is analyzed further below.
(4) Finally, group->add(stream) is called; the corresponding code is:
bool AudioGroup::add(AudioStream *stream)
{
mNetworkThread->requestExitAndWait(); //stop the network loop while the chain is being modified
epoll_event event;
event.events = EPOLLIN;
event.data.ptr = stream;
//add the stream's RTP socket to the epoll set behind mEventQueue
if (epoll_ctl(mEventQueue, EPOLL_CTL_ADD, stream->mSocket, &event)) {
ALOGE("epoll_ctl: %s", strerror(errno));
return false;
}
//link the new stream into the list headed by mChain
stream->mNext = mChain->mNext;
mChain->mNext = stream;
//restart the NetworkThread
if (!mNetworkThread->start()) {
// Only take over the stream when succeeded.
mChain->mNext = stream->mNext;
return false;
}
ALOGD("stream[%d] joins group[%d]", stream->mSocket, mDeviceSocket);
return true;
}
So far, two worker threads have been started: a NetworkThread and a DeviceThread.
Before analyzing their thread loops, let's first look at two AudioStream methods, decode() and encode():
void AudioStream::decode(int tick)
{
char c;
if (mMode == SEND_ONLY) {
recv(mSocket, &c, 1, MSG_DONTWAIT);
return;
}
// Make sure mBufferHead and mBufferTail are reasonable.
if ((unsigned int)(tick + BUFFER_SIZE - mBufferHead) > BUFFER_SIZE * 2) {
mBufferHead = tick - HISTORY_SIZE;
mBufferTail = mBufferHead;
}
if (tick - mBufferHead > HISTORY_SIZE) {
// Throw away outdated samples.
mBufferHead = tick - HISTORY_SIZE;
if (mBufferTail - mBufferHead < 0) {
mBufferTail = mBufferHead;
}
}
// Adjust the jitter buffer if the latency keeps larger than the threshold
// in the measurement period.
int score = mBufferTail - tick - MEASURE_BASE;
if (mLatencyScore > score || mLatencyScore <= 0) {
mLatencyScore = score;
mLatencyTimer = tick;
} else if (tick - mLatencyTimer >= MEASURE_PERIOD) {
ALOGV("stream[%d] reduces latency of %dms", mSocket, mLatencyScore);
mBufferTail -= mLatencyScore;
mLatencyScore = -1;
}
int count = (BUFFER_SIZE - (mBufferTail - mBufferHead)) * mSampleRate;
if (count < mSampleCount) {
// Buffer overflow. Drop the packet.
ALOGV("stream[%d] buffer overflow", mSocket);
recv(mSocket, &c, 1, MSG_DONTWAIT);
return;
}
// Receive the packet and decode it.
int16_t samples[count];
if (!mCodec) {
// Special case for device stream.
count = recv(mSocket, samples, sizeof(samples),
MSG_TRUNC | MSG_DONTWAIT) >> 1;
} else {
__attribute__((aligned(4))) uint8_t buffer[2048];
sockaddr_storage remote;
socklen_t addrlen = sizeof(remote);
//receive from the network: the call is full duplex, so the peer's encoded audio arrives here as RTP/UDP packets
int length = recvfrom(mSocket, buffer, sizeof(buffer),
MSG_TRUNC | MSG_DONTWAIT, (sockaddr *)&remote, &addrlen);
// Do we need to check SSRC, sequence, and timestamp? They are not
// reliable but at least they can be used to identify duplicates?
if (length < 12 || length > (int)sizeof(buffer) ||
(ntohl(*(uint32_t *)buffer) & 0xC07F0000) != mCodecMagic) {
ALOGV("stream[%d] malformed packet", mSocket);
return;
}
int offset = 12 + ((buffer[0] & 0x0F) << 2);
if ((buffer[0] & 0x10) != 0) {
offset += 4 + (ntohs(*(uint16_t *)&buffer[offset + 2]) << 2);
}
if ((buffer[0] & 0x20) != 0) {
length -= buffer[length - 1];
}
length -= offset;
if (length >= 0) {
//let the concrete AudioCodec decode the received payload into PCM samples
length = mCodec->decode(samples, count, &buffer[offset], length);
}
if (length > 0 && mFixRemote) {
mRemote = remote;
mFixRemote = false;
}
count = length;
}
if (count <= 0) {
ALOGV("stream[%d] decoder error", mSocket);
return;
}
if (tick - mBufferTail > 0) {
// Buffer underrun. Reset the jitter buffer.
ALOGV("stream[%d] buffer underrun", mSocket);
if (mBufferTail - mBufferHead <= 0) {
mBufferHead = tick + mInterval;
mBufferTail = mBufferHead;
} else {
int tail = (tick + mInterval) * mSampleRate;
for (int i = mBufferTail * mSampleRate; i - tail < 0; ++i) {
mBuffer[i & mBufferMask] = 0;
}
mBufferTail = tick + mInterval;
}
}
// Append to the jitter buffer.
int tail = mBufferTail * mSampleRate;
//append the decoded samples to this stream's mBuffer (the jitter buffer)
for (int i = 0; i < count; ++i) {
mBuffer[tail & mBufferMask] = samples[i];
++tail;
}
mBufferTail += mInterval;
}
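The offset arithmetic in decode() is just the standard RTP header layout from RFC 3550: a 12-byte fixed header, an optional CSRC list (CC * 4 bytes), an optional extension, and optional padding whose length sits in the packet's last byte. A hedged standalone sketch of the same bookkeeping (the class and method names are mine, not the framework's):
import java.nio.ByteBuffer;

final class RtpPayload {
    /** Locates the codec payload inside a raw RTP packet, mirroring AudioStream::decode(). */
    static ByteBuffer payloadOf(byte[] packet, int length) {
        if (length < 12) throw new IllegalArgumentException("too short for an RTP header");
        int b0 = packet[0] & 0xFF;
        int version = b0 >> 6;                         // must be 2
        boolean padding = (b0 & 0x20) != 0;
        boolean extension = (b0 & 0x10) != 0;
        int csrcCount = b0 & 0x0F;
        int payloadType = packet[1] & 0x7F;

        int offset = 12 + (csrcCount << 2);            // skip the fixed header and the CSRC list
        if (extension) {
            // 2-byte profile id, 2-byte length in 32-bit words, then the extension itself
            int words = ((packet[offset + 2] & 0xFF) << 8) | (packet[offset + 3] & 0xFF);
            offset += 4 + (words << 2);
        }
        if (padding) {
            length -= packet[length - 1] & 0xFF;       // the last byte counts the padding octets
        }
        System.out.println("version=" + version + " payloadType=" + payloadType);
        return ByteBuffer.wrap(packet, offset, length - offset);
    }
}
This also explains the mCodecMagic test above: masking the first 32-bit word with 0xC07F0000 keeps exactly the version bits and the payload type.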
The key points of decode() are annotated in the comments above; next comes the encode() method.
void AudioStream::encode(int tick, AudioStream *chain)
{
if (tick - mTick >= mInterval) {
// We just missed the train. Pretend that packets in between are lost.
int skipped = (tick - mTick) / mInterval;
mTick += skipped * mInterval;
mSequence += skipped;
mTimestamp += skipped * mSampleCount;
ALOGV("stream[%d] skips %d packets", mSocket, skipped);
}
tick = mTick;
mTick += mInterval;
++mSequence;
mTimestamp += mSampleCount;
// If there is an ongoing DTMF event, send it to the peer now (as an RTP telephone-event packet).
if (mMode != RECEIVE_ONLY && mDtmfEvent != -1) {
int duration = mTimestamp - mDtmfStart;
// Make sure duration is reasonable.
if (duration >= 0 && duration < mSampleRate * DTMF_PERIOD) {
duration += mSampleCount;
int32_t buffer[4] = {
htonl(mDtmfMagic | mSequence),
htonl(mDtmfStart),
mSsrc,
htonl(mDtmfEvent | duration),
};
if (duration >= mSampleRate * DTMF_PERIOD) {
buffer[3] |= htonl(1 << 23);
mDtmfEvent = -1;
}
sendto(mSocket, buffer, sizeof(buffer), MSG_DONTWAIT,
(sockaddr *)&mRemote, sizeof(mRemote));
return;
}
mDtmfEvent = -1;
}
int32_t buffer[mSampleCount + 3];
bool data = false;
if (mMode != RECEIVE_ONLY) {
// Mix all other streams.
memset(buffer, 0, sizeof(buffer));
while (chain) {
if (chain != this) {
//mix (sum) the buffered audio of every other stream in the chain into this buffer
data |= chain->mix(buffer, tick - mInterval, tick, mSampleRate);
}
chain = chain->mNext;
}
}
int16_t samples[mSampleCount];
if (data) {
// Saturate into 16 bits.
for (int i = 0; i < mSampleCount; ++i) {
int32_t sample = buffer[i];
if (sample < -32768) {
sample = -32768;
}
if (sample > 32767) {
sample = 32767;
}
samples[i] = sample;
}
} else {
if ((mTick ^ mKeepAlive) >> 10 == 0) {
return;
}
mKeepAlive = mTick;
memset(samples, 0, sizeof(samples));
if (mMode != RECEIVE_ONLY) {
ALOGV("stream[%d] no data", mSocket);
}
}
if (!mCodec) {
// Special case for device stream.
send(mSocket, samples, sizeof(samples), MSG_DONTWAIT);
return;
}
// Cook the packet and send it out.
buffer[0] = htonl(mCodecMagic | mSequence);
buffer[1] = htonl(mTimestamp);
buffer[2] = mSsrc;
//encode the mixed PCM with the concrete AudioCodec
int length = mCodec->encode(&buffer[3], samples);
if (length <= 0) {
ALOGV("stream[%d] encoder error", mSocket);
return;
}
//send the encoded RTP packet to the remote peer
sendto(mSocket, buffer, length + 12, MSG_DONTWAIT, (sockaddr *)&mRemote,
sizeof(mRemote));
}
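The mixing step is worth isolating: each stream's 16-bit samples are summed into a 32-bit accumulator, and only at the end is the result clamped back into the 16-bit range, so simultaneous talkers cannot wrap around. A minimal sketch of that idea (not the framework's mix(), which also maps between the two streams' timelines):
final class MixSketch {
    /** Sums several 16-bit sample buffers and saturates the result back to 16 bits. */
    static short[] mixAndSaturate(short[][] sources, int frameSize) {
        int[] acc = new int[frameSize];
        for (short[] source : sources) {
            for (int i = 0; i < frameSize; i++) {
                acc[i] += source[i];          // 32-bit accumulation, no overflow for a few streams
            }
        }
        short[] out = new short[frameSize];
        for (int i = 0; i < frameSize; i++) {
            int s = acc[i];
            if (s < -32768) s = -32768;       // clamp instead of letting the narrowing cast wrap
            if (s > 32767) s = 32767;
            out[i] = (short) s;
        }
        return out;
    }
}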
Now let's look at the NetworkThread's loop function:
bool AudioGroup::NetworkThread::threadLoop()
{
AudioStream *chain = mGroup->mChain;
int tick = elapsedRealtime();
int deadline = tick + 10;
int count = 0;
//walk the chain and let every due AudioStream encode and send its next packet to the peer
for (AudioStream *stream = chain; stream; stream = stream->mNext) {
if (tick - stream->mTick >= 0) {
stream->encode(tick, chain);
}
if (deadline - stream->mTick > 0) {
deadline = stream->mTick;
}
++count;
}
int event = mGroup->mDtmfEvent;
if (event != -1) {
for (AudioStream *stream = chain; stream; stream = stream->mNext) {
stream->sendDtmf(event);
}
mGroup->mDtmfEvent = -1;
}
deadline -= tick;
if (deadline < 1) {
deadline = 1;
}
epoll_event events[count];
//mGroup->mEventQueue watches every AudioStream's RTP socket plus one end of the socketpair;
//epoll_wait() returns either on timeout or when one of them has data to read, i.e. a packet arrived.
count = epoll_wait(mGroup->mEventQueue, events, count, deadline);
if (count == -1) {
ALOGE("epoll_wait: %s", strerror(errno));
return false;
}
//decode whatever arrived on each ready stream
for (int i = 0; i < count; ++i) {
((AudioStream *)events[i].data.ptr)->decode(tick);
}
return true;
}
As the analysis shows, this thread exists to handle the network packets. Next is the DeviceThread's loop function (only the key code is kept):
bool AudioGroup::DeviceThread::threadLoop()
{
int mode = mGroup->mMode;
int sampleRate = mGroup->mSampleRate;
int sampleCount = mGroup->mSampleCount;
int deviceSocket = mGroup->mDeviceSocket;
... //code omitted
// Initialize AudioTrack and AudioRecord.
sp<AudioTrack> track = new AudioTrack();
sp<AudioRecord> record = new AudioRecord();
//AudioTrack plays audio and AudioRecord captures it; their set() methods talk to AudioPolicyService and AudioFlinger over binder.
//AudioFlinger creates the corresponding playback/record threads and Track objects, which share a block of memory for passing audio across processes. How set() works internally is out of scope here.
if (track->set(AUDIO_STREAM_VOICE_CALL, sampleRate, AUDIO_FORMAT_PCM_16_BIT,
AUDIO_CHANNEL_OUT_MONO, output, AUDIO_OUTPUT_FLAG_NONE, NULL /*callback_t*/,
NULL /*user*/, 0 /*notificationFrames*/, 0 /*sharedBuffer*/,
false /*threadCanCallJava*/, 0 /*sessionId*/,
AudioTrack::TRANSFER_OBTAIN) != NO_ERROR ||
record->set(AUDIO_SOURCE_VOICE_COMMUNICATION, sampleRate, AUDIO_FORMAT_PCM_16_BIT,
AUDIO_CHANNEL_IN_MONO, input, NULL /*callback_t*/, NULL /*user*/,
0 /*notificationFrames*/, false /*threadCanCallJava*/, 0 /*sessionId*/,
AudioRecord::TRANSFER_OBTAIN) != NO_ERROR) {
ALOGE("cannot initialize audio device");
return false;
}
ALOGD("latency: output %d, input %d", track->latency(), record->latency());
// Give device socket a reasonable buffer size.
setsockopt(deviceSocket, SOL_SOCKET, SO_RCVBUF, &output, sizeof(output));
setsockopt(deviceSocket, SOL_SOCKET, SO_SNDBUF, &output, sizeof(output));
... //code omitted
// Start AudioRecord before AudioTrack. This prevents AudioTrack from being
// disabled due to buffer underrun while waiting for AudioRecord.
if (mode != MUTED) {
record->start();
int16_t one;
// FIXME this may not work any more
record->read(&one, sizeof(one));
}
track->start();
while (!exitPending()) {
int16_t output[sampleCount];
//far-end audio mixed by the NetworkThread arrives over the device socket pair and is stored in output
if (recv(deviceSocket, output, sizeof(output), 0) <= 0) {
memset(output, 0, sizeof(output));
}
int16_t input[sampleCount];
int toWrite = sampleCount;
int toRead = (mode == MUTED) ? 0 : sampleCount;
int chances = 100;
while (--chances > 0 && (toWrite > 0 || toRead > 0)) {
if (toWrite > 0) {
AudioTrack::Buffer buffer;
buffer.frameCount = toWrite;
//there are samples left to play: copy them into the AudioTrack buffer, which ultimately drives the speaker or earpiece
status_t status = track->obtainBuffer(&buffer, 1);
if (status == NO_ERROR) {
int offset = sampleCount - toWrite;
memcpy(buffer.i8, &output[offset], buffer.size);
toWrite -= buffer.frameCount;
track->releaseBuffer(&buffer);
} else if (status != TIMED_OUT && status != WOULD_BLOCK) {
ALOGE("cannot write to AudioTrack");
goto exit;
}
}
//there are samples left to capture
if (toRead > 0) {
AudioRecord::Buffer buffer;
buffer.frameCount = toRead;
//obtainBuffer() hands us the samples the mic has captured; its internals are not covered here
status_t status = record->obtainBuffer(&buffer, 1);
if (status == NO_ERROR) {
int offset = sampleCount - toRead;
//copy the captured samples into the input array
memcpy(&input[offset], buffer.i8, buffer.size);
toRead -= buffer.frameCount;
record->releaseBuffer(&buffer);
} else if (status != TIMED_OUT && status != WOULD_BLOCK) {
ALOGE("cannot read from AudioRecord");
goto exit;
}
}
if (chances <= 0) {
ALOGW("device loop timeout");
while (recv(deviceSocket, &c, 1, MSG_DONTWAIT) == 1);
}
if (mode != MUTED) {
if (echo != NULL) {
ALOGV("echo->run()");
echo->run(output, input);
}
/* Note that this send() does not transmit to the remote peer. It writes the captured samples into the
device socket pair, which wakes the NetworkThread's epoll_wait(); the NetworkThread then encodes the data and sends it to the peer in its threadLoop(). */
send(deviceSocket, input, sizeof(input), MSG_DONTWAIT);
}
}
exit:
delete echo;
delete aec;
return true;
}