• x265线程池机制和并行编码机制分析


        x265默认采用了帧级并行和帧内行级并行两种基础的并行编码。帧级并行即多帧同时编码,帧内行级并行主要利用HEVC的wavefront编码工具来实现,可以同时编码多个CTU行。

        x265采用了基于线程池(threadpool)的多线程机制。threadPool中包含了众多的WorkerThread。WorkerThread是独立的线程,是具体干活的线程,线程的主循环函数为WorkerThread::threadMain()。给WorkerThread派活的叫做JobProvider。x265中FrameEncoder、WaveFront和Lookahead都属于JobProvider。JobProvider的findJob()函数是真正的干活过程。

        这里可以用甲方、包工头和工人进行一个不恰当的类比。threadpool相当于包工头,而WorkerThread相当于工人,包工头手底下有很多工人。没活干的时候,工人处于休息(Sleep)状态。JobProvider则是甲方,甲方手里有工程时,对包工头(threadpool)提需求(tryWakeOne)及对应的实施方法。包工头(threadpool)找到手下空闲的工人(WorkerThread)。然后,工人则根据甲方的实施方法(WaveFront::findJob())开始干活。

        对于WPP编码,x265是如何用线程池进行编码的呢?

      WPP多线程编码,FrameEncoder是作为甲方(JobProvider)发布需求的。具体过程在FrameEncoder::compressFrame()函数中,代码段如下所示。FrameEncoder通过调用tryWakeOne()对threadpool发布需求,但发需求是有条件的。在HEVC编码中,WPP启动一行编码,需要满足两个条件:1、当前行在参考帧中所参考的区域都已经编码重建;2、上一行的编码进度领先当前行至少2个CTU(为了获取正确的帧内预测参考像素)。x265中用m_externalDependencyBitmap和m_internalDependencyBitmap变量来分别标识这两个条件是否满足。条件1满足,则调用enableRowEncoder()将m_externalDependencyBitmap中当前行所对应的位置为1;条件2满足,则调用enqueueRowEncoder()将m_internalDependencyBitmap中当前行所对应的位置为1。

    1. if (m_param->bEnableWavefront) // wavefront (WPP) path: release rows to the thread pool as their dependencies are met
    2. {
    3. for (uint32_t rowInSlice = 0; rowInSlice < m_sliceGroupSize; rowInSlice++)
    4. {
    5. for (uint32_t sliceId = 0; sliceId < m_param->maxSlices; sliceId++)
    6. {
    7. const uint32_t sliceStartRow = m_sliceBaseRow[sliceId];
    8. const uint32_t sliceEndRow = m_sliceBaseRow[sliceId + 1] - 1;
    9. const uint32_t row = sliceStartRow + rowInSlice;
    10. X265_CHECK(row < m_numRows, "slices row fault was detected");
    11. if (row > sliceEndRow) // this slice has no row at offset rowInSlice
    12. continue;
    13. // block until all reference frames have reconstructed the rows we need
    14. // i.e. wait until the furthest reference row this row may use (rowIdx below) has been reconstructed
    15. for (int l = 0; l < numPredDir; l++)
    16. {
    17. for (int ref = 0; ref < slice->m_numRefIdx[l]; ref++)
    18. {
    19. Frame *refpic = slice->m_refFrameList[l][ref];
    20. // NOTE: we unnecessarily wait for rows beyond the current slice boundary
    21. const int rowIdx = X265_MIN(sliceEndRow, (row + m_refLagRows));
    22. while (refpic->m_reconRowFlag[rowIdx].get() == 0)
    23. refpic->m_reconRowFlag[rowIdx].waitForChange(0); // wait for this reference row to finish reconstruction
    24. if ((bUseWeightP || bUseWeightB) && m_mref[l][ref].isWeighted)
    25. m_mref[l][ref].applyWeight(rowIdx, m_numRows, sliceEndRow, sliceId);
    26. }
    27. }
    28. enableRowEncoder(m_row_to_idx[row]); /* clear external dependency for this row: its reference-frame dependency is now satisfied */
    29. if (!rowInSlice) // the first row of a slice does not wait on any other row of the frame, so its internal dependency is already met
    30. {
    31. m_row0WaitTime = x265_mdate();
    32. enqueueRowEncoder(m_row_to_idx[row]); /* clear internal dependency, start wavefront: this row's internal dependency is satisfied */
    33. }
    34. tryWakeOne(); // post one request to the thread pool
    35. } // end of loop sliceId
    36. } // end of loop rowInSlice
    37. tryWakeOne(); /* ensure one thread is active or help-wanted flag is set prior to blocking */
    38. // while the frame has not finished encoding, post another request every 250 ms (presumably timedWait() returns true on timeout - confirm)
    39. static const int block_ms = 250;
    40. while (m_completionEvent.timedWait(block_ms))
    41. tryWakeOne();
    42. }

        下面是函数tryWakeOne()的相关代码:首先让包工头(m_pool)找手底下有没有空闲的工人,如果所有工人都已经在干活了,则只设置一下m_helpWanted为true就直接返回了,后续甲方每隔250ms还会再来找包工头的;如果有空闲工人,则让该工人对接甲方(JobProvider),并且激活(worker.awaken)他。

    1. void JobProvider::tryWakeOne() // wake one sleeping worker on behalf of this provider, or set m_helpWanted if none is available
    2. {
    3. // first ask the pool for a sleeping worker
    4. int id = m_pool->tryAcquireSleepingThread(m_ownerBitmap, ALL_POOL_THREADS);
    5. if (id < 0) // no sleeping worker was acquired: record that help is wanted and return
    6. {
    7. m_helpWanted = true;
    8. return;
    9. }
    10. WorkerThread& worker = m_pool->m_workers[id]; // the sleeping worker that was just acquired
    11. if (worker.m_curJobProvider != this) /* poaching */
    12. {
    13. sleepbitmap_t bit = (sleepbitmap_t)1 << id;
    14. SLEEPBITMAP_AND(&worker.m_curJobProvider->m_ownerBitmap, ~bit); // remove the worker's bit from its previous provider's owner bitmap
    15. worker.m_curJobProvider = this; // hand the worker to this provider so it works on this provider's jobs
    16. SLEEPBITMAP_OR(&worker.m_curJobProvider->m_ownerBitmap, bit); // and set the bit in this provider's owner bitmap
    17. }
    18. worker.awaken(); // wake the worker
    19. }

        下面是工人(WorkerThread)的主函数。在工人的主循环函数中,调用甲方的findJob()来干具体的活。干完之后,会主动找下一个需要帮助(m_helpWanted==true)的甲方,继续干活(可称为主动接单模式)。如果没有找到有需求的甲方了,则进入Sleep状态,等待甲方有需求时主动激活(可称为被动接单模式)。

    1. void WorkerThread::threadMain() // worker main loop: run jobs for a provider, switch providers by priority, sleep when no help is wanted
    2. {
    3. THREAD_NAME("Worker", m_id);
    4. #if _WIN32
    5. SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
    6. #else
    7. __attribute__((unused)) int val = nice(10);
    8. #endif
    9. m_pool.setCurrentThreadAffinity();
    10. sleepbitmap_t idBit = (sleepbitmap_t)1 << m_id;
    11. m_curJobProvider = m_pool.m_jpTable[0];
    12. m_bondMaster = NULL;
    13. SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
    14. SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
    15. m_wakeEvent.wait(); // sleep until something wakes this worker
    16. while (m_pool.m_isActive)
    17. {
    18. if (m_bondMaster)
    19. {
    20. m_bondMaster->processTasks(m_id);
    21. m_bondMaster->m_exitedPeerCount.incr();
    22. m_bondMaster = NULL;
    23. }
    24. do
    25. {
    26. /* do pending work for current job provider */
    27. m_curJobProvider->findJob(m_id); // carry out the work requested by the current provider
    28. /* if the current job provider still wants help, only switch to a
    29. * higher priority provider (lower slice type). Else take the first
    30. * available job provider with the highest priority */
    31. int curPriority = (m_curJobProvider->m_helpWanted) ? m_curJobProvider->m_sliceType :
    32. INVALID_SLICE_PRIORITY + 1;
    33. // actively look for a provider that wants help, so the worker keeps working without being woken
    34. int nextProvider = -1;
    35. for (int i = 0; i < m_pool.m_numProviders; i++)
    36. {
    37. if (m_pool.m_jpTable[i]->m_helpWanted &&
    38. m_pool.m_jpTable[i]->m_sliceType < curPriority)
    39. {
    40. nextProvider = i;
    41. curPriority = m_pool.m_jpTable[i]->m_sliceType;
    42. }
    43. }
    44. if (nextProvider != -1 && m_curJobProvider != m_pool.m_jpTable[nextProvider])
    45. {
    46. SLEEPBITMAP_AND(&m_curJobProvider->m_ownerBitmap, ~idBit);
    47. m_curJobProvider = m_pool.m_jpTable[nextProvider];
    48. SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
    49. }
    50. }
    51. while (m_curJobProvider->m_helpWanted);
    52. /* While the worker sleeps, a job-provider or bond-group may acquire this
    53. * worker's sleep bitmap bit. Once acquired, that thread may modify
    54. * m_bondMaster or m_curJobProvider, then waken the thread */
    55. SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit); // the current provider no longer wants help: mark this worker as sleeping
    56. m_wakeEvent.wait(); // sleep until woken again
    57. }
    58. SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
    59. }

        WaveFront作为甲方之一,它的需求具体实施过程在WaveFront::findJob()函数中,代码如下。CTZ(m_internalDependencyBitmap[w] & m_externalDependencyBitmap[w])用于找到当前能编的最上边的CTU Row。processRow()用于处理一行的编码或者滤波。

    1. void WaveFront::findJob(int threadId) // claim and process at most one ready row; sets m_helpWanted to tell the caller whether to call again
    2. {
    3. unsigned long id;
    4. /* Loop over each word until all available rows are finished */
    5. for (int w = 0; w < m_numWords; w++)
    6. {
    7. // bits set in oldval mark CTU rows whose internal and external dependencies are both satisfied
    8. uint32_t oldval = m_internalDependencyBitmap[w] & m_externalDependencyBitmap[w];
    9. while (oldval)
    10. {
    11. CTZ(id, oldval); // presumably the index of the lowest set bit, i.e. the topmost row that is ready
    12. uint32_t bit = 1 << id; // NOTE(review): 1 is a signed int; 1u << id would avoid shifting into the sign bit when id is 31
    13. if (ATOMIC_AND(&m_internalDependencyBitmap[w], ~bit) & bit) // clear this row's bit in m_internalDependencyBitmap; the test checks the bit was still set before clearing
    14. {
    15. /* we cleared the bit, we get to process the row */
    16. processRow(w * 32 + id, threadId); // process this CTU row (each bitmap word covers 32 rows)
    17. m_helpWanted = true;
    18. return; /* check for a higher priority task */
    19. }
    20. oldval = m_internalDependencyBitmap[w] & m_externalDependencyBitmap[w];
    21. }
    22. }
    23. m_helpWanted = false;
    24. }

    待续......

  • 相关阅读:
    “蔚来杯“2022牛客暑期多校训练营2 DGHJKL题解
    Spark基础入门(01)—RDD
    全国计算机四级之网络工程师知识点(五)
    【仿牛客网笔记】 Spring Boot进阶,开发社区核心功能-过滤敏感词
    django上传excel并读入数据库
    MySQL--MySQL索引事务
    单正态总体和双正态总体的假设检验
    【App自动化测试】(八)三种等待方式——强制等待、隐式等待、显示等待
    Spring Security 中多个身份验证
    JDK1.8中Date_Time API使用
  • 原文地址:https://blog.csdn.net/lrzkd/article/details/125888778