• 2.Flink安装部署\Local本地模式-了解\Standalone独立集群模式\Standalone-HA高可用集群模式(原理|操作|测试)


    本文来自:Flink1.12-2021黑马程序员贺岁视频

    2.Flink安装部署
    2.1.Local本地模式-了解
    2.1.1.原理
    2.1.2.操作
    2.1.3.测试
    2.2.Standalone独立集群模式
    2.2.1.原理
    2.2.2.操作
    2.2.3.测试
    2.3.Standalone-HA高可用集群模式
    2.3.1.原理
    2.3.2.操作
    2.3.3.测试

    2.Flink安装部署

    2.1.Local本地模式-了解

    2.1.1.原理

    在这里插入图片描述

    1、Flink程序由JobClient进行提交。
    2、JobClient将作业提交给JobManager
    3、JobManager负责协调资源分配和作业执行。资源分配完成后,任务将提交给相应的TaskManager
    4、TaskManager启动一个线程以开始执行。TaskManager会向JobManager报告状态更改,如开始执行,正在进行或已完成。
    5、作业执行完成后,结果将发送回客户端(JobClient)

    2.1.2.操作

    1.下载安装包
    https://archive.apache.org/dist/flink/

    2.上传flink-1.12.0-bin-scala_2.12.tgz到node1的指定目录

    3.解压
    tar -zxvf flink-1.12.0-bin-scala_2.12.tgz

    4.如果出现权限问题,需要修改权限
    chown -R root:root /export/server/flink-1.12.0

    5.改名或创建软链接
    mv flink-1.12.0 flink
    ln -s /export/server/flink-1.12.0 /export/server/flink

    2.1.3.测试

    1.准备文件/root/words.txt

    vim /root/words.txt
    hello me you her
    hello me you
    hello me
    hello
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.启动Flink本地”集群”

    /export/server/flink/bin/start-cluster.sh
    
    • 1

    3.使用jps可以查看到下面两个进程

    - TaskManagerRunner
    - StandaloneSessionClusterEntrypoint
    
    • 1
    • 2

    4.访问Flink的Web UI
    http://node1:8081/#/overview
    在这里插入图片描述

    slot在Flink里面可以认为是资源组,Flink是通过将任务分成子任务并且将这些子任务分配到slot来并行执行程序。

    5.执行官方示例

    /export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar --input /root/words.txt --output /root/out
    
    • 1

    6.停止Flink

    /export/server/flink/bin/stop-cluster.sh
    
    • 1

    启动shell交互式窗口(目前所有Scala2.12版本的安装包暂时都不支持Scala Shell)

    /export/server/flink/bin/start-scala-shell.sh local
    
    • 1

    执行如下命令:

    benv.readTextFile("/root/words.txt").flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()
    
    • 1

    退出shell

    :quit
    
    • 1

    2.2.Standalone独立集群模式

    2.2.1.原理

    在这里插入图片描述

    2.2.2.操作

    1.集群规划

    • 服务器: node1(Master + Slave): JobManager + TaskManager
    • 服务器: node2(Slave): TaskManager
    • 服务器: node3(Slave): TaskManager

    2.修改flink-conf.yaml

    vim /export/server/flink/conf/flink-conf.yaml
    jobmanager.rpc.address: node1
    taskmanager.numberOfTaskSlots: 2
    web.submit.enable: true
    
    #历史服务器
    jobmanager.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
    historyserver.web.address: node1
    historyserver.web.port: 8082
    historyserver.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    3.修改masters

    vim /export/server/flink/conf/masters
    
    • 1

    node1:8081

    4.修改slaves

    vim /export/server/flink/conf/workers
    node1
    node2
    node3
    
    • 1
    • 2
    • 3
    • 4

    5.添加HADOOP_CONF_DIR环境变量

    vim /etc/profile
    
    • 1
    export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
    
    • 1

    6.分发

    scp -r /export/server/flink node2:/export/server/flink
    scp -r /export/server/flink node3:/export/server/flink
    scp /etc/profile node2:/etc/profile
    scp /etc/profile node3:/etc/profile
    
    • 1
    • 2
    • 3
    • 4

    for i in {2..3}; do scp -r flink node$i:$PWD; done
    
    • 1

    7.source

    source /etc/profile
    
    • 1

    2.2.3.测试

    1.启动集群,在node1上执行如下命令

    /export/server/flink/bin/start-cluster.sh
    
    • 1

    或者单独启动

    /export/server/flink/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
    /export/server/flink/bin/taskmanager.sh start|start-foreground|stop|stop-all
    
    • 1
    • 2

    2.启动历史服务器

    /export/server/flink/bin/historyserver.sh start
    
    • 1

    3.访问Flink UI界面或使用jps查看

    http://node1:8081/#/overview
    http://node1:8082/#/overview
    
    • 1
    • 2

    4.执行官方测试案例

    /export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
    
    • 1

    5.停止Flink集群

    /export/server/flink/bin/stop-cluster.sh
    
    • 1

    2.3.Standalone-HA高可用集群模式

    2.3.1.原理

    在这里插入图片描述

    2.3.2.操作

    1.集群规划

    • 服务器: node1(Master + Slave): JobManager + TaskManager
    • 服务器: node2(Master + Slave): JobManager + TaskManager
    • 服务器: node3(Slave): TaskManager

    2.启动ZooKeeper

    zkServer.sh status
    zkServer.sh stop
    zkServer.sh start
    
    • 1
    • 2
    • 3

    3.启动HDFS

    /export/serves/hadoop/sbin/start-dfs.sh 
    
    • 1

    4.停止Flink集群

    /export/server/flink/bin/stop-cluster.sh
    
    • 1

    5.修改flink-conf.yaml

    vim /export/server/flink/conf/flink-conf.yaml
    
    • 1

    增加如下内容

    state.backend: filesystem
    state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints
    high-availability: zookeeper
    high-availability.storageDir: hdfs://node1:8020/flink/ha/
    high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
    
    • 1
    • 2
    • 3
    • 4
    • 5

    6.修改masters

    vim /export/server/flink/conf/masters
    
    • 1

    7.同步

    scp -r /export/server/flink/conf/flink-conf.yaml node2:/export/server/flink/conf/
    scp -r /export/server/flink/conf/flink-conf.yaml node3:/export/server/flink/conf/
    scp -r /export/server/flink/conf/masters node2:/export/server/flink/conf/
    scp -r /export/server/flink/conf/masters node3:/export/server/flink/conf/
    
    • 1
    • 2
    • 3
    • 4

    8.修改node2上的flink-conf.yaml

    vim /export/server/flink/conf/flink-conf.yaml
    
    • 1
    jobmanager.rpc.address: node2
    
    • 1

    9.重新启动Flink集群,node1上执行

    /export/server/flink/bin/stop-cluster.sh
    /export/server/flink/bin/start-cluster.sh
    
    • 1
    • 2

    在这里插入图片描述
    10.使用jps命令查看
    发现没有Flink相关进程被启动

    11.查看日志

    cat /export/server/flink/log/flink-root-standalonesession-0-node1.log
    
    • 1

    发现如下错误
    在这里插入图片描述
    因为在Flink1.8版本后,Flink官方提供的安装包里没有整合HDFS的jar

    12.下载jar包并在Flink的lib目录下放入该jar包并分发使Flink能够支持对Hadoop的操作
    下载地址

    https://flink.apache.org/downloads.html
    
    • 1

    13.放入lib目录

    cd /export/server/flink/lib
    
    • 1

    在这里插入图片描述
    14.分发

    for i in {2..3}; do scp -r flink-shaded-hadoop-2-uber-2.7.5-10.0.jar node$i:$PWD; done
    
    • 1

    15.重新启动Flink集群,node1上执行

    /export/server/flink/bin/stop-cluster.sh
    /export/server/flink/bin/start-cluster.sh
    
    • 1
    • 2

    16.使用jps命令查看,发现三台机器已经

    2.3.3.测试

    1.访问WebUI

    http://node1:8081/#/job-manager/config
    http://node2:8081/#/job-manager/config
    
    • 1
    • 2

    2.执行wc

    /export/server/flink/bin/flink run  /export/server/flink/examples/batch/WordCount.jar
    
    • 1

    3.kill掉其中一个master

    4.重新执行wc,还是可以正常执行

    /export/server/flink/bin/flink run  /export/server/flink/examples/batch/WordCount.jar
    
    • 1

    5.停止集群

    /export/server/flink/bin/stop-cluster.sh	
    
    • 1
  • 相关阅读:
    Redis主从模式下过期数据和数据不一致
    矩阵类问题处理技巧
    深度学习入门(十六)实战Kaggle比赛:预测房价
    Python实现猎人猎物优化算法(HPO)优化循环神经网络回归模型(LSTM回归算法)项目实战
    C#内映射lua表
    在Centos7.9_2207安装CDH6.3.2
    Java基础语法部分
    .NET性能优化-使用ValueStringBuilder拼接字符串
    PMP每日一练 | 考试不迷路-11.28(包含敏捷+多选)
    学习笔记——网络管理与运维——概述(背景)
  • 原文地址:https://blog.csdn.net/toto1297488504/article/details/125630845