• Spring Cloud Zookeeper 升级为Spring Cloud Kubernetes


    这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

    背景

    现有的微服务是使用的Spring Cloud Zookeeper这一套,实际应用在Kubernetes中部署并不需要额外的注册中心,本身Kubernetes自己就支持,所以打算替换到Zookeeper 替换为Spring Cloud Kubernetes

    替换

    1. 删除Spring Cloud Zookeeper相关依赖

    	<dependency>
          <groupId>org.springframework.cloudgroupId>
          <artifactId>spring-cloud-starter-zookeeper-discoveryartifactId>
        dependency>
    
    • 1
    • 2
    • 3
    • 4

    2. 添加 Spring Cloud Kubernetes 相关依赖

    	<dependency>
          <groupId>org.springframework.cloudgroupId>
          <artifactId>spring-cloud-starter-kubernetes-client-allartifactId>
        dependency>
    
        <dependency>
          <groupId>org.springframework.cloudgroupId>
          <artifactId>spring-cloud-kubernetes-client-loadbalancerartifactId>
        dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    版本我这里使用的最新版本,2.1.4

    3. 解决port没有命名的bug

    由于最新版本有bug,就是service.yaml中如果没有定义port的name会报错,所以这里我们采用修改源码方式去解决

    问题详情可以参考我之前发的博文

    直接创建一个包名为:org.springframework.cloud.kubernetes.client.discovery
    创建类KubernetesInformerDiscoveryClient 代码如下

    package org.springframework.cloud.kubernetes.client.discovery;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Optional;
    import java.util.function.Supplier;
    import java.util.stream.Collectors;
    
    import io.kubernetes.client.extended.wait.Wait;
    import io.kubernetes.client.informer.SharedInformer;
    import io.kubernetes.client.informer.SharedInformerFactory;
    import io.kubernetes.client.informer.cache.Lister;
    import io.kubernetes.client.openapi.models.V1EndpointAddress;
    import io.kubernetes.client.openapi.models.V1EndpointPort;
    import io.kubernetes.client.openapi.models.V1Endpoints;
    import io.kubernetes.client.openapi.models.V1Service;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.cloud.client.ServiceInstance;
    import org.springframework.cloud.client.discovery.DiscoveryClient;
    import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties;
    import org.springframework.cloud.kubernetes.commons.discovery.KubernetesServiceInstance;
    import org.springframework.util.Assert;
    import org.springframework.util.CollectionUtils;
    import org.springframework.util.StringUtils;
    
    /**
     *@author : wh
     *@date : 2022/10/27 14:36
     *@description:
     */
    public class KubernetesInformerDiscoveryClient implements DiscoveryClient, InitializingBean {
    
    	private static final Log log = LogFactory.getLog(KubernetesInformerDiscoveryClient.class);
    
    	private static final String PRIMARY_PORT_NAME_LABEL_KEY = "primary-port-name";
    
    	private static final String HTTPS_PORT_NAME = "https";
    
    	private static final String UNSET_PORT_NAME = "";
    
    	private static final String HTTP_PORT_NAME = "http";
    
    	private final SharedInformerFactory sharedInformerFactory;
    
    	private final Lister<V1Service> serviceLister;
    
    	private final Supplier<Boolean> informersReadyFunc;
    
    	private final Lister<V1Endpoints> endpointsLister;
    
    	private final KubernetesDiscoveryProperties properties;
    
    	private final String namespace;
    
    	public KubernetesInformerDiscoveryClient(String namespace, SharedInformerFactory sharedInformerFactory,
    			Lister<V1Service> serviceLister, Lister<V1Endpoints> endpointsLister,
    			SharedInformer<V1Service> serviceInformer, SharedInformer<V1Endpoints> endpointsInformer,
    			KubernetesDiscoveryProperties properties) {
    		this.namespace = namespace;
    		this.sharedInformerFactory = sharedInformerFactory;
    
    		this.serviceLister = serviceLister;
    		this.endpointsLister = endpointsLister;
    		this.informersReadyFunc = () -> serviceInformer.hasSynced() && endpointsInformer.hasSynced();
    
    		this.properties = properties;
    	}
    
    	@Override
    	public String description() {
    		return "Kubernetes Client Discovery";
    	}
    
    	@Override
    	public List<ServiceInstance> getInstances(String serviceId) {
    		Assert.notNull(serviceId, "[Assertion failed] - the object argument must not be null");
    
    		if (!StringUtils.hasText(namespace) && !properties.isAllNamespaces()) {
    			log.warn("Namespace is null or empty, this may cause issues looking up services");
    		}
    
    		V1Service service = properties.isAllNamespaces() ? this.serviceLister.list().stream()
    				.filter(svc -> serviceId.equals(svc.getMetadata().getName())).findFirst().orElse(null)
    				: this.serviceLister.namespace(this.namespace).get(serviceId);
    		if (service == null || !matchServiceLabels(service)) {
    			// no such service present in the cluster
    			return new ArrayList<>();
    		}
    
    		Map<String, String> svcMetadata = new HashMap<>();
    		if (this.properties.getMetadata() != null) {
    			if (this.properties.getMetadata().isAddLabels()) {
    				if (service.getMetadata() != null && service.getMetadata().getLabels() != null) {
    					String labelPrefix = this.properties.getMetadata().getLabelsPrefix() != null
    							? this.properties.getMetadata().getLabelsPrefix() : "";
    					service.getMetadata().getLabels().entrySet().stream()
    							.filter(e -> e.getKey().startsWith(labelPrefix))
    							.forEach(e -> svcMetadata.put(e.getKey(), e.getValue()));
    				}
    			}
    			if (this.properties.getMetadata().isAddAnnotations()) {
    				if (service.getMetadata() != null && service.getMetadata().getAnnotations() != null) {
    					String annotationPrefix = this.properties.getMetadata().getAnnotationsPrefix() != null
    							? this.properties.getMetadata().getAnnotationsPrefix() : "";
    					service.getMetadata().getAnnotations().entrySet().stream()
    							.filter(e -> e.getKey().startsWith(annotationPrefix))
    							.forEach(e -> svcMetadata.put(e.getKey(), e.getValue()));
    				}
    			}
    		}
    
    		V1Endpoints ep = this.endpointsLister.namespace(service.getMetadata().getNamespace())
    				.get(service.getMetadata().getName());
    		if (ep == null || ep.getSubsets() == null) {
    			// no available endpoints in the cluster
    			return new ArrayList<>();
    		}
    
    		Optional<String> discoveredPrimaryPortName = Optional.empty();
    		if (service.getMetadata() != null && service.getMetadata().getLabels() != null) {
    			discoveredPrimaryPortName = Optional
    					.ofNullable(service.getMetadata().getLabels().get(PRIMARY_PORT_NAME_LABEL_KEY));
    		}
    		final String primaryPortName = discoveredPrimaryPortName.orElse(this.properties.getPrimaryPortName());
    
    		return ep.getSubsets().stream().filter(subset -> subset.getPorts() != null && subset.getPorts().size() > 0) // safeguard
    				.flatMap(subset -> {
    					Map<String, String> metadata = new HashMap<>(svcMetadata);
    					List<V1EndpointPort> endpointPorts = subset.getPorts();
    					if (this.properties.getMetadata() != null && this.properties.getMetadata().isAddPorts()) {
    						endpointPorts.forEach(p -> metadata.put(StringUtils.hasText(p.getName()) ? p.getName() : UNSET_PORT_NAME,
    								Integer.toString(p.getPort())));
    					}
    					List<V1EndpointAddress> addresses = subset.getAddresses();
    					if (addresses == null) {
    						addresses = new ArrayList<>();
    					}
    					if (this.properties.isIncludeNotReadyAddresses()
    							&& !CollectionUtils.isEmpty(subset.getNotReadyAddresses())) {
    						addresses.addAll(subset.getNotReadyAddresses());
    					}
    
    					final int port = findEndpointPort(endpointPorts, primaryPortName, serviceId);
    					return addresses.stream()
    							.map(addr -> new KubernetesServiceInstance(
    									addr.getTargetRef() != null ? addr.getTargetRef().getUid() : "", serviceId,
    									addr.getIp(), port, metadata, false, service.getMetadata().getNamespace(),
    									service.getMetadata().getClusterName()));
    				}).collect(Collectors.toList());
    	}
    
    	private int findEndpointPort(List<V1EndpointPort> endpointPorts, String primaryPortName, String serviceId) {
    		if (endpointPorts.size() == 1) {
    			return endpointPorts.get(0).getPort();
    		}
    		else {
    			Map<String, Integer> ports = endpointPorts.stream().filter(p -> StringUtils.hasText(p.getName()))
    					.collect(Collectors.toMap(V1EndpointPort::getName, V1EndpointPort::getPort));
    			// This oneliner is looking for a port with a name equal to the primary port
    			// name specified in the service label
    			// or in spring.cloud.kubernetes.discovery.primary-port-name, equal to https,
    			// or equal to http.
    			// In case no port has been found return -1 to log a warning and fall back to
    			// the first port in the list.
    			int discoveredPort = ports.getOrDefault(primaryPortName,
    					ports.getOrDefault(HTTPS_PORT_NAME, ports.getOrDefault(HTTP_PORT_NAME, -1)));
    
    			if (discoveredPort == -1) {
    				if (StringUtils.hasText(primaryPortName)) {
    					log.warn("Could not find a port named '" + primaryPortName + "', 'https', or 'http' for service '"
    							+ serviceId + "'.");
    				}
    				else {
    					log.warn("Could not find a port named 'https' or 'http' for service '" + serviceId + "'.");
    				}
    				log.warn(
    						"Make sure that either the primary-port-name label has been added to the service, or that spring.cloud.kubernetes.discovery.primary-port-name has been configured.");
    				log.warn("Alternatively name the primary port 'https' or 'http'");
    				log.warn("An incorrect configuration may result in non-deterministic behaviour.");
    				discoveredPort = endpointPorts.get(0).getPort();
    			}
    			return discoveredPort;
    		}
    	}
    
    	@Override
    	public List<String> getServices() {
    		List<V1Service> services = this.properties.isAllNamespaces() ? this.serviceLister.list()
    				: this.serviceLister.namespace(this.namespace).list();
    		return services.stream().filter(this::matchServiceLabels).map(s -> s.getMetadata().getName())
    				.collect(Collectors.toList());
    	}
    
    	@Override
    	public void afterPropertiesSet() throws Exception {
    		this.sharedInformerFactory.startAllRegisteredInformers();
    		if (!Wait.poll(Duration.ofSeconds(1), Duration.ofSeconds(this.properties.getCacheLoadingTimeoutSeconds()),
    				() -> {
    					log.info("Waiting for the cache of informers to be fully loaded..");
    					return this.informersReadyFunc.get();
    				})) {
    			if (this.properties.isWaitCacheReady()) {
    				throw new IllegalStateException(
    						"Timeout waiting for informers cache to be ready, is the kubernetes service up?");
    			}
    			else {
    				log.warn(
    						"Timeout waiting for informers cache to be ready, ignoring the failure because waitForInformerCacheReady property is false");
    			}
    		}
    		log.info("Cache fully loaded (total " + serviceLister.list().size()
    				+ " services) , discovery client is now available");
    	}
    
    	private boolean matchServiceLabels(V1Service service) {
    		if (log.isDebugEnabled()) {
    			log.debug("Kubernetes Service Label Properties:");
    			if (this.properties.getServiceLabels() != null) {
    				this.properties.getServiceLabels().forEach((key, value) -> log.debug(key + ":" + value));
    			}
    			log.debug("Service " + service.getMetadata().getName() + " labels:");
    			if (service.getMetadata() != null && service.getMetadata().getLabels() != null) {
    				service.getMetadata().getLabels().forEach((key, value) -> log.debug(key + ":" + value));
    			}
    		}
    		// safeguard
    		if (service.getMetadata() == null) {
    			return false;
    		}
    		if (properties.getServiceLabels() == null || properties.getServiceLabels().isEmpty()) {
    			return true;
    		}
    		return properties.getServiceLabels().keySet().stream()
    				.allMatch(k -> service.getMetadata().getLabels() != null
    						&& service.getMetadata().getLabels().containsKey(k)
    						&& service.getMetadata().getLabels().get(k).equals(properties.getServiceLabels().get(k)));
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246

    4. 修改路由转发

    				builder.routes()
                    .route(r -> r.path("/ms/test/**")
                            .filters(f -> f.stripPrefix(1))
                            .uri("lb://test-service"))
    
    • 1
    • 2
    • 3
    • 4

    原先使用的是zookeeper上的服务名进行转发的,如果pod上面的服务名和之前zookeeper上面注册的名字不一致,就需要改一下路由的服务名

    部署

    然后再k8s中重新部署服务

    报错

    在这里插入图片描述

    可以看到这里是pod没有权限调用k8s相关的api,授权就好了。使用RBAC权限处理

    1. 创建role
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      namespace: default
      name: pod-reader
    rules:
      - apiGroups: [""]
        resources: ["pods","configmaps"]
        verbs: ["get", "watch", "list"]
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    1. 创建ServiceAccount
    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: config-reader
      namespace: default
    
    • 1
    • 2
    • 3
    • 4
    • 5
    1. 绑定Role和ServiceAccount
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: pod-reader
      namespace: default
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: pod-reader
    subjects:
      - kind: ServiceAccount
        name: config-reader
        namespace: default
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    1. 在deployment中指定上面的ServiceAccount

    参考博客

    重新部署启动就会发现是无缝切换的

    总结

    总得来说切换比较简单,基本是无缝的!这样就不用依赖外部的注册中心了

  • 相关阅读:
    从零开始写一个PHP开发框架websocket框架
    LabVIEW浮点型和双精度数据类型之间的精度差异是什么 为什么 在LabVIEW 中, 浮点 数 会 失去 精度?
    AbstractApplicationContext抽象类解读
    视频剪辑音效处理软件有哪些?视频剪辑软件那个好用
    力扣热题100——一刷day01
    ES6 入门教程 11 对象的新增方法 11.5 Object.keys(),Object.values(),Object.entries()
    ssh 连接错误 Too many authentication failures 解决方法
    c语言进阶:冒泡排序函数初步实现到逐步优化
    《数据结构、算法与应用C++语言描述》使用C++语言实现数组栈
    Qt在工控行业的一些重点知识点
  • 原文地址:https://blog.csdn.net/qq_42651904/article/details/127725818