您的位置:首页 > 编程语言 > Java开发

Activemq+spring的第一个程序(入门程序--内嵌Broker--消息队列)

2013-02-19 16:26 736 查看
转自:http://blog.sina.com.cn/s/blog_616e189f0100o00f.html

本次我将给出ActiveMQ和Spring结合使用的一个实例,在这个实例中创建了一个队列,服务端向队列中写入一条消息,客户端从队列中读取消息,并输出到控制台。为了简化问题,我们将服务端和客户端放在一个应用程序中,您完全可以将其拆分成两个程序。这种情况,相当于应用程序发送一条消息到ActiveMQ,然后ActiveMQ将消息反馈给应用程序。程序中使用的是内嵌的ActiveMQ,就是说您在使用的时候不必单独开启ActiveMQ,而是程序会在jvm中启动一个内嵌的ActiveMQ。

如果您对上述某些概念不是很清楚,请参考我的其他关于ActiveMQ的文章。您也可以直接参考ActiveMQ in Action的教程。(我前段时间对这部教材的关键部分进行了翻译,可直接参考我翻译的内容,翻译文档放在我博客的其他文章中)。

在编写应用之前,可以看到整个的文档结构如下:





注意这是一个java se工程,工程需要的包主要来自于spring-framework-3.0.3.RELEASE 和apache-activemq-5.4.2。其他的包,包括日志相关的和spring运行环境需要的可以很容易找到。另外xbean-spring-3.6.jar是ActiveMQ和spring结合必须使用的包,可以参考ActiveMQ官方文档:http://activemq.apache.org/maven/5.3.1/activemq-core/apidocs/overview-summary.html

db-derby-10.6.2.1-bin.zip主要用于ActiveMQ的数据存储,如果您无需持久化数据,将不会使用此包。

下面给出ActiveMQ的配置文件,即activemq.xml:

<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<!—定义了一个Broker,就是一个ActiveMQ的实例—>
<broker
xmlns="http://activemq.apache.org/schema/core"
useJmx="false">

<!—定义了activemq对数据的存储方式,即使用KahaDB进行存储,并指定了存储目录
-->
<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="5"
dataDirectory="../data"/>
</persistenceFactory>

<!—定义了一个传输连接器,应用程序可以使用指定地址和端口与ActiveMQ进行链接,收发消息,这里使用的是TCP连接
-->
<transportConnectors>
<transportConnector
uri="tcp://localhost:61616"/>

</transportConnectors>

</broker>

</beans>

下面给出spring的配置文件,即SpringConfiguration.java:

package
com.guan.ActivemqTest;

import
org.apache.activemq.ActiveMQConnectionFactory;
import
org.apache.activemq.command.ActiveMQQueue;
import
org.apache.activemq.xbean.BrokerFactoryBean;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.context.annotation.Bean;
import
org.springframework.context.annotation.Configuration;
import
org.springframework.core.io.ClassPathResource;
import
org.springframework.jms.connection.SingleConnectionFactory;
import
org.springframework.jms.core.JmsTemplate;

@Configuration
public
class
SpringConfiguration {

//注入一个连接工厂,这个连接工厂bean是在本类中定义的。
@Autowired
private
ActiveMQConnectionFactory
connectionFactory;

//定义activemq代理的一个实例,这里主要是为了启动一个嵌入式的jms代理(vm)
@Bean(name="brokerFactory")
public
BrokerFactoryBean brokerFactoryBean()
{
//创建代理时,指定代理分析他的配置文档,就是先前我们配置的那个文档
BrokerFactoryBean bfb =
new
BrokerFactoryBean(new
ClassPathResource("org/activemq/xbean/activemq.xml"));
//设置程序运行时开启代理
bfb.setStart(true);
return
bfb;
}

//创建一个连接工厂,用于程序连接到activemq代理。
@Bean(name="connectionFactory")
public
ActiveMQConnectionFactory connectionFactory()
{
//指定连接到本地vm的那个代理(注意这个vm就是brokerFactory中启动的那个代理)
ActiveMQConnectionFactory amqf =
new
ActiveMQConnectionFactory("vm://localhost");
return
amqf;
}
//创建一个消息队列
@Bean(name="queue")
public
ActiveMQQueue queue()
{
ActiveMQQueue q =
new
ActiveMQQueue();
q.setPhysicalName("com.guan.activemqTest");
return
q;
}
//创建服务端的jmsTemplate(一般情况下服务端和客户端的jmsTemplate可以设置相同,但为了效率因素,我们将其分开设置,服务端应尽量减少连接数量,所以使用singleConnectionFactory)。
@Bean(name="serviceJmsTemplate")
public
JmsTemplate jmsTemplate()
{
JmsTemplate jt =
new
JmsTemplate();
SingleConnectionFactory scf =
new
SingleConnectionFactory();
scf.setTargetConnectionFactory(connectionFactory);
jt.setConnectionFactory(scf);
return
jt;
}
//客户端的jmsTemplate
@Bean(name="customJmsTemplate")
public
JmsTemplate jmsTemplateCustom()
{
JmsTemplate jt =
new
JmsTemplate();
jt.setConnectionFactory(connectionFactory);
return
jt;
}
}

服务端的程序代码:
package
com.guan.ActivemqTest;

import
javax.jms.JMSException;
import
javax.jms.Message;
import
javax.jms.Session;

import
org.apache.activemq.command.ActiveMQQueue;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.beans.factory.annotation.Qualifier;
import
org.springframework.jms.core.JmsTemplate;
import
org.springframework.jms.core.MessageCreator;
import
org.springframework.stereotype.Component;

@Component("serviceSide")
public
class
JMSServiceSide {
@Autowired
@Qualifier("serviceJmsTemplate")
private
JmsTemplate template;

@Autowired
private
ActiveMQQueue queue;
//发送一条简短的消息
public
void
simpleSend()
{
template.send(queue,new
MessageCreator() {

@Override
public
Message createMessage(Session session) throws
JMSException {

return
session.createTextMessage("您好,关新全!");
}
});
}
}

接收客户端的代码:
package
com.guan.ActivemqTest;

import
javax.jms.JMSException;
import
javax.jms.TextMessage;

import
org.apache.activemq.command.ActiveMQQueue;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.beans.factory.annotation.Qualifier;
import
org.springframework.jms.core.JmsTemplate;
import
org.springframework.stereotype.Component;

@Component("clientSide")
public
class
JMSClientSide {
@Autowired
@Qualifier("customJmsTemplate")
private
JmsTemplate template;
@Autowired
private
ActiveMQQueue queue;

//接收并输出消息
public
void
simpleReceive()
throws
JMSException
{
TextMessage tm = (TextMessage)
template.receive(queue);
System.out.println(tm.getText());
}

}
测试代码:
package
com.guan.ActivemqTest;

import
javax.jms.JMSException;

import
org.junit.Test;
import
org.springframework.context.annotation.AnnotationConfigApplicationContext;

public
class
ActivemqTest {
@Test
public
void
startTest()
throws
JMSException
{
AnnotationConfigApplicationContext ctx =
new
AnnotationConfigApplicationContext();
ctx.scan("com.guan");
ctx.refresh();

//注意下面使用了两种方法获取bean
JMSServiceSide ss = (JMSServiceSide) ctx.getBean("serviceSide");
JMSClientSide cs = ctx.getBean(JMSClientSide.class);
//发送一条消息,然后接收一条消息

ss.simpleSend();
cs.simpleReceive();
}
}

输出结果:您好,关新全!

最后log4j的配置文档:
### direct log messages to stdout ###
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE}
%5p %c{1}:%L
- %m%n

### direct messages to file hibernate.log ###
#log4j.appender.file=org.apache.log4j.FileAppender
#log4j.appender.file.File=hibernate.log
#log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n

### set log levels - for more verbose logging change 'info' to 'debug' ###

log4j.rootLogger=warn,
stdout

#log4j.logger.org.hibernate=info
#log4j.logger.org.hibernate=debug

### log HQL query parser activity
#log4j.logger.org.hibernate.hql.ast.AST=debug

### log just the SQL
#log4j.logger.org.hibernate.SQL=debug

### log JDBC bind parameters ###
#log4j.logger.org.hibernate.type=info
#log4j.logger.org.hibernate.type=debug

### log schema export/update ###
log4j.logger.org.hibernate.tool.hbm2ddl=debug

### log HQL parse trees
#log4j.logger.org.hibernate.hql=debug

### log cache activity ###
#log4j.logger.org.hibernate.cache=debug

### log transaction activity
#log4j.logger.org.hibernate.transaction=debug

### log JDBC resource acquisition
#log4j.logger.org.hibernate.jdbc=debug

### enable the following line if you want to track down connection ###
### leakages when using DriverManagerConnectionProvider ###
#log4j.logger.org.hibernate.connection.DriverManagerConnectionProvider=trace
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: