您的位置:首页 > 其它

ActiveMQ使用线程池实现消息的生产与消费

2017-09-29 14:33 465 查看
1. 首先引入相关的lib包,重点需引用activemq-client-5.8.0.jar,activemq-core-5.7.0.jar,activemq-pool-5.8.0.jar,activemq-protobuf-1.1.jar等包,其他包自行配置。

2. 一些公共工具类的代码:

JMSProducer.java

[java] view
plain copy

package com.ffcs.icity.jms;

import java.util.Map;

import java.util.Set;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import javax.jms.Connection;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.ExceptionListener;

import javax.jms.JMSException;

import javax.jms.MapMessage;

import javax.jms.Message;

import javax.jms.MessageProducer;

import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

import org.apache.activemq.pool.PooledConnectionFactory;

/**

* JMS消息生产者

* @author linwei

*

*/

public class JMSProducer implements ExceptionListener {

    /** Default maximum number of pooled connections. */
    public static final int DEFAULT_MAX_CONNECTIONS = 5;

    /** Default maximum number of active sessions per pooled connection. */
    public static final int DEFAULT_MAXIMUM_ACTIVE_SESSION_PER_CONNECTION = 300;

    /** Default number of threads in the sending thread pool. */
    public static final int DEFAULT_THREAD_POOL_SIZE = 50;

    /** Default for the ActiveMQ "use async send" flag (asynchronous sending enabled). */
    public static final boolean DEFAULT_USE_ASYNC_SEND_FOR_JMS = true;

    /** Default delivery mode: messages are sent as persistent. */
    public static final boolean DEFAULT_IS_PERSISTENT = true;

    /** How long {@link #shutdown()} waits for already submitted sends to finish. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 30L;

    private final int maxConnections;

    private final int maximumActiveSessionPerConnection;

    private final int threadPoolSize;

    /** Passed to {@code ActiveMQConnectionFactory.setUseAsyncSend}. */
    private final boolean useAsyncSendForJMS;

    /** Selects PERSISTENT (true) or NON_PERSISTENT (false) delivery mode. */
    private final boolean isPersistent;

    /** Broker connection URL. */
    private final String brokerUrl;

    private final String userName;

    private final String password;

    /** Runs the actual send operations; created in {@link #init()}. */
    private ExecutorService threadPool;

    /** Pooled connection factory wrapping the ActiveMQ factory; created in {@link #init()}. */
    private PooledConnectionFactory connectionFactory;

    /**
     * Creates a producer with the default pool sizes, async-send flag and delivery mode.
     *
     * @param brokerUrl broker connection URL
     * @param userName  user name used to connect
     * @param password  password used to connect
     */
    public JMSProducer(String brokerUrl, String userName, String password) {
        this(brokerUrl, userName, password, DEFAULT_MAX_CONNECTIONS,
                DEFAULT_MAXIMUM_ACTIVE_SESSION_PER_CONNECTION, DEFAULT_THREAD_POOL_SIZE,
                DEFAULT_USE_ASYNC_SEND_FOR_JMS, DEFAULT_IS_PERSISTENT);
    }

    /**
     * Creates a producer and immediately builds its thread pool and pooled connection factory.
     *
     * @param brokerUrl                         broker connection URL
     * @param userName                          user name used to connect
     * @param password                          password used to connect
     * @param maxConnections                    maximum number of pooled connections
     * @param maximumActiveSessionPerConnection maximum active sessions per pooled connection
     * @param threadPoolSize                    number of threads used for sending
     * @param useAsyncSendForJMS                value for the ActiveMQ "use async send" flag
     * @param isPersistent                      {@code true} to send persistent messages
     */
    public JMSProducer(String brokerUrl, String userName, String password, int maxConnections,
            int maximumActiveSessionPerConnection, int threadPoolSize, boolean useAsyncSendForJMS,
            boolean isPersistent) {
        this.useAsyncSendForJMS = useAsyncSendForJMS;
        this.isPersistent = isPersistent;
        this.brokerUrl = brokerUrl;
        this.userName = userName;
        this.password = password;
        this.maxConnections = maxConnections;
        this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;
        this.threadPoolSize = threadPoolSize;
        init();
    }

    /** Builds the sending thread pool and the pooled ActiveMQ connection factory. */
    private void init() {
        this.threadPool = Executors.newFixedThreadPool(this.threadPoolSize);

        ActiveMQConnectionFactory actualConnectionFactory =
                new ActiveMQConnectionFactory(this.userName, this.password, this.brokerUrl);
        actualConnectionFactory.setUseAsyncSend(this.useAsyncSendForJMS);

        this.connectionFactory = new PooledConnectionFactory(actualConnectionFactory);
        this.connectionFactory.setCreateConnectionOnStartup(true);
        this.connectionFactory.setMaxConnections(this.maxConnections);
        this.connectionFactory.setMaximumActiveSessionPerConnection(this.maximumActiveSessionPerConnection);
    }

    /**
     * Queues a message for sending on the internal thread pool and returns immediately.
     *
     * <p>The map is read later on a pool thread, so callers should not modify it after this
     * call. Failures while sending are printed to standard error and are not reported back
     * to the caller.
     *
     * @param queue name of the destination queue
     * @param map   entries copied into a {@link MapMessage}; may be {@code null} or empty
     * @throws java.util.concurrent.RejectedExecutionException if {@link #shutdown()} was already called
     */
    public void send(final String queue, final Map<String, Object> map) {
        this.threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    sendMsg(queue, map);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /**
     * Stops accepting new messages, waits a bounded time for queued sends to finish, then
     * stops the pooled connection factory.
     *
     * <p>Without this call the pool threads created in {@link #init()} stay alive for the
     * lifetime of the JVM.
     */
    public void shutdown() {
        this.threadPool.shutdown();
        try {
            // Give in-flight sends a chance to complete before their connections go away.
            if (!this.threadPool.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS,
                    java.util.concurrent.TimeUnit.SECONDS)) {
                this.threadPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            this.threadPool.shutdownNow();
            Thread.currentThread().interrupt();
        } finally {
            this.connectionFactory.stop();
        }
    }

    /**
     * Performs one send: borrows a connection, creates a session and producer, sends the
     * message, then closes the session and connection.
     *
     * @param queue name of the destination queue
     * @param map   entries to put into the message
     * @throws Exception if any JMS operation fails
     */
    private void sendMsg(String queue, Map<String, Object> map) throws Exception {
        Connection connection = null;
        Session session = null;
        try {
            connection = this.connectionFactory.createConnection();
            // Non-transacted session; the acknowledge mode only matters for consumers.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Point-to-point destination.
            Destination destination = session.createQueue(queue);
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(this.isPersistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
            producer.send(getMessage(session, map));
        } finally {
            closeSession(session);
            closeConnection(connection);
        }
    }

    /**
     * Converts a map into a JMS {@link MapMessage}.
     *
     * @param session session used to create the message
     * @param map     entries to copy; {@code null} or empty yields an empty message
     * @return the populated message
     * @throws JMSException if the message cannot be created or a value cannot be set
     */
    private Message getMessage(Session session, Map<String, Object> map) throws JMSException {
        MapMessage message = session.createMapMessage();
        if (map != null) {
            for (Map.Entry<String, Object> entry : map.entrySet()) {
                message.setObject(entry.getKey(), entry.getValue());
            }
        }
        return message;
    }

    /** Closes the session if it is non-null; failures are printed and otherwise ignored. */
    private void closeSession(Session session) {
        try {
            if (session != null) {
                session.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Closes the connection if it is non-null; failures are printed and otherwise ignored. */
    private void closeConnection(Connection connection) {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Prints the reported JMS exception.
     *
     * <p>NOTE(review): nothing in this class registers the producer as an exception listener
     * on its connections, so this is only invoked if a caller registers it.
     */
    @Override
    public void onException(JMSException e) {
        e.printStackTrace();
    }
}

JMSConsumer.java

[java] view
plain copy

package com.ffcs.icity.jms;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.ExceptionListener;

import javax.jms.JMSException;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

import org.apache.activemq.ActiveMQPrefetchPolicy;

/**

* JMS消息消费者

* @author linwei

*

*/

public class JMSConsumer implements ExceptionListener {

    /** Default queue prefetch size applied to the connection's prefetch policy. */
    public static final int DEFAULT_QUEUE_PREFETCH = 10;

    /** Queue prefetch size used when {@link #start()} is called. */
    private int queuePrefetch = DEFAULT_QUEUE_PREFETCH;

    private String brokerUrl;

    private String userName;

    private String password;

    /** Listener that receives messages from the queue. */
    private MessageListener messageListener;

    /** Connection opened by {@link #start()}; {@code null} when not started. */
    private Connection connection;

    /** Session opened by {@link #start()}; {@code null} when not started. */
    private Session session;

    /** Name of the queue to consume from. */
    private String queue;

    /**
     * Connects to the broker and starts delivering messages from the configured queue to the
     * configured listener.
     *
     * <p>If any step fails after the connection has been created, the partially opened
     * resources are closed before the exception propagates. Calling this method again
     * without {@link #shutdown()} in between replaces the stored connection and session
     * without closing the previous ones.
     *
     * @throws Exception if connecting or creating the session/consumer fails
     */
    public void start() throws Exception {
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(this.userName, this.password, this.brokerUrl);
        connection = connectionFactory.createConnection();

        boolean started = false;
        try {
            ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
            prefetchPolicy.setQueuePrefetch(queuePrefetch);
            ((ActiveMQConnection) connection).setPrefetchPolicy(prefetchPolicy);

            connection.setExceptionListener(this);
            connection.start();

            // Non-transacted session with automatic acknowledgement.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(this.queue);
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(this.messageListener);
            started = true;
        } finally {
            if (!started) {
                // Do not leave a half-initialised connection open.
                shutdown();
            }
        }
    }

    /**
     * Closes the session and the connection, if open.
     *
     * <p>The two are closed independently, so a failure while closing the session does not
     * prevent the connection from being closed. Failures are printed and otherwise ignored.
     */
    public void shutdown() {
        Session openSession = this.session;
        Connection openConnection = this.connection;
        this.session = null;
        this.connection = null;

        try {
            if (openSession != null) {
                openSession.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        try {
            if (openConnection != null) {
                openConnection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Prints JMS exceptions reported by the connection. */
    @Override
    public void onException(JMSException e) {
        e.printStackTrace();
    }

    /** @return the broker connection URL */
    public String getBrokerUrl() {
        return brokerUrl;
    }

    /** @param brokerUrl the broker connection URL used by {@link #start()} */
    public void setBrokerUrl(String brokerUrl) {
        this.brokerUrl = brokerUrl;
    }

    /** @return the user name used to connect */
    public String getUserName() {
        return userName;
    }

    /** @param userName the user name used to connect */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @return the password used to connect */
    public String getPassword() {
        return password;
    }

    /** @param password the password used to connect */
    public void setPassword(String password) {
        this.password = password;
    }

    /** @return the name of the queue to consume from */
    public String getQueue() {
        return queue;
    }

    /** @param queue the name of the queue to consume from */
    public void setQueue(String queue) {
        this.queue = queue;
    }

    /** @return the listener that receives messages */
    public MessageListener getMessageListener() {
        return messageListener;
    }

    /** @param messageListener the listener that receives messages */
    public void setMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
    }

    /** @return the queue prefetch size */
    public int getQueuePrefetch() {
        return queuePrefetch;
    }

    /** @param queuePrefetch the queue prefetch size applied in {@link #start()} */
    public void setQueuePrefetch(int queuePrefetch) {
        this.queuePrefetch = queuePrefetch;
    }
}

MessageHandler.java

[java] view
plain copy

package com.ffcs.icity.jms;

import javax.jms.Message;

/**

* 提供消息操作的回调接口

* @author linwei

*

*/

public interface MessageHandler {

    /**
     * Callback invoked with a received message.
     *
     * @param message the message to process
     */
    void handle(Message message);
}

MultiThreadMessageListener.java

[java] view
plain copy

package com.ffcs.icity.jms;

import java.util.concurrent.ExecutorService;

import javax.jms.Message;

import javax.jms.MessageListener;

/**

* 消息消费者中使用的多线程消息监听服务

* @author linwei

*

*/

public class MultiThreadMessageListener implements MessageListener {

    /** Default size of the handling thread pool. */
    public final static int DEFAULT_HANDLE_THREAD_POOL = 10;

    /** Maximum number of handling threads. */
    private int maxHandleThreads;

    /** Callback that processes each received message. */
    private MessageHandler messageHandler;

    /** Pool on which the callback is run. */
    private ExecutorService handleThreadPool;

    /**
     * Creates a listener with {@link #DEFAULT_HANDLE_THREAD_POOL} handling threads.
     *
     * @param messageHandler callback that processes each message
     */
    public MultiThreadMessageListener(MessageHandler messageHandler) {
        this(DEFAULT_HANDLE_THREAD_POOL, messageHandler);
    }

    /**
     * Creates a listener backed by a {@link FixedAndBlockedThreadPoolExecutor} of the given size.
     *
     * @param maxHandleThreads number of handling threads
     * @param messageHandler   callback that processes each message
     */
    public MultiThreadMessageListener(int maxHandleThreads, MessageHandler messageHandler) {
        this.maxHandleThreads = maxHandleThreads;
        this.messageHandler = messageHandler;
        this.handleThreadPool = new FixedAndBlockedThreadPoolExecutor(this.maxHandleThreads);
    }

    /**
     * Hands the received message to the handling pool; the pool's {@code execute} may block
     * the calling thread.
     *
     * @param message the message delivered by the JMS provider
     */
    @Override
    public void onMessage(final Message message) {
        this.handleThreadPool.execute(new HandleTask(message));
    }

    /** Pool task that passes one message to the handler and prints any exception it throws. */
    private final class HandleTask implements Runnable {

        private final Message pending;

        HandleTask(Message pending) {
            this.pending = pending;
        }

        @Override
        public void run() {
            try {
                MultiThreadMessageListener.this.messageHandler.handle(this.pending);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

FixedAndBlockedThreadPoolExecutor.java

[java] view
plain copy

package com.ffcs.icity.jms;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.ReentrantLock;

/**

* 支持阻塞的固定大小的线程池

* @author linwei

*

*/

public class FixedAndBlockedThreadPoolExecutor extends ThreadPoolExecutor {

    /** Guards {@link #activeTasks}. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Signalled whenever a task slot is released. */
    private final Condition condition = this.lock.newCondition();

    /** Number of tasks submitted through {@link #execute(Runnable)} that have not finished yet. */
    private int activeTasks;

    /**
     * Creates a pool with exactly {@code size} threads and an unbounded work queue.
     *
     * @param size number of worker threads (used as both core and maximum pool size)
     */
    public FixedAndBlockedThreadPoolExecutor(int size) {
        super(size, size, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Submits a task, first blocking the calling thread while the number of unfinished tasks
     * has reached the maximum pool size.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt flag is restored
     * and the task is submitted anyway, so no task is dropped. If the underlying executor
     * rejects the task, the reserved slot is released and the exception propagates.
     *
     * @param command the task to run
     * @throws NullPointerException if {@code command} is {@code null}
     * @throws java.util.concurrent.RejectedExecutionException if the pool no longer accepts tasks
     */
    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException("command");
        }
        acquireSlot();
        boolean submitted = false;
        try {
            super.execute(command);
            submitted = true;
        } finally {
            if (!submitted) {
                // The task will never reach afterExecute, so give its slot back here.
                releaseSlot();
            }
        }
    }

    /** Releases the finished task's slot and wakes one caller waiting in {@link #execute(Runnable)}. */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        releaseSlot();
    }

    /** Waits until fewer than the maximum pool size tasks are unfinished, then reserves a slot. */
    private void acquireSlot() {
        this.lock.lock();
        try {
            try {
                // Loop to tolerate spurious wakeups and slots taken by other callers.
                while (this.activeTasks >= getMaximumPoolSize()) {
                    this.condition.await();
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller; the task is still submitted.
                Thread.currentThread().interrupt();
            }
            this.activeTasks++;
        } finally {
            this.lock.unlock();
        }
    }

    /** Gives back one slot and signals a single waiter. */
    private void releaseSlot() {
        this.lock.lock();
        try {
            if (this.activeTasks > 0) {
                this.activeTasks--;
            }
            this.condition.signal();
        } finally {
            this.lock.unlock();
        }
    }
}

3. 调用例子说明:

生产者调用代码,JMSProducerTest.java

[java] view
plain copy

package com.ffcs.icity.test;

import java.util.HashMap;

import java.util.Map;

import com.ffcs.icity.jms.JMSProducer;

public class JMSProducerTest {

    /** Sends one sample message and prints "over." once it has been queued for sending. */
    public static void main(String[] args) {
        locationTest();
        System.out.println("over.");
    }

    /** Creates a producer against a local broker and sends a sample map message to queue "test". */
    private static void locationTest() {
        // A JMSProducer can be kept as a global static and instantiated once; do not create
        // instances in a loop, because each one owns a thread pool.
        // Default OpenWire connector: tcp://localhost:61616; default STOMP connector:
        // tcp://localhost:61613. Both nio://localhost:61617 and tcp://localhost:61616 can be
        // configured in activemq.xml.
        JMSProducer producer = new JMSProducer("nio://localhost:61617", "system", "manager");
        producer.send("test", sampleMessage());
    }

    /** Builds the sample payload sent by {@link #locationTest()}. */
    private static Map<String, Object> sampleMessage() {
        Map<String, Object> payload = new HashMap<String, Object>();
        payload.put("id", "1");
        payload.put("name", "sss1113333");
        payload.put("password", "password");
        return payload;
    }
}

消费者调用代码,JMSConsumerTest.java

[java] view
plain copy

package com.ffcs.icity.test;

import javax.jms.MapMessage;

import javax.jms.Message;

import com.ffcs.icity.jms.JMSConsumer;

import com.ffcs.icity.jms.MessageHandler;

import com.ffcs.icity.jms.MultiThreadMessageListener;

public class JMSConsumerTest {

    /**
     * Starts a consumer on queue "test" of a local broker, handling messages on 50 threads.
     *
     * @throws Exception if the consumer cannot be started
     */
    public static void main(String[] args) throws Exception {
        // A JMSConsumer can be kept as a global static and instantiated once; do not create
        // instances in a loop (the listener passed to it owns a thread pool).
        MultiThreadMessageListener listener = new MultiThreadMessageListener(50, new NamePrintingHandler());

        JMSConsumer consumer = new JMSConsumer();
        consumer.setBrokerUrl("tcp://localhost:61616");
        consumer.setQueue("test");
        consumer.setUserName("system");
        consumer.setPassword("manager");
        consumer.setQueuePrefetch(500);
        consumer.setMessageListener(listener);
        consumer.start();
        // To stop consuming later, call consumer.shutdown().
    }

    /** Prints the "name" entry of each map message, then sleeps five seconds to simulate work. */
    private static final class NamePrintingHandler implements MessageHandler {

        public void handle(Message message) {
            try {
                String name = ((MapMessage) message).getString("name");
                System.out.println("name is " + name);
                Thread.sleep(5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: