• 并发模式之生产者消费者模式


    生产者 - 消费者模式

    生产者消费者模式是一个经典的多线程设计模式。

    总结:

    1. 生产者线程将任务提交到内存缓冲区,消费者线程从内存缓冲区获取任务并执行。
    2. 通过内存缓冲区,避免了生成者和消费者直接通信,从而将生产者和消费者解耦。
    3. 通过内存缓冲区,允许生产者和消费者的性能差异。

    JDK中提供的线程池(ThreadPoolExecutor)就是典型的生产者消费者模式(其中任务是线程),其中内存缓冲区的实现使用的是BlockingQueue阻塞队列。

    生产者 - 消费者模式(无锁实现)

    ThreadPoolExecutor中使用了BlockingQueue阻塞队列来做内存缓冲区,但是由于使用了锁和阻塞等待来实现线程间的同步,所以新能不高。
    而LMAX公司开发了一套无锁实现的高性能生产者消费者模式的框架,叫做Disruptor

    例子:(生产者生成数据,消费者计算数据平方)

    引入依赖:

    
        com.lmax
        disruptor
        3.4.4
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    数据实体:

    public class PCData {
        private long value;
        public long getValue() {
            return value;
        }
        public void setValue(long value) {
            this.value = value;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    数据工厂:

    public class PCDataFactory implements EventFactory {
        @Override
        public PCData newInstance() {
            return new PCData();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    生产者:

    import com.lmax.disruptor.RingBuffer;
    import java.nio.ByteBuffer;
    public class Producer {
        private final RingBuffer ringBuffer;
    
        public Producer(RingBuffer ringBuffer){
            this.ringBuffer = ringBuffer;
        }
        public void pushData(ByteBuffer bb){
            // 获取环上的下一个序列
            long sequence = ringBuffer.next();
            PCData data = ringBuffer.get(sequence);
            // 设置数据
            data.setValue(bb.getLong(0));
            // 发布序列
            ringBuffer.publish(sequence);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    消费者:

    import com.lmax.disruptor.WorkHandler;
    public class Consumer implements WorkHandler {
        @Override
        public void onEvent(PCData pcData) throws Exception {
            // 打印平方值
            System.out.println(Thread.currentThread().getName() +
                    " -- value="+pcData.getValue() +
                    " -- 平方="+Math.pow(pcData.getValue(),2));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    客户端:

    public class Main {
        public static void main(String[] args) throws InterruptedException {
            // 大小需要是2的幂
            int bufferSize = 1024;
            Disruptor disruptor = new Disruptor<>(
                    new PCDataFactory(),
                    bufferSize,
                    Executors.defaultThreadFactory(),
                    ProducerType.MULTI,
                    // 选择合适的策略,提高消费者的响应时间
                    new BlockingWaitStrategy() // 阻塞等待策略
                    // new SleepingWaitStrategy() // 休眠等待策略
                    // new YieldingWaitStrategy() // 谦让等待策略
                    // new BusySpinWaitStrategy() // 忙自旋等待策略,死循环
            );
            // 4个消费者
            disruptor.handleEventsWithWorkerPool(
                    new Consumer(),
                    new Consumer(),
                    new Consumer(),
                    new Consumer()
            );
            disruptor.start();
    
            // 生成数据
            RingBuffer ringBuffer = disruptor.getRingBuffer();
            long size = 1000L;
            // 2个生产者
            new Thread(()->{
                Producer producer = new Producer(ringBuffer);
                ByteBuffer bb = ByteBuffer.allocate(8);
                for(long i = 0L;i{
                Producer producer = new Producer(ringBuffer);
                ByteBuffer bb = ByteBuffer.allocate(8);
                for(long i = size;i<2*size;i++){
                    bb.putLong(0,i);
                    producer.pushData(bb);
                    System.out.println(Thread.currentThread().getName() + " - 产生数据:"+i);
                }
            }).start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    总结:

    1. 选择合适的策略,提高消费者的响应时间
    new BlockingWaitStrategy() // 阻塞等待策略,省CPU
    new SleepingWaitStrategy() // 休眠等待策略,中等延迟,自旋等待失败后休眠,不占用太多CPU
    new YieldingWaitStrategy() // 谦让等待策略,低延迟,CPU物理核大于线程数
    new BusySpinWaitStrategy() // 忙自旋等待策略,死循环,吃掉所有CPU资源
    
    • 1
    • 2
    • 3
    • 4
    1. DisruptorSequence使用对齐填充的方式解决CPU缓存伪共享问题。

    CPU缓存伪共享

    看下图,能知道CPU缓存伪共享的问题

    可以通过将存储的数据使用填充对齐到缓存行(64字节)大小,使得每个缓存行只存一个数据。

    如下代码片段是DisruptorSequence继承的RhsPadding类,里面填充了7个long类型的值(一个long类型64位即8字节,补7个加上自己的一个工8个,合计64字节,刚好占一个缓存行大小)

    class RhsPadding extends Value {
        protected long p9;
        protected long p10;
        protected long p11;
        protected long p12;
        protected long p13;
        protected long p14;
        protected long p15;
    
        RhsPadding() {
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    参考资料

    • 书籍 葛一鸣 * 《Java高并发程序设计》
  • 相关阅读:
    内置模板市场,DataEase开源数据可视化分析平台v1.13.0发布
    Maven学习(二)
    机器学习实战 | SKLearn最全应用指南
    SpringCloud中服务间通信方式以及Ribbon、Openfeign组件的使用
    webpack优化篇(五十):使用动态 Polyfill 服务
    扒去Spring事件监听机制的外衣,竟然是观察者模式
    CMake教程系列-03-依赖管理
    基于SpringBoot的花店销售网站
    电脑怎样抠图?这几个软件能轻松实现
    candence画环形贴片焊盘
  • 原文地址:https://blog.csdn.net/mg0324/article/details/126029736