• Istio-PilotAgent源码分析


    前序

    在正式开始源码分析之前,我们需要了解一下agent的配置

    type Agent struct {
    	// 代理配置,配置envoy运行文件目录,代理镜像地址等
    	// 与mesh.proxyconfig属性一致
    	proxyConfig *mesh.ProxyConfig
    	// 主要将proxyConfig,secOpts信息进行封装
    	cfg *AgentOptions
    	// 安全信息配置,主要存储了静态证书的地址,证书提供者等
    	secOpts *security.Options
    	//  envoy运行时所需要的信息,比如envoy二进制文件,日志级别等
    	envoyOpts  envoy.ProxyConfig
    	envoyAgent *envoy.Agent
    	// envoy等待管道
    	envoyWaitCh chan error
    	// SDSGRPC服务器,主要用于工作负载的证书申请
    	// sds会生成证书然后调用secretCache对证书进行签证,完成后发送给envoy
    	sdsServer *sds.Server
    	// 用于SDS证书签证,可以通过文件的形式进行签证
    	// 默认使用istiod对工作负载进行签证
    	secretCache *cache.SecretManagerClient
    
    	// Used when proxying envoy xds via istio-agent is enabled.
    	// agnet的XDS服务器,主要用于连接上游istiod与下游envoy通讯的桥梁
    	// envoy发送注册事件给agent,agent 会将注册事件转发给istiod
    	// istiod下发的配置会通过agent传输给envoy
    	xdsProxy *XdsProxy
    	// 证书监听器,监听证书更新事件然后触发证书更新策略
    	// 主要是获取证书然后重新进行签证生成envoy配置下发
    	caFileWatcher filewatcher.FileWatcher
    
    	// local DNS Server that processes DNS requests locally and forwards to upstream DNS if needed.
    	// 本地DNS服务器
    	localDNSServer *dnsClient.LocalDNSServer
    
    	// Signals true completion (e.g. with delayed graceful termination of Envoy)
    	wg sync.WaitGroup
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    以上配置包含了agent运行过程中使用到的所有配置,我们对他们混个眼熟后就开始逐个击破吧!
    下面重点分析proxyConfig,cfg,secOpts.

    proxyConfig

    代理配置,属于mesh中的属性,可以对全局的服务网格中的代理进行配置.

    type ProxyConfig struct {
    	// 生成配置输出的目录,默认为./etc/istio/proxy   主要生成envoy的bootstrap配置文件
    	ConfigPath string 
    	// envoy运行文件的目录
    	BinaryPath string 
        // 当前服务的名称istio-proxy
    	ClusterName isProxyConfig_ClusterName `protobuf_oneof:"cluster_name"`
    	// envoy热启动耗时时间
    	DrainDuration *duration.Duration 
    	// envoy关闭等待时间
    	ParentShutdownDuration *duration.Duration 
    	// istiod地址,默认为istiod.istio-system.svc:15012
    	DiscoveryAddress string 
    	//UDp地址
    	StatsdUdpAddress string 
    	//envoyMetrics地址
    	EnvoyMetricsServiceAddress string 
    	// 代理管理端口
    	ProxyAdminPort int32  
    	// 连接控制层面 istiod 时的身份验证策略,默认为双向TLS
    	ControlPlaneAuthPolicy AuthenticationPolicy 
    	// 自定义配置文件
    	CustomConfigFile string 
    	// 指标最大名字长度
    	StatNameLength int32
    	// 工作线程
    	Concurrency *wrappers.Int32Value
    	// bootstrap模板文件目录
    	ProxyBootstrapTemplatePath string 
    	// 流入代理策略
    	InterceptionMode ProxyConfig_InboundInterceptionMode 
    	// 追踪
    	Tracing *Tracing
    	// 附加的环境变量
    	ProxyMetadata map[string]string 
    	// 状态端口
    	StatusPort int32
    	// 额外证书列表
    	CaCertificatesPem []string 
    	// 代理镜像地址
    	Image *v1beta1.ProxyImage 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    SecurityOptions

    安全配置包含了agent要使用的证书,token等,强烈建议仔细理解,对于下面的证书请求,认证有很大的帮助.

    type Options struct {
    	// CA签发服务器,默认为istiod.istio-system.svc:15012
    	CAEndpoint string
    	// 设置ServerName 来覆盖CAEndpoint提取的ServerName
    	CAEndpointSAN string
    	// CA提供者的名称,默认为Citadel说明使用内部的证书管理
    	CAProviderName string
    	// 是否生成PKCS#8私钥
    	Pkcs8Keys bool
    	// 为工作负载生成的证书输出目录
    	OutputKeyCertToDir string
    	// 客户端在认证时向CA服务器提供密钥和证书的目录
    	ProvCert string
    	//签名算法
    	ECCSigAlg string
    	// 证书提供者 默认为istiod
    	PilotCertProvider string
    	// STS port
    	STSPort int
    	//身份验证提供程序特定插件
    	TokenExchanger TokenExchanger
        // 这里主要用于grpcPerRPCCredentials中的get.token使用
        // 在请求发送前运行,将token值添加到请求头中
    	CredFetcher CredFetcher
    	// 凭证身份提供者
    	CredIdentityProvider string
    	// 与工作负载对应的命名空间
    	WorkloadNamespace string
    	// token管理器
    	TokenManager TokenManager
    	//证书签名者信息
    	CertSigner string
    
    	//系统CA证书根地址值
    	CARootPath string
    	// 本地现有证书集合,agent会首先使用该证书作为工作负载的证书.
    	CertChainFilePath string
    	// The path for an existing key file
    	KeyFilePath string
    	// The path for an existing root certificate bundle
    	RootCertFilePath string
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    对于证书的配置,我感觉istio配置的有些乱,接下来我会在证书章节对上面的配置进行讲解.

    cfg

    对于cfg,它就像一个缝合怪将上面的配置封装了一下,下面将重点讲解几个比较重要的属性

    	// XDSRootCerts is the location of the root CA for the XDS connection. Used for setting platform certs or
    	// using custom roots.
    	// 创建XDS服务器中的上游连接时所需要的Root证书,默认为/var/run/secrets/istio/root-cert.pem
    	// 这个是cm中的istio-ca-root-cert里的key
    	// istio-ca-root-cert是dicovery创建的,当命名空间被创建后,会自动为其创建一个istio-ca-root-cert
    	XDSRootCerts string
    
    	// CARootCerts of the location of the root CA for the CA connection. Used for setting platform certs or
    	// using custom roots.
    	// 用于SDS连接上游istiod时所用到的证书默认值也是/var/run/secrets/istio/root-cert.pem
    	CARootCerts string
    	// Path to local UDS to communicate with Envoy
    	// agent使用unix与envoy通讯,当前unix的目录
    	XdsUdsPath string
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    创建

    pilot-agent是istio向应用注入的sidecar容器,它主要有以下工作

    1. 解析外部配置参数(可以根据文件,环境变量配置),生成envoy的Bootstrap配置文件(envoy的根配置文件,相当于全局配置).
    2. 监视证书的变化,实现证书的热加载.
    3. 监视envoy运行状态,提供deplyoment的功能
    4. 通知envoy进程优雅退出
    cmd.PrintFlags(c.Flags())
    log.Infof("Version %s", version.Info.String())
    
    // 初始化代理结构体,解析podip,domain
    proxy, err := initProxy(args)
    if err != nil {
        return err
    }
    // 生成代理配置
    proxyConfig, err := config.ConstructProxyConfig(meshConfigFile, serviceCluster, options.ProxyConfigEnv, concurrency, proxy)
    if err != nil {
        return fmt.Errorf("failed to get proxy config: %v", err)
    }
    if out, err := protomarshal.ToYAML(proxyConfig); err != nil {
        log.Infof("Failed to serialize to YAML: %v", err)
    } else {
        log.Infof("Effective config: %s", out)
    }
    
    //创建sts配置
    secOpts, err := options.NewSecurityOptions(proxyConfig, stsPort, tokenManagerPlugin)
    if err != nil {
        return err
    }
    
    // If security token service (STS) port is not zero, start STS server and
    // listen on STS port for STS requests. For STS, see
    // https://tools.ietf.org/html/draft-ietf-oauth-token-exchange-16.
    // STS is used for stackdriver or other Envoy services using google gRPC.
    if stsPort > 0 {
        stsServer, err := initStsServer(proxy, secOpts.TokenManager)
        if err != nil {
            return err
        }
        defer stsServer.Stop()
    }
    
    // If we are using a custom template file (for control plane proxy, for example), configure this.
    if templateFile != "" && proxyConfig.CustomConfigFile == "" {
        proxyConfig.ProxyBootstrapTemplatePath = templateFile
    }
    
    envoyOptions := envoy.ProxyConfig{
        LogLevel:          proxyLogLevel,
        ComponentLogLevel: proxyComponentLogLevel,
        LogAsJSON:         loggingOptions.JSONEncoding,
        NodeIPs:           proxy.IPAddresses,
        Sidecar:           proxy.Type == model.SidecarProxy,
        OutlierLogPath:    outlierLogPath,
    }
    // 创建agent配置
    agentOptions := options.NewAgentOptions(proxy, proxyConfig)
    // 创建agent服务器(只是创建了一个结构体,具体功能实现在run中)
    agent := istio_agent.NewAgent(proxyConfig, agentOptions, secOpts, envoyOptions)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    // If a status port was provided, start handling status probes.
    if proxyConfig.StatusPort > 0 {
        // 状态服务器初始化操作
        if err := initStatusServer(ctx, proxy, proxyConfig, agentOptions.EnvoyPrometheusPort, agent); err != nil {
            return err
        }
    }
    
    go iptableslog.ReadNFLOGSocket(ctx)
    
    // On SIGINT or SIGTERM, cancel the context, triggering a graceful shutdown
    go cmd.WaitSignalFunc(cancel)
    
    // Start in process SDS, dns server, xds proxy, and Envoy.
    // agent服务启动
    wait, err := agent.Run(ctx)
    if err != nil {
        return err
    }
    wait()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    配置

    对于配置在前序的时候已经讲解属性的含义,这里不再具体赘述.
    我们主要讲解一下更改这些配置的方式

    1. 使用静态的mesh配置文件,这个istio已经创建了一个configmap
    2. 使用环境变量,具体细节请参考https://istio.io/latest/zh/docs/reference/commands/pilot-agent/#pilot-agent-completion
    3. 使用运行参数.请参考https://istio.io/latest/zh/docs/reference/commands/pilot-agent/#pilot-agent-completion

    本文主要关心证书的配置,因为agent最重要的一个功能就是实现了工作负载到工作负载之间的双向TLS.
    对于/var/run/secrets/istio/root-cert.pem这个证书的ROOTCA是由dicovery中的maybeCreateCA进行创建的,然后通过initMulticluster监听命名空间资源添加生成root-cert.pem证书的事件.
    具体流程为dicovery创建了一个CA服务器,生成CA证书然后然后监视每个命名空间,为其创建cm
    这里还有一个需要注意的就是当agent连接istiod时除了证书的双向认证外还需要JWT认证(默认),默认情况下是使用k8s进行认证也就是说传输的token值必须与k8s有关系,在agent中也是这么做的,agent会将k8s中defaultSA用户的token值映射到/var/run/secrets/tokens/istio-token,然后通过grpc的前置函数将当前token值添加到请求头中.

    启动

    agnet的对于grpc的创建,envoy 的启动都在run方法中运行,接下里就让我们列举一下它都做了那些事.

    1. 创建DNS服务器
    2. 创建SDS服务器,主要用于envoy证书的申请
    3. 创建XDSproxy,主要用于envoy服务发现
    4. 启动envoy,使用cmd命令启动呗

    接下本文将重点分析2,3步
    在开始之前我们先说一下他们的通讯方式,
    agent->envoy 使用的是unix通讯,这种方式没有使用tls,但是又保证了安全性,性能要比tls要好
    agent->istiod 使用的是tls通讯,通过证书与istiod建立连接,然后通过token值进行身份校验

    SDS服务器

    SDS主要用于envoy申请证书使用,申请的证书用作envoy->envoy(工作负载->工作负载)之间的mtls.
    具体流程为

    1. envoy发送证书申请指令(这个应该是写死的请求的url,在envoy里没有找到响应的配置)
    2. 由于使用的是unix通讯没有使用tls,所以agent中的xdsserver直接跳过身份校验环节,xdsserver与envoy建立连接接受到请求后,首先查询是否使用的为静态CA,否则生成证书然后调用istiod客户端,向istiod进行签名.
    3. istiod接受到连接后对其进行签名返回
    4. xdsserver接受到返回信息后会将其缓存到agent中,然后调用envoy配置更新方法(具体的流程pilotDiscovery启动文章中有讲到),将证书转换为envoy配置资源推送给envoy.

    下面让我们看一下SDS服务器的创建代码

    func (a *Agent) initSdsServer() error {
    	var err error
    	// 检查是否配置了静态CA证书,如果配置了直接使用该证书,就不用动态判断证书提供者从而获取证书了
    	if security.CheckWorkloadCertificate(security.WorkloadIdentityCertChainPath, security.WorkloadIdentityKeyPath, security.WorkloadIdentityRootCertPath) {
    		log.Info("workload certificate files detected, creating secret manager without caClient")
    		a.secOpts.RootCertFilePath = security.WorkloadIdentityRootCertPath
    		a.secOpts.CertChainFilePath = security.WorkloadIdentityCertChainPath
    		a.secOpts.KeyFilePath = security.WorkloadIdentityKeyPath
    		a.secretCache, err = cache.NewSecretManagerClient(nil, a.secOpts)
    		if err != nil {
    			return fmt.Errorf("failed to start workload secret manager %v", err)
    		}
    	} else {
    		// 根据istio-ca-secret生成的root.pem(istio-ca-root-cert) 与istiod进行双向认证,然后建立证书创建管理连接
    		// 并返回该client
    		a.secretCache, err = a.newSecretManager()
    		if err != nil {
    			return fmt.Errorf("failed to start workload secret manager %v", err)
    		}
    	}
    	// 这里判断如果不使用envoy
    	if a.cfg.DisableEnvoy {
    		go func() {
    			st := a.secretCache
    			st.RegisterSecretHandler(func(resourceName string) {
    				_, _ = a.getWorkloadCerts(st)
    			})
    			_, _ = a.getWorkloadCerts(st)
    		}()
    	} else {
    		// 使用tls加速,是istio1.14新特性主要加速双向tls
    		pkpConf := a.proxyConfig.GetPrivateKeyProvider()
    		// 创建sds服务器,与istiod建立连接
    		a.sdsServer = sds.NewServer(a.secOpts, a.secretCache, pkpConf)
    		// 这里的OnSecretUpdate会通知XDS服务器重新生成证书推送给envoy
    		// 注意,推送的类型为secret,但是对该类型的生成器并不是discovery默认生成器,而是sdsservice类型生成器
    		// sdsservice该生成器的作用是重新向istiod获取证书,然后推送
    		a.secretCache.RegisterSecretHandler(a.sdsServer.OnSecretUpdate)
    	}
    	return nil
    }
    
    // NewServer creates and starts the Grpc server for SDS.
    func NewServer(options *security.Options, workloadSecretCache security.SecretManager, pkpConf *mesh.PrivateKeyProvider) *Server {
    	s := &Server{stopped: atomic.NewBool(false)}
    	// 创建xds服务器,主要用于生成secert配置然后发送给envoy
    	s.workloadSds = newSDSService(workloadSecretCache, options, pkpConf)
    	// 与envoy建立unix连接使用socker,注册grpc方法,供envoy调用
    	s.initWorkloadSdsService()
    	sdsServiceLog.Infof("SDS server for workload certificates started, listening on %q", security.WorkloadIdentitySocketPath)
    	return s
    }
    
    func (s *Server) initWorkloadSdsService() {
    	s.grpcWorkloadServer = grpc.NewServer(s.grpcServerOptions()...)
    	s.workloadSds.register(s.grpcWorkloadServer)
    
    	var err error
    	// 对于agent<-> envoy的通讯采用的是unix的方式,通过文件管道的形式
    	s.grpcWorkloadListener, err = uds.NewListener(security.WorkloadIdentitySocketPath)
    	if err != nil {
    		sdsServiceLog.Errorf("Failed to set up UDS path: %v", err)
    	}
        s.grpcWorkloadListener, err = uds.NewListener(security.WorkloadIdentitySocketPath);
        s.grpcWorkloadServer.Serve(s.grpcWorkloadListener)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    XDS代理服务器

    xdsproxy主要作为istiod<->envoy通讯的桥梁,istiod下发配置到envoy的具体流程

    1. istiod生成配置后推送给conn连接
    2. xdsproxy接受到istiod传来的数据后,进行判断如果不是自己的则转发给下面的envoy
    3. envoy接收到配置后进行处理

    所以说envoy并不直接与istiod进行通讯,那么也就不用关心与它的认证的关系,这一些列的操作都由xdsproxy完成
    下面让我们看一下envoy服务注册的具体代码

    // grpc注册的handler
    var _AggregatedDiscoveryService_serviceDesc = grpc.ServiceDesc{
    	ServiceName: "envoy.service.discovery.v3.AggregatedDiscoveryService",
    	HandlerType: (*AggregatedDiscoveryServiceServer)(nil),
    	Methods:     []grpc.MethodDesc{},
    	Streams: []grpc.StreamDesc{
    		{
    			StreamName:    "StreamAggregatedResources",
    			Handler:       _AggregatedDiscoveryService_StreamAggregatedResources_Handler,
    			ServerStreams: true,
    			ClientStreams: true,
    		},
    	},
    }
    func _AggregatedDiscoveryService_StreamAggregatedResources_Handler(srv interface{}, stream grpc.ServerStream) error {
    	return srv.(AggregatedDiscoveryServiceServer).StreamAggregatedResources(&aggregatedDiscoveryServiceStreamAggregatedResourcesServer{stream})
    }
    func (p *XdsProxy) StreamAggregatedResources(downstream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    	proxyLog.Debugf("accepted XDS connection from Envoy, forwarding to upstream XDS server")
        // 这里我们看到调用的是handleStream
        // 在discovery里调用的stream
    	return p.handleStream(downstream)
    }
    
    
    
    func (p *XdsProxy) handleStream(downstream adsStream) error {
    	// 创建代理连接
    	con := &ProxyConnection{
    		conID:           connectionNumber.Inc(),
    		upstreamError:   make(chan error, 2), // can be produced by recv and send
    		downstreamError: make(chan error, 2), // can be produced by recv and send
    		requestsChan:    make(chan *discovery.DiscoveryRequest, 10),
    		responsesChan:   make(chan *discovery.DiscoveryResponse, 10),
    		stopChan:        make(chan struct{}),
    		downstream:      downstream,
    	}
    	// 赋值给当前xdsproxy
    	p.RegisterStream(con)
    	defer p.UnregisterStream(con)
    
    	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    	defer cancel()
    	// 创建上游连接,就是构建与istiod的连接
    	// 证书信息在创建时已经初始化
    	upstreamConn, err := p.buildUpstreamConn(ctx)
    	// 这里创建与isitodgrpc通讯的客户端
    	xds := discovery.NewAggregatedDiscoveryServiceClient(upstreamConn)
    	ctx = metadata.AppendToOutgoingContext(context.Background(), "ClusterID", p.clusterID)
    	for k, v := range p.xdsHeaders {
    		ctx = metadata.AppendToOutgoingContext(ctx, k, v)
    	}
    	// 开始调用服务注册请求
    	return p.HandleUpstream(ctx, con, xds)
    }
    
    func (p *XdsProxy) HandleUpstream(ctx context.Context, con *ProxyConnection, xds discovery.AggregatedDiscoveryServiceClient) error {
    	// 调用grpc服务注册方法
    	upstream, err := xds.StreamAggregatedResources(ctx,
    		grpc.MaxCallRecvMsgSize(defaultClientMaxReceiveMessageSize))
    	con.upstream = upstream
    	// 向上游发送数据方法处理函数
    	go p.handleUpstreamRequest(con)
    	// 接受上游发送的数据方法处理函数
    	go p.handleUpstreamResponse(con)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    这两个方法就是istio<->envoy的基本逻辑

    istio->envoy

    func (p *XdsProxy) handleUpstreamResponse(con *ProxyConnection) {
    	forwardEnvoyCh := make(chan *discovery.DiscoveryResponse, 1)
    	for {
    		select {
    		// 接受数据
    		case resp := <-con.responsesChan:
    			// TODO: separate upstream response handling from requests sending, which are both time costly
    			proxyLog.Debugf("response for type url %s", resp.TypeUrl)
    			metrics.XdsProxyResponses.Increment()
    			// 判断当前请求的url,如果是调用的xdsproxy则进行处理,否则转发给envoy
    			if h, f := p.handlers[resp.TypeUrl]; f {
    				if len(resp.Resources) == 0 {
    					// Empty response, nothing to do
    					// This assumes internal types are always singleton
    					break
    				}
    				err := h(resp.Resources[0])
    				var errorResp *google_rpc.Status
    				if err != nil {
    					errorResp = &google_rpc.Status{
    						Code:    int32(codes.Internal),
    						Message: err.Error(),
    					}
    				}
    				// Send ACK/NACK
    				con.sendRequest(&discovery.DiscoveryRequest{
    					VersionInfo:   resp.VersionInfo,
    					TypeUrl:       resp.TypeUrl,
    					ResponseNonce: resp.Nonce,
    					ErrorDetail:   errorResp,
    				})
    				continue
    			}
    			switch resp.TypeUrl {
    			case v3.ExtensionConfigurationType:
    				if features.WasmRemoteLoadConversion {
    					// If Wasm remote load conversion feature is enabled, rewrite and send.
    					go p.rewriteAndForward(con, resp, func(resp *discovery.DiscoveryResponse) {
    						// Forward the response using the thread of `handleUpstreamResponse`
    						// to prevent concurrent access to forwardToEnvoy
    						select {
    						case forwardEnvoyCh <- resp:
    						case <-con.stopChan:
    						}
    					})
    				} else {
    					// 将数据发送给envoy
    					// Otherwise, forward ECDS resource update directly to Envoy.
    					forwardToEnvoy(con, resp)
    				}
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    envoy->istiod

    func (p *XdsProxy) handleUpstreamRequest(con *ProxyConnection) {
    	initialRequestsSent := atomic.NewBool(false)
    	go func() {
    		for {
    			// recv xds requests from envoy
                // 接受envoy传来的数据
    			req, err := con.downstream.Recv()
    			if err != nil {
    				select {
    				case con.downstreamError <- err:
    				case <-con.stopChan:
    				}
    				return
    			}
    
    			// forward to istiod
                // 发送给istiod
    			con.sendRequest(req)
    		}
    	}()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    总结

    至此agnet流程分析结束,本文还有主要分析了证书的使用,对于status判断,审计,日志等没有进行分析.
    本文感觉agent的配置有些混乱有些用法重复.
    可能有同学观察到,SDSserver中有xdsserver,它与xdsproxy有什么不一样的地方嘛?
    这里讲解一下,sds对于证书的修改与推送采用的是istiod推送到envoy的那套机制,推送到管道,管道获取后判断资源类型然后生成envoy识别的配置下发.由于sds并不需要监控那么多资源(一个secret资源就够了)所以创建一个xdsserver感觉有些冗余.
    xdsproxy主要用于envoy<->istiod的通讯

  • 相关阅读:
    2022年值得关注的5个区块链项目 数字藏品平台开发搭建
    《实战:如何搭建一个完整的 Vue2.0 项目》- 7、Vue2.x 项目 webpack 4 升级 5(半自动升级)
    2020年12月 Python(二级)真题解析#中国电子学会#全国青少年软件编程等级考试
    摊牌了,请各位做好一年内随时失业的准备
    时序分析 47 -- 时序数据转为空间数据 (六) 马尔可夫转换场 python 实践(中)
    工作流程引擎有几个特点?可以提高办公效率吗?
    Python万圣节蝙蝠
    生成型神经网络
    MarkText如何实现图床-解决md上传到csdn图片不显示的问题
    Spring更简单的读取和存储Bean对象
  • 原文地址:https://blog.csdn.net/a1023934860/article/details/126053998