您的位置:首页 > 其它

rabbitmq用法--直接发送消息给队列

2017-12-17 13:56 453 查看
rabbitmq 术语:ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息接收消息等。Queue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示。多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。第一种: 直接发送消息给队列直接发送消息给队列不需要指定exchange ,默认为空。直接发送消息给队列之前需要先创建此队列。代码如下:
private static final String HOST = "XXXX";

private static final String USERNAME = "xxx";
private static final String PASSWORD = "xxx";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 一定先创建队列,否则系统不会报错,但是信息被丢弃了。
channel.queueDeclare("WSX", false,false,false,null);
channel.basicPublish("","WSX",null,"wsx".getBytes());
channel.close();
connection.close();
}
消费者consumer代码:这里切记,接收到消息之后要调用channel.basicAck 返回给服务器已经接收到消息,否则此消息在队列中不会删除
public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(HOST);connectionFactory.setUsername(USERNAME);connectionFactory.setPassword(PASSWORD);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("WSX", false,false,false,null);channel.basicConsume("WSX",new ConsumerImpl(channel));}
public class ConsumerImpl implements Consumer{private Channel channel;public ConsumerImpl(Channel channel){this.channel = channel;}public void handleConsumeOk(String consumerTag) {System.out.println("isOk");}public void handleCancelOk(String consumerTag) {}public void handleCancel(String consumerTag) throws IOException {}public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {}public void handleRecoverOk(String consumerTag) {System.out.println("recoverOk");}public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));System.out.println(envelope.getDeliveryTag());channel.basicAck(envelope.getDeliveryTag(),false);}}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: