• Flink实时计算中台Kubernates功能改造点


    背景

    平台为数据开发人员提供基本的实时作业的管理功能,其中包括jar、sql等作业的在线开发;因此中台需要提供一个统一的SDK支持平台能够实现flink jar作业的发布;绝大多数情况下企业可能会考虑Flink On Yarn的这个发布模式,但是伴随云原生的呼声越来越大,一些企业不希望部署一套YARN繁重的基座平台作为资源调度平台,期望使用容器的特性实现存储分离的架构;还有很多其他的原因…不在赘述

    改造步骤

    • 基于官方镜像重新打包flink服务,实现能够讲平台容器日志直接传输到kafka中,其次我们复写了Kubernates flink native的客户端,因此需修改flink-console.sh脚本,因此我们需要编写DockerFile重新打包镜像
      文件路径
    FROM flink:1.17.1-scala_2.12
    MAINTAINER jiangzhongzhou <jiangzhongzhou@jd.com>
    
    # 拷贝 client/kafka append文件到flink的lib下
    COPY client-1.17.1-1.0.jar $FLINK_HOME/lib/
    COPY kafka-clients-2.2.0.jar $FLINK_HOME/lib/
    # 修改flink-console.sh脚本启动类
    COPY flink-console.sh $FLINK_HOME/bin/flink-console.sh
    # 设定容器时区
    ENV TZ=Asia/Shanghai
    RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    flink-console.sh
    flink-console.sh脚本
    在Kubernetes的其他节点安装改镜像,我这里把名字为flink-mirror:1.0

    [root@CentOSB flink-mirror]# docker build -t flink-mirror:1.0 .
    [+] Building 0.1s (10/10) FINISHED                                                                                                                                d
     => [internal] load .dockerignore
     => => transferring context: 2B
     => [internal] load build definition from Dockerfile
     => => transferring dockerfile: 574B
     => [internal] load metadata for docker.io/library/flink:1.17.1-scala_2.12
     => [1/5] FROM docker.io/library/flink:1.17.1-scala_2.12
     => [internal] load build context
     => => transferring context: 432B
     => CACHED [2/5] COPY client-1.17.1-1.0.jar /opt/flink/lib/
     => CACHED [3/5] COPY kafka-clients-2.2.0.jar /opt/flink/lib/
     => CACHED [4/5] COPY flink-console.sh /opt/flink/bin/flink-console.sh
     => CACHED [5/5] RUN ln -snf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo Asia/Shanghai > /etc/timezone
     => exporting to image
     => => exporting layers
     => => writing image sha256:2c97c90b70f63a0a52241b2237f4eaa22316756001f54d5704ba86f85512c5c5
     => => naming to docker.io/library/flink-mirror:1.0
    [root@CentOSB flink-mirror]# docker images
    REPOSITORY                                                       TAG                 IMAGE ID       CREATED         SIZE
    flink-mirror                                                     1.0                 2c97c90b70f6   4 hours ago     859MB
    ...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 痛点二
      安装官方的使用说明,在镜像Application-Mode部署的时候,用户需要更具发布的jar包每次都需要重新打包镜像,启动作业,这样在生产场景下比较满,导致作业的制作工艺比较复杂,因此我们需要针对TaskManager和JobManagwer的pod进行修改,总体思想是通过在构建TaskManager、JobManagwer pod的时候,自动挂载本地的NFS镜像资源Volume到镜像的/opt/flink/usrLib目录下,这样就可以不需要每个作业都打包了;同时在考虑kubernates可能需要访问大数据平台的组件,但是大数据平台的组件,基本上都是基于主机名的,因此我们还需要在kubernates上实现主机名挂载;

    • 卷挂载解决用户jar的问题

    
    /*在指定路径下挂载userLib服务*/
    public class UserLibMountDecorator extends AbstractKubernetesStepDecorator {
       
    
        private final AbstractKubernetesParameters kubernetesComponentConf;
    
        public UserLibMountDecorator(AbstractKubernetesParameters kubernetesComponentConf) {
       
            this.kubernetesComponentConf = checkNotNull(kubernetesComponentConf);
        }
    
        @Override
        public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
       
            final Pod mountedPod = decoratePod(flinkPod.getPodWithoutMainContainer());
    
            final Container mountedMainContainer =
                    new ContainerBuilder(flinkPod.getMainContainer())
                            .addNewVolumeMount()
                            .withName(getUserLibName(kubernetesComponentConf.getClusterId()))
                            .withMountPath(FLINK_USER_LIB)
                            .endVolumeMount()
                            .build();
    
            return new FlinkPod.Builder(flinkPod)
                    .withPod(mountedPod)
                    .withMainContainer(mountedMainContainer)
                    .build();
        }
    
        private Pod decoratePod(Pod pod) {
       
    
            final Volume podTemplateVolume =
                    new VolumeBuilder()
                            .withName(getUserLibName(kubernetesComponentConf.getClusterId()))
                            .withNfs(
                                    new NFSVolumeSource(
                                            kubernetesComponentConf
                                                    .getFlinkConfiguration()
                                                    .getValue
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
  • 相关阅读:
    C#:实现分枝绑定背包求解器算法(附完整源码)
    【C++常见八股2】vector 的 push_back 扩容问题 | char *和 char [] 区别
    测试技术:关于上下文驱动测试的总结
    Hproxy项目前端
    Spring MVC(七) 异常处理
    21天学习挑战:经典算法---顺序查找
    docker版jxTMS使用指南:4.6版升级内容
    【回眸】HighTec编译文件烧录及串口调试
    SpringMVC之JSR303和拦截器
    【小程序开发实战】使用WxJava实现手机号获取
  • 原文地址:https://blog.csdn.net/weixin_38231448/article/details/132640813