您的位置:首页 > 编程语言 > Java开发

ActiveMQ 队列模式和主题模式的 Java 实现

2017-07-23 17:23 309 查看
一、队列模式

生产者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Queue-mode producer example: connects to the broker at {@link #url} and sends
 * 100 text messages ("test0" .. "test99") to the queue named {@link #queueName}.
 */
public class AppProducer {
    /** Broker address the producer connects to. */
    public static final String url = "tcp://127.0.0.1:61616";
    /** Name of the queue the messages are sent to. */
    public static final String queueName = "queue-test";

    /**
     * Sends the messages and then closes the connection.
     *
     * @param args unused
     * @throws JMSException if connecting, creating JMS objects, or sending fails
     */
    public static void main(String[] args) throws JMSException {
        // 1. Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        // 2. Create the Connection
        Connection connection = connectionFactory.createConnection();
        try {
            // 3. Start the connection
            connection.start();

            // 4. Create a non-transacted session with automatic acknowledgement
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 5. Create the destination (a queue)
            Destination destination = session.createQueue(queueName);

            // 6. Create a producer bound to that destination
            MessageProducer producer = session.createProducer(destination);

            for (int i = 0; i < 100; i++) {
                // 7. Create the message
                TextMessage textMessage = session.createTextMessage("test" + i);
                // 8. Send the message
                producer.send(textMessage);

                System.out.println("发送消息" + textMessage.getText());
            }
        } finally {
            // 9. Close the connection even when one of the steps above throws,
            //    so a failed run does not leave the broker connection open.
            connection.close();
        }
    }
}


消费者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Queue-mode consumer example: connects to the broker at {@link #url}, registers a
 * listener on the queue named {@link #queueName}, and prints the text of each
 * message it receives.
 */
public class AppConsumer {
    /** Broker address the consumer connects to. */
    public static final String url = "tcp://127.0.0.1:61616";
    /** Name of the queue the consumer listens on. */
    public static final String queueName = "queue-test";

    /**
     * Sets up the connection and registers the message listener.
     *
     * @param args unused
     * @throws JMSException if connecting or creating the JMS objects fails
     */
    public static void main(String[] args) throws JMSException {
        // 1. Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        // 2. Create the Connection
        Connection connection = connectionFactory.createConnection();

        // 3. Start the connection
        connection.start();

        // 4. Create a non-transacted session with automatic acknowledgement
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5. Create the destination (a queue)
        Destination destination = session.createQueue(queueName);

        // 6. Create a consumer on that destination
        MessageConsumer consumer = session.createConsumer(destination);

        // 7. Register a listener that prints every text message
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                // Guard the cast: anything other than a TextMessage would otherwise
                // throw ClassCastException inside the listener.
                if (!(message instanceof TextMessage)) {
                    System.err.println("Ignoring non-text message: " + message);
                    return;
                }
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("0接收消息" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        // 8. The connection is deliberately NOT closed here: the listener is
        //    asynchronous, and closing the connection would stop message delivery.
    }
}


二、主题模式

生产者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Topic-mode producer example: connects to the broker at {@link #url} and publishes
 * 100 text messages ("test0" .. "test99") to the topic named {@link #topicName}.
 */
public class AppProducer {
    /** Broker address the producer connects to. */
    public static final String url = "tcp://127.0.0.1:61616";
    /** Name of the topic the messages are published to. */
    public static final String topicName = "topic-test";

    /**
     * Publishes the messages and then closes the connection.
     *
     * @param args unused
     * @throws JMSException if connecting, creating JMS objects, or sending fails
     */
    public static void main(String[] args) throws JMSException {
        // 1. Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        // 2. Create the Connection
        Connection connection = connectionFactory.createConnection();
        try {
            // 3. Start the connection
            connection.start();

            // 4. Create a non-transacted session with automatic acknowledgement
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 5. Create the destination (a topic)
            Destination destination = session.createTopic(topicName);

            // 6. Create a producer bound to that destination
            MessageProducer producer = session.createProducer(destination);

            for (int i = 0; i < 100; i++) {
                // 7. Create the message
                TextMessage textMessage = session.createTextMessage("test" + i);
                // 8. Publish the message
                producer.send(textMessage);

                System.out.println("发送消息" + textMessage.getText());
            }
        } finally {
            // 9. Close the connection even when one of the steps above throws,
            //    so a failed run does not leave the broker connection open.
            connection.close();
        }
    }
}


消费者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Topic-mode consumer example: connects to the broker at {@link #url}, subscribes to
 * the topic named {@link #topicName} via {@code Session.createConsumer}, and prints
 * the text of each message it receives.
 */
public class AppConsumer {
    /** Broker address the consumer connects to. */
    public static final String url = "tcp://127.0.0.1:61616";
    /** Name of the topic the consumer subscribes to. */
    public static final String topicName = "topic-test";

    /**
     * Sets up the connection and registers the message listener.
     *
     * @param args unused
     * @throws JMSException if connecting or creating the JMS objects fails
     */
    public static void main(String[] args) throws JMSException {
        // 1. Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        // 2. Create the Connection
        Connection connection = connectionFactory.createConnection();

        // 3. Start the connection
        connection.start();

        // 4. Create a non-transacted session with automatic acknowledgement
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5. Create the destination (a topic)
        Destination destination = session.createTopic(topicName);

        // 6. Create a consumer on that destination
        MessageConsumer consumer = session.createConsumer(destination);

        // 7. Register a listener that prints every text message
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                // Guard the cast: anything other than a TextMessage would otherwise
                // throw ClassCastException inside the listener.
                if (!(message instanceof TextMessage)) {
                    System.err.println("Ignoring non-text message: " + message);
                    return;
                }
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("0接收消息" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        // 8. The connection is deliberately NOT closed here: the listener is
        //    asynchronous, and closing the connection would stop message delivery.
    }
}


三、ActiveMQ 的 Maven 依赖

<!-- Maven dependency on org.apache.activemq:activemq-all, version 5.9.0;
     the examples above import org.apache.activemq.ActiveMQConnectionFactory. -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: