RabbitMQ实践——生产者(传递消息体),消费者(获取消息体)
1.生产者
package net.stxy.one.mq;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建连接工厂,并进行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2.创建连接
Connection connection = connectionFactory.newConnection();
//3.创建连接通道
Channel channel = connection.createChannel();
Map<String,Object> headers = new HashMap<>();
headers.put("my1", "111");
headers.put("my2", "222");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)//2表示持久化的投递,发送消息时,服务器重启,消息依然存在。1表示不是持久化:发送消息时服务器出现bug或者重启,该消息已经消失
.contentEncoding("UTF-8")
.expiration("10000")//过期时间
.headers(headers)//自定义属性
.build();
//4.发送数据,exchange,routingkey,props:消息附加属性,body:消息实体
for (int i = 0; i < 5; i++) {
String msg = "lulu张雪喜欢你";
channel.basicPublish("", "lulu", properties, msg.getBytes());
System.out.println(msg);
}
//5.关闭连接:由小到大
channel.close();
connection.close();
}
}
2.消费者
package net.stxy.one.mq;
import java.util.Map;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建连接工厂,并进行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2.创建连接
Connection connection = connectionFactory.newConnection();
//3.创建连接通道
Channel channel = connection.createChannel();
//4.生命一个队列 1. 队列称,2.持久化,3.独占:只只有改通道能监听,4.脱离exchange会自动删除5.扩展参数
String queueName = "lulu";
channel.queueDeclare(queueName, true, false, false, null);
//5.创建一个消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6.设置channel 1.队列名称,2.自动签收 3.消费者对象
channel.basicConsume(queueName, true, queueingConsumer);
//7.获取消息 会一直阻塞。可以设置时间通过
while (true) {
System.out.println("开始消费数据");
Delivery delivery = queueingConsumer.nextDelivery();//没有设置时间一直阻塞。
String msg = new String(delivery.getBody());//消息实体body
Map<String,Object> headers = delivery.getProperties().getHeaders();
System.out.println(headers.get("my1"));
System.out.println(headers.get("my2"));
//Envelope envelope = delivery.getEnvelope();//消息属性
}
}
}
- rabbitMQ——生产者(头部消息),消费者(获取头部消息)
- RabbitMQ 消息发送和消息获取 之 rabbitMQ消息生产者和消费者
- rabbitMQ——生产者(Topic主题连接),消费者(Topic主题连接)
- Linux多线程实践(8) --Posix条件变量解决生产者消费者问题
- rabbitMQ 生产者消费者
- php rabbitmq操作类及生产者和消费者实例代码
- RabbitMQ 生产者、消费者基本流程
- Linux多线程实践(五 )Posix信号量和互斥锁解决生产者消费者问题
- RabbitMQ消息通信,生产者发送消息给指定的消费者的消息队列
- rabbitMQ——生产者(Fanout分发模式连接连接),消费者(Fanout分发模式连接连接)
- Linux多线程实践(5) --Posix信号量与互斥量解决生产者消费者问题
- 【RabbitMQ】生产者,消费者,信道,队列,交换器和绑定
- RabbitMQ与spring集成,配置完整的生产者和消费者
- 1.【RabbitMQ】生产者,消费者,信道,队列,交换器和绑定
- RabbitMQ发送端接收端生产者消费者实例
- Linux多线程实践(5) --Posix信号量与互斥量解决生产者消费者问题
- RabbitMQ-理解消息通信-消费者和生产者
- kafka的javaapi生产者生产消息,消费者获取不到
- springboot-整合rabbitmq-生产者-消费者
- RabbitMQ消息队列生产者和消费者