一个基于libcurl的多线程HTTP 请求服务模板
2013-07-29 20:43
405 查看
最近在项目中,多次要去第三方验证,例如苹果的充值服务器验证等等,所以封装了一个模板。使用的是libcurl.
先看看代码。基础思想是,一个HttpService维护任务队列,完成任务队列,以及一组HttpThread队列, HttpThread负责HTTP请求。
工作线程的定义
线程被封装到HttpThread模板类
构造函数里会初始化一些libcurl,并且启用新的工作线程。
设置一些curl 的基础数据,
设定HTTP SERVER 有数据返回的回调HttpThreadParseData( char *ptr, size_t size, size_t nmemb, void *userdata)函数,
我们通过
然后创建了一个线程。
如果当前任务是空, 就去主线程抓取一个任务。
我们通过下面2段来设置发送HTTP 请求的数据
_cur_task->GetLen() 这个接口抽取需要发送数据的长度, _cur_task->GetData()指定发送数据,
如果没有异常,然后等待服务器返回数据 回调我们前面设置的&HttpThread<Task>::HttpThreadParseData函数。
这里调用到了工作线程的方法,我们调用Task:: CheckData(p, len);来对任务进行验证处理。
_father->AddDone(_cur_task); . 然后把处理好的任务返回主线程的_done_list列表,然后清空当前任务.
上面就是整个工作线程的流程.
看看HttpService 的定义
void Run(u32 diff);负责从已经处理好的任务队列在应用中在次处理。
void ShutDown();负责关闭服务。释放对应的资源。
void AddTask(Task* t); 添加新的任务到工作队列 _task_list
void AddDone(Task* t);工作线程调用,负责把处理好的任务,丢进这_done_list
void checkStop(); 这个是检测应用在需要关闭的时候,子线程的资源的释放等等
void DoneTask( Task* t); 主线程对_done_list里面处理好的任务的再次处理。这个函数回调用Task的方法
Task* GetTask(); 工作线程获取从_task_list提取一个新的任务。
主线程会每个循环去检测_done_list, 然后对每个工作线程处理好的任务,进行在分配
然后对每个已经完成的任务,调用
方法,对已经通过工作线程验证过的任务,交给应用去处理。
上面我们调用了Task类一下接口,
另外RunObject 是一个状态控制类。主要是控制HttpService的工作状态。
否则会一直_wait_stop_状态,等待所有任务完成,不过在_wait_stop_的状态的时候, 是无法添加新的任务进列表。可以看
这样我们保证所有的任务都处理完毕。
附上代码
run_object.h
http_task.h
先看看代码。基础思想是,一个HttpService维护任务队列,完成任务队列,以及一组HttpThread队列, HttpThread负责HTTP请求。
工作线程的定义
template<class Task> class HttpThread { public: HttpThread(const char* url, HttpServiceBase<Task>* father); virtual ~HttpThread(); void Run(); void ParseData(char* p, size_t len); bool IsDone(); static size_t HttpThreadParseData( char *ptr, size_t size, size_t nmemb, void *userdata); protected: private: CURL* _handle; boost::thread* _thread; Task* _cur_task; HttpServiceBase<Task>* _father; bool _stop; };
线程被封装到HttpThread模板类
构造函数里会初始化一些libcurl,并且启用新的工作线程。
template<class Task> HttpThread<Task>::HttpThread(const char* url, HttpServiceBase<Task>* father) { _father = father; _cur_task = NULL; _stop = false; _handle = curl_easy_init(); curl_slist *plist = curl_slist_append(NULL, "Content-Type: application/x-www-form-urlencoded; charset=UTF-8"); curl_easy_setopt(_handle, CURLOPT_URL, url); curl_easy_setopt(_handle, CURLOPT_HEADER, 0); #ifdef DEBUG curl_easy_setopt(_handle, CURLOPT_VERBOSE, 1); #else curl_easy_setopt(_handle, CURLOPT_VERBOSE, 0); #endif // _DEBUG curl_easy_setopt(_handle, CURLOPT_POST, 1); curl_easy_setopt(_handle, CURLOPT_WRITEDATA, 1); curl_easy_setopt(_handle, CURLOPT_TIMEOUT, 0); curl_easy_setopt(_handle, CURLOPT_WRITEFUNCTION, &HttpThread<Task>::HttpThreadParseData); curl_easy_setopt(_handle, CURLOPT_WRITEDATA, this); curl_easy_setopt(_handle, CURLOPT_HTTPHEADER, "Expect:"); curl_easy_setopt(_handle, CURLOPT_HTTPHEADER, plist); _thread = new boost::thread(boost::bind(&HttpThread<Task>::Run, this)); }
设置一些curl 的基础数据,
curl_easy_setopt(_handle, CURLOPT_URL, url);这个指定服务器的URL
curl_easy_setopt(_handle, CURLOPT_WRITEFUNCTION, &HttpThread<Task>::HttpThreadParseData);
设定HTTP SERVER 有数据返回的回调HttpThreadParseData( char *ptr, size_t size, size_t nmemb, void *userdata)函数,
我们通过
curl_easy_setopt(_handle, CURLOPT_WRITEDATA, this);这个选项,把工作线程对应的指针绑定进去,在回调的时候回返回这个指针。
然后创建了一个线程。
_thread = new boost::thread(boost::bind(&HttpThread<Task>::Run, this)); }线程绑定了对象的run 函数。
template<class Task> void HttpThread<Task>::Run() { while(!_stop) { if (_handle) { if (NULL == _cur_task ) { _cur_task = _father->GetTask();} try { if (_cur_task) { CURLcode code = CURLE_OK; code = curl_easy_setopt(_handle, CURLOPT_POSTFIELDSIZE, _cur_task->GetLen()); assert (code == CURLE_OK); code = curl_easy_setopt(_handle, CURLOPT_POSTFIELDS, _cur_task->GetData()); assert (code == CURLE_OK); unsigned int _tem = 0; code = curl_easy_getinfo(_handle, CURLINFO_RESPONSE_CODE, &_tem); assert (code == CURLE_OK); code = curl_easy_perform(_handle); if (code != CURLE_OK) { if (_cur_task) { _cur_task->CheckException(); _father->AddDone(_cur_task); _cur_task = NULL; } } } } catch(...) { if (_cur_task) { _cur_task->CheckException(); _father->AddDone(_cur_task); _cur_task = NULL; } } } #ifdef _WIN32 Sleep(1); #else usleep(1000); #endif } }
if (NULL == _cur_task ) { _cur_task = _father->GetTask();}
如果当前任务是空, 就去主线程抓取一个任务。
我们通过下面2段来设置发送HTTP 请求的数据
CURLcode code = CURLE_OK; code = curl_easy_setopt(_handle, CURLOPT_POSTFIELDSIZE, _cur_task->GetLen()); assert (code == CURLE_OK); code = curl_easy_setopt(_handle, CURLOPT_POSTFIELDS, _cur_task->GetData());
_cur_task->GetLen() 这个接口抽取需要发送数据的长度, _cur_task->GetData()指定发送数据,
code = curl_easy_perform(_handle);用函数真正执行数据发送,如果出现异常
_cur_task->CheckException(); _father->AddDone(_cur_task); _cur_task = NULL;
如果没有异常,然后等待服务器返回数据 回调我们前面设置的&HttpThread<Task>::HttpThreadParseData函数。
template<class Task> size_t HttpThread<Task>::HttpThreadParseData( char *ptr, size_t size, size_t nmemb, void *userdata) { HttpThread<Task>* p = static_cast<HttpThread<Task>*>(userdata); if (p) { p->ParseData(ptr, size * nmemb); } return size * nmemb; }
template<class Task> void HttpThread<Task>::ParseData(char* p, size_t len) { if (_cur_task) { _cur_task->CheckData(p, len); _father->AddDone(_cur_task); _cur_task = NULL; } }
这里调用到了工作线程的方法,我们调用Task:: CheckData(p, len);来对任务进行验证处理。
_father->AddDone(_cur_task); . 然后把处理好的任务返回主线程的_done_list列表,然后清空当前任务.
上面就是整个工作线程的流程.
看看HttpService 的定义
template<class Task> class HttpService : public RunObject, public HttpServiceBase<Task> { public: bool Init(u32 count, const char* url); void Run(u32 diff); void ShutDown(); void AddTask(Task* t); void AddDone(Task* t); void checkStop(); void DoneTask( Task* t); Task* GetTask(); protected: bool AddThread(u32 nCount); private: std::list<HttpThread<Task>*> _threads; std::list<Task*> _task_list; std::list<Task*> _done_list; std::string _url; boost::mutex _mutex_done; boost::mutex _mutex_task; };其中bool Init(u32 count, const char* url)负责初始化主线程的任务,参数是工作线程数和URL。libcur的全局初始化。
void Run(u32 diff);负责从已经处理好的任务队列在应用中在次处理。
void ShutDown();负责关闭服务。释放对应的资源。
void AddTask(Task* t); 添加新的任务到工作队列 _task_list
void AddDone(Task* t);工作线程调用,负责把处理好的任务,丢进这_done_list
void checkStop(); 这个是检测应用在需要关闭的时候,子线程的资源的释放等等
void DoneTask( Task* t); 主线程对_done_list里面处理好的任务的再次处理。这个函数回调用Task的方法
Task* GetTask(); 工作线程获取从_task_list提取一个新的任务。
主线程会每个循环去检测_done_list, 然后对每个工作线程处理好的任务,进行在分配
template<class Task> void HttpService<Task>::Run(u32 diff) { if (isObjState(_run_obj_) || isObjState(_wait_stop_)) { std::list<Task*> done_list; { boost::mutex::scoped_lock lock(_mutex_done); done_list = _done_list; _done_list.clear(); } if (done_list.size()) { std::list<Task*>::iterator it = done_list.begin(); std::list<Task*>::iterator itend = done_list.end(); for (; it != itend; ++it) { DoneTask( *it );} done_list.clear(); } } checkStop(); }
然后对每个已经完成的任务,调用
template<class Task> void HttpService<Task>::DoneTask( Task* t) { t->DoneCall(); }
方法,对已经通过工作线程验证过的任务,交给应用去处理。
上面我们调用了Task类一下接口,
//void CheckData(char*p , size_t len); parse from http server //void CheckException(); exception //void DoneCall(); task done call //size_t GetLen(); buffer send to http server //char* GetData(); buffer len
另外RunObject 是一个状态控制类。主要是控制HttpService的工作状态。
template<class Task> void HttpService<Task>::checkStop() { if (isObjState(_wait_stop_)) { { boost::mutex::scoped_lock lock(_mutex_task); if (_task_list.size() > 0) { return ;} } { boost::mutex::scoped_lock lock(_mutex_done); if (_done_list.size() > 0) { return ;} } std::list<HttpThread<Task>*>::iterator it2 = _threads.begin(); std::list<HttpThread<Task>*>::iterator it2end = _threads.end(); for (it2; it2 != it2end; ++it2) { HttpThread<Task>* pkThread = *it2; if (pkThread) { if (!pkThread->IsDone()) { return ;} } } setObjState(_stop_obj_ ); } }当检测到状态是 _wait_stop_等待停止的时候, 我们会检测当前的_task_list 列表, _done_list列表,以及正在处理的任务,如果所有的都处理完成才会真正的去关闭服务。
否则会一直_wait_stop_状态,等待所有任务完成,不过在_wait_stop_的状态的时候, 是无法添加新的任务进列表。可以看
template<class Task> void HttpService<Task>::AddTask(Task* t) { if (isObjState(_run_obj_)) { boost::mutex::scoped_lock lock(_mutex_task); _task_list.push_back(t); } }
这样我们保证所有的任务都处理完毕。
附上代码
run_object.h
#ifndef __run_object_h__ #define __run_object_h__ struct RunObject { enum { _init_obj_, //准备阶段 _run_obj_, //运行阶段 _wait_stop_, //准备停止阶段 _stop_obj_, //停止 }; RunObject() { _obj_state = _init_obj_; } void setObjState( u8 state) { _obj_state = state; } virtual void checkStop() = 0; bool isObjStop() { return _obj_state == _stop_obj_; } bool isObjState( u8 state) { return _obj_state == state; } u8 _obj_state; }; #endif
http_task.h
#ifndef ___http_task_h__
#define ___http_task_h__
#include "G_Common.h"
#include "boost/bind/bind.hpp"
#include "boost/thread/thread.hpp"
#include "curl/curl.h"
// task api
//void CheckData(char*p , size_t len); parse from http server
//void CheckException(); exception
//void DoneCall(); task done call
//size_t GetLen(); buffer send to http server
//char* GetData(); buffer len
template<class Task>
class HttpServiceBase
{
public:
virtual void AddDone(Task* t) = 0;
virtual Task* GetTask() = 0;
};
template<class Task> class HttpThread { public: HttpThread(const char* url, HttpServiceBase<Task>* father); virtual ~HttpThread(); void Run(); void ParseData(char* p, size_t len); bool IsDone(); static size_t HttpThreadParseData( char *ptr, size_t size, size_t nmemb, void *userdata); protected: private: CURL* _handle; boost::thread* _thread; Task* _cur_task; HttpServiceBase<Task>* _father; bool _stop; };
template<class Task> class HttpService : public RunObject, public HttpServiceBase<Task> { public: bool Init(u32 count, const char* url); void Run(u32 diff); void ShutDown(); void AddTask(Task* t); void AddDone(Task* t); void checkStop(); void DoneTask( Task* t); Task* GetTask(); protected: bool AddThread(u32 nCount); private: std::list<HttpThread<Task>*> _threads; std::list<Task*> _task_list; std::list<Task*> _done_list; std::string _url; boost::mutex _mutex_done; boost::mutex _mutex_task; };
//////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////
template<class Task> size_t HttpThread<Task>::HttpThreadParseData( char *ptr, size_t size, size_t nmemb, void *userdata) { HttpThread<Task>* p = static_cast<HttpThread<Task>*>(userdata); if (p) { p->ParseData(ptr, size * nmemb); } return size * nmemb; }
template<class Task> void HttpThread<Task>::ParseData(char* p, size_t len) { if (_cur_task) { _cur_task->CheckData(p, len); _father->AddDone(_cur_task); _cur_task = NULL; } }
template<class Task>
HttpThread<Task>::HttpThread(const char* url, HttpServiceBase<Task>* father)
{
_father = father;
_cur_task = NULL;
_stop = false;
_handle = NULL;
_handle = NULL;
_handle = curl_easy_init();
curl_slist *plist = curl_slist_append(NULL, "Content-Type: application/x-www-form-urlencoded; charset=UTF-8");
curl_easy_setopt(_handle, CURLOPT_URL, url);
curl_easy_setopt(_handle, CURLOPT_HEADER, 0);
#ifdef DEBUG
curl_easy_setopt(_handle, CURLOPT_VERBOSE, 1);
#else
curl_easy_setopt(_handle, CURLOPT_VERBOSE, 0);
#endif // _DEBUG
curl_easy_setopt(_handle, CURLOPT_POST, 1);
curl_easy_setopt(_handle, CURLOPT_WRITEDATA, 1);
curl_easy_setopt(_handle, CURLOPT_TIMEOUT, 0);
curl_easy_setopt(_handle, CURLOPT_WRITEFUNCTION, &HttpThread<Task>::HttpThreadParseData);
curl_easy_setopt(_handle, CURLOPT_WRITEDATA, this);
curl_easy_setopt(_handle, CURLOPT_HTTPHEADER, "Expect:");
curl_easy_setopt(_handle, CURLOPT_HTTPHEADER, plist);
_thread = new boost::thread(boost::bind(&HttpThread<Task>::Run, this));
}
template<class Task>
HttpThread<Task>::~HttpThread()
{
curl_easy_cleanup( _handle );
_handle = NULL ;
delete _thread;
}
template<class Task>
bool HttpThread<Task>::IsDone()
{
boost::mutex _mutex;
boost::mutex::scoped_lock lock(_mutex);
if ( NULL == _cur_task )
{
_stop = true;
return true ;
}else
{ return false ;}
}
template<class Task> void HttpThread<Task>::Run() { while(!_stop) { if (_handle) { if (NULL == _cur_task ) { _cur_task = _father->GetTask();} try { if (_cur_task) { CURLcode code = CURLE_OK; code = curl_easy_setopt(_handle, CURLOPT_POSTFIELDSIZE, _cur_task->GetLen()); assert (code == CURLE_OK); code = curl_easy_setopt(_handle, CURLOPT_POSTFIELDS, _cur_task->GetData()); assert (code == CURLE_OK); unsigned int _tem = 0; code = curl_easy_getinfo(_handle, CURLINFO_RESPONSE_CODE, &_tem); assert (code == CURLE_OK); code = curl_easy_perform(_handle); if (code != CURLE_OK) { if (_cur_task) { _cur_task->CheckException(); _father->AddDone(_cur_task); _cur_task = NULL; } } } } catch(...) { if (_cur_task) { _cur_task->CheckException(); _father->AddDone(_cur_task); _cur_task = NULL; } } } #ifdef _WIN32 Sleep(1); #else usleep(1000); #endif } }
//////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////
template<class Task>
bool HttpService<Task>::Init(u32 count, const char* url)
{
_threads.clear();
_task_list.clear();
_done_list.clear();
_url = url;
curl_global_init(CURL_GLOBAL_ALL);
setObjState(_run_obj_);
return AddThread(count);
}
template<class Task>
bool HttpService<Task>::AddThread(u32 nCount)
{
for (u16 i = 0 ; i < nCount; ++i)
{
HttpThread<Task>* p = new HttpThread<Task>(_url.c_str(), this) ;
if (p)
{ _threads.push_back( p );}
else
{ return false;}
}
return true;
}
template<class Task> void HttpService<Task>::Run(u32 diff) { if (isObjState(_run_obj_) || isObjState(_wait_stop_)) { std::list<Task*> done_list; { boost::mutex::scoped_lock lock(_mutex_done); done_list = _done_list; _done_list.clear(); } if (done_list.size()) { std::list<Task*>::iterator it = done_list.begin(); std::list<Task*>::iterator itend = done_list.end(); for (; it != itend; ++it) { DoneTask( *it );} done_list.clear(); } } checkStop(); }
template<class Task>
void HttpService<Task>::ShutDown()
{
std::list<HttpThread<Task>*>::iterator it = _threads.begin();
std::list<HttpThread<Task>*>::iterator itend = _threads.end();
for (it; it != itend; ++it)
{
HttpThread<Task>* pkThread = *it;
if (pkThread)
{ delete pkThread ;}
}
_threads.clear();
curl_global_cleanup();
}
template<class Task> void HttpService<Task>::AddTask(Task* t) { if (isObjState(_run_obj_)) { boost::mutex::scoped_lock lock(_mutex_task); _task_list.push_back(t); } }
template<class Task>
void HttpService<Task>::AddDone(Task* t)
{
boost::mutex::scoped_lock lock(_mutex_done);
_done_list.push_back(t);
}
template<class Task>
Task* HttpService<Task>::GetTask()
{
boost::mutex::scoped_lock lock(_mutex_task);
if (_task_list.size() > 0)
{
Task* t = _task_list.front();
_task_list.pop_front();
return t ;
}else
{
return NULL ;
}
}
template<class Task> void HttpService<Task>::DoneTask( Task* t) { t->DoneCall(); }
template<class Task> void HttpService<Task>::checkStop() { if (isObjState(_wait_stop_)) { { boost::mutex::scoped_lock lock(_mutex_task); if (_task_list.size() > 0) { return ;} } { boost::mutex::scoped_lock lock(_mutex_done); if (_done_list.size() > 0) { return ;} } std::list<HttpThread<Task>*>::iterator it2 = _threads.begin(); std::list<HttpThread<Task>*>::iterator it2end = _threads.end(); for (it2; it2 != it2end; ++it2) { HttpThread<Task>* pkThread = *it2; if (pkThread) { if (!pkThread->IsDone()) { return ;} } } setObjState(_stop_obj_ ); } }
#endif
相关文章推荐
- WCF技术剖析之二十七: 如何将一个服务发布成WSDL[基于HTTP-GET的实现](提供模拟程序)
- libcurl 一个实现了client请求http,ftp的库
- [ZooKeeper.net] 1 模仿dubbo实现一个简要的http服务的注册 基于webapi
- Android网络优化6--写一个网络请求模板2--基于Volley
- [ZooKeeper.net] 1 模仿dubbo实现一个简要的http服务的注册 基于webapi
- 一个技术汪的开源梦 —— 基于 .Net Core 的公共组件之 Http 请求客户端
- 基于HTTP 协议的GET和POST请求服务
- 基于Netty写一个http协议的服务
- WCF技术剖析之二十七: 如何将一个服务发布成WSDL[基于HTTP-GET的实现](提供模拟程序)
- AngularJS基于创建一个完整的mvc模板,($http访问数据)
- 一个HttpClient使用Windows认证请求WCF服务的例子
- <<node.js探秘>>(02).建立一个http站点服务.
- 简单的基于libcurl和c++11 thread多线程的多线程下载程序
- Java中通过方法创建一个http连接并请求(服务器间进行通信)
- Tomcat Server处理一个http请求的过程
- Java HTTP方式请求.NET WebService服务总结
- Enumeration遍历http请求参数的一个例子
- 【lucene系列学习三】用socke把lucene做成一个web服务并实现多线程
- 用一个WEB服务或普通站点 实现这样一个效果?以URL请求,返回一个XML文档
- C# .net基于Http实现web server(web服务)