您的位置:首页 > 理论基础 > 计算机网络

C++ 网络编程:一个可复用的套接字管理类和一个简单用例

2016-03-16 16:52 1006 查看

C++网络编程公共类

本文对 POSIX 套接字 API 中几个基本的网络编程调用(socket、connect、bind、listen、accept、send、recv、close)用 C++ 进行封装,补充必要的出错处理,并用 select 实现对多个已连接套接字的数据接收。

目标:实现一个通用的套接字管理类

create

connect

bind

listen

accept

recv

close

头文件public.h(注意:后文 example.h 中是以 "SocketMgr.h" 的名字包含该头文件的,实际使用时两处文件名需保持一致)

#ifndef __SOCKET_MGR_H__
#define __SOCKET_MGR_H__
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <fcntl.h>
#include <pthread.h>
#include <unistd.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <sys/un.h>

#include <netinet/in.h>
#include <arpa/inet.h>
#include <net/if.h>
#include <net/if_arp.h>
#include <ifaddrs.h>

/* Number of slots in the watched-connection table scanned by the select() loop. */
#define MAXSOCKFD 1024
/* NOTE(review): presumably the length of the fixed message header; the receive code uses a literal 8 - confirm. */
#define MAX_MSG_BUF_HEAD 8
#define MAX_MSG_BUF 4000       //1024
#define BUFFLEN 1024
/* Size of CSocketMgr::client_thread. */
#define MAXCLIENT 50
/* Number of non-transient send() failures tolerated by CSocketMgr::sock_send(). */
#define RETRY_SEND 5

/*
 * 2 minutes, expressed in microseconds.
 * The expansion is parenthesized so that the macro behaves as a single value
 * inside larger expressions (e.g. `x / QUERY_SLAVE_TIME_INTERVAL`).
 */
#define QUERY_SLAVE_TIME_INTERVAL (2*60*1000*1000)  /*2 minutes*/

/* Forward declaration; struct fsocket_info is defined after the class. */
typedef struct fsocket_info fsock_info;

/*
 * CSocketMgr - thin wrapper around the socket calls (socket/connect/bind/
 * listen/accept/send/recv/close) plus an accept loop and a select()-based
 * receive loop that work on an array of fsock_info slots.
 *
 * NOTE(review): the class owns _pBuf (allocated in the constructor, freed in
 * the destructor) but declares no copy constructor / copy assignment, so a
 * copied object shares the same buffer pointer and both copies free it.
 * Confirm objects are never copied, or make the class non-copyable.
 * NOTE(review): pthread_t, CXmlBuilder and MutexLock are not declared by any
 * header included above - confirm where they come from.
 */
class CSocketMgr
{
public:
CSocketMgr();
~CSocketMgr();

/* Stop the local service. */
int stop_sock_mgr();
/* Thin wrappers around socket(), connect(), bind(), listen(), send(), recv() and close(). */
int sock_create(int, int, int);
int sock_connect(int, struct sockaddr *, int);
int sock_bind(int, struct sockaddr *, int);
int sock_listen(int, int);
int sock_send(int, const char *, unsigned int ,int);
int sock_recv(int, char *, unsigned int *, int);
void sock_close(int *fd);

/* Thread entry points and the loops that accept connections and receive commands. */
static void* s_accept_routine(void* context);
static void *s_recv_routine(void *context);
int send_request(char *, int);
int accept_routine(int, fsock_info *, int);
int recv_routine(fsock_info *);
/* Stores an accepted descriptor and its peer address in a free slot of the fsock_info array. */
void *set_fd_to_arry(int, struct sockaddr_in, fsock_info *, int);

/* Receives one message from the given slot into _pBuf. */
int recv_msg(fsock_info *);

/* One-shot helpers: connect to ip:port, send (and optionally receive a reply), then close. */
int sync_send_and_recv(char const *ip, int port, const char *, char *, unsigned int send_len, unsigned int *recv_len);
int sync_send(char const *ip, int port, const char * buf, int len);
private:
int ser_sock_fd;
pthread_t accept_thread;
pthread_t recv_thread;
bool _running;
int _socket;// local socket
pthread_t client_thread[MAXCLIENT];
CXmlBuilder _node_info;
/* Locked by sock_send() and sock_recv() for the duration of each call. */
MutexLock _mutex;
/* Receive buffer of RECV_BUF bytes used by recv_msg(); owned by this object. */
char *_pBuf;
/* Other common network classes can be composed here. */
};

/*
 * One slot of the watched-connection table shared by the accept loop and the
 * select() receive loop. A slot is treated as free when sock_fd <= 0.
 */
typedef struct fsocket_info
{
/* NOTE(review): not read or written by the code shown here - confirm its purpose. */
bool sock_fd_use_flag;
/* Time the connection was accepted; only filled in when the heartbeat flag is passed to set_fd_to_arry(). */
time_t accept_time;
/* Connected socket descriptor; values > 0 mark the slot as occupied. */
int sock_fd;
/* Peer address as reported by accept(). */
struct sockaddr_in client_addr;
} fsock_info;

/*
 * Context passed (as void *) to CSocketMgr::s_recv_routine().
 */
typedef struct ObjectFd
{
/* NOTE(review): not used by the code shown here - confirm its meaning. */
int listen_type;
/* Table of connection slots that the receive loop scans. */
fsock_info *watch_sock;
/* Manager instance on which the receive thread calls recv_routine(); held by value. */
class CSocketMgr _sock_mgr;
} ObjectFd;

/*
 * Description of a client endpoint together with the owning manager.
 * NOTE(review): not used by the code shown here - field meanings below are
 * inferred from names and sizes; confirm against the callers.
 */
typedef struct Client_Info
{
/* 16 bytes: presumably a dotted-quad IPv4 string including its terminating NUL. */
char ip[16];
int port;
/* Presumably the CSocketMgr instance handed to a client thread. */
class CSocketMgr *client_this;
} Client_Info;

#endif
//定义结束


具体实现public.cpp

#include "public.h"

pthread_mutex_t my_mutex;

#define min(a, b) ((a) > (b) ? (b) :(a))
#define RECV_BUF 4*1024*1024

/*
 * Constructs the socket manager.
 *
 * Puts the scalar members into a defined "nothing open / not running" state
 * (they were previously left uninitialized) and allocates the zero-filled
 * RECV_BUF-byte receive buffer used by recv_msg(). If the allocation fails a
 * message is printed and _pBuf stays NULL.
 */
CSocketMgr::CSocketMgr()
    : ser_sock_fd(-1),
      _running(false),
      _socket(-1),
      _pBuf(NULL)
{
    _pBuf = (char *)calloc(1, RECV_BUF);
    if (_pBuf == NULL)
    {
        printf("there isn't enough memory to be allocatede\n");
    }
}

/*
 * Releases the receive buffer allocated by the constructor.
 */
CSocketMgr::~CSocketMgr()
{
    // free() is defined to do nothing for a null pointer, so no guard is required.
    free(_pBuf);
}

/*
 * Creates a socket.
 *
 * @param af        address family, forwarded to socket()
 * @param type      socket type, forwarded to socket()
 * @param protocol  protocol, forwarded to socket()
 * @return whatever socket() returns: the new descriptor, or -1 on failure.
 */
int CSocketMgr::sock_create(int af, int type, int protocol)
{
    const int new_fd = socket(af, type, protocol);
    return new_fd;
}

/*
 * Connects a socket, retrying a failed attempt up to five times in total.
 *
 * When connect() is interrupted by a signal (EINTR) the connection attempt is
 * not aborted: it continues asynchronously, and calling connect() again only
 * yields EALREADY/EISCONN. Instead of re-issuing connect(), this function
 * waits until the socket becomes writable and then reads the outcome from
 * SO_ERROR.
 *
 * @param cli_sock_fd  socket to connect
 * @param serv_addr    peer address
 * @param len          size of *serv_addr in bytes
 * @return 0 on success, -1 on failure; on failure errno holds the error of
 *         the last attempt (it is restored after logging).
 */
