
Spark Streaming: reading Kafka data and recording offsets

2017-06-15 15:13
The pom.xml file is as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.demo</groupId>
<artifactId>spark-streaming-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>spark-streaming-demo</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>1.6.2</spark.version>
<mysql-connector.version>5.1.35</mysql-connector.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql-connector.version}</version>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.31</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
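<!-- Note: spark-mongodb below is built for Scala 2.11 while the Spark artifacts above use Scala 2.10; it is not used by the code in this post and can be removed or replaced with the matching Scala build. -->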
<dependency>
<groupId>com.stratio.datasource</groupId>
<artifactId>spark-mongodb_2.11</artifactId>
<version>0.12.0</version>
</dependency>
</dependencies>
</project>
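The job below keeps its Kafka offsets in a MySQL table named kafka_offsets with columns topic, partition and offset, but the original post does not show the table definition. The snippet below is a minimal sketch that creates such a table over JDBC, reusing the connection settings from the streaming code; the column types, the composite primary key and the class name CreateKafkaOffsetsTable are assumptions for illustration, not part of the original article.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateKafkaOffsetsTable {
    public static void main(String[] args) throws Exception {
        // Same MySQL instance and credentials as in the streaming job below (assumed reachable)
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://172.16.100.49:3306/test_sparkstreaming", "root", "root123456");
        Statement stmt = conn.createStatement();
        // `partition` is a reserved word in MySQL 5.6+, so the identifiers are backtick-quoted;
        // the column types and the primary key are assumptions, adjust them as needed
        stmt.execute("CREATE TABLE IF NOT EXISTS kafka_offsets ("
                + "topic VARCHAR(255) NOT NULL, "
                + "`partition` INT NOT NULL, "
                + "`offset` BIGINT NOT NULL, "
                + "PRIMARY KEY (topic, `partition`))");
        stmt.close();
        conn.close();
    }
}

Because the streaming job writes each partition's untilOffset only after the batch has been processed, a restart resumes from the last stored offsets; a batch whose results were produced but whose offsets were not yet written may be replayed, so the pipeline is at-least-once.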


The code is as follows:

package com.fosun.spark_streaming_demo;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import javax.sql.DataSource;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

import com.alibaba.druid.pool.DruidDataSourceFactory;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class SparkstreamingOnDirectKafka {
public static JavaStreamingContext createContext() throws Exception {
SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SparkStreamingOnKafkaDirect");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));
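// Checkpointing (next line) is optional here because offsets are stored in MySQL; it is required only if main() recovers the context via JavaStreamingContext.getOrCreate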
// jsc.checkpoint("/user/tenglq/checkpoint");

// Kafka parameters: broker list, and auto.offset.reset=smallest so that, with no stored offsets, the stream starts from the earliest available messages
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "fonova-hadoop1.fx01:9092,fonova-hadoop2.fx01:9092");
kafkaParams.put("auto.offset.reset", "smallest");
Set<String> topics = new HashSet<String>();
topics.add("tlqtest3");

// Druid connection-pool settings for the MySQL database that stores the offsets
final Map<String, String> params = new HashMap<String, String>();
params.put("driverClassName", "com.mysql.jdbc.Driver");
params.put("url", "jdbc:mysql://172.16.100.49:3306/test_sparkstreaming");
params.put("username", "root");
params.put("password", "root123456");

// Load the offsets saved by a previous run from MySQL, keyed by topic and partition
Map<TopicAndPartition, Long> offsets = new HashMap<TopicAndPartition, Long>();
DataSource ds = DruidDataSourceFactory.createDataSource(params);
Connection conn = ds.getConnection();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT topic,partition,offset FROM kafka_offsets WHERE topic = 'tlqtest3'");
while (rs.next()) {
TopicAndPartition topicAndPartition = new TopicAndPartition(rs.getString(1), rs.getInt(2));
offsets.put(topicAndPartition, rs.getLong(3));
}

final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();

JavaDStream<String> lines = null;

// No offsets stored yet: create the direct stream and let auto.offset.reset decide where to start
if (offsets.isEmpty()) {
JavaPairInputDStream<String, String> pairDstream = KafkaUtils.createDirectStream(jsc, String.class,
String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
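// transformToPair is used only to capture each batch's OffsetRange[] before the records are consumed; flatMap then keeps just the message values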
lines = pairDstream
.transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
private static final long serialVersionUID = 1L;

public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
offsetRanges.set(offsets);
return rdd;
}
}).flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
private static final long serialVersionUID = 1L;

public Iterable<String> call(Tuple2<String, String> t) throws Exception {
return Arrays.asList(t._2);
}
});
} else {
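// Offsets were found in MySQL: resume the direct stream from them; the message handler extracts just the message value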
JavaInputDStream<String> dstream = KafkaUtils.createDirectStream(jsc, String.class, String.class,
StringDecoder.class, StringDecoder.class, String.class, kafkaParams, offsets,
new Function<MessageAndMetadata<String, String>, String>() {

private static final long serialVersionUID = 1L;

public String call(MessageAndMetadata<String, String> v1) throws Exception {
return v1.message();
}
});
lines = dstream.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
private static final long serialVersionUID = 1L;

public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
offsetRanges.set(offsets);
return rdd;
}
});

}

// For each batch: process the records, then persist the batch's end offsets to MySQL (at-least-once: offsets are written only after processing)
lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
private static final long serialVersionUID = 1L;

public void call(JavaRDD<String> rdd) throws Exception {
// Process the RDD: collect the messages, sort them and print them
List<String> messages = new ArrayList<String>(rdd.collect());
Collections.sort(messages);
for (String value : messages) {
System.out.println(value);
}

// Persist the offsets of this batch to MySQL (note: a new connection pool is created per batch; a long-running job would reuse a single DataSource)
DataSource ds = DruidDataSourceFactory.createDataSource(params);
Connection conn = ds.getConnection();
Statement stmt = conn.createStatement();
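// Upsert one row per partition: update the stored offset if the row exists, otherwise insert it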
for (OffsetRange offsetRange : offsetRanges.get()) {
ResultSet rs = stmt.executeQuery("select count(1) from kafka_offsets where topic='"
+ offsetRange.topic() + "' and partition='" + offsetRange.partition() + "'");
if (rs.next()) {
int count = rs.getInt(1);
if (count > 0) {
stmt.executeUpdate("update kafka_offsets set offset ='" + offsetRange.untilOffset()
+ "' where topic='" + offsetRange.topic() + "' and partition='"
+ offsetRange.partition() + "'");
} else {
stmt.execute("insert into kafka_offsets(topic,partition,offset) values('"
+ offsetRange.topic() + "','" + offsetRange.partition() + "','"
+ offsetRange.untilOffset() + "')");
}
}

rs.close();
}

stmt.close();
conn.close();
}

});

return jsc;
}

public static void main(String[] args) {
JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
public JavaStreamingContext create() {
try {
return createContext();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
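// With a checkpoint directory configured, JavaStreamingContext.getOrCreate (commented out below) would recover the context after a driver restart; here a new context is always created via the factory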

// JavaStreamingContext jsc =
// JavaStreamingContext.getOrCreate("/user/tenglq/checkpoint", factory);

JavaStreamingContext jsc = factory.create();

jsc.start();

jsc.awaitTermination();
jsc.close();

}
}