• skynet中一条消息从取出到处理完整流程(协程调度源码刨析)


    先介绍一点前置基础知识,逐步阐述一条消息得到处理完整流程,有基础的朋友直接从第二大点开始看。

    一.lua简单基础知识

    • local test
      test = function ()

      等价于

      local function test()

    • 前边加local表示局部变量,反之。

    • 关于require “skynet.core” 来自哪里

      很多地方都用到了skynet.core,

      比如

    c =  require "skynet.core"
    function skynet.start(start_func)
    2     c.callback(skynet.dispatch_message)
    3     skynet.timeout(0, function()
    4         skynet.init_service(start_func)
    5     end)
    6 end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    这里的c来自于require “skynet.core”,它是在lua-skynet.c中注册的,如下,每次调用c库都是通用的接口,这里可以看到接口函数为luaopen_skynet_core, 在加载的时候会把第二个_转化成 . ,这样就可以明白skynet.core是哪里来的了。

     int luaopen_skynet_core(lua_State *L) {
        luaL_checkversion(L);
    
        luaL_Reg l[] = {
             ...
           { "callback", _callback },
           { NULL, NULL },
         };
     
         luaL_newlibtable(L, l);
    
         lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");
        struct skynet_context *ctx = lua_touserdata(L,-1);
        if (ctx == NULL) {
            return luaL_error(L, "Init skynet context first");
         }
     
        luaL_setfuncs(L,l,1);
     
         return 1;
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    skynet.start中调用c.callback,对应的就是lua-skynet.c中的lcallback函数,skynet.dispatch_message回调就是它的参数(这里引申一点:lua层在调用C接口的时候,可以发现C接口层的函数都是一个lua_State * 类型的参数,我们可以把这种方式理解为传递了当前虚拟机的状态,就以当前情况举例,我们在lua层使用了c.callback(skynet.dispatch_message),这里我们本来传递的参数是skynet.dispatch_message,其实当c调用lua层的时候会产生一个新的堆栈区,这个参数被放到了这个新创建的堆栈区中(每次调用一个c接口都会产生一个新的堆栈),不管你放几个参数,都是放到了新创建的虚拟堆栈中,最终在c层面我们只看到了唯一的参数接口,lua_State * ,表示对应的lua层服务虚拟机的状态。(一个lua服务对应一个lua虚拟机))

    skynet.dispatch_message 内部实现是通过raw_dispatch_message实现

    这里有个细节:为什么下面出现的p.dispatch是我们自定义的函数,在skynet.lua中查看skynet.dispatch函数的实现即可明白

    local function raw_dispatch_message(prototype, msg, sz, session, source, ...)
        -- skynet.PTYPE_RESPONSE = 1, read skynet.h
        if prototype == 1 then -- “response” 类型消息,skynet 已自动处理
            local co = session_id_coroutine[session]
            session_id_coroutine[session] = nil
            suspend(co, coroutine.resume(co, true, msg, sz))
        else -- 其他类型消息派发到相应的 dispatch 函数
            local p = assert(proto[prototype], prototype)
            local f = p.dispatch -- 我们自定义的 dispatch 函数
            if f then
                local co = co_create(f) -- 创建 coroutine
                session_coroutine_id[co] = session
                session_coroutine_address[co] = source
                suspend(co, coroutine.resume(co, session,source, p.unpack(msg,sz, ...)))
            end
        end
    end
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    二:服务创建–>回调函数的设置

    涉及目录文件

    skynet-src目录:

    以下涉及函数,若未标注,均在skynet_server.c中

    讲述了newservice服务开启的底层,当调用newservice时,实际上底层是去了主函数为skynet_context_new的地方
    在这里插入图片描述

    涉及结构体struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
    初始化skynet_context实例,可以理解为一个服务的实例

    struct skynet_context {		//这个结构体表示一个服务(actor)实例
    	void * instance;
    	struct skynet_module * mod;
    	void * cb_ud;
    	skynet_cb cb;  //回调函数
    	struct message_queue *queue;
    	ATOM_POINTER logfile;
    	uint64_t cpu_cost;	// in microsec
    	uint64_t cpu_start;	// in microsec
    	char result[32];
    	uint32_t handle;
    	int session_id;
    	ATOM_INT ref;
    	int message_count;
    	bool init;
    	bool endless;
    	bool profile;
    
    	CHECKCALLING_DECL
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    涉及函数:

    // 用于创建隔离的环境 
    void * skynet_module_instance_create(struct skynet_module *m);
     // 用于设置回调函数 int 
    skynet_module_instance_init(struct skynet_module *m, void * inst, struct 
    skynet_context *ctx, const char * parm); 
    // 用于释放 actor 对象 void 
    skynet_module_instance_release(struct skynet_module *m, void *inst); 
    //用于处理 信号 消息 void skynet_module_instance_signal(struct 
     skynet_module *m, void *inst, int signal);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    skynet_module_query模块初始化,所有服务启动的必经之路(在lua层调用C库完成后续的create,init,release,signal函数的加载)
    其中的get_api 可以理解为在lua层调用c语言写的库获取所需函数的地址(加载.so库文件)

    这个函数是skynet-module.c里面的,使用了open_sym函数(内部使用了get_api),将service-snlua.c加载成的库文件对应的snlua_create,snlua_init,snlua_release,snlua_signal函数地址加载进去,所以也叫模块初始化,所有服务初始化的时候都会经过service-snlua.c这个文件
    service-snlua.c被加载成库文件,这个库文件可以在cservice目录里看到。
    下边的函数实现全都依赖这个模块初始化,初始化后,会将snlua_create,snlua_init,snlua_release,snlua_signal这几个函数的地址都设置好,下边这些函数才能调用。
    
    • 1
    • 2
    • 3

    skynet_module_instance_create创建lua虚拟机(create函数)

    函数内部使用了create函数指针,create函数本质是service-src目录下,service-snlua.c里面的snlua_create。(这里体现了在lua层调用c语言写的库,即上边所说的,在模块初始化过程就把create加载好了)
    snlua_create里面可以看到一个函数lua_newstate,即创建一个lua虚拟机
    
    • 1
    • 2

    skynet_handle_register生成全局唯一句柄handle

    将handle的值赋值给该服务的skynet_context实例,这里指的是ctx
    ctx.handle = handle
    
    • 1
    • 2

    skynet_mq_create创建队列

    每创建一个队列的时候,都会把ctx.handle传进去,
    目的是将actor(服务)与队列进行关系绑定
    队列结构体中包含了handle字段,所有能够进行绑定,
    
    • 1
    • 2
    • 3

    skynet_module_instance_init设置回调函数

    函数内部使用了init函数指针,init函数本质是service-src目录下,
    service-snlua.c里面的snlua_init。(在模块初始化过程加载的)
    
    • 1
    • 2

    Actor运行

    在这里插入图片描述

    dispatch_message 和 skynet_callback 都在这里面

    skynet_context_send :可以看到如何将一个消息加入服务队列

    skynet_context_push和skynet_context_send类似,可以对比观察

    细节点:

    为什么上边的图有两条路径最终都是去往ctx->cb,其实下边这条路径是在服务进行初始化时,对回调函数的设置,上边那条路径是在收到消息后,对消息进行处理时,最终进到ctx->cb,继续往后学习,你会发现,ctx->cb只是一个接口,你的第二个传参决定了最终去处理哪个服务(actor)的处理函数,最终的实现是在skynet.dispatch_message中的raw_dispatch_message中调用对应的p.dispatch。

    skynet_start.c文件

    讲述了线程(内核)的分配工作方式

    涉及函数:

    skynet_context_message_dispatch

    skynet.lua

    涉及函数:

    skynet.dispatch_message(里面用了raw_dispatch_message),描述了当一个消息到来会创建一个协程去处理,用的co_create,实际上是从一个协程池里面取一个协程。

    skynet.dispatch(typename, func) 的实现,可以了解到是如何设置协程对应的处理函数的

    三:消息与actor建立关联

    skynet是基于消息的,那么当我们取出一条消息,怎么判断它对应的是哪个服务呢。

    上边说到,每个服务都有自己消息队列,消息队列这个结构体包含了一个handle字段,handle字段是每个服务的全局唯一标识。
    在skynet_server.c文件中skynet_context_message_dispatch函数充分体现了这个过程,先通过skynet_globalmq_pop从全局队列中取出一个次级消息队列,通过skynet_mq_handle获取对应句柄,再通过skynet_handle_grab,将句柄作为参数传进去,得到skynet_context * ctx(服务实例),接着后边几行可以看到对这个线程的权重的设置,这决定当前线程可以同时处理同一个服务中的几个消息

    线程权重设置好处:如不设置,所有线程都从全局队列中取出一个次级队列,并且只处理一个消息又放回去全局队列尾部,一直这样循环,对于同一个服务,线程切换次数过多,效率低下。

    四:阐述工作线程如何处理一条消息

    在skynet_start.c中,start函数中包括了线程权重的设置,以及确定了线程的工作函数是thread_work(void * p) ,从这个函数中可以通过skynet_context_message_dispatch(在skynet_server.c中)来实现对消息的处理,进入skynet_context_message_dispatch这个函数我们可以看到通过skynet_globalmq_pop()获取了一个次级消息队列,通过这个次级消息队列我们可以获取全局唯一的服务句柄(因为消息队列这个结构体包含了handle),进而获取服务实例(skynet_context * ctx),使用dispatch_message(struct skynet_context *ctx, struct skynet_message *msg)函数(在skynet.server.c中)将ctx,和message作为参数传入,进一步调用ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
    这里需要注意第二个参数就是需要处理消息的服务(actor)的状态,通过这个参数就能找到对应服务的skynet.dispatch_message。

    上边已经说过skynet.start中调用c.callback,这个callback对应的就是lua-skynet.c中的 lcallback 函数,skynet.dispatch_message这个函数就是它的参数,
    我们再重温一下上边的知识点:lua层在调用C接口的时候,可以发现C接口层的函数都是一个lua_State * 类型的参数,我们可以把这种方式理解为传递了当前虚拟机的状态,就以当前情况举例,我们在lua层使用了c.callback(skynet.dispatch_message),这里我们本来传递的参数是skynet.dispatch_message,其实当c调用lua层的时候会产生一个新的堆栈区,这个参数被放到了这个新创建的堆栈区中(每次调用一个c接口都会产生一个新的堆栈),不管你放几个参数,都是放到了新创建的虚拟堆栈中,最终在c层面我们只看到了唯一的参数接口,lua_State * ,表示对应的lua层服务虚拟机的状态。(一个lua服务对应一个lua虚拟机))

    static int lcallback(lua_State *L) {
        struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
        int forward = lua_toboolean(L, 2);
        luaL_checktype(L,1,LUA_TFUNCTION);
        lua_settop(L,1);
        lua_rawsetp(L, LUA_REGISTRYINDEX, _cb); 
    
        lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
        lua_State *gL = lua_tothread(L,-1);
    
        if (forward) {
            skynet_callback(context, gL, forward_cb);
                --forward_cb内部就是_cb
        } else {
            skynet_callback(context, gL, _cb);
        }
    
        return 0;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    上边代码的第六行:
    可以看到,其以函数_cb为key,这个服务的LUA回调函数(skynet.dispatch_message)作为value被注册到全局注册表(每个lua服务都有一个自己的单独注册表)中。

     void skynet_callback(struct skynet_context * ctx, void *ud, skynet_cb cb) {
         ctx->cb = cb;
         ctx->cb_ud = ud;
     }
    
    • 1
    • 2
    • 3
    • 4

    这里的ctx表示服务实例,ctx->cb表示回调函数,ctx->cb_ud才是表示的lua虚拟机的状态,也是最终决定消息属于哪个服务器的判断关键。
    重点:
    所有服务的统一接口都是ctx->cb,但这个cb其实就是(lua-skynet.c文件里的)_cb函数,让我们看一下源码

    static int _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
        lua_State *L = ud; --重点。、
        int trace = 1;
        int r;
        int top = lua_gettop(L);
        if (top == 0) {
            lua_pushcfunction(L, traceback);
            lua_rawgetp(L, LUA_REGISTRYINDEX, _cb);
        } else {
            assert(top == 2);
        }
        lua_pushvalue(L,2);
    
            
            --	再把这些信息(type,msg,sz,session,source)压栈调用
        lua_pushinteger(L, type);
        lua_pushlightuserdata(L, (void *)msg);
        lua_pushinteger(L,sz);
        lua_pushinteger(L, session);
        lua_pushinteger(L, source);
    
        r = lua_pcall(L, 5, 0 , trace);
    
        if (r == LUA_OK) {
            return 0;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    只需要知道,skynet-context(也就是我们指的服务实例,ctx)接收到消息后会转发给context->cb(ctx->cb)处理,也就是_cb函数。在_cb中,在代码第九行可以看出,从全局表中取到当前服务关联的LUA回调(skynet.dispatch_message),将type, msg, sz, session, source(这几个参数刚好是raw_dispatch_message需要的参数)压栈调用。最终LUA回调到该服务的(skynet.dispatch_message)里面,进一步使用raw_dispatch_message实现具体功能。

    重点:当你不明白当前服务的lua回调(skynet.dispatch_message)是怎么注册,怎么取出的时候,仔细观察lcallback函数的参数,以及_cb函数的第二个参数,可以发现,它们都被转化为lua_State结构,把这个结构体理解为一个服务(Actor)的状态,上边我们说过,lua在调用c库的时候会有一个独立的虚拟栈空间,在lua层不管我们传入多少参数,都会被压栈到这个虚拟栈空间保存,最终体现的都是一个lua_State * 结构来表示这个服务的状态,然后使用这个栈空间进行信息交互。

    让我们对比一下这个_cb函数中的第九行,和lcallback中的第六行。

    lua_rawsetp(L, LUA_REGISTRYINDEX, _cb);  --_cb第六行
    lua_rawgetp(L, LUA_REGISTRYINDEX, _cb);	 --lcallback第九行
    
    • 1
    • 2

    这里面的L参数就是对应的服务的lua虚拟机状态。
    这时候我相信大家已经恍然大悟了

    若有任何问题欢迎评论区留言讨论!!

  • 相关阅读:
    MyBatis 如何使用set标签呢?
    vue+asp.net Web api前后端分离项目发布部署
    Document类型【2】
    华测RTK采集的GPX数据如何带属性转出kml、shp进行后续的管理和分析
    spring中12种定义bean的方法,你都知道哪几种?
    十八、openlayers官网示例Custom Animation解析——地图上添加自定义动画
    10 分钟了解 Pulsar:针对 Kafka 用户的指南
    临床三线表/基线资料表一行代码绘制
    Seata(1.4.2)环境搭建-SpringCloudAlibaba微服务
    oppo前端开发一面
  • 原文地址:https://blog.csdn.net/qq_51721904/article/details/126080397