您的位置:首页 > Web前端 > Node.js

nodejs 自定义事件处理

2013-07-23 10:14 459 查看
#include <node.h>
#include <v8.h>
#include <http_agent.h>
#include <iostream>
#include <map>
#include <semaphore.h>
#include <pthread.h>
#include<errno.h>
using namespace v8;

//#define BING_LOG(fmt,...) printf("%s:%d[%s]"fmt,__FILE__,__LINE__,__FUNCTION__,##__VA_ARGS__)
#define BING_LOG (void)
struct send_msg_body{
uv_work_t req;
Persistent<Function> cb;
std::string ip,port;
dk_array_t* req_msg;
dk_array_t* res_msg;
int ret;
public:
send_msg_body():ret(0){
req.data = this;
req_msg = dk_array_init();
res_msg = dk_array_init();
}
~send_msg_body(){
dk_array_free(req_msg);
dk_array_free(res_msg);
}
};

static void array2object(const dk_array_t* array,Handle<Object>& obj){
HandleScope scope;
for(size_t i = 0; i < array->used_; ++i){
const dk_data_unset* data = array->data_[i];
BING_LOG("array data : %s : %s\n", data->key_->ptr_,data->str_value_->ptr_);
Handle<Value> key = String::New(data->key_->ptr_);
switch (data->type_) {
case TYPE_INTEGER: {
Handle<Value> val = Integer::New(atoi(data->str_value_->ptr_));
obj->Set(key,val);
break;
}
case TYPE_STRING: {
Handle<Value> val = String::New(data->str_value_->ptr_);
obj->Set(key,val);
break;
}
case TYPE_DOUBLE: {
Handle<Value> val = Number::New(atof(data->str_value_->ptr_));
obj->Set(key,val);
break;
}
case TYPE_UNSET:
case TYPE_ARRAY:
case TYPE_HASHMAP:
default: {
BING_LOG("not surpport the type : %d\n",data->type_);
break;
}
}
}
}

static void obj2Array(const Local<Object>& obj, dk_array_t*const array){
HandleScope scope;
Local<Array> properties = obj->GetOwnPropertyNames ();
for(uint32_t i = 0; i < properties->Length(); ++i){
Local<Value> key = properties->Get(i);
Local<Value> val = obj->Get(key);
std::string key_str = *String::AsciiValue(key);
if(val->IsString()){
std::string val_str = *String::AsciiValue(val->ToString());
BING_LOG("OBJ data : %s : %s\n", key_str.c_str(),val_str.c_str());
dk_array_insert_or_replace_string(array,key_str.c_str(),val_str.c_str());
}else if(val->IsInt32()){
dk_array_insert_or_replace_int(array,key_str.c_str(),val->ToInt32()->Value());
}else {
std::string val_str = *String::AsciiValue(val->ToString());
BING_LOG("OBJ data not support insert as string : %s : %s\n", key_str.c_str(),val_str.c_str());
dk_array_insert_or_replace_string(array,key_str.c_str(),val_str.c_str());
}
}
}

static void request_send_msg(uv_work_t* req){
send_msg_body* body = (send_msg_body*)req->data;
BING_LOG("call request function ip = %s, port = %s\n", body->ip.c_str(), body->port.c_str());
body->ret = dk_send_message(body->ip.c_str(), body->port.c_str(),body->req_msg,body->res_msg);
BING_LOG("request function result = %d---------------\n",body->ret);
}

static void notify_send_msg(uv_work_t* req, int status){
HandleScope scope;
send_msg_body* body = (send_msg_body*)req->data;
Local<Object> res = Local<Object>::New(Object::New());
array2object(body->res_msg,res);
Local<Value> argv[] = {
Number::New(body->ret),
res
};
TryCatch try_catch;
BING_LOG("notify connection function result : %d\n",body->ret);
body->cb->Call(Context::GetCurrent()->Global(), 2, argv);
body->cb.Dispose();
delete body;
if (try_catch.HasCaught()) {
node::FatalException(try_catch);
}
}

Handle<Value> dk_send_msg(const Arguments& args) {
HandleScope scope;
if(args.Length() < 4){
ThrowException(Exception::TypeError(String::New("input parament's count failed at least 4")));
return scope.Close(Undefined());
}
if(!args[0]->IsString() || !args[1]->IsString() || !args[2]->IsObject() || !args[3]->IsFunction()){
ThrowException(Exception::TypeError(String::New("input parament's type failed : string,string,object,function")));
return scope.Close(Undefined());
}
send_msg_body* body = new send_msg_body;
body->ip= *String::AsciiValue(args[0]->ToString());
body->port = *String::AsciiValue(args[1]->ToString());
body->cb = Persistent<Function>::New(Local<Function>::Cast(args[3]));
obj2Array(args[2]->ToObject(),body->req_msg);

uv_queue_work(uv_default_loop(),&body->req,request_send_msg,notify_send_msg);
return scope.Close(Undefined());
}

static void user_msg_call_backer(Persistent<Function>& caller,const dk_array_t* request,dk_array_t* const response){
HandleScope scope;
Local<Object> req = Object::New();
array2object(request,req);
Local<Object> res = Local<Object>::New(Object::New());
Local<Value> argv[] = {
req,
res
};
BING_LOG("call node.js process route .... \n");
TryCatch try_catch;
caller->Call(Context::GetCurrent()->Global(),2,argv);
obj2Array(res,response);
if (try_catch.HasCaught()) {
node::FatalException(try_catch);
}
}

struct user_msg_body{
uv_work_t req;
Persistent<Function> caller;
pthread_mutex_t locker;
sem_t msg_comming;
sem_t msg_process;
dk_array_t* request;
dk_array_t* response;
void*       context;
public:
user_msg_body():request(NULL),response(NULL),context(NULL){
req.data = this;
pthread_mutex_init(&locker,NULL);
sem_init(&msg_comming,0 , 0);
sem_init(&msg_process,0 , 0);
}
~user_msg_body(){
pthread_mutex_destroy(&locker);
sem_destroy(&msg_comming);
sem_destroy(&msg_process);
}
};

static void waite_msg(uv_work_t* req){
user_msg_body* body = (user_msg_body*)req->data;
BING_LOG("Node.js waite msg coming ...\n");
int ret = sem_wait(&body->msg_comming);
BING_LOG("Node.js got msg %d:%d ...\n",ret,errno);
}

static void process_msg(uv_work_t* req, int status){
user_msg_body* body = (user_msg_body*)req->data;
BING_LOG("Node.js function process msg ... \n");
user_msg_call_backer(body->caller,body->request,body->response);
BING_LOG("Node.js function process msg is over ... \n");
sem_post(&body->msg_process);
BING_LOG("Node.js register next callbacker ... \n");
uv_queue_work(uv_default_loop(),req,waite_msg,process_msg);
}

static void msg_call_backer(void* v,const dk_array_t* request,dk_array_t* const response){
user_msg_body* body = (user_msg_body*)v;
BING_LOG("dk_svr new request comming ....  \n");
pthread_mutex_lock(&body->locker);
BING_LOG("dk_svr set data ....  \n");
body->request = (dk_array_t*)request;
body->response = response;
body->context = v;
BING_LOG("dk_svr notify node.js processing ....  \n");
sem_post(&body->msg_comming);
BING_LOG("dk_svr wait node.js processed ....  \n");
sem_wait(&body->msg_process);
BING_LOG("dk_svr read node.js processed signal ....  \n");
body->request = NULL;
body->response = NULL;
body->context = NULL;
pthread_mutex_unlock(&body->locker);
}

Handle<Value> dk_on_msg(const Arguments& args) {
HandleScope scope;
if(args.Length() < 2){
ThrowException(Exception::TypeError(String::New("input parament's count failed at least 2")));
return scope.Close(Undefined());
}
if(!args[0]->IsString() || !args[1]->IsFunction()){
ThrowException(Exception::TypeError(String::New("input parament's type failed : string,function")));
return scope.Close(Undefined());
}

std::string ports = *String::AsciiValue(args[0]->ToString());
user_msg_body* body = new user_msg_body();
body->caller = Persistent<Function>::New(Local<Function>::Cast(args[1]));
BING_LOG("call dk_on_message function at : %s\n", ports.c_str());
int ret = dk_on_message(ports.c_str(),body,msg_call_backer);
BING_LOG("call dk_on_message result = %d\n", ret);
if(0 != ret){
delete body;
return scope.Close(Local<Integer>::New(Integer::New(-1)));
}
uv_queue_work(uv_default_loop(),&body->req,waite_msg,process_msg);
return scope.Close(Local<Integer>::New(Integer::New(ret)));
}

void init(Handle<Object> target) {
NODE_SET_METHOD(target, "dk_send_msg", dk_send_msg);
NODE_SET_METHOD(target, "dk_on_msg", dk_on_msg);
}

NODE_MODULE(binding, init);
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: