您的位置:首页 > 编程语言 > Java开发

(转)使用Spring-MDP实现分布式作业

2012-01-18 13:31 393 查看
自:/article/11240088.html

JMS的作用就不用赘述,它已经是很老掉牙的技术。习惯于单机模式下或中央处理的用户一定没有体会那么深刻。

简言之,JMS使得多个JVM并行工作成为可能,Spring-MDP(Message-Driven Pojo)是Spring framework提供的消息驱动包,借助于他我们可以很方便地实现JMS的功能。

我现在提供一个场景:一个系统包含客户调用端和服务提供端,客户端非常的Thin,只有接口而并不知道具体的实现类,服务端则包括实现类所需要的各种资源。

@MdpService(serviceName="ProxyService")

public interface IProxyService {

public String hello();

public String sayBye();

}

这里包含一个Annotation:MdpService,它的作用是告诉服务端,Client调用的是那个服务(serviceName唯一)

@Retention(RetentionPolicy.RUNTIME)

public @interface MdpService {

String serviceName() default "";

Class<?> serviceClass() default Void.class;

}

测试类main方法:

public static void main(String[] args){

String[] configuration = {"classpath*:spring-context-test.xml", "classpath:jmsconfig.xml"};

ApplicationContext context = new ClassPathXmlApplicationContext(configuration);

IProxyService proxyService = (IProxyService)context.getBean("proxyFactory");

System.out.println("hello-----------"+proxyService.hello());

System.out.println("sayBye----------"+proxyService.sayBye());

}

客户端需要用到的Spring技术出了Jms外还有:FactoryBean(代理工厂), MethodInterceptor(方法拦截)。所以,这里需要实现一个代理工厂类:

public class ProxyFactoryBean implements FactoryBean, MethodInterceptor{

private Class<Object> serviceInterface;

private MessageProcessor messageProcessor;

@Override

public Object getObject() throws Exception {

//调用this.invoke()

return new ProxyFactory(serviceInterface, this).getProxy();

}

@Override

public Class<Object> getObjectType() {

return serviceInterface;

}

@Override

public boolean isSingleton() {

return true;

}

//override MethodInterceptor

@Override

public Object invoke(MethodInvocation invocation) throws Throwable {

if (AopUtils.isToStringMethod(invocation.getMethod())) {

return "JMS invoker proxy for queue";

}

//远程调用结果,通过MdpService.serviceName找实现

return this.sendRequestMessage(invocation);

}

private Object sendRequestMessage(MethodInvocation invocation){

String serviceName = invocation.getMethod().getDeclaringClass()

.getAnnotation(MdpService.class).serviceName();

LinkedHashMap<String,Object> props = new LinkedHashMap<String,Object>();

props.put("ServiceName", serviceName);

String correlationId = UUID.randomUUID().toString();

props.put("correlationID", correlationId);

RemoteInvocation remote = new RemoteInvocation(invocation);

String content = JsonUtil.toJsonString(remote); //new String(SerializableUtil.serializeObject(remote));

try {

messageProcessor.sendMessage(content, props);

} catch (JMSException e) {

e.printStackTrace();

}

return processResponse(correlationId);

}

private Object processResponse(String correlationId){

boolean synch = true;

if (synch){

try{

Thread.sleep(500);

}catch(Exception e){

e.printStackTrace();

}

return messageProcessor.getInvokeResult(correlationId);

}else{

return null;

}

}

public void setServiceInterface(Class clazz){

this.serviceInterface = clazz;

}

public void setMessageProcessor(MessageProcessor processor){

this.messageProcessor = processor;

}

Server端类:需要托管Spring容器类、远程调用的接口(RemoteInvocation/RemoteInvocationResult)

1. Spring容器实现类AnnoBeanPostProcessor

public class AnnoBeanPostProcessor implements BeanPostProcessor{

private static Hashtable<String, RemoteBeanInvoker> invokerMap

= new Hashtable<String, RemoteBeanInvoker>();

/**

* 在初始化一个<bean/>对象前的操作

*/

@Override

public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {

return bean;

}

/**

* 注册RemoteBeanInvoker(MdpService.serviceName, bean)

*/

@Override

public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

System.out.println("Intialized bean '"+beanName+"' "+bean);

if (bean.getClass().isAnnotationPresent(MdpService.class)){

MdpService mdpService = bean.getClass().getAnnotation(MdpService.class);

String serviceName = mdpService.serviceName();

RemoteBeanInvoker remoteInvoker = invokerMap.get(serviceName);

if (remoteInvoker != null){

throw new BeanInitializationException("Already exist Service instance for '"+serviceName+"'");

}

remoteInvoker = new RemoteBeanInvoker(); //RemoteInvocationBasedExporter

invokerMap.put(serviceName, remoteInvoker);

remoteInvoker.setService(bean);

if (mdpService.serviceClass() == Void.class){

throw new BeanInitializationException("Mandatory 'serviceClass' must NOT be empty for bean["+beanName+"]");

}

remoteInvoker.setServiceInterface(mdpService.serviceClass());

remoteInvoker.setRemoteInvocationExecutor(new DefaultRemoteInvocationExecutor());

System.out.println("service '"+serviceName+"' remoteImpl:"+bean);

}

return bean;

}

public static RemoteBeanInvoker getRemoteInvoker(String serviceName){

return invokerMap.get(serviceName);

}

}

2. 远程调用接口封装

public class RemoteBeanInvoker extends RemoteInvocationBasedExporter {

private RemoteInvocation invocation = null;

public Object invoke()throws Throwable{

Object remoteBean = super.getProxyForService(); //from message.property("ServiceName")

RemoteInvocationResult result = super.invokeAndCreateResult(invocation, remoteBean);

Object returnValue = null;

try{

returnValue = result.recreate();

}catch(Throwable t){

if (result.hasInvocationTargetException()) {

throw t;

} else {

throw new RemoteInvocationFailureException("Invoke [" + invocation + "] failed", t);

}

}

this.sendResponseMessage();;

return returnValue;

}

private void sendResponseMessage(){

}

public void setRemoteInvocation(RemoteInvocation invocation){

this.invocation = invocation;

}

public void setMethodInvocation(MethodInvocation method){

invocation = new RemoteInvocation(method);

}

}

3. 最后是Service的实现类

@MdpService(serviceName="ProxyService", serviceClass=IProxyService.class)

public class ServiceImpl implements IProxyService {

@Override

public String hello() {

System.out.println("ServiceImpl.hello()---");

return "hello, xiongsl";

}

@Override

public String sayBye() {

System.out.println("ServiceImpl.sayBye()---");

return "Good bye! xiongsl";

}

}

4. 测试方法:

public static void main(String[] args) throws Throwable{

String[] locations = {"classpath:spring-context-test_svr.xml", "classpath:jmsconfig.xml"};

ApplicationContext context = new ClassPathXmlApplicationContext(locations);

MessageProcessor processor = (MessageProcessor)context.getBean("messageProcessor");

processor.receiveMessage();

}

---------------------------------------------------------------------------------------------------------------

以上是Service的Client/Server相关类。下面才是Spring-MDP接口的调用类。

client.JmsQueueTextSender

|-----MessageProcessor

server.JmsQueueReceiver

|-----MessageProcessor

------------------------------------------------------------------------------------------------------------------

import java.util.Map;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.JMSException;

import javax.jms.Queue;

import javax.jms.Session;

import javax.jms.TextMessage;

/**

* Queue消息发送器,同时也可以作为消息监听器

* @author shuilin.xiong

*/

public class JmsQueueTextSender {

protected ConnectionFactory connectFactory;

protected String queueName;

protected Connection connection;

protected Session session;

private Queue queue;

public void sendMessage(String content, Map<String,Object> props) throws JMSException{

if (connection == null){

this.afterProperiesSet();

}

TextMessage message = session.createTextMessage(content);

message.setJMSCorrelationID((String)props.remove("correlationID"));

for (String key : props.keySet()){

message.setObjectProperty(key, props.get(key));

}

session.createProducer(queue).send(message);

}

private void afterProperiesSet()throws JMSException{

connection = connectFactory.createConnection();

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

queue = session.createQueue(queueName);

this.processResponseIfNecessary();

}

public void close() throws JMSException{

if (session != null){

session.close();

}

if (connection != null){

connection.close();

}

}

protected void processResponseIfNecessary() throws JMSException{

//监听respQueueName(@see springstudy.proxy.MessageProcessor)

}

public void setConnectFactory(ConnectionFactory connectFactory) {

this.connectFactory = connectFactory;

}

public void setQueueName(String queueName) {

this.queueName = queueName;

}

}

import java.util.HashMap;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.Queue;

import javax.jms.TextMessage;

import com.tsinghuatec.dawn.waf.jms.JmsQueueTextSender;

import com.tsinghuatec.dawn.waf.util.json.JsonUtil;

public class MessageProcessor extends JmsQueueTextSender {

private String respQueueName;

HashMap<String,Object> resultMap = new HashMap<String,Object>();

@Override

protected void processResponseIfNecessary() throws JMSException{

Queue respQueue;

if (respQueueName == null){

respQueue = session.createTemporaryQueue();

}else{

respQueue = session.createQueue(respQueueName);

}

final MessageConsumer consumer = session.createConsumer(respQueue);

consumer.setMessageListener( new MessageListener(){

@Override

public void onMessage(Message message) {

try {

processResponseMessage((TextMessage)message);

} catch (Throwable e) {

try {

consumer.close();

} catch (JMSException e1) {

e1.printStackTrace();

}

}

}

});

}

@SuppressWarnings("unchecked")

public void processResponseMessage(TextMessage message) throws JMSException {

String jsonResult = message.getText();

System.out.println("收到文本消息=====/n" + jsonResult);

String correlationId = message.getJMSCorrelationID();

if (jsonResult != null){

try {

Class returnType = Class.forName(message.getStringProperty("ResultType"));

Object result = JsonUtil.getObject(jsonResult, returnType);

synchronized(resultMap){

resultMap.put(correlationId, result);

}

} catch (ClassNotFoundException e) {

e.printStackTrace();

}catch(Exception e){

e.printStackTrace();

}

}

}

public Object getInvokeResult(String correlationID){

synchronized(resultMap){

return resultMap.remove(correlationID);

}

}

public void setRespQueueName(String respQueueName) {

this.respQueueName = respQueueName;

}

}

Server:

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.Queue;

import javax.jms.Session;

import javax.jms.TextMessage;

public class JmsQueueReceiver {

protected ConnectionFactory connectFactory;

protected String queueName;

protected String respQueueName;

private Session session;

private Connection connection;

private Queue queue;

public void receiveMessage() throws JMSException{

if (connection == null){

this.afterProperiesSet();

}

final MessageConsumer consumer = session.createConsumer(queue);

consumer.setMessageListener( new MessageListener(){

@Override

public void onMessage(Message message) {

try {

processMessage((TextMessage)message);

} catch (Throwable e) {

try {

consumer.close();

} catch (JMSException e1) {

e1.printStackTrace();

}

}

}

});

}

protected void processMessage(TextMessage message) throws JMSException {

System.out.println("收到文本消息=====/n"+message.getText());

}

private void afterProperiesSet()throws JMSException{

connection = connectFactory.createConnection();

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

queue = session.createQueue(queueName);

}

public void setConnectFactory(ConnectionFactory connectionFactory) {

this.connectFactory = connectionFactory;

}

public void setQueueName(String queueName) {

this.queueName = queueName;

}

public void setRespQueueName(String respQueueName) {

this.respQueueName = respQueueName;

}

}

import javax.jms.Connection;

import javax.jms.JMSException;

import javax.jms.Queue;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.remoting.support.RemoteInvocation;

import com.tsinghuatec.dawn.waf.jms.JmsQueueReceiver;

import com.tsinghuatec.dawn.waf.util.json.JsonUtil;

public class MessageProcessor extends JmsQueueReceiver/*implements ApplicationListener*/{

@Autowired

private AnnoBeanPostProcessor postProcessor;

private Session session;

private Connection connection;

/**

* 接收RequestMessage,调用接口,并创建返回消息

*/

protected void processMessage(TextMessage message) throws JMSException {

super.processMessage(message);

String serviceName = message.getStringProperty("ServiceName");

RemoteInvocation remote = JsonUtil.getObject(message.getText(), RemoteInvocation.class);

Object result = this.getInvocationResult(serviceName, remote);

this.sendResponse(result, message.getJMSCorrelationID());

}

private Object getInvocationResult(String serviceName, RemoteInvocation remote){

RemoteBeanInvoker invoker = postProcessor.getRemoteInvoker(serviceName);

try {

invoker.setRemoteInvocation(remote);

return invoker.invoke();

} catch (Throwable e) {

e.printStackTrace();

return null;

}

}

private void sendResponse(Object result, String correlationId) throws JMSException{

if (session == null){

connection = connectFactory.createConnection();

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

}

TextMessage message = session.createTextMessage();

message.setJMSCorrelationID(correlationId);

if (result != null){

message.setText(JsonUtil.toJsonString(result));

message.setStringProperty("ResultType", result.getClass().getName());

}

Queue respQueue;

if (respQueueName == null){

respQueue = session.createTemporaryQueue();

}else{

respQueue = session.createQueue(respQueueName);

}

session.createProducer(respQueue).send(message);

}

}

配置文件:jmsconfig.xml

<?xml version="1.0" encoding="UTF-8"?>

<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">

<beans>

<!-- jndiTemplate -->

<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">

<property name="environment">

<props>

<prop key="java.naming.factory.initial">

com.sonicsw.jndi.mfcontext.MFContextFactory

</prop>

<prop key="java.naming.provider.url">tcp://localhost:2506</prop>

<prop key="java.naming.security.principal">Administrator</prop>

<prop key="java.naming.security.credentials">Administrator</prop>

<prop key="com.sonicsw.jndi.mfcontext.domain">Domain1</prop>

<prop key="com.sonicsw.jndi.mfcontext.idleTimeout">60000</prop>

</props>

</property>

</bean>

<!-- JMS连接工厂 -->

<bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">

<property name="jndiName" value="seashellSonicSendXAConnectionFactory" />

<property name="jndiTemplate" ref="jndiTemplate" />

</bean>

<bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">

<property name="targetConnectionFactory" ref="connectionFactory"/>

</bean>

</beans>

spring-context-test.xml:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:context="http://www.springframework.org/schema/context"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:os-core="http://www.openspaces.org/schema/core"

xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd

http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd

http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd">

<context:annotation-config />

<bean id="proxyFactory" class="com.tsinghuatec.dawn.waf.springstudy.proxy.ProxyFactoryBean">

<property name="serviceInterface" value="com.tsinghuatec.dawn.waf.springstudy.proxy.IProxyService"/>

<property name="messageProcessor" ref="messageProcessor"/>

</bean>

<bean id="messageProcessor" class="com.tsinghuatec.dawn.waf.springstudy.proxy.MessageProcessor">

<property name="queueName" value="SampleQ2"/>

<property name="respQueueName" value="SampleQ3"/>

<property name="connectFactory" ref="cachedConnectionFactory"/>

</bean>

</beans>

spring-context-test_svr.xml:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:context="http://www.springframework.org/schema/context"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:os-core="http://www.openspaces.org/schema/core"

xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd

http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd

http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd">

<!--context:property-placeholder location="classpath:/DBConnConfig.properties" /-->

<bean id="beanPostProcessor" class="com.tsinghuatec.dawn.waf.springstudy.startup.AnnoBeanPostProcessor">

</bean>

<bean id="messageProcessor" class="com.tsinghuatec.dawn.waf.springstudy.startup.MessageProcessor">

<property name="queueName" value="SampleQ2"/>

<property name="respQueueName" value="SampleQ3"/>

<property name="connectFactory" ref="cachedConnectionFactory"/>

</bean>

<bean class="com.tsinghuatec.dawn.waf.springstudy.startup.ServiceImpl" />

</beans>

先运行Server端,后运行Client端。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: