您的位置:首页 > 编程语言 > Java开发

springboot - 集成kafka完整代码实现

2017-09-11 22:33 916 查看
时不我待;老兵不死也会凋零

1.安装kafka(略去)

2.配置jar包依赖:请注意,我在使用 spring-boot 1.5.4 版本时遇到了 kafka 版本不兼容的问题,下面是我最终使用的依赖配置(被注释掉的是尝试过但不适用的版本):

<!-- spring-kafka version actually used by this example -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.0.RELEASE</version>
</dependency>

<!-- spring-kafka 2.0.0.M3 is not usable in this setup: the author hit Kafka version
     incompatibilities with spring-boot 1.5.4 (presumably this milestone targets a
     newer Spring line; TODO confirm) -->
<!--<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.0.0.M3</version>
</dependency>-->

<!--<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>4.3.6.RELEASE</version>
</dependency>-->

<!-- kafka-clients 0.10.2.1 only partially met the requirements, so it is disabled
     in favour of the 0.11.0.0 dependency declared below -->
<!-- <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>-->

<!-- kafka-clients version actually used by this example -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>

3.配置生产者消费者代码:

生产者:

@Bean("kafkaTemplate") 注解是非常关键的配置项:它把 KafkaTemplate 注册为 Spring Bean,后面的测试代码才能通过 @Autowired 注入并使用它发送消息。


/**
 * Producer-side Kafka configuration: exposes a String/String {@link KafkaTemplate}
 * built on a {@link DefaultKafkaProducerFactory} with hard-coded settings.
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /**
     * Registers the template under the explicit bean name {@code kafkaTemplate}.
     *
     * @return a template backed by the factory from {@link #producerFactory()}
     */
    @Bean("kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Assembles the producer settings and wraps them in a producer factory.
     * This method carries no {@code @Bean} annotation; it is invoked directly
     * by {@link #kafkaTemplate()}.
     *
     * @return a new factory configured with String key and value serializers
     */
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();

        // Keys and values are both sent as plain strings.
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // NOTE(review): "294" is not a valid IPv4 octet, so this looks like a redacted
        // or mistyped address; replace it with the real broker host:port before use.
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.208.25.294:9092");

        // Delivery and batching settings.
        config.put(ProducerConfig.RETRIES_CONFIG, 0);
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        config.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);

        return new DefaultKafkaProducerFactory<>(config);
    }
}

消费者:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * Consumer-side Kafka configuration: listener container factory, consumer
 * factory and the listener bean itself.
 *
 * Created by yuanyirui839 on 2017-09-13.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /**
     * Creates the container factory for listener containers.
     *
     * @return a concurrent container factory with concurrency 4 and a poll timeout of 4000
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setConcurrency(4);
        containerFactory.getContainerProperties().setPollTimeout(4000);
        return containerFactory;
    }

    /**
     * Assembles the consumer settings and wraps them in a consumer factory.
     * This method carries no {@code @Bean} annotation; it is invoked directly
     * by {@link #kafkaListenerContainerFactory()}.
     *
     * @return a new factory configured with String key and value deserializers
     */
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        // Placeholder for demo purposes: replace with the actual Kafka broker address.
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:PORT");

        // Keys and values are both read as plain strings.
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Consumer group and where to start when no offset is known.
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // NOTE(review): auto-commit is disabled, so the commit interval below is
        // presumably not used by the client; confirm before relying on it.
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

        return new DefaultKafkaConsumerFactory<>(config);
    }

    /**
     * Registers the class holding the {@code @KafkaListener} methods as a bean.
     *
     * @return a new {@link KafkaListeners} instance
     */
    @Bean
    public KafkaListeners kafkaListeners() {
        return new KafkaListeners();
    }

}

kafkaListeners:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.Optional;

/**
 * Holds the Kafka listener method for the {@code test} topic.
 *
 * Created by yuanyirui839 on 2017-09-13.
 */
public class KafkaListeners {

    /**
     * Prints the value of a received record to standard output, prefixed with
     * {@code "listen "}. Records whose value is {@code null} are ignored.
     *
     * @param record the record delivered for topic {@code test}
     */
    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record) {
        // The original sketch left a disabled persistence call here
        // (logService.insertMessage); only console output is performed.
        Optional.ofNullable(record.value())
                .ifPresent(payload -> System.out.println("listen " + payload));
    }
}


4.编写测试代码:

import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

/**
 * Demo REST controller that publishes a fixed JSON message to the Kafka topic
 * {@code test} when {@code /testKafka} is requested.
 */
@RestController
public class KafkaController {

    // Parameterized to match the KafkaTemplate<String, String> bean; the previous raw
    // type made send(...) an unchecked call with no compile-time key/value checking.
    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Kafka test endpoint: sends one hard-coded message with key {@code "hi"} to
     * topic {@code test}, then prints a few diagnostic lines to standard output.
     * Any exception is printed rather than propagated to the HTTP caller.
     */
    @RequestMapping("/testKafka")
    @ResponseBody
    public void testkafka() {
        String message = "{\"transNo\":\"kafka23456789\",\"idLogDs\":\"idLogDsretyuio\"}";
        try {
            // NOTE(review): the return value of send() is ignored, so "ok" presumably
            // only means the record was handed to the producer, not that the broker
            // acknowledged it; confirm against the KafkaTemplate documentation.
            kafkaTemplate.send("test", "hi", message);
            System.out.println("ok");

            // Exercise classes that come from a dependent project to verify its wiring.
            System.out.println(StrUtil.YUAN());

            UpStreamLog log = new UpStreamLog();
            log.setAppId("uuuuuuuuuuuuuuuuuuuuu");
            System.out.println(log.getAppId());
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(e.toString());
        }
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spring boot kafka