• 轻量级的低级线程和任务框架---Argobots


    Argobots是一个轻量级的低级线程和任务框架。

    一、安装

    1. #tar xzf argobots.tar.gz
    2. #cd argobots
    3. #./autogen.sh
    4. //配置
    5. #./configure --prefix=/home/USERNAME/argobots-install 2>&1 | tee c.txt
    6. //--prefix=/home/USERNAME/argobots-install 是安装路径
    7. //构建
    8. #make 2>&1 | tee m.txt
    9. //如果出现问题,请执行make clean,然后在V=1时再次运行make。
    10. #make V=1 2>&1 | tee m.txt
    11. //安装
    12. #make install 2>&1 | tee mi.txt

    (1)可以将Argobot测试套件打包到ArgobotsDistribution中,然后执行:

    make check

    (2)还可以在示例目录中运行Argobots示例:

    make check

    (3)可以使用下列命令查看配置选项

    ./configure --help

    对于性能测试,建议使用以下标志:

    ./configure --enable-perf-opt --enable-affinity --disable-checks

    对于调试,建议使用以下标志:

    ./configure --enable-fast=O0 --enable-debug=most

    二、使用实例1(普通用法)

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #define DEFAULT_NUM_XSTREAMS 2
    7. #define DEFAULT_NUM_THREADS 8
    8. typedef struct {
    9. int tid;
    10. } thread_arg_t;
    11. void hello_world(void *arg)
    12. {
    13. int tid = ((thread_arg_t *)arg)->tid;
    14. printf("Hello world! (thread = %d)\n", tid);
    15. }
    16. int main(int argc, char **argv)
    17. {
    18. int i;
    19. /* Read arguments. */
    20. int num_xstreams = DEFAULT_NUM_XSTREAMS;
    21. int num_threads = DEFAULT_NUM_THREADS;
    22. while (1) {
    23. int opt = getopt(argc, argv, "he:n:");
    24. if (opt == -1)
    25. break;
    26. switch (opt) {
    27. case 'e':
    28. num_xstreams = atoi(optarg);
    29. break;
    30. case 'n':
    31. num_threads = atoi(optarg);
    32. break;
    33. case 'h':
    34. default:
    35. printf("Usage: ./hello_world [-e NUM_XSTREAMS] "
    36. "[-n NUM_THREADS]\n");
    37. return -1;
    38. }
    39. }
    40. if (num_xstreams <= 0)
    41. num_xstreams = 1;
    42. if (num_threads <= 0)
    43. num_threads = 1;
    44. /* Allocate memory. */
    45. ABT_xstream *xstreams =
    46. (ABT_xstream *)malloc(sizeof(ABT_xstream) * num_xstreams);
    47. ABT_pool *pools = (ABT_pool *)malloc(sizeof(ABT_pool) * num_xstreams);
    48. ABT_thread *threads =
    49. (ABT_thread *)malloc(sizeof(ABT_thread) * num_threads);
    50. thread_arg_t *thread_args =
    51. (thread_arg_t *)malloc(sizeof(thread_arg_t) * num_threads);
    52. /* Initialize Argobots. */
    53. ABT_init(argc, argv);
    54. /* Get a primary execution stream. */
    55. ABT_xstream_self(&xstreams[0]);
    56. /* Create secondary execution streams. */
    57. for (i = 1; i < num_xstreams; i++) {
    58. ABT_xstream_create(ABT_SCHED_NULL, &xstreams[i]);
    59. }
    60. /* Get default pools. */
    61. for (i = 0; i < num_xstreams; i++) {
    62. ABT_xstream_get_main_pools(xstreams[i], 1, &pools[i]);
    63. }
    64. /* Create ULTs. */
    65. for (i = 0; i < num_threads; i++) {
    66. int pool_id = i % num_xstreams;
    67. thread_args[i].tid = i;
    68. ABT_thread_create(pools[pool_id], hello_world, &thread_args[i],
    69. ABT_THREAD_ATTR_NULL, &threads[i]);
    70. }
    71. /* Join and free ULTs. */
    72. for (i = 0; i < num_threads; i++) {
    73. ABT_thread_free(&threads[i]);
    74. }
    75. /* Join and free secondary execution streams. */
    76. for (i = 1; i < num_xstreams; i++) {
    77. ABT_xstream_join(xstreams[i]);
    78. ABT_xstream_free(&xstreams[i]);
    79. }
    80. /* Finalize Argobots. */
    81. ABT_finalize();
    82. /* Free allocated memory. */
    83. free(xstreams);
    84. free(pools);
    85. free(threads);
    86. free(thread_args);
    87. return 0;
    88. }

    使用实例2(带调度器和绑核)

    1. //入口函数
    2. int dispatch_request(crt_rpc_t* rpc) {
    3. if (!g_umds_context->dispatcher.is_inited()) {
    4. derr << "dispatcher is not inited" << dendl;
    5. return -ERR_UMD_NOT_INITIAL;
    6. }
    7. //生成请求req
    8. req* request = new req(rpc);
    9. //获取xstream执行流
    10. XstreamInstance* xstream = g_umds_context->dispatcher.get_handler(request);
    11. request->xstream = xstream;
    12. request->cli_req_id = *((req_id_t*)rpc->cr_input);
    13. //挂载执行函数
    14. ABT_thread_create(request->xstream->task_pool, handle_request, request,
    15. ABT_THREAD_ATTR_NULL, NULL);
    16. return UMD_SUCCESS;
    17. }
    18. XstreamInstance::XstreamInstance(SrvDispatcher* _dispatcher, int index) {
    19. id = index;
    20. total = 0;
    21. dispatcher = _dispatcher;
    22. step = 0;
    23. //获取cpu id
    24. int cpu_index = index % dispatcher->cpus.size();
    25. //为绑核做准备
    26. cpu = dispatcher->cpus[cpu_index];
    27. pthread_spin_init(&data_lock, PTHREAD_PROCESS_PRIVATE);
    28. }
    29. int SrvDispatcher::init() {
    30. int ret = 0;
    31. XstreamInstance* tmp_xstream;
    32. // init argobot
    33. ret = ABT_init(0, NULL);
    34. if (ret != ABT_SUCCESS) {
    35. derr << "ABT init failed: " << ret << dendl;
    36. goto out_abt_init;
    37. }
    38. dout(INFO) << "abt_init success" << dendl;
    39. // create xstream
    40. for (uint32_t i =0; i_conf->xstream_num; i++) {
    41. tmp_xstream = new XstreamInstance(this, i);
    42. //xstream初始化,里面有创建pool,创建调度器、创建xstream
    43. ret = tmp_xstream->init();
    44. if (ret == UMD_SUCCESS) {
    45. xstreams.push_back(tmp_xstream);
    46. } else {
    47. derr << "init xstream failed: " << get_err_desc(ret) << dendl;
    48. delete tmp_xstream;
    49. goto out_start_xstream;
    50. }
    51. }
    52. dout(INFO) << "xstreams init success" << dendl;
    53. pthread_rwlock_init(&xstream_lock, NULL);
    54. is_init = true;
    55. return UMD_SUCCESS;
    56. }
    57. int XstreamInstance::init_scheduler() {
    58. ABT_sched_config config;
    59. ABT_sched_config_var event_freq = {
    60. .idx = 0,
    61. .type = ABT_SCHED_CONFIG_INT
    62. };
    63. ABT_sched_config_var dx_ptr = {
    64. .idx = 1,
    65. .type = ABT_SCHED_CONFIG_PTR
    66. };
    67. ABT_sched_config_var timeout = {
    68. .idx = 2,
    69. .type = ABT_SCHED_CONFIG_INT
    70. };
    71. ABT_sched_def sched_def = {
    72. .type = ABT_SCHED_TYPE_ULT,
    73. .init = sched_init,
    74. .run = sched_run,
    75. .free = sched_free,
    76. .get_migr_pool = NULL
    77. };
    78. int rc;
    79. //创建pool
    80. rc = ABT_pool_create_basic(ABT_POOL_FIFO_WAIT, ABT_POOL_ACCESS_MPMC,
    81. ABT_TRUE, &task_pool);
    82. dout(INFO) << "xstream " << id << " pool create success" << dendl;
    83. //创建调度器的配置项
    84. rc = ABT_sched_config_create(&config, event_freq, g_cds_context->_conf->umds_event_check_interval, dx_ptr,
    85. this, timeout, g_cds_context->_conf->xstream_pool_timeout,
    86. ABT_sched_config_var_end);
    87. dout(INFO) << "xstream " << id << " scheduler config create success" << dendl;
    88. //创建调度器
    89. rc = ABT_sched_create(&sched_def, 1, &task_pool, config,
    90. &scheduler);
    91. ABT_sched_config_free(&config);
    92. dout(INFO) << "xstream " << id << " create scheduler success" << dendl;
    93. return UMD_SUCCESS;
    94. }
    95. int XstreamInstance::init() {
    96. int rc = init_scheduler();
    97. dout(INFO) << "scheduler init success" << dendl;
    98. step = SRV_XSTREAM_STEP_SCHED;
    99. //创建xstream
    100. rc = ABT_xstream_create_with_rank(scheduler, id + 1,
    101. &xstream);
    102. dout(INFO) << "create xstream success" << dendl;
    103. step = SRV_XSTREAM_STEP_XSTREAM;
    104. //执行绑核操作
    105. rc = ABT_thread_create(task_pool,
    106. bind_xstream_cpu, this, ABT_THREAD_ATTR_NULL,
    107. NULL);
    108. dout(INFO) << "create bind cpu task success" << dendl;
    109. step = SRV_XSTREAM_STEP_BIND_CPU;
    110. return UMD_SUCCESS;
    111. }
    112. static void bind_xstream_cpu(void *arg) {
    113. XstreamInstance *srv_xstream = (XstreamInstance*)arg;
    114. cds_assert(srv_xstream != NULL);
    115. int rc = hwloc_set_cpubind(srv_xstream->dispatcher->topology, srv_xstream->cpu, HWLOC_CPUBIND_THREAD);
    116. if (rc) {
    117. derr << "xstream " << srv_xstream->id << " failed to set cpu affinity: " << rc << dendl;
    118. return;
    119. }
    120. dout(INFO) << "xstream " << srv_xstream->id << " bind cpu success" << dendl;
    121. // memory not need bind, because it will allocate memory from the bind cpu default
    122. }

  • 相关阅读:
    面试:HashMap
    为什么要考一级建造师,一建证书含金量有多高?
    超千万下载量的NPM包遭黑客攻击,美国监管机构紧急警告
    java基于Vue的社团管理系统
    服务器端优化、NIO、非阻塞的HTTP服务器
    Android桌面回到应用 重新打开 android.intent.action.MAIN 的界面
    极空间变身监控录像机,搭配Onvif摄像头,实现实时观看和录制视频回放功能
    java基于微信小程序的智能停车场管理系统+ssm+uinapp+Mysql+计算机毕业设计
    ftp传送文件脚本
    Maven配置MAVEN_OPTS
  • 原文地址:https://blog.csdn.net/cyq6239075/article/details/126745140