您的位置:首页 > 编程语言 > Java开发

《商城项目06-2》--ActiveMQ整合Spring实现Solr库中商品信息的实时同步

2018-11-18 17:57 369 查看

一, ActiveMQ整合Spring

1, 添加spring-jms.jar包(e3-manager-service)

<dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>

2, 添加配置文件applicationContext-activemq.xml

[code]<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 配置生产者 -->
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!--这个是主题目的地,一对多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="itemAddTopic" />
</bean>

</beans>

3, 消息发送的测试

运行 --> 查看结果(http://localhost:8161/admin/), 出现spring-queue对应的消息记录即ok

[code]    @Test
public void sendMessage() throws Exception {
//初始化spring容器
ApplicationContext applicationContext =
new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
//从容器中获得JmsTemplate对象。
JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
//从容器中获得一个Destination对象。
Destination destination = (Destination) applicationContext.getBean("queueDestination");
//发送消息
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("send activemq message");
}
});
}

 

4, 消息接收的测试(e3-search-service)

4.1 导jar包

      同1

4.2 添加配置文件applicationContext-activemq.xml

比e3-manager-service中配置文件多了监听器相关配置

[code]<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>

<!-- 配置生产者 -->
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!--这个是主题目的地,一对多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="itemAddTopic" />
</bean>

<!-- 接收消息 -->
<!-- 配置监听器 -->
<bean id="myMessageListener" class="cn.e3mall.search.message.MyMessageListener" />
<!-- 消息监听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>

4.3 创建消息监听类MessageListener 

[code]public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
//取消息内容
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

4.4 初始化配置文件, 进行测试

运行等待  --> 运行e3-manager-service的sendMessage()测试方法 -->MessageListener能正确接收并打印发送的消息即ok

[code]public class MessageConsumer {
@Test
public void msgConsumer() throws Exception {
//初始化spring容器
ApplicationContext applicationContext =
new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
//等待
System.in.read();
}
}

 

二, 实现Solr库中商品信息的实时同步

1, 新增e3-search-service的监听类ItemAddMessageListener

将监听到的商品id(参数message)对应的信息同步到Solr库

[code]/**
* 监听商品添加消息,
* 接收消息后,将对应的商品信息同步到索引库
*/
public class ItemAddMessageListener implements MessageListener {
@Autowired
private ItemMapper itemMapper;
@Autowired
private SolrServer solrServer;
@Override
public void onMessage(Message message) {
try {
//从消息中取商品id
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
Long itemId = new Long(text);
//等待事务(e3-manager-service中操作商品信息)提交
Thread.sleep(1000);
//根据商品id查询商品信息
SearchItem searchItem =
4000
itemMapper.getItemById(itemId);
//创建一个文档对象
SolrInputDocument document = new SolrInputDocument();
//向文档对象中添加域
document.addField("id", searchItem.getId());
document.addField("item_title", searchItem.getTitle());
document.addField("item_sell_point", searchItem.getSell_point());
document.addField("item_price", searchItem.getPrice());
document.addField("item_image", searchItem.getImage());
document.addField("item_category_name", searchItem.getCategory_name());
//把文档写入索引库
solrServer.add(document);
//提交
solrServer.commit();
} catch (Exception e) {
e.printStackTrace();
}
}

}

2, 修改配置文件applicationContext-activemq.xml, 加对应监听器相关配置

[code]<!-- 监听商品添加, 将数据同步到索引库 -->
<bean id="itemAddMessageListener" class="cn.e3mall.search.message.ItemAddMessageListener" />
<!-- 消息监听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="topicDestination" />
<property name="messageListener" ref="itemAddMessageListener" />
</bean>

3, 新增商品操作, 同时发送消息给activeMQ

e3-manager-service应用新增商品时,发送一个商品添加的消息(itemId )给activeMQ

[code]@Service
public class ItemServiceImpl implements ItemService{

@Autowired
private TbItemMapper itemMapper;
@Autowired
private TbItemDescMapper itemDescMapper;
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination topicDestination;
@Autowired
private JedisClient jedisClient;

//添加商品信息
@Override
public E3Result addItem(TbItem item, String desc) {
// 1、生成商品id
final long itemId = IDUtils.genItemId();
// 2、补全TbItem对象的属性
item.setId(itemId);
//商品状态,1-正常,2-下架,3-删除
item.setStatus((byte) 1);
Date date = new Date();
item.setCreated(date);
item.setUpdated(date);
// 3、向商品表插入数据
itemMapper.insert(item);
// 4、创建一个TbItemDesc对象
TbItemDesc itemDesc = new TbItemDesc();
// 5、补全TbItemDesc的属性
itemDesc.setItemId(itemId);
itemDesc.setItemDesc(desc);
itemDesc.setCreated(date);
itemDesc.setUpdated(date);
// 6、向商品描述表插入数据
itemDescMapper.insert(itemDesc);

// 发送一个商品添加的消息-->activeMQ
// (传参数itemId过去-->cn.e3mall.search.message.ItemAddMessageListener)
jmsTemplate.send(topicDestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(itemId + "");
return textMessage;
}
});

// 7、E3Result.ok()
return E3Result.ok();
}

}

4, 启动项目, 进行新增商品操作

查询Solr后台, 如果新增的商品信息同步到Solr库, 则ok

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: