MQ中默认一个队列只对应一个消费者,假设单位时间内新来M/M/1
系统1。
队列中消息的到达和消费可以建模为生灭过程2,假设有
如果队列在状态
对于
对于
结合上述两式可以获得
队列全部状态的概率和为一
系统消息总数均值为处于各个状态的概率和:
等待消息总数均值为系统消息总数均值减去正在处理的消息数:
应用Little Law
3获得消息在系统的平均逗留时间:
应用Little Law
获得消息在队列中平均等待时间:
MQ开启批量消费,一个队列对应n
个消费者,多个消费者共享同一个队列,假设单位时间内新到达M/M/n
系统4。
如果队列消息数小于消费者数
如果队列消息数小于消费者
如果队列在状态
对于
对于
对于
综合上式递推得到系统空闲概率
在队列中等待处理的消息总数均值为:
系统平均消息总数均值为处于等待状态的消息总数均值加上正在处理的消息总数均值:
应用Little Law
获得消息在系统的平均逗留时间均值:
应用Little Law
获得消息在队列中平均等待均值时间:
现在假设有5个消费者n=5
,每个消费者单位时间内可以处理一条消息
对于n个单队列单消费者构成的系统,定义系统负载
对于包含n个消费者的单队列多消费者系统,定义系统负载
当二者系统负载相同时,将在单位时间内收到相同数量的新消息。下图展示了二者在相同系统负载时队列中等待处理的消息总数均值
可见当新消息到来速度小于系统消息消费能力M/M/n
系统能立即处理新到来消息,没有消息积压,也没有等待处理耗时。然而单队列单消费者系统相较于单队列多消费者系统,消息积压较为严重,消息等待处理时间更长,在系统负载增大时,劣势更加明显。
MQ出于消息顺序性消费考虑、以及单个队列读写性能受限的原因,默认限制每个分区只能由一个消费者组中的一个消费者消费。
M/M/1
和M/M/n
系统性能差距主要来源于:由于队列间独立,并且没有类似Golang
中GMP
的消息窃取机制(同样出于有序性要求),当一些队列面临高压力出现长队列,而其他队列可能暂时空闲,相应空闲队列对应的消费者空转,导致系统资源利用率低,消息不能及时处理。
MQ的解决方式是使用再平衡机制,在消费者与队列之间动态地重新分配队列来实现负载平衡。再平衡机制将在周期性延迟、消费者发生变化,以及订阅topic变化后,被触发执行再平衡。
Rocket MQ中在RebalanceService#run()
中定时(默认20s)触发再平衡流程。
1// RebalanceService#run()
2public void run() {
3 long realWaitInterval = waitInterval;
4 while (!this.isStopped()) {
5 // 定时休眠后触发再平衡
6 this.waitForRunning(realWaitInterval);
7 boolean balanced = this.mqClientFactory.doRebalance();
8 }
9 }
10}
具体通过RebalanceImpl
实施平衡:再平衡过程中,获取当前所有活跃的消费者列表和所有队列的列表,然后根据轮询或者一致性哈希分配等算法将topic下队列分配给所有活跃的消费者。
xxxxxxxxxx
91// RebalanceImpl#doRebalance()
2public boolean doRebalance(final boolean isOrder) {
3 // 获取当前所有活跃的消费者列表和所有队列的列表
4 for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
5 final String topic = entry.getKey();
6 // 执行再平衡
7 boolean result = this.rebalanceByTopic(topic, isOrder);
8 }
9}
xxxxxxxxxx
161// RebalanceImpl#rebalanceByTopic
2private boolean rebalanceByTopic(final String topic, final boolean isOrder) {
3 // 获取topic对应的队列和consumer信息
4 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
5 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
6 // 选择再平衡策略
7 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
8 // 完成再平衡
9 List<MessageQueue> allocateResult = null;
10 allocateResult = strategy.allocate(
11 this.consumerGroup,
12 this.mQClientFactory.getClientId(),
13 mqAll,
14 cidAll);
15 }
16}
再平衡策略决定队列和消费者间的对应关系,RocketMQ提供了下列分配策略
AllocateMessageQueueAveragely
:默认策略,先计算队列数除以消费者数的余数
AllocateMessageQueueAveragelyByCircle
:每个消费者依次分配一个队列,循环分配,直至分配完毕全部队列。
AllocateMessageQueueConsistentHash
:通过一致性hash算法计算队列所属消费者。此策略可以尽量减少 因为队列或者消费者数量发生变化,导致消费者与队列关联关系变化,从而需要重新建立TCP连接的数量。同时再平衡阶段消费者将不能正常消费消息,更少的关系变化,可以更快的完成再平衡过程,减少MQ不可用时间。
再平衡流程将在消费者组或者topic发生变化后,当前消费者组领导者加入消费组后进行
xxxxxxxxxx
131// ConsumerCoordinator#onJoinComplete
2protected void onJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer) {
3 // 再平衡策略
4 ConsumerPartitionAssignor assignor = this.lookupAssignor(assignmentStrategy);
5 // 待分配分区
6 SortedSet<TopicPartition> assignedPartitions = new TreeSet(COMPARATOR);
7 assignedPartitions.addAll(assignment.partitions());
8 // 执行分配
9 firstException.compareAndSet((Object)null, this.invokeOnAssignment(assignor, assignment));
10 this.subscriptions.assignFromSubscribed(assignedPartitions);
11 firstException.compareAndSet((Object)null, this.invokePartitionsAssigned(addedPartitions));
12 }
13}
再平衡策略:Kafka提供了和RocketMQ相似的分配策略
RangeAssignor
与RocketMQ
的AllocateMessageQueueAveragely
策略一致。
RoundRobinAssignor
与RocketMQ
的AllocateMessageQueueAveragelyByCircle
策略一致。
StickyAssignor
:分配尽量均匀,尽量与上一次分配的相同,尽量减少分配关系的变动,从而减少需要重新建立消费者与分区之间TCP连接的数量,缩短再平衡过程耗时,降低MQ不可用时间。。
CooperativeStickyAssignor
:两阶段再平衡策略,先尝试只移动最少量的分区完成再分配,如果分配后不满足平衡条件,再进行完整的再平衡。
队列长度与逗留时间计算绘制脚本
x
1from scipy.special import factorial
2import numpy as np
3import matplotlib.pyplot as plt
4
5
6# MM1 队列模型
7def mm1_queue(lambda_, mu):
8 if lambda_ >= mu:
9 return "系统不稳定"
10 rho = lambda_ / mu
11 # 等候+服务人数
12 L = rho / (1 - rho)
13 # 等候+服务时间
14 W = 1 / (mu - lambda_)
15 # 等候人数
16 Lq = rho**2 / (1 - rho)
17 # 等候时间
18 Wq = rho / (mu - lambda_)
19 return L, W, Lq, Wq
20
21# MMn 队列模型
22def mmn_queue(lambda_, mu, n):
23 # 计算流量强度
24 rho = lambda_ / (n * mu)
25 if rho >= 1:
26 return "系统不稳定,请确保 rho < 1"
27
28 # 计算P0
29 sum_p = sum((lambda_ / mu)**k / factorial(k) for k in range(n))
30 p0 = (sum_p + (lambda_ / mu)**n / (factorial(n) * (1 - rho)))**(-1)
31 # 计算队列中的平均客户数 Lq
32 lq = (rho**n * rho / factorial(n)) * p0 / (1 - rho)**2
33 # 计算平均等待时间在队列中 Wq
34 wq = lq / lambda_
35 # 计算系统中的总平均客户数 L
36 l = lq + lambda_ / mu
37 # 计算系统中的总平均等待时间 W
38 w = wq + 1 / mu
39
40 return l, w, lq, wq
41
42
43
44def plot(x, y, y_label):
45 # 绘制图表
46 plt.figure()
47 # 绘制等待时间
48 plt.plot(x, y[0], label="M/M/1", marker='o')
49 plt.plot(x, y[1], label="M/M/n", marker='o')
50 plt.title(y_label+" vs $\\rho$")
51 plt.xlabel("$\\rho$")
52 plt.ylabel(y_label)
53 plt.legend()
54 plt.grid(True)
55
56
57# 利用率从0.1到0.9
58rho_values = [0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.85,0.9]
59mu = 1
60# 服务台数量
61n = 5
62
63# 客户到达率
64lambda_ = rho_values*mu
65mm1_array = np.zeros((len(rho_values), 4))
66mmn_array = np.zeros((len(rho_values), 4))
67
68for i in range(len(rho_values)):
69 lam = lambda_[i]
70 mm1_array[i] = mm1_queue(lam, mu)
71 mmn_array[i] = mmn_queue(n*lam, mu, n)
72
73plt.figure()
74# 绘制队列长度
75plt.plot(rho_values, n*mm1_array[:, 0], label="n$\\times$M/M/1 L", marker='o', color='r')
76plt.plot(rho_values, mmn_array[:, 0], label="M/M/n L", marker='o', color='g')
77plt.plot(rho_values, n*mm1_array[:, 2],
78 label="n$\\times$M/M/1 Lq", marker='*', color='r')
79plt.plot(rho_values, mmn_array[:, 2], label="M/M/n Lq", marker='*', color='g')
80plt.title("$L,Lq$"+" vs $\\rho$")
81plt.xlabel("$\\rho$")
82plt.ylabel("$L,Lq$")
83plt.legend()
84plt.grid(True)
85
86plt.figure()
87# 绘制等待时间
88plt.plot(rho_values, mm1_array[:, 1], label="n$\\times$M/M/1 W", marker='o', color='r')
89plt.plot(rho_values, mmn_array[:, 1], label="M/M/n W", marker='o', color='g')
90plt.plot(rho_values, mm1_array[:, 3],
91 label="n$\\times$M/M/1 Wq", marker='*', color='r')
92plt.plot(rho_values, mmn_array[:, 3], label="M/M/n Wq", marker='*', color='g')
93plt.title("$W,Wq$"+" vs $\\rho$")
94plt.xlabel("$\\rho$")
95plt.ylabel("$W,Wq$")
96plt.legend()
97plt.grid(True)
98
99plt.show()
100