Kubeedge源码版本:v1.15.1
首先,我们从edgehub的start函数看起:
它主要干几件事情:
func (eh *EdgeHub) Start() {
eh.certManager = certificate.NewCertManager(config.Config.EdgeHub, config.Config.NodeName)
eh.certManager.Start()
for _, v := range GetCertSyncChannel() {
v <- true
close(v)
}
go eh.ifRotationDone() // 一个用于证书轮换的同步的goroutine,更新完证书之后就重启。
for {
select { // 用于停掉edgehub
case <-beehiveContext.Done():
klog.Warning("EdgeHub stop")
return
default:
} // select done
err := eh.initial() // 加载一些配置到Edgehub结构体的eh.chClient项中
if err != nil {
klog.Exitf("failed to init controller: %v", err)
return
}
waitTime := time.Duration(config.Config.Heartbeat) * time.Second * 2
err = eh.chClient.Init() // 基于前面加载的eh.chClient项建立webskt/quic连接
if err != nil {
time.Sleep(waitTime) // 如果失败就歇一会再重试
continue
}
// execute hook func after connect
eh.pubConnectInfo(true) // 向所有的边缘组件发送云边通道建立的消息
go eh.routeToEdge() // 接收 云 发到 边 的所有消息,然后dispatch
go eh.routeToCloud() // 接收 边 发到 云 的所有消息,然后试图发送
go eh.keepalive() // 用于向 云 发送保活信号
// wait the stop signal
// stop authinfo manager/websocket connection
<-eh.reconnectChan // 接收重连的信号
eh.chClient.UnInit() // 关闭连接
// execute hook fun after disconnect
eh.pubConnectInfo(false) // 向所有的边缘组件发送云边通道断开的消息
// sleep one period of heartbeat, then try to connect cloud hub again
klog.Warningf("connection is broken, will reconnect after %s", waitTime.String())
time.Sleep(waitTime)
time.Sleep(300 * time.Second)
// clean channel
clean:
for {
select {
case <-eh.reconnectChan:
default:
break clean
}
}
} // end for
}
然后主要查看这几个函数:
go eh.routeToEdge() // 接收 云 发到 边 的所有消息,然后给handler进行处理
go eh.routeToCloud() // 接收 边 发到 云 的所有消息,然后试图发送
go eh.keepalive() // 用于向 云 发送保活信号
go eh.routeToEdge()
方法这是一个死循环,不断地从云端获取消息(如果暂时没有消息就会被阻塞掉),拿到消息之后就会被eh.dispatch(message)
方法进行处理。逻辑如下:
func (eh *EdgeHub) routeToEdge() {
for {
select {
case <-beehiveContext.Done():
klog.Warning("EdgeHub RouteToEdge stop")
return
default:
} // end select
message, err := eh.chClient.Receive() // 接收webskt传来的云端消息
if err != nil {
klog.Errorf("websocket read error: %v", err)
eh.reconnectChan <- struct{}{}
return
}
klog.V(4).Infof("[edgehub/routeToEdge] receive msg from cloud, msg:% +v", message)
err = eh.dispatch(message) // 进行处理
if err != nil {
klog.Errorf("failed to dispatch message, discard: %v", err)
}
}
}
相应地,我们看一下dispatch
函数的逻辑,它包装了一个handler,这个handler会对从云端发往edgehub的消息进行处理。
func (eh *EdgeHub) dispatch(message model.Message) error {
// handler for msg.
err := msghandler.ProcessHandler(message, eh.chClient)
if err != nil {
return err
}
return nil
}
继续看一下ProcessHandler,对于从云端发往edgehub的消息,会先过滤再处理:
func ProcessHandler(message model.Message, client clients.Adapter) error {
lock.RLock()
defer lock.RUnlock()
for _, handle := range Handlers {
if handle.Filter(&message) {
err := handle.Process(&message, client)
if err != nil {
return fmt.Errorf("...")
}
return nil
}
}
return fmt.Errorf("...")
}
其中,这里面的Filter
函数说是过滤一些东西,但是个人目前来看就是什么东西都不过滤(但是,不排除在后来的版本中添加更精细的过滤逻辑)
func (*defaultHandler) Filter(message *model.Message) bool {
group := message.GetGroup()
return group == messagepkg.ResourceGroupName || group == messagepkg.TwinGroupName ||
group == messagepkg.FuncGroupName || group == messagepkg.UserGroupName // 基本上就是返回true了
}
在过滤完云端发到edgehub的消息之后,edgehub会在process函数里对消息进行处理,主要就是用beeive做一下消息转发操作。
func (*defaultHandler) Process(message *model.Message, clientHub clients.Adapter) error {
group := message.GetGroup()
md := ""
switch group {
case messagepkg.ResourceGroupName:
md = modules.MetaGroup
case messagepkg.TwinGroupName:
md = modules.TwinGroup
case messagepkg.FuncGroupName:
md = modules.MetaGroup
case messagepkg.UserGroupName:
md = modules.BusGroup
}
// TODO: just for a temporary fix.
// The code related to device twin message transmission will be reconstructed
// by using sendSync function instead of send function.
if group == messagepkg.TwinGroupName {
beehiveContext.SendToGroup(md, *message)
return nil
}
isResponse := isSyncResponse(message.GetParentID())
if isResponse {
beehiveContext.SendResp(*message)
return nil
}
if group == messagepkg.UserGroupName && message.GetSource() == "router_eventbus" {
beehiveContext.Send(modules.EventBusModuleName, *message)
} else if group == messagepkg.UserGroupName && message.GetSource() == "router_servicebus" {
beehiveContext.Send(modules.ServiceBusModuleName, *message)
} else {
beehiveContext.SendToGroup(md, *message)
}
return nil
}
go eh.routeToCloud()
eh.routeToCloud()
函数主要是从边缘的各个组件拿到beehive格式的消息,然后走webskt/quic发送到cloudhub。
使用一个死循环不断地拿beehive中的存储的消息。考虑到消息的数量可能较多,这里对消息处理进行了限速。
func (eh *EdgeHub) routeToCloud() {
for {
select {
case <-beehiveContext.Done():
klog.Warning("EdgeHub RouteToCloud stop")
return
default:
} // end select
message, err := beehiveContext.Receive(modules.EdgeHubModuleName)
if err != nil {
klog.Errorf("failed to receive message from edge: %v", err)
time.Sleep(time.Second)
continue
}
err = eh.tryThrottle(message.GetID()) // 用来做流量速率控制的函数
if err != nil {
klog.Errorf("msgID: %s, client rate limiter returned an error: %v ", message.GetID(), err)
continue
}
// post message to cloud hub
err = eh.sendToCloud(message) // 发送到云上而已
if err != nil {
klog.Errorf("failed to send message to cloud: %v", err)
eh.reconnectChan <- struct{}{}
return
}
}
}