java编写Producer(线程池,kafka)
2017-07-07 14:53
429 查看
1.将kafka带的jar包导入项目内
2.
2.
public class TestThreadPool{ public static void main(String args[]){ //在线程池中创建2个线程 ExecutorService exec = Executors.newFixedThreadPool(2); //创建100个线程目标对象 for(int index=0;index<100;index++){ Runnable run = new Runner(index); //执行线程目标对象 exec.execute(run); } //shutdown exec.shutdown(); } } //线程目标对象 class Runner implements Runnable{ int index = 0; public static String topic = "test"; public Runner(int index){ this.index = index; } public void run() { // TODO Auto-generated method stub long time = (long)(Math.random()*1000); Producer producer = createProducer(); KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(topic,"线程:"+Thread.currentThread().getName()+"(目标对象" + index + ")"+ ":数字" + time); producer.send(keyedMessage); // System.out.println("线程:"+Thread.currentThread().getName()+"(目标对象" // + index + ")"+ ":数字" + time); try{ TimeUnit.SECONDS.sleep(1); }catch(InterruptedException e){ e.printStackTrace(); } } // } private Producer createProducer(){ Properties properties = new Properties(); properties.put("zookeeper.connect", "xx.xx.xx.xx:2181");//声明zk ip+端口 properties.put("serializer.class", StringEncoder.class.getName()); properties.put("metadata.broker.list","xx.xx.xx.xx:9092");//声明kafka broker return new Producer<Integer,String>(new ProducerConfig(properties)); } }
相关文章推荐
- java编写Producer(线程池,kafka)
- java编写Producer(线程池,kafka)
- java编写Producer(线程池,kafka)
- java编写Producer(线程池,kafka)
- java编写Producer(线程池,kafka)
- java编写Producer(线程池,kafka)
- java编写Producer(线程池,kafka)
- java编写Producer(线程池,kafka)
- java编写Producer(线程池,kafka)
- java编写Producer(线程池,kafka)
- java编写Producer(线程池,kafka)
- java编写Producer(线程池,kafka)
- java编写Producer(线程池,kafka)
- java编写Producer(线程池,kafka)
- java编写Producer(线程池,kafka)
- java编写Producer(线程池,kafka)
- java编写Producer(线程池,kafka)
- java编写Producer(线程池,kafka)
- Kafka Java API 之Producer源码解析
- java线程池---编写自己的线程池