• Kubeedge:edgecore源码速读


    Kubeedge源码版本:v1.15.1

    首先,我们从edgehub的start函数看起:

    它主要干几件事情:

    1. 初始化证书相关,这里的证书主要用于webskt的连接
    2. 启动edgehub,开启三个协程,分别把云发过来的消息路由到边缘的组件、把边缘组件发过来的消息路由到云、向云上做保活。
    func (eh *EdgeHub) Start() {
    	eh.certManager = certificate.NewCertManager(config.Config.EdgeHub, config.Config.NodeName)
    	eh.certManager.Start()
    	for _, v := range GetCertSyncChannel() {
    		v <- true
    		close(v)
    	}
    
    	go eh.ifRotationDone() // 一个用于证书轮换的同步的goroutine,更新完证书之后就重启。
    
    	for {
    		select { // 用于停掉edgehub
    		case <-beehiveContext.Done():
    			klog.Warning("EdgeHub stop")
    			return
    		default:
    		} // select done
            
    		err := eh.initial() // 加载一些配置到Edgehub结构体的eh.chClient项中
    		if err != nil {
    			klog.Exitf("failed to init controller: %v", err)
    			return
    		}
    
    		waitTime := time.Duration(config.Config.Heartbeat) * time.Second * 2
    
    		err = eh.chClient.Init() // 基于前面加载的eh.chClient项建立webskt/quic连接
    		if err != nil {
    			time.Sleep(waitTime) // 如果失败就歇一会再重试
    			continue
    		}
    		// execute hook func after connect
    		eh.pubConnectInfo(true) // 向所有的边缘组件发送云边通道建立的消息
    		go eh.routeToEdge() // 接收 云 发到 边 的所有消息,然后dispatch
    		go eh.routeToCloud() // 接收 边 发到 云 的所有消息,然后试图发送
    		go eh.keepalive() // 用于向 云 发送保活信号
    
    		// wait the stop signal
    		// stop authinfo manager/websocket connection
    
    		<-eh.reconnectChan // 接收重连的信号
    		eh.chClient.UnInit() // 关闭连接
    
    		// execute hook fun after disconnect
    		eh.pubConnectInfo(false) // 向所有的边缘组件发送云边通道断开的消息
    
    		// sleep one period of heartbeat, then try to connect cloud hub again
    		klog.Warningf("connection is broken, will reconnect after %s", waitTime.String())
    		time.Sleep(waitTime)
    
    		time.Sleep(300 * time.Second)
    
    		// clean channel
    	clean:
    		for {
    			select {
    			case <-eh.reconnectChan:
    			default:
    				break clean
    			}
    		}
    	} // end for
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    然后主要查看这几个函数:

    go eh.routeToEdge() // 接收 云 发到 边 的所有消息,然后给handler进行处理
    go eh.routeToCloud() // 接收 边 发到 云 的所有消息,然后试图发送
    go eh.keepalive() // 用于向 云 发送保活信号
    
    • 1
    • 2
    • 3
    首先查看go eh.routeToEdge()方法

    这是一个死循环,不断地从云端获取消息(如果暂时没有消息就会被阻塞掉),拿到消息之后就会被eh.dispatch(message)方法进行处理。逻辑如下:

    func (eh *EdgeHub) routeToEdge() {
    	for {
    		select {
    		case <-beehiveContext.Done():
    			klog.Warning("EdgeHub RouteToEdge stop")
    			return
    		default:
    		} // end select
    		
    		message, err := eh.chClient.Receive() // 接收webskt传来的云端消息
    		if err != nil {
    			klog.Errorf("websocket read error: %v", err)
    			eh.reconnectChan <- struct{}{}
    			return
    		}
    
    		klog.V(4).Infof("[edgehub/routeToEdge] receive msg from cloud, msg:% +v", message)
    		err = eh.dispatch(message) // 进行处理
    		if err != nil {
    			klog.Errorf("failed to dispatch message, discard: %v", err)
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    相应地,我们看一下dispatch函数的逻辑,它包装了一个handler,这个handler会对从云端发往edgehub的消息进行处理。

    func (eh *EdgeHub) dispatch(message model.Message) error {
    	// handler for msg.
    	err := msghandler.ProcessHandler(message, eh.chClient)
    	if err != nil {
    		return err
    	}
    
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    继续看一下ProcessHandler,对于从云端发往edgehub的消息,会先过滤再处理:

    func ProcessHandler(message model.Message, client clients.Adapter) error {
    	lock.RLock()
    	defer lock.RUnlock()
    	for _, handle := range Handlers {
    		if handle.Filter(&message) {
    			err := handle.Process(&message, client)
    			if err != nil {
    				return fmt.Errorf("...")
    			}
    			return nil
    		}
    	}
    	return fmt.Errorf("...")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    其中,这里面的Filter函数说是过滤一些东西,但是个人目前来看就是什么东西都不过滤(但是,不排除在后来的版本中添加更精细的过滤逻辑)

    func (*defaultHandler) Filter(message *model.Message) bool {
    	group := message.GetGroup()
    	return group == messagepkg.ResourceGroupName || group == messagepkg.TwinGroupName ||
    		group == messagepkg.FuncGroupName || group == messagepkg.UserGroupName // 基本上就是返回true了
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在过滤完云端发到edgehub的消息之后,edgehub会在process函数里对消息进行处理,主要就是用beeive做一下消息转发操作。

    func (*defaultHandler) Process(message *model.Message, clientHub clients.Adapter) error {
    	group := message.GetGroup()
    	md := ""
    	switch group {
    	case messagepkg.ResourceGroupName:
    		md = modules.MetaGroup
    	case messagepkg.TwinGroupName:
    		md = modules.TwinGroup
    	case messagepkg.FuncGroupName:
    		md = modules.MetaGroup
    	case messagepkg.UserGroupName:
    		md = modules.BusGroup
    	}
    
    	// TODO: just for a temporary fix.
    	// The code related to device twin message transmission will be reconstructed
    	//  by using sendSync function instead of send function.
    	if group == messagepkg.TwinGroupName {
    		beehiveContext.SendToGroup(md, *message)
    		return nil
    	}
    
    	isResponse := isSyncResponse(message.GetParentID())
    	if isResponse {
    		beehiveContext.SendResp(*message)
    		return nil
    	}
    	if group == messagepkg.UserGroupName && message.GetSource() == "router_eventbus" {
    		beehiveContext.Send(modules.EventBusModuleName, *message)
    	} else if group == messagepkg.UserGroupName && message.GetSource() == "router_servicebus" {
    		beehiveContext.Send(modules.ServiceBusModuleName, *message)
    	} else {
    		beehiveContext.SendToGroup(md, *message)
    	}
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    查看go eh.routeToCloud()

    eh.routeToCloud()函数主要是从边缘的各个组件拿到beehive格式的消息,然后走webskt/quic发送到cloudhub。

    使用一个死循环不断地拿beehive中的存储的消息。考虑到消息的数量可能较多,这里对消息处理进行了限速。

    func (eh *EdgeHub) routeToCloud() {
    	for {
    		select {
    		case <-beehiveContext.Done():
    			klog.Warning("EdgeHub RouteToCloud stop")
    			return
    		default:
    		} // end select
    		
    		message, err := beehiveContext.Receive(modules.EdgeHubModuleName)
    		if err != nil {
    			klog.Errorf("failed to receive message from edge: %v", err)
    			time.Sleep(time.Second)
    			continue
    		}
    
    		err = eh.tryThrottle(message.GetID()) // 用来做流量速率控制的函数
    		if err != nil {
    			klog.Errorf("msgID: %s, client rate limiter returned an error: %v ", message.GetID(), err)
    			continue
    		}
    
    		// post message to cloud hub
    		err = eh.sendToCloud(message) // 发送到云上而已
    		if err != nil {
    			klog.Errorf("failed to send message to cloud: %v", err)
    			eh.reconnectChan <- struct{}{}
    			return
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
  • 相关阅读:
    北京:PMP证书在可对应中级职称!
    dreamweaver个人网页设计作业 学生个人网页猫眼电影 WEB静态网页作业模板 大学生个人主页博客网页代码 dw个人网页作业成品
    园区全光技术选型-中篇
    golang解析excel、csv编码格式
    Mallox勒索病毒:最新变种malloxx袭击了您的计算机?
    【杰理AC696X】外挂FLASH音乐播放及Bin文件制作
    2022年在uniapp中引入vant Weapp
    这道静态变量题,我居然考了0分
    3D感知技术(2)3D数据获取技术概述
    Django — 请求和响应
  • 原文地址:https://blog.csdn.net/weixin_43466027/article/details/138170098