并发编程相对于单线程编程, 在异常安全上需要注意更多问题.
我学 C++ 时只是了解了异常和异常捕获等基础内容, 没有深入研究, 也就是 C++ Primer 所涉及的部分而已. 但书中并没有阐明什么是真正的异常安全, 因此这里需要先补充一些前置知识.
所谓异常安全, 意味着无资源泄漏风险, 无数据破坏风险.
C++ 用 RAII (资源获取即初始化) 的方式巧妙地应对资源泄漏: 资源交由对象持有, 无论是否抛出异常, 对象离开作用域时都会被析构并释放其资源, 从而保证不会有资源泄漏.
而应对数据破坏, 则可以使用 copy-and-swap 策略: 先在副本上完成全部修改, 成功后再与原数据交换, 使得数据要么修改成功, 要么保持原样. 更深入的理解需要查阅更多资料, 本文作者也没有能力完全阐明清楚 —— 写出能用的程序和写出鲁棒性极强的程序, 难度差异极大.
先以前期文章中的并发累加算法为例, 看看没有考虑异常安全的代码是什么样的:
#include
#include
#include
#include
#include
// Worker functor: folds the range [first, last) onto the current value of
// `result`. `result` is assigned only after std::accumulate has returned.
template <typename Iterator, typename T>
struct accumulateBlock
{
    void operator()(Iterator first, Iterator last, T &result)
    {
        T const start = result;
        result = std::accumulate(first, last, start);
    }
};
// Parallel accumulate, first version. It is deliberately NOT exception safe
// (see the discussion after this listing). It splits [first, last) into
// numThreads blocks of length / numThreads elements. The first
// numThreads - 1 blocks are summed on worker threads, and the final block
// (including the remainder) is summed on the calling thread. The partial
// sums are then folded into `init`.
template <typename Iterator, typename T>
auto parallelAccumulate(Iterator first, Iterator last, T init) -> T
{
    const unsigned long length = std::distance(first, last);
    if (length == 0)
    {
        return init;
    }

    // Give every thread at least minPerThread elements: ceil(length / 25).
    const unsigned long minPerThread = 25;
    const unsigned long maxThreads = (length + minPerThread - 1) / minPerThread;

    // hardware_concurrency() returns 0 when it cannot tell; assume 2 then.
    unsigned long hardwareThreads = std::thread::hardware_concurrency();
    if (hardwareThreads == 0)
    {
        hardwareThreads = 2;
    }
    const unsigned long numThreads = std::min(hardwareThreads, maxThreads);
    const unsigned long blockSize = length / numThreads;
    const unsigned long workerCount = numThreads - 1;

    // One partial-sum slot per block; the last slot is the calling thread's.
    std::vector<T> results(numThreads);
    std::vector<std::thread> workers(workerCount);

    Iterator blockBegin = first;
    for (unsigned long idx = 0; idx != workerCount; ++idx)
    {
        Iterator blockFinish = blockBegin;
        std::advance(blockFinish, blockSize);
        // std::ref: the worker writes straight into results[idx].
        workers[idx] = std::thread(accumulateBlock<Iterator, T>(), blockBegin,
                                   blockFinish, std::ref(results[idx]));
        blockBegin = blockFinish;
    }

    // The calling thread sums whatever is left, including the remainder.
    accumulateBlock<Iterator, T>()(blockBegin, last, results[workerCount]);

    for (std::thread &worker : workers)
    {
        worker.join();
    }
    return std::accumulate(results.begin(), results.end(), init);
}
// Demo: sums the integers 0..99 with parallelAccumulate and prints the result.
auto main() -> int
{
    std::vector<int> vi;
    vi.reserve(100);
    int value = 0;
    while (value != 100)
    {
        vi.push_back(value++);
    }
    const int total = parallelAccumulate(vi.begin(), vi.end(), 0);
    std::cout << total << std::endl;
    return 0;
}
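以上代码中, accumulateBlock 的内部实现调用了算法库函数 std::accumulate, 而调用处没有用 try catch 结构包裹, 因此对仿函数的任何一次调用都可能抛出异常. 若异常在子线程中抛出, 它会逃出线程函数, 直接导致 std::terminate 结束程序. 若异常在主线程中抛出 (例如主线程自己的累加, 或创建 std::thread 失败), 函数会提前退出, 而 threads 中仍可 join 的 std::thread 对象在析构时同样会调用 std::terminate.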
所以, 这个并发累加的算法函数实现是非异常安全的.
要让程序具备一定的异常安全性, 可以改用标准库的 std::packaged_task 和 std::future. 任务抛出异常时, packaged_task 会捕获该异常并保存到与 future 共享的状态中, 而不是让它逃出线程. 为了配合这种结构, 可以将 accumulateBlock 仿函数改为直接返回结果, 由 packaged_task 把结果通过 future 传递出来.
其它线程的结果都保存在对应的 future 中. 若任务正常完成, future 保存其返回值. 若任务抛出异常, 该异常会在调用 future::get() 时于调用线程中重新抛出.
#include
#include
#include
#include
#include
#include
// Worker functor: returns the sum of [first, last), starting from a
// value-initialised T(). It returns the sum (rather than writing through a
// reference) so that it can be wrapped in a std::packaged_task.
template <typename Iterator, typename T>
struct accumulateBlock
{
    auto operator()(Iterator first, Iterator last) -> T
    {
        T blockSum = std::accumulate(first, last, T());
        return blockSum;
    }
};
// Parallel accumulate, packaged_task/future version. Each worker's partial
// sum, or the exception it threw, is delivered through its std::future and
// surfaces at get(). The calling thread sums the final block (including the
// remainder). The result is init + worker sums (in block order) + tail sum.
// NOTE: if the calling thread itself throws before the joins, the
// still-joinable std::thread objects are destroyed and std::terminate is
// called. The later versions in this article close that hole.
template <typename Iterator, typename T>
auto parallelAccumulate(Iterator first, Iterator last, T init) -> T
{
    const unsigned long length = std::distance(first, last);
    if (length == 0)
    {
        return init;
    }

    // Give every thread at least minPerThread elements: ceil(length / 25).
    const unsigned long minPerThread = 25;
    const unsigned long maxThreads = (length + minPerThread - 1) / minPerThread;
    const unsigned long reported = std::thread::hardware_concurrency();
    // hardware_concurrency() returns 0 when it cannot tell; assume 2 then.
    const unsigned long numThreads =
        std::min(reported == 0 ? 2UL : reported, maxThreads);
    const unsigned long blockSize = length / numThreads;
    const unsigned long workerCount = numThreads - 1;

    std::vector<std::future<T>> futures(workerCount);
    std::vector<std::thread> threads(workerCount);

    Iterator blockBegin = first;
    for (unsigned long idx = 0; idx != workerCount; ++idx)
    {
        Iterator blockFinish = blockBegin;
        std::advance(blockFinish, blockSize);
        std::packaged_task<T(Iterator, Iterator)> task{accumulateBlock<Iterator, T>()};
        futures[idx] = task.get_future();
        threads[idx] = std::thread(std::move(task), blockBegin, blockFinish);
        blockBegin = blockFinish;
    }

    T tailSum = accumulateBlock<Iterator, T>()(blockBegin, last);

    for (std::thread &worker : threads)
    {
        worker.join();
    }

    T total = init;
    for (std::future<T> &partial : futures)
    {
        total += partial.get();  // rethrows the worker's exception, if any
    }
    total += tailSum;
    return total;
}
// Demo: sums the integers 0..99 with parallelAccumulate and prints the result.
auto main() -> int
{
    std::vector<int> vi(100);
    int next = 0;
    for (int &slot : vi)
    {
        slot = next++;
    }
    const int total = parallelAccumulate(vi.begin(), vi.end(), 0);
    std::cout << total << std::endl;
    return 0;
}
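以上代码中, 子线程的异常只会在最后汇总结果 (调用 future::get()) 时抛出. 但若创建线程或主线程自己的累加抛出异常, threads 中尚未 join 的线程对象析构时仍会调用 std::terminate. 因此可以更进一步, 把创建线程和主线程累加的部分放入 try catch 结构中, 在 catch 中 join 所有已启动的线程之后再重新抛出异常.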
// Fragment of parallelAccumulate: launch the workers and run the calling
// thread's share inside a try block. If anything throws (thread creation,
// the main-thread accumulate, a join, ...), every already-started thread is
// joined before the exception propagates. Destroying a joinable std::thread
// calls std::terminate, so skipping these joins would abort the program
// instead of rethrowing.
T lastResult;
try
{
    for (unsigned long i = 0; i < (numThreads - 1); ++i)
    {
        Iterator blockEnd = blockStart;
        // Move blockEnd forward by blockSize elements.
        std::advance(blockEnd, blockSize);
        std::packaged_task<T(Iterator, Iterator)> task(
            (accumulateBlock<Iterator, T>()));
        futures[i] = task.get_future();
        threads[i] = std::thread(std::move(task), blockStart, blockEnd);
        blockStart = blockEnd;
    }
    // The calling thread accumulates whatever is left over.
    lastResult = accumulateBlock<Iterator, T>()(blockStart, last);
    std::for_each(threads.begin(), threads.end(),
                  std::mem_fn(&std::thread::join));
}
catch (...)
{
    // Join every thread that was actually started. Default-constructed
    // (never-started) or already-joined entries are not joinable and are
    // skipped.
    for (unsigned long i = 0; i != (numThreads - 1); ++i)
    {
        if (threads[i].joinable())
        {
            threads[i].join();
        }
    }
    // Rethrow only after the loop. Rethrowing inside it would leave every
    // thread after the first one unjoined, and their destructors would call
    // std::terminate.
    throw;
}
也可以用 RAII 的方式, 用一个结构 (下面的 joinThreads) 封装线程数组, 并在析构时对所有线程 join, 这样就可以省略 try catch 结构. 同时还可以省去显式的 join 调用, 因为 future::get() 会阻塞, 直至对应任务完成并返回结果 (或重新抛出其异常).
#include
#include
#include
#include
#include
#include
// Worker functor for packaged_task: computes and returns the sum of
// [first, last), seeded with a value-initialised T().
template <typename Iterator, typename T>
struct accumulateBlock
{
    auto operator()(Iterator first, Iterator last) -> T
    {
        T partial = std::accumulate(first, last, T());
        return partial;
    }
};
// RAII guard over a vector of threads owned elsewhere. When the guard is
// destroyed (normal return or stack unwinding), it joins every thread in the
// vector that is still joinable. The vector must outlive the guard, because
// only a reference to it is stored.
struct joinThreads
{
    explicit joinThreads(std::vector<std::thread> &rhs)
        : threads(rhs)
    {}

    // Non-copyable: a copy would be a second guard over the same vector.
    // The joinable() check keeps that from double-joining, but having two
    // owners of the "join on exit" responsibility is confusing.
    joinThreads(const joinThreads &) = delete;
    joinThreads &operator=(const joinThreads &) = delete;

    ~joinThreads()
    {
        for (auto &thread : threads)
        {
            // Skip never-started or already-joined threads.
            if (thread.joinable())
            {
                thread.join();
            }
        }
    }

private:
    std::vector<std::thread> &threads;
};
// Parallel accumulate, RAII version. A joinThreads guard joins every worker
// when the function exits, whether normally or via an exception, so neither
// try/catch nor explicit join() calls are needed. Each worker's partial sum,
// or the exception it threw, arrives through its std::future. The calling
// thread sums the final block (including the remainder). The result is
// init + worker sums (in block order) + tail sum.
template <typename Iterator, typename T>
auto parallelAccumulate(Iterator first, Iterator last, T init) -> T
{
    const unsigned long length = std::distance(first, last);
    if (length == 0)
    {
        return init;
    }

    // Give every thread at least minPerThread elements: ceil(length / 25).
    const unsigned long minPerThread = 25;
    const unsigned long maxThreads = (length + minPerThread - 1) / minPerThread;
    const unsigned long reported = std::thread::hardware_concurrency();
    // hardware_concurrency() returns 0 when it cannot tell; assume 2 then.
    const unsigned long numThreads =
        std::min(reported == 0 ? 2UL : reported, maxThreads);
    const unsigned long blockSize = length / numThreads;
    const unsigned long workerCount = numThreads - 1;

    // Declaration order matters. The guard is declared after `threads`, so it
    // is destroyed (and joins) before the thread objects themselves are.
    std::vector<std::future<T>> futures(workerCount);
    std::vector<std::thread> threads(workerCount);
    joinThreads const joiner(threads);

    Iterator blockBegin = first;
    for (unsigned long idx = 0; idx != workerCount; ++idx)
    {
        Iterator blockFinish = blockBegin;
        std::advance(blockFinish, blockSize);
        std::packaged_task<T(Iterator, Iterator)> task{accumulateBlock<Iterator, T>()};
        futures[idx] = task.get_future();
        threads[idx] = std::thread(std::move(task), blockBegin, blockFinish);
        blockBegin = blockFinish;
    }

    T tailSum = accumulateBlock<Iterator, T>()(blockBegin, last);

    // get() blocks until the corresponding worker has finished, and rethrows
    // its exception if it threw.
    T total = init;
    for (std::future<T> &partial : futures)
    {
        total += partial.get();
    }
    total += tailSum;
    return total;
}
// Demo: sums the integers 0..99 with parallelAccumulate and prints the result.
auto main() -> int
{
    std::vector<int> vi(100);
    for (std::vector<int>::size_type k = 0; k != vi.size(); ++k)
    {
        vi[k] = static_cast<int>(k);
    }
    const int total = parallelAccumulate(vi.begin(), vi.end(), 0);
    std::cout << total << std::endl;
    return 0;
}
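用 std::async() 加递归的方式实现并发累加. 由 std::async 决定是否新开线程执行任务 (默认启动策略下, 任务也可能被推迟到调用 get() 时在当前线程中执行). 任务中的异常由 future 保存, 并在 get() 时重新抛出. 而且由 std::async 以新线程方式启动的任务, 其 future 在析构时会等待任务结束, 所以即使递归中途抛出异常, 也不会留下未 join 的线程, 从而保证异常安全.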
#include
#include
#include
#include
#include
// Recursive parallel accumulate built on std::async. Ranges of at most
// maxChunkSize elements are summed directly. Larger ranges are halved: the
// first half (seeded with `init`) is handed to std::async, while the second
// half (seeded with T()) is processed recursively on the current thread.
// Any exception from the async half is rethrown by get().
template <typename Iterator, typename T>
auto parallelAccumulate(Iterator first, Iterator last, T init) -> T
{
    const unsigned long length = std::distance(first, last);
    const unsigned long maxChunkSize = 25;
    if (length > maxChunkSize)
    {
        Iterator midPoint = first;
        std::advance(midPoint, length / 2);
        std::future<T> leftHalf =
            std::async(parallelAccumulate<Iterator, T>, first, midPoint, init);
        T rightHalf = parallelAccumulate(midPoint, last, T());
        return leftHalf.get() + rightHalf;
    }
    return std::accumulate(first, last, init);
}
// Demo: sums the integers 0..99 with the async-based parallelAccumulate and
// prints the result.
auto main() -> int
{
    std::vector<int> vi(100);
    int counter = 0;
    for (auto it = vi.begin(); it != vi.end(); ++it)
    {
        *it = counter;
        ++counter;
    }
    const int result = parallelAccumulate(vi.begin(), vi.end(), 0);
    std::cout << result << std::endl;
    return 0;
}
对于真正可用的并发代码, 需要注意的比单线程要多, 也更难, 异常安全是个较大的话题, 需要更多的资料学习尝试.
参考: C++中的异常安全性