spring cloud stream kafka实例
2017-04-10 00:00
633 查看
maven
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Camden.SR6</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
生产者配置
server: port: 8081 spring: application: name: output-demo cloud: instance-count: 1 instance-index: 0 stream: kafka: binder: brokers: localhost:9092 zk-nodes: localhost:2182 auto-add-partitions: true auto-create-topics: true min-partition-count: 1 bindings: output: destination: event-demo content-type: text/plain producer: partitionCount: 1
java代码
@EnableBinding(Source.class) public class SendService { @Autowired private Source source; public void sendMessage(String msg) { try { source.output().send(MessageBuilder.withPayload(msg).build()); } catch (Exception e) { e.printStackTrace(); } } } @RestController public class ProducerController { @Autowired private SendService service; @RequestMapping(value = "/send/{msg}", method = RequestMethod.GET) public void send(@PathVariable("msg") String msg){ service.sendMessage(msg); } }
消费者
spring: application: name: input-demo cloud: instance-count: 1 instance-index: 0 stream: kafka: binder: brokers: localhost:9092 zk-nodes: localhost:2182 auto-add-partitions: true auto-create-topics: true min-partition-count: 1 bindings: input: destination: event-demo group: s1 consumer: autoCommitOffset: false concurrency: 1 partitioned: false
java代码
@EnableBinding(Sink.class) public class MsgSink { @StreamListener(Sink.INPUT) public void process(Message<?> message) { System.out.println(message.getPayload()); Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); if (acknowledgment != null) { System.out.println("Acknowledgment provided"); acknowledgment.acknowledge(); } } }
运行
先运行生产者,再运行消费者curl -i localhost:8081/send/hello1
doc
Apache Kafka Bindermac本地搭建kafka
SpringCloudStream 构建消息驱动的微服务框架
相关文章推荐
- spring cloud stream kafka 动态写入不同的topic(Using dynamically bound destinations)
- Spring Cloud Stream + Kafka
- SpringCloud学习之SpringCloudStream&集成kafka
- spring cloud stream配置多个kafka binders
- SpringCloudStream 构建消息驱动的微服务框架 集成kafka_http://blog.spring-cloud.io/blog/sc-stream.html
- Spring Cloud Stream Binder Kafka Monitor
- 四、kafka+maven+springMVC实例
- spring整合kafka实例
- spring cloud笔记 - stream初篇
- Spring Cloud构建微服务架构(七)消息总线(续:Kafka)
- spring boot与kafka集成的简单实例
- spring boot整合spring-kafka实现发送接收消息实例代码
- spring cloud netflix 微服务使用实例
- SpringCloudStream 构建消息驱动的微服务框架
- Spring Cloud Stream
- Spring Cloud Feign实例讲解
- spring-cloud-hystrix之Unable to connect to Command Metric Stream.异常
- kafka spring 实例
- Spring Cloud Stream 教程
- spring-cloud-stream — Retry With the RabbitMQ Binder