您的位置:首页 > 其它

使用boost 的thread和asio库实现jpeg图片传输

2014-04-17 16:40 615 查看
 
 编写本文的主要目的是熟悉boost thread和asio库;主要实现下面内容;

1)实现jpeg图片传输;客户端将jpeg图片传输到服务端,服务端接收,打印接收到的jpeg数据长度

2)客户端使用boost thread开一个线程不断的读jpeg图片,然后进行传输,传输使用asio库;

3)服务端使用asio库,进行jpeg图片接收,接收到图片后,打印一行日志,接收到jpg图片的size是多少?

程序中只使用stl和boost库,不使用任何其它不利于移植的库;

实现要点:

客户端:

1)首先使用boost::filesystem,对指定路径下的jpg图片进行枚举;然后将枚举出来的图片放入STL Vector 容器;

2)文中使用了boost thread库,创建线程,为了演示多线程之间的同步,使用了2个线程,一个线程专门用读一帧jpeg图片,另外一个线程用来发送jpeg数据;

3)jpeg发送线程中调用asio库进行发送;将asio的sock单独封一个简单的类(CClientSock);

4)两个线程使用boost的互斥锁进行同步;

 //多线程同步

 typedef boost::try_mutex MUTEX;

 typedef MUTEX::scoped_try_lock LOCK;

  MUTEX iomutex;

5)文中演示了如何使用boost bind库,boost时间库

  boost::this_thread::sleep(boost::posix_time::milliseconds(5));

6)文中很好的使用了STL的vector和List容器;

 

客户端代码如下:

1)sock 封装类

ClientSock.h

#ifndef __CLIENT_SOCK__
#define __CLIENT_SOCK__

#include <boost/asio.hpp>
#include <boost/bind.hpp>
using namespace boost;

using boost::asio::io_service;
using boost::asio::ip::tcp;
using namespace boost::asio;

#include <string>
#include <iostream>
using namespace std;

class CClientSock
{
public:
CClientSock(tcp::endpoint& endpoint);
virtual ~CClientSock(void);

unsigned int send (char *pBuff,unsigned int len);
unsigned int recv (char *pBuff, int len);
void close ();

private:
io_service * io_service_;
tcp::socket * socket_;

};

#endif


ClientSock.cpp

#include "ClientSock.h"

CClientSock::CClientSock(tcp::endpoint& endpoint)
{
//
io_service_ = new io_service();
socket_ = new tcp::socket(*io_service_);
boost::system::error_code ec;
socket_->connect(endpoint, ec);

if (ec)
{
std::cerr << "Error: " << ec << std::endl;
throw ec;
}
}

CClientSock::~CClientSock(void)
{
//
delete io_service_;
delete socket_;
}

//
unsigned int  CClientSock:: send (char *pBuff,unsigned int len)
{
boost::asio::const_buffers_1 request(pBuff, len);
return socket_->send(request);
}

//
unsigned int CClientSock:: recv ( char *pBuff, int len)
{
//
//return boost::asio::read( socket_ ,boost::asio::buffer(static_cast<void*>(pBuff),len));
return 0;
}

//
void CClientSock::close ()
{
//
socket_->close();
}


2)客户端传输封装类

ClientTransfer.h

#ifndef __CLIENT_TRANSFER__
#define __CLIENT_TRANSFER__

#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/filesystem.hpp>
#include <boost/timer.hpp>
using namespace boost;

#include <iostream>
#include <string>

#include <vector>
#include <algorithm>
#include <fstream>
#include <List>
using namespace std;

#include "ClientSock.h"

typedef struct
{
unsigned char *pBuff;	//图像Buff指针
unsigned int length;	//Buff总的大小
unsigned int valid_len;	//有效数据的长度
}image_buff;

#define MAX_BUFF_SIZE 1024*1024
#define MAX_BUFF_NUMBER 4

class CClientTransfer
{
public:
CClientTransfer(std::string jpgPath);
virtual ~CClientTransfer(void);

void StartThread(void);
void WaitThreadTerminate(void);
private:

bool GetJpegData(image_buff &imageBuff,std::string jpgPath);
bool scanFilesUseRecursive(const string& rootPath,vector<string>& container=*(new vector<string>()));

static void ReadJpegThread(void *arg);
void ReadJpegData(void);
boost::thread m_ReadThread;		//读Jpeg线程

//
static void SendJpegThread(void *arg);
void SendJpegData(void);
boost::thread m_SendThread;		//发送Jpeg图片

std::string m_jpgPath;
vector<string>	m_vecFilePic;	//jpeg目录
unsigned int m_PicId;			//jpeg vector 容器中的第几张图片

//图像Buff 空List
list <image_buff> IdleList;
//发送图像Buff List
list <image_buff> SendBuffList;
bool m_bQuit;		//线程结束标志

//多线程同步
typedef boost::try_mutex MUTEX;
typedef MUTEX::scoped_try_lock LOCK;
MUTEX iomutex;

//asio
CClientSock *m_sock;
};

#endif


 

ClientTransfer.cpp

#include "ClientTransfer.h"

//参数jpgPath:保存jpg序列的目录
CClientTransfer::CClientTransfer(std::string jpgPath)
{
m_jpgPath=jpgPath;

//分配内存
image_buff desc;
desc.pBuff = NULL;
desc.length = MAX_BUFF_SIZE;
desc.valid_len = 0;

for (int i = 0; i < MAX_BUFF_NUMBER; ++i) {
desc.pBuff = new unsigned char [MAX_BUFF_SIZE];
IdleList.push_back(desc);
}

m_PicId=0;
m_bQuit=false;

ip::tcp::endpoint ep(ip::address_v4::from_string("127.0.0.1"), 8100);

m_sock=new CClientSock(ep);

}

//
CClientTransfer::~CClientTransfer(void)
{
//
std::list<image_buff>::iterator iter;

//注意加锁
LOCK lock(iomutex);
for (iter = SendBuffList.begin();iter != SendBuffList.end(); ++iter)
{
delete []((*iter).pBuff);
(*iter).pBuff = NULL;
}

SendBuffList.clear();

for (iter = IdleList.begin();iter != IdleList.end(); ++iter)
{
delete []((*iter).pBuff);
(*iter).pBuff = NULL;
}

IdleList.clear();

if (m_sock)
{
delete m_sock;
m_sock=NULL;
}

}

//函数功能:从jpgPath中获取jpeg数据
bool CClientTransfer::GetJpegData(image_buff &imageBuff,std::string jpgPath)
{
//
bool status=false;

ifstream ifs(jpgPath.c_str(), std::ios_base::binary);

unsigned int nJpgLen = 0;

if ( ifs.is_open() )
{
ifs.seekg( 0 , std::ios::end );
nJpgLen = ifs.tellg();

if ( nJpgLen > 0 &&nJpgLen<=imageBuff.length)
{
ifs.seekg( 0 , std::ios::beg );
ifs.read((char*)imageBuff.pBuff,nJpgLen);
imageBuff.valid_len=nJpgLen;

status=true;
}else
{
status=false;
}

ifs.close();
}

return status;
}

//函数功能:从文件夹下枚举.jpg图片,放入到jpg的vector容器中
bool CClientTransfer::scanFilesUseRecursive(const string& rootPath,vector<string>& container)
{
//

namespace fs = boost::filesystem;
fs::path fullpath (rootPath, fs::native);

vector<string> &ret = container;

//判断路径是否存在
if(!fs::exists(fullpath))
{
return false;
}

fs::recursive_directory_iterator end_iter;

for(fs::recursive_directory_iterator iter(fullpath);iter!=end_iter;iter++)
{

try{
//是文件名&&.jpg
if (!fs::is_directory( *iter ) &&(fs::extension(*iter)==".jpg"||fs::extension(*iter)==".JPG"||fs::extension(*iter)==".jpeg"||fs::extension(*iter)==".JPEG") )
{
//
std::string jpgstring=iter->path().string();

ret.push_back(jpgstring);
std::cout << *iter << " is a file" << std::endl;
}

} catch ( const std::exception & ex )
{
std::cerr << ex.what() << std::endl;
continue;
}
}

return true;

}

//
void TransferThread(image_buff &imageBuff)
{
//
printf("imageBuff len=%d\n",imageBuff.valid_len);

}

void CClientTransfer::ReadJpegThread(void *arg)
{
//
CClientTransfer *pThis=(CClientTransfer *)arg;

while (1)
{
if (pThis->m_bQuit)
{
break;
}

pThis->ReadJpegData();
}
}

//读jpeg数据
void CClientTransfer::ReadJpegData(void)
{

image_buff struImageBuff;
memset(&struImageBuff,0,sizeof(struImageBuff));

//多线程必须加锁
if (IdleList.size()>0)
{
LOCK lock(iomutex);			//锁定mutex
struImageBuff=IdleList.front();
IdleList.pop_front();
}else
{
//使用boost ::时间库替换
boost::this_thread::sleep(boost::posix_time::milliseconds(5));
return;

}

LOCK lock(iomutex);			//锁定mutex
if (m_PicId<m_vecFilePic.size())
{
//
std::string jpgPath=m_vecFilePic[m_PicId].c_str();

if (GetJpegData(struImageBuff,jpgPath))
{
m_PicId++;

SendBuffList.push_back(struImageBuff);
cout<<"read a jpeg image"<<endl;

}else
{
IdleList.push_back(struImageBuff);
}
}else
{
IdleList.push_back(struImageBuff);

cout<<"all jpeg image read complete"<<endl;
}

}

//发送Jpeg数据
void CClientTransfer::SendJpegThread(void *arg)
{
//
CClientTransfer *pThis=(CClientTransfer *)arg;

while (1)
{
if (pThis->m_bQuit)
{
//
break;
}

pThis->SendJpegData();
}
}

void CClientTransfer::SendJpegData(void)
{
//
image_buff struImageBuff;
memset(&struImageBuff,0,sizeof(struImageBuff));

//加锁
if (SendBuffList.size()>0)
{
LOCK lock(iomutex);			//锁定mutex
struImageBuff=SendBuffList.front();
SendBuffList.pop_front();

}else
{

boost::this_thread::sleep(boost::posix_time::milliseconds(5));
return;
}

//调用boost Asio发送数据
m_sock->send((char*)struImageBuff.pBuff,struImageBuff.valid_len);

LOCK lock(iomutex);			//锁定mutex
IdleList.push_back(struImageBuff);

cout<<"send a jpeg image"<<endl;

}

void CClientTransfer::StartThread(void)
{

scanFilesUseRecursive(m_jpgPath,m_vecFilePic);

m_ReadThread= boost::thread(boost::bind(&CClientTransfer::ReadJpegThread,this));
m_SendThread=boost::thread(boost::bind(&CClientTransfer::SendJpegThread,this));
}

//
void CClientTransfer::WaitThreadTerminate(void)
{
//
m_ReadThread.join();
m_SendThread.join();

}


 

3)客户端传输类调用代码

client.cpp

#include "stdafx.h"
#include "ClientTransfer.h"

int _tmain(int argc, _TCHAR* argv[])
{

CClientTransfer clientTransfer("D:\\Tool\\");
clientTransfer.StartThread();

clientTransfer.WaitThreadTerminate();

return 0;
}


 

服务端:

1)做一个简单的服务器,对jpeg图片进行接收,服务器代码如下:

// boostAsio.cpp : 定义控制台应用程序的入口点。
//

#include "stdafx.h"

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <iostream>
using boost::asio::ip::tcp;
#define max_len 1024*1024

class clientSession
:public boost::enable_shared_from_this<clientSession>
{
public:
clientSession(boost::asio::io_service& ioservice)
:m_socket(ioservice)
{
memset(data_,'\0',sizeof(data_));
}

~clientSession()
{
//

}

tcp::socket& socket()
{
return m_socket;
}

void start()
{
//boost::asio::async_write(m_socket,
//	boost::asio::buffer("link successed!"),
//	boost::bind(&clientSession::handle_write,shared_from_this(),
//	boost::asio::placeholders::error));

m_socket.async_read_some(boost::asio::buffer(data_,max_len),boost::bind(&clientSession::handle_read,shared_from_this()
,boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
private:
void handle_write(const boost::system::error_code& error)
{

if(error)
{
m_socket.close();
}

}

void handle_read(const boost::system::error_code& error,size_t bytes_transferred)
{

if(!error)
{
//std::cout << data_ << std::endl;
std::cout<<"read jpeg size="<<bytes_transferred<<std::endl;

m_socket.async_read_some(boost::asio::buffer(data_,max_len),boost::bind(&clientSession::handle_read,shared_from_this()
,boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred));
}
else
{
m_socket.close();
}

}
private:
tcp::socket m_socket;
char data_[max_len];
};

class serverApp
{
typedef boost::shared_ptr<clientSession> session_ptr;
public:
serverApp(boost::asio::io_service& ioservice,tcp::endpoint& endpoint)
:m_ioservice(ioservice),
acceptor_(ioservice,endpoint)
{
session_ptr new_session(new clientSession(ioservice));
acceptor_.async_accept(new_session->socket(),
boost::bind(&serverApp::handle_accept,this,boost::asio::placeholders::error,
new_session));
}
~serverApp()
{
}
private:
void handle_accept(const boost::system::error_code& error,session_ptr& session)
{
if(!error)
{
std::cout << "get a new client!" << std::endl;

session->start();

session_ptr new_session(new clientSession(m_ioservice));
acceptor_.async_accept(new_session->socket(),
boost::bind(&serverApp::handle_accept,this,boost::asio::placeholders::error,
new_session));
}
}
private:
boost::asio::io_service& m_ioservice;
tcp::acceptor acceptor_;
};

int main(int argc , char* argv[])
{
boost::asio::io_service myIoService;
short port = 8100;

tcp::endpoint endPoint(tcp::v4(),port);

serverApp sa(myIoService,endPoint);

myIoService.run();
return 0;
}


 

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: