• Flink: Using Source Operators to Read Data


    5.1 Overview

    1. Obtain the execution environment
    2. Read the data source
    3. Define transformations on the data
    4. Define where the computed results are output
    5. Trigger program execution

    5.2 Creating the Execution Environment

    5.2.1 Obtaining the Execution Environment

    1. Batch processing: getExecutionEnvironment
    • Set on the submit command line:
    bin/flink run -Dexecution.runtime-mode=BATCH ...

    • Or in code (not recommended, since it hardcodes the mode into the program):
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    Call the setRuntimeMode method and pass in BATCH mode.

    2. Stream processing
    // batch environment
    ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
    // stream environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    5.3 Source Operators

    5.3.1 Reading a Bounded Stream

    1. Data preparation
    • The basic data type
    import java.sql.Timestamp;
    
    public class Event {
        public String user;
        public String url;
        public Long timestamp;
    
        //no-argument constructor
        public Event() {
        }
    
        public Event(String user, String url, Long timestamp) {
            this.user = user;
            this.url = url;
            this.timestamp = timestamp;
        }
    
        @Override
        public String toString() {
            return "Event{" +
                    "user='" + user + '\'' +
                    ", url='" + url + '\'' +
                    ", timestamp=" + new Timestamp(timestamp) +
                    '}';
        }
    }
    

    The fields are: the user name, the url, and a timestamp.

    • The data file (input/clicks.txt)
    Mary,./home,1000
    Alice,./cart,2000
    Bob,./prod?id=100,3000
    Bob,./cart,4000
    Bob,./home,5000
    Mary,./home,6000
    Bob,./cart,7000
    Bob,./home,8000
    Bob,./prod?id=10,9000
    
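Each line of the file maps directly onto the three fields of the Event POJO above. A minimal standalone sketch of that mapping (the ParseSketch class and its parseLine helper are ours, for illustration only — they are not part of the post's code):

```java
public class ParseSketch {
    // Hypothetical helper: split a "user,url,timestamp" line on commas.
    static String[] parseLine(String line) {
        return line.split(",", 3);
    }

    public static void main(String[] args) {
        String[] f = parseLine("Mary,./home,1000");
        String user = f[0];                    // user field
        String url = f[1];                     // url field
        long timestamp = Long.parseLong(f[2]); // timestamp field
        System.out.println(user + " " + url + " " + timestamp);
    }
}
```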
    2. Code

    There are three ways: from a file, from a collection, and from elements.

    public class SourceTest {
        public static void main(String[] args) throws Exception{
            //1. Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            //2. Read data from a file (bounded stream)
            DataStreamSource<String> stream1 = env.readTextFile("input/clicks.txt");

            //3. Read data from a collection
            ArrayList<Integer> nums = new ArrayList<>();
            nums.add(2);
            nums.add(5);
            DataStreamSource<Integer> numsStream = env.fromCollection(nums);

            //with Event as the element type, read Events from a collection
            ArrayList<Event> events = new ArrayList<>();
            events.add(new Event("Mary","./home",1000L));
            events.add(new Event("Bob","./cart",2000L));
            DataStreamSource<Event> stream2 = env.fromCollection(events);

            //4. Read data from elements
            //no intermediate collection needed: pass them straight to fromElements
            DataStreamSource<Event> stream3 = env.fromElements(new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L));

            stream1.print();
            numsStream.print();
            stream2.print();
            stream3.print();
            env.execute();
        }
    }
    
    3. Results
    2
    5
    Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
    Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
    Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
    Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
    Mary,./home,1000
    Alice,./cart,2000
    Bob,./prod?id=100,3000
    Bob,./cart,4000
    Bob,./home,5000
    Mary,./home,6000
    Bob,./cart,7000
    Bob,./home,8000
    Bob,./prod?id=10,9000
    
    Process finished with exit code 0
    
    

    5.3.2 Reading from a Socket

    1. Start a netcat listener on the hadoop2 VM: nc -lk 7777
    2. Code
            //5. Read from a socket text stream
            DataStreamSource<String> stream4 = env.socketTextStream("hadoop2", 7777);
            stream4.print();
            env.execute();
    
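Under the hood, socketTextStream just opens a TCP connection to host:port and reads newline-delimited text. A plain-Java sketch of that interaction with no Flink involved (the class names here are ours; a tiny in-process server stands in for `nc -lk 7777`):

```java
import java.io.*;
import java.net.*;

public class SocketSketch {
    public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(0); // any free port
        int port = server.getLocalPort();

        // "nc" side: accept one connection and send two lines.
        Thread nc = new Thread(() -> {
            try (Socket s = server.accept();
                 PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
                out.println("Mary,./home,1000");
                out.println("Bob,./cart,2000");
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        nc.start();

        // "source" side: connect and read lines until the stream closes.
        try (Socket s = new Socket("localhost", port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println("received: " + line);
            }
        }
        nc.join();
        server.close();
    }
}
```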
    3. Results

    (screenshot omitted)

    5.3.3 Reading from Kafka

    1. Add the connector dependency
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    
    2. Start Kafka
    • Start ZooKeeper and Kafka:
    [hadoop1@hadoop2 kafka]$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties 
    
    [hadoop1@hadoop2 kafka]$ ./bin/kafka-server-start.sh -daemon ./config/server.properties 
    
    • Start a console producer:
    [hadoop1@hadoop2 kafka]$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic clicks
    
    3. Run the program

    Call addSource and pass in the FlinkKafkaConsumer provided by the Flink Kafka connector.

            //6. Read data from Kafka
    
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers","hadoop2:9092");
            properties.setProperty("group.id", "consumer-group");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("auto.offset.reset", "latest");
            DataStreamSource<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));
            kafkaStream.print();
            env.execute();
    
    • Results

    (screenshot omitted)

    5.3.4 Custom Source

    1. Approach

    Implement the SourceFunction interface and override its two methods, run() and cancel().


    2. Code
    public class SourceCustomTest {
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            DataStreamSource<Event> customStream = env.addSource(new ClickSource());
            customStream.print();
            env.execute();
    
        }
    }
    
    • The custom ClickSource
    public class ClickSource implements SourceFunction<Event> {

        //flag that controls data generation
        private Boolean running = true;

        @Override
        //the element type is Event
        public void run(SourceContext<Event> ctx) throws Exception {

            //generate data randomly
            Random random = new Random();
            //candidate values for the fields
            String[] users = {"Mary","Alice","Bob","Cary"};
            String[] urls = {"./home","./cart","./fav","./prod?id=100","./prod?id=10"};

            //keep generating data in a loop
            while (running){
                //nextInt(n) returns 0..n-1, so pass the array length itself
                String user = users[random.nextInt(users.length)];
                String url = urls[random.nextInt(urls.length)];
                //current system time in milliseconds
                Long timestamp = Calendar.getInstance().getTimeInMillis();
                //collect() sends the Event downstream
                ctx.collect(new Event(user,url,timestamp));

                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
    
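One detail worth checking in a source like this: Random.nextInt(n) already returns values in [0, n-1], so the bound must be the array length itself; a bound of length - 1 would make the last element unreachable. A quick standalone check (our own sketch, not Flink code):

```java
import java.util.Random;

public class NextIntBoundCheck {
    public static void main(String[] args) {
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        Random random = new Random();
        boolean[] seen = new boolean[users.length];
        // nextInt(users.length) yields indices 0..3, so every user is reachable
        for (int i = 0; i < 10_000; i++) {
            seen[random.nextInt(users.length)] = true;
        }
        for (int i = 0; i < users.length; i++) {
            System.out.println(users[i] + " reachable: " + seen[i]);
        }
    }
}
```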
    3. Results

    (screenshot omitted)

    5.3.5 Custom Parallel Source

    1. Analysis

    addSource still takes a SourceFunction, so if the function passed in also implements ParallelSourceFunction, a parallelism greater than 1 can be set on the source.

    2. Code
    public class SourceCustomTest {
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            //DataStreamSource<Event> customStream = env.addSource(new ClickSource());
            //set the source's parallelism to 2 here
            DataStreamSource<Integer> customStream = env.addSource(new ParallelCustomSource())
                    .setParallelism(2);
            customStream.print();
            env.execute();
        }

        //a static inner class
        //implementing the custom parallel SourceFunction
        public static class ParallelCustomSource implements ParallelSourceFunction<Integer> {

            //the same flag as before
            private Boolean running = true;
            private Random random = new Random();

            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                while (running){
                    ctx.collect(random.nextInt());
                    Thread.sleep(1000L);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }
    }
    
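What parallelism 2 means for this source, sketched in plain Java with no Flink (the Instance class below is ours, for illustration): the runtime creates two independent copies of the source function, each with its own state (here, its own Random), and their outputs are merged into one stream downstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class ParallelSketch {
    // Our stand-in for one parallel source instance: each copy owns its own Random.
    static class Instance {
        final Random random = new Random();
        List<Integer> run(int count) {
            List<Integer> out = new ArrayList<>();
            for (int i = 0; i < count; i++) out.add(random.nextInt());
            return out;
        }
    }

    public static void main(String[] args) {
        // Parallelism 2: two independent instances, outputs merged downstream.
        List<Integer> merged = new ArrayList<>();
        merged.addAll(new Instance().run(3));
        merged.addAll(new Instance().run(3));
        System.out.println("merged records: " + merged.size());
    }
}
```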
    3. Results

    (screenshot omitted)

  • Original post: https://blog.csdn.net/m0_46507516/article/details/127896407