Using the Kafka Distributed Message Queue (Spring Boot and Spring MVC)
2017-09-10 19:36
Kafka version: kafka_2.12-0.10.2.0
ZooKeeper version: zookeeper-3.4.9
Kafka is a high-throughput distributed message queue system based on the publish/subscribe model. Its throughput, built-in partitioning, replication, and fault tolerance make it a good solution for large-scale message processing applications.
Installation and configuration
Kafka depends on ZooKeeper, so download and install ZooKeeper first (installation is covered in other articles). Download kafka_2.12-0.10.2.0.tar.gz and extract it, then edit server.properties in the config directory:
zookeeper.connect=localhost:2181 # ZooKeeper connection string
log.dirs=/opt/kafka/kafka-logs # log directory
num.partitions=2 # default number of partitions per topic
listeners=PLAINTEXT://192.168.0.130:9092
broker.id=0 # broker id; must be a unique integer within the cluster
Commands
Start the broker: bin/kafka-server-start.sh config/server.properties. For a cluster, copy server.properties to server1.properties, change broker.id and log.dirs, and start another broker with the new file.
Create a topic: bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 2 --replication-factor 2
List all topics: bin/kafka-topics.sh --list --zookeeper localhost:2181
Describe a topic: bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Alter a topic: bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --partitions 40
Usage
Using Kafka with Spring Boot
Spring Boot is straightforward: the dependency and configuration were already set up in part one of the Spring Boot study notes, so here is just a brief recap. Add the following dependency with Gradle or Maven (Gradle shown):
compile('org.springframework.kafka:spring-kafka')
Add the following to application.yml:
spring:
  kafka:
    bootstrap-servers: 192.168.0.130:9092,192.168.0.130:9293
    template.default-topic: bootkafka
    listener:
      concurrency: 10 # number of concurrent listener threads
    producer:
      bootstrap-servers: 192.168.0.130:9092,192.168.0.130:9293 # broker ip:port list
      # key and value serializer classes
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # number of times to retry a failed send
      retries: 3
      batch-size: 16384
      acks: 1
      linger:
        ms: 1
    consumer:
      bootstrap-servers: 192.168.0.130:9092,192.168.0.130:9293
      # consumers use a deserializer, not a serializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # consumer group id
      group-id: boot
With that, everything is configured; the Spring Boot setup is much simpler than Spring MVC's.
The producer publishes a message:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@RequestMapping(value = "/home", method = RequestMethod.GET)
public String home() {
    try {
        Map<String, Object> message = new HashMap<String, Object>();
        message.put("description", "kafka message test");
        message.put("topic", "the topic is bootkafka");
        message.put("timestamp", System.currentTimeMillis() / 1000);
        String stringValue = JSONObject.toJSONString(message);
        kafkaTemplate.send("bootkafka", stringValue); // topic, message payload
    } catch (Exception e) {
        e.printStackTrace();
    }
    return "home"; // placeholder view name
}
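In spring-kafka, KafkaTemplate.send returns a ListenableFuture, so instead of swallowing exceptions the send result can be handled asynchronously. A minimal sketch under that assumption (the topic name and log messages are illustrative, and it requires a running broker):

```java
// Sketch: react to the outcome of an asynchronous send (spring-kafka 1.x API).
// The callback fires once the broker acknowledges the record or the send fails.
ListenableFuture<SendResult<String, String>> future =
        kafkaTemplate.send("bootkafka", "hello kafka");
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    @Override
    public void onSuccess(SendResult<String, String> result) {
        // partition/offset of the persisted record
        System.out.println("sent: " + result.getRecordMetadata());
    }

    @Override
    public void onFailure(Throwable ex) {
        ex.printStackTrace();
    }
});
```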
The consumer uses Spring annotations, which keeps it simple:
@Component
public class KafkaConsumer {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @KafkaListener(topics = { "bootkafka" })
    public void listen(String data) {
        logger.info("received kafka message: " + data);
    }
}
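If the listener needs record metadata rather than just the payload, a @KafkaListener method can also take the full ConsumerRecord. A small sketch assuming the same setup (the method name is illustrative):

```java
// Sketch: receive the whole ConsumerRecord to access key, partition, and offset.
@KafkaListener(topics = { "bootkafka" })
public void listenWithMetadata(ConsumerRecord<String, String> record) {
    logger.info("topic=" + record.topic()
            + " partition=" + record.partition()
            + " offset=" + record.offset()
            + " value=" + record.value());
}
```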
Using Kafka with Spring MVC
Spring MVC needs a bit more configuration. Add the following dependencies with Gradle or Maven:
compile 'org.apache.kafka:kafka-clients:0.10.1.0'
compile 'org.springframework.kafka:spring-kafka:1.1.1.RELEASE'
application.properties:
#bootstrap.servers=192.168.0.130:9092,192.168.0.130:9093,192.168.0.130:9091
kafka.bootstrap.servers=192.168.0.130:9092,192.168.0.130:9093
kafka.group.id=0
kafka.retries=1
kafka.batch.size=16384
kafka.linger.ms=1
kafka.buffer.memory=33554432
kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
Create the configuration file spring-kafka.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:cache="http://www.springframework.org/schema/cache" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <context:property-placeholder location="classpath*:config/application.properties"/>
    <!-- producer configuration -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!-- kafka cluster to connect to -->
                <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
                <entry key="group.id" value="${kafka.group.id}" />
                <!-- number of retries after a failed send -->
                <entry key="retries" value="${kafka.retries}" />
                <!-- maximum size of a record batch, in bytes -->
                <entry key="batch.size" value="16384" />
                <!-- how long to wait before sending, to allow batching -->
                <entry key="linger.ms" value="1" />
                <entry key="buffer.memory" value="33554432" />
                <!-- serializer classes for keys and values -->
                <entry key="key.serializer" value="${kafka.key.serializer}" />
                <entry key="value.serializer" value="${kafka.value.serializer}" />
            </map>
        </constructor-arg>
    </bean>
    <!-- producer factory used by the KafkaTemplate -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties"/>
        </constructor-arg>
    </bean>
    <!-- KafkaTemplate bean; inject it wherever you need to call its send methods -->
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory"/>
        <constructor-arg name="autoFlush" value="true"/>
        <property name="defaultTopic" value="mhb-test"/>
        <property name="producerListener" ref="producerListener"/>
    </bean>
    <!-- producer listener, invoked whenever a message is sent -->
    <bean id="producerListener" class="com.test.myspring.kafka.kafkaProducerListener" />
    <!-- consumer configuration -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}"/>
                <entry key="group.id" value="0"/>
                <entry key="enable.auto.commit" value="false"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="15000"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
    </bean>
    <!-- consumer factory bean -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>
    <!-- the beans below are unnecessary if consumers are created with the annotation approach shown above -->
    <bean id="messageListernerConsumerService" class="com.test.myspring.kafka.KafkaConsumerServer"/>
    <!-- consumer container configuration -->
    <bean id="containerProperties_trade" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="mhb-test"/>
        <property name="messageListener" ref="messageListernerConsumerService"/>
    </bean>
    <bean id="containerProperties_other" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="other_test_topic"/>
        <property name="messageListener" ref="messageListernerConsumerService"/>
    </bean>
</beans>
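Note that the XML defines the ContainerProperties beans but never declares a container that uses them. A hedged sketch of the missing pieces in spring-kafka 1.x: a KafkaConsumerServer implementing MessageListener, and the equivalent programmatic wiring of a KafkaMessageListenerContainer (the names mirror the XML, but this wiring is an assumption, not the original author's code):

```java
// Hypothetical listener matching the messageListernerConsumerService bean.
public class KafkaConsumerServer implements MessageListener<String, String> {
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println("received: " + record.value());
    }
}

// Programmatic equivalent of combining consumerFactory + containerProperties_trade
// into a running listener container.
ContainerProperties containerProperties = new ContainerProperties("mhb-test");
containerProperties.setMessageListener(new KafkaConsumerServer());
KafkaMessageListenerContainer<String, String> container =
        new KafkaMessageListenerContainer<String, String>(consumerFactory, containerProperties);
container.start();
```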
Add to web.xml:
<param-value>classpath*:/config/spring-kafka.xml</param-value>
If you would rather not create a spring-kafka.xml configuration file, you can initialize Kafka yourself with a factory class such as KakfaFactory:
package com.wtsd.myspring.kafka;

import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import java.io.IOException;
import java.io.InputStream;
import java.util.*;

/**
 * Factory that builds the Kafka producer objects.
 *
 * @author test
 * @date 2017/4/4 11:27
 * @Package com.myspring.kafka
 * @Version v1.0
 */
public class KakfaFactory {
    private static final Logger logger = Logger.getLogger(KakfaFactory.class);
    public static DefaultKafkaProducerFactory<String, String> kafkaProducerFactory;
    public static volatile KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Create the producer factory.
     *
     * @date 2017/4/4 19:04
     */
    public static DefaultKafkaProducerFactory<String, String> getKafkaFactory() {
        return new DefaultKafkaProducerFactory<String, String>(getConfigs());
    }

    /**
     * Read the Kafka settings from the *.properties file.
     *
     * @date 2017/4/4 19:04
     */
    public static Map<String, Object> getConfigs() {
        logger.info(">>>> loading kafka configuration <<<<");
        Resource resource = new ClassPathResource("config/application.properties");
        Map<String, Object> config = new HashMap<String, Object>();
        try {
            InputStream inputStream = resource.getInputStream();
            Properties properties = new Properties();
            properties.load(inputStream);
            inputStream.close();
            config = getKafakProperteis(properties);
        } catch (IOException e) {
            logger.error("failed to load kafka configuration", e);
        }
        logger.info(">>> kafka configuration: " + config);
        return config;
    }

    /**
     * Extract the kafka.* entries from the loaded properties.
     *
     * @date 2017/4/4 13:02
     */
    public static Map<String, Object> getKafakProperteis(Properties properties) {
        if (null != properties) {
            Map<String, Object> config = new HashMap<String, Object>();
            Set<String> set = properties.stringPropertyNames();
            for (String s : set) {
                if (StringUtils.contains(s, "kafka") && StringUtils.isNotBlank(properties.getProperty(s))) {
                    config.put(s.replace("kafka.", ""), properties.getProperty(s));
                }
            }
            return config;
        }
        return null;
    }

    public static KafkaTemplate<String, String> getKafkaTemplate() {
        if (kafkaTemplate == null) {
            logger.info("creating kafkaTemplate");
            // Lock on the class: synchronizing on the (possibly null) field would throw a NullPointerException.
            synchronized (KakfaFactory.class) {
                if (kafkaTemplate == null) {
                    kafkaTemplate = new KafkaTemplate<String, String>(getKafkaFactory(), true);
                }
            }
        }
        return kafkaTemplate;
    }
}
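The kafka.-prefix filtering done by getKafakProperteis can be sanity-checked without a broker; the helper below mirrors that logic using only JDK classes (the class and method names here are hypothetical, introduced for illustration):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class KafkaPropsDemo {
    // Mirrors getKafakProperteis: keep entries whose key contains "kafka",
    // skip blank values, and strip the "kafka." prefix.
    static Map<String, Object> extractKafkaConfig(Properties properties) {
        Map<String, Object> config = new HashMap<String, Object>();
        for (String key : properties.stringPropertyNames()) {
            String value = properties.getProperty(key);
            if (key.contains("kafka") && value != null && !value.trim().isEmpty()) {
                config.put(key.replace("kafka.", ""), value);
            }
        }
        return config;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("kafka.bootstrap.servers", "192.168.0.130:9092");
        props.setProperty("kafka.retries", "1");
        props.setProperty("jdbc.url", "ignored"); // no "kafka" in the key, so filtered out
        Map<String, Object> config = extractKafkaConfig(props);
        System.out.println(config.get("bootstrap.servers"));
        System.out.println(config.size());
    }
}
```

Entries such as jdbc.url are dropped, and kafka.bootstrap.servers becomes the bootstrap.servers key the producer expects.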
Producing and consuming messages then works the same way as with Spring Boot, so it is not repeated here.