Integrating Kafka with Spring Boot
2017-12-20 19:37
1. In IDEA, select Web, Kafka… when creating the project, which adds the following dependency:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.1.RELEASE</version>
</dependency>
2. The Kafka configuration classes are as follows:
Producer:
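import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration // marks this as a configuration class
@EnableKafka   // enables Kafka support
public class KafkaProducerConfig {

    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    // Collects the producer settings read from the application properties
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    // Creates the producer factory
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean // registers the KafkaTemplate instance for injection
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}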
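The ProducerConfig constants used in producerConfigs() resolve to ordinary Kafka client property names, which is handy when cross-checking against the Kafka documentation. The following is a minimal sketch of that mapping using only the JDK; the class name ProducerPropsSketch and the sample values in main are hypothetical and not part of the original project.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for illustration: mirrors producerConfigs() but uses the
// literal Kafka client keys that the ProducerConfig constants stand for.
class ProducerPropsSketch {

    // Maps the application-level kafka.producer.* keys to Kafka client keys.
    static Map<String, Object> producerConfigs(Properties app) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", app.getProperty("kafka.producer.servers"));
        props.put("retries", Integer.parseInt(app.getProperty("kafka.producer.retries")));
        props.put("batch.size", Integer.parseInt(app.getProperty("kafka.producer.batch.size")));
        props.put("linger.ms", Integer.parseInt(app.getProperty("kafka.producer.linger")));
        props.put("buffer.memory", Integer.parseInt(app.getProperty("kafka.producer.buffer.memory")));
        // Serializer settings may be given as fully qualified class names.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties app = new Properties();
        // Assumed sample values, not taken from the original post.
        app.setProperty("kafka.producer.servers", "localhost:9092");
        app.setProperty("kafka.producer.retries", "0");
        app.setProperty("kafka.producer.batch.size", "4096");
        app.setProperty("kafka.producer.linger", "1");
        app.setProperty("kafka.producer.buffer.memory", "40960");
        producerConfigs(app).forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The configuration class above does the same thing, except that Spring injects the values through @Value and the keys come from the ProducerConfig constants.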
Consumer:
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    // Container factory used by methods annotated with @KafkaListener
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    // Collects the consumer settings read from the application properties
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }

    @Bean // this class handles the consumed records
    public Listener listener() {
        return new Listener();
    }
}
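Both configuration classes read their settings through @Value placeholders, so the application needs matching entries, for example in application.properties. The keys below are exactly the ones the two classes reference; every value is an assumed placeholder (a single local broker, small buffers), not a setting taken from the original project.

```properties
# Assumed values for illustration only; adjust them to your environment.
kafka.producer.servers=localhost:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960

kafka.consumer.servers=localhost:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.group.id=test
kafka.consumer.auto.offset.reset=latest
kafka.consumer.concurrency=3
```

If any of these keys is missing, Spring fails at startup because the corresponding placeholder cannot be resolved.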
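==Listener class==
The @KafkaListener annotation specifies the listener id and the topics to consume (topics is an array). Note that id only doubles as the consumer group.id in spring-kafka 1.3 and later; with version 1.1.1.RELEASE the group.id comes from the consumer configuration.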
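import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

public class Listener {

    protected final Logger logger = LoggerFactory.getLogger("default-appender");

    @KafkaListener(id = "group-id-test", topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record) {
        // Logged at info level, since receiving a record is not an error.
        // String concatenation is null-safe, so a record with a null value
        // no longer throws a NullPointerException.
        logger.info("kafka key: " + record.key());
        System.out.println("kafka key: " + record.key());
        logger.info("kafka value: " + record.value());
        System.out.println("kafka value: " + record.value());
    }
}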