Kafka环境搭建以及服务封装
2016-11-18 14:12
351 查看
一、安装前准备
4、hosts文件中主机与ip映射关系
二、zookeeper环境搭建
具体的zookeeper环境搭建请参考:
http://blog.csdn.net/liuchuanhong1/article/details/53192618
三、Kafka环境搭建
1、下载tar包
下载地址如下:
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz
2、解压到/usr/kafka目录下
解压命令如下:
3、解压后目录结构
四、修改配置文件
1、修改server.properties
修改配置文件如下:
五、启动kafka服务
1、启动zookeeper集群
2、进入/bin目录下,输入如下命令,启动kafka服务
启动之后,可以看到如下的配置信息:
3、查看kafka进程
4、创建topic
5、查看详细的topic
六、Kafka生产者服务封装
1、创建Kafka类
2、创建KafkaFactoryBean类
如果对FactoryBean的使用有不明白的地方,请参考另一篇文章:
http://blog.csdn.net/liuchuanhong1/article/details/52939353
七、测试生产者服务
1、在服务器上开启消费者监听进程
消费者进程如下:
2、测试的spring配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd" default-lazy-init="false">
<context:property-placeholder location="classpath:kafka.properties" />
<bean id="kafkaFactoryBean" class="com.chhliu.myself.KafkaFactoryBean">
<property name="producerBrokerList"
value="${bootstrap.servers}" />
<property name="producerSerializerClass" value="kafka.serializer.StringEncoder" />
<!--目标地址 -->
<property name="producerTopic" value="${kafka.topic}" />
<!-- 重试机制默认3次 -->
<property name="retry" value="3" />
</bean>
</beans>3、测试的properties文件
bootstrap.servers=192.168.1.108:9092
kafka.topic=kafka-test
4、运行测试代码
测试效果如下:
我们发现,生产者成功的将消息发送到了kafka,消费者也成功的消费了消息。
1、kafka版本:kafka_2.10-0.10.1.0.tgz 2、zookeeper版本:zookeeper-3.4.3.tar.gz 3、zookeeper集群: 192.168.1.108:2181,192.168.1.109:2181,192.168.1.110:2181
4、hosts文件中主机与ip映射关系
192.168.1.108 master 192.168.1.109 slave1 192.168.1.110 slave2
二、zookeeper环境搭建
具体的zookeeper环境搭建请参考:
http://blog.csdn.net/liuchuanhong1/article/details/53192618
三、Kafka环境搭建
1、下载tar包
下载地址如下:
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz
2、解压到/usr/kafka目录下
解压命令如下:
tar -zxvf kafka_2.10-0.10.1.0.tgz
3、解压后目录结构
四、修改配置文件
1、修改server.properties
修改配置文件如下:
broker.id=0 host.name=192.168.1.108 zookeeper.connect=slave1:2181,master:2181,slave2:2181
五、启动kafka服务
1、启动zookeeper集群
2、进入/bin目录下,输入如下命令,启动kafka服务
./kafka-server-start.sh ../config/server.properties
启动之后,可以看到如下的配置信息:
[root@192 bin]#./kafka-server-start.sh ../config/server.properties
[2016-11-18 10:50:51,067] INFO KafkaConfig values:
advertised.host.name = null
advertised.listeners = null
advertised.port = null
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = 0
broker.id.generation.enable = true
broker.rack = null
compression.type = producer
connections.max.idle.ms = 600000
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 30000
default.replication.factor = 1
delete.topic.enable = false
fetch.purgatory.purge.interval.requests = 1000
group.max.session.timeout.ms = 300000
group.min.session.timeout.ms = 6000
host.name =
inter.broker.protocol.version = 0.10.1-IV2
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listeners = null
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 86400000
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /home/kafka/kafka-logs
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 60000
log.flush.scheduler.interval.ms = 9223372036854775807
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.format.version = 0.10.1-IV2
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 300000
log.retention.hours = 168
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 60000
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
message.max.bytes = 1000012
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
min.insync.replicas = 1
num.io.threads = 8
num.network.threads = 3
num.partitions = 1
num.recovery.threads.per.data.dir = 1
num.replica.fetchers = 1
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 600000
offsets.retention.minutes = 1440
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 3
offsets.topic.segment.bytes = 104857600
port = 9092
principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
producer.purgatory.purge.interval.requests = 1000
queued.max.requests = 500
quota.consumer.default = 9223372036854775807
quota.producer.default = 9223372036854775807
quota.window.num = 11
quota.window.size.seconds = 1
replica.fetch.backoff.ms = 1000
replica.fetch.max.bytes = 1048576
replica.fetch.min.bytes = 1
replica.fetch.response.max.bytes = 10485760
replica.fetch.wait.max.ms = 500
replica.high.watermark.checkpoint.interval.ms = 5000
replica.lag.time.max.ms = 10000
replica.socket.receive.buffer.bytes = 65536
replica.socket.timeout.ms = 30000
replication.quota.window.num = 11
replication.quota.window.size.seconds = 1
request.timeout.ms = 30000
reserved.broker.max.id = 1000
sasl.enabled.mechanisms = [GSSAPI]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.principal.to.local.rules = [DEFAULT]
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism.inter.broker.protocol = GSSAPI
security.inter.broker.protocol = PLAINTEXT
socket.receive.buffer.bytes = 102400
socket.request.max.bytes = 104857600
socket.send.buffer.bytes = 102400
ssl.cipher.suites = null
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
unclean.leader.election.enable = true
zookeeper.connect = slave1:2181,master:2181,slave2:2181
zookeeper.connection.timeout.ms = 6000
zookeeper.session.timeout.ms = 6000
zookeeper.set.acl = false
zookeeper.sync.time.ms = 2000
(kafka.server.KafkaConfig)
[2016-11-18 10:50:51,175] INFO starting (kafka.server.KafkaServer)
[2016-11-18 10:50:51,244] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-11-18 10:50:51,261] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-11-18 10:50:51,269] INFO Connecting to zookeeper on slave1:2181,master:2181,slave2:2181 (kafka.server.KafkaServer)
[2016-11-18 10:50:51,308] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2016-11-18 10:50:51,328] INFO Client environment:zookeeper.version=3.4.8--1, built on 02/06/2016 03:18 GMT (org.apache.zookeeper.ZooKeeper)
[2016-11-18 10:50:51,328] INFO Client environment:host.name=master (org.apache.zookeeper.ZooKeeper)
[2016-11-18 10:50:51,328] INFO Client environment:java.version=1.8.0_111 (org.apache.zookeeper.ZooKeeper)
[2016-11-18 10:50:51,328] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2016-11-18 10:50:51,328] INFO Client environment:java.home=/usr/Java/jdk1.8.0_111/jre (org.apache.zookeeper.ZooKeeper)
3、查看kafka进程
4、创建topic
./kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic kafka-test
5、查看详细的topic
六、Kafka生产者服务封装
1、创建Kafka类
public class Kafka { private static Logger LOG = Logger.getLogger(Kafka.class); /** * 生产者的broker列表 */ private String producerBrokerList; /** * 序列化类 */ private String producerSerializerClass; /** * topic */ private String producerTopic; /** * 重试次数 */ private int retry; /** * kafka生产者 */ private Producer<String, String> kafkaProducer; public Kafka(String producerBrokerList, String producerSerializerClass, String producerTopic, int retry) { super(); this.producerBrokerList = producerBrokerList; this.producerSerializerClass = producerSerializerClass; this.producerTopic = producerTopic; this.retry = retry; init(); } /** * Details:初始化方法 */ public void init() { kafkaProducer = initProducer(producerBrokerList, producerSerializerClass); } /** * Details:创建Produce */ private Producer<String, String> initProducer(String producerBrokerList, String producerSerializerClass) { Producer<String, String> kafkaProducer; Properties props = new Properties(); props.put("metadata.broker.list", producerBrokerList); props.put("serializer.class", producerSerializerClass); props.put("key.serializer.class", producerSerializerClass); props.put("request.required.acks", "-1"); props.put("producerType", "async"); ProducerConfig producerConfig = new ProducerConfig(props); kafkaProducer = new Producer<String, String>(producerConfig); return kafkaProducer; } /** * Details:供外部调用的接口,用来发送消息到kafka */ public void sendData(String content) { KeyedMessage<String, String> kafkaMessage = new KeyedMessage<String, String>( producerTopic, content); try { kafkaProducer.send(kafkaMessage); } catch (Exception e) { LOG.warn("send kafka message failed , prepare to retry......", e); // 重试发送消息 retryKafkaSendData(content); } } /** * Details:发送消息失败后的重试机制 */ public void retryKafkaSendData(String content) { KeyedMessage<String, String> kafkaMessage = new KeyedMessage<String, String>( producerTopic, content); for (int i = 1; i <= (retry <= 0 ? 3 : retry); i++) { try { kafkaProducer.send(kafkaMessage); return; } catch (Exception e) { LOG.warn("send kafka message failed , retry times:" + i, e); } } } /** * Details:销毁生产者的方法 */ public void close() { kafkaProducer.close(); } }
2、创建KafkaFactoryBean类
public class KafkaFactoryBean implements FactoryBean<Kafka>, InitializingBean, DisposableBean { /** * FactoryBean生成的目标对象 */ private Kafka kafka; /** * broker列表 */ private String producerBrokerList; /** * 序列化类 */ private String producerSerializerClass; /** * kafka的topic */ private String producerTopic; /** * 重试次数 */ private int retry; /** * Details:实例销毁方法,当实例销毁时,会自动调用这个方法 */ @Override public void destroy() throws Exception { if (null != kafka) { kafka.close(); } } /** * Details:spring加载后,会调用该方法 */ @Override public void afterPropertiesSet() throws Exception { kafka = new Kafka(producerBrokerList, producerSerializerClass, producerTopic, retry); } /** * Details:返回工厂创建的对象,注意,此处返回的并不是FactoryBean的一个实例,而是返回Kafka的一个实例 */ @Override public Kafka getObject() throws Exception { return this.kafka; } /** * Details:获取返回对象的类型 */ @Override public Class<?> getObjectType() { return (this.kafka == null ? Kafka.class : this.kafka.getClass()); } /** * Details:创建的对象是否为单例 */ @Override public boolean isSingleton() { return true; } }
如果对FactoryBean的使用有不明白的地方,请参考另一篇文章:
http://blog.csdn.net/liuchuanhong1/article/details/52939353
七、测试生产者服务
1、在服务器上开启消费者监听进程
./kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic kafka-test --from-beginnin
消费者进程如下:
2、测试的spring配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd" default-lazy-init="false">
<context:property-placeholder location="classpath:kafka.properties" />
<bean id="kafkaFactoryBean" class="com.chhliu.myself.KafkaFactoryBean">
<property name="producerBrokerList"
value="${bootstrap.servers}" />
<property name="producerSerializerClass" value="kafka.serializer.StringEncoder" />
<!--目标地址 -->
<property name="producerTopic" value="${kafka.topic}" />
<!-- 重试机制默认3次 -->
<property name="retry" value="3" />
</bean>
</beans>3、测试的properties文件
bootstrap.servers=192.168.1.108:9092
kafka.topic=kafka-test
4、运行测试代码
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = { "classpath:applicationContext-kafka.xml" }) public class KafkaProducerTest { // 此处返回的并不是KafkaFactoryBean的一个实例,而是Kafka的实例 @Resource(name = "kafkaFactoryBean") private Kafka factory; @Test public void test() { try { for (int i = 0; i < 1000; i++) { factory.sendData("hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!" + i); } } catch (Exception e) { System.out.println(e.getMessage()); } } }
测试效果如下:
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!601 hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!602 hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!603 hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!604 hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!605 hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!606 hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!607 hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!608 hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!609 hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!610 hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!611 hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!612 hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!613 hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!614 hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!615 hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!616 hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!617
我们发现,生产者成功的将消息发送到了kafka,消费者也成功的消费了消息。
相关文章推荐
- zookeeper以及kafka环境的搭建
- PHP环境搭建以及解决wampapache服务启动不了的问题
- linux环境搭建ftp服务以及遇到的问题总结:上传不成功,ftp:connection refused等等
- 【服务器搭建环境配置】Linux上部署Nginx服务以及故障排除
- 【Spring_WebService(CXF)】环境的搭建以及服务的暴露
- 【入门教程】kafka环境搭建以及基础教程
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十五)Structured Streaming:同一个topic中包含一组数据的多个部分,按照key它们拼接为一条记录(以及遇到的问题)。
- kafka本地环境的搭建,以及本地java测试的调用
- kafka学习(一) ---- 基本概念以及环境搭建
- PHP环境搭建以及解决wampapache服务启动不了的问题
- 在windows下搭建ROR开发环境以及安装开源项目管理软件Redmine
- 系出名门Android(1) - 在 Windows 下搭建 Android 开发环境,以及 Hello World 程序
- 在linux环境下启动oracle服务以及监听
- Android 环境搭建,Helloworld以及常见错误处理,最新版哦
- Fedora 11 的安装以及 LAMP环境的搭建(二)
- 在 Windows 下搭建 Android 开发环境以及开发流程
- 系出名门Android(1) - 在 Windows 下搭建 Android 开发环境,以及 Hello World 程序
- 研究Android一——开发环境搭建以及HelloWorld
- [翻译]Programming Windows Phone 7 Series简介以及开发环境搭建
- arm-linux-gdb+gdbserver环境搭建以及远程调试