storm从入门到精通 第七节 wordcount
2016-10-27 14:26
351 查看
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.storm</groupId>
  <artifactId>storm-example</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>storm-example</name>
  <properties>
    <!-- fixed: was misspelled "UFT-8", which Maven ignores and falls back
         to the platform default encoding (build becomes non-reproducible) -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.0.2</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <!-- bundle all dependencies into a single runnable jar for storm submission -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.6</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
spout
package com.storm.spout;
import java.util.Map;
import java.util.Random;
import org.apache.storm.shade.org.joda.time.DateTime;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
/**
 * Spout that emits one randomly chosen word (from a fixed 4-word pool)
 * every 10 seconds on the field "word".
 */
public class RandomSpout extends BaseRichSpout {
    /** Emits tuples downstream; assigned by Storm in open(). */
    SpoutOutputCollector collector;
    /** Fixed pool of words to emit ("kitten", "puppy", "piglet", "lamb"). */
    static final String[] words = {"小猫","小狗","小猪","小羊"};
    /** Reused RNG — the original allocated a new Random on every nextTuple() call. */
    private final Random random = new Random();

    /** Called once on initialization; stores the collector used by nextTuple(). */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Called in a loop by Storm on the spout's executor thread.
     * Sleeps 10s, then emits one randomly selected word.
     */
    @Override
    public void nextTuple() {
        Utils.sleep(10000); // throttle: one emission every 10 seconds
        String word = words[random.nextInt(words.length)];
        System.out.println("线程名:"+Thread.currentThread().getName()+" "
                +new DateTime().toString("yyyy-MM-dd HH:mm:ss ")+"10s发射一次数据:"+word);
        collector.emit(new Values(word));
    }

    /** Declares the single output field "word" consumed by downstream bolts. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
bolt
package com.storm.bolt;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
/**
 * Terminal bolt: takes each word tuple from the spout and prints a
 * greeting to stdout. Emits nothing downstream.
 * (Class name typo "Senquece" is kept — it is referenced by the topology.)
 */
public class SenqueceBolt extends BaseBasicBolt {
    /**
     * Prints "我是&lt;word&gt;!" for the word in field 0 of the input tuple.
     * Uses the typed accessor getString(0) instead of the original
     * unchecked (String) cast on getValue(0).
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getString(0);
        String out = "我是"+word+"!";
        System.out.println("out=" + out);
    }

    /** Intentionally empty: this bolt is a sink and declares no output fields. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing is emitted, so there are no fields to declare
    }
}
topology
package com.storm.topology;
import java.util.Random;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import com.storm.bolt.SenqueceBolt;
import com.storm.spout.RandomSpout;
/**
 * Wires the random-word spout to the printing bolt and submits the topology.
 * (Class name typo "Popology" is kept as the existing public entry point.)
 */
public class WordCountPopology {
    /**
     * With command-line args: submits to the cluster, topology name = args[0].
     * Without args: runs in local mode for 3,000,000 ms (~50 min), then kills
     * the topology and shuts the local cluster down.
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException{
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSpout());
        builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout");

        Config conf = new Config();
        conf.setDebug(false);

        boolean runOnCluster = args != null && args.length > 0;
        if (runOnCluster) {
            // cluster mode
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            return;
        }

        // local (standalone) mode
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("firstTopo", conf, builder.createTopology());
        Utils.sleep(3000000); // keep the local cluster alive while the topology runs
        cluster.killTopology("firstTopo");
        cluster.shutdown();
    }
}
相关文章推荐
- Storm 从入门到精通 第十四讲 Storm Word Count 示例讲解
- Storm 从入门到精通 第十五讲 Storm Word Count 示例代码
- Storm入门(四)WordCount示例
- storm入门简介及WordCount代码解析(一)
- Storm入门(四)WordCount示例
- Storm入门(四)WordCount示例
- Storm入门与实践(3)通过WordCount展开Storm的编程之旅
- Storm入门程序——WordCount
- Storm系列(三):创建Maven项目打包提交wordcount到Storm集群
- Hadoop入门经典:WordCount
- storm入门到精通(八)--源码结构
- spark入门wordcount详解(JAVA)
- Hadoop入门经典:WordCount
- SparkStreaming快速入门程序----WordCount
- Storm 从入门到精通 第二讲 Storm体系结构
- hadoop入门学习系列之五Eclipse下搭建Hadoop2.6.5开发环境并写wordcount
- Storm+Kafka+Hbase的wordcount统计
- Hadoop MapReduce编程 API入门系列之wordcount版本2(六)
- Storm示例剖析-fastWordCount
- storm+kafka:WordCount程序