您的位置:首页 > 其它

storm多语言机制的一个例子

2015-06-02 17:07 295 查看
Java端

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.utils.Utils;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.ShellBolt;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.*;

/**
 * Word-count topology used to test the C++ Storm multi-language wrapper.
 *
 * <p>Adapted from the storm-starter WordCountTopology:
 * https://github.com/nathanmarz/storm-starter/blob/master/src/jvm/storm/starter/WordCountTopology.java
 *
 * <p>Pipeline: {@link RandomSentenceSpout} emits random sentences, {@link SplitSentence}
 * hands each sentence to a C++ bolt (launched through {@code start.sh}) that splits it
 * into words, and {@link WordCount} keeps a running count per word.
 */
public class SplitSentenceTest {

    /** Spout that emits one randomly chosen sentence from a fixed list after a 100 ms sleep. */
    public static class RandomSentenceSpout extends BaseRichSpout {
        /** Candidate sentences; built once instead of on every {@code nextTuple()} call. */
        private static final String[] SENTENCES = {
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "snow white and the seven dwarfs",
            "i am at two with nature"
        };

        SpoutOutputCollector _collector;
        Random _rand;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _rand = new Random();
        }

        @Override
        public void nextTuple() {
            // Throttle emission so the demo output stays readable.
            Utils.sleep(100);
            String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
            _collector.emit(new Values(sentence));
        }

        // Tuples are emitted without a message id, so they are not tracked for
        // reliability; these callbacks are intentionally no-ops.
        @Override
        public void ack(Object id) {
        }

        @Override
        public void fail(Object id) {
        }

        /**
         * Declares a single field holding the whole sentence. The downstream split bolt
         * uses shuffle grouping, so it does not depend on this field's name.
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
    }

    /**
     * Java-side proxy for the sentence-splitting bolt implemented in C++.
     * {@link ShellBolt} runs {@code sh start.sh}; that script launches the compiled
     * C++ executable, which emits one tuple per word.
     */
    public static class SplitSentence extends ShellBolt implements IRichBolt {

        public SplitSentence() {
            super("sh", "start.sh");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        /** No component-specific configuration. */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

    /**
     * Bolt that keeps a running count per word and emits {@code (word, count)} after
     * every update, also printing it to stdout. Taken from the storm-starter project.
     */
    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
            System.out.println(word + "\t" + count);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    /**
     * Misspelled former name of {@link WordCount}, kept so existing references still compile.
     *
     * @deprecated use {@link WordCount}.
     */
    @Deprecated
    public static class Wosplit_sentencerdCount extends WordCount {
    }

    /**
     * Builds the topology, runs it in an in-process {@link LocalCluster} for about
     * ten seconds, then shuts the cluster down.
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new RandomSentenceSpout(), 5);
        builder.setBolt("split", new SplitSentence(), 8)
                .shuffleGrouping("spout");
        // Group by word so every occurrence of a word reaches the same counter task.
        builder.setBolt("count", new WordCount(), 12)
                .fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(false);
        // Caps each component at 3 tasks, overriding the larger hints above for the local run.
        conf.setMaxTaskParallelism(3);

        // Local run only. To deploy to a real cluster, set conf.setNumWorkers(...) and use
        // StormSubmitter.submitTopology(name, conf, builder.createTopology()) instead.
        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
        } finally {
            // Always release the in-process cluster, even if submit or sleep throws.
            cluster.shutdown();
        }
    }
}


C++端

/*
Author: Sasa Petrovic (montyphyton@gmail.com)
Copyright (c) 2012, University of Edinburgh
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef SPLIT_SENTENCE_H
#define SPLIT_SENTENCE_H

#include <string>
#include <vector>

#include "Storm.h"
#include "json/json.h"
using namespace std;

// strom命名空间
namespace storm
{

// A simple function that splits the input string into words based on
// whitespaces.

// 字符串切分函数
void splitString(
const std::string &text,
std::vector<std::string> &parts,
const std::string &delimiter = " ")
{
parts.clear();
size_t delimiterPos = text.find(delimiter);
size_t lastPos = 0;
if (delimiterPos == string::npos)
{
parts.push_back(text);
return;
}

while(delimiterPos != string::npos)
{
parts.push_back(text.substr(lastPos, delimiterPos - lastPos));
lastPos = delimiterPos + delimiter.size();
delimiterPos = text.find(delimiter, lastPos);
}
parts.push_back(text.substr(lastPos));
}

// 使用c++实现的一个Bolt
class SplitSentence : public BasicBolt
{
public:
// 初始化函数
void Initialize(Json::Value conf, Json::Value context) { }
// 处理函数
void Process(Tuple &tuple)
{
// 接收一个字符串
std::string s = tuple.GetValues()[0].asString();
std::vector<std::string> tokens;
// 将字符串切割成单词
splitString(s, tokens, " ");
// 发送每一个单词
for (int i = 0; i < tokens.size(); ++i)
{
Json::Value j_token;
j_token.append(tokens[i]);
Tuple t(j_token);
Emit(t);
}
}
};

#endif


/*
Author: Sasa Petrovic (montyphyton@gmail.com)
Copyright (c) 2012, University of Edinburgh
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include <iostream>
#include <stdexcept>

#include "SplitSentence.h"

using namespace storm;
using namespace std;

// Entry point of the executable that start.sh launches: constructs the
// SplitSentence bolt and calls its Run() method.
int main(int argc, char *argv[])
{
    try
    {
        SplitSentence b;
        b.Run();
    }
    catch (const std::exception &e)
    {
        // Report on stderr: Storm's multilang protocol talks to the worker over
        // stdout, so diagnostics must not be written there.
        std::cerr << "SplitSentence bolt failed: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}


把C++文件编译成可执行文件SplitSentence之后,创建一个start.sh文件,内容为:./SplitSentence

然后把SplitSentence和start.sh放到multilang/resources目录下,并确保这两个文件都有执行权限。

然后就可以运行了
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: