您的位置:首页 > 其它

ActiveMq-Publisher/Subscribe入门例子

2015-06-14 15:06 323 查看
启动ActiveMq、添加依赖与之前完全相同,此处省略

1.发布端

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Publisher
{
private static Logger logger = LoggerFactory.getLogger(Publisher.class);

public static void main(String[] args)
{
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
MessageProducer producer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try
{
connection = connectionFactory.createConnection();
connection.start();

session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);

//注意这里与ptp例子的区别,使用null作为destination
producer = session.createProducer(null);

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

sendMessage(session, producer);

session.commit();
}
catch (Exception e)
{
e.printStackTrace();
}
finally
{
try
{
if (null != connection)
connection.close();
}
catch (Throwable ignore)
{
}
}

}

private static void sendMessage(Session session, MessageProducer producer) throws JMSException
{
Destination destination = session.createTopic("pub/sub");
Message message = session.createMapMessage();
message.setStringProperty("company", "alibaba");
message.setStringProperty("department", "b2b");
logger.info("destination is {};message is ready to send", destination);

producer.send(destination, message);
}
}


2.接收端

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Subscribe
{
public static void main(String[] args)
{
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageConsumer consumer;

connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try
{
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);

destination = session.createTopic("pub/sub");
consumer = session.createConsumer(destination);

//设置指定的监听器
consumer.setMessageListener(new MyListener());

Thread.sleep(100*1000);
}
catch (Exception e)
{
e.printStackTrace();
}
finally
{
try
{
if (null != connection)
connection.close();
}
catch (Throwable ignore)
{
}
}

}
}


3.监听器

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyListener implements MessageListener
{
private static Logger logger = LoggerFactory.getLogger(MyListener.class);

public void onMessage(Message message)
{
MapMessage mapMessage = (MapMessage) message;
try
{
String company = mapMessage.getStringProperty("company");
String department = mapMessage.getStringProperty("department");

logger.info("company is {} ; department is {}", company,department);
} catch (JMSException e)
{
e.printStackTrace();
}
}

}


4.测试

先启动接收端,再启动发布端,结果为:

[main] INFO com.mycompany.app.pub.sub.Publisher - destination is topic://pub/sub;message is ready to send


[ActiveMQ Session Task-1] INFO com.mycompany.app.pub.sub.MyListener - company is alibaba ; department is b2b
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: