您的位置:首页 > 编程语言 > PHP开发

20181104-消息中间件(二)-ActiveMQ PTP处理模式

2018-11-04 16:22 246 查看

一、生产者将消息发送到队列中,消费者在队列中获取消息。
1.第一种方式,主要通过consumer的receive()方法从队列中获取消息,该方法没执行一次,便从队列中获取
一条消息,开发中少用。
编写两个类,消费者类和生产者类,代码如下
生产者:

package cn.joe.first;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 生产者生产消息到队列中
*
* @author joe
*
*/
public class TestProducer {
/**
* 发送消息到activeMQ jms相关的API一般在javax.jms包下
*/
public void sendTextMessage(String datas) {
// 连接工厂
ConnectionFactory factory = null;
// 连接
Connection connection = null;
// 目的地
Destination destination = null;
// 会话
Session session = null;
// 消息生产者(消息发送者)
MessageProducer producer = null;
// 消息对象
Message message = null;

try {
// 创建连接工厂
// 参数分别为用户名、密码、连接地址
factory = new ActiveMQConnectionFactory("user", "user", "tcp://192.168.48.3:61616");
// 通过工厂创建连接对象
connection = factory.createConnection();
// 建议启动连接,消息的发送者不是必须启动连接,但是消息的消费者必须启动连接.
// ps:原因为消息的生产者在发送消息是会检查是否启动了连接若为启动连接则进行自动启动连接。
connection.start();

// 根据连接对象,创建会话对象,必须绑定目的地。
/**
* 创建会话的时候,必须传递两个参数,分别是代表是否支持事务和如何确认消息处理。 transacted--是否支持事务,数据类型是boolean,true -
* 支持,false - 不支持 true - 支持事务,第二个参数默认无效,建议传递的数据是Session.SESSION_TRANSACTED false
* - 不支持事务,常用参数,第二个参数必须传递,且必须有效。
*
* acknowledgeMode - 如何确认消息的处理,使用确认机制实现的。 AUTO_ACKNOWLEDGE -
* 自动确认消息,消息的消费者处理消息后,自动确认,常用 CLIENT_ACKNOWLEDGE - 客户端手动确认,消息的消费者处理后,必须手动确认。
* DUPS_OK_ACKNOWLEDGE - 有副本的客户端手动确认 一个消息可以多次处理。
* 可以降低Session的消耗,在可以容忍重复消息时使用。(不推荐使用)
*/
session = connection.createSession(false, session.CLIENT_ACKNOWLEDGE);

//创建队列并绑定地址
destination = session.createQueue("first-queue");

//通过会话对象创建消息的生产者,并指定目的地
producer = session.createProducer(destination);

//创建文本消息对象
message = session.createTextMessage(datas);

//使用生产者发送消息
producer.send(message);

System.out.println("消息已经发送!");
} catch (Exception e) {
e.printStackTrace();
}finally {
/*回收消息对象*/
}
}

public static void main(String[] args) {
TestProducer test = new TestProducer();
test.sendTextMessage("Hello Word!");
}
}

消费者

package cn.joe.first;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 消息消费者
*
* @author joe
*
*/
public class TestConsumer {

public String receiveTextMessage() {
// 连接工厂
ConnectionFactory factory = null;
// 连接
Connection connection = null;
// 会话
Session session = null;
//目的地
Destination destination = null;
//消费者
MessageConsumer consumer = null;
//消息
String resultText = null;

try {
//创建连接共产
factory = new ActiveMQConnectionFactory("user","user","tcp://192.168.48.3:61616");

//创建连接
//开启连接消费者必须开启连接
connection = factory.createConnection();
connection.start();

//创建会话
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建目的地
destination = session.createQueue("first-queue");

//创建生产者
consumer = session.createConsumer(destination);

//从目的地(队列)中获取消息,执行一次receive则从队列中获取一条消息,获取完毕后消息删除
Message message = consumer.receive();
resultText = ((TextMessage)message).getText();
} catch (Exception e) {
// TODO: handle exception
}

return resultText;
}

public static void main(String[] args) {
TestConsumer consumer = new TestConsumer();
String result = consumer.receiveTextMessage();
System.out.println(result);
}
}

2.第二种方式
消费者通过监听器监听队列消息的变化,当队列消息变化时,获取消息消费。
消费者:

