
HornetQ Cluster Configuration and Spring Integration Example

2012-08-10 11:14
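HornetQ is an embeddable, high-performance asynchronous messaging system that supports clustering and multiple protocols. It fully supports JMS: besides the JMS 1.1 API, HornetQ defines its own messaging API, which lets it get the most out of its performance and flexibility. It also supports a RESTful API, STOMP (STOMP clients can be written in many programming languages) and AMQP (HornetQ will shortly be implementing AMQP).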

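HornetQ is built for very high performance: its throughput for persistent messages can easily exceed what other common messaging engines achieve for non-persistent messages. Its non-persistent messaging performance is, of course, better still.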

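HornetQ is built entirely from POJOs. This pure-POJO design keeps its dependencies on third-party packages to a minimum and, from a design standpoint, makes it minimally intrusive. HornetQ can run standalone or be integrated with a Java application server.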

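HornetQ has thorough error handling. It provides server replication and automatic failover, which removes the server errors otherwise caused by lost or duplicated messages.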

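HornetQ provides flexible clustering. By creating a HornetQ cluster you gain the performance benefit of message load balancing. Clusters can also be combined into a global messaging network, and message routing can be configured flexibly.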

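HornetQ has strong management features. It provides a large management API and server monitoring, and it integrates seamlessly with application servers so that both work together in an HA environment.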

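Use case: loosely couple systems so that no system is held up by another server, which effectively reduces the time threads spend blocked. This differs from RPC, which uses a request/response model.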

Message body types supported by HornetQ:
Stream -- StreamMessage: a stream of values that is read sequentially
Text -- TextMessage
Map -- MapMessage (key/value pairs)
Object -- ObjectMessage: a Serializable object
Bytes -- BytesMessage: raw bytes (for example, an image)

Download: wget http://downloads.jboss.org/hornetq/hornetq-2.2.14.Final.zip
yum install libaio

Chinese documentation: http://www.jboss.org/hornetq/chinesedocs.html

1. Single-node configuration:

1.1 Write a startup script: start.sh

# pick the first IPv4 address that is not 127.0.0.1
IP=`/sbin/ip a | grep 'inet ' | awk -F'/' '{print $1}' | awk '{print $2}' | grep -v 127.0.0.1 | head -1`
export CLUSTER_PROPS="-Dhornetq.remoting.netty.host=$IP -Djnp.host=$IP"
echo $CLUSTER_PROPS
sh run.sh &
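
For clients or launchers written in Java, the address selection done by the script above can be approximated without shelling out. The following is a minimal sketch, not part of HornetQ: the class name LocalAddress is made up for illustration, and the JVM may enumerate interfaces in a different order than `ip a`, so the chosen address can differ on multi-homed hosts.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Optional;

// Hypothetical helper: mirrors "first IPv4 address that is not loopback".
final class LocalAddress {

    /** Returns the first non-loopback IPv4 address found, if any. */
    static Optional<InetAddress> firstNonLoopbackIPv4() {
        try {
            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
            if (interfaces == null) {
                return Optional.empty();
            }
            for (NetworkInterface nic : Collections.list(interfaces)) {
                for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                    if (addr instanceof Inet4Address && !addr.isLoopbackAddress()) {
                        return Optional.of(addr);
                    }
                }
            }
        } catch (SocketException e) {
            // Interfaces could not be enumerated; treat it as "nothing found".
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Fall back to loopback when no other IPv4 address exists.
        String ip = firstNonLoopbackIPv4().map(InetAddress::getHostAddress).orElse("127.0.0.1");
        System.out.println("-Dhornetq.remoting.netty.host=" + ip + " -Djnp.host=" + ip);
    }
}
```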


1.2 Or edit the configuration files

In the following two files, replace localhost with the machine's IP address:

config/stand-alone/non-clustered/hornetq-configuration.xml

config/stand-alone/non-clustered/hornetq-beans.xml

<property name="bindAddress">${jnp.host:192.168.100.241}</property>
<property name="rmiBindAddress">${jnp.host:192.168.100.241}</property>
<param key="host" value="${hornetq.remoting.netty.host:192.168.100.241}"/>

....
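
The ${name:default} values above mean "use the system property if it is set (for example via -Djnp.host=...), otherwise use the default after the colon". That is why either the startup script or the edited files can supply the address. The following is a minimal sketch of that resolution rule only; it is an illustration written for this article, not HornetQ's actual implementation, and the class name PlaceholderDemo is made up.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative resolver for ${name:default} placeholders (assumed semantics).
final class PlaceholderDemo {

    // group(1) = property name, group(2) = optional default value
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    static String resolve(String text, Properties props) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            // Without a default, an unset property leaves the placeholder untouched.
            String fallback = m.group(2) != null ? m.group(2) : m.group(0);
            String value = props.getProperty(m.group(1), fallback);
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Property not set: the default after the colon is used.
        System.out.println(resolve("${jnp.host:192.168.100.241}", props));
        // Property set (as -Djnp.host=10.0.0.5 would do): it wins over the default.
        props.setProperty("jnp.host", "10.0.0.5");
        System.out.println(resolve("${jnp.host:192.168.100.241}", props));
    }
}
```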

1.3 JARs required by the client

hornetq-core-client.jar

netty.jar

hornetq-jms-client.jar

jboss-jms-api.jar

jnp-client.jar

1.4 Configure a queue by adding the following to config/stand-alone/non-clustered/hornetq-jms.xml

<queue name="OrderQueue">
   <entry name="queues/OrderQueue"/>
</queue>

Configure a topic

<topic name="topic1">
   <entry name="/my/Topic1"/>
</topic>

hornetq-configuration.xml

Add the following under the <configuration> element:

<security-enabled>false</security-enabled>

1.5 Send/receive demo

// Requires javax.naming.InitialContext, java.io.Serializable and javax.jms.*
public void sendToQueue(String destinationName, Serializable payload) throws Exception {
    InitialContext ic = new InitialContext();
    ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
    Queue queue = (Queue) ic.lookup(destinationName);
    Connection connection = cf.createConnection();
    try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer publisher = session.createProducer(queue);
        connection.start();
        // createObjectMessage(payload) already sets the body, so no setObject() call is needed
        ObjectMessage message = session.createObjectMessage(payload);
        publisher.send(message);
    } finally {
        // always release the connection, even when sending fails
        connection.close();
    }
}

// Receiving side: message-driven bean callback
@TransactionAttribute(value = TransactionAttributeType.REQUIRED)
public void onMessage(Message message) {
    ObjectMessage obj = (ObjectMessage) message;
    try {
        Serializable ser = obj.getObject();
        log.info("[NotificationInbound] onMessage!");
    } catch (Exception e) {
        log.error("[NotificationInbound] ERROR[" + e.getMessage() + "]!!!****");
        // keep the original exception as the cause
        throw new IllegalStateException(e);
    }
}


2. Cluster configuration
2.1 Startup scripts for a cluster on a single machine
start-cluster0.bat

set CLUSTER_PROPS=-Ddata.dir=../data-server2 -Djnp.port=2099 -Djnp.rmiPort=2098 -Dhornetq.remoting.netty.port=6445
run ../config/stand-alone/clustered


start-cluster1.bat


set CLUSTER_PROPS=-Ddata.dir=../data-server3 -Djnp.port=3099 -Djnp.rmiPort=3098 -Dhornetq.remoting.netty.port=7445
run ../config/stand-alone/clustered


2.2 Cluster node startup script
start-node.sh

# pick the first IPv4 address that is not 127.0.0.1
IP=`/sbin/ip a | grep 'inet ' | awk -F'/' '{print $1}' | awk '{print $2}' | grep -v 127.0.0.1 | head -1`
export CLUSTER_PROPS="-Dhornetq.remoting.netty.host=$IP -Djnp.host=$IP"
echo $CLUSTER_PROPS
sh run.sh ../config/stand-alone/clustered


2.2.1 Cluster node stop script
stop-node.sh

sh stop.sh ../config/stand-alone/clustered


2.3 Cluster configuration notes

2.3.1 Cluster discovery uses UDP multicast

hornetq-configuration.xml

<discovery-groups>
   <discovery-group name="my-discovery-group">
      <local-bind-address>172.16.9.7</local-bind-address>
      <group-address>231.7.7.7</group-address>
      <group-port>9876</group-port>
      <refresh-timeout>10000</refresh-timeout>
   </discovery-group>
</discovery-groups>
<connection-factory name="ConnectionFactory">
   <discovery-group-ref discovery-group-name="my-discovery-group"/>
   <entries>
      <entry name="/ConnectionFactory"/>
   </entries>
</connection-factory>
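
Discovery only works when group-address is an IP multicast address (224.0.0.0 to 239.255.255.255) and group-port is a usable UDP port. A unicast address such as the local-bind-address above would not do as a group address. The following is a small sanity check that could be run against such values before starting the nodes. It is a sketch written for this article; DiscoveryGroupCheck is a made-up name, not a HornetQ class.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Illustrative check for discovery-group settings (not part of HornetQ).
final class DiscoveryGroupCheck {

    /** True if the address is an IP multicast address and the port is in 1..65535. */
    static boolean isValid(String groupAddress, int groupPort) {
        try {
            // For a literal IP no DNS lookup happens; a host name would be resolved.
            InetAddress addr = InetAddress.getByName(groupAddress);
            return addr.isMulticastAddress() && groupPort > 0 && groupPort <= 65535;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("231.7.7.7:9876  valid group? " + isValid("231.7.7.7", 9876));
        System.out.println("172.16.9.7:9876 valid group? " + isValid("172.16.9.7", 9876));
    }
}
```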


2.3.2 Client connection code:

final String groupAddress = "231.7.7.7";
final int groupPort = 9876;
ConnectionFactory jmsConnectionFactory =
    HornetQJMSClient.createConnectionFactory(groupAddress, groupPort);
Connection jmsConnection1 = jmsConnectionFactory.createConnection();
Connection jmsConnection2 = jmsConnectionFactory.createConnection();


2.3.3 Server-side load balancing
hornetq-configuration.xml

<cluster-connections>
   <cluster-connection name="my-cluster">
      <address>jms</address>
      <retry-interval>500</retry-interval>
      <use-duplicate-detection>true</use-duplicate-detection>
      <forward-when-no-consumers>false</forward-when-no-consumers>
      <max-hops>1</max-hops>
      <discovery-group-ref discovery-group-name="my-discovery-group"/>
   </cluster-connection>
</cluster-connections>


2.3.4 Client-side load balancing
hornetq-jms.xml

<connection-factory name="ConnectionFactory">
   <discovery-group-ref discovery-group-name="my-discovery-group"/>
   <entries>
      <entry name="/ConnectionFactory"/>
   </entries>
   <ha>true</ha>
   <connection-load-balancing-policy-class-name>org.hornetq.api.core.client.loadbalance.RandomConnectionLoadBalancingPolicy</connection-load-balancing-policy-class-name>
</connection-factory>
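
With client-side load balancing, each new connection created from the factory is directed to one of the discovered nodes, and the configured policy class decides which one. The following sketch contrasts a random choice with a round-robin choice. The Policy interface and both classes are stand-ins defined here for illustration; they are not HornetQ's own classes, and this round-robin variant always starts at index 0, which is an assumption of the sketch.

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative policies: each picks a node index in the range [0, max).
final class LoadBalancingSketch {

    /** Hypothetical stand-in for a connection load-balancing policy. */
    interface Policy {
        int select(int max);
    }

    /** Picks any node with equal probability. */
    static final class RandomPolicy implements Policy {
        @Override
        public int select(int max) {
            return ThreadLocalRandom.current().nextInt(max);
        }
    }

    /** Cycles through the nodes in order: 0, 1, ..., max - 1, 0, ... */
    static final class RoundRobinPolicy implements Policy {
        private int next = 0;

        @Override
        public synchronized int select(int max) {
            int chosen = next % max;
            next = (chosen + 1) % max;
            return chosen;
        }
    }

    public static void main(String[] args) {
        Policy roundRobin = new RoundRobinPolicy();
        Policy random = new RandomPolicy();
        for (int i = 0; i < 5; i++) {
            System.out.println("connection " + i
                    + ": round-robin -> node " + roundRobin.select(3)
                    + ", random -> node " + random.select(3));
        }
    }
}
```

Random selection spreads connections evenly only on average, while round-robin gives an even spread for any number of connections from the same factory.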


3. Spring integration example
3.1 Spring configuration

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util"
    xmlns:jee="http://www.springframework.org/schema/jee" xmlns:lang="http://www.springframework.org/schema/lang"
    xmlns:jms="http://www.springframework.org/schema/jms" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
        http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <bean id="messageTopic" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createTopic">
        <constructor-arg value="topic1"/>
    </bean>

    <bean id="searchAddMessageQueue" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createQueue">
        <constructor-arg value="ExpiryQueue"></constructor-arg>
    </bean>
    <!--
    <bean id="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration">
        <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory"/>
        <constructor-arg>
            <map key-type="java.lang.String" value-type="java.lang.Object">
                <entry key="host" value="localhost"></entry>
                <entry key="port" value="5445"></entry>
            </map>
        </constructor-arg>
    </bean>

    -->
    <bean id="transportConfiguration" class="org.hornetq.api.core.DiscoveryGroupConfiguration">
        <constructor-arg name="groupAddress" value="231.7.7.7"/>
        <constructor-arg name="groupPort" value="9876">
        </constructor-arg>
    </bean>

    <bean id="connectionFactory" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createConnectionFactoryWithHA" destroy-method="close">
        <constructor-arg type="org.hornetq.api.jms.JMSFactoryType" value="CF"/>
        <constructor-arg ref="transportConfiguration"/>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="pubSubDomain" value="true"/>
    </bean>

    <bean id="topicService" class="org.langke.hornetq.ClientServiceImpl">
        <property name="jmsTemplate" ref="jmsTemplate"/>
        <property name="topic" ref="messageTopic"/>
    </bean>

    <!-- class names match the package declared in the Java sources in sections 3.2 and 3.3 -->
    <bean id="sendMessageService" class="org.langke.common.hornetq.SendMessageServiceImpl">
        <property name="jmsTemplate" ref="jmsTemplate"></property>
        <property name="searchAddMessageQueue" ref="searchAddMessageQueue"></property>
    </bean>

    <!-- this is the Message Driven POJO (MDP)
    <bean id="messageListener" class="org.langke.hornetq.MessageListenerImpl">
    </bean>
    -->
    <bean id="receiveMessageListener" class="org.langke.common.hornetq.ReceiveMessageListenerImpl"></bean>

    <!-- and this is the message listener container -->
    <bean id="jmsContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <!-- <property name="destination" ref="messageTopic"/> -->
        <property name="destination" ref="searchAddMessageQueue"></property>
        <property name="messageListener" ref="receiveMessageListener"/>
    </bean>

</beans>



package org.langke.common.hornetq;

public interface MessageService {
    public boolean sendMessage(SerializableObject message);
}


3.2 Sending messages

package org.langke.common.hornetq;

import java.io.Serializable;

/**
 * Message payload wrapper: carries an arbitrary object plus a retry flag.
 */
public class SerializableObject implements Serializable {
    private static final long serialVersionUID = 1L;
    private Object obj;
    private Boolean isRetry = true;

    public Object getObj() {
        return obj;
    }

    public void setObj(Object obj) {
        this.obj = obj;
    }

    public Boolean getIsRetry() {
        return isRetry;
    }

    public void setIsRetry(Boolean isRetry) {
        this.isRetry = isRetry;
    }

}
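
Because the wrapper above holds a plain Object, the compiler cannot guarantee that the wrapped value is serializable. A HashMap works, but a value that does not implement Serializable only fails at runtime, when the object message is built or sent. The following sketch shows one way to check a payload up front with a serialization round trip. Wrapper is a simplified stand-in for SerializableObject and PayloadCheck is a made-up helper name; neither is part of the original code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

// Illustrative pre-flight check for object message payloads.
final class PayloadCheck {

    /** Simplified stand-in for SerializableObject: wraps an arbitrary Object. */
    static final class Wrapper implements Serializable {
        private static final long serialVersionUID = 1L;
        final Object obj;

        Wrapper(Object obj) {
            this.obj = obj;
        }
    }

    /** Serializes and deserializes the value; returns null if that is not possible. */
    static Object roundTrip(Serializable value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            // NotSerializableException (an IOException) lands here.
            return null;
        }
    }

    public static void main(String[] args) {
        HashMap<String, Integer> map = new HashMap<>();
        map.put("ooxx", 1);
        // HashMap implements Serializable, java.lang.Object does not.
        System.out.println("map payload ok:    " + (roundTrip(new Wrapper(map)) != null));
        System.out.println("object payload ok: " + (roundTrip(new Wrapper(new Object())) != null));
    }
}
```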


package org.langke.common.hornetq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.log4j.Logger;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class SendMessageServiceImpl implements MessageService {
    private static final Logger logger = Logger.getLogger(SendMessageServiceImpl.class);
    private JmsTemplate jmsTemplate;
    private Queue searchAddMessageQueue;

    @Override
    public boolean sendMessage(SerializableObject message) {
        return sendQueue(message);
    }

    // Wraps the payload in an ObjectMessage and sends it to the configured queue
    private boolean sendQueue(final SerializableObject so) {
        try {
            logger.info("start to send queue to " + searchAddMessageQueue.getQueueName() + ", message:" + so);
            jmsTemplate.send(searchAddMessageQueue, new MessageCreator() {
                @Override
                public Message createMessage(Session session)
                        throws JMSException {
                    ObjectMessage om = session.createObjectMessage(so);
                    return om;
                }
            });
            return true;
        } catch (Exception e) {
            logger.error("Error: send to queue failure: " + e.getMessage(), e);
            return false;
        }
    }

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public Queue getSearchAddMessageQueue() {
        return searchAddMessageQueue;
    }

    public void setSearchAddMessageQueue(Queue searchAddMessageQueue) {
        this.searchAddMessageQueue = searchAddMessageQueue;
    }

}


3.3 Receiving messages

package org.langke.common.hornetq;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.apache.log4j.Logger;

public class ReceiveMessageListenerImpl implements MessageListener {
    // number of messages handled so far
    private AtomicInteger count = new AtomicInteger(0);
    private static Logger logger = Logger.getLogger(ReceiveMessageListenerImpl.class);

    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof ObjectMessage) {
                ObjectMessage objectMessage = (ObjectMessage) message;
                if (objectMessage.getObject() instanceof SerializableObject) {
                    SerializableObject so = (SerializableObject) objectMessage.getObject();
                    logger.info(so.getObj());
                } else {
                    logger.info(objectMessage);
                }
            } else {
                System.out.println(message);
            }
        } catch (JMSException e) {
            logger.error(
                    "Error: receive message from topic failure: "
                            + e.getMessage(), e);
        } finally {
            System.out.println(count.incrementAndGet());
        }
    }

}


3.4 Usage example

package org.langke.common.hornetq;

import java.util.HashMap;
import java.util.Map;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

public class Test {

    private static ApplicationContext ctx;
    private static Test instance = new Test();

    public static Test getInstance() {
        return instance;
    }

    // Loads the Spring context once; the path depends on the working directory per OS
    private Test() {
        if (ctx == null) {
            String location = null;
            if (System.getProperty("os.name").toLowerCase().contains("windows")) {
                location = "conf/applicationContext.xml";
            } else {
                location = "../conf/applicationContext.xml";
            }
            ctx = new FileSystemXmlApplicationContext(location);
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {

        getInstance();
        MessageService service = ctx.getBean("sendMessageService", MessageService.class);

        // send 3000 small map payloads to the queue
        for (int i = 0; i < 3000; i++) {
            Map<String, Integer> map = new HashMap<String, Integer>();
            map.put("ooxx", i);
            SerializableObject so = new SerializableObject();
            so.setObj(map);
            service.sendMessage(so);
        }

    }
}


4. Other features

4.1 Message expiry
HornetQ will not deliver a message to a consumer after its time to live has been exceeded.
If the message hasn't been delivered before the time to live is reached, the server can discard it.
// message will expire in 5000ms from now
message.setExpiration(System.currentTimeMillis() + 5000);
Expiry address
<!-- expired messages in exampleQueue will be sent to the expiry address expiryQueue -->
<address-setting match="jms.queue.exampleQueue">
   <expiry-address>jms.queue.expiryQueue</expiry-address>
</address-setting>
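
The expiry rule above comes down to comparing an absolute timestamp with the current time. The following sketch spells that out. It assumes the JMS convention that an expiration of 0 means the message never expires, and it treats a message as expired once the clock reaches the expiration time; the exact boundary used by the server is an assumption here. ExpirySketch is a made-up class name.

```java
// Illustrative expiry arithmetic (assumed semantics, not HornetQ code).
final class ExpirySketch {

    /** Absolute expiration time; a time to live of 0 yields 0, meaning "never expires". */
    static long expirationFor(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sendTimeMillis + timeToLiveMillis;
    }

    /** True once the expiration time has been reached; 0 never expires. */
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis >= expirationMillis;
    }

    public static void main(String[] args) {
        long sentAt = System.currentTimeMillis();
        long expiration = expirationFor(sentAt, 5000);
        System.out.println("expired right after sending: " + isExpired(expiration, sentAt));
        System.out.println("expired 5 seconds later:     " + isExpired(expiration, sentAt + 5000));
    }
}
```

With the plain JMS API the same effect is usually obtained by setting a time to live on the producer (MessageProducer.setTimeToLive) rather than computing the timestamp by hand.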

4.2 Scheduled messages
TextMessage message = session.createTextMessage("MSG");
message.setLongProperty("_HQ_SCHED_DELIVERY", System.currentTimeMillis() + 5000);
producer.send(message);
...
// message will not be received immediately but 5 seconds later
TextMessage messageReceived = (TextMessage) consumer.receive();
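
Conceptually, a scheduled message is held back until its delivery timestamp is reached and only then becomes visible to consumers. The following sketch models that behaviour with a priority queue ordered by delivery time and an explicit clock value, so it can be followed without a running server. It illustrates the idea only and says nothing about how HornetQ implements it; ScheduledQueueSketch and its methods are made-up names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative model of scheduled delivery (not HornetQ code).
final class ScheduledQueueSketch {

    /** A message body together with its earliest delivery time. */
    static final class Scheduled {
        final String body;
        final long deliverAtMillis;

        Scheduled(String body, long deliverAtMillis) {
            this.body = body;
            this.deliverAtMillis = deliverAtMillis;
        }
    }

    // Earliest delivery time first.
    private final PriorityQueue<Scheduled> pending =
            new PriorityQueue<>(Comparator.comparingLong((Scheduled s) -> s.deliverAtMillis));

    void send(String body, long deliverAtMillis) {
        pending.add(new Scheduled(body, deliverAtMillis));
    }

    /** Removes and returns the bodies whose delivery time has been reached, earliest first. */
    List<String> pollDue(long nowMillis) {
        List<String> due = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().deliverAtMillis <= nowMillis) {
            due.add(pending.poll().body);
        }
        return due;
    }

    public static void main(String[] args) {
        ScheduledQueueSketch queue = new ScheduledQueueSketch();
        long now = System.currentTimeMillis();
        queue.send("MSG", now + 5000);
        System.out.println("due immediately:     " + queue.pollDue(now));
        System.out.println("due after 5 seconds: " + queue.pollDue(now + 5000));
    }
}
```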

4.3 Message groups
Message groups are sets of messages that have the following characteristics:
• Messages in a message group share the same group id; that is, they have the same group identifier property (JMSXGroupID for JMS, _HQ_GROUP_ID for HornetQ Core API).
• Messages in a message group are always consumed by the same consumer, even if there are many consumers on a queue. They pin all messages with the same group id to the same consumer.
If that consumer closes, another consumer is chosen and will receive all messages with the same group id.

Based on message
Message message = ...
message.setStringProperty("JMSXGroupID", "Group-0");
producer.send(message);
message = ...
message.setStringProperty("JMSXGroupID", "Group-0");
producer.send(message);
Based on connection factory
<connection-factory name="ConnectionFactory">
   <connectors>
      <connector-ref connector-name="netty-connector"/>
   </connectors>
   <entries>
      <entry name="ConnectionFactory"/>
   </entries>
   <group-id>Group-0</group-id>
</connection-factory>
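
The pinning behaviour described in this section can be sketched as a map from group id to consumer: the first message of a group picks a consumer, later messages with the same id follow it, and closing that consumer frees the group to be pinned again. The round-robin choice of the initial consumer is an assumption of this sketch, and GroupPinningSketch is a made-up name rather than a HornetQ class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of message-group pinning (not HornetQ code).
final class GroupPinningSketch {

    private final List<String> consumers = new ArrayList<>();
    private final Map<String, String> pinned = new HashMap<>();
    private int next = 0;

    void addConsumer(String name) {
        consumers.add(name);
    }

    /** Removes the consumer and releases every group that was pinned to it. */
    void closeConsumer(String name) {
        consumers.remove(name);
        pinned.values().removeIf(name::equals);
    }

    /** Returns the consumer that receives a message carrying the given group id. */
    String route(String groupId) {
        String consumer = pinned.get(groupId);
        if (consumer == null) {
            if (consumers.isEmpty()) {
                throw new IllegalStateException("no consumers");
            }
            // First message of this group: choose the next consumer in turn and pin it.
            next = next % consumers.size();
            consumer = consumers.get(next);
            next++;
            pinned.put(groupId, consumer);
        }
        return consumer;
    }

    public static void main(String[] args) {
        GroupPinningSketch queue = new GroupPinningSketch();
        queue.addConsumer("consumer-A");
        queue.addConsumer("consumer-B");
        System.out.println("Group-0 -> " + queue.route("Group-0"));
        System.out.println("Group-0 -> " + queue.route("Group-0"));
        queue.closeConsumer(queue.route("Group-0"));
        System.out.println("Group-0 after close -> " + queue.route("Group-0"));
    }
}
```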