kafka增加SSL认证的Producer客户端代码示例
2017-08-08 09:12
344 查看
package com.kafka.safe.ssl;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.*;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class SslProducerTest {
//public static final String TOPIC_NAME = "kafka-cluster";
public static final String TOPIC_NAME = "bpu_gateway_router";
private static final String CONTENT = "1704197100,9800100,4321,192.168.76.202,iaucap,2017-06-28 13:33:32";
public static void main(String[] args) throws KafkaException
{
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.76.202:9093");
//props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "E:\\kafka\\safe\\client.truststore.jks");
props.put("ssl.truststore.password", "test1234");
props.put("ssl.keystore.location", "E:\\kafka\\safe\\client.keystore.jks");
props.put("ssl.keystore.password", "pdas202");
props.put("ssl.key.password", "pdas202");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
Runnable runnable = new Runnable()
{
Integer times = 0;
public void run()
{
producer.send(new ProducerRecord<String,String>(TOPIC_NAME,Integer.toString(times),CONTENT));
}
};
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
service.scheduleAtFixedRate(runnable, 0,100,TimeUnit.MICROSECONDS);
System.out.println("-----------------");
}
}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.*;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class SslProducerTest {
//public static final String TOPIC_NAME = "kafka-cluster";
public static final String TOPIC_NAME = "bpu_gateway_router";
private static final String CONTENT = "1704197100,9800100,4321,192.168.76.202,iaucap,2017-06-28 13:33:32";
public static void main(String[] args) throws KafkaException
{
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.76.202:9093");
//props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "E:\\kafka\\safe\\client.truststore.jks");
props.put("ssl.truststore.password", "test1234");
props.put("ssl.keystore.location", "E:\\kafka\\safe\\client.keystore.jks");
props.put("ssl.keystore.password", "pdas202");
props.put("ssl.key.password", "pdas202");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
Runnable runnable = new Runnable()
{
Integer times = 0;
public void run()
{
producer.send(new ProducerRecord<String,String>(TOPIC_NAME,Integer.toString(times),CONTENT));
}
};
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
service.scheduleAtFixedRate(runnable, 0,100,TimeUnit.MICROSECONDS);
System.out.println("-----------------");
}
}
相关文章推荐
- kafka Kafka JAVA客户端代码示例
- kafka的Java客户端示例代码(kafka_2.12-0.10.2.1)
- kafka的Java客户端示例代码(kafka_2.11-0.8.2.2)
- Springboot集成Kafka实现producer和consumer的示例代码
- Kafka使用Java客户端进行访问的示例代码
- Asp.net 2.0 自定义控件开发专题讲解[为用户控件增加DataSource属性, 能够自动识别不同数据源](示例代码下载)
- 整合Kafka到Spark Streaming——代码示例和挑战
- kafka生产者与消费者java代码示例
- Javascript 修改String 对象 增加去除空格功能(示例代码)
- 整合Kafka到Spark Streaming——代码示例和挑战
- 拷贝网页内容增加版权信息的 JavaScript 代码示例
- kafka 消费者代码示例
- redis 学习笔记(7)-cluster 客户端(jedis)代码示例
- 拷贝网页内容增加版权信息的 JavaScript 代码示例
- 在springboot中对kafka进行读写的示例代码
- Asp.net 2.0 自定义控件开发专题讲解[为用户控件增加DataSource属性, 能够自动识别不同数据源](示例代码下载)
- Asp.net 2.0 自定义控件开发专题讲解[为用户控件增加DataSource属性, 能够自动识别不同数据源](示例代码下载)
- kafka学习(1)linux下的安装和启动,以及Java示例代码
- 基于jQuery表格增加删除代码示例
- 关于使用PHP向客户端发送文件-示例代码解释