spark streaming读取kafka数据,记录offset
2017-06-15 15:13
483 查看
代码如下(pom.xml 文件附在代码之后):
package com.fosun.spark_streaming_demo;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import javax.sql.DataSource;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

import com.alibaba.druid.pool.DruidDataSourceFactory;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import scala.Tuple2;
public class SparkstreamingOnDirectKafka {

    /**
     * Lazily created, JVM-wide Druid pool. The original code built a brand-new
     * pool on every batch inside foreachRDD (and never closed it) — one shared
     * pool fixes that leak. Guarded by class-level synchronization.
     */
    private static DataSource dataSource;

    /** Returns the shared connection pool, creating it on first use. */
    private static synchronized DataSource dataSource(Map<String, String> params) throws Exception {
        if (dataSource == null) {
            dataSource = DruidDataSourceFactory.createDataSource(params);
        }
        return dataSource;
    }

    /**
     * Inserts or updates the until-offset of one topic-partition in the
     * kafka_offsets table. Uses PreparedStatements (the original concatenated
     * SQL by hand) and quotes `partition`, a reserved word in MySQL 5.6+.
     *
     * @param conn  an open JDBC connection (not closed here)
     * @param range the offset range of the batch just processed
     */
    private static void saveOffset(Connection conn, OffsetRange range) throws Exception {
        boolean present;
        try (PreparedStatement query = conn.prepareStatement(
                "SELECT COUNT(1) FROM kafka_offsets WHERE topic = ? AND `partition` = ?")) {
            query.setString(1, range.topic());
            query.setInt(2, range.partition());
            try (ResultSet rs = query.executeQuery()) {
                present = rs.next() && rs.getInt(1) > 0;
            }
        }
        String sql = present
                ? "UPDATE kafka_offsets SET offset = ? WHERE topic = ? AND `partition` = ?"
                : "INSERT INTO kafka_offsets(topic, `partition`, offset) VALUES(?, ?, ?)";
        try (PreparedStatement write = conn.prepareStatement(sql)) {
            if (present) {
                write.setLong(1, range.untilOffset());
                write.setString(2, range.topic());
                write.setInt(3, range.partition());
            } else {
                write.setString(1, range.topic());
                write.setInt(2, range.partition());
                write.setLong(3, range.untilOffset());
            }
            write.executeUpdate();
        }
    }

    /**
     * Builds a JavaStreamingContext that consumes Kafka topic "tlqtest3" with
     * the direct (receiver-less) API. Previously committed offsets are restored
     * from the MySQL table kafka_offsets at startup; after each batch is
     * processed, its until-offsets are written back. Offsets are stored only
     * AFTER processing, so delivery is at-least-once: a crash between
     * processing and the offset write replays that batch.
     *
     * @return the configured, not-yet-started streaming context
     * @throws Exception if the JDBC pool cannot be created or the offset table
     *                   cannot be read
     */
    public static JavaStreamingContext createContext() throws Exception {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SparkStreamingOnKafkaDirect");
        // 1-second micro-batches.
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));
        // jsc.checkpoint("/user/tenglq/checkpoint");

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "fonova-hadoop1.fx01:9092,fonova-hadoop2.fx01:9092");
        // Start from the earliest available offset when none is stored.
        kafkaParams.put("auto.offset.reset", "smallest");
        Set<String> topics = new HashSet<String>();
        topics.add("tlqtest3");

        // Druid pool settings; `params` is also captured by the foreachRDD closure.
        final Map<String, String> params = new HashMap<String, String>();
        params.put("driverClassName", "com.mysql.jdbc.Driver");
        params.put("url", "jdbc:mysql://172.16.100.49:3306/test_sparkstreaming");
        params.put("username", "root");
        params.put("password", "root123456");

        // Restore committed offsets. try-with-resources closes the JDBC handles
        // even on failure (the original leaked conn/stmt/rs here).
        Map<TopicAndPartition, Long> offsets = new HashMap<TopicAndPartition, Long>();
        try (Connection conn = dataSource(params).getConnection();
                PreparedStatement stmt = conn.prepareStatement(
                        "SELECT topic, `partition`, offset FROM kafka_offsets WHERE topic = ?")) {
            stmt.setString(1, "tlqtest3");
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    offsets.put(new TopicAndPartition(rs.getString(1), rs.getInt(2)), rs.getLong(3));
                }
            }
        }

        // Offset ranges of the batch currently in flight; set in transform
        // (driver side) and read in foreachRDD below.
        final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();
        JavaDStream<String> lines;
        if (offsets.isEmpty()) {
            // First run: no stored offsets, let auto.offset.reset pick the start.
            JavaPairInputDStream<String, String> pairDstream = KafkaUtils.createDirectStream(jsc, String.class,
                    String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
            lines = pairDstream
                    .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
                        private static final long serialVersionUID = 1L;

                        public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
                            // Remember this batch's offset ranges for the commit step.
                            offsetRanges.set(((HasOffsetRanges) rdd.rdd()).offsetRanges());
                            return rdd;
                        }
                    }).flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
                        private static final long serialVersionUID = 1L;

                        public Iterable<String> call(Tuple2<String, String> t) throws Exception {
                            // Keep only the message value; the key is discarded.
                            return Arrays.asList(t._2);
                        }
                    });
        } else {
            // Subsequent runs: resume exactly where the stored offsets point.
            JavaInputDStream<String> dstream = KafkaUtils.createDirectStream(jsc, String.class, String.class,
                    StringDecoder.class, StringDecoder.class, String.class, kafkaParams, offsets,
                    new Function<MessageAndMetadata<String, String>, String>() {
                        private static final long serialVersionUID = 1L;

                        public String call(MessageAndMetadata<String, String> v1) throws Exception {
                            return v1.message();
                        }
                    });
            lines = dstream.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
                private static final long serialVersionUID = 1L;

                public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                    offsetRanges.set(((HasOffsetRanges) rdd.rdd()).offsetRanges());
                    return rdd;
                }
            });
        }

        lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            private static final long serialVersionUID = 1L;

            public void call(JavaRDD<String> rdd) throws Exception {
                // Process the batch on the driver: print the values in sorted
                // order. (The original's copy-to-array shuffle was redundant.)
                List<String> values = new ArrayList<String>(rdd.collect());
                Collections.sort(values);
                for (String value : values) {
                    System.out.println(value);
                }

                // Persist the offsets of the batch that was just processed,
                // reusing the shared pool and closing the connection always.
                try (Connection conn = dataSource(params).getConnection()) {
                    for (OffsetRange offsetRange : offsetRanges.get()) {
                        saveOffset(conn, offsetRange);
                    }
                }
            }
        });
        return jsc;
    }

    /**
     * Entry point: builds the context (checkpoint-based recovery via
     * getOrCreate is commented out), starts it and blocks until termination.
     */
    public static void main(String[] args) {
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {
                try {
                    return createContext();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        };
        // JavaStreamingContext jsc =
        // JavaStreamingContext.getOrCreate("/user/tenglq/checkpoint", factory);
        JavaStreamingContext jsc = factory.create();
        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.demo</groupId> <artifactId>spark-streaming-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>spark-streaming-demo</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <spark.version>1.6.2</spark.version> <mysql-connector.version>5.1.35</mysql-connector.version> </properties> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql-connector.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.0.31</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>com.stratio.datasource</groupId> <artifactId>spark-mongodb_2.11</artifactId> <version>0.12.0</version> </dependency> </dependencies> </project>
代码如下:
package com.fosun.spark_streaming_demo;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.sql.DataSource;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import com.alibaba.druid.pool.DruidDataSourceFactory;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import scala.Tuple2;
public class SparkstreamingOnDirectKafka {
    /**
     * Creates a JavaStreamingContext that reads Kafka topic "tlqtest3" with the
     * direct (receiver-less) API. Previously committed offsets are loaded from
     * the MySQL table kafka_offsets; after every batch, the until-offsets of
     * that batch are written back to the same table. Offsets are stored only
     * AFTER the batch has been printed, so delivery is at-least-once: a crash
     * between processing and the offset write replays the batch.
     *
     * @return the configured, not-yet-started streaming context
     * @throws Exception if the JDBC pool cannot be created or the offset query fails
     */
    public static JavaStreamingContext createContext() throws Exception {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SparkStreamingOnKafkaDirect");
        // 1-second micro-batches.
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));
        // jsc.checkpoint("/user/tenglq/checkpoint");
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "fonova-hadoop1.fx01:9092,fonova-hadoop2.fx01:9092");
        // Start from the earliest available offset when none is stored.
        kafkaParams.put("auto.offset.reset", "smallest");
        Set<String> topics = new HashSet<String>();
        topics.add("tlqtest3");
        // Druid pool settings; captured by the foreachRDD closure below.
        final Map<String, String> params = new HashMap<String, String>();
        params.put("driverClassName", "com.mysql.jdbc.Driver");
        params.put("url", "jdbc:mysql://172.16.100.49:3306/test_sparkstreaming");
        params.put("username", "root");
        params.put("password", "root123456");
        // Load the stored offset of each partition of the topic.
        // NOTE(review): ds/conn/stmt/rs are never closed here — resource leak.
        Map<TopicAndPartition, Long> offsets = new HashMap<TopicAndPartition, Long>();
        DataSource ds = DruidDataSourceFactory.createDataSource(params);
        Connection conn = ds.getConnection();
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery("SELECT topic,partition,offset FROM kafka_offsets WHERE topic = 'tlqtest3'");
        while (rs.next()) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(rs.getString(1), rs.getInt(2));
            offsets.put(topicAndPartition, rs.getLong(3));
        }
        // Offset ranges of the batch currently in flight; written in the
        // transform step (runs on the driver) and read in foreachRDD.
        final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();
        JavaDStream<String> lines = null;
        if (offsets.isEmpty()) {
            // First run: no stored offsets, let auto.offset.reset pick the start.
            JavaPairInputDStream<String, String> pairDstream = KafkaUtils.createDirectStream(jsc, String.class,
                    String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
            lines = pairDstream
                    .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
                        private static final long serialVersionUID = 1L;
                        public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
                            // Remember this batch's offset ranges for the commit step.
                            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                            offsetRanges.set(offsets);
                            return rdd;
                        }
                    }).flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
                        private static final long serialVersionUID = 1L;
                        public Iterable<String> call(Tuple2<String, String> t) throws Exception {
                            // Keep only the message value; the key is discarded.
                            return Arrays.asList(t._2);
                        }
                    });
        } else {
            // Subsequent runs: resume exactly where the stored offsets point.
            JavaInputDStream<String> dstream = KafkaUtils.createDirectStream(jsc, String.class, String.class,
                    StringDecoder.class, StringDecoder.class, String.class, kafkaParams, offsets,
                    new Function<MessageAndMetadata<String, String>, String>() {
                        private static final long serialVersionUID = 1L;
                        public String call(MessageAndMetadata<String, String> v1) throws Exception {
                            return v1.message();
                        }
                    });
            lines = dstream.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
                private static final long serialVersionUID = 1L;
                public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                    OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                    offsetRanges.set(offsets);
                    return rdd;
                }
            });
        }
        lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            private static final long serialVersionUID = 1L;
            public void call(JavaRDD<String> rdd) throws Exception {
                // Process the RDD: collect to the driver and print values sorted.
                List<String> map = rdd.collect();
                String[] array = new String[map.size()];
                System.arraycopy(map.toArray(new String[map.size()]), 0, array, 0, map.size());
                List<String> l = Arrays.asList(array);
                Collections.sort(l);
                for (String value : l) {
                    System.out.println(value);
                }
                // Save the offsets: update the row if it exists, insert otherwise.
                // NOTE(review): a new Druid pool is built every batch and the SQL
                // is string-concatenated; a shared pool plus PreparedStatements
                // would avoid the leak and the injection-prone queries. Also,
                // `partition` is a reserved word in MySQL 5.6+ — verify.
                DataSource ds = DruidDataSourceFactory.createDataSource(params);
                Connection conn = ds.getConnection();
                Statement stmt = conn.createStatement();
                for (OffsetRange offsetRange : offsetRanges.get()) {
                    ResultSet rs = stmt.executeQuery("select count(1) from kafka_offsets where topic='"
                            + offsetRange.topic() + "' and partition='" + offsetRange.partition() + "'");
                    if (rs.next()) {
                        int count = rs.getInt(1);
                        if (count > 0) {
                            stmt.executeUpdate("update kafka_offsets set offset ='" + offsetRange.untilOffset()
                                    + "' where topic='" + offsetRange.topic() + "' and partition='"
                                    + offsetRange.partition() + "'");
                        } else {
                            stmt.execute("insert into kafka_offsets(topic,partition,offset) values('"
                                    + offsetRange.topic() + "','" + offsetRange.partition() + "','"
                                    + offsetRange.untilOffset() + "')");
                        }
                    }
                    rs.close();
                }
                stmt.close();
                conn.close();
            }
        });
        return jsc;
    }

    /**
     * Entry point: builds the context via the factory (checkpoint-based
     * recovery with getOrCreate is commented out, so a fresh context is always
     * created), starts it and blocks until termination.
     */
    public static void main(String[] args) {
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {
                try {
                    return createContext();
                } catch (Exception e) {
                    // Wrap so the factory interface (no checked throws) is satisfied.
                    throw new RuntimeException(e);
                }
            }
        };
        // JavaStreamingContext jsc =
        // JavaStreamingContext.getOrCreate("/user/tenglq/checkpoint", factory);
        JavaStreamingContext jsc = factory.create();
        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }
}
相关文章推荐
- spark streaming 自定义kafka读取topic的offset(python)
- SparkStreaming python 读取kafka数据将结果输出到单个指定本地文件
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十五)Structured Streaming:同一个topic中包含一组数据的多个部分,按照key它们拼接为一条记录(以及遇到的问题)。
- SparkStreaming读取kafka数据进行反序列化以及mapPartition优化实例
- Spark Streaming 读取Kafka数据写入Elasticsearch
- Spark Streaming场景应用-Kafka数据读取方式
- spark-streaming 读取kafka数据不丢失(一)
- kafka SparkStreaming读取数据笔记
- spark streaming读取kafka数据不丢失(二)
- SparkStreaming读取Kafka数据
- Spark Streaming场景应用-Kafka数据读取方式
- spark streaming 读取kafka数据问题
- Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式
- 第90讲,Spark streaming基于kafka 以Receiver方式获取数据 原理和案例实战
- Spark Streaming和Kafka整合保证数据零丢失
- Sparak-Streaming基于Offset消费Kafka数据
- 第114课:SparkStreaming+Kafka+Spark SQL+TopN+Mysql+KafkaOffsetMonitor电商广告点击综合案例实战(详细内幕版本)
- 第114课加强版:SparkStreaming+Kafka+createDirectStream+KafkaOffsetMonitor解决内幕
- 第89课程 Spark STREAMING kafka 测试完成!生产者发数据,消费者收数据
- Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统