int CSocketMgr::sock_connect(int cli_sock_fd, struct sockaddr *serv_addr, int len)
{
    const int max_attempts = 5;
    int last_errno = 0;

    for (int attempt = 0; attempt < max_attempts; ++attempt)
    {
        // EISCONN means an earlier, interrupted attempt has completed meanwhile.
        if (connect(cli_sock_fd, serv_addr, len) == 0 || errno == EISCONN)
        {
            return 0;
        }
        last_errno = errno;

        // FD_SET is only defined for descriptors below FD_SETSIZE.
        if (last_errno == EINTR && cli_sock_fd < FD_SETSIZE)
        {
            fd_set writefds;
            int ready;
            do
            {
                FD_ZERO(&writefds);
                FD_SET(cli_sock_fd, &writefds);
                ready = select(cli_sock_fd + 1, NULL, &writefds, NULL, NULL);
            } while (ready < 0 && errno == EINTR);

            int so_error = 0;
            socklen_t so_len = sizeof(so_error);
            if (ready > 0 &&
                getsockopt(cli_sock_fd, SOL_SOCKET, SO_ERROR, &so_error, &so_len) == 0)
            {
                if (so_error == 0)
                {
                    return 0;
                }
                last_errno = so_error;
            }
            else
            {
                last_errno = errno;
            }
        }

        printf("sock connect failed:%s, errno=%d\n", strerror(last_errno), last_errno);
    }

    // printf() may have modified errno; hand the real failure reason back to the caller.
    errno = last_errno;
    return -1;
}

/*
 * Binds a socket, retrying once per second for up to two minutes.
 *
 * The retry exists because bind() can fail for a while after a device reboot,
 * before the address becomes available. Errors that waiting cannot cure (bad
 * descriptor, not a socket, invalid argument, bad address pointer) fail
 * immediately instead of blocking the caller for two minutes, and no sleep is
 * performed after the final attempt.
 *
 * @param ser_sock_fd  socket to bind
 * @param serv_addr    local address
 * @param len          size of *serv_addr in bytes
 * @return 0 on success, -1 on failure with errno set to the last bind() error.
 */
int CSocketMgr::sock_bind(int ser_sock_fd, struct sockaddr *serv_addr, int len)
{
    const int max_attempts = 2 * 60;

    for (int attempt = 1; attempt <= max_attempts; ++attempt)
    {
        if (bind(ser_sock_fd, serv_addr, len) == 0)
        {
            return 0;
        }

        // Capture errno before printf()/usleep() get a chance to change it.
        const int bind_errno = errno;
        printf("bind error: %s\n", strerror(bind_errno));

        const bool permanent = (bind_errno == EBADF) || (bind_errno == ENOTSOCK) ||
                               (bind_errno == EINVAL) || (bind_errno == EFAULT);
        if (permanent || attempt == max_attempts)
        {
            errno = bind_errno;
            return -1;
        }
        usleep(1000000);
    }
    return -1;
}

/*
 * Marks a bound socket as a listening socket.
 *
 * @param ser_sock_fd  socket to listen on
 * @param maxsockfd    backlog value forwarded to listen()
 * @return whatever listen() returns: 0 on success, -1 on failure.
 */
int CSocketMgr::sock_listen(int ser_sock_fd, int maxsockfd)
{
    const int status = listen(ser_sock_fd, maxsockfd);
    return status;
}

/*
 * Thread entry point for the receive loop.
 *
 * @param context  pointer to an ObjectFd; its _sock_mgr runs recv_routine()
 *                 over its watch_sock table.
 * @return always NULL.
 *
 * The original tested the address of the embedded _sock_mgr member, which can
 * never be NULL; the pointers that can actually be NULL (the context itself
 * and its slot table) are validated instead.
 */
void *CSocketMgr::s_recv_routine(void * context)
{
    ObjectFd *object_fd = (ObjectFd *)context;
    if (object_fd == NULL || object_fd->watch_sock == NULL)
    {
        printf("s_recv_routine: invalid context\n");
        return NULL;
    }

    object_fd->_sock_mgr.recv_routine(object_fd->watch_sock);
    return NULL;
}

/*
 * Receives one message from a watched connection into _pBuf and dispatches it.
 *
 * Wire format as used by this function: an 8-byte header whose bytes 4..7
 * hold the total message size (header included, host byte order), followed
 * by the payload.
 *
 * Fixes over the original:
 *  - the header is parsed once 8 bytes have been accumulated, even if they
 *    arrive in several recv() calls (previously such a message was dropped);
 *  - a size smaller than the header or larger than the buffer is rejected
 *    (an oversized value used to end in an endless zero-length recv loop);
 *  - end-of-stream stops the loop, and the descriptor is closed and the slot
 *    released (previously EOF in mid-message looped forever, and a closed
 *    peer stayed in the table where select() reported it readable forever);
 *  - a missing buffer or slot is rejected instead of being passed to recv().
 *
 * @param sock_info  slot to read from; sock_fd is set to -1 when the peer has
 *                   closed the connection.
 * @return number of bytes received (also on failure, as before).
 */
int CSocketMgr::recv_msg(fsock_info *sock_info)
{
    const int header_len = 8;
    const int capacity = RECV_BUF;

    if (sock_info == NULL || _pBuf == NULL)
    {
        printf("recv_msg: no socket slot or receive buffer\n");
        return 0;
    }
    memset(_pBuf, 0, RECV_BUF);

    // Bound every recv() so an incomplete message cannot block the loop forever.
    struct timeval tv;
    tv.tv_sec = 2;
    tv.tv_usec = 0;
    setsockopt(sock_info->sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));

    int received = 0;
    int msg_size = 0;
    bool header_parsed = false;
    bool failed = false;

    while (!header_parsed || received < msg_size)
    {
        ssize_t bytes_read;
        do
        {
            bytes_read = recv(sock_info->sock_fd, _pBuf + received, capacity - received, 0);
        } while (bytes_read == -1 && errno == EINTR);

        if (bytes_read < 0)
        {
            printf("recv_msg %s %d\n", strerror(errno), errno);
            failed = true;
            break;
        }
        if (bytes_read == 0)
        {
            // Peer closed the connection: release the descriptor and free the slot.
            close(sock_info->sock_fd);
            sock_info->sock_fd = -1;
            failed = true;
            break;
        }

        received += (int)bytes_read;

        if (!header_parsed && received >= header_len)
        {
            header_parsed = true;
            memcpy(&msg_size, _pBuf + 4, sizeof(msg_size));
            if (msg_size < header_len || msg_size > capacity)
            {
                printf("recv_msg invalid msg size %d\n", msg_size);
                failed = true;
                break;
            }
        }
    }

    if (!failed && msg_size >= header_len)
    {
        // Act on the received message; the handler is project specific.
        msg_porc(sock_info, _pBuf);
    }

    return received;
}

/*
 * Receive loop: watches every occupied slot of the table with select() and
 * calls recv_msg() for each descriptor that is readable.
 *
 * Fixes over the original:
 *  - the select() timeout is re-armed on every iteration; select() may
 *    overwrite the timeval with the time left, which turned the 2 s wait
 *    into a zero-timeout poll after the first expiry;
 *  - maxfd is recomputed on every iteration instead of growing forever;
 *  - descriptors >= FD_SETSIZE are skipped, because FD_SET/FD_ISSET are
 *    undefined for them;
 *  - a NULL table is rejected.
 *
 * @param sock_info  array of MAXSOCKFD slots shared with the accept loop
 *                   (access is serialized with my_mutex).
 * @return -1 if sock_info is NULL; otherwise the loop does not terminate.
 */
int CSocketMgr::recv_routine(fsock_info *sock_info)
{
    if (sock_info == NULL)
    {
        printf("recv_routine: no socket table\n");
        return -1;
    }

    printf("enter the recv routine\n");
    while (1/* a stop flag is needed here */)
    {
        fd_set readfds;
        int maxfd = -1;

        FD_ZERO(&readfds);
        for (int i = 0; i < MAXSOCKFD; i++)
        {
            pthread_mutex_lock(&my_mutex);
            const int fd = sock_info[i].sock_fd;
            pthread_mutex_unlock(&my_mutex);

            if (fd > 0 && fd < FD_SETSIZE)
            {
                FD_SET(fd, &readfds);
                if (maxfd < fd)
                {
                    maxfd = fd;
                }
            }
        }

        struct timeval tvl;
        tvl.tv_sec = 2;
        tvl.tv_usec = 0;

        const int ready = select(maxfd + 1, &readfds, NULL, NULL, &tvl);
        if (ready > 0)
        {
            for (int i = 0; i < MAXSOCKFD; i++)
            {
                // Held across recv_msg() so the accept loop cannot modify the slot meanwhile.
                pthread_mutex_lock(&my_mutex);
                const int fd = sock_info[i].sock_fd;
                if (fd > 0 && fd < FD_SETSIZE && FD_ISSET(fd, &readfds))
                {
                    recv_msg(&sock_info[i]);
                }
                pthread_mutex_unlock(&my_mutex);
            }
        }

        usleep(10000);
    }
    return 0;
}

