您的位置:首页 > 其它

MQ学习之实现商品的数据同步

2016-07-23 22:13 381 查看
一、后台系统

1、导入依赖

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.0.RELEASE</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>

2、编写配置文件applicationContext-rabbitmq

<pre name="code" class="html"><beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> 
<!-- 定义RabbitMQ的连接工厂 -->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
virtual-host="${rabbitmq.vhost}" />

<!-- 定义交换机 -->
<rabbit:topic-exchange name="taotao-item-exchange" auto-declare="true" durable="true">
<!-- 选择采用手动绑定队列 -->
</rabbit:topic-exchange>

<!-- MQ的管理,包括队列、交换器等 -->
<rabbit:admin connection-factory="connectionFactory"/>

<!-- 定义模板 -->
<rabbit:template id="template" connection-factory="connectionFactory" exchange="taotao-item-exchange"/>

</beans>




3、service

package com.taotao.manage.service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.abel533.entity.Example;
import com.github.pagehelper.PageInfo;
import com.taotao.common.service.ApiService;
import com.taotao.manage.pojo.Item;
import com.taotao.manage.pojo.ItemDesc;
import com.taotao.manage.pojo.ItemParamItem;

@Service
public class ItemService extends BaseService<Item> {

// 注意:事务的转播性
@Autowired
private ItemDescService itemDescService;

@Autowired
private ItemParamItemService itemParamItemService;

@Value("${TAOTAO_WEB_URL}")
private String TAOTAO_WEB_URL;

@Autowired
private ApiService apiService;

@Autowired
private RabbitTemplate rabbitTemplate;

private static final ObjectMapper MAPPER = new ObjectMapper();

/**
* 新增商品
*
* @param item
* @param desc
*/
public void saveItem(Item item, String desc, String itemParams) {

item.setStatus(1);// 初始状态
item.setId(null);// 强制id为null,考虑到安全性

// 新增商品
super.save(item);

// 新增商品描述数据
ItemDesc itemDesc = new ItemDesc();
itemDesc.setItemDesc(desc);
itemDesc.setItemId(item.getId());
this.itemDescService.save(itemDesc);

if (StringUtils.isNotEmpty(itemParams)) {// 不为空时创建数据
ItemParamItem itemParamItem = new ItemParamItem();
itemParamItem.setItemId(item.getId());
itemParamItem.setParamData(itemParams);
this.itemParamItemService.save(itemParamItem);
}

//发送商品新增的消息到RabbitMQ
sendMsg(item.getId(), "insert");
}

public PageInfo<Item> queryItemList(Integer page, Integer rows) {
Example example = new Example(Item.class);
example.setOrderByClause("updated DESC");
example.createCriteria().andNotEqualTo("status", 3);
return super.queryPageListByExample(example, page, rows);
}

/**
* 实现商品的逻辑删除
*
* @param ids
*/
public void updateByIds(List<Object> ids) {
Example example = new Example(Item.class);
example.createCriteria().andIn("id", ids);
Item item = new Item();
item.setStatus(3);// 更改状态为3,说明该商品已经被删除
super.getMapper().updateByExampleSelective(item, example);

for (Object object : ids) {
//发送商品删除的消息到RabbitMQ
sendMsg(Long.valueOf(object.toString()), "delete");
}
}

public void updateItem(Item item, String desc, ItemParamItem itemParamItem) {
// 强制设置不能被更新的字段为null
item.setStatus(null);
item.setCreated(null);
// 更新商品数据
super.updateSelective(item);

// 更新商品描述数据
ItemDesc itemDesc = new ItemDesc();
itemDesc.setItemId(item.getId());
itemDesc.setItemDesc(desc);
this.itemDescService.updateSelective(itemDesc);

if (null != itemParamItem) {
// 更新规格参数
this.itemParamItemService.updateSelective(itemParamItem);
}

// try {
// // 通知其他系统商品已经更新
// String url = TAOTAO_WEB_URL + "/item/cache/" + item.getId() + ".html";
// this.apiService.doPost(url);
// } catch (Exception e) {
// e.printStackTrace();
// }

//发送商品更新的消息到RabbitMQ
sendMsg(item.getId(), "update");
}

private void sendMsg(Long itemId,String type){
try {
Map<String, Object> msg = new HashMap<String, Object>();
msg.put("itemId", itemId);
msg.put("type", type);
msg.put("created", System.currentTimeMillis());
this.rabbitTemplate.convertAndSend("item." + type, MAPPER.writeValueAsString(msg));
} catch (Exception e) {
e.printStackTrace();
}
}

}


4、rabbitmq.properties
rabbitmq.host=127.0.0.1
rabbitmq.port=5672
rabbitmq.username=taotao
rabbitmq.password=taotao
rabbitmq.vhost=/taotao二、前台系统接收消息
1、导入同样的依赖

2、配置文件

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
<!-- 定义RabbitMQ的连接工厂 -->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
virtual-host="${rabbitmq.vhost}" />

<!-- MQ的管理,包括队列、交换器等 -->
<rabbit:admin connection-factory="connectionFactory"/>

<!-- 定义队列 -->
<rabbit:queue name="taotao-web-item" auto-declare="true" durable="true"/>

<!-- 设置监听 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="itemMQHandler" method="execute" queue-names="taotao-web-item"/>
</rabbit:listener-container>

</beans>


3、rabbitmq.properties
rabbitmq.host=127.0.0.1
rabbitmq.port=5672
rabbitmq.username=taotao
rabbitmq.password=taotao
rabbitmq.vhost=/taotao4、ItemMQHandler
package com.taotao.web.mq.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.bea
4000
ns.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.taotao.common.service.RedisService;
import com.taotao.web.service.ItemService;

@Component
public class ItemMQHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(ItemMQHandler.class);

private static final ObjectMapper MAPPER = new ObjectMapper();

@Autowired
private RedisService redisService;

public void execute(String msg) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("接收到消息,MSG = {}", msg);
}
try {
JsonNode jsonNode = MAPPER.readTree(msg);
Long itemId = jsonNode.get("itemId").asLong();
this.redisService.del(ItemService.REDIS_ITEM_KEY + itemId);
} catch (Exception e) {
LOGGER.error("处理消息出错! MSG = " + msg, e);
}
}

}
5、



三、搜索系统接收消息
1、导入同样的依赖

2、配置文件

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
<!-- 定义RabbitMQ的连接工厂 -->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
virtual-host="${rabbitmq.vhost}" />

<!-- MQ的管理,包括队列、交换器等 -->
<rabbit:admin connection-factory="connectionFactory"/>

<!-- 定义队列 -->
<rabbit:queue name="taotao-search-item" auto-declare="true" durable="true"/>

<!-- 设置监听 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="itemMQHandler" method="execute" queue-names="taotao-search-item"/>
</rabbit:listener-container>

</beans>


3、同样的rabbitmq.properties文件
4、ItemMQHandler

package com.taotao.search.mq.handler;

import org.apache.commons.lang3.StringUtils;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.taotao.search.bean.Item;
import com.taotao.search.service.ItemService;

@Component
public class ItemMQHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(ItemMQHandler.class);

private static final ObjectMapper MAPPER = new ObjectMapper();

@Autowired
private HttpSolrServer httpSolrServer;

@Autowired
private ItemService itemService;

public void execute(String msg) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("接收到消息,MSG = {}", msg);
}
try {
JsonNode jsonNode = MAPPER.readTree(msg);
Long itemId = jsonNode.get("itemId").asLong();
String type = jsonNode.get("type").asText();
if (StringUtils.equals(type, "insert") || StringUtils.equals(type, "update")) {
// 查询商品的数据
Item item = this.itemService.queryItemById(itemId);
this.httpSolrServer.addBean(item);

} else if (StringUtils.equals(type, "delete")) {
this.httpSolrServer.deleteById(String.valueOf(itemId));
}
// 提交
this.httpSolrServer.commit();
} catch (Exception e) {
LOGGER.error("处理消息出错! MSG = " + msg, e);
}
}

}
5、ItemService
package com.taotao.search.service;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.taotao.search.bean.Item;

@Service
public class ItemService {

@Autowired
private ApiService apiService;

@Value("${MANAGE_TAOTAO_URL}")
private String MANAGE_TAOTAO_URL;

private static final ObjectMapper MAPPER = new ObjectMapper();

/**
* 根据商品id查询商品数据
*
* @param itemId
* @return
*/
public Item queryItemById(Long itemId) {
String url = MANAGE_TAOTAO_URL + "/rest/item/" + itemId;
try {
String jsonData = this.apiService.doGet(url);
if (StringUtils.isEmpty(jsonData)) {
return null;
}
return MAPPER.readValue(jsonData, Item.class);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

}
6、rabbitmq.properties文件加上MANAGE_TAOTAO_URL=http://manage.taotao.com
7、ApiService、applicationContext-httpclient.xml、httpclient.properties复制过来,前面笔记有。

8、绑定

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  MQ