• Flink-输出算子(Sink)使用


    5.5 输出算子

    5.5.1 概述

    1. 调用print是返回输出类,作为最后一环sink存在

    image.png

    该方法创建了一个PrintSinkFunction 操作,然后作为addSink方法的参数

    image.png

    PrintSinkFunction这个类继承自RichSinkFunction富函数类

    1. RichSinkFunction类
      image.png
    • 继承了AbstractRichFunction富函数类

    因此就可以调用富函数类(是一个实现类)的声明周期方法,例如open,close,以及获取运行时上下文,运行环境,定义状态等等

    • RichSinkFunction类同时也实现了SinkFunction这个接口,所以本质上也是SinkFunction

    image.png

    image.png

    • SinkFunction接口的抽象方法有invoke,传入是value,以及当前的上下文
    1. 关系图

    image.png

    1. 如果需要自定义输出算子
      image.png

    可以调用DataStream的addSink方法

    image.png

    然后传入自己实现的SinkFunction

    1. flink提供的第三方系统连接器

    image.png

    5.5.2 输出到文件

    1. StreamingFileSink流文件输出类
    • 来源

    image.png

    继承RichSinkFunction类,并实现CheckpointedFunction,CheckpointListener(检查点)

    • 创建实例

    image.png

    在StreamingFileSink类中调用forRowFormat()方法传入Path以及Encoder返回StreamingFileSink.DefaultBulkFormatBuilder,DefaultBulkFormatBuilder是一个静态类并继承RowFormatBuilder类,RowFormatBuilder类又继承BucketsBuilder类,底层将数据写入bucket(桶),桶里面分大小存储分区文件,实现了分布式存储

    image.png

    使用Builder构建器构建

    image.png

    image.png

    RowFormatBuilder是行编码

    image.png

    BulkFormatBuilder是列存储编码格式

    • 关系图
      image.png

    • 代码

    public class SinkToFileTest {
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
    
            DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L),
                    new Event("Alice", "./prod?id=100", 3000L),
                    new Event("Bob", "./prod?id=1", 3300L),
                    new Event("Alice", "./prod?id=200", 3000L),
                    new Event("Bob", "./home", 3500L),
                    new Event("Bob", "./prod?id=2", 3800L),
                    new Event("Bob", "./prod?id=3", 4200L));
    
    
            //2.为了得到并传入SinkFunction,需要构建StreamingFileSink的一个对象
            //调用forRowFormat方法或者forBulkformat方法得到一个DefaultRowFormatBuilder
                //  其中forBulkformat方法前面还有类型参数,以及传参要求一个目录名称,一个编码器
                    //写入文件需要序列化,需要定义序列化方法并进行编码转换,当成Stream写入文件
            //然后再使用builder创建实例
            StreamingFileSink<String> streamingFileSink = StreamingFileSink.<String>forRowFormat(new Path("./output"),new SimpleStringEncoder<>("UTF-8"))
                    .withRollingPolicy(//指定滚动策略,根据事件或者文件大小新产生文件归档保存
                            DefaultRollingPolicy.builder()//使用builder构建实例
                                    .withMaxPartSize(1024 * 1024 * 1024)
                                    .withRolloverInterval(TimeUnit.MINUTES.toMinutes(15))//事件间隔毫秒数
                                    .withInactivityInterval(TimeUnit.MINUTES.toMinutes(15))//当前不活跃的间隔事件,隔多长事件没有数据到来
                                    .build()
                    )
                    .build();
            //1.写入文件调用addSink()方法,并传入SinkFunction
            stream
                    .map(data -> data.toString())//把Event类型转换成String
                    .addSink(streamingFileSink);
    
            env.execute();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 结果

    image.png

    5.5.3 输出到kafka

    image.png

    构造FlinkKafkaProducer类传入三个参数:brokerList(主机+端口号)和topicId(topic)以及serializationSchema(编码序列化)完成构造

    1. 代码
    public class SinkToKafka {
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            //1.从kafka中读取数据
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers","hadoop2:9092");
            properties.setProperty("group.id", "consumer-group");
    
            DataStreamSource<String> kafkaStream = env.addSource(
                    new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));
    
            //2.用flink进行简单的etl处理转换
            SingleOutputStreamOperator<String> result = kafkaStream.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
    
                    String[] fields = value.split(",");
                    return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
    
                }
            });
    
            //3.结果数据写入kafka
                //FlinkKafkaProducer传参borckList,topicid,序列化
            result.addSink(new FlinkKafkaProducer<String>(
                    "hadoop2:9092","events",new SimpleStringSchema()));
    
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    1. kafka输出结果

    image.png

    5.5.4 输出到redis

    1. 引入依赖
    <dependency>
     <groupId>org.apache.bahirgroupId>
     <artifactId>flink-connector-redis_2.11artifactId>
     <version>1.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    1. 分析
    • RedisSink类分析

    image.png

    RedisSink类继承自RichSinkFunction

    • 参数分析

    image.png

    去调构造方法,传入redis集群的配置FlinkJedisConfigBase以及RedisMapper写入命令

    image.png

    new FlinkJedisConfigBase的时候,可以使用FlinkJedisPoolConfig没毛病,直接继承的FlinkJedisConfigBase

    image.png

    FlinkJedisConfigBase是一个接口

    image.png

    实例FlinkJedisPoolConfig的时候也是使用的构造器Builder()的设计模式即,同样再使用.build实例它

    • 第二个参数分析

    image.png

    RedisMapper是一个接口

    image.png

    自定义一个实现类并重写方法getCommandDescription(),getKeyFromData(Event data),getValueFromData(Event data)

    • 关系图

    image.png

    1. 代码
    public class SinkToRedis {
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            //1.输入ClickSource是自定义输入
            DataStreamSource<Event> stream = env.addSource(new ClickSource());
    
    
            //2.创建一个jedis连接配置
            //FlinkJedisPoolConfig直接继承的FlinkJedisConfigBase
            FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                    .setHost("hadoop2")
                    .build();
    
    
    
            //3.写入redis
            stream.addSink(new RedisSink<>(config,new MyRedisMapper()));
    
            env.execute();
    
        }
    
        //3.自定义类实现 redisMapper接口
        public static class MyRedisMapper implements RedisMapper<Event>{
    
            @Override
            //返回一个redis命令的描述
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.HSET,"clicks");//写入哈希表
            }
    
            @Override
            //把key定义成user
            public String getKeyFromData(Event data) {
                return data.user;
            }
    
            @Override
            //把value定义成url
            public String getValueFromData(Event data) {
                return data.url;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    1. 结果

    运行redis

    [hadoop1@hadoop2 redis]$ ./src/redis-server 
        
    [hadoop1@hadoop2 bin]$ pwd
    /usr/local/bin
    
    • 1
    • 2
    • 3
    • 4

    image.png

    5.5.5 输出到ElasticSearch

    1. 引入依赖
    <dependency>
     <groupId>org.apache.flinkgroupId> 
    <artifactId>flink-connector-elasticsearch6_${scala.binary.version}artifact
    Id>
    <version>${flink.version}version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. 分析
    • ElasticsearchSink类分析

    image.png

    image.png

    ElasticsearchSink类继承ElasticsearchSinkBase抽象类,ElasticsearchSinkBase抽象类继承RichSinkFunction接口

    • 实例

    image.png

    ElasticsearchSink类调用Builder()传入参数是List和ElasticsearchSinkFunction


    image.png

    HttpHost需要参数主机名和端口号

    image.png
    是一个接口,写一个实现类重写他的方法,写入逻辑

    • 关系图
      image.png
    1. 代码
    public class SinToES {
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            //1.输入
            DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L),
                    new Event("Alice", "./prod?id=100", 3000L),
                    new Event("Bob", "./prod?id=1", 3300L),
                    new Event("Alice", "./prod?id=200", 3000L),
                    new Event("Bob", "./home", 3500L),
                    new Event("Bob", "./prod?id=2", 3800L),
                    new Event("Bob", "./prod?id=3", 4200L));
    
    
    
            
            //2.定义hosts的列表
            ArrayList<HttpHost> httpHosts = new ArrayList<>();
            httpHosts.add(new HttpHost("hadoop",9200));
    
            //3.定义ElasticsearchSinkFunction,是个接口,重写process方法
            //向es发送请求,并插入数据
            ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {
                @Override
                //输入,运行上下文,发送任务请求
                public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {
                    HashMap<String, String> map = new HashMap<>();
                    map.put(element.user, element.url);
    
                    //构建一个indexrequest
                    IndexRequest request = Requests.indexRequest()
                            .index("clicks")
                            .type("types")
                            .source(map);
    
                    indexer.add(request);
                }
            };
    
            //4.写入es
            //传入参数是List和ElasticsearchSinkFunction
            stream.addSink(new ElasticsearchSink.Builder<>(httpHosts,elasticsearchSinkFunction).build());
    
            env.execute();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    1. 结果

    image.png

    image.png

    5.5.6 输入到Mysql

    1. 引入依赖
    <dependency>
     <groupId>org.apache.flinkgroupId>
     <artifactId>flink-connector-jdbc_${scala.binary.version}artifactId>
     <version>${flink.version}version>
    dependency>
    <dependency>
     <groupId>mysqlgroupId>
     <artifactId>mysql-connector-javaartifactId>
     <version>5.1.47version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    1. 分析
    • JdbcSink来源

    image.png

    无继承,无实现

    image.png

    定义了sink方法,三个参数,sql,JdbcStatementBuilder构造,JdbcConnectionOptions等sql的连接配置,然后返回SinkFunction
    image.png

    • 参数分析

    JdbcStatementBuilder是个接口,实现了BiConsumerWithException接口

    image.png

    单一抽象方法accept(),lambda使用

    image.png

    构造器私有,因此调用JdbcConnectionOptionsBuilder.build()进行实例化

    • 关系图

    image.png

    1. 代码
    public class SinkToMysql {
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            //1.输入
            DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L),
                    new Event("Alice", "./prod?id=100", 3000L),
                    new Event("Bob", "./prod?id=1", 3300L),
                    new Event("Alice", "./prod?id=200", 3000L),
                    new Event("Bob", "./home", 3500L),
                    new Event("Bob", "./prod?id=2", 3800L),
                    new Event("Bob", "./prod?id=3", 4200L));
    
            //三个参数,sql,JdbcStatementBuilder构造,JdbcConnectionOptions等sql的连接配置
            stream.addSink(JdbcSink.sink(
                    "INSERT INTO clicks (user,url) VALUES(?,?)",
                    ((statement,event)->{
                        statement.setString(1,event.user);
                        statement.setString(2,event.url);
                    }),
                    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            .withUrl("jdbc:mysql://localhost:3306/test2")
                            .withDriverName("com.mysql.jdbc.Driver")
                            .withUsername("root")
                            .withPassword("123456")
                            .build()
            ));
    
            env.execute();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    1. mysql前期准备
    • 创建mysql的test2
    • 创建clicks表
    mysql> create table clicks(
        -> user varchar(20) not null,
        -> url varchar(100) not null);
    Query OK, 0 rows affected (0.02 sec)
    
    • 1
    • 2
    • 3
    • 4
    1. 结果

    image.png

    5.5.7 自定义Sink输出

    1. 分析

    调用DataStream的addSink()方法,并传入自定义好的SinkFunction(采用富函数类),重写关键方法invoke(),并且重写富函数类的生命周期相关方法open和close

    1. 导入依赖
    <dependency>
     <groupId>org.apache.hbasegroupId>
     <artifactId>hbase-clientartifactId>
     <version>${hbase.version}version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    1. 代码

  • 相关阅读:
    支付宝SDK接口调试- cpolar内网穿透工具实现公网地址调试
    要不要做全链路压测
    AIGC: 关于ChatGPT这个智能工具带来的几点思考
    Ubuntu 20.04上docker安装Redis
    吃透Chisel语言.27.Chisel进阶之有限状态机(一)——基本有限状态机(Moore机)
    【Qt图形视图框架】QGraphicsView分析
    VUE3 + Django 接口请求每次都产生新的session_id,应该如何解决?
    Intel GPU Gen 9 架构
    SharePoint 非365版本接入简要笔记
    @vue/cli4--使用图形化界面创建项目--方法/实例
  • 原文地址:https://blog.csdn.net/m0_46507516/article/details/127930387