Dubbo中负载均衡
官方文档
http://dubbo.apache.org/zh-cn/docs/user/demos/loadbalance.html
http://dubbo.apache.org/zh-cn/docs/source_code_guide/loadbalance.html
简介
    LoadBalance ,它的职责是将网络请求,或者其他形式的负载“均摊”到不同的机器上。避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况,解决大量并发访问服务问题。通过负载均衡,可以让每台服务器获取到适合自己处理能力的负载。在为高负载服务器分流的同时,还可以避免资源浪费。
负载均衡的分类
    负载均衡可分为软件负载均衡和硬件负载均衡。
- 软件负载:Nginx、LVS、HAProxy
 
- 硬件负载:Array、F5
 
Dubbo中负载均衡
    在 Dubbo 中,Dubbo 需要对服务消费者的调用请求进行分配,避免少数服务提供者负载过大。服务提供者负载过大,会导致部分请求超时。因此将负载均衡到每个服务提供者上,是非常必要的。Dubbo 提供了四种负载均衡实现:
- 基于权重随机算法的 RandomLoadBalance
 
- 基于最少活跃调用数算法的 LeastActiveLoadBalance
 
- 基于 hash 一致性的 ConsistentHashLoadBalance
 
- 基于加权轮询算法的 RoundRobinLoadBalance
 
LoadBalance
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
   |  @SPI(RandomLoadBalance.NAME) public interface LoadBalance {
      
 
 
 
 
 
      @Adaptive("loadbalance")     <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
  }
 
  | 
 
AbstractLoadBalance
    该类实现了 LoadBalance 接口,并封装了一些公共的逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
   | public abstract class AbstractLoadBalance implements LoadBalance {            static int calculateWarmupWeight(int uptime, int warmup, int weight) {         int ww = (int) ((float) uptime / ((float) warmup / (float) weight));         return ww < 1 ? 1 : (ww > weight ? weight : ww);     }
           @Override     public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {                  if (invokers == null || invokers.isEmpty())             return null;                  if (invokers.size() == 1)             return invokers.get(0);                  return doSelect(invokers, url, invocation);     }
           protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
           protected int getWeight(Invoker<?> invoker, Invocation invocation) {                  int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), "weight", 100);         if (weight > 0) {                          long timestamp = invoker.getUrl().getParameter("remote.timestamp", 0L);             if (timestamp > 0L) {                                  int uptime = (int) (System.currentTimeMillis() - timestamp);                                  int warmup = invoker.getUrl().getParameter("warmup", 600000);                                  if (uptime > 0 && uptime < warmup) {                     weight = calculateWarmupWeight(uptime, warmup, weight);                 }             }         }         return weight;     } }
  | 
 
说明:calculateWarmupWeight【(uptime / warmup)* weight】
| 运行时长 | 
公式 | 
计算后权重 | 
| 1分钟 | 
1/10 * 100 | 
10 | 
| 2分钟 | 
2/10 * 100 | 
20 | 
| 5分钟 | 
5/10 * 100 | 
50 | 
| 10分钟 | 
10/10 * 100 | 
100 | 
ww < 1 ? 1 : (ww > weight ? weight : ww);==》 ww < 1 ? 1 : (ww > 100 ? 100 : ww)
主要用于保证当服务运行时长小于服务预热时间时,对服务进行降权,避免让服务在启动之初就处于高负载状态。服务预热是一个优化手段,与此类似的还有 JVM 预热。主要目的是让服务启动后“低功率”运行一段时间,使其效率慢慢提升至最佳状态。
RandomLoadBalance
    RandomLoadBalance 是加权随机算法的具体实现。参照  加权随机 方式二:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
   | public class RandomLoadBalance extends AbstractLoadBalance {
      public static final String NAME = "random";     private final Random random = new Random();
      @Override     protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {                  int length = invokers.size();                  int totalWeight = 0;                  boolean sameWeight = true;                                    for (int i = 0; i < length; i++) {             int weight = getWeight(invokers.get(i), invocation);             totalWeight += weight;             if (sameWeight && i > 0                     && weight != getWeight(invokers.get(i - 1), invocation)) {                                  sameWeight = false;             }         }                  if (totalWeight > 0 && !sameWeight) {                          int offset = random.nextInt(totalWeight);             for (int i = 0; i < length; i++) {                 offset -= getWeight(invokers.get(i), invocation);                 if (offset < 0) {                     return invokers.get(i);                 }             }         }                  return invokers.get(random.nextInt(length));     } }
 
 
 
 
 
  | 
 
LeastActiveLoadBalance
    最小活跃数负载均衡。活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求。此时应优先将请求分配给该服务提供者。
    在具体实现中,每个服务提供者对应一个活跃数 active。初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
   | public class LeastActiveLoadBalance extends AbstractLoadBalance {
      public static final String NAME = "leastactive";
      private final Random random = new Random();
      @Override     protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {                  int length = invokers.size();                  int leastActive = -1;                  int leastCount = 0;         int[] leastIndexs = new int[length];                  int totalWeight = 0;                  int firstWeight = 0;         boolean sameWeight = true;         for (int i = 0; i < length; i++) {             Invoker<T> invoker = invokers.get(i);                          int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();                          int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), "weight", 100);                          if (leastActive == -1 || active < leastActive) {                 leastActive = active;                 leastCount = 1;                 leastIndexs[0] = i;                 totalWeight = weight;                 firstWeight = weight;                 sameWeight = true;                              } else if (active == leastActive) {                                  leastIndexs[leastCount++] = i;                 totalWeight += weight;                                  if (sameWeight && i > 0                         && weight != firstWeight) {                     sameWeight = false;                 }             }         }                  if (leastCount == 1) {             return invokers.get(leastIndexs[0]);         }                  if (!sameWeight && totalWeight > 0) {             int offsetWeight = random.nextInt(totalWeight);             for (int i = 0; i < leastCount; i++) {                 int leastIndex = leastIndexs[i];                 offsetWeight -= getWeight(invokers.get(leastIndex), invocation);                 if (offsetWeight <= 0)                     return invokers.get(leastIndex);             }         }                  return invokers.get(leastIndexs[random.nextInt(leastCount)]);     } }
  | 
 
活跃数的自增、自减
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
   |  try {          RpcStatus.beginCount(url, methodName);          Result result = invoker.invoke(invocation);          RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);     return result; } finally {     if (max > 0) {         synchronized (count) {             count.notify();         }     } }
 
  | 
 
需要添加以下配置信息:
1
   | <dubbo:consumer filter="activelimit"></dubbo:consumer>
   | 
 
ConsistentHashLoadBalance
    首先根据 ip 或其他的信息为缓存节点生成一个 hash,并将这个 hash 投射到 [0, 232 - 1] 的圆环上。当有查询或写入请求时,则为缓存项的 key 生成一个 hash 值。然后查找第一个大于或等于该 hash 值的缓存节点,并到这个节点中查询或写入缓存项。如果当前节点挂了,则在下一次查询或写入缓存时,为缓存项查找另一个大于其 hash 值的缓存节点即可。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
   | public class ConsistentHashLoadBalance extends AbstractLoadBalance {     private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
      protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {                  String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();                  int identityHashCode = System.identityHashCode(invokers);         ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
                            if (selector == null || selector.identityHashCode != identityHashCode) {             selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));             selector = (ConsistentHashSelector<T>) selectors.get(key);         }                  return selector.select(invocation);     }          private static final class ConsistentHashSelector<T> {} }
  | 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
   | private static final class ConsistentHashSelector<T> {            private final TreeMap<Long, Invoker<T>> virtualInvokers;          private final int replicaNumber;          private final int identityHashCode;          private final int[] argumentIndex;
      ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {         this.virtualInvokers = new TreeMap<Long, Invoker<T>>();         this.identityHashCode = identityHashCode;         URL url = invokers.get(0).getUrl();         this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);         String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));         argumentIndex = new int[index.length];         for (int i = 0; i < index.length; i++) {             argumentIndex[i] = Integer.parseInt(index[i]);         }                  for (Invoker<T> invoker : invokers) {             String address = invoker.getUrl().getAddress();             for (int i = 0; i < replicaNumber / 4; i++) {                 byte[] digest = md5(address + i);                 for (int h = 0; h < 4; h++) {                     long m = hash(digest, h);                     virtualInvokers.put(m, invoker);                 }             }         }     }
      public Invoker<T> select(Invocation invocation) {         String key = toKey(invocation.getArguments());                           byte[] digest = md5(key);         return selectForKey(hash(digest, 0));     }
           private String toKey(Object[] args) {         StringBuilder buf = new StringBuilder();         for (int i : argumentIndex) {             if (i >= 0 && i < args.length) {                 buf.append(args[i]);             }         }         return buf.toString();     }
           private Invoker<T> selectForKey(long hash) {         Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();        if (entry == null) {           entry = virtualInvokers.firstEntry();        }        return entry.getValue();     } }
  | 
 
RoundRobinLoadBalance
    加权轮询:服务器【A、B、C】,权重分别是【1、2、3】。面对6次请求,它们负载均衡的结果如下:【A、B、C、B、C、C】。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
   | public class RoundRobinLoadBalance extends AbstractLoadBalance {
      public static final String NAME = "roundrobin";     private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
      @Override     protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {                  String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();                  int length = invokers.size();                  int maxWeight = 0;                  int minWeight = Integer.MAX_VALUE;          final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();         int weightSum = 0;                  for (int i = 0; i < length; i++) {             int weight = getWeight(invokers.get(i), invocation);             maxWeight = Math.max(maxWeight, weight);             minWeight = Math.min(minWeight, weight);             if (weight > 0) {                                  invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));                                  weightSum += weight;             }         }         AtomicPositiveInteger sequence = sequences.get(key);         if (sequence == null) {             sequences.putIfAbsent(key, new AtomicPositiveInteger());             sequence = sequences.get(key);         }                  int currentSequence = sequence.getAndIncrement();                  if (maxWeight > 0 && minWeight < maxWeight) {                          int mod = currentSequence % weightSum;                          for (int i = 0; i < maxWeight; i++) {                                  for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { 					                     final Invoker<T> k = each.getKey();                                          final IntegerWrapper v = each.getValue();                                          if (mod == 0 && v.getValue() > 0) {                         return k;                     }                                          if (v.getValue() > 0) {                         v.decrement();                         mod--;                     }                 }             }         }                  return invokers.get(currentSequence % length);     }
      private static final class IntegerWrapper {         private int value;         public IntegerWrapper(int value) {             this.value = value;         }         public void decrement() {             this.value--;         }     } }
  | 
 
- 此时有服务器【A、B、C】,权重分别是【1、2、3】,总权重为6,最大权重为3。
 
- mod = 0:满足条件,此时直接返回服务器 A
 
- mod = 1:自减1次后才能满足条件,此时返回服务器 B
 
- mod = 2:自减2次后才能满足条件,此时返回服务器 C
 
- mod = 3:自减3次后才能满足条件,经过递减后,服务器权重为 [0, 1, 2],此时返回服务器 B
 
- mod = 4:自减4次后才能满足条件,经过递减后,服务器权重为 [0, 0, 1],此时返回服务器 C
 
- mod = 5:只剩服务器C还有权重,返回C。
 
- 这样6次调用,得到的结果就是【A、B、C、B、C、C】。
 
- 当第7次调用时,此时调用编号为6,总权重大小也为6;mod则为0,重新开始。
 
