您的位置:首页 > 大数据

getting start with storm 翻译 第七章 part-1

2013-08-15 19:30 316 查看

转载请注明出处:http://blog.csdn.net/lonelytrooper/article/details/9988757

第七章 在Storm中使用非JVM语言

有时候你需要使用不是基于JVM的语言来实现一个Storm工程,或者你用其他的语言会感到更舒服,抑或你想使用由其他语言开发的库。

Storm用Java实现,并且本书中你看到的所有的spouts和bolts都是用Java实现的。所以使用例如Python,Ruby或者甚至JavaScript语言来实现spouts和bolts是可能的吗?答案的可能的!通过使用名为multilang protocol的东西实现了这个可能。

multilang protocol是Storm实现的一个特殊的协议,它使用标准的输入输出作为和做spout和bolt工作的进程的通信通道。消息被编码成JSON或者简单的文本通过这个通道进行传递。

我们看一个非JVM语言的简单的spout和bolt示例。你将有一个用来产生1到10000之间的数字的spout和一个用来过滤质数的bolt,都用PHP开发的。


在本示例中,我们以一种幼稚的方式来检查质数。有更好的实现方式,但是它们也更复杂并且超出了本示例的范围。

 

有一个针对storm的PHP DSL官方实现。在本章中,我们将展示我们的实现来作为示例。首先,定义topology。

...
TopologyBuilder builder =newTopologyBuilder();
builder.setSpout("numbers-generator",newNumberGeneratorSpout(1,10000));
builder.setBolt("prime-numbers-filter",new
PrimeNumbersFilterBolt()).shuffleGrouping("numbers-generator");
StormTopology topology =builder.createTopology();
...


有一种方式来指定非JVM语言中的topologies。因为Stormtopologies是Thrift架构,并且Nimbus是一个Thrift守护进程,因此你可以使用任何你想用的语言来建立和提交topologies。但这超出了本书的范围。

 

这里没有新东西。我们看一下NumbersGeneratorSpout的实现。

public class NumberGeneratorSpoutextendsShellSpoutimplementsIRichSpout{
public NumberGeneratorSpout(Integerfrom,Integerto)
{
super("php","-f","NumberGeneratorSpout.php",from.toString(),to
.toString());
}
public voiddeclareOutputFields(OutputFieldsDeclarerdeclarer)
{
declarer.declare(newFields("number"));
}
public Map<String,Object>getComponentConfiguration()
{
return null;
}
}

正如你可能已经注意到的,该spout继承自ShellSpout。这是Storm提供的帮助你运行和控制用其他语言编写的spouts的特殊类。在这个例子中,它告诉Storm怎样执行你的PHP脚本。

NumberGeneratorSpoutPHP脚本发射元组到标准输出,读取标准输入来处理acks或fails。

在查看NumberGeneratorSpout.php脚本的实现之前,更详细地看一下mutilang protocol是怎样工作的。

Spout产生从from参数到to参数的序列号,然后传递给构造方法。

接下来看一下PrimeNumbersFilterBolt。该类实现了之前提到的shell。它告诉Storm怎样执行你的PHP脚本。Storm为该目的提供了一个特殊的叫做ShellBolt的类,你唯一要做的事情指出怎样运行脚本和在发射时声明域。

public class PrimeNumbersFilterBoltextendsShellBoltimplementsIRichBolt{
public PrimeNumbersFilterBolt() {
super("php","-f","PrimeNumbersFilterBolt.php");
}
public voiddeclareOutputFields(OutputFieldsDeclarerdeclarer)
{
declarer.declare(newFields("number"));
}
}

在构造方法中,只需告诉Storm怎样运行PHP脚本。这与下述命令是等价的:

         php –f PrimeNumbersFilterBolt.php

PrimeNumbersFilterBoltPHP脚本从标准输入读取元组,处理,发射,acks或fails到标准输出。在查看PrimeNumbersFilterBolt.php脚本的实现前,我们先详细查看下mutilang protocol是怎样工作的。

Mutilang协议

该协议依赖标准输入输出作为进程间通信的信道。一个脚本需要按下述的步骤来准备工作:

1.      初始握手

2.      启动循环

3.      读写元组


有一个特殊的从你的脚本记录日志的方式,它使用Storm内部的日志记录机制,这样你无需实现你自己的日子记录系统。

 

我们看一下每个步骤的细节及怎样用PHP脚本实现它。

初始握手

为了控制进程(启动和停止它),Storm需要知道脚本正在执行的进程ID(PID)。根据mutilang protocol,当你的进程启动时第一件事是Storm将发送一个包含storm配置,topology上下文,PID目录的JSON对象到标准输入。它看起来有点像下边的代码块:

        {
"conf": {
"topology.message.timeout.secs":3,
// etc
},
"context": {
"task->component": {
"1":
"example-spout",
"2":
"__acker",
"3":
"example-bolt"
},
"taskid":
3
},
"pidDir":
"..."
}

进程必须在pidDir指定的目录建一个空文件,文件名为进程ID,并且以一个JSON对象将PID写到标准输出。

{"pid":
1234}

例如,如果你接收到/tmp/example\n并且你的脚本的PID是123,你应该建立一个空文件在/tmp/example/123并且打印{"pid": 123}和结束\n到标准输出。这就是Storm如何记录PID及关闭时杀死进程的。我们看一下用PHP怎么做:

$config =json_decode(read_msg(),true);
$heartbeatdir =$config['pidDir'];
 
$pid =getmypid();
fclose(fopen("$heartbeatdir/$pid","w"));
storm_send(["pid"=>$pid]);
flush();

你建立了一个叫做read_msg的函数来从标准输入读取消息。Mutilang protocol声明消息是可以被编码成JSON的单行或者多行。当Storm发送一个以\n结束的单行时,一个消息就完成了。

function read_msg() {
$msg ="";
while(true) {
$l =fgets(STDIN);
$line =substr($l,0,-1);
if($line=="end") {
break;
}
$msg ="$msg$line\n";
}
return substr($msg,0,-1);
}
function storm_send($json) {
write_line(json_encode($json));
write_line("end");
}
function write_line($line) {
echo("$line\n");
}

flush()的使用很重要;可能存在这样一个直到字符数累计到一定数量才会被刷新的缓冲区。这意味着你的脚本被一直挂起来等待来自Storm的输入,它永远不会接收到因为Storm转而在等待来自你脚本的输出。因此确保当你的脚本有输出时被及时的刷新是重要的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Storm 大数据