您的位置:首页 > Web前端 > React

Spring reactor框架简介

2015-10-18 00:16 417 查看
Spring社区上月发布了基于事件驱动的异步框架 reactor。当前reactor还在密集研发中,代码几天大变样,非常的不稳定,这2周还完全重写了它自己的reactor-tcp。sample code也跟不上。这里只选取其其中最最基本和核心的功能,让大家先睹为快。

一:核心:基于事件驱动。

事件注册:

Java代码  




//初始化环境变量,若无,会从默认的读取classpath下默认的default.properties 文件  
Environment env = new Environment();  
//创建一个Reactor,指定的dispatcher为eventLoop,(dispatcher 种类下面会再细说)  
Reactor reactor = R.reactor().using(env).dispatcher("eventLoop").get();  
//注册一个名为event的事件,事件为Consumer  
reactor.on($("event"), new Consumer<Event<String>>() {  
@Override  
public void accept(Event<String> t) {  
  System.out.println("This is sample code" + t.getData());  
}  
});  

//初始化环境变量,若无,会从默认的读取classpath下默认的default.properties 文件
Environment env = new Environment();
//创建一个Reactor,指定的dispatcher为eventLoop,(dispatcher 种类下面会再细说)
Reactor reactor = R.reactor().using(env).dispatcher("eventLoop").get();
//注册一个名为event的事件,事件为Consumer
reactor.on($("event"), new Consumer<Event<String>>() {
@Override
public void accept(Event<String> t) {
System.out.println("This is sample code" + t.getData());
}
});


事件发布(或者事件触发)

Java代码  




//传入一个String事件,事件的值为Hello World!  
reactor.notify("event", new Event<String>("Hello World!"));  

//传入一个String事件,事件的值为Hello World!
reactor.notify("event", new Event<String>("Hello World!"));


运行后的结果为: This is sample code!

由此一个最最简单的事件驱动程序就完成了。

二. Composable的使用

Reactor里另外一个核心的功能就是Composable,当一个业务有很多步阻塞调用的时候,在不用callback的情况下,可以应用Composable将其中的阻塞调用,分割成异步非阻塞执行。

我们想象一个应用场景,一个账号服务,需要根据一个accountId来拿到一个account的余额。业务上可能需要这么几步:

a) 验证是否是有效用户 --> 调用authService.isValidUser

b) 根据accountId拿到整个account信息 -->accountService.getAccountById

c) 传入account获得account balance --> billService.getAccountBalance

那么代码怎么实现的。最最简单的直观的就是顺序阻塞调用。

代码A

Java代码  




public Long getAccountBalance1(Integer accountId){  
    boolean auth = authService.isValidUser(accountId);  
    if(auth){  
        Account account = accountService.getAccountById(accountId);  
        if(account!=null){  
            return billService.getAccountBalance(account);  
        }  
        else{  
            return -1l;  
        }  
    }  
    else{  
        return -1l;  
    }  
}  

public Long getAccountBalance1(Integer accountId){
boolean auth = authService.isValidUser(accountId);
if(auth){
Account account = accountService.getAccountById(accountId);
if(account!=null){
return billService.getAccountBalance(account);
}
else{
return -1l;
}
}
else{
return -1l;
}
}


先不论代码的逻辑正确性,上述的代码肯定是吞吐量不高。为求改善,现改为异步调用。最简单的异步就是加callback

代码B:

Java代码  




public Long getAccountBalance1(Integer id){  
        final Integer accountId = id;  
        final Future<Long> balanceFuture;  
        authService.isValidUser(accountId, new ICallback(){  
            @Override  
            public void callback(Object value) {  
                boolean auth = (boolean)value;  
                if(auth){  
                    accountService.getAccountById(accountId, new ICallback(){  
                        @Override  
                        public void callback(Object value) {  
                            if(value!=null){  
                                Account account = (Account)value;  
                               Long balance = billService.getAccountBalance(account);  
                               balanceFuture.setValue(balance);  
                            }  
                        }  
                          
                    });  
                }  
                  
            }  
              
        });  
          
        return balanceFuture.get(3000,TimeUnit.MILLISECONDS);  
    }  

