您的位置:首页 > 编程语言 > Java开发

Java代码主动消费ActiveMQ消息(TextMessage)

2016-06-03 10:37 351 查看
直接上代码:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;

public class ReceiveJMS {
public static void main(String[] args) {
try {
ConnectionFactory factory = new org.apache.activemq.ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://127.0.0.1:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("distort_newtask_queue");
MessageConsumer consumer = session.createConsumer(destination);
while (true) {
TextMessage message = (TextMessage) consumer.receive(1000);
if(null != message){
System.out.println("收到消息: " + message.getText());
}else {
break;
}
}
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Minimal ActiveMQ producer demo: sends one non-persistent text message
 * ({@code "Hello ActiveMQ!"}) to {@code queue_1} on the broker at
 * {@code tcp://127.0.0.1:61616} inside a transacted session.
 */
public class SendJMS {

    /**
     * Connects to the local broker, sends a single message, commits the transaction
     * and closes the connection.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Connection connection = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,
                    "tcp://127.0.0.1:61616");
            connection = factory.createConnection();
            connection.start();
            // The acknowledge-mode argument is ignored for transacted sessions; pass the
            // dedicated constant so the call says what it actually does.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue("queue_1");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            TextMessage message = session.createTextMessage("Hello ActiveMQ!");
            producer.send(message);

            // The message only becomes visible to consumers once the transaction commits.
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Close in finally so a failed send/commit does not leak the connection;
            // closing the connection also closes its session and producer.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}


----------------------------------------------
在执行任务中添加了删除功能,需要清理下MQ中被删除的任务。发现不可取,于是只能主动消费不需要的MQ消息(如果看到的人有更好的方法,希望指点)。

之前使用SpringMVC结合ACTIVEMQ,所以在主动消费消息时候想继续使用JmsTemplate。

问题1、  JMS API中约定了Client端可以使用四种ACK_MODE,在javax.jms.Session接口中:

AUTO_ACKNOWLEDGE = 1    自动确认
CLIENT_ACKNOWLEDGE = 2    客户端手动确认   
DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
SESSION_TRANSACTED = 0    事务提交并确认

但是我坑爹的session中没有INDIVIDUAL_ACKNOWLEDGE = 4    单条消息确认

   jmsTemplate.setSessionAcknowledgeMode(4);

问题2、其中有两个参数RECEIVE_TIMEOUT_NO_WAIT = -1 || RECEIVE_TIMEOUT_INDEFINITE_WAIT = 0

默认使用RECEIVE_TIMEOUT_INDEFINITE_WAIT,即在调用receive()时如果没有消息会一直阻塞;而使用RECEIVE_TIMEOUT_NO_WAIT时发现有些消息无法消费。

所以在设置setReceiveTimeout() 直接设置数值(毫秒)。

问题3:未解决

        jmsTemplate.setSessionAcknowledgeMode(4);

        jmsTemplate.setReceiveTimeout(500);

好!!问题1、2都解决了,出现了新问题:这边设置消费者A主动接收消息,这样真正的消费端又接收不了消息,全被堵在消费者A这边了。

由于JMS的连接是通过Spring控制的连接池获取的,所以臣妾关不了啊~

所以只有换成纯java不结合Spring来主动消费消息,直接上代码:

package com.eversec.satanbox.util;

import java.util.List;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;

import com.eversec.smvc.modules.utils.PropertiesUtils;

/**
* <p>
* 说明: ConsumeMessage 约定message以#做分割,第一位第二位分别是任务ID、任务详情ID
* </p>
* <p>
* 类名: ConsumeMessage
* </p>
* <p>
* 公司: www.eversec.com.cn
* </p>
* <p>
* 修改时间: 2016年4月13日 下午5:02:40
* </p>
*/
@Component
public class ConsumeMessage {
private static Logger logger = Logger.getLogger(ConsumeMessage.class);

public static final String BROKER_URL = PropertiesUtils.getProperty("jms/jms-config", "jms.broker_url");
public static final String DISTORTNEWTASKQUEUE = PropertiesUtils.getProperty("jms/jms-config", "distortNewTaskQueue");
public static final String AHNEWTASKQUEUE = PropertiesUtils.getProperty("jms/jms-config", "ahNewTaskQueue");
public static final String FINGERPRINTNEWTASKQUEUE = PropertiesUtils.getProperty("jms/jms-config", "fingerprintNewTaskQueue");
public static final String HEALTHNEWTASKQUEUE = PropertiesUtils.getProperty("jms/jms-config", "healthNewTaskQueue");
public static final String NESSUSNEWTASKQUEUE = PropertiesUtils.getProperty("jms/jms-config", "nessusNewTaskQueue");
public static final String WVSNEWTASKQUEUE = PropertiesUtils.getProperty("jms/jms-config", "wvsNewTaskQueue");
public static final String XDOWNSNEWTASKQUEUE = PropertiesUtils.getProperty("jms/jms-config", "xdownsNewTaskQueue");

/**
* @描述: 根据任务详情ID 移除指定消息
* @说明: 将activeMQ中的相关消息去除
* @修改时间: 2016年4月13日 下午4:56:13
* @param queueName
* 队列名
* @param taskDetaikIdList
* 任务详情ID
*/
public void reveiveMessage(String queueName, List<Long> taskDetaikIdList) {

Connection connection = null;
Session session = null;
MessageConsumer consumer = null;
try {
connection = new ActiveMQConnectionFactory(BROKER_URL).createConnection();
connection.start();
session = connection.createSession(false, 4);
Destination destination = session.createQueue(queueName);
consumer = session.createConsumer(destination);
while (true) {
TextMessage message;
message = (TextMessage) consumer.receive(100);
if (message != null) {
try {
String messageStr = message.getText();
String[] messageArr = messageStr.split("#");
Long taskId = Long.parseLong(messageArr[1]);
if (taskDetaikIdList.contains(taskId)) {
System.out.println("消费消息:" + messageStr);
message.acknowledge();
}
} catch (JMSException e) {
logger.info("处理jms消息时异常", e);
} catch (Exception e) {
logger.info("消耗消息时异常", e);
}
} else {
break;
}

}
} catch (JMSException e1) {
logger.info("获取连接或接收消息报错", e1);
}

closeConnection(consumer, session, connection);
}

/**
* @描述: 根据任务ID列表 移除指定消息
* @说明:
* @修改时间: 2016年5月16日 上午11:27:34
* @param queueName
* @param taskIdList
*/
public void reveiveMessageById(String queueName, List<Long> taskIdList) {
Connection connection = null;
Session session = null;
MessageConsumer consumer = null;
try {
connection = new ActiveMQConnectionFactory(BROKER_URL).createConnection();
connection.start();
session = connection.createSession(false, 4);
Destination destination = session.createQueue(queueName);
consumer = session.createConsumer(destination);
while (true) {
TextMessage message;
message = (TextMessage) consumer.receive(100);
if (message != null) {
try {
String messageStr = message.getText();
String[] messageArr = messageStr.split("#");
Long taskId = Long.parseLong(messageArr[0]);
if (taskIdList.contains(taskId)) {
System.out.println("消费消息:" + messageStr);
message.acknowledge();
}
} catch (JMSException e) {
logger.info("处理jms消息时异常", e);
} catch (Exception e) {
logger.info("消耗消息时异常", e);
}
} else {
break;
}

}
} catch (JMSException e1) {
logger.info("获取连接或接收消息报错", e1);
}

closeConnection(consumer, session, connection);
}

/**
* 调用关闭session和connection
*
* @param consumer
* @param session
* @param connection
*/
private void closeConnection(MessageConsumer consumer, Session session, Connection connection) {

try {
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (session != null)
connection.close();
} catch (JMSException e) {
logger.error("JMS关闭consumer/session/connection时出错", e);
}

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: