Apache Beam WordCount编程实战及源码解读
2017-02-21 10:41
423 查看
概述:Apache Beam WordCount编程实战及源码解读,并通过intellij IDEA和terminal两种方式调试运行WordCount程序,Apache Beam对大数据的批处理和流处理,提供一套先进的统一的编程模型,并可以运行大数据处理引擎上。完整项目Github源码
负责公司大数据处理相关架构,但是具有多样性,极大的增加了开发成本,急需统一编程处理,Apache Beam,一处编程,处处运行,故将折腾成果分享出来。
方便:支持多个pipelines环境运行,包括:Apache Apex, Apache Flink, Apache Spark, 和 Google Cloud Dataflow。
可扩展:编写和分享新的SDKs,IO连接器和transformation库
部分翻译摘自官网:Apacher Beam 官网
创建Pipeline
将转换应用于Pipeline
读取输入文件
应用ParDo转换
应用SDK提供的转换(例如:Count)
写出输出
运行Pipeline
3.支持Spark,Flink,Apex等大数据数据框架来运行该WordCount程序。完整项目Github源码(推荐,注意
设置VM options
设置Programe arguments
设置VM options
设置Programe arguments
Flink运行等等
设置VM options
设置Programe arguments
负责公司大数据处理相关架构,但是具有多样性,极大的增加了开发成本,急需统一编程处理,Apache Beam,一处编程,处处运行,故将折腾成果分享出来。
1.Apache Beam编程实战–前言,Apache Beam的特点与关键概念。
Apache Beam 于2017年1月10日成为Apache新的顶级项目。1.1.Apache Beam 特点:
统一:对于批处理和流媒体用例使用单个编程模型。方便:支持多个pipelines环境运行,包括:Apache Apex, Apache Flink, Apache Spark, 和 Google Cloud Dataflow。
可扩展:编写和分享新的SDKs,IO连接器和transformation库
部分翻译摘自官网:Apacher Beam 官网
1.2.Apache Beam关键概念:
1.2.1.Apache Beam SDKs
主要是开发API,为批处理和流处理提供统一的编程模型。目前(2017)支持JAVA语言,而Python正在紧张开发中。1.2.2. Apache Beam Pipeline Runners(Beam的执行器/执行者们),支持Apache Apex,Apache Flink,Apache Spark,Google Cloud Dataflow多个大数据计算框架。可谓是一处Apache Beam编程,多计算框架运行。
1.2.3. 他们的对如下的支持情况详见
2.Apache Beam编程实战–Apache Beam源码解读
基于maven,intellij IDEA,pom.xm查看 完整项目Github源码 。直接通过IDEA的项目导入功能即可导入完整项目,等待MAVEN下载依赖包,然后按照如下解读步骤即可顺利运行。2.1.源码解析-Apache Beam 数据流处理原理解析:
关键步骤:创建Pipeline
将转换应用于Pipeline
读取输入文件
应用ParDo转换
应用SDK提供的转换(例如:Count)
写出输出
运行Pipeline
2.2.源码解析,完整项目Github源码,附WordCount,pom.xml等
/** * MIT. * Author: wangxiaolei(王小雷). * Date:17-2-20. * Project:ApacheBeamWordCount. */ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation.Required; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; public class WordCount { /** *1.a.通过Dofn编程Pipeline使得代码很简洁。b.对输入的文本做单词划分,输出。 */ static class ExtractWordsFn extends DoFn<String, String> { private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", Sum.ofLongs()); @ProcessElement public void processElement(ProcessContext c) { if (c.element().trim().isEmpty()) { emptyLines.addValue(1L); } // 将文本行划分为单词 String[] words = c.element().split("[^a-zA-Z']+"); // 输出PCollection中的单词 for (String word : words) { if (!word.isEmpty()) { c.output(word); } } } } /** *2.格式化输入的文本数据,将转换单词为并计数的打印字符串。 */ public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> { @Override public String apply(KV<String, Long> input) { return input.getKey() + ": " + input.getValue(); } } /** *3.单词计数,PTransform(PCollection Transform)将PCollection的文本行转换成格式化的可计数单词。 */ public static class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> { @Override public PCollection<KV<String, Long>> expand(PCollection<String> lines) { // 将文本行转换成单个单词 PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // 计算每个单词次数 PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); return wordCounts; } } /** *4.可以自定义一些选项(Options),比如文件输入输出路径 */ public interface WordCountOptions extends PipelineOptions { /** * 文件输入选项,可以通过命令行传入路径参数,路径默认为gs://apache-beam-samples/shakespeare/kinglear.txt */ @Description("Path of the file to read from") @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); /** * 设置结果文件输出路径,在intellij IDEA的运行设置选项中或者在命令行中指定输出文件路径,如./pom.xml */ @Description("Path of the file to write to") @Required String getOutput(); void setOutput(String value); } /** * 5.运行程序 */ public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.Read.from(options.getInputFile())) .apply(new CountWords()) .apply(MapElements.via(new FormatAsTextFn())) .apply("WriteCounts", TextIO.Write.to(options.getOutput())); p.run().waitUntilFinish(); } }
3.支持Spark,Flink,Apex等大数据数据框架来运行该WordCount程序。完整项目Github源码(推荐,注意pom.xml
模块加载是否成功,在工具中开发大数据程序,利于调试,开发体验较好)
3.1.intellij IDEA(社区版)中Spark大数据框架运行Pipeline计算程序
Spark运行设置VM options
-DPspark-runner
设置Programe arguments
--inputFile=pom.xml --output=counts
3.2.intellij IDEA(社区版)中Apex,Flink等支持的大数据框架均可运行WordCount的Pipeline计算程序,完整项目Github源码
Apex运行设置VM options
-DPapex-runner
设置Programe arguments
--inputFile=pom.xml --output=counts
Flink运行等等
设置VM options
-DPflink-runner
设置Programe arguments
--inputFile=pom.xml --output=counts
4.终端运行(Terminal)(不推荐,第一次下载过程很慢,开发体验较差)
4.1.以下命令是下载官方示例源码,第一次运行下载较慢,如果失败了就多运行几次,(推荐下载,完整项目Github源码)直接用上述解读在intellij IDEA中运行。
mvn archetype:generate -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots -DarchetypeGroupId=org.apache.beam -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples -DarchetypeVersion=LATEST -DgroupId=org.example -DartifactId=word-count-beam -Dversion="0.1" -Dpackage=org.apache.beam.examples -DinteractiveMode=false
4.2.打包并运行
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
4.3.成功运行结果
4.3.1.显示运行成功
4.3.2.WordCount输出计算结果
相关文章推荐
- 精通Spark:Spark内核剖析、源码解读、性能优化和商业案例实战
- Apache Beam WordCount编程实战及源码解读
- 精通Spark:Spark内核剖析、源码解读、性能优化和商业案例实战
- 【基于zxing的编解码实战】zxing项目源码解读(2.3.0版本,Android部分)
- 《Spark商业案例与性能调优实战100课》第25课:Spark Hash Shuffle源码解读与剖析
- 【基于zxing的编解码实战】zxing项目源码解读(2.3.0版本,Android部分)
- 构建NetCore应用框架之实战篇(七):BitAdminCore框架登录功能源码解读
- Apache Beam WordCount编程实战及源码解读
- 【基于zxing的编解码实战】zxing项目源码解读(2.3.0版本,Android部分)
- Apache Beam WordCount编程实战及源码解读
- Kafka Eagle 源码解读
- HashSet的故事----Jdk源码解读
- JedisCluster源码解读:集群初始化、slot(槽)的分配、值的存取
- 第13课:Spark Streaming源码解读之Driver容错安全性
- 【转】WordPress源码解读(2)
- [Hadoop源码解读](二)MapReduce篇之Mapper类
- 第12课:Spark Streaming源码解读之Executor容错安全性
- Netty源码解读 Channel与Pipeline
- Selenium2Library源码解读(1)- 概述
- Flask源码解读 <2> --- 请求上下文和request对象