秒级达百万高并发框架-Disruptor
介绍
Disruptor是一个高性能的并发框架,主要应用于创建具有高吞吐量、低延迟、无锁(lock-free)的数据结构和事件处理系统。它最初由LMAX公司开发的,已经成为了业界广泛使用的高性能并发框架。
特点和优势
- 高性能:Disruptor框架能够通过无锁的方式提供非常高的并发性能和吞吐量,比如在大规模消息发布订阅场景下,能够每秒处理数百万个消息。
- 低延迟:与传统的基于共享内存的方式相比,Disruptor框架通过线程之间的缓存操作和快速消息传递实现低延迟。
- 易用性:Disruptor框架提供了简单的API,可以方便地实现生产者-消费者模式、消息队列、事件处理器等多种应用场景。
- 可扩展性:Disruptor框架支持多线程处理消息,可以根据实际需求设置线程数,以提高处理效率。
解决了什么问题
- Disruptor主要解决的是高性能应用中的并发问题,主要涉及数据缓存和线程通信这两个方面。在传统的并发编程中,由于共享状态和锁竞争等问题,很容易导致线程间的同步延迟,从而影响应用程序的性能和可扩展性。
- Disruptor采用了无锁(Lock-Free)的并发编程技术,将数据存储在一个环形缓冲区中,并通过CAS操作等方式实现数据的并发读写和线程间的通信。它在保证数据一致性的同时,最大限度地压缩了线程间的同步开销,从而能够实现高效的消息传递和事件处理。Disruptor 的高吞吐量、低延迟以及可扩展性好的特点,使得它成为许多高并发应用的首选方案之一。
使用
引入jar包
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
消息载体
/**
*
* 功能描述: 定义事件event 通过Disruptor 进行交换的数据类型。
*
* @param:
* @return:
* @auther: csh
* @date: 2023/6/17 11:33 下午
*/
public class LogEvent {
private String message;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
生产者
/**
*
* 功能描述: 生产者
*
* @param:
* @return:
* @auther: csh
* @date: 2023/6/18 12:01 上午
*/
public class Producer implements EventTranslator<LongEvent> {
@Override
public void translateTo(LongEvent event, long sequence) {
event.setNumber(sequence);
}
}
消费者
import com.lmax.disruptor.EventHandler;
/**
*
* 功能描述: 消费者
*
* @param:
* @return:
* @auther: csh
* @date: 2023/6/18 12:01 上午
*/
public class Consumer implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("Consumer:" + event.getNumber());
}
}
运行
rt com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
/**
* @author: csh
* @Date: 2023/6/17 23:37
* @Description:主测试方式
*/
public class DisruptorMain {
public static void main(String[] args) {
//开始时间
Date start = new Date();
//创建线程池
ExecutorService executor = Executors.newCachedThreadPool();
//初始化Disruptor 其中参数顺序如下:
// LongEvent新建
// ringBufferSize大小一定要是2的N次方
// executor为线程池
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(LongEvent::new, 1024 * 1024, executor);
//连接消费者
disruptor.handleEventsWith(new Consumer());
//启动
RingBuffer<LongEvent> ringBuffer = disruptor.start();
//生产者
Producer producer = new Producer();
IntStream.range(0, 1000000)
.parallel()
.forEach(i -> {
ringBuffer.publishEvent(producer);
});
//关闭服务 关闭线程池
disruptor.shutdown();
executor.shutdown();
Date end = new Date();
System.out.println((end.getTime() - start.getTime()) / 1000 + "秒");
}
}
等待策略
- BusySpinWaitStrategy:忙等待策略,使用循环检查的方式等待新事件,能够实现最低的延迟,但会消耗大量的CPU资源。
- SleepingWaitStrategy:休眠等待策略,当没有新事件到来时,消费者线程会进入睡眠状态,在指定的时间后醒来继续检查RingBuffer,相对于Busy Spin可以更加节约CPU资源。
- YieldingWaitStrategy:让步等待策略,当没有新事件到来时,消费者线程会暂停执行,将CPU资源让给其他线程,适用于中等延迟场景。
- BlockingWaitStrategy:阻塞等待策略,它会使消费者线程进入阻塞状态,直到新的事件可用或者超时。适用于更高延迟的场景,但是会对系统吞吐量产生影响。
- LiteBlockingWaitStrategy:是一种非重入锁的阻塞等待策略,它在实现上相对于BlockingWaitStrategy更加轻量级,同时也能够实现阻塞等待。
- TimeoutBlockingWaitStrategy:超时等待策略,它会使消费者线程进入阻塞状态,在指定的时间内等待新的事件,如果等待超时则退出。相较于BlockingWaitStrategy,它具有更好的响应性能和可控的阻塞时间
数据结构
- RingBuffer:Disruptor的核心数据结构,被用作队列的数据结构,通过Claim Strategy将事件发布到RingBuffer中,然后由Wait Strategy等待消费者获取并处理事件。RingBuffer采用预分配的方式,即在初始化时预先为每个slot分配了内存空间,避免了动态分配内存带来的开销和竞争。
- Sequence:Sequence是序管理器是Disruptor中的一个序列号(自增),代表了生产者或消费者已经处理到哪个位置。每个生产者和每个消费者都维护自己的Sequence值,RingBuffer就是通过Sequence值来标识每个槽位的。Disruptor使用Sequence来实现流水线的形式,不同的处理阶段之间会通过Sequence进行衔接。
- SequenceBarrier:SequenceBarrier简称序栅栏是用来保证消费者和生产者之间的协作。当消费者读取RingBuffer中的事件时,需要等待生产者提供的事件可用。而生产者发布事件时,也需要等待消费者已经处理完之前的事件。SequenceBarrier就可以提供这种等待机制,它可以阻塞消费者线程直到生产者发布了足够的事件,也可以阻塞生产者线程直到消费者处理完之前的事件。
- BatchEventProcessor:BatchEventProcessor称为事件批理处理器是Disruptor的核心处理器,用来处理RingBuffer中的事件。它使用一个Sequence来表示消费者已经处理完哪些事件,当新的事件可用时,它会批量处理一定数量的事件并更新自己维护的Sequence值。
- EventHandler:EventHandler是Disruptor中的事件处理器,用于处理从RingBuffer中读取的事件。每个EventHandler都需要实现onEvent方法,在其中编写业务逻辑。