这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
现有的微服务使用的是Spring Cloud Zookeeper这一套。实际上应用在Kubernetes中部署并不需要额外的注册中心,Kubernetes本身就支持服务发现,所以打算替换掉Zookeeper
替换为Spring Cloud Kubernetes
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-kubernetes-client-all</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-kubernetes-client-loadbalancer</artifactId>
</dependency>
版本我这里使用的是最新版本2.1.4
由于最新版本有bug:如果service.yaml中的port没有定义name就会报错,所以这里我们采用覆盖源码的方式去解决
问题详情可以参考我之前发的博文
直接创建一个包名为:org.springframework.cloud.kubernetes.client.discovery
创建类KubernetesInformerDiscoveryClient
代码如下
package org.springframework.cloud.kubernetes.client.discovery;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import io.kubernetes.client.extended.wait.Wait;
import io.kubernetes.client.informer.SharedInformer;
import io.kubernetes.client.informer.SharedInformerFactory;
import io.kubernetes.client.informer.cache.Lister;
import io.kubernetes.client.openapi.models.V1EndpointAddress;
import io.kubernetes.client.openapi.models.V1EndpointPort;
import io.kubernetes.client.openapi.models.V1Endpoints;
import io.kubernetes.client.openapi.models.V1Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties;
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesServiceInstance;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
/**
 * {@link DiscoveryClient} that answers lookups from the informer caches of Kubernetes
 * Services and Endpoints instead of an external registry.
 *
 * <p>This class deliberately lives in the same package and under the same name as the
 * library class so that it shadows it on the classpath. Compared with a plain copy it
 * tolerates endpoint ports that have no name: such ports are recorded in the instance
 * metadata under the empty key and are ignored when searching for a primary port by name.
 *
 * <p>Objects returned by the listers belong to the informer cache and are never modified
 * here.
 *
 * @author : wh
 * @date : 2022/10/27 14:36
 */
public class KubernetesInformerDiscoveryClient implements DiscoveryClient, InitializingBean {

    private static final Log log = LogFactory.getLog(KubernetesInformerDiscoveryClient.class);

    /** Service label whose value names the port to expose as the instance port. */
    private static final String PRIMARY_PORT_NAME_LABEL_KEY = "primary-port-name";

    private static final String HTTPS_PORT_NAME = "https";

    /** Metadata key used for endpoint ports that do not declare a name. */
    private static final String UNSET_PORT_NAME = "";

    private static final String HTTP_PORT_NAME = "http";

    private final SharedInformerFactory sharedInformerFactory;

    private final Lister<V1Service> serviceLister;

    /** Reports whether both the service and the endpoints informer have synced. */
    private final Supplier<Boolean> informersReadyFunc;

    private final Lister<V1Endpoints> endpointsLister;

    private final KubernetesDiscoveryProperties properties;

    private final String namespace;

    /**
     * @param namespace namespace used for lookups when all-namespaces mode is off
     * @param sharedInformerFactory factory whose registered informers are started in
     * {@link #afterPropertiesSet()}
     * @param serviceLister cache view over Services
     * @param endpointsLister cache view over Endpoints
     * @param serviceInformer informer whose sync state gates readiness
     * @param endpointsInformer informer whose sync state gates readiness
     * @param properties discovery configuration
     */
    public KubernetesInformerDiscoveryClient(String namespace, SharedInformerFactory sharedInformerFactory,
            Lister<V1Service> serviceLister, Lister<V1Endpoints> endpointsLister,
            SharedInformer<V1Service> serviceInformer, SharedInformer<V1Endpoints> endpointsInformer,
            KubernetesDiscoveryProperties properties) {
        this.namespace = namespace;
        this.sharedInformerFactory = sharedInformerFactory;
        this.serviceLister = serviceLister;
        this.endpointsLister = endpointsLister;
        this.informersReadyFunc = () -> serviceInformer.hasSynced() && endpointsInformer.hasSynced();
        this.properties = properties;
    }

    @Override
    public String description() {
        return "Kubernetes Client Discovery";
    }

    /**
     * Resolves the instances behind the Service named {@code serviceId}.
     *
     * @param serviceId Service name to look up; must not be {@code null}
     * @return one instance per endpoint address (not-ready addresses included only when
     * configured); an empty list when the Service is unknown, does not match the
     * configured service labels, or has no endpoints
     */
    @Override
    public List<ServiceInstance> getInstances(String serviceId) {
        Assert.notNull(serviceId, "[Assertion failed] - the object argument must not be null");

        if (!StringUtils.hasText(namespace) && !properties.isAllNamespaces()) {
            log.warn("Namespace is null or empty, this may cause issues looking up services");
        }

        // In all-namespaces mode the first Service with a matching name wins. Services
        // without metadata are skipped instead of triggering a NullPointerException.
        V1Service service = properties.isAllNamespaces()
                ? this.serviceLister.list().stream()
                        .filter(svc -> svc.getMetadata() != null && serviceId.equals(svc.getMetadata().getName()))
                        .findFirst().orElse(null)
                : this.serviceLister.namespace(this.namespace).get(serviceId);
        if (service == null || !matchServiceLabels(service)) {
            // no such service present in the cluster
            return new ArrayList<>();
        }

        Map<String, String> svcMetadata = collectServiceMetadata(service);

        V1Endpoints ep = this.endpointsLister.namespace(service.getMetadata().getNamespace())
                .get(service.getMetadata().getName());
        if (ep == null || ep.getSubsets() == null) {
            // no available endpoints in the cluster
            return new ArrayList<>();
        }

        // A label on the Service takes precedence over the configured primary port name.
        Optional<String> discoveredPrimaryPortName = Optional.empty();
        if (service.getMetadata().getLabels() != null) {
            discoveredPrimaryPortName = Optional
                    .ofNullable(service.getMetadata().getLabels().get(PRIMARY_PORT_NAME_LABEL_KEY));
        }
        final String primaryPortName = discoveredPrimaryPortName.orElse(this.properties.getPrimaryPortName());

        return ep.getSubsets().stream()
                .filter(subset -> subset.getPorts() != null && subset.getPorts().size() > 0) // safeguard
                .flatMap(subset -> {
                    Map<String, String> metadata = new HashMap<>(svcMetadata);
                    List<V1EndpointPort> endpointPorts = subset.getPorts();
                    if (this.properties.getMetadata() != null && this.properties.getMetadata().isAddPorts()) {
                        // Unnamed ports are stored under the empty key rather than a null key.
                        endpointPorts.forEach(
                                p -> metadata.put(StringUtils.hasText(p.getName()) ? p.getName() : UNSET_PORT_NAME,
                                        Integer.toString(p.getPort())));
                    }

                    // Copy into a fresh list: the subset is owned by the informer cache, and
                    // appending not-ready addresses to its own list would corrupt the cache
                    // and add duplicates on every call.
                    List<V1EndpointAddress> addresses = new ArrayList<>();
                    if (subset.getAddresses() != null) {
                        addresses.addAll(subset.getAddresses());
                    }
                    if (this.properties.isIncludeNotReadyAddresses()
                            && !CollectionUtils.isEmpty(subset.getNotReadyAddresses())) {
                        addresses.addAll(subset.getNotReadyAddresses());
                    }

                    final int port = findEndpointPort(endpointPorts, primaryPortName, serviceId);
                    return addresses.stream()
                            .map(addr -> new KubernetesServiceInstance(
                                    addr.getTargetRef() != null ? addr.getTargetRef().getUid() : "", serviceId,
                                    addr.getIp(), port, metadata, false, service.getMetadata().getNamespace(),
                                    service.getMetadata().getClusterName()));
                }).collect(Collectors.toList());
    }

    /**
     * Builds the metadata shared by all instances of a Service from its labels and
     * annotations, as enabled by the metadata section of the discovery properties.
     */
    private Map<String, String> collectServiceMetadata(V1Service service) {
        Map<String, String> svcMetadata = new HashMap<>();
        if (this.properties.getMetadata() == null || service.getMetadata() == null) {
            return svcMetadata;
        }
        if (this.properties.getMetadata().isAddLabels()) {
            copyEntriesWithPrefix(service.getMetadata().getLabels(), this.properties.getMetadata().getLabelsPrefix(),
                    svcMetadata);
        }
        if (this.properties.getMetadata().isAddAnnotations()) {
            copyEntriesWithPrefix(service.getMetadata().getAnnotations(),
                    this.properties.getMetadata().getAnnotationsPrefix(), svcMetadata);
        }
        return svcMetadata;
    }

    /**
     * Copies every entry of {@code source} whose key starts with {@code prefix} into
     * {@code target}, keeping the key unchanged. A {@code null} prefix matches all keys
     * and a {@code null} source copies nothing.
     */
    private static void copyEntriesWithPrefix(Map<String, String> source, String prefix,
            Map<String, String> target) {
        if (source == null) {
            return;
        }
        String effectivePrefix = prefix != null ? prefix : "";
        source.forEach((key, value) -> {
            if (key.startsWith(effectivePrefix)) {
                target.put(key, value);
            }
        });
    }

    /**
     * Picks the port to advertise for a subset.
     *
     * <p>A single port is used as is. Otherwise the named ports are searched for
     * {@code primaryPortName}, then {@code https}, then {@code http}; if none is found a
     * warning is logged and the first port in the list is used.
     */
    private int findEndpointPort(List<V1EndpointPort> endpointPorts, String primaryPortName, String serviceId) {
        if (endpointPorts.size() == 1) {
            return endpointPorts.get(0).getPort();
        }

        // Only named ports can be looked up by name; unnamed ones are left out of the map.
        Map<String, Integer> ports = endpointPorts.stream().filter(p -> StringUtils.hasText(p.getName()))
                .collect(Collectors.toMap(V1EndpointPort::getName, V1EndpointPort::getPort));

        // -1 marks "nothing found" so that the fallback below can log and pick a port.
        int discoveredPort = ports.getOrDefault(primaryPortName,
                ports.getOrDefault(HTTPS_PORT_NAME, ports.getOrDefault(HTTP_PORT_NAME, -1)));
        if (discoveredPort != -1) {
            return discoveredPort;
        }

        if (StringUtils.hasText(primaryPortName)) {
            log.warn("Could not find a port named '" + primaryPortName + "', 'https', or 'http' for service '"
                    + serviceId + "'.");
        }
        else {
            log.warn("Could not find a port named 'https' or 'http' for service '" + serviceId + "'.");
        }
        log.warn(
                "Make sure that either the primary-port-name label has been added to the service, or that spring.cloud.kubernetes.discovery.primary-port-name has been configured.");
        log.warn("Alternatively name the primary port 'https' or 'http'");
        log.warn("An incorrect configuration may result in non-deterministic behaviour.");
        return endpointPorts.get(0).getPort();
    }

    /**
     * @return names of all cached Services (in the configured namespace, or in every
     * namespace in all-namespaces mode) that match the configured service labels
     */
    @Override
    public List<String> getServices() {
        List<V1Service> services = this.properties.isAllNamespaces() ? this.serviceLister.list()
                : this.serviceLister.namespace(this.namespace).list();
        return services.stream().filter(this::matchServiceLabels).map(s -> s.getMetadata().getName())
                .collect(Collectors.toList());
    }

    /**
     * Starts all registered informers and polls once per second, up to the configured
     * cache loading timeout, until both informers report that they have synced.
     *
     * @throws IllegalStateException if the caches are not ready in time and the
     * properties require a ready cache
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        this.sharedInformerFactory.startAllRegisteredInformers();
        boolean cacheReady = Wait.poll(Duration.ofSeconds(1),
                Duration.ofSeconds(this.properties.getCacheLoadingTimeoutSeconds()), () -> {
                    log.info("Waiting for the cache of informers to be fully loaded..");
                    return this.informersReadyFunc.get();
                });
        if (cacheReady) {
            log.info("Cache fully loaded (total " + serviceLister.list().size()
                    + " services) , discovery client is now available");
        }
        else if (this.properties.isWaitCacheReady()) {
            throw new IllegalStateException(
                    "Timeout waiting for informers cache to be ready, is the kubernetes service up?");
        }
        else {
            // Do not claim the cache is loaded when the wait timed out.
            log.warn(
                    "Timeout waiting for informers cache to be ready, ignoring the failure because waitForInformerCacheReady property is false");
        }
    }

    /**
     * Checks a Service against the configured service labels.
     *
     * @return {@code false} for a Service without metadata; {@code true} when no service
     * labels are configured; otherwise whether every configured label is present on the
     * Service with an equal value
     */
    private boolean matchServiceLabels(V1Service service) {
        // safeguard: must run before anything below dereferences the metadata
        if (service.getMetadata() == null) {
            return false;
        }
        Map<String, String> actualLabels = service.getMetadata().getLabels();
        if (log.isDebugEnabled()) {
            log.debug("Kubernetes Service Label Properties:");
            if (this.properties.getServiceLabels() != null) {
                this.properties.getServiceLabels().forEach((key, value) -> log.debug(key + ":" + value));
            }
            log.debug("Service " + service.getMetadata().getName() + " labels:");
            if (actualLabels != null) {
                actualLabels.forEach((key, value) -> log.debug(key + ":" + value));
            }
        }
        if (properties.getServiceLabels() == null || properties.getServiceLabels().isEmpty()) {
            return true;
        }
        return properties.getServiceLabels().keySet().stream()
                .allMatch(k -> actualLabels != null && actualLabels.containsKey(k)
                        && actualLabels.get(k).equals(properties.getServiceLabels().get(k)));
    }

}
// Gateway route fragment (the expression is not terminated in this excerpt):
// matches request paths under /ms/test/, applies stripPrefix(1), and targets
// lb://test-service. The name after lb:// must be a service name that the
// discovery client can resolve.
// NOTE(review): stripPrefix(1) presumably drops the leading "/ms" segment before
// forwarding - confirm against the Spring Cloud Gateway documentation.
builder.routes()
.route(r -> r.path("/ms/test/**")
.filters(f -> f.stripPrefix(1))
.uri("lb://test-service"))
原先是使用zookeeper上的服务名进行转发的,如果pod对应的Service名称和之前zookeeper上面注册的名字不一致,就需要改一下路由的服务名
然后在k8s中重新部署服务
可以看到这里是pod没有权限调用k8s相关的api,授权就好了。使用RBAC进行权限处理
# Role granting read-only access in the "default" namespace.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: default
  name: pod-reader
rules:
  - apiGroups: [""]
    # services and endpoints are required because the informer-based discovery
    # client lists and watches both of them.
    resources: ["pods", "configmaps", "services", "endpoints"]
    verbs: ["get", "watch", "list"]
---
# ServiceAccount that receives the read permissions.
# NOTE(review): the application's pod spec presumably has to set
# serviceAccountName: config-reader for this to take effect - confirm.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: config-reader
  namespace: default
---
# Binds the pod-reader Role to the config-reader ServiceAccount in "default".
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: pod-reader
  namespace: default
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: pod-reader
subjects:
  - kind: ServiceAccount
    name: config-reader
    namespace: default
参考博客
重新部署启动就会发现是无缝切换的
总的来说切换比较简单,基本是无缝的!这样就不用依赖外部的注册中心了