在linux下实现简单聊天系统(三)服务器
2018-03-12 15:38
447 查看
我们在在一个主线程进行创建监听套接字,创建线程个数个双向管道(用双向管道因为消息发送是全双工的),创建cpu内核个数-1个子线程,利用libevent监听监听套接字和双向管道的一端,接受客户端连接,为客户端选择处理消息的子线程。
在子线程中,我们分别创建三个子线程来分别完成相应的功能。子线程1负责通过sock_pair双向管道进行与主线程的信息交换,用同一个套接字cli_fd可以让用户选择不同模式,这里使用mvc模式控制,使用的是map表,control控制台。
子线程2用 libevent框架,其中双向管道读取主线程发送的客户端fd,绑定客户端fd处理函数,并给主线程回复当前监听个数。
子线程3用户首次登陆就将用户加入到memeached之中,在规定时间内如果再次登陆就可以在memached中的缓存数据进行查找验证,如果查找不到再进入数据库查找。
主线程代码:
pthread.h
pthread.cpp
tcpsever.h
tcpsever.cpp
control.h
control.cpp
在子线程中,我们分别创建三个子线程来分别完成相应的功能。子线程1负责通过sock_pair双向管道进行与主线程的信息交换,用同一个套接字cli_fd可以让用户选择不同模式,这里使用mvc模式控制,使用的是map表,control控制台。
子线程2用 libevent框架,其中双向管道读取主线程发送的客户端fd,绑定客户端fd处理函数,并给主线程回复当前监听个数。
子线程3用户首次登陆就将用户加入到memeached之中,在规定时间内如果再次登陆就可以在memached中的缓存数据进行查找验证,如果查找不到再进入数据库查找。
主线程代码:
pthread.h
#ifndef PTHREAD_H #define PTHREAD_H #include<iostream> #include<map> #include<event.h> using namespace std; class pthread { public: pthread(int sock_fd); ~pthread(); private: int _sock_fd;//管道1端 struct event_base *_base;//libevent map<int,struct event*> _event_map;//存储事件的map表 pthread_t _pthread;//线程描述符 friend void sock_1_cb(int fd,short event,void *arg); friend void cli_cb(int fd,short event,void *arg); friend void* pth_run(void *); }; #endif
pthread.cpp
#include<iostream> #include<string> #include<string.h> //#include"control.h" #include"view.h" #include"pthread.h" #include<event.h> #include <stdlib.h> #include <string.h> #include <json/json.h> #include<map> #include<assert.h> using namespace std; void sock_1_cb(int fd,short event,void *arg); void cli_cb(int fd,short event ,void *arg); //extern control control_sever; void *pth_run(void *arg); pthread::pthread(int sock_1_fd) { int _sock_1_fd = sock_1_fd; _base=event_base_new(); //启动线程 int res; res=pthread_create(&_pthread,NULL,pth_run,this); if(res!=0) { cerr<<"pthread_run move fail\n"<<endl; } } void *pth_run(void *arg) { pthread *this3=(pthread *)arg; //将sock_pair_1加入到libevent sock_pair_1_cb() //添加到libevent //创建事件 struct event_base* _base=(struct event_base*)arg; struct event* listen_event = event_new(this3->_base,this3->_sock_fd,EV_READ|EV_PERSIST,sock_1_cb,this3); if(NULL == listen_event) { cout<<"event new fail"<<endl; return 0; } //将事件添加到事件列表 event_add(listen_event,NULL); //this3->_event_map.insert(make_pair(this3->_sock_fd,listen_event)); //循环监听 event_base_dispatch(this3->_base); } void sock_1_cb(int fd,short event,void *arg) {//recv -> clien_fd //将client_fd加入到libevent client_cb() pthread *this4=(pthread *)arg; char buf[16] = {0}; //int cli_fd; if(-1 ==recv(fd,buf,15,0)) { cout<<"accept fail"<<endl; return; } /*else { cli_fd=atoi(buf); Json::Value root; Json::Reader read; read.parse(buf,root); if(root["reason_type"]==MSG_TYPE_EXIT) { map<int,struct event*>::iterator it=this4->_event_map.find(cli_fd); event_free(it->second); this4->_event_map.erase(it); } }*/ int cli_fd = atoi(buf); //将clifd加入到事件列表 struct event* cli_event = event_new(this4->_base,cli_fd,EV_READ|EV_PERSIST,cli_cb,this4); if(NULL == cli_event) { cout<<"cli event creat fail"<<endl; return; } event_add(cli_event,NULL); this4->_event_map.insert(make_pair(cli_fd,cli_event)); cout<<" 给主线程回复数据 "<<endl; //给主线程回复当前监听的客户端数量 char buff[20]={0}; //sprintf(buff,"%s",this4->_event_map.size()); send(fd,buff,sizeof(buff),0); } void cli_cb(int fd,short event,void *arg) { //recv ->buff pthread* this5 = (pthread *)arg; cout<<"1"<<endl; char buff[100] = {0}; if(0< recv(fd,buff,99,0)) { //cerr<<"pthead recv fail"<<endl; } /*else { close(fd); }*/ //将buff发给control if(fd==0) { map<int,struct event*>::iterator it=this5->_event_map.find(fd); event_free(it->second); } // control_sever.process(fd,buff); }
tcpsever.h
#ifndef TCPSEVER_H #define TCPSEVER_H #include<map> #include"pthread.h" #include<vector> #include<iostream> using namespace std; class arr { public: arr(int brr[2]) { crr[0]=brr[0]; crr[1]=brr[1]; } int crr[2]; }; typedef class tcpsever { public: tcpsever(char *ip,short port,int pth_num); //~tcpsever(); //服务器运行 void run(); private: int _listen_fd;//监听套接字 int _pth_num;//线程个数 //vector<int [2]> _vec_sock;//socket_pair struct event_base* _base;//libevent vector<arr>_socket_pair;//socket pair vector vector<pthread*>_pthread;//pthread vector map<int,int> _pth_num_map;//存储线程负载量的map表 //创建管道 void create_socket_pair(); //创建子线程 void create_pth(); //监听套接子回调函数 friend void listen_cb(int fd,short event,void *arg); //双向管道0端回调函数 friend void sock_0_cb(int fd,short event,void *arg); }Tcpsever,*Ptcpsever; #endif
tcpsever.cpp
#include<iostream> #include<arpa/inet.h> #include<sys/socket.h> #include<errno.h> #include<event.h> #include<map> #include<stdlib.h> #include"pthread.h" #include"tcpsever.h" #include<string.h> using namespace std; void listen_cb(int fd,short event,void* arg) { //接受链接 cout<<"listen"<<endl; Tcpsever *this2=(Tcpsever*)arg; struct sockaddr_in caddr; socklen_t len = sizeof(caddr); int cli_fd=accept(fd,(struct sockaddr*)&caddr,&len); if(-1==cli_fd) { cerr<<"accept fail:errno:"<<errno<<endl; return ; } char buf[20]={0}; sprintf(buf,"%d",cli_fd); //if(buf=NULL) // return ; //在map表查找,second最小的 map<int,int>::iterator it1 =this2->_pth_num_map.begin(); map<int,int>::iterator it2 =this2->_pth_num_map.begin(); for(;it1 !=this2->_pth_num_map.end();++it1) { if(it1->second <= it2->second) { it2 = it1; } } cout<<it2->first<<";"<<it2->second<<endl; //将客户端套接子通过socktpair发给子线程 if(0>send(it2->first,buf,strlen(buf),0)) { cerr<<"it2 send fail;errno"<<errno<<endl; } cout<<"Yes"<<endl; } void sock_0_cb(int fd,short event,void *arg) { Tcpsever *this1 = (Tcpsever*)arg; //读取管道内容 char buff[1024]={0}; if(recv(fd,buff,sizeof(buff),0)<0) { cout<<"read fail\n"<<endl; } //更新到map表 map<int,int>::iterator it=this1->_pth_num_map.begin(); for(;it!=this1->_pth_num_map.end();it++) { if(it->first==fd) { it->second=atoi(buff); return; } } cout<<"not find fd\n"<<endl; } Tcpsever::tcpsever(char *ip,short port,int pth_num) { int _listen_fd = socket(PF_INET,SOCK_STREAM,0); if(-1 == _listen_fd) { cerr<<"fd creat fail;errno:"<<errno<<endl; return; } cout<<"_listen_fd"<<_listen_fd<<endl; struct sockaddr_in saddr; saddr.sin_family = AF_INET; saddr.sin_port = htons(port); saddr.sin_addr.s_addr = inet_addr(ip); if(-1 == bind(_listen_fd,(struct sockaddr*)&saddr,sizeof(saddr))) { cerr<<"bind fail;errno:"<<errno<<endl; return; } if(-1 == listen(_listen_fd,20)) { cerr<<"listen fail;errno:"<<errno<<endl; return; } int _pth_num = pth_num; cout<<"num="<<_pth_num<<endl; //给libevent申请空间 _base = event_base_new(); //创建事件,绑定监听套接子的回调函数(listen_cb) struct event* listen_event = event_new(_base,_listen_fd,EV_READ|EV_PERSIST,listen_cb,this); if(NULL == listen_event) { cerr<<"event new fail;errno:"<<errno<<endl; return; } //将事件添加到事件列表 event_add(listen_event,NULL); cout<<"Tcpsever"<<endl; } //tcpsever::~tcpsever(); void Tcpsever::run() { //创建双向管道 create_socket_pair(); //启动规定个数个子线程 create_pth(); //为主线程的socktpair创建事件,绑定回调函数(sock_0_cb) struct event* sock_pair_event = event_new(_base,_listen_fd,EV_READ|EV_PERSIST,sock_0_cb,this); cout<<"we"<<endl; if(NULL == sock_pair_event) { cerr<<"event new fail;errno:"<<errno<<endl; return; } //启动libevent循环监听 event_add(sock_pair_event,NULL); event_base_dispatch(_base); } void Tcpsever::create_socket_pair() { //创建 for() int arr[2]; int res=socketpair(AF_UNIX,SOCK_STREAM,0,arr); if(res<0) { cerr<<"socketpair create faile\n;errno"<<errno<<endl; return; } //将双向管道加入到_sock_pair.push_back(); _socket_pair.push_back(arr); cout<<"arr[0]"<<arr[0]<<endl; //_pth_work_num.push_buck(makepair(arr[0],0)); _pth_num_map.insert(make_pair(arr[0],0)); } void Tcpsever::create_pth() { //启动线 4000 程for() int i; for(i=0;i<3;i++) { pthread_t id=_socket_pair[0].crr[i]; } //pthread_t id=_socket_pair[0].crr[1]; cout<<"create pthread"<<endl; //传入管道1端 }
control.h
#ifndef CONTROL_H #define CONTROL_H #include "view.h" #include <iostream> #include <map> #include<string> using namespace std; class Control { public: Control(); //~Control(); virtual void process(int fd,char* json); map<int,view*> _map; }; #endif
control.cpp
#include"exit.h" #include"talk_to_one.h" #include"list.h" #include "register.h" #include "login.h" #include "public.h" #include "control.h" #include <iostream> #include <json/json.h> #include <sys/socket.h> #include <errno.h> #include <stdlib.h> #include <string> #include <string.h> #include <stdio.h> #include <map> #include<iterator> using namespace std; Control::Control() { _map.insert(make_pair(MSG_TYPE_REGISTER, new Register())); _map.insert(make_pair(MSG_TYPE_LOGIN, new Login())); _map.insert(make_pair(MSG_TYPE_TALK_TO_ONE, new Talk_to_one())); //_map.insert(make_pai(MSG_TYPE_TALK_TO_GTOUP,new View_talk_to_group)); _map.insert(make_pair(MSG_TYPE_GET_LIST, new List())); _map.insert(make_pair(MSG_TYPE_EXIT, new Exit())); } void Control::process(int fd,char *json) { // char send_json[128] = {0}; // sprintf(send_json, "%s", json); //解析json,获取消息类型 Json::Value root; Json::Reader read; if(-1 == read.parse(json, root)) { cerr<<"Control process json prase fail;errno:"<<errno<<endl; return; } int type=root["reason_type"].asInt(); map<int, view*>::iterator it = _map.find(type); it->second->process(fd, root); it->second->response(); /* switch(type) { case MSG_TYPE_LOGIN: { cout<<"login"<<endl; map<int,view*>::iterator it=_map.find(type); it->second->process(fd,root); it->second->response(); break; } case MSG_TYPE_REGISTER: { map<int,view*>::iterator it=_map.find(type); it->second->process(fd,root); it->second->response(); break; } case MSG_TYPE_EXIT: { map<int,view*>::iterator it=_map.find(type); it->second->process(fd,root); it->second->response(); break; } case MSG_TYPE_GET_LIST: { map<int,view*>::iterator it=_map.find(type); it->second->process(fd,root); it->second->response(); break; } case MSG_TYPE_TALK_TO_ONE: { map<int,view*>::iterator it=_map.find(type); it->second->process(fd,root); it->second->response(); break; } default: { cout<<"type error"<<endl; return; } } }*/ /* //根据消息类型在map中查找 map<int, View*>::iterator it = _map.find(root["reason_type"].asInt()); //判断是否找到 if(it == _map.end()) { Json::Value val; val["reaso_type"] = "new Type fail"; if(-1 == send(fd, val.toStyledString().c_str(), strlen(val.toStyledString().c_str()), 0)) { cerr<<"Control process send fail;errno:"<<errno<<endl; return; } } it->second->process(fd,json); it->second->response(); }*/ } Control control_sever;
相关文章推荐
- 在linux下实现简单聊天系统(二)客户端的具体实现
- C#使用Socket实现服务器与多个客户端通信(简单的聊天系统)
- 在linux下实现简单聊天系统(一)项目基本框架内容
- 使用 ipmi实现Linux系统下对服务器的管理
- Linux网络编程:一个简单的正向代理服务器的实现
- Linux 服务器集群系统实现方案详解
- 使用 ipmitool 实现 Linux 系统下对服务器的 ipmi 管理
- Linux 服务器集群系统实现方案详解
- Linux 系统批量管理工具介绍,如何实现对一万台服务器的同时批量管理?
- Linux P2P音视频聊天系统实现步骤
- Linux下select函数实现的聊天服务器
- 快速掌握DWR开发图解-4 Reverse Ajax,服务器推技术实现简单聊天
- Linux 服务器集群系统实现方案详解
- linux系统Qt实现简单的任务管理器 推荐
- Linux下select函数实现的聊天服务器
- 用RMI实现一个简单的实时聊天系统(java语言)
- Linux 服务器集群系统实现方案详解
- 使用 ipmitool 实现 Linux 系统下对服务器的 ipmi 管理
- Linux服务器系统监控框架与MSN、E-mail、手机短信报警的实现