在正式开始源码分析之前,我们需要了解一下agent的配置
type Agent struct {
// 代理配置,配置envoy运行文件目录,代理镜像地址等
// 与mesh.proxyconfig属性一致
proxyConfig *mesh.ProxyConfig
// 主要将proxyConfig,secOpts信息进行封装
cfg *AgentOptions
// 安全信息配置,主要存储了静态证书的地址,证书提供者等
secOpts *security.Options
// envoy运行时所需要的信息,比如envoy二进制文件,日志级别等
envoyOpts envoy.ProxyConfig
envoyAgent *envoy.Agent
// envoy等待管道
envoyWaitCh chan error
// SDSGRPC服务器,主要用于工作负载的证书申请
// sds会生成证书然后调用secretCache对证书进行签证,完成后发送给envoy
sdsServer *sds.Server
// 用于SDS证书签证,可以通过文件的形式进行签证
// 默认使用istiod对工作负载进行签证
secretCache *cache.SecretManagerClient
// Used when proxying envoy xds via istio-agent is enabled.
// agnet的XDS服务器,主要用于连接上游istiod与下游envoy通讯的桥梁
// envoy发送注册事件给agent,agent 会将注册事件转发给istiod
// istiod下发的配置会通过agent传输给envoy
xdsProxy *XdsProxy
// 证书监听器,监听证书更新事件然后触发证书更新策略
// 主要是获取证书然后重新进行签证生成envoy配置下发
caFileWatcher filewatcher.FileWatcher
// local DNS Server that processes DNS requests locally and forwards to upstream DNS if needed.
// 本地DNS服务器
localDNSServer *dnsClient.LocalDNSServer
// Signals true completion (e.g. with delayed graceful termination of Envoy)
wg sync.WaitGroup
}
以上配置包含了agent运行过程中使用到的所有配置,我们对他们混个眼熟后就开始逐个击破吧!
下面重点分析proxyConfig,cfg,secOpts.
代理配置,属于mesh中的属性,可以对全局的服务网格中的代理进行配置.
type ProxyConfig struct {
// 生成配置输出的目录,默认为./etc/istio/proxy 主要生成envoy的bootstrap配置文件
ConfigPath string
// envoy运行文件的目录
BinaryPath string
// 当前服务的名称istio-proxy
ClusterName isProxyConfig_ClusterName `protobuf_oneof:"cluster_name"`
// envoy热启动耗时时间
DrainDuration *duration.Duration
// envoy关闭等待时间
ParentShutdownDuration *duration.Duration
// istiod地址,默认为istiod.istio-system.svc:15012
DiscoveryAddress string
//UDp地址
StatsdUdpAddress string
//envoyMetrics地址
EnvoyMetricsServiceAddress string
// 代理管理端口
ProxyAdminPort int32
// 连接控制层面 istiod 时的身份验证策略,默认为双向TLS
ControlPlaneAuthPolicy AuthenticationPolicy
// 自定义配置文件
CustomConfigFile string
// 指标最大名字长度
StatNameLength int32
// 工作线程
Concurrency *wrappers.Int32Value
// bootstrap模板文件目录
ProxyBootstrapTemplatePath string
// 流入代理策略
InterceptionMode ProxyConfig_InboundInterceptionMode
// 追踪
Tracing *Tracing
// 附加的环境变量
ProxyMetadata map[string]string
// 状态端口
StatusPort int32
// 额外证书列表
CaCertificatesPem []string
// 代理镜像地址
Image *v1beta1.ProxyImage
}
安全配置包含了agent要使用的证书,token等,强烈建议仔细理解,对于下面的证书请求,认证有很大的帮助.
type Options struct {
// CA签发服务器,默认为istiod.istio-system.svc:15012
CAEndpoint string
// 设置ServerName 来覆盖CAEndpoint提取的ServerName
CAEndpointSAN string
// CA提供者的名称,默认为Citadel说明使用内部的证书管理
CAProviderName string
// 是否生成PKCS#8私钥
Pkcs8Keys bool
// 为工作负载生成的证书输出目录
OutputKeyCertToDir string
// 客户端在认证时向CA服务器提供密钥和证书的目录
ProvCert string
//签名算法
ECCSigAlg string
// 证书提供者 默认为istiod
PilotCertProvider string
// STS port
STSPort int
//身份验证提供程序特定插件
TokenExchanger TokenExchanger
// 这里主要用于grpcPerRPCCredentials中的get.token使用
// 在请求发送前运行,将token值添加到请求头中
CredFetcher CredFetcher
// 凭证身份提供者
CredIdentityProvider string
// 与工作负载对应的命名空间
WorkloadNamespace string
// token管理器
TokenManager TokenManager
//证书签名者信息
CertSigner string
//系统CA证书根地址值
CARootPath string
// 本地现有证书集合,agent会首先使用该证书作为工作负载的证书.
CertChainFilePath string
// The path for an existing key file
KeyFilePath string
// The path for an existing root certificate bundle
RootCertFilePath string
}
对于证书的配置,我感觉istio配置的有些乱,接下来我会在证书章节对上面的配置进行讲解.
对于cfg,它就像一个缝合怪将上面的配置封装了一下,下面将重点讲解几个比较重要的属性
// XDSRootCerts is the location of the root CA for the XDS connection. Used for setting platform certs or
// using custom roots.
// 创建XDS服务器中的上游连接时所需要的Root证书,默认为/var/run/secrets/istio/root-cert.pem
// 这个是cm中的istio-ca-root-cert里的key
// istio-ca-root-cert是dicovery创建的,当命名空间被创建后,会自动为其创建一个istio-ca-root-cert
XDSRootCerts string
// CARootCerts of the location of the root CA for the CA connection. Used for setting platform certs or
// using custom roots.
// 用于SDS连接上游istiod时所用到的证书默认值也是/var/run/secrets/istio/root-cert.pem
CARootCerts string
// Path to local UDS to communicate with Envoy
// agent使用unix与envoy通讯,当前unix的目录
XdsUdsPath string
pilot-agent是istio向应用注入的sidecar容器,它主要有以下工作
cmd.PrintFlags(c.Flags())
log.Infof("Version %s", version.Info.String())
// 初始化代理结构体,解析podip,domain
proxy, err := initProxy(args)
if err != nil {
return err
}
// 生成代理配置
proxyConfig, err := config.ConstructProxyConfig(meshConfigFile, serviceCluster, options.ProxyConfigEnv, concurrency, proxy)
if err != nil {
return fmt.Errorf("failed to get proxy config: %v", err)
}
if out, err := protomarshal.ToYAML(proxyConfig); err != nil {
log.Infof("Failed to serialize to YAML: %v", err)
} else {
log.Infof("Effective config: %s", out)
}
//创建sts配置
secOpts, err := options.NewSecurityOptions(proxyConfig, stsPort, tokenManagerPlugin)
if err != nil {
return err
}
// If security token service (STS) port is not zero, start STS server and
// listen on STS port for STS requests. For STS, see
// https://tools.ietf.org/html/draft-ietf-oauth-token-exchange-16.
// STS is used for stackdriver or other Envoy services using google gRPC.
if stsPort > 0 {
stsServer, err := initStsServer(proxy, secOpts.TokenManager)
if err != nil {
return err
}
defer stsServer.Stop()
}
// If we are using a custom template file (for control plane proxy, for example), configure this.
if templateFile != "" && proxyConfig.CustomConfigFile == "" {
proxyConfig.ProxyBootstrapTemplatePath = templateFile
}
envoyOptions := envoy.ProxyConfig{
LogLevel: proxyLogLevel,
ComponentLogLevel: proxyComponentLogLevel,
LogAsJSON: loggingOptions.JSONEncoding,
NodeIPs: proxy.IPAddresses,
Sidecar: proxy.Type == model.SidecarProxy,
OutlierLogPath: outlierLogPath,
}
// 创建agent配置
agentOptions := options.NewAgentOptions(proxy, proxyConfig)
// 创建agent服务器(只是创建了一个结构体,具体功能实现在run中)
agent := istio_agent.NewAgent(proxyConfig, agentOptions, secOpts, envoyOptions)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// If a status port was provided, start handling status probes.
if proxyConfig.StatusPort > 0 {
// 状态服务器初始化操作
if err := initStatusServer(ctx, proxy, proxyConfig, agentOptions.EnvoyPrometheusPort, agent); err != nil {
return err
}
}
go iptableslog.ReadNFLOGSocket(ctx)
// On SIGINT or SIGTERM, cancel the context, triggering a graceful shutdown
go cmd.WaitSignalFunc(cancel)
// Start in process SDS, dns server, xds proxy, and Envoy.
// agent服务启动
wait, err := agent.Run(ctx)
if err != nil {
return err
}
wait()
对于配置在前序的时候已经讲解属性的含义,这里不再具体赘述.
我们主要讲解一下更改这些配置的方式
本文主要关心证书的配置,因为agent最重要的一个功能就是实现了工作负载到工作负载之间的双向TLS.
对于/var/run/secrets/istio/root-cert.pem这个证书的ROOTCA是由dicovery中的maybeCreateCA进行创建的,然后通过initMulticluster监听命名空间资源添加生成root-cert.pem证书的事件.
具体流程为dicovery创建了一个CA服务器,生成CA证书然后然后监视每个命名空间,为其创建cm
这里还有一个需要注意的就是当agent连接istiod时除了证书的双向认证外还需要JWT认证(默认),默认情况下是使用k8s进行认证也就是说传输的token值必须与k8s有关系,在agent中也是这么做的,agent会将k8s中defaultSA用户的token值映射到/var/run/secrets/tokens/istio-token,然后通过grpc的前置函数将当前token值添加到请求头中.
agnet的对于grpc的创建,envoy 的启动都在run方法中运行,接下里就让我们列举一下它都做了那些事.
接下本文将重点分析2,3步
在开始之前我们先说一下他们的通讯方式,
agent->envoy 使用的是unix通讯,这种方式没有使用tls,但是又保证了安全性,性能要比tls要好
agent->istiod 使用的是tls通讯,通过证书与istiod建立连接,然后通过token值进行身份校验
SDS主要用于envoy申请证书使用,申请的证书用作envoy->envoy(工作负载->工作负载)之间的mtls.
具体流程为
下面让我们看一下SDS服务器的创建代码
func (a *Agent) initSdsServer() error {
var err error
// 检查是否配置了静态CA证书,如果配置了直接使用该证书,就不用动态判断证书提供者从而获取证书了
if security.CheckWorkloadCertificate(security.WorkloadIdentityCertChainPath, security.WorkloadIdentityKeyPath, security.WorkloadIdentityRootCertPath) {
log.Info("workload certificate files detected, creating secret manager without caClient")
a.secOpts.RootCertFilePath = security.WorkloadIdentityRootCertPath
a.secOpts.CertChainFilePath = security.WorkloadIdentityCertChainPath
a.secOpts.KeyFilePath = security.WorkloadIdentityKeyPath
a.secretCache, err = cache.NewSecretManagerClient(nil, a.secOpts)
if err != nil {
return fmt.Errorf("failed to start workload secret manager %v", err)
}
} else {
// 根据istio-ca-secret生成的root.pem(istio-ca-root-cert) 与istiod进行双向认证,然后建立证书创建管理连接
// 并返回该client
a.secretCache, err = a.newSecretManager()
if err != nil {
return fmt.Errorf("failed to start workload secret manager %v", err)
}
}
// 这里判断如果不使用envoy
if a.cfg.DisableEnvoy {
go func() {
st := a.secretCache
st.RegisterSecretHandler(func(resourceName string) {
_, _ = a.getWorkloadCerts(st)
})
_, _ = a.getWorkloadCerts(st)
}()
} else {
// 使用tls加速,是istio1.14新特性主要加速双向tls
pkpConf := a.proxyConfig.GetPrivateKeyProvider()
// 创建sds服务器,与istiod建立连接
a.sdsServer = sds.NewServer(a.secOpts, a.secretCache, pkpConf)
// 这里的OnSecretUpdate会通知XDS服务器重新生成证书推送给envoy
// 注意,推送的类型为secret,但是对该类型的生成器并不是discovery默认生成器,而是sdsservice类型生成器
// sdsservice该生成器的作用是重新向istiod获取证书,然后推送
a.secretCache.RegisterSecretHandler(a.sdsServer.OnSecretUpdate)
}
return nil
}
// NewServer creates and starts the Grpc server for SDS.
func NewServer(options *security.Options, workloadSecretCache security.SecretManager, pkpConf *mesh.PrivateKeyProvider) *Server {
s := &Server{stopped: atomic.NewBool(false)}
// 创建xds服务器,主要用于生成secert配置然后发送给envoy
s.workloadSds = newSDSService(workloadSecretCache, options, pkpConf)
// 与envoy建立unix连接使用socker,注册grpc方法,供envoy调用
s.initWorkloadSdsService()
sdsServiceLog.Infof("SDS server for workload certificates started, listening on %q", security.WorkloadIdentitySocketPath)
return s
}
func (s *Server) initWorkloadSdsService() {
s.grpcWorkloadServer = grpc.NewServer(s.grpcServerOptions()...)
s.workloadSds.register(s.grpcWorkloadServer)
var err error
// 对于agent<-> envoy的通讯采用的是unix的方式,通过文件管道的形式
s.grpcWorkloadListener, err = uds.NewListener(security.WorkloadIdentitySocketPath)
if err != nil {
sdsServiceLog.Errorf("Failed to set up UDS path: %v", err)
}
s.grpcWorkloadListener, err = uds.NewListener(security.WorkloadIdentitySocketPath);
s.grpcWorkloadServer.Serve(s.grpcWorkloadListener)
}
xdsproxy主要作为istiod<->envoy通讯的桥梁,istiod下发配置到envoy的具体流程
所以说envoy并不直接与istiod进行通讯,那么也就不用关心与它的认证的关系,这一些列的操作都由xdsproxy完成
下面让我们看一下envoy服务注册的具体代码
// grpc注册的handler
var _AggregatedDiscoveryService_serviceDesc = grpc.ServiceDesc{
ServiceName: "envoy.service.discovery.v3.AggregatedDiscoveryService",
HandlerType: (*AggregatedDiscoveryServiceServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "StreamAggregatedResources",
Handler: _AggregatedDiscoveryService_StreamAggregatedResources_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
}
func _AggregatedDiscoveryService_StreamAggregatedResources_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(AggregatedDiscoveryServiceServer).StreamAggregatedResources(&aggregatedDiscoveryServiceStreamAggregatedResourcesServer{stream})
}
func (p *XdsProxy) StreamAggregatedResources(downstream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
proxyLog.Debugf("accepted XDS connection from Envoy, forwarding to upstream XDS server")
// 这里我们看到调用的是handleStream
// 在discovery里调用的stream
return p.handleStream(downstream)
}
func (p *XdsProxy) handleStream(downstream adsStream) error {
// 创建代理连接
con := &ProxyConnection{
conID: connectionNumber.Inc(),
upstreamError: make(chan error, 2), // can be produced by recv and send
downstreamError: make(chan error, 2), // can be produced by recv and send
requestsChan: make(chan *discovery.DiscoveryRequest, 10),
responsesChan: make(chan *discovery.DiscoveryResponse, 10),
stopChan: make(chan struct{}),
downstream: downstream,
}
// 赋值给当前xdsproxy
p.RegisterStream(con)
defer p.UnregisterStream(con)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
// 创建上游连接,就是构建与istiod的连接
// 证书信息在创建时已经初始化
upstreamConn, err := p.buildUpstreamConn(ctx)
// 这里创建与isitodgrpc通讯的客户端
xds := discovery.NewAggregatedDiscoveryServiceClient(upstreamConn)
ctx = metadata.AppendToOutgoingContext(context.Background(), "ClusterID", p.clusterID)
for k, v := range p.xdsHeaders {
ctx = metadata.AppendToOutgoingContext(ctx, k, v)
}
// 开始调用服务注册请求
return p.HandleUpstream(ctx, con, xds)
}
func (p *XdsProxy) HandleUpstream(ctx context.Context, con *ProxyConnection, xds discovery.AggregatedDiscoveryServiceClient) error {
// 调用grpc服务注册方法
upstream, err := xds.StreamAggregatedResources(ctx,
grpc.MaxCallRecvMsgSize(defaultClientMaxReceiveMessageSize))
con.upstream = upstream
// 向上游发送数据方法处理函数
go p.handleUpstreamRequest(con)
// 接受上游发送的数据方法处理函数
go p.handleUpstreamResponse(con)
}
这两个方法就是istio<->envoy的基本逻辑
func (p *XdsProxy) handleUpstreamResponse(con *ProxyConnection) {
forwardEnvoyCh := make(chan *discovery.DiscoveryResponse, 1)
for {
select {
// 接受数据
case resp := <-con.responsesChan:
// TODO: separate upstream response handling from requests sending, which are both time costly
proxyLog.Debugf("response for type url %s", resp.TypeUrl)
metrics.XdsProxyResponses.Increment()
// 判断当前请求的url,如果是调用的xdsproxy则进行处理,否则转发给envoy
if h, f := p.handlers[resp.TypeUrl]; f {
if len(resp.Resources) == 0 {
// Empty response, nothing to do
// This assumes internal types are always singleton
break
}
err := h(resp.Resources[0])
var errorResp *google_rpc.Status
if err != nil {
errorResp = &google_rpc.Status{
Code: int32(codes.Internal),
Message: err.Error(),
}
}
// Send ACK/NACK
con.sendRequest(&discovery.DiscoveryRequest{
VersionInfo: resp.VersionInfo,
TypeUrl: resp.TypeUrl,
ResponseNonce: resp.Nonce,
ErrorDetail: errorResp,
})
continue
}
switch resp.TypeUrl {
case v3.ExtensionConfigurationType:
if features.WasmRemoteLoadConversion {
// If Wasm remote load conversion feature is enabled, rewrite and send.
go p.rewriteAndForward(con, resp, func(resp *discovery.DiscoveryResponse) {
// Forward the response using the thread of `handleUpstreamResponse`
// to prevent concurrent access to forwardToEnvoy
select {
case forwardEnvoyCh <- resp:
case <-con.stopChan:
}
})
} else {
// 将数据发送给envoy
// Otherwise, forward ECDS resource update directly to Envoy.
forwardToEnvoy(con, resp)
}
}
}
}
}
func (p *XdsProxy) handleUpstreamRequest(con *ProxyConnection) {
initialRequestsSent := atomic.NewBool(false)
go func() {
for {
// recv xds requests from envoy
// 接受envoy传来的数据
req, err := con.downstream.Recv()
if err != nil {
select {
case con.downstreamError <- err:
case <-con.stopChan:
}
return
}
// forward to istiod
// 发送给istiod
con.sendRequest(req)
}
}()
}
至此agnet流程分析结束,本文还有主要分析了证书的使用,对于status判断,审计,日志等没有进行分析.
本文感觉agent的配置有些混乱有些用法重复.
可能有同学观察到,SDSserver中有xdsserver,它与xdsproxy有什么不一样的地方嘛?
这里讲解一下,sds对于证书的修改与推送采用的是istiod推送到envoy的那套机制,推送到管道,管道获取后判断资源类型然后生成envoy识别的配置下发.由于sds并不需要监控那么多资源(一个secret资源就够了)所以创建一个xdsserver感觉有些冗余.
xdsproxy主要用于envoy<->istiod的通讯