Spring Cloud Stream总结
概念
1、group:
组内只有1个实例消费。如果不设置group,则stream会自动为每个实例创建匿名且独立的group——于是每个实例都会消费
组内单次只有1个实例消费,并且会轮询负载均衡。通常,在将应用程序绑定到给定目标时,最好始终指定consumer group
2、destination binder:
与外部消息系统通信的组件,为构造 Binding提供了 2 个方法,分别是 bindConsumer 和 bindProducer ,它们分别用于构造生产者和消费者。Binder使Spring Cloud Stream应用程序可以灵活地连接到中间件,目前spring为kafka、rabbitmq提供binder
3、destination binding:
Binding 是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产,由binder创建
4、partition
一个或多个生产者将数据发送到多个消费者,并确保有共同特征标识的数据由同一个消费者处理。默认是对消息进行hashCode,然后根据分区个数取余,所以对于相同的消息,总会落到同一个消费者上
注:严格来说partition不属于概念,而是一种Stream提高伸缩性、吞吐量的一种方式
注解
1、@Input,使用示例:
public interface MySink { @Input("my-input") SubscribableChannel input(); }
作用:
- 用于接收消息
- 为每个binding生成channel实例
- 指定input channel的名称
- 在spring容器中生成一个名为my-input,类型为SubscribableChannel的bean
- 在spring容器中生成一个类,实现MySink接口。
2、@Output,使用示例:
public interface MySource { @Output("my-output") MessageChannel output(); }
作用:
- 与
@Input
类似,只不过是用来生产消息
3、@StreamListener,使用示例:
@StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'") public void receive(String messageBody) { log.info("Received: {}", messageBody); }
作用:
- 用于消费消息
- condition的作用:用于过滤消息,只有符合条件表达式的消息才会被处理
- condition起作用的两个条件: 注解的方法没有返回值
- 方法是一个独立方法,不支持Reactive API
4、@SendTo,使用示例:
// 接收INPUT这个channel的消息,并将返回值发送到OUTPUT这个channel @StreamListener(Sink.INPUT) @SendTo(Source.OUTPUT) public String receive(String receiveMsg) { return "handle..."; }
作用:
- 用于发送消息
4、@InboundChannelAdapter,使用示例:
@Bean @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1")) public MessageSource<String> producer() { return () -> new GenericMessage<>("Hello Spring Cloud Stream"); }
作用:
- 让添加该注解的方法生产消息
- fixedDelay:多少毫秒发送1次
- maxMessagesPerPoll:每次发送多少条消息
5、@ServiceActivator,使用示例:
@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT) public String transform(String payload) { return payload.toUpperCase(); }
作用:
- 标注该注解的方法能够处理消息或消息的有效内容,通过监听input消息,用方法体的代码处理后,输出到output中
6、@Transformer,使用示例:
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform(String message) { return message.toUpperCase(); }
作用:
- 与
@ServiceActivator
类似,标注该注解的方法能够转换消息,消息头,或消息有效内容
PollableMessageSource
PollableMessageSource允许消费者可以控制消费速率。举个例子简单演示一下,首先定义一个接口:
public interface PolledProcessor { @Input("pollable-input") PollableMessageSource input(); }
使用示例:
@Autowired private PolledProcessor polledProcessor; @Scheduled(fixedDelay = 5_000) public void poll() { polledProcessor.input().poll(message -> { byte[] bytes = (byte[]) message.getPayload(); String payload = new String(bytes); System.out.println(payload); }); }
参考:
https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers
- spring cloud stream kafka实例
- .net MVC, webAPI,webForm集成steeltoe+springcloud实现调用服务中心服务的总结
- Spring Cloud Stream消费失败后的处理策略(一):自动重试
- spring cloud stream
- spring cloud config使用总结
- Spring Cloud搭建手册(7)——总结
- SpringCloud项目问题总结
- Spring Cloud Stream教程(五)编程模型
- spring cloud Stream input和output重名报错
- spring cloud bus 和 spring cloud stream 的使用场景
- Spring Cloud中关于Feign的常见问题总结
- Spring Cloud Stream:基于事件(消息)驱动的微服务框架
- SpringCloudStream 构建消息驱动的微服务框架 集成kafka_http://blog.spring-cloud.io/blog/sc-stream.html
- Spring Cloud Stream (1)-代码篇
- spring cloud 学习(9) - turbine stream无法在eureka注册的解决办法
- spring-cloud-hystrix之Unable to connect to Command Metric Stream.异常
- Spring Cloud微服务升级总结
- Spring Cloud与微服务学习总结(5)——认证鉴权与API权限控制在微服务架构中的设计与实现(三)
- Spring Cloud Stream教程(一)介绍Spring Cloud Stream
- 异常解决篇:spring cloud stream rabbitMq配置错误,导致无法启动