• pyflink连接iceberg 实践


    参考:https://nightlies.apache.org/flink/flink-docs-release-1.13/api/python/index.html

    pyflink是什么

    1. 数据流处理的框架
    2. 这个框架是同时运行在多台主机上
    3. 通过某种方式这多台主机之间可以通信
    4. 可以单机运行
    5. pyflink只是对java的flink的一个调用工具,不能直接用python来对sourcesink组件进行实现。

    Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。

    PyFlink 架构

    PyFlink 的核心目标:

    1.将 Flink 能力输出到 Python 用户,进而可以让 Python 用户使用所有的 Flink 能力。

    2.将 Python 生态现有的分析计算功能运行到 Flink 上,进而增强 Python 生态对大数据问题的解决能力。

    应用场景

    • 第一个,事件驱动型,比如:刷单,监控等;
    • 第二个,数据分析型的,比如:库存,双11大屏等;
    • 第三个适用的场景是数据管道,也就是ETL场景,比如一些日志的解析等;
    • 第四个场景,机器学习,比如个性推荐等。

    API

    Flink 为流/批处理应用程序提供了不同级别的抽象。

    • SQL
    • Table API
    • DataStream/DataSet API(核心 API)
    • Stateful Stream Processing

    PyFlink API 完全与 Java Table API 对齐,各种关系操作都支持,同时对 window 也有很好的支持,除了这些 APIs,PyFlink还提供多种定义 Python UDF 的方式.

    UDF自定义函数

    首先,可以扩展 ScalarFunction,这种方式可以提供更多的辅助功能,比如添加 Metrics 。除此之外 Python 语言所支持的任何方式的方法定义,在 PyFlink UDF 中都是支持的,比如:Lambda Function,Named Function 和 CallableFunction等。

    当定义完方法后,用 PyFlink 所提供的 Decorators 进行打标,并描述 input 和 output 的数据类型就可以了。后面版本也可以根据 Python 语言的 type hint 特性再进一步简化,进行类型推导。

    from pyflink.table import ScalarFunction, DataTypes
    from pyflink.table.udf import udf
    
    # Extend ScalarFunction
    class ADD(ScalarFunction):
        def eval(self, i ,j):
            return i + j
    
    add1 = udf (ADD(), [DataTypes.BIGINT(), DataTypes.BIGINT()] ,DataTypes.BIGINT())
    
    # Named function
    @udf(input_types =  [DataTypes.BIGINT(), DataTypes.BIGINT() ], result_type = DataTypes.BIGINT())
    def add2( i ,j):
        return i + j
    
    # Lambda function
    add3 = udf(lambda i,j :i+j, [DataTypes.BIGINT(), DataTypes.BIGINT() ], result_type = DataTypes.BIGINT())
    
    # Callable Function
    class CallableAdd(object):
        def __call__(self, i,j):
            return i + j
    
    add4 = udf(CallableAdd(),[DataTypes.BIGINT(), DataTypes.BIGINT() ], result_type = DataTypes.BIGINT())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    pyflink安装

    pip3  install -i https://pypi.tuna.tsinghua.edu.cn/simple/ apache-flink==1.13.5
    
    • 1

    最好指定版本,如1.13.2

    实战

    apache-flink 1.13.2

    读取iceberg数据

    pyflink安装目录/lib

    flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

    hive-exec-3.1.2.jar

    alluxio-2.6.2-client.jar

    iceberg-flink-runtime-1.13-0.13.1.jar

    import os
    
    
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import EnvironmentSettings, TableEnvironment,StreamTableEnvironment
    
    os.environ.setdefault('HADOOP_USER_NAME', 'root')
    
    env = StreamExecutionEnvironment.get_execution_environment()
    env_settings  = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    t_env = StreamTableEnvironment.create(env,environment_settings=env_settings )
    
    
    t_env.get_config().get_configuration().set_string("parallelism.default", "1")
    # t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
    
    iceberg_hive_catalog = """
        CREATE CATALOG iceberg WITH
        (
            'type'='iceberg'
            ,'catalog-type'='hive' -- 可选 catalog类型 hive、hadoop、custom
            ,'property-version'='1' -- 可选 属性版本号,可向后兼容,目前版本号为1
            ,'cache-enabled' = 'true' -- 可选 是否启用catalog缓存 默认为true
            ,'uri'='thrift://192.168.xxx.xxx:9083' -- 必填 hive 元数据存储连接
            ,'clients'='5' -- hive metastore clients连接池大小,默认为2
            ,'warehouse'='hdfs://ns1/lakehouse/'
        )
    """ 
     
    
    t_env.get_current_catalog()
    t_env.get_current_database()
    # t_env.execute_sql(iceberg_hive_catalog).print()
     
    
    t_env.execute_sql("use catalog iceberg").print()
    
    t_env.execute_sql("show current catalog").print()
    #
    t_env.execute_sql("show databases").print()
    
    t_env.execute_sql("use dbname").print()
    
    t_env.execute_sql("show tables").print()
     
    
     
    table1 = t_env.execute_sql(
        "select * from  ***")
    
    table2 = t_env.sql_query(
        "select * from  xxx")
     
    
    pd = table2.to_pandas()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    读取mysql数据

    配置

    pyflink安装目录/lib

    mysql-connector-java-8.0.16.jar

    flink-connector-jdbc_2.12-1.13.2.jar(https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc_2.12/1.13.2)

    mysql创表

    CREATE TABLE `flink_test` (
    
      `f0` int(11) DEFAULT NULL,
      `f1` int(11) DEFAULT NULL
    ) 
    
    insert into flink_test VALUES(1,11)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment, EnvironmentSettings
    from pyflink.table import EnvironmentSettings, TableEnvironment
    # create environment
    from pyflink.table.expressions import lit
     
    settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
    t_env = TableEnvironment.create(settings)
     
    
    
    mysql_sink_ddl = """
    CREATE TABLE flink_test (
      id BIGINT,
      word VARCHAR,
     `count` BIGINT,
      PRIMARY KEY (id) NOT ENFORCED
     ) WITH (
     'connector.type' = 'jdbc', -- 使用 jdbc connector
     'connector.url' = 'jdbc:mysql://192.168.xx.xx:3306/test',
     'connector.table' = 'flink_test',
     'connector.username' = 'xxx',
     'connector.password' = 'xxx',
     'connector.write.flush.interval' = '1s'
    )
    """
    
    mysql_sink_ddl = """
    create table flink_test (
    f0 INT,
    f1 INT
    ) WITH (
    'connector' = 'jdbc', -- 使用 jdbc connector
     'url' = 'jdbc:mysql://192.168.xx.xxx:3306/test',
     'username' = 'xx',
     'table-name' = 'flink_test',
     'password' = 'xx'
      
    )
    """
     
    
    
    
    t_env.execute_sql(mysql_sink_ddl)
    table = t_env.execute_sql("select * from flink_test")
    # +-------------+-------------+
    # |          f0 |          f1 |
    # +-------------+-------------+
    # |           1 |          11 |
    # +-------------+-------------+
    # 1 row in set
    table = t_env.execute_sql("insert into  flink_test values(2,22)")
    
    table2 = t_env.sql_query('select * from flink_test')
    # tab = t_env.from_path('flink_test')
    table2.to_pandas()
    #    f0  f1
    # 0   1  11
    # 1   2  22
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    读取一个 csv 文件,计算词频,并将结果写到一个结果文件中

    文件名:word_count.py

    参考:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/python/table_api_tutorial/

    from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
    from pyflink.table.descriptors import Schema, OldCsv, FileSystem
    from pyflink.table.expressions import lit
    
    # 创建TableEnvironment 。这是Python Table API作业的入口类。
    settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
    t_env = TableEnvironment.create(settings)
    
    # write all the data to one file
    t_env.get_config().get_configuration().set_string("parallelism.default", "1")
    
    # 创建源表和结果表,在ExecutionEnvironment中注册表名分别为mySource和mySink的表。
    t_env.connect(FileSystem().path('./input/word_count.csv')) \
        .with_format(OldCsv()
                     .field('word', DataTypes.STRING())) \
        .with_schema(Schema()
                     .field('word', DataTypes.STRING())) \
        .create_temporary_table('mySource')
    
    t_env.connect(FileSystem().path('./ouput/output.csv')) \
        .with_format(OldCsv()
                     .field_delimiter('\t')
                     .field('word', DataTypes.STRING())
                     .field('count', DataTypes.BIGINT())) \
        .with_schema(Schema()
                     .field('word', DataTypes.STRING())
                     .field('count', DataTypes.BIGINT())) \
        .create_temporary_table('mySink')
        
    # 下列代码实现的输出文件名是乱码,'/ouput/output.csv'被当作目录
    '''
    my_source_ddl = """
        create table mySource (
            word VARCHAR
        ) with (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = './input/word_count.csv'
        )
    """
    
    my_sink_ddl = """
        create table mySink (
            word VARCHAR,
            `count` BIGINT
        ) with (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = '/ouput/output.csv'
        )
    """
    t_env.execute_sql(my_source_ddl)
    t_env.execute_sql(my_sink_ddl)
    '''
    # 该作业读取表mySource中的数据
    tab = t_env.from_path('mySource')
    # 启动Flink Python Table API作业
    # 当execute_insert(sink_name)被调用的时候, 作业才会被真正提交到集群或者本地进行执行。
    tab.group_by(tab.word) \
       .select(tab.word, lit(1).count) \
       .execute_insert('mySink').wait()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    报错

    Could not find any factory for identifier ‘jdbc‘

    flink-connector-jdbc_2.12-1.13.2.jar包没有放入指定位置

    org.apache.flink.sql.parser.impl.ParseException: Encountered “)” at line 12, column 1.

    建表语句最后一个括号前多了个逗号

    参考

    实例:

    csv读入参数:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/csv/

    使用mysql_Flink 使用python连接mysql: https://blog.csdn.net/weixin_32136203/article/details/112684291

  • 相关阅读:
    [附源码]计算机毕业设计企业人事管理系统Springboot程序
    Golang gorm 常用新增方法
    汽车标定技术(九)--标定常量与#pragma的趣事
    反向代理原理
    如何设计 DAO 的 PoW 评判标准 并平衡不可能三角
    化工供应链如何向产业互联网转型,S2B2C供应链电商系统提升企业供应链效率
    怎么做好测试用例评审
    计算机键盘用途及快捷键
    详细解析什么是期权交易的获利方法
    【B/S架构】医院不良事件报告系统源码
  • 原文地址:https://blog.csdn.net/weixin_38235865/article/details/125917104