基于ActiveMQ的Topic的数据同步——初步实现
2012-11-04 19:09
260 查看
一、背景介绍
公司自成立以来,一直以做项目为主,算是经累经验吧,自去年以来,我们部门准备将以前的项目做成产品,大概细分了几个小的产品,部们下面又分了几个团队,分别负责产品的研发,而我们属于平台团队,负责公用组件、开发平台的研发。前期各个项目组使用的技术、框架等都不一样,想把技术、框架统一起来比较困难,并且在早期项目研发的时,各自为战,没有形成合力,有些共性的东西,都是各自做自己的,现在转将项目做成产品时,首先就是要将共性的东西,抽取出来,做成组件,通过SOA架构,将组件的服务和能力暴露出来,提高组件的重用性,例如邮件服务,任务一个产品或者系统通过标准的接口,即可发送邮件,不需要重新编写邮件的代码,短信服务、权限服务等
由于几个项目之间有些数据是共有的,例如人员、组织,HR系统已经有人员、组织的功能,在做其它项目时,人员、组织也需要,例如4A平台,这就需要将人员、组织的数据同步,将来的目标,是由ESB同步,由于时间紧,暂时选择了ActiveMQ的方式,HR系统中的人员、组织的数据项很多,而其它系统需要的很少,可能只需要人员和组织的名称及其标识列,并且数据量不大,不会一次性发送上百个人员或者组织的信息,基于这个考虑,通过将人员、组织信息的数据放在消息内放到消息中件上,各个系统通过订阅的方式获取消息中的数据。
二、实现
1、安装ActiveMQ
到ActiveMQ的官方网站下载ActiveMQ,我下载的5.7.0版,解压,例如D盘,打开bin目录,执行acticemq.bat,启动ActiveMQ。我是基于Spring编写的,新建两个Java工程,将Spring和ActiveMQ的包导入工程中
2、发送接收的前提
为发送和接收方式,把将要发送的信息封装成对象,分别为用户和组织的对象,包括了用户和组织的信息,我们来看看这两个对象用户对象,BaseModel是一个基类,封装的用ID,创建人,创建时,最后更新人,最后更新时间,这个对象不再单独列出来
public class JmsFaUser extends BaseModel implements java.io.Serializable { // Fields /** * */ private static final long serialVersionUID = 1L; private Long id; private String userNo; private String userName; private String userType; private String identity; private String region; private String userStatus; private String officeEmail; private String employeeWorkNo; private Long orgId; private String description; private String attribute1; private String attribute2; private String attribute3; private Long OId; private String userSex; private String mobileTel; private String officeTel; private String selfEmail; // Constructors /** default constructor */ public JmsFaUser() { } public String getUserNo() { return this.userNo; } public void setUserNo(String userNo) { this.userNo = userNo; } public String getUserName() { return this.userName; } public void setUserName(String userName) { this.userName = userName; } public String getUserType() { return this.userType; } public void setUserType(String userType) { this.userType = userType; } public String getIdentity() { return this.identity; } public void setIdentity(String identity) { this.identity = identity; } public String getRegion() { return this.region; } public void setRegion(String region) { this.region = region; } public String getUserStatus() { return this.userStatus; } public void setUserStatus(String userStatus) { this.userStatus = userStatus; } public String getOfficeEmail() { return this.officeEmail; } public void setOfficeEmail(String officeEmail) { this.officeEmail = officeEmail; } public String getEmployeeWorkNo() { return this.employeeWorkNo; } public void setEmployeeWorkNo(String employeeWorkNo) { this.employeeWorkNo = employeeWorkNo; } public String getDescription() { return this.description; } public void setDescription(String description) { this.description = description; } public String getAttribute1() { return this.attribute1; } public void setAttribute1(String attribute1) { this.attribute1 = attribute1; } public String getAttribute2() { return this.attribute2; } public void setAttribute2(String attribute2) { this.attribute2 = attribute2; } public String getAttribute3() { return this.attribute3; } public void setAttribute3(String attribute3) { this.attribute3 = attribute3; } public String getUserSex() { return this.userSex; } public void setUserSex(String userSex) { this.userSex = userSex; } /** * @return the oId */ public Long getOId() { return OId; } /** * @param oId the oId to set */ public void setOId(Long oId) { OId = oId; } public String getSelfEmail() { return this.selfEmail; } public void setSelfEmail(String selfEmail) { this.selfEmail = selfEmail; } /** * @param orgId the orgId to set */ public void setOrgId(Long orgId) { this.orgId = orgId; } /** * @return the orgId */ public Long getOrgId() { return orgId; } /** * @param officeTel the officeTel to set */ public void setOfficeTel(String officeTel) { this.officeTel = officeTel; } /** * @return the officeTel */ public String getOfficeTel() { return officeTel; } /** * @param mobileTel the mobileTel to set */ public void setMobileTel(String mobileTel) { this.mobileTel = mobileTel; } /** * @return the mobileTel */ public String getMobileTel() { return mobileTel; } /** * @param id the id to set */ public void setId(Long id) { this.id = id; } /** * @return the id */ public Long getId() { return id; } }
组织对象
public class JmsOrganize extends BaseModel implements java.io.Serializable { // Fields /** * */ private static final long serialVersionUID = 1L; private Long id; private String orgName; //中文名 private String orgFullName; private String orgEngName;//英文名 private Long orgTypeNo; private String orgLevel; private Long parentOrgId; private String orgCode; private String orgDesc; private String isbranch; // Constructors /** default constructor */ public JmsOrganize() { } /** default constructor */ public JmsOrganize(Long id, String orgName) { this.setId(id); this.orgName = orgName; } public JmsOrganize(Long id, Long parendId) { this.setId(id); this.parentOrgId = parendId; } /** minimal constructor */ public JmsOrganize(String orgName) { this.orgName = orgName; } public String getOrgName() { return this.orgName; } public void setOrgName(String orgName) { this.orgName = orgName; } public String getOrgFullName() { return this.orgFullName; } public void setOrgFullName(String orgFullName) { this.orgFullName = orgFullName; } public Long getOrgTypeNo() { return this.orgTypeNo; } public void setOrgTypeNo(Long orgTypeNo) { this.orgTypeNo = orgTypeNo; } public String getOrgLevel() { return this.orgLevel; } public void setOrgLevel(String orgLevel) { this.orgLevel = orgLevel; } public String getOrgDesc() { return this.orgDesc; } public void setOrgDesc(String orgDesc) { this.orgDesc = orgDesc; } public String getIsbranch() { return this.isbranch; } public void setIsbranch(String isbranch) { this.isbranch = isbranch; } /** * @param parentOrgId the parentOrgId to set */ public void setParentOrgId(Long parentOrgId) { this.parentOrgId = parentOrgId; } /** * @return the parentOrgId */ public Long getParentOrgId() { return parentOrgId; } /** * @param orgCode the orgCode to set */ public void setOrgCode(String orgCode) { this.orgCode = orgCode; } /** * @return the orgCode */ public String getOrgCode() { return orgCode; } /** * @param orgEngName the orgEngName to set */ public void setOrgEngName(String orgEngName) { this.orgEngName = orgEngName; } /** * @return the orgEngName */ public String getOrgEngName() { return orgEngName; } /** * @param id the id to set */ public void setId(Long id) { this.id = id; } /** * @return the id */ public Long getId() { return id; } }
由于发送的是对象,所以提供一个转换器,Convertor,该类要继承Spring的MessageConvertor
public class FaJmsConverter implements MessageConverter { /* (non-Javadoc) * @see org.springframework.jms.support.converter.MessageConverter#fromMessage(javax.jms.Message) * @date 2012-10-22 * @user */ public Object fromMessage(Message message) throws JMSException, MessageConversionException{ System.out.println("fromMessage"); ObjectMessage objMessage = (ObjectMessage)message; String dataFlag = objMessage.getStringProperty("dataFlag"); if("FaUser".equals(dataFlag)){ JmsFaUser user = new JmsFaUser(); user.setId(objMessage.getLongProperty("userId")); user.setUserName(objMessage.getStringProperty("userName")); user.setOfficeEmail(objMessage.getStringProperty("officeEmail")); user.setSelfEmail(objMessage.getStringProperty("selfEmail")); user.setOfficeTel(objMessage.getStringProperty("officeTel")); user.setMobileTel(objMessage.getStringProperty("mobileTel")); String lastDate = objMessage.getStringProperty("lastUpdatedDate"); try { if(lastDate != null){ user.setLastUpdatedDate(DateFormat.getDateTimeInstance().parse(lastDate)); } } catch (Exception e) { e.printStackTrace(); } return user; }else if("Organize".equals(dataFlag)){ Long orgId = objMessage.getLongProperty("orgId"); String orgName = objMessage.getStringProperty("orgName"); JmsOrganize organize = new JmsOrganize(orgId, orgName); String lastDate = objMessage.getStringProperty("lastUpdatedDate"); try { organize.setLastUpdatedDate(DateFormat.getDateTimeInstance().parse(lastDate)); } catch (ParseException e) { e.printStackTrace(); } return organize; } return null; } /* (non-Javadoc) * @see org.springframework.jms.support.converter.MessageConverter#toMessage(java.lang.Object, javax.jms.Session) * @date 2012-10-22 * @user */ public Message toMessage(Object obj, Session session) throws JMSException, MessageConversionException { ObjectMessage objMsg=session.createObjectMessage(); if(obj instanceof JmsFaUser){ JmsFaUser user = (JmsFaUser)obj; Logger.getLogger().info("The user message's userId is " + user.getId()); objMsg.setStringProperty("dataFlag", "FaUser"); objMsg.setStringProperty("userName", user.getUserName()); objMsg.setLongProperty("userId", user.getId()); objMsg.setStringProperty("officeEmail", user.getOfficeEmail()); objMsg.setStringProperty("selfEmail", user.getSelfEmail()); if(user.getOfficeTel() != null){ objMsg.setStringProperty("officeTel", user.getOfficeTel()); }else{ objMsg.setLongProperty("officeTel", new Long(0)); } if(user.getMobileTel() != null){ objMsg.setStringProperty("mobileTel", user.getMobileTel()); }else{ objMsg.setLongProperty("mobileTel", new Long(0)); } if(user.getLastUpdatedDate() != null){ objMsg.setStringProperty("lastUpdatedDate", DateFormat.getDateTimeInstance().format(user.getLastUpdatedDate())); } }else if(obj instanceof JmsOrganize){ JmsOrganize org = (JmsOrganize)obj; Logger.getLogger().info("The org message's userId is " + org.getId()); objMsg.setStringProperty("dataFlag", "Organize"); objMsg.setLongProperty("orgId", org.getId()); objMsg.setStringProperty("orgName", org.getOrgName()); if(org.getLastUpdatedDate() != null){ objMsg.setStringProperty("lastUpdatedDate", DateFormat.getDateTimeInstance().format(org.getLastUpdatedDate())); } } return objMsg; } }
3、发送
Sprng为我们提供了JMSTemplate,基于这个发送消息,我们先来看看Spring配置文件<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd"> <!-- 外部属性文件的定义 --> <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>classpath:init.properties</value> </list> </property> </bean> <!-- 配置connectionFactory --> <bean id="jmsSenderFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${jms.sendBrokerURL}"> </property> <property name="useAsyncSend" value="true"></property> <property name="userName" value="system"></property> <property name="password" value="yxtech"></property> </bean> </property> <property name="maxConnections" value="100"></property> </bean> <bean id="jmsConverter" class="${jms.jmsSendConverterClass}" /> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="jmsSenderFactory"></property> <property name="defaultDestinationName" value="${jms.sendDestinationName}" /> <!-- 区别它采用的模式为false是p2p为true是订阅 --> <property name="pubSubDomain" value="true" /> <property name="messageConverter" ref="jmsConverter"></property> </bean> <!-- 发送消息的目的地(一个队列) --> <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 设置消息队列的名字 --> <constructor-arg index="0" value="${jms.sendDestinationName}" /> </bean> <bean id="messageProducer" class="com.vis.faf.jms.MessageProducer"> <property name="jmsTemplate" ref="jmsTemplate"></property> </bean> </beans>
根据上面的配置,只需要获得messageProducer这个Bean,便可以发送,下面我来看看MessageProducer这个类及其接口
public interface IJMSMessageProducer { public abstract void converAndSendObjectMessage(Object obj); }
实现类
public class MessageProducer implements IJMSMessageProducer { private JmsTemplate jmsTemplate; /** * @param jmsTemplate the jmsTemplate to set */ public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } /* (non-Javadoc) * @see com.vispractice.faf.jms.IJMSMessageProducer#converAndSendObjectMessage(java.lang.Object) * @date 2012-10-25 * @user */ public void converAndSendObjectMessage(Object obj){ jmsTemplate.convertAndSend(obj); Logger.getLogger().info("The message pub success, the Object is " + obj); } }
发送测试类
public class SendTest { /** * 2012-10-23 下午07:34:24 * * @author * @param args */ public static void main(String[] args) { ApplicationContext ac = new ClassPathXmlApplicationContext( "jms-sender.xml"); IJMSMessageProducer messageProducer = (IJMSMessageProducer) ac.getBean("messageProducer"); JmsFaUser user = new JmsFaUser(); user.setUserName("ssss"); user.setId(new Long(111)); messageProducer.converAndSendObjectMessage(user); } }
那发送到神马位置呢?在发送消息的Spring配置文件里面,有一个jms.sendBrokerURL,这个值是在init.properties文件中配置的,方便修改,我们来看一下关于发送消息时所以配置的参数信息
jms.sendBrokerURL=tcp\://localhost\:61616 jms.sendDestinationName=faJMS jms.jmsSendConverterClass=com.vispractice.fa.jms.FaJmsConverter
第一个指地址,因为ActiveMQ部署的是我本机,所以使用localhost,端口号在部署的时候,就是默认的
第二个JMS的名称,这个可以自取
第三个是指定转换换,这个方便在以后的扩展中,我要更新这个转换类,具体关于Convertor可以自查信息
如果发送,可以通过localhost:8161的控制台,看到你所发送的消息,这个地址是ActiveMQ的Web控制台,接下为我们看看接收
4、接收
MQ给我提供一种方式,当接收到消息的时候,自动去执行我们业务代码,<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd"> <description>jms receiver configuration</description> <!-- 配置connectionFactory --> <bean id="jmsReceiverFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${jms.receiveBrokerURL}" /> <!-- <property name="userName" value="reader"></property> --> <!-- <property name="password" value="readeryxtech"></property> --> </bean> </property> <property name="maxConnections" value="100"></property> </bean> <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 设置消息队列的名字 --> <constructor-arg index="0" value="${jms.receiveDestinationName}" /> </bean> <bean id="jmsConverter" class="${jms.jmsReceiveConverterClass}" /> <!--异步调用消息 --> <bean id="receive" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsReceiverFactory"></property> <property name="destinationName" value="${jms.receiveDestinationName}"></property> <property name="messageListener" ref="messageListener"></property> </bean> <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <property name="delegate" ref="${jms.onMessageBeanName}"></property> <property name="defaultListenerMethod" value="onMessage"></property> <property name="messageConverter" ref="jmsConverter" /> </bean> </beans>
先看一下messageListener,这个监听,是自己写的,其中delegate是当你接收消息之后所执行的业务代码的bean,我这里将这个bean做成配置,方便修改,而defaultListenerMethod是指定执行的方法,这里设置定了,执行omMessage方法,也就是说类你可以指定,但是onMessage方法必须要有,而convertor也是可配的,与发送的convertor是一样的,这个转换器发送与接收都必须要使用
注意:这里为神马要将执行的类的bean做成可配呢?因为我在做这一块的工作时,我不知道是由那个bean,但是类中的方法我可设置,这样做的目地就是在消息的消费者不在去关心这个消息,而只需要写一个类,里面有onMessage方法,在这个方法里做自己的业务逻辑即可,将其关注点放到业务处理上,那肿么样来设计这个方法的名字呢,我们可以设计了一个接口,如果消费者想消费这个消息,就必须实现这上接口,下面我们来看看这个接口
public interface FaJmsReceiveListener{ public void onMessage(Object baseModel); }
这个接口中只有一个方法就是onMessage,而实现类则交由具体的消费者,因为消费可能在实现的时候可能引用别的bean,进而处理别的业务,例如入库,所以我们在做接收消息的配置上,只设计处理消息的bean的名字,不设置具体的class,只要消费者配置了这个bean的名字即可,当然这个名字也是可配的,配置在了init.propertis中,我们来看看这个接口的实现类
public class JmsBusinessProcess implements FaJmsReceiveListener{ /* (non-Javadoc) * @see com.vispractice.fa.jms.FaJmsReceiveListener#onMessage(com.vispractice.faf.base.model.BaseModel) * @date 2012-10-24 * @user */ @Override public void onMessage(Object baseModel) { if(baseModel instanceof JmsFaUser){ JmsFaUser user = (JmsFaUser)baseModel; System.out.println(user.getId()); }else if (baseModel instanceof JmsOrganize){ JmsOrganize org = (JmsOrganize)baseModel; System.out.println(org.getId()); } } }
上述的代码可以共用,当发送的类变更时,只需要编写Convertor,并在init.properties中配置上即可,而消息的消费者只需要实现该接口即可
测试,测试的Spring配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd" default-autowire="byName"> <description>jms receiver configuration</description> <!-- 外部属性文件的定义 --> <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>classpath:init.properties</value> </list> </property> </bean> <bean id="delegate" class="Test.JmsBusinessProcess"></bean> <import resource="jms-receiver.xml" /> </beans>
测试类
public class ReceiveTest { public static void main(String[] args) { ApplicationContext a2c = new ClassPathXmlApplicationContext("fa-jms.xml"); System.out.println("receiver"); } }
接下来,看看接收的init.properties
jms.receiveBrokerURL=tcp\://localhost\:61616 jms.receiveDestinationName=faJMS jms.jmsReceiveConverterClass=com.vispractice.fa.jms.FaJmsConverter jms.onMessageBeanName=delegate
第一个、第二个、第三个参数不再详细述说,第四就是执行业务逻辑的bean的名字,这个参数可以设定死也可以设定灵活
上述只是初步的能发送和接收消息,后续是考虑安全、性能的问题。
相关文章推荐
- 基于ActiveMQ的Topic的数据同步——初步实现
- 基于ActiveMQ的Topic的数据同步——消费者持久化
- 基于ActiveMQ的Topic的数据同步——消费者持久化
- 基于 WebSocket 实现 WebGL 3D 拓扑图实时数据通讯同步(二)
- sersync:基于 rsync + inotify 实现数据实时同步
- spring整合JMS实现同步收发消息(基于ActiveMQ的实现)
- 基于 WebSocket 实现 WebGL 3D 拓扑图实时数据通讯同步(二)
- sersync:基于 rsync + inotify 实现数据实时同步
- 基于rsync+inotify实现数据实时同步传输
- spring整合JMS一同步收发消息(基于ActiveMQ的实现)
- 基于 WebSocket 实现 WebGL 3D 拓扑图实时数据通讯同步(二)
- 基于 WebSocket 实现 WebGL 3D 拓扑图实时数据通讯同步(一)
- 基于netcore实现mongodb和ElasticSearch之间的数据实时同步的工具(Mongo2Es)
- sersync:基于 rsync + inotify 实现数据实时同步
- 基于 WebSocket 实现 WebGL 3D 拓扑图实时数据通讯同步(一)
- 基于rsync+inotify实现数据实时同步传输
- 基于netcore实现mongodb和ElasticSearch之间的数据实时同步的工具(Mongo2Es)
- 基于Webservice API 的数据同步的reset的实现
- [置顶] 基于 WebSocket 实现 WebGL 3D 拓扑图实时数据通讯同步(一)
- 基于rsync和inotify实现web网站文件的同步,并基于IPTABLES做用户限制 推荐