C++20协程的http库--cinatra
cinatra是基于C++20无栈协程实现的跨平台、仅头文件、高性能、易用的http/https库(http1.1),包括http server和http client,功能完备,不仅支持最普通的get/post等请求,还支持restful api、websocket、chunked、ranges、multipart、静态文件服务和反向代理等功能。
后面会分别介绍这些功能,文末也专门附上benchmark测试代码。
2. 基本http请求
2.1. 启动http服务器
#include <include/cinatra.hpp>
using namespace cinatra;
void start_server() {
coro_http_server server(/*`thread_num=`*/std::thread::hardware_concurrency(), 9001);
server.set_http_handler<GET>(
"/", [](coro_http_request &req, coro_http_response &resp) {
resp.set_status_and_content(status_type::ok, "ok"); //`IO`线程中的响应
});
server.set_http_handler<GET, POST>(
"/in_thread_pool",
[](coro_http_request &req,
coro_http_response &resp) -> async_simple::coro::Lazy<void> {
//在线程池中休息.
co_await coro_io::post([&] {
resp.set_status_and_content(status_type::ok, "ok in thread pool");
});
});
server.sync_start();
}
几行代码即可创建一个http服务:先设置http server的工作线程数和端口,然后设置http服务的url、http method和对应的处理函数。
可在io线程或线程池中处理http请求。
2.2. client发请求
#include <include/cinatra.hpp>
using namespace cinatra;
/// Sends two GET requests to the local server: "/" (expects status 200 and
/// body "ok", then prints every response header) and "/in_thread_pool"
/// (expects status 200).
async_simple::coro::Lazy<void> do_request() {
  coro_http_client client{};

  auto root_reply = co_await client.async_get("http://127.0.0.1:9001/");
  assert(root_reply.status == 200);
  assert(root_reply.resp_body == "ok");

  // Dump the response headers as "name: value" lines.
  for (auto [name, value] : root_reply.resp_headers) {
    std::cout << name << ": " << value << "\n";
  }

  // The second request passes only a path to the same client object.
  auto pool_reply = co_await client.async_get("/in_thread_pool");
  assert(pool_reply.status == 200);
}
http client异步请求服务器,返回结果中包括服务端响应的状态码、http content及http头;如果网络错误,可从result.net_err中取错误码和错误message。
2.3. restful api
// RESTful routing example: registers five GET/POST routes using "{}"
// placeholders, a regex pattern and ":name" path parameters, starts the server
// asynchronously and issues one client request per route.
coro_http_server server(/*thread_num=*/std::thread::hardware_concurrency(), 9001);

// "{}" placeholders: the matched segments are read through req.matches_.
server.set_http_handler<cinatra::GET, cinatra::POST>(
    "/test2/{}/test3/{}",
    [](coro_http_request &req,
       coro_http_response &resp) -> async_simple::coro::Lazy<void> {
      co_await coro_io::post([&]() {
        CHECK(req.matches_.str(1) == "name");
        CHECK(req.matches_.str(2) == "test");
        resp.set_status_and_content(cinatra::status_type::ok, "hello world");
      });
      co_return;
    });

// Regex route: the two numeric capture groups are read through req.matches_.
server.set_http_handler<cinatra::GET, cinatra::POST>(
    R"(/numbers/(\d+)/test/(\d+))",
    [](coro_http_request &req, coro_http_response &response) {
      CHECK(req.matches_.str(1) == "100");
      CHECK(req.matches_.str(2) == "200");
      response.set_status_and_content(status_type::ok, "number regex ok");
    });

// ":id" style parameters are read by name through req.params_.
server.set_http_handler<cinatra::GET, cinatra::POST>(
    "/user/:id", [](coro_http_request &req, coro_http_response &response) {
      CHECK(req.params_["id"] == "cinatra");
      response.set_status_and_content(status_type::ok, "ok");
    });
server.set_http_handler<cinatra::GET, cinatra::POST>(
    "/user/:id/subscriptions",
    [](coro_http_request &req, coro_http_response &response) {
      CHECK(req.params_["id"] == "subid");
      response.set_status_and_content(status_type::ok, "ok");
    });
server.set_http_handler<cinatra::GET, cinatra::POST>(
    "/values/:x/:y/:z",
    [](coro_http_request &req, coro_http_response &response) {
      CHECK(req.params_["x"] == "guilliman");
      CHECK(req.params_["y"] == "cawl");
      CHECK(req.params_["z"] == "yvraine");
      response.set_status_and_content(status_type::ok, "ok");
    });

server.async_start();

coro_http_client client;
client.get("http://127.0.0.1:9001/test2/name/test3/test");
client.get("http://127.0.0.1:9001/numbers/100/test/200");
client.get("http://127.0.0.1:9001/user/cinatra");
client.get("http://127.0.0.1:9001/user/subid/subscriptions");
// The path must use "/values/" to match the "/values/:x/:y/:z" route
// registered above; "/value/" would not reach that handler.
client.get("http://127.0.0.1:9001/values/guilliman/cawl/yvraine");
https访问
#ifdef CINATRA_ENABLE_SSL
coro_http_client client{};
result = co_await client.async_get("https://www.taobao.com");
assert(result.status == 200);
#endif
访问https网站时,确保已安装了openssl并开启了ENABLE_SSL。
websocket
cinatra::coro_http_server server(1, 9001);
server.set_http_handler<cinatra::GET>(
"/ws_echo",
[](cinatra::coro_http_request &req,
cinatra::coro_http_response &resp) -> async_simple::coro::Lazy<void> {
cinatra::websocket_result result{};
while (true) {
result = co_await req.get_conn()->read_websocket();
if (result.ec) {
break;
}
if (result.type == cinatra::ws_frame_type::WS_CLOSE_FRAME) {
REQUIRE(result.data == "test close");
break;
}
auto ec = co_await req.get_conn()->write_websocket(result.data);
if (ec) {
break;
}
}
});
server.sync_start();
在协程处理函数中,while循环异步读写websocket数据。
client端:
cinatra::coro_http_client client{};
std::string message(100, 'x');
client.on_ws_close([](std::string_view reason) {
std::cout << "web socket close " << reason << std::endl;
});
client.on_ws_msg([message](cinatra::resp_data data) {
if (data.net_err) {
std::cout << "ws_msg net error " << data.net_err.message() << "\n";
return;
}
std::cout << "ws msg len: " << data.resp_body.size() << std::endl;
REQUIRE(data.resp_body == message);
});
co_await client.async_ws_connect("ws://127.0.0.1:9001/ws_echo");
co_await client.async_send_ws(message);
co_await client.async_send_ws_close("test close");
client设置读回调和close回调,分别处理收到的websocket消息和websocket close消息。
4. 静态文件服务
std::string filename = "temp.txt";
// Static file service: creates a 64-byte file, exposes the current directory
// under the "download" virtual path and fetches the file through the client.
create_file(filename, 64);

coro_http_server server(1, 9001);
const std::string virtual_path{"download"};
const std::string files_root_path{};  // empty string: the current path
// Set this before the server starts; if new files are added, the server has
// to be restarted.
server.set_static_res_dir(virtual_path, files_root_path);
server.async_start();

coro_http_client client{};
auto reply =
    co_await client.async_get("http://127.0.0.1:9001/download/temp.txt");
assert(reply.status == 200);
assert(reply.resp_body.size() == 64);
服务端设置虚路径和实际文件路径,下载文件时输入虚路径和实际路径下的文件名即可实现下载。
5. 反向代理
假设有3个服务器需要代理,代理服务器根据负载均衡算法来选择其中的一个来访问,并把结果返回给客户。
5.1. 先启动3个被代理的服务器
cinatra::coro_http_server web_one(1, 9001);
web_one.set_http_handler<cinatra::GET, cinatra::POST>(
"/",
[](coro_http_request &req,
coro_http_response &response) -> async_simple::coro::Lazy<void> {
co_await coro_io::post([&]() {
response.set_status_and_content(status_type::ok, "web1");
});
});
web_one.async_start();
cinatra::coro_http_server web_two(1, 9002);
web_two.set_http_handler<cinatra::GET, cinatra::POST>(
"/",
[](coro_http_request &req,
coro_http_response &response) -> async_simple::coro::Lazy<void> {
co_await coro_io::post([&]() {
response.set_status_and_content(status_type::ok, "web2");
});
});
web_two.async_start();
cinatra::coro_http_server web_three(1, 9003);
web_three.set_http_handler<cinatra::GET, cinatra::POST>(
"/", [](coro_http_request &req, coro_http_response &response) {
response.set_status_and_content(status_type::ok, "web3");
});
web_three.async_start();
5.2. 启动代理服务器
round robin策略的代理服务器:
coro_http_server proxy_rr(2, 8091);
// Registers a GET/POST proxy handler on "/rr" for the three listed backend
// hosts, selecting among them with the RR value of
// coro_io::load_blance_algorithm (identifier spelled as in the library).
proxy_rr.set_http_proxy_handler<GET, POST>(
"/rr", {"127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"},
coro_io::load_blance_algorithm::RR);
proxy_rr.sync_start();
random策略的代理服务器:
coro_http_server proxy_random(2, 8092);
// Registers a GET/POST proxy handler on "/random" for the three listed backend
// hosts. No algorithm argument is passed, so the library default applies
// (random, per the surrounding text — confirm against the API).
proxy_random.set_http_proxy_handler<GET, POST>(
"/random", {"127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"});
proxy_random.sync_start();
weight round robin策略的代理服务器:
coro_http_server proxy_wrr(2, 8090);
// Registers a GET/POST proxy handler on "/wrr" for the three listed backend
// hosts using the WRR algorithm value; {10, 5, 5} is passed as the per-host
// weights, in the same order as the host list.
proxy_wrr.set_http_proxy_handler<GET, POST>(
"/wrr", {"127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"},
coro_io::load_blance_algorithm::WRR, {10, 5, 5});
proxy_wrr.sync_start();
5.3. client请求代理服务器
coro_http_client client_rr;
resp_data resp_rr = client_rr.get("http://127.0.0.1:8091/rr");
assert(resp_rr.resp_body == "web1");
resp_rr = client_rr.get("http://127.0.0.1:8091/rr");
assert(resp_rr.resp_body == "web2");
resp_rr = client_rr.get("http://127.0.0.1:8091/rr");
assert(resp_rr.resp_body == "web3");
coro_http_client client_wrr;
resp_data resp = client_wrr.get("http://127.0.0.1:8090/wrr");
assert(resp.resp_body == "web1");
resp = client_wrr.get("http://127.0.0.1:8090/wrr");
assert(resp.resp_body == "web1");
resp = client_wrr.get("http://127.0.0.1:8090/wrr");
assert(resp.resp_body == "web2");
resp = client_wrr.get("http://127.0.0.1:8090/wrr");
assert(resp.resp_body == "web3");
6. 增加切面
6.1
.创建任意切面
/// Logging aspect: prints a marker line before and after the handler and adds
/// a response header in the `after` hook.
struct log_t {
  /// Pre-handler hook; prints "before log" and returns true.
  bool before(coro_http_request &, coro_http_response &) {
    std::cout << "before log" << std::endl;
    return true;
  }

  /// Post-handler hook; prints "after log", adds the header "aaaa: bbcc" to
  /// the response and returns true.
  bool after(coro_http_request &, coro_http_response &response) {
    std::cout << "after log" << std::endl;
    response.add_header("aaaa", "bbcc");
    return true;
  }
};
struct get_data {
bool before(coro_http_request &req, coro_http_response &res) {
req.set_aspect_data("hello world");
return true;
}
};
切面是实现了before或after函数的类。
6.2
.应用切面
/// Registers a GET handler on "/get" together with the log_t and get_data
/// aspects, starts the server asynchronously and checks that a client request
/// returns status 200.
async_simple::coro::Lazy<void> use_aspects() {
  coro_http_server server(1, 9001);

  // The handler reads back the data stored by the get_data aspect.
  auto handler = [](coro_http_request &request, coro_http_response &response) {
    auto aspect_data = request.get_aspect_data();
    assert(aspect_data[0] == "hello world");
    response.set_status_and_content(status_type::ok, "ok");
  };

  // Two aspects are attached here; any number can be passed as needed.
  server.set_http_handler<GET>("/get", std::move(handler), log_t{},
                               get_data{});
  server.async_start();

  coro_http_client client{};
  auto reply = co_await client.async_get("http://127.0.0.1:9001/get");
  assert(reply.status == 200);
}
注册http handler时设置了两个切面,该url的处理会先进入切面,切面返回true,才会继续往下执行业务逻辑;如果返回false则不会执行后续逻辑。返回false时,要在切面中调用resp.set_status_and_content设置状态码和返回内容。
7. chunked, ranges, multipart
7.1. chunked上传下载
chunked协议适合大文件上传和下载:
/// Chunked transfer example: "/chunked" receives a chunked upload and
/// "/write_chunked" sends a chunked response; a client then downloads the
/// chunked response and checks its body.
async_simple::coro::Lazy<void> chunked_upload_download() {
  coro_http_server server(1, 9001);

  // Upload endpoint: accumulate chunks until eof, print them, then reply.
  server.set_http_handler<GET, POST>(
      "/chunked",
      [](coro_http_request &req,
         coro_http_response &resp) -> async_simple::coro::Lazy<void> {
        assert(req.get_content_type() == content_type::chunked);
        chunked_result result{};
        std::string content;

        while (true) {
          result = co_await req.get_conn()->read_chunked();
          if (result.ec) {
            co_return;  // read error: give up without setting a response
          }
          if (result.eof) {
            break;
          }
          content.append(result.data);
        }

        std::cout << "content size: " << content.size() << "\n";
        std::cout << content << "\n";
        resp.set_format_type(format_type::chunked);
        resp.set_status_and_content(status_type::ok, "chunked ok");
      });

  // Download endpoint: writes "hello", " world", " ok" as three chunks.
  server.set_http_handler<GET, POST>(
      "/write_chunked",
      [](coro_http_request &req,
         coro_http_response &resp) -> async_simple::coro::Lazy<void> {
        resp.set_format_type(format_type::chunked);
        bool ok;
        if (ok = co_await resp.get_conn()->begin_chunked(); !ok) {
          co_return;
        }

        std::vector<std::string> vec{"hello", " world", " ok"};
        for (auto &str : vec) {
          if (ok = co_await resp.get_conn()->write_chunked(str); !ok) {
            co_return;
          }
        }

        ok = co_await resp.get_conn()->end_chunked();
      });

  // Started with async_start(), like the other examples that issue client
  // requests after starting the server, so the client code below is reached.
  server.async_start();

  // The client and its result are declared here; the function must not rely
  // on names from another snippet.
  coro_http_client client{};
  auto result =
      co_await client.async_get("http://127.0.0.1:9001/write_chunked");
  assert(result.status == 200);
  assert(result.resp_body == "hello world ok");
}
client chunked上传文件
coro_http_client client{};
// Chunked upload from the client: a 1010-byte file is read in 100-byte pieces
// and each piece is handed to async_upload_chunked through the read callback.
std::string filename = "test.txt";
create_file(filename, 1010);

coro_io::coro_file file{};
co_await file.async_open(filename, coro_io::flags::read_only);

std::string buf;
detail::resize(buf, 100);

// Read callback: returns the next piece of the file, the eof flag and the
// error code. Only the `size` bytes actually read are exposed; the final read
// of a 1010-byte file yields 10 bytes, and passing the whole 100-byte buffer
// would upload stale data from the previous read.
auto fn = [&file, &buf]() -> async_simple::coro::Lazy<read_result> {
  auto [ec, size] = co_await file.async_read(buf.data(), buf.size());
  co_return read_result{{buf.data(), size}, file.eof(), ec};
};

auto result = co_await client.async_upload_chunked(
    "http://127.0.0.1:9001/chunked"sv, http_method::POST, std::move(fn));
client从读文件到分块上传文件,整个过程都是异步的。
client chunked下载文件:
auto result = co_await client.async_get("http://127.0.0.1:9001/write_chunked");
// The expected body is the concatenation of the three chunks ("hello",
// " world", " ok") written by the /write_chunked handler.
assert(result.status == 200);
assert(result.resp_body == "hello world ok");
这样下载到内存中,也可下载到文件中。
// Download the chunked response straight into a file instead of memory.
auto result = co_await client.async_download(
    "http://127.0.0.1:9001/write_chunked", "download.txt");
// The /write_chunked handler sends "hello", " world" and " ok", i.e. the
// 14-byte body "hello world ok", so that is the expected file size.
CHECK(std::filesystem::file_size("download.txt") == 14);
7.2
.ranges
下载async_simple::coro::Lazy<void> byte_ranges_download() {
create_file("test_multiple_range.txt", 64);
coro_http_server server(1, 8090);
server.set_static_res_dir("", "");
server.async_start();
std::this_thread::sleep_for(200ms);
std::string uri = "http://127.0.0.1:8090/test_multiple_range.txt";
{
std::string filename = "test1.txt";
std::error_code ec{};
std::filesystem::remove(filename, ec);
coro_http_client client{};
resp_data result = co_await client.async_download(uri, filename, "1-10");
assert(result.status == 206);
assert(std::filesystem::file_size(filename) == 10);
filename = "test2.txt";
std::filesystem::remove(filename, ec);
result = co_await client.async_download(uri, filename, "10-15");
assert(result.status == 206);
assert(std::filesystem::file_size(filename) == 6);
}
{
coro_http_client client{};
std::string uri = "http://127.0.0.1:8090/test_multiple_range.txt";
client.add_header("Range", "bytes=1-10,20-30");
auto result = co_await client.async_get(uri);
assert(result.status == 206);
assert(result.resp_body.size() == 21);
std::string filename = "test_ranges.txt";
client.add_header("Range", "bytes=0-10,21-30");
result = co_await client.async_download(uri, filename);
assert(result.status == 206);
assert(fs::file_size(filename) == 21);
}
}
7.3. multipart上传下载
coro_http_server server(1, 8090);
// Multipart upload example: registers a PUT/POST handler on
// "/multipart_upload" that reads the request part by part. Parts that carry a
// filename are written to a newly created file; other parts are printed.
// A client then uploads one string part and one file part and expects 200.
server.set_http_handler<cinatra::PUT, cinatra::POST>(
"/multipart_upload",
[](coro_http_request &req,
coro_http_response &resp) -> async_simple::coro::Lazy<void> {
assert(req.get_content_type() == content_type::multipart);
auto boundary = req.get_boundary();
multipart_reader_t multipart(req.get_conn());
// One iteration per part: read the part head, then the part body.
while (true) {
auto part_head = co_await multipart.read_part_head();
if (part_head.ec) {
// NOTE(review): returns without setting a response on a read error.
co_return;
}
std::cout << part_head.name << "\n";
std::cout << part_head.filename << "\n";
std::shared_ptr<coro_io::coro_file> file;
std::string filename;
// A non-empty filename marks a file part: open an output file for it.
if (!part_head.filename.empty()) {
file = std::make_shared<coro_io::coro_file>();
// Name the output file after the current system_clock tick count and keep
// the uploaded file's extension, if it has one.
filename = std::to_string(
std::chrono::system_clock::now().time_since_epoch().count());
size_t pos = part_head.filename.rfind('.');
if (pos != std::string::npos) {
auto extent = part_head.filename.substr(pos);
filename += extent;
}
std::cout << filename << "\n";
co_await file->async_open(filename, coro_io::flags::create_write);
if (!file->is_open()) {
resp.set_status_and_content(status_type::internal_server_error, "file open failed");
co_return;
}
}
auto part_body = co_await multipart.read_part_body(boundary);
if (part_body.ec) {
co_return;
}
// `filename` is only non-empty for file parts (set in the branch above).
if (!filename.empty()) {
auto ec = co_await file->async_write(part_body.data.data(), part_body.data.size());
if (ec) {
co_return;
}
file->close();
// The uploaded test file is expected to be 1024 bytes — presumably the
// default size used by create_file(filename) below; confirm.
CHECK(fs::file_size(filename) == 1024);
}
else {
std::cout << part_body.data << "\n";
}
// The last part has been consumed.
if (part_body.eof) {
break;
}
}
resp.set_status_and_content(status_type::ok, "ok");
co_return;
});
server.async_start();
// Client side: upload one string part and one file part in a single request.
std::string filename = "test_1024.txt";
create_file(filename);
coro_http_client client{};
std::string uri = "http://127.0.0.1:8090/multipart_upload";
client.add_str_part("test", "test value");
client.add_file_part("test file", filename);
auto result =
async_simple::coro::syncAwait(client.async_upload_multipart(uri));
CHECK(result.status == 200);
8. benchmark code
8.1. brpc http benchmark code
// Command-line flags for the brpc benchmark server: `port` (default 9001) is
// passed to server.Start() and `idle_timeout_s` (default -1) is copied into
// ServerOptions::idle_timeout_sec in main().
DEFINE_int32(port, 9001, "TCP Port of this server");
DEFINE_int32(idle_timeout_s, -1,
"Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
class HttpServiceImpl : public HttpService {
public:
HttpServiceImpl() {}
virtual ~HttpServiceImpl() {}
void Echo(google::protobuf::RpcController *cntl_base, const HttpRequest *,
HttpResponse *, google::protobuf::Closure *done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller *cntl = static_cast<brpc::Controller *>(cntl_base);
std::string date_str{get_gmt_time_str()};
cntl->http_response().SetHeader("Date", date_str);
cntl->http_response().SetHeader("Server", "brpc");
cntl->http_response().set_content_type("text/plain");
butil::IOBufBuilder os;
os << "hello, world!";
os.move_to(cntl->response_attachment());
}
};
int main(int argc, char *argv[]) {
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
brpc::Server server;
example::HttpServiceImpl http_svc;
if (server.AddService(&http_svc, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
LOG(ERROR) << "Fail to add http_svc";
return -1;
}
brpc::ServerOptions options;
options.idle_timeout_sec = FLAGS_idle_timeout_s;
if (server.Start(FLAGS_port, &options) != 0) {
LOG(ERROR) << "Fail to start HttpServer";
return -1;
}
server.RunUntilAskedToQuit();
return 0;
}
8.2. drogon benchmark code
#include <drogon/drogon.h>
using namespace drogon;
int main() {
app()
.setLogPath("./")
.setLogLevel(trantor::Logger::kWarn)
.addListener("0.0.0.0", 9001)
.setThreadNum(0)
.registerSyncAdvice([](const HttpRequestPtr &req) -> HttpResponsePtr {
auto response = HttpResponse::newHttpResponse();
response->setBody("Hello, world!");
return response;
})
.run();
}
8.3. nginx http配置
user nginx;
# nginx configuration used for the plaintext benchmark: a single server on
# port 9001 whose "/plaintext" location returns a fixed text/plain body.
# Lines tagged "#default" restate the value the author marks as the default.
worker_processes auto;
worker_cpu_affinity auto;
error_log stderr error;
#worker_rlimit_nofile 1024000;
timer_resolution 1s;
# Stay in the foreground.
daemon off;
events {
worker_connections 32768;
multi_accept off; #default
}
http {
include /etc/nginx/mime.types;
# Access logging and the version token are switched off for the benchmark.
access_log off;
server_tokens off;
msie_padding off;
sendfile off; #default
tcp_nopush off; #default
tcp_nodelay on; #default
keepalive_timeout 65;
keepalive_disable none; #default msie6
# Raised from the annotated default of 100 requests per keep-alive connection.
keepalive_requests 300000; #default 100
server {
listen 9001 default_server reuseport deferred fastopen=4096;
root /;
# Exact-match location serving the benchmark response.
location = /plaintext {
default_type text/plain;
return 200 "Hello, World!";
}
}
}
8.4. cinatra benchmark code
#include <cinatra.hpp>
using namespace cinatra;
using namespace std::chrono_literals;
int main() {
coro_http_server server(std::thread::hardware_concurrency(), 8090);
server.set_http_handler<GET>(
"/plaintext", [](coro_http_request& req, coro_http_response& resp) {
resp.get_conn()->set_multi_buf(false);
resp.set_content_type<resp_content_type::txt>();
resp.set_status_and_content(status_type::ok, "Hello, world!");
});
server.sync_start();
}