• 【API篇】六、Flink输出算子Sink


    Flink做为数据处理引擎,要把最终处理好的数据写入外部存储,为外部系统或应用提供支持。与输入算子Source相对应的,输出算子为Sink。

    在这里插入图片描述
    前面一直在用的print就是一种Sink,用来将数据流写到控制台打印

    在这里插入图片描述

    1、输出到外部系统

    Flink程序中所有对外的输出操作,利用Sink算子完成

    Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法

    stream.addSink(new SinkFunction());
    //重写SinkFunction接口的invoke方法,用来将指定的值写入到外部系统中
    //invoke方法在每条数据记录到来时都会调用。
    
    • 1
    • 2
    • 3

    Flink1.12开始,Sink算子的创建是通过调用DataStream的.sinkTo()方法

    stream.sinkTo()
    
    • 1

    Flink官网为我们提供了一部分的框架的Sink连接器:

    Flink官方为我们提供了一部分的框架的Sink连接器

    source/sink即可读可写,能做为数据源连接,也能做为下游去输出。

    2、输出到文件

    先引入Flink流式文件系统的连接器FileSink的依赖:

    <dependency>
    	<groupId>org.apache.flinkgroupId>
    	<artifactId>flink-connector-filesartifactId>
    	<version>${flink.version}version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder):

    • 行编码: FileSink.forRowFormat(basePath,rowEncoder)
    • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)

    下面演示实现读往d盘下的tmp目录写数据(tmp目录不用提前创建,不存在会自动创建):

    public class SinkFile {
        public static void main(String[] args) throws Exception {
        
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 每个目录中,都有 并行度个数的 文件是正在写入状态
            env.setParallelism(1);
    
            // 必须开启checkpoint,否则文件一直都是 .inprogress状态,即正在写入
            env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
    
    		//生成器模拟一个数据源
            DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                    new GeneratorFunction<Long, String>() {
                        @Override
                        public String map(Long value) throws Exception {
                            return "Number:" + value;
                        }
                    },
                    Long.MAX_VALUE,
                    RateLimiterStrategy.perSecond(1000), //每秒生成1000条数据
                    Types.STRING
            );
    
            DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");
    
            // 输出到文件系统
            FileSink<String> fieSink = FileSink
                    // 输出行式存储的文件,指定路径、指定编码
                    .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
                    // 输出文件的一些配置: 文件名的前缀、后缀,new也行,这里展示build方式创建配置对象
                    .withOutputFileConfig(
                            OutputFileConfig.builder()
                                    .withPartPrefix("code9527")
                                    .withPartSuffix(".log")
                                    .build()
                    )
                    // 按照目录分桶:如下,就是每个小时一个目录。ZoneId.systemDefault()即系统默认时区,也可是ZoneId类中的其他时区
                    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                    // 文件滚动策略:  1分钟 或 1m
                    .withRollingPolicy(
                            DefaultRollingPolicy.builder()
                                    .withRolloverInterval(Duration.ofMinutes(1))
                                    .withMaxPartSize(new MemorySize(1024*1024))
                                    .build()
                    )
                    .build();
    
    
            dataGen.sinkTo(fieSink);
    
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    运行,看下效果:inprocess,此时文件正在写入数据,不可读。一个这个inprocess文件,因为上面并行度设置的1

    在这里插入图片描述

    总结:重点还是FileSink对象的创建

    • 输出行/批文件存储的文件,可指定文件路径、文件编码、文件前后缀

    • 按目录分桶,传参的接口实现类对象自选,demo中是按照时间给文件夹命名

    • 特别注意文件滚动策略,是达到指定时间或者文件到达指定大小,是或的关系

    • FileSink对象创建完后,直接流对象调用sinkTo即可完成写入到文件的动作

    3、输出到KafKa

    添加KafKa连接器的依赖:

    <dependency>
    	<groupId>org.apache.flinkgroupId>
    	<artifactId>flink-connector-kafkaartifactId>
    	<version>${flink.version}version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    以下用socket模拟无界流,来演示数据输出到KafKa:

    public class SinkKafka {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 如果是精准一次,必须开启checkpoint,否则无法写入Kafka
            env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
    
            SingleOutputStreamOperator<String> sensorDS = env
                    .socketTextStream("node1", 9527);
    
            KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                    // 指定 kafka 的地址和端口
                    .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                    // 指定序列化器:指定Topic名称、具体的序列化
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.<String>builder()
                                    .setTopic("topic1")
                                    .setValueSerializationSchema(new SimpleStringSchema())
                                    .build()
                    )
                    // 写到kafka的一致性级别: 精准一次、至少一次
                    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                    // 如果是精准一次,必须设置 事务的前缀
                    .setTransactionalIdPrefix("test-")
                    // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
                    .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
                    .build();
    
    
            sensorDS.sinkTo(kafkaSink);
    
    
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    关于 Kafka Sink,如果要使用精准一次写入Kafka,需要满足以下条件,缺一不可

    • 开启checkpoint(后续介绍)
    • 设置事务前缀
    • 设置事务超时时间: checkpoint间隔 < 事务超时时间 < max的15分钟

    如果要指定写入kafka的key,可以自定义序列化器:

    • 实现 一个接口,重写 序列化 方法
    • 指定key,转成 字节数组
    • 指定value,转成 字节数组
    • 返回一个 ProducerRecord对象,把key、value放进去
    public class SinkKafkaWithKey {
        public static void main(String[] args) throws Exception {
        
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
            env.setRestartStrategy(RestartStrategies.noRestart());
    
            SingleOutputStreamOperator<String> sensorDS = env
                    .socketTextStream("node1", 9527);
            /**
             *指定写入kafka的key,可以自定义序列化器:
             */
            KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                    .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                    .setRecordSerializer(
                            new KafkaRecordSerializationSchema<String>() {
    
                                @Nullable
                                @Override
                                public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                                    String[] datas = element.split(",");  //输入的测试数据格式为a,b,c,所以这里先分割一下
                                    byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                                    byte[] value = element.getBytes(StandardCharsets.UTF_8);
                                    return new ProducerRecord<>("topic1", key, value);
                                }
                            }
                    )
                    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                    .setTransactionalIdPrefix("test-")
                    .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                    .build();
    
    
            sensorDS.sinkTo(kafkaSink);
    
    
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    4、输出到MySQL(JDBC)

    添加MySQL驱动依赖:

    <dependency>
        <groupId>mysqlgroupId>
        <artifactId>mysql-connector-javaartifactId>
        <version>8.0.27version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    再引入flink-jdbc连接器依赖:

    
    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-connector-jdbcartifactId>
        <version>3.1.1-1.17version>
    dependency>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7


    PS:

    教学视频中提到了另一种情况,这里记录下。即:官方还未提供flink-connector-jdbc的某高版本的正式依赖,如1.17.0(当前时间已有),暂时从apache snapshot仓库下,因此引入依赖前,先在pom文件中指定仓库路径

    <repositories>
        <repository>
            <id>apache-snapshotsid>  
            <name>apache-snapshotsname>
    		<url>https://repository.apache.org/content/repositories/snapshots/url>
        repository>
    repositories>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    再引入flink-jdbc连接器依赖:

    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-connector-jdbcartifactId>
        <version>1.17-SNAPSHOTversion>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    如果不生效,还需要修改本地maven的配置文件,mirrorOf中添加!apache-snapshots

    <mirror>
    	<id>aliyunmavenid>
    	<mirrorOf>*,!apache-snapshotsmirrorOf>   
    	<name>阿里云公共仓库name>
    	<url>https://maven.aliyun.com/repository/publicurl>
    mirror>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6


    根据你的数据类型,建立对应结构的表,这里根据要接收的自定义对象WaterSensor建表test:

    mysql>     
    CREATE TABLE `ws` (
      `id` varchar(100) NOT NULL,
      `ts` bigint(20) DEFAULT NULL,
      `vc` int(11) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    输出到MySQL的Demo代码:

    public class SinkMySQL {
        public static void main(String[] args) throws Exception {
        
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            
            env.setParallelism(1);
    
            SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("node01", 9527)
                    .map(new WaterSensorMapFunction());  //输入的信息映射转为自定义的WaterSensor实体类对象
    
            SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
                    "insert into ws values(?,?,?)",
                    new JdbcStatementBuilder<WaterSensor>() {
                        @Override
                        public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                            //每收到一条WaterSensor,如何去填充占位符
                            preparedStatement.setString(1, waterSensor.getId());
                            preparedStatement.setLong(2, waterSensor.getTs());
                            preparedStatement.setInt(3, waterSensor.getVc());
                        }
                    },
                    JdbcExecutionOptions.builder()
                            .withMaxRetries(3) // 重试次数
                            .withBatchSize(100) // 批次的大小:条数
                            .withBatchIntervalMs(3000) // 批次的时间
                            .build(),
                    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            .withUrl("jdbc:mysql://node01:3306/testDB?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                            .withUsername("root")
                            .withPassword("admin123")
                            .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                            .build()
            );
            sensorDS.addSink(jdbcSink);
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    总结: 写入mysql时注意只能用老的sink写法: addsink,此外JdbcSink的4个参数:

    • 第一个参数: 执行的sql,一般就是 insert into搭配占位符
    • 第二个参数: 预编译sql对象, 对占位符填充值
    • 第三个参数: 执行选项 ,比如批次大小、重试时间
    • 第四个参数: 数据库连接选项 , url、用户名、密码

    运行,输入数据,查看MySQL:

    在这里插入图片描述

    5、自定义Sink输出

    现有的Flink连接器不能满足需求时,需要自定义连接器进行输出。与Source类似,Flink提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,实现这个接口,就可通过DataStream的.addSink()方法自定义写入任何的外部存储。

    public class MySinkFunction implements SinkFunction<String>{
    
    	@Override
    	public void invoke(String value, Context context) throws Exception{
    		//输出逻辑
    		//value即流中的数据,来一条数据,invoke方法就被调用一次(所以不要在这里创建连接对象)
    		//如果你的外部存储必须先创建连接对象,那就用富函数的生命周期方法去创建连接对象
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    stream.addSink(new MySinkFunction<String>());
    
    • 1

    来一条数据,invoke方法就被调用一次,如果你的外部存储必须先创建连接对象,那就用富函数的生命周期方法去创建连接对象:

    public class MySinkFunction implements RichSinkFunction<String>{
    
    	Connection connection = null;
    
    	@Overrdie
    	public void open(Configuration parameters) throws Exception{
    		connection = new xxConnection(xx);
    	}
    
    	@Override
    	public void close() throws Exception{
    		super.close();
    	}
    
    	@Override
    	public void invoke(String value, Context context) throws Exception{
    		//输出逻辑
    		connection.executeXXX(xxx);
    		
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
  • 相关阅读:
    Linux下的常见工具的简单使用
    面试之腾讯
    如何实现巡检报告?
    通达信期货接口代码分享
    Mybatis - 常用 SQL 语句设计思路及具体实现 - 数据存在则更新,不存在则插入、批量更新、批量插入、连表查询 + - 字段加减法
    一键实现冒泡排序算法,代码质量有保障!
    两个读书笔记:springboot+vue.js分布式组件全栈开发训练营 + 大数据开发基础
    centos 上容器配置X11
    【20221204】【每日一题】监控二叉树
    113. 求坐上公交的最晚时间(考察贪心算法)
  • 原文地址:https://blog.csdn.net/llg___/article/details/133997001