你了解并发框架 Disruptor 吗?(下篇)
那么事件发布模板该怎么写呢?
long sequence = ringBuffer.next(); // Grab the next sequence
try {
LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
// for the sequence
event.set(8888L); // Fill with data
} finally {
ringBuffer.publish(sequence);
}
我们使用EventTranslator发布事件。
//===============================================================
EventTranslator<LongEvent> translator1 = new EventTranslator<LongEvent>() {
@Override
public void translateTo(LongEvent event, long sequence) {
event.set(8888L);
}
};
ringBuffer.publishEvent(translator1);
//===============================================================
EventTranslatorOneArg<LongEvent, Long> translator2 = new EventTranslatorOneArg<LongEvent, Long>() {
@Override
public void translateTo(LongEvent event, long sequence, Long l) {
event.set(l);
}
};
ringBuffer.publishEvent(translator2, 7777L);
//===============================================================
EventTranslatorTwoArg<LongEvent, Long, Long> translator3 = new EventTranslatorTwoArg<LongEvent, Long, Long>() {
@Override
public void translateTo(LongEvent event, long sequence, Long l1, Long l2) {
event.set(l1 + l2);
}
};
ringBuffer.publishEvent(translator3, 10000L, 10000L);
//===============================================================
EventTranslatorThreeArg<LongEvent, Long, Long, Long> translator4 = new EventTranslatorThreeArg<LongEvent, Long, Long, Long>() {
@Override
public void translateTo(LongEvent event, long sequence, Long l1, Long l2, Long l3) {
event.set(l1 + l2 + l3);
}
};
ringBuffer.publishEvent(translator4, 10000L, 10000L, 1000L);
//===============================================================
EventTranslatorVararg<LongEvent> translator5 = new EventTranslatorVararg<LongEvent>() {
@Override
public void translateTo(LongEvent event, long sequence, Object... objects) {
long result = 0;
for(Object o : objects) {
long l = (Long)o;
result += l;
}
event.set(result);
}
};
ringBuffer.publishEvent(translator5, 10000L, 10000L, 10000L, 10000L);
使用Lamda表达式。
public class Main03
{
public static void main(String[] args) throws Exception
{
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ringBuffer.publishEvent((event, sequence) -> event.set(10000L));
System.in.read();
}
}
ProducerType生产者线程模式:
ProducerType有两种模式 Producer.MULTI和Producer.SINGLE,默认是MULTI,表示在多线程模式下产生sequence,如果确认是单线程生产者,那么可以指定SINGLE,效率会提升,如果是多个生产者(多线程),但模式指定为SINGLE,会出什么问题呢?
等待策略:
1.(常用)BlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,被唤醒后,再循环检查依赖的sequence是否已经消费。
2.BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu。
3.LiteBlockingWaitStrategy:线程阻塞等待生产者唤醒,与BlockingWaitStrategy相比,区别在signalNeeded.getAndSet,如果两个线程同时访问一个访问waitfor,一个访问signalAll时,可以减少lock加锁次数。
4.LiteTimeoutBlockingWaitStrategy:与LiteBlockingWaitStrategy相比,设置了阻塞时间,超过时间后抛异常。
5.PhasedBackoffWaitStrategy:根据时间参数和传入的等待策略来决定使用哪种等待策略。
6.TimeoutBlockingWaitStrategy:相对于BlockingWaitStrategy来说,设置了等待时间,超过后抛异常。
7.(常用)YieldingWaitStrategy:尝试100次,然后Thread.yield()让出cpu。
8.(常用)SleepingWaitStrategy : sleep
消费者异常处理:
默认:
disruptor.setDefaultExceptionHandler()
覆盖:
disruptor.handleExceptionFor().with()
程序员Tony 原创文章 25获赞 174访问量 1万+ 关注 私信- 2019最新Java并发编程高级技术Netty整合并发编程框架Disruptor实战教程
- 2019最新某Java并发编程高级技术Netty整合并发编程框架Disruptor实战教程
- 最新Java并发编程高级技术Netty整合并发编程框架Disruptor实战视频教程
- [翻译]高并发框架 LMAX Disruptor 介绍
- Disruptor并发框架
- 并发框架Disruptor场景应用
- 并发框架Disruptor译文
- Disruptor(无锁并发框架)-发布
- 你需要知道的高性能并发框架Disruptor原理
- Disruptor并发框架
- Disruptor(无锁并发框架)-发布
- 并发框架Disruptor几个Demo
- 并发框架Disruptor几个Demo
- disruptor - 并发编程框架
- 并发框架Disruptor译文
- 结合实际使用场景, 谈谈并发场景,或常用框架中间件的深度了解
- 并发框架Disruptor
- disruptor并发框架,为什么会这么快
- Disruptor(无锁并发框架)-发布
- 并发框架Disruptor译文