通常在写C++多线程时,linux系统可以使用posix系列线程API,windows可以使用windows API,或者使用C++11的线程函数,这些线程API在使用时,不太好把控,因此很多框架又自己封装了一些线程API, 比如Qt的QThread, boost库的thread等等,本篇博客介绍google c++线程任务调度库marl.
marl仓库:https://github.com/google/marl
Marl 是一个 C++ 用于多线程并发和异步编程的库。Google 开发了 Marl,旨在为现代 CPU 提供高效的任务调度和同步。Marl 与其他线程库的主要区别在于它的任务调度器,该调度器可以动态地在可用的核心之间分配任务,而不需要为每个任务分配一个专用线程。这使得 Marl 可以在高并发下仍然具有高效性。
Marl 支持 Windows、macOS、Linux、FreeBSD、Fuchsia、Emscripten、Android 和 iOS(arm、aarch64、loongarch64、mips64、ppc64、rv64、x86 和 x64)。
Marl 不依赖于其他库(除了在 googletest 上构建可选单元测试)。
Marl除了基本的线程、信号、条件变量、同步机制等用法外,Marl 还提供了一系列其他功能和特性来支持并发编程:
任务调度:
Scheduler::enqueue() 方法调度一个任务以在后台运行。Fibers (协程):Marl 通过使用 fibers (也称为协程) 提供了非阻塞任务切换。当一个任务等待一个锁、信号或其他同步机制时,调度器可以立即切换到其他已经准备好的任务,从而提高核心利用率。
Task Yielding (任务让步):使用 marl::yield() 函数,任务可以主动让出 CPU 控制权,以便调度器可以运行其他任务。
Thread Affinity (线程亲和性):您可以设置任务在特定的 CPU 核心或线程上运行,这对于需要缓存局部性或特定的硬件资源访问的任务可能很有用。
External Threads (外部线程):即使您的应用程序使用了其他线程创建方法(例如,直接使用 C++11 std::thread 或其他线程库),Marl 也允许这些线程与 Marl 任务互动,例如通过同步原语。
Dynamic Thread Pooling (动态线程池):Marl 的任务调度器可以动态地在可用的核心之间分配任务,而无需为每个任务分配一个专用线程。这使得在高并发场景下,Marl 仍然能够高效地工作。
Drop-in Replacement for std:: Utilities:Marl 提供了与 std:: 对应的工具(例如 marl::mutex 相对于 std::mutex),这使得将现有的多线程代码迁移到 Marl 变得相对简单。
Cross-Platform:Marl 在多个平台上都有良好的支持,包括 Windows、macOS 和 Linux。
这只是 Marl 的一部分功能概述。为了充分利用它并理解其所有功能和特性,建议阅读官方文档和示例,以及深入了解源代码。
创建一个调度器:在你的应用程序中,首先需要创建一个任务调度器。
调度任务:使用 enqueue 方法调度任务。
等待任务完成:可以使用 wait 方法等待一个任务或任务组完成。
#include
#include
#include
#include
#include
int main() {
// 创建并绑定调度器
marl::Scheduler scheduler;
scheduler.bind();
defer(scheduler.unbind()); // Ensure the scheduler is unbound when main() returns.
// 调度两个任务
marl::WaitGroup wg(2); // Used to wait for both tasks to complete.
for (int i = 0; i < 2; i++) {
scheduler.enqueue([i, &wg] {
printf("Hello from task %d\n", i);
wg.done();
});
}
// 等待两个任务完成
wg.wait();
return 0;
}
此代码将创建一个调度器,调度两个任务并打印消息,然后等待它们完成。每个任务完成时,都会通知 WaitGroup。
要在你的项目中使用 Marl,需要将其添加为依赖,并确保包含头文件和链接库。
这只是 Marl 的基本用法。它还提供了更高级的特性,如信号、条件变量和其它各种同步机制。如果你对这些特性感兴趣,建议查阅官方文档或其他相关资源以深入了解。
Marl 的 Signal 提供了一个类似于条件变量或事件的机制,允许任务等待某个条件变得为真或被通知。Signal 的核心思想是,你可以从一个任务中发出信号,然后在一个或多个其他任务中等待这个信号。
创建一个信号:使用默认构造函数创建一个 marl::Signal。
等待信号:可以调用 wait() 方法使当前任务阻塞,直到信号被发出。
发出信号:调用 signal() 方法发出信号,从而唤醒一个或多个等待的任务。
下面是一个简单示例,演示了如何使用 Marl 的 Signal:
#include
#include
#include
#include
int main() {
// 创建并绑定调度器
marl::Scheduler scheduler;
scheduler.bind();
defer(scheduler.unbind()); // Ensure the scheduler is unbound when main() returns.
marl::Signal signal;
// 调度一个任务,它会等待信号
scheduler.enqueue([&] {
printf("Waiting for the signal...\n");
signal.wait();
printf("Received the signal!\n");
});
// 模拟一些工作
std::this_thread::sleep_for(std::chrono::seconds(1));
// 发出信号
printf("Sending the signal...\n");
signal.signal();
// 模拟一些工作,以确保等待的任务有机会运行
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}
在这个示例中,我们创建了一个 Signal 和一个任务。这个任务将等待信号。主线程稍后将发出这个信号,从而唤醒该任务。
注意,为了简单起见,此示例使用了 std::this_thread::sleep_for。在实际应用中,应尽量避免使用这种方法,因为它会阻塞线程。此处仅用作示范。
在 Marl 中,ConditionVariable 提供了一种方法,允许任务等待某个条件变为真。它结合 Mutex 使用,用于保护条件数据并提供原子性的条件检查和等待。
创建一个条件变量:使用默认构造函数创建一个 marl::ConditionVariable。
等待条件:首先锁定相关的 Mutex,然后调用 wait() 方法,传入 Mutex 和一个表示条件的 lambda 函数。如果条件不满足,任务会被阻塞。
通知条件变量:当条件变为真时,使用 notify_one() 或 notify_all() 唤醒等待的任务。
下面是一个简单示例,演示如何使用 Marl 的 ConditionVariable:
#include
#include
#include
#include
#include
int main() {
// 创建并绑定调度器
marl::Scheduler scheduler;
scheduler.bind();
defer(scheduler.unbind()); // Ensure the scheduler is unbound when main() returns.
marl::ConditionVariable cv;
marl::Mutex mutex;
bool ready = false;
// 调度一个任务,它会等待条件变为真
scheduler.enqueue([&] {
marl::lock lock(mutex);
cv.wait(lock, [&] { return ready; }); // Wait until ready becomes true
printf("The condition is met!\n");
});
// 模拟一些工作
std::this_thread::sleep_for(std::chrono::seconds(1));
{
marl::lock lock(mutex);
ready = true;
cv.notify_all(); // Notify all waiting tasks
}
// 模拟一些工作,以确保等待的任务有机会运行
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}
在此示例中,我们创建了一个 ConditionVariable 和一个与之关联的 Mutex。任务将等待 ready 条件变为真。主线程稍后将设置 ready 并通知条件变量,从而唤醒等待的任务。
和之前一样,这里使用了 std::this_thread::sleep_for 仅作为示例,实际应用中应避免这种方式。
Marl 提供了一系列同步工具来帮助开发者处理多任务并发和同步问题。以下是其中的一些主要同步工具:
Mutex (互斥锁): marl::Mutex 是一个排他锁,一次只允许一个任务访问临界区。
ConditionVariable (条件变量): 结合 Mutex 使用,允许任务等待直到某个条件为真。
Signal (信号): 允许任务等待一个信号,然后在另一个任务中发出该信号。
WaitGroup: 允许任务等待一组任务完成。
Semaphore: 一个计数信号量,用于控制对一个资源或资源池的访问。
下面是一个 Semaphore 的示例,演示如何使用它来限制对资源的并发访问:
#include
#include
#include
#include
int main() {
// 创建并绑定调度器
marl::Scheduler scheduler;
scheduler.bind();
defer(scheduler.unbind()); // Ensure the scheduler is unbound when main() returns.
// 创建一个允许两个并发任务的信号量
marl::Semaphore semaphore(2);
// 调度五个任务
for (int i = 0; i < 5; i++) {
scheduler.enqueue([i, &semaphore] {
semaphore.acquire();
defer(semaphore.release()); // Release the semaphore when done.
printf("Running task %d\n", i);
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Simulate work
});
}
// 等待所有任务完成
std::this_thread::sleep_for(std::chrono::seconds(3));
return 0;
}
在这个示例中,我们创建了一个允许最多两个并发任务的信号量。尽管我们调度了五个任务,但由于信号量的限制,任何时候最多只有两个任务在运行。当任务完成其工作并释放信号量后,新的任务将开始运行。
请注意,这只是 Marl 提供的同步工具中的一个示例。为了解决不同的同步问题,您可能需要使用上面列出的其他工具或它们的组合。
Marl 的任务调度是基于协程 (fibers) 的,这意味着它提供了用户级的线程切换,不需要涉及更昂贵的内核级线程上下文切换。以下是关于 Marl 任务调度的一些关键原理:
协程 (Fibers):
任务队列:
线程池:
非阻塞等待:
动态调度:
总的来说,Marl 的任务调度原理是为了提供高效、灵活的并发处理,同时减少上下文切换的开销。它的设计考虑到了现代多核硬件的特性,并尽量减少任务之间的竞争条件和锁的开销。
在 Marl 中,协程 (Fibers) 为非阻塞任务切换提供了轻量级的机制。与传统的线程相比,Fibers 是用户级线程,它们不需要内核级的上下文切换,因此切换成本更低。
Marl 调度器会自动使用 fibers 来执行任务,这样,当一个任务被阻塞,例如等待一个锁或信号,调度器可以迅速地切换到另一个已经准备好的任务,而不会浪费 CPU 时间在内核级上下文切换上。这种自动使用是透明的,因此开发者通常不需要直接管理 fibers。
但是,有时您可能需要直接控制 fibers 的行为。例如,当一个任务知道即将进行一个长时间的操作,并希望允许其他任务在此期间运行时,它可以使用 marl::yield() 主动让出 CPU。
下面是一个简单示例,展示了如何使用 marl::yield():
#include
#include
#include
int main() {
// 创建并绑定调度器
marl::Scheduler scheduler;
scheduler.bind();
defer(scheduler.unbind()); // Ensure the scheduler is unbound when main() returns.
// 创建一个锁来模拟资源争用
marl::Mutex mutex;
// 调度一个任务,它会占用锁一段时间
scheduler.enqueue([&] {
marl::lock lock(mutex);
printf("Task 1 has the lock. Yielding...\n");
marl::yield(); // 让出 CPU 控制权
printf("Task 1 resumes and releases the lock.\n");
});
// 调度另一个任务,它会尝试获取锁
scheduler.enqueue([&] {
printf("Task 2 trying to acquire the lock...\n");
marl::lock lock(mutex); // 这将被阻塞,直到第一个任务释放锁
printf("Task 2 got the lock!\n");
});
// 等待任务完成
std::this_thread::sleep_for(std::chrono::seconds(3));
return 0;
}
在这个示例中,我们模拟了两个任务争夺一个锁的场景。第一个任务获得了锁,但在持有锁的同时,它使用 marl::yield() 主动让出 CPU 控制权。这使得第二个任务有机会运行,尽管它仍然被阻塞,直到第一个任务释放锁。
注意:此示例中的 std::this_thread::sleep_for 是为了简化。在实际应用中,您应该使用 Marl 提供的其他同步机制,如 marl::WaitGroup 或 marl::Signal,来等待任务完成,而不是使用 sleep。
Marl 的 “Dynamic Thread Pooling” 是一个核心特性,使得任务调度在多线程环境中更加高效。与固定数量线程的传统线程池不同,动态线程池可以更好地适应运行时的工作负载,并确保最大化地利用计算资源。
自适应线程数: Marl 的调度器根据实际的工作负载动态地调整运行的线程数量,以最大化利用 CPU 资源。
最小化上下文切换: 由于使用了 fibers(协程),当一个任务被阻塞时,调度器可以迅速地切换到另一个就绪的任务,而不是经历昂贵的线程上下文切换。
避免线程过度竞争: 线程池的动态性能确保不会有过多的线程争夺 CPU,从而减少上下文切换和缓存失效。
#include
#include
#include
#include
int main() {
// 创建调度器
marl::Scheduler scheduler;
scheduler.bind();
defer(scheduler.unbind()); // 确保 main() 返回时解绑调度器
marl::WaitGroup wg; // 用于等待所有任务完成
const int numTasks = 100;
// 将工作计数增加到预期的任务数
wg.add(numTasks);
// 调度大量的任务
for (int i = 0; i < numTasks; i++) {
scheduler.enqueue([i, &wg] {
printf("Running task %d on thread %d\n", i, std::this_thread::get_id());
wg.done(); // 标记任务完成
});
}
// 等待所有任务完成
wg.wait();
return 0;
}
在这个示例中,我们创建了一个调度器并调度了大量的任务。尽管我们调度了很多任务,但 Marl 的动态线程池会确保合理地使用系统的 CPU 核心,从而高效地执行这些任务。
此代码中的 marl::WaitGroup 用于同步地等待所有任务完成,确保 main() 函数不会过早地返回。这是一个简化的示例,实际应用中可能会有更复杂的工作负载和同步需求。
在并发编程中,任务让步 (Task Yielding) 是一种技巧,允许当前运行的任务主动放弃其 CPU 执行时间,使调度器可以考虑运行其他任务。这是一个合作性的调度技巧,允许任务在知道自己可能不是当前最重要的任务或在长时间操作之前主动让步。
在 Marl 中,任务让步是通过 marl::yield() 函数实现的。
长时间操作:当任务预计要进行长时间操作,并希望允许其他任务在此期间运行时。
资源竞争:任务可能认识到其他任务可能正在等待某个资源,通过主动让步,它可以允许这些任务更快地获得资源。
合作性调度:在高并发环境中,任务让步允许任务更好地与其他任务一起工作,从而提高整体的应用程序响应性和效率。
#include
#include
#include
#include
#include
int main() {
// 创建并绑定调度器
marl::Scheduler scheduler;
scheduler.bind();
defer(scheduler.unbind());
marl::WaitGroup wg;
// 调度一个长时间运行的任务
wg.add(1);
scheduler.enqueue([&] {
for (int i = 0; i < 5; i++) {
printf("Long-running task, iteration %d. Yielding...\n", i);
marl::yield(); // 主动让出 CPU 控制权
std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 模拟长时间操作
}
wg.done();
});
// 调度一个简单的任务
wg.add(1);
scheduler.enqueue([&] {
printf("Short task running.\n");
wg.done();
});
// 等待所有任务完成
wg.wait();
return 0;
}
在此示例中,有一个长时间运行的任务和一个短任务。尽管长时间运行的任务开始得早,但它在每次迭代之前主动让步,这使得短任务有机会在长任务的迭代之间执行。这是一个简单的示例,但它展示了如何使用 marl::yield() 为其他任务提供执行机会。
线程亲和性 (Thread Affinity) 是一种技术,它允许您指定哪些 CPU 核心(或硬件线程)可以执行特定的线程。这可以提高应用程序的性能,因为它减少了线程在不同的 CPU 核心之间的迁移,这样可以更好地利用缓存和避免某些与线程迁移相关的开销。
Marl 支持设置线程亲和性,允许您为调度器内的工作线程指定亲和性。
缓存最大化: 通过保持线程在特定的 CPU 核心上运行,线程可以更好地利用 CPU 的 L1 和 L2 缓存,这些缓存可能与其他核心不共享。
减少线程迁移: 线程迁移可能导致性能下降,特别是当涉及 NUMA 架构时。
隔离任务: 在某些情况下,您可能希望某些特定任务只在特定的 CPU 核心上运行,以避免与其他任务竞争。
以下示例展示了如何创建一个 Marl 调度器,并设置其工作线程的亲和性,使它们只在第一个 CPU 核心上运行:
#include
#include
#include
#include
#include
int main() {
// 创建调度器的配置
marl::Scheduler::Config config;
// 使用 4 个工作线程
config.setWorkerThreadCount(4);
// 设置工作线程的亲和性,只在第一个 CPU 核心上运行
config.setWorkerThreadAffinity([=](size_t /*workerId*/) {
std::thread::native_handle_type handle = std::this_thread::native_handle();
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(0, &cpuset); // 设置为只在第一个 CPU 核心上运行
pthread_setaffinity_np(handle, sizeof(cpu_set_t), &cpuset);
});
// 创建并绑定调度器
marl::Scheduler scheduler(config);
scheduler.bind();
defer(scheduler.unbind());
marl::WaitGroup wg;
// 调度几个任务
wg.add(4);
for (int i = 0; i < 4; i++) {
scheduler.enqueue([i, &wg] {
printf("Task %d running on thread %d\n", i, std::this_thread::get_id());
wg.done();
});
}
// 等待所有任务完成
wg.wait();
return 0;
}
请注意,此代码使用 POSIX 线程 (pthread) 函数设置亲和性,因此它主要适用于支持 POSIX 线程的平台(如 Linux)。Windows 和其他操作系统可能需要不同的方法来设置线程的亲和性。
此代码创建了一个调度器,其中的所有工作线程都被设置为只在第一个 CPU 核心上运行。然后,它调度了几个任务,这些任务应该都在同一核心上执行。
在 Marl 中,“外部线程” (External Threads) 是指那些没有绑定到 Marl 调度器的线程。这些线程是在 Marl 的上下文之外创建和运行的。但即使它们没有被绑定到 Marl,它们仍然可以与通过 Marl 调度的任务互动。
为什么外部线程的概念很重要?
当您在应用程序中使用 Marl,您可能不会完全依赖 Marl 调度所有的任务。可能有一些历史线程或来自其他库的线程正在并行运行。这些线程可能需要与 Marl 任务互动,例如通过共享数据结构、信号或其他同步机制。
互动: 尽管外部线程没有绑定到 Marl 调度器,但它们仍然可以使用 Marl 的同步原语(如 marl::Mutex、marl::ConditionVariable 等)与 Marl 任务互动。
临时绑定: 如果外部线程需要临时执行一些需要 Marl 功能的操作,它可以临时绑定到 Marl 调度器,执行所需的操作,然后再解除绑定。
以下是一个简单示例,其中一个外部线程与通过 Marl 调度的任务互动:
#include
#include
#include
#include
#include
#include
int main() {
marl::Scheduler scheduler;
defer(scheduler.shutdown());
marl::Mutex mutex;
marl::ConditionVariable cv;
bool ready = false;
// 在 Marl 调度器中调度一个任务
scheduler.enqueue([&] {
std::this_thread::sleep_for(std::chrono::seconds(1)); // 休眠1秒
{
std::lock_guard<marl::Mutex> lock(mutex);
ready = true;
}
cv.notify_one();
});
// 在外部线程中等待 Marl 任务完成
std::thread externalThread([&] {
std::unique_lock<marl::Mutex> lock(mutex);
while (!ready) {
cv.wait(lock);
}
std::cout << "Marl task completed. External thread notified." << std::endl;
});
externalThread.join();
return 0;
}
此示例中的 Marl 任务在完成其工作后通过条件变量通知外部线程。外部线程等待这个通知然后继续其工作。尽管这是一个简单的示例,但它展示了外部线程如何使用 Marl 的同步原语与 Marl 任务互动。