
bbossgroups JMS component framework

2010-03-20 11:50

1 JMS component framework

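The JMS component of the bboss aop framework provides a small set of simple operations on top of the JMS specification. JMS messages are received and sent through the JMSTemplate component.
The framework ships two JMS template implementations:
org.frameworkset.mq.JMSTemplate: provides all JMS receive and send interfaces, without the topic subscription interfaces
org.frameworkset.mq.JMSReceiveTemplate: provides all JMS receive and send interfaces, plus the topic subscription interfaces
The simple examples below show how to use these two template classes.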

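Support for the JMS server of a different JMS provider can be added by extending the abstract connection factory management class
org.frameworkset.mq.JMSConnectionFactory. Developers only need to implement the abstract method of JMSConnectionFactory:
/**
* Builds the connection factory of a specific provider
*
* @return the provider-specific ConnectionFactory
*/
public abstract ConnectionFactory buildConnectionFactory() throws Exception;
The JMS component in the bboss aop framework provides an implementation for the apache activemq server:
org.frameworkset.mq.AMQConnectionFactory
This chapter uses the apache activemq server throughout to illustrate the basic interfaces of the JMS component.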
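
The extension point described above is a template-method hook: the base class owns the connection handling and a subclass only supplies the provider's factory. The following is a minimal sketch of that idea, not the bboss source. So that it compiles without the JMS API or bboss on the classpath, `ConnectionFactory` and `JMSConnectionFactory` are simplified stand-ins for the real `javax.jms` and `org.frameworkset.mq` types, and `ExampleProviderConnectionFactory` is a hypothetical provider.

```java
// Stand-in for javax.jms.ConnectionFactory (assumption: reduced to one method
// that returns a String instead of a real Connection).
interface ConnectionFactory {
    String createConnection();
}

// Stand-in for org.frameworkset.mq.JMSConnectionFactory. Only the abstract hook
// named in the text is taken from the article; the rest is illustrative.
abstract class JMSConnectionFactory {
    private ConnectionFactory delegate;

    /** Builds the connection factory of a specific provider. */
    public abstract ConnectionFactory buildConnectionFactory() throws Exception;

    // The base class builds the provider factory once, lazily, and reuses it.
    public synchronized String getConnection() {
        try {
            if (delegate == null) {
                delegate = buildConnectionFactory();
            }
            return delegate.createConnection();
        } catch (Exception e) {
            throw new IllegalStateException("could not obtain a connection", e);
        }
    }
}

// Hypothetical provider support: the subclass contributes nothing but the factory.
class ExampleProviderConnectionFactory extends JMSConnectionFactory {
    private final String connectURL;

    ExampleProviderConnectionFactory(String connectURL) {
        this.connectURL = connectURL;
    }

    @Override
    public ConnectionFactory buildConnectionFactory() {
        return () -> "connection to " + connectURL;
    }
}
```

In this sketch, `new ExampleProviderConnectionFactory("tcp://localhost:61616").getConnection()` goes through the base class, which calls the subclass hook the first time only.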

1.1 First, configure a jmstemplate component

<?xml version="1.0" encoding="UTF-8"?>
<properties>

1.1.1 JMSTemplate using the pooled connection factory

<property name="test.jmstemplate" class="org.frameworkset.mq.JMSTemplate">
<construction>
<property name="connectionfactory"
refid="attr:test.amq.PooledConnectionFactory"/> </construction>
</property>

1.1.2 JMSReceiveTemplate using the pooled connection factory (cannot be used for topic subscription)

<property name="test.jms.receive.template" singlable="true" class="org.frameworkset.mq.JMSReceiveTemplate">
<construction>
<property name="connectionfactory"
refid="attr:test.amq.PooledConnectionFactory"/>
</construction>
</property>

1.1.3 JMSReceiveTemplate without the pooled connection factory

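This template can be used for topic subscription. Its clientid is set to topicid, so it can serve the subscription of one topic only and cannot be reused for any other topic. It also must not use the pooled connection factory.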
<property name="test.topic.receive.jmstemplate" singlable="true" class="org.frameworkset.mq.JMSReceiveTemplate">
<construction>
<property name="connectionfactory"
refid="attr:test.amq.ConnectionFactory"/>
<property name="clientid"
value="topicid"/>
</construction>
</property>
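This template can be used for topic subscription. Its clientid is set to topicid1, so it can serve the subscription of one topic only and cannot be reused for any other topic. It also must not use the pooled connection factory.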

<property name="test.topic1.receive.jmstemplate" singlable="true" class="org.frameworkset.mq.JMSReceiveTemplate">
<construction>
<property name="connectionfactory"
refid="attr:test.amq.ConnectionFactory"/>
<property name="clientid"
value="topicid1"/>
</construction>
</property>

1.1.4 JMS pooled connection factory configuration

<property name="test.amq.PooledConnectionFactory" singlable="true" class="org.frameworkset.mq.AMQConnectionFactory">
<construction>
<property name="connectURL"
value="tcp://172.16.17.216:61615"/>
<property name="username"
value="system" />
<property name="password"
value="manager" />
<property name="usepool"
value="true" />

<!-- SSL configuration -->
<!--
<property name="ssl"
value="true" />
<property name="keyStore"
value="d:/client.ks" />
<property name="keyStorepassword"
value="123456" />
<property name="trustStore"
value="d:/client.ts" />
<property name="trustPassword"
value="123456" />
-->

<property name="connection.params">
<map>
<property name="connection.params" refid="attr:test.amq.connection.params"/>
<property name="connection.params.prefetchPolicy" refid="attr:test.amq.connection.params.prefetchPolicy"/>
<property name="connection.params.redirectPolicy" refid="attr:test.amq.connection.params.redirectPolicy"/>
<property name="connection.params.reconnectPolicy" refid="attr:test.amq.connection.params.reconnectPolicy"/>
</map>
</property>
</construction>
</property>

1.1.5 JMS connection factory configuration

<property name="test.amq.ConnectionFactory" singlable="true" class="org.frameworkset.mq.AMQConnectionFactory">
<construction>
<property name="connectURL"
value="tcp://172.16.17.216:61615"/>
<property name="username"
value="system" />
<property name="password"
value="manager" />
<property name="usepool"
value="false" />

<!-- SSL configuration -->
<!--
<property name="ssl"
value="true" />
<property name="keyStore"
value="d:/client.ks" />
<property name="keyStorepassword"
value="123456" />
<property name="trustStore"
value="d:/client.ts" />
<property name="trustPassword"
value="123456" />
-->
<property name="connection.params">
<map>
<property name="connection.params" refid="attr:test.amq.connection.params"/>
<property name="connection.params.prefetchPolicy" refid="attr:test.amq.connection.params.prefetchPolicy"/>
<property name="connection.params.redirectPolicy" refid="attr:test.amq.connection.params.redirectPolicy"/>
<property name="connection.params.reconnectPolicy" refid="attr:test.amq.connection.params.reconnectPolicy"/>
</map>
</property>

</construction>
</property>

1.1.6 Connection factory parameters

<!--
Configuration parameter template for the MQ connection factory
-->
<property name="test.amq.connection.params" label="Connection parameters">
<!-- http://activemq.apache.org/maven/activemq-core/xsddoc/http___activemq.org_config_1.0/element/connectionFactory.html -->
<map>
<property label="alwaysSessionAsync" name="alwaysSessionAsync"
value="true" class="boolean">
<description> <![CDATA[If this flag is set then a separate thread is not used for dispatching messages for each Session in the Connection.
However, a separate thread is always used if there is more than one session,
or the session isn't in auto acknowledge or dups ok mode]]></description>
</property>
<property label="alwaysSyncSend" name="alwaysSyncSend"
value="false" class="boolean" min="10" max="100">
<description> <![CDATA[Set true if always require messages to be sync sent ]]></description>
</property>
<property label="closeTimeout (unit: ms)" name="closeTimeout" class="int" value="15000">
<description> <![CDATA[Sets the timeout before a close is considered complete. ]]></description>
</property>
<property label="copyMessageOnSend" class="boolean" name="copyMessageOnSend"
value="true">
<description> <![CDATA[Should a JMS message be copied to a new JMS Message object as part of the send() method in JMS. ]]></description>
</property>
<property label="disableTimeStampsByDefault" class="boolean" name="disableTimeStampsByDefault"
value="false">
<description> <![CDATA[Sets whether or not timestamps on messages should be disabled or not.]]></description>
</property>
<property label="dispatchAsync" class="boolean" name="dispatchAsync" value="true">
<description> <![CDATA[ Enables or disables the default setting of whether or not consumers have their messages dispatched synchronously or asynchronously by the broker. ]]></description>
</property>
<property label="objectMessageSerializationDefered" class="boolean" name="objectMessageSerializationDefered"
value="false">
<description> <![CDATA[When an object is set on an ObjectMessage, the JMS spec requires the object to be serialized by that set method. ]]></description>
</property>
<property label="optimizeAcknowledge" class="boolean" name="optimizeAcknowledge"
value="false">
<description> <![CDATA[ optimizeAcknowledge ]]></description>
</property>
<property label="optimizedMessageDispatch" class="boolean" name="optimizedMessageDispatch"
value="true">
<description> <![CDATA[ If this flag is set then a larger prefetch limit is used - only applicable for durable topic subscribers. ]]></description>
</property>
<property label="producerWindowSize" class="int" name="producerWindowSize"
value="0">
<description> <![CDATA[ The ProducerWindowSize is the maximum number of bytes in memory that a producer will transmit to a broker before waiting for acknowledgement messages from the broker that it has accepted the previously sent messages. In other words, this is how you configure the producer flow control window that is used for async sends where the client is responsible for managing memory usage. The default value of 0 means no flow control at the client. See also Producer Flow Control. This option only affects Openwire clients as Stomp does not have a means of notifying the clients of accepted messages asynchronously. ]]></description>
</property>
<property label="statsEnabled" class="boolean" name="statsEnabled" value="false">
<description> <![CDATA[ statsEnabled ]]></description>
</property>
<property label="useAsyncSend" class="boolean" name="useAsyncSend" value="false">
<description> <![CDATA[ Forces the use of Async Sends which adds a massive performance boost; but means that the send() method will return immediately whether the message has been sent or not which could lead to message loss. ]]></description>
</property>
<property label="useCompression" class="boolean" name="useCompression"
value="false">
<description> <![CDATA[ Enables the use of compression of the message bodies ]]></description>
</property>
<property label="useRetroactiveConsumer" class="boolean" name="useRetroactiveConsumer"
value="false">
<description> <![CDATA[ Sets whether or not retroactive consumers are enabled ]]></description>
</property>
<property label="watchTopicAdvisories" class="boolean" name="watchTopicAdvisories"
value="true">
<description> <![CDATA[ Whether to use advisory messages ]]></description>
</property>
<property label="sendTimeout (unit: ms)" class="int" name="sendTimeout" value="0">
<description> <![CDATA[ Time to wait on Message Sends for a Response, default value of zero indicates to wait forever. Waiting forever allows the broker to have flow control over messages coming from this client if it is a fast producer or there is no consumer such that the broker would run out of memory if it did not slow down the producer. Does not affect Stomp clients as the sends are ack'd by the broker. ]]></description>
</property>
</map>
</property>
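<!--
Configuration parameter template for the MQ connection factory; parameter type:
-->
<property name="test.amq.connection.params.prefetchPolicy" label="Prefetch policy" >
<map>
<!--
Referenced object, currently not maintained
id="prefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy"
The properties configured below are properties of the org.apache.activemq.ActiveMQPrefetchPolicy object
-->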

<property label="durableTopicPrefetch" class="int" name="durableTopicPrefetch"
value="100">
<description> <![CDATA[ The maximum number of messages sent to a consumer on a durable topic until acknowledgements are received ]]></description>
</property>
<property label="inputStreamPrefetch" class="int" name="inputStreamPrefetch"
value="100">
<description> <![CDATA[ The maximum number of messages sent to a consumer on a JMS stream until acknowledgements are received ]]></description>
</property>
<property label="maximumPendingMessageLimit" class="int" name="maximumPendingMessageLimit"
value="0">
<description> <![CDATA[ Sets how many messages a broker will keep around, above the prefetch limit, for non-durable topics before starting to discard older messages ]]></description>
</property>
<property label="optimizeDurableTopicPrefetch" class="int" name="optimizeDurableTopicPrefetch"
value="1000">
<description> <![CDATA[ optimizeDurableTopicPrefetch ]]></description>
</property>
<property label="queueBrowserPrefetch" class="int" name="queueBrowserPrefetch"
value="500">
<description> <![CDATA[ queueBrowserPrefetch]]></description>
</property>
<property label="queuePrefetch" class="int" name="queuePrefetch" value="1000">
<description> <![CDATA[ The maximum number of messages sent to a consumer on a queue until acknowledgements are received ]]></description>
</property>
<property label="topicPrefetch" class="int" name="topicPrefetch" value="32766">
<description> <![CDATA[ The maximum number of messages sent to a consumer on a non-durable topic until acknowledgements are received ]]></description>
</property>
</map>
</property>
<!--
Configuration parameter template for the message redelivery policy
-->
<property name="test.amq.connection.params.redirectPolicy" label="Message redelivery policy">
<map>
<!--
private double collisionAvoidanceFactor = 0.15d;//collisionAvoidancePercent
private int maximumRedeliveries = 6;
private long initialRedeliveryDelay = 1000L;
private boolean useCollisionAvoidance;
private boolean useExponentialBackOff;
private short backOffMultiplier = 5;
-->
<property label="Collision avoidance percent" name="collisionAvoidancePercent"
value="15" class="int">
<description> <![CDATA[Collision avoidance percentage; the default is 15, i.e. 15%]]></description>
</property>
<property label="Maximum number of redeliveries" name="maximumRedeliveries"
value="6" class="int">
<description> <![CDATA[Set to -1 for unlimited redelivery]]></description>
</property>
<property label="Initial redelivery delay" name="initialRedeliveryDelay"
value="1000" class="int">
<description> <![CDATA[milliseconds]]></description>
</property>
<property label="Enable collision avoidance" name="useCollisionAvoidance"
value="false" class="boolean">
<description> <![CDATA[]]></description>
</property>
<property label="Enable exponential back-off for redelivery delays" name="useExponentialBackOff"
value="false" class="boolean">
<description> <![CDATA[]]></description>
</property>
<property label="Redelivery back-off multiplier" name="backOffMultiplier"
value="5" class="int">
<description> <![CDATA[]]></description>
</property>
</map>
</property>
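
How the redelivery parameters above interact is easier to see in code. The following is a minimal sketch, loosely modeled on the usual semantics of such a policy and not taken from ActiveMQ or bboss: the class `RedeliveryDelays` and its methods are hypothetical, and the exact jitter formula is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class RedeliveryDelays {
    // Values mirror the redelivery policy configuration above.
    final long initialRedeliveryDelay = 1000L;
    final int maximumRedeliveries = 6;
    final int backOffMultiplier = 5;
    final int collisionAvoidancePercent = 15;

    /** Delay before the next redelivery; pass 0 as previousDelay for the first one. */
    long nextDelay(long previousDelay, boolean useExponentialBackOff,
                   boolean useCollisionAvoidance, Random random) {
        long next;
        if (previousDelay == 0) {
            next = initialRedeliveryDelay;
        } else if (useExponentialBackOff && backOffMultiplier > 1) {
            next = previousDelay * backOffMultiplier;
        } else {
            next = previousDelay;
        }
        if (useCollisionAvoidance) {
            // Assumed jitter: shift the delay by a random amount of at most
            // collisionAvoidancePercent so that consumers do not retry in lockstep.
            double factor = collisionAvoidancePercent / 100.0;
            double variance = (random.nextBoolean() ? factor : -factor) * random.nextDouble();
            next += (long) (next * variance);
        }
        return next;
    }

    /** Delays of all redeliveries up to maximumRedeliveries, without jitter. */
    List<Long> schedule(boolean useExponentialBackOff) {
        List<Long> delays = new ArrayList<>();
        long previous = 0;
        for (int i = 0; i < maximumRedeliveries; i++) {
            previous = nextDelay(previous, useExponentialBackOff, false, null);
            delays.add(previous);
        }
        return delays;
    }
}
```

Under these assumptions, the configured values (useExponentialBackOff set to false) give six redeliveries spaced 1000 ms apart, while `schedule(true)` grows the delay by the multiplier each time: 1000, 5000, 25000, 125000, 625000, 3125000 ms.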
<!--
Configuration parameter template for the failover reconnect policy
-->
<property name="test.amq.connection.params.reconnectPolicy" label="Failover reconnect policy">
<map>
<property label="Enable reconnection" name="USE_FAILOVER"
value="false" class="boolean">
<description> <![CDATA[]]></description>
</property>
<property label="Initial reconnect delay" name="reconnectPolicy.initialReconnectDelay"
value="10" class="long">
<description> <![CDATA[milliseconds]]></description>
</property>
<property label="Maximum reconnect delay" name="reconnectPolicy.maxReconnectDelay"
value="30000" class="long">
<description> <![CDATA[milliseconds]]></description>
</property>
<property label="Enable exponential back-off" name="reconnectPolicy.useExponentialBackOff"
value="true" class="boolean">
<description> <![CDATA[]]></description>
</property>
<property label="Exponent used by the exponential back-off" name="reconnectPolicy.reconnectDelayExponent"
value="2" class="int">
<description> <![CDATA[]]></description>
</property>
<property label="Maximum number of reconnect attempts" name="reconnectPolicy.maxReconnectAttempts"
value="5" class="int">
<description> <![CDATA[]]></description>
</property>
<property label="Pick the URI at random" name="reconnectPolicy.randomize"
value="false" class="boolean">
<description> <![CDATA[]]></description>
</property>
</map>
</property>

</properties>
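
The reconnect policy in the configuration above combines an initial delay, a growth factor, an upper bound and an attempt limit. A minimal sketch of how such values could translate into waiting times follows. It assumes that the delay is multiplied by reconnectDelayExponent after each failed attempt and capped at maxReconnectDelay; the class `ReconnectDelays` is hypothetical and this is not the failover implementation itself.

```java
import java.util.ArrayList;
import java.util.List;

class ReconnectDelays {
    /** Waiting time in ms before each reconnect attempt, under the stated assumptions. */
    static List<Long> schedule(long initialReconnectDelay, long maxReconnectDelay,
                               boolean useExponentialBackOff, int reconnectDelayExponent,
                               int maxReconnectAttempts) {
        List<Long> delays = new ArrayList<>();
        long delay = initialReconnectDelay;
        for (int attempt = 0; attempt < maxReconnectAttempts; attempt++) {
            delays.add(delay);
            if (useExponentialBackOff) {
                // Grow the delay, but never beyond the configured maximum.
                delay = Math.min(delay * reconnectDelayExponent, maxReconnectDelay);
            }
        }
        return delays;
    }
}
```

With the configured values, `ReconnectDelays.schedule(10, 30000, true, 2, 5)` yields 10, 20, 40, 80 and 160 ms, so the 30000 ms cap only matters for larger initial delays or more attempts.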

1.2 Test cases

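1.2.1 Getting a JMS connection from the pooled connection factory

The code below obtains a JMS connection object from the pooled connection factory twice in a row, to check that the connection pool takes effect.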
JMSConnectionFactory factory = (JMSConnectionFactory)BaseSPIManager.getBeanObject("test.amq.PooledConnectionFactory");
try
{
Connection connection = factory.getConnection();
connection.start();
connection.close();

connection = factory.getConnection();
connection.start();
connection.close();
}
catch (JMSException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
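
What "the pool takes effect" means in the test above can be shown with a toy model. The sketch assumes that, with usepool set to true, closing a connection hands it back to the pool rather than tearing it down, so the second request reuses it. `ToyConnectionPool` and `PhysicalConnection` are hypothetical stand-ins and do not reproduce the real pooled factory.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class ToyConnectionPool {
    // Stand-in for a provider connection; the id shows which physical connection it is.
    static final class PhysicalConnection {
        final int id;

        PhysicalConnection(int id) {
            this.id = id;
        }
    }

    private final Deque<PhysicalConnection> idle = new ArrayDeque<>();
    private int created = 0;

    // Hands out an idle connection if there is one, otherwise creates a new one.
    synchronized PhysicalConnection getConnection() {
        PhysicalConnection c = idle.pollFirst();
        return c != null ? c : new PhysicalConnection(++created);
    }

    // "Closing" a pooled connection returns it to the pool.
    synchronized void close(PhysicalConnection c) {
        idle.addFirst(c);
    }

    // Number of physical connections created so far.
    synchronized int createdCount() {
        return created;
    }
}
```

In this model a get, close, get sequence like the one in the test creates a single physical connection, whereas a non-pooled factory would create two.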

1.2.2 Getting a JMS connection from the connection factory

A JMS connection obtained from the non-pooled connection factory is a newly created connection every time.
JMSConnectionFactory factory = (JMSConnectionFactory)BaseSPIManager.getBeanObject("test.amq.ConnectionFactory");
try
{
Connection connection = factory.getConnection();
connection.start();
connection.close();

connection = factory.getConnection();
connection.start();
// close the second connection as well, otherwise it is leaked
connection.close();
}
catch (JMSException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}

1.2.3 Sending a message to the queue atest

JMSTemplate template = (JMSTemplate)BaseSPIManager.getBeanObject("test.jmstemplate");
try
{
template.send("atest", "ahello");

}
catch (JMSException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
finally
{
template.stop();
}

1.2.4 Sending a persistent message to the queue atest

JMSTemplate template = (JMSTemplate)BaseSPIManager.getBeanObject("test.jmstemplate");
try
{
template.send("atest", "phello",true);
}
catch (JMSException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
finally
{
template.stop();
}

1.2.5 Sending a persistent message to the queue atest with a priority and an expiration time

JMSTemplate template = (JMSTemplate)BaseSPIManager.getBeanObject("test.jmstemplate");
try
{
template.send("atest", "allhello",true,4,10000);
}
catch (JMSException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
finally
{
template.stop();
}
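
The effect of the priority and expiration arguments in the call above can be illustrated with a toy queue. The sketch assumes that the last two arguments are a JMS priority (0 to 9, where 4 is the JMS default) and a time-to-live in milliseconds, with 0 meaning that the message never expires. `ToyQueue` is hypothetical, takes the current time as a parameter to stay deterministic, and orders strictly by priority, which a real provider is only expected to do on a best-effort basis.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

class ToyQueue {
    static final class Msg {
        final String body;
        final int priority;
        final long expiration; // absolute time in ms, 0 = never expires
        final long seq;        // arrival order, breaks ties between equal priorities

        Msg(String body, int priority, long expiration, long seq) {
            this.body = body;
            this.priority = priority;
            this.expiration = expiration;
            this.seq = seq;
        }
    }

    // Highest priority first, then oldest first.
    private final PriorityQueue<Msg> pending = new PriorityQueue<>(
            Comparator.comparingInt((Msg m) -> -m.priority).thenComparingLong(m -> m.seq));
    private long nextSeq = 0;

    void send(String body, int priority, long timeToLive, long now) {
        if (priority < 0 || priority > 9) {
            throw new IllegalArgumentException("JMS priority must be between 0 and 9");
        }
        long expiration = timeToLive == 0 ? 0 : now + timeToLive;
        pending.add(new Msg(body, priority, expiration, nextSeq++));
    }

    // Returns the body of the next message that has not expired, or null if there is none.
    String receive(long now) {
        Msg m;
        while ((m = pending.poll()) != null) {
            if (m.expiration == 0 || m.expiration > now) {
                return m.body;
            }
        }
        return null;
    }
}
```

In this model a message sent with priority 4 and a time-to-live of 10000 is delivered after any pending priority 9 message, and is silently dropped if nobody receives it within 10 seconds.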

1.2.6 Receiving a message synchronously from the queue atest

JMSTemplate template = (JMSTemplate)BaseSPIManager.getBeanObject("test.jmstemplate");
try
{
Message msg = template.receive("atest");
System.out.println("testReceiveMessage:"+ msg);
}
catch (JMSException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
finally
{
template.stop();
}

1.2.7 Receiving messages asynchronously from the queue atest

The test.jms.receive.template template is used to receive messages asynchronously from the queue atest.
JMSReceiveTemplate template = (JMSReceiveTemplate)BaseSPIManager.getBeanObject("test.jms.receive.template");
try
{
template.setMessageListener("atest",new MessageListener() {

public void onMessage(Message arg0)
{
System.out.println("msg coming:"+arg0);
}
});

}
catch (Exception e)
{
// report the failure instead of swallowing it, then release the template
e.printStackTrace();
template.stop();
}
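
Registering a listener returns immediately and the messages arrive later on another thread, so a test usually has to wait before it can assert anything. The following sketch shows one common way to do that with a CountDownLatch. It is an illustration only: `Listener` is a stand-in for `javax.jms.MessageListener` that takes a String instead of a Message, the executor plays the role of the provider's delivery thread, and `AsyncReceiveDemo` is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncReceiveDemo {
    // Stand-in for javax.jms.MessageListener.
    interface Listener {
        void onMessage(String message);
    }

    /** Delivers the messages on another thread and waits until the listener handled all of them. */
    static boolean deliverAndWait(String[] messages, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(messages.length);
        Listener listener = message -> {
            System.out.println("msg coming:" + message);
            done.countDown();
        };
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        try {
            for (String m : messages) {
                dispatcher.execute(() -> listener.onMessage(m));
            }
            // Block the calling thread until every message was seen or the timeout elapsed.
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            dispatcher.shutdown();
        }
    }
}
```

The same latch could be counted down inside a real listener, with the test thread awaiting it before calling stop() on the template.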

1.2.8 Sending a message to the topic getsubtest

JMSTemplate template = (JMSTemplate)BaseSPIManager.getBeanObject("test.jmstemplate");
try
{
template.send("topic://getsubtest", "getsubtest");

}
catch (JMSException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
finally
{
template.stop();
}

1.2.9 Subscribing to messages from a topic

JMSReceiveTemplate template = (JMSReceiveTemplate)BaseSPIManager.getBeanObject("test.topic.receive.jmstemplate");
try
{
template.getTopicSubscriber("getsubtest", "subscribename").setMessageListener(new MessageListener() {

public void onMessage(Message arg0)
{
System.out.println("topic msg coming:"+arg0);
}
});

}
catch (Exception e)
{
// report the failure instead of swallowing it, then release the template
e.printStackTrace();
template.stop();
}

1.2.10 Unsubscribing from a topic

JMSReceiveTemplate template = (JMSReceiveTemplate)BaseSPIManager.getBeanObject("test.topic.receive.jmstemplate");
try
{
template.unsubscribe("subscribename");

}
catch (Exception e)
{
// report the failure instead of swallowing it, then release the template
e.printStackTrace();
template.stop();
}
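
The subscribe and unsubscribe calls above, together with the clientid set on the template, match the JMS notion of a durable subscription, which is identified by the client id plus the subscription name. The toy model below assumes that this is what the template methods map to, which the article does not state explicitly. `ToyTopic` is hypothetical and keeps everything in memory.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

class ToyTopic {
    // One backlog per durable subscription, keyed by "clientId/subscriptionName".
    private final Map<String, Queue<String>> backlogs = new HashMap<>();

    private static String key(String clientId, String name) {
        return clientId + "/" + name;
    }

    // Registers the subscription; subscribing again under the same identity keeps its backlog.
    void subscribe(String clientId, String name) {
        backlogs.computeIfAbsent(key(clientId, name), k -> new ArrayDeque<>());
    }

    // Every registered subscription gets a copy, whether or not its consumer is connected.
    void publish(String message) {
        for (Queue<String> backlog : backlogs.values()) {
            backlog.add(message);
        }
    }

    // Delivers and clears what accumulated for this subscription.
    List<String> drain(String clientId, String name) {
        List<String> out = new ArrayList<>();
        Queue<String> backlog = backlogs.get(key(clientId, name));
        if (backlog != null) {
            out.addAll(backlog);
            backlog.clear();
        }
        return out;
    }

    // After unsubscribing, the broker stops retaining messages for this identity.
    void unsubscribe(String clientId, String name) {
        backlogs.remove(key(clientId, name));
    }
}
```

In this model, a message published while the subscriber is away is still delivered on the next drain, and nothing is retained once unsubscribe has been called with the same client id and subscription name.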