This article uses Flink 1.14.5 and walks through installing the standalone-HA mode.
This mode is a high-availability architecture: ZooKeeper coordinates multiple JobManagers so that there is always exactly one running JobManager, while the remaining JobManagers stay in standby.
Because runtime state such as savepoints and checkpoints has to be persisted somewhere, MinIO is used here in place of HDFS as the state store.
1. Flink cluster machines
| Machine IP | hostname | JobManager node | TaskManager node | Notes |
| --- | --- | --- | --- | --- |
| 10.113.1.121 | flinknode1 | JobManager | | |
| 10.113.1.122 | flinknode2 | JobManager | | |
| 10.113.1.123 | flinknode3 | | TaskManager | |
| 10.113.1.124 | flinknode4 | | TaskManager | |
Standalone-HA mode needs at least two machines so that there are two JobManagers. TaskManagers can be co-located with the JobManagers or deployed on separate machines.
2. ZooKeeper cluster (installation omitted)
| hostname | Process |
| --- | --- |
| zknode1 | zookeeper |
| zknode2 | zookeeper |
| zknode3 | zookeeper |
3. MinIO (installation omitted)
For a single-node Docker installation of MinIO, see the author's separate post "minio之docker的单机版安装".
| hostname | Process |
| --- | --- |
| minionode | minio |
Create a bucket in MinIO; this article uses the bucket name "flink".
Also configure an account with read/write permission on that bucket; a sketch with the MinIO client follows.
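As a minimal sketch, the bucket and account can be created with the MinIO client mc; the alias name, user name and passwords below are placeholders, and the policy command syntax differs slightly between mc versions:
- # register the MinIO server under an alias (admin credentials are placeholders)
- mc alias set myminio http://minionode:9000 <admin-access-key> <admin-secret-key>
- # create the bucket used by Flink
- mc mb myminio/flink
- # create a dedicated account and grant it read/write access
- mc admin user add myminio flinkuser flinksecret
- mc admin policy set myminio readwrite user=flinkuser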
4. Architecture diagram of the components
1. Install the Java environment on the flinknode machines
Flink 1.14 can still run on JDK 8; starting with 1.15 JDK 8 is being phased out, so later versions should move to a newer JDK.
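A quick way to check (and, if needed, install) JDK 8 on each flinknode; the yum package name is an assumption for CentOS/RHEL-style systems and differs on other distributions:
- # check the installed Java version
- java -version
- # install OpenJDK 8 on a CentOS/RHEL-style system
- yum install -y java-1.8.0-openjdk-devel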
2. Configure a hostname on each of the four machines flinknode1-4 and add the mappings to /etc/hosts (without hosts entries, use IP addresses instead).
Setting up hosts is recommended in production; example entries follow.
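For example, /etc/hosts on every flinknode would contain entries like the following (the ZooKeeper and MinIO hostnames can be mapped the same way if they are not resolvable otherwise):
- 10.113.1.121 flinknode1
- 10.113.1.122 flinknode2
- 10.113.1.123 flinknode3
- 10.113.1.124 flinknode4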
3. Set up passwordless SSH login between the flinknode machines
This article configures passwordless login between all machines in both directions;
in practice it seems sufficient for the JobManager nodes to be able to log in to the TaskManager nodes without a password. A sketch follows.
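A minimal sketch of setting it up, run on each node (or at least on the two JobManager nodes):
- # generate a key pair if none exists yet
- ssh-keygen -t rsa -N "" -f ~/.ssh/id_rsa
- # copy the public key to every node, including the node itself
- ssh-copy-id root@flinknode1
- ssh-copy-id root@flinknode2
- ssh-copy-id root@flinknode3
- ssh-copy-id root@flinknode4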
4. Create a directory for the software
/opt/soft
The exact directory is up to you; this article stores everything under the path above, created as shown below.
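Run on every flinknode:
- mkdir -p /opt/soft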
1. Download and extract
Download page: https://flink.apache.org/zh/downloads.html
Log in to flinknode1
- cd /opt/soft
- wget https://www.apache.org/dyn/closer.lua/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.12.tgz
- tar zxvf flink-1.14.5-bin-scala_2.12.tgz
2. Modify the configuration
a. Edit ${FLINK_HOME}/conf/flink-conf.yaml and append the following settings at the end of the file
- fs.allowed-fallback-filesystems: s3
- state.backend: filesystem
- #checkpoint storage location
- state.checkpoints.dir: s3://flink/checkpoint
- #savepoint storage location
- state.savepoints.dir: s3://flink/savepoint
- #MinIO endpoint
- s3.endpoint: http://minionode:9000
- s3.path.style.access: true
- #MinIO access credentials
- s3.access-key: xxxx
- s3.secret-key: xxxx
-
- high-availability: zookeeper
- #ZooKeeper quorum addresses
- high-availability.zookeeper.quorum: zknode1:2181,zknode2:2181,zknode3:2181
- #HA recovery storage location
- high-availability.storageDir: s3://flink/recovery
- #ZooKeeper has no authentication configured, so open is fine
- high-availability.zookeeper.client.acl: open
b. Edit ${FLINK_HOME}/conf/masters; the modified file looks as follows.
This file lists the hosts (and web UI ports) that run a JobManager
- flinknode1:8081
- flinknode2:8081
c. Edit ${FLINK_HOME}/conf/workers; the modified file looks as follows.
This file lists the hosts that run a TaskManager
- flinknode3
- flinknode4
d. Edit ${FLINK_HOME}/bin/config.sh
Around line 107, change the value of the variable `DEFAULT_ENV_PID_DIR`; after the change it reads:
DEFAULT_ENV_PID_DIR="./.flinkpid" # Directory to store *.pid files to
This changes where the PID files of the StandaloneSessionClusterEntrypoint (master) and TaskManagerRunner (worker) processes are stored after startup. By default they go to /tmp, which the operating system may clean up; if that happens, a running Flink cluster can no longer be stopped with "${FLINK_HOME}/bin/stop-cluster.sh".
This article uses a relative directory, so the files end up under the home directory of the user running Flink, e.g. /root/.flinkpid/.
The PID files are:
/root/.flinkpid/flink-root-standalonesession.pid
/root/.flinkpid/flink-root-taskexecutor.pid
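After the cluster has been started (see below), the change can be verified by listing the new PID directory on a master and a worker node:
- ls /root/.flinkpid/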
3. Copy the S3 support jar, flink-s3-fs-hadoop-1.14.5.jar, into the plugins directory
- cd /opt/soft/flink-1.14.5
- mkdir plugins/s3-fs-hadoop
- cp ./opt/flink-s3-fs-hadoop-1.14.5.jar ./plugins/s3-fs-hadoop/
If the S3 support jar above is not copied, the following error is reported:
Could not find a file system implementation for scheme 's3'
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
4. Distribute the modified Flink directory to the other nodes (a loop variant is sketched after the commands)
- cd /opt/soft
- scp -r flink-1.14.5 root@flinknode2:/opt/soft
- scp -r flink-1.14.5 root@flinknode3:/opt/soft
- scp -r flink-1.14.5 root@flinknode4:/opt/soft
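Equivalently, with passwordless SSH in place the copy can be done in a small loop:
- for node in flinknode2 flinknode3 flinknode4; do scp -r /opt/soft/flink-1.14.5 root@${node}:/opt/soft; done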
5. Start the cluster from the master node
- cd /opt/soft/flink-1.14.5
- ./bin/start-cluster.sh
Starting from the master node also starts the processes on the other nodes automatically (via the passwordless SSH set up earlier). A quick sanity check is sketched below.
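A quick sanity check, assuming the default REST port 8081 from the masters file; /overview is a standard endpoint of Flink's REST API:
- # check the local processes on each node
- jps
- # query the cluster overview (number of TaskManagers, slots, jobs)
- curl http://flinknode1:8081/overview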
6. Configure the HistoryServer (optional)
Edit ${FLINK_HOME}/conf/flink-conf.yaml and append the following settings at the end of the file
- #==============================================================================
- # HistoryServer
- #==============================================================================
-
- # The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
-
- # Directory to upload completed jobs to. Add this directory to the list of
- # monitored directories of the HistoryServer as well (see below).
- # where the JobManager uploads archives of completed jobs
- jobmanager.archive.fs.dir: s3://flink/completed-jobs/
-
- # The address under which the web-based HistoryServer listens.
- #historyserver.web.address: 0.0.0.0
-
- # The port under which the web-based HistoryServer listens.
- #historyserver.web.port: 8082
-
- # Comma separated list of directories to monitor for completed jobs.
- ## where the HistoryServer reads the completed job archives
- historyserver.archive.fs.dir: s3://flink/completed-jobs/
-
- # Interval in milliseconds for refreshing the monitored directories.
- #historyserver.archive.fs.refresh-interval: 10000
Pick a node and start/stop the history service with the following commands (a quick check is sketched after them)
- ./bin/historyserver.sh start
- ./bin/historyserver.sh stop
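Once started, the HistoryServer web UI listens on port 8082 by default (see historyserver.web.port above); replace the hostname below with the node you started it on:
- curl http://flinknode1:8082/jobs/overview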
| Command | Description |
| --- | --- |
| ./bin/start-cluster.sh | Start the whole cluster |
| ./bin/stop-cluster.sh | Stop the whole cluster |
| ./bin/jobmanager.sh | Operate on this node's JobManager: start, stop |
| ./bin/taskmanager.sh | Operate on this node's TaskManager: start, stop |
The last two commands make it possible to maintain individual nodes, for example:
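To restart only the TaskManager on flinknode3, run on that node:
- cd /opt/soft/flink-1.14.5
- ./bin/taskmanager.sh stop
- ./bin/taskmanager.sh start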
Use the jps command to check the running processes
| Process name | Role |
| --- | --- |
| StandaloneSessionClusterEntrypoint | JobManager |
| TaskManagerRunner | TaskManager |
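Roughly expected output (the process IDs below are illustrative): flinknode1/flinknode2 should show a StandaloneSessionClusterEntrypoint, flinknode3/flinknode4 a TaskManagerRunner:
- # flinknode1 / flinknode2
- 12345 StandaloneSessionClusterEntrypoint
- 67890 Jps
- # flinknode3 / flinknode4
- 23456 TaskManagerRunner
- 78901 Jps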