您的位置:首页 > 运维架构 > Linux

在linux下实现简单聊天系统(三)服务器

2018-03-12 15:38 447 查看
我们在在一个主线程进行创建监听套接字,创建线程个数个双向管道(用双向管道因为消息发送是全双工的),创建cpu内核个数-1个子线程,利用libevent监听监听套接字和双向管道的一端,接受客户端连接,为客户端选择处理消息的子线程。

在子线程中,我们分别创建三个子线程来分别完成相应的功能。子线程1负责通过sock_pair双向管道进行与主线程的信息交换,用同一个套接字cli_fd可以让用户选择不同模式,这里使用mvc模式控制,使用的是map表,control控制台。

子线程2用 libevent框架,其中双向管道读取主线程发送的客户端fd,绑定客户端fd处理函数,并给主线程回复当前监听个数。

子线程3用户首次登陆就将用户加入到memeached之中,在规定时间内如果再次登陆就可以在memached中的缓存数据进行查找验证,如果查找不到再进入数据库查找。

主线程代码:

pthread.h

#ifndef PTHREAD_H
#define PTHREAD_H
#include<iostream>
#include<map>
#include<event.h>
using namespace std;
class pthread
{
public:
pthread(int sock_fd);
~pthread();

private:
int _sock_fd;//管道1端
struct event_base *_base;//libevent
map<int,struct event*> _event_map;//存储事件的map表
pthread_t _pthread;//线程描述符

friend void sock_1_cb(int fd,short event,void *arg);
friend void cli_cb(int fd,short event,void *arg);
friend void* pth_run(void *);

};

#endif


pthread.cpp

#include<iostream>
#include<string>
#include<string.h>
//#include"control.h"
#include"view.h"
#include"pthread.h"
#include<event.h>
#include <stdlib.h>
#include <string.h>
#include <json/json.h>
#include<map>
#include<assert.h>

using namespace std;

void sock_1_cb(int fd,short event,void *arg);
void cli_cb(int fd,short event ,void *arg);
//extern  control control_sever;
void *pth_run(void *arg);

pthread::pthread(int sock_1_fd)
{
int _sock_1_fd = sock_1_fd;
_base=event_base_new();
//启动线程
int res;
res=pthread_create(&_pthread,NULL,pth_run,this);
if(res!=0)
{
cerr<<"pthread_run move fail\n"<<endl;
}

}

void *pth_run(void *arg)
{
pthread *this3=(pthread *)arg;
//将sock_pair_1加入到libevent  sock_pair_1_cb()
//添加到libevent
//创建事件
struct event_base* _base=(struct event_base*)arg;
struct event* listen_event = event_new(this3->_base,this3->_sock_fd,EV_READ|EV_PERSIST,sock_1_cb,this3);
if(NULL == listen_event)
{
cout<<"event new fail"<<endl;
return 0;
}

//将事件添加到事件列表
event_add(listen_event,NULL);
//this3->_event_map.insert(make_pair(this3->_sock_fd,listen_event));

//循环监听
event_base_dispatch(this3->_base);

}

void sock_1_cb(int fd,short event,void *arg)
{//recv -> clien_fd

//将client_fd加入到libevent     client_cb()
pthread *this4=(pthread *)arg;

char buf[16] = {0};
//int cli_fd;
if(-1 ==recv(fd,buf,15,0))
{
cout<<"accept fail"<<endl;
return;
}
/*else
{
cli_fd=atoi(buf);
Json::Value root;
Json::Reader read;
read.parse(buf,root);
if(root["reason_type"]==MSG_TYPE_EXIT)
{
map<int,struct event*>::iterator it=this4->_event_map.find(cli_fd);
event_free(it->second);
this4->_event_map.erase(it);
}
}*/
int cli_fd = atoi(buf);

//将clifd加入到事件列表
struct event* cli_event = event_new(this4->_base,cli_fd,EV_READ|EV_PERSIST,cli_cb,this4);
if(NULL == cli_event)
{
cout<<"cli event creat fail"<<endl;
return;
}

event_add(cli_event,NULL);

this4->_event_map.insert(make_pair(cli_fd,cli_event));
cout<<" 给主线程回复数据 "<<endl;
//给主线程回复当前监听的客户端数量
char buff[20]={0};
//sprintf(buff,"%s",this4->_event_map.size());
send(fd,buff,sizeof(buff),0);
}

void cli_cb(int fd,short event,void *arg)
{
//recv  ->buff
pthread* this5 = (pthread *)arg;
cout<<"1"<<endl;
char buff[100] = {0};

if(0< recv(fd,buff,99,0))
{
//cerr<<"pthead recv fail"<<endl;
}
/*else
{
close(fd);
}*/
//将buff发给control
if(fd==0)
{
map<int,struct event*>::iterator it=this5->_event_map.find(fd);
event_free(it->second);
}

//   control_sever.process(fd,buff);
}


tcpsever.h

#ifndef TCPSEVER_H
#define TCPSEVER_H
#include<map>
#include"pthread.h"
#include<vector>
#include<iostream>
using namespace std;
class arr
{
public:
arr(int brr[2])
{
crr[0]=brr[0];
crr[1]=brr[1];
}
int crr[2];
};

typedef class tcpsever
{
public:
tcpsever(char *ip,short port,int pth_num);
//~tcpsever();

//服务器运行
void run();
private:
int _listen_fd;//监听套接字
int _pth_num;//线程个数
//vector<int [2]>  _vec_sock;//socket_pair
struct event_base* _base;//libevent
vector<arr>_socket_pair;//socket pair vector
vector<pthread*>_pthread;//pthread vector
map<int,int> _pth_num_map;//存储线程负载量的map表

//创建管道
void create_socket_pair();
//创建子线程
void create_pth();

//监听套接子回调函数
friend void listen_cb(int fd,short event,void *arg);
//双向管道0端回调函数
friend void sock_0_cb(int fd,short event,void *arg);
}Tcpsever,*Ptcpsever;

#endif


tcpsever.cpp
#include<iostream>
#include<arpa/inet.h>
#include<sys/socket.h>
#include<errno.h>
#include<event.h>
#include<map>
#include<stdlib.h>
#include"pthread.h"
#include"tcpsever.h"
#include<string.h>
using namespace std;

void listen_cb(int fd,short event,void* arg)
{
//接受链接
cout<<"listen"<<endl;
Tcpsever *this2=(Tcpsever*)arg;
struct sockaddr_in caddr;
socklen_t len = sizeof(caddr);
int cli_fd=accept(fd,(struct sockaddr*)&caddr,&len);
if(-1==cli_fd)
{
cerr<<"accept fail:errno:"<<errno<<endl;
return ;
}

char buf[20]={0};
sprintf(buf,"%d",cli_fd);
//if(buf=NULL)
//	return ;
//在map表查找,second最小的
map<int,int>::iterator it1 =this2->_pth_num_map.begin();
map<int,int>::iterator it2 =this2->_pth_num_map.begin();
for(;it1 !=this2->_pth_num_map.end();++it1)
{
if(it1->second <= it2->second)
{
it2 = it1;
}
}
cout<<it2->first<<";"<<it2->second<<endl;
//将客户端套接子通过socktpair发给子线程

if(0>send(it2->first,buf,strlen(buf),0))
{
cerr<<"it2 send fail;errno"<<errno<<endl;

}
cout<<"Yes"<<endl;
}
void sock_0_cb(int fd,short event,void *arg)
{
Tcpsever *this1 = (Tcpsever*)arg;
//读取管道内容
char buff[1024]={0};
if(recv(fd,buff,sizeof(buff),0)<0)
{
cout<<"read fail\n"<<endl;
}
//更新到map表
map<int,int>::iterator it=this1->_pth_num_map.begin();
for(;it!=this1->_pth_num_map.end();it++)
{
if(it->first==fd)
{
it->second=atoi(buff);
return;
}
}
cout<<"not find fd\n"<<endl;
}

Tcpsever::tcpsever(char *ip,short port,int pth_num)
{
int _listen_fd = socket(PF_INET,SOCK_STREAM,0);
if(-1 == _listen_fd)
{
cerr<<"fd creat fail;errno:"<<errno<<endl;
return;
}
cout<<"_listen_fd"<<_listen_fd<<endl;
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_port = htons(port);
saddr.sin_addr.s_addr = inet_addr(ip);

if(-1 == bind(_listen_fd,(struct sockaddr*)&saddr,sizeof(saddr)))
{
cerr<<"bind fail;errno:"<<errno<<endl;
return;
}

if(-1 == listen(_listen_fd,20))
{
cerr<<"listen fail;errno:"<<errno<<endl;
return;
}

int _pth_num = pth_num;
cout<<"num="<<_pth_num<<endl;
//给libevent申请空间
_base = event_base_new();

//创建事件,绑定监听套接子的回调函数(listen_cb)
struct event* listen_event = event_new(_base,_listen_fd,EV_READ|EV_PERSIST,listen_cb,this);
if(NULL == listen_event)
{
cerr<<"event new fail;errno:"<<errno<<endl;
return;
}

//将事件添加到事件列表
event_add(listen_event,NULL);

cout<<"Tcpsever"<<endl;
}

//tcpsever::~tcpsever();

void Tcpsever::run()
{
//创建双向管道
create_socket_pair();

//启动规定个数个子线程
create_pth();
//为主线程的socktpair创建事件,绑定回调函数(sock_0_cb)
struct event* sock_pair_event = event_new(_base,_listen_fd,EV_READ|EV_PERSIST,sock_0_cb,this);
cout<<"we"<<endl;
if(NULL == sock_pair_event)
{
cerr<<"event new fail;errno:"<<errno<<endl;
return;
}

//启动libevent循环监听
event_add(sock_pair_event,NULL);
event_base_dispatch(_base);
}

void Tcpsever::create_socket_pair()
{
//创建   for()
int arr[2];
int res=socketpair(AF_UNIX,SOCK_STREAM,0,arr);
if(res<0)
{
cerr<<"socketpair create faile\n;errno"<<errno<<endl;
return;
}
//将双向管道加入到_sock_pair.push_back();
_socket_pair.push_back(arr);
cout<<"arr[0]"<<arr[0]<<endl;
//_pth_work_num.push_buck(makepair(arr[0],0));
_pth_num_map.insert(make_pair(arr[0],0));
}

void Tcpsever::create_pth()
{
//启动线
4000
程for()
int i;
for(i=0;i<3;i++)
{
pthread_t id=_socket_pair[0].crr[i];
}
//pthread_t id=_socket_pair[0].crr[1];
cout<<"create pthread"<<endl;
//传入管道1端
}


control.h

#ifndef CONTROL_H
#define CONTROL_H
#include "view.h"
#include <iostream>
#include <map>
#include<string>
using namespace std;

class Control
{
public:
Control();
//~Control();
virtual void process(int fd,char* json);
map<int,view*> _map;
};
#endif


control.cpp

#include"exit.h"
#include"talk_to_one.h"
#include"list.h"
#include "register.h"
#include "login.h"
#include "public.h"
#include "control.h"
#include <iostream>
#include <json/json.h>
#include <sys/socket.h>
#include <errno.h>
#include <stdlib.h>
#include <string>
#include <string.h>
#include <stdio.h>
#include <map>
#include<iterator>
using namespace std;

Control::Control()
{
_map.insert(make_pair(MSG_TYPE_REGISTER, new Register()));
_map.insert(make_pair(MSG_TYPE_LOGIN, new Login()));
_map.insert(make_pair(MSG_TYPE_TALK_TO_ONE, new Talk_to_one()));
//_map.insert(make_pai(MSG_TYPE_TALK_TO_GTOUP,new View_talk_to_group));
_map.insert(make_pair(MSG_TYPE_GET_LIST, new List()));
_map.insert(make_pair(MSG_TYPE_EXIT, new Exit()));
}

void Control::process(int fd,char *json)
{
//	char send_json[128] = {0};
//	sprintf(send_json, "%s", json);
//解析json,获取消息类型
Json::Value root;
Json::Reader read;
if(-1 == read.parse(json, root))
{
cerr<<"Control process json prase fail;errno:"<<errno<<endl;
return;
}

int type=root["reason_type"].asInt();

map<int, view*>::iterator it = _map.find(type);
it->second->process(fd, root);
it->second->response();
/*	switch(type)
{
case MSG_TYPE_LOGIN:
{
cout<<"login"<<endl;
map<int,view*>::iterator it=_map.find(type);
it->second->process(fd,root);
it->second->response();
break;
}

case MSG_TYPE_REGISTER:
{
map<int,view*>::iterator it=_map.find(type);
it->second->process(fd,root);
it->second->response();
break;
}
case MSG_TYPE_EXIT:
{
map<int,view*>::iterator it=_map.find(type);
it->second->process(fd,root);
it->second->response();
break;
}
case MSG_TYPE_GET_LIST:
{
map<int,view*>::iterator it=_map.find(type);
it->second->process(fd,root);
it->second->response();
break;
}
case MSG_TYPE_TALK_TO_ONE:
{
map<int,view*>::iterator it=_map.find(type);
it->second->process(fd,root);
it->second->response();
break;
}
default:
{
cout<<"type error"<<endl;
return;
}
}
}*/
/*	//根据消息类型在map中查找
map<int, View*>::iterator it = _map.find(root["reason_type"].asInt());
//判断是否找到
if(it == _map.end())
{
Json::Value val;
val["reaso_type"] = "new Type fail";

if(-1 == send(fd, val.toStyledString().c_str(), strlen(val.toStyledString().c_str()), 0))
{
cerr<<"Control process send fail;errno:"<<errno<<endl;
return;
}
}

it->second->process(fd,json);
it->second->response();
}*/
}
Control control_sever;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  聊天 linux