• 快速体验 Flink Table Store 入门篇


    在本地安装单机版本,能够实现快速体验 Flink Table Store 的目的,本文以 Flink 1.15.2、flink-table-store-dist-0.2.1 和 flink-shaded-hadoop-2-uber-2.8.3-10.0 为例,系统为 Centos 3.10。

    下载

    • 1、下载 Flink 1.15.2
    https://www.apache.org/dyn/closer.lua/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
    
    • 1
    • 2、下载 Table Store 包
    https://www.apache.org/dyn/closer.lua/flink/flink-table-store-0.2.1/flink-table-store-dist-0.2.1.jar
    
    • 1
    • 3、下载 hadoop 包
    https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
    
    • 1

    安装

    将下载好的 flink-1.15.2-bin-scala_2.12.tgz、flink-table-store-dist-0.2.1.jar 和 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar 都放到安装目录下,执行以下步骤完成安装:

    • tar -xzf flink-1.15.2-bin-scala_2.12.tgz

    • cp flink-table-store-dist-0.2.1.jar flink-1.15.2/lib/

    • cp flink-shaded-hadoop-2-uber-2.8.3-10.0.jar flink-1.15.2/lib/

    配置

    编辑 conf 目录下的配置文件 flink-conf.yaml,将配置项 taskmanager.numberOfTaskSlots 的值改为 2 。

    vi ./conf/flink-conf.yaml
    
    taskmanager.numberOfTaskSlots: 2
    
    • 1
    • 2
    • 3

    启动本地(单机模式)集群

    ./bin/start-cluster.sh
    
    • 1

    如果启动成功,则执行 jps 命令可看到 TaskManagerRunner 和 StandaloneSessionClusterEntrypoint 两个进程,执行“netstat -lpnt”可看到开启了 127.0.0.1:8081 看板端口。如果想修改看板端口,则只需要编辑配置文件 conf/flink-conf.yaml,将配置项 rest.port 改为其它值,然后重启(执行 bin/stop-cluster.sh 停止本地集群)即可。默认看板的地址为 localhost,如果需要远程查看,则还需要修改配置项 rest.bind-address,比如可将 localhost 改为 0.0.0.0 。

    如果启动报错“Unsupported major.minor version 52.0”,这是因为 JDK 的版本不匹配,可执行命令“java -version”查看 JDK 的版本。根据 flink-1.15 的发布说明,建议 Java 11,但 Java 8 也可以。本文使用“Tencent Kona JDK 11.0.17”,下载地址:

    https://github.com/Tencent/TencentKona-11/releases/download/kona11.0.17/TencentKona-11.0.17.b1-jdk_linux-x86_64.tar.gz
    
    • 1

    可将 TencentKona-11.0.17.b1-jdk_linux-x86_64.tar.gz 上传到 /usr/local 目录,然后解压,再建立软链接:

    cd /usr/local
    tar xzf TencentKona-11.0.17.b1-jdk_linux-x86_64.tar.gz
    ln -s TencentKona-11.0.17.b1 jdk
    
    • 1
    • 2
    • 3

    将 JDK 加入 PATH 中:

    export JAVA_HOME=/usr/local/jdk
    export PATH=$JAVA_HOME/bin:$PATH
    export CLASSPATH=$JAVA_HOME/lib/tools.jar
    
    • 1
    • 2
    • 3

    启动成功后,就可开始体验了。

    体验 Flink Table Store

    以单机模式执行“./bin/sql-client.sh embedded”,进入 SQL 操作界面,然后可按如下步骤开始体验:

    • 1、创建表*
    CREATE CATALOG my_catalog WITH (
      'type'='table-store',
      'warehouse'='file:/tmp/table_store'
    );
    
    USE CATALOG my_catalog;
    
    -- create a word count table
    CREATE TABLE word_count (
        word STRING PRIMARY KEY NOT ENFORCED,
        cnt BIGINT
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 2、写数据
    -- create a word data generator table
    CREATE TEMPORARY TABLE word_table (
        word STRING
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second'='10',
        'fields.word.length' = '1'
    );
    
    -- table store requires checkpoint interval in streaming mode
    SET 'execution.checkpointing.interval' = '10 s';
    
    -- write streaming data to dynamic table
    -- 在 flink 的看板可看到产生了一个 job
    INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    注意,对表 word_table 只能执行流查询,执行批查询时会报如下所示的错误:

    org.apache.flink.table.api.ValidationException: Querying an unbounded table 'my_catalog.default.word_table' in batch mode is not allowed. The table source is unbounded.
    
    • 1
    • 3、批量查询
    -- 设置结果输出模式为 tableau,即表哥方式输出
    SET 'sql-client.execution.result-mode' = 'tableau';
    
    -- 切换到批模式
    RESET 'execution.checkpointing.interval';
    SET 'execution.runtime-mode' = 'batch';
    
    SELECT * FROM word_count;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 4、流式查询
    -- 切换到流模式
    SET 'execution.runtime-mode' = 'streaming';
    SELECT * FROM word_count; -- log.scan 默认为 full,表示从头查
    
    -- 从最新的开始查
    SELECT * FROM word_count /*+ OPTIONS ('log.scan'='latest') */;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 5、结束体验
    -- drop the dynamic table, clear the files
    DROP TABLE word_count;
    
    -- exit sql-client
    EXIT;
    
    • 1
    • 2
    • 3
    • 4
    • 5
  • 相关阅读:
    基于SpringBoot的在线学生请假管理系统的设计与实现毕业设计源码060935
    Mac 中 vim 插件配置 —— 以YouCompleteMe 为例
    开启全新教学模式!vLive虚拟直播如何赋能线上教培
    常用Linux内核调试手段介绍 01——— 内核笔记
    【学习笔记】云原生初步
    【论文阅读笔记】Deep learning for time series classification: a review
    【IBIS 模型与仿真 - IBISWriter and Write_IBIS】
    算法笔记:0-1背包问题
    大学生HTML作业篮球网页 HTML作业篮球网页期末作业 HTML+CSS篮球网页 HTML学生作业体育篮球网页
    深入浅出Spring(28)
  • 原文地址:https://blog.csdn.net/Aquester/article/details/127834798