
Kafka Environment Setup and Service Wrapping

2016-11-18 14:12
I. Preparation

1. Kafka version: kafka_2.10-0.10.1.0.tgz
2. ZooKeeper version: zookeeper-3.4.3.tar.gz
3. ZooKeeper cluster:
192.168.1.108:2181,192.168.1.109:2181,192.168.1.110:2181

4. Host-to-IP mappings in the hosts file:

192.168.1.108 master
192.168.1.109 slave1
192.168.1.110 slave2

II. ZooKeeper Environment Setup

For a detailed walkthrough of the ZooKeeper setup, see:

http://blog.csdn.net/liuchuanhong1/article/details/53192618

III. Kafka Environment Setup

1. Download the tarball

Download it from:

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz
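To fetch it directly on the server, you can use wget against the Apache archive (the exact mirror path below is an assumption; any mirror listed on the page above works):

wget https://archive.apache.org/dist/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz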

2. Extract to the /usr/kafka directory

Extract with (create the target directory first, then extract into it):

mkdir -p /usr/kafka
tar -zxvf kafka_2.10-0.10.1.0.tgz -C /usr/kafka


3. Directory structure after extraction

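The original post showed a screenshot of the directory tree here. For reference, a Kafka 0.10.x distribution typically unpacks into a layout like this:

kafka_2.10-0.10.1.0/
├── bin/        # startup and admin scripts (kafka-server-start.sh, kafka-topics.sh, ...)
├── config/     # server.properties, zookeeper.properties, ...
├── libs/       # Kafka and dependency jars
├── site-docs/  # bundled documentation
├── LICENSE
└── NOTICE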


IV. Edit the Configuration Files

1. Edit server.properties

Change the following settings:

broker.id=0
host.name=192.168.1.108
zookeeper.connect=slave1:2181,master:2181,slave2:2181
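The settings above are for the master node (broker.id=0). Assuming you also run a broker on each of the other two hosts (the post only shows the master's config), every broker needs a unique broker.id and its own host.name:

# slave1 (192.168.1.109)
broker.id=1
host.name=192.168.1.109
zookeeper.connect=slave1:2181,master:2181,slave2:2181

# slave2 (192.168.1.110)
broker.id=2
host.name=192.168.1.110
zookeeper.connect=slave1:2181,master:2181,slave2:2181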

V. Start the Kafka Service

1. Start the ZooKeeper cluster

2. Go to the bin directory and start the Kafka service with:

./kafka-server-start.sh ../config/server.properties
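The command above keeps the broker in the foreground. The 0.10.x start script also accepts a -daemon flag if you prefer to run it in the background:

./kafka-server-start.sh -daemon ../config/server.properties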


After startup, you should see configuration output like the following (truncated):

[root@192 bin]#./kafka-server-start.sh ../config/server.properties

[2016-11-18 10:50:51,067] INFO KafkaConfig values:
advertised.host.name = null
advertised.listeners = null
advertised.port = null
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = 0
broker.id.generation.enable = true
broker.rack = null
compression.type = producer
connections.max.idle.ms = 600000
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 30000
default.replication.factor = 1
delete.topic.enable = false
fetch.purgatory.purge.interval.requests = 1000
group.max.session.timeout.ms = 300000
group.min.session.timeout.ms = 6000
host.name =
inter.broker.protocol.version = 0.10.1-IV2
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listeners = null
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 86400000
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /home/kafka/kafka-logs
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 60000
log.flush.scheduler.interval.ms = 9223372036854775807
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.format.version = 0.10.1-IV2
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 300000
log.retention.hours = 168
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 60000
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
message.max.bytes = 1000012
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
min.insync.replicas = 1
num.io.threads = 8
num.network.threads = 3
num.partitions = 1
num.recovery.threads.per.data.dir = 1
num.replica.fetchers = 1
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 600000
offsets.retention.minutes = 1440
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 3
offsets.topic.segment.bytes = 104857600
port = 9092
principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
producer.purgatory.purge.interval.requests = 1000
queued.max.requests = 500
quota.consumer.default = 9223372036854775807
quota.producer.default = 9223372036854775807
quota.window.num = 11
quota.window.size.seconds = 1
replica.fetch.backoff.ms = 1000
replica.fetch.max.bytes = 1048576
replica.fetch.min.bytes = 1
replica.fetch.response.max.bytes = 10485760
replica.fetch.wait.max.ms = 500
replica.high.watermark.checkpoint.interval.ms = 5000
replica.lag.time.max.ms = 10000
replica.socket.receive.buffer.bytes = 65536
replica.socket.timeout.ms = 30000
replication.quota.window.num = 11
replication.quota.window.size.seconds = 1
request.timeout.ms = 30000
reserved.broker.max.id = 1000
sasl.enabled.mechanisms = [GSSAPI]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.principal.to.local.rules = [DEFAULT]
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism.inter.broker.protocol = GSSAPI
security.inter.broker.protocol = PLAINTEXT
socket.receive.buffer.bytes = 102400
socket.request.max.bytes = 104857600
socket.send.buffer.bytes = 102400
ssl.cipher.suites = null
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
unclean.leader.election.enable = true
zookeeper.connect = slave1:2181,master:2181,slave2:2181
zookeeper.connection.timeout.ms = 6000
zookeeper.session.timeout.ms = 6000
zookeeper.set.acl = false
zookeeper.sync.time.ms = 2000
(kafka.server.KafkaConfig)
[2016-11-18 10:50:51,175] INFO starting (kafka.server.KafkaServer)
[2016-11-18 10:50:51,244] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-11-18 10:50:51,261] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-11-18 10:50:51,269] INFO Connecting to zookeeper on slave1:2181,master:2181,slave2:2181 (kafka.server.KafkaServer)
[2016-11-18 10:50:51,308] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2016-11-18 10:50:51,328] INFO Client environment:zookeeper.version=3.4.8--1, built on 02/06/2016 03:18 GMT (org.apache.zookeeper.ZooKeeper)
[2016-11-18 10:50:51,328] INFO Client environment:host.name=master (org.apache.zookeeper.ZooKeeper)
[2016-11-18 10:50:51,328] INFO Client environment:java.version=1.8.0_111 (org.apache.zookeeper.ZooKeeper)
[2016-11-18 10:50:51,328] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2016-11-18 10:50:51,328] INFO Client environment:java.home=/usr/Java/jdk1.8.0_111/jre (org.apache.zookeeper.ZooKeeper)

3. Check the Kafka process

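The original screenshot is omitted. You can confirm the broker is running with the JDK's jps tool; output along these lines is expected (PIDs will differ, and QuorumPeerMain only appears if a ZooKeeper node runs on the same host):

[root@192 bin]# jps
2817 QuorumPeerMain
3146 Kafka
3360 Jps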


4. Create a topic

./kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic kafka-test

5. Describe the topic



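The screenshot of the topic details is omitted from the original. In 0.10.x the topic can be inspected with:

./kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave2:2181 --topic kafka-test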
VI. Wrapping the Kafka Producer as a Service

1. Create the Kafka class

import java.util.Properties;

import org.apache.log4j.Logger;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Kafka {

    private static final Logger LOG = Logger.getLogger(Kafka.class);

    /** Broker list for the producer */
    private String producerBrokerList;

    /** Serializer class */
    private String producerSerializerClass;

    /** Topic to send to */
    private String producerTopic;

    /** Number of retries */
    private int retry;

    /** The Kafka producer (old Scala client API shipped with 0.10.x) */
    private Producer<String, String> kafkaProducer;

    public Kafka(String producerBrokerList, String producerSerializerClass,
            String producerTopic, int retry) {
        this.producerBrokerList = producerBrokerList;
        this.producerSerializerClass = producerSerializerClass;
        this.producerTopic = producerTopic;
        this.retry = retry;
        init();
    }

    /** Initialization method. */
    public void init() {
        kafkaProducer = initProducer(producerBrokerList, producerSerializerClass);
    }

    /** Creates the Producer. */
    private Producer<String, String> initProducer(String producerBrokerList,
            String producerSerializerClass) {
        Properties props = new Properties();
        props.put("metadata.broker.list", producerBrokerList);
        props.put("serializer.class", producerSerializerClass);
        props.put("key.serializer.class", producerSerializerClass);
        // Wait for acks from all in-sync replicas
        props.put("request.required.acks", "-1");
        // The correct property name is "producer.type" (the original had "producerType",
        // which the old producer silently ignores)
        props.put("producer.type", "async");
        ProducerConfig producerConfig = new ProducerConfig(props);
        return new Producer<String, String>(producerConfig);
    }

    /** Public entry point for sending a message to Kafka. */
    public void sendData(String content) {
        KeyedMessage<String, String> kafkaMessage =
                new KeyedMessage<String, String>(producerTopic, content);
        try {
            kafkaProducer.send(kafkaMessage);
        } catch (Exception e) {
            LOG.warn("send kafka message failed, prepare to retry......", e);
            // Retry sending the message
            retryKafkaSendData(content);
        }
    }

    /** Retry mechanism used after a failed send. */
    public void retryKafkaSendData(String content) {
        KeyedMessage<String, String> kafkaMessage =
                new KeyedMessage<String, String>(producerTopic, content);
        // Default to 3 attempts when retry is not configured
        for (int i = 1; i <= (retry <= 0 ? 3 : retry); i++) {
            try {
                kafkaProducer.send(kafkaMessage);
                return;
            } catch (Exception e) {
                LOG.warn("send kafka message failed, retry times: " + i, e);
            }
        }
    }

    /** Shuts down the producer. */
    public void close() {
        kafkaProducer.close();
    }
}
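For reference, a minimal standalone use of this class outside Spring (broker address and topic taken from the values used elsewhere in this post) might look like:

Kafka kafka = new Kafka("192.168.1.108:9092", "kafka.serializer.StringEncoder", "kafka-test", 3);
kafka.sendData("hello kafka");
kafka.close();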

2. Create the KafkaFactoryBean class

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;

public class KafkaFactoryBean implements FactoryBean<Kafka>, InitializingBean, DisposableBean {

    /** The target object produced by this FactoryBean */
    private Kafka kafka;

    /** Broker list */
    private String producerBrokerList;

    /** Serializer class */
    private String producerSerializerClass;

    /** Kafka topic */
    private String producerTopic;

    /** Number of retries */
    private int retry;

    /** Called automatically when the bean instance is destroyed. */
    @Override
    public void destroy() throws Exception {
        if (null != kafka) {
            kafka.close();
        }
    }

    /** Called by Spring after all properties have been set. */
    @Override
    public void afterPropertiesSet() throws Exception {
        kafka = new Kafka(producerBrokerList, producerSerializerClass, producerTopic, retry);
    }

    /**
     * Returns the object created by this factory. Note that this is not an
     * instance of the FactoryBean itself but an instance of Kafka.
     */
    @Override
    public Kafka getObject() throws Exception {
        return this.kafka;
    }

    /** Returns the type of the object this factory creates. */
    @Override
    public Class<?> getObjectType() {
        return (this.kafka == null ? Kafka.class : this.kafka.getClass());
    }

    /** Whether the created object is a singleton. */
    @Override
    public boolean isSingleton() {
        return true;
    }

    // Setters are required for Spring's <property> injection in the XML shown below.

    public void setProducerBrokerList(String producerBrokerList) {
        this.producerBrokerList = producerBrokerList;
    }

    public void setProducerSerializerClass(String producerSerializerClass) {
        this.producerSerializerClass = producerSerializerClass;
    }

    public void setProducerTopic(String producerTopic) {
        this.producerTopic = producerTopic;
    }

    public void setRetry(int retry) {
        this.retry = retry;
    }
}

If anything about using FactoryBean is unclear, see this other article:

http://blog.csdn.net/liuchuanhong1/article/details/52939353

VII. Test the Producer Service

1. Start a console consumer on the server

./kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic kafka-test --from-beginning

The consumer process is now listening on kafka-test. (The original post showed a screenshot of the running consumer here.)


2. Spring configuration file for the test

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd"
    default-lazy-init="false">

    <context:property-placeholder location="classpath:kafka.properties" />

    <bean id="kafkaFactoryBean" class="com.chhliu.myself.KafkaFactoryBean">
        <property name="producerBrokerList" value="${bootstrap.servers}" />
        <property name="producerSerializerClass" value="kafka.serializer.StringEncoder" />
        <!-- Target topic -->
        <property name="producerTopic" value="${kafka.topic}" />
        <!-- Retry defaults to 3 attempts -->
        <property name="retry" value="3" />
    </bean>
</beans>

3. Properties file for the test
bootstrap.servers=192.168.1.108:9092
kafka.topic=kafka-test
4. Run the test code

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext-kafka.xml" })
public class KafkaProducerTest {

    // What gets injected here is not an instance of KafkaFactoryBean but the Kafka instance it creates
    @Resource(name = "kafkaFactoryBean")
    private Kafka factory;

    @Test
    public void test() {
        try {
            for (int i = 0; i < 1000; i++) {
                factory.sendData("hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!" + i);
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

The consumer output looks like this:

hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!601
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!602
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!603
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!604
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!605
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!606
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!607
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!608
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!609
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!610
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!611
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!612
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!613
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!614
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!615
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!616
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!617

As this shows, the producer successfully sent the messages to Kafka, and the consumer successfully consumed them.