kafka中写入avro数据
2016-07-14 09:12
211 查看
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import com.cnpc.soc.avro.Log;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Producer thread that re-reads "access.log" once per second, parses each line
 * with a combined-log-format regex, packs the captured fields into an Avro
 * {@link Log} record, and publishes the binary-serialized records to a Kafka topic.
 */
public class kafkaProducer2 extends Thread {

    // Combined-log-format pattern. Compiled once: Pattern.compile is expensive
    // and the regex is loop-invariant (the original recompiled it every second).
    private static final Pattern LOG_PATTERN = Pattern.compile(
            "^([0-9.]+)\\s([\\w.-]+)\\s([\\w.-]+)\\s(\\[[^\\[\\]]+\\])\\s\"((?:[^\"]|\\\")+)\"\\s(\\d{3})\\s(\\d+|-)\\s\"((?:[^\"]|\\\")+)\"\\s\"((?:[^\"]|\\\")+)\"$");

    /** Kafka topic the serialized records are published to. */
    private final String topic;

    public kafkaProducer2(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        Producer<String, byte[]> producer = createProducer();
        try {
            // Loop until interrupted (the original looped forever and swallowed
            // the interrupt, making the thread unstoppable).
            while (!Thread.currentThread().isInterrupted()) {
                List<String> lines = TextFile.readToList("access.log");
                List<KeyedMessage<String, byte[]>> batch = new ArrayList<KeyedMessage<String, byte[]>>();
                for (String line : lines) {
                    Matcher matcher = LOG_PATTERN.matcher(line);
                    if (!matcher.find()) {
                        continue; // skip lines that do not match the access-log format
                    }
                    GenericRecord record = new GenericData.Record(Log.getClassSchema());
                    record.put("ip", matcher.group(1));
                    record.put("identity", matcher.group(2));
                    record.put("userid", matcher.group(3));
                    record.put("time", matcher.group(4));
                    record.put("requestInfo", matcher.group(5));
                    record.put("state", matcher.group(6));
                    record.put("responce", matcher.group(7));
                    record.put("referer", matcher.group(8));
                    record.put("useragent", matcher.group(9));
                    try {
                        batch.add(new KeyedMessage<String, byte[]>(topic, serializeEvent(record)));
                    } catch (Exception e) {
                        // One bad record should not abort the whole batch.
                        e.printStackTrace();
                    }
                }
                if (!batch.isEmpty()) {
                    producer.send(batch);
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    // Restore the interrupt status so the loop condition exits.
                    Thread.currentThread().interrupt();
                }
            }
        } finally {
            producer.close(); // release broker connections even on abnormal exit
        }
    }

    /**
     * Serializes an Avro record to its binary (schema-less) wire format.
     *
     * @param record the generic record to serialize; its own schema is used
     * @return the Avro binary encoding of {@code record}
     * @throws Exception if Avro encoding fails
     */
    protected byte[] serializeEvent(GenericRecord record) throws Exception {
        // ByteArrayOutputStream#close() is a documented no-op, so no explicit
        // cleanup is needed (the original's try/finally added nothing).
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bos, null);
        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(record.getSchema());
        writer.write(record, encoder);
        encoder.flush(); // push any buffered bytes into bos before reading it
        return bos.toByteArray();
    }

    /** Builds a Kafka (0.8-era Scala client) producer for byte[] payloads. */
    private Producer<String, byte[]> createProducer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "xxx.xxx.xxx:2181,xxx.xxx.xxx:2181,xxx.xxx.xxx:2181"); // ZooKeeper quorum
        properties.put("metadata.broker.list", "xxx.xxx.xxx:6667,xxx.xxx.xxx:6667,xxx.xxx.xxx:6667"); // Kafka broker list
        return new Producer<String, byte[]>(new ProducerConfig(properties));
    }

    public static void main(String[] args) {
        new kafkaProducer2("test_log_2").start();
    }
}
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import com.cnpc.soc.avro.Log;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
// NOTE(review): this class is a verbatim duplicate of the kafkaProducer2 class
// defined earlier in this file — almost certainly a copy/paste artifact of the
// blog scrape. Two top-level classes with the same name cannot coexist in one
// compilation unit; one copy should be removed.
/**
 * Producer thread that re-reads "access.log" once per second, parses each line
 * with a combined-log-format regex, packs the captured fields into an Avro
 * Log record, and publishes the binary-serialized records to a Kafka topic.
 */
public class kafkaProducer2 extends Thread {
// Kafka topic the serialized records are published to.
private String topic;
public kafkaProducer2(String topic) {
super();
this.topic = topic;
}
@Override
public void run() {
Producer<String, byte[]> producer = createProducer();
while (true) {
// Combined-log-format regex: ip, identity, userid, [time], "request",
// status, bytes(or -), "referer", "user-agent".
// NOTE(review): recompiled every iteration — should be a static final Pattern.
String regex = "^([0-9.]+)\\s([\\w.-]+)\\s([\\w.-]+)\\s(\\[[^\\[\\]]+\\])\\s\"((?:[^\"]|\\\")+)\"\\s(\\d{3})\\s(\\d+|-)\\s\"((?:[^\"]|\\\")+)\"\\s\"((?:[^\"]|\\\")+)\"$";
// Whole file is re-read each pass, so lines already sent are re-sent every second.
List<String> list = TextFile.readToList("access.log");
Pattern pattern = Pattern.compile(regex);
List<KeyedMessage<String, byte[]>> list1 = new ArrayList<KeyedMessage<String, byte[]>>();
for (String s : list) {
Matcher matcher = pattern.matcher(s);
if (matcher.find()) {
String ip = matcher.group(1);
String identity = matcher.group(2);
String userid = matcher.group(3);
String time = matcher.group(4);
String requestInfo = matcher.group(5);
String state = matcher.group(6);
String responce = matcher.group(7);
String referer = matcher.group(8);
String useragent = matcher.group(9);
// Build an Avro record using the generated Log class's schema.
GenericRecord record = new GenericData.Record(Log.getClassSchema());
record.put("ip", ip);
record.put("identity", identity);
record.put("userid", userid);
record.put("time", time);
record.put("requestInfo", requestInfo);
record.put("state", state);
record.put("responce", responce);
record.put("referer", referer);
record.put("useragent", useragent);
try {
byte[] serializedValue = serializeEvent(record);
list1.add(new KeyedMessage<String, byte[]>(topic, serializedValue));
} catch (Exception e) {
// A failed record is logged and skipped; the batch continues.
e.printStackTrace();
}
}
}
// NOTE(review): sent even when list1 is empty.
producer.send(list1);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// NOTE(review): swallows the interrupt without restoring the flag,
// so the thread cannot be stopped via interrupt().
e.printStackTrace();
}
}
}
/**
 * Serializes an Avro record to its binary wire format.
 * NOTE(review): the catch-and-rethrow adds nothing, and ByteArrayOutputStream
 * close() is a no-op, so the finally block (and bos = null) has no effect.
 */
protected byte[] serializeEvent(GenericRecord record) throws Exception {
ByteArrayOutputStream bos = null;
try {
bos = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bos, null);
GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(record.getSchema());
writer.write(record, encoder);
// Flush buffered bytes into bos before reading it.
encoder.flush();
byte[] serializedValue = bos.toByteArray();
return serializedValue;
} catch (Exception ex) {
throw ex;
} finally {
if (bos != null) {
try {
bos.close();
} catch (Exception e) {
bos = null;
}
}
}
}
// Builds a Kafka (0.8-era Scala client) producer for byte[] payloads.
// NOTE(review): the producer is never closed by run().
private Producer<String, byte[]> createProducer() {
Properties properties = new Properties();
properties.put("zookeeper.connect", "xxx.xxx.xxx:2181,xxx.xxx.xxx:2181,xxx.xxx.xxx:2181");// ZooKeeper quorum
properties.put("metadata.broker.list", "xxx.xxx.xxx:6667,xxx.xxx.xxx:6667,xxx.xxx.xxx:6667");// Kafka broker list
return new Producer<String, byte[]>(new ProducerConfig(properties));
}
public static void main(String[] args) {
new kafkaProducer2("test_log_2").start();
}
}
相关文章推荐
- 程序自杀
- jquery live hover绑定方法
- 【Linux全面学习】3.Linux如何管理分区
- 实战篇:案例解析-如何对企业网站SEO优化把脉?
- Android Activity生命周期 博客链接
- Swift - 根据图片URL获取图片的大小
- nodejs经典高并发
- 优秀的产品
- ASP.NET MVC实现多个按钮提交事件
- 1
- Spring MVC多个文件上传
- 每日安全简讯20160714
- to_char和to_date的一些用法总结
- javascript超过容器后显示省略号效果的方法(兼容一行或者多行)
- Java8 的lambda 和 Stream
- Asp.Net时间戳与时间互转
- Mac 下 adb 环境配置
- SAP BDC说明
- Android Wear开发 - 入门指引 - Eclipse开发平台搭建
- Asp.Net微信登录-手机网站APP应用