您的位置:首页 > 其它

Disruptor 极速体验

2015-06-25 17:18 204 查看
LMAX是一种新型零售金融交易平台,它能够以很低的延迟(latency)产生大量交易(吞吐量). 这个系统是建立在JVM平台上,核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单. 业务逻辑处理器完全是运行在内存中(in-memory),使用事件源驱动方式(event sourcing). 业务逻辑处理器的核心是Disruptors,这是一个并发组件,能够在无锁的情况下实现网络的Queue并发操作。他们的研究表明,现在的所谓高性能研究方向似乎和现代CPU设计是相左的.

一、什么是 Disruptor

从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。

一般性地来说,当你需要在两个独立的处理过程(两个线程)之间交换数据时,就可以使用 Disruptor 。当然使用队列(BlockingQueue)也可以,只不过 Disruptor 做得更好。

二、Disruptor 的相关概念

先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。

Ring Buffer
如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
Sequence  Disruptor
通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。
(注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述)。
Sequencer
Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
Sequence Barrier
用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。
Wait Strategy
定义 Consumer 如何进行等待下一个事件的策略。 (注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
Event
在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
EventProcessor
EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
EventHandler
Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
Producer
即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。


三、如何使用 Disruptor

Disruptor 的 API 十分简单,主要有以下几个步骤:

定义事件
事件(Event)就是通过 Disruptor 进行交换的数据类型。


package com.disruptor2;

import com.lmax.disruptor.EventFactory;

/**
* 消费事件
*/
public class PersonEvent {

private Person person;

public Person getPerson() {
return person;
}

public void setPerson(Person person) {
this.person = person;
}
/*
定义事件工厂*/
public final static  EventFactory<PersonEvent> EVENT_FACTORY = new EventFactory<PersonEvent>(){
public PersonEvent newInstance(){
return new PersonEvent();
}
};
}


定义事件处理的具体实现
通过实现接口 com.lmax.disruptor.EventHandler<T> 定义事件处理的具体实现。


package com.disruptor2;

import java.util.Date;

import com.lmax.disruptor.EventHandler;

/**
* 消费事件处理处理器
*/
public class PersonEventHandler  implements EventHandler<PersonEvent>{

public PersonEventHandler(){

}
@Override
public void onEvent(PersonEvent event, long sequence, boolean endOfBatch)
throws Exception {

Person person = event.getPerson();
System.out.println("第 "+sequence+" 个消费结束:"+endOfBatch+"   "+new Date().toLocaleString());
}

}


定义用于事件处理的线程池
Disruptor 通过 java.util.concurrent.ExecutorService 提供的线程来触发 Consumer 的事件处理。例如:

ExecutorService executor = Executors.newCachedThreadPool();

指定等待策略
Disruptor 定义了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待新事件,这是策略模式的应用。
Disruptor 提供了多个 WaitStrategy 的实现,每种策略都具有不同性能和优缺点,根据实际运行环境的 CPU 的硬件特点选择恰当的策略,并配合特定的 JVM 的配置参数,能够实现不同的性能提升。
例如,BlockingWaitStrategy、SleepingWaitStrategy、YieldingWaitStrategy 等,其中,
BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现;
SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;
YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。

WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();


package com.disruptor2;

import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class PersonHelper {

private  static PersonHelper instance;
/**
* ringBuffer的容量,必须是2的N次方
*/
private static final int BUFFER_SIZE = 2048;
//用于处理数据结构跟踪为出版商和相关eventprocessors序列光标协调障碍
private Disruptor<PersonEvent> disruptor;

public PersonHelper(){

ExecutorService executor = Executors.newCachedThreadPool();
disruptor = new Disruptor<PersonEvent>(PersonEvent.EVENT_FACTORY,
BUFFER_SIZE, executor, ProducerType.SINGLE,
new YieldingWaitStrategy());

EventHandler<PersonEvent> eventHandler = new PersonEventHandler();
disruptor.handleEventsWith(eventHandler);
disruptor.start();
}

/**
* 启动消费者线程,实际上调用了AudioDataEventHandler中的onEvent方法进行处理
*/
public static void start(){
instance = new PersonHelper();
}
/**
* 停止
*/
public static void shutdown(){
instance.doHalt();
}
private void doHalt() {
disruptor.halt();
}

/**
* 生产者生产商品
* @param person
*/
private void doProduce(Person person){
RingBuffer<PersonEvent> ringBuffer=disruptor.getRingBuffer();
//获取下一个序号
long sequence = ringBuffer.next();
//写入数据
disruptor.get(sequence).setPerson(person);
//通知消费者该资源可以消费了
System.out.println("第  "+sequence+"  个生产完了  "+"开始消费:"+new Date().toLocaleString());
ringBuffer.publish(sequence);
}
/**
* 生产者压入生产数据
* @param data
*/
public static void produce(Person person){
instance.doProduce(person);
}

}


发布事件
Disruptor 的事件发布过程是一个两阶段提交的过程:
  第一步:先从 RingBuffer 获取下一个可以写入的事件的序号;
  第二步:获取对应的事件对象,将数据写入事件对象;
  第三部:将事件提交到 RingBuffer;


测试类:
package com.disruptor2;
public class Test {
/**
* @param args
*/
public static void main(String[] args) {

PersonHelper.start();
for(int i=0 ; i<200; i++){
Person p = new Person("zs"+i, i , "男", "1234566"+i);

//生产者生产数据
PersonHelper.produce(p);
}
//PersonHelper.shutdown();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: