架构师入门笔记八 并发框架Disruptor场景应用
2017-09-09 16:31
627 查看
架构师入门笔记八 并发框架Disruptor场景应用
今天用一个停车场问题来加深对Disruptor的理解。一个有关汽车进入停车场的问题。当汽车进入停车场时,系统首先会记录汽车信息。同时也会发送消息到其他系统处理相关业务,最后发送短信通知车主收费开始。看了很多文章,里面的代码都是大同小异的,可能代码真的是很经典。以下代码也是来源网络,只是自己手动敲的,加了一些注释。
代码包含以下内容:
1) 事件对象Event
2)三个消费者Handler
3)一个生产者Processer
4)执行Main方法
Event类:汽车信息
用了两篇博客简单的介绍了Disruptor并发框架,如果想深入学习,可以到并发网里面找文章。下一章介绍BIO,NIO,AIO知识,为Netty5的入门打个基础。
学习博客:
LMAX Disruptor——一个高性能、低延迟且简单的框架
简单了解Disruptor
今天用一个停车场问题来加深对Disruptor的理解。一个有关汽车进入停车场的问题。当汽车进入停车场时,系统首先会记录汽车信息。同时也会发送消息到其他系统处理相关业务,最后发送短信通知车主收费开始。看了很多文章,里面的代码都是大同小异的,可能代码真的是很经典。以下代码也是来源网络,只是自己手动敲的,加了一些注释。
代码包含以下内容:
1) 事件对象Event
2)三个消费者Handler
3)一个生产者Processer
4)执行Main方法
Event类:汽车信息
public class MyInParkingDataEvent { private String carLicense; // 车牌号 public String getCarLicense() { return carLicense; } public void setCarLicense(String carLicense) { this.carLicense = carLicense; } }Handler类:一个负责存储汽车数据,一个负责发送kafka信息到其他系统中,最后一个负责给车主发短信通知
import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; /** * Handler 第一个消费者,负责保存进场汽车的信息 * */ public class MyParkingDataInDbHandler implements EventHandler<MyInParkingDataEvent> , WorkHandler<MyInParkingDataEvent>{ @Override public void onEvent(MyInParkingDataEvent myInParkingDataEvent) throws Exception { long threadId = Thread.currentThread().getId(); // 获取当前线程id String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号 System.out.println(String.format("Thread Id %s 保存 %s 到数据库中 ....", threadId, carLicense)); } @Override public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch) throws Exception { this.onEvent(myInParkingDataEvent); } }
import com.lmax.disruptor.EventHandler; /** * 第二个消费者,负责发送通知告知工作人员(Kafka是一种高吞吐量的分布式发布订阅消息系统) */ public class MyParkingDataToKafkaHandler implements EventHandler<MyInParkingDataEvent>{ @Override public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch) throws Exception { long threadId = Thread.currentThread().getId(); // 获取当前线程id String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号 System.out.println(String.format("Thread Id %s 发送 %s 进入停车场信息给 kafka系统...", threadId, carLicense)); } }
import com.lmax.disruptor.EventHandler; /** * 第三个消费者,sms短信服务,告知司机你已经进入停车场,计费开始。 */ public class MyParkingDataSmsHandler implements EventHandler<MyInParkingDataEvent>{ @Override public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch) throws Exception { long threadId = Thread.currentThread().getId(); // 获取当前线程id String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号 System.out.println(String.format("Thread Id %s 给 %s 的车主发送一条短信,并告知他计费开始了 ....", threadId, carLicense)); } }Producer类:负责上报停车数据
import java.util.concurrent.CountDownLatch; import com.lmax.disruptor.EventTranslator; import com.lmax.disruptor.dsl.Disruptor; /** * 生产者,进入停车场的车辆 */ public class MyInParkingDataEventPublisher implements Runnable{ private CountDownLatch countDownLatch; // 用于监听初始化操作,等初始化执行完毕后,通知主线程继续工作 private Disruptor<MyInParkingDataEvent> disruptor; private static final Integer NUM = 1; // 1,10,100,1000 public MyInParkingDataEventPublisher(CountDownLatch countDownLatch, Disruptor<MyInParkingDataEvent> disruptor) { this.countDownLatch = countDownLatch; this.disruptor = disruptor; } @Override public void run() { MyInParkingDataEventTranslator eventTranslator = new MyInParkingDataEventTranslator(); try { for(int i = 0; i < NUM; i ++) { disruptor.publishEvent(eventTranslator); Thread.sleep(1000); // 假设一秒钟进一辆车 } } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); // 执行完毕后通知 await()方法 System.out.println(NUM + "辆车已经全部进入进入停车场!"); } } } class MyInParkingDataEventTranslator implements EventTranslator<MyInParkingDataEvent> { @Override public void translateTo(MyInParkingDataEvent myInParkingDataEvent, long sequence) { this.generateData(myInParkingDataEvent); } private MyInParkingDataEvent generateData(MyInParkingDataEvent myInParkingDataEvent) { myInParkingDataEvent.setCarLicense("车牌号: 鄂A-" + (int)(Math.random() * 100000)); // 随机生成一个车牌号 System.out.println("Thread Id " + Thread.currentThread().getId() + " 写完一个event"); return myInParkingDataEvent; } }执行的Main方法:
import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.EventHandlerGroup; import com.lmax.disruptor.dsl.ProducerType; /** * 执行的Main方法 , * 一个生产者(汽车进入停车场); * 三个消费者(一个记录汽车信息,一个发送消息给系统,一个发送消息告知司机) * 前两个消费者同步执行,都有结果了再执行第三个消费者 */ public class MyInParkingDataEventMain { public static void main(String[] args) { long beginTime=System.currentTimeMillis(); int bufferSize = 2048; // 2的N次方 try { // 创建线程池,负责处理Disruptor的四个消费者 ExecutorService executor = Executors.newFixedThreadPool(4); // 初始化一个 Disruptor Disruptor<MyInParkingDataEvent> disruptor = new Disruptor<MyInParkingDataEvent>(new EventFactory<MyInParkingDataEvent>() { @Override public MyInParkingDataEvent newInstance() { return new MyInParkingDataEvent(); // Event 初始化工厂 } }, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); // 使用disruptor创建消费者组 MyParkingDataInDbHandler 和 MyParkingDataToKafkaHandler EventHandlerGroup<MyInParkingDataEvent> handlerGroup = disruptor.handleEventsWith( new MyParkingDataInDbHandler(), new MyParkingDataToKafkaHandler()); // 当上面两个消费者处理结束后在消耗 smsHandler MyParkingDataSmsHandler myParkingDataSmsHandler = new MyParkingDataSmsHandler(); handlerGroup.then(myParkingDataSmsHandler); // 启动Disruptor disruptor.start(); CountDownLatch countDownLatch = new CountDownLatch(1); // 一个生产者线程准备好了就可以通知主线程继续工作了 // 生产者生成数据 executor.submit(new MyInParkingDataEventPublisher(countDownLatch, disruptor)); countDownLatch.await(); // 等待生产者结束 disruptor.shutdown(); executor.shutdown(); } catch (Exception e) { e.printStackTrace(); } System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime)); } }
用了两篇博客简单的介绍了Disruptor并发框架,如果想深入学习,可以到并发网里面找文章。下一章介绍BIO,NIO,AIO知识,为Netty5的入门打个基础。
学习博客:
LMAX Disruptor——一个高性能、低延迟且简单的框架
简单了解Disruptor
相关文章推荐
- 架构师入门笔记七 并发框架Disruptor快速入门
- 并发框架Disruptor场景应用
- Java并发28:ThreadLocal学习笔记-简介、基本方法及应用场景
- 无锁并发框架Disruptor学习入门
- 学习笔记之Java7中的ForkJoin并发框架初探(下)—— ForkJoin的应用
- 快速入门过程与方法:设计与思路;如何学习新的知识框架,建立思维模式,熟悉应用场景体系
- 高并发场景下各种框架和开发语言的选择应用处理
- StudyAI上MatConvNet框架课程学习笔记1:CPU编译入门
- 【Java并发编程】3、DelayQueue应用场景,多考生考试
- 多研究些架构,少谈些框架——一名阿里架构师的微服务笔记
- Flask微型框架入门笔记
- memcached在大负载高并发网站上的应用(2)---应用场景
- 并发框架Disruptor译文
- Vert.x,一个异步、可伸缩、并发应用框架引发的思考
- Chrome扩展及应用开发 入门笔记(六)进阶(网络请求,脚本注入)
- Bootstrap响应式前端框架笔记二十——工具条的应用
- RabbitMQ入门教程(十七):消息队列的应用场景和常见的消息队列之间的比较
- EventBus 3.0 从入门到精通——EventBus的应用场景
- 使用 ACE 库框架在 UNIX 中开发高性能并发应用
- TP框架中空方法应用场景