Flink学习笔记(三)——监控维基百科编辑流
2018-01-11 17:57
519 查看
维基百科提供了一个IRC频道,对所有的wiki的编辑都会被记录下来。并计算每个用户在给定时间窗口内编辑的字节数。使用flink实现这个功能只需要几分钟,而且非常的简单,同时也为后面学习更复杂的程序打下基础。
已上传至github: https://github.com/KeoZmy/wiki-flink.git
开发环境:
macOS High Sierra
IntelliJ IDEA
JDK 1.8
构建maven项目
引入相关依赖<properties> <flink.version>1.4.0</flink.version> <scala.version>2.11</scala.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-wikiedits_2.11</artifactId> <version>1.4.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.8_2.11</artifactId> <version>${flink.version}</version> </dependency> </dependencies>
Flink程序
Flink程序的第一步是创建一个StreamExecutionEnvironment
(如果是批处理则创建
ExecutionEnvironment)。这可以用来设置执行参数,并创建来自外部系统的读取源。
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
然后创建一个从维基百科 IPC 日志的读取源。
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
我想要获取每个用户在一段时间内窗口中添加或删除的字节数(比如:5秒)。为此,必须得先keying stream by userName,可以通过
KeySelector达到目的。
KeyedStream<WikipediaEditEvent, String> keyedEdits = edits .keyBy(new KeySelector<WikipediaEditEvent, String>() { @Override public String getKey(WikipediaEditEvent event) { return event.getUser(); } });
在无限元素流上,计算集合时,正如前文提到的,需要Windows。这里我会计算每5秒钟汇总编辑字节的总和。
DataStream<Tuple2<String, Long>> result = keyedEdits .timeWindow(Time.seconds(5)) .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) { acc.f0 = event.getUser(); acc.f1 += event.getByteDiff(); return acc; } });
然后我们将数据写入kafka
result .map(new MapFunction<Tuple2<String,Long>, String>() { @Override public String map(Tuple2<String, Long> tuple) { return tuple.toString(); } }) .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
运行程序
如何运行这个程序呢?当然你可以本地直接启kafka,然后运行main方法。但是这里,我想把程序丢到flink集群中去处理。本地安装flink
可以参考:flink安装入门,快速开始官网下载flink以后:
$ cd my/flink/directory $ bin/start-local.sh
即可启动flink
本地安装kafka
可以参考:kafka安装与启动#在kafka的根目录下 #启动zk bin/zookeeper-server-start.sh config/zookeeper.properties #启动kafka bin/kafka-server-start.sh config/server.properties & #创建topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wiki-result
在flink集群上运行程序
注意哈,要把maven程序打包成可运行的jar,而不是一个简单的jar需要添加这么一段代码在pom文件之中
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>2.5.5</version> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <classpathPrefix>lib/</classpathPrefix> <mainClass>com.keozhao.WikipediaAnalysis</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef> jar-with-dependencies </descriptorRef> </descriptorRefs> </configuration> </plugin>
然后打包的时候不执行mvn package 而执行maven-assembly
可以参考maven打包成可执行的jar
然后我们将程序放到flink里执行
#进入你flink的文件夹 cd my/flink/directory #运行jar bin/flink run -c wikiedits.WikipediaAnalysis path/to/wikiedits-0.1.jar
可以看到运行结果:
可以访问http://localhost:8081,看到集群的资源状况,与job情况
可以在kafka的目录下执行指令:
#查看(消费)flink写入的数据 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wiki-result --from-beginning
可以看到源源不断的数据,这是维基百科的编辑流状态。
相关文章推荐
- Python学习笔记-DNS域名轮循业务监控
- GoldenGate学习笔记二_监控GoldenGate
- Linux之ubuntu学习笔记(五):文件的编辑 ( vim的使用)
- SQL*Plus 学习笔记——常用编辑命令
- [原创] hadoop学习笔记:hadoopWEB监控
- sql学习笔记3--编辑数据
- Flink学习笔记 --- Basic Concepts整理笔记
- Nodejs心跳包(二)监控elasticsearch服务器状态--学习笔记
- 学习笔记之——自定义带滑动距离监控和仿iOS回弹效果的ScrollView
- coursera NLP学习笔记之week1最小编辑距离计算
- HTML学习笔记之网页元素编辑
- WPF学习笔记——编辑DataGrid单元格并实时更新到数据库
- Flink学习笔记 --- Intellij自动导入
- Spark学习笔记(29)Spark Streaming日志和Web监控台
- zabbix监控 - 学习笔记11
- Flink学习笔记 --- 理解DataSet WordCount
- JVM深入学习笔记五:JVM 监控工具
- linux学习笔记—— 文本编辑(代码开发工具)——vim之常用命令
- zabbix监控 - 学习笔记
- Flink学习笔记 --- DataStream Transformations