• 0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统



    《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。

    select word, count(1) as `count` from source group by word;
    +--------------------------------+----------------------+
    |                           word |                count |
    +--------------------------------+----------------------+
    |                              A |                    3 |
    |                              B |                    1 |
    |                              C |                    2 |
    |                              D |                    2 |
    |                              E |                    1 |
    +--------------------------------+----------------------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    在生产环境,我们往往要将计算结果保存到外部系统中,比如Mysql等。这个时候我们就要使用Sink。

    Sink

    Sink用于将Reduce结果输出到外部系统。它也是通过一个表(Table)来表示结构。这个和MapReduce思路中的Map很类似。

    Print

    为了简单起见,我们让Sink的表连接的外部系统是print。这样我们就可以在控制台上看到数据。

        # define the sink
        my_sink_ddl = """
            CREATE TABLE WordsCountTableSink (
                `word` STRING,
                `count` BIGINT
            ) WITH (
                'connector' = 'print'
            );
        """
        t_env.execute_sql(my_sink_ddl).print()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    需要强调的是,我们没有给sink的表创建主键。这个会在后面文章中作为一个对比案例进行分析。
    这一步只能创建表和连接器,具体执行还要执行下一步。

    Execute

    因为source和WordsCountTableSink是两张表,分别表示数据的输入和输出结构。如果要打通输入和输出,则需要将source表中的数据通过某些计算,插入到WordsCountTableSink表中。于是我们主要使用的是insert into指令。

        # execute insert
        my_select_ddl = """
            insert into WordsCountTableSink
            select word, count(1) as `count`
            from source
            group by word
        """
        t_env.execute_sql(my_select_ddl).wait()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    完整代码

    import argparse
    import logging
    import sys
    
    from pyflink.common import Configuration
    from pyflink.table import (EnvironmentSettings, TableEnvironment)
    
    def word_count(input_path):
        config = Configuration()
        # write all the data to one file
        config.set_string('parallelism.default', '1')
        env_settings = EnvironmentSettings \
            .new_instance() \
            .in_batch_mode() \
            .with_configuration(config) \
            .build()
        
        t_env = TableEnvironment.create(env_settings)
    
        # define the source
        my_source_ddl = """
                create table source (
                    word STRING
                ) with (
                    'connector' = 'filesystem',
                    'format' = 'csv',
                    'path' = '{}'
                )
            """.format(input_path)
        t_env.execute_sql(my_source_ddl).print()
        tab = t_env.from_path('source')
    
        # define the sink
        my_sink_ddl = """
            CREATE TABLE WordsCountTableSink (
                `word` STRING,
                `count` BIGINT
            ) WITH (
                'connector' = 'print'
            );
        """
        t_env.execute_sql(my_sink_ddl).print()
        
        # execute insert
        my_select_ddl = """
            insert into WordsCountTableSink
            select word, count(1) as `count`
            from source
            group by word
        """
        t_env.execute_sql(my_select_ddl).wait()
    
    if __name__ == '__main__':
        logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--input',
            dest='input',
            required=False,
            help='Input file to process.')
    
        argv = sys.argv[1:]
        known_args, _ = parser.parse_known_args(argv)
    
        word_count(known_args.input)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    执行命令如下

    python sql_print.py --input input1.csv
    
    • 1

    输出结果如下

    Using Any for unsupported type: typing.Sequence[~T]
    No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform CANNOT be used with method=DIRECT_READ.
    OK
    OK
    +I[A, 3]
    +I[B, 1]
    +I[C, 2]
    +I[D, 2]
    +I[E, 1]

    因为使用的是批处理模式(in_batch_mode),我们看到Flink将所有数据计算完整成,成批的执行了新增操作(+代表新增)。这块对比我们将在后续将流处理时介绍区别。
    附上input1.csv内容

    "A",
    "B",
    "C",
    "D",
    "A",
    "E",
    "C",
    "D",
    "A",
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
  • 相关阅读:
    OpenCV:10特征检测
    Spring | 异常处理最佳实践
    ElasticSearch(四)spring-data-elasticsearch @Field注解无效,最完美解决方案
    ORB_SLAM2配置——基于Ubuntu16.04+ROS+gazebo仿真
    Unity接入北斗探针SDK(基于UnityPlayerActivity)丨五、编写Unity代码,完成整个项目
    C语言停车场管理系统
    【工具】简道云零代码开发平台
    为数据安全产业发声 | 2022数据安全技术大会获媒体高频关注
    C++ 虚函数和多态性
    c++类型转换
  • 原文地址:https://blog.csdn.net/breaksoftware/article/details/134001015