您的位置:首页 > 运维架构

RabbitMQ3.7.2入门到进阶之Topic主题模型

2018-02-01 17:15 453 查看
此套免费课程视频,本人录播到了腾讯课堂

更多请关注腾讯课堂牧码人或 登录网站http://www.51mmr.net

https://ke.qq.com/course/288116#tuin=5740604a

主题模式



生产者

public class Send {

private final static String EXCHANGE_NAME = "test_exchange_topic";

public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();

// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");

// 消息内容
String message = "id=1001";
channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

channel.close();
connection.close();
}
}


消费者1

public class Recv {

private final static String QUEUE_NAME = "test_queue_topic_1";
private final static String EXCHANGE_NAME = "test_exchange_topic";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");

// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

// 定义队列的消费者
Consumer consumer = new DefaultConsumer(channel) {
// 消息到达 触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {

String msg = new String(body, "utf-8");
System.out.println("[2] Recv msg:" + msg);

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done ");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};

boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}


消费者2

public class Recv {

private final static String QUEUE_NAME = "test_queue_topic_1";
private final static String EXCHANGE_NAME = "test_exchange_topic";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");

// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

// 定义队列的消费者
Consumer consumer = new DefaultConsumer(channel) {
// 消息到达 触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {

String msg = new String(body, "utf-8");
System.out.println("[2] Recv msg:" + msg);

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done ");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};

boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: