本章主要内容
上一章中,我们了解了线程间保护共享数据的方法。当然,我们不仅想要保护数据,还想对单独的线程进行同步。例如,在第一个线程完成前,等待另一个线程执行完成。通常,线程会等待特定事件发生,或者等待某一条件达成。这可能需要定期检查“任务完成”标识,或将类似的东西放到共享数据中。像这种情况就需要在线程中进行同步,C++标准库提供了一些工具可用于同步,形式上表现为条件变量(condition variables)和future。并发技术规范中,为future添加了非常多的操作,并可与新工具锁存器(latches)(轻量级锁资源)和栅栏(barriers)一起使用。
本章将讨论如何使用条件变量等待事件,介绍future,锁存器和栅栏,以及如何简化同步操作。
假设你正在一辆在夜间运行的火车上,在夜间如何在正确的站点下车呢?有一种方法是整晚都要醒着,每停一站都能知道,这样就不会错过你要到达的站点,但会很疲倦。另外,可以看一下时间表,估计一下火车到达目的地的时间,然后在一个稍早的时间点上设置闹铃,然后安心的睡会。这个方法听起来也很不错,也没有错过你要下车的站点,但是当火车晚点时,就要被过早的叫醒了。当然,闹钟的电池也可能会没电了,并导致你睡过站。理想的方式是,无论是早或晚,只要当火车到站的时候,有人或其他东西能把你叫醒就好了。
这和线程有什么关系呢?当一个线程等待另一个线程完成时,可以持续的检查共享数据标志(用于做保护工作的互斥量),直到另一线程完成工作时对这个标识进行重置。不过,这种方式会消耗线程的执行时间检查标识,并且当互斥量上锁后,其他线程就没有办法获取锁,就会持续等待。因为对等待线程资源的限制,并且在任务完成时阻碍对标识的设置。类似于保持清醒状态和列车驾驶员聊了一晚上:驾驶员不得不缓慢驾驶,因为你分散了他的注意力,所以火车需要更长的时间,才能到站。同样,等待的线程会等待更长的时间,也会消耗更多的系统资源。
另外,在等待线程在检查间隙,使用std::this_thread::sleep_for()进行周期性的间歇(详见4.3节):
bool flag;
std::mutex m;
void wait_for_flag()
{
std::unique_lock<std::mutex> lk(m);
while(!flag)
{
lk.unlock(); // 1 解锁互斥量
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠100ms
lk.lock(); // 3 再锁互斥量
}
}
循环中,休眠前②函数对互斥量进行解锁①,并且在休眠结束后再对互斥量上锁,所以另外的线程就有机会获取锁并设置标识。
这个实现就进步很多,当线程休眠时没有浪费执行时间,但很难确定正确的休眠时间。太短的休眠和没有一样,都会浪费执行时间。太长的休眠时间,可能会让任务等待时间过久。休眠时间过长比较少见,这会影响到程序的行为,在高节奏的游戏中,就意味着丢帧或错过了一个时间片。
\第三个选择(也是优先选择的),使用C++标准库提供的工具去等待事件的发生。通过另一线程触发等待事件的机制是最基本的唤醒方式(例如:流水线上存在额外的任务时),这种机制就称为“条件变量”。从概念上来说,条件变量会与多个事件或其他条件相关,并且一个或多个线程会等待条件的达成。当某些线程被终止时,为了唤醒等待线程(允许等待线程继续执行),终止线程将会向等待着的线程广播“条件达成”的信息。
C++标准库对条件变量有两套实现:std::condition_variable和std::condition_variable_any,这两个实现都包含在头文件的声明中。两者都需要与互斥量一起才能工作(互斥量是为了同步),前者仅能与std::mutex一起工作,而后者可以和合适的互斥量一起工作,从而加上了_any的后缀。因为 std::condition_variable_any更加通用,不过在性能和系统资源的使用方面会有更多的开销,所以通常会将std::condition_variable作为首选类型。当对灵活性有要求时,才会考虑std::condition_variable_any。
所以,使用std::condition_variable去处理之前提到的情况——当有数据需要处理时,如何唤醒休眠中的线程?以下代码展示了使用条件变量唤醒线程的方式。
代码4.1 使用std::condition_variable处理数据等待
std::mutex mut;
std::queue<data_chunk> data_queue; // 1
std::condition_variable data_cond;
void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data=prepare_data();
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data); // 2
data_cond.notify_one(); // 3
}
}
void data_processing_thread()
{
while(true)
{
std::unique_lock<std::mutex> lk(mut); // 4
data_cond.wait(
lk,[]{return !data_queue.empty();}); // 5
data_chunk data=data_queue.front();
data_queue.pop();
lk.unlock(); // 6
process(data);
if(is_last_chunk(data))
break;
}
}
首先,队列中中有两个线程,两个线程之间会对数据进行传递①。数据准备好时,使用std::lock_guard锁定队列,将准备好的数据压入队列②之后,线程会对队列中的数据上锁,并调用std::condition_variable的notify_one()成员函数,对等待的线程(如果有等待线程)进行通知③。
另外的一个线程正在处理数据,线程首先对互斥量上锁(这里使用std::unique_lock要比std::lock_guard④更加合适)。之后会调用std::condition_variable的成员函数wait(),传递一个锁和一个Lambda表达式(作为等待的条件⑤)。Lambda函数是C++11添加的新特性,可以让一个匿名函数作为其他表达式的一部分,并且非常合适作为标准函数的谓词。例子中,简单的Lambda函数[]{return !data_queue.empty();}会去检查data_queue是否为空,当data_queue不为空,就说明数据已经准备好了。
wait()会去检查这些条件(通过Lambda函数),当条件满足(Lambda函数返回true)时返回。如果条件不满足(Lambda函数返回false),wait()将解锁互斥量,并且将线程(处理数据的线程)置于阻塞或等待状态。当准备数据的线程调用notify_one()通知条件变量时,处理数据的线程从睡眠中苏醒,重新获取互斥锁,并且再次进行条件检查。在条件满足的情况下,从wait()返回并继续持有锁。当条件不满足时,线程将对互斥量解锁,并重新等待。这就是为什么用std::unique_lock而不使用std::lock_guard的原因——等待中的线程必须在等待期间解锁互斥量,并对互斥量再次上锁,而std::lock_guard没有这么灵活。如果互斥量在线程休眠期间保持锁住状态,准备数据的线程将无法锁住互斥量,也无法添加数据到队列中。同样,等待线程也永远不会知道条件何时满足。
代码4.1使用了简单的Lambda函数用于等待⑤(用于检查队列何时不为空),不过任意的函数和可调用对象都可以传入wait()。当写好函数做为检查条件时,不一定非要放在一个Lambda表达式中,也可以直接将这个函数传入wait()。调用wait()的过程中,在互斥量锁定时,可能会去检查条件变量若干次,当提供测试条件的函数返回true就会立即返回。当等待线程重新获取互斥量并检查条件变量时,并非直接响应另一个线程的通知,就是所谓的伪唤醒(spurious wakeup)。因为任何伪唤醒的数量和频率都是不确定的,所以不建议使用有副作用的函数做条件检查。
本质上, std::condition_variable::wait是“忙碌-等待”的优化。下面用简单的循环实现了一个“忙碌-等待”:
template<typename Predicate>
void minimal_wait(std::unique_lock<std::mutex>& lk, Predicate pred){
while(!pred()){
lk.unlock();
lk.lock();
}
}
为wait()准备一个最小化实现,只需要notify_one()或notify_all()。
std::unique_lock的灵活性,不仅适用于对wait()的调用,还可以用于待处理的数据⑥。处理数据可能是耗时的操作,并且长时间持有锁是个糟糕的主意。
使用队列在多个线程中转移数据(如代码4.1)很常见。做得好的话,同步操作可以在队列内部完成,这样同步问题和条件竞争出现的概率也会降低。鉴于这些好处,需要从代码4.1中提取出一个通用线程安全的队列。
设计通用队列时,就要花时间想想,哪些操作需要添加到队列实现中去,就如之前在3.2.3节看到的线程安全的栈。可以看一下C++标准库提供的实现,找找灵感。std::queue<>容器的接口展示如下:
代码4.2 std::queue接口
template <class T, class Container = std::deque<T> >
class queue {
public:
explicit queue(const Container&);
explicit queue(Container&& = Container());
template <class Alloc> explicit queue(const Alloc&);
template <class Alloc> queue(const Container&, const Alloc&);
template <class Alloc> queue(Container&&, const Alloc&);
template <class Alloc> queue(queue&&, const Alloc&);
void swap(queue& q);
bool empty() const;
size_type size() const;
T& front();
const T& front() const;
T& back();
const T& back() const;
void push(const T& x);
void push(T&& x);
void pop();
template <class... Args> void emplace(Args&&... args);
};
忽略构造、赋值以及交换操作,剩下了三组操作:
和3.2.3中的栈一样,也会遇到接口上的条件竞争。因此,需要将front()和pop()合并成一个函数调用,就像之前在栈实现时合并top()和pop()一样。与代码4.1不同的是,当队列在多个线程中传递数据时,接收线程通常需要等待数据的压入。这里提供pop()函数的两个变种:try_pop()和wait_and_pop()。
try_pop() ,尝试从队列中弹出数据,即使没有值可检索,也会直接返回。
wait_and_pop(),将会等待有值可检索的时候才返回。
当使用之前栈的方式来实现队列,接口可能会是下面这样:
代码4.3 线程安全队列的接口
#include // 为了使用std::shared_ptr
template<typename T>
class threadsafe_queue
{
public:
threadsafe_queue();
threadsafe_queue(const threadsafe_queue&);
threadsafe_queue& operator=(
const threadsafe_queue&) = delete; // 不允许简单的赋值
void push(T new_value);
bool try_pop(T& value); // 1
std::shared_ptr<T> try_pop(); // 2
void wait_and_pop(T& value);
std::shared_ptr<T> wait_and_pop();
bool empty() const;
};
就像之前的栈,裁剪了很多构造函数,并禁止简单赋值。需要提供两个版本的try_pop()和wait_for_pop()。第一个重载的try_pop()①在引用变量中存储着检索值,可以用来返回队列中值的状态。当检索到一个变量时,将返回true,否则返回false(详见A.2节)。第二个重载②就不行了,因为它是用来直接返回检索值的,当没有值可检索时,这个函数返回NULL。
那么问题来了,如何将以上这些和代码4.1相关联呢?从之前的代码中提取push()和wait_and_pop(),如以下代码所示。
代码4.4 从代码4.1中提取push()和wait_and_pop()
#include
#include
#include
template<typename T>
class threadsafe_queue
{
private:
std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}
};
threadsafe_queue<data_chunk> data_queue; // 1
void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data=prepare_data();
data_queue.push(data); // 2
}
}
void data_processing_thread()
{
while(true)
{
data_chunk data;
data_queue.wait_and_pop(data); // 3
process(data);
if(is_last_chunk(data))
break;
}
}
线程队列中有互斥量和条件变量,所以独立的变量就不需要了①,并且push()不需要外部同步②。当然,wait_and_pop()还要兼顾条件变量的等待③。
另一个wait_and_pop()的重载写起来就很琐碎,剩下的函数就像从代码3.5实现的栈中粘过来一样。
代码4.5 使用条件变量的线程安全队列(完整版)
#include
#include
#include
#include
template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut; // 1 互斥量必须是可变的
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue()
{}
threadsafe_queue(threadsafe_queue const& other)
{
std::lock_guard<std::mutex> lk(other.mut);
data_queue=other.data_queue;
}
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return false;
value=data_queue.front();
data_queue.pop();
return true;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
empty()是一个const成员函数,并且传入拷贝构造函数的other形参是一个const引用。因为其他线程可能有非const引用对象,并调用变种成员函数,所以这里有必要对互斥量上锁。又因为锁住互斥量是个可变操作,所以互斥量成员必须为mutable①才能在empty()和拷贝构造函数中进行上锁。
条件变量在多个线程等待同一个事件时也很有用。当线程用来分解工作负载,并且只有一个线程可以对通知做出反应时,与代码4.1中结构完全相同。当数据准备完成时,调用notify_one()将会唤醒一个正在wait()的线程,检查条件和wait()函数的返回状态(因为仅是向data_queue添加了一个数据项)。 这里不保证线程一定会被通知到,即使只有一个等待线程收到通知,其他处理线程也有可能因为在处理数据,而忽略了这个通知。
另一种可能是,很多线程等待同一事件。对于通知,都需要做出回应。这会发生在共享数据初始化的时候,当处理线程使用同一数据时,就要等待数据被初始化,或等待共享数据的更新,比如:周期性初始化(periodic reinitialization)。这些情况下,线程准备好数据时,就会通过条件变量调用notify_all(),而非调用notify_one()。顾名思义,这就是全部线程在都去执行wait()(检查他们等待的条件是否满足)的原因。
当条件为true时,等待线程只等待一次,就不会再等待条件变量了,所以尤其是在等待一组可用的数据块时,一个条件变量并非同步操作最好的选择。
接下来就来了解一下future,对于条件变量的补足。
假设你要乘飞机去国外度假,当到达机场办理完各种登机手续后,还需要等待机场广播通知登机。这段时间内,你可能会在候机室里面找一些事情来打发时间,比如:读书,上网,或者来一杯咖啡。不过,你就在等待一件事情:机场广播通知登机。
C++标准库将这种事件称为future。当线程需要等待特定事件时,某种程度上来说就需要知道期望的结果。之后,线程会周期性(较短的周期)的等待或检查事件是否触发(检查信息板),检查期间也会执行其他任务(品尝昂贵的咖啡)。另外,等待任务期间也可以先执行另外的任务,直到对应的任务触发,而后等待future的状态会变为就绪状态。future可能是和数据相关(比如,登机口编号),也可能不是。当事件发生时(状态为就绪),这个future就不能重置了。
C++标准库中有两种future,声明在头文件中: unique future(std::future<>)和shared futures(std::shared_future<>),与了std::unique_ptr和std::shared_ptr非常类似。std::future只能与指定事件相关联,而std::shared_future就能关联多个事件。
后者的实现中,所有实例会在同时变为就绪状态,并且可以访问与事件相关的数据。这种关联与模板有关,比如std::unique_ptr 和std::shared_ptr的模板参数就是相关的数据类型。与数据无关处的,可以使用std::future与std::shared_future的特化模板。虽然,我倾向于线程通讯,但future对象本身并不提供同步访问。当多个线程需要访问一个独立future对象时,必须使用互斥量或类似同步机制进行保护。不过,当多个线程对一个std::shared_future<>副本进行访问,即使同一个异步结果,也不需要同步future。
并行技术规范将这两个模板类在std::experimental命名空间中进行了扩展:std::experimental::future<>和std::experimental::shared_future<> 。这个命名空间是为了将其与std命名空间中的模板类进行区分,实验命名空间中为这两个模板类添加了更多的功能。尤其是std::experimental中的内容与代码质量无关(我希望这里也会有较高质量的实现),需要强调的是这个命名空间提供的都不是标准类和函数,这个命名空间中类和函数的语法和语义,很可能与纳入C++标准(也就是std命名空间)后有所不同。如果想要使用这两个试验性的模板类,需要包含头文件。
最简单的事件,就是在后台运行的计算操作。第2章中已经清楚了std::thread 执行的任务不能有返回值,不过这个问题能使用future进行解决。
假设有一个需要长时间的运算,需要计算出一个有效值,但并不迫切需要这个值。你可以启动新线程来执行这个计算,你需要计算的结果,而std::thread并不提供直接接收返回值的机制。这里就需要std::async函数模板(也是在头文件)。
当不着急让任务结果时,可以使用std::async启动一个异步任务。与std::thread对象等待的方式不同,std::async会返回一个std::future对象,这个对象持有最终计算出来的结果。当需要这个值时,只需要调用这个对象的get()成员函数,就会阻塞线程直到future为就绪为止,并返回计算结果。
代码4.6 std::future从异步任务中获取返回值
#include
#include
int find_the_answer_to_ltuae();
void do_other_stuff();
int main()
{
std::future<int> the_answer=std::async(find_the_answer_to_ltuae);
do_other_stuff();
std::cout<<"The answer is "<<the_answer.get()<<std::endl;
}
与std::thread方式一样,std::async允许通过添加额外的调用参数,向函数传递额外的参数。第一个参数是指向成员函数的指针,第二个参数提供这个函数成员类的具体对象(是通过指针,也可以包装在std::ref中),剩余的参数可作为函数的参数传入。否则,第二个和随后的参数将作为函数的参数,或作为指定可调用对象的第一个参数。和std::thread一样,当参数为右值时,拷贝操作将使用移动的方式转移原始数据,就可以使用“只移动”类型作为函数对象和参数。
代码4.7 使用std::async向函数传递参数
#include
#include
struct X
{
void foo(int,std::string const&);
std::string bar(std::string const&);
};
X x;
auto f1=std::async(&X::foo,&x,42,"hello"); // 调用p->foo(42, "hello"),p是指向x的指针
auto f2=std::async(&X::bar,x,"goodbye"); // 调用tmpx.bar("goodbye"), tmpx是x的拷贝副本
struct Y
{
double operator()(double);
};
Y y;
auto f3=std::async(Y(),3.141); // 调用tmpy(3.141),tmpy通过Y的移动构造函数得到
auto f4=std::async(std::ref(y),2.718); // 调用y(2.718)
X baz(X&);
std::async(baz,std::ref(x)); // 调用baz(x)
class move_only
{
public:
move_only();
move_only(move_only&&)
move_only(move_only const&) = delete;
move_only& operator=(move_only&&);
move_only& operator=(move_only const&) = delete;
void operator()();
};
auto f5=std::async(move_only()); // 调用tmp(),tmp是通过std::move(move_only())构造得到
future的等待取决于std::async是否启动一个线程,或是否有任务在进行同步。大多数情况下,也可以在函数调用之前向std::async传递一个额外参数,这个参数的类型是std::launch,还可以是std::launch::defered,表明函数调用延迟到wait()或get()函数调用时才执行,std::launch::async表明函数必须在其所在的独立线程上执行,std::launch::deferred | std::launch::async表明实现可以选择这两种方式的一种。最后一个选项是默认的,当函数调用延迟,就可能不会再运行了。如下所示:
auto f6=std::async(std::launch::async,Y(),1.2); // 在新线程上执行
auto f7=std::async(std::launch::deferred,baz,std::ref(x)); // 在wait()或get()调用时执行
auto f8=std::async(
std::launch::deferred | std::launch::async,
baz,std::ref(x)); // 实现选择执行方式
auto f9=std::async(baz,std::ref(x));
f7.wait(); // 调用延迟函数
本章的后续小节和第8章中,会再次看到这段程序,使用std::async会将算法分割到各个任务中,这样程序就能并发了。不过,这不是让std::future与任务实例相关联的唯一方式,也可以将任务包装入std::packaged_task<>中,或通过编写代码的方式,使用std::promise<>模板显式设置值。与std::promise<>相比,std::packaged_task<>具有更高的抽象,所以我们从“高抽象”模板说起。
std::packaged_task<>会将future与函数或可调用对象进行绑定。当调用std::packaged_task<>对象时,就会调用相关函数或可调用对象,当future状态为就绪时,会存储返回值。这可以用在构建线程池(可见第9章)或其他任务的管理中,比如:在任务所在线程上运行其他任务,或将它们串行运行在一个特殊的后台线程上。当粒度较大的操作被分解为独立的子任务时,每个子任务都可以包含在std::packaged_task<>实例中,之后将实例传递到任务调度器或线程池中。对任务细节进行抽象,调度器仅处理std::packaged_task<>实例,而非处理单独的函数。
std::packaged_task<>的模板参数是一个函数签名,比如void()就是一个没有参数也没有返回值的函数,或int(std::string&, double*)就是有一个非const引用的std::string参数和一个指向double类型的指针参数,并且返回类型是int。构造std::packaged_task<>实例时,就必须传入函数或可调用对象。这个函数或可调用的对象,需要能接收指定的参数和返回(可转换为指定返回类型的)值。类型可以不完全匹配,因为这里类型可以隐式转换,可以用int类型参数和返回float类型的函数,来构建std::packaged_task实例。
函数签名的返回类型可以用来标识从get_future()返回的std::future<>的类型,而函数签名的参数列表,可用来指定packaged_task的函数调用操作符。例如,模板偏特化std::packaged_task会在下面的代码中使用到。
代码4.8 std::packaged_task<>的偏特化
template<>
class packaged_task<std::string(std::vector<char>*,int)>
{
public:
template<typename Callable>
explicit packaged_task(Callable&& f);
std::future<std::string> get_future();
void operator()(std::vector<char>*,int);
};
std::packaged_task是个可调用对象,可以封装在std::function对象中,从而作为线程函数传递到std::thread对象中,或作为可调用对象传递到另一个函数中或直接调用。当std::packaged_task作为函数调用时,实参将由函数调用操作符传递至底层函数,并且返回值作为异步结果存储在std::future中,并且可通过get_future()获取。因此可以用std::packaged_task对任务进行打包,并适时的取回future。当异步任务需要返回值时,可以等待future状态变为“就绪”。
线程间传递任务
很多图形架构需要特定的线程去更新界面,所以当线程对界面更新时,需要发出一条信息给正确的线程,让相应的线程来做界面更新。std::packaged_task提供了这种功能,且不需要发送一条自定义信息给图形界面线程。
代码4.9 使用std::packaged_task执行一个图形界面线程
#include
#include
#include
#include
#include
std::mutex m;
std::deque<std::packaged_task<void()> > tasks;
bool gui_shutdown_message_received();
void get_and_process_gui_message();
void gui_thread() // 1
{
while(!gui_shutdown_message_received()) // 2
{
get_and_process_gui_message(); // 3
std::packaged_task<void()> task;
{
std::lock_guard<std::mutex> lk(m);
if(tasks.empty()) // 4
continue;
task=std::move(tasks.front()); // 5
tasks.pop_front();
}
task(); // 6
}
}
std::thread gui_bg_thread(gui_thread);
template<typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{
std::packaged_task<void()> task(f); // 7
std::future<void> res=task.get_future(); // 8
std::lock_guard<std::mutex> lk(m);
tasks.push_back(std::move(task)); // 9
return res; // 10
}
代码十分简单:图形界面线程①循环直到收到一条关闭图形界面的信息后关闭界面②。关闭界面前,进行轮询界面消息处理③,例如:用户点击和执行在队列中的任务。当队列中没有任务④时,循环将继续。除非能在队列中提取出一个任务⑤,释放队列上的锁,并且执行任务⑥。这里future与任务相关,当任务执行完时,其状态会置为“就绪”。
将任务传入队列:提供的函数⑦可以提供一个打包好的任务,通过这个任务⑧调用get_future()成员函数获取future对象,并且在任务推入列表⑨之前,future将返回调用函数⑩。
例子中使用std::packaged_task创建任务,其中包含了一个无参数无返回值的函数或可调用对象(如果当这个调用有返回值时,返回值会被丢弃)。这可能是最简单的任务,std::packaged_task也可以用于复杂的情况——通过指定不同的函数签名作为模板参数,不仅可以改变其返回类型(因此该类型的数据会存在期望相关的状态中),也可以改变函数操作符的参数类型。这个例子可以简单的扩展成允许任务运行在图形界面线程上,并且接受传参,还可以通过std::future获取返回值。
这些任务能作为简单的函数调用来表达吗?还有,任务的结果能从很多地方得到吗?这些问题可以使用第三种方法创建future来解决:使用std::promise对值进行显示设置。
当需要处理很多网络连接时,会使用不同线程尝试连接每个接口,能使网络尽早联通。不幸的是,随着连接数量的增长,这种方式变的越来越不合适。因为大量的线程会消耗大量的系统资源,还有可能造成线程上下文频繁切换(当线程数量超出硬件可接受的并发数时),这都会对性能有影响。最极端的例子:线程会将系统资源消耗殆尽,系统连接网络的能力会变的极差。因此通过少数线程处理网络连接,每个线程同时处理多个连接,对需要处理大量网络连接的应用而言,这是一种比较普遍的做法。
当线程处理多个连接事件,来自不同的端口连接的数据包基本上以乱序方式进行处理。同样的,数据包也将以乱序的方式进入队列。很多情况下,一些应用不是等待数据成功的发送,就是等待(新的)指定网络接口数据的接收成功。
std::promise提供设定值的方式(类型为T),这个类型会和后面看到的std::future对象相关联。std::promise/std::future对提供一种机制:future可以阻塞等待线程,提供数据的线程可以使用promise对相关值进行设置,并将future的状态置为“就绪”。
代码4.10中是单线程处理多接口的实现,这个例子中,可以使用一对std::promise找出传出成功的数据块,与future相关的只是简单的“成功/失败”标识。对于传入包,与future相关的数据就是数据包的有效负载。
代码4.10 使用promise解决单线程多连接问题
#include
void process_connections(connection_set& connections)
{
while(!done(connections)) // 1
{
for(connection_iterator // 2
connection=connections.begin(),end=connections.end();
connection!=end;
++connection)
{
if(connection->has_incoming_data()) // 3
{
data_packet data=connection->incoming();
std::promise<payload_type>& p=
connection->get_promise(data.id); // 4
p.set_value(data.payload);
}
if(connection->has_outgoing_data()) // 5
{
outgoing_packet data=
connection->top_of_outgoing_queue();
connection->send(data.payload);
data.promise.set_value(true); // 6
}
}
}
}
process_connections()中(直到done()返回true①为止)每一次循环,都会依次的检查每个连接②,检索是否有数据③或正在发送已入队的传出数据⑤。假设输入数据包是具有ID和有效负载的(有实际的数在其中),一个ID映射到一个std::promise(可能是在相关容器中进行的依次查找)④,并且值是在包的有效负载中。传出包是在传出队列中检索,从接口直接发送出去。当发送完成,传出数据相关的promise将置为true,来表明传输成功⑥。是否能映射到实际网络协议上,取决于所用协议。
上面的代码不理会异常,一切工作都会很好的执行,但有悖常理。有时候磁盘满载,有时候会找不到东西,有时候网络会断,还有时候数据库会崩溃。当需要某个操作的结果时,就需要在对应的线程上执行这个操作,因为代码可以通过异常来报告错误。不过,这会对使用std::packaged_task或std::promise带来一些不必要的限制。因此,C++标准库提供了一种在以上情况下清理异常的方法,并且允许将异常存储为相关结果的一部分。
看完下面的代码段,思考一下:当你传递-1到square_root()中时,它将抛出一个异常,并且你想让调用者看到这个异常:
double square_root(double x)
{
if(x<0)
{
throw std::out_of_range(“x<0”);
}
return sqrt(x);
}
假设调用square_root()函数不是当前线程,
double y=square_root(-1);
将调用改为异步调用:
std::future<double> f=std::async(square_root,-1);
double y=f.get();
当y获得函数调用的结果,线程调用f.get()时,就能再看到异常了。
函数作为std::async的一部分时,当调用抛出一个异常时,这个异常就会存储到future中,之后future的状态置为“就绪”,之后调用get()会抛出已存储的异常(注意:标准级别没有指定重新抛出的这个异常是原始的异常对象,还是一个拷贝。不同的编译器和库将会在这方面做出不同的选择)。将函数打包入std::packaged_task任务包后,当任务调用时,同样的事情也会发生。打包函数抛出一个异常,这个异常将存储在future中,在get()调用时会再次抛出。
当然,通过函数的显式调用,std::promise也能提供同样的功能。当存入的是异常而非数值时,就需要调用set_exception()成员函数,而非set_value()。这通常是用在一个catch块中,并作为算法的一部分。为了捕获异常,这里使用异常填充promise:
extern std::promise<double> some_promise;
try
{
some_promise.set_value(calculate_value());
}
catch(...)
{
some_promise.set_exception(std::current_exception());
}
这里使用std::current_exception()来检索抛出的异常,可用std::copy_exception()作为替代方案,std::copy_exception()会直接存储新的异常而不抛出:
some_promise.set_exception(std::copy_exception(std::logic_error("foo ")));
这比使用try/catch块更加清晰,当异常类型已知,就应该优先使用。不是因为代码实现简单,而是给编译器提供了极大的优化空间。
另一种向future中存储异常的方式,在没有调用promise上的任何设置函数前,或正在调用包装好的任务时,销毁与std::promise或std::packaged_task相关的future对象。任何情况下,当future的状态还不是“就绪”时,调用std::promise或std::packaged_task的析构函数,将会存储一个与std::future_errc::broken_promise错误状态相关的std::future_error异常。通过创建一个future,可以构造一个promise为其提供值或异常,也可以通过销毁值和异常源,去违背promise。这种情况下,编译器没有在future中存储任何东西,线程可能会永远的等下去。
现在,例子中都在用std::future,不过std::future也有局限性。很多线程在等待的时候,只有一个线程能获取结果。当多个线程等待相同事件的结果时,就需要使用std::shared_future来替代std::future了。
虽然std::future可以处理所有在线程间数据转移的同步,但是调用某一特殊 std::future对象的成员函数,就会让这个线程的数据和其他线程的数据不同步。多线程在没有额外同步的情况下,访问独立std::future对象时,就会有数据竞争和未定义行为。因为std::future独享同步结果,并且通过调用get()函数,一次性的获取数据,这就让并发访问变的毫无意义。
如果并行代码没办法让多个线程等待同一个事件,std::shared_future可以帮你解决这个问题。因为std::future是只移动的,所以其所有权可以在不同的实例中互相传递,但只有一个实例可以获得特定的同步结果,而std::shared_future实例是可拷贝的,所以多个对象可以引用同一关联期望值的结果。
每一个std::shared_future的独立对象上,成员函数调用返回的结果还是不同步的,所以为了在多个线程访问一个独立对象时避免数据竞争,必须使用锁来对访问进行保护。
优先使用的办法:为了替代只有一个拷贝对象的情况,可以让每个线程都拥有自己对应的拷贝对象。这样,当每个线程都通过自己拥有的std::shared_future对象获取结果,那么多个线程访问共享同步结果就是安全的。可见图4.1。
图4.1 使用多个std::shared_future对象来避免数据竞争
可能会使用std::shared_future的场景,例如:实现类似于复杂的电子表格的并行执行,每一个单元格有唯一终值,这个终值可能由其他单元格中的数据通过公式计算得到。公式计算得到的结果依赖于其他单元格,然后可以使用std::shared_future对象引用第一个单元格的数据。当每个单元格内的所有公式并行执行后,任务会以期望的方式完成工作。不过,当其中有计算需要依赖其他单元格的值时就会阻塞,直到依赖单元格的数据准备就绪。这可以让系统在最大程度上使用硬件并发。
std::shared_future的实例同步std::future实例的状态。当std::future对象没有与其他对象共享同步状态所有权,那么所有权必须使用std::move将所有权传递到std::shared_future,其默认构造函数如下:
std::promise<int> p;
std::future<int> f(p.get_future());
assert(f.valid()); // 1 期望值 f 是合法的
std::shared_future<int> sf(std::move(f));
assert(!f.valid()); // 2 期望值 f 现在是不合法的
assert(sf.valid()); // 3 sf 现在是合法的
期望值f开始是合法的①,因为引用的是promise p的同步状态,但是在转移sf的状态后,f就不合法了②,而sf就是合法的了③。
如其他可移动对象一样,转移所有权是对右值的隐式操作,所以可以通过std::promise对象的成员函数get_future()的返回值,直接构造一个std::shared_future对象,例如:
std::promise<std::string> p;
std::shared_future<std::string> sf(p.get_future()); // 1 隐式转移所有权
转移所有权是隐式的,用右值构造std::shared_future<>,得到std::future类型的实例①。
std::future的这种特性,可促进std::shared_future的使用,容器可以自动的对类型进行推断,从而初始化该类型的变量(详见附录A,A.6节)。std::future有一个share()成员函数,可用来创建新的std::shared_future ,并且可以直接转移future的所有权。这样也就能保存很多类型,并且使得代码易于修改:
std::promise< std::map< SomeIndexType, SomeDataType, SomeComparator,
SomeAllocator>::iterator> p;
auto sf=p.get_future().share();
这个例子中,sf的类型推导为std::shared_future,还真的长。当比较器或分配器有所改动,只需要对promise的类型进行修改即可。future的类型会自动与promise的修改进行匹配。
有时需要限定等待事件的时间,不论是因为时间上有硬性规定(一段指定的代码需要在某段时间内完成),还是因为在事件没有很快的触发,或是有工作需要特定线程来完成,为了处理这种情况,需要等待函数能对超时进行指定。
阻塞调用会将线程挂起一段(不确定的)时间,直到相应的事件发生。通常情况下,这样的方式很不错,但是在一些情况下,需要限定线程等待的时间。可以发送一些类似“我还存活”的信息,无论是对交互式用户,或是其他进程,亦或当用户放弃等待,也可以按下“取消”键终止等待。
这里介绍两种指定超时方式:一种是“时间段”,另一种是“时间点”。第一种方式,需要指定一段时间(例如,30毫秒)。第二种方式,就是指定一个时间点(例如,世界标准时间[UTC]17:30:15.045987023,2011年11月30日)。多数等待函数提供变量,对两种超时方式进行处理。处理持续时间的变量以_for作为后缀,处理绝对时间的变量以_until作为后缀。
所以,std::condition_variable的两个成员函数wait_for()和wait_until()成员函数分别有两个重载,这两个重载都与wait()成员函数的重载相关——其中一个只是等待信号触发,或超期,亦或伪唤醒,并且醒来时会使用谓词检查锁,并且只有在校验为true时才会返回(这时条件变量的条件达成),或直接超时。
观察使用超时函数的细节前,我们来检查一下在C++中指定时间的方式,就从“时钟”开始吧!
对于C++标准库来说,时钟就是时间信息源。并且,时钟是一个类,提供了四种不同的信息:
当前时间可以通过静态成员函数now()从获取。例如,std::chrono::system_clock::now()会返回系统的当前时间。特定的时间点可以通过time_point的typedef成员来指定,所以some_clock::now()的类型就some_clock::time_point。
时钟节拍被指定为1/x(x在不同硬件上有不同的值)秒,这是由时间周期所决定——一个时钟一秒有25个节拍,因此一个周期为std::ratio<1, 25>,当一个时钟的时钟节拍每2.5秒一次,周期就可以表示为std::ratio<5, 2>。当时钟节拍在运行时获取时,可以使用给定的应用程序运行多次,用执行的平均时间求出,其中最短的时间可能就是时钟节拍,或者是写在手册当中,这就不保证在给定应用中观察到的节拍周期与指定的时钟周期是否相匹配。
当时钟节拍均匀分布(无论是否与周期匹配),并且不可修改,这种时钟就称为稳定时钟。is_steady静态数据成员为true时,也表明这个时钟就是稳定的。通常情况下,因为std::chrono::system_clock可调,所以是不稳定的。这可调可能造成首次调用now()返回的时间要早于上次调用now()所返回的时间,这就违反了节拍频率的均匀分布。稳定闹钟对于计算超时很重要,所以C++标准库提供一个稳定时钟std::chrono::steady_clock。C++标准库提供的其他时钟可表示为std::chrono::system_clock,代表了系统时钟的“实际时间”,并且提供了函数,可将时间点转化为time_t类型的值。std::chrono::high_resolution_clock 可能是标准库中提供的具有最小节拍周期(因此具有最高的精度)的时钟。它实际上是typedef的另一种时钟,这些时钟和与时间相关的工具,都在库头文件中定义。
我们先看一下时间段是怎么表示的。
时间部分最简单的就是时间段,std::chrono::duration<>函数模板能够对时间段进行处理(线程库使用到的所有C++时间处理工具,都在std::chrono命名空间内)。第一个模板参数是一个类型表示(比如,int,long或double),第二个模板参数是定制部分,表示每一个单元所用秒数。例如,当几分钟的时间要存在short类型中时,可以写成std::chrono::duration,因为60秒是才是1分钟,所以第二个参数写成std::ratio<60, 1>。当需要将毫秒级计数存在double类型中时,可以写成std::chrono::duration,因为1秒等于1000毫秒。
标准库在std::chrono命名空间内为时间段变量提供一系列预定义类型:nanoseconds[纳秒] , microseconds[微秒] , milliseconds[毫秒] , seconds[秒] , minutes[分]和hours[时]。比如,你要在一个合适的单元表示一段超过500年的时延,预定义类型可充分利用了大整型,来表示所要表示的时间类型。当然,这里也定义了一些国际单位制(SI, [法]le Système international d’unités)分数,可从std::atto(10^(-18))到std::exa(10^(18))(题外话:当你的平台支持128位整型),也可以指定自定义时延类型。例如:std::duration,就可以使用一个double类型的变量表示1/100。
方便起见,C++14中std::chrono_literals命名空间中有许多预定义的后缀操作符用来表示时长。下面的代码就是使用硬编码的方式赋予变量具体的时长:
using namespace std::chrono_literals;
auto one_day=24h;
auto half_an_hour=30min;
auto max_time_between_messages=30ms;
使用整型字面符时,15ns和std::chrono::nanoseconds(15)就是等价的。不过,当使用浮点字面量时,且未指明表示类型时,数值上会对浮点时长进行适当的缩放。因此,2.5min会被表示为std::chrono::duration。如果非常关心所选的浮点类型表示的范围或精度,就需要构造相应的对象来保证表示范围或精度,而不是去苛求字面值来对范围或精度进行表达。
当不要求截断值的情况下(时转换成秒是没问题,但是秒转换成时就不行)时间段的转换是隐式的,显示转换可以由std::chrono::duration_cast<>来完成。
std::chrono::milliseconds ms(54802);
std::chrono::seconds s=
std::chrono::duration_cast<std::chrono::seconds>(ms);
这里的结果就是截断的,而不是进行了舍入,所以s最后的值为54。
时间值支持四则运算,所以能够对两个时间段进行加减,或者是对一个时间段乘除一个常数(模板的第一个参数)来获得一个新时间段变量。例如,5*seconds(1)与seconds(5)或minutes(1)-seconds(55)是一样。在时间段中可以通过count()成员函数获得单位时间的数量。例如,std::chrono::milliseconds(1234).count()就是1234。
基于时间段的等待可由std::chrono::duration<>来完成。例如:等待future状态变为就绪需要35毫秒:
std::future<int> f=std::async(some_task);
if(f.wait_for(std::chrono::milliseconds(35))==std::future_status::ready)
do_something_with(f.get());
等待函数会返回状态值,表示是等待是超时,还是继续等待。等待future时,超时时会返回std::future_status::timeout。当future状态改变,则会返回std::future_status::ready。当与future相关的任务延迟了,则会返回std::future_status::deferred。基于时间段的等待使用稳定时钟来计时,所以这里的35毫秒不受任何影响。当然,系统调度的不确定性和不同操作系统的时钟精度意味着:线程调用和返回的实际时间间隔可能要比35毫秒长。
现在,来看看“时间点”如何工作。
时间点可用std::chrono::time_point<>来表示,第一个参数用来指定使用的时钟,第二个函数参数用来表示时间单位(特化的std::chrono::duration<>)。时间点就是时间戳,而时间戳是时钟的基本属性,不可以直接查询,其在C++标准中已经指定。通常,UNIX时间戳表示1970年1月1日 00:00。时钟可能共享一个时间戳,或具有独立的时间戳。当两个时钟共享一个时间戳时,其中一个time_point类型可以与另一个时钟类型中的time_point相关联。虽然不知道UNIX时间戳的具体值,但可以通过对指定time_point类型使用time_since_epoch()来获取时间戳,该成员函数会返回一个数值,这个数值是指定时间点与UNIX时间戳的时间间隔。
例如,指定一个时间点std::chrono::time_point,这就与系统时钟有关,且实际中的一分钟与系统时钟精度应该不相同(通常差几秒)。
可以通过对std::chrono::time_point<>实例进行加/减,来获得一个新的时间点,所以std::chrono::hight_resolution_clock::now() + std::chrono::nanoseconds(500)将得到500纳秒后的时间,这对于计算绝对时间来说非常方便。
也可以减去一个时间点(二者需要共享同一个时钟),结果是两个时间点的时间差。这对于代码块的计时是很有用的,例如:
auto start=std::chrono::high_resolution_clock::now();
do_something();
auto stop=std::chrono::high_resolution_clock::now();
std::cout<<”do_something() took “
<<std::chrono::duration<double,std::chrono::seconds>(stop-start).count()
<<” seconds”<<std::endl;
std::chrono::time_point<>的时钟参数不仅能够指定UNIX时间戳。当等待函数(绝对时间超时)传递时间点时,时间点参数就可以用来测量时间。当时钟变更时,会产生严重的后果,因为等待轨迹随着时钟的改变而改变,并且直到调用now()成员函数时,才能返回一个超过超时时间的值。
后缀为_unitl的(等待函数的)变量会使用时间点。通常是使用时钟的::now()(程序中一个固定的时间点)作为偏移,虽然时间点与系统时钟有关,可以使用std::chrono::system_clock::to_time_point()静态成员函数,对时间点进行操作。
代码4.11 等待条件变量满足条件——有超时功能
#include
#include
#include
std::condition_variable cv;
bool done;
std::mutex m;
bool wait_loop()
{
auto const timeout= std::chrono::steady_clock::now()+
std::chrono::milliseconds(500);
std::unique_lock<std::mutex> lk(m);
while(!done)
{
if(cv.wait_until(lk,timeout)==std::cv_status::timeout)
break;
}
return done;
}
当没有什么可以等待时,可在一定时限中等待条件变量。这种方式中,循环的整体长度有限。4.1.1节中当使用条件变量时,就使用了循环,这是为了处理假唤醒。当循环中使用wait_for()时,可能在等待了足够长的时间后结束等待(在假唤醒之前),且下一次等待又开始了。这可能重复很多次,出现无限等待的情况。
至此,有关时间点的基本知识已经了解差不多了。现在,让我们来了解一下如何在函数中使用超时。
使用超时的最简单方式,就是对特定线程添加延迟处理。当线程无所事事时,就不会占用其他线程的处理时间。4.1节中的例子,循环检查“done”标志,两个处理函数分别是std::this_thread::sleep_for()和std::this_thread::sleep_until()。它们的工作就像一个简单的闹钟:当线程因为指定时长而进入睡眠时,可使用sleep_for()唤醒,可指定休眠的时间点,之后可使用sleep_until唤醒。sleep_for()的使用和4.1节一样,有些事必须在指定时间内完成,所以耗时就很敏感。另一方面,sleep_until()允许在某个特定时间点将调度线程唤醒。可能在晚间备份或在早上6:00打印工资条时使用,亦或挂起线程直到下一帧刷新时进行视频播放。
当然,休眠只是超时处理的一种形式,超时可以配合条件变量和future一起使用。超时甚至可以在获取互斥锁时(当互斥量支持超时时)使用。std::mutex和std::recursive_mutex都不支持超时,而std::timed_mutex和std::recursive_timed_mutex支持超时。这两种类型也有try_lock_for()和try_lock_until()成员函数,可以在一段时期内尝试获取锁,或在指定时间点前获取互斥锁。表4.1展示了C++标准库中支持超时的函数。参数列表为“延时”(duration)必须是std::duration<>的实例,并且列出为时间点(time_point)必须是std::time_point<>的实例。
表4.1 可接受超时的函数
| 类型/命名空间 | 函数 | 返回值 |
|---|---|---|
| std::this_thread 命名空间 | sleep_for(duration) | N/A |
| sleep_until(time_point) | ||
| std::condition_variable 或 std::condition_variable_any | wait_for(lock, duration) | std::cv_status::time_out 或 std::cv_status::no_timeout |
| wait_until(lock, time_point) | ||
| wait_for(lock, duration, predicate) | bool —— 当唤醒时,返回谓词的结果 | |
| wait_until(lock, duration, predicate) | ||
| std::timed_mutex 或 std::recursive_timed_mutex | try_lock_for(duration) | bool —— 获取锁时返回true,否则返回fasle |
| try_lock_until(time_point) | ||
| std::unique_lock | unique_lock(lockable, duration) | N/A —— 对新构建的对象调用owns_lock(); |
| unique_lock(lockable, time_point) | 当获取锁时返回true,否则返回false | |
| try_lock_for(duration) | bool —— 当获取锁时返回true,否则返回false | |
| try_lock_until(time_point) | ||
| std::future或std::shared_future | wait_for(duration) | 当等待超时,返回std::future_status::timeout |
| wait_until(time_point) | 当期望值准备就绪时,返回std::future_status::ready | |
| 当期望值持有一个为启动的延迟函数,返回std::future_status::deferred |
现在,我们讨论过的机制有:条件变量、future、promise,还有打包任务。是时候从更高的角度去看待这些机制,以及如何使用这些机制简化线程的同步操作。
同步工具在本章称为构建块。你可以关注下同步的操作,而非具体机制。当程序需要并发时,可提供更多的函数化方式简化代码。比起在多个线程间共享数据,每个任务最好拥有自己的数据,并且其他线程可以使用future获取运行结果。
函数化编程(functional programming)是一种编程方式,函数结果只依赖于传入函数的参数。使用相同的参数调用函数,不管多少次都会获得相同的结果。C++标准库中与数学相关的函数都有这个特性,例如:sin,cos和sqrt。基本类型间的简单运算,例如:3+3,6*9,或1.3/4.7。纯粹的函数不会改变任何外部状态,并且这种特性限制了函数的返回值。
第3章我们讨论过,不修改共享数据,就不存在条件竞争,并且没有必要使用互斥量保护共享数据。这是对编程极大的简化,例如Haskell语言[2]中所有函数默认都是“纯粹的”。
函数化编程的好处并不限于将“纯粹”作为默认方式(范型)的语言。C++是一个多范型的语言,也可以写出FP类型的程序。C++11的方式要比C++98简单许多,因为C++11支持Lambda表达式(详见附录A,A.6节),还加入了Boost和TR1中的std::bind,以及自动可以自行推断类型的自动变量(详见附录A,A.7节)。future作为最后一块拼图,使得函数化编程模式并发化(FP-style concurrency)在C++中成为可能。future可以在线程间互相传递,并允许计算结果互相依赖。
快速排序——FP模式版
为了展示在函数化(PF)并发中如何使用future,让我们来看看一个简单的实现——快速排序算法。该算法的基本思想很简单:给定一个数据列表,然后选取其中一个数为“中间”值,之后将列表中的其他数值分成两组——一组比中间值大,另一组比中间值小。之后对小于“中间”值的组进行排序,并返回排序好的列表,再返回“中间”值,再对比“中间”值大的组进行排序,并返回排序的列表。图4.2中展示了10个整数在这种方式下进行排序的过程。
图4.2 FP-模式的递归排序
下面代码中的是FP-模式的串行实现,需要传入列表,并且返回一个列表,与std::sort()做同样的事情不同。 (译者:std::sort()是无返回值的,因为参数接收的是迭代器,所以其可以对原始列表直进行修改与排序。可参考sort())
代码4.12 快速排序——串行版
template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input)
{
if(input.empty())
{
return input;
}
std::list<T> result;
result.splice(result.begin(),input,input.begin()); // 1
T const& pivot=*result.begin(); // 2
auto divide_point=std::partition(input.begin(),input.end(),
[&](T const& t){return t<pivot;}); // 3
std::list<T> lower_part;
lower_part.splice(lower_part.end(),input,input.begin(),
divide_point); // 4
auto new_lower(
sequential_quick_sort(std::move(lower_part))); // 5
auto new_higher(
sequential_quick_sort(std::move(input))); // 6
result.splice(result.end(),new_higher); // 7
result.splice(result.begin(),new_lower); // 8
return result;
}
虽然接口是FP的,但需要做大量的拷贝操作,所以内部会使用命令模式。选择第一个数为“中间”值,使用splice()①将输入的首个元素(中间值)放入结果列表中,这种方式产生的结果可能不是最优的。因为链表的访问方式是遍历,所以对std::list做任何事都需要花费较长的时间。我们清楚想要什么样的结果,可以将“中间”值进行拼接,还需要使用“中间”值进行比较,所以使用了引用②,避免过多的拷贝。之后,可以使用std::partition将序列中的值分成小于“中间”值的组和大于“中间”值的组③,最简单的方法就是使用Lambda函数指定区分的标准
std::partition()会对列表进行重置,并返回指向首元素(不小于“中间”值)的迭代器。迭代器的类型全称可能会很长,可以使用auto让编译器帮忙定义迭代器类型的变量
现在,选择了FP模式的接口,要使用递归对两部分排序,所以需要创建两个列表。可以用splice()来完成,将input列表小于divided_point的值移动到新列表lower_part④中,其他数继续留在input列表中。而后,可以递归调用⑤⑥,对两个列表进行排序。显式使用std::move()将列表传递到函数中后,可以再次使用splice(),将result中的结果以正确的顺序进行拼接。new_higher指向的值放在“中间”值的后面⑦,new_lower指向的值放在“中间”值的前面⑧。
快速排序——FP模式线程强化版
使用纯函数模式容易转化为并行版本
代码4.13 快速排序——并行版
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
if(input.empty())
{
return input;
}
std::list<T> result;
result.splice(result.begin(),input,input.begin());
T const& pivot=*result.begin();
auto divide_point=std::partition(input.begin(),input.end(),
[&](T const& t){return t<pivot;});
std::list<T> lower_part;
lower_part.splice(lower_part.end(),input,input.begin(),
divide_point);
std::future<std::list<T> > new_lower( // 1
std::async(¶llel_quick_sort<T>,std::move(lower_part)));
auto new_higher(
parallel_quick_sort(std::move(input))); // 2
result.splice(result.end(),new_higher); // 3
result.splice(result.begin(),new_lower.get()); // 4
return result;
}
当前线程不对小于“中间”值部分的列表进行排序,std::async()①会使用另一线程对列表进行排序。大于部分如同之前一样,使用递归进行排序②。通过递归调用parallel_quick_sort(),可以使用硬件并发。std::async()会启动一个新线程,这样当递归三次时,就会有八个线程在运行了。当递归十次(对于大约有1000个元素的列表),如果硬件能处理这十次递归调用,将会创建1024个执行线程。当运行库认为产生了太多的任务时(也许是因为数量超过了硬件并发的最大值),可能会同步的切换新产生的任务。
当任务过多时(已影响性能),为了避免任务传递的开销,这些任务应该在使用get()获取结果的线程上运行,而不是在新线程上运行。这也符合std::async的行为,为每一个任务启动一个线程(甚至是在任务超额时,也就是在std::launch::deferred没有明确规定的情况下),或为了同步执行所有任务(std::launch::async有明确规定的情况下)。当运行库自动裁剪线程时,建议去查看一下运行库的实现文档,了解一下将会有怎样的行为表现。
比起使用std::async(),可以写一个spawn_task()函数对std::packaged_task和std::thread做一下包装。如代码4.14中所示,需要为函数结果创建一个std::packaged_task对象, 并从这个对象中获取future,或在线程中返回future。其本身并没有太多优势(事实上会造成大规模的超额任务),但可为转型成一个更复杂的实现进行铺垫,实现会向队列添加任务,而后使用线程池的方式来运行。std::async更适合于已知所有任务的情况,并且要能完全控制线程池中构建或执行过任务的线程。
代码4.14 spawn_task的简单实现
template<typename F,typename A>
std::future<std::result_of<F(A&&)>::type>
spawn_task(F&& f,A&& a)
{
typedef std::result_of<F(A&&)>::type result_type;
std::packaged_task<result_type(A&&)>
task(std::move(f)));
std::future<result_type> res(task.get_future());
std::thread t(std::move(task),std::move(a));
t.detach();
return res;
}
其他先不管,回到parallel_quick_sort函数。因为直接递归去获取new_higher列表,就可以如之前一样对new_higher进行拼接③。new_lower列表是std::future类型,而非是一个简单的列表,所以需要调用get()成员函数在调用splice()④之前去检索数值。之后,等待后台任务完成,并且将结果移入splice()中。get()返回一个包含结果的右值引用,这样的结果就是可以移动的
假设使用std::async()是硬件并发最好的选择,但这样的并行实现对于快速排序来说,依然不理想。std::partition已经足够好了,但依旧是串行调用。如果对实现最快并行感兴趣的话,可以去查阅一些学术文献,或者可以选择C++17中的并行重载版本(详见第10章)。
函数化编程可算作是并发编程的范型,并且也是通讯顺序进程(CSP,Communicating Sequential Processer[3])的范型,这里的线程没有共享数据,但有通讯通道允许信息在不同线程间进行传递。这种范型被Erlang语言所采纳,并且常用在MPI(Message Passing Interface,消息传递接口)上做高性能运算。现在不会再对它们而感到惊奇了吧,C++就能支持它们。
接下来的一节中,我们会讨论实现这种操作的方式。
CSP的概念很简单:没有共享数据时,每个线程可以基于所接收到的信息独立运行。每个线程就都有状态机:当收到一条信息,会以某种方式更新状态,并且可能向其他线程发出信息(消息处理机制依赖于线程的初始化状态)。这是有限状态机模式的实现,并且状态机可以隐式实现,这种方式更加依赖于明确的行为要求和专业的编程团队。无论选用哪种方式去实现线程,任务都会进行独立处理,消除潜在的混乱(数据共享并发),就让编程变的更加简单。
通讯处理没有共享数据,所有消息都是通过消息队列传递,而C++线程共享一块地址空间,所以达不到真正通讯处理要求。这就需要一些约定来支持:作为应用或是库作者有责任确保在实现中不存在共享数据。当然,为了线程间的通信,消息队列必须共享,具体的细节要包含在库中。
试想有一天要为实现ATM(自动取款机)写一个应用。这个应用需要处理:取钱时和银行之间的交互情况,以及控制器械接受用户的卡片,显示适当的信息,处理按钮事件,吐出现金,还有退还卡。
一种处理方法是让代码将所有事情分配到三个独立线程上去:一个线程去处理物理机械,一个去处理ATM机的逻辑,还有一个用来与银行通讯,这些线程不共享任何数据,比如:当有人在ATM机上插入了卡片或者按下按钮,处理机械的线程将会发送一条信息到逻辑线程上,并且逻辑线程将发送一条消息到机械线程,告诉机械线程可以分配多少钱等等。
ATM机逻辑建模的方式,就可以将其当状态机。线程的每一个状态都会等待一条可接受的信息,这条信息包含需要处理的内容。图4.3中将展示有状态参与的一个简单是实现。这个简化实现中,系统在等待一张卡插入。当有卡插入时,系统将会等待用户输入PIN(类似身份码的东西),并且用户可以将最后输入的数字删除。当数字输入完成,需要验证PIN。当验证有问题时,就需要退出卡,并且继续等待其他人将卡插入到机器中。当验证通过时,要等待用户取消交易或选择取款。当用户选择取消交易,程序可以结束,并返还卡片。当用户选择取出一定量的现金,程序就要在吐出现金和返还卡片前等待银行方面的确认,或显示“余额不足”的信息,并返还卡片。很明显,一个真正的ATM机要考虑的东西更多、更复杂。
图4.3 一台ATM机的状态机模型(简化)
我们已经为ATM机的逻辑设计了状态机,可以使用一个类进行实现,类中有成员函数可以获取每一个状态。每一个成员函数可以等待从指定集合中传入的信息,以及当进行处理时,这就有可能触发原始状态向另一个状态的转化。每种不同的信息类型由一个独立的struct表示。代码4.15展示了ATM逻辑部分的简单实现(以上描述的系统中,有主循环和对第一状态的实现),并且一直在等待卡片插入。
如你所见,所有信息传递所需的的同步,完全包含在“信息传递”库中
代码4.15 ATM逻辑类的简单实现
struct card_inserted
{
std::string account;
};
class atm
{
messaging::receiver incoming;
messaging::sender bank;
messaging::sender interface_hardware;
void (atm::*state)();
std::string account;
std::string pin;
void waiting_for_card() // 1
{
interface_hardware.send(display_enter_card()); // 2
incoming.wait(). // 3
handle<card_inserted>(
[&](card_inserted const& msg) // 4
{
account=msg.account;
pin="";
interface_hardware.send(display_enter_pin());
state=&atm::getting_pin;
}
);
}
void getting_pin();
public:
void run() // 5
{
state=&atm::waiting_for_card; // 6
try
{
for(;;)
{
(this->*state)(); // 7
}
}
catch(messaging::close_queue const&)
{
}
}
};
这个实现对于实际ATM机来说非常简单,但是能让你感受到信息传递编程的方式。这里无需考虑同步和并发问题,只需要考虑什么时候接收和发送信息即可。与系统的其他部分一起,为ATM逻辑所设的状态机运行在独立的线程上,比如:与银行通讯的接口,以及运行在独立线程上的终端接口。这种程序设计的方式被称为参与者模式(Actor model)——在系统中有很多独立的(运行在一个独立的线程上)参与者,这些参与者会互相发送信息,去执行手头上的任务,并且不会共享状态,除非是通过信息直接传入的。
从run()成员函数开始⑤运行,初始化waiting_for_card⑥的状态,然后反复执行当前状态的成员函数(无论这个状态时怎么样的)⑦。状态函数是简易atm类的成员函数。wait_for_card函数①依旧很简单:发送一条信息到接口,让终端显示“等待卡片”的信息②,之后就等待传入一条消息进行处理③。处理的消息类型只能是card_inserted类,这里使用Lambda函数④对其进行处理。当然,可以传递任何函数或函数对象处理函数。注意,handle()与wait()进行连接,当收到的信息类型与处理类型不匹配时收到的信息将会被丢弃,并且线程继续等待,直到接收到一条类型匹配的消息。
Lambda函数自身只是将用户账号信息缓存到成员变量中去,并且清除PIN信息,再发送一条消息到硬件接口,让显示界面提示用户输入PIN,然后将线程状态改为“获取PIN”。当消息处理程序结束,状态函数就会返回,然后主循环会调用新的状态函数⑦。
如图4.3,getting_pin状态函数会复杂一些,因为其要处理三个不同的信息类型。具体代码展示如下:
代码4.16 简单ATM实现中的getting_pin状态函数
void atm::getting_pin()
{
incoming.wait()
.handle<digit_pressed>( // 1
[&](digit_pressed const& msg)
{
unsigned const pin_length=4;
pin+=msg.digit;
if(pin.length()==pin_length)
{
bank.send(verify_pin(account,pin,incoming));
state=&atm::verifying_pin;
}
}
)
.handle<clear_last_pressed>( // 2
[&](clear_last_pressed const& msg)
{
if(!pin.empty())
{
pin.resize(pin.length()-1);
}
}
)
.handle<cancel_pressed>( // 3
[&](cancel_pressed const& msg)
{
state=&atm::done_processing;
}
);
}
这次需要处理三种消息类型,所以wait()函数后面接了三个handle()函数调用①②③。每个handle()都有对应的消息类型作为模板参数,并且将消息传入一个Lambda函数中(其获取消息类型作为参数)。因为调用连接在了一起,wait()的实现知道在等待一条digit_pressed消息,或是一条clear_last_pressed消息,亦或是一条cancel_pressed消息,而其他的消息类型将会被丢弃。
当获取一条消息时,无需再去改变状态,比如:获取一条digit_pressed消息时,仅需要将其添加到pin中。(代码4.15中)主循环⑦将会再次调用getting_pin()去等待下一个数字(或清除数字,或取消交易)。
这里对应的动作如图4.3所示,每个状态的实现都由不同的成员函数构成,等待相关信息并适当的更新状态。
一个并发系统中,这种编程方式可以极大的简化任务的设计,因为每一个线程都完全被独立对待。因此,使用多线程去分离关注点时,需要明确线程之间的任务应该如何分配。
并发技术扩展规范在std::experiment命名空间中提供了新的类型std::promise和std::packaged_taks。与std命名空间中类型完全不同,其返回实例类型为std::experimental::future,而不是std::future。这能让使用者体会std::experimental::future所带来的新特性——持续性。
假设任务产生了一个结果,并且future持有这个结果。然后,需要写一些代码来处理这个结果。使用std::future时,必须等待future的状态变为就绪态,不然就使用全阻塞函数wait(),或是使用wait_for()/wait_unitl()成员函数进行等待,而这会让代码变得非常复杂。用一句话来说“完事俱备,只等数据”,这也就是持续性的意义。为了给future添加持续性,只需要在成员函数后添加then()即可。比如:给定一个future fut,添加持续性的调用即为fut.then(continuation)。
与std::future类似 , std::experimental::future的存储值也只能检索一次。如果future处于持续使用状态,其他代码就不能访问这个furture。因此,使用fut.then()为fut添加持续性后,对原始fut的操作就是非法的。另外,调用fut.then()会返回一个新future,这个新future会持有持续性调用的结果。具体代码,如下所示:
std::experimental::future<int> find_the_answer;
auto fut=find_the_answer();
auto fut2=fut.then(find_the_question);
assert(!fut.valid());
assert(fut2.valid());
当原始future为就绪态时,find_the_question持续性函数没有安排在指定的线程上运行。这就给予了实现的自由,函数可以在线程池或是在另一个线程管理库上运行。这样做是经过考虑的,将持续性引入C++标准时,能让实现者能基于其丰富的经验,选择更好的线程使用方式,并为用户提供合适的机制来控制线程。
与直接调用std::async或std::thread不同,持续性函数不需要传入参数,因为运行库已经为其定义好了参数——会传入处于就绪态的future,这个future保存了持续性触发后的结果。假设find_the_answer返回类型为int,find_the_question函数会传入std::experimental::future作为参数:
std::string find_the_question(std::experimental::future<int> the_answer);
这样做的原因是,持续性的过程中会持有具体值或是异常。如果future隐式的解引用,将其值直接传递给持续性函数,那么运行库将会决定如何处理这种异常。反之,将future传递给持续性函数,那么持续性函数将回来处理这个异常。举个简单的例子,就是通过fut.get()获取future持有的值,并且在持续性函数外将异常重新抛出并传播。就如同将函数传入std::async一样,异常存在于持有持续性结果的future中,这样异常就会重新传播。
并发技术扩展规范中没有指定这种情况等价于std::async,但实现可以相同。这种函数也很简单:使用std::experimental::promise获取future,并且生成新的线程运行Lambda表达式,该Lambda表达式为promise设置返回值,代码如下所示。
代码4.17 使用并发技术扩展规范中的特性,实现与std::async等价的功能
template<typename Func>
std::experimental::future<decltype(std::declval<Func>()())>
spawn_async(Func&& func){
std::experimental::promise<
decltype(std::declval<Func>()())> p;
auto res=p.get_future();
std::thread t(
[p=std::move(p),f=std::decay_t<Func>(func)]()
mutable{
try{
p.set_value_at_thread_exit(f());
} catch(...){
p.set_exception_at_thread_exit(std::current_exception());
}
});
t.detach();
return res;
}
和std::aync一样,这里将函数的结果存储在future中,或捕获函数抛出的异常,将异常存到future中。同样的,为了保证在future达到就绪态时,需要保证thread_local变量已经使用set_value_at_thread_exit和set_exception_at_thread_exit清理过了。
值是从then()调用中返回,其返回的future是完整的future。也就意味着,持续性可以进行连接。
假设有一些列耗时任务要完成,并且要使用异步多线程完成这些任务,从而减轻主线程的计算压力,例如:用户登录应用时,需要将登录凭证发送给后台,在对身份信息进行验证后,从后台获取用户的账户信息,使用获取到的信息对显示进行更新。
代码4.18 处理用户登录——同步方式
void process_login(std::string const& username, std::string const& password)
{
try{
user_id const id = backend.authenticate_user(username, password);
user_data const info_to_display = backend.request_current_info(id);
update_display(info_to_display);
} catch(std::exception& e){
display_error(e);
}
}
我们想要的是一段异步代码,所以不想阻塞UI线程。使用std::async将另一个列表放在后台线程上,不过依旧会阻塞UI线程,等待任务完成的同时,会消耗大量的资源。如果这样的任务很多,可以结束一些等待线程节省资源。
代码4.19 处理用户登录——异步方式
std::future<void> process_login(
std::string const& username, std::string const& password)
{
return std::async(std::launch::async,[=](){
try
{
user_id consst id = backend.authenticate_user(username, password);
user_data const info_to_display =
backend.request_current_info(id);
update_display(info_to_display);
} catch(std::exception& e){
display_error(e);
}
});
}
为了避免线程阻塞,机制需要对每个完成的任务进行连接:持续性。下面的代码与之前大体相同,但这次将整个任务分成了一系列任务,并且每个任务在完成时回连到前一个任务上。
代码4.20 处理用户登录——持续性方式
std::experimental::future<void> process_login(
std::string const& username, std::string const& password)
{
return spawn_async([=](){
return backend.authenticate_user(username, password);
}).then([](std::experimental::future<user_id> id){
return backend.request_current_info(id.get());
}).then([](std::experimental::future<user_data> info_to_display){
try{
update_display(info_to_display.get());
} catch(std::exception& e){
display_error(e);
}
});
}
每个持续性函数都以std::experimental::future作为独立参数,然后使用.get()来获取其拥有的值。这意味着异常会沿着链条进行传播,如果有函数抛出异常,就会在调用info_to_display.get()时抛出,捕获结构可以处理所有的异常类型。
因为等待消息需要通过网络或数据操作进行传输,所函数内部会对后端模块进行调用,但这时前端的任务可能还没有完成。虽然已经将任务进行分割成独立的小任务,但仍然会阻塞线程的运行。这些需要在后端任务完成,前端处理就已经准备好了,而不是对线程进行阻塞。这样的话,backend.async_authenticate_user(username, password)返回std::experimental::future会比返回user_id更加合适。
因为持续函数返回的future类型为future,可能觉得这段代码比较复杂,否则只能将调用.then的语句放置在持续函数中。如果这么想就错了,因为持续性支持一种极为精妙的特性,叫做future展开(future-unwrapping)。当向.then()传递了持续性函数,并且返回一个future.then()返回值类型也是future
代码4.21 处理用户登录——全异步操作
std::experimental::future<void> process_login(
std::string const& username, std::string const& password)
{
return backend.async_authenticate_user(username, password).then(
[](std::experimental::future<user_id> id){
return backend.async_request_current_info(id.get());
}).then([](std::experimental::future<user_data> info_to_display){
try
{
update_display(info_to_display.get());
} catch(std::exception& e){
display_error(e);
}
});
}
和代码4.18几乎一模一样,区别就是Lambda表达式和将相应的功能包裹在.then的调用中。如果所用编译器支持C++14泛型Lambda表达式,那么Lambda表达式的参数列表中的类型可以使用auto替换,例如:
return backend.async_authenticate_user(username, password).then(
[](auto id){
return backend.async_request_current_info(id.get());
});
如果比较简单的线性控制流,这里的控制流比较复杂,可以使用Lambda表达式来实现一些逻辑功能。如果控制流是真的很复杂,就需要单独写一个函数来完成这件事了。
目前,我们一直将注意力放在支持持续性的std::experimental::future上,std::experimental::shared_future同样支持持续性。二者的区别在于std::experimental::shared_future对象可以具有多个持续性对象,并且持续性参数是 std::experimental::shared_future,而不是std::experimental::future。std::experimental::shared_future脱离了共享的本性——因为多个对象可以引用相同的共享状态,如果只允许一个延续,那么多个线程的情况下就会产生条件竞争,每个线程都试图将持续性添加到在自己的std::experimental::shared_future对象中。这种情况的确很糟糕,所以才允许多持续性的存在。当使用多持续性时,可以通过同一个std::experimental::shared_future对象对其进行添加。另外,当打算给第二个持续性传递对象时,不能给第一个持续性对象传递临时std::experimental::shared_future对象。因此,传递给延续性函数的参数也必须是std::experimental::shared_future对象。
auto fut = spawn_async(some_function).share();
auto fut2 = fut.then([](std::experimental::shared_future<some_data> data){
do_stuff(data);
});
auto fut3 = fut.then([](std::experimental::shared_future<some_data> data){
return do_other_stuff(data);
});
由于调用了share(),fut是std::experimental::share_future实例,这是因为持续性函数必须将std::experimental::shared_future对象作为参数。不过,持续性返回的值为std::experimental::future——目前这个值无法共享——所以fut2和fut3的类型都是 std::experimental::future。
技术规范中,持续性只是增强future能力的一种方式。另外还提供了两个重载函数,并等待其中任意一个future状态为就绪,或是等待所有future状态为就绪。
假设有很多的数据需要处理,每个数据都可以单独的进行处理,这就是利用硬件的好机会。可以使用异步任务组来处理数据项,每个任务通过future返回处理结果。不过,需要等待所有任务完成,才能得到最终的结果。对逐个future进行收集,然后再整理结果,总感觉不是很爽。如果用异步任务来收集结果,先要生成异步任务,这样就会占用线程的资源,并且需要不断的对future进行轮询,当所有future状态为就绪时生成新的任务。
代码4.22 使用std::async从多个future中收集结果
std::future process_data(std::vector& vec)
{
size_t const chunk_size = whatever;
std::vector> results;
for (auto begin=vec.begin(), end=vec.end(); beg!=end;){
size_t const remaining_size = end - begin;
size_t const this_chunk_size = std::min(remaining_size, chunk_size);
results.push_back(
std::async(process_chunk, begin, begin+this_chunk_size));
begin += this_chunk_size;
}
return std::async([all_results=std::move(results)](){
std::vector v;
v.reserve(all_results.size());
for (auto& f : all_results)
{
v.push_back(f.get()); // 1
}
return gather_results(v);
});
}
这段代码会生成异步任务来处理结果,所有结果都就绪时对结果进行整合。每个任务都是独立的,因此调度程序会在①处反复的进行唤醒,当发现有非就绪态的结果时,将再次回到休眠的状态。这样的方式不仅会占用线程资源,而且在之后对future的操作会增加上下文切换频率,从而增加很多额外的开销。
可以使用 std::experimental::when_all来避免这里的等待和切换,可以将需要等待的future传入when_all函数中,函数会返回新的future——当传入的future状态都为就绪时,新future的状态就会置为就绪,这个future可以和持续性配合起来处理其他的任务。
代码4.23 使用 std::experimental::when_all从多个future中收集结果
std::experimental::future process_data(
std::vector& vec)
{
size_t const chunk_size = whatever;
std::vector> results;
for (auto begin = vec.begin(), end = vec.end(); beg != end){
size_t const remaining_size = end - begin;
size_t const this_chunk_size = std::min(remaining_size, chunk_size);
results.push_back(
spawn_async(
process_chunk, begin, begin+this_chunk_size));
begin += this_chunk_size;
}
return std::experimental::when_all(
results.begin(), results.end()).then( // 1
[](std::future>> ready_results){
std::vector> all_results = ready_results.get();
std::vector v;
v.reserve(all_results.size());
for (auto& f: all_results){
v.push_back(f.get()); // 2
}
return gather_results(v);
});
}
这个例子中,when_all函数会等待所有future的状态变为就绪,然后用.then调用函数①,而不是使用async。虽然Lambda表达式看上去是一样的,但这里将results的vector作为参数(包装到future中),而不是放在捕获器中,并在之后对每个future使用get②,从而无阻塞的获得所有处理结果。
为了补全when_all,也有when_any。其也会产生future,当future组中任意一个为就绪态,这个新future的状态即为就绪。这对于并发性任务是一个不错的选择,也就需要为第一个就绪的线程找点事情来做。
假设要在一大堆数据里面找一个符合要求的值(符合这样要求的值有很多),找到任何一个即可。这种任务是可以并行的,可以多线程完成,每个任务去检查数据的一个子集,如果有线程找到了合适的值,这个线程就会设置一个标志,让其他线程停止搜索,并返回结果。这种情况下,还希望第一个完成搜索任务的线程,能对数据进行进一步的处理。
这就可以使用 std::experimental::when_any将future收集在一起,当future有一个为就绪时,任务即为完成。when_all会根据传入的future集合返回一个新的future,when_any会添加额外的层,并将集合和索引值组合在一起,这里的索引用于表示触发就绪的future,并将这个future添加到std::experimental::when_any_result类模板实例中。
代码4.24 使用std::experimental::when_any处理第一个被找到的值
std::experimental::future
find_and_process_value(std::vector &data)
{
unsigned const concurrency = std::thread::hardware_concurrency();
unsigned const num_tasks = (concurrency > 0)? concurrency : 2;
std::vector> results;
auto const chunk_size = (data.size() + num_tasks - 1) / num_tasks;
auto chunk_begin = data.begin();
std::shared_ptr> done_flag =
std::make_shared>(false);
for (unsigned i = 0; i < num_tasks; ++i){ // 1
auto chunk_end =
(i < (num_tasks - 1)? chunk_begin + chunk_size : data.end());
results.push_back(spawn_async([=]{ // 2
for (auto entry = chunk_begin;
!*done_flag && (entry != chunk_end);
++entry){
if (matches_find_criteria(*entry)){
*done_flag = true;
return &*entry;
}
}
return (MyData *)nullptr;
}));
chunk_begin = chunk_end;
}
std::shared_ptr> final_result =
std::make_shared>();
struct DoneCheck {
std::shared_ptr>
final_result;
DoneCheck(
std::shared_ptr>
final_result_)
: final_result(std::move(final_result_)) {}
void operator()( // 4
std::experimental::future>>>
results_param) {
auto results = results_param.get();
MyData *const ready_result =
results.futures[results.index].get(); // 5
if (ready_result)
final_result->set_value( // 6
process_found_value(*ready_result));
else {
results.futures.erase(
results.futures.begin() + results.index); // 7
if (!results.futures.empty()) {
std::experimental::when_any( // 8
results.futures.begin(), results.futures.end())
.then(std::move(*this));
} else {
final_result->set_exception(
std::make_exception_ptr( // 9
std::runtime_error(“Not found”)));
}
}
};
std::experimental::when_any(results.begin(), results.end())
.then(DoneCheck(final_result)); // 3
return final_result->get_future(); // 10
}
初始化循环①会产生num_tasks个异步任务,每个任务都会执行②处的Lambda表达式。这个Lambda表达式的捕获方式是拷贝,所以每个任务都有自己的chunk_begin和chunk_end,这里同样也拷贝了共享指针done_flag。这就避免了生命周期所带来的问题。
当所有任务都已经产生,希望对任务的返回结果进行处理。可以调用when_any③通过连接持续性完成。这次可将持续性以类的方式去编写,因为想要对其进行递归复用。当其中一个任务完成初始化,DoneCheck的函数操作符会被调用④。首先,准备好从就绪的future中获取值⑤,并且当符合条件的值被找到,可以对结果进行处理,并对最终结果进行设置⑥。 否则,就需要从集合中丢弃就绪的future⑦。当还有很多future需要检查时,会产生对when_any的再次调用⑧。要再触发其持续性,需要等待下个future到达就绪态。如果没有剩下任何future,就说明这个值没有找到,将会在future中存储一个异常⑨。函数的返回值是一个future,其包含有最终的结果⑩。当然,这个问题还有其他解法,不过这里就展示一下when_any如何使用。
这两个使用when_all和when_any的例子中,都使用了重载版的迭代器范围,使用一堆迭代器来表示一组处于等待状态future的开始和末尾。这两个函数也可以以变量的形式出现,可以将一组future作为参数直接进行传入。例子中,future中存储的是元组(或when_any_result持有一个元组),而不是vector:
std::experimental::future f1=spawn_async(func1);
std::experimental::future f2=spawn_async(func2);
std::experimental::future f3=spawn_async(func3);
std::experimental::future<
std::tuple<
std::experimental::future,
std::experimental::future,
std::experimental::future>> result=
std::experimental::when_all(std::move(f1),std::move(f2),std::move(f3));
这个例子强调了when_any和when_all的重要性——可以通过容器中的任意std::experimental::future实例进行移动,并且通过值获取参数,因此需要显式的将future传入,或是传递一个临时变量。
有时等待的事件是一组线程,或是代码的某个特定点,亦或是协助处理一定量的数据。这种情况下,最好使用锁存器或栅栏,而非future。
首先,锁存器或是栅栏是什么东西?锁存器是一种同步对象,当计数器减为0时,就处于就绪态了。锁存器是基于其输出特性——当处于就绪态时,就会保持就绪态,直到被销毁。因此,锁存器是为同步一系列事件的轻量级机制。
栅栏是一种可复用的同步机制,其用于一组线程间的内部同步。虽然,锁存器不在乎是哪个线程使得计数器递减——同一个线程可以对计数器递减多次,或多个线程对计数器递减一次,再或是有些线程对计数器有两次的递减——对于栅栏来说,每一个线程只能在每个周期到达栅栏一次。当线程都抵达栅栏时,会对线程进行阻塞,直到所有线程都达到栅栏处,这时阻塞将会被解除。栅栏可以复用——线程可以再次到达栅栏处,等待下一个周期的所有线程。
锁存器其实要比栅栏简单很多,就先从简单std::experimental::latch说起。
std::experimental::latch声明在头文件中。构造std::experimental::latch时,将计数器的值作为构造函数的唯一参数。当等待的事件发生,就会调用锁存器count_down成员函数。当计数器为0时,锁存器状态变为就绪。可以调用wait成员函数对锁存器进行阻塞,直到等待的锁存器处于就绪状态。如果需要对锁存器是否就绪的状态进行检查,可调用is_ready成员函数。想要减少计数器1并阻塞直至0,则可以调用count_down_and_wait成员函数。
代码4.25 使用 std::experimental::latch等待所有事件
void foo(){
unsigned const thread_count=...;
latch done(thread_count); // 1
my_data data[thread_count];
std::vector<std::future<void> > threads;
for(unsigned i=0;i<thread_count;++i)
threads.push_back(std::async(std::launch::async,[&,i]{ // 2
data[i]=make_data(i);
done.count_down(); // 3
do_more_stuff(); // 4
}));
done.wait(); // 5
process_data(data,thread_count); // 6
} // 7
使用需要等待的事件数量对done的构造进行初始化①,并使用std::async产生适量的线程②。进行下一步前④,线程生成了相应的数据块,都会对锁存器的计数器进行递减③。处理生成的数据⑥之前,主线程只需要等待锁存器成为就绪态即可⑤。⑥处的数据处理可能需要与线程的最终处理同步进行④——所以在std::future析构之前⑦,无法保证所有线程都已完成。
需要注意的是,②传递给std::async的Lambda表达式中,通过引用的方式对除了i之外的所有内容进行捕获,而i是通过值捕获的方式进行传递。这是因为i是这里的循环计数器,数据和完成状态是共享访问的,所以通过引用捕获将会导致数据竞争和未定义的行为。此外,这里只要一个锁存器就够了,因为线程在数据准备好之后,还有其他任务要做。否则,就需要在处理数据前等待所有future,从确保所有任务都已经完成。
process_data中对data的访问是安全的⑥,即便这个值是其他线程上的任务存储的,因为锁存器是一个同步对象,所以线程调用cound_down改变计数器的行为是可见的,从而保证对wait的调用和返回在同一个锁存器上为可见。本质上,对count_down的调用与对wait的调用同步——第5章中了解了底层内存需和同步约束之后,就会明白这意味着什么了。
除了锁存器之外,并发技术扩展规范还为我们提供了用于同步一组线程的可复用的同步对象——栅栏。
并发技术扩展规范提供了两种栅栏机制,头文件中,分别为:std::experimental::barrier 和std::experimental::flex_barrier 。前者更简单,开销更低。后者更灵活,开销较大。
假设有一组线程对某些数据进行处理。每个线程都在处理独立的任务,因此在处理过程中无需同步。但当所有线程都必须处理下一个数据项前完成当前的任务时,就可以使用std::experimental::barrier来完成这项工作了。可以为同步组指定线程的数量,并为这组线程构造栅栏。当每个线程完成其处理任务时,都会到达栅栏处,并且通过调用栅栏对象的arrive_and_wait成员函数,等待小组的其他线程。当最后一个线程抵达时,所有线程将被释放,栅栏重置。组中的线程可以继续接下来的任务,或是处理下一个数据项,或是进入下一个处理阶段。
锁存器一旦就绪就会保持状态,不会有释放等待线程、重置、复用的过程。栅栏也只能用于一组线程内的同步——除非组中只有一个线程,否则无法等待栅栏就绪。可以通过显式调用栅栏对象的arrive_and_drop成员函数让线程退出组,这样就不用再受栅栏的约束,所以下一个周期到达的线程数就必须要比当前周期到达的线程数少一个了。
代码4.26 std::experimental::barrier的用法
result_chunk process(data_chunk);
std::vector<data_chunk>
divide_into_chunks(data_block data, unsigned num_threads);
void process_data(data_source &source, data_sink &sink) {
unsigned const concurrency = std::thread::hardware_concurrency();
unsigned const num_threads = (concurrency > 0) ? concurrency : 2;
std::experimental::barrier sync(num_threads);
std::vector<joining_thread> threads(num_threads);
std::vector<data_chunk> chunks;
result_block result;
for (unsigned i = 0; i < num_threads; ++i) {
threads[i] = joining_thread([&, i] {
while (!source.done()) { // 6
if (!i) { // 1
data_block current_block =
source.get_next_data_block();
chunks = divide_into_chunks(
current_block, num_threads);
}
sync.arrive_and_wait(); // 2
result.set_chunk(i, num_threads, process(chunks[i])); // 3
sync.arrive_and_wait(); // 4
if (!i) { // 5
sink.write_data(std::move(result));
}
}
});
}
} // 7
代码4.26中展示了如何使用栅栏来对一组线程进行同步。这里的数据来源是source,输出是sink,为了并发运行,需要将数据划分成num_threads块。这个操作是串行的,所以需要在初始化数据块①时进行,并且初始化过程只运行在i为0的线程上。并行执行任务之前,所有线程都会在栅栏处等待数据划分②,而后每个线程都会处理属于自己的数据块,再次同步之前④,将结果更新到result中③。然后到达下一个需要串行处理域,只有0号线程可以将结果输出到sink中⑤。这时,所有线程都会循环等待,直到将source中的任务全部处理完(done)⑥。当线程进入循环时,串行部分与循环是连接在一起的。因为在串行部分只有0号线程会执行,所以也没什么问题,在第一个栅栏处②将所有线程进行同步。当所有的处理都结束了,就意味着所有线程将会退出循环。并且,等待所有joining_thread对象的外部函数结束时,对这些对象进行析构⑦(joining_thread在第2章的代码2.7中有过介绍)。
需要着重注意的是arrive_and_wait的调用位置。所有线程就绪前,确定没有运行线程这点很重要。第一个同步点,所有线程都在等待0号线程到达。而第二个同步点,情况刚好相反,0号线程在等待其他线程都到达之后,才能将完成的结果写入sink中。
并发技术扩展规范不止提供了一种栅栏,与std::experimental::barrier相同, std::experimental::flex_barrier这个类型的栅栏更加的灵活。灵活之处在于,栅栏拥有完成阶段,一旦参与线程集中的所有线程都到达同步点,则由参与线程之一去执行完成阶段。
std::experimental::flex_barrier与std::experimental::barrier有一点不同:有一个额外的构造函数,需要传入一个完整的函数和线程数量。当所有线程都到达栅栏处,那么这个函数就由其中一个线程运行。其不仅指定了串行代码的运行方式,还提供了一种修改下一个周期到达栅栏处线程个数的方式。对于线程的计数可以修改成任何数字,无论这个数字比当前数字高或低。这样,开发者就能确定下一次到达栅栏处的线程数量了。
下面的代码展示了使用std::experimental::flex_barrier对代码4.26的进行重写:
代码4.27 使用std::experimental::flex_barrier管理串行部分
void process_data(data_source &source, data_sink &sink) {
unsigned const concurrency = std::thread::hardware_concurrency();
unsigned const num_threads = (concurrency > 0) ? concurrency : 2;
std::vector<data_chunk> chunks;
auto split_source = [&] { // 1
if (!source.done()) {
data_block current_block = source.get_next_data_block();
chunks = divide_into_chunks(current_block, num_threads);
}
};
split_source(); // 2
result_block result;
std::experimental::flex_barrier sync(num_threads, [&] { // 3
sink.write_data(std::move(result));
split_source(); // 4
return -1; // 5
});
std::vector<joining_thread> threads(num_threads);
for (unsigned i = 0; i < num_threads; ++i) {
threads[i] = joining_thread([&, i] {
while (!source.done()) { // 6
result.set_chunk(i, num_threads, process(chunks[i]));
sync.arrive_and_wait(); // 7
}
});
}
}
与代码4.26的第一个不同在于,使用一个Lambda表达式对数据进行拆分①。这个Lambda表达式会在运行前调用②,并封装在迭代开始时的0号线程上运行。
第二个区别在于,sync对象的类型为 std::experimental::flex_barrier,并且需要将一个完整的函数和线程数量对实例进行构造③。该函数会在所有线程抵达栅栏处的时候,运行在0号线程上,然后由0号线程调用Lambda表达式对数据进行拆分,当拆分结束后,下一轮迭代开始④。返回值-1表示线程数目保持不变,返回值为0或其他数值则指定的是下一个周期中参与迭代的线程数量。
主循环⑥就简单了:其只包含了并行部分的代码,所以只要有一个同步点就够了⑦。使用std::experimental::flex_barrier能够很好的对代码进行简化。
使用完整函数作为串行块是一种很强大的功能,因为这能够改变参与并行的线程数量。例如:流水线类型代码在运行时,当流水线的各级都在进行处理时,线程的数量在初始阶段和执行阶段要少于主线程处理阶段。