• flink on k8s


    一、环境准备

    k8s平台:kubesphere

    image-20220804180605541

    k8s中每个命名空间都有一个默认服务帐户。但是,default 服务帐户可能没有在 Kubernetes 集群中创建或删除 Pod 的权限。用户可能需要更新 default 服务账号的权限或指定另一个绑定了正确角色的服务账号。

    kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default
    
    • 1

    如果不想使用 default 服务帐户,可以使用以下命令创建新的 flink-service-account 服务帐户并设置角色绑定。然后使用 config 选项 -Dkubernetes.service-account=flink-service-account 使 JobManager pod 使用 flink-service-account 服务帐户来创建/删除 TaskManager pod。

    kubectl create serviceaccount flink-service-account
    kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink-service-account
    
    • 1
    • 2

    若权限不足 flink 在任务启动时将无法创建 akka

    其他环境:

    java(1.8)、docker(20.10.17)、flink(1.13.6)、mysql(8.0.23)

    二、开始部署

    2.1 编写 flink 任务

    这里创建一个简单的 flink 任务,每秒生成一个随机数写入 mysql 中

    package test;
    
    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author wjun
     * @date 2022/8/4 15:09
     * @email wjunjobs@outlook.com
     * @describe
     */
    public class K8sDemo {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        env.addSource(new SourceFunction<String>() {
          private volatile boolean isRunning = true;
    
          @Override
          public void run(SourceContext<String> ctx) throws Exception {
            while (isRunning) {
              ctx.collect(UUID.randomUUID().toString());
              TimeUnit.SECONDS.sleep(1);
            }
          }
    
          @Override
          public void cancel() {
            isRunning = false;
          }
        }).addSink(JdbcSink.sink(
          "insert into dev.k8s values(?)",
          (ps, t) -> {
            ps.setString(1, t);
          },
          JdbcExecutionOptions.builder().withBatchSize(1).build(),
          new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
          .withUrl("jdbc:mysql://***")
          .withUsername("***")
          .withPassword("***")
          .build()
        ));
    
    
        env.execute();
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    最终将任务打成 jar 包

    2.2 构建镜像

    这里使用 Application Mode 模式在生产环境可以为应用提供更好的隔离。on k8s 要求代码与 flink 镜像绑定在一起,Application Mode 确保在应用程序终止后正确清理所有 Flink 组件。

    使用 flink 社区提供的基础 docker 镜像

    FROM flink:1.13.6
    RUN mkdir -p $FLINK_HOME/jobs
    COPY flink-on-k8s.jar $FLINK_HOME/jobs/flink-on-k8s.jar
    
    • 1
    • 2
    • 3

    最终 dockerfile 的工作空间如下:

    image-20220804174533420

    构建 docker 镜像

    docker build -t super/flink-on-k8s-demo .
    
    • 1

    image-20220804174634513

    2.3 提交任务

    使用下面命令提交任务

    flink run-application \
    --class test.K8sDemo \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=super/flink-on-k8s-demo \
    local:///opt/flink/jobs/flink-on-k8s.jar
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • –class: 指定任务的主类名
    • – target: 指定任务运行模式为 native k8s application
    • -Dkubernetes.cluster-id: 指定集群名称并且必须是唯一的,若不指定 flink 将随机生成
    • -Dkubernetes.container.image: 用于启动 pod 的镜像
    • local: 指定镜像的任务 jar

    image-20220804175056425

    kubeshpere 平台中查看任务情况

    image-20220804175158040

    mysql 中观察数据是否写入

    image-20220804175252557

    2.4 任务取消

    kubesphere 平台中点击应用负载-服务,根据提交任务时候指定的 cluster-id 找到对应的 rest 服务

    image-20220804175447672

    找到 NodePort 端口

    image-20220804175625323

    使用 节点ip:NodePort 即可进入熟悉的 flink web ui 点击 cancel 即可,同时 kubeshpere 会自动删除与之相关的组件。

    这样 flink on k8s 初步的任务提交、运行、取消就搞定啦

  • 相关阅读:
    Java 相关高频面试解析
    面向对象——实现类的基本操作
    pandas使用列表(list)数据创建dataframe、pandas使用嵌套列表(nested list)数据创建dataframe
    在UE中建立一个最简单的FRunnable所需的代码
    从零开始基于LLM构建智能问答系统的方案
    中国互联网众筹行业
    使用句法依存分析实现KBQA中的约束挂载
    入门力扣自学笔记149 C++ (题目编号1608)
    spingboot之devtools热部署IntelliJ IDEA 2022.2.3不生效问题,解决
    2023 羊城杯 final
  • 原文地址:https://blog.csdn.net/qq_41858402/article/details/126164308