/*
 * Accept loop: accepts connections on the listening socket and stores each
 * new descriptor in the slot table so that the select() loop can watch it.
 *
 * Fixes over the original:
 *  - the address length is a socklen_t and is reset before every accept();
 *    it is a value-result argument that accept() overwrites, and the original
 *    passed an int through a pointer cast;
 *  - when the listening descriptor itself is unusable (EBADF, ENOTSOCK,
 *    EINVAL, EOPNOTSUPP) the loop stops and reports failure instead of
 *    spinning forever. The listening socket is left to the caller in that
 *    case.
 *
 * @param ser_sock_fd       listening socket
 * @param local_watch_sock  slot table that receives accepted descriptors
 * @param is_need_heart     forwarded to set_fd_to_arry(); non-zero records
 *                          the accept time in the slot
 * @return -1 if the listening socket is unusable; otherwise does not return.
 */
int CSocketMgr::accept_routine(int ser_sock_fd, fsock_info *local_watch_sock, int is_need_heart)
{
    while (1/* a stop flag is needed here */)
    {
        struct sockaddr_in cli_addr;
        socklen_t addr_len = sizeof(cli_addr);
        memset(&cli_addr, 0, sizeof(cli_addr));

        const int accept_sock_fd = accept(ser_sock_fd, (struct sockaddr*)&cli_addr, &addr_len);
        if (accept_sock_fd != -1)
        {
            set_fd_to_arry(accept_sock_fd, cli_addr, local_watch_sock, is_need_heart);
        }
        else if (errno == EBADF || errno == ENOTSOCK || errno == EINVAL || errno == EOPNOTSUPP)
        {
            printf("accept failed: %s %d\n", strerror(errno), errno);
            return -1;
        }
        usleep(20000);
    }
    close(ser_sock_fd);

    return 0;
}

/*
 * Stores an accepted descriptor and its peer address in the first free slot
 * (a slot with sock_fd <= 0) of the table, under my_mutex.
 *
 * Fix over the original: when no slot is free - or no table is supplied - the
 * descriptor is closed. The function always returns NULL, so the caller has
 * no way to learn that the descriptor was not stored, and it used to leak.
 *
 * @param accept_sock_fd  descriptor returned by accept()
 * @param cli_addr        peer address returned by accept()
 * @param watch_sock      array of MAXSOCKFD slots
 * @param is_need_heart   non-zero to record the current time in accept_time
 * @return always NULL.
 */
void *CSocketMgr::set_fd_to_arry(int accept_sock_fd,struct sockaddr_in cli_addr, fsock_info *watch_sock, int is_need_heart)
{
    bool stored = false;

    if (watch_sock != NULL)
    {
        pthread_mutex_lock(&my_mutex);
        for (int i = 0; i < MAXSOCKFD; i++)
        {
            if (watch_sock[i].sock_fd > 0)
            {
                continue;   // slot already in use
            }

            watch_sock[i].sock_fd = accept_sock_fd;
            memcpy(&(watch_sock[i].client_addr), &cli_addr, sizeof(cli_addr));
            if (is_need_heart)
            {
                watch_sock[i].accept_time = time(NULL);
            }
            stored = true;
            break;
        }
        pthread_mutex_unlock(&my_mutex);
    }

    if (!stored)
    {
        printf("listen queue is full!");
        if (accept_sock_fd >= 0)
        {
            close(accept_sock_fd);
        }
    }
    return NULL;
}

/*
 * Sends len bytes in chunks of at most 0x2000 bytes, holding _mutex for the
 * whole transfer.
 *
 * ENOBUFS and EINTR are retried after 1 ms without limit; other failures are
 * retried after 10 ms at most RETRY_SEND times.
 *
 * Fixes over the original:
 *  - errno is captured before printf(), which may change it;
 *  - len == 0 returns 0 immediately instead of issuing an empty send() and
 *    burning all retries on its 0 result;
 *  - errno is only consulted when send() actually failed (ret < 0);
 *  - errors that mean the connection is gone (EPIPE, ECONNRESET, ENOTCONN,
 *    EBADF, ENOTSOCK) stop the transfer at once;
 *  - argument checks happen before the mutex is taken and there is a single
 *    unlock, so no exit path can leave the mutex locked.
 *
 * @param sock_fd  connected socket; must be > 0
 * @param buf      data to send; must not be NULL
 * @param len      number of bytes to send
 * @param flag     flags forwarded to send()
 * @return -1 for an invalid argument, otherwise the number of bytes actually
 *         sent (less than len if the transfer was given up).
 */
int CSocketMgr::sock_send(int sock_fd, const char *buf, unsigned int len, int flag)
{
    const size_t max_chunk = 0x2000;

    if (sock_fd <= 0)
    {
        printf("sock_fd error!\n");
        return -1;
    }
    if (buf == NULL)
    {
        printf("sock_send: NULL buffer\n");
        return -1;
    }
    if (len == 0)
    {
        return 0;
    }

    size_t bytes_sent = 0;
    int retrycount = 0;

    _mutex.lock();
    while (bytes_sent < len)
    {
        const size_t remaining = len - bytes_sent;
        const size_t chunk = (remaining < max_chunk) ? remaining : max_chunk;

        const ssize_t ret = send(sock_fd, buf + bytes_sent, chunk, flag);
        if (ret > 0)
        {
            bytes_sent += (size_t)ret;
            continue;
        }

        const int send_errno = errno;
        printf("errno=%d failed info:%s\n", send_errno, strerror(send_errno));

        if (ret < 0)
        {
            if ((send_errno == ENOBUFS) || (send_errno == EINTR))
            {
                usleep(1000);
                continue;
            }
            if ((send_errno == EPIPE) || (send_errno == ECONNRESET) || (send_errno == ENOTCONN) ||
                (send_errno == EBADF) || (send_errno == ENOTSOCK))
            {
                break;   // the connection is gone; retrying cannot help
            }
        }

        if (++retrycount > RETRY_SEND)
        {
            break;
        }
        usleep(10000);
    }
    _mutex.unlock();

    return (int)bytes_sent;
}

/*
 * Receives one message into buf, holding _mutex for the whole call.
 *
 * Wire format as used by this function: an 8-byte header whose bytes 4..7
 * hold the total message size (header included, host byte order). Each
 * recv() is bounded by a 2 s SO_RCVTIMEO.
 *
 * Fixes over the original:
 *  - end-of-stream in the middle of a message is reported as a failure
 *    (it used to loop forever on recv() returning 0);
 *  - the header is parsed once 8 bytes have been accumulated, even if they
 *    arrive in several recv() calls;
 *  - NULL buf / len are rejected instead of being dereferenced.
 *
 * @param sock_fd  connected socket
 * @param buf      destination buffer
 * @param len      in: capacity of buf; out: number of bytes received
 *                 (left untouched on failure)
 * @param flag     flags forwarded to recv()
 * @return number of bytes received; -1 on a receive error, on a message
 *         larger than the buffer, or on a connection closed in mid-message.
 *         If the peer closes before a complete header arrived, the bytes
 *         received so far (possibly 0) are returned, as before.
 */
int CSocketMgr::sock_recv(int sock_fd, char *buf, unsigned int *len, int flag)
{
    const int header_len = 8;

    if (buf == NULL || len == NULL)
    {
        printf("sock_recv: NULL buffer or length\n");
        return -1;
    }

    const int capacity = (int)(*len);
    int received = 0;
    int msg_size = 0;
    bool header_parsed = false;
    bool failed = false;

    struct timeval tv;
    tv.tv_sec = 2;
    tv.tv_usec = 0;

    _mutex.lock();
    setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));

    while (received < capacity)
    {
        ssize_t bytes_read;
        do
        {
            bytes_read = recv(sock_fd, buf + received, capacity - received, flag);
        } while (bytes_read == -1 && errno == EINTR);

        if (bytes_read < 0)
        {
            printf("recv_msg %s %d\n", strerror(errno), errno);
            failed = true;
            break;
        }
        if (bytes_read == 0)
        {
            // Peer closed the connection; a known, incomplete message is an error.
            if (header_parsed)
            {
                printf("sock_recv: connection closed before the whole message arrived\n");
                failed = true;
            }
            break;
        }

        received += (int)bytes_read;

        if (!header_parsed && received >= header_len)
        {
            header_parsed = true;
            memcpy(&msg_size, buf + 4, sizeof(msg_size));
            if (msg_size > capacity)
            {
                printf("length is not enough!\n");
                failed = true;
                break;
            }
        }

        if (header_parsed && received >= msg_size)
        {
            break;   // complete message received
        }
    }
    _mutex.unlock();

    if (failed)
    {
        return -1;
    }
    *len = (unsigned int)received;
    return received;
}

