您的位置:首页 > 产品设计 > UI/UE

webRTC base模块MessageQueue消息队列的实现

2017-12-05 22:05 295 查看

MessgeQueue

MessageQueueManager

消息队列MessageQueue的管理器

全局单例类

MessageQueueManager* MessageQueueManager::instance_ = NULL; // 全局变量 确保Manager是单例
MessageQueueManager* MessageQueueManager::Instance() {
// C++ Instance常见套路
if (!instance_)
instance_ = new MessageQueueManager;
return instance_;
}


MessageQueueManager中管理着一个MessageQueue向量

由stl vector构成 

std::vector<MessageQueue *> message_queues_;


由此Add Remove Clear操作都是围绕消息队列的向量展开的.

1. 比如Add调用AddInternal也就是 message_queues_.push_back(xx)

2. Remove调用RemoveInternal等同于“`

伪代码:

1. 在向量中寻找和输入的message_queue一样的队列删除

2. 如果向量删除队列后为空则delete整个MessageQueueManager

3. Clear 调用ClearInternal


伪代码:

遍历向量

调用MessageQueue的Cleaar

## MessageQueue
消息队列用到了上一小节分析过的读写锁
[读写锁的实现](http://blog.csdn.net/qq_21358401/article/details/78714487)
还涉及到了信号与槽
以及socketserver

### MessageQueue的构造


MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)

: fStop_(false), fPeekKeep_(false),

dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {

if (!ss_) {

default_ss_.reset(new DefaultSocketServer());

ss_ = default_ss_.get();

}

ss_->SetMessageQueue(this);

if (init_queue) {

DoInit();

}

}

消息队列会创建socket server 跟踪代码可以发现socket server是抽象类 创建MessageQueue时一般会传入socket server的子类


class SocketServer : public SocketFactory {

public:

static const int kForever = -1;

virtual void SetMessageQueue(MessageQueue* queue) {}

virtual bool Wait(int cms, bool process_io) = 0;

virtual void WakeUp() = 0;

  void set_network_binder(NetworkBinderInterface* binder) {

network_binder_ = binder;

}

NetworkBinderInterface* network_binder() const { return network_binder_; }

private:

NetworkBinderInterface* network_binder_ = nullptr;

};

继承来的子类必须实现Wait WakeUp
SetMessageQueue set_network_binder视需要实现

MessageQueue构造时 若没有传入socket server 则会生成默认的default_ss_


MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)

: fStop_(false), fPeekKeep_(false),

dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {

if (!ss_) { // 没有传入socket server

default_ss_.reset(new DefaultSocketServer());

ss_ = default_ss_.get();

}

ss_->SetMessageQueue(this);

if (init_queue) {

DoInit();

}

}

MessageQueue Post

void MessageQueue::Post(MessageHandler* phandler,
uint32_t id,
MessageData* pdata,
bool time_sensitive) {
if (fStop_)
return;

{
CritScope cs(&crit_);
Message msg;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
if (time_sensitive) {
msg.ts_sensitive = Time() + kMaxMsgLatency;
}
msgq_.push_back(msg); // msgq_是一个stl链表
}
WakeUpSocketServer(); // 唤醒socket server
}


Post的机制很简单 就是将传入的消息用stl链表保存起来

随后唤醒socket server

 MessageQueue Get

Get的处理流程比较复杂

首先检查是否存在延时消息

{
CritScope cs(&crit_);
// firstparst表示Get只检查一次延时消息
if (first_pass) {
first_pass = false;
while (!dmsgq_.empty()) {
if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
break;
}
msgq_.push_back(dmsgq_.top().msg_);
dmsgq_.pop();// 把到时的延时消息加入即时消息中
}
}

if (msgq_.empty()) {
break;
} else {
*pmsg = msgq_.front();
msgq_.pop_front();// 返回第一个即时消息
}
}


处理完消息之后 阻塞socket server

{
// Wait and multiplex in the meantime
SharedScope ss(&ss_lock_);
if (!ss_->Wait(cmsNext, process_io))
return false;
}


注意运行到这里说明没有消息(有消息的话处理消息时会直接返回) 所以调用Wait 等待消息到来.

示例代码

#include <iostream>
#include <queue>
#include <memory>
#include <string>
#include <csignal>
#include "pthread.h"

#include "webrtc/base/messagequeue.h"
#include "webrtc/base/socketserver.h"
#include "webrtc/base/event.h"

using namespace rtc;
using namespace std;

class MsgData : public MessageData {
public:
MsgData(){}
~MsgData(){}
void setStr(string str) {
unique_str_ = str;
}
string& getStr() {
return unique_str_;
}
private:
string unique_str_;
};

class MsgHandler : public MessageHandler {
public:
virtual void OnMessage(Message* msg) {
switch(msg->message_id) {
case 0: {
MsgData *data = (MsgData *) msg->pdata;
cout << "data:" << data->getStr() << endl;
} break;
default: {
cout << "unknow id" << msg->message_id << endl;
} break;
}
}
private:

};

class mysocket : public SocketServer {
public:
mysocket() {
event_ = new Event(true, true);
event_->Reset();
}
~mysocket() {
delete(event_);
}
virtual bool Wait(int cms, bool process_io) {
cout << "mysocket wait" << endl;
event_->Wait(event_->kForever);
}
virtual void WakeUp() {
cout << "mysocket wakeup" << endl;
event_->Set();
}
virtual Socket* CreateSocket(int type) {

}
virtual Socket* CreateSocket(int family, int type) {

}
virtual AsyncSocket* CreateAsyncSocket(int type) {

}
virtual AsyncSocket* CreateAsyncSocket(int family, int type) {

}
private:
Event *event_;

};

void signalExec(int int_id) {
cout << "exec signal" << endl;
exit(-1);
}

int main(int argc, char **argv) {
int ret = 0;

signal(SIGINT, signalExec);

MsgData data;
data.setStr("sliver");

MsgHandler handler;

Message *msg = new Message();
msg->message_id = 0;
msg->pdata = &data;
msg->phandler = &handler;

mysocket *m_socket = new mysocket();
MessageQueue queue(m_socket, true);

Location locate;
queue.Post(locate, &handler);
queue.Get(msg);

return 0;
}


示例简单的利用Event实现mysocket的阻塞和唤醒 模拟消息队列的Post和Get的行为

 示例代码git地址

https://github.com/sliver-chen/webRTCTutorial/blob/master/MessageQueue/MessageQueue.cpp

 总结

MessageQueue的流程还是有一点复杂的 并且使用方式也不像示例的这么简单 往后分析到Thread行为时 还需要结合起来分析消息队列的用法.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: