您的位置:首页 > 编程语言 > Java开发

Missian指南五:使用异步客户端(With spring

2014-03-29 10:03 369 查看
重要:Missian刚刚更新到0.30,修改了异步回调的实现方式,现在基本上做到0侵入性。

建议下载0.30版本使用,调过本章,直接参考:

Missian指南6:使用无侵入性的异步客户端(Missian 0.30 With Spring)

===================================================================

Missian没有绑定spring,但是强烈推荐配合spring一起使用。异步客户端由于需要调用BeanLocator去寻找回调的Bean,如果配合Spring使用,可以直接使用SpringLocator(BeanLocator的唯一实现),否则需要自己实现。

 

使用异步客户端需要注意一点:由于是异步调用,所以一个远程方法的返回值永远是null(如果不是void的话)或者是原生数据类型的默认值。一段时间后(比如100毫秒)后客户端收到这个返回值,会去找到相应的回调对象进行调用。

 

异步的优势是:在调用的期间我们不需要像同步调用一样有一个线程一直在等着它的返回值,而是调用完即可返回释放线程,当客户端接受到返回值后会进行回调,业务流程可以继续往下执行。不要小看这个等待的时间,假如A服务调用了一个跨机房的服务或者一个重型的服务B,那么B的响应时间可能是100毫秒甚至更多,那么可以想象在高并发的情况下,可能A服务的全部线程都耗死在无穷的等待上了。

 

我们还是先看看如何配合Spring来使用Missian异步客户端。

 

步骤一:给Hello.hello(String, int)创建一个回调类

Java代码


 




public class HelloCallback extends Callback{   
    private AtomicInteger i = new AtomicInteger(0);   
    public HelloCallback() {   
        super(String.class);   
    }   
  
    @Override  
    public void call(Object obj) {   
        int value = i.incrementAndGet();   
        if(value%10000==0) {   
            System.out.println("received:"+value);   
        }   
    }   
}  

public class HelloCallback extends Callback{
private AtomicInteger i = new AtomicInteger(0);
public HelloCallback() {
super(String.class);
}

@Override
public void call(Object obj) {
int value = i.incrementAndGet();
if(value%10000==0) {
System.out.println("received:"+value);
}
}
}

目前回调类必须继承Callback这个抽象类。Callback构造函数参数acceptValueType,是指这个回调类接受的参数类型。由于Hello.hello(String, int)方法的返回值是String,这里我们传入Stirng.class。

 

当Hello.hello()方法的远程调用结果返回时,HelloCallback的call() 方法将会被调用。

 

关于Callback,目前的实现方式不甚理想,正在思考怎么优化,可能有两点:一是不需要指定acceptValueType,二是不需要实现任何接口,力争对业务代码无任何侵入性。

 

步骤二:修改Hello接口,用注解的方法声明回调Bean

Java代码


 




public interface Hello {   
    @CallbackTarget("helloCallback")   
    public String hello(String name, int age);   
}  

public interface Hello {
@CallbackTarget("helloCallback")
public String hello(String name, int age);
}

 

步骤三:在Spring配置文件中配置这个回调Bean

Xml代码


 




<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">  
    <!-- your callback bean, missian client will invoke its execute() method when received the returned object -->  
    <bean id="helloCallback" class="com.missian.example.bean.HelloCallback">  
    </bean>  
</beans>  

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> <!-- your callback bean, missian client will invoke its execute() method when received the returned object -->
<bean id="helloCallback" class="com.missian.example.bean.HelloCallback">
</bean>
</beans>

 

步骤四:在Spring中创建AsyncMissianProxyFactory

Xml代码


 




<bean id="asyncMissianProxyFactory" class="com.missian.client.async.AsyncMissianProxyFactory" init-method="init" destroy-method="destroy">  
    <constructor-arg >  
        <bean class="com.missian.common.beanlocate.SpringLocator"/>  
    </constructor-arg>  
</bean>  

<bean id="asyncMissianProxyFactory" class="com.missian.client.async.AsyncMissianProxyFactory" init-method="init" destroy-method="destroy">
<constructor-arg >
<bean class="com.missian.common.beanlocate.SpringLocator"/>
</constructor-arg>
</bean>

这里我们使用的是AsyncMissianProxyFactory的最简单的构造函数,只接受一个BeanLocator。这时候默认创建一个4个线程的线程池用来处理回调逻辑,1个线程用来处理IO,需要指定线程数,或者将一个已经存在的线程池传入,可以参考其它几个构造函数:

Java代码


 




public AsyncMissianProxyFactory(BeanLocator callbackLoacator, ExecutorService threadPool,  int callbackIoProcesses, boolean logBeforeCodec, boolean logAfterCodec, NetworkConfig networkConfig) {}
  
public AsyncMissianProxyFactory(BeanLocator callbackLoacator, ExecutorService threadPool,  int callbackIoProcesses, boolean logBeforeCodec, boolean logAfterCodec){}
  
public AsyncMissianProxyFactory(BeanLocator callbackLoacator, ExecutorService threadPool) {}   
public AsyncMissianProxyFactory(BeanLocator callbackLoacator, int threadPoolSize, int callbackIoProcesses, boolean logBeforeCodec, boolean logAfterCodec) {}
  
public AsyncMissianProxyFactory(BeanLocator callbackLoacator, ExecutorService threadPool, NetworkConfig networkConfig) {}   
public AsyncMissianProxyFactory(BeanLocator callbackLoacator, int threadPoolSize, int callbackIoProcesses, boolean logBeforeCodec, boolean logAfterCodec, NetworkConfig networkConfig) {}
  
public AsyncMissianProxyFactory(BeanLocator callbackLoacator, int threadPoolSize){}  

public AsyncMissianProxyFactory(BeanLocator callbackLoacator, ExecutorService threadPool,  int callbackIoProcesses, boolean logBeforeCodec, boolean logAfterCodec, NetworkConfig networkConfig) {}
public AsyncMissianProxyFactory(BeanLocator callbackLoacator, ExecutorService threadPool,  int callbackIoProcesses, boolean logBeforeCodec, boolean logAfterCodec){}
public AsyncMissianProxyFactory(BeanLocator callbackLoacator, ExecutorService threadPool) {}
public AsyncMissianProxyFactory(BeanLocator callbackLoacator, int threadPoolSize, int callbackIoProcesses, boolean logBeforeCodec, boolean logAfterCodec) {}
public AsyncMissianProxyFactory(BeanLocator callbackLoacator, ExecutorService threadPool, NetworkConfig networkConfig) {}
public AsyncMissianProxyFactory(BeanLocator callbackLoacator, int threadPoolSize, int callbackIoProcesses, boolean logBeforeCodec, boolean logAfterCodec, NetworkConfig networkConfig) {}
public AsyncMissianProxyFactory(BeanLocator callbackLoacator, int threadPoolSize){}

假如在服务器里使用Missian客户端,可以考虑将服务器主线程池传入给AsyncMissianProxyFactory,共享线程池。 

 

步骤五:实现异步调用

Java代码


 




public static void main(String[] args) throws IOException {   
    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("com/missian/example/client/async/withspring/applicationContext-*.xml");   
    //actually you can inject AsyncMissianProxyFactory into any other beans to use it.  
    //we just show how AsyncMissianProxyFactory works here.  
    AsyncMissianProxyFactory asyncMissianProxyFactory = (AsyncMissianProxyFactory)context.getBean("asyncMissianProxyFactory");   
    Hello hello = (Hello)asyncMissianProxyFactory.create(Hello.class, "tcp://localhost:1235/hello");   
    long time = System.currentTimeMillis();   
    for(int i=0; i<10000; i++) {
  
        hello.hello("gg", 25);     
    }   
    System.out.println(System.currentTimeMillis()-time);   
}  

public static void main(String[] args) throws IOException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("com/missian/example/client/async/withspring/applicationContext-*.xml");
//actually you can inject AsyncMissianProxyFactory into any other beans to use it.
//we just show how AsyncMissianProxyFactory works here.
AsyncMissianProxyFactory asyncMissianProxyFactory = (AsyncMissianProxyFactory)context.getBean("asyncMissianProxyFactory");
Hello hello = (Hello)asyncMissianProxyFactory.create(Hello.class, "tcp://localhost:1235/hello");
long time = System.currentTimeMillis();
for(int i=0; i<10000; i++) {
hello.hello("gg", 25);
}
System.out.println(System.currentTimeMillis()-time);
}

 你可以清楚地看到,所有的请求都发送出去之后,返回值陆续返回并回掉了HelloCallback。

和同步的客户端一样,可以使用http协议发送数据:

Java代码


 




Hello hello = (Hello)asyncMissianProxyFactory.create(Hello.class, "http://localhost:1235/hello");  

Hello hello = (Hello)asyncMissianProxyFactory.create(Hello.class, "http://localhost:1235/hello");

但目前比较遗憾的是,还不能够支持异步调用Hessian服务。  

另外需要说明的是,这个直接从Context里面取出AsyncMissianProxyFactory只是用来演示异步调用的用法;正常的做法应该是将AsyncMissianProxyFactory注入到我们需要使用它的Bean。

 

Author: gh_aiyz

Forward:  http://missian.iteye.com/blog/830304
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: