
Integrating Spring 4 with Kafka 0.10.0.0

2017-08-22 17:30
Environment:
spring-4.3.1

kafka-0.10.0.0

 

pom.xml

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.1.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>



 

spring-kafka-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd">
  

    <!-- Producer configuration properties -->

    <bean id="producerProperties" class="java.util.HashMap"> 

        <constructor-arg> 

            <map> 

                <entry key="bootstrap.servers" value="10.9.1.120:9092" /> 

                <entry key="group.id" value="0" /> 

                <entry key="retries" value="10" /> 

                <entry key="batch.size" value="16384" /> 

                <entry key="linger.ms" value="1" /> 

                <entry key="buffer.memory" value="33554432" /> 

                <entry key="key.serializer" 

                       value="org.apache.kafka.common.serialization.StringSerializer" /> 

                <entry key="value.serializer" 

                       value="org.apache.kafka.common.serialization.StringSerializer" /> 

            </map> 

        </constructor-arg> 

    </bean> 

 

    <!-- ProducerFactory used to create the KafkaTemplate -->

    <bean id="producerFactory" 

          class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> 

        <constructor-arg> 

            <ref bean="producerProperties" /> 

        </constructor-arg> 

    </bean> 

 

    <!-- KafkaTemplate bean; inject it wherever you need to send messages and call its send methods -->

    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"> 

        <constructor-arg ref="producerFactory" /> 

        <constructor-arg name="autoFlush" value="true" /> 

        <property name="defaultTopic" value="orderTopic" /> 

        <property name="producerListener" ref="producerListener"/> 

    </bean> 

 

    <bean id="producerListener" class="cn.xxx.xxx.kafka.listener.KafkaProducerListener" /> 

</beans>
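
For readers who prefer Java configuration, the XML above maps onto the spring-kafka API directly. A minimal sketch (the class name KafkaProducerConfig is illustrative; the broker address, properties, and topic mirror the XML):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", "10.9.1.120:9092");
        props.put("retries", 10);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        return new DefaultKafkaProducerFactory<String, String>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        // the second constructor argument is autoFlush, as in the XML
        KafkaTemplate<String, String> template =
                new KafkaTemplate<String, String>(producerFactory(), true);
        template.setDefaultTopic("orderTopic");
        template.setProducerListener(new KafkaProducerListener()); // listener class shown below
        return template;
    }
}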

 

spring-kafka-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd">

 

 

    <!-- Consumer configuration properties -->

    <bean id="consumerProperties" class="java.util.HashMap"> 

        <constructor-arg> 

            <map> 

                <entry key="bootstrap.servers" value="10.9.1.120:9092"/> 

               <entry key="group.id" value="0"/> 

                <entry key="enable.auto.commit" value="true"/> 

                <entry key="auto.commit.interval.ms" value="1000"/> 

                <entry key="session.timeout.ms" value="15000"/> 

                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> 

                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> 

            </map> 

        </constructor-arg> 

    </bean> 

 

    <!-- ConsumerFactory bean -->

    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> 

        <constructor-arg> 

            <ref bean="consumerProperties"/> 

        </constructor-arg> 

    </bean> 

 

    <!-- The listener class that actually consumes messages -->

    <bean id="messageListenerConsumerService" class="cn.xxx.xxx.analyzer.transfer.KafkaConsumerServer"/> 

 

 

    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> 

        <constructor-arg value="aaaa"/><!-- 监听的topic ContainerProperties 中构造函数参数,多个toppic 用,隔开 --> 

        <property name="messageListener" ref="messageListernerConsumerService"/> 

    </bean> 

 

 

    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" 

         init-method="doStart"> 

       <constructor-arg ref="consumerFactory"/> 

       <constructor-arg ref="containerProperties"/> 

   </bean> 

  

</beans> 
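
The same consumer wiring in Java config, for comparison. A minimal sketch (KafkaConsumerServer is the listener class shown later in this article; import it from your own package):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public KafkaMessageListenerContainer<String, String> messageListenerContainer() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", "10.9.1.120:9092");
        props.put("group.id", "0");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "15000");
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);

        ContainerProperties containerProps = new ContainerProperties("aaaa"); // topic(s) to subscribe to
        containerProps.setMessageListener(new KafkaConsumerServer());

        // KafkaMessageListenerContainer implements SmartLifecycle, so Spring
        // starts it automatically when the context is refreshed.
        return new KafkaMessageListenerContainer<String, String>(
                new DefaultKafkaConsumerFactory<String, String>(props), containerProps);
    }
}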

web.xml

<?xml version="1.0" encoding="UTF-8"?>

<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"

         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaeehttp://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"

         version="3.1">

    <!-- 1. Load the Spring configuration files -->

    <context-param>

        <param-name>contextConfigLocation</param-name>

        <param-value>classpath:spring-database.xml;classpath:spring-kafka-producer.xml;classpath:spring-kafka-consumer.xml</param-value>

    </context-param>

     ...

Producer listener: KafkaProducerListener.java

import org.apache.kafka.clients.producer.RecordMetadata;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.kafka.support.ProducerListener;

/**
 * Producer listener; registered on the KafkaTemplate in the producer configuration above.
 */
@SuppressWarnings("rawtypes")
public class KafkaProducerListener implements ProducerListener {

    protected final Logger LOG = LoggerFactory.getLogger("kafkaProducer");

    /**
     * Invoked after a message has been sent successfully.
     */
    public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
        LOG.info("==========kafka send succeeded==========");
        LOG.info("----------topic:" + topic);
        LOG.info("----------partition:" + partition);
        LOG.info("----------key:" + key);
        LOG.info("----------value:" + value);
        LOG.info("----------RecordMetadata:" + recordMetadata);
    }

    /**
     * Invoked when a send fails.
     */
    public void onError(String topic, Integer partition, Object key, Object value, Exception exception) {
        LOG.error("==========kafka send failed==========");
        LOG.error("----------topic:" + topic);
        LOG.error("----------partition:" + partition);
        LOG.error("----------key:" + key);
        LOG.error("----------value:" + value);
        LOG.error("----------exception:", exception);
    }

    /**
     * Returning true means onSuccess is invoked for every successful send;
     * returning false disables the success callbacks.
     */
    public boolean isInterestedInSuccess() {
        return true;
    }
}
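
If only failures are interesting, spring-kafka also ships ProducerListenerAdapter, whose isInterestedInSuccess() returns false by default so onSuccess is never invoked. A sketch (the class name is illustrative):

import org.springframework.kafka.support.ProducerListenerAdapter;

public class FailureOnlyProducerListener extends ProducerListenerAdapter<String, String> {

    @Override
    public void onError(String topic, Integer partition, String key, String value, Exception exception) {
        // only failed sends reach this point; log or alert here
    }
}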

 

Kafka producer sender interface

KafkaProducer.java

import java.util.Map;

public interface KafkaProducer {

    /**
     * Simple template send.
     *
     * @param topic topic name
     * @param data  message value
     */
    public Map<String, Object> sndMesForTemplate(String topic, String data);

    /**
     * Template send with optional explicit partitioning.
     *
     * @param topic        topic name
     * @param value        message value
     * @param ifPartition  whether to target a specific partition: "0" = yes, "1" = no
     * @param partitionNum number of partitions; must be greater than 0 when ifPartition is "0"
     * @param role         caller role used as the message key prefix: bbc, app, erp, ...
     */
    public Map<String, Object> sndMesForTemplate(String topic, Object value, String ifPartition, Integer partitionNum,
            String role);
}

KafkaProducerImpl.java

import java.util.HashMap;

import java.util.Map;

import java.util.Random;

import java.util.concurrent.ExecutionException;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.support.SendResult;

import org.springframework.stereotype.Service;

import org.springframework.util.concurrent.ListenableFuture;

import com.alibaba.fastjson.JSON;

import cn.xbsafe.hgc.kafka.KafkaMesConstant;

import cn.xbsafe.hgc.kafka.producer.service.KafkaProducer;

@Service
public class KafkaProducerImpl implements KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public Map<String, Object> sndMesForTemplate(String topic, Object value, String ifPartition, Integer partitionNum,
            String role) {
        String key = role + "-" + value.hashCode();
        String valueString = JSON.toJSONString(value);
        if (ifPartition.equals("0")) {
            // "0" means send to an explicitly chosen partition
            int partitionIndex = getPartitionIndex(key, partitionNum);
            ListenableFuture<SendResult<String, String>> result =
                    kafkaTemplate.send(topic, partitionIndex, key, valueString);
            return checkProRecord(result);
        } else {
            ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, valueString);
            return checkProRecord(result);
        }
    }

    public Map<String, Object> sndMesForTemplate(String topic, String data) {
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, data);
        return checkProRecord(result);
    }

    /**
     * Derive a partition index from the key; fall back to a random partition for null keys.
     */
    private int getPartitionIndex(String key, int partitionNum) {
        if (key == null) {
            Random random = new Random();
            return random.nextInt(partitionNum);
        }
        return Math.abs(key.hashCode()) % partitionNum;
    }

    /**
     * Block on the send result and map it to a code/message pair.
     */
    @SuppressWarnings("rawtypes")
    private Map<String, Object> checkProRecord(ListenableFuture<SendResult<String, String>> res) {
        Map<String, Object> m = new HashMap<String, Object>();
        if (res == null) {
            m.put("code", KafkaMesConstant.KAFKA_NO_RESULT_CODE);
            m.put("message", KafkaMesConstant.KAFKA_NO_RESULT_MES);
            return m;
        }
        try {
            SendResult r = res.get(); // blocks until the broker acknowledges the send
            // inspect the RecordMetadata offset only; the ProducerRecord is not checked
            long offsetIndex = r.getRecordMetadata().offset();
            if (offsetIndex >= 0) {
                m.put("code", KafkaMesConstant.SUCCESS_CODE);
                m.put("message", KafkaMesConstant.SUCCESS_MES);
            } else {
                m.put("code", KafkaMesConstant.KAFKA_NO_OFFSET_CODE);
                m.put("message", KafkaMesConstant.KAFKA_NO_OFFSET_MES);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);
            m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);
        } catch (ExecutionException e) {
            m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);
            m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);
        }
        return m;
    }
}
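
The KafkaMesConstant class referenced above is not shown in the original article; the following is a minimal sketch consistent with how it is used (the codes and messages are illustrative placeholders, not the author's values):

// Hypothetical constants class; adjust codes/messages to your conventions.
public class KafkaMesConstant {

    public static final String SUCCESS_CODE = "200";
    public static final String SUCCESS_MES = "send success";

    public static final String KAFKA_SEND_ERROR_CODE = "500";
    public static final String KAFKA_SEND_ERROR_MES = "send failed";

    public static final String KAFKA_NO_RESULT_CODE = "501";
    public static final String KAFKA_NO_RESULT_MES = "no send result returned";

    public static final String KAFKA_NO_OFFSET_CODE = "502";
    public static final String KAFKA_NO_OFFSET_MES = "no offset in record metadata";
}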

 

Testing the Kafka producer

import java.text.ParseException;

import java.text.SimpleDateFormat;

import java.util.ArrayList;

import java.util.Date;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Controller;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.ResponseBody;

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.JSONArray;

import com.alibaba.fastjson.JSONObject;

 

@Controller
@RequestMapping("/subscriber")
public class SubscriberController {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private KafkaProducer kafkaProducer;

    .....

    Map<String, Object> kafkaSend = kafkaProducer.sndMesForTemplate(TOPIC, data);
    logger.info("kafkaProducer send result--->" + kafkaSend);

    .....
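
Note that sndMesForTemplate blocks on res.get() until the broker acknowledges the send. If blocking the request thread is a concern, the ListenableFuture returned by KafkaTemplate can be consumed asynchronously instead; a sketch (class and method names are illustrative):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class AsyncSender {

    public void sendAsync(KafkaTemplate<String, String> kafkaTemplate, String topic, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            public void onSuccess(SendResult<String, String> result) {
                // acknowledged; the offset is available via result.getRecordMetadata().offset()
            }
            public void onFailure(Throwable ex) {
                // send failed; log or retry here
            }
        });
    }
}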

 

Kafka consumer listener: KafkaConsumerServer.java

import org.apache.kafka.clients.consumer.ConsumerRecord; 

import org.slf4j.Logger; 

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.listener.MessageListener;

import com.alibaba.fastjson.JSONObject;

 

public class KafkaConsumerServer implements MessageListener<String, String> {

    protected final Logger LOG = LoggerFactory.getLogger("KafkaConsumerServer");

    /**
     * Invoked automatically by the listener container for every record.
     * Offsets are committed automatically (enable.auto.commit=true), so this
     * method only needs to run the business logic; with auto-commit the
     * application does not manage offsets and cannot replay from a chosen offset.
     */
    public void onMessage(ConsumerRecord<String, String> record) {
        try {
            LOG.info("=============kafkaConsumer starts consuming=============");
            String topic = record.topic();
            String key = record.key();
            String value = record.value();
            long offset = record.offset();
            int partition = record.partition();
            LOG.info("-------------topic:" + topic);
            LOG.info("-------------value:" + value);
            LOG.info("-------------key:" + key);
            LOG.info("-------------offset:" + offset);
            LOG.info("-------------partition:" + partition);
            LOG.info("~~~~~~~~~~~~~kafkaConsumer finished consuming~~~~~~~~~~~~~");
            // use the record data here...
        } catch (Exception e) {
            LOG.error("failed to process record", e);
        } finally {
            // elasticSearchClientPool.returnAClientToPool(esClient);
        }
    }
}

 



Once the producer sends successfully, the consumer picks up the published messages through its subscription.
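
A quick way to smoke-test the round trip is to load both XML files and send to the topic the consumer subscribes to. Note a mismatch in the configuration above: the consumer listens on "aaaa" while the producer's defaultTopic is "orderTopic", so either align the two or send to "aaaa" explicitly, as this sketch does (the class name is illustrative):

import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;

public class KafkaSmokeTest {

    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext(
                "spring-kafka-producer.xml", "spring-kafka-consumer.xml");
        @SuppressWarnings("unchecked")
        KafkaTemplate<String, String> template = ctx.getBean(KafkaTemplate.class);
        // the consumer XML listens on topic "aaaa"
        template.send("aaaa", "hello kafka");
        Thread.sleep(5000); // give the listener container time to poll the record
        ctx.close();
    }
}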

 

 

 

 



                                            