Uylin
发布于 2023-09-05 / 33 阅读 / 0 评论 / 0 点赞

disruptor

秒级达百万高并发框架-Disruptor

介绍

Disruptor是一个高性能的并发框架,主要应用于创建具有高吞吐量、低延迟、无锁(lock-free)的数据结构和事件处理系统。它最初由LMAX公司开发的,已经成为了业界广泛使用的高性能并发框架。

特点和优势

  • 高性能:Disruptor框架能够通过无锁的方式提供非常高的并发性能和吞吐量,比如在大规模消息发布订阅场景下,能够每秒处理数百万个消息。
  • 低延迟:与传统的基于共享内存的方式相比,Disruptor框架通过线程之间的缓存操作和快速消息传递实现低延迟。
  • 易用性:Disruptor框架提供了简单的API,可以方便地实现生产者-消费者模式、消息队列、事件处理器等多种应用场景。
  • 可扩展性:Disruptor框架支持多线程处理消息,可以根据实际需求设置线程数,以提高处理效率。

解决了什么问题

  • Disruptor主要解决的是高性能应用中的并发问题,主要涉及数据缓存和线程通信这两个方面。在传统的并发编程中,由于共享状态和锁竞争等问题,很容易导致线程间的同步延迟,从而影响应用程序的性能和可扩展性。
  • Disruptor采用了无锁(Lock-Free)的并发编程技术,将数据存储在一个环形缓冲区中,并通过CAS操作等方式实现数据的并发读写和线程间的通信。它在保证数据一致性的同时,最大限度地压缩了线程间的同步开销,从而能够实现高效的消息传递和事件处理。Disruptor 的高吞吐量、低延迟以及可扩展性好的特点,使得它成为许多高并发应用的首选方案之一。

使用

引入jar包

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>

消息载体

/**
 *
 * 功能描述: 定义事件event 通过Disruptor 进行交换的数据类型。
 *
 * @param:
 * @return:
 * @auther: csh
 * @date: 2023/6/17 11:33 下午
 */
public class LogEvent {
    private String message;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

生产者

/**
 *
 * 功能描述: 生产者
 *
 * @param:
 * @return:
 * @auther: csh
 * @date: 2023/6/18 12:01 上午
 */
public class Producer implements EventTranslator<LongEvent> {

    @Override
    public void translateTo(LongEvent event, long sequence) {
        event.setNumber(sequence);
    }
}

消费者

import com.lmax.disruptor.EventHandler;
/**
 *
 * 功能描述: 消费者
 *
 * @param:
 * @return:
 * @auther: csh
 * @date: 2023/6/18 12:01 上午
 */
public class Consumer implements EventHandler<LongEvent> {

    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("Consumer:" + event.getNumber());
    }
}

运行

rt com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
import java.util.stream.Stream;


/**
 * @author: csh
 * @Date: 2023/6/17 23:37
 * @Description:主测试方式
 */
public class DisruptorMain {
    public static void main(String[] args) {
        //开始时间
        Date start = new Date();
        //创建线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        //初始化Disruptor 其中参数顺序如下:
        // LongEvent新建
        // ringBufferSize大小一定要是2的N次方
        // executor为线程池
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(LongEvent::new, 1024 * 1024, executor);
        //连接消费者
        disruptor.handleEventsWith(new Consumer());
        //启动
        RingBuffer<LongEvent> ringBuffer = disruptor.start();
        //生产者
        Producer producer = new Producer();
        IntStream.range(0, 1000000)
                .parallel()
                .forEach(i -> {
                    ringBuffer.publishEvent(producer);
                });

        //关闭服务 关闭线程池
        disruptor.shutdown();
        executor.shutdown();
        Date end = new Date();
        System.out.println((end.getTime() - start.getTime()) / 1000 + "秒");
    }
}

等待策略

  • BusySpinWaitStrategy:忙等待策略,使用循环检查的方式等待新事件,能够实现最低的延迟,但会消耗大量的CPU资源。
  • SleepingWaitStrategy:休眠等待策略,当没有新事件到来时,消费者线程会进入睡眠状态,在指定的时间后醒来继续检查RingBuffer,相对于Busy Spin可以更加节约CPU资源。
  • YieldingWaitStrategy:让步等待策略,当没有新事件到来时,消费者线程会暂停执行,将CPU资源让给其他线程,适用于中等延迟场景。
  • BlockingWaitStrategy:阻塞等待策略,它会使消费者线程进入阻塞状态,直到新的事件可用或者超时。适用于更高延迟的场景,但是会对系统吞吐量产生影响。
  • LiteBlockingWaitStrategy:是一种非重入锁的阻塞等待策略,它在实现上相对于BlockingWaitStrategy更加轻量级,同时也能够实现阻塞等待。
  • TimeoutBlockingWaitStrategy:超时等待策略,它会使消费者线程进入阻塞状态,在指定的时间内等待新的事件,如果等待超时则退出。相较于BlockingWaitStrategy,它具有更好的响应性能和可控的阻塞时间

数据结构

  • RingBuffer:Disruptor的核心数据结构,被用作队列的数据结构,通过Claim Strategy将事件发布到RingBuffer中,然后由Wait Strategy等待消费者获取并处理事件。RingBuffer采用预分配的方式,即在初始化时预先为每个slot分配了内存空间,避免了动态分配内存带来的开销和竞争。
  • Sequence:Sequence是序管理器是Disruptor中的一个序列号(自增),代表了生产者或消费者已经处理到哪个位置。每个生产者和每个消费者都维护自己的Sequence值,RingBuffer就是通过Sequence值来标识每个槽位的。Disruptor使用Sequence来实现流水线的形式,不同的处理阶段之间会通过Sequence进行衔接。
  • SequenceBarrier:SequenceBarrier简称序栅栏是用来保证消费者和生产者之间的协作。当消费者读取RingBuffer中的事件时,需要等待生产者提供的事件可用。而生产者发布事件时,也需要等待消费者已经处理完之前的事件。SequenceBarrier就可以提供这种等待机制,它可以阻塞消费者线程直到生产者发布了足够的事件,也可以阻塞生产者线程直到消费者处理完之前的事件。
  • BatchEventProcessor:BatchEventProcessor称为事件批理处理器是Disruptor的核心处理器,用来处理RingBuffer中的事件。它使用一个Sequence来表示消费者已经处理完哪些事件,当新的事件可用时,它会批量处理一定数量的事件并更新自己维护的Sequence值。
  • EventHandler:EventHandler是Disruptor中的事件处理器,用于处理从RingBuffer中读取的事件。每个EventHandler都需要实现onEvent方法,在其中编写业务逻辑。