您的位置:首页 > 编程语言 > Java开发

java通过ActiveMQ实现JMS的消息队列实例

2015-12-30 16:15 711 查看
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>


package jms;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
下载安装http://apache.fayea.com/activemq/
启动命令:bin\win64\activemq.bat http://www.cnblogs.com/phoebus0501/archive/2011/02/24/1964228.html */

public class MessageReceiver implements Runnable {
private String url;
private String user;
private String password;
private final String QUEUE;

public MessageReceiver(String queue, String url, String user, String password) {
this.url = url;
this.user = user;
this.password = password;
this.QUEUE = queue;
}

@Override
public void run() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
Session session = null;
Destination receiveQueue;
try {
Connection connection = connectionFactory.createConnection();

session = connection
.createSession(true, Session.SESSION_TRANSACTED);
receiveQueue = session.createQueue(QUEUE);
MessageConsumer consumer = session.createConsumer(receiveQueue);

connection.start();
System.out.println(Thread.currentThread().getName()+" start");

while (true) {
Message message = consumer.receive();

if (message instanceof TextMessage) {
TextMessage receiveMessage = (TextMessage) message;
System.out.println("我是Receiver,收到消息如下: \r\n"
+ receiveMessage.getText());
} else {
session.commit();
break;
}

}
connection.close();
System.out.println(Thread.currentThread().getName()+" close");
} catch (JMSException e) {
e.printStackTrace();
}
}

public String getUrl() {
return url;
}

public void setUrl(String url) {
this.url = url;
}

public String getUser() {
return user;
}

public void setUser(String user) {
this.user = user;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

}


package jms;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.Date;
/**
* 消息发送器
* @author xiaochuanyu
*
*/
public class MessageSender implements Runnable {

private String url;
private String user;
private String password;
private final String QUEUE;

public MessageSender(String queue, String url, String user, String password) {
this.url = url;
this.user = user;
this.password = password;
this.QUEUE = queue;
}

@Override
public void run() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
Session session = null;
Destination sendQueue;
Connection connection = null;

int messageCount = 0;
try {
connection = connectionFactory.createConnection();

connection.start();
System.out.println(Thread.currentThread().getName()+" start");

while (true) {
session = connection.createSession(true,
Session.SESSION_TRANSACTED);

sendQueue = session.createQueue(QUEUE);
MessageProducer sender = session.createProducer(sendQueue);
TextMessage outMessage = session.createTextMessage();
outMessage.setText(new Date() + "现在发送是第" + messageCount + "条消息");

sender.send(outMessage);

session.commit();

sender.close();

if ((++messageCount) == 10) {
// 发够十条消息退出
break;
}
Thread.sleep(1000);
}

connection.close();
System.out.println(Thread.currentThread().getName()+" close");
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

public String getUrl() {
return url;
}

public void setUrl(String url) {
this.url = url;
}

public String getUser() {
return user;
}

public void setUser(String user) {
this.user = user;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

}


package jms;

/**
java通过ActiveMQ实现JMS的消息队列实例
下载安装http://apache.fayea.com/activemq/
启动命令:bin\win64\activemq.bat http://www.cnblogs.com/phoebus0501/archive/2011/02/24/1964228.html */
public class MyActiveMQDemo {
//http://blog.sina.com.cn/s/blog_a459dcf501017oml.html需要安装ActiveMQ 然后启动bin\win64\activemq.bat

public static void main(String[] args) {
String url = "tcp://localhost:61616";
String user = "xxx";
String password = "xxx";
String query = "MyQueue";
new Thread(new MessageReceiver(query,url,user,password), "Name-Receiver").start();
new Thread(new MessageSender(query,url,user,password), "Name-Sender").start();
}
}

官网例子:

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
* Hello world!
*/
public class App {

public static void main(String[] args) throws Exception {
thread(new HelloWorldProducer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
Thread.sleep(1000);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
Thread.sleep(1000);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldProducer(), false);
Thread.sleep(1000);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
}

public static void thread(Runnable runnable, boolean daemon) {
Thread brokerThread = new Thread(runnable);
brokerThread.setDaemon(daemon);
brokerThread.start();
}

public static class HelloWorldProducer implements Runnable {
public void run() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");

// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();

// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Create the destination (Topic or Queue)
Destination destination = session.createQueue("TEST.FOO");

// Create a MessageProducer from the Session to the Topic or Queue
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

// Create a messages
String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
TextMessage message = session.createTextMessage(text);

// Tell the producer to send the message
System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
producer.send(message);

// Clean up
session.close();
connection.close();
}
catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
}

public static class HelloWorldConsumer implements Runnable, ExceptionListener {
public void run() {
try {

// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");

// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();

connection.setExceptionListener(this);

// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Create the destination (Topic or Queue)
Destination destination = session.createQueue("TEST.FOO");

// Create a MessageConsumer from the Session to the Topic or Queue
MessageConsumer consumer = session.createConsumer(destination);

// Wait for a message
Message message = consumer.receive(1000);

if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Received: " + text);
} else {
System.out.println("Received: " + message);
}

consumer.close();
session.close();
connection.close();
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}

public synchronized void onException(JMSException ex) {
System.out.println("JMS Exception occured.  Shutting down client.");
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: