教程翻译自Seastar官方文档:https://github.com/scylladb/seastar/blob/master/doc/tutorial.md
转载请注明出处:https://www.cnblogs.com/morningli/p/15961636.html
Fiber
Seastar 延续通常很短,但经常相互链接,因此一个延续会做一些工作,然后安排另一个延续以供以后使用。这样的链可能很长,甚至经常涉及循环 —— 请参阅下一节“循环”。我们将这种链称为执行的“fiber”。
这些fiber不是线程——每一个都只是一串延续——但它们与传统线程有一些共同的要求。例如,我们希望避免一根fiber被饿死,而另一根fiber连续不断地运行它的continuation
。作为另一个例子,fiber可能想要进行通信——例如,一个fiber产生第二个fiber消耗的数据,并且我们希望确保两个fiber都有机会运行,并且如果一个fiber过早停止,另一个fiber不会永远挂起。
循环
大多数耗时的计算都涉及使用循环。Seastar 提供了几个原语来表达它们,与未来/承诺模型很好地组合在一起。Seastar 循环原语的一个非常重要的方面是每次迭代之后都有一个抢占点,从而允许其他任务在迭代之间运行。
repeat
用repeat
创建的循环执行主体,直到它接收到一个stop_iteration
对象,该对象通知迭代应该继续(stop_iteration::no
)还是停止(stop_iteration::yes
)。只有在第一个迭代完成后才会启动下一个迭代。传递给repeat
的循环体应该有一个future<stop_iteration>
的返回类型。
seastar::future<int> recompute_number(int number);
seastar::future<> push_until_100(seastar::lw_shared_ptr<std::vector<int>> queue, int element) {
return seastar::repeat([queue, element] {
if (queue->size() == 100) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return recompute_number(element).then([queue] (int new_element) {
queue->push_back(new_element);
return stop_iteration::no;
});
});
}
do_until
do_until
是repeat
的近亲,但它使用显式传递的条件来决定是否应该停止迭代。上面的例子可以用do_until
表示为:
seastar::future<int> recompute_number(int number);
seastar::future<> push_until_100(seastar::lw_shared_ptr<std::vector<int>> queue, int element) {
return seastar::do_until([queue] { return queue->size() == 100; }, [queue, element] {
return recompute_number(element).then([queue] (int new_element) {
queue->push_back(new_element);
});
});
}
请注意,循环体应返回future<>
,这允许在循环内组合复杂的延续。
do_for_each
do_for_each
相当于Seastar 世界中的for
循环。它接受一个范围(或一对迭代器)和一个函数体,它按顺序一个接一个地应用于每个参数。下一次迭代将仅在第一次迭代完成后启动,就像repeat
. 像往常一样,do_for_each
期望它的循环体返回一个future<>
.
seastar::future<> append(seastar::lw_shared_ptr<std::vector
return seastar::do_for_each(queue2, [queue1] (int element) {
queue1->push_back(element);
});
}
seastar::future<> append_iota(seastar::lw_shared_ptr<std::vector<int>> queue1, int n) {
return seastar::do_for_each(boost::make_counting_iterator<size_t>(0), boost::make_counting_iterator<size_t>(n), [queue1] (int element) {
queue1->push_back(element);
});
}
do_for_each
接受对容器的左值引用或一对迭代器。这意味着在整个循环执行期间确保容器处于活动状态的责任属于调用者。如果容器需要延长其使用寿命,可以用do_with
通过以下方式轻松实现:
seastar::future<> do_something(int number);
seastar::future<> do_for_all(std::vector<int> numbers) {
// Note that the "numbers" vector will be destroyed as soon as this function
// returns, so we use do_with to guarantee it lives during the whole loop execution:
return seastar::do_with(std::move(numbers), [] (std::vector<int>& numbers) {
return seastar::do_for_each(numbers, [] (int number) {
return do_something(number);
});
});
}
parallel_for_each
parallel_for_each
是do_for_each
的高并发变种. 使用 时parallel_for_each,所有迭代都同时排队—— 这意味着无法保证它们完成操作的顺序。
seastar::future<> flush_all_files(seastar::lw_shared_ptr<std::vector<seastar::file>> files) {
return seastar::parallel_for_each(files, [] (seastar::file f) {
// file::flush() returns a future<>
return f.flush();
});
}
parallel_for_each
是一个强大的工具,因为它允许并行生成许多任务。这可能是一个巨大的性能提升,但也有一些警告。首先,太高的并发可能会很麻烦——细节可以在限制循环的并行性一章中找到。
要限制parallel_for_each
的并发性,请使用下面描述的max_concurrent_for_each
。有关处理并行性的更多详细信息,请参阅限制循环的并行性一章。
其次,请注意在parallel_for_each
循环中执行迭代的顺序是任意的——如果需要严格的顺序,请考虑使用do_for_each
。
max_concurrent_for_each
max_concurrent_for_each
是parallel_for_each
有限并行的变体。它接受一个额外的参数——max_concurrent
——最多max_concurrent
迭代同时排队,不保证它们以什么顺序完成它们的操作。
seastar::future<> flush_all_files(seastar::lw_shared_ptr<std::vector<seastar::file>> files, size_t max_concurrent) {
return seastar::max_concurrent_for_each(files, max_concurrent, [] (seastar::file f) {
return f.flush();
});
}
确定最大并发限制超出了本文档的范围。它通常应该源自运行软件的系统的实际功能,例如并行执行单元或 I/O 通道的数量,以便在不使系统不堪重负的情况下优化资源利用率。
when_all:等待多个future
上面我们已经看到parallel_for_each()
,它启动了一些异步操作,然后等待所有操作完成。Seastar 有另一个成语,when_all()
,用于等待几个已经存在的期货完成。
when_all()
的第一个变量是可变的,即future
作为单独的参数给出,其确切数量在编译时是已知的。个别future
可能有不同的类型。例如,
#include <seastar/core/sleep.hh>
future<> f() {
using namespace std::chrono_literals;
future<int> slow_two = sleep(2s).then([] { return 2; });
return when_all(sleep(1s), std::move(slow_two),
make_ready_future<double>(3.5)
).discard_result();
}
这将启动三个期货 —— 一个休眠一秒钟(并且不返回任何内容),一个休眠两秒钟并返回整数 2,以及一个立即返回双精度 3.5 - 然后等待它们。该when_all()
函数返回一个future
,它在所有三个future
解析后立即解析,即两秒后。这个future
也有一个值,我们将在下面解释,但在这个例子中,我们只是等待未来解决并丢弃它的值。
请注意,when_all()
只接受右值,它可以是临时的(如异步函数的返回值或make_ready_future
)或std::move()
持有future
的变量。
由when_all()
返回的future
为已解析的future
元组,并包含三个输入future
的结果。继续上面的例子,
future<> f() {
using namespace std::chrono_literals;
future<int> slow_two = sleep(2s).then([] { return 2; });
return when_all(sleep(1s), std::move(slow_two),
make_ready_future<double>(3.5)
).then([] (auto tup) {
std::cout << std::get<0>(tup).available() << "\n";
std::cout << std::get<1>(tup).get0() << "\n";
std::cout << std::get<2>(tup).get0() << "\n";
});
}
该程序的输出(两秒后)是1, 2, 3.5:元组中的第一个未来可用(但没有值),第二个具有整数值 2,第三个是双精度值 3.5 —— 正如预期的那样。
一个或多个等待的future
可能会在异常中解决,但这不会改变when_all()
工作方式:它仍然等待所有期货解决,每个期货都有一个值或一个异常,并且在返回的元组中,一些期货可能包含异常而不是值。例如,
future<> f() {
using namespace std::chrono_literals;
future<> slow_success = sleep(1s);
future<> slow_exception = sleep(2s).then([] { throw 1; });
return when_all(std::move(slow_success), std::move(slow_exception)
).then([] (auto tup) {
std::cout << std::get<0>(tup).available() << "\n";
std::cout << std::get<1>(tup).failed() << "\n";
std::get<1>(tup).ignore_ready_future();
});
}
两个future
都available()
(已解决),但第二个期货failed()
(导致异常而不是值)。注意我们如何在这个失败的future
上调用ignore_ready_future()
,因为默默地忽略失败的future
被认为是一个错误,并将导致“Exceptional future ignored”错误消息。更典型的是,应用程序将记录失败的future
而不是忽略它。
上面的例子表明正确使用when_all()
是不方便和冗长的。结果被包装在一个元组中,导致冗长的元组语法,并使用就绪的future
,必须单独检查所有的异常以避免错误消息。
所以Seastar也提供了一个更容易使用的when_all_succeed()
功能。此函数也返回一个未来,当所有给定的未来都已解决时,该未来将解决。如果它们都成功了,它将结果值传递给 continuation
,而不将它们包装在future
或元组中。但是,如果一个或多个future
失败,则when_all_succeed()
解析为失败的future
,其中包含来自失败future
之一的异常。如果给定的future
不止一个失败,其中一个将被传递(未指定选择哪一个),其余的将被静默忽略。例如,
using namespace seastar;
future<> f() {
using namespace std::chrono_literals;
return when_all_succeed(sleep(1s), make_ready_future<int>(2),
make_ready_future<double>(3.5)
).then([] (int i, double d) {
std::cout << i << " " << d << "\n";
});
}
请注意,future
持有的整数和双精度值是如何方便地单独(没有元组)传递给continuation
的。由于sleep()
不包含值,因此等待它,但没有第三个值传递给continuation
。这也意味着如果我们when_all_succeed()
对几个future<>(没有值),结果也是一个future<>:
using namespace seastar;
future<> f() {
using namespace std::chrono_literals;
return when_all_succeed(sleep(1s), sleep(2s), sleep(3s));
}
此示例仅等待 3 秒(最大值为 1、2 和 3 秒)。
when_all_succeed()
处理异常的一个例子:
using namespace seastar;
future<> f() {
using namespace std::chrono_literals;
return when_all_succeed(make_ready_future<int>(2),
make_exception_future<double>("oops")
).then([] (int i, double d) {
std::cout << i << " " << d << "\n";
}).handle_exception([] (std::exception_ptr e) {
std::cout << "exception: " << e << "\n";
});
}
在这个例子中,有一个future
失败了,所以when_all_succeed
的结果是一个失败的future
,所以正常的continuation
没有运行,handle_exception()
continuation
就完成了。
信号量
Seastar 的信号量是标准的计算机科学信号量,适用于future
。信号量是一个计数器,您可以在其中存放或取走单元。如果没有足够的单元可用,从计数器取单元可能会等待。
使用信号量限制并行性
Seastar 中信号量最常见的用途是限制并行性,即限制可以并行运行的某些代码的实例数量。当每个并行调用使用有限的资源(例如,内存)时,这可能很重要,因此让无限数量的并行调用可能会耗尽该资源。
考虑外部事件源(例如,传入的网络请求)导致调用异步函数g()
的情况。想象一下,我们希望将并发操作g()
的数量限制为 100。即,如果 g()
在 100 个其他调用仍在进行时启动,我们希望它延迟其实际工作,直到其他调用之一完成。我们可以用信号量来做到这一点:
seastar::future<> g() {
static thread_local seastar::semaphore limit(100);
return limit.wait(1).then([] {
return slow(); // do the real work of g()
}).finally([] {
limit.signal(1);
});
}
在这个例子中,信号量从计数器的 100 开始。异步操作slow()
只有在我们可以将计数器减一(wait(1)
),这样,当slow()
完成,无论是成功还是异常,计数器会加回一(signal(1)
)。通过这样的方式,当100个操作已经开始工作但尚未完成时,第101个操作将等待,直到其中一个正在进行的操作完成并将一个单元返回给信号量。这确保了每次我们在上述代码中最多运行 100个并发操作slow()
。
请注意我们如何使用static thread_local
信号量,以便g()
来自同一分片的所有调用都计入相同的限制;像往常一样,Seastar 应用程序是分片的,因此每个分片(CPU 线程)的限制是分开的。这通常很好,因为分片应用程序认为每个分片的资源是分开的。
幸运的是,上面的代码恰好是异常安全的:limit.wait(1)
可以在内存不足时抛出异常(保留了waiter列表),在这种情况下,信号量计数器不会减少,但下面的continuation
不会运行,所以它不会也增加了。当信号量broken 时,limit.wait(1)
也可以返回一个特殊的future
(我们稍后会讨论),但在这种情况下,额外的signal()
调用被忽略。最后,'slow()'也可以抛出或返回一个异常的未来,但finally()
确保信号量仍然增加。
然而,随着应用程序代码变得越来越复杂,我们都很难确保无论发生在哪个代码路径或异常发生从不会忘记在操作完成后调用signal()
。作为可能出错的示例,请考虑以下错误代码片段,该代码片段与上述代码片段略有不同,并且乍一看似乎是正确的:
seastar::future<> g() {
static thread_local seastar::semaphore limit(100);
return limit.wait(1).then([] {
return slow().finally([] { limit.signal(1); });
});
}
但是这个版本不是异常安全的:考虑如果slow()在返回future
之前抛出异常会发生什么(这与slow()返回异常future
不同 —— 我们在异常处理部分讨论了这种差异)。在这种情况下,我们减少了计数器,但永远不会运行到finally()
,并且永远不会增加计数器。有一种方法可以修复此代码,方法是将slow()
调用替换为seastar::futurize_invoke(slow)
。但我们在这里试图说明的重点不是如何修复有缺陷的代码,而是通过使用单独的semaphore::wait()
和semaphore::signal()
函数,你很容易出错。
为了异常安全,在 C++ 中一般不建议有单独的资源获取和释放函数。相反,C++ 提供了更安全的机制来获取资源(在本例中为信号量单元)并在稍后释放它:lambda 函数和 RAII(“resource acquisition is initialization”):
基于 lambda 的解决方案是一个函数seastar::with_semaphore(),它是上面示例中代码的快捷方式:
seastar::future<> g() {
static thread_local seastar::semaphore limit(100);
return seastar::with_semaphore(limit, 1, [] {
return slow(); // do the real work of g()
});
}
with_semaphore()
和前面的代码片段一样,等待信号量中给定数量的单元,然后运行给定的 lambda,当 lambda 返回的未来被解析时,with_semaphore()
将单元返回给信号量。with_semaphore()
返回一个只有在所有这些步骤完成后才能解决的future
。
函数seastar::get_units()
更通用。它基于 C++ 的 RAII 哲学,为seastar::semaphore
的分开的wait()
和signal()
方法提供了替代方案:该函数返回一个不透明的单位对象,该对象在持有时保持信号量的计数器减少 —— 一旦该对象被破坏,计数器就会增加回来。使用此接口,您不会忘记增加计数器,或将其增加两次,或增加而不减少:当创建单位对象时,计数器将始终减少一次,如果成功,则在对象被销毁时增加。当units
对象被移动到一个continuation
中时,无论这个continuation
如何结束,当continuation
被破坏时,units
对象也被销毁并且单元被返回到信号量的计数器。上面用 get_units()
编写的示例如下所示:
seastar::future<> g() {
static thread_local semaphore limit(100);
return seastar::get_units(limit, 1).then([] (auto units) {
return slow().finally([units = std::move(units)] {});
});
}
请注意get_units()
需要使用的有点复杂的方式:continuation
必须嵌套,因为我们需要将units
对象移动到最后一个continuation
。如果slow()
返回一个future
(并且不立即抛出),则finally()
延续捕获units
对象直到一切完成,但不运行任何代码。
Seastars 程序员通常应该避免直接使用semaphore::wait()
和semaphore::signal()
函数,并且总是更喜欢with_semaphore()
(如果适用)或get_units()
.
限制资源使用
因为信号量支持等待任意数量的单元,而不仅仅是 1,所以我们可以将它们用于更多地限制并行调用的数量。例如,假设我们有一个异步函数using_lots_of_memory(size_t bytes)
,它使用bytes
字节内存,并且我们希望确保该函数的所有并行调用使用的内存不超过 1 MB—— 并且其他调用会延迟到之前的调用完成了。我们可以用信号量来做到这一点:
seastar::future<> using_lots_of_memory(size_t bytes) {
static thread_local seastar::semaphore limit(1000000); // limit to 1MB
return seastar::with_semaphore(limit, bytes, [bytes] {
// do something allocating 'bytes' bytes of memory
});
}
请注意,在上面的示例中,调用using_lots_of_memory(2000000)
将返回一个永远不会解析的未来,因为信号量永远不会包含足够的单元来满足信号量等待。using_lots_of_memory()
应该可能检查是否bytes
超过限制,并在这种情况下抛出异常。Seastar 不会为您执行此操作。
限制循环的并行性
上面,我们查看了一个被某个外部事件调用的函数g()
,并希望控制它的并行性。在本节中,我们将研究循环的并行性,它也可以通过信号量来控制。
考虑以下简单循环:
#include <seastar/core/sleep.hh>
seastar::future<> slow() {
std::cerr << ".";
return seastar::sleep(std::chrono::seconds(1));
}
seastar::future<> f() {
return seastar::repeat([] {
return slow().then([] { return seastar::stop_iteration::no; });
});
}
此循环运行slow()
函数(需要一秒钟才能完成),没有任何并行性 --- 下一个slow()
调用仅在前一个调用完成时开始。但是,如果我们不需要序列化对slow()
的调用,并且希望允许它的多个实例同时进行呢?
天真地,我们可以通过在上一次调用之后立即开始下一次调用来实现更多的并行性slow()--- 忽略上一次调用slow()
返回的future
并且不等待它解决:
seastar::future<> f() {
return seastar::repeat([] {
slow();
return seastar::stop_iteration::no;
});
}
但是在这个循环中,并行性的数量没有限制——sleep()在第一个调用返回之前,数百万个调用可能是并行活动的。最终,此循环可能会消耗所有可用内存并崩溃。
使用信号量允许我们并行运行多个slow()
实例,但将这些并行实例的数量限制为,在以下示例中为 100:
seastar::future<> f() {
return seastar::do_with(seastar::semaphore(100), [] (auto& limit) {
return seastar::repeat([&limit] {
return limit.wait(1).then([&limit] {
seastar::futurize_invoke(slow).finally([&limit] {
limit.signal(1);
});
return seastar::stop_iteration::no;
});
});
});
}
请注意,此代码与我们在上面看到的限制函数g()
并行调用次数的代码有何不同:
- 在这里,我们不能使用单个
thread_local
信号量。每个调用f()
都有其并行度为 100 的循环,因此需要它自己的信号量“limit
”,在循环期间使用do_with()
保活。 - 在这里,我们在继续循环之前不等待
slow()
完成,即,我们没有return
从futurize_invoke(slow)
开始的future
链。当信号量单元可用时,循环继续到下一次迭代,而(在我们的示例中)99 个其他操作可能正在后台进行,我们不等待它们。
在本节的示例中,我们不能使用with_semaphore()
快捷方式。with_semaphore()
返回一个仅在 lambda 返回的future
解决后才解决的future
。但是在上面的例子中,循环需要知道何时只有信号量单元可用,才能开始下一次迭代——而不是等待上一次迭代完成。我们无法通过with_semaphore()
来实现. 但是在这种情况下可以使用更通用的异常安全惯用语seastar::get_units()
,也是推荐使用的:
seastar::future<> f() {
return seastar::do_with(seastar::semaphore(100), [] (auto& limit) {
return seastar::repeat([&limit] {
return seastar::get_units(limit, 1).then([] (auto units) {
slow().finally([units = std::move(units)] {});
return seastar::stop_iteration::no;
});
});
});
}
上面的例子是不现实的,因为它们有一个永远不会结束的循环,f()
返回的future
永远不会解决。在更现实的情况下,循环有一个结束,在循环结束时,我们需要等待循环开始的所有后台操作。我们可以通过wait()
信号量的原始计数来做到这一点:当完整的计数最终可用时,这意味着所有操作都已完成。例如,以下循环在 456 次迭代后结束:
seastar::future<> f() {
return seastar::do_with(seastar::semaphore(100), [] (auto& limit) {
return seastar::do_for_each(boost::counting_iterator<int>(0),
boost::counting_iterator<int>(456), [&limit] (int i) {
return seastar::get_units(limit, 1).then([] (auto units) {
slow().finally([units = std::move(units)] {});
});
}).finally([&limit] {
return limit.wait(100);
});
});
}
最后一个finally
是确保我们等待最后一个操作完成的原因:在repeat
循环结束后(无论是成功还是由于其中一次迭代中的异常而提前结束),我们执行 wait(100)
以等待信号量达到其原始值 100,意味着我们开始的所有操作都已完成。如果没有finally
,则f()
返回的 future
将在循环的所有迭代实际完成之前解析(最后 100 次可能仍在运行)。
在我们在上面的例子中看到的成语中,相同的信号量既用于限制后台操作的数量,又用于等待所有操作完成。有时,我们希望几个不同的循环使用相同的信号量来限制它们的总并行度。在这种情况下,我们必须使用单独的机制来等待由循环启动的后台操作完成。等待正在进行的操作最方便的方法是使用gate
,我们将在后面详细介绍。一个典型的循环示例,其并行性受外部信号量限制:
thread_local seastar::semaphore limit(100);
seastar::future<> f() {
return seastar::do_with(seastar::gate(), [] (auto& gate) {
return seastar::do_for_each(boost::counting_iterator<int>(0),
boost::counting_iterator<int>(456), [&gate] (int i) {
return seastar::get_units(limit, 1).then([&gate] (auto units) {
gate.enter();
seastar::futurize_invoke(slow).finally([&gate, units = std::move(units)] {
gate.leave();
});
});
}).finally([&gate] {
return gate.close();
});
});
}
在这段代码中,我们使用外部信号量limit
来限制并发操作的数量,但另外有一个特定于这个循环的gate来帮助我们等待所有正在进行的操作完成。
管道
Seastar的pipe<T>
是一种在两个fiber
之间传输数据的机制,一个产生数据,另一个消费数据。它有一个固定大小的缓冲区来确保两个fiber
的平衡执行,因为生产者fiber
在写入完整管道时会阻塞,直到消费者fiber
开始运行并从管道中读取。
pipe<T>
类似于Unix 管道,因为它具有读取端、写入端和它们之间的固定大小的缓冲区,并且支持独立关闭任一端(以及使用另一端时的 EOF 或 broken pipe)。pipe<T>
对象将管道的读取端和写入端作为两个独立的对象。这些对象可以移动到两个不同的fiber
中。重要的是,如果其中一个管道末端被销毁(即,捕获它的continuation
结束),管道的另一端将停止阻塞,因此另一根fiber
将不会挂起。
管道的读写接口是基于future
的阻塞。即,write()
和 read()
方法返回一个future
,它在操作完成时实现。管道是单读单写的,这意味着在 read()
返回的 future
完成之前,不得再次调用 read()
(对于 write
也是如此)。注意:管道读取器和写入器是可move
的,但不可复制。将每一端包装在一个共享指针中通常很方便,这样它可以被复制(例如,在需要可复制的 std::function
中使用)或轻松捕获到多个continuation
中。
使用gate关闭服务
考虑一个有一些长时间操作slow()
的应用程序,许多这样的操作可能随时启动。许多slow()
操作甚至可以并行进行。现在,您要关闭此服务,但要确保在此之前完成所有未完成的操作。此外,您不希望slow()
在关闭过程中允许新操作开始。
这就是seastar::gate
的目的。gate
g
维护正在进行的操作的内部计数器。我们在进入操作时调用g.enter()
(即在运行slow()
之前),在离开操作时调用g.leave()
(当调用slow()
完成时)。该方法g.close()
关闭了 gate
,这意味着它禁止任何进一步的调用g.enter()
(这种尝试将产生异常);此外,当所有现有操作都完成时,g.close()
返回一个future
。换句话说,当g.close()
解析时,我们知道不能再调用slow()
—— 因为已经开始的调用已经完成,而新的调用无法开始。
构造
seastar::with_gate(g, [] { return slow(); })
可以用作成语的捷径
g.enter();
slow().finally([&g] { g.leave(); });
这是使用gate
的典型示例:
#include <seastar/core/sleep.hh>
#include <seastar/core/gate.hh>
#include <boost/iterator/counting_iterator.hpp>
seastar::future<> slow(int i) {
std::cerr << "starting " << i << "\n";
return seastar::sleep(std::chrono::seconds(10)).then([i] {
std::cerr << "done " << i << "\n";
});
}
seastar::future<> f() {
return seastar::do_with(seastar::gate(), [] (auto& g) {
return seastar::do_for_each(boost::counting_iterator<int>(1),
boost::counting_iterator<int>(6),
[&g] (int i) {
seastar::with_gate(g, [i] { return slow(i); });
// wait one second before starting the next iteration
return seastar::sleep(std::chrono::seconds(1));
}).then([&g] {
seastar::sleep(std::chrono::seconds(1)).then([&g] {
// This will fail, because it will be after the close()
seastar::with_gate(g, [] { return slow(6); });
});
return g.close();
});
});
}
在这个例子中,我们有一个需要 10 秒才能完成的函数future<> slow()
。我们在循环中运行 5 次,在两次调用之间等待 1 秒,并在每个调用周围加上进出大门(使用with_gate
)。在第 5 次调用之后,虽然所有调用仍在进行中(因为每个调用需要 10 秒才能完成),但我们关闭gate
并等待它退出程序。我们还通过在关闭门后一秒钟尝试再次进入门来测试无法在关闭门后开始新调用。
该程序的输出如下所示:
starting 1
starting 2
starting 3
starting 4
starting 5
WARNING: exceptional future ignored of type 'seastar::gate_closed_exception': gate closed
done 1
done 2
done 3
done 4
done 5
在这里, slow()
的调用以 1 秒的间隔开始。在 " starting 5" 消息之后,我们关闭了gate
,并且再次尝试使用它导致了seastar::gate_closed_exception
,我们忽略了它,因此出现了这条消息。此时应用程序等待由g.close()
返回的future
。 这将在所有slow()
调用完成后发生:打印“done 5”之后,测试程序立即停止。
正如到目前为止所解释的,gate
可以阻止对操作的新调用,并等待任何正在进行的操作完成。但是,这些进行中的操作可能需要很长时间才能完成。通常,长时间操作想知道已请求shut-down
,这样它可以提前停止其工作。一个操作可以通过调用gate
的方法check()
来检查它的gate
是否关闭:如果gate
已经关闭,check()
方法会抛出一个异常(与enter()
一样抛出seastar::gate_closed_exception
)。目的是异常将导致调用它的操作在此时停止。
在前面的示例代码中,我们有一个不间断的操作slow()
,它休眠了 10 秒。让我们将其替换为 10 个一秒睡眠的循环,每秒调用g.check()
一次:
seastar::future<> slow(int i, seastar::gate &g) {
std::cerr << "starting " << i << "\n";
return seastar::do_for_each(boost::counting_iterator<int>(0),
boost::counting_iterator<int>(10),
[&g] (int) {
g.check();
return seastar::sleep(std::chrono::seconds(1));
}).finally([i] {
std::cerr << "done " << i << "\n";
});
}
现在,gate
关闭后仅一秒钟(打印“开始 5”消息后),所有slow()
操作都通知gate
关闭,并停止。正如预期的那样,异常停止了do_for_each()
循环,并执行了finally()``continuation
,因此我们看到所有五个操作的“done”消息。