• FreeSWITCH 1.10 源码阅读(1)-服务启动及 Event Socket 模块工作原理


    1. 前言

    FreeSWITCH 是一个开源的电话软交换平台,使用广泛,功能强大。本文基于 FreeSWITCH 1.10 版本,读者如有兴趣可以自行点击链接进入 github 下载源码。下图为 FreeSWITCH 服务启动及 Event Socket 模块运行工作的源代码时序,下文将对源码流程进行代码分析

    在这里插入图片描述

    2. 源码分析

    2.1 服务的启动

    2.1.1 FreeSWITCH 核心的启动流程
    1. FreeSWITCH 是用 C 语言写的,服务启动的入口为 switch.c#main() 函数。这个函数非常长,不过主要的处理大致分为以下几步:

      1. 调用 switch_core.c#switch_core_set_globals() 函数设置重要的运行时文件夹路径,比如模块文件的文件夹等
      2. 调用 switch_core.c#switch_core_init_and_modload() 函数初始化系统并加载模块,这部分是重点
      3. 调用 switch_core.c#switch_core_runtime_loop() 函数开启主线程循环保持系统运行
      int main(int argc, char *argv[])
      {
       
       ......
      
       switch_core_set_globals();
      
       pid = getpid();
      
       memset(pid_buffer, 0, sizeof(pid_buffer));
       switch_snprintf(pid_path, sizeof(pid_path), "%s%s%s", SWITCH_GLOBAL_dirs.run_dir, SWITCH_PATH_SEPARATOR, pfile);
       switch_snprintf(pid_buffer, sizeof(pid_buffer), "%d", pid);
       pid_len = strlen(pid_buffer);
      
       apr_pool_create(&pool, NULL);
      
       ......
      
       if (switch_core_init_and_modload(flags, nc ? SWITCH_FALSE : SWITCH_TRUE, &err) != SWITCH_STATUS_SUCCESS) {
       	fprintf(stderr, "Cannot Initialize [%s]\n", err);
       	return 255;
       }
      
       ......
      
       switch_core_runtime_loop(nc);
      
       ......
      
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
    2. switch_core.c#switch_core_init_and_modload() 函数是初始化重要组件和模块的入口,不过本文关注的主要是以下几个函数调用,FreeSWITCH 核心启动的主要逻辑其实就是拉起关键组件及模块

      1. switch_core.c#switch_core_init() 函数负责初始化 FreeSWITCH 核心的重要组件
      2. switch_loadable_module.c#switch_loadable_module_init() 函数将加载 FreeSWITCH 核心之外的可插拔模块
      SWITCH_DECLARE(switch_status_t) switch_core_init_and_modload(switch_core_flag_t flags, switch_bool_t console, const char **err)
      {
       ......
      
       if (switch_core_init(flags, console, err) != SWITCH_STATUS_SUCCESS) {
       	return SWITCH_STATUS_GENERR;
       }
      
       if (runtime.runlevel > 1) {
       	/* one per customer */
       	return SWITCH_STATUS_SUCCESS;
       }
      
       runtime.runlevel++;
       runtime.events_use_dispatch = 1;
      
       switch_core_set_signal_handlers();
       switch_load_network_lists(SWITCH_FALSE);
      
       switch_msrp_init();
      
       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Bringing up environment.\n");
       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Loading Modules.\n");
       if (switch_loadable_module_init(SWITCH_TRUE) != SWITCH_STATUS_SUCCESS) {
       	*err = "Cannot load modules";
       	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Error: %s\n", *err);
       	return SWITCH_STATUS_GENERR;
       }
      
       switch_load_network_lists(SWITCH_FALSE);
      
       switch_load_core_config("post_load_switch.conf");
      
       switch_core_set_signal_handlers();
      
       ......
      
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
    3. 此时回到本节步骤1第3步switch_core.c#switch_core_runtime_loop() 函数其实就是根据前后台启动方式来开启主线程循环保持系统运行,至此宏观上 FreeSWITCH 核心启动的流程基本结束

      如果 FreeSWITCH 以前台方式启动,则调用 switch_console.c#switch_console_loop() 函数启动控制台循环,接收控制台输入的命令并处理

      SWITCH_DECLARE(void) switch_core_runtime_loop(int bg)
      {
      #ifdef WIN32
       HANDLE shutdown_event;
       char path[256] = "";
      #endif
       if (bg) {
      #ifdef WIN32
       	switch_snprintf(path, sizeof(path), "Global\\Freeswitch.%d", getpid());
       	shutdown_event = CreateEvent(NULL, FALSE, FALSE, path);
       	if (shutdown_event) {
       		WaitForSingleObject(shutdown_event, INFINITE);
       	}
      #else
       	while (runtime.running) {
       		switch_yield(1000000);
       	}
      #endif
       } else {
       	/* wait for console input */
       	switch_console_loop();
       }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
    2.1.2 事件分发组件的初始化
    1. 2.1.1节步骤2第1步 中,笔者提到 switch_core.c#switch_core_init() 函数会初始化 FreeSWITCH 核心的重要组件和运行时属性,其大致源码如下,本文涉及的主要是以下几部分:

      1. 调用函数 switch_core_session.c#switch_core_session_init() 初始化 Session 管理器并创建对应的事件队列,这部分本文暂不深入
      2. 调用 switch_event.c#switch_event_init() 函数初始化事件分发组件,包括启动事件分发线程及队列创建等
      3. 执行 switch_core.c#switch_load_core_config() 函数解析 FreeSWITCH 的核心配置文件 switch.conf.xml,将配置属性加载进内存中
      4. 执行 switch_scheduler.c#switch_scheduler_task_thread_start() 函数启动内部定时任务线程,后续可以添加心跳通知等定时任务到内部列表,等待触发执行
      SWITCH_DECLARE(switch_status_t) switch_core_init(switch_core_flag_t flags, switch_bool_t console, const char **err)
      {
       switch_uuid_t uuid;
       char guess_ip[256];
       int mask = 0;
       struct in_addr in;
      
      
       if (runtime.runlevel > 0) {
       	/* one per customer */
       	return SWITCH_STATUS_SUCCESS;
       }
      
       memset(&runtime, 0, sizeof(runtime));
       gethostname(runtime.hostname, sizeof(runtime.hostname));
      
       runtime.shutdown_cause = SWITCH_CAUSE_SYSTEM_SHUTDOWN;
       runtime.max_db_handles = 50;
       runtime.db_handle_timeout = 5000000;
       runtime.event_heartbeat_interval = 20;
       
       ......
      
       if (!runtime.cpu_count) runtime.cpu_count = 1;
      
       if (sqlite3_initialize() != SQLITE_OK) {
       	*err = "FATAL ERROR! Could not initialize SQLite\n";
       	return SWITCH_STATUS_MEMERR;
       }
      
       /* INIT APR and Create the pool context */
       if (apr_initialize() != SWITCH_STATUS_SUCCESS) {
       	*err = "FATAL ERROR! Could not initialize APR\n";
       	return SWITCH_STATUS_MEMERR;
       }
      
       if (!(runtime.memory_pool = switch_core_memory_init())) {
       	*err = "FATAL ERROR! Could not allocate memory pool\n";
       	return SWITCH_STATUS_MEMERR;
       }
      
       ......
      
       switch_core_session_init(runtime.memory_pool);
       
       ......
      
       switch_event_init(runtime.memory_pool);
      
       ......
      
       switch_load_core_config("switch.conf");
      
       ......
      
       switch_scheduler_task_thread_start();
      
       ......
      
       switch_scheduler_add_task(switch_epoch_time_now(NULL), heartbeat_callback, "heartbeat", "core", 0, NULL, SSHF_NONE | SSHF_NO_DEL);
      
       switch_scheduler_add_task(switch_epoch_time_now(NULL), check_ip_callback, "check_ip", "core", 0, NULL, SSHF_NONE | SSHF_NO_DEL | SSHF_OWN_THREAD);
      
       ......
      
       return SWITCH_STATUS_SUCCESS;
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
    2. switch_event.c#switch_event_init() 函数会根据当前机器的核心数计算确定事件分发线程的最大数量MAX_DISPATCH,核心流程是调用 switch_event.c#check_dispatch() 函数创建并启动事件分发线程

      SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool)
      {
      
       /* don't need any more dispatch threads than we have CPU's*/
       MAX_DISPATCH = (switch_core_cpu_count() / 2) + 1;
       if (MAX_DISPATCH < 2) {
       	MAX_DISPATCH = 2;
       }
      
       switch_assert(pool != NULL);
       THRUNTIME_POOL = RUNTIME_POOL = pool;
       switch_thread_rwlock_create(&RWLOCK, RUNTIME_POOL);
       switch_mutex_init(&BLOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
       switch_mutex_init(&POOL_LOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
       switch_mutex_init(&EVENT_QUEUE_MUTEX, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
       switch_mutex_init(&CUSTOM_HASH_MUTEX, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
       switch_core_hash_init(&CUSTOM_HASH);
      
       if (switch_core_test_flag(SCF_MINIMAL)) {
       	return SWITCH_STATUS_SUCCESS;
       }
      
       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Activate Eventing Engine.\n");
      
       switch_core_hash_init(&event_channel_manager.lahash);
       switch_mutex_init(&event_channel_manager.lamutex, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
      
       switch_thread_rwlock_create(&event_channel_manager.rwlock, RUNTIME_POOL);
       switch_core_hash_init(&event_channel_manager.hash);
       switch_core_hash_init(&event_channel_manager.perm_hash);
       event_channel_manager.ID = 1;
      
       switch_mutex_lock(EVENT_QUEUE_MUTEX);
       SYSTEM_RUNNING = -1;
       switch_mutex_unlock(EVENT_QUEUE_MUTEX);
      
       //switch_threadattr_create(&thd_attr, pool);
       switch_find_local_ip(guess_ip_v4, sizeof(guess_ip_v4), NULL, AF_INET);
       switch_find_local_ip(guess_ip_v6, sizeof(guess_ip_v6), NULL, AF_INET6);
      
      
      #ifdef SWITCH_EVENT_RECYCLE
       switch_queue_create(&EVENT_RECYCLE_QUEUE, 250000, THRUNTIME_POOL);
       switch_queue_create(&EVENT_HEADER_RECYCLE_QUEUE, 250000, THRUNTIME_POOL);
      #endif
      
       check_dispatch();
      
       switch_mutex_lock(EVENT_QUEUE_MUTEX);
       SYSTEM_RUNNING = 1;
       switch_mutex_unlock(EVENT_QUEUE_MUTEX);
      
       return SWITCH_STATUS_SUCCESS;
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
    3. switch_event.c#check_dispatch() 函数会创建事件分发队列EVENT_DISPATCH_QUEUE,并设置队列的容量为DISPATCH_QUEUE_LEN * MAX_DISPATCH,最后通过 switch_event.c#switch_event_launch_dispatch_threads() 函数创建 1 个事件分发线程

      static void check_dispatch(void)
      {
       if (!EVENT_DISPATCH_QUEUE) {
       	switch_mutex_lock(BLOCK);
      
       	if (!EVENT_DISPATCH_QUEUE) {
       		switch_queue_create(&EVENT_DISPATCH_QUEUE, DISPATCH_QUEUE_LEN * MAX_DISPATCH, THRUNTIME_POOL);
       		switch_event_launch_dispatch_threads(1);
      
       		while (!THREAD_COUNT) {
       			switch_cond_next();
       		}
       	}
       	switch_mutex_unlock(BLOCK);
       }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
    4. switch_event.c#switch_event_launch_dispatch_threads() 函数会调用库函数创建事件分发线程,并将 switch_event.c#switch_event_dispatch_thread() 函数设置为线程任务。需注意此处会维护一个事件分发线程的数组 EVENT_DISPATCH_QUEUE_THREADS,如果数组下标上已经有线程存在了,则不会重复创建,至此事件分发组件的初始化基本结束

      SWITCH_DECLARE(void) switch_event_launch_dispatch_threads(uint32_t max)
      {
       switch_threadattr_t *thd_attr;
       uint32_t index = 0;
       int launched = 0;
       uint32_t sanity = 200;
      
       switch_memory_pool_t *pool = RUNTIME_POOL;
      
       check_dispatch();
      
       if (max > MAX_DISPATCH) {
       	return;
       }
      
       if (max < SOFT_MAX_DISPATCH) {
       	return;
       }
      
       for (index = SOFT_MAX_DISPATCH; index < max && index < MAX_DISPATCH; index++) {
       	if (EVENT_DISPATCH_QUEUE_THREADS[index]) {
       		continue;
       	}
      
       	switch_threadattr_create(&thd_attr, pool);
       	switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
       	switch_threadattr_priority_set(thd_attr, SWITCH_PRI_REALTIME);
       	switch_thread_create(&EVENT_DISPATCH_QUEUE_THREADS[index], thd_attr, switch_event_dispatch_thread, EVENT_DISPATCH_QUEUE, pool);
       	while(--sanity && !EVENT_DISPATCH_QUEUE_RUNNING[index]) switch_yield(10000);
      
       	if (index == 1) {
       		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Create event dispatch thread %d\n", index);
       	} else {
       		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Create additional event dispatch thread %d\n", index);
       	}
       	launched++;
       }
      
       SOFT_MAX_DISPATCH = index;
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
    2.1.3 模块 mod_event_socket 的加载运行
    1. 2.1.1节步骤2第2步 中,执行 switch_loadable_module.c#switch_loadable_module_init() 函数将会加载配置文件中的配置的模块,其关键点如下:

      1. 模块配置由三个配置文件 pre_load_modules.conf.xml、modules.conf.xm、post_load_modules.conf.xml 共同组成,三者主要是加载顺序的区别,因为有些模块相互之间可能存在依赖,其他并无不同
      2. 在加载完前置模块之后,先调用 sswitch_core.c#witch_core_sqldb_init() 函数初始化 sql 数据库
      3. 加载模块的核心是 switch_loadable_module.c#switch_loadable_module_load_module_ex() 函数,这个函数会将指定文件夹下的模块文件加载进内存
      4. 模块加载进内存后,会调用 switch_loadable_module.c#switch_loadable_module_runtime() 函数为每一个模块新起线程执行其定义的 runtime 函数
      SWITCH_DECLARE(switch_status_t) switch_loadable_module_init(switch_bool_t autoload)
      {
      
       apr_finfo_t finfo = { 0 };
       apr_dir_t *module_dir_handle = NULL;
       apr_int32_t finfo_flags = APR_FINFO_DIRENT | APR_FINFO_TYPE | APR_FINFO_NAME;
       char *precf = "pre_load_modules.conf";
       char *cf = "modules.conf";
       char *pcf = "post_load_modules.conf";
       switch_xml_t cfg, xml;
       unsigned char all = 0;
       unsigned int count = 0;
       const char *err;
       switch_hash_t *event_hash;
       switch_hash_index_t *hi;
       void *hash_val;
       switch_event_t *event;
      
      
      #ifdef WIN32
       const char *ext = ".dll";
       const char *EXT = ".DLL";
      #elif defined (MACOSX) || defined (DARWIN)
       const char *ext = ".dylib";
       const char *EXT = ".DYLIB";
      #else
       const char *ext = ".so";
       const char *EXT = ".SO";
      #endif
      
       memset(&loadable_modules, 0, sizeof(loadable_modules));
       switch_core_new_memory_pool(&loadable_modules.pool);
      
      
      #ifdef WIN32
       switch_loadable_module_path_init();
      #endif
      
       switch_core_hash_init(&loadable_modules.module_hash);
       switch_core_hash_init_nocase(&loadable_modules.endpoint_hash);
       switch_core_hash_init_nocase(&loadable_modules.codec_hash);
       switch_core_hash_init_nocase(&loadable_modules.timer_hash);
       switch_core_hash_init_nocase(&loadable_modules.application_hash);
       switch_core_hash_init_nocase(&loadable_modules.chat_application_hash);
       switch_core_hash_init_nocase(&loadable_modules.api_hash);
       switch_core_hash_init_nocase(&loadable_modules.json_api_hash);
       switch_core_hash_init(&loadable_modules.file_hash);
       switch_core_hash_init_nocase(&loadable_modules.speech_hash);
       switch_core_hash_init_nocase(&loadable_modules.asr_hash);
       switch_core_hash_init_nocase(&loadable_modules.directory_hash);
       switch_core_hash_init_nocase(&loadable_modules.chat_hash);
       switch_core_hash_init_nocase(&loadable_modules.say_hash);
       switch_core_hash_init_nocase(&loadable_modules.management_hash);
       switch_core_hash_init_nocase(&loadable_modules.limit_hash);
       switch_core_hash_init_nocase(&loadable_modules.database_hash);
       switch_core_hash_init_nocase(&loadable_modules.dialplan_hash);
       switch_core_hash_init(&loadable_modules.secondary_recover_hash);
       switch_mutex_init(&loadable_modules.mutex, SWITCH_MUTEX_NESTED, loadable_modules.pool);
      
       if (!autoload) return SWITCH_STATUS_SUCCESS;
       
       /*
       	switch_core_sqldb_init() is not yet ready and is executed after starting modules from pre_load_modules.conf
       	Modules loading procedure generates events used by sqldb.
       	This is why we should hold those events (storing in the event_hash) not firing them until sqldb is ready.
       */
       switch_core_hash_init(&event_hash);
      
       /* 
       	Pre-load core modules.
       	Do not pre-load modules which may use databases,
       	use appropriate section.
       */
       switch_loadable_module_load_module_ex("", "CORE_SOFTTIMER_MODULE", SWITCH_FALSE, SWITCH_FALSE, &err, SWITCH_LOADABLE_MODULE_TYPE_COMMON, event_hash);
       switch_loadable_module_load_module_ex("", "CORE_PCM_MODULE", SWITCH_FALSE, SWITCH_FALSE, &err, SWITCH_LOADABLE_MODULE_TYPE_COMMON, event_hash);
       switch_loadable_module_load_module_ex("", "CORE_SPEEX_MODULE", SWITCH_FALSE, SWITCH_FALSE, &err, SWITCH_LOADABLE_MODULE_TYPE_COMMON, event_hash);
      
       /*
       	Loading pre-load modules.
       	Database modules must be loaded here.
       */
       if ((xml = switch_xml_open_cfg(precf, &cfg, NULL))) {
       	switch_xml_t mods, ld;
       	if ((mods = switch_xml_child(cfg, "modules"))) {
       		for (ld = switch_xml_child(mods, "load"); ld; ld = ld->next) {
       			switch_bool_t global = SWITCH_FALSE;
       			const char *val = switch_xml_attr_soft(ld, "module");
       			const char *path = switch_xml_attr_soft(ld, "path");
       			const char *critical = switch_xml_attr_soft(ld, "critical");
       			const char *sglobal = switch_xml_attr_soft(ld, "global");
      
       			if (zstr(val) || (strchr(val, '.') && !strstr(val, ext) && !strstr(val, EXT))) {
       				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Invalid extension for %s\n", val);
       				continue;
       			}
       			global = switch_true(sglobal);
      
       			if (path && zstr(path)) {
       				path = SWITCH_GLOBAL_dirs.mod_dir;
       			}
       			if (switch_loadable_module_load_module_ex((char *)path, (char *)val, SWITCH_FALSE, global, &err, SWITCH_LOADABLE_MODULE_TYPE_PRELOAD, event_hash) == SWITCH_STATUS_GENERR) {
       				if (critical && switch_true(critical)) {
       					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failed to load critical module '%s', abort()\n", val);
       					switch_core_hash_destroy(&event_hash);
      
       					abort();
       				}
       			}
       			count++;
       		}
       	}
       	switch_xml_free(xml);
      
       }
       else {
       	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "open of %s failed\n", precf);
       }
      
       if (switch_core_sqldb_init(&err) != SWITCH_STATUS_SUCCESS)
       {
       	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Loading modules interrupted. [Error: %s]\n", err);
       	switch_core_hash_destroy(&event_hash);
       	return SWITCH_STATUS_GENERR;
       }
      
       /* sqldb is ready. Fire holding events! */
       for (hi = switch_core_hash_first(event_hash); hi; hi = switch_core_hash_next(&hi)) {
       	switch_core_hash_this(hi, NULL, NULL, &hash_val);
       	event = (switch_event_t *)hash_val;
       	switch_event_fire(&event);
       }
      
       switch_core_hash_destroy(&event_hash);
      
       /*
       	To perevent locking.
       	Core modules which may use databases should be pre-loaded here
       	(databases are loaded already).
       */
      #ifdef SWITCH_HAVE_YUV
      #ifdef SWITCH_HAVE_VPX
       switch_loadable_module_load_module("", "CORE_VPX_MODULE", SWITCH_FALSE, &err);
      #endif
      #endif
      
       /* Loading common modules */
       if ((xml = switch_xml_open_cfg(cf, &cfg, NULL))) {
       	switch_xml_t mods, ld;
       	if ((mods = switch_xml_child(cfg, "modules"))) {
       		for (ld = switch_xml_child(mods, "load"); ld; ld = ld->next) {
       			switch_bool_t global = SWITCH_FALSE;
       			const char *val = switch_xml_attr_soft(ld, "module");
       			const char *path = switch_xml_attr_soft(ld, "path");
       			const char *critical = switch_xml_attr_soft(ld, "critical");
       			const char *sglobal = switch_xml_attr_soft(ld, "global");
       			if (zstr(val) || (strchr(val, '.') && !strstr(val, ext) && !strstr(val, EXT))) {
       				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Invalid extension for %s\n", val);
       				continue;
       			}
       			global = switch_true(sglobal);
      
       			if (path && zstr(path)) {
       				path = SWITCH_GLOBAL_dirs.mod_dir;
       			}
       			if (switch_loadable_module_load_module_ex(path, val, SWITCH_FALSE, global, &err, SWITCH_LOADABLE_MODULE_TYPE_COMMON, NULL) == SWITCH_STATUS_GENERR) {
       				if (critical && switch_true(critical)) {
       					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failed to load critical module '%s', abort()\n", val);
       					abort();
       				}
       			}
       			count++;
       		}
       	}
       	switch_xml_free(xml);
      
       } else {
       	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "open of %s failed\n", cf);
       }
      
       if ((xml = switch_xml_open_cfg(pcf, &cfg, NULL))) {
       	switch_xml_t mods, ld;
      
       	if ((mods = switch_xml_child(cfg, "modules"))) {
       		for (ld = switch_xml_child(mods, "load"); ld; ld = ld->next) {
       			switch_bool_t global = SWITCH_FALSE;
       			const char *val = switch_xml_attr_soft(ld, "module");
       			const char *path = switch_xml_attr_soft(ld, "path");
       			const char *sglobal = switch_xml_attr_soft(ld, "global");
       			if (zstr(val) || (strchr(val, '.') && !strstr(val, ext) && !strstr(val, EXT))) {
       				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Invalid extension for %s\n", val);
       				continue;
       			}
       			global = switch_true(sglobal);
      
       			if (path && zstr(path)) {
       				path = SWITCH_GLOBAL_dirs.mod_dir;
       			}
       			switch_loadable_module_load_module_ex(path, val, SWITCH_FALSE, global, &err, SWITCH_LOADABLE_MODULE_TYPE_POSTLOAD, NULL);
       			count++;
       		}
       	}
       	switch_xml_free(xml);
      
       } else {
       	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "open of %s failed\n", pcf);
       }
      
       if (!count) {
       	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "No modules loaded, assuming 'load all'\n");
       	all = 1;
       }
      
       if (all) {
       	if (apr_dir_open(&module_dir_handle, SWITCH_GLOBAL_dirs.mod_dir, loadable_modules.pool) != APR_SUCCESS) {
       		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Can't open directory: %s\n", SWITCH_GLOBAL_dirs.mod_dir);
       		return SWITCH_STATUS_GENERR;
       	}
      
       	while (apr_dir_read(&finfo, finfo_flags, module_dir_handle) == APR_SUCCESS) {
       		const char *fname = finfo.fname;
      
       		if (finfo.filetype != APR_REG) {
       			continue;
       		}
      
       		if (!fname) {
       			fname = finfo.name;
       		}
      
       		if (!fname) {
       			continue;
       		}
      
       		if (zstr(fname) || (!strstr(fname, ext) && !strstr(fname, EXT))) {
       			continue;
       		}
      
       		switch_loadable_module_load_module(SWITCH_GLOBAL_dirs.mod_dir, fname, SWITCH_FALSE, &err);
       	}
       	apr_dir_close(module_dir_handle);
       }
      
       switch_loadable_module_runtime();
      
       memset(&chat_globals, 0, sizeof(chat_globals));
       chat_globals.running = 1;
       chat_globals.pool = loadable_modules.pool;
       switch_mutex_init(&chat_globals.mutex, SWITCH_MUTEX_NESTED, chat_globals.pool);
      
       chat_thread_start(1);
      
       return SWITCH_STATUS_SUCCESS;
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115
      • 116
      • 117
      • 118
      • 119
      • 120
      • 121
      • 122
      • 123
      • 124
      • 125
      • 126
      • 127
      • 128
      • 129
      • 130
      • 131
      • 132
      • 133
      • 134
      • 135
      • 136
      • 137
      • 138
      • 139
      • 140
      • 141
      • 142
      • 143
      • 144
      • 145
      • 146
      • 147
      • 148
      • 149
      • 150
      • 151
      • 152
      • 153
      • 154
      • 155
      • 156
      • 157
      • 158
      • 159
      • 160
      • 161
      • 162
      • 163
      • 164
      • 165
      • 166
      • 167
      • 168
      • 169
      • 170
      • 171
      • 172
      • 173
      • 174
      • 175
      • 176
      • 177
      • 178
      • 179
      • 180
      • 181
      • 182
      • 183
      • 184
      • 185
      • 186
      • 187
      • 188
      • 189
      • 190
      • 191
      • 192
      • 193
      • 194
      • 195
      • 196
      • 197
      • 198
      • 199
      • 200
      • 201
      • 202
      • 203
      • 204
      • 205
      • 206
      • 207
      • 208
      • 209
      • 210
      • 211
      • 212
      • 213
      • 214
      • 215
      • 216
      • 217
      • 218
      • 219
      • 220
      • 221
      • 222
      • 223
      • 224
      • 225
      • 226
      • 227
      • 228
      • 229
      • 230
      • 231
      • 232
      • 233
      • 234
      • 235
      • 236
      • 237
      • 238
      • 239
      • 240
      • 241
      • 242
      • 243
      • 244
      • 245
      • 246
      • 247
      • 248
      • 249
      • 250
      • 251
      • 252
      • 253
      • 254
    2. switch_loadable_module.c#switch_loadable_module_load_module_ex() 函数的核心处理如下:

      1. 首先根据模块名称计算对应的模块的动态库文件名称,然后通过 switch_core_hash.c#switch_core_hash_find_locked() 方法查找哈希表 loadable_modules.module_hash 中是否存在该文件名,如果不存在说明模块还没有加载过,需要加载
      2. switch_loadable_module.c#switch_loadable_module_load_file() 函数负责加载动态库文件
      3. 如果加载动态库文件成功,则调用 switch_loadable_module.c#switch_loadable_module_process() 将加载完成的模块插入哈希表 loadable_modules.module_hash,并生成模块加载的事件将其推入到 FreeSWITCH 事件组件中
      static switch_status_t switch_loadable_module_load_module_ex(const char *dir, const char *fname, switch_bool_t runtime, switch_bool_t global, const char **err, switch_loadable_module_type_t type, switch_hash_t *event_hash)
      {
       switch_size_t len = 0;
       char *path;
       char *file, *dot;
       switch_loadable_module_t *new_module = NULL;
       switch_status_t status = SWITCH_STATUS_SUCCESS;
      
      #ifdef WIN32
       const char *ext = ".dll";
      #else
       const char *ext = ".so";
      #endif
      
       *err = "";
      
       if ((file = switch_core_strdup(loadable_modules.pool, fname)) == 0) {
       	*err = "allocation error";
       	return SWITCH_STATUS_FALSE;
       }
      
       if (switch_is_file_path(file)) {
       	path = switch_core_strdup(loadable_modules.pool, file);
       	file = (char *) switch_cut_path(file);
       	if ((dot = strchr(file, '.'))) {
       		*dot = '\0';
       	}
       } else {
       	if ((dot = strchr(file, '.'))) {
       		*dot = '\0';
       	}
       	len = strlen(switch_str_nil(dir));
       	len += strlen(file);
       	len += 8;
       	path = (char *) switch_core_alloc(loadable_modules.pool, len);
       	switch_snprintf(path, len, "%s%s%s%s", switch_str_nil(dir), SWITCH_PATH_SEPARATOR, file, ext);
       }
      
      
       if (switch_core_hash_find_locked(loadable_modules.module_hash, file, loadable_modules.mutex)) {
       	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Module %s Already Loaded!\n", file);
       	*err = "Module already loaded";
       	status = SWITCH_STATUS_FALSE;
       } else if ((status = switch_loadable_module_load_file(path, file, global, &new_module)) == SWITCH_STATUS_SUCCESS) {
       	new_module->type = type;
      
       	if ((status = switch_loadable_module_process(file, new_module, event_hash)) == SWITCH_STATUS_SUCCESS && runtime) {
       		if (new_module->switch_module_runtime) {
       			new_module->thread = switch_core_launch_thread(switch_loadable_module_exec, new_module, new_module->pool);
       		}
       	} else if (status != SWITCH_STATUS_SUCCESS) {
       		*err = "module load routine returned an error";
       	}
       } else {
       	*err = "module load file routine returned an error";
       }
      
       return status;
      
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
    3. switch_loadable_module.c#switch_loadable_module_load_file() 函数的核心处理步骤如下:

      1. 首先调用 switch_dso.c#switch_dso_open() 函数打开库文件,随后调用 switch_dso.c#sswitch_dso_data_sym() 函数获取动态库中的符号表,这部分涉及到平台库函数,不做深入讨论
      2. 如果符号表加载成功,则将其赋给函数表 interface_struct_handle,最终将其赋给函数表结构体 mod_interface_functions
      3. 如果当前加载的模块定义了 load 函数,则需要通过函数指针load_func_ptr回调执行,以 Event Socket 模块为例,这里将回调到 mod_event_socket.c#SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load) 函数
      4. 以上处理完毕,生成结构体 switch_loadable_module_t 实例 module 保存模块中的函数等信息,最终将其添加到传入函数的指针 new_module 末尾
      static switch_status_t switch_loadable_module_load_file(char *path, char *filename, switch_bool_t global, switch_loadable_module_t **new_module)
      {
       switch_loadable_module_t *module = NULL;
       switch_dso_lib_t dso = NULL;
       apr_status_t status = SWITCH_STATUS_SUCCESS;
       switch_loadable_module_function_table_t *interface_struct_handle = NULL;
       switch_loadable_module_function_table_t *mod_interface_functions = NULL;
       char *struct_name = NULL;
       switch_module_load_t load_func_ptr = NULL;
       int loading = 1;
       switch_loadable_module_interface_t *module_interface = NULL;
       char *derr = NULL;
       const char *err = NULL;
       switch_memory_pool_t *pool = NULL;
       switch_bool_t load_global = global;
      
       switch_assert(path != NULL);
      
       switch_core_new_memory_pool(&pool);
       *new_module = NULL;
      
       struct_name = switch_core_sprintf(pool, "%s_module_interface", filename);
      
      #ifdef WIN32
       dso = switch_dso_open("FreeSwitch.dll", load_global, &derr);
      #elif defined (MACOSX) || defined(DARWIN)
       {
       	char *lib_path = switch_mprintf("%s/libfreeswitch.dylib", SWITCH_GLOBAL_dirs.lib_dir);
       	dso = switch_dso_open(lib_path, load_global, &derr);
       	switch_safe_free(lib_path);
       }
      #else
       dso = switch_dso_open(NULL, load_global, &derr);
      #endif
       if (!derr && dso) {
       	interface_struct_handle = switch_dso_data_sym(dso, struct_name, &derr);
       }
      
       switch_safe_free(derr);
      
       if (!interface_struct_handle) {
       	if (dso) switch_dso_destroy(&dso);
       	dso = switch_dso_open(path, load_global, &derr);
       }
      
       while (loading) {
       	if (derr) {
       		err = derr;
       		break;
       	}
      
       	if (!interface_struct_handle) {
       		interface_struct_handle = switch_dso_data_sym(dso, struct_name, &derr);
       	}
      
       	if (derr) {
       		err = derr;
       		break;
       	}
      
       	if (interface_struct_handle && interface_struct_handle->switch_api_version != SWITCH_API_VERSION) {
       		err = "Trying to load an out of date module, please rebuild the module.";
       		break;
       	}
      
       	if (!load_global && interface_struct_handle && switch_test_flag(interface_struct_handle, SMODF_GLOBAL_SYMBOLS)) {
       		load_global = SWITCH_TRUE;
       		switch_dso_destroy(&dso);
       		interface_struct_handle = NULL;
       		dso = switch_dso_open(path, load_global, &derr);
       		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Loading module with global namespace at request of module\n");
       		continue;
       	}
      
       	if (interface_struct_handle) {
       		mod_interface_functions = interface_struct_handle;
       		load_func_ptr = mod_interface_functions->load;
       	}
      
       	if (load_func_ptr == NULL) {
       		err = "Cannot locate symbol 'switch_module_load' please make sure this is a valid module.";
       		break;
       	}
      
       	status = load_func_ptr(&module_interface, pool);
      
       	if (status != SWITCH_STATUS_SUCCESS && status != SWITCH_STATUS_NOUNLOAD) {
       		err = "Module load routine returned an error";
       		module_interface = NULL;
       		break;
       	}
      
       	if (!module_interface) {
       		err = "Module failed to initialize its module_interface. Is this a valid module?";
       		break;
       	}
      
       	if ((module = switch_core_alloc(pool, sizeof(switch_loadable_module_t))) == 0) {
       		abort();
       	}
      
       	if (status == SWITCH_STATUS_NOUNLOAD) {
       		module->perm++;
       	}
      
       	loading = 0;
       }
      
      
       if (err) {
      
       	if (dso) {
       		switch_dso_destroy(&dso);
       	}
      
       	if (pool) {
       		switch_core_destroy_memory_pool(&pool);
       	}
       	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error Loading module %s\n**%s**\n", path, err);
       	switch_safe_free(derr);
       	return SWITCH_STATUS_GENERR;
       }
      
       module->pool = pool;
       module->filename = switch_core_strdup(module->pool, path);
       module->module_interface = module_interface;
       module->switch_module_load = load_func_ptr;
      
       if (mod_interface_functions) {
       	module->switch_module_shutdown = mod_interface_functions->shutdown;
       	module->switch_module_runtime = mod_interface_functions->runtime;
       }
      
       module->lib = dso;
      
       *new_module = module;
       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Successfully Loaded [%s]\n", module_interface->module_name);
      
       switch_core_set_signal_handlers();
      
       return SWITCH_STATUS_SUCCESS;
      
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115
      • 116
      • 117
      • 118
      • 119
      • 120
      • 121
      • 122
      • 123
      • 124
      • 125
      • 126
      • 127
      • 128
      • 129
      • 130
      • 131
      • 132
      • 133
      • 134
      • 135
      • 136
      • 137
      • 138
      • 139
      • 140
      • 141
      • 142
      • 143
    4. mod_event_socket.c#SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load) 函数源码中重要的处理如下:

      1. 调用 switch_event.c#switch_event_bind_removable() 注册事件监听,这里是监听 SWITCH_EVENT_ALL 所有事件,并将函数 mod_event_socket.c#event_hanlder() 作为回调传入
      2. 通过宏调用 switch_loadable_module.h#SWITCH_ADD_APP()switch_event.c#socket_function() 函数作为一个名为 socket 的 Application 注册到 FreeSwitch 核心中供系统调用,也就是外联 Outbound 模式主动连接外部的入口
      SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load)
      {
       switch_application_interface_t *app_interface;
       switch_api_interface_t *api_interface;
      
       memset(&globals, 0, sizeof(globals));
      
       switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool);
      
       memset(&listen_list, 0, sizeof(listen_list));
       switch_mutex_init(&listen_list.sock_mutex, SWITCH_MUTEX_NESTED, pool);
      
       if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) {
       	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");
       	return SWITCH_STATUS_GENERR;
       }
      
       switch_log_bind_logger(socket_logger, SWITCH_LOG_DEBUG, SWITCH_FALSE);
      
       /* connect my internal structure to the blank pointer passed to me */
       *module_interface = switch_loadable_module_create_module_interface(pool, modname);
       SWITCH_ADD_APP(app_interface, "socket", "Connect to a socket", "Connect to a socket", socket_function, "[:]", SAF_SUPPORT_NOMEDIA);
       SWITCH_ADD_API(api_interface, "event_sink", "event_sink", event_sink_function, "");
      
       /* indicate that the module should continue to be loaded */
       return SWITCH_STATUS_SUCCESS;
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
    5. switch_event.c#switch_event_bind_removable() 函数的处理比较直观,可以看到核心就是将调用方传入的回调等信息封装为一个 switch_event_node_t 结构体实例,并将其加入到监听节点数组 EVENT_NODES

      SWITCH_DECLARE(switch_status_t) switch_event_bind_removable(const char *id, switch_event_types_t event, const char *subclass_name,
       														switch_event_callback_t callback, void *user_data, switch_event_node_t **node)
      {
       switch_event_node_t *event_node;
       switch_event_subclass_t *subclass = NULL;
      
       switch_assert(BLOCK != NULL);
       switch_assert(RUNTIME_POOL != NULL);
      
       if (node) {
       	*node = NULL;
       }
      
       if (subclass_name) {
       	switch_mutex_lock(CUSTOM_HASH_MUTEX);
      
       	if (!(subclass = switch_core_hash_find(CUSTOM_HASH, subclass_name))) {
       		switch_event_reserve_subclass_detailed(id, subclass_name);
       		subclass = switch_core_hash_find(CUSTOM_HASH, subclass_name);
       		subclass->bind = 1;
       	}
      
       	switch_mutex_unlock(CUSTOM_HASH_MUTEX);
      
       	if (!subclass) {
       		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not reserve subclass. '%s'\n", subclass_name);
       		return SWITCH_STATUS_FALSE;
       	}
       }
      
       if (event <= SWITCH_EVENT_ALL) {
       	switch_zmalloc(event_node, sizeof(*event_node));
       	switch_thread_rwlock_wrlock(RWLOCK);
       	switch_mutex_lock(BLOCK);
       	/*  ----------------------------------------------- */
       	event_node->id = DUP(id);
       	event_node->event_id = event;
       	if (subclass_name) {
       		event_node->subclass_name = DUP(subclass_name);
       	}
       	event_node->callback = callback;
       	event_node->user_data = user_data;
      
       	if (EVENT_NODES[event]) {
       		event_node->next = EVENT_NODES[event];
       	}
      
       	EVENT_NODES[event] = event_node;
       	switch_mutex_unlock(BLOCK);
       	switch_thread_rwlock_unlock(RWLOCK);
       	/*  ----------------------------------------------- */
      
       	if (node) {
       		*node = event_node;
       	}
      
       	return SWITCH_STATUS_SUCCESS;
       }
      
       return SWITCH_STATUS_MEMERR;
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
    6. 此时模块的加载流程基本结束,回到本节步骤1第4步 switch_loadable_module.c#switch_loadable_module_runtime() 函数调用,可以看到这个函数的主要逻辑是判断加载的模块是否有定义 runtime 函数,有的话就调用 switch_core.c#switch_core_launch_thread() 函数新建线程,将 switch_loadable_module.c#switch_loadable_module_exec() 函数作为线程任务传入

      static void switch_loadable_module_runtime(void)
      {
       switch_hash_index_t *hi;
       void *val;
       switch_loadable_module_t *module;
      
       switch_mutex_lock(loadable_modules.mutex);
       for (hi = switch_core_hash_first(loadable_modules.module_hash); hi; hi = switch_core_hash_next(&hi)) {
       	switch_core_hash_this(hi, NULL, NULL, &val);
       	module = (switch_loadable_module_t *) val;
      
       	if (module->switch_module_runtime) {
       		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Starting runtime thread for %s\n", module->module_interface->module_name);
       		module->thread = switch_core_launch_thread(switch_loadable_module_exec, module, loadable_modules.pool);
       	}
       }
       switch_mutex_unlock(loadable_modules.mutex);
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
    7. switch_loadable_module.c#switch_loadable_module_exec() 函数逻辑清晰,主要逻辑就是执行模块的 runtime 函数,以 Event Socket 模块为例,这里将回调到 mod_event_socket.c#SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime) 函数,至此加载进内存的 Event Socket 模块已经运行起来,其运行原理将在下节分析

      static void *SWITCH_THREAD_FUNC switch_loadable_module_exec(switch_thread_t *thread, void *obj)
      {
      
       switch_status_t status = SWITCH_STATUS_SUCCESS;
       switch_core_thread_session_t *ts = obj;
       switch_loadable_module_t *module = ts->objs[0];
       int restarts;
      
       switch_assert(thread != NULL);
       switch_assert(module != NULL);
      
       for (restarts = 0; status != SWITCH_STATUS_TERM && !module->shutting_down; restarts++) {
       	status = module->switch_module_runtime();
       }
       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Thread ended for %s\n", module->module_interface->module_name);
      
       if (ts->pool) {
       	switch_memory_pool_t *pool = ts->pool;
       	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Destroying Pool for %s\n", module->module_interface->module_name);
       	switch_core_destroy_memory_pool(&pool);
       }
       switch_thread_exit(thread, 0);
       return NULL;
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24

    2.2 mod_event_socket 的运行原理

    FreeSWITCH 中的 EventSocket 有如下图两种交互模式,从功能上来说二者几乎没有差别,主要区别是 FreeSWITCH 在连接中担任的角色不同

    在这里插入图片描述

    2.2.1 事件订阅命令的处理
    1. mod_event_socket.c#SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime) 函数是 Event Socket 模块运行的关键,其核心处理如下:

      1. 首先调用 mod_event_socket.c#config() 函数读取配置文件的属性,将其加载到内存中
      2. 根据配置属性,绑定监听本机端口,通过 switch_apr.c#switch_socket_accept() 调用底层接口等待远程 Inbound 连接
      3. 一旦接收到远程 Inbound 连接,则将其封装为 listener_t 结构体实例 listener 并添加到 socket 监听列表中
      4. 最后调用 mod_event_socket.c#launch_listener_thread() 函数创建一个线程单独处理这条 Inbound 连接上的数据交互
      SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime)
      {
       switch_memory_pool_t *pool = NULL, *listener_pool = NULL;
       switch_status_t rv;
       switch_sockaddr_t *sa;
       switch_socket_t *inbound_socket = NULL;
       listener_t *listener;
       uint32_t x = 0;
       uint32_t errs = 0;
      
       if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
       	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n");
       	return SWITCH_STATUS_TERM;
       }
      
       config();
      
       while (!prefs.done) {
       	rv = switch_sockaddr_info_get(&sa, prefs.ip, SWITCH_UNSPEC, prefs.port, 0, pool);
       	if (rv) {
       		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot get information about IP address %s\n", prefs.ip);
       		goto fail;
       	}
       	rv = switch_socket_create(&listen_list.sock, switch_sockaddr_get_family(sa), SOCK_STREAM, SWITCH_PROTO_TCP, pool);
       	if (rv)
       		goto sock_fail;
       	rv = switch_socket_opt_set(listen_list.sock, SWITCH_SO_REUSEADDR, 1);
       	if (rv)
       		goto sock_fail;
      #ifdef WIN32
       	/* Enable dual-stack listening on Windows (if the listening address is IPv6), it's default on Linux */
       	if (switch_sockaddr_get_family(sa) == AF_INET6) {
       		rv = switch_socket_opt_set(listen_list.sock, SWITCH_SO_IPV6_V6ONLY, 0);
       		if (rv) goto sock_fail;
       	}
      #endif
       	rv = switch_socket_bind(listen_list.sock, sa);
       	if (rv)
       		goto sock_fail;
       	rv = switch_socket_listen(listen_list.sock, 5);
       	if (rv)
       		goto sock_fail;
       	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Socket up listening on %s:%u\n", prefs.ip, prefs.port);
      
       	if (prefs.nat_map) {
       		switch_nat_add_mapping(prefs.port, SWITCH_NAT_TCP, NULL, SWITCH_FALSE);
       	}
      
       	break;
         sock_fail:
       	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error! Could not listen on %s:%u\n", prefs.ip, prefs.port);
       	if (prefs.stop_on_bind_error) {
       		prefs.done = 1;
       		goto fail;
       	}
       	switch_yield(100000);
       }
      
       listen_list.ready = 1;
      
      
       while (!prefs.done) {
       	if (switch_core_new_memory_pool(&listener_pool) != SWITCH_STATUS_SUCCESS) {
       		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n");
       		goto fail;
       	}
      
      
       	if ((rv = switch_socket_accept(&inbound_socket, listen_list.sock, listener_pool))) {
       		if (prefs.done) {
       			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Shutting Down\n");
       			goto end;
       		} else {
       			/* I wish we could use strerror_r here but its not defined everywhere =/ */
       			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error [%s]\n", strerror(errno));
       			if (++errs > 100) {
       				goto end;
       			}
       		}
       	} else {
       		errs = 0;
       	}
      
      
       	if (!(listener = switch_core_alloc(listener_pool, sizeof(*listener)))) {
       		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
       		break;
       	}
      
       	switch_thread_rwlock_create(&listener->rwlock, listener_pool);
       	switch_queue_create(&listener->event_queue, MAX_QUEUE_LEN, listener_pool);
       	switch_queue_create(&listener->log_queue, MAX_QUEUE_LEN, listener_pool);
      
       	listener->sock = inbound_socket;
       	listener->pool = listener_pool;
       	listener_pool = NULL;
       	listener->format = EVENT_FORMAT_PLAIN;
       	switch_set_flag(listener, LFLAG_FULL);
       	switch_set_flag(listener, LFLAG_ALLOW_LOG);
      
       	switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
       	switch_mutex_init(&listener->filter_mutex, SWITCH_MUTEX_NESTED, listener->pool);
      
       	switch_core_hash_init(&listener->event_hash);
       	switch_socket_create_pollset(&listener->pollfd, listener->sock, SWITCH_POLLIN | SWITCH_POLLERR, listener->pool);
      
      
      
       	if (switch_socket_addr_get(&listener->sa, SWITCH_TRUE, listener->sock) == SWITCH_STATUS_SUCCESS && listener->sa) {
       		switch_get_addr(listener->remote_ip, sizeof(listener->remote_ip), listener->sa);
       		if ((listener->remote_port = switch_sockaddr_get_port(listener->sa))) {
       			if (launch_listener_thread(listener) == SWITCH_STATUS_SUCCESS)
       				continue;
       		}
       	}
      
       	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error initilizing connection\n");
       	close_socket(&listener->sock);
       	expire_listener(&listener);
      
       }
      
       end:
      
       close_socket(&listen_list.sock);
      
       if (prefs.nat_map && switch_nat_get_type()) {
       	switch_nat_del_mapping(prefs.port, SWITCH_NAT_TCP);
       }
      
       if (pool) {
       	switch_core_destroy_memory_pool(&pool);
       }
      
       if (listener_pool) {
       	switch_core_destroy_memory_pool(&listener_pool);
       }
      
      
       for (x = 0; x < prefs.acl_count; x++) {
       	switch_safe_free(prefs.acl[x]);
       }
      
      fail:
       return SWITCH_STATUS_TERM;
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115
      • 116
      • 117
      • 118
      • 119
      • 120
      • 121
      • 122
      • 123
      • 124
      • 125
      • 126
      • 127
      • 128
      • 129
      • 130
      • 131
      • 132
      • 133
      • 134
      • 135
      • 136
      • 137
      • 138
      • 139
      • 140
      • 141
      • 142
      • 143
      • 144
      • 145
      • 146
      • 147
    2. mod_event_socket.c#config() 函数逻辑比较简洁,就是读取配置文件 event_socket.conf.xml 属性,并将其保存到 prefs 结构体中

      static int config(void)
      {
       char *cf = "event_socket.conf";
       switch_xml_t cfg, xml, settings, param;
      
       memset(&prefs, 0, sizeof(prefs));
      
       if (!(xml = switch_xml_open_cfg(cf, &cfg, NULL))) {
       	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Open of %s failed\n", cf);
       } else {
       	if ((settings = switch_xml_child(cfg, "settings"))) {
       		for (param = switch_xml_child(settings, "param"); param; param = param->next) {
       			char *var = (char *) switch_xml_attr_soft(param, "name");
       			char *val = (char *) switch_xml_attr_soft(param, "value");
      
       			if (!strcmp(var, "listen-ip")) {
       				set_pref_ip(val);
       			} else if (!strcmp(var, "debug")) {
       				globals.debug = atoi(val);
       			} else if (!strcmp(var, "nat-map")) {
       				if (switch_true(val) && switch_nat_get_type()) {
       					prefs.nat_map = 1;
       				}
       			} else if (!strcmp(var, "listen-port")) {
       				prefs.port = (uint16_t) atoi(val);
       			} else if (!strcmp(var, "password")) {
       				set_pref_pass(val);
       			} else if (!strcasecmp(var, "apply-inbound-acl") && ! zstr(val)) {
       				if (prefs.acl_count < MAX_ACL) {
       					prefs.acl[prefs.acl_count++] = strdup(val);
       				} else {
       					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Max acl records of %d reached\n", MAX_ACL);
       				}
       			} else if (!strcasecmp(var, "stop-on-bind-error")) {
       				prefs.stop_on_bind_error = switch_true(val) ? 1 : 0;
       			}
       		}
       	}
       	switch_xml_free(xml);
       }
      
       if (zstr(prefs.ip)) {
       	set_pref_ip("127.0.0.1");
       }
      
       if (zstr(prefs.password)) {
       	set_pref_pass("ClueCon");
       }
      
       if (!prefs.nat_map) {
       	prefs.nat_map = 0;
       }
      
       if (!prefs.acl_count) {
       	prefs.acl[prefs.acl_count++] = strdup("loopback.auto");
       }
      
       if (prefs.nat_map) {
       	prefs.nat_map = 0;
       }
      
       if (!prefs.port) {
       	prefs.port = 8021;
       }
      
       return 0;
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
    3. 远程 Inbound 连接到来时,mod_event_socket.c#launch_listener_thread() 函数将被调用,可以看到此处核心处理是新建线程,并将 mod_event_socket.c#listener_run() 函数作为线程任务执行

      static switch_status_t launch_listener_thread(listener_t *listener)
      {
       switch_thread_t *thread;
       switch_threadattr_t *thd_attr = NULL;
      
       switch_threadattr_create(&thd_attr, listener->pool);
       switch_threadattr_detach_set(thd_attr, 1);
       switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
       return switch_thread_create(&thread, thd_attr, listener_run, listener, listener->pool);
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
    4. mod_event_socket.c#listener_run() 函数源码如下,可以看到其关键的处理分为以下几步:

      1. 首先是尝试对连接进行 acl 检查,实际上只会对 Inbound 连接进行 acl 检查,Outbound 连接跳过这步
      2. 接下来调用 mod_event_socket.c#add_listener() 函数将当前 listener 添加到内部 listen_list 列表
      3. 接着是用户授权认证,同样是只会对 Inbound 连接进行,Outbound 连接默认已经授权。授权完成后,这个连接可以正式开始交互,则在 while 循环中不断调用 mod_event_socket.c#read_packet() 函数读取对端数据
      4. 读取到对端数据后,再调用 mod_event_socket.c#parse_command() 函数解析传输过来的命令并执行
      static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
      {
       listener_t *listener = (listener_t *) obj;
       char buf[1024];
       switch_size_t len;
       switch_status_t status;
       switch_event_t *event;
       char reply[512] = "";
       switch_core_session_t *session = NULL;
       switch_channel_t *channel = NULL;
       switch_event_t *revent = NULL;
       const char *var;
       int locked = 1;
      
       switch_mutex_lock(globals.listener_mutex);
       prefs.threads++;
       switch_mutex_unlock(globals.listener_mutex);
      
       switch_assert(listener != NULL);
      
       if ((session = listener->session)) {
       	if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) {
       		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unable to lock session!\n");
       		locked = 0;
       		session = NULL;
       		goto done;
       	}
      
       	listener->lock_acquired = 1;
       }
      
       if (!listener->sock) {
       	switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Listener socket is null!\n");
       	switch_clear_flag_locked(listener, LFLAG_RUNNING);
       	goto done;
       }
      
       switch_socket_opt_set(listener->sock, SWITCH_SO_TCP_NODELAY, TRUE);
       switch_socket_opt_set(listener->sock, SWITCH_SO_NONBLOCK, TRUE);
      
       if (prefs.acl_count && listener->sa && !zstr(listener->remote_ip)) {
       	uint32_t x = 0;
      
       	for (x = 0; x < prefs.acl_count; x++) {
       		if (!switch_check_network_list_ip(listener->remote_ip, prefs.acl[x])) {
       			const char message[] = "Access Denied, go away.\n";
       			int mlen = (int)strlen(message);
      
       			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "IP %s Rejected by acl \"%s\"\n", listener->remote_ip,
       							  prefs.acl[x]);
      
       			switch_snprintf(buf, sizeof(buf), "Content-Type: text/rude-rejection\nContent-Length: %d\n\n", mlen);
       			len = strlen(buf);
       			switch_socket_send(listener->sock, buf, &len);
       			len = mlen;
       			switch_socket_send(listener->sock, message, &len);
       			goto done;
       		}
       	}
       }
      
       if (globals.debug > 0) {
       	if (zstr(listener->remote_ip)) {
       		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Connection Open\n");
       	} else {
       		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Connection Open from %s:%d\n", listener->remote_ip,
       						  listener->remote_port);
       	}
       }
      
       switch_socket_opt_set(listener->sock, SWITCH_SO_NONBLOCK, TRUE);
       switch_set_flag_locked(listener, LFLAG_RUNNING);
       add_listener(listener);
      
       if (session && switch_test_flag(listener, LFLAG_AUTHED)) {
       	switch_event_t *ievent = NULL;
      
       	switch_set_flag_locked(listener, LFLAG_SESSION);
       	status = read_packet(listener, &ievent, 25);
      
       	if (status != SWITCH_STATUS_SUCCESS || !ievent) {
       		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "Socket Error!\n");
       		switch_clear_flag_locked(listener, LFLAG_RUNNING);
       		goto done;
       	}
      
      
       	if (parse_command(listener, &ievent, reply, sizeof(reply)) != SWITCH_STATUS_SUCCESS) {
       		switch_clear_flag_locked(listener, LFLAG_RUNNING);
       		goto done;
       	}
      
      
       } else {
       	switch_snprintf(buf, sizeof(buf), "Content-Type: auth/request\n\n");
      
       	len = strlen(buf);
       	switch_socket_send(listener->sock, buf, &len);
      
       	while (!switch_test_flag(listener, LFLAG_AUTHED)) {
       		status = read_packet(listener, &event, 25);
       		if (status != SWITCH_STATUS_SUCCESS) {
       			goto done;
       		}
       		if (!event) {
       			continue;
       		}
      
       		if (parse_command(listener, &event, reply, sizeof(reply)) != SWITCH_STATUS_SUCCESS) {
       			switch_clear_flag_locked(listener, LFLAG_RUNNING);
       			goto done;
       		}
       		if (*reply != '\0') {
       			if (*reply == '~') {
       				switch_snprintf(buf, sizeof(buf), "Content-Type: command/reply\n%s", reply + 1);
       			} else {
       				switch_snprintf(buf, sizeof(buf), "Content-Type: command/reply\nReply-Text: %s\n\n", reply);
       			}
       			len = strlen(buf);
       			switch_socket_send(listener->sock, buf, &len);
       		}
       		break;
       	}
       }
      
       while (!prefs.done && switch_test_flag(listener, LFLAG_RUNNING) && listen_list.ready) {
       	len = sizeof(buf);
       	memset(buf, 0, len);
       	status = read_packet(listener, &revent, 0);
      
       	if (status != SWITCH_STATUS_SUCCESS) {
       		break;
       	}
      
       	if (!revent) {
       		continue;
       	}
      
       	if (parse_command(listener, &revent, reply, sizeof(reply)) != SWITCH_STATUS_SUCCESS) {
       		switch_clear_flag_locked(listener, LFLAG_RUNNING);
       		break;
       	}
      
       	if (revent) {
       		switch_event_destroy(&revent);
       	}
      
       	if (*reply != '\0') {
       		if (*reply == '~') {
       			switch_snprintf(buf, sizeof(buf), "Content-Type: command/reply\n%s", reply + 1);
       		} else {
       			switch_snprintf(buf, sizeof(buf), "Content-Type: command/reply\nReply-Text: %s\n\n", reply);
       		}
       		len = strlen(buf);
       		switch_socket_send(listener->sock, buf, &len);
       	}
      
       }
      
       done:
      
       if (revent) {
       	switch_event_destroy(&revent);
       }
      
       remove_listener(listener);
      
       if (globals.debug > 0) {
       	switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Session complete, waiting for children\n");
       }
      
       switch_thread_rwlock_wrlock(listener->rwlock);
       flush_listener(listener, SWITCH_TRUE, SWITCH_TRUE);
       switch_mutex_lock(listener->filter_mutex);
       if (listener->filters) {
       	switch_event_destroy(&listener->filters);
       }
       switch_mutex_unlock(listener->filter_mutex);
      
       if (listener->session && locked) {
       	channel = switch_core_session_get_channel(listener->session);
       }
      
       if (channel && switch_channel_get_state(channel) != CS_HIBERNATE &&
       	!switch_channel_test_flag(channel, CF_REDIRECT) && !switch_channel_test_flag(channel, CF_TRANSFER) && !switch_channel_test_flag(channel, CF_RESET) &&
       	(switch_test_flag(listener, LFLAG_RESUME) || ((var = switch_channel_get_variable(channel, "socket_resume")) && switch_true(var)))) {
       	switch_channel_set_state(channel, CS_RESET);
       }
      
       if (listener->sock) {
       	send_disconnect(listener, "Disconnected, goodbye.\nSee you at ClueCon! http://www.cluecon.com/\n");
       	close_socket(&listener->sock);
       }
      
       switch_thread_rwlock_unlock(listener->rwlock);
      
       if (globals.debug > 0) {
       	switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Connection Closed\n");
       }
      
       switch_core_hash_destroy(&listener->event_hash);
      
       if (listener->allowed_event_hash) {
       	switch_core_hash_destroy(&listener->allowed_event_hash);
       }
      
       if (listener->allowed_api_hash) {
       	switch_core_hash_destroy(&listener->allowed_api_hash);
       }
      
       if (listener->session) {
       	if (locked) {
       		switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED);
       	}
       	switch_clear_flag_locked(listener, LFLAG_SESSION);
       	if (locked) {
       		switch_core_session_rwunlock(listener->session);
       	}
       } else if (listener->pool) {
       	switch_memory_pool_t *pool = listener->pool;
       	switch_core_destroy_memory_pool(&pool);
       }
      
       switch_mutex_lock(globals.listener_mutex);
       prefs.threads--;
       switch_mutex_unlock(globals.listener_mutex);
      
       listener->finished = 1;
      
       return NULL;
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115
      • 116
      • 117
      • 118
      • 119
      • 120
      • 121
      • 122
      • 123
      • 124
      • 125
      • 126
      • 127
      • 128
      • 129
      • 130
      • 131
      • 132
      • 133
      • 134
      • 135
      • 136
      • 137
      • 138
      • 139
      • 140
      • 141
      • 142
      • 143
      • 144
      • 145
      • 146
      • 147
      • 148
      • 149
      • 150
      • 151
      • 152
      • 153
      • 154
      • 155
      • 156
      • 157
      • 158
      • 159
      • 160
      • 161
      • 162
      • 163
      • 164
      • 165
      • 166
      • 167
      • 168
      • 169
      • 170
      • 171
      • 172
      • 173
      • 174
      • 175
      • 176
      • 177
      • 178
      • 179
      • 180
      • 181
      • 182
      • 183
      • 184
      • 185
      • 186
      • 187
      • 188
      • 189
      • 190
      • 191
      • 192
      • 193
      • 194
      • 195
      • 196
      • 197
      • 198
      • 199
      • 200
      • 201
      • 202
      • 203
      • 204
      • 205
      • 206
      • 207
      • 208
      • 209
      • 210
      • 211
      • 212
      • 213
      • 214
      • 215
      • 216
      • 217
      • 218
      • 219
      • 220
      • 221
      • 222
      • 223
      • 224
      • 225
      • 226
      • 227
      • 228
      • 229
      • 230
      • 231
    5. mod_event_socket.c#read_packet() 函数体比较琐碎,简单来说关键处理是以下几步:

      1. 通过 switch_apr.c#switch_socket_recv() 函数调用底层接口读取 socket 数据,有数据则将其处理后封装到 switch_event_t 结构体实例 event 中,跳出循环
      2. 如果没有读到数据,并且当前 listener 对应的远端已经订阅了事件,则将 listener->event_queue 队列中的事件出队,按照订阅方的订阅格式通过 switch_apr.c#switch_socket_send() 函数发送给对端
      static switch_status_t read_packet(listener_t *listener, switch_event_t **event, uint32_t timeout)
      {
       
       ......
      
       while (listener->sock && !prefs.done) {
       	uint8_t do_sleep = 1;
       	mlen = 1;
      
       	if (bytes == buf_len - 1) {
       		char *tmp;
       		int pos;
      
       		pos = (int)(ptr - mbuf);
       		buf_len += block_len;
       		tmp = realloc(mbuf, buf_len);
       		switch_assert(tmp);
       		mbuf = tmp;
       		memset(mbuf + bytes, 0, buf_len - bytes);
       		ptr = (mbuf + pos);
      
       	}
      
       	status = switch_socket_recv(listener->sock, ptr, &mlen);
      
       	if (prefs.done || (!SWITCH_STATUS_IS_BREAK(status) && status != SWITCH_STATUS_SUCCESS)) {
       		switch_goto_status(SWITCH_STATUS_FALSE, end);
       	}
      
       	if (mlen) {
       		bytes += mlen;
       		do_sleep = 0;
      
       		if (*mbuf == '\r' || *mbuf == '\n') {	/* bah */
       			ptr = mbuf;
       			mbuf[0] = '\0';
       			bytes = 0;
       			continue;
       		}
      
       		if (*ptr == '\n') {
       			crcount++;
       		} else if (*ptr != '\r') {
       			crcount = 0;
       		}
       		ptr++;
      
       		if (bytes >= max_len) {
       			crcount = 2;
       		}
      
       		if (crcount == 2) {
       			char *next;
       			char *cur = mbuf;
       			while (cur) {
       				if ((next = strchr(cur, '\r')) || (next = strchr(cur, '\n'))) {
       					while (*next == '\r' || *next == '\n') {
       						next++;
       					}
       				}
       				count++;
       				if (count == 1) {
       					switch_event_create(event, SWITCH_EVENT_CLONE);
       					switch_event_add_header_string(*event, SWITCH_STACK_BOTTOM, "Command", mbuf);
       				} else if (cur) {
       					char *var, *val;
       					var = cur;
       					strip_cr(var);
       					if (!zstr(var)) {
       						if ((val = strchr(var, ':'))) {
       							*val++ = '\0';
       							while (*val == ' ') {
       								val++;
       							}
       						}
       						if (val) {
       							switch_event_add_header_string(*event, SWITCH_STACK_BOTTOM, var, val);
       							if (!strcasecmp(var, "content-length")) {
       								clen = atoi(val);
      
       								if (clen > 0) {
       									char *body;
       									char *p;
      
       									switch_zmalloc(body, clen + 1);
      
       									p = body;
       									while (clen > 0) {
       										mlen = clen;
      
       										status = switch_socket_recv(listener->sock, p, &mlen);
      
       										if (prefs.done || (!SWITCH_STATUS_IS_BREAK(status) && status != SWITCH_STATUS_SUCCESS)) {
       											free(body);
       											switch_goto_status(SWITCH_STATUS_FALSE, end);
       										}
      
       										/*
       										   if (channel && !switch_channel_ready(channel)) {
       										   status = SWITCH_STATUS_FALSE;
       										   break;
       										   }
       										 */
      
       										clen -= (int) mlen;
       										p += mlen;
       									}
      
       									switch_event_add_body(*event, "%s", body);
       									free(body);
       								}
       							}
       						}
       					}
       				}
      
       				cur = next;
       			}
       			break;
       		}
       	}
       	if (!*mbuf) {
           
               ......
      
       		if (listener->session) {
       			switch_channel_t *chan = switch_core_session_get_channel(listener->session);
       			if (switch_channel_get_state(chan) < CS_HANGUP && switch_channel_test_flag(chan, CF_DIVERT_EVENTS)) {
       				switch_event_t *e = NULL;
       				while (switch_core_session_dequeue_event(listener->session, &e, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
       					if (switch_queue_trypush(listener->event_queue, e) != SWITCH_STATUS_SUCCESS) {
       						switch_core_session_queue_event(listener->session, &e);
       						break;
       					}
       				}
       			}
       		}
      
       		if (switch_test_flag(listener, LFLAG_EVENTS)) {
       			while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
       				char hbuf[512];
       				switch_event_t *pevent = (switch_event_t *) pop;
       				char *etype;
      
       				do_sleep = 0;
       				if (listener->format == EVENT_FORMAT_PLAIN) {
       					etype = "plain";
       					switch_event_serialize(pevent, &listener->ebuf, SWITCH_TRUE);
       				} else if (listener->format == EVENT_FORMAT_JSON) {
       					etype = "json";
       					switch_event_serialize_json(pevent, &listener->ebuf);
       				} else {
       					switch_xml_t xml;
       					etype = "xml";
      
       					if ((xml = switch_event_xmlize(pevent, SWITCH_VA_NONE))) {
       						listener->ebuf = switch_xml_toxml(xml, SWITCH_FALSE);
       						switch_xml_free(xml);
       					} else {
       						switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(listener->session), SWITCH_LOG_ERROR, "XML ERROR!\n");
       						goto endloop;
       					}
       				}
      
       				switch_assert(listener->ebuf);
      
       				len = strlen(listener->ebuf);
      
       				switch_snprintf(hbuf, sizeof(hbuf), "Content-Length: %" SWITCH_SSIZE_T_FMT "\n" "Content-Type: text/event-%s\n" "\n", len, etype);
      
       				len = strlen(hbuf);
       				switch_socket_send(listener->sock, hbuf, &len);
      
       				len = strlen(listener->ebuf);
       				switch_socket_send(listener->sock, listener->ebuf, &len);
      
       				switch_safe_free(listener->ebuf);
      
       			  endloop:
      
       				switch_event_destroy(&pevent);
       			}
       		}
       	}
      
       	......
      
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115
      • 116
      • 117
      • 118
      • 119
      • 120
      • 121
      • 122
      • 123
      • 124
      • 125
      • 126
      • 127
      • 128
      • 129
      • 130
      • 131
      • 132
      • 133
      • 134
      • 135
      • 136
      • 137
      • 138
      • 139
      • 140
      • 141
      • 142
      • 143
      • 144
      • 145
      • 146
      • 147
      • 148
      • 149
      • 150
      • 151
      • 152
      • 153
      • 154
      • 155
      • 156
      • 157
      • 158
      • 159
      • 160
      • 161
      • 162
      • 163
      • 164
      • 165
      • 166
      • 167
      • 168
      • 169
      • 170
      • 171
      • 172
      • 173
      • 174
      • 175
      • 176
      • 177
      • 178
      • 179
      • 180
      • 181
      • 182
      • 183
      • 184
      • 185
      • 186
      • 187
      • 188
    6. 此时回到本节步骤4第4步mod_event_socket.c#parse_command() 函数非常长,主要逻辑是各个命令的处理,可以看到其关键处理如下:

      1. 首先调用 switch_event.h#switch_event_get_header() 函数从上一步封装好的 switch_event_t 结构体中获取到命令名称
      2. 命令的处理非常繁冗,此处以 event 订阅事件命令为例解析,该命令的主要处理逻辑是解析命令参数,将订阅的事件记录在 listener->event_list标记数组中并给 listener 添上 LFLAG_EVENTS 标记位,至此 Event Socket 事件订阅处理基本结束
      static switch_status_t parse_command(listener_t *listener, switch_event_t **event, char *reply, uint32_t reply_len)
      {
       switch_status_t status = SWITCH_STATUS_SUCCESS;
       char *cmd = NULL;
       char unload_cheat[] = "api bgapi unload mod_event_socket";
       char reload_cheat[] = "api bgapi reload mod_event_socket";
      
       *reply = '\0';
      
       if (!event || !*event || !(cmd = switch_event_get_header(*event, "command"))) {
       	switch_clear_flag_locked(listener, LFLAG_RUNNING);
       	switch_snprintf(reply, reply_len, "-ERR command parse error.");
       	goto done;
       }
      
       ......
       
       if (!strncasecmp(cmd, "sendevent", 9)) {
           
       ......
       
       } else if (!strncasecmp(cmd, "event", 5)) {
       	char *next, *cur;
       	uint32_t count = 0, key_count = 0;
       	uint8_t custom = 0;
      
       	strip_cr(cmd);
       	cur = cmd + 5;
      
       	if ((cur = strchr(cur, ' '))) {
       		for (cur++; cur; count++) {
       			switch_event_types_t type;
      
       			if ((next = strchr(cur, ' '))) {
       				*next++ = '\0';
       			}
      
       			if (!count) {
       				if (!strcasecmp(cur, "xml")) {
       					listener->format = EVENT_FORMAT_XML;
       					goto end;
       				} else if (!strcasecmp(cur, "plain")) {
       					listener->format = EVENT_FORMAT_PLAIN;
       					goto end;
       				} else if (!strcasecmp(cur, "json")) {
       					listener->format = EVENT_FORMAT_JSON;
       					goto end;
       				}
       			}
      
      
       			if (custom) {
       				if (!listener->allowed_event_hash || switch_core_hash_find(listener->allowed_event_hash, cur)) {
       					switch_core_hash_insert(listener->event_hash, cur, MARKER);
       				} else {
       					switch_snprintf(reply, reply_len, "-ERR permission denied");
       					goto done;
       				}
       			} else if (switch_name_event(cur, &type) == SWITCH_STATUS_SUCCESS) {
       				if (switch_test_flag(listener, LFLAG_AUTH_EVENTS) && !listener->allowed_event_list[type] &&
       					!switch_test_flag(listener, LFLAG_ALL_EVENTS_AUTHED)) {
       					switch_snprintf(reply, reply_len, "-ERR permission denied");
       					goto done;
       				}
      
       				key_count++;
       				if (type == SWITCH_EVENT_ALL) {
       					uint32_t x = 0;
       					for (x = 0; x < SWITCH_EVENT_ALL; x++) {
       						listener->event_list[x] = 1;
       					}
      
       					if (!listener->allowed_event_hash) {
       						set_all_custom(listener);
       					} else {
       						set_allowed_custom(listener);
       					}
      
       				}
       				if (type <= SWITCH_EVENT_ALL) {
       					listener->event_list[type] = 1;
       				}
       				if (type == SWITCH_EVENT_CUSTOM) {
       					custom++;
       				}
       			}
      
       		  end:
       			cur = next;
       		}
       	}
      
       	if (!key_count) {
       		switch_snprintf(reply, reply_len, "-ERR no keywords supplied");
       		goto done;
       	}
      
       	if (!switch_test_flag(listener, LFLAG_EVENTS)) {
       		switch_set_flag_locked(listener, LFLAG_EVENTS);
       	}
      
       	switch_snprintf(reply, reply_len, "+OK event listener enabled %s", format2str(listener->format));
      
       }
       
       ......
      
      done:
      
       if (zstr(reply)) {
       	switch_snprintf(reply, reply_len, "-ERR command not found");
       }
      
       done_noreply:
      
       if (event) {
       	switch_event_destroy(event);
       }
      
       return status;
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115
      • 116
      • 117
      • 118
      • 119
      • 120
      • 121
      • 122
    2.2.2 事件分发的流程
    1. 经过上一节步骤5第2步的分析,我们知道远程连接监听的事件其实都来自于 listner 的内部队列,所以要了解事件的分发流程,关键是要知道队列中的数据是从哪里来。此时回顾2.1.3节步骤4第1步,Event Socket 模块在加载时注册了监听到 FreeSWITCH 的事件组件中,而事件组件的事件来源其实是宏定义 switch_event.h#switch_event_fire() 调用 switch_event.c#switch_event_fire_detailed() 函数。该函数比较简洁,核心处理如下:

      1. 根据 runtime.events_use_dispatch 属性决定外部调用方投递的事件的最终目的地,该属性默认值为1,也就是默认会调用 switch_event.c#switch_event_queue_dispatch_event() 投递到 EVENT_DISPATCH_QUEUE
      2. switch_event.c#switch_event_deliver_thread_pool() 函数执行时会将事件投递到 session_manager.thread_queue 队列,并根据线程池当前空闲状态决定是否创建新线程
      SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(const char *file, const char *func, int line, switch_event_t **event, void *user_data)
      {
      
       switch_assert(BLOCK != NULL);
       switch_assert(RUNTIME_POOL != NULL);
       switch_assert(EVENT_QUEUE_MUTEX != NULL);
       switch_assert(RUNTIME_POOL != NULL);
      
       if (SYSTEM_RUNNING <= 0) {
       	/* sorry we're closed */
       	switch_event_destroy(event);
       	return SWITCH_STATUS_SUCCESS;
       }
      
       if (user_data) {
       	(*event)->event_user_data = user_data;
       }
      
       if (runtime.events_use_dispatch) {
       	check_dispatch();
      
       	if (switch_event_queue_dispatch_event(event) != SWITCH_STATUS_SUCCESS) {
       		switch_event_destroy(event);
       		return SWITCH_STATUS_FALSE;
       	}
       } else {
       	switch_event_deliver_thread_pool(event);
       }
      
       return SWITCH_STATUS_SUCCESS;
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
    2. switch_event.c#switch_event_queue_dispatch_event() 函数的核心处理如下:

      1. 计算 DISPATCH_QUEUE_LEN 队列空闲状态,如果队列中数据已经超过阈值,则检查当前事件分发线程数量是否达到最大值,最终决定是否需要新建事件分发线程
      2. 如果需要新建线程,则执行 switch_event.c#switch_event_launch_dispatch_threads() 函数启动新线程
      3. 最后调用函数 switch_apr.c#switch_queue_push() 将事件添加到 DISPATCH_QUEUE_LEN 队列
      static switch_status_t switch_event_queue_dispatch_event(switch_event_t **eventp)
      {
      
       switch_event_t *event = *eventp;
      
       if (!SYSTEM_RUNNING) {
       	return SWITCH_STATUS_FALSE;
       }
      
       while (event) {
       	int launch = 0;
      
       	switch_mutex_lock(EVENT_QUEUE_MUTEX);
      
       	if (!PENDING && switch_queue_size(EVENT_DISPATCH_QUEUE) > (unsigned int)(DISPATCH_QUEUE_LEN * DISPATCH_THREAD_COUNT)) {
       		if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) {
       			launch++;
       			PENDING++;
       		}
       	}
      
       	switch_mutex_unlock(EVENT_QUEUE_MUTEX);
      
       	if (launch) {
       		if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) {
       			switch_event_launch_dispatch_threads(SOFT_MAX_DISPATCH + 1);
       		}
      
       		switch_mutex_lock(EVENT_QUEUE_MUTEX);
       		PENDING--;
       		switch_mutex_unlock(EVENT_QUEUE_MUTEX);
       	}
      
       	*eventp = NULL;
       	switch_queue_push(EVENT_DISPATCH_QUEUE, event);
       	event = NULL;
      
       }
      
       return SWITCH_STATUS_SUCCESS;
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
    3. 事件已经入队,实际对 DISPATCH_QUEUE_LEN 队列进行处理的是在 2.1.2节步骤4 提到的线程任务 switch_event.c#switch_event_dispatch_thread() 中。可以看到这个函数的核心操作是在 for 空循环中将 DISPATCH_QUEUE_LEN 队列的数据轮询出来,随后调用 switch_event.c#switch_event_deliver() 函数进行处理

      static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *thread, void *obj)
      {
       switch_queue_t *queue = (switch_queue_t *) obj;
       int my_id = 0;
      
       switch_mutex_lock(EVENT_QUEUE_MUTEX);
       THREAD_COUNT++;
       DISPATCH_THREAD_COUNT++;
      
       for (my_id = 0; my_id < MAX_DISPATCH_VAL; my_id++) {
       	if (EVENT_DISPATCH_QUEUE_THREADS[my_id] == thread) {
       		break;
       	}
       }
      
       if ( my_id >= MAX_DISPATCH_VAL ) {
       	switch_mutex_unlock(EVENT_QUEUE_MUTEX);
       	return NULL;
       }
      
       EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 1;
       switch_mutex_unlock(EVENT_QUEUE_MUTEX);
      
      
       for (;;) {
       	void *pop = NULL;
       	switch_event_t *event = NULL;
      
       	if (!SYSTEM_RUNNING) {
       		break;
       	}
      
       	if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) {
       		continue;
       	}
      
       	if (!pop) {
       		break;
       	}
      
       	event = (switch_event_t *) pop;
       	switch_event_deliver(&event);
       	switch_os_yield();
       }
      
      
       switch_mutex_lock(EVENT_QUEUE_MUTEX);
       EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 0;
       THREAD_COUNT--;
       DISPATCH_THREAD_COUNT--;
       switch_mutex_unlock(EVENT_QUEUE_MUTEX);
      
       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Dispatch Thread %d Ended.\n", my_id);
       return NULL;
      
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
    4. switch_event.c#switch_event_deliver() 函数非常简洁,核心处理就是遍历 EVENT_NODES 列表,调用注册方的回调函数将事件分发到订阅方。此处结合2.1.3节步骤4第1步,可以知道会调用到 mod_event_socket.c#event_handler() 函数

      SWITCH_DECLARE(void) switch_event_deliver(switch_event_t **event)
      {
       switch_event_types_t e;
       switch_event_node_t *node;
      
       if (SYSTEM_RUNNING) {
       	switch_thread_rwlock_rdlock(RWLOCK);
       	for (e = (*event)->event_id;; e = SWITCH_EVENT_ALL) {
       		for (node = EVENT_NODES[e]; node; node = node->next) {
       			if (switch_events_match(*event, node)) {
       				(*event)->bind_user_data = node->user_data;
       				node->callback(*event);
       			}
       		}
      
       		if (e == SWITCH_EVENT_ALL) {
       			break;
       		}
       	}
       	switch_thread_rwlock_unlock(RWLOCK);
       }
      
       switch_event_destroy(event);
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
    5. mod_event_socket.c#event_handler() 函数源码比较繁琐,不过核心处理并不复杂,关键如下:

      1. 遍历全局列表 listen_list.listeners,判断当前事件是否当前 listener 订阅的
      2. 是的话再检查 listener 上是否有 filter,有则经过 filter 过滤后最终确定需要发送该事件则将其拷贝一份通过 switch_apr.c#switch_queue_trypush() 函数将事件添加到 listener 私属的事件队列。至此事件已经入队,结合2.2.1节步骤5第2步,事件最终发送给了外部订阅方,事件分发流程的分析基本结束
      static void event_handler(switch_event_t *event)
      {
       switch_event_t *clone = NULL;
       listener_t *l, *lp, *last = NULL;
       time_t now = switch_epoch_time_now(NULL);
       switch_status_t qstatus;
      
       switch_assert(event != NULL);
      
       if (!listen_list.ready) {
       	return;
       }
      
       switch_mutex_lock(globals.listener_mutex);
      
       lp = listen_list.listeners;
      
       while (lp) {
       	int send = 0;
      
       	l = lp;
       	lp = lp->next;
      
       	if (switch_test_flag(l, LFLAG_STATEFUL) && (l->expire_time || (l->timeout && now - l->last_flush > l->timeout))) {
       		if (expire_listener(&l) == SWITCH_STATUS_SUCCESS) {
       			if (last) {
       				last->next = lp;
       			} else {
       				listen_list.listeners = lp;
       			}
       			continue;
       		}
       	}
      
       	if (l->expire_time || !switch_test_flag(l, LFLAG_EVENTS)) {
       		last = l;
       		continue;
       	}
      
       	if (l->event_list[SWITCH_EVENT_ALL]) {
       		send = 1;
       	} else if ((l->event_list[event->event_id])) {
       		if (event->event_id != SWITCH_EVENT_CUSTOM || !event->subclass_name || (switch_core_hash_find(l->event_hash, event->subclass_name))) {
       			send = 1;
       		}
       	}
      
       	if (send) {
       		switch_mutex_lock(l->filter_mutex);
      
       		if (l->filters && l->filters->headers) {
       			switch_event_header_t *hp;
       			const char *hval;
      
       			send = 0;
      
       			for (hp = l->filters->headers; hp; hp = hp->next) {
       				if ((hval = switch_event_get_header(event, hp->name))) {
       					const char *comp_to = hp->value;
       					int pos = 1, cmp = 0;
      
       					while (comp_to && *comp_to) {
       						if (*comp_to == '+') {
       							pos = 1;
       						} else if (*comp_to == '-') {
       							pos = 0;
       						} else if (*comp_to != ' ') {
       							break;
       						}
       						comp_to++;
       					}
      
       					if (send && pos) {
       						continue;
       					}
      
       					if (!comp_to) {
       						continue;
       					}
      
       					if (*hp->value == '/') {
       						switch_regex_t *re = NULL;
       						int ovector[30];
       						cmp = !!switch_regex_perform(hval, comp_to, &re, ovector, sizeof(ovector) / sizeof(ovector[0]));
       						switch_regex_safe_free(re);
       					} else {
       						cmp = !strcasecmp(hval, comp_to);
       					}
      
       					if (cmp) {
       						if (pos) {
       							send = 1;
       						} else {
       							send = 0;
       							break;
       						}
       					}
       				}
       			}
       		}
      
       		switch_mutex_unlock(l->filter_mutex);
       	}
      
       	if (send && switch_test_flag(l, LFLAG_MYEVENTS)) {
       		char *uuid = switch_event_get_header(event, "unique-id");
       		if (!uuid || (l->session && strcmp(uuid, switch_core_session_get_uuid(l->session)))) {
       			send = 0;
       		}
       		if (!strcmp(switch_core_session_get_uuid(l->session), switch_event_get_header_nil(event, "Job-Owner-UUID"))) {
       		    send = 1;
       		}
       	}
      
       	if (send) {
       		if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) {
       			qstatus = switch_queue_trypush(l->event_queue, clone); 
       			if (qstatus == SWITCH_STATUS_SUCCESS) {
       				if (l->lost_events) {
       					int le = l->lost_events;
       					l->lost_events = 0;
       					switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_CRIT, "Lost [%d] events! Event Queue size: [%u/%u]\n", le, switch_queue_size(l->event_queue), MAX_QUEUE_LEN);
       				}
       			} else {
       				char errbuf[512] = {0};
       				unsigned int qsize = switch_queue_size(l->event_queue);
       				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, 
       						"Event enqueue ERROR [%d] | [%s] | Queue size: [%u/%u] %s\n", 
       						(int)qstatus, switch_strerror(qstatus, errbuf, sizeof(errbuf)), qsize, MAX_QUEUE_LEN, (qsize == MAX_QUEUE_LEN)?"Max queue size reached":"");
       				if (++l->lost_events > MAX_MISSED) {
       					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Killing listener because of too many lost events. Lost [%d] Queue size[%u/%u]\n", l->lost_events, qsize, MAX_QUEUE_LEN);
       					kill_listener(l, "killed listener because of lost events\n");
       				}
       				switch_event_destroy(&clone);
       			}
       		} else {
       			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_ERROR, "Memory Error!\n");
       		}
       	}
       	last = l;
       }
       switch_mutex_unlock(globals.listener_mutex);
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115
      • 116
      • 117
      • 118
      • 119
      • 120
      • 121
      • 122
      • 123
      • 124
      • 125
      • 126
      • 127
      • 128
      • 129
      • 130
      • 131
      • 132
      • 133
      • 134
      • 135
      • 136
      • 137
      • 138
      • 139
      • 140
      • 141
      • 142
      • 143
      • 144
  • 相关阅读:
    MQ消息队列(四)——RabbitMQ进阶,惰性队列
    UniApp 在 iOS 16 下真机自定义基座调试指南,可解决模拟器无法输出调试日志问题
    MongoDB - 事务支持
    数据包络分析——SBM模型
    # js 模块化(commonjs、AMD、CMD、ES6)##
    Rust常用特型之TryFrom和TryInto特型
    矢量绘图软件源码定制开发,类似visio绘图,大量复合图元模板,可编程动态控制图元
    AMSR/ADEOS-II L1A Raw Observation Counts V003地球表面和大气微波辐射的详细观测数据
    来自云仓酒庄雷盛红酒分享关于葡萄酒和氧气的基本知识
    Flutter的Timer类
  • 原文地址:https://blog.csdn.net/weixin_45505313/article/details/126891426