Zookeeper 是 Apache Hadoop 项目下的一个子项目, 翻译过来就是 动物园管理员,他是用来管 Hadoop(大象)、Hive(蜜蜂)、Pig(小猪)的管理员.简称zk.
Zookeeper 是一个分布式的、开源的分布式应用程序的协调服务。其本质上就是提供一种集中式信息存储服务.就是一个将数据存放到内存中,以树形结构存储数据的服务.我们根据其存储数据的特点可以实现分布式统一配置中心,分布式锁等功能,主要用于分布式应用程序的高性能协调,集群高可靠等.
主要功能:
配置管理
分布式锁
3.服务器动态上下线
上传解压
#进入到apps文件夹 使用rz上传 也可以直接拖拽到crt上传
cd /opt/apps
#上传后解压
tar -zxvf zookeeper-3.5.6.tar.gz
配置
#1.进入到解压后的zk文件夹下 创建一个文件夹zkData
cd /opt/apps/zookeeper-3.5.6
mkdir zkData
#2.进入到zk的conf文件夹下 将zoo_sample.cfg改为zoo.cfg
cd conf
mv zoo_sample.cfg zoo.cfg
#3.修改zoo.cfg的配置
vi zoo.cfg
#将第12行dataDir的值改为我们刚才创建的文件夹 12+shift+G 直接跳转
dataDir=/opt/apps/zookeeper-3.5.6/zkData
#在文件的最后配置server的信息和端口
server.1=linux01:2888:3888
server.2=linux02:2888:3888
server.3=linux03:2888:3888
#4.进入到zkData文件夹下 添加linux01的myid为1
cd ../zkData
echo 1 > myid
分发
#进入到linux01的/opt/apps文件夹下
scp -r zookeeper-3.5.6 linux02:$PWD
scp -r zookeeper-3.5.6 linux03:$PWD
#设置linux02上zk的myid为2
echo 2 > /opt/apps/zookeeper-3.5.6/zkData/myid
cat /opt/apps/zookeeper-3.5.6/zkData/myid
#设置linux03上zk的myid为3
echo 3 > /opt/apps/zookeeper-3.5.6/zkData/myid
cat /opt/apps/zookeeper-3.5.6/zkData/myid
启动
#zk没有一键启动我们需要挨个启动
/opt/apps/zookeeper-3.5.6/bin/zkServer.sh start
#zk查看状态
/opt/apps/zookeeper-3.5.6/bin/zkServer.sh status
一键启动脚本
#!/bin/bash
for hostname in linux01 linux02 linux03
do
echo "当前正在操作的主机$hostname"
ssh $hostname "source /etc/profile ; /opt/apps/zookeeper-3.5.6/bin/zkServer.sh $1 ;exit"
done
ZooKeeper是一个树形目录服务,其数据模型和linux的文件系统目录树很类似,拥有一个层次化结构。
这里面的每一个节点都被称为: ZNode,每个节点上都会保存自己的数据和节点信息.节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。
节点可以分为四大类:
- PERSISTENT持久化节点
- EPHEMERAL临时节点:-e
- PERSISTENT_SEQUENTIAL持久化顺序节点:-s
- EPHEMERAL_SEQUENTIAL临时顺序节点: -es
/opt/apps/zookeeper-3.5.6/bin/zkServer.sh start
/opt/apps/zookeeper-3.5.6/bin/zkServer.sh stop
/opt/apps/zookeeper-3.5.6/bin/zkServer.sh status
/opt/apps/zookeeper-3.5.6/bin/zkServer.sh restart
连接服务端
/opt/apps/zookeeper-3.5.6/bin/zkCli.sh -server linux001:2181
连接本地
/opt/apps/zookeeper-3.5.6/bin/zkCli.sh
查看命令帮助
help
查看
ls 目录
创建节点
create /节点名 描述
create -e /节点名 描述 临时节点
create -s /节点名 描述 顺序节点
create -e -s /节点 描述 临时顺序节点
查看节点
get /节点名
设置节点描述
set /节点名 描述
删除节点
delete /节点名
删除非空节点
deleteAll /节点名
监听
ls -w 路径
-- 监听根目录 所有子节点的变化
ls - w /
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/
get -w 路径
-- 监听 /app3 节点数据的变化
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/app3
Curator是 ApacheZooKeeper的Java客户端库.
常见的ZooKeeper Java APl :
Curator项目的目标是简化ZooKeeper客户端的使用。
Curator最初是Netfix研发的,后来捐献了Apache基金会,目前是Apache的顶级项目。
http://curator.apache.org
坐标
<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-frameworkartifactId>
<version>4.0.0version>
dependency>
<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-recipesartifactId>
<version>4.0.0version>
dependency>
<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-clientartifactId>
<version>4.0.0version>
dependency>
/*
CuratorFrameworkFactory
静态方法获取连接对象
static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
String connectString:连接的字符串 zkserver地址:端口
int sessionTimeoutMs:会话超时时间 单位ms 默认 60 * 1000
int connectionTimeoutMs 连接超时时间 单位ms 默认 15 * 1000
RetryPolicy retryPolicy 重试策略 接口
实现类ExponentialBackoffRetry
static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
CuratorFramework
方法
void start() 开始连接
*/
public class Test {
public static void main(String[] args) {
String connectString = "linux001:2181";
// //基础等待时间 20 重试次数 10
RetryPolicy r = new ExponentialBackoffRetry(20,10);
// CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, r);
// curatorFramework.start();;
//第二种方式
CuratorFramework client = CuratorFrameworkFactory.builder().
connectString(connectString)
.retryPolicy(r).build();
client.start();
}
}
/*
CuratorFramework
创建节点
create().forPath(节点) 创建节点 不指定数据 默认为 ip地址
create().forPath(节点,byte[] data) 创建节点 指定数据 字节数组
create().withMode().forPath(节点,byte[] data) 创建节点 指定节点类型 默认类型 持久化
CreateMode.EPHEMERAL 临时节点
CreateMode.PERSISTENT 持久节点
CreateMode.PERSISTENT_SEQUENTIAL 持久顺序节点
CreateMode.EPHEMERAL_SEQUENTIAL 临时顺序节点
create().creatingParentsIfNeeded().forPath(节点,byte[] data) 创建多级节点
*/
public class Test {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorUtils.getClient();
client.start();
//创建节点 由于指定了命名空间 所以会在 /doit下创建
//没有指定数据 默认数据是当前客户端的ip地址
// client.create().forPath("/app1");
//创建节点 指定数据
//client.create().forPath("/app2","abc".getBytes());
//创建临时节点 指定数据
// client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3","aa".getBytes());
//Thread.sleep(10000);
//创建多级节点
client.create().creatingParentsIfNeeded().forPath("/app3/p1/p", "abc".getBytes());
client.close();
}
}
/*
查看节点
* 1. 查询数据:get: getData().forPath()
* 2. 查询子节点: ls: getChildren().forPath()
* 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()
*/
public class Test02 {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorUtils.getClient();
client.start();
//查询节点数据
byte[] bytes = client.getData().forPath("/app1");
System.out.println(new String(bytes));
//查询子节点
List<String> list = client.getChildren().forPath("/");
for (String s : list) {
System.out.println(s);
}
//查询节点状态信息 ls -s
Stat s = new Stat();
client.getData().storingStatIn(s).forPath("/app1");
System.out.println(s.getCzxid());
client.close();
}
}
/**
* 修改数据
* 1. 基本修改数据:setData().forPath()
* 2. 根据版本修改: setData().withVersion().forPath()
* * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
*/
public class Test03 {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorUtils.getClient();
client.start();
//基本修改数据
// client.setData().forPath("/app1","v1".getBytes());
// byte[] bytes = client.getData().forPath("/app1");
// System.out.println(new String(bytes));
//查询版本
Stat s = new Stat();
client.getData().storingStatIn(s).forPath("/app1");
int version = s.getVersion();
//使用版本进行设置
client.setData().withVersion(version).forPath("/app1");
byte[] bytes = client.getData().forPath("/app1");
System.out.println(new String(bytes));
client.close();
}
}
/**
* 删除节点: delete deleteall
* 1. 删除单个节点:delete().forPath("/app1");
* 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
* 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2");
* 4. 回调:inBackground
*/
public class Test04 {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorUtils.getClient();
client.start();
//删除单个节点
// client.delete().forPath("/app1");
//删除带子节点的节点
// client.delete().deletingChildrenIfNeeded().forPath("/app3");
//必须成功的删除:为了防止网络抖动。本质就是重试。
// client.delete().guaranteed().forPath("/aa");
client.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("我被删除了");
System.out.println(event);
}
}).forPath("/app2");
client.close();
}
}
监听数据变化
public class Test05 {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorUtils.getClient();
client.start();
//创建 NodeCache对象
NodeCache nodeCache = new NodeCache(client,"/app1");
//2.注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点变化了");
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
//3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
nodeCache.start(true);
// Thread.sleep(1000000);
}
}
监听子节点变化
public class Test06 {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorUtils.getClient();
client.start();
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app3",true);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("子节点变化了");
System.out.println(pathChildrenCacheEvent);
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
byte[] data = pathChildrenCacheEvent.getData().getData();
System.out.println(new String(data));
}
}
});
pathChildrenCache.start();
Thread.sleep(1000000);
}
}
/*
TreeCache 相当于 NodeCache与PathChildrenCache组合
监听当前节点 与 子节点
*/
public class Test07 {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorUtils.getClient();
client.start();
TreeCache treeCache = new TreeCache(client,"/app4");
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println(treeCacheEvent);
}
});
treeCache.start();
Thread.sleep(10000000);
}
}
在ZooKeeper集群服中务中有三个角色:
Leader 领导者 :
1. 处理事务请求
2. 集群内部各服务器的调度者
Follower 跟随者 :
Observer 观察者:
选举机制
假设3台linux
linux01 启动 投自己一票 广播
linux02 启动 收到linux01 广播 投自己一票
linux01 发现 linux02的id比自己的id大 投linux02一票
linux02 投票超过半数 选为 leader
linux03 启动 由于已经有了leader follower
如果leader宕机
看谁的版本号新 谁为leader
如果版本号相同 谁id大 谁是leader