在高并发的分布式系统中,缓存等服务大多都采用集群部署,但任然经常会存在大量的热点数据,经常在缓存或者数据库查询,为保证缓存或数据库服务的高可用,尽可能的让其不出现宕机的情况,需要把大批量的热点数据进行分散分别存储到不同节点,使得系统具有良好的负载均衡性能和扩展性。
哈希计算是最常见的数据分布技术,根据对数据标识的求模运算来计算得到的哈希值将数据映射到存储空间中。
存在的问题:
1,增删节点时,更新效率低。
当系统中存储节点数量发生增加或减少时,映射公式将发生变化为hash%(N±1),
这将使得大量数据的映射位置会发生变化,为对数据对象的映射位置重新计算,
系统无法对外界访问进行正常响应,将导致系统处于崩溃状态。
2,平衡性差,未考虑节点性能差异。
由于硬件性能的提升,新添加的节点具有更好的承载能力,
如何对算法进行改进,使节点性能可以得到较好利用。
3,单调性不足。
衡量数据分布技术的一项重要指标是单调性,
单调性是指如果已经有一些内容通过哈希计算分派到了相应的缓冲中,当又有新的缓冲加入到系统中时,
哈希的结果应能够保证原有已分配的内容可以被映射到新的缓冲中去,而不会被映射到旧的缓冲集合中的其他缓冲区。
一致性哈希算法在1997年由麻省理工学院提出,是一种特殊的哈希算法,在移除或者添加一个服务器时,能够尽可能小地改变已存在的服务请求与处理请求服务器之间的映射关系 。一致性哈希解决了简单哈希算法在分布式哈希表( Distributed Hash Table,DHT) 中存在的动态伸缩等问题。
1、平衡性:
指哈希的结果能够尽可能分布到所有的缓冲中去,使得所有的缓冲空间都得到利用。2、单调性:
单调性指当有新的缓冲加入到系统中时,哈希的结果应能够保证原有已分配的内容可以被映射到原有的或者新的缓冲中去,而不会被映射到旧的缓冲集合中的其他缓冲区。3、分散性:
在分布式环境中,终端有可能只能看到其中的一部分缓冲。当终端希望通过哈希过程将内容映射到缓冲上时,由于不同终端所见的缓冲范围有可能不同,从而导致哈希的结果不一致,最终的结果是相同的内容被不同的终端映射到不同的缓冲区中。因为它导致相同内容被存储到不同缓冲中去,降低了系统存储的效率,应尽量避免。分散性的定义就是上述情况发生的严重程度。4、负载:
负载问题实际上是从另一个角度看待分散性问题。既然不同的终端可能将相同的内容映射到不同的缓冲区中,那么对于一个特定的缓冲区而言,也可能被不同的用户映射为不同的内容。与分散性一样,这种情况也是应当避免的,因此好的哈希算法应能够尽量降低缓冲的负荷。
1,一致性哈希算法是将每个Node节点映射到同一个圆上。 将各Node的key采用hash计算,可得到一个整数数组。将该数组排序后,首尾相连即是一个圆。
2,如下图所示,Node的key分布在圆的不同弧段上。同理,若有一请求key,hash后落入该圆的某一弧段(下图三角点所示),顺时针方向寻得离其最近的节点即为其服务节点(下图Node2)。这样每个节点覆盖了圆上从上一节点到其本身的一段弧段区间。
3,如某一节点失效,之前落入其弧段区间的请求即会顺时针移到与其相邻的节点(下图如Node2失效,之前落入Node3至Node2弧段的请求会落入Node1)。
而未落入失效弧段区间的节点则不受影响(之前落入Node2至Node3弧段的请求,当Node2失效后不受影响)。增加节点的场景与此类似,新的节点承载一段新区间,这样,落入失效节点至新节点弧段的请求会被新节点所承载。
4,解决机器负载不均衡问题:
在节点固定的情况下,为了增加节点在圆上分布的均匀性与分散性,可以设置节点的replicas(副本数)。下图将replicas设置为2,各节点承载的弧段范围已更加精细且于整体而言分布更加分散。所以适当调节replicas参数可以提高算法的均衡性。
这也叫虚拟节点机制,即对每一台机器通过不同的哈希函数计算出多个哈希值,对多个位置都放置一个服务节点,称为虚拟节点。让每台机器分配数量较多的虚拟节点去抢占哈希环,数量多起来之后,哈希函数的离散性就可以得到很好的体现,然后每台机器就可以按照虚拟节点的比例来分配负载均衡
为何用环来存储节点,还有顺时针查找
环可以更好的处理数据的平衡性和分散性,顺时针查找可以更好的处理数据的单调性。增加或减少节点,只会引起很少部分的key不会命中,对集群影响很小。
1,部署多台服务器的RPC服务,将同一类型的请求路由到同一台机器上。
2,缓存集群例如redis,要求存储数据均匀的放到集群中的各个节点上,访问这些数据时能快速的路由到集群中对应存放该数据的节点上;并且要求增删节点对整个集群的影响很小,不至于有大的动荡造成整体负载的不稳定,这个时候也是可以用一致性hash算法。
1, 构造这个2的32次方的hash环:
利用底层结构为红黑树的TreeMap实现,时间复杂度在O(logN),效率较高。
2,Hash算法上也需要选好,要尽可能的打散开。一般我们采用FNV1_32_HASH、KETAMA_HASH等算法,KETAMA_HASH是MemCache集群默认的实现方法。
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedList;
import java.util.Map;
import java.util.TreeMap;
public class ConsistentHashLoadBalance{
private TreeMap<Long, String> virtualNodes = new TreeMap<>();
private LinkedList<String> nodes;
//每个真实节点对应的虚拟节点数
private final int replicCnt;
public ConsistentHashLoadBalance(LinkedList<String> nodes, int replicCnt) {
this.nodes = nodes;
this.replicCnt = replicCnt;
initalization();
}
/**
* 初始化哈希环
* 循环计算每个node名称的哈希值,将其放入treeMap
*/
private void initalization() {
for (String nodeName : nodes) {
for (int i = 0; i < replicCnt / 4; i++) {
String virtualNodeName = getNodeNameByIndex(nodeName, i);
for (int j = 0; j < 4; j++) {
virtualNodes.put(hash(virtualNodeName, j), nodeName);
}
}
}
}
private String getNodeNameByIndex(String nodeName, int index) {
return new StringBuffer(nodeName)
.append("&&")
.append(index)
.toString();
}
/**
* 根据资源key选择返回相应的节点名称
*
* @param key
* @return 节点名称
*/
public String selectNode(String key) {
Long hashOfKey = hash(key, 0);
if (!virtualNodes.containsKey(hashOfKey)) {
Map.Entry<Long, String> entry = virtualNodes.ceilingEntry(hashOfKey);
if (entry != null)
return entry.getValue();
else
return nodes.getFirst();
} else
return virtualNodes.get(hashOfKey);
}
private Long hash(String nodeName, int number) {
byte[] digest = md5(nodeName);
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}
/**
* md5加密
*
* @param str
* @return
*/
public byte[] md5(String str) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
md.reset();
md.update(str.getBytes("UTF-8"));
return md.digest();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
return null;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return null;
}
}
public void addNode(String node) {
nodes.add(node);
String virtualNodeName = getNodeNameByIndex(node, 0);
for (int i = 0; i < replicCnt / 4; i++) {
for (int j = 0; j < 4; j++) {
virtualNodes.put(hash(virtualNodeName, j), node);
}
}
}
public void removeNode(String node) {
nodes.remove(node);
String virtualNodeName = getNodeNameByIndex(node, 0);
for (int i = 0; i < replicCnt / 4; i++) {
for (int j = 0; j < 4; j++) {
virtualNodes.remove(hash(virtualNodeName, j), node);
}
}
}
private void printTreeNode() {
if (virtualNodes != null && !virtualNodes.isEmpty()) {
virtualNodes.forEach((hashKey, node) ->
System.out.println(
new StringBuffer(node)
.append(" ==> ")
.append(hashKey)
)
);
} else
System.out.println("Cycle is Empty");
}
public static void main(String[] args) {
LinkedList<String> nodes = new LinkedList<>();
nodes.add("192.168.13.1:8080");
nodes.add("192.168.13.2:8080");
nodes.add("192.168.13.3:8080");
nodes.add("192.168.13.4:8080");
ConsistentHashLoadBalance consistentHash = new ConsistentHashLoadBalance(nodes, 160);
consistentHash.printTreeNode();
}
}