x265默认采用了帧级并行和帧内行级并行两种基础的并行编码。帧级并行即多帧同时编码,帧内行级并行主要利用HEVC的wavefront编码工具来实现,可以同时编码多个CTU行。
x265采用了基于线程池(threadpool)的多线程机制。threadPool中包含了众多的WorkerThread。WorkerThread是独立的线程,是具体干活的线程,线程的主循环函数为WorkerThread::threadMain()。给WorkerThread派活的叫做JobProvider。x265中FrameEncoder、WaveFront和Lookahead都属于JobProvider。JobProvider的findJob()函数是真正的干活过程。
这里可以用甲方、包工头和工人进行一个不恰当的类比。threadpool相当于包工头,而WorkerThread相当于工人,包工头手底下有很多工人。没活干的时候,工人处于休息(Sleep)状态。JobProvider则是甲方,甲方手里有工程时,对包工头(threadpool)提需求(tryWakeOne)及对应的实施方法。包工头(threadpool)找到手下空闲的工人(WorkThread)。然后,工人则根据甲方的实施方法(WaveFront::findJob())开始干活。
对于WPP编码,x265是如何用线程池进行编码的呢?
WPP多线程编码,FrameEncoder是作为甲方(JobProvider)发布需求的。具体过程在FrameEncode::compressFrame()函数中,代码段如下所示。FrameEncoder通过调用tryWakeOne()对threadpool发布需求,但发需求是有条件的。在HEVC编码中,WPP启动一行编码,需要满足两个条件:1、参考帧所参考的区域都已经编码重建;2、上一行的编码进度领先当前行至少2个CTU(为了获取正确的帧内预测参考像素)。x265中用m_externalDependencyBitmap和m_internalDependencyBitmap变量来分别标识这两个条件是否满足。条件1满足,则调用enableRowEncoder()将m_externalDependencyBitmap当前行所对应的位上置为1;条件2满足,则调用enqueueRowEncoder()将m_internalDependencyBitmap当前行所对应的位上置为1。
- if (m_param->bEnableWavefront)
- {
- for (uint32_t rowInSlice = 0; rowInSlice < m_sliceGroupSize; rowInSlice++)
- {
- for (uint32_t sliceId = 0; sliceId < m_param->maxSlices; sliceId++)
- {
- const uint32_t sliceStartRow = m_sliceBaseRow[sliceId];
- const uint32_t sliceEndRow = m_sliceBaseRow[sliceId + 1] - 1;
- const uint32_t row = sliceStartRow + rowInSlice;
-
- X265_CHECK(row < m_numRows, "slices row fault was detected");
-
- if (row > sliceEndRow)
- continue;
-
- // block until all reference frames have reconstructed the rows we need
- // 等待参考帧中当前行所参考的最大区域都重建好
- for (int l = 0; l < numPredDir; l++)
- {
- for (int ref = 0; ref < slice->m_numRefIdx[l]; ref++)
- {
- Frame *refpic = slice->m_refFrameList[l][ref];
-
- // NOTE: we unnecessary wait row that beyond current slice boundary
- const int rowIdx = X265_MIN(sliceEndRow, (row + m_refLagRows));
-
- while (refpic->m_reconRowFlag[rowIdx].get() == 0)
- refpic->m_reconRowFlag[rowIdx].waitForChange(0); // 等待该行重建完成
-
- if ((bUseWeightP || bUseWeightB) && m_mref[l][ref].isWeighted)
- m_mref[l][ref].applyWeight(rowIdx, m_numRows, sliceEndRow, sliceId);
- }
- }
-
- enableRowEncoder(m_row_to_idx[row]); /* clear external dependency for this row,当期行的外部依赖已经满足 */
- if (!rowInSlice) // 第0行不依赖帧内其他行,即满足帧内依赖关系
- {
- m_row0WaitTime = x265_mdate();
- enqueueRowEncoder(m_row_to_idx[row]); /* clear internal dependency, start wavefront, 当前行的内部依赖已经满足 */
- }
- tryWakeOne(); // 对线程池发布一次需求
- } // end of loop rowInSlice
- } // end of loop sliceId
-
-
- tryWakeOne(); /* ensure one thread is active or help-wanted flag is set prior to blocking */
-
- // 如果一帧没有编码结束,则每隔250ms给线程池发布一次需求
- static const int block_ms = 250;
- while (m_completionEvent.timedWait(block_ms))
- tryWakeOne();
- }
下面是函数tryWakeOne()的相关代码:首先让包工头(m_pool)找手底下有没有空闲的工人,如果所有工人都已经在干活了,则只设置一下m_helpWanted为true就直接返回了,后续甲方每隔250ms还会再来找包工头的;如果有空闲工人,则让该工人对接甲方(JobProvider),并且激活(worker.awaken)他。
- void JobProvider::tryWakeOne()
- {
- // 先让包工头找一下有没有空闲的工人
- int id = m_pool->tryAcquireSleepingThread(m_ownerBitmap, ALL_POOL_THREADS);
- if (id < 0) // 没有空闲工人,直接返回
- {
- m_helpWanted = true;
- return;
- }
-
- WorkerThread& worker = m_pool->m_workers[id]; // 当前空闲的工人
- if (worker.m_curJobProvider != this) /* poaching */
- {
- sleepbitmap_t bit = (sleepbitmap_t)1 << id;
- SLEEPBITMAP_AND(&worker.m_curJobProvider->m_ownerBitmap, ~bit);
- worker.m_curJobProvider = this; // 将甲方赋值给该工人,方便工人后续按照甲方需求干活
- SLEEPBITMAP_OR(&worker.m_curJobProvider->m_ownerBitmap, bit);
- }
- worker.awaken(); // 激活该工人
- }
下面是工人(WorkerThread)的主函数。在工人的主循环函数中,调用甲方的findJob()来干具体的活。干完之后,会主动找下一个需要帮助(m_helpWanted==true)的甲方,继续干活(可称为主动接单模式)。如果没有找到有需求的甲方了,则进入Sleep状态,等待甲方有需求时主动激活(可称为被动接单模式)。
- void WorkerThread::threadMain()
- {
- THREAD_NAME("Worker", m_id);
-
- #if _WIN32
- SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
- #else
- __attribute__((unused)) int val = nice(10);
- #endif
-
- m_pool.setCurrentThreadAffinity();
-
- sleepbitmap_t idBit = (sleepbitmap_t)1 << m_id;
- m_curJobProvider = m_pool.m_jpTable[0];
- m_bondMaster = NULL;
-
- SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
- SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
- m_wakeEvent.wait(); // 等待甲方激活工人
-
- while (m_pool.m_isActive)
- {
- if (m_bondMaster)
- {
- m_bondMaster->processTasks(m_id);
- m_bondMaster->m_exitedPeerCount.incr();
- m_bondMaster = NULL;
- }
-
- do
- {
- /* do pending work for current job provider */
- m_curJobProvider->findJob(m_id); // 完成甲方提出的需求
-
- /* if the current job provider still wants help, only switch to a
- * higher priority provider (lower slice type). Else take the first
- * available job provider with the highest priority */
- int curPriority = (m_curJobProvider->m_helpWanted) ? m_curJobProvider->m_sliceType :
- INVALID_SLICE_PRIORITY + 1;
- // 主动寻找有需求的甲方,继续干活(主动接单模式)
- int nextProvider = -1;
- for (int i = 0; i < m_pool.m_numProviders; i++)
- {
- if (m_pool.m_jpTable[i]->m_helpWanted &&
- m_pool.m_jpTable[i]->m_sliceType < curPriority)
- {
- nextProvider = i;
- curPriority = m_pool.m_jpTable[i]->m_sliceType;
- }
- }
- if (nextProvider != -1 && m_curJobProvider != m_pool.m_jpTable[nextProvider])
- {
- SLEEPBITMAP_AND(&m_curJobProvider->m_ownerBitmap, ~idBit);
- m_curJobProvider = m_pool.m_jpTable[nextProvider];
- SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
- }
- }
- while (m_curJobProvider->m_helpWanted);
-
- /* While the worker sleeps, a job-provider or bond-group may acquire this
- * worker's sleep bitmap bit. Once acquired, that thread may modify
- * m_bondMaster or m_curJobProvider, then waken the thread */
- SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit); // 所有甲方的需求暂时都完成了,进入休息状态
- m_wakeEvent.wait(); // 等待甲方有需求时主动激活工人(被动接单模式)
- }
-
- SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
- }
WaveFont作为甲方之一,他的需求具体实施过程在WaveFont::findJob()函数中,代码如下。CTZ(m_internalDependencyBitmap[w] & m_externalDependencyBitmap[w])用于找到当前能编的最上边的CTU Row。processRow()用于处理一行的编码或者滤波。
- void WaveFront::findJob(int threadId)
- {
- unsigned long id;
-
- /* Loop over each word until all available rows are finished */
- for (int w = 0; w < m_numWords; w++)
- {
- // oldval中bit为1的位表示同时满足帧内和帧间依赖的CTU行
- uint32_t oldval = m_internalDependencyBitmap[w] & m_externalDependencyBitmap[w];
- while (oldval)
- {
- CTZ(id, oldval); // 取值为1的最低的位,表示最上边的能编的CTU行
-
- uint32_t bit = 1 << id;
- if (ATOMIC_AND(&m_internalDependencyBitmap[w], ~bit) & bit) // 清除internalDepencyBitmap中该行的bit
- {
- /* we cleared the bit, we get to process the row */
- processRow(w * 32 + id, threadId); // 编码当前CTU行
- m_helpWanted = true;
- return; /* check for a higher priority task */
- }
-
- oldval = m_internalDependencyBitmap[w] & m_externalDependencyBitmap[w];
- }
- }
-
- m_helpWanted = false;
- }
待续......