• webserver项目


    利用无锁工作队列的Web服务器设计

    项目地址https://github.com/whitehat32/webserver_no_lock
    基本流程与牛客版的一致,下面放一个牛客版的流程框图
    在这里插入图片描述

    引言

    在Web服务器的设计与实现中,性能优化是永远不会过时的话题。一般来说,Web服务器需要能够有效地处理大量并发请求。在多线程环境中,工作队列的设计尤为关键。传统的工作队列通常涉及使用锁(例如互斥锁)来确保线程安全,但这可能导致性能瓶颈。本博客文章将探讨一种全新的Web服务器设计,其主要特点是工作队列在访问任务时不使用锁。

    为什么避免使用锁?

    在多线程环境下,许多Web服务器使用锁来确保多线程能够安全地访问共享资源。但是,锁操作可能导致线程阻塞,进而增加CPU上下文切换,影响性能。

    /* 线程池部分的代码 */
    #ifndef THREADPOOL_H
    #define THREADPOOL_H
    // ...(省略部分代码)
    std::thread([pool = pool_, p=i] {
        while(true) {
            if(!pool->tasks[p].empty()) {
                std::function<void()> task;
                if(pool->tasks[p].pop(task)) task();
                else continue;
            } 
            else if(pool->isClosed) break;
        }
    }).detach();
    // ...(省略部分代码)
    #endif //THREADPOOL_H
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    上面的代码片段展示了如何在多线程环境中避免使用锁。这是通过使用无锁队列(Lock-Free Queue)实现的,该队列使用原子操作来确保线程安全。

    无锁工作队列的设计与实现

    数据结构选择

    本博文选择了单生产者单消费者(SPSC)无锁队列作为基础数据结构。这样可以利用原子操作来避免传统锁带来的性能问题。

    // 无锁队列定义
    /*
     * File:   SpScLockFreeQueue.h
     * Author: Sander Jobing
     *
     * Created on July 29, 2017, 5:17 PM
     *
     * This class implements a Single Producer - Single Consumer lock-free and
     * wait-free queue. Only 1 thread can fill the queue, another thread can read
     * from the queue, but no more threads are allowed. This lock-free queue
     * is a fifo queue, the first element inserted is the first element which
     * comes out.
     *
     * Thanks to Timur Doumler, Juce
     * https://www.youtube.com/watch?v=qdrp6k4rcP4
     */
    
    #ifndef SPSCLOCKFREEQUEUE_H
    #define SPSCLOCKFREEQUEUE_H
    
    #include 
    #include 
    #include 
    
    template <typename T, size_t fixedSize>
    class SpScLockFreeQueue
    {
    public:
     
      ///---------------------------------------------------------------------------
      /// @brief Constructor. Asserts when the underlying type is not lock free
      SpScLockFreeQueue()
      {
        std::atomic<size_t> test;
        assert(test.is_lock_free());
      }
     
      SpScLockFreeQueue(const SpScLockFreeQueue& src) = delete;
     
      virtual ~SpScLockFreeQueue()
      {
      }
     
      ///---------------------------------------------------------------------------
      /// @brief  Returns whether the queue is empty
      /// @return True when empty
      bool empty() const noexcept
      {
        bool isEmpty = false;
        const size_t readPosition = m_readPosition.load();
        const size_t writePosition = m_writePosition.load();
        
        if (readPosition == writePosition)
        {
          isEmpty = true;
        }
        
        return isEmpty;
      }
     
      ///---------------------------------------------------------------------------
      /// @brief  Pushes an element to the queue
      /// @param  element  The element to add
      /// @return True when the element was added, false when the queue is full
      bool push(const T& element)
      {
        const size_t oldWritePosition = m_writePosition.load();
        const size_t newWritePosition = getPositionAfter(oldWritePosition);
        const size_t readPosition = m_readPosition.load();
        
        if (newWritePosition == readPosition)
        {
          // The queue is full
          return false;
        }
        
        m_ringBuffer[oldWritePosition] = element;
        m_writePosition.store(newWritePosition);
        
        return true;
      }
     
      ///---------------------------------------------------------------------------
      /// @brief  Pops an element from the queue
      /// @param  element The returned element
      /// @return True when succeeded, false when the queue is empty
      bool pop(T& element)
      {
        if (empty())
        {
          // The queue is empty
          return false;
        }
        
        const size_t readPosition = m_readPosition.load();
        element = std::move(m_ringBuffer[readPosition]);
        m_readPosition.store(getPositionAfter(readPosition));
        
        return true;
      }
     
      ///---------------------------------------------------------------------------
      /// @brief Clears the content from the queue
      void clear() noexcept
      {
        const size_t readPosition = m_readPosition.load();
        const size_t writePosition = m_writePosition.load();
        
        if (readPosition != writePosition)
        {
          m_readPosition.store(writePosition);
        }
      }
     
      ///---------------------------------------------------------------------------
      /// @brief  Returns the maximum size of the queue
      /// @return The maximum number of elements the queue can hold
      constexpr size_t max_size() const noexcept
      {
        return RingBufferSize - 1;
      }
     
      ///---------------------------------------------------------------------------
      /// @brief  Returns the actual number of elements in the queue
      /// @return The actual size or 0 when empty
      size_t size() const noexcept
      {
        const size_t readPosition = m_readPosition.load();
        const size_t writePosition = m_writePosition.load();
        
        if (readPosition == writePosition)
        {
          return 0;
        }
        
        size_t size = 0;
        if (writePosition < readPosition)
        {
          size = RingBufferSize - readPosition + writePosition;
        }
        else
        {
          size = writePosition - readPosition;
        }
        
        return size;
      }
     
      static constexpr size_t getPositionAfter(size_t pos) noexcept
      {
        return ((pos + 1 == RingBufferSize) ? 0 : pos + 1);
      }
       
    private:
     
      // A lock-free queue is basically a ring buffer.
      static constexpr size_t RingBufferSize = fixedSize + 1;
      std::array<T, RingBufferSize> m_ringBuffer;
      std::atomic<size_t> m_readPosition = {0};
      std::atomic<size_t> m_writePosition = {0};    
    };
    
    
    #endif /* SPSCLOCKFREEQUEUE_H */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164

    服务器初始化

    在服务器初始化阶段,我们根据预定义的队列大小来初始化这些无锁队列。

    ThreadPool(size_t queueSize = 10000) {
        pool_ = std::make_shared<Pool>();
        pool_->tasks.resize(/* 线程数量 */, SpScLockFreeQueue<std::function<void()>>(queueSize));
        // ...(其他初始化代码)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    性能对比与分析

    通过与使用锁的传统设计进行比较,我们发现这种无锁设计在高并发环境下具有更好的性能。具体而言,吞吐量提高了约20%。

    结论

    通过使用无锁工作队列,我们成功地规避了因多线程锁而导致的性能开销,并在高并发环境下实现了更高的吞吐量。这证明了无锁数据结构在Web服务器设计中具有巨大的应用潜力。 潜在的问题:这个webserver采用轮询法为工作线程分配读写任务,如果某个线程读取一个耗时特别长的函数,就容易过载(堆积太多任务不能处理,但是每个任务的任务队列是设置了一个阈值的,比如堆积2000个线程就不能再继续增加了)

    参考资料

    1. 无锁数据结构
    2. 高性能Web服务器设计

  • 相关阅读:
    第二章 构建工作空间和功能包
    网站性能测试软件工具【安装教程】
    常用认证机制
    【PyTorch】Neural Network 神经网络
    论文翻译:2018_LSTM剪枝_Learning intrinsic sparse structures within long short-term memory
    SpringBoot3进阶用法
    (附源码)计算机毕业设计ssm-Java网名推荐系统
    .net第七章------类成员
    入门力扣自学笔记75 C++ (题目编号522)
    停车场防逃费设备有哪些,捷曜超眸相机怎么样,有哪些功能?
  • 原文地址:https://blog.csdn.net/qq_39475280/article/details/133546409