您的位置:首页 > 其它

storm程序-单词统计wordcount

2017-12-16 19:50 453 查看

spout代码:

public class MyLocalFileSpout extends BaseRichSpout {
public static final String FILE_PATH = "/root/1.log";
//    public static final String FILE_PATH = "D:\\1.log";
private SpoutOutputCollector collector;
private BufferedReader bufferedReader;

//初始化方法
//该方法只会被调用一次,用来初始化
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
try {
this.bufferedReader = new BufferedReader(new FileReader(new File(FILE_PATH)));
} catch (FileNotFoundException e) {
e.printStackTrace();
}

}

//Storm实时计算的特性就是对数据一条一条的处理
//while(true){
// this.nextTuple()
// }
public void nextTuple() {
// 每被调用一次就会发送一条数据出去
try {
String line = bufferedReader.readLine();
if (StringUtils.isNotBlank(line)) {
List<Object> arrayList = new ArrayList<Object>();
arrayList.add(line);
collector.emit(arrayList);
}
} catch (IOException e) {
e.printStackTrace();
}
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
//消息源可以发射多条消息流stream。多条消息流可以理解为多中类型的数据。
declarer.declare(new Fields("juzi"));
}
}


bolt1 句子分割成单词代码:

/**
* Map --->word,1
*/
public class MySplitBolt extends BaseBasicBolt {

public void execute(Tuple input, BasicOutputCollector collector) {
//1、数据如何获取
String juzi = (String) input.getValueByField("juzi");
//2、进行切割
String[] strings = juzi.split(" ");
//3、发送数据
for (String word : strings) {
//Values 对象帮我们生成一个list
collector.emit(new Values(word, 1));
}
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "num"));
}
}


bolt2 单词统计代码:

public class MyWordCountAndPrintBolt extends BaseBasicBolt {
private Map<String, Integer> wordCountMap = new HashMap<String, Integer>();

public void execute(Tuple input, BasicOutputCollector collector) {
String word = (String) input.getValueByField("word");
Integer num = (Integer) input.getValueByField("num");
//1、查看单词对应的value是否存在
Integer integer = wordCountMap.get(word);
if (integer == null || integer.intValue() == 0) {
wordCountMap.put(word,num);
}else {
wordCountMap.put(word,integer.intValue()+num);
}
//2、打印数据
System.out.println(wordCountMap);
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
//todo 不需要定义输出的字段
}
}


storm 驱动类:

public class StormTopologyDriver {

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
//1、准备任务信息
//Storm框架支持多语言,在JAVA环境下创建一个拓扑,需要使用TopologyBuilder进行构建
TopologyBuilder topologyBuilder = new TopologyBuilder();
/* MyLocalFileSpout类,主要是将文本内容读成一行一行的模式
* 消息源spout是Storm里面一个topology里面的消息生产者。
* 一般来说消息源会从一个外部源读取数据并且向topology里面发出消息:tuple。
* Spout可以是可靠的也可以是不可靠的。
* 如果这个tuple没有被storm成功处理,可靠的消息源spouts可以重新发射一个tuple,但是不可靠的消息源spouts一旦发出一个tuple就不能重发了。
*
* 消息源可以发射多条消息流stream。多条消息流可以理解为多中类型的数据。
* 使用OutputFieldsDeclarer.declareStream来定义多个stream,然后使用SpoutOutputCollector来发射指定的stream。
*
* Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面或者简单的返回如果已经没有新的tuple。
* 要注意的是nextTuple方法不能阻塞,因为storm在同一个线程上面调用所有消息源spout的方法。
*
* 另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,否则调用fail。storm只对可靠的spout调用ack和fail。
*/

topologyBuilder.setSpout("mySpout", new MyLocalFileSpout(), 2);
topologyBuilder.setBolt("bolt1", new MySplitBolt(), 4).shuffleGrouping("mySpout");
topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt(), 2).shuffleGrouping("bolt1");

//2、任务提交
//提交给谁?提交什么内容?
Config config = new Config();
//定义你希望集群分配多少个工作进程给你来执行这个topology
config.setNumWorkers(2);
StormTopology stormTopology = topologyBuilder.createTopology();

//这在本地环境调试topology很有用, 但是在线上这么做的话会影响性能的。
config.setDebug(false);
//storm的运行有两种模式: 本地模式和分布式模式.

//本地模式
//        LocalCluster localCluster = new LocalCluster();
//        localCluster.submitTopology("wordcount", config, stormTopology);
//        指定本地模式运行多长时间之后停止,如果不显式的关系程序将一直运行下去
//        Utils.sleep(10000);
//        localCluster.shutdown();

// 集群模式
StormSubmitter.submitTopology("wordcount1", config, stormTopology);
}
}


pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>

<groupId>cn.itcast.storm</groupId>
<artifactId>learnStorm</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<!--当打包到集群上运行时,集群中已经提供了storm jar包,使用provided,可以在打成的包中不包含该jar包-->
<scope>provided</scope>
<version>0.9.5</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.5</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.8.2</artifactId>
<version>0.8.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>cn.itcast.storm.wordcount.StormTopologyDriver</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>


打包,上传至集群运行:



在storm ui上查看运行结果:







由上图可知,bolt2运行在 mini3 的6370端口。

登录mini3 查看运行日志:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: