您的位置:首页 > 编程语言 > Java开发

通过Java操作ActiveMQ的代码记录

2015-10-28 00:00 309 查看
ActiveMQ的数据发送类

import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnection;
import org.apache.activemq.pool.PooledConnectionFactory;

public class ActiveMQSend {

private final String ip;
private final String port;
private PooledConnection pooledConnection;

/**
* 构造方法(传入需要连接的IP和端口)
*
* @param ip (AvctiveMQ的服务IP)
* @param port (ActiveMQ的服务端口)
*/
public ActiveMQSend(String ip, String port) {
this.ip = ip;
this.port = port;
this.init();
}

/**
* 初始化ActiveMQ的连接池
*/
private void init() {
try {
String[] ips = this.ip.split(",");
String[] ports = this.port.split(",");
StringBuilder tcpLink = new StringBuilder();
for (int i = 0; i < ips.length; i++) {
tcpLink.append("tcp://").append(ips[i]).append(":").append(ports[i]).append(",");
}
String mqLink = tcpLink.toString();
if (tcpLink.length() > 0) {
if (',' == tcpLink.charAt(tcpLink.length() - 1)) {
mqLink = tcpLink.substring(0, tcpLink.length() - 1);
}
}

String url = String.format("failover:(%s)?initialReconnectDelay=1000&timeout=3000&startupMaxReconnectAttempts=2", mqLink);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
factory.setMaxThreadPoolSize(50);
PooledConnectionFactory poolFactory = new PooledConnectionFactory(factory);
pooledConnection = (PooledConnection) poolFactory.createConnection();
pooledConnection.start();
} catch (Exception ex) {
&n
3ff8
bsp;        LogUtil.error(ex);
this.destroy();
}
}

/**
* 向ActiveMQ中发送数据
*
* @param needSendMsg 需要发送的数据信息
* @param sendMQName 需要发送到的队列名称
*/
public void send(String needSendMsg, String sendMQName) {
if (this.pooledConnection == null) {
this.init();
}

if (this.pooledConnection != null) {
try {
Session session = this.pooledConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(sendMQName);
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage(needSendMsg);
producer.send(message);
session.close();
} catch (Exception ex) {
LogUtil.error(ex);
this.destroy();
}
}
}

/**
* 回收连接池
*/
public void destroy() {
try {
if (pooledConnection != null) {
pooledConnection.stop();
}
} catch (Exception e) {
LogUtil.error(e);
}
try {
if (pooledConnection != null) {
pooledConnection.close();
}
} catch (Exception e) {
LogUtil.error(e);
}
pooledConnection = null;
}
}


数据发送类的调用

/**
 * Demo entry point: starts two threads, each sending 50000 text messages to its own queue
 * through its own {@code ActiveMQSend} instance.
 *
 * @param args unused
 */
public static void main(String[] args) {

    Thread t1 = new Thread(new Runnable() {
        @Override
        public void run() {
            ActiveMQSend sen = new ActiveMQSend("127.0.0.1", "61616");
            try {
                for (int i = 0; i < 50000; i++) {
                    String msg = String.format("这是TestQueue 2 第 %s 条发送的数据!", i);
                    sen.send(msg, "TestQueue2");
                }
            } finally {
                // Release the connection once this worker is done.
                sen.destroy();
            }
        }
    });
    Thread t2 = new Thread(new Runnable() {
        @Override
        public void run() {
            ActiveMQSend sen = new ActiveMQSend("127.0.0.1", "61616");
            try {
                for (int i = 0; i < 50000; i++) {
                    String msg = String.format("这是TestQueue 1 第 %s 条发送的数据!", i);
                    sen.send(msg, "TestQueue1");
                }
            } finally {
                // Release the connection once this worker is done.
                sen.destroy();
            }
        }
    });

    t1.start();
    t2.start();
}


ActiveMQ数据接收类

import javax.jms.MessageConsumer;
import javax.jms.Destination;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnection;
import org.apache.activemq.pool.PooledConnectionFactory;

public class Receiver {

private PooledConnection pooledConnection;

private void init() {
try {
String url = "failover:(tcp://192.168.10.219:61616)?initialReconnectDelay=1000&timeout=3000&startupMaxReconnectAttempts=2";
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
factory.setMaxThreadPoolSize(100);
PooledConnectionFactory poolFactory = new PooledConnectionFactory(factory);
pooledConnection = (PooledConnection) poolFactory.createConnection();
pooledConnection.start();
} catch (Exception ex) {
Log.error(ex);
this.destroy();
}
}

public void receive(String queueName) {
if (this.pooledConnection == null) {
this.init();
}
if (this.pooledConnection != null) {
try {
Session session = this.pooledConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(destination);
while (true) {
TextMessage message = (TextMessage) consumer.receive(10);
if (null != message) {
System.out.println("收到消息" + message.getText());
}
}
} catch (Exception ex) {
Log.info(ex.getMessage());
this.destroy();
}
}
}

public void destroy() {
try {
if (pooledConnection != null) {
pooledConnection.stop();
}
} catch (Exception e) {
Log.error(e);
}
try {
if (pooledConnection != null) {
pooledConnection.close();
}
} catch (Exception e) {
Log.error(e);
}
pooledConnection = null;
}
}


数据接收类的调用

/**
 * Demo entry point: spawns one consumer thread per queue, each with its own {@code Receiver}.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // Same queues, in the same creation and start order, as two hand-written threads.
    final String[] queueNames = {"TestQueue2", "TestQueue1"};

    Thread[] workers = new Thread[queueNames.length];
    for (int i = 0; i < queueNames.length; i++) {
        final String queueName = queueNames[i];
        workers[i] = new Thread(new Runnable() {
            @Override
            public void run() {
                Receiver receiver = new Receiver();
                receiver.receive(queueName);
            }
        });
    }

    // Start only after every thread has been constructed.
    for (Thread worker : workers) {
        worker.start();
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Java ActiveMQ