Spring Boot in Practice: Integrating Kafka
2018-02-26 11:38
First, add the Kafka dependency to the build.gradle configuration file:
```groovy
// kafka
compile('org.apache.kafka:kafka_2.10:0.8.1.1') {
    exclude module: 'slf4j-log4j12'
    exclude module: 'log4j-over-slf4j'
}
```

Whether to exclude these transitive logging dependencies depends on your project; in my case they duplicated libraries already on the classpath, so I excluded them.
Next, add the Kafka settings to the application.properties configuration file:
```properties
# kafka
metadata.broker.list=172.16.3.31:6667,172.16.3.32:6667,172.16.3.33:6667
request.required.acks=1
kafka.topic.overdue=diplomat_userinfo_update
```
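The producer utility shown below reads these settings through a helper class, PropertyReaderUtils, which the post does not include. A minimal sketch, assuming it simply loads application.properties from the classpath once (the `load(InputStream)` entry point is my addition for testability):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Properties;

/**
 * Minimal sketch of the PropertyReaderUtils helper used by the producer
 * class below. The real implementation is not shown in the post; this
 * version loads application.properties from the classpath once and
 * exposes simple key lookups.
 */
public class PropertyReaderUtils {

    private static final Properties PROPS = new Properties();

    static {
        // Load the configuration file once at class-initialization time.
        try (InputStream in = PropertyReaderUtils.class.getClassLoader()
                .getResourceAsStream("application.properties")) {
            if (in != null) {
                PROPS.load(in);
            }
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Merge extra properties from a stream (hypothetical helper, added for testing). */
    static void load(InputStream in) {
        try {
            PROPS.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Return the configured value for the given key, or null if absent. */
    public static String getProValue(String key) {
        return PROPS.getProperty(key);
    }
}
```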
The Kafka message producer class:
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducerUtils {

    /** Kafka producer instance */
    private static Producer<String, String> producer;

    // Initialize producer properties
    static {
        Properties prop = new Properties();
        prop.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        prop.setProperty("metadata.broker.list", PropertyReaderUtils.getProValue("metadata.broker.list"));
        prop.setProperty("request.required.acks", PropertyReaderUtils.getProValue("request.required.acks"));
        ProducerConfig config = new ProducerConfig(prop);
        producer = new Producer<>(config);
    }

    /**
     * Send a single Kafka message.
     *
     * @param topic   topic enum value
     * @param message message content in JSON format
     */
    public static void sendMsg(EKafkaTopic topic, String message) {
        KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(topic.getTopic(), message);
        producer.send(keyedMessage);
    }

    /**
     * Send Kafka messages in batch.
     *
     * @param topic    topic enum value
     * @param messages list of message contents in JSON format
     */
    public static void batchSendMsg(EKafkaTopic topic, List<String> messages) {
        List<KeyedMessage<String, String>> keyedMessages = new ArrayList<>();
        for (String message : messages) {
            keyedMessages.add(new KeyedMessage<>(topic.getTopic(), message));
        }
        producer.send(keyedMessages);
    }
}
```
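The EKafkaTopic enum referenced above is also not included in the post. A minimal sketch, assuming one constant per configured topic; the constant name OVERDUE and its mapping to the `kafka.topic.overdue` entry are my guesses based on the properties file:

```java
/**
 * Minimal sketch of the EKafkaTopic enum used by KafkaProducerUtils.
 * The post does not show it; the constant name OVERDUE and the literal
 * topic string are assumptions based on the kafka.topic.overdue entry
 * in application.properties.
 */
public enum EKafkaTopic {

    /** Topic for user-info update messages (from kafka.topic.overdue). */
    OVERDUE("diplomat_userinfo_update");

    private final String topic;

    EKafkaTopic(String topic) {
        this.topic = topic;
    }

    /** The literal Kafka topic name to publish to. */
    public String getTopic() {
        return topic;
    }
}
```

With this in place, a caller would publish a message with something like `KafkaProducerUtils.sendMsg(EKafkaTopic.OVERDUE, json)`.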
The Kafka message consumer class:
```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092,master:9093");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("message"));

        // Poll the subscribed topic indefinitely and print each record.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset=" + record.offset()
                        + ", key=" + record.key()
                        + ", value=" + record.value());
            }
        }
    }
}
```

In a real project, this polling loop would live inside a consumer listener class that is started when the application starts.
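One way to structure that startup-time listener is to run the poll loop on a daemon thread with a stop flag. The sketch below is my own illustration, not code from the post: to keep it self-contained, the call to `consumer.poll(...)` is abstracted behind a `Supplier`, and in a real application the class would be started from a Spring Boot startup hook (e.g. a `CommandLineRunner`):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Illustrative sketch of a consumer loop that starts with the application.
 * The poll source is abstracted as a Supplier so the pattern can be shown
 * without a running Kafka broker; in a real project the supplier would
 * wrap consumer.poll(...) from the KafkaConsumer shown above.
 */
public class ConsumerLoop implements Runnable {

    private final Supplier<List<String>> pollSource;  // stands in for consumer.poll(...)
    private final Consumer<String> handler;           // processes each record value
    private final AtomicBoolean running = new AtomicBoolean(true);

    public ConsumerLoop(Supplier<List<String>> pollSource, Consumer<String> handler) {
        this.pollSource = pollSource;
        this.handler = handler;
    }

    @Override
    public void run() {
        // Keep polling until stop() flips the flag.
        while (running.get()) {
            for (String message : pollSource.get()) {
                handler.accept(message);
            }
        }
    }

    /** Signal the loop to exit after the current poll. */
    public void stop() {
        running.set(false);
    }

    /** Start the loop on a daemon thread, e.g. from an application startup hook. */
    public Thread start() {
        Thread t = new Thread(this, "kafka-consumer-loop");
        t.setDaemon(true);
        t.start();
        return t;
    }
}
```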