FreeSWITCH 是一个开源的电话软交换平台,使用广泛,功能强大。本文基于 FreeSWITCH 1.10 版本,读者如有兴趣可以自行点击链接进入 github 下载源码。下图为 FreeSWITCH 服务启动及 Event Socket 模块运行工作的源代码时序,下文将对源码流程进行代码分析

FreeSWITCH 是用 C 语言写的,服务启动的入口为 switch.c#main() 函数。这个函数非常长,不过主要的处理大致分为以下几步:
- 调用
switch_core.c#switch_core_set_globals()函数设置重要的运行时文件夹路径,比如模块文件的文件夹等- 调用
switch_core.c#switch_core_init_and_modload()函数初始化系统并加载模块,这部分是重点- 调用
switch_core.c#switch_core_runtime_loop()函数开启主线程循环保持系统运行
int main(int argc, char *argv[])
{
......
switch_core_set_globals();
pid = getpid();
memset(pid_buffer, 0, sizeof(pid_buffer));
switch_snprintf(pid_path, sizeof(pid_path), "%s%s%s", SWITCH_GLOBAL_dirs.run_dir, SWITCH_PATH_SEPARATOR, pfile);
switch_snprintf(pid_buffer, sizeof(pid_buffer), "%d", pid);
pid_len = strlen(pid_buffer);
apr_pool_create(&pool, NULL);
......
if (switch_core_init_and_modload(flags, nc ? SWITCH_FALSE : SWITCH_TRUE, &err) != SWITCH_STATUS_SUCCESS) {
fprintf(stderr, "Cannot Initialize [%s]\n", err);
return 255;
}
......
switch_core_runtime_loop(nc);
......
}
switch_core.c#switch_core_init_and_modload() 函数是初始化重要组件和模块的入口,不过本文关注的主要是以下几个函数调用,FreeSWITCH 核心启动的主要逻辑其实就是拉起关键组件及模块
switch_core.c#switch_core_init()函数负责初始化 FreeSWITCH 核心的重要组件switch_loadable_module.c#switch_loadable_module_init()函数将加载 FreeSWITCH 核心之外的可插拔模块
SWITCH_DECLARE(switch_status_t) switch_core_init_and_modload(switch_core_flag_t flags, switch_bool_t console, const char **err)
{
......
if (switch_core_init(flags, console, err) != SWITCH_STATUS_SUCCESS) {
return SWITCH_STATUS_GENERR;
}
if (runtime.runlevel > 1) {
/* one per customer */
return SWITCH_STATUS_SUCCESS;
}
runtime.runlevel++;
runtime.events_use_dispatch = 1;
switch_core_set_signal_handlers();
switch_load_network_lists(SWITCH_FALSE);
switch_msrp_init();
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Bringing up environment.\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Loading Modules.\n");
if (switch_loadable_module_init(SWITCH_TRUE) != SWITCH_STATUS_SUCCESS) {
*err = "Cannot load modules";
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Error: %s\n", *err);
return SWITCH_STATUS_GENERR;
}
switch_load_network_lists(SWITCH_FALSE);
switch_load_core_config("post_load_switch.conf");
switch_core_set_signal_handlers();
......
}
此时回到本节步骤1第3步,switch_core.c#switch_core_runtime_loop() 函数其实就是根据前后台启动方式来开启主线程循环保持系统运行,至此宏观上 FreeSWITCH 核心启动的流程基本结束
如果 FreeSWITCH 以前台方式启动,则调用
switch_console.c#switch_console_loop()函数启动控制台循环,接收控制台输入的命令并处理
SWITCH_DECLARE(void) switch_core_runtime_loop(int bg)
{
#ifdef WIN32
HANDLE shutdown_event;
char path[256] = "";
#endif
if (bg) {
#ifdef WIN32
switch_snprintf(path, sizeof(path), "Global\\Freeswitch.%d", getpid());
shutdown_event = CreateEvent(NULL, FALSE, FALSE, path);
if (shutdown_event) {
WaitForSingleObject(shutdown_event, INFINITE);
}
#else
while (runtime.running) {
switch_yield(1000000);
}
#endif
} else {
/* wait for console input */
switch_console_loop();
}
}
在 2.1.1节步骤2第1步 中,笔者提到 switch_core.c#switch_core_init() 函数会初始化 FreeSWITCH 核心的重要组件和运行时属性,其大致源码如下,本文涉及的主要是以下几部分:
- 调用函数
switch_core_session.c#switch_core_session_init()初始化 Session 管理器并创建对应的事件队列,这部分本文暂不深入- 调用
switch_event.c#switch_event_init()函数初始化事件分发组件,包括启动事件分发线程及队列创建等- 执行
switch_core.c#switch_load_core_config()函数解析 FreeSWITCH 的核心配置文件switch.conf.xml,将配置属性加载进内存中- 执行
switch_scheduler.c#switch_scheduler_task_thread_start()函数启动内部定时任务线程,后续可以添加心跳通知等定时任务到内部列表,等待触发执行
SWITCH_DECLARE(switch_status_t) switch_core_init(switch_core_flag_t flags, switch_bool_t console, const char **err)
{
switch_uuid_t uuid;
char guess_ip[256];
int mask = 0;
struct in_addr in;
if (runtime.runlevel > 0) {
/* one per customer */
return SWITCH_STATUS_SUCCESS;
}
memset(&runtime, 0, sizeof(runtime));
gethostname(runtime.hostname, sizeof(runtime.hostname));
runtime.shutdown_cause = SWITCH_CAUSE_SYSTEM_SHUTDOWN;
runtime.max_db_handles = 50;
runtime.db_handle_timeout = 5000000;
runtime.event_heartbeat_interval = 20;
......
if (!runtime.cpu_count) runtime.cpu_count = 1;
if (sqlite3_initialize() != SQLITE_OK) {
*err = "FATAL ERROR! Could not initialize SQLite\n";
return SWITCH_STATUS_MEMERR;
}
/* INIT APR and Create the pool context */
if (apr_initialize() != SWITCH_STATUS_SUCCESS) {
*err = "FATAL ERROR! Could not initialize APR\n";
return SWITCH_STATUS_MEMERR;
}
if (!(runtime.memory_pool = switch_core_memory_init())) {
*err = "FATAL ERROR! Could not allocate memory pool\n";
return SWITCH_STATUS_MEMERR;
}
......
switch_core_session_init(runtime.memory_pool);
......
switch_event_init(runtime.memory_pool);
......
switch_load_core_config("switch.conf");
......
switch_scheduler_task_thread_start();
......
switch_scheduler_add_task(switch_epoch_time_now(NULL), heartbeat_callback, "heartbeat", "core", 0, NULL, SSHF_NONE | SSHF_NO_DEL);
switch_scheduler_add_task(switch_epoch_time_now(NULL), check_ip_callback, "check_ip", "core", 0, NULL, SSHF_NONE | SSHF_NO_DEL | SSHF_OWN_THREAD);
......
return SWITCH_STATUS_SUCCESS;
}
switch_event.c#switch_event_init() 函数会根据当前机器的核心数计算确定事件分发线程的最大数量MAX_DISPATCH,核心流程是调用 switch_event.c#check_dispatch() 函数创建并启动事件分发线程
SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool)
{
/* don't need any more dispatch threads than we have CPU's*/
MAX_DISPATCH = (switch_core_cpu_count() / 2) + 1;
if (MAX_DISPATCH < 2) {
MAX_DISPATCH = 2;
}
switch_assert(pool != NULL);
THRUNTIME_POOL = RUNTIME_POOL = pool;
switch_thread_rwlock_create(&RWLOCK, RUNTIME_POOL);
switch_mutex_init(&BLOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
switch_mutex_init(&POOL_LOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
switch_mutex_init(&EVENT_QUEUE_MUTEX, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
switch_mutex_init(&CUSTOM_HASH_MUTEX, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
switch_core_hash_init(&CUSTOM_HASH);
if (switch_core_test_flag(SCF_MINIMAL)) {
return SWITCH_STATUS_SUCCESS;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Activate Eventing Engine.\n");
switch_core_hash_init(&event_channel_manager.lahash);
switch_mutex_init(&event_channel_manager.lamutex, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
switch_thread_rwlock_create(&event_channel_manager.rwlock, RUNTIME_POOL);
switch_core_hash_init(&event_channel_manager.hash);
switch_core_hash_init(&event_channel_manager.perm_hash);
event_channel_manager.ID = 1;
switch_mutex_lock(EVENT_QUEUE_MUTEX);
SYSTEM_RUNNING = -1;
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
//switch_threadattr_create(&thd_attr, pool);
switch_find_local_ip(guess_ip_v4, sizeof(guess_ip_v4), NULL, AF_INET);
switch_find_local_ip(guess_ip_v6, sizeof(guess_ip_v6), NULL, AF_INET6);
#ifdef SWITCH_EVENT_RECYCLE
switch_queue_create(&EVENT_RECYCLE_QUEUE, 250000, THRUNTIME_POOL);
switch_queue_create(&EVENT_HEADER_RECYCLE_QUEUE, 250000, THRUNTIME_POOL);
#endif
check_dispatch();
switch_mutex_lock(EVENT_QUEUE_MUTEX);
SYSTEM_RUNNING = 1;
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
return SWITCH_STATUS_SUCCESS;
}
switch_event.c#check_dispatch() 函数会创建事件分发队列EVENT_DISPATCH_QUEUE,并设置队列的容量为DISPATCH_QUEUE_LEN * MAX_DISPATCH,最后通过 switch_event.c#switch_event_launch_dispatch_threads() 函数创建 1 个事件分发线程
static void check_dispatch(void)
{
if (!EVENT_DISPATCH_QUEUE) {
switch_mutex_lock(BLOCK);
if (!EVENT_DISPATCH_QUEUE) {
switch_queue_create(&EVENT_DISPATCH_QUEUE, DISPATCH_QUEUE_LEN * MAX_DISPATCH, THRUNTIME_POOL);
switch_event_launch_dispatch_threads(1);
while (!THREAD_COUNT) {
switch_cond_next();
}
}
switch_mutex_unlock(BLOCK);
}
}
switch_event.c#switch_event_launch_dispatch_threads() 函数会调用库函数创建事件分发线程,并将 switch_event.c#switch_event_dispatch_thread() 函数设置为线程任务。需注意此处会维护一个事件分发线程的数组 EVENT_DISPATCH_QUEUE_THREADS,如果数组下标上已经有线程存在了,则不会重复创建,至此事件分发组件的初始化基本结束
SWITCH_DECLARE(void) switch_event_launch_dispatch_threads(uint32_t max)
{
switch_threadattr_t *thd_attr;
uint32_t index = 0;
int launched = 0;
uint32_t sanity = 200;
switch_memory_pool_t *pool = RUNTIME_POOL;
check_dispatch();
if (max > MAX_DISPATCH) {
return;
}
if (max < SOFT_MAX_DISPATCH) {
return;
}
for (index = SOFT_MAX_DISPATCH; index < max && index < MAX_DISPATCH; index++) {
if (EVENT_DISPATCH_QUEUE_THREADS[index]) {
continue;
}
switch_threadattr_create(&thd_attr, pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_threadattr_priority_set(thd_attr, SWITCH_PRI_REALTIME);
switch_thread_create(&EVENT_DISPATCH_QUEUE_THREADS[index], thd_attr, switch_event_dispatch_thread, EVENT_DISPATCH_QUEUE, pool);
while(--sanity && !EVENT_DISPATCH_QUEUE_RUNNING[index]) switch_yield(10000);
if (index == 1) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Create event dispatch thread %d\n", index);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Create additional event dispatch thread %d\n", index);
}
launched++;
}
SOFT_MAX_DISPATCH = index;
}
在 2.1.1节步骤2第2步 中,执行 switch_loadable_module.c#switch_loadable_module_init() 函数将会加载配置文件中的配置的模块,其关键点如下:
- 模块配置由三个配置文件 pre_load_modules.conf.xml、modules.conf.xm、post_load_modules.conf.xml 共同组成,三者主要是加载顺序的区别,因为有些模块相互之间可能存在依赖,其他并无不同
- 在加载完前置模块之后,先调用
sswitch_core.c#witch_core_sqldb_init()函数初始化 sql 数据库- 加载模块的核心是
switch_loadable_module.c#switch_loadable_module_load_module_ex()函数,这个函数会将指定文件夹下的模块文件加载进内存- 模块加载进内存后,会调用
switch_loadable_module.c#switch_loadable_module_runtime()函数为每一个模块新起线程执行其定义的 runtime 函数
SWITCH_DECLARE(switch_status_t) switch_loadable_module_init(switch_bool_t autoload)
{
apr_finfo_t finfo = { 0 };
apr_dir_t *module_dir_handle = NULL;
apr_int32_t finfo_flags = APR_FINFO_DIRENT | APR_FINFO_TYPE | APR_FINFO_NAME;
char *precf = "pre_load_modules.conf";
char *cf = "modules.conf";
char *pcf = "post_load_modules.conf";
switch_xml_t cfg, xml;
unsigned char all = 0;
unsigned int count = 0;
const char *err;
switch_hash_t *event_hash;
switch_hash_index_t *hi;
void *hash_val;
switch_event_t *event;
#ifdef WIN32
const char *ext = ".dll";
const char *EXT = ".DLL";
#elif defined (MACOSX) || defined (DARWIN)
const char *ext = ".dylib";
const char *EXT = ".DYLIB";
#else
const char *ext = ".so";
const char *EXT = ".SO";
#endif
memset(&loadable_modules, 0, sizeof(loadable_modules));
switch_core_new_memory_pool(&loadable_modules.pool);
#ifdef WIN32
switch_loadable_module_path_init();
#endif
switch_core_hash_init(&loadable_modules.module_hash);
switch_core_hash_init_nocase(&loadable_modules.endpoint_hash);
switch_core_hash_init_nocase(&loadable_modules.codec_hash);
switch_core_hash_init_nocase(&loadable_modules.timer_hash);
switch_core_hash_init_nocase(&loadable_modules.application_hash);
switch_core_hash_init_nocase(&loadable_modules.chat_application_hash);
switch_core_hash_init_nocase(&loadable_modules.api_hash);
switch_core_hash_init_nocase(&loadable_modules.json_api_hash);
switch_core_hash_init(&loadable_modules.file_hash);
switch_core_hash_init_nocase(&loadable_modules.speech_hash);
switch_core_hash_init_nocase(&loadable_modules.asr_hash);
switch_core_hash_init_nocase(&loadable_modules.directory_hash);
switch_core_hash_init_nocase(&loadable_modules.chat_hash);
switch_core_hash_init_nocase(&loadable_modules.say_hash);
switch_core_hash_init_nocase(&loadable_modules.management_hash);
switch_core_hash_init_nocase(&loadable_modules.limit_hash);
switch_core_hash_init_nocase(&loadable_modules.database_hash);
switch_core_hash_init_nocase(&loadable_modules.dialplan_hash);
switch_core_hash_init(&loadable_modules.secondary_recover_hash);
switch_mutex_init(&loadable_modules.mutex, SWITCH_MUTEX_NESTED, loadable_modules.pool);
if (!autoload) return SWITCH_STATUS_SUCCESS;
/*
switch_core_sqldb_init() is not yet ready and is executed after starting modules from pre_load_modules.conf
Modules loading procedure generates events used by sqldb.
This is why we should hold those events (storing in the event_hash) not firing them until sqldb is ready.
*/
switch_core_hash_init(&event_hash);
/*
Pre-load core modules.
Do not pre-load modules which may use databases,
use appropriate section.
*/
switch_loadable_module_load_module_ex("", "CORE_SOFTTIMER_MODULE", SWITCH_FALSE, SWITCH_FALSE, &err, SWITCH_LOADABLE_MODULE_TYPE_COMMON, event_hash);
switch_loadable_module_load_module_ex("", "CORE_PCM_MODULE", SWITCH_FALSE, SWITCH_FALSE, &err, SWITCH_LOADABLE_MODULE_TYPE_COMMON, event_hash);
switch_loadable_module_load_module_ex("", "CORE_SPEEX_MODULE", SWITCH_FALSE, SWITCH_FALSE, &err, SWITCH_LOADABLE_MODULE_TYPE_COMMON, event_hash);
/*
Loading pre-load modules.
Database modules must be loaded here.
*/
if ((xml = switch_xml_open_cfg(precf, &cfg, NULL))) {
switch_xml_t mods, ld;
if ((mods = switch_xml_child(cfg, "modules"))) {
for (ld = switch_xml_child(mods, "load"); ld; ld = ld->next) {
switch_bool_t global = SWITCH_FALSE;
const char *val = switch_xml_attr_soft(ld, "module");
const char *path = switch_xml_attr_soft(ld, "path");
const char *critical = switch_xml_attr_soft(ld, "critical");
const char *sglobal = switch_xml_attr_soft(ld, "global");
if (zstr(val) || (strchr(val, '.') && !strstr(val, ext) && !strstr(val, EXT))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Invalid extension for %s\n", val);
continue;
}
global = switch_true(sglobal);
if (path && zstr(path)) {
path = SWITCH_GLOBAL_dirs.mod_dir;
}
if (switch_loadable_module_load_module_ex((char *)path, (char *)val, SWITCH_FALSE, global, &err, SWITCH_LOADABLE_MODULE_TYPE_PRELOAD, event_hash) == SWITCH_STATUS_GENERR) {
if (critical && switch_true(critical)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failed to load critical module '%s', abort()\n", val);
switch_core_hash_destroy(&event_hash);
abort();
}
}
count++;
}
}
switch_xml_free(xml);
}
else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "open of %s failed\n", precf);
}
if (switch_core_sqldb_init(&err) != SWITCH_STATUS_SUCCESS)
{
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Loading modules interrupted. [Error: %s]\n", err);
switch_core_hash_destroy(&event_hash);
return SWITCH_STATUS_GENERR;
}
/* sqldb is ready. Fire holding events! */
for (hi = switch_core_hash_first(event_hash); hi; hi = switch_core_hash_next(&hi)) {
switch_core_hash_this(hi, NULL, NULL, &hash_val);
event = (switch_event_t *)hash_val;
switch_event_fire(&event);
}
switch_core_hash_destroy(&event_hash);
/*
To perevent locking.
Core modules which may use databases should be pre-loaded here
(databases are loaded already).
*/
#ifdef SWITCH_HAVE_YUV
#ifdef SWITCH_HAVE_VPX
switch_loadable_module_load_module("", "CORE_VPX_MODULE", SWITCH_FALSE, &err);
#endif
#endif
/* Loading common modules */
if ((xml = switch_xml_open_cfg(cf, &cfg, NULL))) {
switch_xml_t mods, ld;
if ((mods = switch_xml_child(cfg, "modules"))) {
for (ld = switch_xml_child(mods, "load"); ld; ld = ld->next) {
switch_bool_t global = SWITCH_FALSE;
const char *val = switch_xml_attr_soft(ld, "module");
const char *path = switch_xml_attr_soft(ld, "path");
const char *critical = switch_xml_attr_soft(ld, "critical");
const char *sglobal = switch_xml_attr_soft(ld, "global");
if (zstr(val) || (strchr(val, '.') && !strstr(val, ext) && !strstr(val, EXT))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Invalid extension for %s\n", val);
continue;
}
global = switch_true(sglobal);
if (path && zstr(path)) {
path = SWITCH_GLOBAL_dirs.mod_dir;
}
if (switch_loadable_module_load_module_ex(path, val, SWITCH_FALSE, global, &err, SWITCH_LOADABLE_MODULE_TYPE_COMMON, NULL) == SWITCH_STATUS_GENERR) {
if (critical && switch_true(critical)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failed to load critical module '%s', abort()\n", val);
abort();
}
}
count++;
}
}
switch_xml_free(xml);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "open of %s failed\n", cf);
}
if ((xml = switch_xml_open_cfg(pcf, &cfg, NULL))) {
switch_xml_t mods, ld;
if ((mods = switch_xml_child(cfg, "modules"))) {
for (ld = switch_xml_child(mods, "load"); ld; ld = ld->next) {
switch_bool_t global = SWITCH_FALSE;
const char *val = switch_xml_attr_soft(ld, "module");
const char *path = switch_xml_attr_soft(ld, "path");
const char *sglobal = switch_xml_attr_soft(ld, "global");
if (zstr(val) || (strchr(val, '.') && !strstr(val, ext) && !strstr(val, EXT))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Invalid extension for %s\n", val);
continue;
}
global = switch_true(sglobal);
if (path && zstr(path)) {
path = SWITCH_GLOBAL_dirs.mod_dir;
}
switch_loadable_module_load_module_ex(path, val, SWITCH_FALSE, global, &err, SWITCH_LOADABLE_MODULE_TYPE_POSTLOAD, NULL);
count++;
}
}
switch_xml_free(xml);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "open of %s failed\n", pcf);
}
if (!count) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "No modules loaded, assuming 'load all'\n");
all = 1;
}
if (all) {
if (apr_dir_open(&module_dir_handle, SWITCH_GLOBAL_dirs.mod_dir, loadable_modules.pool) != APR_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Can't open directory: %s\n", SWITCH_GLOBAL_dirs.mod_dir);
return SWITCH_STATUS_GENERR;
}
while (apr_dir_read(&finfo, finfo_flags, module_dir_handle) == APR_SUCCESS) {
const char *fname = finfo.fname;
if (finfo.filetype != APR_REG) {
continue;
}
if (!fname) {
fname = finfo.name;
}
if (!fname) {
continue;
}
if (zstr(fname) || (!strstr(fname, ext) && !strstr(fname, EXT))) {
continue;
}
switch_loadable_module_load_module(SWITCH_GLOBAL_dirs.mod_dir, fname, SWITCH_FALSE, &err);
}
apr_dir_close(module_dir_handle);
}
switch_loadable_module_runtime();
memset(&chat_globals, 0, sizeof(chat_globals));
chat_globals.running = 1;
chat_globals.pool = loadable_modules.pool;
switch_mutex_init(&chat_globals.mutex, SWITCH_MUTEX_NESTED, chat_globals.pool);
chat_thread_start(1);
return SWITCH_STATUS_SUCCESS;
}
switch_loadable_module.c#switch_loadable_module_load_module_ex() 函数的核心处理如下:
- 首先根据模块名称计算对应的模块的动态库文件名称,然后通过
switch_core_hash.c#switch_core_hash_find_locked()方法查找哈希表 loadable_modules.module_hash 中是否存在该文件名,如果不存在说明模块还没有加载过,需要加载switch_loadable_module.c#switch_loadable_module_load_file()函数负责加载动态库文件- 如果加载动态库文件成功,则调用
switch_loadable_module.c#switch_loadable_module_process()将加载完成的模块插入哈希表 loadable_modules.module_hash,并生成模块加载的事件将其推入到 FreeSWITCH 事件组件中
static switch_status_t switch_loadable_module_load_module_ex(const char *dir, const char *fname, switch_bool_t runtime, switch_bool_t global, const char **err, switch_loadable_module_type_t type, switch_hash_t *event_hash)
{
switch_size_t len = 0;
char *path;
char *file, *dot;
switch_loadable_module_t *new_module = NULL;
switch_status_t status = SWITCH_STATUS_SUCCESS;
#ifdef WIN32
const char *ext = ".dll";
#else
const char *ext = ".so";
#endif
*err = "";
if ((file = switch_core_strdup(loadable_modules.pool, fname)) == 0) {
*err = "allocation error";
return SWITCH_STATUS_FALSE;
}
if (switch_is_file_path(file)) {
path = switch_core_strdup(loadable_modules.pool, file);
file = (char *) switch_cut_path(file);
if ((dot = strchr(file, '.'))) {
*dot = '\0';
}
} else {
if ((dot = strchr(file, '.'))) {
*dot = '\0';
}
len = strlen(switch_str_nil(dir));
len += strlen(file);
len += 8;
path = (char *) switch_core_alloc(loadable_modules.pool, len);
switch_snprintf(path, len, "%s%s%s%s", switch_str_nil(dir), SWITCH_PATH_SEPARATOR, file, ext);
}
if (switch_core_hash_find_locked(loadable_modules.module_hash, file, loadable_modules.mutex)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Module %s Already Loaded!\n", file);
*err = "Module already loaded";
status = SWITCH_STATUS_FALSE;
} else if ((status = switch_loadable_module_load_file(path, file, global, &new_module)) == SWITCH_STATUS_SUCCESS) {
new_module->type = type;
if ((status = switch_loadable_module_process(file, new_module, event_hash)) == SWITCH_STATUS_SUCCESS && runtime) {
if (new_module->switch_module_runtime) {
new_module->thread = switch_core_launch_thread(switch_loadable_module_exec, new_module, new_module->pool);
}
} else if (status != SWITCH_STATUS_SUCCESS) {
*err = "module load routine returned an error";
}
} else {
*err = "module load file routine returned an error";
}
return status;
}
switch_loadable_module.c#switch_loadable_module_load_file() 函数的核心处理步骤如下:
- 首先调用
switch_dso.c#switch_dso_open()函数打开库文件,随后调用switch_dso.c#sswitch_dso_data_sym()函数获取动态库中的符号表,这部分涉及到平台库函数,不做深入讨论- 如果符号表加载成功,则将其赋给函数表 interface_struct_handle,最终将其赋给函数表结构体 mod_interface_functions
- 如果当前加载的模块定义了 load 函数,则需要通过函数指针
load_func_ptr回调执行,以 Event Socket 模块为例,这里将回调到mod_event_socket.c#SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load)函数- 以上处理完毕,生成结构体
switch_loadable_module_t实例 module 保存模块中的函数等信息,最终将其添加到传入函数的指针 new_module 末尾
static switch_status_t switch_loadable_module_load_file(char *path, char *filename, switch_bool_t global, switch_loadable_module_t **new_module)
{
switch_loadable_module_t *module = NULL;
switch_dso_lib_t dso = NULL;
apr_status_t status = SWITCH_STATUS_SUCCESS;
switch_loadable_module_function_table_t *interface_struct_handle = NULL;
switch_loadable_module_function_table_t *mod_interface_functions = NULL;
char *struct_name = NULL;
switch_module_load_t load_func_ptr = NULL;
int loading = 1;
switch_loadable_module_interface_t *module_interface = NULL;
char *derr = NULL;
const char *err = NULL;
switch_memory_pool_t *pool = NULL;
switch_bool_t load_global = global;
switch_assert(path != NULL);
switch_core_new_memory_pool(&pool);
*new_module = NULL;
struct_name = switch_core_sprintf(pool, "%s_module_interface", filename);
#ifdef WIN32
dso = switch_dso_open("FreeSwitch.dll", load_global, &derr);
#elif defined (MACOSX) || defined(DARWIN)
{
char *lib_path = switch_mprintf("%s/libfreeswitch.dylib", SWITCH_GLOBAL_dirs.lib_dir);
dso = switch_dso_open(lib_path, load_global, &derr);
switch_safe_free(lib_path);
}
#else
dso = switch_dso_open(NULL, load_global, &derr);
#endif
if (!derr && dso) {
interface_struct_handle = switch_dso_data_sym(dso, struct_name, &derr);
}
switch_safe_free(derr);
if (!interface_struct_handle) {
if (dso) switch_dso_destroy(&dso);
dso = switch_dso_open(path, load_global, &derr);
}
while (loading) {
if (derr) {
err = derr;
break;
}
if (!interface_struct_handle) {
interface_struct_handle = switch_dso_data_sym(dso, struct_name, &derr);
}
if (derr) {
err = derr;
break;
}
if (interface_struct_handle && interface_struct_handle->switch_api_version != SWITCH_API_VERSION) {
err = "Trying to load an out of date module, please rebuild the module.";
break;
}
if (!load_global && interface_struct_handle && switch_test_flag(interface_struct_handle, SMODF_GLOBAL_SYMBOLS)) {
load_global = SWITCH_TRUE;
switch_dso_destroy(&dso);
interface_struct_handle = NULL;
dso = switch_dso_open(path, load_global, &derr);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Loading module with global namespace at request of module\n");
continue;
}
if (interface_struct_handle) {
mod_interface_functions = interface_struct_handle;
load_func_ptr = mod_interface_functions->load;
}
if (load_func_ptr == NULL) {
err = "Cannot locate symbol 'switch_module_load' please make sure this is a valid module.";
break;
}
status = load_func_ptr(&module_interface, pool);
if (status != SWITCH_STATUS_SUCCESS && status != SWITCH_STATUS_NOUNLOAD) {
err = "Module load routine returned an error";
module_interface = NULL;
break;
}
if (!module_interface) {
err = "Module failed to initialize its module_interface. Is this a valid module?";
break;
}
if ((module = switch_core_alloc(pool, sizeof(switch_loadable_module_t))) == 0) {
abort();
}
if (status == SWITCH_STATUS_NOUNLOAD) {
module->perm++;
}
loading = 0;
}
if (err) {
if (dso) {
switch_dso_destroy(&dso);
}
if (pool) {
switch_core_destroy_memory_pool(&pool);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error Loading module %s\n**%s**\n", path, err);
switch_safe_free(derr);
return SWITCH_STATUS_GENERR;
}
module->pool = pool;
module->filename = switch_core_strdup(module->pool, path);
module->module_interface = module_interface;
module->switch_module_load = load_func_ptr;
if (mod_interface_functions) {
module->switch_module_shutdown = mod_interface_functions->shutdown;
module->switch_module_runtime = mod_interface_functions->runtime;
}
module->lib = dso;
*new_module = module;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Successfully Loaded [%s]\n", module_interface->module_name);
switch_core_set_signal_handlers();
return SWITCH_STATUS_SUCCESS;
}
mod_event_socket.c#SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load) 函数源码中重要的处理如下:
- 调用
switch_event.c#switch_event_bind_removable()注册事件监听,这里是监听SWITCH_EVENT_ALL所有事件,并将函数mod_event_socket.c#event_hanlder()作为回调传入- 通过宏调用
switch_loadable_module.h#SWITCH_ADD_APP()将switch_event.c#socket_function()函数作为一个名为 socket 的 Application 注册到 FreeSwitch 核心中供系统调用,也就是外联 Outbound 模式主动连接外部的入口
SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load)
{
switch_application_interface_t *app_interface;
switch_api_interface_t *api_interface;
memset(&globals, 0, sizeof(globals));
switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool);
memset(&listen_list, 0, sizeof(listen_list));
switch_mutex_init(&listen_list.sock_mutex, SWITCH_MUTEX_NESTED, pool);
if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");
return SWITCH_STATUS_GENERR;
}
switch_log_bind_logger(socket_logger, SWITCH_LOG_DEBUG, SWITCH_FALSE);
/* connect my internal structure to the blank pointer passed to me */
*module_interface = switch_loadable_module_create_module_interface(pool, modname);
SWITCH_ADD_APP(app_interface, "socket", "Connect to a socket", "Connect to a socket", socket_function, "[:]" , SAF_SUPPORT_NOMEDIA);
SWITCH_ADD_API(api_interface, "event_sink", "event_sink", event_sink_function, "" );
/* indicate that the module should continue to be loaded */
return SWITCH_STATUS_SUCCESS;
}
switch_event.c#switch_event_bind_removable() 函数的处理比较直观,可以看到核心就是将调用方传入的回调等信息封装为一个 switch_event_node_t 结构体实例,并将其加入到监听节点数组 EVENT_NODES 中
SWITCH_DECLARE(switch_status_t) switch_event_bind_removable(const char *id, switch_event_types_t event, const char *subclass_name,
switch_event_callback_t callback, void *user_data, switch_event_node_t **node)
{
switch_event_node_t *event_node;
switch_event_subclass_t *subclass = NULL;
switch_assert(BLOCK != NULL);
switch_assert(RUNTIME_POOL != NULL);
if (node) {
*node = NULL;
}
if (subclass_name) {
switch_mutex_lock(CUSTOM_HASH_MUTEX);
if (!(subclass = switch_core_hash_find(CUSTOM_HASH, subclass_name))) {
switch_event_reserve_subclass_detailed(id, subclass_name);
subclass = switch_core_hash_find(CUSTOM_HASH, subclass_name);
subclass->bind = 1;
}
switch_mutex_unlock(CUSTOM_HASH_MUTEX);
if (!subclass) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not reserve subclass. '%s'\n", subclass_name);
return SWITCH_STATUS_FALSE;
}
}
if (event <= SWITCH_EVENT_ALL) {
switch_zmalloc(event_node, sizeof(*event_node));
switch_thread_rwlock_wrlock(RWLOCK);
switch_mutex_lock(BLOCK);
/* ----------------------------------------------- */
event_node->id = DUP(id);
event_node->event_id = event;
if (subclass_name) {
event_node->subclass_name = DUP(subclass_name);
}
event_node->callback = callback;
event_node->user_data = user_data;
if (EVENT_NODES[event]) {
event_node->next = EVENT_NODES[event];
}
EVENT_NODES[event] = event_node;
switch_mutex_unlock(BLOCK);
switch_thread_rwlock_unlock(RWLOCK);
/* ----------------------------------------------- */
if (node) {
*node = event_node;
}
return SWITCH_STATUS_SUCCESS;
}
return SWITCH_STATUS_MEMERR;
}
此时模块的加载流程基本结束,回到本节步骤1第4步 switch_loadable_module.c#switch_loadable_module_runtime() 函数调用,可以看到这个函数的主要逻辑是判断加载的模块是否有定义 runtime 函数,有的话就调用 switch_core.c#switch_core_launch_thread() 函数新建线程,将 switch_loadable_module.c#switch_loadable_module_exec() 函数作为线程任务传入
static void switch_loadable_module_runtime(void)
{
switch_hash_index_t *hi;
void *val;
switch_loadable_module_t *module;
switch_mutex_lock(loadable_modules.mutex);
for (hi = switch_core_hash_first(loadable_modules.module_hash); hi; hi = switch_core_hash_next(&hi)) {
switch_core_hash_this(hi, NULL, NULL, &val);
module = (switch_loadable_module_t *) val;
if (module->switch_module_runtime) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Starting runtime thread for %s\n", module->module_interface->module_name);
module->thread = switch_core_launch_thread(switch_loadable_module_exec, module, loadable_modules.pool);
}
}
switch_mutex_unlock(loadable_modules.mutex);
}
switch_loadable_module.c#switch_loadable_module_exec() 函数逻辑清晰,主要逻辑就是执行模块的 runtime 函数,以 Event Socket 模块为例,这里将回调到 mod_event_socket.c#SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime) 函数,至此加载进内存的 Event Socket 模块已经运行起来,其运行原理将在下节分析
static void *SWITCH_THREAD_FUNC switch_loadable_module_exec(switch_thread_t *thread, void *obj)
{
switch_status_t status = SWITCH_STATUS_SUCCESS;
switch_core_thread_session_t *ts = obj;
switch_loadable_module_t *module = ts->objs[0];
int restarts;
switch_assert(thread != NULL);
switch_assert(module != NULL);
for (restarts = 0; status != SWITCH_STATUS_TERM && !module->shutting_down; restarts++) {
status = module->switch_module_runtime();
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Thread ended for %s\n", module->module_interface->module_name);
if (ts->pool) {
switch_memory_pool_t *pool = ts->pool;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Destroying Pool for %s\n", module->module_interface->module_name);
switch_core_destroy_memory_pool(&pool);
}
switch_thread_exit(thread, 0);
return NULL;
}
FreeSWITCH 中的 EventSocket 有如下图两种交互模式,从功能上来说二者几乎没有差别,主要区别是 FreeSWITCH 在连接中担任的角色不同

mod_event_socket.c#SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime) 函数是 Event Socket 模块运行的关键,其核心处理如下:
- 首先调用
mod_event_socket.c#config()函数读取配置文件的属性,将其加载到内存中- 根据配置属性,绑定监听本机端口,通过
switch_apr.c#switch_socket_accept()调用底层接口等待远程 Inbound 连接- 一旦接收到远程 Inbound 连接,则将其封装为
listener_t结构体实例 listener 并添加到 socket 监听列表中- 最后调用
mod_event_socket.c#launch_listener_thread()函数创建一个线程单独处理这条 Inbound 连接上的数据交互
SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime)
{
switch_memory_pool_t *pool = NULL, *listener_pool = NULL;
switch_status_t rv;
switch_sockaddr_t *sa;
switch_socket_t *inbound_socket = NULL;
listener_t *listener;
uint32_t x = 0;
uint32_t errs = 0;
if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n");
return SWITCH_STATUS_TERM;
}
config();
while (!prefs.done) {
rv = switch_sockaddr_info_get(&sa, prefs.ip, SWITCH_UNSPEC, prefs.port, 0, pool);
if (rv) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot get information about IP address %s\n", prefs.ip);
goto fail;
}
rv = switch_socket_create(&listen_list.sock, switch_sockaddr_get_family(sa), SOCK_STREAM, SWITCH_PROTO_TCP, pool);
if (rv)
goto sock_fail;
rv = switch_socket_opt_set(listen_list.sock, SWITCH_SO_REUSEADDR, 1);
if (rv)
goto sock_fail;
#ifdef WIN32
/* Enable dual-stack listening on Windows (if the listening address is IPv6), it's default on Linux */
if (switch_sockaddr_get_family(sa) == AF_INET6) {
rv = switch_socket_opt_set(listen_list.sock, SWITCH_SO_IPV6_V6ONLY, 0);
if (rv) goto sock_fail;
}
#endif
rv = switch_socket_bind(listen_list.sock, sa);
if (rv)
goto sock_fail;
rv = switch_socket_listen(listen_list.sock, 5);
if (rv)
goto sock_fail;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Socket up listening on %s:%u\n", prefs.ip, prefs.port);
if (prefs.nat_map) {
switch_nat_add_mapping(prefs.port, SWITCH_NAT_TCP, NULL, SWITCH_FALSE);
}
break;
sock_fail:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error! Could not listen on %s:%u\n", prefs.ip, prefs.port);
if (prefs.stop_on_bind_error) {
prefs.done = 1;
goto fail;
}
switch_yield(100000);
}
listen_list.ready = 1;
while (!prefs.done) {
if (switch_core_new_memory_pool(&listener_pool) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n");
goto fail;
}
if ((rv = switch_socket_accept(&inbound_socket, listen_list.sock, listener_pool))) {
if (prefs.done) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Shutting Down\n");
goto end;
} else {
/* I wish we could use strerror_r here but its not defined everywhere =/ */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error [%s]\n", strerror(errno));
if (++errs > 100) {
goto end;
}
}
} else {
errs = 0;
}
if (!(listener = switch_core_alloc(listener_pool, sizeof(*listener)))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
break;
}
switch_thread_rwlock_create(&listener->rwlock, listener_pool);
switch_queue_create(&listener->event_queue, MAX_QUEUE_LEN, listener_pool);
switch_queue_create(&listener->log_queue, MAX_QUEUE_LEN, listener_pool);
listener->sock = inbound_socket;
listener->pool = listener_pool;
listener_pool = NULL;
listener->format = EVENT_FORMAT_PLAIN;
switch_set_flag(listener, LFLAG_FULL);
switch_set_flag(listener, LFLAG_ALLOW_LOG);
switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_mutex_init(&listener->filter_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_core_hash_init(&listener->event_hash);
switch_socket_create_pollset(&listener->pollfd, listener->sock, SWITCH_POLLIN | SWITCH_POLLERR, listener->pool);
if (switch_socket_addr_get(&listener->sa, SWITCH_TRUE, listener->sock) == SWITCH_STATUS_SUCCESS && listener->sa) {
switch_get_addr(listener->remote_ip, sizeof(listener->remote_ip), listener->sa);
if ((listener->remote_port = switch_sockaddr_get_port(listener->sa))) {
if (launch_listener_thread(listener) == SWITCH_STATUS_SUCCESS)
continue;
}
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error initilizing connection\n");
close_socket(&listener->sock);
expire_listener(&listener);
}
end:
close_socket(&listen_list.sock);
if (prefs.nat_map && switch_nat_get_type()) {
switch_nat_del_mapping(prefs.port, SWITCH_NAT_TCP);
}
if (pool) {
switch_core_destroy_memory_pool(&pool);
}
if (listener_pool) {
switch_core_destroy_memory_pool(&listener_pool);
}
for (x = 0; x < prefs.acl_count; x++) {
switch_safe_free(prefs.acl[x]);
}
fail:
return SWITCH_STATUS_TERM;
}
mod_event_socket.c#config() 函数逻辑比较简洁,就是读取配置文件 event_socket.conf.xml 属性,并将其保存到 prefs 结构体中
static int config(void)
{
char *cf = "event_socket.conf";
switch_xml_t cfg, xml, settings, param;
memset(&prefs, 0, sizeof(prefs));
if (!(xml = switch_xml_open_cfg(cf, &cfg, NULL))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Open of %s failed\n", cf);
} else {
if ((settings = switch_xml_child(cfg, "settings"))) {
for (param = switch_xml_child(settings, "param"); param; param = param->next) {
char *var = (char *) switch_xml_attr_soft(param, "name");
char *val = (char *) switch_xml_attr_soft(param, "value");
if (!strcmp(var, "listen-ip")) {
set_pref_ip(val);
} else if (!strcmp(var, "debug")) {
globals.debug = atoi(val);
} else if (!strcmp(var, "nat-map")) {
if (switch_true(val) && switch_nat_get_type()) {
prefs.nat_map = 1;
}
} else if (!strcmp(var, "listen-port")) {
prefs.port = (uint16_t) atoi(val);
} else if (!strcmp(var, "password")) {
set_pref_pass(val);
} else if (!strcasecmp(var, "apply-inbound-acl") && ! zstr(val)) {
if (prefs.acl_count < MAX_ACL) {
prefs.acl[prefs.acl_count++] = strdup(val);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Max acl records of %d reached\n", MAX_ACL);
}
} else if (!strcasecmp(var, "stop-on-bind-error")) {
prefs.stop_on_bind_error = switch_true(val) ? 1 : 0;
}
}
}
switch_xml_free(xml);
}
if (zstr(prefs.ip)) {
set_pref_ip("127.0.0.1");
}
if (zstr(prefs.password)) {
set_pref_pass("ClueCon");
}
if (!prefs.nat_map) {
prefs.nat_map = 0;
}
if (!prefs.acl_count) {
prefs.acl[prefs.acl_count++] = strdup("loopback.auto");
}
if (prefs.nat_map) {
prefs.nat_map = 0;
}
if (!prefs.port) {
prefs.port = 8021;
}
return 0;
}
远程 Inbound 连接到来时,mod_event_socket.c#launch_listener_thread() 函数将被调用,可以看到此处核心处理是新建线程,并将 mod_event_socket.c#listener_run() 函数作为线程任务执行
static switch_status_t launch_listener_thread(listener_t *listener)
{
switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL;
switch_threadattr_create(&thd_attr, listener->pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
return switch_thread_create(&thread, thd_attr, listener_run, listener, listener->pool);
}
mod_event_socket.c#listener_run() 函数源码如下,可以看到其关键的处理分为以下几步:
- 首先是尝试对连接进行 acl 检查,实际上只会对 Inbound 连接进行 acl 检查,Outbound 连接跳过这步
- 接下来调用
mod_event_socket.c#add_listener()函数将当前 listener 添加到内部 listen_list 列表- 接着是用户授权认证,同样是只会对 Inbound 连接进行,Outbound 连接默认已经授权。授权完成后,这个连接可以正式开始交互,则在 while 循环中不断调用
mod_event_socket.c#read_packet()函数读取对端数据- 读取到对端数据后,再调用
mod_event_socket.c#parse_command()函数解析传输过来的命令并执行
static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
{
listener_t *listener = (listener_t *) obj;
char buf[1024];
switch_size_t len;
switch_status_t status;
switch_event_t *event;
char reply[512] = "";
switch_core_session_t *session = NULL;
switch_channel_t *channel = NULL;
switch_event_t *revent = NULL;
const char *var;
int locked = 1;
switch_mutex_lock(globals.listener_mutex);
prefs.threads++;
switch_mutex_unlock(globals.listener_mutex);
switch_assert(listener != NULL);
if ((session = listener->session)) {
if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unable to lock session!\n");
locked = 0;
session = NULL;
goto done;
}
listener->lock_acquired = 1;
}
if (!listener->sock) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Listener socket is null!\n");
switch_clear_flag_locked(listener, LFLAG_RUNNING);
goto done;
}
switch_socket_opt_set(listener->sock, SWITCH_SO_TCP_NODELAY, TRUE);
switch_socket_opt_set(listener->sock, SWITCH_SO_NONBLOCK, TRUE);
if (prefs.acl_count && listener->sa && !zstr(listener->remote_ip)) {
uint32_t x = 0;
for (x = 0; x < prefs.acl_count; x++) {
if (!switch_check_network_list_ip(listener->remote_ip, prefs.acl[x])) {
const char message[] = "Access Denied, go away.\n";
int mlen = (int)strlen(message);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "IP %s Rejected by acl \"%s\"\n", listener->remote_ip,
prefs.acl[x]);
switch_snprintf(buf, sizeof(buf), "Content-Type: text/rude-rejection\nContent-Length: %d\n\n", mlen);
len = strlen(buf);
switch_socket_send(listener->sock, buf, &len);
len = mlen;
switch_socket_send(listener->sock, message, &len);
goto done;
}
}
}
if (globals.debug > 0) {
if (zstr(listener->remote_ip)) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Connection Open\n");
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Connection Open from %s:%d\n", listener->remote_ip,
listener->remote_port);
}
}
switch_socket_opt_set(listener->sock, SWITCH_SO_NONBLOCK, TRUE);
switch_set_flag_locked(listener, LFLAG_RUNNING);
add_listener(listener);
if (session && switch_test_flag(listener, LFLAG_AUTHED)) {
switch_event_t *ievent = NULL;
switch_set_flag_locked(listener, LFLAG_SESSION);
status = read_packet(listener, &ievent, 25);
if (status != SWITCH_STATUS_SUCCESS || !ievent) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "Socket Error!\n");
switch_clear_flag_locked(listener, LFLAG_RUNNING);
goto done;
}
if (parse_command(listener, &ievent, reply, sizeof(reply)) != SWITCH_STATUS_SUCCESS) {
switch_clear_flag_locked(listener, LFLAG_RUNNING);
goto done;
}
} else {
switch_snprintf(buf, sizeof(buf), "Content-Type: auth/request\n\n");
len = strlen(buf);
switch_socket_send(listener->sock, buf, &len);
while (!switch_test_flag(listener, LFLAG_AUTHED)) {
status = read_packet(listener, &event, 25);
if (status != SWITCH_STATUS_SUCCESS) {
goto done;
}
if (!event) {
continue;
}
if (parse_command(listener, &event, reply, sizeof(reply)) != SWITCH_STATUS_SUCCESS) {
switch_clear_flag_locked(listener, LFLAG_RUNNING);
goto done;
}
if (*reply != '\0') {
if (*reply == '~') {
switch_snprintf(buf, sizeof(buf), "Content-Type: command/reply\n%s", reply + 1);
} else {
switch_snprintf(buf, sizeof(buf), "Content-Type: command/reply\nReply-Text: %s\n\n", reply);
}
len = strlen(buf);
switch_socket_send(listener->sock, buf, &len);
}
break;
}
}
while (!prefs.done && switch_test_flag(listener, LFLAG_RUNNING) && listen_list.ready) {
len = sizeof(buf);
memset(buf, 0, len);
status = read_packet(listener, &revent, 0);
if (status != SWITCH_STATUS_SUCCESS) {
break;
}
if (!revent) {
continue;
}
if (parse_command(listener, &revent, reply, sizeof(reply)) != SWITCH_STATUS_SUCCESS) {
switch_clear_flag_locked(listener, LFLAG_RUNNING);
break;
}
if (revent) {
switch_event_destroy(&revent);
}
if (*reply != '\0') {
if (*reply == '~') {
switch_snprintf(buf, sizeof(buf), "Content-Type: command/reply\n%s", reply + 1);
} else {
switch_snprintf(buf, sizeof(buf), "Content-Type: command/reply\nReply-Text: %s\n\n", reply);
}
len = strlen(buf);
switch_socket_send(listener->sock, buf, &len);
}
}
done:
if (revent) {
switch_event_destroy(&revent);
}
remove_listener(listener);
if (globals.debug > 0) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Session complete, waiting for children\n");
}
switch_thread_rwlock_wrlock(listener->rwlock);
flush_listener(listener, SWITCH_TRUE, SWITCH_TRUE);
switch_mutex_lock(listener->filter_mutex);
if (listener->filters) {
switch_event_destroy(&listener->filters);
}
switch_mutex_unlock(listener->filter_mutex);
if (listener->session && locked) {
channel = switch_core_session_get_channel(listener->session);
}
if (channel && switch_channel_get_state(channel) != CS_HIBERNATE &&
!switch_channel_test_flag(channel, CF_REDIRECT) && !switch_channel_test_flag(channel, CF_TRANSFER) && !switch_channel_test_flag(channel, CF_RESET) &&
(switch_test_flag(listener, LFLAG_RESUME) || ((var = switch_channel_get_variable(channel, "socket_resume")) && switch_true(var)))) {
switch_channel_set_state(channel, CS_RESET);
}
if (listener->sock) {
send_disconnect(listener, "Disconnected, goodbye.\nSee you at ClueCon! http://www.cluecon.com/\n");
close_socket(&listener->sock);
}
switch_thread_rwlock_unlock(listener->rwlock);
if (globals.debug > 0) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Connection Closed\n");
}
switch_core_hash_destroy(&listener->event_hash);
if (listener->allowed_event_hash) {
switch_core_hash_destroy(&listener->allowed_event_hash);
}
if (listener->allowed_api_hash) {
switch_core_hash_destroy(&listener->allowed_api_hash);
}
if (listener->session) {
if (locked) {
switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED);
}
switch_clear_flag_locked(listener, LFLAG_SESSION);
if (locked) {
switch_core_session_rwunlock(listener->session);
}
} else if (listener->pool) {
switch_memory_pool_t *pool = listener->pool;
switch_core_destroy_memory_pool(&pool);
}
switch_mutex_lock(globals.listener_mutex);
prefs.threads--;
switch_mutex_unlock(globals.listener_mutex);
listener->finished = 1;
return NULL;
}
mod_event_socket.c#read_packet() 函数体比较琐碎,简单来说关键处理是以下几步:
- 通过
switch_apr.c#switch_socket_recv()函数调用底层接口读取 socket 数据,有数据则将其处理后封装到switch_event_t结构体实例 event 中,跳出循环- 如果没有读到数据,并且当前 listener 对应的远端已经订阅了事件,则将
listener->event_queue队列中的事件出队,按照订阅方的订阅格式通过switch_apr.c#switch_socket_send()函数发送给对端
static switch_status_t read_packet(listener_t *listener, switch_event_t **event, uint32_t timeout)
{
......
while (listener->sock && !prefs.done) {
uint8_t do_sleep = 1;
mlen = 1;
if (bytes == buf_len - 1) {
char *tmp;
int pos;
pos = (int)(ptr - mbuf);
buf_len += block_len;
tmp = realloc(mbuf, buf_len);
switch_assert(tmp);
mbuf = tmp;
memset(mbuf + bytes, 0, buf_len - bytes);
ptr = (mbuf + pos);
}
status = switch_socket_recv(listener->sock, ptr, &mlen);
if (prefs.done || (!SWITCH_STATUS_IS_BREAK(status) && status != SWITCH_STATUS_SUCCESS)) {
switch_goto_status(SWITCH_STATUS_FALSE, end);
}
if (mlen) {
bytes += mlen;
do_sleep = 0;
if (*mbuf == '\r' || *mbuf == '\n') { /* bah */
ptr = mbuf;
mbuf[0] = '\0';
bytes = 0;
continue;
}
if (*ptr == '\n') {
crcount++;
} else if (*ptr != '\r') {
crcount = 0;
}
ptr++;
if (bytes >= max_len) {
crcount = 2;
}
if (crcount == 2) {
char *next;
char *cur = mbuf;
while (cur) {
if ((next = strchr(cur, '\r')) || (next = strchr(cur, '\n'))) {
while (*next == '\r' || *next == '\n') {
next++;
}
}
count++;
if (count == 1) {
switch_event_create(event, SWITCH_EVENT_CLONE);
switch_event_add_header_string(*event, SWITCH_STACK_BOTTOM, "Command", mbuf);
} else if (cur) {
char *var, *val;
var = cur;
strip_cr(var);
if (!zstr(var)) {
if ((val = strchr(var, ':'))) {
*val++ = '\0';
while (*val == ' ') {
val++;
}
}
if (val) {
switch_event_add_header_string(*event, SWITCH_STACK_BOTTOM, var, val);
if (!strcasecmp(var, "content-length")) {
clen = atoi(val);
if (clen > 0) {
char *body;
char *p;
switch_zmalloc(body, clen + 1);
p = body;
while (clen > 0) {
mlen = clen;
status = switch_socket_recv(listener->sock, p, &mlen);
if (prefs.done || (!SWITCH_STATUS_IS_BREAK(status) && status != SWITCH_STATUS_SUCCESS)) {
free(body);
switch_goto_status(SWITCH_STATUS_FALSE, end);
}
/*
if (channel && !switch_channel_ready(channel)) {
status = SWITCH_STATUS_FALSE;
break;
}
*/
clen -= (int) mlen;
p += mlen;
}
switch_event_add_body(*event, "%s", body);
free(body);
}
}
}
}
}
cur = next;
}
break;
}
}
if (!*mbuf) {
......
if (listener->session) {
switch_channel_t *chan = switch_core_session_get_channel(listener->session);
if (switch_channel_get_state(chan) < CS_HANGUP && switch_channel_test_flag(chan, CF_DIVERT_EVENTS)) {
switch_event_t *e = NULL;
while (switch_core_session_dequeue_event(listener->session, &e, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
if (switch_queue_trypush(listener->event_queue, e) != SWITCH_STATUS_SUCCESS) {
switch_core_session_queue_event(listener->session, &e);
break;
}
}
}
}
if (switch_test_flag(listener, LFLAG_EVENTS)) {
while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
char hbuf[512];
switch_event_t *pevent = (switch_event_t *) pop;
char *etype;
do_sleep = 0;
if (listener->format == EVENT_FORMAT_PLAIN) {
etype = "plain";
switch_event_serialize(pevent, &listener->ebuf, SWITCH_TRUE);
} else if (listener->format == EVENT_FORMAT_JSON) {
etype = "json";
switch_event_serialize_json(pevent, &listener->ebuf);
} else {
switch_xml_t xml;
etype = "xml";
if ((xml = switch_event_xmlize(pevent, SWITCH_VA_NONE))) {
listener->ebuf = switch_xml_toxml(xml, SWITCH_FALSE);
switch_xml_free(xml);
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(listener->session), SWITCH_LOG_ERROR, "XML ERROR!\n");
goto endloop;
}
}
switch_assert(listener->ebuf);
len = strlen(listener->ebuf);
switch_snprintf(hbuf, sizeof(hbuf), "Content-Length: %" SWITCH_SSIZE_T_FMT "\n" "Content-Type: text/event-%s\n" "\n", len, etype);
len = strlen(hbuf);
switch_socket_send(listener->sock, hbuf, &len);
len = strlen(listener->ebuf);
switch_socket_send(listener->sock, listener->ebuf, &len);
switch_safe_free(listener->ebuf);
endloop:
switch_event_destroy(&pevent);
}
}
}
......
}
此时回到本节步骤4第4步,mod_event_socket.c#parse_command() 函数非常长,主要逻辑是各个命令的处理,可以看到其关键处理如下:
- 首先调用
switch_event.h#switch_event_get_header()函数从上一步封装好的switch_event_t结构体中获取到命令名称- 命令的处理非常繁冗,此处以 event 订阅事件命令为例解析,该命令的主要处理逻辑是解析命令参数,将订阅的事件记录在
listener->event_list标记数组中并给 listener 添上LFLAG_EVENTS标记位,至此 Event Socket 事件订阅处理基本结束
static switch_status_t parse_command(listener_t *listener, switch_event_t **event, char *reply, uint32_t reply_len)
{
switch_status_t status = SWITCH_STATUS_SUCCESS;
char *cmd = NULL;
char unload_cheat[] = "api bgapi unload mod_event_socket";
char reload_cheat[] = "api bgapi reload mod_event_socket";
*reply = '\0';
if (!event || !*event || !(cmd = switch_event_get_header(*event, "command"))) {
switch_clear_flag_locked(listener, LFLAG_RUNNING);
switch_snprintf(reply, reply_len, "-ERR command parse error.");
goto done;
}
......
if (!strncasecmp(cmd, "sendevent", 9)) {
......
} else if (!strncasecmp(cmd, "event", 5)) {
char *next, *cur;
uint32_t count = 0, key_count = 0;
uint8_t custom = 0;
strip_cr(cmd);
cur = cmd + 5;
if ((cur = strchr(cur, ' '))) {
for (cur++; cur; count++) {
switch_event_types_t type;
if ((next = strchr(cur, ' '))) {
*next++ = '\0';
}
if (!count) {
if (!strcasecmp(cur, "xml")) {
listener->format = EVENT_FORMAT_XML;
goto end;
} else if (!strcasecmp(cur, "plain")) {
listener->format = EVENT_FORMAT_PLAIN;
goto end;
} else if (!strcasecmp(cur, "json")) {
listener->format = EVENT_FORMAT_JSON;
goto end;
}
}
if (custom) {
if (!listener->allowed_event_hash || switch_core_hash_find(listener->allowed_event_hash, cur)) {
switch_core_hash_insert(listener->event_hash, cur, MARKER);
} else {
switch_snprintf(reply, reply_len, "-ERR permission denied");
goto done;
}
} else if (switch_name_event(cur, &type) == SWITCH_STATUS_SUCCESS) {
if (switch_test_flag(listener, LFLAG_AUTH_EVENTS) && !listener->allowed_event_list[type] &&
!switch_test_flag(listener, LFLAG_ALL_EVENTS_AUTHED)) {
switch_snprintf(reply, reply_len, "-ERR permission denied");
goto done;
}
key_count++;
if (type == SWITCH_EVENT_ALL) {
uint32_t x = 0;
for (x = 0; x < SWITCH_EVENT_ALL; x++) {
listener->event_list[x] = 1;
}
if (!listener->allowed_event_hash) {
set_all_custom(listener);
} else {
set_allowed_custom(listener);
}
}
if (type <= SWITCH_EVENT_ALL) {
listener->event_list[type] = 1;
}
if (type == SWITCH_EVENT_CUSTOM) {
custom++;
}
}
end:
cur = next;
}
}
if (!key_count) {
switch_snprintf(reply, reply_len, "-ERR no keywords supplied");
goto done;
}
if (!switch_test_flag(listener, LFLAG_EVENTS)) {
switch_set_flag_locked(listener, LFLAG_EVENTS);
}
switch_snprintf(reply, reply_len, "+OK event listener enabled %s", format2str(listener->format));
}
......
done:
if (zstr(reply)) {
switch_snprintf(reply, reply_len, "-ERR command not found");
}
done_noreply:
if (event) {
switch_event_destroy(event);
}
return status;
}
经过上一节步骤5第2步的分析,我们知道远程连接监听的事件其实都来自于 listner 的内部队列,所以要了解事件的分发流程,关键是要知道队列中的数据是从哪里来。此时回顾2.1.3节步骤4第1步,Event Socket 模块在加载时注册了监听到 FreeSWITCH 的事件组件中,而事件组件的事件来源其实是宏定义 switch_event.h#switch_event_fire() 调用 switch_event.c#switch_event_fire_detailed() 函数。该函数比较简洁,核心处理如下:
- 根据 runtime.events_use_dispatch 属性决定外部调用方投递的事件的最终目的地,该属性默认值为1,也就是默认会调用
switch_event.c#switch_event_queue_dispatch_event()投递到 EVENT_DISPATCH_QUEUEswitch_event.c#switch_event_deliver_thread_pool()函数执行时会将事件投递到 session_manager.thread_queue 队列,并根据线程池当前空闲状态决定是否创建新线程
SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(const char *file, const char *func, int line, switch_event_t **event, void *user_data)
{
switch_assert(BLOCK != NULL);
switch_assert(RUNTIME_POOL != NULL);
switch_assert(EVENT_QUEUE_MUTEX != NULL);
switch_assert(RUNTIME_POOL != NULL);
if (SYSTEM_RUNNING <= 0) {
/* sorry we're closed */
switch_event_destroy(event);
return SWITCH_STATUS_SUCCESS;
}
if (user_data) {
(*event)->event_user_data = user_data;
}
if (runtime.events_use_dispatch) {
check_dispatch();
if (switch_event_queue_dispatch_event(event) != SWITCH_STATUS_SUCCESS) {
switch_event_destroy(event);
return SWITCH_STATUS_FALSE;
}
} else {
switch_event_deliver_thread_pool(event);
}
return SWITCH_STATUS_SUCCESS;
}
switch_event.c#switch_event_queue_dispatch_event() 函数的核心处理如下:
- 计算 DISPATCH_QUEUE_LEN 队列空闲状态,如果队列中数据已经超过阈值,则检查当前事件分发线程数量是否达到最大值,最终决定是否需要新建事件分发线程
- 如果需要新建线程,则执行
switch_event.c#switch_event_launch_dispatch_threads()函数启动新线程- 最后调用函数
switch_apr.c#switch_queue_push()将事件添加到 DISPATCH_QUEUE_LEN 队列
static switch_status_t switch_event_queue_dispatch_event(switch_event_t **eventp)
{
switch_event_t *event = *eventp;
if (!SYSTEM_RUNNING) {
return SWITCH_STATUS_FALSE;
}
while (event) {
int launch = 0;
switch_mutex_lock(EVENT_QUEUE_MUTEX);
if (!PENDING && switch_queue_size(EVENT_DISPATCH_QUEUE) > (unsigned int)(DISPATCH_QUEUE_LEN * DISPATCH_THREAD_COUNT)) {
if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) {
launch++;
PENDING++;
}
}
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
if (launch) {
if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) {
switch_event_launch_dispatch_threads(SOFT_MAX_DISPATCH + 1);
}
switch_mutex_lock(EVENT_QUEUE_MUTEX);
PENDING--;
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
}
*eventp = NULL;
switch_queue_push(EVENT_DISPATCH_QUEUE, event);
event = NULL;
}
return SWITCH_STATUS_SUCCESS;
}
事件已经入队,实际对 DISPATCH_QUEUE_LEN 队列进行处理的是在 2.1.2节步骤4 提到的线程任务 switch_event.c#switch_event_dispatch_thread() 中。可以看到这个函数的核心操作是在 for 空循环中将 DISPATCH_QUEUE_LEN 队列的数据轮询出来,随后调用 switch_event.c#switch_event_deliver() 函数进行处理
static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *thread, void *obj)
{
switch_queue_t *queue = (switch_queue_t *) obj;
int my_id = 0;
switch_mutex_lock(EVENT_QUEUE_MUTEX);
THREAD_COUNT++;
DISPATCH_THREAD_COUNT++;
for (my_id = 0; my_id < MAX_DISPATCH_VAL; my_id++) {
if (EVENT_DISPATCH_QUEUE_THREADS[my_id] == thread) {
break;
}
}
if ( my_id >= MAX_DISPATCH_VAL ) {
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
return NULL;
}
EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 1;
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
for (;;) {
void *pop = NULL;
switch_event_t *event = NULL;
if (!SYSTEM_RUNNING) {
break;
}
if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) {
continue;
}
if (!pop) {
break;
}
event = (switch_event_t *) pop;
switch_event_deliver(&event);
switch_os_yield();
}
switch_mutex_lock(EVENT_QUEUE_MUTEX);
EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 0;
THREAD_COUNT--;
DISPATCH_THREAD_COUNT--;
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Dispatch Thread %d Ended.\n", my_id);
return NULL;
}
switch_event.c#switch_event_deliver() 函数非常简洁,核心处理就是遍历 EVENT_NODES 列表,调用注册方的回调函数将事件分发到订阅方。此处结合2.1.3节步骤4第1步,可以知道会调用到 mod_event_socket.c#event_handler() 函数
SWITCH_DECLARE(void) switch_event_deliver(switch_event_t **event)
{
switch_event_types_t e;
switch_event_node_t *node;
if (SYSTEM_RUNNING) {
switch_thread_rwlock_rdlock(RWLOCK);
for (e = (*event)->event_id;; e = SWITCH_EVENT_ALL) {
for (node = EVENT_NODES[e]; node; node = node->next) {
if (switch_events_match(*event, node)) {
(*event)->bind_user_data = node->user_data;
node->callback(*event);
}
}
if (e == SWITCH_EVENT_ALL) {
break;
}
}
switch_thread_rwlock_unlock(RWLOCK);
}
switch_event_destroy(event);
}
mod_event_socket.c#event_handler() 函数源码比较繁琐,不过核心处理并不复杂,关键如下:
- 遍历全局列表 listen_list.listeners,判断当前事件是否当前 listener 订阅的
- 是的话再检查 listener 上是否有 filter,有则经过 filter 过滤后最终确定需要发送该事件则将其拷贝一份通过
switch_apr.c#switch_queue_trypush()函数将事件添加到 listener 私属的事件队列。至此事件已经入队,结合2.2.1节步骤5第2步,事件最终发送给了外部订阅方,事件分发流程的分析基本结束
static void event_handler(switch_event_t *event)
{
switch_event_t *clone = NULL;
listener_t *l, *lp, *last = NULL;
time_t now = switch_epoch_time_now(NULL);
switch_status_t qstatus;
switch_assert(event != NULL);
if (!listen_list.ready) {
return;
}
switch_mutex_lock(globals.listener_mutex);
lp = listen_list.listeners;
while (lp) {
int send = 0;
l = lp;
lp = lp->next;
if (switch_test_flag(l, LFLAG_STATEFUL) && (l->expire_time || (l->timeout && now - l->last_flush > l->timeout))) {
if (expire_listener(&l) == SWITCH_STATUS_SUCCESS) {
if (last) {
last->next = lp;
} else {
listen_list.listeners = lp;
}
continue;
}
}
if (l->expire_time || !switch_test_flag(l, LFLAG_EVENTS)) {
last = l;
continue;
}
if (l->event_list[SWITCH_EVENT_ALL]) {
send = 1;
} else if ((l->event_list[event->event_id])) {
if (event->event_id != SWITCH_EVENT_CUSTOM || !event->subclass_name || (switch_core_hash_find(l->event_hash, event->subclass_name))) {
send = 1;
}
}
if (send) {
switch_mutex_lock(l->filter_mutex);
if (l->filters && l->filters->headers) {
switch_event_header_t *hp;
const char *hval;
send = 0;
for (hp = l->filters->headers; hp; hp = hp->next) {
if ((hval = switch_event_get_header(event, hp->name))) {
const char *comp_to = hp->value;
int pos = 1, cmp = 0;
while (comp_to && *comp_to) {
if (*comp_to == '+') {
pos = 1;
} else if (*comp_to == '-') {
pos = 0;
} else if (*comp_to != ' ') {
break;
}
comp_to++;
}
if (send && pos) {
continue;
}
if (!comp_to) {
continue;
}
if (*hp->value == '/') {
switch_regex_t *re = NULL;
int ovector[30];
cmp = !!switch_regex_perform(hval, comp_to, &re, ovector, sizeof(ovector) / sizeof(ovector[0]));
switch_regex_safe_free(re);
} else {
cmp = !strcasecmp(hval, comp_to);
}
if (cmp) {
if (pos) {
send = 1;
} else {
send = 0;
break;
}
}
}
}
}
switch_mutex_unlock(l->filter_mutex);
}
if (send && switch_test_flag(l, LFLAG_MYEVENTS)) {
char *uuid = switch_event_get_header(event, "unique-id");
if (!uuid || (l->session && strcmp(uuid, switch_core_session_get_uuid(l->session)))) {
send = 0;
}
if (!strcmp(switch_core_session_get_uuid(l->session), switch_event_get_header_nil(event, "Job-Owner-UUID"))) {
send = 1;
}
}
if (send) {
if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) {
qstatus = switch_queue_trypush(l->event_queue, clone);
if (qstatus == SWITCH_STATUS_SUCCESS) {
if (l->lost_events) {
int le = l->lost_events;
l->lost_events = 0;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_CRIT, "Lost [%d] events! Event Queue size: [%u/%u]\n", le, switch_queue_size(l->event_queue), MAX_QUEUE_LEN);
}
} else {
char errbuf[512] = {0};
unsigned int qsize = switch_queue_size(l->event_queue);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"Event enqueue ERROR [%d] | [%s] | Queue size: [%u/%u] %s\n",
(int)qstatus, switch_strerror(qstatus, errbuf, sizeof(errbuf)), qsize, MAX_QUEUE_LEN, (qsize == MAX_QUEUE_LEN)?"Max queue size reached":"");
if (++l->lost_events > MAX_MISSED) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Killing listener because of too many lost events. Lost [%d] Queue size[%u/%u]\n", l->lost_events, qsize, MAX_QUEUE_LEN);
kill_listener(l, "killed listener because of lost events\n");
}
switch_event_destroy(&clone);
}
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_ERROR, "Memory Error!\n");
}
}
last = l;
}
switch_mutex_unlock(globals.listener_mutex);
}