您的位置:首页 > 数据库 > Mongodb

Kafka->Mongodb->Es

2015-08-31 19:35 615 查看
实现了将Kafka中的数据推送给Mongodb,然后再将Mongodb中的数据推送给Es的过程。数据来源是来自txt文档中的180万条数据。

准备工作:

1)在Mongodb集群上创建一个数据库mydb,并创建一个空的Collection,命名为netflows

// Open a connection to the MongoDB server at 10.10.16.251:10111.
// The deprecation suppression suggests this constructor is deprecated in the driver in use.
@SuppressWarnings("deprecation")
Mongo mongo = new Mongo("10.10.16.251", 10111);

// Obtain a handle to the "mydb" database.
DB db = mongo.getDB("mydb");

// Create the collection without adding any data to it.
// NOTE(review): the options argument is null — confirm that the driver version in use
// really issues a create command in that case instead of deferring creation to the first insert.
db.createCollection("netflows", null);
DBCollection dbColl = db.getCollection("netflows");


2)在kafka的集群上创建一个主题flume1

# Create topic "flume1" with one partition and a replication factor of 1, using the ZooKeeper at localhost:2181.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume1


3)在es集群上创建一个索引myindex,该索引的类型是netflows

// Sends an index request for index "myindex" / type "netflows" and blocks until the response arrives.
// NOTE(review): no document source or id is set on this request — confirm the client accepts it;
// an empty index is normally created through the indices admin API instead.
IndexResponse res = client.prepareIndex().setIndex("myindex").setType("netflows").execute().actionGet();


下面是代码实现:

1.从文件中读取测试数据,并推送给Kafka

package com.test;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Reads NetFlow test records from a text file and publishes one Kafka message per usable line.
 *
 * <p>Every input line is split on commas. Columns 3 to 6 (zero-based) are forwarded together
 * with a running identifier that starts at 10001, in the form
 * {@code id,col3 ,col4 ,col5,col6} (the blanks before the second and third comma are part of
 * the wire format).
 */
public class Kafka_Producer {

    /** Location of the test data file. */
    private static final String INPUT_FILE = "C:\\TxtData\\NetFlowAttackDDOS\\4test.txt";

    /** Identifier assigned to the first published record. */
    private static final long FIRST_ID = 10001;

    /** Highest zero-based column index read from a line; shorter lines cannot be forwarded. */
    private static final int LAST_COLUMN = 6;

    /**
     * Streams the input file line by line to Kafka through a {@code MessageSender}.
     *
     * <p>Lines with fewer than seven columns are reported on standard error and skipped; they
     * do not consume an identifier. The reader and the sender are released even when reading
     * or sending fails.
     *
     * @param args ignored
     */
    public static void main(String[] args) {

        MessageSender messageSender = new MessageSender();
        BufferedReader br = null;

        try {
            // NOTE(review): FileReader decodes with the platform default charset — confirm
            // that this matches the encoding of the data file.
            br = new BufferedReader(new FileReader(INPUT_FILE));

            long num = 0;
            long lineNo = 0;
            String line;

            while ((line = br.readLine()) != null) {
                lineNo++;
                String[] arrs = line.split(",");

                if (arrs.length <= LAST_COLUMN) {
                    // Indexing arrs[3..6] below would throw on a short or empty line.
                    System.err.println("Skipping line " + lineNo + ": expected at least "
                            + (LAST_COLUMN + 1) + " columns but found " + arrs.length);
                    continue;
                }

                messageSender.sendToKafkaTopic((num + FIRST_ID) + "," + arrs[3] + " ," + arrs[4] + " ," + arrs[5] + "," + arrs[6]);
                num++;
            }

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(br);
            messageSender.close();
        }
    }

    /** Closes the reader (and the underlying file) if it was opened, reporting any failure. */
    private static void closeQuietly(BufferedReader reader) {
        if (reader == null) {
            return;
        }
        try {
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
其中MessageSender中代码如下所示:
package com.test;

import java.util.Properties;

import com.test.utils.ConstanUtil;
import com.test.utils.ObjectFormatUtil;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Thin wrapper around a Kafka producer that publishes string messages to the
 * {@code flume1} topic, one Kafka message per call.
 */
public class MessageSender {

    /** Topic every message is published to. */
    private static final String TOPIC = "flume1";

    /** Producer created in the constructor; released by {@link #close()}. */
    Producer<String, String> producer = null;

    /** Creates the sender and connects its producer to the configured broker list. */
    public MessageSender() {

        initKafkaProperties();
    }

    /** Builds the producer configuration and instantiates the producer. */
    private void initKafkaProperties() {

        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "10.10.16.253:9092,10.10.16.252:9092,10.10.16.249:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        // NOTE(review): this looks like a consumer-side (ZooKeeper) setting — confirm that the
        // producer actually reads it before relying on it.
        props.put("zookeeper.session.timeout.ms", "400000");

        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    /**
     * Publishes {@code message} as a single un-keyed record.
     *
     * @param message payload to send
     */
    public void sendToKafkaTopic(String message) {

        producer.send(new KeyedMessage<String, String>(TOPIC, message));
    }

    /** Closes the producer if one was created; a failure to close is printed, not propagated. */
    public void close() {

        try {
            if (null != producer) {
                producer.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2.Kafka接收数据,并向Mongodb中存入,并同时同步给es的代码
package com.test.thread;

import java.net.UnknownHostException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.test.model.NetFlow;
import com.test.utils.ESUtils;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerThread extends Thread {

<span style="white-space:pre">	</span>private String topic;
<span style="white-space:pre">	</span>private String mongodbIp;
<span style="white-space:pre">	</span>private int mongodbPort;
<span style="white-space:pre">	</span>private String indexName;
<span style="white-space:pre">	</span>private String indexType;
<span style="white-space:pre">	</span>private String dbName;
<span style="white-space:pre">	</span>private String collName;

<span style="white-space:pre">	</span>public ConsumerThread(String topic,String mongodbIp,int mongodbPort,String indexName,String indexType,String dbName,String collName) {
<span style="white-space:pre">		</span>
<span style="white-space:pre">		</span>this.topic = topic;
<span style="white-space:pre">		</span>this.mongodbIp = mongodbIp;
<span style="white-space:pre">		</span>this.mongodbPort = mongodbPort;
<span style="white-space:pre">		</span>this.indexName = indexName;
<span style="white-space:pre">		</span>this.indexType = indexType;
<span style="white-space:pre">		</span>this.dbName = dbName;
<span style="white-space:pre">		</span>this.collName = collName;
<span style="white-space:pre">		</span>
<span style="white-space:pre">	</span>}

<span style="white-space:pre">	</span>@SuppressWarnings({ "deprecation", "resource" })
<span style="white-space:pre">	</span>@Override
<span style="white-space:pre">	</span>public void run() {
<span style="white-space:pre">		</span>
<span style="white-space:pre">		</span>ConsumerConnector consumer = createConsumer();
<span style="white-space:pre">		</span>Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
<span style="white-space:pre">		</span>topicCountMap.put(this.topic, 1); // 一次从主题中获取一个数据
<span style="white-space:pre">		</span>Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
<span style="white-space:pre">		</span>KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据
<span style="white-space:pre">		</span>ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
<span style="white-space:pre">		</span>
<span style="white-space:pre">		</span>Mongo mongo;
<span style="white-space:pre">		</span>@SuppressWarnings("unused")
<span style="white-space:pre">		</span>Client client;
<span style="white-space:pre">		</span>
<span style="white-space:pre">		</span>try {
<span style="white-space:pre">			</span>
<span style="white-space:pre">			</span>mongo = new Mongo(this.mongodbIp, this.mongodbPort);
<span style="white-space:pre">			</span>DB db = mongo.getDB(this.dbName);
<span style="white-space:pre">			</span>DBCollection dbColl = db.getCollection(this.collName);
<span style="white-space:pre">		</span>
<span style="white-space:pre">			</span>client = new TransportClient().addTransportAddress(new InetSocketTransportAddress("10.10.16.253", 9300));<span style="white-space:pre">				</span>
<span style="white-space:pre">			</span>
<span style="white-space:pre">			</span>while (iterator.hasNext()) {
<span style="white-space:pre">				</span>
<span style="white-space:pre">				</span>String message = new String(iterator.next().message());
<span style="white-space:pre">				</span>
<span style="white-space:pre">				</span>String[] recMessage = message.split(",");
<span style="white-space:pre">				</span>
<span style="white-space:pre">				</span>if(recMessage.length !=9){
<span style="white-space:pre">					</span>
<span style="white-space:pre">					</span>continue;
<span style="white-space:pre">				</span>}
<span style="white-space:pre">				</span>
<span style="white-space:pre">				</span>else{
<span style="white-space:pre">					</span>
<span style="white-space:pre">					</span>for (int j = 0;j < recMessage.length; j++) {

<span style="white-space:pre">					</span>   DBObject data4 = new BasicDBObject();
<span style="white-space:pre">					</span>   data4.put("_id", 1);
<span style="white-space:pre">					</span>   data4.put("sourceIp", recMessage[3]);
<span style="white-space:pre">					</span>   data4.put("sourcePort", recMessage[4]);
<span style="white-space:pre">					</span>   data4.put("destIp", recMessage[5]);
<span style="white-space:pre">					</span>   data4.put("destPort", recMessage[6]);
<span style="white-space:pre">	</span>                     
<span style="white-space:pre">					</span>   DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
<span style="white-space:pre">					</span>   Date date = null;
<span style="white-space:pre">					</span>   date = format.parse(format.format(new Date()));

<span style="white-space:pre">					</span>   Calendar cla = Calendar.getInstance();
<span style="white-space:pre">					</span>   cla.setTime(date);
<span style="white-space:pre">					</span>   cla.add(Calendar.HOUR_OF_DAY, 8);
<span style="white-space:pre">					</span>   data4.put("update_time", cla.getTime().getTime());<span style="white-space:pre">									</span> <span style="white-space:pre">	</span>
<span style="white-space:pre">					</span>   dbColl.insert(data4);
<span style="white-space:pre">					</span>  <span style="white-space:pre">		</span>
<span style="white-space:pre">					</span>   GetResponse getResponse = client.prepareGet().setIndex(this.indexName).setType(this.indexType).setId(recMessage[0]).execute().actionGet();
<span style="white-space:pre">					</span>    
<span style="white-space:pre">					</span>   String searchResult = getResponse.getSourceAsString();
<span style="white-space:pre">					</span>  
<span style="white-space:pre">					</span>   if(searchResult==null){
<span style="white-space:pre">							</span>   
<span style="white-space:pre">						</span>    //向es中存入
<span style="white-space:pre">					</span>    <span style="white-space:pre">	</span>NetFlow netFlow = new NetFlow();
<span style="white-space:pre">							</span>netFlow.setSourceIp(recMessage[1]);
<span style="white-space:pre">							</span>netFlow.setSourcePort(Integer.parseInt(recMessage[2].trim()));<span style="white-space:pre">	</span>
<span style="white-space:pre">							</span>netFlow.setDestIp(recMessage[3]);<span style="white-space:pre">			</span>
<span style="white-space:pre">							</span>netFlow.setDestPort(Integer.parseInt(recMessage[4].trim()));<span style="white-space:pre">					</span>
<span style="white-space:pre">							</span>String jsondata = ESUtils.toJson(netFlow);
<span style="white-space:pre">					</span>    <span style="white-space:pre">	</span>
<span style="white-space:pre">							</span>IndexResponse indexResponse = client.prepareIndex().setIndex(this.indexName).setType(this.indexType).setId(recMessage[0]).setSource(jsondata).execute().actionGet();
<span style="white-space:pre">					</span>        
<span style="white-space:pre">					</span>   }
<span style="white-space:pre">					</span>   if(searchResult!=null){
<span style="white-space:pre">							</span>
<span style="white-space:pre">						</span>   //什么操作都不做
<span style="white-space:pre">	</span>                   }
<span style="white-space:pre">					</span>}
<span style="white-space:pre">				</span>}<span style="white-space:pre">		</span>
<span style="white-space:pre">			</span>}

<span style="white-space:pre">		</span>} catch (UnknownHostException e) {
<span style="white-space:pre">			</span>
<span style="white-space:pre">			</span>e.printStackTrace();<span style="white-space:pre">		</span>
<span style="white-space:pre">			</span>consumer.shutdown();
<span style="white-space:pre">			</span>
<span style="white-space:pre">		</span>} catch (ParseException e) {

<span style="white-space:pre">			</span>e.printStackTrace();
<span style="white-space:pre">			</span>consumer.shutdown();
<span style="white-space:pre">		</span>}
<span style="white-space:pre">		</span>
<span style="white-space:pre">		</span>
<span style="white-space:pre">	</span>}

<span style="white-space:pre">	</span>private ConsumerConnector createConsumer() {
<span style="white-space:pre">	</span>
<span style="white-space:pre">		</span>Properties properties = new Properties();
<span style="white-space:pre">		</span>properties.put("zookeeper.connect","10.10.16.252:2181,10.10.16.253:2181,10.10.16.249:2181");// 声明zk
<span style="white-space:pre">		</span>properties.put("group.id", "test-consummer-group");
<span style="white-space:pre">		</span>properties.put("serializer.class", "kafka.serializer.StringEncoder");
<span style="white-space:pre">		</span>return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
<span style="white-space:pre">	</span>}

<span style="white-space:pre">	</span>public static void main(String[] args) {

<span style="white-space:pre">		</span>new ConsumerThread("flume1","10.10.16.251",10111,"myindex","netflows","mydb","netflows").start();// 使用kafka集群中创建好的主题 test

<span style="white-space:pre">	</span>}

}
其中NetFlow是个实体类,如下:
package com.test.model;

/**
 * Mutable bean describing one network flow: source and destination endpoint,
 * each given as an IP string and a port number.
 */
public class NetFlow {

    /** IP address the flow originates from. */
    private String sourceIp;
    /** Port the flow originates from. */
    private int sourcePort;
    /** IP address the flow is directed to. */
    private String destIp;
    /** Port the flow is directed to. */
    private int destPort;

    /** @return the source IP, or {@code null} if none has been set */
    public String getSourceIp() {
        return this.sourceIp;
    }

    /** @param sourceIp the source IP to store */
    public void setSourceIp(String sourceIp) {
        this.sourceIp = sourceIp;
    }

    /** @return the source port, {@code 0} if none has been set */
    public int getSourcePort() {
        return this.sourcePort;
    }

    /** @param sourcePort the source port to store */
    public void setSourcePort(int sourcePort) {
        this.sourcePort = sourcePort;
    }

    /** @return the destination IP, or {@code null} if none has been set */
    public String getDestIp() {
        return this.destIp;
    }

    /** @param destIp the destination IP to store */
    public void setDestIp(String destIp) {
        this.destIp = destIp;
    }

    /** @return the destination port, {@code 0} if none has been set */
    public int getDestPort() {
        return this.destPort;
    }

    /** @param destPort the destination port to store */
    public void setDestPort(int destPort) {
        this.destPort = destPort;
    }

}
ESUtils是一个工具类:

package  com.test.utils;

import java.io.IOException;

import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.ObjectMapper;

/**
 * Utility class that uses Jackson to convert an object to JSON.
 */
public class ESUtils {

    private static ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Serializes {@code o} to JSON text.
     *
     * @param o the object to serialize
     * @return the JSON text, or an empty string when serialization fails
     *         (the stack trace is printed in that case)
     */
    public static String toJson(Object o) {
        // Stays empty unless serialization succeeds.
        String json = "";
        try {
            json = objectMapper.writeValueAsString(o);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return json;
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: