您的位置:首页 > 其它

分布式服务框架学习笔记8 ActiveMQ入门使用

2017-03-10 08:35 906 查看

测试环境

windows+Eclipse

下载

http://activemq.apache.org/download-archives.html

下载后 打开c:/tools/apache-activemq-5.9.0/bin/activemq.bat启动服务

添加依赖

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.1</version>
</dependency>


Persistence持久化存储

AMQ Message Store

ActiveMQ 5.0 的缺省持久化存储方式。

Kaha Persistence

这是一个专门针对消息持久化的解决方案。它对典型的消息使用模式进行了优化。

JDBC Persistence

数据库方式:Apache Derby, Axion, DB2, HSQL, Informix, MaxDB, MySQL, Oracle, Postgresql, SQLServer, Sybase。

Disable Persistence

不应用持久化存储。

方法解析

生产者创建过程

Created with Raphaël 2.1.0创建生产者生产消息发送给消息队列

消费者消费消息

Created with Raphaël 2.1.0创建消费者监听消息队列获得消息

代码

App.java

package com.whrsmart.ActiveMQ;

import javax.jms.JMSException;

import org.apache.activemq.ActiveMQConnection;

/**
*
*/
public class App
{
public static void main( String[] args )
{
String username = "admin1";
String password = "admin1";
String url = ActiveMQConnection.DEFAULT_BROKER_URL;
// 创建生产者
Producer producer;
try {
producer = new Producer(url, username, password);
// 生产者产生一条消息
producer.sendMessage("Hello World");

// 创建消费者
Consumer consumer = new Consumer(url, username, password);
// 消费者读取一条消息
Object msg = consumer.receive();

// 输出消息内容
System.out.println(msg);

producer.close();
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}


Consumer.java

package com.whrsmart.ActiveMQ;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 消费者
*
*/
public class Consumer {

/**
* 用户名
*/
private String username;

/**
* 密码
*/
private String password;

/**
* 连接地址,host:port,如localhost:8161
*/
private String url;

/**
* 队列名
*/
private static final String QUEUE_NAME = "HELLO";

//连接工厂
private ConnectionFactory connectionFactory;

//会话 接受或者发送消息的线程
private Session session;

//消息的目的地
private Destination destination;

//消息消费者
private MessageConsumer messageConsumer;

//与MQ的连接
private Connection connection = null;

public Consumer(String url, String username, String password) throws JMSException {
this.url = url;
this.username = username;
this.password = password;
this.init();
}

/**
* 初始化方法
* @throws JMSException
*/
private void init() throws JMSException {
// 实例化工厂
connectionFactory = new ActiveMQConnectionFactory(
username, password, url);
// 获取连接
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取session
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 创建指定队列名的队列
destination = session.createQueue(Consumer.QUEUE_NAME);
// 创建消息消费者
messageConsumer = session.createConsumer(destination);
}

/**
* 接收消息
* @throws JMSException
*/
public String receive() throws JMSException {
TextMessage msg = (TextMessage) messageConsumer.receive(5000);
if(msg == null) {
return null;
}
return msg.getText();
}

public void close() throws JMSException {
session.close();
connection.close();
}

}


Producer.java

package com.whrsmart.ActiveMQ;

import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 生产者
*/
public class Producer implements Cloneable{

/**
* 用户名
*/
private String username;

/**
* 密码
*/
private String password;

/**
* 连接地址,host:port,如localhost:8161
*/
private String url;

/**
* 队列名
*/
private static final String QUEUE_NAME = "HELLO";

//连接工厂
private ConnectionFactory connectionFactory;

//会话 接受或者发送消息的线程
private Session session;

//消息的目的地
private Destination destination;

//消息生产者
private MessageProducer messageProducer;

//与MQ的连接
private javax.jms.Connection  connection = null;

public Producer(String url, String username, String password) throws JMSException {
this.url = url;
this.username = username;
this.password = password;
this.init();
}

/**
* 初始化方法
* @throws JMSException
*/
private void init() throws JMSException {
// 实例化工厂
connectionFactory = new ActiveMQConnectionFactory(username, password, url);
// 获取连接
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建指定队列名的队列
destination = session.createQueue(Producer.QUEUE_NAME);
// 创建消息生产者
messageProducer = session.createProducer(destination);
}

/**
* 发送消息
* @param data 数据
* @throws JMSException
*/
public void sendMessage(String data) throws JMSException {
// 创建一个Message
TextMessage msg = session.createTextMessage(data);
// 通过消息发送者进行消息发送
this.messageProducer.send(msg);
}

public void close() throws JMSException {
session.close();
connection.close();
}
}


本文参考:

http://www.tuicool.com/articles/EjyqA3R
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: