reactor一行代码高性能并发编程
2016-12-02 17:29
501 查看
reactor一行代码高性能并发编程
作者:链上研发-ouyang时间:2016-12-02
响应式编程是一种面向数据流传播改变的编程范式。它是观察者模式、迭代器模式和函数式编程的一种最佳实践。它通过特别复杂的底层实践来简化开发的代码,所以才有可能进行一行代码的高性能并发编程。
假设有一个需求,有一组资源ID有n个,每个ID都可以用个网络请求获取到资源。需要获取所有的ID内容。
这里简单模拟下,看看并发代码:
/** * 并行执行 * @param concurrent 并行数量 * @param sleeps 模拟停顿时间 * @return 随便返回了 */ public Iterator<String> list(int concurrent, Long... sleeps){ return Flux.fromArray(sleeps) .flatMap(sleep -> Mono.fromCallable( () -> mockHttp(sleep)).subscribeOn(Schedulers.elastic()), concurrent) .toIterable().iterator(); }
这样就完成了一个执行n个并发的代码,仅仅需要一行。
依赖包
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.0.2.RELEASE</version> </dependency>
完整的测试代码如下:
import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public class ReactorTest { private static final Logger LOGGER = LoggerFactory.getLogger(ReactorTest.class); /** * 随便测试下 */ @Test public void concurrentTest(){ //这里没有什么用,纯粹是Schedulers.elastic()可以复用这里的线程池,不想写多的代码了 Flux.range(1,100).map(a -> a*1) .subscribeOn(Schedulers.elastic()) .subscribe(); //开始测试了 long start = System.currentTimeMillis(); //第一个参数20 20个并发 //后面表示N个请求,最长的一个请求可能要2000ms list(20, 1000l,2000l,100l,200l,300l,400l,500l,600l,700l,800l,900l) .forEachRemaining( show -> LOGGER.info(show) ); LOGGER.info("总时间 : {} ms", System.currentTimeMillis() - start ); } /** * 并行执行 * @param concurrent 并行数量 * @param sleeps 模拟停顿时间 * @return 随便返回了 */ public Iterator<String> list(int concurrent, Long... sleeps){ return Flux.fromArray(sleeps) .log() .flatMap(sleep -> Mono.fromCallable( () -> mockHttp(sleep)).subscribeOn(Schedulers.elastic()), concurrent) .toIterable().iterator(); } /** * 实际上是一个http请求 * @param sleep 请求耗时 * @return */ public String mockHttp(long sleep){ try { Thread.sleep(sleep); LOGGER.info("停顿{}ms真的执行了", sleep); } catch (InterruptedException e) { e.printStackTrace(); } return String.format("停顿了%sms", sleep); } }
看看执行的结果吧:
17:10:59.663 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework 17:10:59.748 [main] INFO reactor.Flux.Array.1 - | onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@614ddd49) 17:10:59.750 [main] INFO reactor.Flux.Array.1 - | request(20) 17:10:59.750 [main] INFO reactor.Flux.Array.1 - | onNext(1000) 17:10:59.811 [main] INFO reactor.Flux.Array.1 - | onNext(2000) 17:10:59.811 [main] INFO reactor.Flux.Array.1 - | onNext(100) 17:10:59.811 [main] INFO reactor.Flux.Array.1 - | onNext(200) 17:10:59.812 [main] INFO reactor.Flux.Array.1 - | onNext(300) 17:10:59.812 [main] INFO reactor.Flux.Array.1 - | onNext(400) 17:10:59.812 [main] INFO reactor.Flux.Array.1 - | onNext(500) 17:10:59.812 [main] INFO reactor.Flux.Array.1 - | onNext(600) 17:10:59.813 [main] INFO reactor.Flux.Array.1 - | onNext(700) 17:10:59.813 [main] INFO reactor.Flux.Array.1 - | onNext(800) 17:10:59.813 [main] INFO reactor.Flux.Array.1 - | onNext(900) 17:10:59.813 [main] INFO reactor.Flux.Array.1 - | onComplete() 17:10:59.915 [elastic-4] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿100ms真的执行了 17:10:59.916 [elastic-4] INFO reactor.Flux.Array.1 - | request(1) 17:10:59.918 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了100ms 17:11:00.016 [elastic-5] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿200ms真的执行了 17:11:00.016 [elastic-5] INFO reactor.Flux.Array.1 - | request(1) 17:11:00.016 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了200ms 17:11:00.116 [elastic-6] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿300ms真的执行了 17:11:00.116 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了300ms 17:11:00.116 [elastic-6] INFO reactor.Flux.Array.1 - | request(1) 17:11:00.217 [elastic-7] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿400ms真的执行了 17:11:00.218 [elastic-7] INFO reactor.Flux.Array.1 - | request(1) 17:11:00.219 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了400ms 17:11:00.318 [elastic-8] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿500ms真的执行了 17:11:00.320 [elastic-8] INFO reactor.Flux.Array.1 - | request(1) 17:11:00.320 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了500ms 17:11:00.417 [elastic-9] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿600ms真的执行了 17:11:00.418 [elastic-9] INFO reactor.Flux.Array.1 - | request(1) 17:11:00.419 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了600ms 17:11:00.516 [elastic-10] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿700ms真的执行了 17:11:00.518 [elastic-10] INFO reactor.Flux.Array.1 - | request(1) 17:11:00.523 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了700ms 17:11:00.614 [elastic-11] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿800ms真的执行了 17:11:00.615 [elastic-11] INFO reactor.Flux.Array.1 - | request(1) 17:11:00.616 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了800ms 17:11:00.724 [elastic-12] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿900ms真的执行了 17:11:00.724 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了900ms 17:11:00.724 [elastic-12] INFO reactor.Flux.Array.1 - | request(1) 17:11:00.815 [elastic-2] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿1000ms真的执行了 17:11:00.816 [elastic-2] INFO reactor.Flux.Array.1 - | request(1) 17:11:00.816 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了1000ms 17:11:01.815 [elastic-3] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿2000ms真的执行了 17:11:01.835 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 停顿了2000ms 17:11:01.835 [main] INFO com.lianjia.sh.xcount.point.springmvc.ReactorTest - 总时间 : 2116 ms Process finished with exit code 0
相关文章推荐
- React Native小笔记
- H5、React Native、Native应用对比分析
- react native ref的使用
- 配置React Native的开发环境
- windows 下reactnative环境配置
- 夺命雷公狗-----React_native---5---初步读懂代码模式
- 夺命雷公狗-----React_native---4---初始化项目
- REACT day 1
- react组件服务器渲染问题(一)
- React-native 中因为计时器导致的异常解决办法
- 再一次掉进react native的坑了。ReferenceError can't find variable:location
- ES6语法的reactjs组件babel编译
- React学习笔记-8-属性和状态详解
- React Native 单位详细说明
- Rreact Native 常见错误总结
- React Native ref高级用法&&setNativeProps使用
- 推荐5个值得学习React Native的开源项目
- 【React Native】实现Navigator Back回退
- react-native 简述
- react+antd---1