• Common Flink sinks (ElasticsearchSink (EsSink), RedisSink, KafkaSink, MysqlSink, FileSink)


    Flink output to Elasticsearch, Redis, MySQL, Kafka, and files.

    Prepare the relevant environments (Kafka, Elasticsearch, Redis, MySQL) beforehand.

    Configure the pom file

     
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <!-- Flink core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>

        <!-- Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>

        <!-- Redis connector (Bahir; published for Scala 2.11 only) -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

        <!-- Elasticsearch 7 connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- JDBC connector + MySQL driver -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    Shared entity class

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.ToString;
    
    @Data
    @NoArgsConstructor
    @ToString
    @AllArgsConstructor
    public class UserEvent {
        private String userName;
        private String url;
        private Long timestamp;
    }
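
    With @Data and @AllArgsConstructor, Lombok generates the constructor, getters, and toString() that the examples below rely on. Roughly what you get (a hand-written sketch for illustration, not generated output):

    UserEvent e = new UserEvent("xiaoming", "www.baidu.com", 1287538716253L);
    e.getUserName(); // "xiaoming" -- getter generated by @Data
    // @ToString output, which the Kafka example below writes as the message body:
    e.toString();    // "UserEvent(userName=xiaoming, url=www.baidu.com, timestamp=1287538716253)"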
    
    

    KafkaSink

    Write the data to Kafka. Start a Kafka console consumer first, then run the program.

    import com.event.UserEvent;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    
    import java.util.Arrays;
    
    public class KafkaSinkTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            DataStreamSource<String> stream = env.fromCollection(Arrays.asList(
                    "xiaoming,www.baidu.com,1287538716253",
                    "Mr Li,www.baidu.com,1287538710000",
                    "Mr Zhang,www.baidu.com,1287538710900"
            ));
    
            SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    // parse the CSV line into a UserEvent and emit its toString() form
                    String[] split = value.split(",");
                    return new UserEvent(split[0].trim(), split[1].trim(), Long.valueOf(split[2].trim())).toString();
                }
            });
    
            // start a console consumer before running the job:
            // ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic events
            result.addSink(new FlinkKafkaProducer<String>(
                    // Kafka broker address
                    "hadoop01:9092",
                    // target topic
                    "events",
                    new SimpleStringSchema()
            ));
    
            env.execute();
        }
    }
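
    To read the topic back inside Flink instead of using the console consumer, the same connector provides FlinkKafkaConsumer. A minimal sketch (the class name KafkaSourceCheck is made up here; broker, topic, and consumer properties match the example above):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    
    import java.util.Properties;
    
    public class KafkaSourceCheck {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "hadoop01:9092");
            properties.setProperty("group.id", "consumer-group");
            properties.setProperty("auto.offset.reset", "latest");
    
            // consume the events topic written by KafkaSinkTest and print it
            env.addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), properties))
               .print();
    
            env.execute();
        }
    }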
    
    

    Run results

    (screenshot)

    ElasticsearchSink (EsSink)

    Write the data to Elasticsearch.

    Sample code

    
    import com.event.UserEvent;
    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;
    
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    
    public class EsSinkTest {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<UserEvent> userEventDataStreamSource =
                    env.fromCollection(
                            Arrays.asList(
                                    new UserEvent("zhangsan", "path?test123", System.currentTimeMillis() - 2000L),
                                    new UserEvent("zhangsan", "path?test", System.currentTimeMillis() + 2000L),
                                    new UserEvent("lisi", "path?checkParam", System.currentTimeMillis()),
                                    new UserEvent("bob", "path?test", System.currentTimeMillis() + 2000L),
                                    new UserEvent("mary", "path?checkParam", System.currentTimeMillis()),
                                    new UserEvent("lisi", "path?checkParam123", System.currentTimeMillis() - 2000L)
                            ));
    
            // define the Elasticsearch host list
            List<HttpHost> hosts = Arrays.asList(new HttpHost("hadoop01", 9200));
    
            // define the ElasticsearchSinkFunction: one index request per record
            ElasticsearchSinkFunction<UserEvent> elasticsearchSinkFunction = new ElasticsearchSinkFunction<UserEvent>() {
                @Override
                public void process(UserEvent userEvent, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                    IndexRequest indexRequest = Requests.indexRequest()
                            .index("events")
                            // mapping types are deprecated in Elasticsearch 7; this can be omitted
                            .type("type")
                            .source(new HashMap<String, String>() {{
                                put(userEvent.getUserName(), userEvent.getUrl());
                            }});
                    requestIndexer.add(indexRequest);
                }
            };
    
            // write to Elasticsearch
            userEventDataStreamSource.addSink(new ElasticsearchSink.Builder<>(hosts, elasticsearchSinkFunction).build());
    
            env.execute();
        }
    }
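
    One practical note: the sink buffers index requests and sends them in bulk, so a short-lived demo job may finish before anything is flushed. A sketch of forcing a flush after every record via the builder (same hosts and elasticsearchSinkFunction as above; fine for demos, too chatty for production):

    ElasticsearchSink.Builder<UserEvent> builder =
            new ElasticsearchSink.Builder<>(hosts, elasticsearchSinkFunction);
    // flush after every single request instead of batching
    builder.setBulkFlushMaxActions(1);
    userEventDataStreamSource.addSink(builder.build());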
    
    

    Commands to check the index

    GET _cat/indices
    
    GET _cat/indices/events
    
    GET events/_search
    

    Run results
    (screenshot)

    RedisSink

    Write the data to Redis.

    Sample code

    
    import com.event.UserEvent;
    import my.test.source.CustomSource;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.redis.RedisSink;
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
    
    public class RedisSinkTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            DataStreamSource<UserEvent> streamSource = env.addSource(new CustomSource());
    
            // create the Jedis connection config
            FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                    .setHost("master")
                    .setTimeout(10000)
                    .setPort(6379)
                    .build();
    
            // write to Redis
            streamSource.addSink(new RedisSink<>(config, new MyRedisMapper()));
    
            env.execute();
        }
    
        public static class MyRedisMapper implements RedisMapper<UserEvent> {
            @Override
            public RedisCommandDescription getCommandDescription() {
                // write with HSET; the additionalKey ("events") is the Redis hash the entries are stored under
                return new RedisCommandDescription(RedisCommand.HSET, "events");
            }
    
            @Override
            public String getKeyFromData(UserEvent userEvent) {
                return userEvent.getUserName();
            }
    
            @Override
            public String getValueFromData(UserEvent userEvent) {
                return userEvent.getUrl();
            }
        }
    }
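
    To verify the writes, inspect the hash from redis-cli; the additionalKey "events" from the mapper is the hash name:

    HGETALL events        # all userName -> url entries
    HGET events Mary      # the url last written for user Mary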
    
    

    Custom source

    import com.event.UserEvent;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    
    import java.util.Calendar;
    import java.util.Random;
    
    public class CustomSource implements SourceFunction<UserEvent> {
        // flag that controls whether data generation keeps running
        private volatile boolean running = true;
    
        @Override
        public void run(SourceContext<UserEvent> ctx) throws Exception {
            Random random = new Random(); // pick random entries from the fixed data sets
            String[] users = {"Mary", "Alice", "Bob", "Cary"};
            String[] urls = {"./home", "./cart", "./fav", "./prod?id=1",
                    "./prod?id=2"};
            while (running) {
                ctx.collect(new UserEvent(
                        users[random.nextInt(users.length)],
                        urls[random.nextInt(urls.length)],
                        Calendar.getInstance().getTimeInMillis()
                ));
                // emit one click event per second so the output is easy to watch
                Thread.sleep(1000);
            }
        }
    
        @Override
        public void cancel() {
            running = false;
        }
    }
    
    

    Run results
    Because the source above produces an unbounded stream, the data keeps changing.
    (screenshot)

    MysqlSink (JdbcSink)

    Write the data to MySQL.

    Table schema

    create table events(
        user_name varchar(20) not null,
        url varchar(100) not null
    );
    

    Sample code

    
    import com.event.UserEvent;
    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;
    import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.Arrays;
    
    public class MysqlSinkTest {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // a batch of sample data
            DataStreamSource<UserEvent> userEventDataStreamSource =
                    env.fromCollection(
                            Arrays.asList(
                                    new UserEvent("zhangsan", "/path?test123", System.currentTimeMillis() - 2000L),
                                    new UserEvent("zhangsan", "/path?test", System.currentTimeMillis() + 2000L),
                                    new UserEvent("lisi", "/path?checkParam", System.currentTimeMillis()),
                                    new UserEvent("bob", "/path?test", System.currentTimeMillis() + 2000L),
                                    new UserEvent("mary", "/path?checkParam", System.currentTimeMillis()),
                                    new UserEvent("lisi", "/path?checkParam123", System.currentTimeMillis() - 2000L)
                            ));
    
            userEventDataStreamSource.addSink(JdbcSink.sink(
                    // the SQL statement to execute
                    "INSERT INTO events (user_name, url) VALUES (?, ?)",
                    new JdbcStatementBuilder<UserEvent>() {
                        @Override
                        public void accept(PreparedStatement preparedStatement, UserEvent userEvent) throws SQLException {
                            // bind the SQL placeholders
                            preparedStatement.setString(1, userEvent.getUserName());
                            preparedStatement.setString(2, userEvent.getUrl());
                        }
                    },
                    // JDBC connection options
                    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            .withUrl("jdbc:mysql://hadoop01:3306/mysql")
                            .withUsername("root")
                            .withPassword("123456")
                            .withDriverName("com.mysql.jdbc.Driver")
                            .build()
            ));
    
            env.execute();
        }
    }
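
    JdbcSink.sink also has an overload that takes JdbcExecutionOptions, which controls batching and retries; by default records are batched, so tuning these can matter for latency. A sketch under the same table and connection settings (the batch sizes here are illustrative, not recommendations):

    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    
    userEventDataStreamSource.addSink(JdbcSink.sink(
            "INSERT INTO events (user_name, url) VALUES (?, ?)",
            (JdbcStatementBuilder<UserEvent>) (preparedStatement, userEvent) -> {
                preparedStatement.setString(1, userEvent.getUserName());
                preparedStatement.setString(2, userEvent.getUrl());
            },
            JdbcExecutionOptions.builder()
                    .withBatchSize(100)        // flush every 100 records...
                    .withBatchIntervalMs(200)  // ...or every 200 ms, whichever comes first
                    .withMaxRetries(3)         // retry failed batches up to 3 times
                    .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:mysql://hadoop01:3306/mysql")
                    .withUsername("root")
                    .withPassword("123456")
                    .withDriverName("com.mysql.jdbc.Driver")
                    .build()
    ));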
    

    After the program finishes, you can see the new rows in the MySQL events table.
    (screenshot)

    FileSink

    Write the data to files (bucketed/partitioned output is supported).

    import com.event.UserEvent;
    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
    
    import java.util.Arrays;
    import java.util.concurrent.TimeUnit;
    
    public class FileSinkTest {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<UserEvent> userEventDataStreamSource =
                    env.fromCollection(
                            Arrays.asList(
                                    new UserEvent("zhangsan", "path?test123", System.currentTimeMillis() - 2000L),
                                    new UserEvent("zhangsan", "path?test", System.currentTimeMillis() + 2000L),
                                    new UserEvent("lisi", "path?checkParam", System.currentTimeMillis()),
                                    new UserEvent("bob", "path?test", System.currentTimeMillis() + 2000L),
                                    new UserEvent("mary", "path?checkParam", System.currentTimeMillis()),
                                    new UserEvent("lisi", "path?checkParam123", System.currentTimeMillis() - 2000L)
                            ));
    
            StreamingFileSink<String> streamingFileSink = StreamingFileSink
                    .<String>forRowFormat(new Path("./output/"), new SimpleStringEncoder<>("UTF-8"))
                    .withRollingPolicy(
                            DefaultRollingPolicy.builder()
                                    // roll when a part file reaches 1 GB
                                    .withMaxPartSize(1024 * 1024 * 1024)
                                    // roll at least every 15 minutes
                                    .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                    // roll after 5 minutes without new data
                                    .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                    .build()
                    ).build();
    
            userEventDataStreamSource.map(data -> data.getUserName()).addSink(streamingFileSink);
    
            env.execute();
        }
    }
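
    The bucketing behind the "partitioned files" mentioned above can be customized with a BucketAssigner. A sketch using the built-in DateTimeBucketAssigner to get one sub-directory per hour (the format string is illustrative):

    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
    
    StreamingFileSink<String> hourlySink = StreamingFileSink
            .<String>forRowFormat(new Path("./output/"), new SimpleStringEncoder<>("UTF-8"))
            // one bucket directory per hour, e.g. ./output/2022-11-25--13/
            .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
            .build();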
    
    

    After the run finishes, some new part files appear under ./output/.
    (screenshot)

  • Original post: https://blog.csdn.net/asd1358355022/article/details/128044483