
Writing Avro data to Kafka

2016-07-14 09:12
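The producer below parses Apache access-log lines with a regular expression, packs the captured fields into an Avro GenericRecord, serializes the record to Avro binary, and publishes the bytes to a Kafka topic. It is written against the legacy Scala producer API (kafka.javaapi.producer) that shipped with Kafka 0.8.

Log is a code-generated Avro class from the project (com.cnpc.soc.avro.Log); the original .avsc is not included in the post. A sketch reconstructed from the field names the code sets (all types assumed to be string, "responce" spelled as in the code; the real schema may differ) might look like:

{
  "type": "record",
  "name": "Log",
  "namespace": "com.cnpc.soc.avro",
  "fields": [
    {"name": "ip",          "type": "string"},
    {"name": "identity",    "type": "string"},
    {"name": "userid",      "type": "string"},
    {"name": "time",        "type": "string"},
    {"name": "requestInfo", "type": "string"},
    {"name": "state",       "type": "string"},
    {"name": "responce",    "type": "string"},
    {"name": "referer",     "type": "string"},
    {"name": "useragent",   "type": "string"}
  ]
}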
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

import com.cnpc.soc.avro.Log;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class kafkaProducer2 extends Thread {

    private String topic;

    public kafkaProducer2(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        Producer<String, byte[]> producer = createProducer();

        // Matches the Apache "combined" access-log format:
        //   ip identity userid [time] "request" status bytes "referer" "user-agent"
        // e.g. 127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /index.html HTTP/1.0" 200 2326 "http://example.com/" "Mozilla/4.08"
        String regex = "^([0-9.]+)\\s([\\w.-]+)\\s([\\w.-]+)\\s(\\[[^\\[\\]]+\\])\\s\"((?:[^\"]|\\\")+)\"\\s(\\d{3})\\s(\\d+|-)\\s\"((?:[^\"]|\\\")+)\"\\s\"((?:[^\"]|\\\")+)\"$";
        Pattern pattern = Pattern.compile(regex);

        // Note: each pass re-reads the whole file, so the same lines
        // are re-sent to the topic once per second.
        while (true) {
            List<String> lines = TextFile.readToList("access.log");
            List<KeyedMessage<String, byte[]>> messages = new ArrayList<KeyedMessage<String, byte[]>>();
            for (String line : lines) {
                Matcher matcher = pattern.matcher(line);
                if (matcher.find()) {
                    // Build a generic Avro record against the schema of the
                    // generated Log class ("responce" is spelled as in the schema).
                    GenericRecord record = new GenericData.Record(Log.getClassSchema());
                    record.put("ip", matcher.group(1));
                    record.put("identity", matcher.group(2));
                    record.put("userid", matcher.group(3));
                    record.put("time", matcher.group(4));
                    record.put("requestInfo", matcher.group(5));
                    record.put("state", matcher.group(6));
                    record.put("responce", matcher.group(7));
                    record.put("referer", matcher.group(8));
                    record.put("useragent", matcher.group(9));
                    try {
                        byte[] serializedValue = serializeEvent(record);
                        messages.add(new KeyedMessage<String, byte[]>(topic, serializedValue));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            producer.send(messages);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    // Serialize the record to raw Avro binary (no container file, no header),
    // so consumers must supply the same schema to decode it.
    protected byte[] serializeEvent(GenericRecord record) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bos, null);
        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(record.getSchema());
        writer.write(record, encoder);
        encoder.flush();
        return bos.toByteArray();
    }

    private Producer<String, byte[]> createProducer() {
        Properties properties = new Properties();
        // ZooKeeper quorum
        properties.put("zookeeper.connect", "xxx.xxx.xxx:2181,xxx.xxx.xxx:2181,xxx.xxx.xxx:2181");
        // Kafka broker list (legacy 0.8 producer API)
        properties.put("metadata.broker.list", "xxx.xxx.xxx:6667,xxx.xxx.xxx:6667,xxx.xxx.xxx:6667");
        return new Producer<String, byte[]>(new ProducerConfig(properties));
    }

    public static void main(String[] args) {
        new kafkaProducer2("test_log_2").start();
    }

}
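TextFile.readToList is a project-local helper that the post does not include. A minimal sketch of what it presumably does, reading a text file into a list of lines (the class and method names come from the call site above; the body is an assumption):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;

public class TextFile {
    // Assumed helper: read every line of a text file;
    // return an empty list if the file cannot be read.
    public static List<String> readToList(String path) {
        try {
            return Files.readAllLines(Paths.get(path));
        } catch (IOException e) {
            e.printStackTrace();
            return Collections.emptyList();
        }
    }
}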
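Because binaryEncoder writes raw Avro binary with no schema attached, whoever consumes from the topic must bring the same schema to decode each message value. A minimal read-back sketch for one consumed byte[] (AvroLogDecoder is a hypothetical name; the Avro calls are standard GenericDatumReader/BinaryDecoder usage):

import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

import com.cnpc.soc.avro.Log;

public class AvroLogDecoder {
    // Decode one message value produced by serializeEvent() above.
    public static GenericRecord decode(byte[] bytes) throws Exception {
        GenericDatumReader<GenericRecord> reader =
                new GenericDatumReader<GenericRecord>(Log.getClassSchema());
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return reader.read(null, decoder);
    }
}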

                                            