您的位置:首页 > 其它

RabbitMQ(二) - HelloWorld

2017-04-28 21:13 134 查看

Hello World

这部分的教程我们将写两个程序:一个生产者发送一条消息和一个消费者接收消息并打印出来。我们会略过
java api
的一些细节的地方,我们从非常简单的开始。一个
Hello World
的消息。

在下图总,
P
是我们的生产者,
C
是我们的消费者。中间的箱子是队列 —— 代表消费者的消息缓存区。



The Java client library

RabbitMQ支持多种协议。这个教程使用
AMQP 0-9-1
,开源的、通用的消息协议。有许多不同的语言实现RabbitMQ,此处我们用java来实现。

下载客户端库,它依赖
SLF4J API
SLF4J Simple
。拷贝这些文件到你的工作目录。

请注意:
SLF4J Simple
对这个教程来说已经足够,但是最好使用完全成熟的日志开发库,例如:logback。

RabbitMQ的java客户端也支持Maven,groupId是
com.rabbitmq
和artifactId是
amqp-client


<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
<scope>compile</scope>
</dependency>


现在我们有了java的客户端以及它的依赖,我们可以写一些代码了。

NOTE:官网是没有使用IDE的。这里我用的是idea,毕竟大家都是开发者,IDE会方便点。

发送(Sending)



我们将调用我们的消息生产者(发送者)
Send
和消息的消费者(接收者)
Recv
。生产者将连接到RabbitMQ,发送一条消息,然后退出。

Send.java
中,我们需要导入一些类:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;


设置类和队列名:

public class Send {
private final static String QUEUE_NAME = "hello";

public static void main(String[] argv)
throws java.io.IOException {
...
}
}


然后我们创建一个服务连接:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();


这个连接抽象了socket连接,维护版本协议和身份认证。这里我们用的是本地机器——因此是
localhost
。如果我们想在不同的机器连接到中间件,只需要指定不同的名字和IP地址即可。

接下来我们创建一个通道(channel)。

我们还需要定义一个队列供我们将消息推送给它。

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");


定义一个队列是幂等的——它只有在队列不存在的时候创建。消息的内容是字节数组,所以你能编码成任何你想要的数据。

最后,我们要关闭通道(channel)和连接(conection);

channel.close();
connection.close();


下面是完整的类:Send.java

package com.roachfu.tutorial.rabbitmq.website.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 生产者
*/
public class Send {

private static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
//指定队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World";
// 向队列中发送一条消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");

// 关闭通道和连接
channel.close(); connection.close();

}
}


接收端(Receiving)

消费者从RabbitMQ中拉取消息,不同于生产者生产一条消息,我们会一直监听消息并答应它们。



代码(在Recv.java中)导入的和Send导入的差不多:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;


额外不同的
DefaultConsumer
是一个实现了
Consumer
接口的类,通过服务拉取缓存的消息。

设置也和publisher差不多,我们打开一个连接和通道并定义一个队列,需要注意的是队列名和
Send.java
中定义的一样。

public class Recv {
private final static String QUEUE_NAME = "hello";

public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {

ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
...
}
}


注意:在这里我们也要定义队列,因为我们有可能在生产者之前启动消费者,我们需要保证我们在消费消息的时候队列是存在的。

我们要告知服务从队列中将消息交付给我们。由于它将异步的推送消息给我们,我们提供一个回调对象缓存消息直到我们已经使用了它们。这就是
DefaultConsumer
子类所做的。

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);


以下是Recv.java完整代码

package com.roachfu.tutorial.rabbitmq.website.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 消费者
*/
public class Recv {

private static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. . . ");

Consumer consumer = new DefaultConsumer(channel){

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};

channel.basicConsume(QUEUE_NAME, true, consumer);
}
}


输出

生产者

[x] Sent ‘Hello World’

消费者

[*] Waiting for messages…

[x] Received ‘Hello World’
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  rabbitmq helloworld