Skynet 仅解决一个问题:把一个符合规范的 C 模块,从动态库(so 文件)中启动起来,绑定一个永不重复(即使模块退出)的数字 id 做为其 handle 。模块被称为服务(Service),服务间可以自由发送消息。每个模块可以向 Skynet 框架注册一个 callback 函数,用来接收发给它的消息。每个服务都是被一个个消息包驱动,当没有包到来的时候,它们就会处于挂起状态,对 CPU 资源零消耗。如果需要自主逻辑,则可以利用 Skynet 系统提供的 timeout 消息,定期触发。(Skynet 提供了名字服务,还可以给特定的服务起一个易读的名字,而不是用 id 来指代它。id 和运行时态相关,无法保证每次启动服务,都有一致的 id ,但名字可以。)
从上面的意思来看,skynet可以让我们写的不同的业务逻辑,独立运行在不同的上下文环境中,并且能够通过某种方式,相互协作,最终共同服务(actor模型)
skynet机制:
/*
一个模块被加载以后,将被放置到modules的skynet_module数组中,当要创建该module的实例时,将会从skynet_module中取出对应的模块,并调用create函数创建实例,然后将实例指针传入init函数完成初始化以后,赋值给context。
一个C服务,定义以上四个接口时,一定要以文件名作为前缀,然后通过下划线和对应函数连接起来,因为skynet加载的时候,就是通过这种方式去寻找对应函数的地址的,比如一个c服务文件名为logger,那么对应的4个函数名则为logger_create、logger_init、logger_signal、logger_release(在程序中动态加载到skynet_module列表中,这里通过dlopen函数来获取so库的访问句柄,并通过dlsym将so库中对应的函数绑定到函数指针中)
*/
// skynet_module.h
typedef void * (*skynet_dl_create)(void); //create
//skynet_context 对象会注册在 skynet_context list
typedef int (*skynet_dl_init)(void * inst, struct skynet_context *, const char * parm); //init
typedef void (*skynet_dl_release)(void * inst); //release
typedef void (*skynet_dl_signal)(void * inst, int signal); //signal
struct skynet_module {
const char * name; // C服务名称,一般是C服务的文件名
void * module; // 访问该so库的dl句柄,该句柄通过dlopen函数获得
skynet_dl_create create; // 绑定so库中的xxx_create函数,通过dlsym函数实现绑定,调用该create即是调用xxx_create
skynet_dl_init init; // 绑定so库中的xxx_init函数,调用该init即是调用xxx_init
skynet_dl_release release; // 绑定so库中的xxx_release函数,调用该release即是调用xxx_release
skynet_dl_signal signal; // 绑定so库中的xxx_signal函数,调用该signal即是调用xxx_signal
};
// skynet_module.c
#define MAX_MODULE_TYPE 32
struct modules {
int count; // modules的数量
struct spinlock lock; // 自旋锁,避免多个线程同时向skynet_module写入数据,保证线程安全
const char * path; // 由skynet配置表中的cpath指定,一般包含./cservice/?.so路径
struct skynet_module m[MAX_MODULE_TYPE]; // 存放服务模块的数组,最多32类
};
static struct modules * M = NULL;
/*
对于一个新服务的创建流程:
对应的module -> module实例化和初始化 -> 创建skynet_context上下文环境 -> module实例和模块与skynet_context关联 -> 放置到skynet_context list
当一个消息送达一个context时,其callback函数就会被调用,callback函数一般在module的init函数里指定,调用callback函数时,会传入
userdata(一般是instance指针),
source(发送方的服务id),
type(消息类型),
msg和sz(数据及其大小),
每个服务的callback处理各自的逻辑
*/
// skynet_server.c
struct skynet_context {
void * instance; // 由指定module的create函数,创建的数据实例指针,同一类服务可能有多个实例,
// 因此每个服务都应该有自己的数据
struct skynet_module * mod; // 引用服务module的指针,方便后面对create、init、signal和release函数进行调用
void * cb_ud; // 调用callback函数时,回传给callback的userdata,一般是instance指针
skynet_cb cb; // 服务的消息回调函数,一般在skynet_module的init函数里指定
struct message_queue *queue; // 服务专属的次级消息队列指针
FILE * logfile; // 日志句柄
char result[32]; // 操作skynet_context的返回值,会写到这里
uint32_t handle; // 标识唯一context的服务id
int session_id; // 在发出请求后,收到对方的返回消息时,通过session_id来匹配一个返回,对应哪个请求
int ref; // 引用计数变量,当为0时,表示内存可以被释放
bool init; // 是否完成初始化
bool endless; // 消息是否堵住
CHECKCALLING_DECL
};
// skynet_handle.c
// 这个结构用于记录,服务对应的别名,当应用层为某个服务命名时,会写到这里来
struct handle_name {
char * name; // 服务别名
uint32_t handle; // 服务id
};
struct handle_storage {
struct rwlock lock; // 读写锁
uint32_t harbor; // harbor id
uint32_t handle_index; // 创建下一个服务时,该服务的slot idx,一般会先判断该slot是否被占用,后面会详细讨论
int slot_size; // slot的大小,一定是2^n,初始值是4
struct skynet_context ** slot; // skynet_context list
int name_cap; // 别名列表大小,大小为2^n
int name_count; // 别名数量
struct handle_name *name; // 别名列表
};
static struct handle_storage *H = NULL;
/*
skynet包含两级消息队列
1、global_mq 包含一个head和tail指针 分别指向次级消息队列的头部和尾部
2、次级消息队列(mq) 单线链表
次级消息队列,实际上是一个数组,并且用两个int型数据,分别指向他的头部和尾部(head和tail),不论是head还是tail,当他们的值>=数组尺寸时,都会进行回绕(即从下标为0开始,比如值为数组的size时,会被重新赋值为0),在push操作后,head等于tail意味着队列已满(此时,队列会扩充两倍,并从头到尾重新赋值,此时head指向0,而tail为扩充前,数组的大小),在pop操作后,head等于tail意味着队列已经空了(后面他会从skynet全局消息队列中,被剔除掉)。
*/
/*
1、消息驱动
消息派发的机制:(worker线程 --> global_mq --(pop mq)--> mq --(pop msg)-->context的callback函数 --(push mq)-->global_mq)
工作线程,会从global_mq里pop一个次级消息队列来,然后从次级消息队列中,pop出一个消息,并传给context的callback函数,在完成驱动以后,再将次级消息队列push回global_mq中
*/
// skynet_mq.h
struct skynet_message {
uint32_t source; // 消息发送方的服务地址
// 如果这是一个回应消息,那么要通过session找回对应的一次请求,在lua层,我们每次调用call的时候,都会往对
// 方的消息队列中,push一个消息,并且生成一个session,然后将本地的协程挂起,挂起时,会以session为key,协程句
// 柄为值,放入一个table中,当回应消息送达时,通过session找到对应的协程,并将其唤醒。后面章节会详细讨论
int session;
void * data; // 消息地址
size_t sz; // 消息大小
};
// skynet_mq.c
#define DEFAULT_QUEUE_SIZE 64
#define MAX_GLOBAL_MQ 0x10000
// 0 means mq is not in global mq.
// 1 means mq is in global mq , or the message is dispatching.
#define MQ_IN_GLOBAL 1
#define MQ_OVERLOAD 1024
struct message_queue {
// 自旋锁,可能存在多个线程,向同一个队列写入的情况,加上自旋锁避免并发带来的发现,
//后面会讨论互斥锁,自旋锁,读写锁和条件变量的区别
struct spinlock lock;
uint32_t handle; // 拥有此消息队列的服务的id
int cap; // 消息大小
int head; // 头部index
int tail; // 尾部index
int release; // 是否能释放消息
int in_global; // 是否在全局消息队列中,0表示不是,1表示是
int overload; // 是否过载
int overload_threshold;
struct skynet_message *queue; // 消息队列
struct message_queue *next; // 下一个次级消息队列的指针
};
struct global_queue {
struct message_queue *head;
struct message_queue *tail;
struct spinlock lock;
};
static struct global_queue *Q = NULL;
/*
2、消息写入
我们要向一个服务发消息,最终是通过调用skynet.send接口,将消息插入到该服务专属的次级消息队列的,次级消息队列的内容,并不是context结构的一部分(context只是引用了他的指针),因此,在一个服务执行callback的同时,其他服务(可能是多个线程内执行callback的其他服务)可以向它的消息队列里push消息,而mq的push操作,是加了一个自旋锁,以避免多个线程,同时操作一个消息队列。lua层的skynet.send接口,最终会调到c层的skynet_context_push。这个接口实质上,是通过handle将context指针取出来,然后再往消息队列里push消息:
*/
// skynet_server.c
int skynet_context_push(uint32_t handle, struct skynet_message *message) {
struct skynet_context * ctx = skynet_handle_grab(handle);
if (ctx == NULL) {
return -1;
}
skynet_mq_push(ctx->queue, message);
skynet_context_release(ctx);
return 0;
}
// skynet_handle.c
struct skynet_context *
skynet_handle_grab(uint32_t handle) {
struct handle_storage *s = H;
struct skynet_context * result = NULL;
rwlock_rlock(&s->lock);
uint32_t hash = handle & (s->slot_size-1);
struct skynet_context * ctx = s->slot[hash];
if (ctx && skynet_context_handle(ctx) == handle) {
result = ctx;
skynet_context_grab(result);
}
/*
因为我们访问一个服务的机会,远大于创建一个服务并写入列表的机会,因此这里用了读写锁,在通过handle获取context指针时,加了一个读取锁,这样当在读取的过程中,同时有新的服务创建,并且存在要扩充skynet_context list容量的风险,因此不论如何,他都应当被阻塞住,直到所有的读取锁都释放掉。
*/
rwlock_runlock(&s->lock);
return result;
}
创建c服务的工作,一般在c层进行,一般会调用skynet_context_new接口,如下所示:
// skynet_server.c
struct skynet_context *
skynet_context_new(const char * name, const char *param) {
struct skynet_module * mod = skynet_module_query(name);
if (mod == NULL)
return NULL;
void *inst = skynet_module_instance_create(mod);
if (inst == NULL)
return NULL;
struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
CHECKCALLING_INIT(ctx)
ctx->mod = mod;
ctx->instance = inst;
ctx->ref = 2;
ctx->cb = NULL;
ctx->cb_ud = NULL;
ctx->session_id = 0;
ctx->logfile = NULL;
ctx->init = false;
ctx->endless = false;
// Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
ctx->handle = 0;
ctx->handle = skynet_handle_register(ctx);
struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
// init function maybe use ctx->handle, so it must init at last
context_inc();
CHECKCALLING_BEGIN(ctx)
int r = skynet_module_instance_init(mod, inst, ctx, param);
CHECKCALLING_END(ctx)
if (r == 0) {
struct skynet_context * ret = skynet_context_release(ctx);
if (ret) {
ctx->init = true;
}
skynet_globalmq_push(queue);
if (ret) {
skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
}
return ret;
} else {
skynet_error(ctx, "FAILED launch %s", name);
uint32_t handle = ctx->handle;
skynet_context_release(ctx);
skynet_handle_retire(handle);
struct drop_t d = { handle };
skynet_mq_release(queue, drop_message, &d);
return NULL;
}
}
// skynet_module.c
struct skynet_module *
skynet_module_query(const char * name) {
struct skynet_module * result = _query(name);
if (result)
return result;
SPIN_LOCK(M)
result = _query(name); // double check
if (result == NULL && M->count < MAX_MODULE_TYPE) {
int index = M->count;
void * dl = _try_open(M,name);
if (dl) {
M->m[index].name = name;
M->m[index].module = dl;
if (_open_sym(&M->m[index]) == 0) {
M->m[index].name = skynet_strdup(name);
M->count ++;
result = &M->m[index];
}
}
}
SPIN_UNLOCK(M)
return result;
}
static void *
_try_open(struct modules *m, const char * name) {
const char *l;
const char * path = m->path;
size_t path_size = strlen(path);
size_t name_size = strlen(name);
int sz = path_size + name_size;
//search path
void * dl = NULL;
char tmp[sz];
do
{
memset(tmp,0,sz);
while (*path == ';') path++;
if (*path == '\0') break;
l = strchr(path, ';');
if (l == NULL) l = path + strlen(path);
int len = l - path;
int i;
for (i=0;path[i]!='?' && i < len ;i++) {
tmp[i] = path[i];
}
memcpy(tmp+i,name,name_size);
if (path[i] == '?') {
strncpy(tmp+i+name_size,path+i+1,len - i - 1);
} else {
fprintf(stderr,"Invalid C service path\n");
exit(1);
}
dl = dlopen(tmp, RTLD_NOW | RTLD_GLOBAL);
path = l;
}while(dl == NULL);
if (dl == NULL) {
fprintf(stderr, "try open %s failed : %s\n",name,dlerror());
}
return dl;
}
_open_sym(struct skynet_module *mod) {
size_t name_size = strlen(mod->name);
char tmp[name_size + 9]; // create/init/release/signal , longest name is release (7)
memcpy(tmp, mod->name, name_size);
strcpy(tmp+name_size, "_create");
mod->create = dlsym(mod->module, tmp);
strcpy(tmp+name_size, "_init");
mod->init = dlsym(mod->module, tmp);
strcpy(tmp+name_size, "_release");
mod->release = dlsym(mod->module, tmp);
strcpy(tmp+name_size, "_signal");
mod->signal = dlsym(mod->module, tmp);
return mod->init == NULL;
}
1、启动skynet节点时,会启动一个logger c服务
// skynet_start.c
void
skynet_start(struct skynet_config * config) {
...
struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger);
if (ctx == NULL) {
fprintf(stderr, "Can't launch %s service\n", config->logservice);
exit(1);
}
...
}
2、此时,skynet_module list列表中,搜索logger服务模块,如果没找到则在so库的输出路径中,寻找名为logger的so库,找到则将该so库加载到内存中,并将对应的logger_create,logger_init,logger_release函数地址分别赋值给logger模块中的create,init,release函数指针,此时skynet_module list中,多了一个logger模块。
3、创建服务实例,即创建一个skynet_context实例,为了使skynet_context实例拥有访问logger服务内部函数的权限,这里将logger模块指针,赋值给skynet_context实例的mod变量中。
4、创建一个logger服务的数据实例,调用logger服务的create函数:
// service_logger.c
struct logger {
FILE * handle;
int close;
};
struct logger *
logger_create(void) {
struct logger * inst = skynet_malloc(sizeof(*inst));
inst->handle = NULL;
inst->close = 0;
return inst;
}
此时,将新创建的数据实例赋值给skynet_context的instance变量,此时,一个服务对象运行时,所要用到的逻辑,能够通过mod变量,访问logger服务对应的函数,而通过instance可以找到该服务自己的数据块。
5、将新创建的skynet_context对象,注册skynet_context list中,此时skynet_context list多了一个logger服务实例
6、初始化logger服务,注册logger服务的callback函数:
// service_logger.c
static int
_logger(struct skynet_context * context, void *ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
struct logger * inst = ud;
fprintf(inst->handle, "[:%08x] ",source);
fwrite(msg, sz , 1, inst->handle);
fprintf(inst->handle, "\n");
fflush(inst->handle);
return 0;
}
int
logger_init(struct logger * inst, struct skynet_context *ctx, const char * parm) {
if (parm) {
inst->handle = fopen(parm,"w");
if (inst->handle == NULL) {
return 1;
}
inst->close = 1;
} else {
inst->handle = stdout;
}
if (inst->handle) {
skynet_callback(ctx, inst, _logger);
skynet_command(ctx, "REG", ".logger");
return 0;
}
return 1;
}
// skynet_server.c
void
skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
context->cb = cb;
context->cb_ud = ud;
}
上面这段逻辑,将skynet_context的callback函数设置为logger服务的_logger函数,并将调用callback时,传入的userdata设置为先前创建的数据实例
7、为logger服务实例创建一个次级消息队列,并将队列插入到全局消息队列中
简单的启动初始化流程: