storm多语言机制的一个例子
2015-06-02 17:07
295 查看
Java端
c++端
把c++文件编译成SplitSentence之后,创建一个start.sh文件,内容为:./SplitSentence
然后把SplitSentence和start.sh放到multilang/resources目录下,还要确保这两个文件有执行权限
然后就可以运行了
import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.utils.Utils; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.task.ShellBolt; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import java.util.*; /** * This topology tests the C++ Storm wrapper. * It is taken from * https://github.com/nathanmarz/storm-starter/blob/master/src/jvm/storm/starter/WordCountTopology.java */ /* ** storm使用c++的一个例子 */ public class SplitSentenceTest { // 定义一个Spout,随机产生字符串 public static class RandomSentenceSpout extends BaseRichSpout { SpoutOutputCollector _collector; Random _rand; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){ _collector = collector; _rand = new Random(); } @Override public void nextTuple(){ Utils.sleep(100); String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }; String sentence = sentences[_rand.nextInt(sentences.length)]; _collector.emit(new Values(sentence)); } @Override public void ack(Object id){ } @Override public void fail(Object id){ } @Override public void declareOutputFields(OutputFieldsDeclarer declarer){ declarer.declare(new Fields("word")); } } // c++实现了一个Bolt,然后由SplitSentence来调用 public static class SplitSentence extends ShellBolt implements IRichBolt { public SplitSentence() { // start.sh里面会调用c++实现的一个Bolt(里面实现了字符串切分的功能等) super("sh", "start.sh"); } @Override public void 
declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } } // This class was taken from the Storm starter project. // 一个Bolt,实现单词计数的功能 public static class Wosplit_sentencerdCount extends BaseBasicBolt { Map<String, Integer> counts = new HashMap<String, Integer>(); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if(count==null) count = 0; count++; counts.put(word, count); collector.emit(new Values(word, count)); System.out.println(word+" "+count); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } public static void main(String[] args) throws Exception { // 构建拓扑 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentence(), 8) .shuffleGrouping("spout"); builder.setBolt("count", new Wosplit_sentencerdCount(), 12) .fieldsGrouping("split", new Fields("word")); Config conf = new Config(); conf.setDebug(false); // if(args!=null && args.length > 0) { // conf.setNumWorkers(3); // StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); // } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); // } } }
c++端
/* Author: Sasa Petrovic (montyphyton@gmail.com) Copyright (c) 2012, University of Edinburgh All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: * Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. * Neither the name of the author nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #ifndef SPLIT_SENTENCE_H #define SPLIT_SENTENCE_H #include <string> #include <vector> #include "Storm.h" #include "json/json.h" using namespace std; // strom命名空间 namespace storm { // A simple function that splits the input string into words based on // whitespaces. 
// 字符串切分函数 void splitString( const std::string &text, std::vector<std::string> &parts, const std::string &delimiter = " ") { parts.clear(); size_t delimiterPos = text.find(delimiter); size_t lastPos = 0; if (delimiterPos == string::npos) { parts.push_back(text); return; } while(delimiterPos != string::npos) { parts.push_back(text.substr(lastPos, delimiterPos - lastPos)); lastPos = delimiterPos + delimiter.size(); delimiterPos = text.find(delimiter, lastPos); } parts.push_back(text.substr(lastPos)); } // 使用c++实现的一个Bolt class SplitSentence : public BasicBolt { public: // 初始化函数 void Initialize(Json::Value conf, Json::Value context) { } // 处理函数 void Process(Tuple &tuple) { // 接收一个字符串 std::string s = tuple.GetValues()[0].asString(); std::vector<std::string> tokens; // 将字符串切割成单词 splitString(s, tokens, " "); // 发送每一个单词 for (int i = 0; i < tokens.size(); ++i) { Json::Value j_token; j_token.append(tokens[i]); Tuple t(j_token); Emit(t); } } }; #endif
/* Author: Sasa Petrovic (montyphyton@gmail.com) Copyright (c) 2012, University of Edinburgh All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: * Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. * Neither the name of the author nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include <iostream> #include <stdexcept> #include "SplitSentence.h" using namespace storm; using namespace std; int main(int argc, char *argv[]) { SplitSentence b; b.Run(); return 0; }
把c++文件编译成SplitSentence之后,创建一个start.sh文件,内容为:./SplitSentence
然后把SplitSentence和start.sh放到multilang/resources目录下,还要确保这两个文件有执行权限
然后就可以运行了
相关文章推荐
- 5.1.2 Binary Tree Level Order Traversal II
- KVM 介绍(2):CPU 和内存虚拟化
- windows8 使用ValidationAttribute对CheckBox验证必须
- NEC学习 ---- 模块 -水平文字链接列表
- 名称缩写
- 开源电商系统
- Nexus创建本地Maven仓库
- 2014届华为校园招聘机试题一
- js判断当前浏览器类型,判断IE浏览器方法
- LeetCode219:Contains Duplicate II
- JS 获取style 里面的样式
- magent 两主一备 telnet 上去第二次set卡住的解决
- UDP协议及包格式
- equals的错误用法
- ArcGIS for Android 部分小工具操作
- linux实现共享内存同步的四种方法
- 编译安卓4.0源码出现错误
- 常用的MySQL数据类型
- Java基础复习之一:static关键字的使用
- php怎么将数组数组转化为json格式的数据