• Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)


    在这里插入图片描述
                           星光下的赶路人star的个人主页

                          世间真正温煦的春色,都熨帖着大地,潜伏在深谷

    1、输出算子(Sink)

    Flink作为数据处理框架,最终还是要把计算处理的结果写入外部储存,为外部应用提供支持。
    在这里插入图片描述

    1.1 连接到外部系统

    Flink的DataStream API专门提供了向外部提供写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的。Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
    Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。

    stream.addSink(new SinkFunction());
    
    • 1

    addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。

    Flink1.12开始,同样重构了Sink架构,

    stream.sinkTo()
    
    • 1

    当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:

    在这里插入图片描述
    我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。

    除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方系统与Flink的连接器。
    在这里插入图片描述
    除此以外,就需要用户自定义实现sink连接器了。

    1.2 输出到文件

    Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。
    FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:

    • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
    • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。
    public class SinkFile {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 每个目录中,都有 并行度个数的 文件在写入
            env.setParallelism(2);
    
            // 必须开启checkpoint,否则一直都是 .inprogress
            env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
    
    
            DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                    new GeneratorFunction<Long, String>() {
                        @Override
                        public String map(Long value) throws Exception {
                            return "Number:" + value;
                        }
                    },
                    Long.MAX_VALUE,
                    RateLimiterStrategy.perSecond(1000),
                    Types.STRING
            );
    
            DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");
    
            // 输出到文件系统
            FileSink<String> fieSink = FileSink
                    // 输出行式存储的文件,指定路径、指定编码
                    .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
                    // 输出文件的一些配置: 文件名的前缀、后缀
                    .withOutputFileConfig(
                            OutputFileConfig.builder()
                                    .withPartPrefix("atguigu-")
                                    .withPartSuffix(".log")
                                    .build()
                    )
                    // 按照目录分桶:如下,就是每个小时一个目录
                    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                    // 文件滚动策略:  1分钟 或 1m
                    .withRollingPolicy(
                            DefaultRollingPolicy.builder()
                                    .withRolloverInterval(Duration.ofMinutes(1))
                                    .withMaxPartSize(new MemorySize(1024*1024))
                                    .build()
                    )
                    .build();
    
    
            dataGen.sinkTo(fieSink);
    
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    1.3 输出到Kafka

    (1)添加Kafka 连接器依赖
    由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。
    (2)启动Kafka集群
    (3)编写输出到Kafka的示例代码

    输出无key的record:

    public class SinkKafka {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 如果是精准一次,必须开启checkpoint(后续章节介绍)
            env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
    
    
            SingleOutputStreamOperator<String> sensorDS = env
                    .socketTextStream("hadoop102", 7777);
    
            /**
             * Kafka Sink:
             * TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可
             * 1、开启checkpoint(后续介绍)
             * 2、设置事务前缀
             * 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟
             */
            KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                    // 指定 kafka 的地址和端口
                    .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                    // 指定序列化器:指定Topic名称、具体的序列化
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.<String>builder()
                                    .setTopic("ws")
                                    .setValueSerializationSchema(new SimpleStringSchema())
                                    .build()
                    )
                    // 写到kafka的一致性级别: 精准一次、至少一次
                    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                    // 如果是精准一次,必须设置 事务的前缀
                    .setTransactionalIdPrefix("atguigu-")
                    // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
                    .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
                    .build();
    
    
            sensorDS.sinkTo(kafkaSink);
    
    
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    自定义序列化器,实现带key的record:

    public class SinkKafkaWithKey {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
            env.setRestartStrategy(RestartStrategies.noRestart());
    
    
            SingleOutputStreamOperator<String> sensorDS = env
                    .socketTextStream("hadoop102", 7777);
    
    
            /**
             * 如果要指定写入kafka的key,可以自定义序列化器:
             * 1、实现 一个接口,重写 序列化 方法
             * 2、指定key,转成 字节数组
             * 3、指定value,转成 字节数组
             * 4、返回一个 ProducerRecord对象,把key、value放进去
             */
            KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                    .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                    .setRecordSerializer(
                            new KafkaRecordSerializationSchema<String>() {
    
                                @Nullable
                                @Override
                                public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                                    String[] datas = element.split(",");
                                    byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                                    byte[] value = element.getBytes(StandardCharsets.UTF_8);
                                    return new ProducerRecord<>("ws", key, value);
                                }
                            }
                    )
                    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                    .setTransactionalIdPrefix("atguigu-")
                    .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                    .build();
    
    
            sensorDS.sinkTo(kafkaSink);
    
    
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    运行代码,在Linux主机启动一个消费者,查看是否收到数据

    bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws
    
    • 1

    1.4 输出到MySQL(JDBC)

    (1)添加依赖

    <!--mysql驱动 -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.27</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.0-1.17</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    (2)启动MySQL,在test库下建表

    CREATE TABLE `ws` (
      `id` varchar(100) NOT NULL,
      `ts` bigint(20) DEFAULT NULL,
      `vc` int(11) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    (3)输出到MySQL的示例代码

    public class SinkMySQL {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
    
            SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("hadoop102", 7777)
                    .map(new WaterSensorMapFunction());
    
    
            /**
             * TODO 写入mysql
             * 1、只能用老的sink写法: addsink
             * 2、JDBCSink的4个参数:
             *    第一个参数: 执行的sql,一般就是 insert into
             *    第二个参数: 预编译sql, 对占位符填充值
             *    第三个参数: 执行选项 ---》 攒批、重试
             *    第四个参数: 连接选项 ---》 url、用户名、密码
             */
            SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
                    "insert into ws values(?,?,?)",
                    new JdbcStatementBuilder<WaterSensor>() {
                        @Override
                        public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                            //每收到一条WaterSensor,如何去填充占位符
                            preparedStatement.setString(1, waterSensor.getId());
                            preparedStatement.setLong(2, waterSensor.getTs());
                            preparedStatement.setInt(3, waterSensor.getVc());
                        }
                    },
                    JdbcExecutionOptions.builder()
                            .withMaxRetries(3) // 重试次数
                            .withBatchSize(100) // 批次的大小:条数
                            .withBatchIntervalMs(3000) // 批次的时间
                            .build(),
                    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                            .withUsername("root")
                            .withPassword("000000")
                            .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                            .build()
            );
    
    
            sensorDS.addSink(jdbcSink);
    
    
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    (4)运行代码,用客户端连接MySQL,查看是否成功写入数据。

    1.4 自定义Sink输出

    如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。

    stream.addSink(new MySinkFunction<String>());
    
    • 1

    在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
    这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。
    在这里插入图片描述
                          您的支持是我创作的无限动力

    在这里插入图片描述
                          希望我能为您的未来尽绵薄之力

    在这里插入图片描述
                          如有错误,谢谢指正;若有收获,谢谢赞美

  • 相关阅读:
    pxb 使用物理备份恢复数据库
    springMVC下载文件
    基于Spring Boot+ Vue的健身房管理系统与实现
    docker基础镜像定制
    云原生之深入解析如何使用Vcluster Kubernetes加速开发效率
    “维护者都快累死了!”Linux 宣布:LTS 版本的维护期,将从 6 年变回 2 年
    Fourier分析导论——第6章——R^d 上的Fourier变换(E.M. Stein & R. Shakarchi)
    试编写算法(用C语言)打印值为x的结点的所有祖先,假设值为x的结点不多于一个。(递归实现和非递归实现)
    genius-storage使用文档,一个浏览器缓存工具
    中石化、中石油接口文档源码分享
  • 原文地址:https://blog.csdn.net/qq_44804713/article/details/133300069