/*
 * Synchronous request/response helper: opens a TCP connection to ip:port,
 * sends send_buf, receives one reply into recv_buf and closes the connection.
 * Send and receive are each bounded by a 3 s socket timeout.
 *
 * Fixes over the original:
 *  - recv_len is checked for NULL (it is dereferenced while receiving);
 *  - the port range and the IP string are validated; an unparsable address
 *    used to be turned into 255.255.255.255 by inet_addr(), and an
 *    out-of-range port was silently truncated by htons();
 *  - the socket is closed on every path through a single exit, including
 *    the case where the descriptor happens to be 0.
 *
 * @param ip        IPv4 address in the notations accepted by inet_aton()
 * @param port      TCP port, 1..65535
 * @param send_buf  request bytes
 * @param recv_buf  buffer for the reply
 * @param send_len  number of request bytes
 * @param recv_len  in: capacity of recv_buf; out: reply length
 * @return 0 on success, -1 on any failure.
 */
int CSocketMgr::sync_send_and_recv(const char *ip, int port, const char *send_buf, char *recv_buf, unsigned int send_len, unsigned int *recv_len)
{
    if ((!ip) || (!send_buf) || (!recv_buf) || (!recv_len))
    {
        printf("there exist NULL pointer\n");
        return -1;
    }
    if (port <= 0 || port > 65535)
    {
        printf("invalid port %d\n", port);
        return -1;
    }

    struct sockaddr_in serv_addr;
    bzero(&serv_addr,sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(port);
    if (inet_aton(ip, &serv_addr.sin_addr) == 0)
    {
        printf("invalid ip address %s\n", ip);
        return -1;
    }

    const int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (socket_fd == -1)
    {
        printf("create socket failed: %s %d!\n", strerror(errno), errno);
        return -1;
    }

    struct timeval tv;
    tv.tv_sec = 3;
    tv.tv_usec = 0;
    setsockopt(socket_fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
    setsockopt(socket_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));

    printf("sockMgr's ip = %s  port = %d\n", ip, port);

    int result = -1;
    if (sock_connect(socket_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) == -1)
    {
        printf("socket connect %s failed, the reason is %s\n", ip, strerror(errno));
    }
    else if (sock_send(socket_fd, send_buf, send_len, 0) < (int)send_len)
    {
        printf("failed to send info\n");
    }
    else if (sock_recv(socket_fd, recv_buf, recv_len, 0) < 0)   // sizing of recv_buf needs further thought
    {
        printf("failed to recv info\n");
    }
    else
    {
        result = 0;
    }

    close(socket_fd);
    return result;
}

/*
 * Synchronous one-way send: opens a TCP connection to ip:port, sends buf and
 * closes the connection. Sending is bounded by a 2 s socket timeout.
 *
 * Fixes over the original:
 *  - a negative len is rejected (it used to be converted to a huge unsigned
 *    length when handed to sock_send());
 *  - the port range and the IP string are validated; an unparsable address
 *    used to be turned into 255.255.255.255 by inet_addr(), and an
 *    out-of-range port was silently truncated by htons();
 *  - the socket is closed on every path through a single exit, including
 *    the case where the descriptor happens to be 0.
 *
 * @param ip    IPv4 address in the notations accepted by inet_aton()
 * @param port  TCP port, 1..65535
 * @param buf   bytes to send
 * @param len   number of bytes to send, >= 0
 * @return 0 if all bytes were sent, -1 otherwise.
 */
int CSocketMgr::sync_send(char const *ip, int port, const char *buf, int len)
{
    if ((!ip) || (!buf))
    {
        printf("there exist NULL pointer\n");
        return -1;
    }
    if (len < 0)
    {
        printf("invalid length %d\n", len);
        return -1;
    }
    if (port <= 0 || port > 65535)
    {
        printf("invalid port %d\n", port);
        return -1;
    }

    struct sockaddr_in serv_addr;
    bzero(&serv_addr,sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(port);
    if (inet_aton(ip, &serv_addr.sin_addr) == 0)
    {
        printf("invalid ip address %s\n", ip);
        return -1;
    }

    const int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (socket_fd == -1)
    {
        printf("create socket failed: %s %d!\n", strerror(errno), errno);
        return -1;
    }

    struct timeval tv;
    tv.tv_sec = 2;
    tv.tv_usec = 0;
    setsockopt(socket_fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));

    int result = -1;
    if (sock_connect(socket_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) != -1)
    {
        if (sock_send(socket_fd, buf, (unsigned int)len, 0) < len)
        {
            printf("failed to send info\n");
        }
        else
        {
            result = 0;
        }
    }

    close(socket_fd);
    return result;
}

/*
 * Closes the descriptor *fd and marks it as closed by storing -1.
 *
 * Does nothing when fd is NULL (previously dereferenced unconditionally) or
 * when *fd <= 0, which this file treats as "no descriptor".
 *
 * @param fd  in/out: descriptor to close; set to -1 afterwards.
 */
void CSocketMgr::sock_close( int *fd )
{
    if (fd == NULL || *fd <= 0)
    {
        return;
    }
    close(*fd);
    *fd = -1;
}


一个使用该类实现网络监听的例子example.h

#include "SocketMgr.h"

#define UNIX_DOMAIN "/tmp/unix.domain"// path of the local (UNIX-domain) socket that the example creates

/*
 * Example user of CSocketMgr: a listener on the UNIX-domain socket
 * UNIX_DOMAIN.
 */
class CLocalListener
{
public:
CLocalListener(void);
~CLocalListener(void);

// Only demonstrates how to start a local listening service with the common class.
int start_local_service();

private:
/* Listening socket created by start_local_service(). */
int ser_sock_fd;
/* Thread running the select()-based receive loop. */
pthread_t recv_thread;
int _socket;// local socket
/* Socket manager used for create/bind/listen and for the accept loop. */
CSocketMgr _sock_mgr;
};


具体实现example.cpp

int CLocalListener::start_local_service()
{

int ret = -1;
ObjectFd object_fd;//封装的fd结构体
struct sockaddr_un serv_addr;
fsock_info local_watch_sock[MAXSOCKFD];

memset(local_watch_sock, 0, sizeof(local_watch_sock));
ser_sock_fd = _sock_mgr.sock_create(AF_UNIX, SOCK_STREAM, 0);//创建本地套接字
if (ser_sock_fd == -1)
{
perror("socket failed!");
return NAZ_ERR_SOCK_ERROR;
}

bzero(&serv_addr,sizeof(serv_addr));
serv_addr.sun_family = AF_UNIX;
unlink(UNIX_DOMAIN);
memcpy(serv_addr.sun_path, UNIX_DOMAIN, sizeof(serv_addr.sun_path) - 1);//设置绑定所需的参数
ret = _sock_mgr.sock_bind(ser_sock_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));//绑定套接字
if (ret == -1)
{
perror("bind failed!");
close(ser_sock_fd);
return NAZ_ERR_SOCK_ERROR;
}
_sock_mgr.sock_listen(ser_sock_fd, MAXSOCKFD);//监听套接字

object_fd.watch_sock = local_watch_sock;
object_fd._sock_mgr = _sock_mgr;
//创建接收线程,满足多个请求连接到该套接字的情况
if (pthread_create(&recv_thread, NULL, _sock_mgr.s_recv_routine, (void *)&object_fd) != 0)
{
perror("pthread_create failed:\n");
close(ser_sock_fd);
return (-1);
}
//accept,将accept的套接字更新到local_watch_sock数组中,由上边的recv线程收取并采取相应的操作
_sock_mgr.accept_routine(ser_sock_fd, local_watch_sock, 0);
pthread_join(recv_thread, NULL);

if (ser_sock_fd > 0)
{
close(ser_sock_fd);
}

return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: