Publish/subscribe on a JMS topic, passing an ObjectMessage
2016-06-21 17:44
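The tools (Eclipse + ActiveMQ) are exactly the same as in the previous post.
Create two projects: one contains publisher.java + Book.java,
the other contains subscriber.java + Book.java.
The Book class is identical in both projects: each copy implements Serializable, and the serialVersionUID must be the same too: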
private static final long serialVersionUID = 2504467948968634865L;
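The reason the two copies must agree is that Java serialization writes the class name and its serialVersionUID into the byte stream, and the receiving side rejects the object with an InvalidClassException if its local class declares a different value. The sketch below illustrates this with the JDK only, without JMS. `BookDemo` is a hypothetical stand-in for Book trimmed to two fields, and `SerialRoundTrip` is a helper name invented for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for Book, trimmed to two fields for brevity.
class BookDemo implements Serializable {
    private static final long serialVersionUID = 2504467948968634865L;
    String name;
    int amount;

    BookDemo(String name, int amount) {
        this.name = name;
        this.amount = amount;
    }
}

class SerialRoundTrip {
    // Serialize to bytes, roughly what happens when an object is put into an ObjectMessage.
    static byte[] toBytes(Serializable obj) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Deserialize again, roughly what getObject() does on the receiving side.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // The UID that serialization associates with a class; it is compared on read.
    static long declaredUid(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    public static void main(String[] args) {
        BookDemo copy = (BookDemo) fromBytes(toBytes(new BookDemo("demo", 3)));
        System.out.println(copy.name + " x" + copy.amount);
        System.out.println(declaredUid(BookDemo.class));
    }
}
```

Declaring the UID explicitly, as the post does, keeps the value stable; without it the JVM derives one from the class structure, so the two copies would only match while they stay byte-for-byte compatible.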
Publisher
// book is a fully populated object
Book book = new Book(productName, productAmount, tyname, writerName, prename);
// JMS publisher
try {
    Context jndiContext = new InitialContext();
    TopicConnectionFactory factory = (TopicConnectionFactory) jndiContext.lookup("topicConnectionFactry");
    Topic objTopic = (Topic) jndiContext.lookup("obj");
    TopicConnection topicConnection = factory.createTopicConnection();
    // A publisher does not strictly need a client ID. If one is set, it must differ from the ID of
    // any client connected at the same time (the subscriber below also uses "8"); see error 2.
    topicConnection.setClientID("8");
    TopicSession session = topicConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
    TopicPublisher publisher = session.createPublisher(objTopic);
    // Set the delivery mode before publishing; it only affects messages sent afterwards.
    // PERSISTENT is also the JMS default.
    publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
    topicConnection.start();
    ObjectMessage message = session.createObjectMessage();
    message.setObject(book);
    publisher.publish(message);
    session.close();
    topicConnection.close();
} catch (Exception e5) {
    e5.printStackTrace();
}
Subscriber
package MainPackage;

import java.rmi.RemoteException;
import java.sql.SQLException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import ObjectPackage.Book;

public class Subscriber {
    public static void main(String[] args) throws JMSException, NamingException {
        try {
            Context jndiContext = new InitialContext();
            TopicConnectionFactory factory = (TopicConnectionFactory) jndiContext.lookup("topicConnectionFactry");
            // factory.setTrustAllPackages(true); or topicConnection.setTrustAllPackages(true);
            // not applicable to TopicConnectionFactory, only to ActiveMQConnectionFactory
            // JNDI works like a registry: the name "obj" is looked up there and resolves to the
            // Topic (the destination), not to an individual message.
            Topic objTopic = (Topic) jndiContext.lookup("obj");
            TopicConnection topicConnection = factory.createTopicConnection();
            // A durable subscription requires a client ID
            topicConnection.setClientID("8");
            TopicSession session = topicConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
            TopicSubscriber subscriber = session.createDurableSubscriber(objTopic, "12");
            // TopicSubscriber subscriber = session.createSubscriber(objTopic);
            subscriber.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message msg) {
                    try {
                        System.out.println("Receiving an object");
                        ObjectMessage message = (ObjectMessage) msg;
                        // Object obj = message.getObject();
                        Book book = (Book) message.getObject();
                        System.out.println(book.getName());
                        new Management().book_addition(book);
                        // In CLIENT_ACKNOWLEDGE mode the message must be acknowledged explicitly;
                        // otherwise the broker keeps it and redelivers it to this durable subscription.
                        msg.acknowledge();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    } catch (ClassNotFoundException e) {
                        e.printStackTrace();
                    } catch (RemoteException e) {
                        e.printStackTrace();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            topicConnection.start();
            // The session and connection are deliberately left open so the listener keeps receiving.
            /* session.close();
               topicConnection.close(); */
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
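Both programs depend on JNDI to resolve the names `topicConnectionFactry` and `obj`, and the post does not show where those names are defined. As a rough sketch, assuming ActiveMQ's own JNDI provider and a broker listening on tcp://localhost:61616 (both assumptions, as is the physical topic name), the settings that would normally sit in a `jndi.properties` file on the classpath can also be built in code. `JndiEnvironment` is a hypothetical helper name.

```java
import java.util.Properties;

class JndiEnvironment {
    // Builds the JNDI environment; the values mirror what a jndi.properties file would contain.
    static Properties activeMqEnvironment() {
        Properties env = new Properties();
        // ActiveMQ's JNDI context factory (requires the ActiveMQ client jar at runtime).
        env.setProperty("java.naming.factory.initial",
                "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        // Assumed broker address; adjust to the actual installation.
        env.setProperty("java.naming.provider.url", "tcp://localhost:61616");
        // JNDI name under which the connection factory is registered.
        env.setProperty("connectionFactoryNames", "topicConnectionFactry");
        // topic.<jndiName> = <physicalName>; the physical name "obj" is an assumption.
        env.setProperty("topic.obj", "obj");
        return env;
    }

    public static void main(String[] args) {
        // With the ActiveMQ client on the classpath this would be passed to
        // new javax.naming.InitialContext(env); here the settings are only printed.
        Properties env = activeMqEnvironment();
        for (String key : env.stringPropertyNames()) {
            System.out.println(key + " = " + env.getProperty(key));
        }
    }
}
```

Keeping these values in `jndi.properties` instead lets `new InitialContext()` be called without arguments, which is what the publisher and subscriber above do.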
The Book class, identical in both projects
package ObjectPackage;

import java.io.Serializable;

public class Book implements Serializable {
    int id;
    String name;
    int amount;
    String description;
    String writer;
    String press;
    // The two Book classes must be identical, including the serialVersionUID
    private static final long serialVersionUID = 2504467948968634865L;

    public Book(String name, int amount, String description, String writer, String press) {
        super();
        this.name = name;
        this.amount = amount;
        this.description = description;
        this.writer = writer;
        this.press = press;
    }

    public Book() {
        // no-argument constructor
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public String getWriter() {
        return writer;
    }

    public void setWriter(String writer) {
        this.writer = writer;
    }

    public String getPress() {
        return press;
    }

    public void setPress(String press) {
        this.press = press;
    }
}
Common errors
1. When creating a durable subscription you must set a client ID, e.g. connection.setClientID("1"); otherwise you get:
javax.jms.JMSException: You cannot create a durable subscriber without specifying a unique clientID on a Connection
at org.apache.activemq.ActiveMQConnection.checkClientIDWasManuallySpecified(ActiveMQConnection.java:1284)
at org.apache.activemq.ActiveMQSession.createDurableSubscriber(ActiveMQSession.java:1467)
at org.apache.activemq.ActiveMQSession.createDurableSubscriber(ActiveMQSession.java:1411)
at org.apache.activemq.ActiveMQTopicSession.createDurableSubscriber(ActiveMQTopicSession.java:147)
at MainPackage.Subscriber.main(Subscriber.java:50)
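2. If the client ID is a duplicate (an active connection with the same ID already exists), an error is reported; that is, only one client with a given ID can be connected at a time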
// javax.jms.InvalidClientIDException: Broker: localhost - Client: 1
// already connected from tcp://127.0.0.1:2758
// connection.setClientID("1");
3. Under the same connection client ID, each durable subscriber name must be unique
// javax.jms.JMSException: Durable consumer is in use for client: 1 and
// subscriptionName: 11
4. Running things in the wrong order keeps producing a deserialization exception
javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Forbidden class com.mq.bean.Student! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)
at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:208)
at com.mq.listener.ConsumerMessageListener.onMessage(ConsumerMessageListener.java:23)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:714)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:652)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:619)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:307)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1144)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1136)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1033)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: Forbidden class com.mq.bean.Student! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.checkSecurity(ClassLoadingAwareObjectInputStream.java:112)
at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.resolveClass(ClassLoadingAwareObjectInputStream.java:57)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:206)
... 10 more
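The explanation on the ActiveMQ website is that the Book class is not trusted; the fix is to add Book to ActiveMQ's whitelist of trusted classes.
In my case, however, that was not the problem; the cause was the order in which I ran things. Two sequences that work:
(1) Start the ActiveMQ service, run the subscriber first so that its listener is active, then run the publisher to publish; the subscriber then receives the message.
(2) Start ActiveMQ, run the publisher to publish, then shut ActiveMQ down.
Restart ActiveMQ and run the subscriber; it then receives the message.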
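For reference, the stack trace above shows where the "Forbidden class" check happens: ActiveMQ's ClassLoadingAwareObjectInputStream calls checkSecurity from resolveClass while deserializing. The sketch below mimics that idea with the JDK only. It is not ActiveMQ's implementation, and `TrustedOnlyInputStream`, `WhitelistDemo`, `TrustedBook` and `UntrustedNote` are names invented for this example. In ActiveMQ itself the whitelist is configured on the ActiveMQ classes, for instance with `setTrustedPackages(...)` on ActiveMQConnectionFactory or the `setTrustAllPackages(true)` call mentioned in the subscriber's comment.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

class TrustedBook implements Serializable {
    private static final long serialVersionUID = 1L;
    String name = "demo";
}

class UntrustedNote implements Serializable {
    private static final long serialVersionUID = 1L;
    String text = "demo";
}

// Rejects any class whose name does not start with a trusted prefix.
class TrustedOnlyInputStream extends ObjectInputStream {
    private final List<String> trustedPrefixes;

    TrustedOnlyInputStream(InputStream in, List<String> trustedPrefixes) throws IOException {
        super(in);
        this.trustedPrefixes = trustedPrefixes;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        String className = desc.getName();
        for (String prefix : trustedPrefixes) {
            if (className.startsWith(prefix)) {
                return super.resolveClass(desc);
            }
        }
        throw new ClassNotFoundException("Forbidden class " + className);
    }
}

class WhitelistDemo {
    static byte[] toBytes(Serializable obj) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true if the payload deserializes under the given whitelist.
    static boolean canRead(byte[] bytes, List<String> trustedPrefixes) {
        try (ObjectInputStream in =
                     new TrustedOnlyInputStream(new ByteArrayInputStream(bytes), trustedPrefixes)) {
            in.readObject();
            return true;
        } catch (ClassNotFoundException | InvalidClassException e) {
            return false; // the class was rejected by the whitelist
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> trusted = Arrays.asList(TrustedBook.class.getName());
        System.out.println(canRead(toBytes(new TrustedBook()), trusted));
        System.out.println(canRead(toBytes(new UntrustedNote()), trusted));
    }
}
```

A real whitelist would list package prefixes such as ObjectPackage rather than single class names; the class name is used here only so the demo does not depend on which package it is compiled into.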
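Also: my subscriber still has some problems that I will fix later. I am off to get the assignment checked first and will tidy this up once exam week is over.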