您的位置:首页 > 其它

高性能并发框架 Disruptor 介绍 实现生产者消费者模型

2014-08-22 22:30 826 查看
并发编程栏目代码 GitHub package 地址: 点击打开链接

博客并发编程栏目 : 点击打开链接

51CTO 译文框架介绍: http://developer.51cto.com/art/201306/399370.htm

#Disruptor简单说明

Disruptor 的源码非常精简,没有任何配置文件,所有源文件类加起来也就 58 个(不同版本可能不一样),

用代码行统计工具算了下,一共 6306 行。

对于一个能做到如此成功的开源工具来说,能有这么精短的代码量,确实很不错。

##Disruptor 代码共分为四个包:

1). com.lmax.disruptor: 大部分文件存放于这个目录下,包括 Disruptor 中重要的类文件,

    包括:EventProcessor、RingBuffer、Sequence、Sequencer、WaitStrategy 等

2). com.lmax.disruptor.collections: 该目录下只有一个类:Histogram,

    它不是 Disruptor 运行的必须类,其实我也没用过它,从源码注释来看,

    该类的作用是,在一个对性能要求很高的、有多个消费者的系统中,Histogram 可以用来记录系统耗各个组件的耗时情况,

    并以直方图的形式展示出来。初学 Disruptor 可以不用管关心它。

3). com.lmax.disruptor.dsl: 该包中保存了消费者和生产者的一些信息,核心类文件 Disruptor 也存放在该目录下。

4). com.lmax.disruptor.util: 该包中存放了几个辅助操作类,

    如 Util 类,DaemonThreadFactory 类,PaddedLong 类,该类用来做缓冲行填充的。

简单实现生产者消费者模型

其实所有的事件处理无非都是生产者消费者模型的

事件对象 - 商品对象

/**
* Created by lw on 14-7-3.
* <p>
* 事件对象 - 商品对象
* <p>
* 定义 ValueEvent 类,该类作为填充 RingBuffer 的消息,
* 生产者向该消息中填充数据(就是修改 value 属性值,后文用生产消息代替),
* 消费者从消息体中获取数据(获取 value 值,后文用消费消息代替)
*
* @see com.thread.concurrent_.disruptor.DeliveryReportEventHandler
*/
public final class ValueEvent {

private String value;//模拟任务数据

public String getValue() {
return value;
}

public void setValue(String value) {
this.value = value;
}

//定义生成的事件对象,注册到创建 Disruptor 对象
public final static EventFactory<ValueEvent> EVENT_FACTORY
= new EventFactory<ValueEvent>() {
public ValueEvent newInstance() {
return new ValueEvent();
}
};
}

事件处理者 - 消费者

/**
* 事件处理者 - 消费者
* @author lw by 14-7-22.
*/
public class DeliveryReportEventHandler implements EventHandler<ValueEvent> {

private int id;//消费者编号

public DeliveryReportEventHandler(int id) {
this.id = id;
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

@Override
public String toString() {
return "DeliveryReportEventHandler{" +
"id=" + id +
'}';
}

/**
* @param event 事件
* @param sequence 事件正在处理
* @param endOfBatch 是否是最后一个事件在处理
* @throws Exception Exception
*/
@Override
public void onEvent(ValueEvent event, long sequence, boolean endOfBatch) throws Exception {
Thread.sleep(2000);
System.out.println(this + "\tevent:\t" + event.getValue()
+ "\tsequence:\t" + sequence
+ "\tendOfBatch:\t" + endOfBatch);
}
}


生产者 Disruptor - 核心内容

/**
* 生产者 Disruptor - 核心内容
*
* @author lw by 14-7-2.
*/
public class Disruptor_Example {

private static final int RINGBUFFER_SIZE = 16;//这个参数应该是2的幂,否则程序会抛出异常:
private static RingBuffer<ValueEvent> ringBuffer;//定义环形数组内存
private static Disruptor<ValueEvent> disruptor;
private static final ExecutorService SERVICE//线程池
= Executors.newCachedThreadPool();

/**
* 创建 Disruptor 对象。
* Disruptor 类是 Disruptor 项目的核心类,另一个核心类之一是 RingBuffer。
* 如果把 Disruptor 比作计算机的 cpu ,作为调度中心的话,那么 RingBuffer ,就是计算机的 Memory 。
* 第一个参数,是一个 EventFactory 对象,它负责创建 ValueEvent 对象,并填充到 RingBuffer 中;
* 第二个参数,指定 RingBuffer 的大小。这个参数应该是2的幂,否则程序会抛出异常:
* 第三个参数,就是之前创建的 ExecutorService 对象。
*/
private static void init() {
disruptor = new Disruptor<ValueEvent>(
ValueEvent.EVENT_FACTORY,
RINGBUFFER_SIZE,
SERVICE,
ProducerType.MULTI,
new TimeoutBlockingWaitStrategy(1000, TimeUnit.MINUTES)
);
}

/**
* 添加消费者对象
* {@link com.thread.concurrent_.disruptor.DeliveryReportEventHandler}
*
* @param eventHandlers 消费者对象
*/
private static void handleEventsWith(EventHandler[] eventHandlers) {
disruptor.handleEventsWith(eventHandlers);
}

/**
* 启动disruptor
*/
private static void start() {
ringBuffer = disruptor.start();
}

/**
* 生产者线程
* 通过 next 方法,获取 RingBuffer 可写入的消息索引号 seq;
* 通过 seq 检索消息;
* 修改消息的 value 属性;
* 通过 publish 方法,告知消费者线程,当前索引位置的消息可被消费了
*
* @param event 事件
*/
private static void addEVent(ValueEvent event) {

if (hasCapacity()) {
System.out.println("disruptor:ringbuffer 剩余量低于 10 %");
} else {
long seq = ringBuffer.next();
/**
* @see com.thread.concurrent_.disruptor.ValueEvent.<com.thread.concurrent_.disruptor.DeliveryReportEventHandler>
*/
ValueEvent valueEvent = ringBuffer.get(seq);//获取可用位置
valueEvent.setValue(event.getValue());//填充可用位置
ringBuffer.publish(seq);//通知消费者
}
}

/**
* 停止 Disruptor系统(停止消费者线程)
*/
private static void shutdown() {
disruptor.shutdown();
SERVICE.shutdown();
}

/**
* 获取ringBuffer剩余量是否低于RINGBUFFER_SIZE * 0.1
*
* @return boolean
*/
private static boolean hasCapacity() {
return (ringBuffer.remainingCapacity() < RINGBUFFER_SIZE * 0.1);
}

public static void main(String[] args) {

init();//初始化
handleEventsWith(new EventHandler[]{new DeliveryReportEventHandler(1), new DeliveryReportEventHandler(2)});//添加2个消费者
start();//启动disruptor

//生产10个商品
for (int i = 0; i < 10; i++) {
ValueEvent valueEvent = new ValueEvent();
valueEvent.setValue(UUID.randomUUID().toString());
addEVent(valueEvent);
}
//停止
shutdown();
}

}


执行结果内容

DeliveryReportEventHandler{id=2}
event: de9b74ab-0911-44c1-8166-86ac184b922e
sequence: 0
endOfBatch: false

DeliveryReportEventHandler{id=1} event:
de9b74ab-0911-44c1-8166-86ac184b922e
sequence: 0
endOfBatch: false

DeliveryReportEventHandler{id=1} event:
a17f5b7d-35bf-4181-86ee-45f54dfea6e1
sequence: 1
endOfBatch: true

DeliveryReportEventHandler{id=2} event:
a17f5b7d-35bf-4181-86ee-45f54dfea6e1
sequence: 1
endOfBatch: false

DeliveryReportEventHandler{id=1} event:
18d2fbb3-2dc5-42cd-977c-1eee9e00b9f3
sequence: 2
endOfBatch: false

DeliveryReportEventHandler{id=2} event:
18d2fbb3-2dc5-42cd-977c-1eee9e00b9f3
sequence: 2
endOfBatch: true

DeliveryReportEventHandler{id=1} event:
048ac24f-e196-4dd8-b88e-6a8b87b49cac
sequence: 3
endOfBatch: false

DeliveryReportEventHandler{id=2} event:
048ac24f-e196-4dd8-b88e-6a8b87b49cac
sequence: 3
endOfBatch: false

DeliveryReportEventHandler{id=1} event:
1c850ece-787f-49ab-91b0-37ff812a8d94
sequence: 4
endOfBatch: false

DeliveryReportEventHandler{id=2} event:
1c850ece-787f-49ab-91b0-37ff812a8d94
sequence: 4
endOfBatch: false

DeliveryReportEventHandler{id=1} event:
a6dec83c-f582-42f2-8dd3-ed311af4e41d
sequence: 5
endOfBatch: false

DeliveryReportEventHandler{id=2} event:
a6dec83c-f582-42f2-8dd3-ed311af4e41d
sequence: 5
endOfBatch: false

DeliveryReportEventHandler{id=1} event:
ea0cdd6e-7326-473e-8b7a-e16b8e79c51d
sequence: 6
endOfBatch: false

DeliveryReportEventHandler{id=2} event:
ea0cdd6e-7326-473e-8b7a-e16b8e79c51d
sequence: 6
endOfBatch: false

DeliveryReportEventHandler{id=1} event:
b8ef68fe-67b7-4575-b7ed-921e265d3487
sequence: 7
endOfBatch: false

DeliveryReportEventHandler{id=2} event:
b8ef68fe-67b7-4575-b7ed-921e265d3487
sequence: 7
endOfBatch: false

DeliveryReportEventHandler{id=1} event:
559c0e40-8e00-485f-92f4-b2147ce88d2f
sequence: 8
endOfBatch: false

DeliveryReportEventHandler{id=2} event:
559c0e40-8e00-485f-92f4-b2147ce88d2f
sequence: 8
endOfBatch: false

DeliveryReportEventHandler{id=1} event:
4a87ecf4-f05f-4e2e-863c-6430ac9729cc
sequence: 9
endOfBatch: true

DeliveryReportEventHandler{id=2} event:
4a87ecf4-f05f-4e2e-863c-6430ac9729cc
sequence: 9
endOfBatch: true
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息