• 【翻译】Seastar 教程(四)


    教程翻译自Seastar官方文档:https://github.com/scylladb/seastar/blob/master/doc/tutorial.md
    转载请注明出处:https://www.cnblogs.com/morningli/p/15963859.html

    介绍 Seastar 的网络堆栈

    为了获得最佳性能,Seastar 的网络堆栈像 Seastar 应用程序一样被分片:每个分片(线程)负责连接的不同子集。每个传入的连接都指向其中一个线程,在建立连接后,它会继续在同一个线程上处理。

    在我们之前看到的示例中,main()只在第一个线程上运行了一次我们的函数f()。除非服务器使用"-c1"选项运行(仅一个线程),否则这将意味着任何到达不同线程的连接都不会被处理。因此,在下面的所有示例中,我们将需要在所有内核上运行相同的服务循环。我们可以使用smp::submit_to函数轻松做到这一点:

    seastar::future<> service_loop();
    
    seastar::future<> f() {
    	return seastar::parallel_for_each(boost::irange<unsigned>(0, seastar::smp::count),
    			[] (unsigned c) {
    		return seastar::smp::submit_to(c, service_loop);
    	});
    }
    

    在这里,我们要求每个 Seastar 内核(从 0 到smp::count-1)运行相同的函数service_loop()。这些调用中的每一个都会返回一个futuref()会在它们全部返回时返回(在下面的示例中,它们永远不会返回 —— 我们将在后面的部分中讨论关闭服务)。

    我们从一个用 Seastar 编写的 TCP 网络服务器的简单示例开始。此服务器反复接受 TCP 端口 1234 上的连接,并返回一个空响应:

    #include <seastar/core/seastar.hh>
    #include <seastar/core/reactor.hh>
    #include <seastar/core/future-util.hh>
    #include <seastar/net/api.hh>
    
    seastar::future<> service_loop() {
    	return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234})),
    			[] (auto& listener) {
    		return seastar::keep_doing([&listener] () {
    			return listener.accept().then(
    				[] (seastar::accept_result res) {
    					std::cout << "Accepted connection from " << res.remote_address << "\n";
    			});
    		});
    	});
    }
    

    此代码的工作原理如下:

    1. listen()调用创建了一个server_socket对象 ,listener,它侦听 TCP 端口 1234(在任何网络接口上)。
    2. 我们使用do_with()用来确保监听套接字在整个循环中都存在。
    3. 为了处理一个连接,我们调用listeneraccept()方法。该方法返回一个future<accept_result>,即最终通过来自客户端的传入 TCP 连接 ( accept_result.connection) 以及客户端的 IP 地址和端口 ( accept_result.remote_address) 进行解析。
    4. 为了反复接受新的连接,我们使用keep_doing()循环成语。keep_doing()一遍又一遍地运行它的 lambda 参数,一旦上一次迭代返回的future完成,就开始下一次迭代。只有遇到异常时迭代才会停止。仅当迭代停止时(即仅在异常情况下),keep_doing()返回的future才会完成。

    此服务器的输出类似于以下示例:

    $ ./a.out
    Accepted connection from 127.0.0.1:47578
    Accepted connection from 127.0.0.1:47582
    ...
    

    如果你在杀死之前的服务器后立即运行上面的示例服务器,它经常无法重新启动,会返回下面的错误:

    $ ./a.out
    program failed with uncaught exception: bind: Address already in use
    

    发生这种情况是因为默认情况下,如果使用该端口的旧连接有任何痕迹,Seastar 将拒绝重用本地端口。在我们这种愚蠢的服务器中,由于服务器是最先关闭连接的一方,每个连接在关闭后都会在“TIME_WAIT”状态下徘徊一段时间,这些都阻止了在同一个端口上listen()的成功。幸运的是,我们可以给listen指定一个选项来忽略这些存在着的TIME_WAIT。这个选项类似于socket(7)SO_REUSEADDR选项:

    seastar::listen_options lo;
    lo.reuse_address = true;
    return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234}), lo),
    

    大多数服务器将始终打开reuse_address监听选项。Stevens 的《Unix 网络编程》一书甚至说“所有 TCP 服务器都应指定此套接字选项以允许重新启动服务器”。因此,未来 Seastar 可能应该默认启用此选项 —— 即使出于历史原因,这不是 Linux 套接字 API 中的默认设置。

    让我们通过向每个连接输出一些预设响应来推进我们的示例服务器,而不是立即用空回复关闭每个连接。

    #include <seastar/core/seastar.hh>
    #include <seastar/core/reactor.hh>
    #include <seastar/core/future-util.hh>
    #include <seastar/net/api.hh>
    
    const char* canned_response = "Seastar is the future!\n";
    
    seastar::future<> service_loop() {
    	seastar::listen_options lo;
    	lo.reuse_address = true;
    	return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234}), lo),
    			[] (auto& listener) {
    		return seastar::keep_doing([&listener] () {
    			return listener.accept().then(
    					[] (seastar::accept_result res) {
    				auto s = std::move(res.connection);
    				auto out = s.output();
    				return seastar::do_with(std::move(s), std::move(out),
    						[] (auto& s, auto& out) {
    					return out.write(canned_response).then([&out] {
    						return out.close();
    					});
    				});
    			});
    		});
    	});
    }
    

    这段代码的新部分以connected_socket的output()开始,它返回一个output_stream<char>对象。在这个输出流out上,我们可以使用write()方法编写我们的响应。看似简单的write()操作,其实是一个复杂的后台异步操作,可能会导致根据需要发送、重传等多个数据包。write()返回一个future告诉我们什么时候可以再次write()到这个输出流;这并不一定保证远程对等方接收到我们发送给它的所有数据,但它保证输出流有足够的缓冲区空间(或者在 TCP 的情况下,TCP 拥塞窗口中有足够的空间)允许另一个写入开始。

    write()响应out之后,示例代码调用out.close()并等待它返回的future 。这是必要的,因为write()尝试批量写入,所以此时可能还没有向 TCP 堆栈写入任何内容,只有当 close() 结束时,我们才能确定我们写入输出流的所有数据实际上已经到达 TCP stack —— 只有在这一点上,我们才能最终处理outs对象。

    事实上,这个服务器返回了预期的响应:

    $ telnet localhost 1234
    ...
    Seastar is the future!
    Connection closed by foreign host.	
    

    在上面的例子中,我们只看到了对套接字的写入。真正的服务器也需要从套接字中读取。connected_socketinput()方法返回一个可用于从套接字读取的input_stream<char>对象。从此流中读取数据的最简单方法是使用read()方法,这个方法会返回一个future``temporary_buffer<char>,该方法包含从套接字读取的更多字节 —— 或远程端关闭连接时的空缓冲区。

    temporary_buffer<char>是一种用来传递仅临时需要的字节缓冲区(例如,在处理请求时)的方便且安全的方式。一旦该对象超出范围(通过正常返回或异常),它持有的内存就会自动释放。也可以通过std::move() 来转移缓冲区的所有权。我们将在后面的部分中更详细地讨论temporary_buffer

    让我们看一个涉及读取和写入的简单示例服务器。这是一个简单的回显服务器,如 RFC 862 中所述:服务器侦听来自客户端的连接,一旦建立连接,接收到的任何数据都会被简单地返回 —— 直到客户端关闭连接。

    #include <seastar/core/seastar.hh>
    #include <seastar/core/reactor.hh>
    #include <seastar/core/future-util.hh>
    #include <seastar/net/api.hh>
    
    seastar::future<> handle_connection(seastar::connected_socket s,
    									seastar::socket_address a) {
    	auto out = s.output();
    	auto in = s.input();
    	return do_with(std::move(s), std::move(out), std::move(in),
    			[] (auto& s, auto& out, auto& in) {
    		return seastar::repeat([&out, &in] {
    			return in.read().then([&out] (auto buf) {
    				if (buf) {
    					return out.write(std::move(buf)).then([&out] {
    						return out.flush();
    					}).then([] {
    						return seastar::stop_iteration::no;
    					});
    				} else {
    					return seastar::make_ready_future<seastar::stop_iteration>(
    							seastar::stop_iteration::yes);
    				}
    			});
    		}).then([&out] {
    			return out.close();
    		});
    	});
    }
    
    seastar::future<> service_loop_3() {
    	seastar::listen_options lo;
    	lo.reuse_address = true;
    	return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234}), lo),
    			[] (auto& listener) {
    		return seastar::keep_doing([&listener] () {
    			return listener.accept().then(
    					[] (seastar::accept_result res) {
    				// Note we ignore, not return, the future returned by
    				// handle_connection(), so we do not wait for one
    				// connection to be handled before accepting the next one.
    				(void)handle_connection(std::move(res.connection), std::move(res.remote_address)).handle_exception(
    						[] (std::exception_ptr ep) {
    					fmt::print(stderr, "Could not handle connection: {}\n", ep);
    				});
    			});
    		});
    	});
    }
    

    主函数service_loop()循环接受新的连接,并为每个连接调用handle_connection()来处理这个连接。当处理这个连接完成时,我们handle_connection()返回一个future说明这个连接什么时候处理完成,但重要的是,我们不等待这个future:记住,keep_doing只有当前一个迭代返回的future被解决时,才会开始下一个迭代。因为我们希望允许并行正在进行的连接,我们不希望下一个accept()等到之前接受的连接关闭。所以我们调用handle_connection()来开始处理连接,但没有从continuation中返回任何东西,这会立即解决这个future,所以keep_doing将继续下一个accept()

    这展示了在 Seastar 中运行并行fibercontinuation链)是多么容易—— 当continuation运行异步函数但忽略它返回的future时,异步操作将并行继续,但从不等待。

    默默地忽略异常通常是错误的,所以如果我们忽略的future可能会用异常解决,建议处理这种情况,例如使用handle_exception()``continuation。在我们的例子中,一个失败的连接是没问题的(例如,客户端可能会关闭它的连接,我们会发送它输出),所以我们没有费心去处理这个异常。

    handle_connection()函数本身很简单 —— 它在输入流上反复调用 read(),以接收带有一些数据的temporary_buffer,然后将此临时缓冲区move到对输出流的write()调用中。缓冲区最终将在write()完成后自动释放。当read()最终返回一个表示输入结束的空缓冲区时,我们repeat通过返回stop_iteration::yes来停止迭代。

    分片服务(sharded services)

    在上一节中,我们看到 Seastar 应用程序通常需要在所有可用的 CPU 内核上运行其代码。我们看到seastar::smp::submit_to()函数允许最初仅在第一个内核(core)上运行的main函数在所有seastar::smp::count个内核上启动服务器的代码。

    但是,通常不仅需要在每个内核上运行代码,还需要有一个包含该代码状态的对象。此外,人们可能喜欢与那些不同的对象进行交互,并且还具有一种机制来停止在不同内核上运行的服务。

    seastar::sharded<T>模板提供了一种结构化的方式来创建这样的sharded service。它在每个核心中创建一个单独的T类型的对象,并提供与这些副本交互的机制,在每个核心上启动一些代码,最后彻底停止服务。

    要使用seastar::sharded,首先要为在单核上保存服务状态的对象创建一个类。例如:

    #include <seastar/core/future.hh>
    #include <iostream>
    
    class my_service {
    public:
    	std::string _str;
    	my_service(const std::string& str) : _str(str) { }
    	seastar::future<> run() {
    		std::cerr << "running on " << seastar::engine().cpu_id() <<
    			", _str = " << _str << \n";
    		return seastar::make_ready_future<>();
    	}
    	seastar::future<> stop() {
    		return seastar::make_ready_future<>();
    	}
    };
    

    该对象中唯一必须要实现的方法是stop(),当我们想要停止分片服务并希望等到它在所有核心上停止时,它将在每个核心中调用。

    现在让我们看看如何使用它:

    #include <seastar/core/sharded.hh>
    
    seastar::sharded<my_service> s;
    
    seastar::future<> f() {
    	return s.start(std::string("hello")).then([] {
    		return s.invoke_on_all([] (my_service& local_service) {
    			return local_service.run();
    		});
    	}).then([] {
    		return s.stop();
    	});
    }
    

    s.start()通过在每个核心上创建一个my_service对象来启动服务。s.start()的参数,如果有的话(在这个例子中,std::string("hello")),被传递给my_service的构造函数。

    s.start()还没有开始运行任何代码(除了对象的构造函数)。为此,我们有在所有内核上运行给定的 lambda 的s.invoke_on_all() —— 为每个 lambda 提供该内核上的本地对象my_service。在这个例子中,我们对每个对象都有一个run()方法,所以我们运行它。

    最后,在运行结束时,我们想让所有核心上的服务有机会干净地关闭,所以我们调用s.stop()。这将调用每个核心对象的``方法,并等待它们全部完成。s销毁前调用s.stop()是强制性的 —— 如果您忘记这样做,Seastar 会警告您。

    除了在所有分片上运行相同的代码的invoke_on_all()之外,分片服务通常需要的另一个功能是在一个分片上调用另一个特定分片的代码。这是通过调用分片服务的invoke_on()方法来完成的。例如:

    seastar::sharded<my_service> s;
    ...
    return s.invoke_on(0, [] (my_service& local_service) {
    	std::cerr << "invoked on " << seastar::engine().cpu_id() <<
    		", _str = " << local_service._str << "\n";
    });
    

    这将在分片 0 上运行 lambda 函数,并引用该分片上的本地my_service对象。

    命令行选项

    标准 Seastar 命令行选项

    所有 Seastar 应用程序都接受一组标准的命令行参数,例如我们在上面已经看到的那些:-c用于控制使用的线程数的选项,或-m用于确定分配给应用程序的内存量的选项。

    每个 Seastar 应用程序还接受-h(or --help) 选项,它列出并解释了所有可用选项 —— 标准 Seastar 选项和用户定义选项,如下所述。

    用户定义的命令行选项

    Seastar 在传递给app_template::run()时解析命令行选项 (argv[]) ,寻找自己的标准选项。因此,不建议应用程序尝试自行解析argv[],因为应用程序可能无法理解某些标准 Seastar 选项并且无法正确跳过它们。

    相反,想要拥有自己的命令行选项的应用程序应该告诉 Seastar 的命令行解析器这些额外的应用程序特定选项,并要求 Seastar 的命令行解析器也识别它们。Seastar 的命令行解析器实际上是 Boost 库的boost::program_options. 应用程序通过使用app_templateadd_options()add_positional_options()方法添加自己的选项来定义选项,然后调用configuration()以检索这些选项的设置。例如,

    #include <iostream>
    #include <seastar/core/app-template.hh>
    #include <seastar/core/reactor.hh>
    int main(int argc, char** argv) {
    	seastar::app_template app;
    	namespace bpo = boost::program_options;
    	app.add_options()
    		("flag", "some optional flag")
    		("size,s", bpo::value<int>()->default_value(100), "size")
    		;
    	app.add_positional_options({
    	   { "filename", bpo::value<std::vector<seastar::sstring>>()->default_value({}),
    		 "sstable files to verify", -1}
    	});
    	app.run(argc, argv, [&app] {
    		auto& args = app.configuration();
    		if (args.count("flag")) {
    			std::cout << "Flag is on\n";
    		}
    		std::cout << "Size is " << args["size"].as<int>() << "\n";
    		auto& filenames = args["filename"].as<std::vector<seastar::sstring>>();
    		for (auto&& fn : filenames) {
    			std::cout << fn << "\n";
    		}
    		return seastar::make_ready_future<>();
    	});
    	return 0;
    }
    

    在这个例子中,我们通过add_options()添加两个特定于应用程序的选项:--flag是一个不带任何附加参数的可选参数,--size(或-s)采用一个整数值,默认(如果缺少此选项)为 100。此外,我们通过add_positional_options()询问不以“ -”开头的无限数量的参数——所谓的positional arguments——被收集到“filename”选项下的字符串向量中。该程序的一些示例输出:

    $ ./a.out
    Size is 100
    $ ./a.out --flag
    Flag is on
    Size is 100
    $ ./a.out --flag -s 3
    Flag is on
    Size is 3
    $ ./a.out --size 3 hello hi
    Size is 3
    hello
    hi
    $ ./a.out --filename hello --size 3 hi
    Size is 3
    hello
    hi
    

    boost::program_options具有更强大的功能,例如所需选项,选项检查和组合,各种选项类型等等。请参阅 Boost 的文档以获取更多信息。

    promise 对象

    正如我们在上面已经定义的那样,异步函数(asynchronous function),也称为promise,是一个返回未来并安排这个未来最终被解决的函数。正如我们已经看到的,一个异步函数通常是根据其他异步函数来编写的,例如我们看到了等待现有异步函数sleep()完成,然后返回 3 的函数slow()

    seastar::future<int> slow() {
    	using namespace std::chrono_literals;
    	return seastar::sleep(100ms).then([] { return 3; });
    }
    

    编写promise的最基本构建块是promise对象,它是一个promise<T>类型的对象。promise<T>有一个返回future的方法future<T> get_future()和一个来解决这个future的方法set_value(T)。一个异步函数可以创建一个promise对象,返回它的future,以及最终调用set_value方法——这将最终解决它返回的future

    Seastar 中的内存分配

    每线程内存分配

    Seastar 要求对应用程序进行分片,即运行在不同线程上的代码对内存中的不同对象进行操作。我们已经在 [Seastar memory] ​​中看到 Seastar 如何接管给定数量的内存(通常是机器的大部分内存)并将其平均分配给不同的线程。现代多插槽机器具有非统一内存访问(NUMA),这意味着内存的某些部分更接近某些内核,Seastar 在线程之间划分内存时会考虑到这一点。目前,线程之间的内存分配是静态的,并且是相等的——线程预计会经历大致相等的负载量并且需要大致相等的内存量。

    为了实现这种按线程分配,Seastar 重新定义了 C 库函数malloc()free()和它们的众多相关函数 --- calloc()realloc()posix_memalign()memalign()malloc_usable_size()malloc_trim()。它还重新定义了 C++ 内存分配函数 、operator new及其operator delete所有变体(包括数组版本、C++14 delete要求size,以及 C++17 变体要求所需的对齐方式)。

    重要的是要记住 Seastar 的不同线程可以看到其他线程分配的内存,但强烈建议不要实际这样做。在现代多核机器上的线程之间共享数据对象会导致锁定、内存屏障和高速缓存行弹跳导致严重的性能损失。相反,Seastar 鼓励应用程序尽可能避免在线程之间共享对象(通过分片——每个线程拥有对象的一个​​子集),当线程确实需要交互时,它们会使用submit_to()通过显式消息传递来进行交互,正如我们之后将看到的那样。

    外来指针

    在一个线程上分配的对象将归该线程所有,最终应由同一线程释放。强烈建议不要在错误的线程上释放内存,但目前为了支持 Seastar 无法控制的库代码,这是支持的(尽管速度很慢)。比如std::exception_ptr分配内存;因此,如果我们在远程线程上调用异步操作并且该操作返回异常,则当我们释放返回的std::exception_ptr时,这将发生在“错误”的核心上。所以 Seastar 允许这样做,但效率低下。

    在大多数情况下,对象应该将其整个生命周期都花在一个线程上,并且只能由该线程使用。但是在某些情况下,我们希望将在一个线程上开始其生命的对象的所有权重新分配给另一个线程。这可以使用seastar::foreign_ptr<>。 指向对象的指针或智能指针包含在seastar::foreign_ptr<P>。然后可以将该包装器移动到在不同线程中运行的代码中(例如,使用submit_to())。

    最常见的用例是seastar::foreign_ptr<std::unique_ptr<T>>。接收到foreign_ptr的线程将获得该对象的独占使用权,当它销毁这个包装器时,它会回到原来的线程销毁该对象。请注意,该对象不仅在原始分片上被释放 - 它还在那里被销毁(即,它的析构函数运行)。当对象的析构函数需要访问属于原始分片的其他状态时,这通常很重要 - 例如,将自身与容器取消链接。

    虽然foreign_ptr确保对象的析构函数自动在对象的主线程上运行,但它并不能免除用户担心在何处运行对象的其他方法的麻烦。一些简单的方法,例如,从对象的字段中读取的方法,可以在接收线程上运行。但是,其他方法可能需要访问对象的主分片所拥有的其他数据,或者需要防止并发操作。即使我们确定该对象现在仅由接收线程使用,这些方法仍必须在主线程上显式运行:

    	// fp is some foreign_ptr<>
    	return smp::submit_to(fp.get_owner_shard(), [p=fp.get()]
    		{ return p->some_method(); });
    

    所以seastar::foreign_ptr<>不仅有功能上的好处(即在主分片上运行析构函数),它还有文档上的好处——它警告程序员每次使用对象时都要小心,这是一个外部指针,如果我们想要要对指向的对象做任何重要的事情,我们可能需要在home shard上做。

    上面,我们讨论了通过seastar::foreign_ptr<std::unique_ptr<T>> 将对象的所有权转移到另一个分片的情况。但是,有时发送者不想放弃对象的所有权。有时,它希望远程线程对其对象进行操作,并返回完整的对象。有时,它想将同一个对象发送到多个分片。在这种情况下,可以使用seastar::foreign_ptr<seastar::lw_shared_ptr<T>>。使用者当然也要小心,不要从多个线程并行操作同一个对象。如果这不能通过程序逻辑来保证,必须使用一些串行化的方法——比如在home shard使用上述的 submit_to()来运行这些操作。

    通常, seastar::foreign_ptr不能被复制 - 只能move。但是,当它拥有一个可以复制的智能指针(即shared_ptr)时,可能需要制作该指针的额外副本并创建第二个foreign_ptr。这样做是低效且异步的(它需要与对象的原始所有者通信以创建副本),因此需要显式使用方法future<foreign_ptr> copy()而不是普通的复制构造函数。

    Seastar::thread

    Seastar 使用futurecontinuation的编程模型是非常强大和高效的。然而,正如我们在上面的示例中已经看到的那样,它也相对冗长:每次在进行计算之前我们需要等待,我们需要编写另一个continuation。我们还需要担心在不同continuation之间传递数据(使用[生命周期管理]部分中描述的技术)。简单的流控制结构(如循环)也更多地使用continuation。例如,考虑这个简单的经典同步代码:

    std::cout << "Hi.\n";
    for (int i = 1; i < 4; i++) {
        sleep(1);
        std::cout << i << "\n";
    }
    

    在 Seastar 中,使用futurecontinuation,我们需要这样写:

    std::cout << "Hi.\n";
    return seastar::do_for_each(boost::counting_iterator<int>(1),
        boost::counting_iterator<int>(4), [] (int i) {
        return seastar::sleep(std::chrono::seconds(1)).then([i] {
            std::cout << i << "\n";
        });
    });	
    

    但是 Seastar 还允许通过 seastar::thread编写看起来更像同步代码的代码。seastar::thread·提供了一个允许阻塞的执行环境;您可以发出一个异步函数,并在同一个函数中等待它,而不是建立一个要调用的回调future<>::then():

    seastar::thread th([] {
        std::cout << "Hi.\n";
        for (int i = 1; i < 4; i++) {
            seastar::sleep(std::chrono::seconds(1)).get();
            std::cout << i << "\n";
        }
    });
    

    seastar::thread不是一个单独的操作系统线程。它仍然使用在 Seastar 的单线程(每个核心)上安排的continuation。它的工作原理如下:

    seastar::thread分配一个 128KB的堆栈,并运行给定的函数,直到它在future的get()方法的调用上阻塞。在seastar::thread上下文之外,get()只能在已经可用的未来上调用。但是在线程内部,调用一个还不可用的future上的get()会停止运行线程函数,并为这个future安排一个continuation,当future可用时继续运行线程的函数(在同一个已保存的堆栈上)。

    就像正常的 Seastar·continuation一样,seastar::threads总是在启动它们的同一内核上运行。它们也是合作的:它们永远不会被抢占,除非seastar::future::get()阻塞或显式调用seastar::thread::yield().

    值得重申的是,seastar::thread不是 POSIX 线程,它只能阻塞 Seastar future,不能阻塞系统调用。上面的例子使用seastar::sleep()了 ,而不是sleep()系统调用。seastar::thread的函数可以正常抛出和捕获异常。请记住,如果future以异常解决,get()将抛出异常。

    seastar::future::get()之外,我们还可以使用seastar::future::wait()等待而不获取future的结果。当您想要避免在future失败时抛出异常时,这有时会很有用(就像get()这样)。例如:

    future<char> getchar();
    int try_getchar() noexcept { // run this in seastar::thread context
        future fut = get_char();
        fut.wait();
        if (fut.failed()) {
            return -1;
        } else {
            // Here we already know that get() will return immediately,
            // and will not throw.
            return fut.get();
        }
    }
    

    开始和结束 seastar::thread

    创建seastar::thread对象后,我们需要使用它的方法join()等到它结束。我们还需要让该对象保持活动状态直到join()完成。一个seastar::thread的完整的使用示例如下所示:

    #include <seastar/core/sleep.hh>
    #include <seastar/core/thread.hh>
    seastar::future<> f() {
    	seastar::thread th([] {
    		std::cout << "Hi.\n";
    		for (int i = 1; i < 4; i++) {
    			seastar::sleep(std::chrono::seconds(1)).get();
    			std::cout << i << "\n";
    		}
    	});
    	return do_with(std::move(th), [] (auto& th) {
    		return th.join();
    	});
    }
    

    seastar::async()函数提供了一个方便的快捷方式来创建seastar::thread和返回一个future,它在线程完成时解决:

    #include <seastar/core/sleep.hh>
    #include <seastar/core/thread.hh>
    seastar::future<> f() {
    	return seastar::async([] {
    		std::cout << "Hi.\n";
    		for (int i = 1; i < 4; i++) {
    			seastar::sleep(std::chrono::seconds(1)).get();
    			std::cout << i << "\n";
    		}
    	});
    }
    

    seastar::async()的 lambda 可能会返回一个值,并在seastar::async()完成时返回它。例如:

    seastar::future<seastar::sstring> read_file(sstring file_name) {
    	return seastar::async([file_name] () {  // lambda executed in a thread
    		file f = seastar::open_file_dma(file_name).get0();  // get0() call "blocks"
    		auto buf = f.dma_read(0, 512).get0();  // "block" again
    		return seastar::sstring(buf.get(), buf.size());
    	});
    };
    

    虽然seastar::threadsseastar::async()使编程更方便,但它们也增加了直接使用延续编程之外的开销。最值得注意的是,每个seastar::thread堆栈都需要额外的内存。因此,使用 seastar::thread来处理高度并发的操作并不是一个好主意。例如,如果您需要处理 10,000 个并发请求,请不要使用seastar::thread来处理每个 --- 使用 futurecontinuation。但是,如果您正在编写您知道只有少数实例会同时运行的代码,例如,在您的应用程序中进行后台清理操作,那么这seastar::thread是一个很好的匹配。seastar::thread也适用于不关心性能的代码——例如测试代码。

    应用程序组件的隔离

    Seastar 让多任务处理变得非常简单——就像运行异步函数一样简单。因此,服务器很容易并行执行许多不相关的事情。例如,一个服务器可能正在响应 100 个用户的请求,同时也在进行一些长时间的后台操作。

    但是在上面的例子中,后台操作会得到多少百分比的 CPU 和磁盘吞吐量呢?用户的其中一个请求可以被后台操作延迟多长时间?如果没有我们在本节中描述的机制,就无法可靠地回答这些问题:

    • 后台操作可能是一个非常“周到”的单个fiber,即运行一个很短的continuation ,然后安排下一个continuation稍后运行。在每一点,调度程序都会看到 100 个请求处理continuation ,并且只有一个准备好运行的后台延续。后台任务获得大约 1% 的 CPU 时间,用户的请求几乎没有延迟。
    • 另一方面,后台操作可能会并行生成 1,000 个fiber,并且每次都有 1,000 个准备运行的continuation。后台操作将获得大约 90% 的运行时间,而处理用户请求的 continuation 可能会卡在 1,000 个这些后台continuation之后,并经历巨大的延迟。

    复杂的 Seastar 应用程序通常具有不同的组件,这些组件并行运行并具有不同的性能目标。在上面的例子中,我们看到了两个组件——用户请求和后台操作。我们在本节中描述的机制的第一个目标是将每个组件的性能与其他组件隔离开来;换句话说,一个组件的吞吐量和延迟不应该取决于另一个组件做出的决定——例如,它并行运行多少个continuation。第二个目标是允许应用程序控制这种隔离,例如,在上面的示例中,允许应用程序显式控制后台操作接收的 CPU 数量,以便它以所需的速度完成。

    在上面的示例中,我们使用 CPU 时间作为不同组件需要有效共享的有限资源。正如我们稍后将展示的,另一个重要的共享资源是磁盘 I/O。

    调度组(CPU 调度程序)

    考虑下面的异步函数loop(),它循环直到某个共享变量stop变为真。它保留迭代次数counter直到停止,并在最终停止时返回此计数器。

    seastar::future<long> loop(int parallelism, bool& stop) {
    	return seastar::do_with(0L, [parallelism, &stop] (long& counter) {
    		return seastar::parallel_for_each(boost::irange<unsigned>(0, parallelism),
    			[&stop, &counter]  (unsigned c) {
    				return seastar::do_until([&stop] { return stop; }, [&counter] {
    					++counter;
    					return seastar::make_ready_future<>();
    				});
    			}).then([&counter] { return counter; });
    	});
    }
    

    parallelism参数决定了计数操作的并行性:parallelism=1意味着我们只有一个循环递增计数器;parallelism=10意味着我们并行启动 10 个循环,所有循环都递增相同的计数器。

    如果我们并行启动两个loop()调用并让它们运行 10 秒会发生什么?

    seastar::future<> f() {
    	return seastar::do_with(false, [] (bool& stop) {
    		seastar::sleep(std::chrono::seconds(10)).then([&stop] {
    			stop = true;
    		});
    		return seastar::when_all_succeed(loop(1, stop), loop(1, stop)).then(
    			[] (long n1, long n2) {
    				std::cout << "Counters: " << n1 << ", " << n2 << "\n";
    			});
    	});
    }
    

    事实证明,如果这两个loop()调用具有相同的并行度1,它们的工作量大致相同:

    Counters: 3'559'635'758, 3'254'521'376
    

    但是,例如,如果我们将 loop(1)loop(10) 并行运行,则结果是loop(10)完成了10 倍的工作:

    Counters: 629'482'397, 6'320'167'297
    

    为什么 loop(1) 可以在 10 秒内完成的工作量取决于其竞争对手选择的并行度,我们如何解决这个问题?

    发生这种情况的原因如下:当一个future解析并且一个continuation 链接到它时,这个continuation就可以运行了。默认情况下,Seastar 的调度程序保留一个准备运行的continuation列表(当然在每个分片中),并按照它们准备好运行的相同顺序运行continuation。在上面的例子中,loop(1)总是有一个准备运行的continuation,但是loop(10),并行运行 10 个循环,总是有十个准备运行的continuation。因此,对于loop(1)的每一个continuation,Seastar 的默认调度程序将运行loop(10)的10个continuation,这就是loop(10)完成 10倍的工作的原因。

    为了解决这个问题,Seastar 允许应用程序定义称为调度组的单独组件,每个组件都有一个单独的准备运行延续列表。每个调度组都可以在所需的 CPU 时间百分比上运行自己的continuation,但是一个调度组中可运行的continuation的数量不会影响另一个调度组获得的CPU量。让我们看看这是如何完成的:

    调度组由scheduling_group类型的值定义。这个值是不透明的,但在内部它是一个小整数(类似于 Linux 中的进程 ID)。我们使用seastar::with_scheduling_group()函数在所需的调度组中运行代码:

    seastar::future<long>
    loop_in_sg(int parallelism, bool& stop, seastar::scheduling_group sg) {
    	return seastar::with_scheduling_group(sg, [parallelism, &stop] {
    		return loop(parallelism, stop);
    	});
    }
    

    现在让我们创建两个调度组,在第一个调度组运行loop(1)和第二个调度组中运行loop(10)

    seastar::future<> f() {
    	return seastar::when_all_succeed(
    			seastar::create_scheduling_group("loop1", 100),
    			seastar::create_scheduling_group("loop2", 100)).then(
    		[] (seastar::scheduling_group sg1, seastar::scheduling_group sg2) {
    		return seastar::do_with(false, [sg1, sg2] (bool& stop) {
    			seastar::sleep(std::chrono::seconds(10)).then([&stop] {
    				stop = true;
    			});
    			return seastar::when_all_succeed(loop_in_sg(1, stop, sg1), loop_in_sg(10, stop, sg2)).then(
    				[] (long n1, long n2) {
    					std::cout << "Counters: " << n1 << ", " << n2 << "\n";
    				});
    		});
    	});
    }
    

    在这里,我们创建了两个调度组,sg1sg2. 每个调度组都有一个名称(仅用于诊断目的)和份额(shares)数,通常介于 1 和 1000 之间:如果一个调度组的份额数是第二个调度组的两倍,它将得到CPU 时间的两倍。在此示例中,我们为两个组使用了相同数量的份额 (100),因此它们应该获得相同的 CPU 时间。

    与 Seastar 中的大多数对象每个分片都是独立的不同,Seastar 希望调度组的标识和编号在所有分片上都相同,因为在远程分片上调用任务时这很重要。因此,创建调度组的函数seastar::create_scheduling_group()是一个返回future<scheduling_group>的异步函数。

    运行上面的示例,两个调度组都设置了相同数量的份额(100),确实会导致两个调度组获得相同数量的 CPU 时间:

    Counters: 3'353'900'256, 3'350'871'461
    

    注意现在两个循环如何完成相同数量的工作——尽管一个循环的并行度是第二个循环的 10 倍。

    如果我们将第二个调度组的定义更改为拥有 200 个份额,是第一个调度组份额数的两倍,我们将看到第二个调度组获得两倍的 CPU 时间:

    Counters: 2'273'783'385, 4'549'995'716
  • 相关阅读:
    Python学习之四 数据输入与输出
    Qt使用Cmake编译时自定义控件找不到的问题
    Java类加载机制
    Spring源码篇(十一)注册bean的方式
    纯js实现录屏并保存视频到本地的尝试
    链式二叉树(1)
    【Python】-- 集合的常用方法
    Java项目:SSM的KTV管理系统
    Fashion MNIST与分类算法
    MATLAB——RBF、GRNN和PNN神经网络案例参考程序
  • 原文地址:https://www.cnblogs.com/morningli/p/15963859.html