• 服务端Skynet(二)——消息调度机制


    服务端Skynet(二)——消息调度机制


    参考文献:

    skynet设计综述

    skynet源码赏析

    1、提前了解知识

    1.1、互斥锁(mutex lock : mutual exclusion lock)

    1. 概念:互斥锁,一条线程加锁锁住临界区,另一条线程尝试访问该临界区的时候,会发生阻塞,并进入休眠状态。临界区是锁lock和unlock之间的代码片段,一般是多条线程能够共同访问的部分。
    2. 具体说明:假设一台机器上的cpu有两个核心core0和core1,现在有线程A、B、C,此时core0运行线程A,core1运行线程B,此时线程B使用Mutex锁,锁住一个临界区,当线程A试图访问该临界区时,因为线程B已经将其锁住,因此线程A被挂起,进入休眠状态,此时core0进行上下文切换,将线程A放入休眠队列中,然后core0运行线程C,当线程B完成临界区的流程并执行解锁之后,线程A又会被唤醒,core0重新运行线程A

    1.2、自旋锁(spinlock)

    1. 概念:自旋锁,一条线程加锁锁住临界区,另一条线程尝试访问该临界区的时候,会发生阻塞,但是不会进入休眠状态,并且不断轮询该锁,直至原来锁住临界区的线程解锁。
    2. 具体说明:假设一台机器上有两个核心core0和core1,现在有线程A、B、C,此时core0运行线程A,core1运行线程B,此时线程B调用spin lock锁住临界区,当线程A尝试访问该临界区时,因为B已经加锁,此时线程A会阻塞,并且不断轮询该锁,不会交出core0的使用权,当线程B释放锁时,A开始执行临界区逻辑

    1.3、读写锁(readers–writer lock)

    1. 概述:读写锁,一共三种状态
      • 读状态时加锁,此时为共享锁,当一个线程加了读锁时,其他线程如果也尝试以读模式进入临界区,那么不会发生阻塞,直接访问临界区
      • 写状态时加锁,此时为独占锁,当某个线程加了写锁,那么其他线程尝试访问该临界区(不论是读还是写),都会阻塞等待
      • 不加锁
    2. 注意:
      • 某线程加读取锁时,允许其他线程以读模式进入,此时如果有一个线程尝试以写模式访问临界区时,该线程会被阻塞,而其后尝试以读方式访问该临界区的线程也会被阻塞
      • 读写锁适合在读远大于写的情形中使用

    1.4、条件变量(condition variables)

    1. 概述:条件变量是利用线程间共享的变量进行同步的一种机制,是在多线程程序中用来实现"等待–>唤醒"逻辑常用的方法,用于维护一个条件(与是条件变量不同的概念),线程可以使用条件变量来等待某个条件为真,注意理解并不是等待条件变量为真。当条件不满足时,线程将自己加入等待队列,同时释放持有的互斥锁; 当一个线程唤醒一个或多个等待线程时,此时条件不一定为真(虚假唤醒)。
    2. 具体说明:应用程序A中包含两个线程t1和t2。t1需要在bool变量test_cond为true时才能继续执行,而test_cond的值是由t2来改变的。t1在test_cond为false时调用cond_wait进行等待,t2在改变test_cond的值后,调用cond_signal,唤醒在等待中的t1,告诉t1 test_cond的值变了,这样t1便可继续往下执行。

    2、消息的数据结构

    skynet 中一共支持两种不同的消息:

    1. skynet_message 本地消息
    2. remote_message 远程消息
    //skynet_mq.h
    struct skynet_message {
    	uint32_t source; 	//源地址(发送的)
    	int session;	    //当一个服务向另一个服务发请求是会生成一个session(包含请求数据的结构体),当响应端处理完毕之后,将结果放到session 原样返回
    	void * data;
    	size_t sz;
    };
    
    //skynet_harbor.h
    #define GLOBALNAME_LENGTH 16
    #define REMOTE_MAX 256
    
    //remote_name 远程节点(skynet) 名称
    struct remote_name {
    	char name[GLOBALNAME_LENGTH];
    	uint32_t handle;
    };
    
    struct remote_message {
    	struct remote_name destination;
    	const void * message;
    	size_t sz;
    	int type;
    };
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    3、消费消息流程

    ​ skynet在启动时,会创建若干条worker线程(由配置指定),这些worker线程被创建以后,会不断得从global_mq里pop出一个次级消息队列来,每个worker线程,每次只pop一个次级消息队列,然后再从次级消息队列中,pop一到若干条消息出来(受权重值影响),最后消息将作为参数传给对应服务的callback函数(每个服务只有一个专属的次级消息队列),当callback执行完时,worker线程会将次级消息队列push回global_mq里,这样就完成了消息的消费。在这个过程中,因为每个worker线程会从global_mq里pop一个次级消息队列出来,此时其他worker线程就不能从global_mq里pop出同一个次级消息队列,也就是说,一个服务不能在多个worker线程内调用callback函数,从而保证了线程安全。大致框图如下:

    在这里插入图片描述

    ​ 其中线程池中的前三个线程是 monitor, timer 和 socket 线程。其中,monitor 线程主要负责检查每个服务是否陷入了死循环,socket 线程负责网络相关操作,timer 线程则负责定时器。worker 具有不同的权重值,每个 worker 会不断从全局消息队列中取出某个服务的次级消息队列,并根据权重值的不同从消息队列中取出若干个消息,然后调用服务所属的 callback 函数消费消息。创建流程源码:

    //skynet_start.c
    static void
    start(int thread) {
    	pthread_t pid[thread+3];
    
    	struct monitor *m = skynet_malloc(sizeof(*m));
    	memset(m, 0, sizeof(*m));
    	m->count = thread;
    	m->sleep = 0;
    
    	m->m = skynet_malloc(thread * sizeof(struct skynet_monitor *));
    	int i;
    	for (i=0;i<thread;i++) {
    		m->m[i] = skynet_monitor_new();
    	}
    	if (pthread_mutex_init(&m->mutex, NULL)) {
    		fprintf(stderr, "Init mutex error");
    		exit(1);
    	}
    	if (pthread_cond_init(&m->cond, NULL)) {
    		fprintf(stderr, "Init cond error");
    		exit(1);
    	}
    	
        //创建monitor, timer 和 socket 线程
    	create_thread(&pid[0], thread_monitor, m);
    	create_thread(&pid[1], thread_timer, m);
    	create_thread(&pid[2], thread_socket, m);
    	
        //worker 线程的权重值
    /*
        -1:从次级消息队列取出一个消息进行处理
        0:从次级消息队列取出所有消息进行处理
        
        当权重>0时,假设次级消息队列的长度为mq_length,将mq_length转成二进制数值以后,向右移动weight(权重值)位,结果N则是,该线程一次消费次级消息队列的消息数:
        1:从次级消息队列取出一半的消息进行处理
        2:从次级消息队列取出四分之一的消息进行处理
        3:从次级消息队列取出八分之一的消息进行处理
        
        
    	这样做的目的,大概是希望避免过多的worker线程为了等待spinlock解锁,而陷入阻塞状态(因为一些线程,一次消费多条甚至全部次级消息队列的消息,因此在消费期间,不会对global_mq进行入队和出队操作,入队和出队操作时加自旋锁的,因此就不会尝试去访问spinlock锁住的临界区,该线程就在相当一段时间内不会陷入阻塞),进而提升服务器的并发处理能力。这里还有一个细节值得注意,就是前四条线程,每次只是pop一个次级消息队列的消息出来,这样做也在一定程度上保证了没有服务会被饿死。
    */
    	static int weight[] = { 
    		-1, -1, -1, -1, 0, 0, 0, 0,
    		1, 1, 1, 1, 1, 1, 1, 1, 
    		2, 2, 2, 2, 2, 2, 2, 2, 
    		3, 3, 3, 3, 3, 3, 3, 3, };
        //创建相应数量的 worker 线程
    	struct worker_parm wp[thread];
    	for (i=0;i<thread;i++) {
    		wp[i].m = m;
    		wp[i].id = i;
    		if (i < sizeof(weight)/sizeof(weight[0])) {
    			wp[i].weight= weight[i];
    		} else {
    			wp[i].weight = 0;
    		}
    		create_thread(&pid[i+3], thread_worker, &wp[i]);
    	}
    
    	for (i=0;i<thread+3;i++) {
    		pthread_join(pid[i], NULL); 
    	}
    
    	free_monitor(m);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    在多条线程,同时运作时,每条worker线程都要从global_mq中pop一条次级消息队列出来,对global_mq进行pop和push操作的时候,会用自旋锁锁住临界区

    // skynet_mq.c
    void 
    skynet_globalmq_push(struct message_queue * queue) {
        struct global_queue *q= Q;
        
        SPIN_LOCK(q) //自旋锁
        assert(queue->next == NULL);
        if(q->tail) {
            q->tail->next = queue;
            q->tail = queue;
        } else {
            q->head = q->tail = queue;
        }
        SPIN_UNLOCK(q)
    }
        
    struct message_queue * 
    skynet_globalmq_pop() {
        struct global_queue *q = Q;
        
        SPIN_LOCK(q)
        struct message_queue *mq = q->head;
        if(mq) {
            q->head = mq->next;
            if(q->head == NULL) {
                assert(mq == q->tail);
                q->tail = NULL;
            }
            mq->next = NULL;
        }
        SPIN_UNLOCK(q)
        
        return mq;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    正如本节概述所说,一个worker线程被创建出来以后,则是不断尝试从global_mq中pop一个次级消息队列,并从次级消息队列中pop消息,进而通过服务的callback函数来消费该消息:

    // skynet_start.c
    static void
    wakeup(struct monitor *m, int busy) {
        if (m->sleep >= m->count - busy) {
            // signal sleep worker, "spurious wakeup" is harmless
            pthread_cond_signal(&m->cond);
        }
    }
        
    static void *
    thread_timer(void *p) {
        struct monitor * m = p;
        skynet_initthread(THREAD_TIMER);
        for (;;) {
            skynet_updatetime();
            CHECK_ABORT
            wakeup(m,m->count-1);
            usleep(2500);
        }
        // wakeup socket thread
        skynet_socket_exit();
        // wakeup all worker thread
        pthread_mutex_lock(&m->mutex);
        m->quit = 1;
        pthread_cond_broadcast(&m->cond);
        pthread_mutex_unlock(&m->mutex);
        return NULL;
    }
        
    static void *
    thread_worker(void *p) {
        struct worker_parm *wp = p;
        int id = wp->id;
        int weight = wp->weight;
        struct monitor *m = wp->m;
        struct skynet_monitor *sm = m->m[id];
        skynet_initthread(THREAD_WORKER);
        struct message_queue * q = NULL;
        while (!m->quit) {
            q = skynet_context_message_dispatch(sm, q, weight);
            if (q == NULL) {
                if (pthread_mutex_lock(&m->mutex) == 0) {
                    ++ m->sleep;
                    // "spurious wakeup" is harmless,
                    // because skynet_context_message_dispatch() can be call at any time.
                    if (!m->quit)
                        pthread_cond_wait(&m->cond, &m->mutex);
                    -- m->sleep;
                    if (pthread_mutex_unlock(&m->mutex)) {
                        fprintf(stderr, "unlock mutex error");
                        exit(1);
                    }
                }
            }
        }
        return NULL;
    }
        
    // skynet_server.c
    struct message_queue * 
    skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
        if (q == NULL) {
            q = skynet_globalmq_pop();
            if (q==NULL)
                return NULL;
        }
        
        uint32_t handle = skynet_mq_handle(q);
        
        struct skynet_context * ctx = skynet_handle_grab(handle);
        if (ctx == NULL) {
            struct drop_t d = { handle };
            skynet_mq_release(q, drop_message, &d);
            return skynet_globalmq_pop();
        }
        
        int i,n=1;
        struct skynet_message msg;
        
        for (i=0;i<n;i++) {
            if (skynet_mq_pop(q,&msg)) {
                skynet_context_release(ctx);
                return skynet_globalmq_pop();
            } else if (i==0 && weight >= 0) {
                n = skynet_mq_length(q);
                n >>= weight;
            }
            int overload = skynet_mq_overload(q);
            if (overload) {
                skynet_error(ctx, "May overload, message queue length = %d", overload);
            }
        
            skynet_monitor_trigger(sm, msg.source , handle);
        
            if (ctx->cb == NULL) {
                skynet_free(msg.data);
            } else {
                dispatch_message(ctx, &msg);
            }
        
            skynet_monitor_trigger(sm, 0,0);
        }
        
        assert(q == ctx->queue);
        struct message_queue *nq = skynet_globalmq_pop();
        if (nq) {
            // If global mq is not empty , push q back, and return next queue (nq)
            // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
            skynet_globalmq_push(q);
            q = nq;
        } 
        skynet_context_release(ctx);
        
        return q;
    }
        
    static void
    dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
        assert(ctx->init);
        CHECKCALLING_BEGIN(ctx)
        pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
        int type = msg->sz >> MESSAGE_TYPE_SHIFT;
        size_t sz = msg->sz & MESSAGE_TYPE_MASK;
        if (ctx->logfile) {
            skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz);
        }
        if (!ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)) {
            skynet_free(msg->data);
        } 
        CHECKCALLING_END(ctx)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131

    整个worker线程的消费流程是:
    a) worker线程每次,从global_mq中弹出一个次级消息队列,如果次级消息队列为空,则该worker线程投入睡眠,timer线程每隔2.5毫秒会唤醒一条睡眠中的worker线程,并重新尝试从全局消息队列中pop一个次级消息队列出来,当次级消息队列不为空时,进入下一步
    b) 根据次级消息的handle,找出其所属的服务(一个skynet_context实例)指针,从次级消息队列中,pop出n条消息(受weight值影响),并且将其作为参数,传给skynet_context的cb函数,并调用它
    c) 当完成callback函数调用时,就从global_mq中再pop一个次级消息队列中,供下一次使用,并将本次使用的次级消息队列push回global_mq的尾部
    d) 返回第a步

    线程安全

    ​ 1、整个消费流程,每条worker线程,从global_mq取出的次级消息队列都是唯一的,并且有且只有一个服务与之对应,取出之后,在该worker线程完成所有callback调用之前,不会push回global_mq中,也就是说,在这段时间内,其他worker线程不能拿到这个次级消息队列所对应的服务,并调用callback函数,也就是说一个服务不可能同时在多条worker线程内执行callback函数,从而保证了线程安全。

    在这里插入图片描述

    ​ 2、不论是global_mq也好,次级消息队列也好,他们在入队和出队操作时,都有加上spinlock,这样多个线程同时访问mq的时候,第一个访问者会进入临界区并锁住,其他线程会阻塞等待,直至该锁解除,这样也保证了线程安全。global_mq会同时被多个worker线程访问,这个很好理解,因为worker线程总是在不断尝试驱动不同的服务,要驱动服务,首先要取出至少一个消息,要获得消息,就要取出一个次级消息队列,而这个次级消息队列要从全局消息队列里取。虽然一个服务的callback函数,只能在一个worker线程内被调用,但是在多个worker线程中,可以向同一个次级消息队列push消息,即便是该次级消息队列所对应的服务正在执行callback函数,由于次级消息队列不是skynet_context的成员(skynet_context只是包含了次级消息队列的指针),因此改变次级消息队列不等于改变skynet_context上的数据,不会影响到该服务自身内存的数据,次级消息队列在进行push和pop操作的时候,会加上一个spinlock,当多个worker线程同时向同一个次级消息队列push消息时,第一个访问的worker线程,能够进入临界区,其他worker线程就阻塞等待,直至该临界区解锁,这样保证了线程安全。

    ​ 3、我们在通过handle从skynet_context list里获取skynet_context的过程中(比如派发消息时,要要先获取skynet_context指针,再调用该服务的callback函数),需要加上一个读写锁,因为在skynet运作的过程中,获取skynet_context,比创建skynet_context的情况要多得多,因此这里用了读写锁:

    struct skynet_context * 
    skynet_handle_grab(uint32_t handle) {
        struct handle_storage *s = H;
        struct skynet_context * result = NULL;
            
        rwlock_rlock(&s->lock);
            
        uint32_t hash = handle & (s->slot_size-1);
        struct skynet_context * ctx = s->slot[hash];
        if (ctx && skynet_context_handle(ctx) == handle) {
            result = ctx;
            skynet_context_grab(result);
        }
            
        rwlock_runlock(&s->lock);
            
        return result;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    这里加上读写锁的意义在于,多个worker线程,同时从skynet_context列表中获取context指针时,没有一条线程是会被阻塞的,这样提高了并发的效率,而此时,尝试往skyent_context里表中,添加新的服务的线程将会被阻塞住,因为添加新的服务可能会导致skynet_context列表(也就是代码里的slot列表)可能会被resize,因此读的时候不允许写入,写的时候不允许读取,保证了线程安全。

    4、生成消息流程

    skynet 中不同的服务运行在不同的上下文当中,彼此之间的交互只能通过消息队列进行转发。不同服务之间转发消息的接口为 skynet_send ,其定义如下:

    //skynet_server.c
    int
    skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
    	if ((sz & MESSAGE_TYPE_MASK) != sz) {
    		skynet_error(context, "The message to %x is too large", destination);
    		if (type & PTYPE_TAG_DONTCOPY) {
    			skynet_free(data);
    		}
    		return -2;
    	}
        //type类型确定消息格式 
        // PTYPE_TAG_ALLOCSESSION  	分配 新session							*session = skynet_context_newsession(context);
        // PTYPE_TAG_DONTCOPY 		不要拷贝data,在data上直接处理		
    	_filter_args(context, type, &session, (void **)&data, &sz);
    
        
       //消息发送端
    	if (source == 0) {
    		source = context->handle;
    	}
    
       //消息接收端
    	if (destination == 0) {
    		if (data) {
    			skynet_error(context, "Destination address can't be 0");
    			skynet_free(data);
    			return -1;
    		}
    
    		return session;
    	}
        /*
        	skynet_harbor_send 和 skynet_context_push --> skynet_mq_push 
        	skynet发消息的实质是往服务的次级消息队列压入消息
        */
    	if (skynet_harbor_message_isremote(destination)) {
    		struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
    		rmsg->destination.handle = destination;
    		rmsg->message = data;
    		rmsg->sz = sz & MESSAGE_TYPE_MASK;
    		rmsg->type = sz >> MESSAGE_TYPE_SHIFT;
    		skynet_harbor_send(rmsg, source, session);
    	} else {
    		struct skynet_message smsg;
    		smsg.source = source;
    		smsg.session = session;
    		smsg.data = data;
    		smsg.sz = sz;
    
    		if (skynet_context_push(destination, &smsg)) {
    			skynet_free(data);
    			return -1;
    		}
    	}
    	return session;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    5、监管机制

    1、定义结构体

    //skynet_monitor.c
    struct skynet_monitor {
    	ATOM_INT version;		//版本
    	int check_version;		//旧版本
    	uint32_t source;		//源地址
    	uint32_t destination;	//目标地址
    };
    
    //skynet_start.c
    struct monitor {
    	int count;					//monitor监管的 工作线程数量
    	struct skynet_monitor ** m;	 //数组  存放所有的skynet_monitor   一个worker对应一个skynet_monitor
    	pthread_cond_t cond;
    	pthread_mutex_t mutex;
    	int sleep;
    	int quit;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    2、运行函数

    /*
    	skynet_start.c	thread_monitor
    	
    	每隔一段时间(5s) 对每个worker线程都执行一次 skynet_monitor_check
    	
    	工作流程:
    		worker 从global_mq取出次级消息队列进行消费  执行dispatch_message(ctx, &msg); 会先调用skynet_monitor_trigger函数 添加skynet_monitor
    		此时skynet_monitor状态:
    			skynet_monitor->version = 1; skynet_monitor->check_version = 0;
    		
    		当monitor 对skynet_monitor执行 skynet_monitor_check函数 此时skynet_monitor_check(第一次)中 sm->version == sm->check_version == 1
    		
    		当worker 陷入死循环达到(第二次)skynet_monitor_check 因为sm->version == sm->check_version成立 返回一条错误日志
    		
    		当worker 在(第二次)skynet_monitor_check之前的时间(5s)处理完消息,此时sm->source 和 sm->destination 都设置为0
    		
    */
    static void *
    thread_monitor(void *p) {
    	struct monitor * m = p;
    	int i;
    	int n = m->count;
    	skynet_initthread(THREAD_MONITOR);
    	for (;;) {
    		CHECK_ABORT
    		for (i=0;i<n;i++) {
    			skynet_monitor_check(m->m[i]);
    		}
    		for (i=0;i<5;i++) {
    			CHECK_ABORT
    			sleep(1);
    		}
    	}
    
    	return NULL;
    }
    
    // skynet_monitor.c   skynet_monitor_check
    void 
    skynet_monitor_check(struct skynet_monitor *sm) {
    	if (sm->version == sm->check_version) {
    		if (sm->destination) { //检查目标地址是否为0
    			skynet_context_endless(sm->destination);
    			skynet_error(NULL, "A message from [ :%08x ] to [ :%08x ] maybe in an endless loop (version = %d)", sm->source , sm->destination, sm->version);
    		}
    	} else {
    		sm->check_version = sm->version;
    	}
    }
    
    /*	
    	那worker怎么添加monitor?
    	thread_worker --> skynet_context_message_dispatch --> skynet_monitor_trigger(sm, msg.source , handle);
    */
    struct message_queue * 
    skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
    	if (q == NULL) {
    		q = skynet_globalmq_pop();
    		if (q==NULL)
    			return NULL;
    	}
    
    	uint32_t handle = skynet_mq_handle(q);
    
    	struct skynet_context * ctx = skynet_handle_grab(handle);
    	if (ctx == NULL) {
    		struct drop_t d = { handle };
    		skynet_mq_release(q, drop_message, &d);
    		return skynet_globalmq_pop();
    	}
    
    	int i,n=1;
    	struct skynet_message msg;
    
    	for (i=0;i<n;i++) {
    		if (skynet_mq_pop(q,&msg)) {
    			skynet_context_release(ctx);
    			return skynet_globalmq_pop();
    		} else if (i==0 && weight >= 0) {
    			n = skynet_mq_length(q);
    			n >>= weight;
    		}
    		int overload = skynet_mq_overload(q);
    		if (overload) {
    			skynet_error(ctx, "May overload, message queue length = %d", overload);
    		}
    
    		skynet_monitor_trigger(sm, msg.source , handle);
    
    		if (ctx->cb == NULL) {
    			skynet_free(msg.data);
    		} else {
    			dispatch_message(ctx, &msg);
    		}
    
    		skynet_monitor_trigger(sm, 0,0);
    	}
    
    	assert(q == ctx->queue);
    	struct message_queue *nq = skynet_globalmq_pop();
    	if (nq) {
    		// If global mq is not empty , push q back, and return next queue (nq)
    		// Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
    		skynet_globalmq_push(q);
    		q = nq;
    	} 
    	skynet_context_release(ctx);
    
    	return q;
    }
    
    
    void 
    skynet_monitor_trigger(struct skynet_monitor *sm, uint32_t source, uint32_t destination) {
    	sm->source = source;
    	sm->destination = destination;
    	ATOM_FINC(&sm->version);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
  • 相关阅读:
    vue开发游戏知识
    简述树状数组
    ASEMI光伏二极管TPA3045参数,TPA3045规格,TPA3045图片
    OSCS开源安全周报第13期:Exchange 高危漏洞公开
    【Linux命令】文件和目录权限
    简单聊下Redis的主从复制和哨兵机制以及集群(面试题)
    多表操作-外键约束
    利用人工智能和大数据技术,优化IT运维流程和策略
    Android学习笔记 23. ViewPager
    初学phar反序列化
  • 原文地址:https://blog.csdn.net/weixin_43730892/article/details/127906254