public Long getAccountBalance1(Integer id){
final Integer accountId = id;
final Future<Long> balanceFuture;
authService.isValidUser(accountId, new ICallback(){
@Override
public void callback(Object value) {
boolean auth = (boolean)value;
if(auth){
accountService.getAccountById(accountId, new ICallback(){
@Override
public void callback(Object value) {
if(value!=null){
Account account = (Account)value;
Long balance = billService.getAccountBalance(account);
balanceFuture.setValue(balance);
}
}

});
}

}

});

return balanceFuture.get(3000,TimeUnit.MILLISECONDS);
}


这断代码没有测试过。但是可以明显的看到这里用了嵌套的callback,代码很难看,不优雅,并对代码有侵入(需要有一个callback的同名接口)。

那么用了Reactor Composable的代码又将会是如何呢,请看:

Java代码  




public Long getAccountBalance(Integer id){  
        final Integer accountId = id;  
        Composable<Long> c = S.each(Arrays.asList(accountId))  
                .using(new Environment())  
                .dispatcher("eventLoop")  
                .get()  
                .map(new Function<Integer, Boolean>(){  
                    @Override  
                    public Boolean apply(Integer accountId) {  
                        return authService.isValidUser(accountId);  
                    }  
                })  
                .map(new Function<Boolean,Account>(){  
  
                    @Override  
                    public Account apply(Boolean auth) {  
                        if(auth){  
                            return accountService.getAccountById(accountId);  
                        }  
                        return null;  
                    }  
                }).map(new Function<Account,Long>(){  
  
                    @Override  
                    public Long apply(Account account) {  
                        return billService.getAccountBalance(account);  
                    }  
          
                });  
        try {  
            return c.await(3, TimeUnit.SECONDS);  
        } catch (InterruptedException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
            return -1l;  
        }  
  
    }  

public Long getAccountBalance(Integer id){
final Integer accountId = id;
Composable<Long> c = S.each(Arrays.asList(accountId))
.using(new Environment())
.dispatcher("eventLoop")
.get()
.map(new Function<Integer, Boolean>(){
@Override
public Boolean apply(Integer accountId) {
return authService.isValidUser(accountId);
}
})
.map(new Function<Boolean,Account>(){

@Override
public Account apply(Boolean auth) {
if(auth){
return accountService.getAccountById(accountId);
}
return null;
}
}).map(new Function<Account,Long>(){

@Override
public Long apply(Account account) {
return billService.getAccountBalance(account);
}

});
try {
return c.await(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return -1l;
}

}


显然代码优雅了很多,没有代码侵入。但这些都不是关键。

关键是业务中的,a,b,c都被分割成一个独立的事件注册到了reactor,每一步都是独立的线程执行,并且在这个列子中,是顺序执行的,线程安全。

三:Dispatcher

最后这里再详细介绍下之前提到的Reactor的Dispatcher,dispatcher是reactor的核心,顾名思义,就是一个分发器,用作事件的分发,当一个事件到达,(即Reacot.notify被调用)最终会由dispatcher进行任务分发调度(dispatcher.dipatch)

Dispatcher根据线程和队列分为下面几种dispatcher,系统现在默认为SynchronousDispatcher(不知道以后会不会变,之前默认的是BlockingQueueDispatcher)

SynchronousDispatcher

当一个事件到达时,直接由reactor所在的线程直接执行

BlockingQueueDispatcher(eventloop)

事件到达时先存储在一个Blockingqueue中,再由统一的后台线程一一顺序执行

ThreadPoolExecutorDispatcher(threadpool)

事件达到时将事件交由线程池统一调度。该线程池为固定大小线程池,(Executors.newFixedThreadPool)线程大小由配置文件指定。

RingBufferDispatcher(ringbuffer)

该dispatcher是吞吐量最高,使用了名头比较响的lmax的Disruptor构建的ringBuffer作为事件存储数组,其实就是一个不断递增,并可覆盖之前值环。(大家可以关注下lmax disruptpr)还是比较有意思的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: