您的位置:首页 > 编程语言 > Java开发

Gearman--任务分发系统 (初探)(JAVA版)

2014-04-10 19:11 423 查看
最近拿DBN(Deep Belief Network)做实验, 由于数据维数太大, 程序跑了6天也没训练完成, 由于长时间没关机, 电脑变的特别卡. 正好实验室有两台比较空闲的服务器, 所以计划将实验程序放到服务器上运行, 如果将程序直接放到服务器上运行也比较方便快捷. 但是想想要是弄一个任务分发系统, 虽然现在麻烦点, 以后就会方便很多.一开始想自己写一个功能简单一点的, 但是觉得自己写的话通用性健壮性都不好, 要是有一个现成的岂不更好. 最先想到的当然是Hadoop,但是Hadoop比较重量级,
而且在Windows下不好配置. 后来就找到了Gearman, 觉得很适合当前需求. 这里使用的是Java版的, 一个托管在Github上,一个在Launchpad上.本文使用的是Github版.

简单来讲Gearman是一个轻量级的任务分发系统,Gearman由三部分组成. Job Server, Worker, Client. 其中Client当然是提交任务, Worker是苦逼的工人,负责完成任务, Job Server负责将Client提交的任务分配给Worker做.为了提高系统的稳定性, Job Server可以有多个, 只要有一个Job Server没有宕机系统就能正常运行.三者关系如图下所示.



Gearman中Job Server都是独立运行的,并不知道其他的Job Server的存在, 所以Client需要指定连接哪个或者哪几个Job Server, 同样Worker也需要指定.Client根据对Job Server的指定顺序进行选择, 前面的Job Server失效才会选择后面的, 所以在大负荷的情况下尽量随机排列Job Server, 以起到Job Server负载均衡的效果. Job Server对Worker的选择有自己一套方法,具体如何分配有待分析具体源码.

下面给出简单使用的例子:

Gearman依赖slf4j, 所以需要slf4j-api-1.6.4.jar和slf4j-simple-1.6.4.jar(版本随意)

1. 启动Job Server. 直接运行Gearman的Jar包, (可以添加启动参数来修改监听IP和端口等)

2. 创建Worker (这里部分代码源自网络, 具体出处不记得了)

public class MyGearmanWorker
{
public static final String ECHO_HOST = "localhost"; // gearman server地址
public static final int ECHO_PORT = 4730; // gearman server 端口

public static void main(String[] args) {
Gearman gearman = Gearman.createGearman();  // 创建gearman对象,无论是client,worker都是
//由这个对象产生的
GearmanServer server = gearman.createGearmanServer(MyGearmanWorker.ECHO_HOST,
MyGearmanWorker.ECHO_PORT); // 创建Server
GearmanWorker worker = gearman.createGearmanWorker(); // 创建worker。
worker.setReconnectPeriod(2, TimeUnit.SECONDS); // 设置超时重连时间
worker.setMaximumConcurrency(5); // 最大并发数

worker.addFunction(SimpleGearmanWorker.WORKER_NAME, new SimpleGearmanWorker()); // 添加function方法
worker.addServer(server); // 将work添加到server中
}
}


public class SimpleGearmanWorker implements GearmanFunction {
public static final String WORKER_NAME = "CUSTOM_TASK_DEFAULT"; //任务名, Job Server只将这个名称的任务
//分配给该Worker(需要想Job Server注册)

@Override
public byte[] work(String function, byte[] data,  //data为Client发送过来的数据
GearmanFunctionCallback callback) throws Exception {
//任务处理代码
//任务处理过程中可以使用callback.sendData(xxx)向Client发送GEARMAN_JOB_DATA消息,
//并将xxx发送给Client  (可以用于传输调试信息)
return XXX;//这里向Client发送GEARMAN_JOB_SUCCESS信息, 并将XXX作为数据发送给Client
}
}


3 创建Client (这里部分代码源自网络, 具体出处不记得了)

public class MyGearmanClient { //同步提交任务
public static final String WORKER_NAME = "CUSTOM_TASK_DEFAULT";  //任务名
public static final String ECHO_HOST = "localhost";  //gearman server 地址
public static final int ECHO_PORT = 4730;

public static void main(String[] args) throws InterruptedException, IOException {

Gearman gearman = Gearman.createGearman();  //创建gearman对象,client,worker都有这个对象产生
GearmanClient client = gearman.createGearmanClient();

GearmanServer server = gearman.createGearmanServer(ECHO_HOST, ECHO_PORT);  //创建server对象

client.addServer(server);

for (int i = 0; i < 1; i++) {
GearmanJobReturn jobReturn = client.submitJob(WORKER_NAME,data));
while (!jobReturn.isEOF()) {  //根据返回值 是否结束,来判断各种gearman事件状态
GearmanJobEvent event = jobReturn.poll();
switch (event.getEventType()) {
case GEARMAN_JOB_SUCCESS:
System.out.println(">>>> " + new String(event.getData()));  //获取worker的返回值
break;
case GEARMAN_SUBMIT_FAIL:
System.out.println("### submit fail");
break;
case GEARMAN_JOB_FAIL:
System.err.println(event.getEventType() + ": " + new String(event.getData()));
break;
case GEARMAN_JOB_DATA:
System.out.println(event.getEventType() + ": " + new String(event.getData()));
break;
default:
}
}
Thread.sleep(20);  //如果不休眠,循环提交任务,worker会认为受到攻击,会将任务pending
}
gearman.shutdown();  //使用完毕后,一定要将gearman对象进行关闭
}


public class SynMyGearmanClient implements GearmanJobEventCallback<String> { //异步提交任务
public static final String ECHO_FUNCTION_NAME = "CUSTOM_TASK_DEFAULT";
public static final String ECHO_HOST = "localhost";
public static final int ECHO_PORT = 4730;

public static void main(String[] args) throws InterruptedException {
Gearman gearman = Gearman.createGearman();
GearmanClient client = gearman.createGearmanClient();

GearmanServer server = gearman.createGearmanServer(ECHO_HOST, ECHO_PORT);
client.addServer(server);

for (int i = 0; i < 500; i++) {
GearmanJoin<String> join = client.submitJob(ECHO_FUNCTION_NAME, ("hi,zhaoyang" + i).getBytes(),
ECHO_FUNCTION_NAME, new SynMyGearmanClient());
join.join();
Thread.sleep(20);
}
gearman.shutdown();
}

@Override
public void onEvent(String s, GearmanJobEvent event) {
switch (event.getEventType()) {
case GEARMAN_JOB_SUCCESS:
......
}
}
}


到这里为止,整个系统构建完成. 细心的你可能会发现一个问题, 如果系统的任务是比较固定的,这种方式是完全符合要求的, 但是任务经常变化(Worker做的工作进行经常变), 系统岂不是需要经常修改? 这样的话该系统还有什么意义?仔细一想, 这个问题还是比较好解决. Client可以将任务程序和任务数据打成一个压缩包发给Job Server, 然后Job Server又将压缩包发给Worker, 这时Worker解压出程序和数据, 直接运行解压出来的程序即可.
这样就可以做到不变应万变了.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: