skynet----看看2----这个要坚持下来。研究透彻
2014-01-23 16:19
337 查看
#include "skynet_socket.h"
#include "socket_server.h"
#include "skynet_server.h"
#include "skynet_mq.h"
#include "skynet_harbor.h"
#include "skynet.h"
// (blog note) This socket layer pulls in a lot of modules; there is plenty to read.

#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>

/* The single socket server instance shared by every function in this file. */
static struct socket_server * SOCKET_SERVER = NULL;

/* Create the global socket server. */
void
skynet_socket_init() {
    SOCKET_SERVER = socket_server_create();
}

/* Forward an exit request to the global socket server. */
void
skynet_socket_exit() {
    socket_server_exit(SOCKET_SERVER);
}

/* Release the global socket server and clear the global pointer. */
void
skynet_socket_free() {
    socket_server_release(SOCKET_SERVER);
    SOCKET_SERVER = NULL;
}

// mainloop thread
// (blog note) A main-loop thread shows up here rather abruptly.

/*
 * Wrap one socket_server result into a skynet_socket_message and push it to
 * the service whose handle is stored in result->opaque.
 *
 * type    - SKYNET_SOCKET_TYPE_* value stored in the forwarded message.
 * padding - when true, result->data is treated as a C string and copied
 *           right behind the message header (buffer is set to NULL);
 *           when false, the data pointer itself is forwarded in `buffer`.
 *
 * If the push fails, only the message header allocated here is freed.
 */
static void
forward_message(int type, bool padding, struct socket_message * result) {
    int total = sizeof(struct skynet_socket_message);
    if (padding) {
        if (result->data == NULL) {
            // A missing string is forwarded as the empty string.
            result->data = "";
        }
        total += strlen(result->data) + 1;
    }

    struct skynet_socket_message *sm = malloc(total);
    sm->type = type;
    sm->id = result->id;
    sm->ud = result->ud;
    if (padding) {
        sm->buffer = NULL;
        // The string payload lives immediately after the header.
        strcpy((char *)(sm + 1), result->data);
    } else {
        sm->buffer = result->data;
    }

    struct skynet_message message;
    message.source = 0;
    message.session = 0;
    message.data = sm;
    // The message type is packed into the bits above HANDLE_REMOTE_SHIFT.
    message.sz = total | (PTYPE_SOCKET << HANDLE_REMOTE_SHIFT);

    if (skynet_context_push((uint32_t)result->opaque, &message) != 0) {
        // todo: report somewhere to close socket
        // don't call skynet_socket_close here (It will block mainloop)
        free(sm);
    }
}
// (blog note) I will discuss this function once I have worked through it properly.

/*
 * Poll the socket server once and forward the event (if any) to its owner.
 *
 * Returns 0 on SOCKET_EXIT, -1 for an unknown event type or when the socket
 * server sets `more` to a non-zero value, and 1 otherwise.
 */
int
skynet_socket_poll() {
    struct socket_server *ss = SOCKET_SERVER;
    assert(ss);

    struct socket_message result;
    int more = 1;
    int event = socket_server_poll(ss, &result, &more);

    int forward_type;
    bool padding = false;
    switch (event) {
    case SOCKET_EXIT:
        return 0;
    case SOCKET_DATA:
        forward_type = SKYNET_SOCKET_TYPE_DATA;
        break;
    case SOCKET_CLOSE:
        forward_type = SKYNET_SOCKET_TYPE_CLOSE;
        break;
    case SOCKET_OPEN:
        forward_type = SKYNET_SOCKET_TYPE_CONNECT;
        padding = true;
        break;
    case SOCKET_ERROR:
        forward_type = SKYNET_SOCKET_TYPE_ERROR;
        break;
    case SOCKET_ACCEPT:
        forward_type = SKYNET_SOCKET_TYPE_ACCEPT;
        padding = true;
        break;
    default:
        skynet_error(NULL, "Unknown socket message type %d.",event);
        return -1;
    }
    forward_message(forward_type, padding, &result);

    return more ? -1 : 1;
}

/*
 * Queue `buffer` (sz bytes) for sending on socket `id`.
 *
 * Returns -1 (after freeing `buffer`) when the socket server reports a
 * negative value, 0 otherwise. When the value returned by the socket server
 * exceeds 1 MiB, a notice is logged each time (value / 4 KiB) is a multiple
 * of 256.
 */
int
skynet_socket_send(struct skynet_context *ctx, int id, void *buffer, int sz) {
    int64_t pending = socket_server_send(SOCKET_SERVER, id, buffer, sz);
    if (pending < 0) {
        free(buffer);
        return -1;
    }
    if (pending > 1024 * 1024) {
        int kb4 = pending / 1024 / 4;
        if (kb4 % 256 == 0) {
            skynet_error(ctx, "%d Mb bytes on socket %d need to send out", (int)(pending / (1024 * 1024)), id);
        }
    }
    return 0;
}

/* Start listening on host:port on behalf of ctx; returns the socket server's result. */
int
skynet_socket_listen(struct skynet_context *ctx, const char *host, int port, int backlog) {
    return socket_server_listen(SOCKET_SERVER, skynet_context_handle(ctx), host, port, backlog);
}

/* Request a connection to host:port on behalf of ctx; returns the socket server's result. */
int
skynet_socket_connect(struct skynet_context *ctx, const char *host, int port) {
    return socket_server_connect(SOCKET_SERVER, skynet_context_handle(ctx), host, port);
}

/* Same as skynet_socket_connect, but through socket_server_block_connect. */
int
skynet_socket_block_connect(struct skynet_context *ctx, const char *host, int port) {
    return socket_server_block_connect(SOCKET_SERVER, skynet_context_handle(ctx), host, port);
}

/* Hand an existing file descriptor to the socket server on behalf of ctx. */
int
skynet_socket_bind(struct skynet_context *ctx, int fd) {
    return socket_server_bind(SOCKET_SERVER, skynet_context_handle(ctx), fd);
}

/* Ask the socket server to close socket `id` on behalf of ctx. */
void
skynet_socket_close(struct skynet_context *ctx, int id) {
    socket_server_close(SOCKET_SERVER, skynet_context_handle(ctx), id);
}

/* Ask the socket server to start socket `id` on behalf of ctx. */
void
skynet_socket_start(struct skynet_context *ctx, int id) {
    socket_server_start(SOCKET_SERVER, skynet_context_handle(ctx), id);
}
这个文件只是一层整合出来的接口,具体还得先看看内部实现的那几个文件再说。
#ifndef SKYNET_SERVER_H
#define SKYNET_SERVER_H

#include <stdint.h>
#include <stdlib.h>

/* Opaque types; struct skynet_context is defined inside skynet_server.c. */
struct skynet_context;
struct skynet_message;
struct skynet_monitor;

/* Create a service context from module `name`, passing `parm` to the module's
 * init function. Returns NULL when the module is not found, instance creation
 * fails, or init reports failure. */
struct skynet_context * skynet_context_new(const char * name, const char * parm);
/* Atomically add one reference to the context. */
void skynet_context_grab(struct skynet_context *);
/* Atomically drop one reference. Deletes the context and returns NULL when the
 * count reaches zero; otherwise returns the context. */
struct skynet_context * skynet_context_release(struct skynet_context *);
/* Return the handle stored in the context. */
uint32_t skynet_context_handle(struct skynet_context *);
/* Store `handle` in the context. */
void skynet_context_init(struct skynet_context *, uint32_t handle);
/* Push `message` onto the queue of the context registered under `handle`.
 * Returns -1 when no context is found for the handle, 0 otherwise. */
int skynet_context_push(uint32_t handle, struct skynet_message *message);
/* Build a message from the arguments (type packed into the size field) and
 * push it directly onto the context's own queue. */
void skynet_context_send(struct skynet_context * context, void * msg, size_t sz, uint32_t source, int type, int session);
/* Advance the context's session counter and return it masked to 31 bits. */
int skynet_context_newsession(struct skynet_context *);
/* Pop one queue from the global queue and dispatch one message from it. */
int skynet_context_message_dispatch(struct skynet_monitor *);	// return 1 when block
/* Return the current number of live contexts. */
int skynet_context_total();

/* Set the `endless` flag on the context registered under `handle`. */
void skynet_context_endless(uint32_t handle);	// for monitor

#endif
// (blog note) This file is even more of a headache. It looks like skynet_context is
// produced here, via skynet_context_new -- but where is the struct itself declared?
#include "skynet_server.h"
#include "skynet_module.h"
#include "skynet_handle.h"
#include "skynet_mq.h"
#include "skynet_timer.h"
#include "skynet_harbor.h"
#include "skynet_env.h"
#include "skynet.h"
#include "skynet_multicast.h"
#include "skynet_group.h"
#include "skynet_monitor.h"

#include <string.h>
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>

/*
 * Optional re-entrancy check: with CALLING_CHECK defined, each context
 * carries a `calling` flag and BEGIN asserts that it was not already set.
 */
#ifdef CALLING_CHECK

#define CHECKCALLING_BEGIN(ctx) assert(__sync_lock_test_and_set(&ctx->calling,1) == 0);
#define CHECKCALLING_END(ctx) __sync_lock_release(&ctx->calling);
#define CHECKCALLING_INIT(ctx) ctx->calling = 0;
#define CHECKCALLING_DECL int calling;

#else

#define CHECKCALLING_BEGIN(ctx)
#define CHECKCALLING_END(ctx)
#define CHECKCALLING_INIT(ctx)
#define CHECKCALLING_DECL

#endif

/* Per-service state. */
struct skynet_context {
    void * instance;              /* module instance created for this service */
    struct skynet_module * mod;   /* module the instance belongs to */
    uint32_t handle;              /* handle returned by skynet_handle_register */
    int ref;                      /* reference count, modified atomically */
    char result[32];              /* scratch buffer for skynet_command results */
    void * cb_ud;                 /* user data passed to cb */
    skynet_cb cb;                 /* message callback, set by skynet_callback */
    int session_id;               /* last allocated session */
    uint32_t forward;             /* pending forward destination, 0 when none */
    struct message_queue *queue;  /* this service's message queue */
    bool init;                    /* set once module init has succeeded */
    bool endless;                 /* set by skynet_context_endless, cleared by "ENDLESS" */

    CHECKCALLING_DECL
};

/* Node-wide state. */
struct skynet_node {
    int total;              /* number of live contexts */
    uint32_t monitor_exit;  /* handle notified when a service exits, 0 when none */
};

static struct skynet_node G_NODE = { 0,0 };

/* Return the current number of live contexts. */
int
skynet_context_total() {
    return G_NODE.total;
}

static void
_context_inc() {
    __sync_fetch_and_add(&G_NODE.total,1);
}

static void
_context_dec() {
    __sync_fetch_and_sub(&G_NODE.total,1);
}

/* Write ":XXXXXXXX" (8 upper-case hex digits plus NUL, 10 bytes) into str. */
static void
_id_to_hex(char * str, uint32_t id) {
    static const char hex[16] = { '0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F' };
    int i;
    str[0] = ':';
    for (i=0;i<8;i++) {
        str[i+1] = hex[(id >> ((7-i) * 4))&0xf];
    }
    str[9] = '\0';
}

/*
 * Create a service from module `name` and run its init function with `param`.
 *
 * Returns the new context on success. Returns NULL when the module is not
 * found, the instance or the context cannot be allocated, or init fails;
 * may also return NULL after a successful init if the last reference was
 * already dropped during init.
 */
struct skynet_context *
skynet_context_new(const char * name, const char *param) {
    struct skynet_module * mod = skynet_module_query(name);
    if (mod == NULL)
        return NULL;

    void *inst = skynet_module_instance_create(mod);
    if (inst == NULL)
        return NULL;

    struct skynet_context * ctx = malloc(sizeof(*ctx));
    if (ctx == NULL) {
        // Out of memory: give the instance back instead of dereferencing NULL.
        skynet_module_instance_release(mod, inst);
        return NULL;
    }
    CHECKCALLING_INIT(ctx)

    ctx->mod = mod;
    ctx->instance = inst;
    // Starts at 2; one of the two is dropped below once init has finished.
    ctx->ref = 2;
    ctx->cb = NULL;
    ctx->cb_ud = NULL;
    ctx->session_id = 0;
    ctx->forward = 0;
    ctx->init = false;
    ctx->endless = false;
    ctx->handle = skynet_handle_register(ctx);
    struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
    // init function maybe use ctx->handle, so it must init at last
    _context_inc();

    CHECKCALLING_BEGIN(ctx)
    int r = skynet_module_instance_init(mod, inst, ctx, param);
    CHECKCALLING_END(ctx)

    if (r == 0) {
        struct skynet_context * ret = skynet_context_release(ctx);
        if (ret) {
            ctx->init = true;
        }
        skynet_mq_force_push(queue);
        if (ret) {
            skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
        }
        return ret;
    } else {
        skynet_error(ctx, "FAILED launch %s", name);
        // Read the handle first: the release below may free ctx
        // (e.g. if init already retired the handle).
        uint32_t handle = ctx->handle;
        skynet_context_release(ctx);
        skynet_handle_retire(handle);
        skynet_mq_release(queue);
        return NULL;
    }
}

/* Allocate the next session id for ctx. */
int
skynet_context_newsession(struct skynet_context *ctx) {
    // session always be a positive number
    int session = (++ctx->session_id) & 0x7fffffff;
    return session;
}

/* Atomically add one reference to ctx. */
void
skynet_context_grab(struct skynet_context *ctx) {
    __sync_add_and_fetch(&ctx->ref,1);
}

/* Release the module instance, mark the queue for release and free ctx. */
static void
_delete_context(struct skynet_context *ctx) {
    skynet_module_instance_release(ctx->mod, ctx->instance);
    skynet_mq_mark_release(ctx->queue);
    free(ctx);
    _context_dec();
}

/*
 * Atomically drop one reference. Deletes the context and returns NULL when
 * the count reaches zero; otherwise returns ctx.
 */
struct skynet_context *
skynet_context_release(struct skynet_context *ctx) {
    if (__sync_sub_and_fetch(&ctx->ref,1) == 0) {
        _delete_context(ctx);
        return NULL;
    }
    return ctx;
}

/*
 * Push `message` onto the queue of the service registered under `handle`.
 * Returns -1 when no such service exists (message untouched), 0 otherwise.
 */
int
skynet_context_push(uint32_t handle, struct skynet_message *message) {
    struct skynet_context * ctx = skynet_handle_grab(handle);
    if (ctx == NULL) {
        return -1;
    }
    skynet_mq_push(ctx->queue, message);
    skynet_context_release(ctx);

    return 0;
}

/* Set the `endless` flag on the service registered under `handle`, if any. */
void
skynet_context_endless(uint32_t handle) {
    struct skynet_context * ctx = skynet_handle_grab(handle);
    if (ctx == NULL) {
        return;
    }
    ctx->endless = true;
    skynet_context_release(ctx);
}

/*
 * Return whether `handle` is remote according to the harbor; when `harbor`
 * is non-NULL, also store the bits of `handle` above HANDLE_REMOTE_SHIFT.
 */
int
skynet_isremote(struct skynet_context * ctx, uint32_t handle, int * harbor) {
    int ret = skynet_harbor_message_isremote(handle);
    if (harbor) {
        *harbor = (int)(handle >> HANDLE_REMOTE_SHIFT);
    }
    return ret;
}

/*
 * Deliver msg to `des`: through the harbor for remote handles, otherwise by
 * pushing onto the local queue. A local message that cannot be delivered is
 * freed and logged.
 */
static void
_send_message(uint32_t des, struct skynet_message *msg) {
    if (skynet_harbor_message_isremote(des)) {
        struct remote_message * rmsg = malloc(sizeof(*rmsg));
        rmsg->destination.handle = des;
        rmsg->message = msg->data;
        rmsg->sz = msg->sz;
        skynet_harbor_send(rmsg, msg->source, msg->session);
    } else {
        if (skynet_context_push(des, msg)) {
            free(msg->data);
            skynet_error(NULL, "Drop message from %x forward to %x (size=%d)", msg->source, des, (int)msg->sz);
        }
    }
}

/*
 * If the callback requested a forward (ctx->forward != 0), clear the request
 * and send msg on. Returns 1 when the message was forwarded, 0 otherwise.
 */
static int
_forwarding(struct skynet_context *ctx, struct skynet_message *msg) {
    if (ctx->forward) {
        uint32_t des = ctx->forward;
        ctx->forward = 0;
        _send_message(des, msg);
        return 1;
    }
    return 0;
}

/*
 * Multicast delivery callback: unpack the type from sz, invoke the service
 * callback, and forward a private copy of the payload if one was requested.
 */
static void
_mc(void *ud, uint32_t source, const void * msg, size_t sz) {
    struct skynet_context * ctx = ud;
    int type = sz >> HANDLE_REMOTE_SHIFT;
    sz &= HANDLE_MASK;
    ctx->cb(ctx, ctx->cb_ud, type, 0, source, msg, sz);
    if (ctx->forward) {
        uint32_t des = ctx->forward;
        ctx->forward = 0;
        struct skynet_message message;
        message.source = source;
        message.session = 0;
        message.data = malloc(sz);
        memcpy(message.data, msg, sz);
        // Widen before shifting so the shift is never done on a signed int.
        message.sz = sz | ((size_t)type << HANDLE_REMOTE_SHIFT);
        _send_message(des, &message);
    }
}

/*
 * Run the service callback for one message. The payload is freed here unless
 * the callback returned non-zero or the message was forwarded.
 */
static void
_dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
    assert(ctx->init);
    CHECKCALLING_BEGIN(ctx)
    int type = msg->sz >> HANDLE_REMOTE_SHIFT;
    size_t sz = msg->sz & HANDLE_MASK;
    if (type == PTYPE_MULTICAST) {
        skynet_multicast_dispatch((struct skynet_multicast_message *)msg->data, ctx, _mc);
    } else {
        int reserve = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
        reserve |= _forwarding(ctx, msg);
        if (!reserve) {
            free(msg->data);
        }
    }
    CHECKCALLING_END(ctx)
}

/*
 * Take one queue from the global queue and dispatch at most one message
 * from it. Returns 1 when the global queue is empty, 0 otherwise.
 */
int
skynet_context_message_dispatch(struct skynet_monitor *sm) {
    struct message_queue * q = skynet_globalmq_pop();
    if (q==NULL)
        return 1;

    uint32_t handle = skynet_mq_handle(q);

    struct skynet_context * ctx = skynet_handle_grab(handle);
    if (ctx == NULL) {
        // The owning service is gone: release its queue.
        int s = skynet_mq_release(q);
        if (s>0) {
            skynet_error(NULL, "Drop message queue %x (%d messages)", handle,s);
        }
        return 0;
    }

    struct skynet_message msg;
    if (skynet_mq_pop(q,&msg)) {
        skynet_context_release(ctx);
        return 0;
    }

    skynet_monitor_trigger(sm, msg.source , handle);

    if (ctx->cb == NULL) {
        free(msg.data);
        skynet_error(NULL, "Drop message from %x to %x without callback , size = %d",msg.source, handle, (int)msg.sz);
    } else {
        _dispatch_message(ctx, &msg);
    }

    assert(q == ctx->queue);
    skynet_mq_pushglobal(q);
    skynet_context_release(ctx);

    skynet_monitor_trigger(sm, 0,0);

    return 0;
}

/*
 * Copy addr into name, truncated to GLOBALNAME_LENGTH bytes and zero-padded.
 * A source of GLOBALNAME_LENGTH bytes or more leaves name without a NUL.
 */
static void
_copy_name(char name[GLOBALNAME_LENGTH], const char * addr) {
    int i;
    for (i=0;i<GLOBALNAME_LENGTH && addr[i];i++) {
        name[i] = addr[i];
    }
    for (;i<GLOBALNAME_LENGTH;i++) {
        name[i] = '\0';
    }
}

/*
 * Execute a GROUP sub-command (ENTER / LEAVE / QUERY / CLEAR) on group
 * `handle`. `v` is the member handle, or 0 to mean the calling service.
 * Only QUERY returns a string (in ctx->result), and only when the group
 * query yields a non-zero address.
 */
static const char *
_group_command(struct skynet_context * ctx, const char * cmd, int handle, uint32_t v) {
    uint32_t self;
    if (v != 0) {
        if (skynet_harbor_message_isremote(v)) {
            skynet_error(ctx, "Can't add remote handle %x",v);
            return NULL;
        }
        self = v;
    } else {
        self = ctx->handle;
    }
    if (strcmp(cmd, "ENTER") == 0) {
        skynet_group_enter(handle, self);
        return NULL;
    }
    if (strcmp(cmd, "LEAVE") == 0) {
        skynet_group_leave(handle, self);
        return NULL;
    }
    if (strcmp(cmd, "QUERY") == 0) {
        uint32_t addr = skynet_group_query(handle);
        if (addr == 0) {
            return NULL;
        }
        _id_to_hex(ctx->result, addr);
        return ctx->result;
    }
    if (strcmp(cmd, "CLEAR") == 0) {
        skynet_group_clear(handle);
        return NULL;
    }
    return NULL;
}

/*
 * Resolve ":hex" or ".localname" to a handle. Any other form is reported as
 * unsupported and yields 0.
 */
uint32_t
skynet_queryname(struct skynet_context * context, const char * name) {
    switch(name[0]) {
    case ':':
        return strtoul(name+1,NULL,16);
    case '.':
        return skynet_handle_findname(name + 1);
    }
    skynet_error(context, "Don't support query global name %s",name);
    return 0;
}

/*
 * Retire `handle` (0 means the caller itself), notifying the registered
 * exit monitor first when there is one.
 */
static void
handle_exit(struct skynet_context * context, uint32_t handle) {
    if (handle == 0) {
        handle = context->handle;
        skynet_error(context, "KILL self");
    } else {
        skynet_error(context, "KILL :%0x", handle);
    }
    if (G_NODE.monitor_exit) {
        skynet_send(context,  handle, G_NODE.monitor_exit, PTYPE_CLIENT, 0, NULL, 0);
    }
    skynet_handle_retire(handle);
}

/*
 * Text command interface for services.
 *
 * Returns NULL, or a string result: context->result for most commands,
 * or whatever skynet_handle_namehandle / skynet_getenv return. The result
 * buffer is reused, so it is only valid until the next command on the same
 * context. Commands that need an argument return NULL when param is NULL.
 */
const char *
skynet_command(struct skynet_context * context, const char * cmd , const char * param) {
    if (strcmp(cmd,"TIMEOUT") == 0) {
        if (param == NULL) {
            return NULL;
        }
        int ti = strtol(param, NULL, 10);
        int session = skynet_context_newsession(context);
        skynet_timeout(context->handle, ti, session);
        snprintf(context->result, sizeof(context->result), "%d", session);
        return context->result;
    }

    if (strcmp(cmd,"LOCK") == 0) {
        if (context->init == false) {
            return NULL;
        }
        skynet_mq_lock(context->queue, context->session_id+1);
        return NULL;
    }

    if (strcmp(cmd,"UNLOCK") == 0) {
        if (context->init == false) {
            return NULL;
        }
        skynet_mq_unlock(context->queue);
        return NULL;
    }

    if (strcmp(cmd,"REG") == 0) {
        if (param == NULL || param[0] == '\0') {
            snprintf(context->result, sizeof(context->result), ":%x", context->handle);
            return context->result;
        } else if (param[0] == '.') {
            return skynet_handle_namehandle(context->handle, param + 1);
        } else {
            assert(context->handle!=0);
            struct remote_name *rname = malloc(sizeof(*rname));
            _copy_name(rname->name, param);
            rname->handle = context->handle;
            skynet_harbor_register(rname);
            return NULL;
        }
    }

    if (strcmp(cmd,"QUERY") == 0) {
        if (param != NULL && param[0] == '.') {
            uint32_t handle = skynet_handle_findname(param+1);
            snprintf(context->result, sizeof(context->result), ":%x", handle);
            return context->result;
        }
        return NULL;
    }

    if (strcmp(cmd,"NAME") == 0) {
        if (param == NULL) {
            return NULL;
        }
        int size = strlen(param);
        char name[size+1];
        char handle[size+1];
        // Both tokens are required; with fewer matches the buffers would be
        // read uninitialised.
        if (sscanf(param,"%s %s",name,handle) != 2) {
            return NULL;
        }
        if (handle[0] != ':') {
            return NULL;
        }
        uint32_t handle_id = strtoul(handle+1, NULL, 16);
        if (handle_id == 0) {
            return NULL;
        }
        if (name[0] == '.') {
            return skynet_handle_namehandle(handle_id, name + 1);
        } else {
            struct remote_name *rname = malloc(sizeof(*rname));
            _copy_name(rname->name, name);
            rname->handle = handle_id;
            skynet_harbor_register(rname);
        }
        return NULL;
    }

    if (strcmp(cmd,"NOW") == 0) {
        uint32_t ti = skynet_gettime();
        snprintf(context->result, sizeof(context->result), "%u", ti);
        return context->result;
    }

    if (strcmp(cmd,"EXIT") == 0) {
        handle_exit(context, 0);
        return NULL;
    }

    if (strcmp(cmd,"KILL") == 0) {
        if (param == NULL) {
            return NULL;
        }
        uint32_t handle = 0;
        if (param[0] == ':') {
            handle = strtoul(param+1, NULL, 16);
        } else if (param[0] == '.') {
            handle = skynet_handle_findname(param+1);
        } else {
            skynet_error(context, "Can't kill %s",param);
            // todo : kill global service
        }
        if (handle) {
            handle_exit(context, handle);
        }
        return NULL;
    }

    if (strcmp(cmd,"LAUNCH") == 0) {
        if (param == NULL) {
            return NULL;
        }
        size_t sz = strlen(param);
        char tmp[sz+1];
        strcpy(tmp,param);
        // First token is the module name; the rest of the first line is its argument.
        char * args = tmp;
        char * mod = strsep(&args, " \t\r\n");
        args = strsep(&args, "\r\n");
        struct skynet_context * inst = skynet_context_new(mod,args);
        if (inst == NULL) {
            return NULL;
        } else {
            _id_to_hex(context->result, inst->handle);
            return context->result;
        }
    }

    if (strcmp(cmd,"GETENV") == 0) {
        return skynet_getenv(param);
    }

    if (strcmp(cmd,"SETENV") == 0) {
        if (param == NULL) {
            return NULL;
        }
        size_t sz = strlen(param);
        char key[sz+1];
        int i;
        for (i=0;param[i] != ' ' && param[i];i++) {
            key[i] = param[i];
        }
        // "key value" is required; a lone key is ignored.
        if (param[i] == '\0')
            return NULL;

        key[i] = '\0';
        param += i+1;

        skynet_setenv(key,param);
        return NULL;
    }

    if (strcmp(cmd,"STARTTIME") == 0) {
        uint32_t sec = skynet_gettime_fixsec();
        snprintf(context->result, sizeof(context->result), "%u", sec);
        return context->result;
    }

    if (strcmp(cmd,"GROUP") == 0) {
        if (param == NULL) {
            return NULL;
        }
        size_t sz = strlen(param);
        char subcmd[sz+1];
        int handle=0;
        uint32_t addr=0;
        // The sub-command is mandatory; group id and address keep their
        // zero defaults when absent.
        if (sscanf(param, "%s %d :%x",subcmd,&handle,&addr) < 1) {
            return NULL;
        }
        return _group_command(context, subcmd, handle,addr);
    }

    if (strcmp(cmd,"ENDLESS") == 0) {
        if (context->endless) {
            strcpy(context->result, "1");
            context->endless = false;
            return context->result;
        }
        return NULL;
    }

    if (strcmp(cmd,"ABORT") == 0) {
        skynet_handle_retireall();
        return NULL;
    }

    if (strcmp(cmd,"MONITOR") == 0) {
        uint32_t handle=0;
        if (param == NULL || param[0] == '\0') {
            if (G_NODE.monitor_exit) {
                // return current monitor serivce
                snprintf(context->result, sizeof(context->result), ":%x", G_NODE.monitor_exit);
                return context->result;
            }
            return NULL;
        } else {
            if (param[0] == ':') {
                handle = strtoul(param+1, NULL, 16);
            } else if (param[0] == '.') {
                handle = skynet_handle_findname(param+1);
            } else {
                skynet_error(context, "Can't monitor %s",param);
                // todo : monitor global service
            }
        }
        G_NODE.monitor_exit = handle;
        return NULL;
    }

    return NULL;
}