package cn.joe.two;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 消息消费者
*
* @author joe
*
*/
public class TestListenerConsumer {

public String receiveTextMessage() {
// 连接工厂
ConnectionFactory factory = null;
// 连接
Connection connection = null;
// 会话
Session session = null;
//目的地
Destination destination = null;
//消费者
MessageConsumer consumer = null;
//消息
String resultText = null;

try {
//创建连接共产
factory = new ActiveMQConnectionFactory("user","user","tcp://192.168.48.3:61616");

//创建连接
//开启连接消费者必须开启连接
connection = factory.createConnection();
connection.start();

//创建会话
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建目的地
destination = session.createQueue("listener-queue");

//创建生产者
consumer = session.createConsumer(destination);

//消费者注册监听器,当队列中的消息变化,自动获取队列中的消息并处理
consumer.setMessageListener(new MessageListener() {
/**
* 监听器一旦注册,永久有效。
* 永久 - consumer线程不关闭,则监听器会一直监听。
* 处理消息的方式:只要有消息未处理,自动调用onMessage方法,处理消息。
* 监听器可以注册若干,注册多个监听器,相当于集群。
* ActiveMQ自动循环调用多个监听器,处理队列中的消息,实现并行处理。
*
* 处理消息的方法,就是监听方法。
* 监听的事件是L:消息,消息未处理。
* 要处理的具体内容:消息处理。
* @param message - 未处理的消息
*/
@Override
public void onMessage(Message message) {
try {
//acknowledge消费者确认已经收到消息,MQ删除对应的消息
message.acknowledge();

ObjectMessage om = (ObjectMessage) message;
Object result = om.getObject();
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
});

System.in.read();//阻塞当前线程,使监听器不停止
} catch (Exception e) {
// TODO: handle exception
}

return resultText;
}

public static void main(String[] args) {
TestListenerConsumer consumer = new TestListenerConsumer();
consumer.receiveTextMessage();
}
}

生产者

package cn.joe.two;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 生产者生产消息到队列中
*
* @author joe
*
*/
public class TestObjectProducer {
/**
* 发送消息到activeMQ jms相关的API一般在javax.jms包下
*/
public void sendTextMessage(String datas) {
// 连接工厂
ConnectionFactory factory = null;
// 连接
Connection connection = null;
// 目的地
Destination destination = null;
// 会话
Session session = null;
// 消息生产者(消息发送者)
MessageProducer producer = null;
// 消息对象
Message message = null;

try {
// 创建连接工厂
// 参数分别为用户名、密码、连接地址
factory = new ActiveMQConnectionFactory("user", "user", "tcp://192.168.48.3:61616");
// 通过工厂创建连接对象
connection = factory.createConnection();
// 建议启动连接,消息的发送者不是必须启动连接,但是消息的消费者必须启动连接.
// ps:原因为消息的生产者在发送消息是会检查是否启动了连接若为启动连接则进行自动启动连接。
connection.start();

// 根据连接对象,创建会话对象,必须绑定目的地。
/**
* 创建会话的时候,必须传递两个参数,分别是代表是否支持事务和如何确认消息处理。 transacted--是否支持事务,数据类型是boolean,true -
* 支持,false - 不支持 true - 支持事务,第二个参数默认无效,建议传递的数据是Session.SESSION_TRANSACTED false
* - 不支持事务,常用参数,第二个参数必须传递,且必须有效。
*
* acknowledgeMode - 如何确认消息的处理,使用确认机制实现的。 AUTO_ACKNOWLEDGE -
* 自动确认消息,消息的消费者处理消息后,自动确认,常用 CLIENT_ACKNOWLEDGE - 客户端手动确认,消息的消费者处理后,必须手动确认。
* DUPS_OK_ACKNOWLEDGE - 有副本的客户端手动确认 一个消息可以多次处理。
* 可以降低Session的消耗,在可以容忍重复消息时使用。(不推荐使用)
*/
session = connection.createSession(false, session.CLIENT_ACKNOWLEDGE);

//创建队列并绑定地址
destination = session.createQueue("listener-queue");

//通过会话对象创建消息的生产者,并指定目的地
producer = session.createProducer(destination);

for(int i=0;i<100;i++) {
ObjectMessage objectMessage = session.createObjectMessage(i);

//使用生产者发送消息
producer.send(objectMessage);
}

System.out.println("消息已经发送!");
} catch (Exception e) {
e.printStackTrace();
}finally {
/*回收消息对象*/
}
}

public static void main(String[] args) {
TestObjectProducer producer = new TestObjectProducer();
producer.sendTextMessage(null);
}
}
阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: