在Zookeeper官网,选择你需要的版本进行下载,以下是我下载的版本。
Step1:将下载好的压缩包上传到虚拟机的指定目录下,我上传到了/opt/software/
Step2:将压缩包进行解压到指定目录下,我解压到了/opt/app/下
tar -zxvf apache-zookeeper-3.8.2-bin.tar.gz -C /opt/app/
Step3:将zookeeper的文件夹进行重命名
mv apache-zookeeper-3.8.2-bin/ zookeeper-3.8.2
Step4:配置环境变量,并进行source使配置文件生效
vim /etc/profile
source /etc/profile
Step5:进入/opt/app/zookeeper-3.8.2/conf
目录下,将此配置文件进行重命名mv zoo_sample.cfg zoo.cfg
,然后进行编辑。并且在/opt/app/zookeeper-3.8.2/
新建目录touch zkData
Step6:使用命令zkServer.sh start
启动zookeeper,使用命令zkServer.sh status
查看zookeeper状态,使用命令netstat -untlp
查看端口号
Step7:使用命令zkCli.sh -server localhost:2181
,进入客户端
Step8:使用命令zkServer.sh stop
退出
集群规划
在node1、node2、node3三个节点上部署Zookeeper。
先选择node1节点进行解压安装步骤和本地模式安装部署一样
修改配置文件zoo.cfg
[root@node1 software]# vim /opt/app/zookeeper/conf/zoo.cfg
#修改dataDir数据目录
dataDir=/opt/module/zookeeper-3.8.2/zkData
#在文件最后增加如下配置
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
server.A=B:C:D。
A是一个数字,表示这个是第几号服务器;
B是这个服务器的ip地址;
C是这个服务器与集群中的Leader服务器交换信息的端口;
D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
集群模式下配置一个文件myid,这个文件在dataDir目录下,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
/opt/app/zookeeper-3.8.2
下创建一个文件夹mkdir zkData
,再进入此目录创建文件touch myid
定义当前主机的编号。#在配置zoo.cfg的时候配置了server.1/2/3这个配置项中 数字123代表的就是第几号服务器
#其中这个数字必须在zookeeper的zkData的myid文件中定义 并且定义的时候必须和配置项对应的IP相互匹配
[root@node1 zookeeper]# touch /opt/app/zookeeper/zkData/myid
[root@node1 zookeeper]# vim /opt/app/zookeeper/zkData/myid
#文件中写入当前主机对应的数字 然后保存退出即可 例 node1节点的myid写入1 node2节点的myid写入:2 node3节点的myid写入:3
拷贝配置好的zookeeper到其他机器上
scp -r /opt/app/zookeeper-3.8.2/ root@node2:/opt/app/
scp -r /opt/app/zookeeper-3.8.2/ root@node3:/opt/app/
并分别修改myid文件中内容为2、3
Zookeeper的内部选举机制
假设有五台服务器组成的zookeeper集群,它们的id从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动, 他们内部的实现过程如图所示
zkCli.sh -server node:2181,node2:2181,node3:2181
命令基本语法 | 功能描述 |
---|---|
help | 显示所有操作命令 |
ls path [watch] | 使用 ls 命令来查看当前znode中所包含的内容 |
ls -s path [watch] | 查看当前节点信息 |
create [-e] [-s] | 创建节点 -s 含有序列 -e 临时(重启或者超时消失) |
get path [watch] | 获得节点的值 |
set | 设置节点的具体值 |
stat | 查看节点状态 |
delete | 删除节点 |
rmr/deleteall | 递归删除节点 |
启动命令行客户端
zkCli.sh -server node1:2181,node2:2181,node3:2181
显示所有操作命令
help
查看znode节点信息
ls /
查看znode某节点的详细信息
[zk: node1:2181(CONNECTED) 5] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
(1)czxid:创建节点的事务 zxid 每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所 有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。
(2)ctime:znode 被创建的毫秒数(从 1970 年开始)
(3)mzxid:znode 最后更新的事务 zxid
(4)mtime:znode 最后修改的毫秒数(从 1970 年开始)
(5)pZxid:znode 最后更新的子节点 zxid
(6)cversion:znode 子节点变化号,znode 子节点修改次数
(7)dataversion:znode 数据变化号
(8)aclVersion:znode 访问控制列表的变化号
(9)ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是 临时节点则是 0。
(10)dataLength:znode 的数据长度
(11)numChildren:znode 子节点数量
创建普通节点(永久节点 + 不带序号)
create /sanguo "weishuwu"
获得节点的值
get -s /test
创建带序号的节点(永久节点 + 带序号)
create -s /a
create -s /a
create /a
如果原来没有序号节点,序号从 0 开始依次递增。如果原节点下已有 2 个节点,则再排序时从 2 开始,以此类推。
创建短暂节点(短暂节点 + 不带序号 or 带序号)
(1)创建短暂的不带序号的节点
create -e /b
(2)创建短暂的带序号的节点
create -e -s /b
(3)在当前客户端是能查看到的
ls /
(4)退出当前客户端然后再重启客户端
[zk: node1:2181(CONNECTED) 12] quit
[root@node1 zookeeper-3.5.7]$ bin/zkCli.sh
(5)再次查看根目录下短暂节点已经删除
ls /
修改节点数据值
[zk: node1:2181(CONNECTED) 6] set /sanguo/weiguo "simayi"
删除节点
delete /test
递归删除节点
deleteall /test
查看节点状态
stat /sanguo
13.监听节点的数据变化
get -w /sanguo
14.监听节点的子节点变化
ls -w /sanguo
HDFS-HA工作要点
元数据管理方式需要改变(不需要SecondaryNameNode)
内存中各自保存一份元数据;
Edits日志只有Active状态的namenode节点可以做写操作;
两个namenode都可以读取edits;
共享的edits放在一个共享存储中管理(qjournal和NFS两个主流实现);
需要一个状态管理功能模块
实现了一个zkfailover,常驻在每一个namenode所在的节点,每一个zkfailover负责监控自己所在namenode节点,利用zk进行状态标识,当需要进行状态切换时,由zkfailover来负责切换,切换时需要防止brain split现象的发生。
必须保证两个NameNode之间能够ssh无密码登录。
隔离(Fence),即同一时刻仅仅有一个NameNode对外提供服务。
环境准备:
修改IP
修改主机名及主机名和IP地址的映射
关闭防火墙
ssh免密登录
安装JDK,配置环境变量等
规划集群
node1 | node2 | node3 |
---|---|---|
NameNode | NameNode | - |
JournalNode | JournalNode | JournalNode |
DataNode | DataNode | DataNode |
ZK | ZK | ZK |
ResourceManager | ||
NodeManager | NodeManager | NodeManager |
配置Zookeeper集群:在上面的笔记中已经记录过!
配置HDFS-HA集群:
配置hadoop-env.sh
export JAVA_HOME=/opt/app/jdk
配置core-site.xml
<configuration>
<property>
<name>fs.defaultFSname>
<value>hdfs://HCvalue>
property>
<property>
<name>hadoop.tmp.dirname>
<value>/opt/app/hadoop-3.1.4/metaDatavalue>
property>
<property>
<name>ha.zookeeper.quorumname>
<value>node1:2181,node2:2181,node3:2181value>
property>
configuration>
配置hdfs-site.xml
<configuration>
<property>
<name>dfs.nameservicesname>
<value>HCvalue>
property>
<property>
<name>dfs.ha.namenodes.HCname>
<value>nn1,nn2value>
property>
<property>
<name>dfs.namenode.rpc-address.HC.nn1name>
<value>node1:9000value>
property>
<property>
<name>dfs.namenode.rpc-address.HC.nn2name>
<value>node2:9000value>
property>
<property>
<name>dfs.namenode.http-address.HC.nn1name>
<value>node1:9870value>
property>
<property>
<name>dfs.namenode.http-address.HC.nn2name>
<value>node2:9870value>
property>
<property>
<name>dfs.namenode.shared.edits.dirname>
<value>qjournal://node1:8485;node2:8485;node3:8485/HadoopClustervalue>
property>
<property>
<name>dfs.ha.fencing.methodsname>
<value>sshfencevalue>
property>
<property>
<name>dfs.ha.fencing.ssh.private-key-filesname>
<value>/root/.ssh/id_rsavalue>
property>
<property>
<name>dfs.journalnode.edits.dirname>
<value>/opt/app/hadoop-3.1.4/journalnodeDatavalue>
property>
<property>
<name>dfs.permissions.enablename>
<value>falsevalue>
property>
<property>
<name>dfs.client.failover.proxy.provider.HCname>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvidervalue>
property>
<property>
<name>dfs.replicationname>
<value>3value>
property>
<property>
<name>dfs.namenode.datanode.registration.ip-hostname-checkname>
<value>truevalue>
property>
<property>
<name>dfs.ha.automatic-failover.enabledname>
<value>truevalue>
property>
configuration>
拷贝配置好的hadoop环境到其他节点
scp /opt/app/hadoop-3.1.4/etc/hadoop/core-site.xml root@node2:/opt/app/hadoop-3.1.4/etc/hadoop/
scp /opt/app/hadoop-3.1.4/etc/hadoop/core-site.xml root@node3:/opt/app/hadoop-3.1.4/etc/hadoop/
scp /opt/app/hadoop-3.1.4/etc/hadoop/hdfs-site.xml root@node3:/opt/app/hadoop-3.1.4/etc/hadoop/
scp /opt/app/hadoop-3.1.4/etc/hadoop/hdfs-site.xml root@node2:/opt/app/hadoop-3.1.4/etc/hadoop/
启动HDFS-HA集群
zkfc的自动故障转移需要借助psmisc软件完成,因此需要在三个节点上安装这个软件
yum install -y psmisc
sbin/hadoop-daemon.sh start journalnode
rm -rf metaData/ journalnodeData/ 删除三台节点
/opt/app/hadoop-3.1.4 hdfs namenode -format 只需要在第一台节点格式化
hadoop-daemon.sh start namenode 只需要执行一次即可,之后就不需要再执行
遇到报错,如图
start-dfs.sh
在[nn2]上,同步nn1的元数据信息:
bin/hdfs namenode -bootstrapStandby
只需要执行一次即可,之后就不需要再执行;
hadoop-daemon.sh start namenode
并在第二台节点上启动namenode
在三台节点上启动datanodehadoop-daemon.sh start datanode
重新启动HDFS
验证
node1 | node2 | node3 |
---|---|---|
NameNode | NameNode | |
JournalNode | JournalNode | JournalNode |
DataNode | DataNode | DataNode |
ZK | ZK | ZK |
ResourceManager | ResourceManager | |
NodeManager | NodeManager | NodeManager |
具体配置 —— 在每个节点上进行配置
yarn-site.xml
<configuration>
<property>
<name>yarn.nodemanager.aux-servicesname>
<value>mapreduce_shufflevalue>
property>
<property>
<name>yarn.resourcemanager.ha.enabledname>
<value>truevalue>
property>
<property>
<name>yarn.resourcemanager.cluster-idname>
<value>cluster-yarn1value>
property>
<property>
<name>yarn.resourcemanager.ha.rm-idsname>
<value>rm1,rm2value>
property>
<property>
<name>yarn.resourcemanager.hostname.rm1name>
<value>node1value>
property>
<property>
<name>yarn.resourcemanager.hostname.rm2name>
<value>node2value>
property>
<property>
<name>yarn.resourcemanager.zk-addressname>
<value>node1:2181,node2:2181,node3:2181value>
property>
<property>
<name>yarn.resourcemanager.recovery.enabledname>
<value>truevalue>
property>
<property>
<name>yarn.resourcemanager.store.classname>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStorevalue>
property>
configuration>
同步更新其他节点的配置信息
scp /opt/app/hadoop-3.1.4/etc/hadoop/yarn-site.xml root@node2:/opt/app/hadoop-3.1.4/etc/hadoop/
scp /opt/app/hadoop-3.1.4/etc/hadoop/yarn-site.xml root@node3:/opt/app/hadoop-3.1.4/etc/hadoop/
启动hdfs (本步骤可以不做,如果搭建过HA-Hadoop集群)
在各个JournalNode节点上,输入以下命令启动journalnode服务:sbin/hadoop-daemon.sh start journalnode
在[nn1]上,对其进行格式化,并启动:
bin/hdfs namenode -format
sbin/hadoop-daemon.sh start namenode
在[nn2]上,同步nn1的元数据信息:bin/hdfs namenode -bootstrapStandby
启动[nn2]:sbin/hadoop-daemon.sh start namenode
启动所有datanode:sbin/hadoop-daemons.sh start datanode
将[nn1]切换为Active:bin/hdfs haadmin -transitionToActive nn1
启动yarn
编辑一下wc.txt,上传到hdfs上
打开idea,创建一个maven项目,引入编程依赖于pom.xml中
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0modelVersion>
<groupId>com.kanggroupId>
<artifactId>ha-testartifactId>
<version>1.0version>
<packaging>jarpackaging>
<name>ha-testname>
<url>http://maven.apache.orgurl>
<properties>
<project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-clientartifactId>
<version>3.1.4version>
dependency>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-hdfsartifactId>
<version>3.1.4version>
dependency>
<dependency>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-log4j12artifactId>
<version>1.6.1version>
dependency>
dependencies>
project>
编写MapReduce代码
package com.kang;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WCMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String word : words) {
context.write(new Text(word),new LongWritable(1L));
}
}
}
package com.kang;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WCReduce extends Reducer<Text, LongWritable,Text,LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
long sum = 0L;
for (LongWritable value : values) {
sum += value.get();
}
context.write(key,new LongWritable(sum));
}
}
package com.kang;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import javax.xml.soap.Text;
import java.io.IOException;
public class WCDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS","hdfs://HC");
Job job = Job.getInstance(conf);
job.setJarByClass(WCDriver.class);
FileInputFormat.setInputPaths(job,new Path("/wc.txt"));
job.setMapperClass(WCMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(WCReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(0);
FileOutputFormat.setOutputPath(job,new Path("/output"));
boolean flag = job.waitForCompletion(true);
System.exit(flag?0:1);
}
}
将hdfs-site.xml和core-site.xml从虚拟机上导出到Java项目目录下
然后运行WCDriver,会收到报错信息,会显示权限不足,然后我们为了实现这一任务,我们将权限进行修改
再次运行程序,返回代码0,即运行成功
最后,我们将Hadoop分布式文件系统(HDFS)中文件和目录的默认权限恢复到默认权限类型