Zookeeper 是一个分布式协调服务的开源框架。 主要用来解决分布式集群中应用系统的一致性问题,例如怎样避免同时操作同一数据造成脏读的问题。
ZooKeeper 本质上是一个分布式的小文件存储系统。 提供基于类似于文件系统的目录树方式的数据存储,并且可以对树中的节点进行有效管理。从而用来维护和监控你存储的数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理。
诸如:


Znode 有两种,分别为临时节点和永久节点。
节点的类型在创建时即被确定,并且不能改变。
临时节点:该节点的生命周期依赖于创建它们的会话。一旦会话结束,临时节点将被自动删除,当然可以也可以手动删除。 临时节点不允许拥有子节点。
永久节点:该节点的生命周期不依赖于会话,并且只有在客户端显示执行删除操作的时候,他们才能被删除。
Znode 还有一个序列化的特性,如果创建的时候指定的话,该 Znode 的名字后面会自动追加一个不断增加的序列号。 序列号对于此节点的父节点来说是唯一的, 这样便会记录每个子节点创建的先后顺序。 它的格式为“ %10d” (10 位数字,没有数值的数位用 0 补充,例如“ 0000000001” )。


服务端启动

客户端启动
./zkCli.sh -server localhost:2181

退出
quit
查看
ls /
创建
create /appl cj

获取数据
get /appl
设置数据
set /appl 55

删除
delete /appl/55
删除全部
deleteall
help
创建临时节点
create -e /app1
创建顺序节点
create -s /app1
创建临时顺序节点
create -es /app1
查看节点信息
ls -s /

引入依赖
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0modelVersion>
<groupId>com.jqgroupId>
<artifactId>curator-zkartifactId>
<version>1.0-SNAPSHOTversion>
<properties>
<maven.compiler.source>8maven.compiler.source>
<maven.compiler.target>8maven.compiler.target>
properties>
<dependencies>
<dependency>
<groupId>junitgroupId>
<artifactId>junitartifactId>
<version>4.10version>
<scope>testscope>
dependency>
<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-frameworkartifactId>
<version>4.0.0version>
dependency>
<dependency>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-apiartifactId>
<version>1.7.21version>
dependency>
<dependency>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-log4j12artifactId>
<version>1.7.21version>
dependency>
dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.pluginsgroupId>
<artifactId>maven-compiler-pluginartifactId>
<version>3.8.1version>
<configuration>
<source>1.8source>
<target>1.8target>
configuration>
plugin>
plugins>
build>
project>
package com.jq.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;
public class CuratorTest {
/**
* 建立连接
*/
@Test
public void testConnect(){
/**
* 第一种方式
* @param connectString 连接字符串,zk server地址和端口
* @param sessionTimeoutMs 会话超时时间
* @param connectionTimeoutMs 连接超时时间
* @param retryPolicy 重试策略
*/
// //重试策略
// ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000,10);
// CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.56.10:2181", 60 * 1000, 15 * 1000, retryPolicy);
// client.start();
// //开启连接
/**
* 第二种方式
* */
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000,10);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.56.10:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).namespace("cj").build();
client.start();
}
}
package com.jq.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
public class CuratorTest {
private CuratorFramework client;
/**
* 建立连接
*/
@Before
public void testConnect(){
/**
* 第一种方式
* @param connectString 连接字符串,zk server地址和端口
* @param sessionTimeoutMs 会话超时时间
* @param connectionTimeoutMs 连接超时时间
* @param retryPolicy 重试策略
*/
// //重试策略
// ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000,10);
// CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.56.10:2181", 60 * 1000, 15 * 1000, retryPolicy);
// client.start();
// //开启连接
/**
* 第二种方式
* */
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000,10);
client = CuratorFrameworkFactory.builder()
.connectString("192.168.56.10:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).namespace("cj").build();
client.start();
}
/**
* 创建节点:create 持久,临时,顺序 数据
* 1. 基本创建
* 2. 创建节点带有数据
* 3.设置节点类型
* 4.创建带有多级节点 /app1/p1
*/
@Test
public void testCreate() throws Exception {
//1.基本创建
//如果没有指定数据,默认为当前客户端的ip地址作为数据
//String path= client.create().forPath("/app1");
//2. 创建节点带有数据
//String path= client.create().forPath("/app2","hello app2".getBytes());
//3.设置节点类型
//String path= client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
//4.创建带有多级节点 /app4/p1
String path= client.create().creatingParentsIfNeeded().forPath("/app4/p1");
System.out.println(path);
}
@After
public void close(){
if (client!=null){
client.close();
}
}
}
/**
* 删除节点:delete,deleteall
* 1. 删除单个节点
* 2.删除带有子节点的节点
* 3.必须成功删除
* 4.回调
*/
@Test
public void testDelete() throws Exception {
//1.删除单个节点
client.delete().forPath("/app1");
}
@Test
public void testDelete2() throws Exception {
//2.删除带有子节点的节点
client.delete().deletingChildrenIfNeeded().forPath("/app4");
}
@Test
public void testDelete3() throws Exception {
//3.必须成功删除
client.delete().guaranteed().forPath("/app2");
}
@Test
public void testDelete4() throws Exception {
//4.回调
BackgroundCallback backgroundCallback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println("我被删除了");
System.out.println(curatorEvent);
}
};
client.delete().guaranteed().inBackground(backgroundCallback).forPath("/dubbo");
}v
/**
* 修改节点:
* 1. 修改数据
* 2.根据版本修改
*/
@Test
public void testSet() throws Exception {
client.setData().forPath("/app1","jeee".getBytes());
}
@Test
public void testSetForVersion() throws Exception {
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app1");
int version = stat.getVersion();
System.out.println(version);
client.setData().withVersion(version).forPath("/app1","jeee2".getBytes());
}
/**
* 查询节点:
* 1. 查询数据:get
* 2.查询子节点: ls
* 3. 查询节点的状态信息: ls-s
* @throws Exception
*/
@Test
public void testGet1() throws Exception {
//1. 查询数据:get
byte[] bytes = client.getData().forPath("/app1");
System.out.println(new String(bytes));
}
@Test
public void testGet2() throws Exception {
//2.查询子节点: ls
List<String> path = client.getChildren().forPath("/app4");
System.out.println(path);
}
@Test
public void testGet3() throws Exception {
//3. 查询节点的状态信息: ls-s
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app1");
System.out.println(stat);
}

ZooKeeper 提供了分布式数据发布/订阅功能,一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使他们能够做出相应的处理。
ZooKeeper 中,引入了 Watcher 机制来实现这种分布式的通知功能。
ZooKeeper 允许客户端向服务端注册一个 Watcher 监听,当服务端的一些事件触发了这个 Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。
触发事件种类很多, 如:节点创建,节点删除,节点改变,子节点改变等。
总的来说可以概括 Watcher 为以下三个过程:
相比于redis和数据库 性能稳定可靠
分布式锁,这个主要得益于 ZooKeeper 保证了数据的强一致性。
锁服务可以分为两类,一个是保持独占,另一个是控制时序。
所谓保持独占,就是所有试图来获取这个锁的客户端,最终只有一个可以成功获得这把锁。通常的做法是把 zk 上的一个 znode 看作是一把锁,通过 createznode 的方式来实现。所有客户端都去创建 /distribute_lock 节点,最终成功创建的那个客户端也即拥有了这把锁。
控制时序,就是所有试图来获取这个锁的客户端,最终都是会被安排执行,只是有个全局时序了。做法和上面基本类似,只是这/distribute_lock 已经预先存在,客户端在它下面创建临时有序节点
(这个可以通过节点的属性控制:CreateMode.EPHEMERAL_SEQUENTIAL 来指定)。 Zk 的父节点(/distribute_lock)维持一份 sequence,保证子节点创建的时序性,从而也形成了每个客户端的全局时序。




tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz
mkdir /usr/local/zookper-cluster
cp -r apache-zookeeper-3.5.6-bin /usr/local/zookper-cluster/zookper-1
cp -r apache-zookeeper-3.5.6-bin /usr/local/zookper-cluster/zookper-2
cp -r apache-zookeeper-3.5.6-bin /usr/local/zookper-cluster/zookper-3

mkdir /usr/local/zookper-cluster/zookper-1/data
mkdir /usr/local/zookper-cluster/zookper-2/data
mkdir /usr/local/zookper-cluster/zookper-3/data
mv /usr/local/zookper-cluster/zookper-1/conf/zoo_sample.cfg /usr/local/zookper-cluster/zookper-1/conf/zoo.cfg
mv /usr/local/zookper-cluster/zookper-2/conf/zoo_sample.cfg /usr/local/zookper-cluster/zookper-2/conf/zoo.cfg
mv /usr/local/zookper-cluster/zookper-3/conf/zoo_sample.cfg /usr/local/zookper-cluster/zookper-3/conf/zoo.cfg
分别为2181 2182 2183
修改/usr/local/zookper-cluster/zookper-1/conf/zoo.cfg
vim /usr/local/zookper-cluster/zookper-1/conf/zoo.cfg
clientPort=2181
dataDir=/usr/local/zookper-cluster/zookper-1/data
修改/usr/local/zookper-cluster/zookper-2/conf/zoo.cfg
vim /usr/local/zookper-cluster/zookper-2/conf/zoo.cfg
clientPort=2182
dataDir=/usr/local/zookper-cluster/zookper-2/data
修改/usr/local/zookper-cluster/zookper-3/conf/zoo.cfg
vim /usr/local/zookper-cluster/zookper-3/conf/zoo.cfg
clientPort=2183
dataDir=/usr/local/zookper-cluster/zookper-3/data
echo 1 >/usr/local/zookper-cluster/zookper-1/data/myid
echo 2 >/usr/local/zookper-cluster/zookper-2/data/myid
echo 3 >/usr/local/zookper-cluster/zookper-3/data/myid
集群服务器IP列表如下
vim /usr/local/zookper-cluster/zookper-1/conf/zoo.cfg
vim /usr/local/zookper-cluster/zookper-2/conf/zoo.cfg
vim /usr/local/zookper-cluster/zookper-3/conf/zoo.cfg
server.1=192.168.56.10:2181:3881
server.2=192.168.56.10:2182:3882
server.3=192.168.56.10:2183:3883
解释:server.服务器id=服务器IP地址:服务器之间通信端口:服务器之间投票选举端口
/usr/local/zookper-cluster/zookper-1/bin/zkServer.sh start
/usr/local/zookper-cluster/zookper-2/bin/zkServer.sh start
/usr/local/zookper-cluster/zookper-3/bin/zkServer.sh start
