storm程序-单词统计wordcount
2017-12-16 19:50
453 查看
spout代码:
public class MyLocalFileSpout extends BaseRichSpout { public static final String FILE_PATH = "/root/1.log"; // public static final String FILE_PATH = "D:\\1.log"; private SpoutOutputCollector collector; private BufferedReader bufferedReader; //初始化方法 //该方法只会被调用一次,用来初始化 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; try { this.bufferedReader = new BufferedReader(new FileReader(new File(FILE_PATH))); } catch (FileNotFoundException e) { e.printStackTrace(); } } //Storm实时计算的特性就是对数据一条一条的处理 //while(true){ // this.nextTuple() // } public void nextTuple() { // 每被调用一次就会发送一条数据出去 try { String line = bufferedReader.readLine(); if (StringUtils.isNotBlank(line)) { List<Object> arrayList = new ArrayList<Object>(); arrayList.add(line); collector.emit(arrayList); } } catch (IOException e) { e.printStackTrace(); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { //消息源可以发射多条消息流stream。多条消息流可以理解为多中类型的数据。 declarer.declare(new Fields("juzi")); } }
bolt1 句子分割成单词代码:
/** * Map --->word,1 */ public class MySplitBolt extends BaseBasicBolt { public void execute(Tuple input, BasicOutputCollector collector) { //1、数据如何获取 String juzi = (String) input.getValueByField("juzi"); //2、进行切割 String[] strings = juzi.split(" "); //3、发送数据 for (String word : strings) { //Values 对象帮我们生成一个list collector.emit(new Values(word, 1)); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "num")); } }
bolt2 单词统计代码:
public class MyWordCountAndPrintBolt extends BaseBasicBolt { private Map<String, Integer> wordCountMap = new HashMap<String, Integer>(); public void execute(Tuple input, BasicOutputCollector collector) { String word = (String) input.getValueByField("word"); Integer num = (Integer) input.getValueByField("num"); //1、查看单词对应的value是否存在 Integer integer = wordCountMap.get(word); if (integer == null || integer.intValue() == 0) { wordCountMap.put(word,num); }else { wordCountMap.put(word,integer.intValue()+num); } //2、打印数据 System.out.println(wordCountMap); } public void declareOutputFields(OutputFieldsDeclarer declarer) { //todo 不需要定义输出的字段 } }
storm 驱动类:
public class StormTopologyDriver { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { //1、准备任务信息 //Storm框架支持多语言,在JAVA环境下创建一个拓扑,需要使用TopologyBuilder进行构建 TopologyBuilder topologyBuilder = new TopologyBuilder(); /* MyLocalFileSpout类,主要是将文本内容读成一行一行的模式 * 消息源spout是Storm里面一个topology里面的消息生产者。 * 一般来说消息源会从一个外部源读取数据并且向topology里面发出消息:tuple。 * Spout可以是可靠的也可以是不可靠的。 * 如果这个tuple没有被storm成功处理,可靠的消息源spouts可以重新发射一个tuple,但是不可靠的消息源spouts一旦发出一个tuple就不能重发了。 * * 消息源可以发射多条消息流stream。多条消息流可以理解为多中类型的数据。 * 使用OutputFieldsDeclarer.declareStream来定义多个stream,然后使用SpoutOutputCollector来发射指定的stream。 * * Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面或者简单的返回如果已经没有新的tuple。 * 要注意的是nextTuple方法不能阻塞,因为storm在同一个线程上面调用所有消息源spout的方法。 * * 另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,否则调用fail。storm只对可靠的spout调用ack和fail。 */ topologyBuilder.setSpout("mySpout", new MyLocalFileSpout(), 2); topologyBuilder.setBolt("bolt1", new MySplitBolt(), 4).shuffleGrouping("mySpout"); topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt(), 2).shuffleGrouping("bolt1"); //2、任务提交 //提交给谁?提交什么内容? Config config = new Config(); //定义你希望集群分配多少个工作进程给你来执行这个topology config.setNumWorkers(2); StormTopology stormTopology = topologyBuilder.createTopology(); //这在本地环境调试topology很有用, 但是在线上这么做的话会影响性能的。 config.setDebug(false); //storm的运行有两种模式: 本地模式和分布式模式. //本地模式 // LocalCluster localCluster = new LocalCluster(); // localCluster.submitTopology("wordcount", config, stormTopology); // 指定本地模式运行多长时间之后停止,如果不显式的关系程序将一直运行下去 // Utils.sleep(10000); // localCluster.shutdown(); // 集群模式 StormSubmitter.submitTopology("wordcount1", config, stormTopology); } }
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.itcast.storm</groupId> <artifactId>learnStorm</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <!--当打包到集群上运行时,集群中已经提供了storm jar包,使用provided,可以在打成的包中不包含该jar包--> <scope>provided</scope> <version>0.9.5</version> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>0.9.5</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.8.2</artifactId> <version>0.8.1</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>cn.itcast.storm.wordcount.StormTopologyDriver</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> </plugins> </build> </project>
打包,上传至集群运行:
在storm ui上查看运行结果:
由上图可知,bolt2运行在 mini3 的6370端口。
登录mini3 查看运行日志:
相关文章推荐
- hadoop的统计单词程序WordCount
- 第六篇:Eclipse上运行第一个Hadoop实例 - WordCount(单词统计程序)
- 运行Hadoop自带的wordcount单词统计程序
- 启动Spark Shell,在Spark Shell中编写WordCount程序,在IDEA中编写WordCount的Maven程序,spark-submit使用spark的jar来做单词统计
- hadoop的统计单词程序WordCount提示找不到WordCount类
- 《征服c指针》学习笔记-----统计文本单词数目的程序word_count
- Hadoop自带字数统计程序wordcount异常
- storm——本地eclipse上调试wordcount程序
- 软件工程第三个程序:“WC项目” —— 文件信息统计(Word Count ) 命令行程序
- 在Linux系统设置共享文件夹、Hadoop单机/伪分布部署,运行Hadoop Wordcount单词统计实例
- storm学习(2) 进阶word_count 程序
- 单词统计word count
- hive学习之WordCount单词统计
- wordcount(单词统计)
- pig分析日志脚本(1) 统计行数和单词个数wordcount
- HADOOP(1)__Mapreduce_WordCount统计单词数
- Scala中使用两种方式对单词进行次数统计(wordCount)
- MR英语单词频次统计案例-----wordcount
- hadoop2.5的第一个HelloWorld程序—单词计数(WordCount.)