• 如何实现一个K8S DevicePlugin?


    什么是device plugin

    k8s允许限制容器对资源的使用,比如CPU内存,并以此作为调度的依据。

    当其他非官方支持的设备类型需要参与到k8s工作流程中时,就需要实现一个device plugin

    Kubernetes提供了一个设备插件框架,你可以用它来将系统硬件资源发布到Kubelet

    供应商可以实现设备插件,由你手动部署或作为 DaemonSet 来部署,而不必定制 Kubernetes 本身的代码。

    目标设备包括 GPU、高性能 NIC、FPGA、 InfiniBand 适配器以及其他类似的、可能需要特定于供应商的初始化和设置的计算资源。

    更多云原生、K8S相关文章请点击【专栏】查看!

    发现插件

    一个新的device plugin是如何被kubelet发现的?

    device plugin通过gRPC的方式与kubelet通信,kubelet实现了Register接口,用于注册插件。

    service Registration {
    	rpc Register(RegisterRequest) returns (Empty) {}
    }
    
    • 1
    • 2
    • 3

    通过这个接口, 向kubelet提交当前插件的信息,包括插件的名称、版本、socket路径等。

    已注册的插件信息并不会被持久化下来, 也就是说当kubelet重启后,插件需要重新调用Register方法。

    kuelet重启时会删除插件的socket文件, 插件通过监听socket文件的方式来感知kubelet的重启并重新注册。

    成功注册后,设备插件就向 kubelet 发送它所管理的设备列表,然后 kubelet 负责将这些资源发布到 API 服务器,作为 kubelet 节点状态更新的一部分。

    当插件注册成功后, 根据插件中的配置与定义, 可能会有类似下面的pod配置以使用插件中的资源。

    apiVersion: v1
    kind: Pod
    metadata:
      name: demo-pod
    spec:
      containers:
        - name: demo-container-1
          image: registry.k8s.io/pause:2.0
          resources:
            limits:
              hardware-vendor.example/foo: 2
    #
    # 这个 pod 需要两个 hardware-vendor.example/foo 设备
    # 而且只能够调度到满足需求的节点上
    #
    # 如果该节点中有 2 个以上的设备可用,其余的可供其他 Pod 使用
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    在这里插入图片描述

    AMD GPU插件源码解析

    插件的实现并不复杂, 只需要实现几个接口函数即可。

    service DevicePlugin {
       // GetDevicePluginOptions 返回与设备管理器沟通的选项。
       // kuelet 在每次方法调用前都会调用这个方法,来获取可用的设备插件选项。
       rpc GetDevicePluginOptions(Empty) returns (DevicePluginOptions) {}
    
       // ListAndWatch 返回 Device 列表构成的数据流。
       // 当 Device 状态发生变化或者 Device 消失时,ListAndWatch会返回新的列表。
       rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}
    
       // Allocate 在容器创建期间调用,这样设备插件可以运行一些特定于设备的操作,
       // 并告诉 kubelet 如何令 Device 可在容器中访问的所需执行的具体步骤
       rpc Allocate(AllocateRequest) returns (AllocateResponse) {}
    
       // GetPreferredAllocation 从一组可用的设备中返回一些优选的设备用来分配,
       // 所返回的优选分配结果不一定会是设备管理器的最终分配方案。
       // 此接口的设计仅是为了让设备管理器能够在可能的情况下做出更有意义的决定。
       rpc GetPreferredAllocation(PreferredAllocationRequest) returns (PreferredAllocationResponse) {}
    
       // PreStartContainer 在设备插件注册阶段根据需要被调用,调用发生在容器启动之前。
       // 在将设备提供给容器使用之前,设备插件可以运行一些诸如重置设备之类的特定于具体设备的操作,
       rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    以下源码解析以AMD GPU插件为例。

    代码版本 0.12.0

    仓库地址 https://github.com/ROCm/k8s-device-plugin

    源码解析

    插件启动流程

    AMD GPU插件的框架,是使用的"github.com/kubevirt/device-plugin-manager/pkg/dpm"这个包。

    AMD的插件确实实现的很粗糙, 这里我们只用它分析实现一个插件需要做什么。

    程序启动时实例化Manager对象, 并调用Run方法。

    func main() {
        // ...
        // Lister用于传递心跳与资源更新
    	l := Lister{
    		ResUpdateChan: make(chan dpm.PluginNameList),
    		Heartbeat:     make(chan bool),
    	}
    	manager := dpm.NewManager(&l)
        // ...
        // 启动管理器
    	manager.Run()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    Run方法中启动了gRPC服务, 并注册了AMD GPU插件。

    func (dpm *Manager) Run() {
        // ...
        // 监听socket文件变化(kubelet会在重启时删除)
    	fsWatcher, _ := fsnotify.NewWatcher()
    	defer fsWatcher.Close()
    	// DevicePluginPath = "/var/lib/kubelet/device-plugins/"
    	fsWatcher.Add(pluginapi.DevicePluginPath)
        
        // 启动插件监听方法, 
        // 实际是将上面传入Liste.ResUpdateChan的数据转发到这个chan中
    	pluginsCh := make(chan PluginNameList)
    	defer close(pluginsCh)
    	go dpm.lister.Discover(pluginsCh)
    HandleSignals:
    	for {
    		select {
    		case newPluginsList := <-pluginsCh:
    			// 创建新的插件服务, 并启动服务
    			dpm.handleNewPlugins(pluginMap, newPluginsList)
    		case event := <-fsWatcher.Events:
    			if event.Name == pluginapi.KubeletSocket {
    				// kubelet重启时, 重新注册插件
    				if event.Op&fsnotify.Create == fsnotify.Create {
    					dpm.startPluginServers(pluginMap)
    				}
    				if event.Op&fsnotify.Remove == fsnotify.Remove {
    					dpm.stopPluginServers(pluginMap)
    				}
    			}
    		case s := <-signalCh:
    			switch s {
    			case syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT:
                    // 优雅退出
    				dpm.stopPlugins(pluginMap)
    				break HandleSignals
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    创建插件服务会返回一个devicePlugin对象:

    // dpm.handleNewPlugins(pluginMap, newPluginsList) 最终会调用这个方法
    func newDevicePlugin(resourceNamespace string, pluginName string, devicePluginImpl PluginInterface) devicePlugin {
    	return devicePlugin{
    		DevicePluginImpl: devicePluginImpl,
    		// DevicePluginPath = "/var/lib/kubelet/device-plugins/"
    		// resourceNamespace = "amd.com"
    		Socket:           pluginapi.DevicePluginPath + resourceNamespace + "_" + pluginName,
    		ResourceName:     resourceNamespace + "/" + pluginName,
    		Name:             pluginName,
    		Starting:         &sync.Mutex{},
    	}
    }
    type devicePlugin struct {
    	// 实现的deviceplugin server
    	DevicePluginImpl PluginInterface
    	ResourceName     string
    	Name             string
    	// socket文件路径
    	Socket           string
    	Server           *grpc.Server
    	Running          bool
    	Starting         *sync.Mutex
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    启动服务最终会由StartServer这个方法来完成。

    func (dpi *devicePlugin) StartServer() error {
    	// ...
    	if dpi.Running {
    		return nil
    	}
        // 启动grpc服务
    	err := dpi.serve()
    	if err != nil {
    		return err
    	}
        // 调用Register方法向kubelet注册插件
    	err = dpi.register()
    	if err != nil {
    		dpi.StopServer()
    		return err
    	}
    	dpi.Running = true
    
    	return nil
    }
    func (dpi *devicePlugin) serve() error {
    	// ...
    	// 可以看见是以socket文件启动的grpc服务
    	sock, err := net.Listen("unix", dpi.Socket)
    	if err != nil {
    		glog.Errorf("%s: Failed to setup a DPI gRPC server: %s", dpi.Name, err)
    		return err
    	}
    
    	dpi.Server = grpc.NewServer([]grpc.ServerOption{}...)
    	pluginapi.RegisterDevicePluginServer(dpi.Server, dpi.DevicePluginImpl)
    	go dpi.Server.Serve(sock)
    	// ...
    	return nil
    }
    func (dpi *devicePlugin) register() error {
    	// KubeletSocket = DevicePluginPath + "kubelet.sock"
    	// "/var/lib/kubelet/device-plugins/kubelet.sock"
    	// 与kubelet通信
    	conn, err := grpc.Dial(pluginapi.KubeletSocket, grpc.WithInsecure(),
    		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
    			return net.DialTimeout("unix", addr, timeout)
    		}))
    	defer conn.Close()
    	client := pluginapi.NewRegistrationClient(conn)
    	// 向kubelet注册插件
    	reqt := &pluginapi.RegisterRequest{
    		Version:      pluginapi.Version,
    		Endpoint:     path.Base(dpi.Socket),
    		ResourceName: dpi.ResourceName,
    		Options:      options,
    	}
    	_, err = client.Register(context.Background(), reqt)
    	// ...
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    socket文件默认会放在/var/lib/kubelet/device-plugins目录下, 所以当以daemonset的方式部署插件时,需要将这个目录挂载到容器中。

    服务实现

    AMD GPU插件只实现了两个关键方法(因为不同设备插件的实现都不一样,所以这里不展开):

    • ListAndWatch
    • Allocate

    所以它的GetDevicePluginOptions方法返回的是一个空结构体

    func (p *Plugin) GetDevicePluginOptions(ctx context.Context, e *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
    	return &pluginapi.DevicePluginOptions{}, nil
    }
    type DevicePluginOptions struct {
    	// 是否需要调用 PreStartContainer 方法
    	PreStartRequired bool `protobuf:"varint,1,opt,name=pre_start_required,json=preStartRequired,proto3" json:"pre_start_required,omitempty"`
    	// 是否需要调用 GetPreferredAllocation 方法
    	GetPreferredAllocationAvailable bool     `protobuf:"varint,2,opt,name=get_preferred_allocation_available,json=getPreferredAllocationAvailable,proto3" json:"get_preferred_allocation_available,omitempty"`
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    服务部署

    设备插件可以作为节点操作系统的软件包来部署、作为 DaemonSet 来部署或者手动部署。

    如果你将设备插件部署为 DaemonSet, /var/lib/kubelet/device-plugins 目录必须要在插件的 PodSpec 中声明作为 卷(Volume)被挂载到插件中。

    实现一个设备插件

    1. 实现一个虚假设备, 用于测试插件。(可选)
    2. 实现DevicePlugin接口。 我们可以仅实现ListAndWatchAllocate两个关键方法。
    3. 注册gRPC服务, 并向kubelet注册插件。
    4. 监听kubelet的socket文件变化, 重新注册插件。

    代码实现

    待补充…

  • 相关阅读:
    Qt学习15 用户界面与业务逻辑的分离
    Vue必备知识点(简单+快速上手Vue)
    BF算法详解(JAVA语言实现)
    简要解析盒子模型
    红光光浴-改善亚健康状态
    基于 RocketMQ 的 Dubbo-go 通信新范式
    基础算法篇——前缀和与差分
    硬之城如何基于 SAE 打造数智化电子工业互联网平台
    SpringCloud-4.服务网关(GateWay)
    SpringBoot三层架构
  • 原文地址:https://blog.csdn.net/q1403539144/article/details/136211691