您的位置:首页 > 编程语言 > Java开发

java编写Producer(线程池,kafka)

2017-07-07 14:53 429 查看
1.将kafka带的jar包导入项目内

2.
public class TestThreadPool{

public static void main(String args[]){
//在线程池中创建2个线程
ExecutorService exec = Executors.newFixedThreadPool(2);
//创建100个线程目标对象
for(int index=0;index<100;index++){
Runnable run = new Runner(index);
//执行线程目标对象
exec.execute(run);
}

//shutdown
exec.shutdown();
}
}

//线程目标对象
class Runner implements Runnable{
int index = 0;
public static String topic = "test";

public Runner(int index){
this.index = index;
}

public void run() {
// TODO Auto-generated method stub
long time = (long)(Math.random()*1000);
Producer producer = createProducer();
KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(topic,"线程:"+Thread.currentThread().getName()+"(目标对象"
+ index + ")"+ ":数字" + time);
producer.send(keyedMessage);
//		System.out.println("线程:"+Thread.currentThread().getName()+"(目标对象"
//				+ index + ")"+ ":数字" + time);
try{
TimeUnit.SECONDS.sleep(1);
}catch(InterruptedException e){
e.printStackTrace();
}

}
//	}
private Producer createProducer(){
Properties properties = new Properties();
properties.put("zookeeper.connect", "xx.xx.xx.xx:2181");//声明zk ip+端口
properties.put("serializer.class", StringEncoder.class.getName());
properties.put("metadata.broker.list","xx.xx.xx.xx:9092");//声明kafka broker
return new Producer<Integer,String>(new ProducerConfig(properties));
}
}



                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  kafka Producer