Argobots是一个轻量级的低级线程和任务框架。
一、安装
- #tar xzf argobots.tar.gz
- #cd argobots
- #./autogen.sh
-
- //配置
- #./configure --prefix=/home/USERNAME/argobots-install 2>&1 | tee c.txt
-
- //--prefix=/home/USERNAME/argobots-install 是安装路径
-
- //构建
- #make 2>&1 | tee m.txt
-
- //如果构建出现问题,请先执行 make clean,然后加上 V=1 重新运行 make,以输出完整的编译命令,便于定位错误。
- #make V=1 2>&1 | tee m.txt
-
- //安装
- #make install 2>&1 | tee mi.txt
(1)Argobots 测试套件已包含在 Argobots 发行包中,可通过以下命令运行:
make check
(2)还可以在示例目录中运行Argobots示例:
make check
(3)可以使用下列命令查看配置选项
./configure --help
对于性能测试,建议使用以下标志:
./configure --enable-perf-opt --enable-affinity --disable-checks
对于调试,建议使用以下标志:
./configure --enable-fast=O0 --enable-debug=most
二、使用实例1(普通用法)
- #include <stdio.h>
- #include <stdlib.h>
- #include <unistd.h>
- #include <stdarg.h>
- #include <abt.h>
-
- #define DEFAULT_NUM_XSTREAMS 2
- #define DEFAULT_NUM_THREADS 8
-
/* Per-ULT argument block handed to hello_world() through its void* parameter. */
typedef struct thread_arg {
    int tid; /* Index of the ULT; printed by hello_world(). */
} thread_arg_t;
-
- void hello_world(void *arg)
- {
- int tid = ((thread_arg_t *)arg)->tid;
- printf("Hello world! (thread = %d)\n", tid);
- }
-
- int main(int argc, char **argv)
- {
- int i;
- /* Read arguments. */
- int num_xstreams = DEFAULT_NUM_XSTREAMS;
- int num_threads = DEFAULT_NUM_THREADS;
- while (1) {
- int opt = getopt(argc, argv, "he:n:");
- if (opt == -1)
- break;
- switch (opt) {
- case 'e':
- num_xstreams = atoi(optarg);
- break;
- case 'n':
- num_threads = atoi(optarg);
- break;
- case 'h':
- default:
- printf("Usage: ./hello_world [-e NUM_XSTREAMS] "
- "[-n NUM_THREADS]\n");
- return -1;
- }
- }
- if (num_xstreams <= 0)
- num_xstreams = 1;
- if (num_threads <= 0)
- num_threads = 1;
-
- /* Allocate memory. */
- ABT_xstream *xstreams =
- (ABT_xstream *)malloc(sizeof(ABT_xstream) * num_xstreams);
- ABT_pool *pools = (ABT_pool *)malloc(sizeof(ABT_pool) * num_xstreams);
- ABT_thread *threads =
- (ABT_thread *)malloc(sizeof(ABT_thread) * num_threads);
- thread_arg_t *thread_args =
- (thread_arg_t *)malloc(sizeof(thread_arg_t) * num_threads);
-
- /* Initialize Argobots. */
- ABT_init(argc, argv);
-
- /* Get a primary execution stream. */
- ABT_xstream_self(&xstreams[0]);
-
- /* Create secondary execution streams. */
- for (i = 1; i < num_xstreams; i++) {
- ABT_xstream_create(ABT_SCHED_NULL, &xstreams[i]);
- }
-
- /* Get default pools. */
- for (i = 0; i < num_xstreams; i++) {
- ABT_xstream_get_main_pools(xstreams[i], 1, &pools[i]);
- }
-
- /* Create ULTs. */
- for (i = 0; i < num_threads; i++) {
- int pool_id = i % num_xstreams;
- thread_args[i].tid = i;
- ABT_thread_create(pools[pool_id], hello_world, &thread_args[i],
- ABT_THREAD_ATTR_NULL, &threads[i]);
- }
-
- /* Join and free ULTs. */
- for (i = 0; i < num_threads; i++) {
- ABT_thread_free(&threads[i]);
- }
-
- /* Join and free secondary execution streams. */
- for (i = 1; i < num_xstreams; i++) {
- ABT_xstream_join(xstreams[i]);
- ABT_xstream_free(&xstreams[i]);
- }
-
- /* Finalize Argobots. */
- ABT_finalize();
-
- /* Free allocated memory. */
- free(xstreams);
- free(pools);
- free(threads);
- free(thread_args);
-
- return 0;
- }
使用实例2(带调度器和绑核)
- //入口函数
- int dispatch_request(crt_rpc_t* rpc) {
- if (!g_umds_context->dispatcher.is_inited()) {
- derr << "dispatcher is not inited" << dendl;
- return -ERR_UMD_NOT_INITIAL;
- }
-
- //生成请求req
- req* request = new req(rpc);
- //获取xstream执行流
- XstreamInstance* xstream = g_umds_context->dispatcher.get_handler(request);
-
-
- request->xstream = xstream;
- request->cli_req_id = *((req_id_t*)rpc->cr_input);
- //挂载执行函数
- ABT_thread_create(request->xstream->task_pool, handle_request, request,
- ABT_THREAD_ATTR_NULL, NULL);
- return UMD_SUCCESS;
- }
-
- XstreamInstance::XstreamInstance(SrvDispatcher* _dispatcher, int index) {
- id = index;
- total = 0;
- dispatcher = _dispatcher;
- step = 0;
- //获取cpu id
- int cpu_index = index % dispatcher->cpus.size();
- //为绑核做准备
- cpu = dispatcher->cpus[cpu_index];
- pthread_spin_init(&data_lock, PTHREAD_PROCESS_PRIVATE);
- }
-
- int SrvDispatcher::init() {
- int ret = 0;
- XstreamInstance* tmp_xstream;
-
- // init argobot
- ret = ABT_init(0, NULL);
- if (ret != ABT_SUCCESS) {
- derr << "ABT init failed: " << ret << dendl;
- goto out_abt_init;
- }
- dout(INFO) << "abt_init success" << dendl;
-
- // create xstream
- for (uint32_t i =0; i
_conf->xstream_num; i++) { - tmp_xstream = new XstreamInstance(this, i);
- //xstream初始化,里面有创建pool,创建调度器、创建xstream
- ret = tmp_xstream->init();
- if (ret == UMD_SUCCESS) {
- xstreams.push_back(tmp_xstream);
- } else {
- derr << "init xstream failed: " << get_err_desc(ret) << dendl;
- delete tmp_xstream;
- goto out_start_xstream;
- }
- }
- dout(INFO) << "xstreams init success" << dendl;
-
- pthread_rwlock_init(&xstream_lock, NULL);
- is_init = true;
- return UMD_SUCCESS;
- }
-
- int XstreamInstance::init_scheduler() {
- ABT_sched_config config;
- ABT_sched_config_var event_freq = {
- .idx = 0,
- .type = ABT_SCHED_CONFIG_INT
- };
- ABT_sched_config_var dx_ptr = {
- .idx = 1,
- .type = ABT_SCHED_CONFIG_PTR
- };
- ABT_sched_config_var timeout = {
- .idx = 2,
- .type = ABT_SCHED_CONFIG_INT
- };
- ABT_sched_def sched_def = {
- .type = ABT_SCHED_TYPE_ULT,
- .init = sched_init,
- .run = sched_run,
- .free = sched_free,
- .get_migr_pool = NULL
- };
- int rc;
- //创建pool
- rc = ABT_pool_create_basic(ABT_POOL_FIFO_WAIT, ABT_POOL_ACCESS_MPMC,
- ABT_TRUE, &task_pool);
- dout(INFO) << "xstream " << id << " pool create success" << dendl;
-
- //创建调度器的配置项
- rc = ABT_sched_config_create(&config, event_freq, g_cds_context->_conf->umds_event_check_interval, dx_ptr,
- this, timeout, g_cds_context->_conf->xstream_pool_timeout,
- ABT_sched_config_var_end);
-
- dout(INFO) << "xstream " << id << " scheduler config create success" << dendl;
- //创建调度器
- rc = ABT_sched_create(&sched_def, 1, &task_pool, config,
- &scheduler);
- ABT_sched_config_free(&config);
-
- dout(INFO) << "xstream " << id << " create scheduler success" << dendl;
- return UMD_SUCCESS;
- }
-
- int XstreamInstance::init() {
- int rc = init_scheduler();
-
- dout(INFO) << "scheduler init success" << dendl;
- step = SRV_XSTREAM_STEP_SCHED;
-
- //创建xstream
- rc = ABT_xstream_create_with_rank(scheduler, id + 1,
- &xstream);
-
- dout(INFO) << "create xstream success" << dendl;
- step = SRV_XSTREAM_STEP_XSTREAM;
- //执行绑核操作
- rc = ABT_thread_create(task_pool,
- bind_xstream_cpu, this, ABT_THREAD_ATTR_NULL,
- NULL);
-
- dout(INFO) << "create bind cpu task success" << dendl;
- step = SRV_XSTREAM_STEP_BIND_CPU;
- return UMD_SUCCESS;
- }
-
-
- static void bind_xstream_cpu(void *arg) {
- XstreamInstance *srv_xstream = (XstreamInstance*)arg;
- cds_assert(srv_xstream != NULL);
-
- int rc = hwloc_set_cpubind(srv_xstream->dispatcher->topology, srv_xstream->cpu, HWLOC_CPUBIND_THREAD);
- if (rc) {
- derr << "xstream " << srv_xstream->id << " failed to set cpu affinity: " << rc << dendl;
- return;
- }
- dout(INFO) << "xstream " << srv_xstream->id << " bind cpu success" << dendl;
-
- // memory not need bind, because it will allocate memory from the bind cpu default
- }