springboot kafka集成
2017-08-11 00:00
435 查看
github : https://github.com/zhouPingHua/spring-data-kafka2
1.依赖(注意kafka客户端版本与服务端版本要一致,不然会出错)
2.application.yml
3.生产者 producer
4.消费者
5.监听类
6.controller
7.项目启动
1.依赖(注意kafka客户端版本与服务端版本要一致,不然会出错)
<!-- Web MVC / embedded servlet container: needed for the REST controller. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring for Apache Kafka (KafkaTemplate, @KafkaListener).
     NOTE(review): 1.0.0.RC1 is a pre-release milestone; consider the GA
     1.0.x.RELEASE. The kafka-clients jar this pulls in must be compatible
     with the broker version (see the note above this snippet). -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.0.0.RC1</version>
</dependency>
<!-- Test support, not packaged into the artifact. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
2.application.yml
# Server port (was misspelled "prot" in the original comment)
server:
  port: 20001

# Kafka connection settings, read via @Value in the producer/consumer configs
kafka:
  brokers: 127.0.0.1:9092   # comma-separated bootstrap broker list
  groupid: test-group       # consumer group id
3.生产者 producer
package com.caiyi.financial.data.kafka;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka producer configuration.
 *
 * <p>Exposes the {@link KafkaTemplate} bean used to publish String-keyed,
 * String-valued messages to the brokers listed under {@code kafka.brokers}.
 *
 * Created by zph Date:2017/8/9.
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /** Bootstrap broker list, e.g. {@code 127.0.0.1:9092}. */
    @Value("${kafka.brokers}")
    private String brokers;

    /** Raw producer properties handed to the underlying Kafka client. */
    public Map<String, Object> producerConfigs() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        config.put(ProducerConfig.RETRIES_CONFIG, 0);      // fail fast: no send retries
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        config.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return config;
    }

    /** Factory backing the template below. */
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /** Template injected by controllers to send messages. */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
4.消费者
package com.caiyi.financial.data.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka consumer configuration.
 *
 * <p>Builds the listener container factory that drives {@link Listener}'s
 * {@code @KafkaListener} methods, with 3 concurrent consumer threads.
 *
 * Created by zph Date:2017/8/9.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /** Bootstrap broker list, e.g. {@code 127.0.0.1:9092}. */
    @Value("${kafka.brokers}")
    private String brokers;

    /** Consumer group id shared by all listeners in this app. */
    @Value("${kafka.groupid}")
    private String groupid;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);                           // 3 listener threads
        factory.getContainerProperties().setPollTimeout(3000); // poll() timeout in ms
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /** Raw consumer properties handed to the underlying Kafka client. */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        // Auto-commit is disabled; offsets are committed by the listener
        // container instead. FIX: the original also set
        // auto.commit.interval.ms=100, which Kafka ignores when auto-commit
        // is off — that dead, misleading setting has been removed.
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return propsMap;
    }

    /** Listener bean so its @KafkaListener methods are registered. */
    @Bean
    public Listener listener() {
        return new Listener();
    }
}
5.监听类
package com.caiyi.financial.data.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.Optional;

/**
 * Message listeners for the demo topics; each method prints the record value
 * to stdout when it is non-null.
 *
 * Created by zph Date:2017/8/9.
 */
public class Listener {

    /** Consumes "testtopic" and echoes each non-null payload. */
    @KafkaListener(topics = {"testtopic"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional.ofNullable(record.value())
                .ifPresent(message -> System.out.println("testtopic " + message));
    }

    /** Consumes "testtopic2" and echoes each non-null payload. */
    @KafkaListener(topics = {"testtopic2"})
    public void listen2(ConsumerRecord<?, ?> record) {
        Optional.ofNullable(record.value())
                .ifPresent(message -> System.out.println("testtopic2" + message));
    }
}
6.controller
package com.caiyi.financial.data.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST entry point for publishing test messages.
 *
 * Created by zph Date:2017/8/10.
 */
@RestController
public class Controller {

    // FIX: was the raw type KafkaTemplate; parameterized to match the
    // KafkaTemplate<String, String> bean and avoid unchecked send() calls.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Publishes {@code message} to {@code topic}.
     *
     * @param topic   destination topic (required)
     * @param message payload to send (required)
     */
    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public void send(@RequestParam(required = true) String topic,
                     @RequestParam(required = true) String message) {
        kafkaTemplate.send(topic, message);
    }
}
7.项目启动
package com.caiyi.financial.data;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;

/**
 * Boot entry point for the Kafka demo application.
 *
 * Created by zph Date:2017/8/8.
 */
@SpringBootApplication
public class Application {

    protected static final Logger logger = LoggerFactory.getLogger(Application.class);

    /** Starts the embedded container and logs once the context is up. */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
        logger.info("spring BOOT Start");
    }

    // NOTE(review): this only takes effect when the class extends
    // SpringBootServletInitializer (WAR deployment). As written it overrides
    // nothing and is never called — confirm whether it can be dropped.
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(Application.class);
    }
}
相关文章推荐
- springboot kafka集成(实现producer和consumer)
- spring boot kafka集成
- springboot - 集成kafka完整代码实现
- spring boot 与kafka集成的示例代码
- spring boot 集成spark-streaming-kafka
- springboot 1.5.2 集成kafka 简单例子
- 随着spring boot 1.5版本的发布,在spring项目中与kafka集成更为简便。
- 利用kafka与springboot高消费集成
- springboot 1.5.2 集成kafka 简单例子
- springboot 1.5.3集成kafka
- spring boot与kafka集成
- springboot kafka集成
- 教你无脑在springBoot项目中集成ELK+Kafka
- springboot和kafka集成
- spring boot与kafka集成(spring boot 1.5.1版本)
- spring boot 与kafka集成
- spring boot 集成kafka (多线程,消费者使用kafka的原生api实现,因为@KakfkaListener修改groupId无效)
- Spring Boot集成kafka笔记
- Springboot集成Kafka实现producer和consumer的示例代码
- spring boot与kafka集成