HornetQ Cluster Configuration and Spring Integration Example
2012-08-10 11:14
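HornetQ is an embeddable, high-performance asynchronous messaging system that supports clustering and multiple protocols. It fully supports JMS: in addition to the JMS 1.1 API, it defines its own messaging API, which is intended to maximize HornetQ's performance and flexibility. It also supports a RESTful API, STOMP (STOMP clients can be written in many programming languages), and AMQP (at the time of writing: "HornetQ will shortly be implementing AMQP").
HornetQ is built for high performance. According to the project, its persistent messaging can easily outperform the non-persistent messaging of other common messaging engines, and its own non-persistent messaging is faster still.
HornetQ is built entirely from POJOs, which keeps its dependencies on third-party packages to a minimum and makes the design minimally intrusive. It can run standalone or be integrated with a Java application server.
HornetQ has thorough error handling. It provides server replication and automatic failover, which guards against lost messages and against server errors caused by duplicated messages.
HornetQ offers flexible clustering. By building a HornetQ cluster you gain the performance benefit of message load balancing. You can also link clusters into a global messaging network and configure message routing flexibly.
HornetQ has strong management features. It exposes a large number of management APIs for managing and monitoring servers, integrates seamlessly with application servers, and can work alongside them in an HA environment.
Use: loosely couple systems so that one is not constrained by the availability of another, which effectively reduces the time threads spend blocked. This differs from RPC, which uses a request/response model.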
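Message body types supported by HornetQ:
Stream -- StreamMessage: a stream of values that is read sequentially
Text -- TextMessage
Map -- MapMessage (key/value)
Object -- ObjectMessage: carries a Serializable object
Bytes -- BytesMessage: raw bytes (for example, an image)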
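Since an ObjectMessage body must be a Serializable object, it can help to check that a payload survives Java serialization before handing it to JMS. The following is a minimal sketch using only the JDK. OrderPayload and SerializationCheck are hypothetical names introduced for illustration and are not part of HornetQ or of this article's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical payload type used only for this illustration.
final class OrderPayload implements Serializable {
    private static final long serialVersionUID = 1L;
    final String orderId;
    final int quantity;

    OrderPayload(String orderId, int quantity) {
        this.orderId = orderId;
        this.quantity = quantity;
    }
}

final class SerializationCheck {
    // Writes the object to a byte array and reads it back.
    // Throws IllegalStateException if any part of the object graph is not serializable.
    static Object roundTrip(Serializable value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("payload is not serializable", e);
        }
    }

    public static void main(String[] args) {
        OrderPayload copy = (OrderPayload) roundTrip(new OrderPayload("A-1", 3));
        System.out.println(copy.orderId + " x " + copy.quantity);
    }
}
```

A payload that fails this check would also fail when set as the body of an ObjectMessage, so the check is a cheap way to catch the problem before a broker is involved.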
Download: wget http://downloads.jboss.org/hornetq/hornetq-2.2.14.Final.zip
yum install libaio
Chinese documentation: http://www.jboss.org/hornetq/chinesedocs.html
1. Standalone configuration:
1.1 Write a startup script: start.sh
IP=`/sbin/ip a | grep 'inet ' | awk -F '/' '{print $1}' | awk '{print $2}' | grep -v 127.0.0.1 | head -1`
export CLUSTER_PROPS="-Dhornetq.remoting.netty.host=$IP -Djnp.host=$IP"
echo $CLUSTER_PROPS
sh run.sh &
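If the address has to be determined from Java instead of the shell (for example to build the same -D properties in a launcher), a rough JDK-only equivalent of the IP lookup is sketched below. LocalAddress is a hypothetical helper. Interface ordering may differ from the output of ip a, and the sketch skips every loopback address, not only 127.0.0.1.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;
import java.util.Enumeration;

final class LocalAddress {
    // Returns the first non-loopback IPv4 address found, or null if there is none.
    static String firstNonLoopbackIpv4() {
        try {
            Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
            if (nics == null) {
                return null;
            }
            for (NetworkInterface nic : Collections.list(nics)) {
                for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                    if (addr instanceof Inet4Address && !addr.isLoopbackAddress()) {
                        return addr.getHostAddress();
                    }
                }
            }
            return null;
        } catch (SocketException e) {
            // Interfaces could not be enumerated; treat as "no address found".
            return null;
        }
    }

    public static void main(String[] args) {
        String ip = firstNonLoopbackIpv4();
        System.out.println("-Dhornetq.remoting.netty.host=" + ip + " -Djnp.host=" + ip);
    }
}
```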
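1.2 Alternatively, edit the configuration files
In the following two files, replace localhost with the machine's IP address:
config/stand-alone/non-clustered/hornetq-configuration.xml
config/stand-alone/non-clustered/hornetq-beans.xml
<property name="bindAddress">${jnp.host:192.168.100.241}</property>
<property name="rmiBindAddress">${jnp.host:192.168.100.241}</property>
<param key="host" value="${hornetq.remoting.netty.host:192.168.100.241}"/>
....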
1.3 JARs required by the client
hornetq-core-client.jar
netty.jar
hornetq-jms-client.jar
jboss-jms-api.jar
jnp-client.jar
1.4 Configure a queue by adding the following to config/stand-alone/non-clustered/hornetq-jms.xml
<queue name="OrderQueue">
   <entry name="queues/OrderQueue"/>
</queue>
Configure a topic
<topic name="topic1">
   <entry name="/my/Topic1"/>
</topic>
In hornetq-configuration.xml, add the following under the <configuration> node:
<security-enabled>false</security-enabled>
1.5 Send/receive demo
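import java.io.Serializable;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.InitialContext;

public void sendToQueue(String destinationName, Serializable payload) throws Exception {
    InitialContext ic = new InitialContext();
    ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
    Queue queue = (Queue) ic.lookup(destinationName);
    Connection connection = cf.createConnection();
    try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer publisher = session.createProducer(queue);
        connection.start();
        // createObjectMessage(payload) already sets the body, so no extra setObject call is needed
        ObjectMessage message = session.createObjectMessage(payload);
        publisher.send(message);
    } finally {
        // close the connection even if sending fails
        connection.close();
    }
}

// "log" is the logger of the enclosing class
@TransactionAttribute(value = TransactionAttributeType.REQUIRED)
public void onMessage(Message message) {
    ObjectMessage obj = (ObjectMessage) message;
    try {
        Serializable ser = obj.getObject();
        log.info("[NotificationInbound] onMessage!");
    } catch (Exception e) {
        log.error("[NotificationInbound] ERROR[" + e.getMessage() + "]!!!****");
        // keep the original exception as the cause
        throw new IllegalStateException(e);
    }
}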
2. Cluster configuration
2.1 Startup scripts for a cluster on a single machine
Each node running on the same machine needs its own data directory and its own JNP, RMI and Netty ports.
start-cluster0.bat
set CLUSTER_PROPS=-Ddata.dir=../data-server2 -Djnp.port=2099 -Djnp.rmiPort=2098 -Dhornetq.remoting.netty.port=6445
run ../config/stand-alone/clustered
start-cluster1.bat
set CLUSTER_PROPS=-Ddata.dir=../data-server3 -Djnp.port=3099 -Djnp.rmiPort=3098 -Dhornetq.remoting.netty.port=7445
run ../config/stand-alone/clustered
2.2 Cluster node startup script
start-node.sh
IP=`/sbin/ip a | grep 'inet ' | awk -F '/' '{print $1}' | awk '{print $2}' | grep -v 127.0.0.1 | head -1`
export CLUSTER_PROPS="-Dhornetq.remoting.netty.host=$IP -Djnp.host=$IP"
echo $CLUSTER_PROPS
sh run.sh ../config/stand-alone/clustered
2.2.1 Cluster node stop script
stop-node.sh
sh stop.sh ../config/stand-alone/clustered
2.3 Cluster configuration notes
2.3.1 Cluster discovery uses UDP multicast
hornetq-configuration.xml
<discovery-groups>
   <discovery-group name="my-discovery-group">
      <local-bind-address>172.16.9.7</local-bind-address>
      <group-address>231.7.7.7</group-address>
      <group-port>9876</group-port>
      <refresh-timeout>10000</refresh-timeout>
   </discovery-group>
</discovery-groups>
hornetq-jms.xml
<connection-factory name="ConnectionFactory">
   <discovery-group-ref discovery-group-name="my-discovery-group"/>
   <entries>
      <entry name="/ConnectionFactory"/>
   </entries>
</connection-factory>
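The group-address has to be a multicast address for UDP discovery to work. As a quick sanity check, the sketch below uses the JDK's InetAddress to test the two addresses from the discovery group configuration. DiscoveryAddressCheck is a hypothetical helper, not a HornetQ class.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

final class DiscoveryAddressCheck {
    // Returns true when the literal IP address lies in a multicast range
    // (224.0.0.0 to 239.255.255.255 for IPv4).
    static boolean isMulticast(String literalAddress) {
        try {
            return InetAddress.getByName(literalAddress).isMulticastAddress();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("not a valid address: " + literalAddress, e);
        }
    }

    public static void main(String[] args) {
        System.out.println("group-address 231.7.7.7 multicast: " + isMulticast("231.7.7.7"));
        System.out.println("local-bind-address 172.16.9.7 multicast: " + isMulticast("172.16.9.7"));
    }
}
```

231.7.7.7 falls inside the IPv4 multicast range, while 172.16.9.7 is an ordinary unicast address, which is what a local-bind-address should be.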
2.3.2 Client connection code:
final String groupAddress = "231.7.7.7";
final int groupPort = 9876;
ConnectionFactory jmsConnectionFactory = HornetQJMSClient.createConnectionFactory(groupAddress, groupPort);
Connection jmsConnection1 = jmsConnectionFactory.createConnection();
Connection jmsConnection2 = jmsConnectionFactory.createConnection();
2.3.3 Server-side load balancing
hornetq-configuration.xml
<cluster-connections>
   <cluster-connection name="my-cluster">
      <address>jms</address>
      <retry-interval>500</retry-interval>
      <use-duplicate-detection>true</use-duplicate-detection>
      <forward-when-no-consumers>false</forward-when-no-consumers>
      <max-hops>1</max-hops>
      <discovery-group-ref discovery-group-name="my-discovery-group"/>
   </cluster-connection>
</cluster-connections>
2.3.4 Client-side load balancing
hornetq-jms.xml
<connection-factory name="ConnectionFactory">
   <discovery-group-ref discovery-group-name="my-discovery-group"/>
   <entries>
      <entry name="/ConnectionFactory"/>
   </entries>
   <ha>true</ha>
   <connection-load-balancing-policy-class-name>
      org.hornetq.api.core.client.loadbalance.RandomConnectionLoadBalancingPolicy
   </connection-load-balancing-policy-class-name>
</connection-factory>
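To make the idea of a connection load-balancing policy concrete, here is a small JDK-only sketch of two selection strategies, random and round-robin. The Policy interface and both classes are local stand-ins written for this example. They are not HornetQ's implementations, and the node names are hypothetical.

```java
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

final class BalancingSketch {
    // Local stand-in: picks an index in the range [0, max).
    interface Policy {
        int select(int max);
    }

    // Picks any node with equal probability.
    static final class RandomPolicy implements Policy {
        private final Random random;

        RandomPolicy(long seed) {
            this.random = new Random(seed);
        }

        @Override
        public int select(int max) {
            return random.nextInt(max);
        }
    }

    // Cycles through the nodes in order, starting with index 0.
    static final class RoundRobinPolicy implements Policy {
        private final AtomicInteger next = new AtomicInteger(0);

        @Override
        public int select(int max) {
            return Math.floorMod(next.getAndIncrement(), max);
        }
    }

    public static void main(String[] args) {
        String[] nodes = {"node-a", "node-b", "node-c"}; // hypothetical cluster nodes
        Policy policy = new RoundRobinPolicy();
        for (int i = 0; i < 4; i++) {
            System.out.println("connection " + i + " goes to " + nodes[policy.select(nodes.length)]);
        }
    }
}
```

With a random policy, successive connections created from the same factory spread across the cluster on average. A round-robin policy spreads them evenly and predictably.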
3. Spring integration example
3.1 Spring configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:lang="http://www.springframework.org/schema/lang"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
           http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
           http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
           http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <bean id="messageTopic" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createTopic">
        <constructor-arg value="topic1"/>
    </bean>
    <bean id="searchAddMessageQueue" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createQueue">
        <constructor-arg value="ExpiryQueue"/>
    </bean>

    <!--
    <bean id="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration">
        <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory"/>
        <constructor-arg>
            <map key-type="java.lang.String" value-type="java.lang.Object">
                <entry key="host" value="localhost"/>
                <entry key="port" value="5445"/>
            </map>
        </constructor-arg>
    </bean>
    -->
    <bean id="transportConfiguration" class="org.hornetq.api.core.DiscoveryGroupConfiguration">
        <constructor-arg name="groupAddress" value="231.7.7.7"/>
        <constructor-arg name="groupPort" value="9876"/>
    </bean>

    <bean id="connectionFactory" class="org.hornetq.api.jms.HornetQJMSClient"
          factory-method="createConnectionFactoryWithHA" destroy-method="close">
        <constructor-arg type="org.hornetq.api.jms.JMSFactoryType" value="CF"/>
        <constructor-arg ref="transportConfiguration"/>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="pubSubDomain" value="true"/>
    </bean>

    <!-- ClientServiceImpl is not shown in this article -->
    <bean id="topicService" class="org.langke.hornetq.ClientServiceImpl">
        <property name="jmsTemplate" ref="jmsTemplate"/>
        <property name="topic" ref="messageTopic"/>
    </bean>

    <!-- package matches the Java sources below -->
    <bean id="sendMessageService" class="org.langke.common.hornetq.SendMessageServiceImpl">
        <property name="jmsTemplate" ref="jmsTemplate"/>
        <property name="searchAddMessageQueue" ref="searchAddMessageQueue"/>
    </bean>

    <!-- this is the Message Driven POJO (MDP)
    <bean id="messageListener" class="org.langke.hornetq.MessageListenerImpl">
    </bean>
    -->
    <bean id="receiveMessageListener" class="org.langke.common.hornetq.ReceiveMessageListenerImpl"/>

    <!-- and this is the message listener container -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <!-- <property name="destination" ref="messageTopic"/> -->
        <property name="destination" ref="searchAddMessageQueue"/>
        <property name="messageListener" ref="receiveMessageListener"/>
    </bean>
</beans>
package org.langke.common.hornetq;

public interface MessageService {
    public boolean sendMessage(SerializableObject message);
}
3.2 Sending messages
package org.langke.common.hornetq;

import java.io.Serializable;

public class SerializableObject implements Serializable {
    private static final long serialVersionUID = 1L;

    // wrapped payload; it must itself be serializable at runtime
    private Object obj;
    private Boolean isRetry = true;

    public Object getObj() {
        return obj;
    }

    public void setObj(Object obj) {
        this.obj = obj;
    }

    public Boolean getIsRetry() {
        return isRetry;
    }

    public void setIsRetry(Boolean isRetry) {
        this.isRetry = isRetry;
    }
}
package org.langke.common.hornetq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.log4j.Logger;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class SendMessageServiceImpl implements MessageService {
    private static final Logger logger = Logger.getLogger(SendMessageServiceImpl.class);
    private JmsTemplate jmsTemplate;
    private Queue searchAddMessageQueue;

    @Override
    public boolean sendMessage(SerializableObject message) {
        return sendQueue(message);
    }

    // Sends the object to the configured queue; returns false if sending fails.
    private boolean sendQueue(final SerializableObject so) {
        try {
            logger.info("start to send queue to " + searchAddMessageQueue.getQueueName() + ", message: " + so);
            jmsTemplate.send(searchAddMessageQueue, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    ObjectMessage om = session.createObjectMessage(so);
                    return om;
                }
            });
            return true;
        } catch (Exception e) {
            logger.error("Error: send to queue failure: " + e.getMessage(), e);
            return false;
        }
    }

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public Queue getSearchAddMessageQueue() {
        return searchAddMessageQueue;
    }

    public void setSearchAddMessageQueue(Queue searchAddMessageQueue) {
        this.searchAddMessageQueue = searchAddMessageQueue;
    }
}
3.3 Receiving messages
package org.langke.common.hornetq;

import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import org.apache.log4j.Logger;

public class ReceiveMessageListenerImpl implements MessageListener {
    // number of messages handled so far
    private AtomicInteger count = new AtomicInteger(0);
    private static Logger logger = Logger.getLogger(ReceiveMessageListenerImpl.class);

    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof ObjectMessage) {
                ObjectMessage objectMessage = (ObjectMessage) message;
                if (objectMessage.getObject() instanceof SerializableObject) {
                    SerializableObject so = (SerializableObject) objectMessage.getObject();
                    logger.info(so.getObj());
                } else {
                    logger.info(objectMessage);
                }
            } else {
                System.out.println(message);
            }
        } catch (JMSException e) {
            logger.error("Error: receive message from queue failure: " + e.getMessage(), e);
        } finally {
            System.out.println(count.incrementAndGet());
        }
    }
}
3.4 Usage example
package org.langke.common.hornetq;

import java.util.HashMap;
import java.util.Map;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

public class Test {
    private static ApplicationContext ctx;
    private static Test instance = new Test();

    public static Test getInstance() {
        return instance;
    }

    private Test() {
        if (ctx == null) {
            // the relative location of the Spring configuration depends on the platform
            String location = null;
            if (System.getProperty("os.name").toLowerCase().contains("windows")) {
                location = "conf/applicationContext.xml";
            } else {
                location = "../conf/applicationContext.xml";
            }
            ctx = new FileSystemXmlApplicationContext(location);
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        getInstance();
        MessageService service = ctx.getBean("sendMessageService", MessageService.class);
        // send 3000 messages, each wrapping a small map
        for (int i = 0; i < 3000; i++) {
            Map<String, Integer> map = new HashMap<String, Integer>();
            map.put("ooxx", i);
            SerializableObject so = new SerializableObject();
            so.setObj(map);
            service.sendMessage(so);
        }
    }
}
4. Other features
4.1 Message expiry
HornetQ will not deliver a message to a consumer after its time to live has been exceeded. If the message hasn't been delivered before the time to live is reached, the server can discard it.
With the HornetQ core API, the expiration time is set directly on the message:
// message will expire in 5000 ms from now
message.setExpiration(System.currentTimeMillis() + 5000);
Expiry address
<!-- expired messages in exampleQueue will be sent to the expiry address expiryQueue -->
<address-setting match="jms.queue.exampleQueue">
   <expiry-address>jms.queue.expiryQueue</expiry-address>
</address-setting>
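The expiry rule can be expressed as a small check. The sketch below is plain Java with hypothetical names (ExpirySketch, expirationFor, isExpired). It follows the JMS convention that an expiration of 0 means the message never expires, and it treats a message as expired once the current time reaches the expiration time, which is an assumption about the exact boundary.

```java
final class ExpirySketch {
    // Absolute expiration time in milliseconds for a message sent at nowMillis.
    // A time to live of 0 yields 0, meaning "never expires".
    static long expirationFor(long nowMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : nowMillis + timeToLiveMillis;
    }

    // True when the message must no longer be delivered to a consumer.
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis >= expirationMillis;
    }

    public static void main(String[] args) {
        long sentAt = System.currentTimeMillis();
        long expiration = expirationFor(sentAt, 5000);
        System.out.println("expired right after sending: " + isExpired(expiration, sentAt));
        System.out.println("expired six seconds later: " + isExpired(expiration, sentAt + 6000));
    }
}
```

A message for which isExpired returns true is the kind of message that would be discarded, or moved to the expiry address when one is configured for its queue.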
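4.2 Scheduled messages
Setting the _HQ_SCHED_DELIVERY property to an absolute time in milliseconds tells the server the earliest time at which the message may be delivered.
TextMessage message = session.createTextMessage("MSG");
message.setLongProperty("_HQ_SCHED_DELIVERY", System.currentTimeMillis() + 5000);
producer.send(message);
...
// message will not be received immediately but 5 seconds later
TextMessage messageReceived = (TextMessage) consumer.receive();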
4.3 Message groups
Message groups are sets of messages that have the following characteristics:
• Messages in a message group share the same group id; that is, they have the same group identifier property (JMSXGroupID for JMS, _HQ_GROUP_ID for the HornetQ Core API).
• Messages in a message group are always consumed by the same consumer, even if there are many consumers on a queue. They pin all messages with the same group id to the same consumer.
If that consumer closes, another consumer is chosen and will receive all messages with the same group id.
Based on message
Message message = ...
message.setStringProperty("JMSXGroupID", "Group-0");
producer.send(message);
message = ...
message.setStringProperty("JMSXGroupID", "Group-0");
producer.send(message);
Based on connection factory...
<connection-factory name="ConnectionFactory">
   <connectors>
      <connector-ref connector-name="netty-connector"/>
   </connectors>
   <entries>
      <entry name="ConnectionFactory"/>
   </entries>
   <group-id>Group-0</group-id>
</connection-factory>
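The pinning behaviour of message groups can be illustrated without a broker. The following sketch is a simplified model, not HornetQ's algorithm: GroupPinningSketch and the consumer names are hypothetical, and the way a consumer is chosen for a new group (simple rotation) is picked only for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class GroupPinningSketch {
    private final List<String> consumers = new ArrayList<>();
    private final Map<String, String> pinned = new HashMap<>(); // group id -> consumer
    private int nextIndex = 0;

    void addConsumer(String name) {
        consumers.add(name);
    }

    // Closing a consumer releases every group that was pinned to it.
    void closeConsumer(String name) {
        consumers.remove(name);
        pinned.values().removeIf(name::equals);
    }

    // Returns the consumer that receives a message carrying the given group id.
    String route(String groupId) {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("no consumers on the queue");
        }
        String owner = pinned.get(groupId);
        if (owner == null) {
            // first message of the group (or its consumer closed): choose and remember a consumer
            owner = consumers.get(Math.floorMod(nextIndex++, consumers.size()));
            pinned.put(groupId, owner);
        }
        return owner;
    }

    public static void main(String[] args) {
        GroupPinningSketch queue = new GroupPinningSketch();
        queue.addConsumer("consumer-1");
        queue.addConsumer("consumer-2");
        String first = queue.route("Group-0");
        System.out.println("Group-0 -> " + first);
        System.out.println("Group-0 -> " + queue.route("Group-0"));
        queue.closeConsumer(first);
        System.out.println("after close, Group-0 -> " + queue.route("Group-0"));
    }
}
```

Every message with the same group id goes to the same consumer until that consumer closes, after which the group is pinned to another one.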