• Operating Kubernetes with fabric8


    1. Adding the fabric8 dependency

    <dependency>
        <groupId>io.fabric8</groupId>
        <artifactId>kubernetes-client</artifactId>
        <version>5.10.2</version>
    </dependency>
    

    2. Authentication

    Authentication is quite simple: once you have the authentication config, just do the following.

    KubernetesClient client = new DefaultKubernetesClient(config);
    

    Of course, for convenience the client details are best stored in the database and then maintained in a cache. When connecting to several Kubernetes clusters you need several KubernetesClient instances, so it is best to keep a mapping from cluster code to client in the cache.
    So how do you obtain the authentication config? There are usually two approaches: a kubeconfig file, or an OAuth token.
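    The cluster-code-to-client mapping can be sketched as follows. This is a hypothetical sketch, not part of fabric8: the client type is left generic so the example stands alone; in real code it would be io.fabric8.kubernetes.client.KubernetesClient, and the factory would load the stored config and call new DefaultKubernetesClient(config).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical registry mapping a cluster code to its client instance.
// The client type C is generic here so the sketch is self-contained.
public class ClusterClientRegistry<C> {
    private final Map<String, C> cache = new ConcurrentHashMap<>();
    private final Function<String, C> factory; // builds a client from a cluster code

    public ClusterClientRegistry(Function<String, C> factory) {
        this.factory = factory;
    }

    // Returns the cached client for this cluster, creating it on first access.
    public C get(String clusterCode) {
        return cache.computeIfAbsent(clusterCode, factory);
    }

    // Drop a client, e.g. after its cluster is unregistered or its config changes.
    public void evict(String clusterCode) {
        cache.remove(clusterCode);
    }

    public static void main(String[] args) {
        // Stand-in factory: in practice it would load the stored config and
        // call new DefaultKubernetesClient(config).
        ClusterClientRegistry<String> registry =
                new ClusterClientRegistry<>(code -> "client-for-" + code);
        System.out.println(registry.get("cluster-a"));
        // computeIfAbsent guarantees the same cached instance is returned.
        System.out.println(registry.get("cluster-a") == registry.get("cluster-a"));
    }
}
```

    Using ConcurrentHashMap.computeIfAbsent means at most one client is built per cluster code, even under concurrent access.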

    Naturally, you can validate the client right after creating it to test connectivity, and only operate on it later if the check passes.

    try {
        NonNamespaceOperation<Namespace, NamespaceList, Resource<Namespace>> namespaces = kubernetesClient.namespaces();
        if (namespaces != null) {
            namespaces.withName("default").get();
        }
    } catch (Exception e) {
        throw new XxxException(Xxxxxx);
    } finally {
        kubernetesClient.close();
    }
    
    

    1) Authenticating with a kubeconfig file

    The config file lives under /root/.kube/ on the control-plane node. After it is uploaded from the page, the backend receives it as, say, a byte[] file field on a fileUploadReqBO:

    String kubeConfig = null;
    Config config = null;
    try {
        kubeConfig = new String(fileUploadReqBO.getFile(), StandardCharsets.UTF_8);
        config = Config.fromKubeconfig(kubeConfig);
    } catch (Exception e) {
        throw new XxxException(xxxx, e.getMessage());
    }
    

    2) Authenticating with an OAuth token

    Fetching an OAuth token requires a serviceaccount with admin privileges; if you do not have one, create it manually.
    Create the serviceaccount, here named test-admin:

    kubectl create serviceaccount test-admin -n kube-system
    

    Grant it admin privileges:

    kubectl create clusterrolebinding my-service-account-admin --clusterrole=cluster-admin --serviceaccount=kube-system:test-admin
    

    Run the following command:

    kubectl get secret -n kube-system|grep admin
    

    In the output, find the entry whose name starts with test-admin-token- and run:

    kubectl describe secret test-admin-token-XXX -n kube-system
    

    This gives you the token.
    With the token in hand, kubectl auth can-i create deployments --as=system:serviceaccount:kube-system:test-admin --token= tells you whether it carries admin privileges: yes means it does, no means it does not.
    Suppose we now have a masterUrl, e.g. https://10.20.66.152:6443 (kube-apiserver listens on port 6443 by default), together with the token.
    The config can then be built like this:

    Config config = new ConfigBuilder()
            .withMasterUrl(masterUrl)
            .withOauthToken(oauthToken)
            .withTrustCerts(true)
            .build();
    

    3. Querying and iterating over pods

    Query all pods:

    //KubernetesClient already obtained: KubernetesClient client = new DefaultKubernetesClient(config);
    PodList podList = client.pods().list();
    

    Query by namespace:

    PodList podList = client.pods().inNamespace(K8sGenericConstant.K8S_NAMESPACE_ENGINE_SERVER).list();
    

    Iterate over the pods:

    if (podList != null && podList.getItems() != null) {
        for (Pod pod : podList.getItems()) {
            //pod name
            String podName = pod.getMetadata().getName();
            //name of the node the pod runs on
            String nodeName = pod.getSpec().getNodeName();
            //pod labels
            Map<String, String> labels = pod.getMetadata().getLabels();
            //namespace
            String ns = pod.getMetadata().getNamespace();
            //status
            pod.getStatus().getContainerStatuses();
            pod.getStatus().getReason();
            List<PodCondition> podConditions = pod.getStatus().getConditions();
            if (!CollectionUtils.isEmpty(podConditions)) {
                PodCondition podCondition = podConditions.get(0);
                String reason = podCondition.getReason() + ":" + podCondition.getMessage();
            }
        }
    }
    

    4. Creating and deleting namespaces

    Create:

    NonNamespaceOperation<Namespace, NamespaceList, Resource<Namespace>> namespaces = client.namespaces();
    if (namespaces == null) {
        return null;
    }
    String name = "test-ns";
    Map<String, String> labels = Maps.newHashMap();
    labels.put("testlabel", "testvalue");
    Namespace ns = new NamespaceBuilder().withNewMetadata().withName(name).addToLabels(labels).endMetadata().build();
    ns = namespaces.createOrReplace(ns);
    

    Delete:

    NonNamespaceOperation<Namespace, NamespaceList, Resource<Namespace>> namespaces = client.namespaces();
    if (namespaces == null) {
        return null;
    }
    namespaces.withName(name).delete();
    

    5. Creating and deleting deployments

    Delete:

    //note: for the first form the Deployment object itself must be queried first, not just its name
    client.apps().deployments().inNamespace(namespace).delete(deployment);
    client.apps().deployments().inNamespace(namespace).withName(deploymentname).delete();
    

    Create:

    Deployment deployment = new DeploymentBuilder()
        .withNewMetadata()
            .withName(podName)
        .endMetadata()
        .withNewSpec()
            .withNewSelector().addToMatchLabels(matchLabels).endSelector()
            .withReplicas(1)
            .withNewTemplate()
                .withNewMetadata().withLabels(matchLabels).withNamespace(namespace).withAnnotations(annotations).endMetadata()
                .withNewSpec()
                    .addNewContainer()
                        .withName(podName)
                        .withImage(imageUrl)
                        .withImagePullPolicy(K8sImagePullPolicyEnum.IF_NOT_PRESENT.getValue())
                        .withResources(resourceRequirements)
                        .withPorts(containerPorts)
                        .withEnv(envVarList)
                        .withVolumeMounts(volumeMounts)
                        .withCommand(commandList)
                        .withArgs(argList)
                    .endContainer()
                    .withVolumes(volumeList)
                    .withNewAffinity().withNodeAffinity(nodeAffinity).endAffinity()
                    .withNodeSelector(nodeSelector)
                .endSpec()
            .endTemplate()
        .endSpec()
        .build();
    	
    client.apps().deployments().inNamespace(namespace).create(deployment);
    

    Notes on some of the parameters

    Parameters such as podName, namespace, and imageUrl are Strings, and commandList and argList are List<String>. Quite a few others have to be constructed beforehand: matchLabels, annotations, and nodeSelector are Map<String, String> types, and several more are built as in the following examples:

    1) resourceRequirements

    ResourceRequirements resourceRequirements = new ResourceRequirements();
    Map<String, Quantity> limits = new HashMap<>();
    limits.put("cpu", new Quantity("2000m"));
    limits.put("memory", new Quantity("20480Mi"));
    limits.put("nvidia.kubernetes.io/gpu", new Quantity("1"));
    Map<String, Quantity> requests = new HashMap<>();
    requests.put("cpu", new Quantity("1000m"));
    requests.put("memory", new Quantity("10240Mi"));
    requests.put("nvidia.kubernetes.io/gpu", new Quantity("1"));
    resourceRequirements.setRequests(requests);
    resourceRequirements.setLimits(limits);
    

    Note that the keys passed to limits.put() must match what describe node reports. The GPU key used here is nvidia.kubernetes.io/gpu; if your vendor or device plugin exposes a different name, use whatever your environment actually reports. In practice this is usually made configurable or passed in as a parameter; it is hard-coded here only because this is an example.

    Capacity:
      cpu:                8
      ephemeral-storage:  308468608Ki
      hugepages-1Gi:      0
      hugepages-2Mi:      0
      memory:             32771060Ki
      nvidia.kubernetes.io/gpu:     1
      pods:               200
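    As an aside, the quantity strings used above read as follows: "2000m" is 2000 millicores (2 CPUs), and Ki/Mi/Gi are binary suffixes (Mi = 2^20 bytes). The helper names below are illustrative only and not part of the fabric8 API (fabric8's Quantity class has its own parsing utilities):

```java
// Illustrative parsers for the CPU and memory quantity formats shown above.
public class QuantityDemo {
    // Parses a CPU quantity into millicores: "2000m" -> 2000, "2" -> 2000.
    public static long cpuMillis(String q) {
        if (q.endsWith("m")) {
            return Long.parseLong(q.substring(0, q.length() - 1));
        }
        return Long.parseLong(q) * 1000;
    }

    // Parses a binary-suffixed memory quantity into bytes, e.g. "20480Mi", "1Gi", "308468608Ki".
    public static long memBytes(String q) {
        if (q.endsWith("Ki")) return Long.parseLong(q.substring(0, q.length() - 2)) << 10;
        if (q.endsWith("Mi")) return Long.parseLong(q.substring(0, q.length() - 2)) << 20;
        if (q.endsWith("Gi")) return Long.parseLong(q.substring(0, q.length() - 2)) << 30;
        return Long.parseLong(q); // plain byte count
    }

    public static void main(String[] args) {
        System.out.println(cpuMillis("2000m"));        // the CPU limit above: 2 CPUs
        System.out.println(memBytes("20480Mi") >> 30); // 20480Mi expressed in GiB
    }
}
```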
    

    2) containerPorts

    containerPorts must be a List<ContainerPort>,
    built for example as follows:

    public synchronized List<ContainerPort> buildContainerPorts() {
        LOGGER.info("ports={}", ports);
        List<ContainerPort> containerPortList = Lists.newArrayList();
        //in real use the ports arrive as a List parameter; hard-coded here for demonstration
        ContainerPort port = new ContainerPort();
        port.setHostPort(32111);
        port.setName("web-port");
        port.setProtocol("TCP");
        port.setContainerPort(32111);
        containerPortList.add(port);
        //assume at this point we have obtained a containerPortList
        containerPortList = containerPortList.stream().filter(p -> p.getHostPort() != null && p.getContainerPort() != null).collect(Collectors.toList());
        if (CollectionUtils.isEmpty(containerPortList)) {
            return null;
        }
        //if the caller specifies the ports directly, just return containerPortList here
        //but when we allocate ports ourselves we should avoid conflicts as far as possible, hence the handling
        //below (it cannot rule out every conflict, but at least a node running several pods will not end up
        //with only one running and the rest pending)
        //1. collect the ports already occupied by each pod
        PodList podList = K8sClientTool.getKubernetesClient().pods().list();
        Set<Integer> excludeNodePortList = Sets.newHashSet();
        if (podList != null && podList.getItems() != null) {
            for (Pod pod : podList.getItems()) {
                List<Integer> portList = pod.getSpec().getContainers().stream().flatMap(m ->
                        m.getPorts().stream().filter(p -> p.getHostPort() != null).map(ContainerPort::getHostPort)
                ).collect(Collectors.toList());

                excludeNodePortList.addAll(portList);
            }
        }

        //2. collect the ports used on the machine the component is installed on; aid is usually installed
        //on the cluster's master node, which rules out the main ports
        try {
            String result = SshTool.doExecute("netstat -nlpt  | grep -Po '\\d+(?=.+)' | sort -rn | xargs -n1");
            if (StringUtils.isNotEmpty(result)) {
                excludeNodePortList.addAll(Arrays.stream(result.split("\n")).map(s -> Integer.parseInt(s.trim())).collect(Collectors.toList()));
            }
        } catch (Exception e) {
            throw new ComputeResourceException(AidServerErrorCode.ERR_DEVICE_SSH_CONNECT);
        }

        //3. resolve container port occupation and conflicts; concurrency matters here, so guard this with a lock
        List<Pair<Integer, Long>> needRemovePortPairList = Lists.newArrayList();
        //4. first add the ports excluded via configuration
        excludeNodePortList.addAll(Arrays.stream(excludeNodePorts.split(",")).map(s -> Integer.parseInt(s.trim())).collect(Collectors.toList()));
        //5. then add previously allocated ports; they may never have been handed out in the end, but caching
        //them prevents allocating the same port twice at the same time (still-fresh entries stay excluded,
        //expired ones are collected for removal)
        excludeNodePortList.addAll(excludeNodePortPairList.stream().map(pair -> {
            if (pair.getRight() > (System.currentTimeMillis() - DEFAULT_TIME_TO_LIVE)) {
                return pair.getLeft();
            }
            needRemovePortPairList.add(pair);
            return null;
        }).filter(p -> p != null).collect(Collectors.toSet()));
        //6. clean up the expired cached ports
        excludeNodePortPairList.removeAll(needRemovePortPairList);
        LOGGER.info("containerPortList={}, excludeNodePortList={}", containerPortList, excludeNodePortList);
        containerPortList.stream().forEach(c -> {
            //prefer the assigned hostPort; fall back to a random one if it clashes
            Integer hostPort = c.getHostPort();
            while (excludeNodePortList.contains(hostPort)) {
                hostPort = RandomUtils.nextInt(minNodePort, maxNodePort);
            }
            excludeNodePortList.add(hostPort);
            excludeNodePortPairList.add(Pair.of(hostPort, System.currentTimeMillis()));
            if (StringUtils.isNotEmpty(c.getName())) {
                c.setName(c.getName().toLowerCase().replaceAll("_", "-"));
                if (c.getName().length() > 15) {
                    c.setName(c.getName().substring(0, 15));
                }
            }
            c.setHostPort(hostPort);
        });
        LOGGER.info("containerPortList={}", containerPortList);
        return containerPortList;
    }
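    The core of the allocation above (steps 1-6 feed an exclude set; the final loop keeps the requested hostPort if it is free, otherwise draws a random one) can be shown as a self-contained sketch. The class and method names here are made up for illustration; the 15-character cap on port names matches the Kubernetes limit:

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Self-contained sketch of the port-allocation step: keep the requested
// hostPort if it is free, otherwise draw random ports from [minPort, maxPort)
// until one is not in the exclude set. Also shows the port-name sanitization
// used above (lowercase, '_' -> '-', truncated to 15 characters).
public class PortAllocator {
    private static final Random RANDOM = new Random();

    public static int allocate(int requested, Set<Integer> excluded, int minPort, int maxPort) {
        int port = requested;
        while (excluded.contains(port)) {
            port = minPort + RANDOM.nextInt(maxPort - minPort);
        }
        excluded.add(port); // reserve it so subsequent allocations skip it
        return port;
    }

    public static String sanitizePortName(String name) {
        String sanitized = name.toLowerCase().replace("_", "-");
        return sanitized.length() > 15 ? sanitized.substring(0, 15) : sanitized;
    }

    public static void main(String[] args) {
        Set<Integer> excluded = new HashSet<>();
        excluded.add(32111); // pretend another pod already uses 32111
        int port = allocate(32111, excluded, 30000, 32768);
        System.out.println(port != 32111 && port >= 30000 && port < 32768);
        System.out.println(sanitizePortName("WEB_MANAGEMENT_PORT"));
    }
}
```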
    
    

    3) envVarList

    List<EnvVar> envVarList = Lists.newArrayList();
    EnvVar envVar = new EnvVar();
    envVar.setName("TEST_ENV_KEY");
    envVar.setValue("TEST_ENV_VALUE");
    envVarList.add(envVar);
    

    4) volumeMounts and volumeList

    Assume the parameter arrives as a List<Map<String, String>>, for example:
    "volumeMounts":[{"name":"test-name","mountPath":"/home/test","hostPath":"/home/test"}]

    volumeMounts:

    public List<VolumeMount> buildVolumeMounts(List<Map<String, String>> volumeMountMapList) {
        List<VolumeMount> volumeMounts = Lists.newArrayList();
        if (!CollectionUtils.isEmpty(volumeMountMapList)) {
            for (Map<String, String> map : volumeMountMapList) {
                volumeMounts.add(TypeTool.castToBean(map, VolumeMount.class));
            }
        }
    //    VolumeMount testVolumeMount = new VolumeMount();
    //    testVolumeMount.setName("test-name");
    //    testVolumeMount.setMountPath("/home/test");
    //    volumeMounts.add(testVolumeMount);
        return volumeMounts;
    }
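    TypeTool.castToBean is a project-specific helper, not part of fabric8. One plausible sketch of such a map-to-bean conversion is below, using a minimal stand-in bean so the example runs on its own (fabric8's VolumeMount would work the same way through its setters):

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class MapToBeanDemo {
    // Minimal stand-in for the VolumeMount fields used in this walkthrough.
    public static class SimpleMount {
        private String name;
        private String mountPath;
        public void setName(String name) { this.name = name; }
        public String getName() { return name; }
        public void setMountPath(String mountPath) { this.mountPath = mountPath; }
        public String getMountPath() { return mountPath; }
    }

    // Copies each map entry into the matching setter via reflection; entries
    // without a matching setter on the bean (e.g. "hostPath") are skipped.
    public static <T> T castToBean(Map<String, String> map, Class<T> clazz) throws Exception {
        T bean = clazz.getDeclaredConstructor().newInstance();
        for (Map.Entry<String, String> e : map.entrySet()) {
            String setter = "set" + Character.toUpperCase(e.getKey().charAt(0)) + e.getKey().substring(1);
            try {
                Method m = clazz.getMethod(setter, String.class);
                m.invoke(bean, e.getValue());
            } catch (NoSuchMethodException ignored) {
                // no matching property on the bean, skip this entry
            }
        }
        return bean;
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> map = new HashMap<>();
        map.put("name", "test-name");
        map.put("mountPath", "/home/test");
        map.put("hostPath", "/home/test"); // not a bean property, ignored here
        SimpleMount mount = castToBean(map, SimpleMount.class);
        System.out.println(mount.getName() + " " + mount.getMountPath());
    }
}
```

    Note how hostPath is deliberately not mapped onto the mount; as in buildVolumes below, it belongs on the Volume side.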
    

    volumeList:

    public List<Volume> buildVolumes(List<VolumeMount> volumeMounts, List<Map<String, String>> volumeMountMapList) {
        return volumeMounts.stream().map(m -> {
            Volume volume = new Volume();
            volume.setName(m.getName());
            String path = m.getMountPath();
            if (!CollectionUtils.isEmpty(volumeMountMapList)) {
                Optional<Map<String, String>> optional = volumeMountMapList.stream().filter(p -> m.getName().equals(p.get("name"))).findFirst();
                if (optional.isPresent()) {
                    Map<String, String> volumeMap = optional.get();
                    if (volumeMap.containsKey("hostPath")) {
                        path = optional.get().get("hostPath");
                    }
                }
            }
            HostPathVolumeSource hostPath = new HostPathVolumeSource();
            hostPath.setPath(path);
            volume.setHostPath(hostPath);
            return volume;
        }).collect(Collectors.toList());
    }
    

    5) nodeAffinity

    List<NodeSelectorRequirement> matchExpressions = Lists.newArrayList();
    matchExpressions.add(new NodeSelectorRequirementBuilder().withKey("nvidia.kubernetes.io/gpu")
            //GpuTypeEnum.toContainerValues(): List<String>
            .withOperator("In").withValues(GpuTypeEnum.toContainerValues()).build());
    NodeAffinity nodeAffinity = new NodeAffinityBuilder()
            .withNewRequiredDuringSchedulingIgnoredDuringExecution()
                    .withNodeSelectorTerms(new NodeSelectorTermBuilder().withMatchExpressions(matchExpressions).build())
            .endRequiredDuringSchedulingIgnoredDuringExecution()
            .build();
    

    6. Creating and deleting a single pod

    Delete:

    client.pods().inNamespace(namespace).delete(pod);
    client.pods().inNamespace(namespace).withName(podname).delete();
    

    Create:

    Pod podToCreate = new PodBuilder()
        .withNewMetadata()
            .withName(podName)
            .withNamespace(namespace)
            .withLabels(labels)
            .withAnnotations(annotations)
        .endMetadata()
        .withNewSpec()
            .addNewContainer()
                .withName(podName)
                .withImage(imageUrl)
                .withImagePullPolicy("IfNotPresent")
                .withResources(resourceRequirements)
                .withPorts(containerPorts)
                .withEnv(envVarList)
                .withVolumeMounts(volumeMounts)
                .withCommand(commandList)
                .withArgs(argList)
            .endContainer()
            .withNodeSelector(nodeSelector)
            .withRestartPolicy("OnFailure")
            .withVolumes(volumeList)
            //tolerate taints if needed
            .addNewToleration().withEffect("NoSchedule").withOperator("Exists").endToleration()
            //node scheduling strategy
            .withNewAffinity().withNodeAffinity(nodeAffinity).endAffinity()
        .endSpec()
        .build();
    Pod pod = null;
    try {
        pod = client.pods().create(podToCreate);
    } catch (Exception e) {
        //handle or rethrow; swallowing the exception silently is not recommended
    }
    

    The parameters needed here are largely the same as for the deployment, so they are not repeated.

    7. Creating a DaemonSet

    This is much the same as creating a deployment, except that you use client.apps().daemonSets()
    and, compared with the example above, there is no replicas field, so no further explanation is given here.

    8. Labeling nodes

    //first query the nodes
    NodeList nodeList = client.nodes().list();
    //filter out the node we need
    Optional<Node> optionalNode = nodeList.getItems().stream().filter(e -> e.getMetadata().getUid().equals(indexCode)).findFirst();
    if (!optionalNode.isPresent()) {
        throw new XxxException();
    }
    //work on the node's labels
    Node node = optionalNode.get();
    //fetch the existing labels
    Map<String, String> labels = node.getMetadata().getLabels();
    //add the new label
    labels.put("xxx", "xxx");
    //set the labels
    node.getMetadata().setLabels(labels);
    //save
    client.nodes().createOrReplace(node);
    
    
  • Original article: https://blog.csdn.net/yogima/article/details/140012814