您的位置:首页 > 运维架构

JMS发送和接收实例-发布/订阅模式

2012-10-18 11:20 459 查看
发送消息

不管是将消息发送到队列还是发布到主题,编程的步骤是相同的,差别在于使用不同的JMS对象。具体定义见表:



发送消息的过程大体分为以下几步;

1、获得一个Weblogic Server上下文的引用;

2、创建连接工厂;

3、使用连接工厂创建一个连接;

4、使用连接创建一个会话;

5、获取一个目的;

6、使用会话和目的创建消息的生产者;

7、创建消息对象;

8、使用连接创建一个需要发送的消息类型的实例;

9、使用连接的一个队列发送器或主题公布器,然后使用发送器或公布器发送消息。

在敲代码之前要先导入需要的JAR包,并且配置JMS服务器。



 

注意:

wlfullclient.jar生成方式是,进入weblogic的安装目录例如C:\Oracle\Middleware\wlserver_10.3\server\lib,运行 java -jar wljarbuilder.jar就能生成wlfullclient.jar文件

JMS服务器配置请参考《学习在Weblogic服务器中配置消息服务器图解


 

发送消息代码:

package com.xu.Pub2Sub;

import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class MsgTopicProducer {

/**
* @功能:主题发送者
* @作者:
* @日期:2012-10-18
*/

private TopicPublisher publisher;
private TextMessage msg;

public MsgTopicProducer(String[] args) throws NamingException, JMSException
{
/*初始化上下文对象*/
String url = "t3://localhost:7001";
Properties p = new Properties();
p.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
p.put(Context.PROVIDER_URL, url);
Context ctx = new InitialContext(p);

/*创建一个连接工厂*/
TopicConnectionFactory tConFactory = (TopicConnectionFactory)
ctx.lookup("weblogic.jms.ConnectionFactory");
/*创建一个主题*/
Topic messageTopic = (Topic) ctx.lookup("jms/MyTopic");
/*创建一个连接*/
TopicConnection tCon = tConFactory.createTopicConnection();
/*创建一个会话*/
TopicSession session = tCon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

/*创建一个主题发布者,并发送消息*/
publisher = session.createPublisher(messageTopic);
msg = session.createTextMessage();

}
public void runClient() throws JMSException
{
msg.setText("Hello");
publisher.publish(msg);
msg.setText("Welcome to JMS!");
publisher.publish(msg);
System.out.println("成功!");
}

public static void main(String[] args) throws NamingException, JMSException {

MsgTopicProducer mp = new MsgTopicProducer(args);
mp.runClient();
}

}


同步接收者代码:

package com.xu.Pub2Sub;

import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class SyncMessageTopicReceiver {

/**
* @功能:发布-订阅消息服务   同步接收者
* @作者:
* @日期:2012-10-18
*/

private TopicSubscriber subscriber ;
private TextMessage msg;

public SyncMessageTopicReceiver() throws NamingException, JMSException
{
/*初始化上下文对象*/
String url = "t3://localhost:7001";
Properties p = new Properties();
p.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
p.put(Context.PROVIDER_URL, url);
Context ctx = new InitialContext(p);

/*创建主题连接工厂*/
TopicConnectionFactory tConFactory = (TopicConnectionFactory)
ctx.lookup("weblogic.jms.ConnectionFactory");
/*创建一个主题*/
Topic messageTopic = (Topic) ctx.lookup("jms/MyTopic");
/*创建连接*/
TopicConnection tCon = tConFactory.createTopicConnection();
/*创建会话*/
TopicSession session = tCon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
/*创建接收者*/
subscriber = session.createSubscriber(messageTopic);
tCon.start();
}
public void runClient() throws JMSException
{
msg = (TextMessage) subscriber.receive();
System.out.println("Receiver:"+msg.getText());
msg = (TextMessage) subscriber.receive();
System.out.println("Receiver:"+msg.getText());

}
public static void main(String[] args) throws NamingException, JMSException {
SyncMessageTopicReceiver receiver = new SyncMessageTopicReceiver();
receiver.runClient();
}

}

异步接收者代码:

package com.xu.Pub2Sub;

import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class AsyncMessageTopicReceiver implements MessageListener {

/**
* @功能:发布-订阅消息服务 异步接收消息
* @作者:
* @日期:2012-10-18
*/

private int EXPECTED_MESSAGE_COUNT = 10;
private int messageCount = 0;
private TopicSubscriber subscriber;

public AsyncMessageTopicReceiver() throws NamingException, JMSException {
/* 初始化上下文对象 */
String url = "t3://localhost:7001";
Properties p = new Properties();
p.put(Context.INITIAL_CONTEXT_FACTORY,
"weblogic.jndi.WLInitialContextFactory");
p.put(Context.PROVIDER_URL, url);
Context ctx = new InitialContext(p);
/*创建主题连接工厂*/
TopicConnectionFactory tConFactory = (TopicConnectionFactory)
ctx.lookup("weblogic.jms.ConnectionFactory");
/*创建一个主题*/
Topic messageTopic = (Topic) ctx.lookup("jms/MyTopic");
/*创建连接*/
TopicConnection tCon = tConFactory.createTopicConnection();
/*创建会话*/
TopicSession session = tCon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
/*创建接收者*/
subscriber = session.createSubscriber(messageTopic);
/* 设置监听 */
subscriber.setMessageListener(this);
tCon.start();

}

public boolean expectMoreMessage() {
return messageCount < EXPECTED_MESSAGE_COUNT;
}

@Override
public void onMessage(Message message) {
System.out.println("onMessage");
try {
TextMessage msg = (TextMessage) message;
System.out.println("Receiver:" + msg.getText());
} catch (JMSException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}

messageCount++;
}

public static void main(String[] args) throws NamingException, JMSException {
int MAX_TRIES = 30;
int tryCount = 0;
AsyncMessageTopicReceiver receiver = new AsyncMessageTopicReceiver();
while (receiver.expectMoreMessage() && (tryCount < MAX_TRIES)) {

try {
System.out.println(tryCount);
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
tryCount++;
}

}
}


 

 

 

 

 

 

 

 

 

 

 

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息