• dapr源码分析--injector


    dapr sidecar injector 是 dapr 几个独立软件之一, 功能是在 k8s 环境为用户服务注入 dapr runtime sidecar 容器.

    总览

    想要看懂 injector 工作原理, 我们需要从部署入手, 因为它是和 k8s 很多功能一起完成的.

    首先通过 helm 生成一份部署配置文件:

    1. helm repo add dapr https://dapr.github.io/helm-charts/
    2. helm repo update
    3. helm template dapr dapr/dapr > dapr.yaml

    dapr.yaml 就是生成出来的部署配置文件.

    搜寻 dapr-sidecar-injector 相关的配置, 可以看到除了常规的 Deployment 和 Service 配置还有个配置:

    1. ---
    2. # Source: dapr/charts/dapr_sidecar_injector/templates/dapr_sidecar_injector_webhook_config.yaml
    3. apiVersion: admissionregistration.k8s.io/v1
    4. kind: MutatingWebhookConfiguration
    5. metadata:
    6. name: dapr-sidecar-injector
    7. labels:
    8. app: dapr-sidecar-injector
    9. webhooks:
    10. - name: sidecar-injector.dapr.io
    11. clientConfig:
    12. service:
    13. namespace: youku-smart-asi
    14. name: dapr-sidecar-injector
    15. path: "/mutate"
    16. caBundle: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURNakNDQWhxZ0F3SUJBZ0lSQUx1QXREV0dkRm40K1BQSm5KbGpGblF3RFFZSktvWklodmNOQVFFTEJRQXcKSXpFaE1COEdBMVVFQXhNWVpHRndjaTF6YVdSbFkyRnlMV2x1YW1WamRHOXlMV05oTUI0WERUSXhNRGd4TXpBegpORGsxTkZvWERUTXhNRGd4TVRBek5EazFORm93SXpFaE1COEdBMVVFQXhNWVpHRndjaTF6YVdSbFkyRnlMV2x1CmFtVmpkRzl5TFdOaE1JSUJJakFOQmdrcWhraUc5dzBCQVFFRkFBT0NBUThBTUlJQkNnS0NBUUVBd1hZZE1uMlUKSUd2STB1bERIZEduSi82RlN0T2hHZmdhZHppRHdOTVh3VEt1M0luT0NsY0tNVVNsYU0rL2dSVUtFeG1USWF2OApHUldjQXRGQlZNNTU3K2g1bU5aQ0NNQVJEVWtuU0tYbUoxTHRKTVI1dEhsRE5WallvZmNwN21VbFM1cTdIY0s2CmV2Z2VrV1hZbmxQQ25oT3dRNUVZbkgrV1R0ay8wOUg3b2w5N1NielZDU0pWbzFkN1JBL2UrRlJjTnR6ZWdEenIKTnd6S1RodXg2d1pBQ3g1S3cxVTdMR3FJZjNXRWxvM25PK21zZS8xbDdjTlJhMjhwQzc5SUJsMDlFNUFZd0hYNworK2pTM3gwbFE0NVh2Tk02R3JWMTY3UXBFdWl6cG5FRTFiZHRPV1RYYlpEQ3hVRGNISWVUc0plTDBlY3hGNnl5CldGbWR4RktWWGNudFBRSURBUUFCbzJFd1h6QU9CZ05WSFE4QkFmOEVCQU1DQXFRd0hRWURWUjBsQkJZd0ZBWUkKS3dZQkJRVUhBd0VHQ0NzR0FRVUZCd01DTUE4R0ExVWRFd0VCL3dRRk1BTUJBZjh3SFFZRFZSME9CQllFRk1tWApxaWpJZXZaaUxxYVZDQnprMWc2Z0R2UlhNQTBHQ1NxR1NJYjNEUUVCQ3dVQUE0SUJBUUFMQXVVRWxQc2E4NzdpCjJRVWdGVnlCdkx2SWJRdFd6Tk5Fd2kxOVZ1WmRCQkg3RXZwVEI5NmNEYXN4aWtrZklEcXB5ZGhtSDFqN2xPNHYKWjVjNVpMdHVaUklFWjViTlBidmpjcy9vOWFVMG1rbUhEd0FQazV4UFZVSDA4Mm1YcWdEOTVqM0w4UkFTRnlucgpydjg0UXd0ZDZ1L1cydWVLTitvV2Mya0szaVVMcU85aE1pWnVkYWVMRHg5Q2JQUVhCSTU1ZWF5TExzeXpoUjFpCllFK0dMMmtVM2dpaFlZbzNFVU9mSXJta01tYVdTRVVBNEtOMytveGhtb25JR2N5aEdHZWU1RUNaOEgraTUyTy8KMG5ra1lUaStIWUhQRmg3TnFZRWFldlcxOE00RGo0d1hBeVpIclBsdDkwZVR3SXdxNzY5RGZrdU91Rjcyck9PbgpkRHIwQ3d3dQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg==
    17. rules:
    18. - apiGroups:
    19. - ""
    20. apiVersions:
    21. - v1
    22. resources:
    23. - pods
    24. operations:
    25. - CREATE
    26. failurePolicy: Ignore
    27. sideEffects: None
    28. admissionReviewVersions: ["v1", "v1beta1"]

    可以看到配置类型是 MutatingWebhookConfiguration , 它是做什么的呢?

    根据上面配置 webhooks 和 rules 配置, 大概能够猜出来: 在 k8s pod 资源创建时, 给 dapr-sidecar-injector service 的路由 /mutate 发送一个 webhook 请求.

    查看官方文档 https://kubernetes.io/docs/reference/access-authn-authz/extensible-admission-controllers 可以看到, 确实和猜想的没有太大出入. Mutating admission webhooks 可以定义一个 webhook, 会在对应的时机率先被调用, 并且允许我们对该资源进行修改, 并在修改完成后继续执行相应步骤.

    综上所述, 需要 sidecar injector 做的事情就是提供一个 http webhook handler, 收到 pod 创建请求时, 根据 pod 信息为其注入 dapr sidecar 容器(响应相应的 patchOps 修改操作).

    kubernetes api请求的生命周期如下:

    源码分析

    代码入口

    代码路径:cmd/injector/main.go

    1. func main() {
    2. // ...
    3. uids, err := injector.AllowedControllersServiceAccountUID(ctx, kubeClient)
    4. if err != nil {
    5. log.Fatalf("failed to get authentication uids from services accounts: %s", err)
    6. }
    7. injector.NewInjector(uids, cfg, daprClient, kubeClient).Run(ctx)
    8. shutdownDuration := 5 * time.Second
    9. log.Infof("allowing %s for graceful shutdown to complete", shutdownDuration)
    10. <-time.After(shutdownDuration)
    11. }

    从源码上看,主要流程是实例化injector对象,然后运行。NewInjector 函数本质是创建一个http server,添加了webhook路由 mux.HandleFunc("/mutate", i.handleRequest), handleRequest是真正处理webhook的逻辑。

    核心代码

    handleRequest

    1. func (i *injector) handleRequest(w http.ResponseWriter, r *http.Request) {
    2. // 检验请求参数
    3. // ...
    4. // 反序列化拿到请求参数
    5. ar := v1.AdmissionReview{}
    6. _, gvk, err := i.deserializer.Decode(body, nil, &ar)
    7. if err != nil {
    8. log.Errorf("Can't decode body: %v", err)
    9. } else {
    10. // 检查账号权限
    11. if !(utils.StringSliceContains(ar.Request.UserInfo.UID, i.authUIDs) || utils.StringSliceContains(systemGroup, ar.Request.UserInfo.Groups)) {
    12. log.Errorf("service account '%s' not on the list of allowed controller accounts", ar.Request.UserInfo.Username)
    13. } else if ar.Request.Kind.Kind != "Pod" { // 排除其他非 pod 资源的请求
    14. log.Errorf("invalid kind for review: %s", ar.Kind)
    15. } else {
    16. // 构造出 pod 资源需要的修改操作
    17. patchOps, err = i.getPodPatchOperations(&ar, i.config.Namespace, i.config.SidecarImage, i.config.SidecarImagePullPolicy, i.kubeClient, i.daprClient)
    18. if err == nil {
    19. patchedSuccessfully = true
    20. }
    21. }
    22. }
    23. // 根据上述结果构造出 webhook 响应
    24. // ...
    25. }
    除了请求校验和响应构建, 核心就是调用 getPodPatchOperations 函数构造出修改操作 patchOps 后续会作为响应 response.patch 返回, response.patch 为 []byte 类型, JSON 序列化之后会变成 base64 格式字符串。webhook的response格式如下:
    1. {
    2. "apiVersion": "admission.k8s.io/v1",
    3. "kind": "AdmissionReview",
    4. "response": {
    5. "uid": "",
    6. "allowed": true,
    7. "patchType": "JSONPatch",
    8. "patch": "W3sib3AiOiAiYWRkIiwgInBhdGgiOiAiL3NwZWMvcmVwbGljYXMiLCAidmFsdWUiOiAzfV0="
    9. }
    10. }

    getPodPatchOperations

    getPodPatchOperations 函数主要逻辑如下:

    1. 如果 pod dapr.io/enabled 注解不为 true 或者是否已有 dapr sidecar, 直接 return
    2. 根据 pod 注解构建出 sidecar 容器配置
    3. 为用户 app 注入 DAPR_HTTP_PORT 和 DAPR_GRPC_PORT 两个环境变量
    1. func (i *injector) getPodPatchOperations(ar *v1.AdmissionReview,
    2. namespace, image, imagePullPolicy string, kubeClient *kubernetes.Clientset, daprClient scheme.Interface) ([]PatchOperation, error) {
    3. // ...
    4. // 过滤不需要注入的 pod 请求
    5. if !isResourceDaprEnabled(pod.Annotations) || podContainsSidecarContainer(&pod) {
    6. return nil, nil
    7. }
    8. // ...
    9. // 通过 dapr 全局配置 crd 读取是否启用 mTLS
    10. mtlsEnabled := mTLSEnabled(daprClient)
    11. // 根据 k8s secrets API 读取信任链 cert
    12. trustAnchors, certChain, certKey = getTrustAnchorsAndCertChain(kubeClient, namespace)
    13. identity = fmt.Sprintf("%s:%s", req.Namespace, pod.Spec.ServiceAccountName)
    14. // 构建出 sidecar 容器配置
    15. sidecarContainer, err := getSidecarContainer(pod.Annotations, id, image, imagePullPolicy, req.Namespace, apiSvcAddress, placementAddress, tokenMount, trustAnchors, certChain, certKey, sentryAddress, mtlsEnabled, identity)
    16. // ...
    17. // 如果没有用户容器, 直接创建 dapr sidecar
    18. if len(pod.Spec.Containers) == 0 {
    19. path = containersPath
    20. value = []corev1.Container{*sidecarContainer}
    21. } else {
    22. // 有用户容器, 为用户容器注入环境变量
    23. envPatchOps = addDaprEnvVarsToContainers(pod.Spec.Containers)
    24. // 容器列表添加 dapr sidecar
    25. path = "/spec/containers/-"
    26. value = sidecarContainer
    27. }
    28. patchOps = append(
    29. patchOps,
    30. PatchOperation{
    31. Op: "add",
    32. Path: path,
    33. Value: value,
    34. },
    35. )
    36. // 合并所有 patch 操作, 并返回
    37. patchOps = append(patchOps, envPatchOps...)
    38. return patchOps, nil
    39. }

    getSidecarContainer

    getSidecarContainer 函数会根据上面条件创建出 dapr sidecar, 支持通过很多 dapr.io/ 开头的注解来控制 sidecar 容器参数:

    1. func getSidecarContainer(annotations map[string]string, id, daprSidecarImage, imagePullPolicy, namespace, controlPlaneAddress, placementServiceAddress string, tokenVolumeMount *corev1.VolumeMount, trustAnchors, certChain, certKey, sentryAddress string, mtlsEnabled bool, identity string) (*corev1.Container, error) {
    2. // ...
    3. cmd := []string{"/daprd"}
    4. args := []string{
    5. "--mode", "kubernetes",
    6. "--dapr-http-port", fmt.Sprintf("%v", sidecarHTTPPort),
    7. "--dapr-grpc-port", fmt.Sprintf("%v", sidecarAPIGRPCPort),
    8. "--dapr-internal-grpc-port", fmt.Sprintf("%v", sidecarInternalGRPCPort),
    9. "--dapr-listen-addresses", sidecarListenAddresses,
    10. "--dapr-public-port", fmt.Sprintf("%v", sidecarPublicPort),
    11. "--app-port", appPortStr,
    12. "--app-id", id,
    13. "--control-plane-address", controlPlaneAddress,
    14. "--app-protocol", getProtocol(annotations),
    15. "--placement-host-address", placementServiceAddress,
    16. "--config", getConfig(annotations),
    17. "--log-level", getLogLevel(annotations),
    18. "--app-max-concurrency", fmt.Sprintf("%v", maxConcurrency),
    19. "--sentry-address", sentryAddress,
    20. fmt.Sprintf("--enable-metrics=%t", metricsEnabled),
    21. "--metrics-port", fmt.Sprintf("%v", metricsPort),
    22. "--dapr-http-max-request-size", fmt.Sprintf("%v", requestBodySize),
    23. }
    24. // ...
    25. c := &corev1.Container{
    26. Name: sidecarContainerName,
    27. Image: daprSidecarImage,
    28. ImagePullPolicy: pullPolicy,
    29. SecurityContext: &corev1.SecurityContext{
    30. AllowPrivilegeEscalation: &allowPrivilegeEscalation,
    31. },
    32. Ports: ports,
    33. Command: cmd,
    34. Env: []corev1.EnvVar{
    35. {
    36. Name: "NAMESPACE",
    37. Value: namespace,
    38. },
    39. },
    40. Args: args,
    41. ReadinessProbe: &corev1.Probe{
    42. Handler: httpHandler,
    43. InitialDelaySeconds: getInt32AnnotationOrDefault(annotations, daprReadinessProbeDelayKey, defaultHealthzProbeDelaySeconds),
    44. TimeoutSeconds: getInt32AnnotationOrDefault(annotations, daprReadinessProbeTimeoutKey, defaultHealthzProbeTimeoutSeconds),
    45. PeriodSeconds: getInt32AnnotationOrDefault(annotations, daprReadinessProbePeriodKey, defaultHealthzProbePeriodSeconds),
    46. FailureThreshold: getInt32AnnotationOrDefault(annotations, daprReadinessProbeThresholdKey, defaultHealthzProbeThreshold),
    47. },
    48. LivenessProbe: &corev1.Probe{
    49. Handler: httpHandler,
    50. InitialDelaySeconds: getInt32AnnotationOrDefault(annotations, daprLivenessProbeDelayKey, defaultHealthzProbeDelaySeconds),
    51. TimeoutSeconds: getInt32AnnotationOrDefault(annotations, daprLivenessProbeTimeoutKey, defaultHealthzProbeTimeoutSeconds),
    52. PeriodSeconds: getInt32AnnotationOrDefault(annotations, daprLivenessProbePeriodKey, defaultHealthzProbePeriodSeconds),
    53. FailureThreshold: getInt32AnnotationOrDefault(annotations, daprLivenessProbeThresholdKey, defaultHealthzProbeThreshold),
    54. },
    55. }
    56. // ...
    57. }

    举例

    deployment.yaml

    1. ---
    2. apiVersion: apps/v1
    3. kind: Deployment
    4. metadata:
    5. namespace: youku-smart-asi
    6. name: pythonapp
    7. labels:
    8. app: python
    9. spec:
    10. replicas: 1
    11. selector:
    12. matchLabels:
    13. app: python
    14. template:
    15. metadata:
    16. labels:
    17. app: python
    18. annotations:
    19. dapr.io/enabled: "true"
    20. dapr.io/app-id: "pythonapp"
    21. spec:
    22. tolerations:
    23. - effect: NoSchedule
    24. key: sigma.ali/resource-pool
    25. value: ackee_pool
    26. - effect: NoSchedule
    27. key: sigma.ali/is-ecs
    28. operator: Exists
    29. containers:
    30. - name: python
    31. image: dapriosamples/hello-k8s-python:1.2.0

    执行kubectl apply -f deployment.yaml,在部署dapr的k8s集群中,kubectl edit查看pod,如下图,我们发现pod的yaml文件中增加了一个containers,这部分就是injector注入的daprd sidecar容器。

    总结

    在阅读dapr injector源码之前,需要知道k8s的动态准入控制原理,否则无法理解代码的功能。

    参考

    k8s动态准入控制原理 动态准入控制 | Kubernetes

  • 相关阅读:
    如何使用ArcGIS Pro将等高线转DEM
    CRUD之文件的上传和下载
    springboot项目作为静态文件服务器
    作业-11.8
    JavaScript 日常开发的 9 个实用代码片段 (part 1)
    汽车工业能效管理平台助力能源管理体系的建立和实施
    Gitee——详细教程如何将远程仓库与本地仓库建立链接
    AQS源码解析 2.简介 & 内部核心结构
    被315点名的流氓下载器,又回来了…
    Idea 中 Git 不提交当前分支修改代码并切换分支
  • 原文地址:https://blog.csdn.net/cbmljs/article/details/127575428