限制com.xxx.ApiService的每个方法,服务器端并发执行(或占用线程池线程数)不能超过 10 个:
<dubbo:service interface="com.xxx.ApiService" executes="10" />
限制com.xxx.ApiService的每个方法,服务器端并发执行(或占用线程池线程数)不能超过 10 个:
@DubboService(version = "1.0.0",executes=10)
public class DefaultRpcOrderProcessApi implements RpcOrderProcessApi {}
限制com.xxx.ApiService的sayHello 方法,服务器端并发执行(或占用线程池线程数)不能超过 10 个:
<dubbo:service interface="com.xxx.ApiService">
<dubbo:method name="sayHello" executes="10" />
dubbo:service>
限制com.xxx.ApiService的sayHello 方法,服务器端并发执行(或占用线程池线程数)不能超过 10 个:
@DubboService(version = "1.0.0",
methods = {
@Method(name = "sayHello", executes= 10, retries = 0))
public class DefaultRpcOrderProcessApi implements RpcOrderProcessApi {}
限制 com.xxx.ApiService的每个方法,每客户端并发执行(或占用连接的请求数)不能超过 10 个:
<dubbo:reference interface="com.xxx.ApiService" actives="10" />
限制 com.xxx.ApiService的每个方法,每客户端并发执行(或占用连接的请求数)不能超过 10 个:
@DubboReference(version = "1.0.0",actives = 10)
RpcShopCarProcessApi rpcShopCarProcessApi;
限制 com.xxx.ApiService 的 sayHello 方法,每客户端并发执行(或占用连接的请求数)不能超过 10 个:
<dubbo:reference interface="com.xxx.ApiService">
<dubbo:method name="sayHello" actives="10" />
dubbo:service>
限制 com.xxx.ApiService 的 sayHello 方法,每客户端并发执行(或占用连接的请求数)不能超过 10 个:
@DubboReference(version = "1.0.0",methods = {@Method(name = "sayHello", actives= 10, retries = 0)})
RpcShopCarProcessApi rpcShopCarProcessApi;```
在集群负载均衡时,Dubbo 提供了多种均衡策略,缺省为 random 随机调用。配置服务的客户端的 loadbalance 属性为leastactive,此 Loadbalance 会调用并发数最小的 Provider(Consumer端并发数)。
目前 Dubbo 内置了如下负载均衡算法,用户可直接配置使用:
算法 | 特性 | 备注 |
---|---|---|
RandomLoadBalance | 加权随机 | 默认算法,默认权重相同 |
RoundRobinLoadBalance | 加权轮询 | 借鉴于 Nginx 的平滑加权轮询算法,默认权重相同 |
LeastActiveLoadBalance | 最少活跃优先 + 加权随机 | 背后是能者多劳的思想 |
ShortestResponseLoadBalance | 最短响应优先 + 加权随机 | 更加关注响应速度 |
ConsistentHashLoadBalance | 一致性 Hash | 确定的入参,确定的提供者,适用于有状态请求 |
public interface LoadbalanceRules {
/**
* This class select one provider from multiple providers randomly.
**/
String RANDOM = "random";
/**
* Round robin load balance.
**/
String ROUND_ROBIN = "roundrobin";
/**
* Filter the number of invokers with the least number of active calls and count the weights and quantities of these invokers.
**/
String LEAST_ACTIVE = "leastactive";
/**
* Consistent Hash, requests with the same parameters are always sent to the same provider.
**/
String CONSISTENT_HASH = "consistenthash";
/**
* Filter the number of invokers with the shortest response time of success calls and count the weights and quantities of these invokers.
**/
String SHORTEST_RESPONSE = "shortestresponse";
String EMPTY = "";
}
加权随机,按权重设置随机概率。在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。
public class RandomLoadBalance extends AbstractLoadBalance {
public static final String NAME = "random";
/**
* Select one invoker between a list using a random criteria
* @param invokers List of possible invokers
* @param url URL
* @param invocation Invocation
* @param
* @return The selected invoker
*/
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
if (!needWeightLoadBalance(invokers,invocation)){
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
// Every invoker has the same weight?
boolean sameWeight = true;
// the maxWeight of every invokers, the minWeight = 0 or the maxWeight of the last invoker
int[] weights = new int[length];
// The sum of weights
int totalWeight = 0;
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
// Sum
totalWeight += weight;
// save for later use
weights[i] = totalWeight;
if (sameWeight && totalWeight != weight * (i + 1)) {
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < length; i++) {
if (offset < weights[i]) {
return invokers.get(i);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
private <T> boolean needWeightLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
Invoker invoker = invokers.get(0);
URL invokerUrl = invoker.getUrl();
// Multiple registry scenario, load balance among multiple registries.
if (REGISTRY_SERVICE_REFERENCE_PATH.equals(invokerUrl.getServiceInterface())) {
String weight = invokerUrl.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY);
if (StringUtils.isNotEmpty(weight)) {
return true;
}
} else {
String weight = invokerUrl.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY);
if (StringUtils.isNotEmpty(weight)) {
return true;
}else {
String timeStamp = invoker.getUrl().getParameter(TIMESTAMP_KEY);
if (StringUtils.isNotEmpty(timeStamp)) {
return true;
}
}
}
return false;
}
}
当我们使用它的时候,只需要进行选择loadBalance值作为“random”( public static final String NAME = “random”;)
加权轮询,按公约后的权重设置轮询比率,循环调用节点
public class RoundRobinLoadBalance extends AbstractLoadBalance {
public static final String NAME = "roundrobin";
private static final int RECYCLE_PERIOD = 60000;
protected static class WeightedRoundRobin {
private int weight;
private AtomicLong current = new AtomicLong(0);
private long lastUpdate;
public int getWeight() {
return weight;
}
public void setWeight(int weight) {
this.weight = weight;
current.set(0);
}
public long increaseCurrent() {
return current.addAndGet(weight);
}
public void sel(int total) {
current.addAndGet(-1 * total);
}
public long getLastUpdate() {
return lastUpdate;
}
public void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
/**
* get invoker addr list cached for specified invocation
*
* for unit test only
*
* @param invokers
* @param invocation
* @return
*/
protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map != null) {
return map.keySet();
}
return null;
}
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (Invoker<T> invoker : invokers) {
String identifyString = invoker.getUrl().toIdentityString();
int weight = getWeight(invoker, invocation);
WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
WeightedRoundRobin wrr = new WeightedRoundRobin();
wrr.setWeight(weight);
return wrr;
});
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
weightedRoundRobin.setWeight(weight);
}
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
totalWeight += weight;
}
if (invokers.size() != map.size()) {
map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
}
if (selectedInvoker != null) {
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return invokers.get(0);
}
得出结论可以使用roundrobin进行指定(public static final String NAME = "roundrobin)
加权最少活跃调用优先,活跃数越低,越优先调用,相同活跃数的进行加权随机。活跃数指调用前后计数差(针对特定提供者:请求发送数 - 响应返回数),表特定提供者的任务堆积量,活跃数越低,代表该提供者处理能力越强。
public class LeastActiveLoadBalance extends AbstractLoadBalance {
public static final String NAME = "leastactive";
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// The least active value of all invokers
int leastActive = -1;
// The number of invokers having the same least active value (leastActive)
int leastCount = 0;
// The index of invokers having the same least active value (leastActive)
int[] leastIndexes = new int[length];
// the weight of every invokers
int[] weights = new int[length];
// The sum of the warmup weights of all the least active invokers
int totalWeight = 0;
// The weight of the first least active invoker
int firstWeight = 0;
// Every least active invoker has the same weight value?
boolean sameWeight = true;
// Filter out all the least active invokers
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// Get the active number of the invoker
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// Get the weight of the invoker's configuration. The default value is 100.
int afterWarmup = getWeight(invoker, invocation);
// save for later use
weights[i] = afterWarmup;
// If it is the first invoker or the active number of the invoker is less than the current least active number
if (leastActive == -1 || active < leastActive) {
// Reset the active number of the current invoker to the least active number
leastActive = active;
// Reset the number of least active invokers
leastCount = 1;
// Put the first least active invoker first in leastIndexes
leastIndexes[0] = i;
// Reset totalWeight
totalWeight = afterWarmup;
// Record the weight the first least active invoker
firstWeight = afterWarmup;
// Each invoke has the same weight (only one invoker here)
sameWeight = true;
// If current invoker's active value equals with leaseActive, then accumulating.
} else if (active == leastActive) {
// Record the index of the least active invoker in leastIndexes order
leastIndexes[leastCount++] = i;
// Accumulate the total weight of the least active invoker
totalWeight += afterWarmup;
// If every invoker has the same weight?
if (sameWeight && afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// Choose an invoker from all the least active invokers
if (leastCount == 1) {
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexes[0]);
}
if (!sameWeight && totalWeight > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on
// totalWeight.
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
如果要使用它,则需要设置loadBalance参数为leastactive, (public static final String NAME = “leastactive”😉
加权最短响应优先,在最近一个滑动窗口中,响应时间越短,越优先调用。相同响应时间的进行加权随机。使得响应时间越快的提供者,处理更多的请求。
public class ShortestResponseLoadBalance extends AbstractLoadBalance {
public static final String NAME = "shortestresponse";
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// Estimated shortest response time of all invokers
long shortestResponse = Long.MAX_VALUE;
// The number of invokers having the same estimated shortest response time
int shortestCount = 0;
// The index of invokers having the same estimated shortest response time
int[] shortestIndexes = new int[length];
// the weight of every invokers
int[] weights = new int[length];
// The sum of the warmup weights of all the shortest response invokers
int totalWeight = 0;
// The weight of the first shortest response invokers
int firstWeight = 0;
// Every shortest response invoker has the same weight value?
boolean sameWeight = true;
// Filter out all the shortest response invokers
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
// Calculate the estimated response time from the product of active connections and succeeded average elapsed time.
long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed();
int active = rpcStatus.getActive();
long estimateResponse = succeededAverageElapsed * active;
int afterWarmup = getWeight(invoker, invocation);
weights[i] = afterWarmup;
// Same as LeastActiveLoadBalance
if (estimateResponse < shortestResponse) {
shortestResponse = estimateResponse;
shortestCount = 1;
shortestIndexes[0] = i;
totalWeight = afterWarmup;
firstWeight = afterWarmup;
sameWeight = true;
} else if (estimateResponse == shortestResponse) {
shortestIndexes[shortestCount++] = i;
totalWeight += afterWarmup;
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
if (shortestCount == 1) {
return invokers.get(shortestIndexes[0]);
}
if (!sameWeight && totalWeight > 0) {
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < shortestCount; i++) {
int shortestIndex = shortestIndexes[i];
offsetWeight -= weights[shortestIndex];
if (offsetWeight < 0) {
return invokers.get(shortestIndex);
}
}
}
return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]);
}
使用配置shortestresponse值进行设计对应策略
一致性 Hash,相同参数的请求总是发到同一提供者。当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
public static final String NAME = "consistenthash";
/**
* Hash nodes name
*/
public static final String HASH_NODES = "hash.nodes";
/**
* Hash arguments name
*/
public static final String HASH_ARGUMENTS = "hash.arguments";
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String methodName = RpcUtils.getMethodName(invocation);
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
// using the hashcode of list to compute the hash only pay attention to the elements in the list
int invokersHashCode = invokers.hashCode();
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
if (selector == null || selector.identityHashCode != invokersHashCode) {
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, invokersHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
return selector.select(invocation);
}
private static final class ConsistentHashSelector<T> {
private final TreeMap<Long, Invoker<T>> virtualInvokers;
private final int replicaNumber;
private final int identityHashCode;
private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = Bytes.getMD5(address + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
public Invoker<T> select(Invocation invocation) {
String key = toKey(invocation.getArguments());
byte[] digest = Bytes.getMD5(key);
return selectForKey(hash(digest, 0));
}
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}
private Invoker<T> selectForKey(long hash) {
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}
}
使用配置consistenthash值进行设计对应策略
<dubbo:service interface="..." loadbalance="random | leastactive | shortestresponse | consistenthash" />
<dubbo:reference interface="..." loadbalance="random | leastactive | shortestresponse | consistenthash" />
<dubbo:service interface="...">
<dubbo:method name="..." loadbalance="random | leastactive | shortestresponse | consistenthash"/>
dubbo:service>
<dubbo:reference interface="...">
<dubbo:method name="..." loadbalance="random | leastactive | shortestresponse | consistenthash"/>
dubbo:reference>
@DubboService(loadbalance = “random | leastactive | shortestresponse | consistenthash”)
@DubboReference(loadbalance = “random | leastactive | shortestresponse | consistenthash”)
方法级别就不一一展示了!
ServiceConfig serviceConfig= new ServiceConfig();
serviceConfig.setLoadBalance(“random | leastactive | shortestresponse | consistenthash”);
ReferenceConfig referenceConfig = new ReferenceConfig();
referenceConfig.setLoadBalance(“random | leastactive | shortestresponse | consistenthash”);
dubbo.service.loadbalance=random | leastactive | shortestresponse | consistenthash
dubbo.reference.loadbalance=random | leastactive | shortestresponse | consistenthash
一致性Hash模式,默认采用第一个参数作为哈希 key,如果需要切换参数,可以指定 hash.arguments 属性
ReferenceConfig<DemoService> referenceConfig = new ReferenceConfig<DemoService>();
// ... init
Map<String, String> parameters = new HashMap<String, String>();
parameters.put("hash.arguments", "1");
parameters.put("sayHello.hash.arguments", "0,1");
referenceConfig.setParameters(parameters);
referenceConfig.setLoadBalance("consistenthash");
referenceConfig.get();
从多个服务提供方中选择一个进行调用
org.apache.dubbo.rpc.cluster.LoadBalance
<dubbo:protocol loadbalance="xxx" />
<dubbo:provider loadbalance="xxx" />
org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
org.apache.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
org.apache.dubbo.rpc.cluster.loadbalance.ShortestResponseLoadBalance
src
|-main
|-java
|-com
|-xxx
|-XxxLoadBalance.java (实现LoadBalance接口)
|-resources
|-META-INF
|-dubbo
|-org.apache.dubbo.rpc.cluster.LoadBalance (纯文本文件,内容为:xxx=com.xxx.XxxLoadBalance)
XxxLoadBalance.java:
package com.xxx;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.RpcException;
public class XxxLoadBalance implements LoadBalance {
public <T> Invoker<T> select(List<Invoker<T>> invokers, Invocation invocation) throws RpcException {
// ...
}
}
META-INF/dubbo/org.apache.dubbo.rpc.cluster.LoadBalance:
xxx=com.xxx.XxxLoadBalance