您的位置:首页 > 数据库 > Redis

redis源码学习4 ae.c 事件循环

2018-01-19 18:23 369 查看
基于一个定时器的事件循环,每次循环取已被触发的事件做处理

先列模块
ae.c event loop 主模块
ae_evport.c 负责调用 evport 的接口
ae_epoll.c 负责调用 epoll 的接口
ae_kqueue.c 负责调用 kqueue 的接口
ae_select.c 负责调用 select 的接口

注意
evport, epoll, kqueue, select 按是否支持而选择使用最前者,性能递减
一个客户端一个文件描述符
主要函数

`aeCreateFileEvent()` 创建文件事件
`aeApiPoll()` 取命中的事件
`aeProcessEvents()` 处理事件,先文件事件,后时间事件
`processTimeEvents()` 处理定时事件

写了一个精简版的服务器,框架就是redis抄下来的,直接看server.c 里调用的 ae*() 函数就明白调用流程了

server.c 代码

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <fcntl.h>
#include "ae.h"

#define IPADDR      "127.0.0.1"
#define PORT        8787
#define MAXLINE     1024
#define LISTEN_BACKLOG     5
#define CLIENT_SIZE        2
#define AE_SIZE (CLIENT_SIZE + 32)

#define UNUSED(V) ((void) V)
void readQueryFromClient(struct aeEventLoop *el, int fd, void *privdata, int mask);

typedef struct aClient {
int fd;
} aClient;

typedef struct aServer {
int ipfd;
aeEventLoop *el;
aClient clients[CLIENT_SIZE];
long long stat_numconnections;
int hz;
} aServer;
static aServer server;

/*===========================================================================*/

int listenToPort(const char* ip,int port)
{
int fd;
struct sockaddr_in servaddr;
fd = socket(AF_INET, SOCK_STREAM,0);
if (fd == -1) {
fprintf(stderr, "create socket fail,erron:%d,reason:%s\n",
errno, strerror(errno));
return -1;
}
server.ipfd = fd;

/*一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用。*/
int reuse = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
return -1;
}

/* 设置非阻塞, accept() 时需要 */
if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1) {
fprintf(stderr,"set non block error\n");
return -1;
}

bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(port);
inet_pton(AF_INET,ip,&servaddr.sin_addr);

if (bind(fd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1) {
perror("bind error: ");
close(fd);
return -1;
}

if (listen(fd,LISTEN_BACKLOG) == -1)
{
perror("listen error: ");
close(fd);
return -1;
}

return 1;
}

// 定时回调
int serverCron(struct aeEventLoop *el, long long id, void *clientData)
{
UNUSED(el);
UNUSED(id);
UNUSED(clientData);
return 1000/server.hz;
}

// tcp 接收器
void acceptTcpHandler(struct aeEventLoop *el, int fd, void *privdata, int mask)
{
int max = 1000, tmpval;
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);

while(max--) {
struct sockaddr_in cliaddr;
socklen_t cliaddrlen;
cliaddrlen = sizeof(cliaddr);
int clifd;

clifd = accept(fd,(struct sockaddr*)&cliaddr,&cliaddrlen);
if (clifd == -1) {
if (errno == EINTR)
continue;

if (errno != EWOULDBLOCK)
fprintf(stderr, "accept fail,error:%s\n", strerror(errno));
return;
}

tmpval = 1;
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &tmpval, sizeof(tmpval)) == -1)
{
return;
}

fprintf(stdout, "accept a new client: %s:%d\n",
inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);

/* 将新客户端添加到数组中 */
int i = 0;
for (i = 0; i < CLIENT_SIZE; i++) {
if (server.clients[i].fd < 0) {
server.clients[i].fd = clifd;
if (aeCreateFileEvent(server.el, clifd, AE_READABLE,
readQueryFromClient, &server.clients[i]) == AE_ERR)
{
fprintf(stdout, "client client error: %s:%d\n",
inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);
server.clients[i].fd = -1;
close(clifd);
return;
}
server.stat_numconnections++;
break;
}
}

/* 加不进新的客户端 */
if (i == CLIENT_SIZE) {
fprintf(stderr,"too many clients.\n");
close(clifd);
return;
}
}
}