/*
 * Request that the message currently being dispatched be forwarded to
 * `destination` (0 means back to the service itself). Only one request may
 * be pending at a time.
 */
void
skynet_forward(struct skynet_context * context, uint32_t destination) {
    assert(context->forward == 0);
    if (destination == 0) {
        context->forward = context->handle;
    } else {
        context->forward = destination;
    }
}

/*
 * Normalise send arguments in place:
 *  - allocate a session when PTYPE_TAG_ALLOCSESSION is set;
 *  - unless PTYPE_TAG_DONTCOPY is set, replace *data with a heap copy
 *    (one extra byte, NUL-terminated);
 *  - pack the low 8 bits of type into *sz above HANDLE_REMOTE_SHIFT.
 */
static void
_filter_args(struct skynet_context * context, int type, int *session, void ** data, size_t * sz) {
    int needcopy = !(type & PTYPE_TAG_DONTCOPY);
    int allocsession = type & PTYPE_TAG_ALLOCSESSION;
    type &= 0xff;

    if (allocsession) {
        assert(*session == 0);
        *session = skynet_context_newsession(context);
    }

    if (needcopy && *data) {
        char * msg = malloc(*sz+1);
        memcpy(msg, *data, *sz);
        msg[*sz] = '\0';
        *data = msg;
    }

    assert((*sz & HANDLE_MASK) == *sz);
    // Widen before shifting: shifting the int could overflow or sign-extend
    // into the upper bits of the size_t.
    *sz |= (size_t)type << HANDLE_REMOTE_SHIFT;
}

/*
 * Send a message from `source` (0 means the calling service) to
 * `destination`.
 *
 * The payload is copied unless PTYPE_TAG_DONTCOPY is set, in which case the
 * drop paths below free it, i.e. the callee takes ownership. Returns the
 * session (allocated here when PTYPE_TAG_ALLOCSESSION is set), or -1 when a
 * local destination does not exist. With destination == 0 nothing is sent
 * and the session is returned.
 */
int
skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
    _filter_args(context, type, &session, (void **)&data, &sz);

    if (source == 0) {
        source = context->handle;
    }

    if (destination == 0) {
        // Nothing to deliver: free the payload instead of leaking it
        // (it is either the copy made above or was handed over with DONTCOPY).
        free(data);
        return session;
    }
    if (skynet_harbor_message_isremote(destination)) {
        struct remote_message * rmsg = malloc(sizeof(*rmsg));
        rmsg->destination.handle = destination;
        rmsg->message = data;
        rmsg->sz = sz;
        skynet_harbor_send(rmsg, source, session);
    } else {
        struct skynet_message smsg;
        smsg.source = source;
        smsg.session = session;
        smsg.data = data;
        smsg.sz = sz;

        if (skynet_context_push(destination, &smsg)) {
            free(data);
            skynet_error(NULL, "Drop message from %x to %x (type=%d)(size=%d)", source, destination, type, (int)(sz & HANDLE_MASK));
            return -1;
        }
    }
    return session;
}

/*
 * Send by textual address: ":hex" handle, ".localname", or any other string,
 * which is passed to the harbor as a global name.
 */
int
skynet_sendname(struct skynet_context * context, const char * addr , int type, int session, void * data, size_t sz) {
    uint32_t source = context->handle;
    uint32_t des = 0;
    if (addr[0] == ':') {
        des = strtoul(addr+1, NULL, 16);
    } else if (addr[0] == '.') {
        des = skynet_handle_findname(addr + 1);
        if (des == 0) {
            // Not copied yet, so only a DONTCOPY payload belongs to us here.
            if (type & PTYPE_TAG_DONTCOPY) {
                free(data);
            }
            skynet_error(context, "Drop message to %s", addr);
            return session;
        }
    } else {
        _filter_args(context, type, &session, (void **)&data, &sz);

        struct remote_message * rmsg = malloc(sizeof(*rmsg));
        _copy_name(rmsg->destination.name, addr);
        rmsg->destination.handle = 0;
        rmsg->message = data;
        rmsg->sz = sz;

        skynet_harbor_send(rmsg, source, session);
        return session;
    }

    return skynet_send(context, source, des, type, session, data, sz);
}

/* Return the handle stored in ctx. */
uint32_t
skynet_context_handle(struct skynet_context *ctx) {
    return ctx->handle;
}

/* Store `handle` in ctx. */
void
skynet_context_init(struct skynet_context *ctx, uint32_t handle) {
    ctx->handle = handle;
}

/* Install the message callback and its user data. */
void
skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
    context->cb = cb;
    context->cb_ud = ud;
}

/*
 * Push a message straight onto ctx's own queue, bypassing handle lookup.
 * msg is stored as-is (no copy is made here).
 */
void
skynet_context_send(struct skynet_context * ctx, void * msg, size_t sz, uint32_t source, int type, int session) {
    struct skynet_message smsg;
    smsg.source = source;
    smsg.session = session;
    smsg.data = msg;
    // Widen before shifting so the shift is never done on a signed int.
    smsg.sz = sz | ((size_t)type << HANDLE_REMOTE_SHIFT);

    skynet_mq_push(ctx->queue, &smsg);
}
原来是在这里定义的,而且是在文件内部定义的……这个文件真大……看来它才是核心,几乎调用了其他所有模块的函数。
好吧,这篇文章很废,啥都没分析出来……下篇文章再研究里面的几个模块。
相关文章推荐
- 坚持不下去的时候看看这个
- 把读过的书记录下来。很多年以后再看看这个书单,应该很有趣~
- position属性absolute与relative 详解 最近一直在研究javascript脚本,熟悉DOM中CSS样式的各种定位属性,以前对这个属性不太了解,从网上找到两篇文章感觉讲得很透彻,
- 端口占用问题,今天学习的时候又出现了这个问题,找了度娘,特此记录以下。第一次写博客,希望自己能坚持下来。大家共同进步
- 这个没事看看 挺好
- 不开心的时候看看这个
- 看看三年前“制造企业管理与电子商务标准研究”!
- 尝试加载 Oracle 客户端库时引发 BadImageFormatException。如果在安装 32 位 Oracle 客户端组件的情况下以 64 位模式运行,将出现此问题。(遇到了这个问题网上查了下保存下来)
- 度度熊有一张网格纸,但是纸上有一些点过的点,每个点都在网格点上,若把网格看成一个坐标轴平行于网格线的坐标系的话,每个点可以用一对整数x,y来表示。度度熊必须沿着网格线画一个正方形,使所有点在正方形的内部或者边界。然后把这个正方形剪下来。问剪掉正方形的最小面积是多少。
- 一个普通人研究一个领域三年,他就可以成为这个领域的专家
- CorelDraw应该怎么样才能学好描图技巧看看这个就能懂
- 【站长分享】网站建设经验 原题:草根到知名的转变:我是通过激情和意念坚持下来的-
- 看看这个函数
- 每次我不想好好学习的时候,就看看这个。。。
- 会编程的都来玩玩这个游戏吧,看看你的编程水平.
- 一条在没有水的环境下坚持了四年生存下来的鱼
- wikipad这个软件可以深入研究是基于pywebkit linux uubntu 下有分发包
- 大家看看这个奇妙的问题。。。。
- 大家看看这个问题怎么解决啊(哎,论坛没分了)
- stm32f10x.h 是库3.0以后的; stm32f10x_lib.h 这个是库2.0的吧 哎,打开看看不就知道了啊!