Dubbo中负载均衡
官方文档
http://dubbo.apache.org/zh-cn/docs/user/demos/loadbalance.html
http://dubbo.apache.org/zh-cn/docs/source_code_guide/loadbalance.html
简介
LoadBalance ,它的职责是将网络请求,或者其他形式的负载“均摊”到不同的机器上。避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况,解决大量并发访问服务问题。通过负载均衡,可以让每台服务器获取到适合自己处理能力的负载。在为高负载服务器分流的同时,还可以避免资源浪费。
负载均衡的分类
负载均衡可分为软件负载均衡和硬件负载均衡。
- 软件负载:Nginx、LVS、HAProxy
- 硬件负载:Array、F5
Dubbo中负载均衡
在 Dubbo 中,Dubbo 需要对服务消费者的调用请求进行分配,避免少数服务提供者负载过大。服务提供者负载过大,会导致部分请求超时。因此将负载均衡到每个服务提供者上,是非常必要的。Dubbo 提供了四种负载均衡实现:
- 基于权重随机算法的 RandomLoadBalance
- 基于最少活跃调用数算法的 LeastActiveLoadBalance
- 基于 hash 一致性的 ConsistentHashLoadBalance
- 基于加权轮询算法的 RoundRobinLoadBalance
LoadBalance
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @SPI(RandomLoadBalance.NAME) public interface LoadBalance {
@Adaptive("loadbalance") <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
|
AbstractLoadBalance
该类实现了 LoadBalance 接口,并封装了一些公共的逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public abstract class AbstractLoadBalance implements LoadBalance { static int calculateWarmupWeight(int uptime, int warmup, int weight) { int ww = (int) ((float) uptime / ((float) warmup / (float) weight)); return ww < 1 ? 1 : (ww > weight ? weight : ww); }
@Override public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) { if (invokers == null || invokers.isEmpty()) return null; if (invokers.size() == 1) return invokers.get(0); return doSelect(invokers, url, invocation); }
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
protected int getWeight(Invoker<?> invoker, Invocation invocation) { int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), "weight", 100); if (weight > 0) { long timestamp = invoker.getUrl().getParameter("remote.timestamp", 0L); if (timestamp > 0L) { int uptime = (int) (System.currentTimeMillis() - timestamp); int warmup = invoker.getUrl().getParameter("warmup", 600000); if (uptime > 0 && uptime < warmup) { weight = calculateWarmupWeight(uptime, warmup, weight); } } } return weight; } }
|
说明:calculateWarmupWeight【(uptime / warmup)* weight】
运行时长 |
公式 |
计算后权重 |
1分钟 |
1/10 * 100 |
10 |
2分钟 |
2/10 * 100 |
20 |
5分钟 |
5/10 * 100 |
50 |
10分钟 |
10/10 * 100 |
100 |
ww < 1 ? 1 : (ww > weight ? weight : ww);==》 ww < 1 ? 1 : (ww > 100 ? 100 : ww)
主要用于保证当服务运行时长小于服务预热时间时,对服务进行降权,避免让服务在启动之初就处于高负载状态。服务预热是一个优化手段,与此类似的还有 JVM 预热。主要目的是让服务启动后“低功率”运行一段时间,使其效率慢慢提升至最佳状态。
RandomLoadBalance
RandomLoadBalance 是加权随机算法的具体实现。参照 加权随机 方式二:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public class RandomLoadBalance extends AbstractLoadBalance {
public static final String NAME = "random"; private final Random random = new Random();
@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); int totalWeight = 0; boolean sameWeight = true; for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); totalWeight += weight; if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) { sameWeight = false; } } if (totalWeight > 0 && !sameWeight) { int offset = random.nextInt(totalWeight); for (int i = 0; i < length; i++) { offset -= getWeight(invokers.get(i), invocation); if (offset < 0) { return invokers.get(i); } } } return invokers.get(random.nextInt(length)); } }
|
LeastActiveLoadBalance
最小活跃数负载均衡。活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求。此时应优先将请求分配给该服务提供者。
在具体实现中,每个服务提供者对应一个活跃数 active。初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| public class LeastActiveLoadBalance extends AbstractLoadBalance {
public static final String NAME = "leastactive";
private final Random random = new Random();
@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); int leastActive = -1; int leastCount = 0; int[] leastIndexs = new int[length]; int totalWeight = 0; int firstWeight = 0; boolean sameWeight = true; for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), "weight", 100); if (leastActive == -1 || active < leastActive) { leastActive = active; leastCount = 1; leastIndexs[0] = i; totalWeight = weight; firstWeight = weight; sameWeight = true; } else if (active == leastActive) { leastIndexs[leastCount++] = i; totalWeight += weight; if (sameWeight && i > 0 && weight != firstWeight) { sameWeight = false; } } } if (leastCount == 1) { return invokers.get(leastIndexs[0]); } if (!sameWeight && totalWeight > 0) { int offsetWeight = random.nextInt(totalWeight); for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexs[i]; offsetWeight -= getWeight(invokers.get(leastIndex), invocation); if (offsetWeight <= 0) return invokers.get(leastIndex); } } return invokers.get(leastIndexs[random.nextInt(leastCount)]); } }
|
活跃数的自增、自减
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| try { RpcStatus.beginCount(url, methodName); Result result = invoker.invoke(invocation); RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true); return result; } finally { if (max > 0) { synchronized (count) { count.notify(); } } }
|
需要添加以下配置信息:
1
| <dubbo:consumer filter="activelimit"></dubbo:consumer>
|
ConsistentHashLoadBalance
首先根据 ip 或其他的信息为缓存节点生成一个 hash,并将这个 hash 投射到 [0, 232 - 1] 的圆环上。当有查询或写入请求时,则为缓存项的 key 生成一个 hash 值。然后查找第一个大于或等于该 hash 值的缓存节点,并到这个节点中查询或写入缓存项。如果当前节点挂了,则在下一次查询或写入缓存时,为缓存项查找另一个大于其 hash 值的缓存节点即可。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class ConsistentHashLoadBalance extends AbstractLoadBalance { private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); int identityHashCode = System.identityHashCode(invokers); ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
if (selector == null || selector.identityHashCode != identityHashCode) { selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode)); selector = (ConsistentHashSelector<T>) selectors.get(key); } return selector.select(invocation); } private static final class ConsistentHashSelector<T> {} }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| private static final class ConsistentHashSelector<T> { private final TreeMap<Long, Invoker<T>> virtualInvokers; private final int replicaNumber; private final int identityHashCode; private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { this.virtualInvokers = new TreeMap<Long, Invoker<T>>(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } for (Invoker<T> invoker : invokers) { String address = invoker.getUrl().getAddress(); for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = md5(address + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } }
public Invoker<T> select(Invocation invocation) { String key = toKey(invocation.getArguments()); byte[] digest = md5(key); return selectForKey(hash(digest, 0)); }
private String toKey(Object[] args) { StringBuilder buf = new StringBuilder(); for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); }
private Invoker<T> selectForKey(long hash) { Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry(); if (entry == null) { entry = virtualInvokers.firstEntry(); } return entry.getValue(); } }
|
RoundRobinLoadBalance
加权轮询:服务器【A、B、C】,权重分别是【1、2、3】。面对6次请求,它们负载均衡的结果如下:【A、B、C、B、C、C】。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| public class RoundRobinLoadBalance extends AbstractLoadBalance {
public static final String NAME = "roundrobin"; private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); int length = invokers.size(); int maxWeight = 0; int minWeight = Integer.MAX_VALUE; final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>(); int weightSum = 0; for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); maxWeight = Math.max(maxWeight, weight); minWeight = Math.min(minWeight, weight); if (weight > 0) { invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight)); weightSum += weight; } } AtomicPositiveInteger sequence = sequences.get(key); if (sequence == null) { sequences.putIfAbsent(key, new AtomicPositiveInteger()); sequence = sequences.get(key); } int currentSequence = sequence.getAndIncrement(); if (maxWeight > 0 && minWeight < maxWeight) { int mod = currentSequence % weightSum; for (int i = 0; i < maxWeight; i++) { for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { final Invoker<T> k = each.getKey(); final IntegerWrapper v = each.getValue(); if (mod == 0 && v.getValue() > 0) { return k; } if (v.getValue() > 0) { v.decrement(); mod--; } } } } return invokers.get(currentSequence % length); }
private static final class IntegerWrapper { private int value; public IntegerWrapper(int value) { this.value = value; } public void decrement() { this.value--; } } }
|
- 此时有服务器【A、B、C】,权重分别是【1、2、3】,总权重为6,最大权重为3。
- mod = 0:满足条件,此时直接返回服务器 A
- mod = 1:自减1次后才能满足条件,此时返回服务器 B
- mod = 2:自减2次后才能满足条件,此时返回服务器 C
- mod = 3:自减3次后才能满足条件,经过递减后,服务器权重为 [0, 1, 2],此时返回服务器 B
- mod = 4:自减4次后才能满足条件,经过递减后,服务器权重为 [0, 0, 1],此时返回服务器 C
- mod = 5:只剩服务器C还有权重,返回C。
- 这样6次调用,得到的结果就是【A、B、C、B、C、C】。
- 当第7次调用时,此时调用编号为6,总权重大小也为6;mod则为0,重新开始。
