• docker安装flink


    docker安装flink

    5.1、拉取flink镜像,创建网络

    docker pull flink
    docker network create flink-network
    
    • 1
    • 2

    5.2、创建 jobmanager

    # 创建 JobManager 
    docker run \
     -itd \
     --name=jobmanager \
     --publish 8081:8081 \
     --network flink-network \
     --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
     flink:latest jobmanager 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    5.3、创建 TaskManager

    # 创建 TaskManager 
     docker run \
      -itd \
      --name=taskmanager \
      --network flink-network \
      --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
      flink:latest taskmanager 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    5.4、访问公网ip

    http://localhost:8081/

    访问 http://150.158.119.225/:8081/

    5.5 修改Task Slots

    默认的Slots num是1,我们可以修改为5:
    修改的目录是jobmanager和taskmanager的/opt/flink/confflink-conf.yaml文件:

    修改taskmanager.numberOfTaskSlots:即可。
    注意:默认的docker容器中没有vi/vim命令,可以使用docker cp命令,复制出来修改,然后在复制回去,如下:

    docker cp taskmanager:/opt/flink/conf/flink-conf.yaml .
    docker cp flink-conf.yaml taskmanager:/opt/flink/conf/
    
    • 1
    • 2

    5.6、通过flinksql消费Kafka

    Docker安装kafka 3.5
    并且通过python,简单写一个生产者
    Python生产、消费Kafka

    5.7 导入flink-sql-connector-kafka jar

    顾名思义,用于连接flinksql和kafka。
    进入flink

    docker exec -it jobmanager /bin/bash
    
    • 1

    进入 flink的bin目录

    cd /opt/flink/bin
    
    • 1

    查看flink版本:

    flink --version
    
    • 1

    根据自己的flink版本,下载对应的 flink-sql-connector-kafka jar包
    https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka
    因为我是1.18.0,所以选择下图的版本包:

    将下载的jar包,分别在jobmanager,taskmanager /opt/flink/lib目录下,注意,是两个都要放,如下图:

    可以使用docker cp test.txt jobmanager:/opt/flink/lib命令,用户宿主机和docker容器文件传输。把test.txt换成对应的jar包即可

    docker cp test.txt jobmanager:/opt/flink/lib
    docker cp test.txt taskmanager:/opt/flink/lib
    
    • 1
    • 2

    5.8 flinksql消费kafka

    java结合日志

    kafka.send("GatewayLog", JSONUtil.toJsonStr(gatewayLog));
    
    • 1

    GatewayLog是topic

    yaml的服务配置

    spring:
      kafka:
        bootstrap-servers: "10.10.10.155:9092"
        consumer:
          group-id: "teleGatewayGroup"
    
    • 1
    • 2
    • 3
    • 4
    • 5

    我在本地生成了一条log,将使用flinksql处理这个数据。

    进入jobmanager中,执行

    cd /opt/flink/bin
    sql-client.sh
    
    • 1
    • 2

    Flink SQL执行以下语句:

    CREATE TABLE GatewayLog (
        platform VARCHAR,
        serviceId VARCHAR,
        targetServer VARCHAR,
        requestPath VARCHAR,
        requestMethod VARCHAR,
        schema VARCHAR,
        requestContentType VARCHAR,
        headers VARCHAR,
        requestBody VARCHAR,
        ip VARCHAR,
        startTime TIMESTAMP,
        endTime VARCHAR,
        executeTime VARCHAR,
        status VARCHAR,
        nickName VARCHAR,
        account VARCHAR,
        accountType VARCHAR,
        serviceName VARCHAR,
        orgCode VARCHAR
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'GatewayLog',
      'properties.bootstrap.servers' = '150.158.119.225:9092',
      'properties.group.id' = 'flinKGroup',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
    );
    
    select * from GatewayLog;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    可以看到Flink在消费kafka数据,如下图:

    中间缺少很多包。
    在这里插入图片描述

    flink-connector-kafka

    https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/3.1.0-1.18
    
    • 1

    依赖的kafka-clients

    https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/3.6.1
    
    • 1

    然后在Linux需要看权限问题。

    chmod -R 777 /lib 
    
    • 1

    把文件夹都改成777 所有人。

    然后执行

    sql最好先改成varchar,变成成功。

    最后select * from table

    执行成功。

    在这里插入图片描述

  • 相关阅读:
    Python爬虫技术系列-03/4flask结合requests测试静态页面和动态页面抓取
    手机用户的开源福音「GitHub 热点速览」
    【无标题】
    【暴力剪枝】CF1708D
    【Web】CSS学习笔记
    【MySQL】排序和分页
    Centos7搭建hadoop3.3.4分布式集群
    SynchronousQueue源码分析_第二讲:非公平模式TransferStack
    实战演练 | 在 MySQL 中计算每日平均日期或时间间隔
    c++20模块导入module
  • 原文地址:https://blog.csdn.net/qq_33472553/article/details/136274360