• 使用缓冲区提高并发


    使用缓冲区提高并发

    我们知道在计算机处理的处理过程中,CPU运算和内存访问是很快的,而涉及到硬盘访问的数据库或文件操作相对就很慢了。所以在处理较高并发的请求过程中,数据库或文件操作往往会成为处理流程中的性能瓶颈。本文介绍使用缓冲区配合多线程的方法来实现高并发处理能力。

    基本测试环境说明

    下面我们以springboot作为基础,实现一个简单的http接口,需求是按请求到达服务器端的先后顺序对请求进行编号,并且将请求的编号按序打印在控制台(打印时加入sleep延时以模拟慢处理)。

    • DAO层:

    打印控制台的实现类如下:(writeString是打印单行字符串,writeStringInBatch是批量打印多行字符串,用于模拟批量数据库写入,在写入大量数据的时候批量写入数据库往往比循环多次写入单条数据的效率高很多)

    import org.springframework.stereotype.Component;
    import java.util.List;
    import java.util.Random;
    
    @Component
    public class DataWriter {
        public void writeString(String line) {
            try {
                Thread.sleep(new Random().nextInt(500));
                // Thread.sleep(500);
                System.out.println(Thread.currentThread().getName()+"->"+line);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public void writeStringInBatch(List<String> lines) {
            try {
                Thread.sleep(new Random().nextInt(500));
                // Thread.sleep(500);
                for(String l: lines) {
                    System.out.println(Thread.currentThread().getName() + "->" + l);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • controller层:收到/queue请求直接调用service层接口

      @RestController
      public class HelloController {
      
          @Autowired
          private HelloService helloService;
      
          @GetMapping("/queue")
          public String queue() {
              return helloService.writeNormal();
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
    • service层:使用seqNo来记录每个请求的编号,收到请求时,将seqNo加1后交给ISeqHandler处理,因为springboot是多线程处理,因此seqNo加1时需要使用synchronized加锁来保证编号不重复。

      @Service
      public class HelloService {
          private long seqNo = 0;
        
          @Qualifier("NormalSeqHandlerImpl")
          @Autowired
          private ISeqHandler normalHandler;
      
          public synchronized String writeNormal() {
              seqNo++;
              normalHandler.handleData(seqNo+"");
              return "OK";
          }
      }
      
      @Component("NormalSeqHandlerImpl")
      public class NormalSeqHandlerImpl implements ISeqHandler {
      
          @Autowired
          private DataWriter dataWriter;
      
          @Override
          public void handleData(String line) {
              dataWriter.writeString(line);
          }
      
          @Override
          public void handleDataInBatch(List<String> lines) {
              handleDataInBatch(lines);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
    • 接口测试:使用jmeter对接口做性能测试,创建测试项,步骤如下:

      • 启动jmeter后默认有一个TestPlan,右键点击TestPlan添加一个线程组

        在这里插入图片描述

        Number od Threads表示一次测试并发的总线程数,Ramp-up period表示这些线程在几秒内创建完:

        在这里插入图片描述

      • 右键点击建好的线程组,添加http请求:

        在这里插入图片描述

      • 右键点击线程组,添加聚合结果查看器:

        在这里插入图片描述

      • 如果要查看每个http请求返回的结果可以,右键点击线程组,点击Add->Listener->View Results Tree添加结果树查看器

      • 一个TestPlan下还可以添加多个线程组进行多组不同的测试

      • 创建后点击保存,右键点击某个线程组,点击Start即可开始测试,测试跑完后可以在聚合报告中查看测试情况,我们上面的接口没做任何优化,50个并发请求,平均响应时间达到了6489毫秒:

        在这里插入图片描述

        注意:springboot后台启动后的第一次测试可能需要做一些线程创建操作会比较慢可以多测试几次。

    使用缓冲区优化

    上面的service层所有的处理过程(包括慢的打印操作)都放在synchronized中,所以多线程一点没发挥出来,完全是串行执行。

    为了提高性能,引入缓冲区,seqNo编号加1后按顺序放入缓冲区后直接返回(只放在内存中所以很快)。慢操作不能加锁,但是为了保证按编号顺序打印输出,我们引入一个独立的线程作为消费者专门从缓冲区中获取数据进行控制台打印,为了提高打印效率,采用批量打印方式。

    @Service
    public class HelloService {
        private long seqNo = 0;
    
        @Qualifier("NormalSeqHandlerImpl") // 低效处理实现类
        @Autowired
        private ISeqHandler normalHandler;
    
        @Qualifier("BetterSeqHandlerImpl") // 优化的实现类
        @Autowired
        private ISeqHandler betterHandler;
    
        public synchronized String writeNormal() {
            seqNo++;
            normalHandler.handleData(seqNo+"");
            return "OK";
        }
    
        public synchronized String writeBetter() {
            seqNo++;
            betterHandler.handleData(seqNo+"");
            return "OK";
        }
    }
    
    @Component("BetterSeqHandlerImpl")
    public class BetterSeqHandlerImpl implements ISeqHandler, Runnable {
        @Autowired
        private DataWriter dataWriter;
    
        private ConcurrentLinkedDeque<String> buffer = new ConcurrentLinkedDeque();
        private Thread flushThread;
        private boolean isRunning = true;
    
        @PostConstruct // 实例化后启动线程
        public void initThread() {
            flushThread = new Thread(this);
            flushThread.start();
        }
    
        @PreDestroy
        public void exitThread() {
            isRunning = false;
        }
    
        @Override
        public void handleData(String line) {
            buffer.add(line);
        }
    
        @Override
        public void handleDataInBatch(List<String> lines) {
            buffer.addAll(lines);
        }
    
        @Override
        public void run() {
            System.out.println("flushThread is running...");
            List<String> lines = new ArrayList<>();
            while (isRunning) { // 线程一直运行,从缓冲区拿数据进行处理
                if(buffer.isEmpty()) {
                    try {
                        Thread.sleep(10); // 这里要加个sleep,不然CPU会100%
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    while(!buffer.isEmpty()) {
                        lines.add(buffer.poll());
                    }
                    dataWriter.writeStringInBatch(lines); // 批量处理
                    lines.clear();
                }
            }
            System.out.println("flushThread exit.");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    优化后50个并发很快(平均2毫秒):

    在这里插入图片描述

    并发加到2000,依然很快(平均6毫秒):

    在这里插入图片描述

    使用springboot内置线程池

    上面的缓冲区消费使用了一个独立线程来消费缓冲的队列,直接使用线程编程比较简单,但对这个线程的可靠性要求比较高,万一线程崩溃了缓冲区就没有消费者了,springboot处理http请求使用的是线程池,我们也可以利用这个线程池来提高一下可靠性,在处理http请求的线程中,由一个线程来消费缓冲区,注意这里不是固定一个线程,而是设置一个标志,每个处理请求线程将数据放入缓冲后判断一下标识,如果没有消费线程就自己来做一次队列消费批量处理,为了避免在有线程处于消费刷写的情况下,其他线程新加的数据无人处理,我们需要设置一个等待线程,这样能确保队列中的数据不会遗漏,下面用一个新的service来完成这个功能:

    @Service
    public class DoubleBufferService {
        @Autowired
        private DataWriter dataWriter;
        private DoubleBuffer doubleBuffer = new DoubleBuffer();
    
        private long seqNo = 0;
        // isWaiting表示是否有线程处于等待中, isFlushing表示是否有线程在进行消费
        private boolean isWaiting = false;
        private boolean isFlushing = false;
    
    
        public String writeNo() {
            synchronized(this) {
                seqNo++;
                doubleBuffer.addData(seqNo + "");
                if(isWaiting) {
                    return "OK";
                }
                isWaiting = true;
                while(isFlushing) {
                    try {
                        this.wait(100); // wait期间会释放锁
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                isWaiting = false;
                // flush if there is no thread is flushing
                if(doubleBuffer.getCurrentSize() == 0) {
                    return "OK";
                }
                doubleBuffer.exchangeBuf();
                isFlushing = true;
            }
            doubleBuffer.flushBuf();
            synchronized (this) {
                isFlushing = false;
                this.notifyAll(); // 唤醒wait的线程
            }
            return "OK";
        }
    
        private class DoubleBuffer {
            private LinkedList<String> inBuf = new LinkedList();
            private LinkedList<String> outBuf = new LinkedList();
    
            public void addData(String data) {
                inBuf.add(data);
            }
    
            public void exchangeBuf() {
                LinkedList<String> tmp = inBuf;
                inBuf = outBuf;
                outBuf = tmp;
            }
    
            public void flushBuf() {
                dataWriter.writeStringInBatch(outBuf);
                /*for(String l: outBuf) {
                    dataWriter.writeString(l);
                }*/
                outBuf.clear();
            }
            public int getCurrentSize() {
                return inBuf.size();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    测试结果:

    在这里插入图片描述

  • 相关阅读:
    Vue双向绑定的原理
    1.3优先级
    解决element中table在页面切换时候表格底部出现空白
    Redis核心数据结构【set】【从入门到入坟】
    【论文速读,找找启发点】2024/6/16
    Ai语音机器人系统语音识别达到了什么水准
    解密Spring Cloud LoadBalancer:实现高效负载均衡的魔法密卷(一)
    数据库注入提权总结(一)
    C++ 常用时间获取函数汇总
    Flink——Flink检查点(checkpoint)、保存点(savepoint)的区别与联系
  • 原文地址:https://blog.csdn.net/dengruijin/article/details/126671867