<dependency>
    <groupId>io.fabric8</groupId>
    <artifactId>kubernetes-client</artifactId>
    <version>5.10.2</version>
</dependency>
Authentication is straightforward: once you have the authentication Config, create the client like this:
KubernetesClient client = new DefaultKubernetesClient(config);
For convenience, the client configuration is best persisted in the database and the client itself maintained in a cache. When connecting to multiple k8s clusters you need multiple KubernetesClient instances, so it is best to keep a mapping from cluster code to client in the cache.
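A minimal sketch of such a cache, assuming the cluster code is a plain String key (the class, field, and method names here are hypothetical, not from the original text):
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

public class K8sClientCache {
    private static final Map<String, KubernetesClient> CLIENT_CACHE = new ConcurrentHashMap<>();

    // Create the client lazily for this cluster code and reuse it on later calls
    public static KubernetesClient getClient(String clusterCode, Config config) {
        return CLIENT_CACHE.computeIfAbsent(clusterCode, code -> new DefaultKubernetesClient(config));
    }
}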
So how do you obtain the authentication Config? There are usually two ways: a kubeconfig file, or OAuth token authentication.
At initialization you can also validate the client to check connectivity, and only operate on it afterwards if the check passes.
try {
    NonNamespaceOperation<Namespace, NamespaceList, Resource<Namespace>> namespaces = kubernetesClient.namespaces();
    if (namespaces != null) {
        namespaces.withName("default").get();
    }
} catch (Exception e) {
    throw new XxxException(Xxxxxx);
} finally {
    kubernetesClient.close();
}
The kubeconfig file is located under /root/.kube/ on the control-plane node. After it is uploaded from the UI, the backend receives it, for example, as a byte[] file field on a fileUploadReqBO:
String kubeConfig = null;
Config config = null;
try {
    kubeConfig = new String(fileUploadReqBO.getFile(), Charsets.UTF_8.toString());
    config = Config.fromKubeconfig(kubeConfig);
} catch (Exception e) {
    throw new XxxException(xxxx, e.getMessage());
}
Obtaining an OAuth token requires a ServiceAccount with admin privileges; if you don't have one, create it manually.
Create the ServiceAccount, here named test-admin:
kubectl create serviceaccount test-admin -n kube-system
Grant it admin privileges:
kubectl create clusterrolebinding my-service-account-admin --clusterrole=cluster-admin --serviceaccount=kube-system:test-admin
Run the following command:
kubectl get secret -n kube-system|grep admin
In the output, find the secret whose name starts with test-admin-token-, then run
kubectl describe secret test-admin-token-XXX -n kube-system
to obtain the token.
Once you have it, you can check whether the token carries admin permissions with kubectl auth can-i create deployments --as=system:serviceaccount:kube-system:test-admin --token= (followed by the token): yes means it does, no means it does not.
Now suppose we have a masterUrl, e.g. https://10.20.66.152:6443 (kube-apiserver usually listens on port 6443 by default), and the token.
The config can then be obtained like this:
Config config = new ConfigBuilder().withTrustCerts(true).build();
config.setMasterUrl(masterUrl);
config.setOauthToken(oauthToken);
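Equivalently, since ConfigBuilder also exposes withMasterUrl and withOauthToken, everything can be built in one chain and the client created right away (a sketch reusing the masterUrl and oauthToken variables from above):
Config config = new ConfigBuilder()
        .withMasterUrl(masterUrl)
        .withOauthToken(oauthToken)
        .withTrustCerts(true)
        .build();
KubernetesClient client = new DefaultKubernetesClient(config);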
List all pods:
// The KubernetesClient has already been obtained: KubernetesClient client = new DefaultKubernetesClient(config);
PodList podList = client.pods().list();
Query by namespace:
PodList podList = client.pods().inNamespace(K8sGenericConstant.K8S_NAMESPACE_ENGINE_SERVER).list();
Iterate over the pods:
if (podList != null && podList.getItems() != null) {
    for (Pod pod : podList.getItems()) {
        // Pod name
        String podName = pod.getMetadata().getName();
        // Name of the node the pod runs on
        String nodeName = pod.getSpec().getNodeName();
        // Pod labels
        Map<String, String> labels = pod.getMetadata().getLabels();
        // Namespace
        String ns = pod.getMetadata().getNamespace();
        // Status
        pod.getStatus().getContainerStatuses();
        pod.getStatus().getReason();
        String reason = null;
        List<PodCondition> podConditions = pod.getStatus().getConditions();
        if (!CollectionUtils.isEmpty(podConditions)) {
            PodCondition podCondition = podConditions.get(0);
            reason = podCondition.getReason() + ":" + podCondition.getMessage();
        }
    }
}
Create a namespace:
NonNamespaceOperation<Namespace, NamespaceList, Resource<Namespace>> namespaces = client.namespaces();
if (namespaces == null) {
return null;
}
String name = "test-ns";
Map<String, String> labels = Maps.newHashMap();
labels.put("testlabel", "testvalue");
Namespace ns = new NamespaceBuilder().withNewMetadata().withName(name).addToLabels(labels).endMetadata().build();
ns = namespaces.createOrReplace(ns);
Delete a namespace:
NonNamespaceOperation<Namespace, NamespaceList, Resource<Namespace>> namespaces = client.namespaces();
if (namespaces == null) {
return null;
}
namespaces.withName(name).delete();
Delete a deployment:
// Note: `deployment` here must first be queried as a Deployment object, not just its name
client.apps().deployments().inNamespace(namespace).delete(deployment);
client.apps().deployments().inNamespace(namespace).withName(deploymentname).delete();
Create a deployment:
Deployment deployment = new DeploymentBuilder()
        .withNewMetadata()
        .withName(podName)
        .endMetadata()
        .withNewSpec()
        .withNewSelector().addToMatchLabels(matchLabels).endSelector()
        .withReplicas(1)
        .withNewTemplate()
        .withNewMetadata().withLabels(matchLabels).withNamespace(namespace).withAnnotations(annotations).endMetadata()
        .withNewSpec()
        .addNewContainer()
        .withName(podName)
        .withImage(imageUrl)
        .withImagePullPolicy(K8sImagePullPolicyEnum.IF_NOT_PRESENT.getValue())
        .withResources(resourceRequirements)
        .withPorts(containerPorts)
        .withEnv(envVarList)
        .withVolumeMounts(volumeMounts)
        .withCommand(commandList)
        .withArgs(argList)
        .endContainer()
        .withVolumes(volumeList)
        .withNewAffinity().withNodeAffinity(nodeAffinity).endAffinity()
        .withNodeSelector(nodeSelector)
        .endSpec()
        .endTemplate()
        .endSpec()
        .build();
client.apps().deployments().inNamespace(namespace).create(deployment);
Parameters such as podName, namespace, and imageUrl are Strings; commandList and argList are List<String>.
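The simpler parameters might be assembled roughly as follows (a sketch only; all concrete values below are placeholders, not taken from the original example):
Map<String, String> matchLabels = new HashMap<>();
matchLabels.put("app", podName);
Map<String, String> annotations = new HashMap<>();
annotations.put("description", "created-by-fabric8-demo");
Map<String, String> nodeSelector = new HashMap<>();
nodeSelector.put("kubernetes.io/hostname", "node-1");
List<String> commandList = Arrays.asList("/bin/sh", "-c");
List<String> argList = Collections.singletonList("java -jar /app/app.jar");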
ResourceRequirements resourceRequirements = new ResourceRequirements();
Map<String, Quantity> limits = new HashMap<>();
limits.put("cpu", new Quantity("2000m"));
limits.put("memory", new Quantity("20480Mi"));
limits.put("nvidia.kubernetes.io/gpu", new Quantity("1"));
Map<String, Quantity> requests = new HashMap<>();
requests.put("cpu", new Quantity("1000m"));
requests.put("memory", new Quantity("10240Mi"));
requests.put("nvidia.kubernetes.io/gpu", new Quantity("1"));
resourceRequirements.setRequests(requests);
resourceRequirements.setLimits(limits);
Note that the keys passed to limits.put() must match what `kubectl describe node` reports. The GPU key here is nvidia.kubernetes.io/gpu; if your vendor's (or your environment's mapped) resource name differs, keep it consistent with the environment. In practice this is usually made configurable or passed in as a parameter (see the sketch after the Capacity output below); it is hard-coded here only because this is an example.
Capacity:
cpu: 8
ephemeral-storage: 308468608Ki
hugepages-1Gi: 0
hugepages-2Mi: 0
memory: 32771060Ki
nvidia.kubernetes.io/gpu: 1
pods: 200
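For instance, a minimal sketch of making the GPU resource key configurable (gpuResourceKey and the property name are assumptions, not part of the original example); the limits map from above would then be built as:
// Read the GPU resource key from configuration, falling back to the value used above
String gpuResourceKey = System.getProperty("k8s.gpu.resource.key", "nvidia.kubernetes.io/gpu");
Map<String, Quantity> limits = new HashMap<>();
limits.put("cpu", new Quantity("2000m"));
limits.put("memory", new Quantity("20480Mi"));
limits.put(gpuResourceKey, new Quantity("1"));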
containerPorts needs to be of type List<ContainerPort>, built for example as follows:
public synchronized List<ContainerPort> buildContainerPorts() {
    // `ports` would be the list passed in by the caller in real use
    LOGGER.info("ports={}", ports);
    List<ContainerPort> containerPortList = Lists.newArrayList();
    // In real use these would be passed in as a List parameter; hard-coded here for demonstration
    ContainerPort port = new ContainerPort();
    port.setHostPort(32111);
    port.setName("web-port");
    port.setProtocol("TCP");
    port.setContainerPort(32111);
    containerPortList.add(port);
    // Assume at this point we already have a containerPortList
    containerPortList = containerPortList.stream().filter(p -> p.getHostPort() != null && p.getContainerPort() != null).collect(Collectors.toList());
    if (CollectionUtils.isEmpty(containerPortList)) {
        return null;
    }
    // If the caller specifies the ports directly, simply return containerPortList here.
    // But when we have to allocate ports ourselves we try to avoid conflicts with the steps below
    // (not bulletproof, but at least a node running several pods won't end up with only one running and the rest pending)
    // 1. Collect the host ports already used by every pod
    PodList podList = K8sClientTool.getKubernetesClient().pods().list();
    Set<Integer> excludeNodePortList = Sets.newHashSet();
    if (podList != null && podList.getItems() != null) {
        for (Pod pod : podList.getItems()) {
            List<Integer> portList = pod.getSpec().getContainers().stream().flatMap(m ->
                    m.getPorts().stream().filter(p -> p.getHostPort() != null).map(ContainerPort::getHostPort)
            ).collect(Collectors.toList());
            excludeNodePortList.addAll(portList);
        }
    }
    // 2. Collect the ports in use on the machine where the component is installed; aid is usually
    // installed on the K8s master node, so this covers the main ports
    try {
        String result = SshTool.doExecute("netstat -nlpt | grep -Po '\\d+(?=.+)' | sort -rn | xargs -n1");
        if (StringUtils.isNotEmpty(result)) {
            excludeNodePortList.addAll(Arrays.stream(result.split("\n")).map(s -> Integer.parseInt(s.trim())).collect(Collectors.toList()));
        }
    } catch (Exception e) {
        throw new ComputeResourceException(AidServerErrorCode.ERR_DEVICE_SSH_CONNECT);
    }
    // 3. Resolve container port occupation and conflicts; concurrency is handled by making the method synchronized
    List<Pair<Integer, Long>> needRemovePortPairList = Lists.newArrayList();
    // 4. First add the ports excluded via configuration
    excludeNodePortList.addAll(Arrays.stream(excludeNodePorts.split(",")).map(s -> Integer.parseInt(s.trim())).collect(Collectors.toList()));
    // 5. Then add ports handed out previously; they may not actually be in use yet, but caching them
    // prevents handing out the same port twice in a short window
    excludeNodePortList.addAll(excludeNodePortPairList.stream().map(pair -> {
        if (pair.getRight() >= (System.currentTimeMillis() - DEFAULT_TIME_TO_LIVE)) {
            // still within the TTL: keep excluding this port
            return pair.getLeft();
        }
        // expired: mark it for removal from the cache
        needRemovePortPairList.add(pair);
        return null;
    }).filter(p -> p != null).collect(Collectors.toSet()));
    // 6. Remove the expired cached ports
    excludeNodePortPairList.removeAll(needRemovePortPairList);
    LOGGER.info("containerPortList={}, excludeNodePortList={}", containerPortList, excludeNodePortList);
    containerPortList.stream().forEach(c -> {
        // Prefer the assigned hostPort; fall back to a random port if it is already taken
        Integer hostPort = c.getHostPort();
        while (excludeNodePortList.contains(hostPort)) {
            hostPort = RandomUtils.nextInt(minNodePort, maxNodePort);
        }
        excludeNodePortList.add(hostPort);
        excludeNodePortPairList.add(Pair.of(hostPort, System.currentTimeMillis()));
        if (StringUtils.isNotEmpty(c.getName())) {
            c.setName(c.getName().toLowerCase().replaceAll("_", "-"));
            if (c.getName().length() > 15) {
                c.setName(c.getName().substring(0, 15));
            }
        }
        c.setHostPort(hostPort);
    });
    LOGGER.info("containerPortList={}", containerPortList);
    return containerPortList;
}
List<EnvVar> envVarList = Lists.newArrayList();
EnvVar envVar = new EnvVar();
envVar.setName("TEST_ENV_KEY");
envVar.setValue("TEST_ENV_VALUE");
envVarList.add(envVar);
Assume the parameters are passed in as a List<Map<String, String>>.
volumeMounts:
public List<VolumeMount> buildVolumeMounts(List<Map<String, String>> volumeMountMapList) {
    List<VolumeMount> volumeMounts = Lists.newArrayList();
    if (!CollectionUtils.isEmpty(volumeMountMapList)) {
        for (Map<String, String> map : volumeMountMapList) {
            volumeMounts.add(TypeTool.castToBean(map, VolumeMount.class));
        }
    }
    // VolumeMount testVolumeMount = new VolumeMount();
    // testVolumeMount.setName("test-name");
    // testVolumeMount.setMountPath("/home/test");
    // volumeMounts.add(testVolumeMount);
    return volumeMounts;
}
volumeList:
public List<Volume> buildVolumes(List<VolumeMount> volumeMounts, List<Map<String, String>> volumeMountMapList) {
    return volumeMounts.stream().map(m -> {
        Volume volume = new Volume();
        volume.setName(m.getName());
        String path = m.getMountPath();
        if (!CollectionUtils.isEmpty(volumeMountMapList)) {
            Optional<Map<String, String>> optional = volumeMountMapList.stream().filter(p -> m.getName().equals(p.get("name"))).findFirst();
            if (optional.isPresent()) {
                Map<String, String> volumeMap = optional.get();
                if (volumeMap.containsKey("hostPath")) {
                    path = volumeMap.get("hostPath");
                }
            }
        }
        HostPathVolumeSource hostPath = new HostPathVolumeSource();
        hostPath.setPath(path);
        volume.setHostPath(hostPath);
        return volume;
    }).collect(Collectors.toList());
}
List<NodeSelectorRequirement> matchExpressions = Lists.newArrayList();
matchExpressions.add(new NodeSelectorRequirementBuilder().withKey("nvidia.kubernetes.io/gpu")
// GpuTypeEnum.toContainerValues() returns a List<String> of acceptable GPU resource values
.withOperator("In").withValues(GpuTypeEnum.toContainerValues()).build());
NodeAffinity nodeAffinity = new NodeAffinityBuilder()
.withNewRequiredDuringSchedulingIgnoredDuringExecution()
.withNodeSelectorTerms(new NodeSelectorTermBuilder().withMatchExpressions(matchExpressions).build())
.endRequiredDuringSchedulingIgnoredDuringExecution()
.build();
Delete a pod:
client.pods().inNamespace(namespace).delete(pod);
client.pods().inNamespace(namespace).withName(podname).delete();
Create a pod:
Pod podToCreate = new PodBuilder()
        .withNewMetadata()
        .withName(podName)
        .withNamespace(namespace)
        .withLabels(labels)
        .withAnnotations(annotations)
        .endMetadata()
        .withNewSpec()
        .addNewContainer()
        .withName(podName)
        .withImage(imageUrl)
        .withImagePullPolicy("IfNotPresent")
        .withResources(resourceRequirements)
        .withPorts(containerPorts)
        .withEnv(envVarList)
        .withVolumeMounts(volumeMounts)
        .withCommand(commandList)
        .withArgs(argList)
        .endContainer()
        .withNodeSelector(nodeSelector)
        .withRestartPolicy("OnFailure")
        .withVolumes(volumeList)
        // Tolerate taints if required
        .addNewToleration().withEffect("NoSchedule").withOperator("Exists").endToleration()
        // Node selection strategy (affinity)
        .withNewAffinity().withNodeAffinity(nodeAffinity).endAffinity()
        .endSpec()
        .build();
Pod pod = null;
try {
    pod = client.pods().create(podToCreate);
} catch (Exception e) {
    // Handle the failure instead of silently swallowing it
    LOGGER.error("create pod failed", e);
}
The parameters needed here are much the same as for the deployment, so they are not repeated.
Creating a DaemonSet is largely the same as creating a deployment, except that you use client.apps().daemonSets() and, compared with the example above, there is no replicas field; a minimal sketch follows.
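A sketch under those assumptions, reusing the same parameters (podName, namespace, imageUrl, matchLabels) as the deployment example:
DaemonSet daemonSet = new DaemonSetBuilder()
        .withNewMetadata().withName(podName).endMetadata()
        .withNewSpec()
        .withNewSelector().addToMatchLabels(matchLabels).endSelector()
        .withNewTemplate()
        .withNewMetadata().withLabels(matchLabels).withNamespace(namespace).endMetadata()
        .withNewSpec()
        .addNewContainer().withName(podName).withImage(imageUrl).endContainer()
        .endSpec()
        .endTemplate()
        .endSpec()
        .build();
client.apps().daemonSets().inNamespace(namespace).createOrReplace(daemonSet);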
// First query the nodes
NodeList nodeList = client.nodes().list();
// Filter out the node we need
Optional<Node> optionalNode = nodeList.getItems().stream().filter(e -> e.getMetadata().getUid().equals(indexCode)).findFirst();
if (!optionalNode.isPresent()) {
    throw new XxxException();
}
// Update the node's labels
Node node = optionalNode.get();
// Get the existing labels
Map<String, String> labels = node.getMetadata().getLabels();
// Add the new label
labels.put("xxx", "xxx");
// Set the labels
node.getMetadata().setLabels(labels);
// Save
client.nodes().createOrReplace(node);