您的位置:首页 > 编程语言 > Java开发

Spring Cloud Stream RabbitMQ 简单实现

2018-10-03 17:36 726 查看

参考官方文档:http://cloud.spring.io/spring-cloud-static/Finchley.SR1/single/spring-cloud.html#_rabbitmq_binder

环境:win10 rabbit 默认配置

一、添加依赖

[code]<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

二、配置文件

[code]spring:
application:
name: rabbitmq-client
cloud:
stream:
bindings:
chen2018:  #通道名
destination: chen-test #目的地
rabbitmq:
host: localhost
port: 5672

三、构建消息发送者

  1.先创建通道

[code]public interface ChenService {
@Output("chen2018")
MessageChannel chen();

}

  2.扫描接口

[code]@EnableBinding(ChenService.class)

  3.发送消息类

   为了防止中文乱码加了消息头

[code]@RestController
public class TestController {

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private ChenService chenService;

@GetMapping("/send")
public Boolean sendMsg(@RequestParam String msg){
//获取MessageChanel
MessageChannel messageChannel = chenService.chen();
Map<String,Object> headers = new HashMap<>();
headers.put("charset-encoding","UTF-8");
headers.put("content-type",MediaType.TEXT_PLAIN_VALUE);
return messageChannel.send(new GenericMessage<>(msg,headers));
}
}

四、构建消息订阅者

  1.构建订阅通道

[code]public interface ChenService {
@Input("chen2018")
SubscribableChannel chen();
}

  2.扫描接口

[code]@EnableBinding(ChenService.class)

  3.多种编程模型根据实际情况选择

[code]    @Autowired
private ChenService chenService;

@PostConstruct
public void init(){//接口编程
//获取Subscribe chanel
SubscribableChannel subscribableChannel = chenService.chen();
subscribableChannel.subscribe(message -> {
MessageHeaders headers = message.getHeaders();
String encoding = (String) headers.get("charset-encoding");
String text = (String)headers.get("content-type");
byte[] content = (byte[]) message.getPayload();
try {
System.out.println("接收到消息:"+ new String(content,encoding));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
});
}
@StreamListener("chen2018")
public void onMessage(byte[] data){//注解编程
System.out.println("onMessage(byte[]):"+data);
}

@StreamListener("chen2018")
public void onMessage(String data){//注解编程
System.out.println("onMessage(String):"+data);
}

@ServiceActivator(inputChannel = "chen2018") //Spring Integration注解
public void onServiceActivator(String data){
System.out.println("onServiceActivator(String):"+data);
}

    建议:使用同一种编程模型
    相同的编程模型重复执行,不同的编程模型轮流执行

五、测试结果

访问:http://localhost:8088/send?msg=加油!fighting

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: