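Connecting Kafka to Flink stream processing: consuming Kafka data with Flink
I. Start the Kafka cluster and the Flink cluster
1. Configure environment variables (note: Kafka needs this on all three nodes; for Flink, setting it on master is enough)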
[code][root@master ~]# vim /etc/profile
After editing, run:
[code][root@master ~]# source /etc/profile
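The lines added to /etc/profile are not shown above. Since the scripts in the next step call zookeeper-server-start.sh, kafka-server-start.sh and start-cluster.sh by name, the profile presumably needs to put both bin directories on PATH. A minimal sketch, assuming Kafka is unpacked under /opt/kafka (a hypothetical location) and Flink under /opt/flink-1.3.2 (the directory used for the logs later in this article):

```shell
# Hypothetical install locations; adjust to where Kafka and Flink are unpacked.
export KAFKA_HOME=/opt/kafka          # assumption: not given in the article
export FLINK_HOME=/opt/flink-1.3.2    # matches the log directory used later
# Make the Kafka and Flink scripts callable by name.
export PATH="$PATH:$KAFKA_HOME/bin:$FLINK_HOME/bin"
```

On the slave nodes only the Kafka lines would be needed.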
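2. Create the startup scripts and add the service start commands
[code][root@master ~]# vim start_kafka.sh
Add the following (note: needed on all three nodes):
[code]zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties &
[code][root@master ~]# vim start_flink.sh
Add the following (only needs to be created on master):
[code]start-cluster.sh
Make both scripts executable (chmod +x start_kafka.sh start_flink.sh) so that they can be run as ./start_kafka.sh and ./start_flink.sh.
3. Start the Kafka cluster on each node
The Kafka cluster depends on a ZooKeeper cluster, so the Kafka distribution ships a script for starting ZooKeeper; start_kafka.sh uses it to bring up ZooKeeper together with the broker.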
[code][root@master ~]# ./start_kafka.sh
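Because the script launches ZooKeeper and the broker at almost the same time, the broker can come up before ZooKeeper is accepting connections. One way to make the ordering explicit is a small retry helper. This is only a sketch: `wait_until` is a hypothetical function, and the commented usage assumes `nc` is installed and that ZooKeeper listens on port 2181.

```shell
# wait_until <max_attempts> <command...>
# Runs the command once per second until it succeeds or the attempts run out.
wait_until() {
  local attempts="$1"; shift
  local i=1
  while [ "$i" -le "$attempts" ]; do
    if "$@" >/dev/null 2>&1; then
      return 0
    fi
    sleep 1
    i=$((i + 1))
  done
  return 1
}

# Example for start_kafka.sh (needs a real cluster, so it is left commented out):
# zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
# wait_until 30 nc -z localhost 2181 && \
#   kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
```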
4. Start the Flink cluster from master
[code][root@master ~]# ./start_flink.sh
5. Verify: processes and web UI
(1) Processes
[code][root@master ~]# jps
1488 QuorumPeerMain
2945 Kafka
1977 SecondaryNameNode
2505 JobManager
1900 NameNode
2653 Jps
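The process check can also be scripted. A minimal sketch: `check_processes` is a hypothetical helper (not part of the JDK, Kafka or Flink) that takes the text printed by jps plus a list of required process names, and reports any that are missing.

```shell
# check_processes <jps_output> <name...>
# Returns 0 if every name appears in the second column of the jps output.
check_processes() {
  local output="$1"; shift
  local missing=0
  local name
  for name in "$@"; do
    if ! printf '%s\n' "$output" | awk '{print $2}' | grep -qx "$name"; then
      echo "missing: $name"
      missing=1
    fi
  done
  return "$missing"
}

# Example on master (left commented out because it needs a running cluster):
# check_processes "$(jps)" QuorumPeerMain Kafka JobManager
```

On the slave nodes the expected list would differ, for example TaskManager instead of JobManager.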
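(2) Web UI
Open ip:8081 in a browser, where ip is the address of the master node.
II. Write the Flink program that consumes data from Kafka
1. Setup and configuration before writing code
Prerequisites for creating a Maven project in IDEA:
JDK (1.8.0_181)
Scala plugin for IDEA (downloaded from within IDEA)
Maven (3.5.3)
Scala library jar (2.11.0)
(1) Open IDEA
(2) Change the fonts (optional)
Menus: File -> Settings -> Appearance & Behavior -> Appearance -> Override default fonts by (not recommended)
Editor: File -> Settings -> Editor -> Colors & Fonts -> Font
Console: File -> Settings -> Editor -> Colors & Fonts -> Font -> Console Font
(3) Install the Scala plugin for IntelliJ IDEA (skip this step if it is already installed)
File -> Settings -> Plugins
Download and install the plugin, then restart IntelliJ IDEA.
(4) Use "New Project" to create a Maven project with Scala support
(5) Set the project's groupId and artifactId
(6) Set the project name and location
(7) Switch the download mirror (if needed)
Under the IDEA installation directory, edit plugins\maven\lib\maven3\conf\settings.xml
Then find the mirrors tag and replace its contents with the following mirror, which usually makes jar downloads much faster:
[code]<mirror>
  <id>alimaven</id>
  <name>aliyun maven</name>
  <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
  <mirrorOf>central</mirrorOf>
</mirror>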
(8) Configure pom.xml (mainly the dependencies and the plugin that packages the project as a jar)
Dependencies to add:
groupId | artifactId | version
--- | --- | ---
org.apache.flink | flink-core | 1.3.2
org.apache.flink | flink-connector-kafka-0.10_2.11 | 1.3.2
org.apache.kafka | kafka_2.11 | 0.10.2.0
org.apache.flink | flink-streaming-java_2.11 | 1.3.2
org.apache.flink | flink-streaming-scala_2.11 | 1.3.2
flink-streaming-scala_2.11 is listed because the program below uses the Scala DataStream API (org.apache.flink.streaming.api.scala), which that artifact provides.
Plugin to add:
groupId | artifactId | version
--- | --- | ---
org.apache.maven.plugins | maven-assembly-plugin | 2.4.1
The full configuration follows (remember to change the mainClass entries to the fully qualified name of your own main class):
[code]<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.wugenqiang.flink</groupId>
  <artifactId>flink_kafka</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.8</scala.version>
  </properties>
  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>1.3.2</version>
      <scope>compile</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10_2.11 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
      <version>1.3.2</version>
      <scope>compile</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.10.2.0</version>
      <scope>compile</scope>
    </dependency>
    <!-- flink-streaming jar; 2.11 is the Scala version -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.3.2</version>
      <scope>compile</scope>
    </dependency>
    <!-- Scala DataStream API used by the program (org.apache.flink.streaming.api.scala) -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>1.3.2</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>
  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <useUniqueVersions>false</useUniqueVersions>
              <classpathPrefix>lib/</classpathPrefix>
              <mainClass>com.wugenqiang.flink.ReadingFromKafka</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4.1</version>
        <configuration>
          <!-- get all project dependencies -->
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <!-- mainClass in the manifest makes an executable jar -->
          <archive>
            <manifest>
              <mainClass>com.wugenqiang.flink.ReadingFromKafka</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <!-- bind to the packaging phase -->
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
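2. Write the Flink program that consumes data from Kafka
(1) Create a Scala class (an object) under the scala folder
(2) Write the code that reads Kafka data in Flink
The program simply receives data from Kafka: it sets the ZooKeeper and Kafka cluster addresses and the name of the topic.
It then prints the consumed data directly.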
[code]package com.wugenqiang.flink

import java.util.Properties

import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
// The pom pulls in flink-connector-kafka-0.10, which provides FlinkKafkaConsumer010
// (FlinkKafkaConsumer08 belongs to the separate 0.8 connector).
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._

/**
 * Consume Kafka data with Flink
 */
object ReadingFromKafka {

  private val ZOOKEEPER_HOST = "master:2181,slave1:2181,slave2:2181"
  private val KAFKA_BROKER = "master:9092,slave1:9092,slave2:9092"
  private val TRANSACTION_GROUP = "com.wugenqiang.flink"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // checkpoint every 1000 ms in exactly-once mode
    env.enableCheckpointing(1000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // configure Kafka consumer
    val kafkaProps = new Properties()
    // zookeeper.connect is only required by the Kafka 0.8 consumer; it is kept here as in the original setup
    kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)
    kafkaProps.setProperty("group.id", TRANSACTION_GROUP)

    // the topic name is "mastertest"; the default SimpleStringSchema() is enough as the schema
    val transaction = env
      .addSource(
        new FlinkKafkaConsumer010[String]("mastertest", new SimpleStringSchema(), kafkaProps)
      )

    // print each consumed record to the TaskManager's standard output
    transaction.print()

    env.execute()
  }
}
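(3) Compile and test
3. Build the jar that connects Kafka to Flink
(1) Open the Maven Projects panel on the right side of the window, expand Lifecycle, and run package (to rebuild, run clean first and then package).
(2) On success the exit code is 0, and a target directory is created in the project directory containing the built jar.
4. Verify that the jar can pass Kafka data to Flink
(1) Copy the jar to a directory on the CentOS machine (for example /root; the following steps are run from that directory)
(2) Produce data with Kafka
Run the following on the command line (adjust the broker list and topic to your setup):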
[code][root@master ~]# kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic mastertest
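Instead of typing messages by hand, the console producer can read them from a pipe, since it takes its input from standard input. A minimal sketch: `gen_messages` is a hypothetical helper that prints numbered test lines, and the commented pipeline reuses the broker list and topic from the command above.

```shell
# gen_messages <count>: print <count> numbered test lines to stdout
gen_messages() {
  local count="$1"
  local i=1
  while [ "$i" -le "$count" ]; do
    echo "test-message-$i"
    i=$((i + 1))
  done
}

# Example (needs a running Kafka cluster, so it is left commented out):
# gen_messages 5 | kafka-console-producer.sh \
#   --broker-list master:9092,slave1:9092,slave2:9092 --topic mastertest
```

Numbered messages make it easy to see in the Flink output whether every record arrived.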
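(3) Run the jar with Flink to consume the Kafka data
(Adjust to your setup: com.wugenqiang.flink.ReadingFromKafka is the fully qualified main class, which must match the package declared in the source file, and /root/flink_kafka-1.0-SNAPSHOT-jar-with-dependencies.jar is the path to the jar.)
[code][root@master ~]# flink run -c com.wugenqiang.flink.ReadingFromKafka /root/flink_kafka-1.0-SNAPSHOT-jar-with-dependencies.jar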
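(4) Open ip:8081 to check that the job has started and is running
(5) Check Flink's standard output to verify that data is being consumed
The previous step shows which server the task is running on. On that TaskManager server, follow the log files; the file names end with the hostname of the node, so on slave1 they are flink-root-taskmanager-0-slave1.*:
[code][root@slave1 ~]# cd /opt/flink-1.3.2/log/
[root@slave1 log]# tail -F flink-root-taskmanager-0-slave1.*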
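When the job runs with a parallelism greater than 1, Flink's print() sink prefixes each line with the subtask number, for example "2> hello". To compare the output with what was typed into the producer, the prefix can be stripped. A minimal sketch: `strip_subtask_prefix` is a hypothetical helper, and the file pattern in the commented usage is an assumption based on the log directory above.

```shell
# strip_subtask_prefix: read lines on stdin and remove a leading "<n>> " prefix
strip_subtask_prefix() {
  sed 's/^[0-9][0-9]*> //'
}

# Example on the TaskManager node (left commented out because it needs the log files):
# tail -F /opt/flink-1.3.2/log/flink-root-taskmanager-*.out | strip_subtask_prefix
```

Lines without a prefix pass through unchanged.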
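Note: type the data to produce in step (2); the data consumed by Flink then appears in the log output of step (5).
That completes the transfer of data from Kafka to Flink.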