Fellow Travellers

Dubbo中使用的负载均衡实现解析

万世威
字数统计: 3.2k阅读时长: 15 min
2018/02/22 Share

Dubbo中负载均衡

官方文档

http://dubbo.apache.org/zh-cn/docs/user/demos/loadbalance.html

http://dubbo.apache.org/zh-cn/docs/source_code_guide/loadbalance.html

简介

​ LoadBalance ,它的职责是将网络请求,或者其他形式的负载“均摊”到不同的机器上。避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况,解决大量并发访问服务问题。通过负载均衡,可以让每台服务器获取到适合自己处理能力的负载。在为高负载服务器分流的同时,还可以避免资源浪费。

负载均衡的分类

​ 负载均衡可分为软件负载均衡和硬件负载均衡。

  • 软件负载:Nginx、LVS、HAProxy
  • 硬件负载:Array、F5

Dubbo中负载均衡

​ 在 Dubbo 中,Dubbo 需要对服务消费者的调用请求进行分配,避免少数服务提供者负载过大。服务提供者负载过大,会导致部分请求超时。因此将负载均衡到每个服务提供者上,是非常必要的。Dubbo 提供了四种负载均衡实现:

  • 基于权重随机算法的 RandomLoadBalance
  • 基于最少活跃调用数算法的 LeastActiveLoadBalance
  • 基于 hash 一致性的 ConsistentHashLoadBalance
  • 基于加权轮询算法的 RoundRobinLoadBalance

LoadBalance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 默认使用 RandomLoadBalance
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {

/**
* select one invoker in list.
* @param invokers invokers.
* @param url refer url
* @param invocation invocation.
* @return selected invoker.
*/
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}

AbstractLoadBalance

​ 该类实现了 LoadBalance 接口,并封装了一些公共的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public abstract class AbstractLoadBalance implements LoadBalance {

// 计算权重
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}

// 选取invoker
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 不存在提供者
if (invokers == null || invokers.isEmpty())
return null;
// 只有一个提供者
if (invokers.size() == 1)
return invokers.get(0);
// 负载具体实现
return doSelect(invokers, url, invocation);
}

// 抽象方法,由不同的子类实现
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

// 获取权重配置信息
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
// 获取权重值:默认100
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), "weight", 100);
if (weight > 0) {
// 服务提供者启动时间戳
long timestamp = invoker.getUrl().getParameter("remote.timestamp", 0L);
if (timestamp > 0L) {
// 运行的时间:当前时间-启动时间
int uptime = (int) (System.currentTimeMillis() - timestamp);
// 获取服务预热时间:10分钟
int warmup = invoker.getUrl().getParameter("warmup", 600000);
// 如果服务启动未到达10分钟
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
}

说明:calculateWarmupWeight【(uptime / warmup)* weight】

运行时长 公式 计算后权重
1分钟 1/10 * 100 10
2分钟 2/10 * 100 20
5分钟 5/10 * 100 50
10分钟 10/10 * 100 100

ww < 1 ? 1 : (ww > weight ? weight : ww);==》 ww < 1 ? 1 : (ww > 100 ? 100 : ww)

主要用于保证当服务运行时长小于服务预热时间时,对服务进行降权,避免让服务在启动之初就处于高负载状态。服务预热是一个优化手段,与此类似的还有 JVM 预热。主要目的是让服务启动后“低功率”运行一段时间,使其效率慢慢提升至最佳状态。

RandomLoadBalance

​ RandomLoadBalance 是加权随机算法的具体实现。参照 加权随机 方式二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class RandomLoadBalance extends AbstractLoadBalance {

public static final String NAME = "random";
private final Random random = new Random();

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 获取总数
int length = invokers.size();
// 总权重
int totalWeight = 0;
// 是否有相同的权重
boolean sameWeight = true;
// 循环,
// 1.计算总权重数
// 2.判断是否所有机器的权重相同
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight;
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
// 只有有一台机器的权重不一致,设置false
sameWeight = false;
}
}
// 总权重大于0,不存在相同的权重数
if (totalWeight > 0 && !sameWeight) {
// 产生一个[0, length)之间的随机数
int offset = random.nextInt(totalWeight);
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
// 所有的权重相同,随机选取一个
return invokers.get(random.nextInt(length));
}
}
// 我们有 servers = [A, B, C],weights = [5, 3, 2],offset = 7。
// 第一次循环,offset - 5 = 2 > 0,即 offset > 5,
// 表明其不会落在服务器 A 对应的区间上。
// 第二次循环,offset - 3 = -1 < 0,即 5 < offset < 8,
// 表明其会落在服务器 B 对应的区间上

LeastActiveLoadBalance

​ 最小活跃数负载均衡。活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求。此时应优先将请求分配给该服务提供者。

​ 在具体实现中,每个服务提供者对应一个活跃数 active。初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class LeastActiveLoadBalance extends AbstractLoadBalance {

public static final String NAME = "leastactive";

private final Random random = new Random();

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// invokers 总数
int length = invokers.size();
// 最小活跃数值
int leastActive = -1;
// 最小活跃数invoker数量
int leastCount = 0;
int[] leastIndexs = new int[length];
// 总权重
int totalWeight = 0;
// 第一个Invoker权重值 用于比较invoker直接的权重是否相同
int firstWeight = 0;
boolean sameWeight = true;
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// 获取活跃数大小
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// 获取权重值
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), "weight", 100);
// 找到最小活跃数
if (leastActive == -1 || active < leastActive) {
leastActive = active;
leastCount = 1;
leastIndexs[0] = i;
totalWeight = weight;
firstWeight = weight;
sameWeight = true;
// 如果当前Invoker的活跃数 与 最小活跃数相等
} else if (active == leastActive) {
// 记录当前 Invoker
leastIndexs[leastCount++] = i;
totalWeight += weight;
// 判断是否相同权重
if (sameWeight && i > 0
&& weight != firstWeight) {
sameWeight = false;
}
}
}
// 最小活跃数Invoker只有一个
if (leastCount == 1) {
return invokers.get(leastIndexs[0]);
}
// 多个Invoker具体相同的最小活跃数,但权重不同,就走权重的逻辑
if (!sameWeight && totalWeight > 0) {
int offsetWeight = random.nextInt(totalWeight);
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexs[i];
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
if (offsetWeight <= 0)
return invokers.get(leastIndex);
}
}
// 所有Invoker权重一样,或总权重==0,采取随机策略
return invokers.get(leastIndexs[random.nextInt(leastCount)]);
}
}

活跃数的自增、自减

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// com\alibaba\dubbo\rpc\filter\ActiveLimitFilter.java
try {
// 触发active自增操作
RpcStatus.beginCount(url, methodName);
// 具体调用,选取负载策略、默认 random
Result result = invoker.invoke(invocation);
// 触发active自减操作
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
return result;
} finally {
if (max > 0) {
synchronized (count) {
count.notify();
}
}
}

需要添加以下配置信息:

1
<dubbo:consumer filter="activelimit"></dubbo:consumer>

ConsistentHashLoadBalance

​ 首先根据 ip 或其他的信息为缓存节点生成一个 hash,并将这个 hash 投射到 [0, 232 - 1] 的圆环上。当有查询或写入请求时,则为缓存项的 key 生成一个 hash 值。然后查找第一个大于或等于该 hash 值的缓存节点,并到这个节点中查询或写入缓存项。如果当前节点挂了,则在下一次查询或写入缓存时,为缓存项查找另一个大于其 hash 值的缓存节点即可。

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 类名+方法名
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
// 对当前的invokers进行hash取值,native 方法
int identityHashCode = System.identityHashCode(invokers);
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);

// 如果ConsistentHashSelector为空 或者 新的invokers hashCode取值不同
// 说明服务提供者列表可能发生变化,需要重新创建ConsistentHashSelector
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
// 选择Invoker
return selector.select(invocation);
}

private static final class ConsistentHashSelector<T> {}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private static final class ConsistentHashSelector<T> {

// 使用 TreeMap 存储 Invoker 虚拟节点
private final TreeMap<Long, Invoker<T>> virtualInvokers;
// 虚拟节点数量
private final int replicaNumber;
// 服务提供者列表的Hash值
private final int identityHashCode;
// 参数下标
private final int[] argumentIndex;

ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
// 循环创建虚拟节点Invoker
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}

public Invoker<T> select(Invocation invocation) {
String key = toKey(invocation.getArguments());
// 对第一个参数进行md5,相同的参数值就会得到同一个hash值,
// 只会受参数值影响,具有相同参数值的请求将会被分配给同一个服务提供者。
byte[] digest = md5(key);
return selectForKey(hash(digest, 0));
}

// 只会获取参数列表中的第一个参数
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}

// 选取节点
private Invoker<T> selectForKey(long hash) {
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
}

RoundRobinLoadBalance

​ 加权轮询:服务器【A、B、C】,权重分别是【1、2、3】。面对6次请求,它们负载均衡的结果如下:【A、B、C、B、C、C】。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class RoundRobinLoadBalance extends AbstractLoadBalance {

public static final String NAME = "roundrobin";
private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 全限定类名 + "." + 方法名
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
// 服务提供者数量
int length = invokers.size();
// 最大权重
int maxWeight = 0;
// 最小权重
int minWeight = Integer.MAX_VALUE;
final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
int weightSum = 0;
// 用于查找最大和最小权重,计算权重总和等
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
maxWeight = Math.max(maxWeight, weight);
minWeight = Math.min(minWeight, weight);
if (weight > 0) {
// 将Invoker对象和对应的权重大小IntegerWrapper放入Map中
invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
// 累加权重
weightSum += weight;
}
}
AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
sequences.putIfAbsent(key, new AtomicPositiveInteger());
sequence = sequences.get(key);
}
// 获取当前的调用编号
int currentSequence = sequence.getAndIncrement();
// 如果最小权重小于最大权重,表明服务提供者之间的权重是不相等的
if (maxWeight > 0 && minWeight < maxWeight) {
// 使用调用编号对权重总和进行取余操作
int mod = currentSequence % weightSum;
// 进行 maxWeight 次遍历
for (int i = 0; i < maxWeight; i++) {
// 遍历 invokerToWeightMap
for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
// 获取 Invoker
final Invoker<T> k = each.getKey();
// 获取权重包装类 IntegerWrapper
final IntegerWrapper v = each.getValue();
// 如果 mod = 0,且权重大于0,此时返回相应的 Invoker
if (mod == 0 && v.getValue() > 0) {
return k;
}
// mod != 0,且权重大于0,此时对权重和 mod 分别进行自减操作
if (v.getValue() > 0) {
v.decrement();
mod--;
}
}
}
}
// 服务提供者之间的权重相等,此时通过轮询选择 Invoker
return invokers.get(currentSequence % length);
}

private static final class IntegerWrapper {
private int value;
public IntegerWrapper(int value) {
this.value = value;
}
public void decrement() {
this.value--;
}
}
}
  • 此时有服务器【A、B、C】,权重分别是【1、2、3】,总权重为6,最大权重为3。
  • mod = 0:满足条件,此时直接返回服务器 A
  • mod = 1:自减1次后才能满足条件,此时返回服务器 B
  • mod = 2:自减2次后才能满足条件,此时返回服务器 C
  • mod = 3:自减3次后才能满足条件,经过递减后,服务器权重为 [0, 1, 2],此时返回服务器 B
  • mod = 4:自减4次后才能满足条件,经过递减后,服务器权重为 [0, 0, 1],此时返回服务器 C
  • mod = 5:只剩服务器C还有权重,返回C。
  • 这样6次调用,得到的结果就是【A、B、C、B、C、C】。
  • 当第7次调用时,此时调用编号为6,总权重大小也为6;mod则为0,重新开始。

1550752690270

CATALOG
  1. 1. Dubbo中负载均衡
    1. 1.1. 官方文档
    2. 1.2. 简介
    3. 1.3. 负载均衡的分类
    4. 1.4. Dubbo中负载均衡
    5. 1.5. LoadBalance
    6. 1.6. AbstractLoadBalance
      1. 1.6.1. RandomLoadBalance
      2. 1.6.2. LeastActiveLoadBalance
        1. 1.6.2.1. 活跃数的自增、自减
      3. 1.6.3. ConsistentHashLoadBalance
      4. 1.6.4. RoundRobinLoadBalance