码农知识堂 - 1000bd
  •   Python
  •   PHP
  •   JS/TS
  •   JAVA
  •   C/C++
  •   C#
  •   GO
  •   Kotlin
  •   Swift
  • 35、Flink 的 Formats 之CSV 和 JSON Format


    Flink 系列文章

    1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

    13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
    14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
    15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
    16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
    16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
    16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
    16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
    16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
    17、Flink 之Table API: Table API 支持的操作(1)
    17、Flink 之Table API: Table API 支持的操作(2)
    18、Flink的SQL 支持的操作和语法
    19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
    19、Flink 的Table API 和 SQL 中的自定义函数及示例(2)
    19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)
    19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
    20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上
    21、Flink 的table API与DataStream API 集成(1)- 介绍及入门示例、集成说明
    21、Flink 的table API与DataStream API 集成(2)- 批处理模式和inser-only流处理
    21、Flink 的table API与DataStream API 集成(3)- changelog流处理、管道示例、类型转换和老版本转换示例
    21、Flink 的table API与DataStream API 集成(完整版)
    22、Flink 的table api与sql之创建表的DDL
    24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
    24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
    24、Flink 的table api与sql之Catalogs(java api操作视图)-3
    24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4
    25、Flink 的table api与sql之函数(自定义函数示例)
    26、Flink 的SQL之概览与入门示例
    27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
    27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
    27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
    27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
    27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
    27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
    27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
    28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
    29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
    29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
    30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
    32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
    33、Flink 的Table API 和 SQL 中的时区
    35、Flink 的 Formats 之CSV 和 JSON Format
    41、Flink之Hive 方言介绍及详细示例
    42、Flink 的table api与sql之Hive Catalog
    43、Flink之Hive 读写及详细验证示例
    44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的


    文章目录

    • Flink 系列文章
    • 一、Flink 的 Formats
    • 二、CSV Format
      • 1、maven 依赖
      • 2、Flink sql client 建表示例
      • 3、table api建表示例
      • 4、Format 参数
      • 5、数据类型映射
    • 三、JSON Format
      • 1、maven 依赖
      • 2、Flink sql client 建表示例
      • 3、table api 建表示例
      • 4、Format 参数
      • 5、数据类型映射关系


    本文介绍了Flink 支持的数据格式中的csv和json,并分别以sql和table api作为示例进行了说明。
    本文依赖flink、kafka集群能正常使用。
    本文分为3个部分,即概述、CSV和JSON Format。
    本文的示例是在Flink 1.17版本(flink 集群和maven均是Flink 1.17)中运行。

    一、Flink 的 Formats

    Flink 提供了一套与表连接器(table connector)一起使用的表格式(table format)。表格式是一种存储格式,定义了如何把二进制数据映射到表的列上。

    Flink 支持以下格式:
    在这里插入图片描述

    二、CSV Format

    CSV Format 允许我们基于 CSV schema 进行解析和生成 CSV 数据。 目前 CSV schema 是基于 table schema 推断而来的。

    1、maven 依赖

    <dependency>
      <groupId>org.apache.flinkgroupId>
      <artifactId>flink-csvartifactId>
      <version>1.17.1version>
    dependency>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2、Flink sql client 建表示例

    以下是一个使用 Kafka 连接器和 CSV 格式创建表的示例。

    
    CREATE TABLE Alan_KafkaTable (
      `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
      `partition` BIGINT METADATA VIRTUAL,
      `offset` BIGINT METADATA VIRTUAL,
      `user_id` BIGINT,
      `item_id` BIGINT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'csv'
    );
    
    Flink SQL> CREATE TABLE Alan_KafkaTable (
    >   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
    >   `partition` BIGINT METADATA VIRTUAL,
    >   `offset` BIGINT METADATA VIRTUAL,
    >   `user_id` BIGINT,
    >   `item_id` BIGINT,
    >   `behavior` STRING
    > ) WITH (
    >   'connector' = 'kafka',
    >   'topic' = 'Alan_KafkaTable',
    >   'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
    >   'properties.group.id' = 'testGroup',
    >   'scan.startup.mode' = 'earliest-offset',
    >   'format' = 'csv'
    > );
    [INFO] Execute statement succeed.
    
    # kafka客户端命令行输入数据
    [alanchan@server1 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic Alan_KafkaTable
    >1,1001,login
    >1,2001,p_read
    >
    # flink sql client 查询数据即可
    
    Flink SQL> select * from Alan_KafkaTable;
    +----+-------------------------+----------------------+----------------------+----------------------+----------------------+--------------------------------+
    | op |              event_time |            partition |               offset |              user_id |              item_id |                       behavior |
    +----+-------------------------+----------------------+----------------------+----------------------+----------------------+--------------------------------+
    | +I | 2023-11-15 15:53:17.925 |                    0 |                    0 |                    1 |                 1001 |                          login |
    | +I | 2023-11-15 15:53:45.839 |                    0 |                    1 |                    1 |                 2001 |                         p_read |
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    3、table api建表示例

    通过table api建表,参考文章:
    17、Flink 之Table API: Table API 支持的操作(1)
    17、Flink 之Table API: Table API 支持的操作(2)

    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.and;
    import static org.apache.flink.table.api.Expressions.lit;
    import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
    
    import java.sql.Timestamp;
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Over;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableDescriptor;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.Tumble;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.catalog.CatalogDatabaseImpl;
    import org.apache.flink.table.catalog.CatalogView;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.catalog.ResolvedCatalogView;
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
    import org.apache.flink.types.Row;
    
    import com.google.common.collect.Lists;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    /**
     * @author alanchan
     *
     */
    public class TestTableAPIDemo {
    
    	/**
    	 * @param args
    	 * @throws Exception
    	 */
    	public static void main(String[] args) throws Exception {
    		testCreateTableBySQLAndAPI();
    	}
    	
    	static void testCreateTableBySQLAndAPI() throws Exception {
    //		EnvironmentSettings env = EnvironmentSettings.newInstance().inStreamingMode().build();
    //		TableEnvironment tenv = TableEnvironment.create(env);
    		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
            // SQL 创建输入表
    //        String sourceSql = "CREATE TABLE Alan_KafkaTable (\r\n" + 
    //        		"  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',\r\n" + 
    //        		"  `partition` BIGINT METADATA VIRTUAL,\r\n" + 
    //        		"  `offset` BIGINT METADATA VIRTUAL,\r\n" + 
    //        		"  `user_id` BIGINT,\r\n" + 
    //        		"  `item_id` BIGINT,\r\n" + 
    //        		"  `behavior` STRING\r\n" + 
    //        		") WITH (\r\n" + 
    //        		"  'connector' = 'kafka',\r\n" + 
    //        		"  'topic' = 'user_behavior',\r\n" + 
    //        		"  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n" + 
    //        		"  'properties.group.id' = 'testGroup',\r\n" + 
    //        		"  'scan.startup.mode' = 'earliest-offset',\r\n" + 
    //        		"  'format' = 'csv'\r\n" + 
    //        		");";
    //        tenv.executeSql(sourceSql);
            
            //API创建表
            Schema schema = Schema.newBuilder()
                    .columnByMetadata("event_time", DataTypes.TIME(3), "timestamp")
                    .columnByMetadata("partition", DataTypes.BIGINT(), true)
                    .columnByMetadata("offset", DataTypes.BIGINT(), true)
                    .column("user_id", DataTypes.BIGINT())
                    .column("item_id", DataTypes.BIGINT())
                    .column("behavior", DataTypes.STRING())
                    .build();
            
            TableDescriptor kafkaDescriptor = TableDescriptor.forConnector("kafka")
                    .comment("kafka source table")
                    .schema(schema)
                    .option(KafkaConnectorOptions.TOPIC, Lists.newArrayList("user_behavior"))
                    .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092")
                    .option(KafkaConnectorOptions.PROPS_GROUP_ID, "testGroup")
                    .option("scan.startup.mode", "earliest-offset")
                    .format("csv")
                    .build();
            
            tenv.createTemporaryTable("Alan_KafkaTable", kafkaDescriptor);
            
            //查询
            String sql = "select * from Alan_KafkaTable ";
            Table resultQuery = tenv.sqlQuery(sql);
    
            DataStream<Tuple2<Boolean, Row>> resultDS =  tenv.toRetractStream(resultQuery, Row.class);
    		
            // 6、sink
    		resultDS.print();
    
    		// 7、执行
    		env.execute();
    		//kafka中输入测试数据
    //		1,1001,login
    //		1,2001,p_read
    		
    		//程序运行控制台输入如下
    //		11> (true,+I[16:32:19.923, 0, 0, 1, 1001, login])
    //		11> (true,+I[16:32:32.258, 0, 1, 1, 2001, p_read])
    	}
    
    	@Data
    	@NoArgsConstructor
    	@AllArgsConstructor
    	public static class User {
    		private long id;
    		private String name;
    		private int age;
    		private Long rowtime;
    	}
    	
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136

    4、Format 参数

    在这里插入图片描述

    5、数据类型映射

    目前 CSV 的 schema 都是从 table schema 推断而来的。显式地定义 CSV schema 暂不支持。 Flink 的 CSV Format 数据使用 jackson databind API 去解析 CSV 字符串。

    下面的表格列出了flink数据和CSV数据的对应关系。
    在这里插入图片描述

    三、JSON Format

    JSON Format 能读写 JSON 格式的数据。当前,JSON schema 是从 table schema 中自动推导而得的。

    1、maven 依赖

    <dependency>
      <groupId>org.apache.flinkgroupId>
      <artifactId>flink-jsonartifactId>
      <version>1.17.1version>
    dependency>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2、Flink sql client 建表示例

    以下是一个利用 Kafka 以及 JSON Format 构建表的例子。

    CREATE TABLE Alan_KafkaTable_json (
        `id` INT,
        name string,
        age BIGINT,
        t_insert_time TIMESTAMP(3) METADATA FROM 'timestamp',
        WATERMARK FOR t_insert_time as t_insert_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'Alan_KafkaTable_json',
        'scan.startup.mode' = 'earliest-offset',
        'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
        'format' = 'json'
    );
    
    Flink SQL> CREATE TABLE Alan_KafkaTable_json (
    >     `id` INT,
    >     name string,
    >     age BIGINT,
    >     t_insert_time TIMESTAMP(3) METADATA FROM 'timestamp',
    >     WATERMARK FOR t_insert_time as t_insert_time - INTERVAL '5' SECOND
    > ) WITH (
    >     'connector' = 'kafka',
    >     'topic' = 'Alan_KafkaTable_json',
    >     'scan.startup.mode' = 'earliest-offset',
    >     'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
    >     'format' = 'json'
    > );
    [INFO] Execute statement succeed.
    
    # kafka 客户端命令输入数据
    [alanchan@server1 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic Alan_KafkaTable_json
    >{ "id":"1" ,"name":"alan","age":"12" } 
    >{ "id":"2" ,"name":"alanchan","age":"22" } 
    >{ "id":"3" ,"name":"alanchanchan","age":"32" }
    >{ "id":"4" ,"name":"alan_chan","age":"42" } 
    >{ "id":"5" ,"name":"alan_chan_chn","age":"52" } 
    >
    
    # flink sql client查询数据
    Flink SQL> select * from Alan_KafkaTable_json;
    +----+-------------+--------------------------------+----------------------+-------------------------+
    | op |          id |                           name |                  age |           t_insert_time |
    +----+-------------+--------------------------------+----------------------+-------------------------+
    | +I |           1 |                           alan |                   12 | 2023-11-15 16:03:49.805 |
    | +I |           2 |                       alanchan |                   22 | 2023-11-15 16:04:02.632 |
    | +I |           3 |                   alanchanchan |                   32 | 2023-11-15 16:04:08.810 |
    | +I |           4 |                      alan_chan |                   42 | 2023-11-15 16:04:15.132 |
    | +I |           5 |                  alan_chan_chn |                   52 | 2023-11-15 16:04:21.146 |
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    3、table api 建表示例

    通过table api建表,参考文章:
    17、Flink 之Table API: Table API 支持的操作(1)
    17、Flink 之Table API: Table API 支持的操作(2)

    参考上文中关于CSV Format的table api 建表示例,变化的是json的格式参数。

    4、Format 参数

    在这里插入图片描述

    5、数据类型映射关系

    当前,JSON schema 将会自动从 table schema 之中自动推导得到。不支持显式地定义 JSON schema。

    在 Flink 中,JSON Format 使用 jackson databind API 去解析和生成 JSON。

    下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。

    在这里插入图片描述

    以上,介绍了Flink 支持的数据格式中的csv和json,并分别以sql和table api作为示例进行了说明。

  • 相关阅读:
    STM32day2
    杭电oj 2049 不容易系列之(4)——考新郎 C语言
    住院管理系统
    基于JAVA旅行网的设计与实现计算机毕业设计源码+数据库+lw文档+系统+部署
    Java 定时任务技术发展历程
    如何让你的程序支持lua脚本
    排序算法概述
    光伏发电预测(GRU模型,Python代码)
    conda的安装和使用
    2020最新Java常见面试题及答案
  • 原文地址:https://blog.csdn.net/chenwewi520feng/article/details/132041995
  • 最新文章
  • 攻防演习之三天拿下官网站群
    数据安全治理学习——前期安全规划和安全管理体系建设
    企业安全 | 企业内一次钓鱼演练准备过程
    内网渗透测试 | Kerberos协议及其部分攻击手法
    0day的产生 | 不懂代码的"代码审计"
    安装scrcpy-client模块av模块异常,环境问题解决方案
    leetcode hot100【LeetCode 279. 完全平方数】java实现
    OpenWrt下安装Mosquitto
    AnatoMask论文汇总
    【AI日记】24.11.01 LangChain、openai api和github copilot
  • 热门文章
  • 十款代码表白小特效 一个比一个浪漫 赶紧收藏起来吧!!!
    奉劝各位学弟学妹们,该打造你的技术影响力了!
    五年了,我在 CSDN 的两个一百万。
    Java俄罗斯方块,老程序员花了一个周末,连接中学年代!
    面试官都震惊,你这网络基础可以啊!
    你真的会用百度吗?我不信 — 那些不为人知的搜索引擎语法
    心情不好的时候,用 Python 画棵樱花树送给自己吧
    通宵一晚做出来的一款类似CS的第一人称射击游戏Demo!原来做游戏也不是很难,连憨憨学妹都学会了!
    13 万字 C 语言从入门到精通保姆级教程2021 年版
    10行代码集2000张美女图,Python爬虫120例,再上征途
Copyright © 2022 侵权请联系2656653265@qq.com    京ICP备2022015340号-1
正则表达式工具 cron表达式工具 密码生成工具

京公网安备 11010502049817号