webRTC base模块MessageQueue消息队列的实现
2017-12-05 22:05
295 查看
MessgeQueue
MessageQueueManager
消息队列MessageQueue的管理器全局单例类
MessageQueueManager* MessageQueueManager::instance_ = NULL; // 全局变量 确保Manager是单例 MessageQueueManager* MessageQueueManager::Instance() { // C++ Instance常见套路 if (!instance_) instance_ = new MessageQueueManager; return instance_; }
MessageQueueManager中管理着一个MessageQueue向量
由stl vector构成
std::vector<MessageQueue *> message_queues_;
由此Add Remove Clear操作都是围绕消息队列的向量展开的.
1. 比如Add调用AddInternal也就是 message_queues_.push_back(xx)
2. Remove调用RemoveInternal等同于“`
伪代码:
1. 在向量中寻找和输入的message_queue一样的队列删除
2. 如果向量删除队列后为空则delete整个MessageQueueManager
3. Clear 调用ClearInternal
伪代码:
遍历向量
调用MessageQueue的Cleaar
## MessageQueue 消息队列用到了上一小节分析过的读写锁 [读写锁的实现](http://blog.csdn.net/qq_21358401/article/details/78714487) 还涉及到了信号与槽 以及socketserver ### MessageQueue的构造
MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
: fStop_(false), fPeekKeep_(false),
dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
if (!ss_) {
default_ss_.reset(new DefaultSocketServer());
ss_ = default_ss_.get();
}
ss_->SetMessageQueue(this);
if (init_queue) {
DoInit();
}
}
消息队列会创建socket server 跟踪代码可以发现socket server是抽象类 创建MessageQueue时一般会传入socket server的子类
class SocketServer : public SocketFactory {
public:
static const int kForever = -1;
virtual void SetMessageQueue(MessageQueue* queue) {}
virtual bool Wait(int cms, bool process_io) = 0;
virtual void WakeUp() = 0;
void set_network_binder(NetworkBinderInterface* binder) {
network_binder_ = binder;
}
NetworkBinderInterface* network_binder() const { return network_binder_; }
private:
NetworkBinderInterface* network_binder_ = nullptr;
};
继承来的子类必须实现Wait WakeUp SetMessageQueue set_network_binder视需要实现 MessageQueue构造时 若没有传入socket server 则会生成默认的default_ss_
MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
: fStop_(false), fPeekKeep_(false),
dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
if (!ss_) { // 没有传入socket server
default_ss_.reset(new DefaultSocketServer());
ss_ = default_ss_.get();
}
ss_->SetMessageQueue(this);
if (init_queue) {
DoInit();
}
}
MessageQueue Post
void MessageQueue::Post(MessageHandler* phandler, uint32_t id, MessageData* pdata, bool time_sensitive) { if (fStop_) return; { CritScope cs(&crit_); Message msg; msg.phandler = phandler; msg.message_id = id; msg.pdata = pdata; if (time_sensitive) { msg.ts_sensitive = Time() + kMaxMsgLatency; } msgq_.push_back(msg); // msgq_是一个stl链表 } WakeUpSocketServer(); // 唤醒socket server }
Post的机制很简单 就是将传入的消息用stl链表保存起来
随后唤醒socket server
MessageQueue Get
Get的处理流程比较复杂首先检查是否存在延时消息
{ CritScope cs(&crit_); // firstparst表示Get只检查一次延时消息 if (first_pass) { first_pass = false; while (!dmsgq_.empty()) { if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) { cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); break; } msgq_.push_back(dmsgq_.top().msg_); dmsgq_.pop();// 把到时的延时消息加入即时消息中 } } if (msgq_.empty()) { break; } else { *pmsg = msgq_.front(); msgq_.pop_front();// 返回第一个即时消息 } }
处理完消息之后 阻塞socket server
{ // Wait and multiplex in the meantime SharedScope ss(&ss_lock_); if (!ss_->Wait(cmsNext, process_io)) return false; }
注意运行到这里说明没有消息(有消息的话处理消息时会直接返回) 所以调用Wait 等待消息到来.
示例代码
#include <iostream> #include <queue> #include <memory> #include <string> #include <csignal> #include "pthread.h" #include "webrtc/base/messagequeue.h" #include "webrtc/base/socketserver.h" #include "webrtc/base/event.h" using namespace rtc; using namespace std; class MsgData : public MessageData { public: MsgData(){} ~MsgData(){} void setStr(string str) { unique_str_ = str; } string& getStr() { return unique_str_; } private: string unique_str_; }; class MsgHandler : public MessageHandler { public: virtual void OnMessage(Message* msg) { switch(msg->message_id) { case 0: { MsgData *data = (MsgData *) msg->pdata; cout << "data:" << data->getStr() << endl; } break; default: { cout << "unknow id" << msg->message_id << endl; } break; } } private: }; class mysocket : public SocketServer { public: mysocket() { event_ = new Event(true, true); event_->Reset(); } ~mysocket() { delete(event_); } virtual bool Wait(int cms, bool process_io) { cout << "mysocket wait" << endl; event_->Wait(event_->kForever); } virtual void WakeUp() { cout << "mysocket wakeup" << endl; event_->Set(); } virtual Socket* CreateSocket(int type) { } virtual Socket* CreateSocket(int family, int type) { } virtual AsyncSocket* CreateAsyncSocket(int type) { } virtual AsyncSocket* CreateAsyncSocket(int family, int type) { } private: Event *event_; }; void signalExec(int int_id) { cout << "exec signal" << endl; exit(-1); } int main(int argc, char **argv) { int ret = 0; signal(SIGINT, signalExec); MsgData data; data.setStr("sliver"); MsgHandler handler; Message *msg = new Message(); msg->message_id = 0; msg->pdata = &data; msg->phandler = &handler; mysocket *m_socket = new mysocket(); MessageQueue queue(m_socket, true); Location locate; queue.Post(locate, &handler); queue.Get(msg); return 0; }
示例简单的利用Event实现mysocket的阻塞和唤醒 模拟消息队列的Post和Get的行为
示例代码git地址
https://github.com/sliver-chen/webRTCTutorial/blob/master/MessageQueue/MessageQueue.cpp总结
MessageQueue的流程还是有一点复杂的 并且使用方式也不像示例的这么简单 往后分析到Thread行为时 还需要结合起来分析消息队列的用法.相关文章推荐
- webRTC base模块Event事件的实现
- redis list实现消息队列以及事件模块
- 异步消息队列zeromq实现服务器间高性能通信
- 用redis实现支持优先级的消息队列
- Linux进程间通信(IPC)编程实践(五)消息队列实现回射客户/服务器
- Python中线程的MQ消息队列实现以及消息队列的优点解析
- Python操作RabbitMQ服务器实现消息队列的路由功能
- 企业级 SpringBoot 教程 (十四) 在springboot中用redis实现消息队列
- Java利用Redis实现消息队列
- Linux下利用消息队列实现进程间通信
- redis实现消息队列
- [.NET领域驱动设计实战系列]专题八:DDD案例:网上书店分布式消息队列和分布式缓存的实现
- C#_使用微软消息队列实现C#进程间通信
- Redis使用总结(3):实现简单的消息队列
- 利用Redis 实现消息队列
- 消息队列实现多人聊天
- 利用消息队列实现两个进程之间的文件复制
- 使用消息队列+js实现分布式服务器热切换业务处理功能
- Redis笔记(七)Java实现Redis消息队列
- ActiveMQ实现消息队列