您的位置:首页 > 编程语言 > C语言/C++

protobuf实现c++与java之间的数据传递,beancopy数据到前台

2015-06-29 21:29 615 查看
定义proto文件

// Declare the schema dialect explicitly: the message uses `required`
// fields, which only exist in proto2. Without this line newer protoc
// versions emit a warning and fall back to proto2 implicitly.
syntax = "proto2";

// Java classes generated from this file are placed in this package.
option java_package = "com.wy.web";

// Status snapshot sent from the C++ side to the Java side.
// Field numbers are part of the wire format: never renumber or reuse them.
message my_message {
  required string startedTime            = 1;
  required string version                = 2;
  required double configuredCapacity     = 3;
  required double dfsUsed                = 4;
  required int32  fileNum                = 5;
  required int32  replicatedFilesNum     = 6;
  required int32  blockNum               = 7;
  required int32  livedNodeNum           = 8;
  required int32  decommissioningNodeNum = 9;
}


生成proto文件对应的类

windows:

protoc.exe --java_out=.\ infor.proto(注意'\'和文件名之间有空格;生成c++代码的命令为 protoc.exe --cpp_out=.\ infor.proto)

linux:

protoc -I=./ --java_out=./ infor.proto

c++代码,向java端发送数据

#include <netinet/in.h>    // for sockaddr_in
#include <sys/types.h>    // for socket
#include <sys/socket.h>    // for socket
#include <unistd.h>
#include <stdio.h>        // for printf
#include <stdlib.h>        // for exit
#include <string.h>        // for bzero
#include <string>
#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include "infor.pb.h"

#define HELLO_WORLD_SERVER_PORT    8000
#define LENGTH_OF_LISTEN_QUEUE 20
#define BUFFER_SIZE 1024
#define FILE_NAME_MAX_SIZE 512

/*
 * Demo TCP server: listens on HELLO_WORLD_SERVER_PORT and, for every client
 * that connects, sends one length-delimited my_message (varint32 length
 * followed by the serialized message), then closes the connection.
 *
 * Returns 0 after the accept loop ends; exits with status 1 if the listening
 * socket cannot be created, bound, or put into listening state.
 */
int main()
{
    // Sample values placed into each message; the counters are bumped after
    // every connection so successive clients see different numbers.
    std::string time = "2015-06-25";
    std::string version = "0.0.1";
    double config = 2.0;
    double dfs = 3.0;
    int file = 1000;
    int rep = 1000;
    int block = 1000;
    int live = 1000;
    int de = 1000;

    struct sockaddr_in server_addr;
    bzero(&server_addr, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    // s_addr is a 32-bit value, so it needs htonl (htons only converts 16 bits).
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(HELLO_WORLD_SERVER_PORT);

    int server_socket = socket(PF_INET, SOCK_STREAM, 0);
    if (server_socket < 0)
    {
        printf("Create Socket Failed!");
        exit(1);
    }

    // Allow quick restarts without waiting for TIME_WAIT to expire.
    int opt = 1;
    setsockopt(server_socket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    if (bind(server_socket, (struct sockaddr*)&server_addr, sizeof(server_addr)))
    {
        printf("Server Bind Port : %d Failed!", HELLO_WORLD_SERVER_PORT);
        exit(1);
    }

    if (listen(server_socket, LENGTH_OF_LISTEN_QUEUE))
    {
        printf("Server Listen Failed!");
        exit(1);
    }

    while (1)
    {
        struct sockaddr_in client_addr;
        socklen_t length = sizeof(client_addr);

        int new_server_socket = accept(server_socket, (struct sockaddr*)&client_addr, &length);
        if (new_server_socket < 0)
        {
            printf("Server Accept Failed!\n");
            break;
        }

        my_message mm;
        mm.set_startedtime(time);
        mm.set_version(version);
        mm.set_configuredcapacity(config);
        mm.set_dfsused(dfs);
        mm.set_filenum(file);
        mm.set_replicatedfilesnum(rep);
        mm.set_blocknum(block);
        mm.set_livednodenum(live);
        mm.set_decommissioningnodenum(de);
        file += 1; rep += 1; block += 1; live += 1; de += 1;

        // Size the frame exactly: varint length prefix + payload. A fixed
        // "+4" would leave uninitialised bytes at the end of the buffer that
        // would then be sent to the peer.
        const int payload_size = mm.ByteSize();
        const int frame_size =
            static_cast<int>(google::protobuf::io::CodedOutputStream::VarintSize32(payload_size)) + payload_size;

        // std::string owns the storage, so every exit path releases it.
        std::string buffer(frame_size, '\0');
        bool ok = true;
        {
            google::protobuf::io::ArrayOutputStream arrayOut(&buffer[0], frame_size);
            google::protobuf::io::CodedOutputStream codedOut(&arrayOut);

            codedOut.WriteVarint32(payload_size);
            // Write the message itself right after the length prefix.
            ok = mm.SerializeToCodedStream(&codedOut) && !codedOut.HadError();
        }

        if (!ok)
        {
            printf("Serialize Message Failed\n");
            close(new_server_socket);
            break;
        }

        // send() may accept fewer bytes than requested; loop until the whole
        // frame has been handed to the kernel.
        size_t sent = 0;
        while (sent < buffer.size())
        {
            ssize_t n = send(new_server_socket, buffer.data() + sent, buffer.size() - sent, 0);
            if (n < 0)
            {
                ok = false;
                break;
            }
            sent += static_cast<size_t>(n);
        }

        // Always release the client socket, including on the failure path.
        close(new_server_socket);

        if (!ok)
        {
            printf("Send File:\t%s Failed\n", "ddddd");
            break;
        }
    }

    close(server_socket);
    return 0;
}


java接收线程,通过beancopy更新到数据中心

package com.wy.web;

import java.io.InputStream;
import java.net.Socket;
import java.net.UnknownHostException;

import com.wy.util.BeanUtil;
import com.wy.web.Infor.my_message;

/**
* 数据监听线程,从服务器端取得数据并更新到数据中心
*/
public class dataListennerT extends Thread{

private Socket socket;
private DataCenter dataCenter;
private static final String host="10.9.3.45";
//private static final String host="10.9.3.165";
public dataListennerT(DataCenter dataCenter) {
this.dataCenter=dataCenter;
}

@Override
public void run() {
while(true)
{
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
socket = new Socket(host,8000);

System.out.println("成功连接");

read(socket);

} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

private void read(Socket socket) {
InputStream clientIs;
try {
clientIs = socket.getInputStream();
//Infor.my_message ms = Infor.my_message.parseDelimitedFrom(clientIs);

/*-------------------------------*/
byte[] arr = new byte[256];
int len = clientIs.read(arr);
byte[] data=new byte[len];
for(int i=0;i<len;i++)
data[i]=arr[i];

Infor.my_message ms = Infor.my_message.parseFrom(data);

System.out.println(ms.getStartedTime());
//updataCenter(ms);
BeanUtil.beanFieldCopy(ms, dataCenter, "Csi",true);

} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
数据中心

package com.wy.web;

import org.primefaces.push.annotation.Singleton;

/**
* 数据中心,从服务器取所有前台需要的数据
*/
public class DataCenter {

private static DataCenter instance;

private String csiStartedTime = "20150624";
private String csiVersion = "1.0.0";
private double csiConfiguredCapacity = 1;
private double csiDfsUsed = 1024;
private int csiFileNum = 26;
private int csiReplicatedFilesNum = 100;
private int csiBlockNum;
private int csiLivedNodeNum = 3;
private int csiDecommissioningNodeNum = 0;

public String getCsiStartedTime() {
return csiStartedTime;
}

public void setCsiStartedTime(String csiStartedTime) {
this.csiStartedTime = csiStartedTime;
}

public String getCsiVersion() {
return csiVersion;
}

public void setCsiVersion(String csiVersion) {
this.csiVersion = csiVersion;
}

public double getCsiConfiguredCapacity() {
return csiConfiguredCapacity;
}

public void setCsiConfiguredCapacity(double csiConfiguredCapacity) {
this.csiConfiguredCapacity = csiConfiguredCapacity;
}

public double getCsiDfsUsed() {
return csiDfsUsed;
}

public void setCsiDfsUsed(double csiDfsUsed) {
this.csiDfsUsed = csiDfsUsed;
}

public int getCsiFileNum() {
return csiFileNum;
}

public void setCsiFileNum(int csiFileNum) {
this.csiFileNum = csiFileNum;
}

public int getCsiReplicatedFilesNum() {
return csiReplicatedFilesNum;
}

public void setCsiReplicatedFilesNum(int csiReplicatedFilesNum) {
this.csiReplicatedFilesNum = csiReplicatedFilesNum;
}

public int getCsiLivedNodeNum() {
return csiLivedNodeNum;
}

public void setCsiLivedNodeNum(int csiLivedNodeNum) {
this.csiLivedNodeNum = csiLivedNodeNum;
}

public int getCsiDecommissioningNodeNum() {
return csiDecommissioningNodeNum;
}

public void setCsiDecommissioningNodeNum(int csiDecommissioningNodeNum) {
this.csiDecommissioningNodeNum = csiDecommissioningNodeNum;
}

public int getCsiBlockNum() {
return csiBlockNum;
}

public void setCsiBlockNum(int blockNum) {
this.csiBlockNum = blockNum;
}

public static DataCenter getClient() {
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new DataCenter();
}
}
}
return instance;
}

private DataCenter() {
//System.out.println("DataCenter!!!==========");
new dataListennerT(this).start();

}
}


bean copy,实现将两个类属性复制

package com.wy.util;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashSet;

/**
 * Reflection helper that copies property values between two beans whose
 * accessor names differ only by a fixed prefix.
 */
public class BeanUtil {

    public BeanUtil() {

    }

    /**
     * Copies every property readable on {@code src} through a public
     * no-argument {@code getXxx()} method to the matching public setter on
     * {@code des}.
     *
     * <p>Name mapping:
     * <ul>
     *   <li>{@code desPrefix == true}: {@code src.getXxx()} is copied to
     *       {@code des.set<prefix>Xxx(..)}</li>
     *   <li>{@code desPrefix == false}: {@code src.get<prefix>Xxx()} is copied
     *       to {@code des.setXxx(..)}</li>
     * </ul>
     * The setter's single parameter type must equal the getter's declared
     * return type. Properties whose getter returns {@code null} are skipped.
     * Failures on one property are printed and do not stop the others.
     *
     * @param src       bean to read from
     * @param des       bean to write to
     * @param prefix    prefix carried by one side's accessor names;
     *                  {@code null} is treated as the empty string
     * @param desPrefix {@code true} if the prefix is on the destination's
     *                  setters, {@code false} if it is on the source's getters
     */
    public static synchronized void beanFieldCopy(Object src, Object des, String prefix, boolean desPrefix) {
        Class<?> srcClass = src.getClass();
        Class<?> desClass = des.getClass();

        String safePrefix = (prefix == null) ? "" : prefix;
        String setterStem = desPrefix ? "set" + safePrefix : "set";
        String getterStem = desPrefix ? "get" : "get" + safePrefix;

        // Collect the property names the destination offers a setter for.
        // Only names that really start with the expected stem are considered;
        // blindly cutting off prefix.length() characters would throw on short
        // names and produce bogus property names on unrelated setters.
        HashSet<String> setFields = new HashSet<String>();
        Method[] desMethods = desClass.getMethods();
        for (int i = 0; i < desMethods.length; i++) {
            String setterName = desMethods[i].getName();
            if (setterName.startsWith(setterStem)) {
                setFields.add(setterName.substring(setterStem.length()));
            }
        }

        Method[] srcMethods = srcClass.getMethods();
        for (int i = 0; i < srcMethods.length; i++) {
            Method getter = srcMethods[i];
            String getterName = getter.getName();

            // Only plain no-argument getters can be invoked generically.
            if (!getterName.startsWith(getterStem) || getter.getParameterTypes().length != 0) {
                continue;
            }

            String fieldName = getterName.substring(getterStem.length());
            if (!setFields.contains(fieldName)) {
                continue;
            }

            try {
                Method setter = desClass.getMethod(
                        setterStem + fieldName,
                        new Class<?>[] { getter.getReturnType() });
                Object value = getter.invoke(src, new Object[] {});
                if (value == null) {
                    // Skip just this property; the rest must still be copied.
                    continue;
                }
                setter.invoke(des, new Object[] { value });
            } catch (Exception e) {
                // Best effort: report the property that failed and go on.
                e.printStackTrace();
            }
        }
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: