您的位置:首页 > 编程语言 > Java开发

spring cloud stream kafka实例

2017-04-10 00:00 633 查看

maven

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR6</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

生产者配置

server:
port: 8081
spring:
application:
name: output-demo
cloud:
instance-count: 1
instance-index: 0
stream:
kafka:
binder:
brokers: localhost:9092
zk-nodes: localhost:2182
auto-add-partitions: true
auto-create-topics: true
min-partition-count: 1
bindings:
output:
destination: event-demo
content-type: text/plain
producer:
partitionCount: 1


java代码

@EnableBinding(Source.class)
public class SendService {

@Autowired
private Source source;

public void sendMessage(String msg) {
try {
source.output().send(MessageBuilder.withPayload(msg).build());
} catch (Exception e) {
e.printStackTrace();
}
}
}

@RestController
public class ProducerController {

@Autowired
private SendService service;

@RequestMapping(value = "/send/{msg}", method = RequestMethod.GET)
public void send(@PathVariable("msg") String msg){
service.sendMessage(msg);
}

}

消费者

spring:
application:
name: input-demo
cloud:
instance-count: 1
instance-index: 0
stream:
kafka:
binder:
brokers: localhost:9092
zk-nodes: localhost:2182
auto-add-partitions: true
auto-create-topics: true
min-partition-count: 1
bindings:
input:
destination: event-demo
group: s1
consumer:
autoCommitOffset: false
concurrency: 1
partitioned: false


java代码

@EnableBinding(Sink.class)
public class MsgSink {

@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
System.out.println(message.getPayload());
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}

运行

先运行生产者,再运行消费者

curl -i localhost:8081/send/hello1

doc

Apache Kafka Binder

mac本地搭建kafka

SpringCloudStream 构建消息驱动的微服务框架
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Spring Cloud