// 客户端消息处理器
void readQueryFromClient(struct aeEventLoop *el, int fd, void *privdata, int mask)
{
UNUSED(mask);
int n;
char buf[MAXLINE] = {0};
aClient *client = (aClient *)privdata;

//接收客户端发送的信息
n = (int)read(fd, buf, MAXLINE);
if (n <= 0) {
printf("recv %d\n", n);
/*n==0表示读取完成,客户端关闭套接字*/
aeDeleteFileEvent(el, fd, AE_READABLE);
aeDeleteFileEvent(el, fd, AE_WRITABLE);
close(fd);
client->fd = -1;
server.stat_numconnections--;
return;
}
printf("recv buf is:%s, len:%d\n", buf, (int)strlen(buf));
write(fd, buf, strlen(buf) +1);
}

static int server_init()
{
int i;
for (i = 0; i < CLIENT_SIZE; ++i) {
server.clients[i].fd = -1;
}
server.stat_numconnections = 0;
server.hz = 10;

server.el = aeCreateEventLoop(AE_SIZE);
if (server.el == NULL) {
return -1;
}

if (listenToPort(IPADDR, PORT) < 0)
return -1;

if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
return -1;
}

if (aeCreateFileEvent(server.el, server.ipfd, AE_READABLE, acceptTcpHandler, NULL) == AE_ERR)
return -1;

return 0;
}

static void server_uninit()
{
if (server.el)
aeDeleteEventLoop(server.el);
}

int main(int argc,char *argv[])
{
/* 初始化服务端 */
if (server_init() < 0) {
server_uninit();
return -1;
}

/* 主循环开始 */
aeMain(server.el);

server_uninit();
return 0;
}
void *zmalloc(size_t size) { return malloc(size); }

void zfree(void *ptr) { free(ptr); }

void *zrealloc(void *ptr, size_t size) { return realloc(ptr, size); }
https://github.com/antirez/redis/blob/unstable/src/ae.h
https://github.com/antirez/redis/blob/unstable/src/ae.c
直接用原版的 ae.c,然后在前面加上上面几个函数

client.c 代码

#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/select.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>

#define IPADDR      "127.0.0.1"
#define PORT        8787
#define MAXLINE     1024

static void handle_recv_msg(int sockfd, char *buf)
{
printf("client recv msg is:%s\n", buf);
sleep(5);
write(sockfd, buf, strlen(buf) +1);
}

static void handle_connection(int sockfd)
{
char recvline[MAXLINE];
int maxfdp;
fd_set readfds;
int n;
struct timeval tv;
int retval = 0;

while (1) {

FD_ZERO(&readfds);
FD_SET(sockfd,&readfds);
maxfdp = sockfd;

tv.tv_sec = 5;
tv.tv_usec = 0;

retval = select(maxfdp+1,&readfds,NULL,NULL,&tv);

if (retval == -1) {
return ;
}

if (retval == 0) {
printf("client timeout.\n");
continue;
}

if (FD_ISSET(sockfd, &readfds)) {
n = (int)read(sockfd,recvline,MAXLINE);
if (n <= 0) {
fprintf(stderr,"client: server is closed.\n");
close(sockfd);
FD_CLR(sockfd,&readfds);
return;
}

handle_recv_msg(sockfd, recvline);
}
}
}

int main(int argc,char *argv[])
{
int sockfd;
struct sockaddr_in servaddr;

sockfd = socket(AF_INET,SOCK_STREAM,0);

bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(PORT);
inet_pton(AF_INET,IPADDR,&servaddr.sin_addr);

4000
int retval = 0;
retval = connect(sockfd,(struct sockaddr*)&servaddr,sizeof(servaddr));
if (retval < 0) {
fprintf(stderr, "connect fail,error:%s\n", strerror(errno));
return -1;
}

printf("client send to server .\n");
char hello[30] = {0};
sprintf(hello, "hello server by %d", sockfd);
write(sockfd, hello, strlen(hello) + 1);

handle_connection(sockfd);

return 0;
}
Makefile

cc = gcc

CFLAGS=-g -Wall -std=c99 -fPIC

objects = server.o ae.o client.o
OUTPUT=server client

all: $(OUTPUT)
echo "ok"

server: server.o ae.o
$(cc) $(CFLAGS) -o $@ $^

client: client.o
$(cc) $(CFLAGS) -o $@ $^

server.o:
client.o:
ae.o:

.PHONY: compile clean

compile:
$(cc) -c $(objects)

clean:
-rm $(OUTPUT) $(objects)


另附本机运行环境是 macOS High Sierra 10.13.2
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: