• 【skynet】skynet入口解析


    skynet入口

    skynet总体架构

    skynet是一个多线程的服务端架构。

    skynet入口函数

    总入口

    void 
    skynet_start(struct skynet_config * config) {
    	// register SIGHUP for log file reopen
    	struct sigaction sa;
    	sa.sa_handler = &handle_hup;
    	sa.sa_flags = SA_RESTART;
    	sigfillset(&sa.sa_mask);
    	sigaction(SIGHUP, &sa, NULL);
    
    	if (config->daemon) {
    		if (daemon_init(config->daemon)) {
    			exit(1);
    		}
    	}
    	skynet_harbor_init(config->harbor);
    	skynet_handle_init(config->harbor);
    	skynet_mq_init();
    	skynet_module_init(config->module_path);
    	skynet_timer_init();
    	skynet_socket_init();
    	skynet_profile_enable(config->profile);
    
    	struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger);
    	if (ctx == NULL) {
    		fprintf(stderr, "Can't launch %s service\n", config->logservice);
    		exit(1);
    	}
    
    	skynet_handle_namehandle(skynet_context_handle(ctx), "logger");
    
    	bootstrap(ctx, config->bootstrap);
    
    	start(config->thread);
    
    	// harbor_exit may call socket send, so it should exit before socket_free
    	skynet_harbor_exit();
    	skynet_socket_free();
    	if (config->daemon) {
    		daemon_exit(config->daemon);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    创建上下文环境

    static void
    bootstrap(struct skynet_context * logger, const char * cmdline) {
    	int sz = strlen(cmdline);
    	char name[sz+1];
    	char args[sz+1];
    	int arg_pos;
    	sscanf(cmdline, "%s", name);  
    	arg_pos = strlen(name);
    	if (arg_pos < sz) {
    		while(cmdline[arg_pos] == ' ') {
    			arg_pos++;
    		}
    		strncpy(args, cmdline + arg_pos, sz);
    	} else {
    		args[0] = '\0';
    	}
    	struct skynet_context *ctx = skynet_context_new(name, args);
    	if (ctx == NULL) {
    		skynet_error(NULL, "Bootstrap error : %s\n", cmdline);
    		skynet_context_dispatchall(logger);
    		exit(1);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    创建线程:创建一个socket、timer、monitor线程和n个工作线程。工作线程的个数由启动时配置的参数决定。

    static void
    start(int thread) {
    	pthread_t pid[thread+3];
    
    	struct monitor *m = skynet_malloc(sizeof(*m));
    	memset(m, 0, sizeof(*m));
    	m->count = thread;
    	m->sleep = 0;
    
    	m->m = skynet_malloc(thread * sizeof(struct skynet_monitor *));
    	int i;
    	for (i=0;i<thread;i++) {
    		m->m[i] = skynet_monitor_new();
    	}
    	if (pthread_mutex_init(&m->mutex, NULL)) {
    		fprintf(stderr, "Init mutex error");
    		exit(1);
    	}
    	if (pthread_cond_init(&m->cond, NULL)) {
    		fprintf(stderr, "Init cond error");
    		exit(1);
    	}
    
    	create_thread(&pid[0], thread_monitor, m);
    	create_thread(&pid[1], thread_timer, m);
    	create_thread(&pid[2], thread_socket, m);
    
    	static int weight[] = { 
    		-1, -1, -1, -1, 0, 0, 0, 0,
    		1, 1, 1, 1, 1, 1, 1, 1, 
    		2, 2, 2, 2, 2, 2, 2, 2, 
    		3, 3, 3, 3, 3, 3, 3, 3, };
    	struct worker_parm wp[thread];
    	for (i=0;i<thread;i++) {
    		wp[i].m = m;
    		wp[i].id = i;
    		if (i < sizeof(weight)/sizeof(weight[0])) {
    			wp[i].weight= weight[i];
    		} else {
    			wp[i].weight = 0;
    		}
    		create_thread(&pid[i+3], thread_worker, &wp[i]);
    	}
    
    	for (i=0;i<thread+3;i++) {
    		pthread_join(pid[i], NULL); 
    	}
    
    	free_monitor(m);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    具体会有如下线程:
    1. 一个网络线程
    2. 多个工作线程(可通过配置修改,取值一般是CPU的核心数)
    3. 一个定时器线程
    4. 一个监视器线程
    网络线程工作流程:
    1. 调用多路复用API创建epoll管理多个客户端socket
    2. 当收到客户端消息,把消息插入到客户端对应的服务的消息队列中。
    工作线程工作流程:
    1. 从全局队列获取有消息要处理的服务
    2. 把服务从全局队列中删掉,然后处理服务里的消息队列的消息。注意消息队列里的每个消息都会创建一个协程来处理,提高当前线程的并发性
    定时器线程工作流程:
    1. 线程每隔一段时间休眠一次
    2. 线程唤醒后,查看是否有到期的消息,如果有,把到期的消息插入到对应的服务的消息队列中
    监视器线程工作流程:
    1. 线程每隔5秒休眠一次
    2. 线程唤醒后,查看工作线程是否陷入死循环。(注:每个工作线程对应一个监视)
    一、skynet网络线程入口

    网络线程入口函数如下,调用函数socket_server_poll创建多路复用IO监听socket IO事件。

    //skynet_start.c
    static void *
    thread_socket(void *p) {
    	struct monitor * m = p;
    	skynet_initthread(THREAD_SOCKET);
    	for (;;) {
    		int r = skynet_socket_poll();
    		if (r==0)
    			break;
    		if (r<0) {
    			CHECK_ABORT
    			continue;
    		}
    		wakeup(m,0);
    	}
    	return NULL;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    socket_server_poll函数触发获得事件后,根据返回值事件类型type,调用消息创建函数forward_message创建消息。

    //skynet_socket.c
    int 
    skynet_socket_poll() {
    	struct socket_server *ss = SOCKET_SERVER;
    	assert(ss);
    	struct socket_message result;
    	int more = 1;
    	int type = socket_server_poll(ss, &result, &more);
    	switch (type) {
    	case SOCKET_EXIT:
    		return 0;
    	case SOCKET_DATA:
    		forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
    		break;
    	case SOCKET_CLOSE:
    		forward_message(SKYNET_SOCKET_TYPE_CLOSE, false, &result);
    		break;
    	case SOCKET_OPEN:
    		forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
    		break;
    	case SOCKET_ERR:
    		forward_message(SKYNET_SOCKET_TYPE_ERROR, true, &result);
    		break;
    	case SOCKET_ACCEPT:
    		forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result);
    		break;
    	case SOCKET_UDP:
    		forward_message(SKYNET_SOCKET_TYPE_UDP, false, &result);
    		break;
    	case SOCKET_WARNING:
    		forward_message(SKYNET_SOCKET_TYPE_WARNING, false, &result);
    		break;
    	default:
    		skynet_error(NULL, "Unknown socket message type %d.",type);
    		return -1;
    	}
    	if (more) {
    		return -1;
    	}
    	return 1;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    在创建消息函数中,构建struct skynet_message message消息对象,调用skynet_context_push函数将消息插入服务的消息队列。

    //skynet_socket.c
    static void
    forward_message(int type, bool padding, struct socket_message * result) {
    	struct skynet_socket_message *sm;
    	size_t sz = sizeof(*sm);
    	if (padding) {
    		if (result->data) {
    			size_t msg_sz = strlen(result->data);
    			if (msg_sz > 128) {
    				msg_sz = 128;
    			}
    			sz += msg_sz;
    		} else {
    			result->data = "";
    		}
    	}
    	sm = (struct skynet_socket_message *)skynet_malloc(sz);
    	sm->type = type;
    	sm->id = result->id;
    	sm->ud = result->ud;
    	if (padding) {
    		sm->buffer = NULL;
    		memcpy(sm+1, result->data, sz - sizeof(*sm));
    	} else {
    		sm->buffer = result->data;
    	}
    
    	struct skynet_message message;
    	message.source = 0;
    	message.session = 0;
    	message.data = sm;
    	message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);
    	
    	if (skynet_context_push((uint32_t)result->opaque, &message)) {
    		// todo: report somewhere to close socket
    		// don't call skynet_socket_close here (It will block mainloop)
    		skynet_free(sm->buffer);
    		skynet_free(sm);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    具体将信息插入服务的消息队列代码如下,注意加锁解锁操作,因为涉及多线程操作该消息队列(worker线程要从队列拿出信息消费,网络线程要插入信息到队列)

    int
    skynet_context_push(uint32_t handle, struct skynet_message *message) {
    	struct skynet_context * ctx = skynet_handle_grab(handle);
    	if (ctx == NULL) {
    		return -1;
    	}
    	skynet_mq_push(ctx->queue, message);
    	skynet_context_release(ctx);
    
    	return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    二、skynet工作线程入口

    skynet工作线程入口如下,调用函数skynet_context_message_dispatch从全局队列拿出服务然后消费,返回值q如果为NULL表示目前没有服务有消息要处理,然后通过条件变量+互斥锁阻塞当前的工作线程。

    //skynet_start.c
    static void *skynet_start.c
    thread_worker(void *p) {
    	struct worker_parm *wp = p;
    	int id = wp->id;
    	int weight = wp->weight;
    	struct monitor *m = wp->m;
    	struct skynet_monitor *sm = m->m[id];
    	skynet_initthread(THREAD_WORKER);
    	struct message_queue * q = NULL;
    	while (!m->quit) {
    		q = skynet_context_message_dispatch(sm, q, weight);
    		if (q == NULL) {
    			if (pthread_mutex_lock(&m->mutex) == 0) {
    				++ m->sleep;
    				// "spurious wakeup" is harmless,
    				// because skynet_context_message_dispatch() can be call at any time.
    				if (!m->quit)
    					pthread_cond_wait(&m->cond, &m->mutex);
    				-- m->sleep;
    				if (pthread_mutex_unlock(&m->mutex)) {
    					fprintf(stderr, "unlock mutex error");
    					exit(1);
    				}
    			}
    		}
    	}
    	return NULL;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    消息处理逻辑如下。在for循环里,处理消息队列里指定数目的消息。调用函数dispatch_message,执行在服务skynet.start时候绑定的ctx->cb回调函数。在该回调函数中,会创建一个协程去执行,具体可以跟一下skynet.start创建回调函数的具体代码。

    //skynet_server.c
    struct message_queue * 
    skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
    	if (q == NULL) {
    		q = skynet_globalmq_pop();
    		if (q==NULL)
    			return NULL;
    	}
    
    	uint32_t handle = skynet_mq_handle(q);
    
    	struct skynet_context * ctx = skynet_handle_grab(handle);
    	if (ctx == NULL) {
    		struct drop_t d = { handle };
    		skynet_mq_release(q, drop_message, &d);
    		return skynet_globalmq_pop();
    	}
    
    	int i,n=1;
    	struct skynet_message msg;
    
    	for (i=0;i<n;i++) {
    		if (skynet_mq_pop(q,&msg)) {
    			skynet_context_release(ctx);
    			return skynet_globalmq_pop();
    		} else if (i==0 && weight >= 0) {
    			n = skynet_mq_length(q);
    			n >>= weight;
    		}
    		int overload = skynet_mq_overload(q);
    		if (overload) {
    			skynet_error(ctx, "May overload, message queue length = %d", overload);
    		}
    
    		skynet_monitor_trigger(sm, msg.source , handle);
    
    		if (ctx->cb == NULL) {
    			skynet_free(msg.data);
    		} else {
    			dispatch_message(ctx, &msg);
    		}
    
    		skynet_monitor_trigger(sm, 0,0);
    	}
    
    	assert(q == ctx->queue);
    	struct message_queue *nq = skynet_globalmq_pop();
    	if (nq) {
    		// If global mq is not empty , push q back, and return next queue (nq)
    		// Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
    		skynet_globalmq_push(q);
    		q = nq;
    	} 
    	skynet_context_release(ctx);
    
    	return q;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    三、skynet定时器线程入口

    skynet定时器入口如下,就是在for死循环中,调用skynet_updatetimeskynet_socket_updatetime函数更新时间,然后调用usleep系统接口每隔一段时间阻塞睡眠当前线程,当CPU重新调度到该线程后,调用函数signal_hup处理到期时间逻辑。

    //skynet_start.c
    static void *
    thread_timer(void *p) {
    	struct monitor * m = p;
    	skynet_initthread(THREAD_TIMER);
    	for (;;) {
    		skynet_updatetime();
    		skynet_socket_updatetime();
    		CHECK_ABORT
    		wakeup(m,m->count-1);
    		usleep(2500);
    		if (SIG) {
    			signal_hup();
    			SIG = 0;
    		}
    	}
    	// wakeup socket thread
    	skynet_socket_exit();
    	// wakeup all worker thread
    	pthread_mutex_lock(&m->mutex);
    	m->quit = 1;
    	pthread_cond_broadcast(&m->cond);
    	pthread_mutex_unlock(&m->mutex);
    	return NULL;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    处理到期时间逻辑如下,调用函数skynet_handle_findname获取到期的时间信息,然后调用函数skynet_context_push把到期时间的信息重新插入到其对应的服务的消息队列当中。

    static void
    signal_hup() {
    	// make log file reopen
    
    	struct skynet_message smsg;
    	smsg.source = 0;
    	smsg.session = 0;
    	smsg.data = NULL;
    	smsg.sz = (size_t)PTYPE_SYSTEM << MESSAGE_TYPE_SHIFT;
    	uint32_t logger = skynet_handle_findname("logger");
    	if (logger) {
    		skynet_context_push(logger, &smsg);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    四、skynet监视器线程入口

    skynet监视器入口如下,就是在for死循环中,每隔5秒钟,调用skynet_monitor_check检查对应工作线程是否死循环(注:每个工作线程对应一个监视,因此m->count就是工作线程的个数)。

    static void *
    thread_monitor(void *p) {
    	struct monitor * m = p;
    	int i;
    	int n = m->count;
    	skynet_initthread(THREAD_MONITOR);
    	for (;;) {
    		CHECK_ABORT
    		for (i=0;i<n;i++) {
    			skynet_monitor_check(m->m[i]);
    		}
    		for (i=0;i<5;i++) {
    			CHECK_ABORT
    			sleep(1);
    		}
    	}
    
    	return NULL;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    每个工作线程对应一个skynet_monitor结构,当工作线程处理消息前用skynet_monitor记录消息的来源和目标。处理完清除。监控线程每隔五秒检测。

    struct skynet_monitor {
        int version;
        int check_version;
        uint32_t source;
        uint32_t destination;
    };
    
    //监控线程检测是否陷入死循环
    void 
    skynet_monitor_check(struct skynet_monitor *sm) {
        if (sm->version == sm->check_version) {
            if (sm->destination) {
                skynet_context_endless(sm->destination);
                skynet_error(NULL, "A message from [ :%08x ] to [ :%08x ] maybe in an endless loop (version = %d)", sm->source , sm->destination, sm->version);
            }
        } else {
            sm->check_version = sm->version;
        }
    }
    //工作线程记录消息执行记录
    skynet_monitor_trigger(struct skynet_monitor *sm, uint32_t source, uint32_t destination) {
        sm->source = source;
        sm->destination = destination;
        ATOM_INC(&sm->version);
    }
    //调用消息回调前
    skynet_monitor_trigger(sm, msg.source , handle);
    //调用消息回调后
    skynet_monitor_trigger(sm, 0,0);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    如何判断是否陷入死循环?

    假设 version = 0,check_version = 0. 工作线程处理消息前 在skynet_monitor_trigger中sm->version = sm->version + 1,五秒后监控线程执行sm->check_version = sm->version; version = 1,check_version = 1.五秒后,监控线程再次检测,如果在五秒内工作线程执行完消息那么工作线程会执行skynet_monitor_trigger sm->version = sm->version + 1 .version = 2,check_version = 1.没执行完则version = 1,check_version = 1,sm->destination != 0.则认为消息的执行超过了五秒,可能陷入死循环。

    如何查找skynet的C源码位置

    lua代码中像如下的源文件怎么查找呢

    local socketdriver = require "skynet.socketdriver"
    
    • 1

    由于lua代码无法进行IO操作,因此lua的IO操作都是调用C接口进行的。

    因此如上比较底层的需要和socket IO打交道的代码,一定是在C源码中。

    我们可以利用全局搜索,如下所示。注意规律,前面加luaopen_,skynet和后面的库名之间的".“替换为”_"

    luaopen_skynet_socketdriver
    
    • 1

    lua代码中像如下的源文件怎么查找呢

    local c = require "skynet.core"
    
    • 1

    我们可以利用全局搜索,如下所示。注意规律,前面加luaopen_,skynet和后面的库名之间的".“替换为”_"

    luaopen_skynet_core
    
    • 1
  • 相关阅读:
    从云AK泄露利用看企业特权管理
    学习笔记7--交通环境行为预测
    Cocos 系列教程 - 01 认识项目结构
    【0224】源码分析RelFileNode对smgr访问磁盘表文件的重要性(2)
    高斯算法的原理及其与常规求和方法的区别
    C# Winform编程(8)GDI+绘图
    python程序打包——基础准备、源代码打包、二进制打包、setuptools基础
    Advanced .Net Debugging 4:基本调试任务(对象检查:内存、值类型、引用类型、数组和异常的转储)
    pdf如何让多张图片在一页
    这份300页的2020最新java面试题及答案,“吃透”能让你成功定位阿里P9
  • 原文地址:https://blog.csdn.net/qq_37717687/article/details/122149726