您的位置:首页 > 编程语言 > Java开发

spring集成kafka实现producer和consumer

2017-09-21 16:52 531 查看
目前网上针对kafka0.10没找到合适的集成案例,想使用spring-integration-kafka框架,但发现官方文档也不全,干脆自己用spring简单实现了一下

pom.xml里的代码:

[html] view
plain copy

<repositories>  

    <repository><!-- Spring-kafka的资源库地址 -->  

        <id>spring-milestones</id>  

        <name>Spring Milestones</name>  

        <url>https://repo.spring.io/libs-milestone</url>  

        <snapshots>  

            <enabled>false</enabled>  

        </snapshots>  

    </repository>  

</repositories>  

[html] view
plain copy

<dependency>  

  <groupId>org.springframework.kafka</groupId>  

  <artifactId>spring-kafka</artifactId>  

  <version>1.0.0.RC1</version>  

</dependency>  

消息生产者的配置如下:
kafka-producer.xml

[html] view
plain copy

<?xml version="1.0" encoding="UTF-8"?>  

<beans xmlns="http://www.springframework.org/schema/beans"  

     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  

     xmlns:context="http://www.springframework.org/schema/context"  

     xsi:schemaLocation="http://www.springframework.org/schema/beans  

         http://www.springframework.org/schema/beans/spring-beans.xsd  

         http://www.springframework.org/schema/context  

         http://www.springframework.org/schema/context/spring-context.xsd">  

      

     <context:property-placeholder location="classpath:init.properties" />  

      

     <!-- 定义producer的参数 -->  

     <bean id="producerProperties" class="java.util.HashMap">  

        <constructor-arg>  

            <map>  

                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>  

                <entry key="group.id" value="0"/>  

                <entry key="retries" value="10"/>  

                <entry key="batch.size" value="16384"/>  

                <entry key="linger.ms" value="1"/>  

                <entry key="buffer.memory" value="33554432"/>  

                <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>  

                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>  

            </map>  

        </constructor-arg>  

     </bean>  

       

     <!-- 创建kafkatemplate需要使用的producerfactory bean -->  

     <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">  

        <constructor-arg>  

            <ref bean="producerProperties"/>  

        </constructor-arg>  

     </bean>  

       

     <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->  

     <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">  

        <constructor-arg ref="producerFactory"/>  

        <constructor-arg name="autoFlush" value="true"/>  

        <property name="defaultTopic" value="mhb-test"/>  

     </bean>  

  

</beans>  

KafkaProducerTest.java

[java] view
plain copy

package com.tonsonmiao.common.kafka;  

  

import org.junit.Test;  

import org.springframework.beans.factory.annotation.Autowired;  

import org.springframework.kafka.core.KafkaTemplate;  

import org.springframework.kafka.listener.KafkaMessageListenerContainer;  

  

/** 

 * kafka读写测试类 

 * @author miaohongbin  

 * @version      

 * Date: 2016年6月24日 下午6:22:38 <br/>  

 * @since 

 */  

<p class="p1"><span class="s1">@RunWith</span>(SpringJUnit4ClassRunner.<span class="s2">class</span>)</p><p class="p2">@ContextConfiguration<span class="s3">(locations = {</span></p><p class="p3"><span class="s3"><span>   </span><span>   </span></span>"classpath:/bean/ja-kafka-producer.xml"</p><p class="p3"><span style="font-family: Arial, Helvetica, sans-serif;">})</span></p>public class KafkaTest {  

      

    @Autowired  

    private KafkaTemplate<Integer, String> kafkaTemplate;  

      

    /** 

     * 向kafka里写数据.<br/>   

     * @author miaohongbin 

     * Date:2016年6月24日下午6:22:58 

     */  

    @Test  

    public void testTemplateSend(){  

        kafkaTemplate.sendDefault("haha111");  

    }  

  

}  

消费者代码:

kafka-consumer.xml

[html] view
plain copy

<?xml version="1.0" encoding="UTF-8"?>  

<beans xmlns="http://www.springframework.org/schema/beans"  

     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  

     xmlns:context="http://www.springframework.org/schema/context"  

     xsi:schemaLocation="http://www.springframework.org/schema/beans  

         http://www.springframework.org/schema/beans/spring-beans.xsd  

         http://www.springframework.org/schema/context  

         http://www.springframework.org/schema/context/spring-context.xsd">  

      

     <context:property-placeholder location="classpath:init.properties" />  

  

    <!-- 定义consumer的参数 -->  

     <bean id="consumerProperties" class="java.util.HashMap">  

        <constructor-arg>  

            <map>  

                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>  

                <entry key="group.id" value="0"/>  

                <entry key="enable.auto.commit" value="true"/>  

                <entry key="auto.commit.interval.ms" value="1000"/>  

                <entry key="session.timeout.ms" value="15000"/>  

                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>  

                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>  

            </map>  

        </constructor-arg>  

     </bean>  

       

     <!-- 创建consumerFactory bean -->  

     <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">  

        <constructor-arg>  

            <ref bean="consumerProperties"/>  

        </constructor-arg>  

     </bean>  

       

     <!-- 实际执行消息消费的类 -->  

     <bean id="messageListernerConsumerService" class="com.tonsonmiao.common.kafka.KafkaConsumer"/>  

       

     <!-- 消费者容器配置信息 -->  

     <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">  

        <constructor-arg value="mhb-test"/>  

        <property name="messageListener" ref="messageListernerConsumerService"/>  

     </bean>  

       

     <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->  

     <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">  

        <constructor-arg ref="consumerFactory"/>  

        <constructor-arg ref="containerProperties"/>  

     </bean>  

  

</beans>  

KafkaConsumer.java

[java] view
plain copy

package com.tonsonmiao.common.kafka;  

  

import org.apache.kafka.clients.consumer.ConsumerRecord;  

import org.springframework.kafka.listener.MessageListener;  

  

public class KafkaConsumer implements MessageListener<Integer, String>{  

  

    @Override  

    public void onMessage(ConsumerRecord<Integer, String> record) {  

        System.out.println(record);  

    }  

  

}  

配置文件init.properties

[plain] view
plain copy

bootstrap.servers=10.94.97.59:9092  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: