您的位置:首页 > 其它

RabbitMQ收发消息——原生API

2018-01-29 16:47 330 查看
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>2.8.2</version>
</dependency>


首先是我们创建连接对象的函数

//这里请手动填入,我是通过配置文件注入的
protected ConnectionFactory getFactory() {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ相关信息
factory.setHost(host);//127.0.0.1
factory.setUsername(account);//guest
factory.setPassword(password);//guest
factory.setPort(port);//5672
return factory;
}


然后写一个连接函数

private Channel channel;
public void connect() throws IOException {
//创建一个通道
try {
channel =  getFactory().newConnection().createChannel();
} catch (TimeoutException e) {
e.printStackTrace();
}
}


然后就是发送函数

public boolean sendData(String topicStr,String data,int type) {
return sendData(topicStr,data.getBytes(),type);
}
//topicStr 发送队列名称,data发送的数据,type是我业务需求,不用理会(这里我将type当做头信息进行传递)
public boolean sendData(String topicStr,byte[] data,int type) {
try {
if (channel==null){
connect();
}
// 声明一个队列,如果如果队列不存在,会创建
// 声明一个队列,主题名,是否持久化,是否排外(只能本次连接访问),自动删除,
channel.queueDeclare(topicStr, false, false, false, null);
/*************如果有请求参数的话,就建立一个Map**************/
HashMap<String, Object> map = new HashMap<>();
map.put("type",type);
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties("application/octet-stream",
null,map,1,0, null, null, null,
null, null, null, null,
null, null);
/**********如果没有请求参数这里可以省略,同理吓一条语句的basicProperties位置写null***********/
channel.basicPublish("", topicStr,basicProperties ,data);
return true;
} catch (IOException e) {
return false;
}
}


然后就是监听队列消息的函数(消费者)

//topicStr表示要监听的队列
public void getMessage(String topicStr){
try {
if (channel == null) {
connect();
}
//声明要关注的队列
channel.queueDeclare(topicStr, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
messageService.receiveFormMiddleware(body);
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(topicStr, true, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}


这就是原生API的简单收发功能
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息