您的位置:首页 > Web前端 > React

reactor一行代码高性能并发编程

2016-12-02 17:29 495 查看

reactor一行代码高性能并发编程

作者:链上研发-ouyang

时间:2016-12-02

响应式编程是一种面向数据流传播改变的编程范式。它是观察者模式、迭代器模式和函数式编程的一种最佳实践。它通过特别复杂的底层实践来简化开发的代码,所以才有可能进行一行代码的高性能并发编程。

假设有一个需求,有一组资源ID有n个,每个ID都可以用个网络请求获取到资源。需要获取所有的ID内容。

这里简单模拟下,看看并发代码:

/**
* 并行执行
* @param concurrent 并行数量
* @param sleeps 模拟停顿时间
* @return 随便返回了
*/
public Iterator<String> list(int concurrent, Long... sleeps){
return Flux.fromArray(sleeps)
.flatMap(sleep -> Mono.fromCallable( () -> mockHttp(sleep)).subscribeOn(Schedulers.elastic()), concurrent)
.toIterable().iterator();
}


这样就完成了一个执行n个并发的代码,仅仅需要一行。

依赖包

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.0.2.RELEASE</version>
</dependency>


完整的测试代码如下:

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ReactorTest {

private static final Logger LOGGER = LoggerFactory.getLogger(ReactorTest.class);

/**
* 随便测试下
*/
@Test
public void concurrentTest(){

//这里没有什么用,纯粹是Schedulers.elastic()可以复用这里的线程池,不想写多的代码了
Flux.range(1,100).map(a -> a*1)
.subscribeOn(Schedulers.elastic())
.subscribe();

//开始测试了
long start = System.currentTimeMillis();

//第一个参数20 20个并发
//后面表示N个请求,最长的一个请求可能要2000ms
list(20, 1000l,2000l,100l,200l,300l,400l,500l,600l,700l,800l,900l)
.forEachRemaining( show ->  LOGGER.info(show) );

LOGGER.info("总时间 : {} ms", System.currentTimeMillis() - start );

}

/**
* 并行执行
* @param concurrent 并行数量
* @param sleeps 模拟停顿时间
* @return 随便返回了
*/
public Iterator<String> list(int concurrent, Long... sleeps){
return Flux.fromArray(sleeps)
.log()
.flatMap(sleep -> Mono.fromCallable( () -> mockHttp(sleep)).subscribeOn(Schedulers.elastic()), concurrent)
.toIterable().iterator();
}

/**
* 实际上是一个http请求
* @param sleep 请求耗时
* @return
*/
public String mockHttp(long sleep){
try {
Thread.sleep(sleep);
LOGGER.info("停顿{}ms真的执行了", sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
return String.format("停顿了%sms", sleep);
}

}


看看执行的结果吧:

17:10:59.663 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
17:10:59.748 [main] INFO reactor.Flux.Array.1 - | onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@614ddd49)
17:10:59.750 [main] INFO reactor.Flux.Array.1 - | request(20)
17:10:59.750 [main] INFO reactor.Flux.Array.1 - | onNext(1000)
17:10:59.811 [main] INFO reactor.Flux.Array.1 - | onNext(2000)
17:10:59.811 [main] INFO reactor.Flux.Array.1 - | onNext(100)
17:10:59.811 [main] INFO reactor.Flux.Array.1 - | onNext(200)
17:10:59.812 [main] INFO reactor.Flux.Array.1 - | onNext(300)
17:10:59.812 [main] INFO reactor.Flux.Array.1 - | onNext(400)
17:10:59.812 [main] INFO reactor.Flux.Array.1 - | onNext(500)
17:10:59.812 [main] INFO reactor.Flux.Array.1 - | onNext(600)
17:10:59.813 [main] INFO reactor.Flux.Array.1 - | onNext(700)
17:10:59.813 [main] INFO reactor.Flux.Array.1 - | onNext(800)
17:10:59.813 [main] INFO reactor.Flux.Array.1 - | onNext(900)
17:10:59.813 [main] INFO reactor.Flux.Array.1 - | onComplete()
17:10:59.915 [elastic-4] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿100ms真的执行了
17:10:59.916 [elastic-4] INFO reactor.Flux.Array.1 - | request(1)
17:10:59.918 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了100ms
17:11:00.016 [elastic-5] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿200ms真的执行了
17:11:00.016 [elastic-5] INFO reactor.Flux.Array.1 - | request(1)
17:11:00.016 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了200ms
17:11:00.116 [elastic-6] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿300ms真的执行了
17:11:00.116 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了300ms
17:11:00.116 [elastic-6] INFO reactor.Flux.Array.1 - | request(1)
17:11:00.217 [elastic-7] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿400ms真的执行了
17:11:00.218 [elastic-7] INFO reactor.Flux.Array.1 - | request(1)
17:11:00.219 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了400ms
17:11:00.318 [elastic-8] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿500ms真的执行了
17:11:00.320 [elastic-8] INFO reactor.Flux.Array.1 - | request(1)
17:11:00.320 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了500ms
17:11:00.417 [elastic-9] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿600ms真的执行了
17:11:00.418 [elastic-9] INFO reactor.Flux.Array.1 - | request(1)
17:11:00.419 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了600ms
17:11:00.516 [elastic-10] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿700ms真的执行了
17:11:00.518 [elastic-10] INFO reactor.Flux.Array.1 - | request(1)
17:11:00.523 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了700ms
17:11:00.614 [elastic-11] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿800ms真的执行了
17:11:00.615 [elastic-11] INFO reactor.Flux.Array.1 - | request(1)
17:11:00.616 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了800ms
17:11:00.724 [elastic-12] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿900ms真的执行了
17:11:00.724 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了900ms
17:11:00.724 [elastic-12] INFO reactor.Flux.Array.1 - | request(1)
17:11:00.815 [elastic-2] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿1000ms真的执行了
17:11:00.816 [elastic-2] INFO reactor.Flux.Array.1 - | request(1)
17:11:00.816 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了1000ms
17:11:01.815 [elastic-3] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿2000ms真的执行了
17:11:01.835 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了2000ms
17:11:01.835 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 总时间 : 2116 ms

Process finished with exit code 0
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: