【Storm入门指南】第七章 在 Storm 中使用非 JVM 语言
2016-01-19 17:38
429 查看
有时候你希望使用非JVM语言来实现一个 Storm工程,也许是你使用其他语言感觉更舒服,或者是你想使用其他语言的开发的库。
Storm 是用 Java 实现的,你之前所看到的本书那些示例 spout 和 bolt 也是用Java来编码的。所以能用能用 Python、Ruby 或者 JavaScript 来编写呢?答案是肯定的。通过使用多语言协议(multilang protocol) 来实现这种可能。
多语言协议是 Storm 实现的一个特殊协议,使用标准输入输出作为进程间通信通道来完spout和bolt的处理工作。消息被编码成JSON或者简单的文本通过该通道进行传递。
让我们来看一下一个简单的使用非JVM语言编写的 spout 和 bolt 的实例。有一个用来产生1到10000之间随机数的 spout 和一个过滤质数的 bolt,它们都是用PHP编写的。
本例中,检查质数的方式很幼稚。有很多更好的实现,但是它们太复杂,不在本例讨论的范围内。
有一个针对Storm 的 PHP DLS 官方实现。在本章,我们将展示我们的实现。首先,定义 topology。
<span class="line-number" style="margin:0px; padding:0px">1</span> <span class="line-number" style="margin:0px; padding:0px">2</span> <span class="line-number" style="margin:0px; padding:0px">3</span> <span class="line-number" style="margin:0px; padding:0px">4</span> <span class="line-number" style="margin:0px; padding:0px">5</span> <span class="line-number" style="margin:0px; padding:0px">6</span> <span class="line-number" style="margin:0px; padding:0px">7</span> | ... TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("numbers-generator", new NumberGeneratorSpout(1, 10000)); builder.setBolt("prime-numbers-filter", new PrimeNumbersFilterBolt()).shuffleGrouping("numbers-generator"); StormTopology topology = builder.createTopology(); ... |
没有任何新东西。来看看
NumbersGeneratorSpout的实现。
<span class="line-number" style="margin:0px; padding:0px">1</span> <span class="line-number" style="margin:0px; padding:0px">2</span> <span class="line-number" style="margin:0px; padding:0px">3</span> <span class="line-number" style="margin:0px; padding:0px">4</span> <span class="line-number" style="margin:0px; padding:0px">5</span> <span class="line-number" style="margin:0px; padding:0px">6</span> <span class="line-number" style="margin:0px; padding:0px">7</span><span class="line-number" style="margin:0px; padding:0px">8</span> | public class NumberGeneratorSpout extends ShellSpout implements IRichSpout { public NumberGeneratorSpout(Integer from, Integer to) { super("php", "-f", "NumberGeneratorSpout.php", from.toString(), to.toString()); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("number")); } public Map<String, Object> getComponentConfiguration() { return null; } } |
ShellSpout。这是一个来自 Storm
的特殊类,可以帮助运行和控制用其他语言编写的 spout。在本例中,它告诉 Storm 如何执行 PHP 脚本。
NumberGeneratorSpout的 PHP 脚本发射 tuple到标准输出,读取标准输入来处理应答和失败。
在浏览
NumberGeneratorSpout.php脚本实现之前,我们来看看更多多语言协议的工作细节。
Spout 产生从
from到
to的序列号,然后传递给构造函数。
接着,来看看
PrimeNumbersFilterBolt。这个方法实现了之前提到的接口。它告诉
Storm 如何执行 PHP 脚本。Storm 提供了特殊的类
ShellBolt来实现该功能,你需要做的唯一的事情是声明怎样运行脚本和发射字段。
<span class="line-number" style="margin:0px; padding:0px">1</span> <span class="line-number" style="margin:0px; padding:0px">2</span> <span class="line-number" style="margin:0px; padding:0px">3</span> <span class="line-number" style="margin:0px; padding:0px">4</span> <span class="line-number" style="margin:0px; padding:0px">5</span> <span class="line-number" style="margin:0px; padding:0px">6</span> <span class="line-number" style="margin:0px; padding:0px">7</span><span class="line-number" style="margin:0px; padding:0px">8</span> | public class PrimeNumbersFilterBolt extends ShellBolt implements IRichBolt { public PrimeNumbersFilterBolt() { super("php", "-f", "PrimeNumbersFilterBolt.php"); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("number")); } } |
<span class="line-number" style="margin:0px; padding:0px">1</span> | php -f PrimeNumbersFilterBolt.php |
PrimeNumbersFilterBolt的 PHP 脚本从标准输入读取 tuple,处理,发射,应答或失败到标准输出。在浏览
PrimeNumbersFilterBolt.php脚本之前,让我们来看看多语言协议是如何工作的。
7.1 详解多语言协议
该协议依赖标准输入输出作为线程间通信的信道。一个脚本需要按照如下步骤来完成工作:初始握手。
开始循环。
读写 tuple。
让我们详细查看每一步,以及如何使用PHP脚本去实现。
7.1.1 初始握手
Storm 为了控制处理(启动或结束),需要知道正在运行脚本的进程 ID。根据多语言协议,当进程启动时做的第一件事就是发送一个包含了 Storm 配置、topology 上下文以及 PID 目录的 JSON 对象到标准输入。这个对象有点像如下代码块:<span class="line-number" style="margin:0px; padding:0px">1</span> <span class="line-number" style="margin:0px; padding:0px">2</span> <span class="line-number" style="margin:0px; padding:0px">3</span> <span class="line-number" style="margin:0px; padding:0px">4</span> <span class="line-number" style="margin:0px; padding:0px">5</span> <span class="line-number" style="margin:0px; padding:0px">6</span> <span class="line-number" style="margin:0px; padding:0px">7</span><span class="line-number" style="margin:0px; padding:0px">8</span> | { "conf": { "topology.message.timeout.secs": 3, // etc }, "context": { "task->component": { "1": "example-spout", "2": "__acker", "3": "example-bolt" }, "taskid": 3 }, "pidDir": "..." } |
pidDir目录下创建一个以进程ID命名的空文件,将 PID 以 JSON
对象的方式写到标准输出中。
<span class="line-number" style="margin:0px; padding:0px">1</span> | {"pid": 1234} |
/tmp/exmaple/且脚本的 PID 是
123,你应该在
/tmp/example/123下创建一个空文件并且打印
{"pid":123}和
end到标准输出。Storm
就是这样在关闭进程的时候来持续跟踪 PID 和终止进程的。让我们来看看 PHP 是如何做到的:
<span class="line-number" style="margin:0px; padding:0px">1</span><span class="line-number" style="margin:0px; padding:0px">2</span> | $config = json_decode(read_msg(), true); $heartbeatdir = $config['pidDir']; $pid = getmypid(); fclose(fopen("$heartbeatdir/$pid", "w")); storm_send(["pid"=>$pid]); flush(); |
read_msg功能从标准输入来读取消息。多语言协议允许消息可以被编码成JSON的单行或多行。当
Storm 发送单条单词
end时,一条消息就完成了。
<span class="line-number" style="margin:0px; padding:0px">1</span> <span class="line-number" style="margin:0px; padding:0px">2</span> <span class="line-number" style="margin:0px; padding:0px">3</span> <span class="line-number" style="margin:0px; padding:0px">4</span> <span class="line-number" style="margin:0px; padding:0px">5</span> <span class="line-number" style="margin:0px; padding:0px">6</span> <span class="line-number" style="margin:0px; padding:0px">7</span><span class="line-number" style="margin:0px; padding:0px">8</span> | function read_msg() { $msg = ""; while(true) { $l = fgets(STDIN); $line = substr($l,0,-1); if($line=="end") { break; } $msg = "$msg$line\n"; } return substr($msg, 0, -1); } function storm_send($json) { write_line(json_encode($json)); write_line("end"); } function write_line($line) { echo("$line\n"); } |
flush是非常重要的。可能存在一个直到字符数累计到一定数量才会被刷新的缓冲区。这意味着你的脚本可能会被一直挂起,等待来自Storm的输入。它永远不会被接收,因为Storm在等待来自你脚本的输出。因此确保当你的脚本有输出时及时刷新是重要的。
7.1.2 开始循环并读写 Tuple
这是最重要的一步,在这里所有工作被完成。该步的实现取决于你在开发一个 spout 或是 bolt。在 spout 情形下,你需要开始发射 tuple。在 bolt 情形下,循环读取tuple、处理并发射它们、ack 或 fail。
让我们看看 spout 发射数字的实现。
<span class="line-number" style="margin:0px; padding:0px">1</span> <span class="line-number" style="margin:0px; padding:0px">2</span> <span class="line-number" style="margin:0px; padding:0px">3</span> <span class="line-number" style="margin:0px; padding:0px">4</span> <span class="line-number" style="margin:0px; padding:0px">5</span> <span class="line-number" style="margin:0px; padding:0px">6</span> <span class="line-number" style="margin:0px; padding:0px">7</span><span class="line-number" style="margin:0px; padding:0px">8</span> | $from = intval($argv[1]); $to = intval($argv[2]); while(true) { $msg = read_msg(); $cmd = json_decode($msg, true); if ($cmd['command']=='next') { if ($from<$to) { storm_emit(array("$from")); $task_ids = read_msg(); $from++; } else { sleep(1); } } storm_sync(); } |
from和
to参数,然后开始循环。每当从
Storm 获取到一个
next消息,意味着准备开始发射一个新的 tuple。
当发射完所有的数字并且没有更多的 tuple 需要发送的话,休眠一段时间。
为了保证脚本已经为准备好下一个 tuple,在发射下一个之前等待
sync指令。使用
read_msg()读取指令并按照
JSON 格式编码。
在本例中,bolt 有所不同。
<span class="line-number" style="margin:0px; padding:0px">1</span> <span class="line-number" style="margin:0px; padding:0px">2</span> <span class="line-number" style="margin:0px; padding:0px">3</span> <span class="line-number" style="margin:0px; padding:0px">4</span> <span class="line-number" style="margin:0px; padding:0px">5</span> <span class="line-number" style="margin:0px; padding:0px">6</span> <span class="line-number" style="margin:0px; padding:0px">7</span><span class="line-number" style="margin:0px; padding:0px">8</span> | while(true) { $msg = read_msg(); $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING); if (!empty($tuple["id"])) { if (isPrime($tuple["tuple"][0])) { storm_emit(array($tuple["tuple"][0])); } storm_ack($tuple["id"]); } } |
不论如何,都对 tuple 作应答。
在
json_decode中使用
JSON_BIGINT_AS_STRING参数,是为了解决
Java 和 PHP 之间的转换问题。Java 发送某些很大的数字,然而这些数字在 PHP 解析时会丢失精度,这就有问题了。为了解决这个问题,告诉 PHP 把大数字解析成字符串,并且在JSON消息中禁止对该字符串使用双引号。PHP 5.4.0 或者更高版本,需要这个参数才能正常工作。
emit、
ack、
fail和
log的格式如下:
Emit
<span class="line-number" style="margin:0px; padding:0px">1</span><span class="line-number" style="margin:0px; padding:0px">2</span> | { "command": "emit", "tuple": ["foo", "bar"] } |
Ack
<span class="line-number" style="margin:0px; padding:0px">1</span><span class="line-number" style="margin:0px; padding:0px">2</span> | { "command": "ack", "id": 123456789 } |
Fail
<span class="line-number" style="margin:0px; padding:0px">1</span><span class="line-number" style="margin:0px; padding:0px">2</span> | { "command": "fail", "id": 123456789 } |
Log
<span class="line-number" style="margin:0px; padding:0px">1</span><span class="line-number" style="margin:0px; padding:0px">2</span> | { "command": "log", "msg": "some message to be logged by storm." } |
Spout:
<span class="line-number" style="margin:0px; padding:0px">1</span> <span class="line-number" style="margin:0px; padding:0px">2</span> <span class="line-number" style="margin:0px; padding:0px">3</span> <span class="line-number" style="margin:0px; padding:0px">4</span> <span class="line-number" style="margin:0px; padding:0px">5</span> <span class="line-number" style="margin:0px; padding:0px">6</span> <span class="line-number" style="margin:0px; padding:0px">7</span><span class="line-number" style="margin:0px; padding:0px">8</span> | <?php function read_msg() { $msg = ""; while(true) { $l = fgets(STDIN); $line = substr($l,0,-1); if ($line=="end") { break; } $msg = "$msg$line\n"; } return substr($msg, 0, -1); } function write_line($line) { echo("$line\n"); } function storm_emit($tuple) { $msg = array("command" => "emit", "tuple" => $tuple); storm_send($msg); } function storm_send($json) { write_line(json_encode($json)); write_line("end"); } function storm_sync() { storm_send(array("command" => "sync")); } function storm_log($msg) { $msg = array("command" => "log", "msg" => $msg); storm_send($msg); flush(); } $config = json_decode(read_msg(), true); $heartbeatdir = $config['pidDir']; $pid = getmypid(); fclose(fopen("$heartbeatdir/$pid", "w")); storm_send(["pid"=>$pid]); flush(); $from = intval($argv[1]); $to = intval($argv[2]); while(true) { $msg = read_msg(); $cmd = json_decode($msg, true); if ($cmd['command']=='next') { if ($from<$to) { storm_emit(array("$from")); $task_ids = read_msg(); $from++; } else { sleep(1); } } storm_sync(); } ?> |
<span class="line-number" style="margin:0px; padding:0px">1</span> <span class="line-number" style="margin:0px; padding:0px">2</span> <span class="line-number" style="margin:0px; padding:0px">3</span> <span class="line-number" style="margin:0px; padding:0px">4</span> <span class="line-number" style="margin:0px; padding:0px">5</span> <span class="line-number" style="margin:0px; padding:0px">6</span> <span class="line-number" style="margin:0px; padding:0px">7</span><span class="line-number" style="margin:0px; padding:0px">8</span> | <?php |
multilang/resources目录下。这个目录会被包含在JAR包中发送到工作者。如果你没有将脚本放置到该目录,Storm
将会出错且不会运行它们。
原始地址: http://JavanLu.github.io/blog/2013/10/17/getting-started-with-storm-chapter-7/
written by JavanLu posted at http://JavanLu.github.io
相关文章推荐
- java性能调优工具
- 深入理解Java内存模型之系列篇
- AWK调用SHELL,并将变量传递给SHELL
- 转:安桌开发开源库的推荐1
- 二进制处理
- 在 MyEclipse 中使用 Maven3 开发
- linux grep 正则表达式
- [leetcode]Search Insert Position
- ion-view 视图的生命周期及事件集合
- Command /usr/bin/codesign failed with exit code 1
- 进程间传递文件描述符
- 转载:UI设计师不可不知的安卓屏幕知识(原创文章)
- json.parse()和json.stringify()
- 用codeblock配置FLTK
- C# 拖拽按钮移动
- 为什么越来越多的网站域名不加www前缀?
- JAVA8 十大新特性详解
- 【Storm入门指南】第六章 真实示例
- 例题5-10 UVA 207 PGA Tour Prize Money PGA巡回赛的奖金
- const成员函数修改成员变量的问题