
Kafka + Spark Streaming: Parsing JSON Data and Inserting into HBase, Including Some Business Logic

2018-03-01 09:38
The code below is an example I wrote myself while learning Spark. It is not mature yet and is kept only as a record and for reference.
The code follows; I have added fairly detailed comments in the places I thought were useful.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.client.Put;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

import com.alibaba.fastjson.JSONObject;

public class KafkaStream_Json {

static final String ZK_QUORUM = "devhadoop3:2181,devhadoop2:2181,devhadoop1:2181";
static final String GROUP = "spark_json_test_group";
static final String TOPICSS = "spark_json_test2";
static final String NUM_THREAD = "5";

@SuppressWarnings({ "serial" })
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("json_test").setMaster("local[2]");
conf.set("spark.testing.memory", "2147480000");// 后面的值大于512m即可
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(20));

int numThreads = Integer.parseInt(NUM_THREAD);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
String[] topics = TOPICSS.split(",");
for (String topic : topics) {
topicMap.put(topic, numThreads);
}
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, ZK_QUORUM, GROUP, topicMap);// the raw data
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {// what is returned here should already be a JSON string
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
JavaDStream<JSONObject> words_2 = lines.flatMap(new FlatMapFunction<String, JSONObject>() {// convert each record into a JSONObject
@Override
public Iterable<JSONObject> call(String jsonStr) throws Exception {
List<JSONObject> arr = new ArrayList<JSONObject>();
JSONObject obj = JSONObject.parseObject(jsonStr);
System.out.println("收到的数据" + jsonStr);
arr.add(obj);
return arr;
}
});
JavaDStream<JSONObject> words = words_2.persist();// cache it; you could also persist it according to the actual business needs. cache() works too, but cache() only supports the MEMORY_ONLY storage level
// If we did not cache here, then whenever type1 and type2 are output, the lines and words_2 steps would have to be re-executed, which would be very inefficient
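// Editor's note (not in the original): an explicit storage level can also be passed here,
// e.g. words_2.persist(StorageLevel.MEMORY_AND_DISK()), which spills to disk when memory
// is short; this needs an extra import of org.apache.spark.storage.StorageLevel.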
// Split the stream by business type: use the type code to separate the different messages. Business type 1:
JavaDStream<JSONObject> type1 = words.filter(new Function<JSONObject, Boolean>() {
@Override
public Boolean call(JSONObject v1) throws Exception {
return "1".equals(v1.getString("type"));
}
});
// data for business type 2
JavaDStream<JSONObject> type2 = words.filter(new Function<JSONObject, Boolean>() {
@Override
public Boolean call(JSONObject v1) throws Exception {
return "2".equals(v1.getString("type"));
}
});

JavaDStream<JSONObject> type1_2 = type1.map(new Function<JSONObject, JSONObject>() {

@Override
public JSONObject call(JSONObject v1) throws Exception {
/*
* Do the business processing on v1 here, but the final result lives in type1_2,
* similar to how String's substring returns a new string.
*
* You must capture the result in a new DStream instead of relying on changing the value of v1 inside type1.
*
* Even though what we modify here is actually v1, the v1 inside type1 does not change.
*/
v1.put("context", "test data");
return v1;
}
});
type1.print();
type1_2.print();
type2.print();

/*
* The loop below is one way to get at the actual data; foreachRDD also counts as a kind of output operation
*/
type1_2.foreachRDD(new VoidFunction<JavaRDD<JSONObject>>() {
@Override
public void call(JavaRDD<JSONObject> rdd) throws Exception {
System.out.println("123333333333333333333333333333");
List<Put> puts = new ArrayList<Put>();
System.out.println("外部" + puts.hashCode());
List<JSONObject> dataList = rdd.collect();
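// Editor's note (not in the original): rdd.collect() pulls the whole micro-batch back to
// the driver, which is fine for a small test like this; for larger volumes, writing from
// the executors with rdd.foreachPartition(...) avoids gathering everything on the driver.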
for (JSONObject t : dataList) {
System.out.println(t.getString("name"));
Put put = new Put(t.getString("name").getBytes());
put.addColumn("data".getBytes(), "name".getBytes(), t.getString("name").getBytes());
put.addColumn("data".getBytes(), "age".getBytes(), t.getString("age").getBytes());
put.addColumn("data".getBytes(), "type".getBytes(), t.getString("type").getBytes());
put.addColumn("data".getBytes(), "context".getBytes(), t.getString("context").getBytes());
puts.add(put);
// System.out.println("内部" + puts.hashCode());//这里的puts,hashCode每次都不一样,但是确实是最后都加入到一个List里了
}
if (puts.size() > 0) {
System.out.println("数组大小"+puts.size());
HbaseInsert.getInstance().insertHbase("lwb_test", puts);
}
}
});
jssc.start();
jssc.awaitTermination();
}
}
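For reference, here is a hypothetical end-to-end trace (an editor's addition, not part of the original post): it builds the same kind of message the test producer at the bottom sends and notes, in comments, the HBase cells the streaming job above would write for it, assuming the table lwb_test with column family data already exists.

import com.alibaba.fastjson.JSONObject;

public class SampleMessageTrace {
    public static void main(String[] args) {
        // A message with the fields the job reads: name, age, type ("context" is added in type1_2).
        JSONObject msg = new JSONObject();
        msg.put("name", "name1");
        msg.put("age", 1);
        msg.put("type", "1"); // "1" passes the business-type-1 filter above
        System.out.println("Kafka message sent to spark_json_test2: " + msg.toJSONString());
        // Expected HBase result: row key "name1" in table lwb_test, column family "data":
        //   data:name = "name1", data:age = "1", data:type = "1", data:context = "test data"
    }
}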


This is a quick throwaway class for batch-inserting into HBase.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

public class HbaseInsert {
private static HbaseInsert hbaseInsert;
private static Configuration configuration;
private static String zkHost = "devhadoop3,devhadoop2,devhadoop1";
private static String zkPort = "2181";
private static String zkParent = "/hbase-unsecure";
private static Connection connection;
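// Note (editor's addition): an HBase Connection is heavyweight and thread-safe, so sharing a
// single instance through this singleton is the intended usage; Table instances, in contrast,
// are lightweight and are created and closed per call in insertHbase below.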

private HbaseInsert() {
configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", zkHost);
configuration.set("hbase.zookeeper.property.clientPort", zkPort);
configuration.set("zookeeper.znode.parent", zkParent);
try {
connection = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
e.printStackTrace();
}
}

public static synchronized HbaseInsert getInstance() {
if (hbaseInsert == null) {
hbaseInsert = new HbaseInsert();
}
return hbaseInsert;
}

public void insertHbase(String tablename, List<Put> puts) {
Table table = null;
try {
table = connection.getTable(TableName.valueOf(tablename));
table.put(puts);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (table != null) {
try {
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

}
}
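The job above writes to the table lwb_test with the column family data, so that table has to exist before the first batch arrives. Below is a minimal sketch (an editor's addition, not from the original post) of creating it with the same HBase 1.x client API that HbaseInsert uses; the ZooKeeper settings are copied from HbaseInsert, and everything else is an assumption about your cluster.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateTestTable {
    public static void main(String[] args) throws IOException {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "devhadoop3,devhadoop2,devhadoop1");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        configuration.set("zookeeper.znode.parent", "/hbase-unsecure");
        try (Connection connection = ConnectionFactory.createConnection(configuration);
                Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf("lwb_test");
            if (!admin.tableExists(tableName)) {
                HTableDescriptor descriptor = new HTableDescriptor(tableName);
                descriptor.addFamily(new HColumnDescriptor("data")); // the column family used by the Puts above
                admin.createTable(descriptor);
            }
        }
    }
}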


Below is the code I used to test inserting data into Kafka.

import java.util.Properties;

import com.alibaba.fastjson.JSONObject;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {

public static void main(String[] aaa) {
Properties props = new Properties();
// Kafka broker list (host:port of the brokers, typically 9092; note this is not the ZooKeeper port)
props.put("metadata.broker.list", "192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092");// host names must be used here
props.put("request.required.acks", "-1");
props.put("serializer.class", "kafka.serializer.StringEncoder");
Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
for (int i = 0; i < 10; i++) {
JSONObject obj = new JSONObject();
obj.put("name", "name"+i);
obj.put("age", i);
obj.put("type", String.valueOf(i%4));
producer.send(new KeyedMessage<String, String>("spark_json_test2", obj.toJSONString()));
}
producer.close();
}
}
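The producer above uses the old kafka.javaapi.producer API, which was removed in later Kafka releases. As a hedged alternative (an editor's addition, not from the original post), the same test can be written with the newer org.apache.kafka.clients.producer client; the broker address and port below are assumptions about your cluster.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.alibaba.fastjson.JSONObject;

public class NewApiJsonProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.100:9092"); // Kafka broker, not ZooKeeper
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                JSONObject obj = new JSONObject();
                obj.put("name", "name" + i);
                obj.put("age", i);
                obj.put("type", String.valueOf(i % 4));
                producer.send(new ProducerRecord<>("spark_json_test2", obj.toJSONString()));
            }
        }
    }
}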
Tags: Spark