• flink之Sink to MySQL和Redis


    前言

    下面这篇文章是使用Flink的Sink 写出数据到Redis和MySQL

    Flink之Sink写入Redis和MySQL

    Flink需要添加Sink的时候,需要自己去添加写Sink,我们可以实现SinkFunction,或者我们也可以继承RichSinkFunction,RichSinkFunction是实现了SinkFunction和继承了一个AbstractRichFunction,而增强主要是在AbstractRichFunction里面是有生命周期函数,这个对我们使用Sink的时候非常重要

    // --------------------------------------------------------------------------------------------
        //  Default life cycle methods
        // --------------------------------------------------------------------------------------------
    
        @Override
        public void open(Configuration parameters) throws Exception {}
    
        @Override
        public void close() throws Exception {}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    具体是否可以使用我们可以在官网里面查询你的数据库是否可以支持Source和Sink,下面这个是在1.15的文档下,可能后面社区会推出更多的支持,大家可以去官网中去看Overview | Apache Flink

    Connectorssourcesink
    Kafka支持支持
    Cassandra不支持支持
    Kinesis支持支持
    Elasticsearch不支持支持
    FileSystem不支持支持
    RabbitMQ支持支持
    Google PubSub支持支持
    Hybrid Source支持不支持
    NiFi支持支持
    Pulsar支持不支持
    JDBC支持不支持
    ActiveMQ支持支持
    Flume不支持支持
    Redis不支持支持
    Akka不支持支持
    Netty支持不支持

    Sink

    下面我们来看一个例子吧,这个是日志数据,本次例子也是自己来模拟的

    202512120010,c.com,2000
    202512120010,c.com,5000
    202512120010,a.com,6000
    202512120010,c.com,1000
    202512120010,b.com,2000
    202512120010,a.com,2000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    下面的是一个例子,里面有两个例子,一个是写入MySQL的,具体整个函数的处理就是根据域名进行点击量的统计,首先我们需要对数据进行转化成一个Access实体,然后再进行FlatMap转化,你可以看到添加一个Sink写出数据也是通过stream.addSink()添加一个Sink来写出数据。

    public static void toMySql(StreamExecutionEnvironment env) {
            DataStreamSource<String> source = env.readTextFile("D:/code/flink/coding510/com.dy.flink/data/access.log");
            SingleOutputStreamOperator<Access> mapStream = source.map(new MapFunction<String, Access>() {
                @Override
                public Access map(String value) throws Exception {
                    String[] splits = value.split(",");
                    Long time = Long.parseLong(splits[0].trim());
                    String domain = splits[1].trim();
                    Double traffic = Double.parseDouble(splits[2].trim());
                    return new Access(time, domain, traffic);
                }
            });
            SingleOutputStreamOperator<Tuple2<String, Double>> reduceStream = mapStream.flatMap(new FlatMapFunction<Access, Tuple2<String, Double>>() {
                @Override
                public void flatMap(Access value, Collector<Tuple2<String, Double>> out) throws Exception {
                    out.collect(Tuple2.of(value.getDomain(), value.getTraffic()));
                }
            }).keyBy(new KeySelector<Tuple2<String, Double>, String>() {
                @Override
                public String getKey(Tuple2<String, Double> value) throws Exception {
                    return value.f0;
                }
            }).reduce(new ReduceFunction<Tuple2<String, Double>>() {
                @Override
                public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2) throws Exception {
                    return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                }
            });
            FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPassword("123456")
                    .setPort(6379).build();
            reduceStream.addSink(new RedisSink<Tuple2<String, Double>>(conf, new PkRedisSink()));
            //reduceStream.addSink(new PkMySqlSink());
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    下面先来看Redis的Sink,这种采用的是实现RedisMapper来实现Redis的写出

    public class PkRedisSink implements RedisMapper<Tuple2<String, Double>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "pk-traffic");
        }
    
        @Override
        public String getKeyFromData(Tuple2<String, Double> data) {
            return data.f0;
        }
    
        @Override
        public String getValueFromData(Tuple2<String, Double> data) {
            return data.f1 + "";
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    但是这种方式不是特别灵活,我们一般都使用继承RichSinkFunction来进行数据的写出,因为我们可以使用它的生命周期函数,这个是非常有用的,为什么这样说呢,使用这一方法我们可以适用于非常非常多的Sink的需求,需要修改的不是很多,使用起来也会很方便

    public class PkMySqlSink extends RichSinkFunction<Tuple2<String, Double>> {
    
        Connection connection;
        PreparedStatement insertPstmt;
        PreparedStatement updatePstmt;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            Connection connection = MySQLUtils.getConnection();
            insertPstmt = connection.prepareStatement("insert into traffic(domain, traffic) values(?, ?)");
            updatePstmt = connection.prepareStatement("update traffic set traffic = ? where domain = ?");
    
        }
    
        @Override
        public void close() throws Exception {
            super.close();
            if (null != insertPstmt) {
                insertPstmt.close();
            }
    
            if (null != updatePstmt) {
                updatePstmt.close();
            }
    
            if (null != connection) {
                connection.close();
            }
    
        }
    
        @Override
        public void invoke(Tuple2<String, Double> value, Context context) throws Exception {
    
            System.out.println("=====invoke======" + value.f0 + "==>" +value.f1);
            updatePstmt.setString(2, value.f0);
            updatePstmt.setDouble(1, value.f1);
            updatePstmt.execute();
    
            if (updatePstmt.getUpdateCount() == 0) {
                insertPstmt.setString(1, value.f0);
                insertPstmt.setDouble(2, value.f1);
                insertPstmt.execute();
            }
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    最后

    这里就展示了两种写入Sink的方式,其他方式如果需要使用我们可以去查询官方文档,文档都有会有一个demo,大家可以根据demo改成自己需要的就可以sink出去了

  • 相关阅读:
    剑指offer刷题笔记整理
    AWS SAA-C03 #101
    机器学习算法(7)—— 朴素贝叶斯算法
    3D激光点云霍夫变换拟合直线
    在哪里可以制作一本精美的翻页产品册呢?
    工作以来一直在CRUD,Spring源码该怎么阅读?这份价值百万的源码解析让你如有神助!
    C#教程12:结构
    Npm使用教程(详细讲解)
    C# Winform PropertyGrid中文排序
    [开学季]ChatPaper全流程教程
  • 原文地址:https://blog.csdn.net/zly03/article/details/126437166