• flink 入门(一)


    flink 入门(一)

    image-20220930110928773

    简介

    ​ 阅读目标:

    • 本文为入门级别文章,即阅读完下文你需要简单的知道 flink 是做什么用的,他的主要特点是什么。工欲善其事必先利其器更深入的了解,待熟练后再回头看看。

    • 简而言之flink就是一个框架,你在框架里面编写代码(接收从某处来的数据->数据处理/转换->将处理好的数据输出到某地),将编写好的代码交给flink集群,由集群取调度任务去处理

    • 阅读并实践本文可能会存在某些问题,你还需要阅读其他文章/博客加深对flink的理解(如下文中提到的某些概念:有界、无界等等

      实际是因为我懒得写了。。。

    ​ Flink 起源于一个叫作 Stratosphere 的项目,它是由 3 所地处柏林的大学和欧洲其他一些大 学在 2010~2014 年共同进行的研究项目,由柏林理工大学的教授沃克尔·马尔科(Volker Markl) 领衔开发。2014 年 4 月,Stratosphere 的代码被复制并捐赠给了 Apache 软件基金会,Flink 就 是在此基础上被重新设计出来的。 在德语中,“flink”一词表示“快速、灵巧”。项目的 logo 是一只彩色的松鼠,当然了, 这不仅是因为 Apache 大数据项目对动物的喜好(是否联想到了 HadoopHive?),更是因为 松鼠这种小动物完美地体现了“快速、灵巧”的特点。关于 logo 的颜色,还一个有趣的缘由: 柏林当地的松鼠非常漂亮,颜色是迷人的红棕色;而 Apache 软件基金会的 logo,刚好也是一 根以红棕色为主的渐变色羽毛。于是,Flink 的松鼠 Logo 就设计成了红棕色,而且拥有一个漂 亮的渐变色尾巴,尾巴的配色与 Apache 软件基金会的 logo 一致。这只松鼠色彩炫目,既呼应 了 Apache 的风格,似乎也预示着 Flink 未来将要大放异彩。

    ​ Flink 的官网主页地址:https://flink.apache.org/ 在 Flink 官网主页的顶部可以看到,项目的核心目标,是“数据流上的有状态计算”(Stateful Computations over Data Streams)。

    ​ 很多专业词汇,我们从中至少可以提炼出一些容易理解的信息:Flink 是一个“框 架”,是一个数据处理的“引擎”;既然是“分布式”,当然是为了应付大规模数据的应用场景 了;另外,Flink 处理的是数据流。所以,Flink 是一个流式大数据处理引擎。 而“内存执行速度”和“任意规模”,突出了 Flink 的两个特点:速度快、可扩展性强— —这说的自然就是小松鼠的“快速”和“灵巧”了。

    java 开发案例

    以下案例为环境jdk1.8,且以下案例均为展示使用,目的是为了明白这两种方式的区别以及基本使用

    • jdk 1.8
    • maven
    • win10
    • flink 1.15.2

    以下示例代码仅做入门级别使用,非生产可用。

    • pom文件

      
              UTF-8
              1.8
      
              1.15.2
              1.8
              2.12
              2.17.1
      
      
      
              
              
              
                  org.apache.flink
                  flink-streaming-java
                  ${flink.version}
                  provided
              
              
                  org.apache.flink
                  flink-clients
                  ${flink.version}
                  provided
              
      
              
              
                  org.apache.flink
                  flink-connector-kafka
                  ${flink.version}
              
      
              
              
                  org.ansj
                  ansj_seg
                  5.1.6
              
      
      
              
              
              
                  org.apache.logging.log4j
                  log4j-slf4j-impl
                  ${log4j.version}
                  runtime
              
              
                  org.apache.logging.log4j
                  log4j-api
                  ${log4j.version}
                  runtime
              
              
                  org.apache.logging.log4j
                  log4j-core
                  ${log4j.version}
                  runtime
              
          
      

    Streaming(无界)

    这里可以简单的理解为源源不断的数据,需要不断监听某个消息队列(kafka)或者其他来源。

        public static final String HOST = "192.168.20.127";
        public static final Integer PORT = 8888;
    
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource source = environment.socketTextStream(HOST, PORT);
    
            SingleOutputStreamOperator> wordsCollector = source.flatMap((String line, Collector> collector) -> {
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(new Tuple2(word, 1L));
                }
    
            }).returns(Types.TUPLE(Types.STRING, Types.LONG));
    
    
            SingleOutputStreamOperator> sum = wordsCollector.keyBy(0).sum(1);
    
            sum.print();
    
            environment.execute();
        }
    

    Batch(有界)

    这里可以简单的理解为批量数据处理。

    kafka
    • 运行类

      public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
      
              //new 一个实例!
              Properties properties = new Properties();
      
              //告诉程序我们要接收那台机器上生产的数据
              properties.setProperty("bootstrap.servers", "master:9092");
      
              //告诉程序开启分区,已经分区名称
              properties.setProperty("group.id", "temp-1");
      
              //属性key.serializer和value.serializer就是key和value指定的序列化方式。
              properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      
              //读取kafka数据的时候需要指定消费策略,如果不指定会使用auto.offset.reset设置
              //earliest当各分区下有已提交的offset时,从提交的offset开始消费;
              //无提交的offset时,从头开始消费;
              //latest,当各分区下有已提交的offset时,从提交的offset开始消费;
              //无提交的offset时,消费新产生的该分区下的数据;
              //none,topic各分区都存在已提交的offset时,从offset后开始消费;
              //只要有一个分区不存在已提交的offset,则抛出异常
              properties.setProperty("auto.offset.reset", "earliest");
      
              //enable.auto.commit 的默认值是 true;就是默认采用自动提交的机制。
              properties.setProperty("enable.auto.commit", "false");
      
              //如果FlinkKafkaConsumer没有开启checkpoint功能,为了不重复读取
              //这种方式无法实现Exactly-Once(只执行一次)
              FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer("test_topic", new SimpleStringSchema(), properties);
      
              DataStreamSource lines = environment.addSource(flinkKafkaConsumer);
      
              SingleOutputStreamOperator> sum = lines.flatMap((String line, Collector> collector) -> {
      
                  List terms = ToAnalysis.parse(line).getTerms();
      
                  terms.forEach(item -> {
                      collector.collect(new Tuple2<>(item.getName(), 1L));
                  });
      
      
              }).returns(Types.TUPLE(Types.STRING, Types.LONG)).keyBy(0).sum(1);
      
      
              sum.print();
      
              environment.execute("word-coun-kafka");
          }
      

    任务提交

    提交有两种方式

    • web-ui界面

      1. 访问部署服务器 ip:8081

      2. 点击 Submit new Job

        image-20220930110400414

      3. 点击Add new

      4. 编辑Entry class与Parallelism等

        1. Entry class 为入口类 即为上文中的运行main()函数的类的全限定名

        image-20220930110446236

      5. 点击Submit

      6. 点击Jobs -> Running Jobs 查看

        image-20220930110504840

    • 命令行

      如果要把job提交到jobmanager,应该在jobmanager服务器上提交

    flink 安装与部署

    • Flink的安装和部署主要分为本地模式和集群模式,其中本地模式只需直接解压就可以使用,不以修改任何参数,一般在做一些简单测试的时候使用。

    • 集群模式包含Standalone、Flink on Yarn等模式,适合在生产环境下面使用,且需要修改对应的配置 参数。

    flink 下载

    ## 官方版本(可能下载速度慢)
    curl -O https://dlcdn.apache.org/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
    ## 腾讯云镜像(推荐,国内速度快)
    curl -O http://mirrors.cloud.tencent.com/apache/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
    

    下载完成解压,解压后目录如下

    image-20220929104102078

    CentOS/Kernel环境

    系统环境

    以下均基于 Kernel

    • CentOS Linux release 7.9.2009 (Core)
    • Linux version 3.10.0-1160.el7.x86_64
    • gcc version 4.8.5 20150623 (Red Hat 4.8.5-44) (GCC)
    • open-jdk 11
    • 大部分过程中使用root用户。请在生产环境或特殊环境注意用户切换。本文不在linux用户做过多赘述。
    本地模式

    自己是jobmanager也是taskmanager(会话模式)

    1. 配置文件详解

      1. 修改conf/flink-conf.yaml
      cd conf
      vim flink-conf.yaml
      
      # 此处修改集群时需要修改
      jobmanager.rpc.address: localhost
      # 默认1623
      jobmanager.rpc.port: 6123
      # 任务管理默认
      jobmanager.memory.process.size: 1600m
      taskmanager.memory.process.size: 1728m
      # 任务槽 资源(并行执行 相当于 组)
      taskmanager.numberOfTaskSlots: 1
      # 默认并行度
      parallelism.default: 1
      # web界面默认端口 需要修改时 解开注释
      #rest.port: 8081
      
      1. master

        当前jobmanager(默认localhost)以及webui端口(默认8081)

      2. works

        单节点启动默认这里面没有东西

    2. 启动脚本

      # 进入flink bin目录
      cd bin
      # 单节点集群启动
      ./start-cluster.sh
      
    3. 访问服务器ip加8081(默认)

    4. 停止服务

      # 进入flink bin目录
      cd bin
      # 单节点集群启动
      ./stop-cluster.sh
      
    集群

    至少需要三台服务器。一台jobmanager,两台taskmanager,三台服务器之间需要配置免密登录,这里为了方便,我修改了hosts文件,三台服务器分别为

    masterslave0slave1。(会话模式)

    1. 修改hosts(ip地址 主机名/域名 (主机别名))

      自己的服务器IP-1 master
      自己的服务器IP-2 slave0
      自己的服务器IP-3 slave1
      

      使配置文件生效请参考 CentOS修改hosts

    2. 服务器之间免密登录

      请自行百度/google(master 最好也将自身产生的秘钥导入自身,不导也可以会导致每次启动flink需要输入本机密码)

    3. 修改配置文件

      • master 服务器

        • flink-conf.yaml

          # 用于节点间通信
          jobmanager.rpc.address: 0.0.0.0
          
        • master

          master:8081
          
        • works

          # 另外两台机器
          slave0
          slave1
          
      • slave0 服务器

        • flink-conf.yaml

          jobmanager.rpc.address: master
          # 不改此处 集群运行后 solt为0
          jobmanager.bind-host: 0.0.0.0
          
        • master

          master:8081
          
        • works

          slave0
          slave1
          
      • slave1 服务器

        • flink-conf.yaml

          jobmanager.rpc.address: master
          # 不改此处 集群运行后 solt为0
          jobmanager.bind-host: 0.0.0.0
          
        • master

          master:8081
          
        • works

          slave0
          slave1
          
    4. 修改环境变量

      master/slave0/slave1 分别执行以下操作(因文件都是由master分发,所以目录位置应都一致,当然可自行修改)

      ## 修改环境变量
      vim /etc/profile
      
      ## 新增以下内容
      
      export FLINK_HOME=/software/flink-cluster/flink/
      export PATH=$PATH:$FLINK_HOME/bin
      
      ## 使环境变量生效
      source /etc/profile  
      
    5. 运行集群

      master bin目录下执行,看到以下几截图后集群启动成功,即可访问webUI界面

      ./start-cluster.sh
      

      image-20220929151931120

      且执行jps命令后

      image-20220929152033006

      slave0slave1执行jps

      image-20220929152141082

      web ui 界面

      image-20220929152941340

    jdk安装(多版本切换)

    ## 下载openjdkjdk
    curl -O https://download.java.net/openjdk/jdk11/ri/openjdk-11+28_linux-x64_bin.tar.gz
    
    ## 解压
    tar zxf openjdk-11+28_linux-x64_bin.tar.gz
    
    ## 添加jdk11  /opt/openjdk11/jdk-11/ 应为压缩包实际解压路径
    sudo update-alternatives --install /usr/bin/java java /home/flink/opt/jdk-11/bin/java 1
    
    ## 添加jdk11  /opt/openjdk11/jdk-11/ 应为压缩包实际解压路径
    sudo update-alternatives --install /usr/bin/javac javac /home/flink/opt/jdk-11/bin/javac 1
    
    ## 切换
    sudo update-alternatives --config java
    sudo update-alternatives --config javac
    

    docker for windows

    windows 10 专业版 21H2 WSL2

    下载docker-desktop docker 历史版本

    运行Docker Desktop Installer.exe

    参考链接

    用户提权

    su
    chmod -v u+w /etc/sudoers
    vim /etc/sudoers
    
    root       ALL=(ALL)           ALL
    
    
    chmod -v u-w /etc/sudoers
    
    exit
    

    centos镜像

    阿里云centos镜像

    北京外国语大学开源镜像

    Vmware

    VMWARE

    VMWARE 秘钥以及安装

    VMWARE TOOLS

    其他参考

    Storm入门 3

    Flink从入门到入土(详细教程)

    JDK11下载界面

    flink下载界面

    flink官方安装教程

    flink-streaming-platform-web

    flink国内镜像 腾讯云

    flink-kafka

  • 相关阅读:
    leetcode解题思路分析(一百三十一)1103 - 1109 题
    电力通信专业技术总结,智能电网通信技术总结
    前途无量的MEMS传感器技术
    外汇天眼:每周都能赢奖金?
    鸿鹄工程项目管理系统 Spring Cloud+Spring Boot+Mybatis+Vue+ElementUI+前后端分离构建工程项目管理系统
    值得推荐的小型 C 语言开源项目:Triggerhappy
    牛客小白月赛52 E 分组求对数和(容斥定理+二分)
    限流、流量控制方案
    static成员
    STM32G4系列之DAC
  • 原文地址:https://blog.csdn.net/qq_37681291/article/details/127120738