• CentOS7安装flink1.17完全分布式


    前提条件

    准备三台CenOS7机器,主机名称,例如:node2,node3,node4

    三台机器安装好jdk8,通常情况下,flink需要结合hadoop处理大数据问题,建议先安装hadoop,可参考 hadoop安装

    Flink集群规划

    node2node3node4

    JobManager

    TaskManager

    TaskManagerTaskManager

    下载安装包

    在node2机器操作

    [hadoop@node2 ~]$ cd installfile/
    [hadoop@node2 installfile]$ wget https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz --no-check-certificate
    

    解压安装包

    [hadoop@node2 installfile]$ tar -zxvf flink-1.17.1-bin-scala_2.12.tgz -C ~/soft

    进入到解压后的目录,查看解压后的文件

    [hadoop@node2 installfile]$ cd ~/soft/
    [hadoop@node2 soft]$ ls
    ​

    配置环境变量

    [hadoop@node2 soft]$ sudo nano /etc/profile.d/my_env.sh

    添加如下内容

    #FLINK_HOME
    export FLINK_HOME=/home/hadoop/soft/flink-1.17.1
    export PATH=$PATH:$FLINK_HOME/bin

    让环境变量生效

    [hadoop@node2 soft]$ source /etc/profile

    验证版本号

    [hadoop@node2 soft]$ flink -v
    Version: 1.17.1, Commit ID: 2750d5c

    看到如上Version: 1.17.1版本号字样,说明环境变量配置成功。

    配置flink

    进入flink配置目录,查看配置文件

    [hadoop@node2 ~]$ cd $FLINK_HOME/conf
    [hadoop@node2 conf]$ ls
    flink-conf.yaml       log4j-console.properties  log4j-session.properties  logback-session.xml  masters  zoo.cfg
    log4j-cli.properties  log4j.properties          logback-console.xml       logback.xml          workers
    ​
    

    配置flink-conf.yaml

    [hadoop@node2 conf]$ vim flink-conf.yaml

    找到相关配置项并修改,如下

    jobmanager.rpc.address: node2
    jobmanager.bind-host: 0.0.0.0
    taskmanager.bind-host: 0.0.0.0
    taskmanager.host: node2
    rest.address: node2
    rest.bind-address: 0.0.0.0

    配置workers

    [hadoop@node2 conf]$ vim workers

    把原有内容删除,添加内容如下:

    node2
    node3
    node4

    配置masters

    [hadoop@node2 conf]$ vim masters 

    修改后内容如下:

    node2:8081

    分发flink安装目录

    确保node3、node4机器已开启的情况下,执行如下分发命令。

    [hadoop@node2 conf]$ xsync ~/soft/flink-1.17.1

    修改node3和node4的配置

    node3

    进入node3机器flink的配置目录

    [hadoop@node3 ~]$ cd ~/soft/flink-1.17.1/conf/

    配置flinke-conf.yaml文件

    [hadoop@node3 conf]$ vim flink-conf.yaml

    taskmanager.host的值修改为node3

    taskmanager.host: node3

    node4

    进入node4机器flink的配置目录

    [hadoop@node4 ~]$ cd ~/soft/flink-1.17.1/conf/

    配置flinke-conf.yaml文件

    [hadoop@node4 conf]$ vim flink-conf.yaml

    taskmanager.host的值修改为node4

    taskmanager.host: node4

    配置node3、node4的环境变量

    分别到node3、node4机器配置环境变量

    sudo nano /etc/profile.d/my_env.sh

    添加如下配置

    #FLINK_HOME
    export FLINK_HOME=/home/hadoop/soft/flink-1.17.1
    export PATH=$PATH:$FLINK_HOME/bin

    让环境变量生效

    source /etc/profile

    验证版本号

    flink -v

    看到Version: 1.17.1版本号字样,说明环境变量配置成功。

    启动flink集群

    在node2机器,执行如下命令启动集群

    [hadoop@node2 conf]$ start-cluster.sh
    Starting cluster.
    Starting standalonesession daemon on host node2.
    Starting taskexecutor daemon on host node2.
    Starting taskexecutor daemon on host node3.
    Starting taskexecutor daemon on host node4.

    查看进程

    分别在node2、node3、node4机器上执行jps查看进程

    [hadoop@node2 conf]$ jps
    2311 StandaloneSessionClusterEntrypoint
    2793 Jps
    2667 TaskManagerRunner
    ​
    [hadoop@node3 conf]$ jps
    1972 TaskManagerRunner
    2041 Jps
    ​
    [hadoop@node4 conf]$ jps
    2038 Jps
    1965 TaskManagerRunner
    ​

    node2有StandaloneSessionClusterEntrypointTaskManagerRunner进程

    node3有TaskManagerRunner进程

    node4有TaskManagerRunner进程

    看到如上进程,说明flink集群配置成功。

    Web UI

    浏览器访问

    node2的ip:8081
    

    或者使用主机名称代替ip访问

    node2:8081

    注意:如果用windows的浏览器访问,需要先在windows的hosts文件添加ip和主机名node2的映射。

    关闭flink集群

    [hadoop@node2 ~]$ stop-cluster.sh 
    Stopping taskexecutor daemon (pid: 2667) on host node2.
    Stopping taskexecutor daemon (pid: 1972) on host node3.
    Stopping taskexecutor daemon (pid: 1965) on host node4.
    Stopping standalonesession daemon (pid: 2311) on host node2.

    查看进程

    [hadoop@node2 ~]$ jps
    4215 Jps
    ​
    [hadoop@node3 ~]$ jps
    2387 Jps
    ​
    [hadoop@node4 ~]$ jps
    2383 Jps
    ​

    单独启动/关闭flink进程

    单独启动flink进程

    $ jobmanager.sh start
    $ taskmanager.sh start
    

    node2

    [hadoop@node2 ~]$ jobmanager.sh start
    Starting standalonesession daemon on host node2.
    [hadoop@node2 ~]$ jps
    4507 StandaloneSessionClusterEntrypoint
    4572 Jps
    ​
    [hadoop@node2 ~]$ taskmanager.sh start
    Starting taskexecutor daemon on host node2.
    [hadoop@node2 ~]$ jps
    4867 TaskManagerRunner
    4507 StandaloneSessionClusterEntrypoint
    4940 Jps
    ​
    

    node3

    [hadoop@node3 ~]$ taskmanager.sh start
    Starting taskexecutor daemon on host node3.
    [hadoop@node3 ~]$ jps
    2695 TaskManagerRunner
    2764 Jps
    ​
    

    node4

    [hadoop@node4 ~]$ taskmanager.sh start
    Starting taskexecutor daemon on host node4.
    [hadoop@node4 ~]$ jps
    2691 TaskManagerRunner
    2755 Jps
    ​

    单独关闭flink进程

    $ jobmanager.sh stop
    $ taskmanager.sh stop
    

    node4

    [hadoop@node4 ~]$ taskmanager.sh stop
    Stopping taskexecutor daemon (pid: 2691) on host node4.
    [hadoop@node4 ~]$ jps
    3068 Jps

    node3

    [hadoop@node3 ~]$ taskmanager.sh stop
    Stopping taskexecutor daemon (pid: 2695) on host node3.
    [hadoop@node3 ~]$ jps
    3073 Jps

    node2

    [hadoop@node2 ~]$ taskmanager.sh stop
    Stopping taskexecutor daemon (pid: 4867) on host node2.
    [hadoop@node2 ~]$ jobmanager.sh stop
    Stopping standalonesession daemon (pid: 4507) on host node2.
    [hadoop@node2 ~]$ jps
    5545 Jps

    提交应用测试

    启动flink集群

    [hadoop@node2 ~]$ start-cluster.sh 

    运行flink提供的wordcount案例程序

    [hadoop@node2 ~]$ cd $FLINK_HOME/
    [hadoop@node2 flink-1.17.1]$ flink run examples/streaming/WordCount.jar
    Executing example with default input data.
    Use --input to specify file input.
    Printing result to stdout. Use --output to specify output path.
    Job has been submitted with JobID 845db6f62321830f287e71b525e87dbe
    Program execution finished
    Job with JobID 845db6f62321830f287e71b525e87dbe has finished.
    Job Runtime: 1290 ms
    ​
    

    查看结果

    查看输出的wordcount结果的末尾10行数据

    [hadoop@node2 flink-1.17.1]$ tail log/flink-*-taskexecutor-*.out
    (nymph,1)
    (in,3)
    (thy,1)
    (orisons,1)
    (be,4)
    (all,2)
    (my,1)
    (sins,1)
    (remember,1)
    (d,4)

    Web UI查看作业

    查看作业

    查看作业结果

    在Task Managers 的node2上可以查看到作业的结果

    分别查看Task Managers 的node3、node4的输出结果

    可以看到,三台Task Manager机器中,只有node2机器有结果,说明,本次wordcount计算只用到了node2进行计算。

    总结:至此,flink进程正常,可以提交应用到fink集群运行,同时能查看到相应计算结果,说明集群功能正常。

    完成!enjoy it!

  • 相关阅读:
    Smart UI Web 16.0.1 WebComponents htmlelements Crack
    HIVE中替换UDF神器-- TRANSFORM()函数
    Hadoop学习1:概述、单体搭建、伪分布式搭建
    kmeans实现图像像素分类
    AM@幂级数性质@幂级数和函数求解
    C语言每日一题(11):杨辉三角
    「React Native」为什么要选择 React Native 作为的跨端方案
    PTA 7-102 藏头诗
    力扣记录:剑指offer(3)——JZ24-32
    51单片机智能语音识别分类垃圾箱桶新国标垃圾分类4种垃圾脚踏开关4个舵机
  • 原文地址:https://blog.csdn.net/qq_42881421/article/details/137293343