springboot - 集成kafka完整代码实现
2017-09-11 22:33
916 查看
时不我待;老兵不死也会凋零
kafkaListeners:
1.安装kafka(略去)
2.配置jar包依赖:我在使用spring-boot 的1.5.4版本时候发现有kafka版本不兼容的问题,请注意
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.0.RELEASE</version> </dependency> <!-- 该版本不支持spring-kafka --> <!--<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.0.0.M3</version> </dependency>--> <!--<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-core</artifactId> <version>4.3.6.RELEASE</version> </dependency>--> <!-- 该版本部分不符合要求 --> <!-- <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.1</version> </dependency>--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency>
3.配置生产者消费者代码:
生产者:
@Bean("kafkaTemplate")的注解是非常关键的配置项
@Configuration @EnableKafka public class KafkaProducerConfig { @Bean("kafkaTemplate") public KafkaTemplate<String, String> kafkaTemplate() { KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory()); return kafkaTemplate; } public ProducerFactory<String, String> producerFactory() { Map<String, Object> properties = new HashMap<String, Object>(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.208.25.294:9092"); properties.put(ProducerConfig.RETRIES_CONFIG, 0); properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<String, String>(properties); } }
消费者:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; /** * Created by yuanyirui839 on 2017-09-13. */ @Configuration @EnableKafka public class KafkaConsumerConfig { @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.getContainerProperties().setPollTimeout(4000); return factory; } public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> properties = new HashMap<String, Object>(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:PORT");//注意这里修改为kafka的具体配置项目,我这里只是为了开发演示方便 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); return new DefaultKafkaConsumerFactory<String, String>(properties); } @Bean public KafkaListeners kafkaListeners() { return new KafkaListeners(); } }
kafkaListeners:
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import java.util.Optional; /** * Created by yuanyirui839 on 2017-09-13. */ public class KafkaListeners { @KafkaListener(topics = {"test"}) public void listen(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("listen " + message); // logService.insertMessage(message); } } }
4.编写测试代码:
import com.alibaba.fastjson.JSON; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; /** * */ @RestController public class KafkaController { @Autowired KafkaTemplate kafkaTemplate; /* ###########################kafka 测试类###########################*/ @RequestMapping("/testKafka") @ResponseBody public void testkafka() { String message = "{\"transNo\":\"kafka23456789\",\"idLogDs\":\"idLogDsretyuio\"}"; try { kafkaTemplate.send("test", "hi", message); System.out.println("ok"); //测试依赖项目的配置 System.out.println(StrUtil.YUAN()); UpStreamLog log = new UpStreamLog(); log.setAppId("uuuuuuuuuuuuuuuuuuuuu"); System.out.println(log.getAppId()); } catch (Exception e) { e.printStackTrace(); System.out.println(e.toString()); } } }
相关文章推荐
- springboot - 集成mysql实现方式之一,DruidDataSource 完整代码实现
- springboot - 集成redis完整代码实现
- Springboot集成Kafka实现producer和consumer的示例代码
- spring boot 与kafka集成的示例代码
- spring boot 集成kafka (多线程,消费者使用kafka的原生api实现,因为@KakfkaListener修改groupId无效)
- springboot kafka集成(实现producer和consumer)
- SpringBoot整合SpringKafka实现生产者史上最简代码实现
- SpringBoot整合SpringKafka实现消费者史上最简代码实现
- spring boot整合spring-kafka实现发送接收消息实例代码
- (35)Spring Boot集成Redis实现缓存机制【从零开始学Spring Boot】
- spring boot session redis 分布式session实现,提供完整源码
- SpringBoot入门-17(springboot集成mybatis注解形式实现ID自动增长)
- springboot 1.5.2 集成kafka 简单例子
- Spring Boot+AngularJS+BootStrap实现进度条示例代码
- 微服务日志之Spring Boot Kafka实现日志收集
- 使用Logstash同步数据至Elasticsearch,Spring Boot中集成Elasticsearch实现搜索
- 第5篇 RabbitMQ集成SpringBoot实现Direct模式
- 【spring-boot】spring-boot集成ehcache实现缓存机制
- springboot集成多数据源代码
- Spring Boot实现邮件注册功能示例代码