前言: 本篇是根据相关资料和MQ运维过程中编辑整理出来的,主要在broker端配置topic和queue的相关策略,二者有很多的相似之处。
个人不建议在broker端配置topic和queue的分发策略,只有在特殊情况下或保护broker稳定运行前提下,进行配置,通常使用默认配置即可。

activemq是我见过的最烂的官方文档有没有!

一. 相关概念

Message group,durable subscriber/no-durable subscriber

对应ActiveMq的Topic而言,有两点:1.Message group;2.durable subscriber/no-durable subscriber;

  • Message Group是指在发送端对某类消息进行标识分组,消费者可以针对Group获取消息。
    Tip:注意区别Kafka的Group,kafka Group是将消费者分组,实现消息的负责均衡或类似queue的功能,分到同一组的消费者共同消费所有消息,即一条消息只会发给Group中的一个消费者。

  • durable subscriber表示client消费者与broker断开后重连后能接收到,断开期间publisher的所有消息,但是第一次与broker建立连接告诉broker自己是durable subscriber时,broker并不会把之前的所有消息发给这个subscriber。相比no-durable subscriber只会收到建立连接后publisher发给broker的消息!
    Tip:注意与持久化和非持久化消息的区别,持久化表示消息到达broker后是否保存到磁盘上!

dispatch queue,pending cursor,message store

默认情况下,消息到来后,执行两个操作:保存到message store(磁盘)(仅针对persistence消息),放入dispatch queue(内存)中。
如果生产者生产消息的速度大于消费者消费的速度,那么broker在将消息放入dispatch queue操作时,dispatch queue中积压的消息会越来越多;
此时broker会启动Cursor游标策略,dispatch queue中积压的消息称作pending Message。游标策略其实就是开辟一块空间存储积压信息,解决生产者消费者速度不均衡时引入的一个缓冲区。而这个存储空间可以在内存中:vmCursor,也可以在磁盘中:storeCursor,也可以同时在内存和磁盘中:fileCursor

二. 配置

从一个示例出发

下面配置了destinationPolicy的相关规则:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<broker>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="FOO.>">
<!-- 仅topic且no-durable,对于slow consumer的消息保留策略 -->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
<!-- 消息剔除策略 -->
<messageEvictionStrategy>
<oldestMessageWithLowestPriorityEvictionStrategy/>
</messageEvictionStrategy>
<!--仅topic且no-durable,存在slow consumer时pending消息策略-->
<pendingSubscriberPolicy>
<vmCursor/>
</pendingSubscriberPolicy>
<!--仅topic且durable,存在slow consumer时pending消息策略-->
<pendingDurableSubscriberPolicy>
<vmDurableCursor/>
</pendingDurableSubscriberPolicy>

<!-- 仅topic消息分发策略 -->
<dispatchPolicy>
<RoundRobinDispatchPolicy/>
</dispatchPolicy>

<!-- no-durable subscriber上线后可以追溯的消息策略 -->
<subscriptionRecoveryPolicy>
<!-- 1 minute -->
<timedSubscriptionRecoveryPolicy recoverDuration="60000"/>
</subscriptionRecoveryPolicy>
</policyEntry>

<policyEntry queue="ORDERS.>">
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>

policyEntry

一个policyEntry代表对一类队列的策略,这个我们也可以在client端配置,通常在broker中policyEntry上可以配置三个参数:

  1. producerFlowControl,生产者流控制策略,触发条件是:当broker的RAM或磁盘到达设置的极限时,ture表示broker通过延迟发送ACKResourceAllocationException返回异常到生产者断的方法控制生产者的发生消息速度,false表示如果RAM满了则将消息存到磁盘,如果磁盘也满了则block住生产者直到磁盘有空间。

  2. optimizedDispatch仅queue,默认false:对于每个queue不使用一个独立的线程来dispatch消息;true:每个queue分配一个线程专用于dispatch消息到consumer。

  3. memoryLimit,设置保存在内存中的数据量,默认没有限制,上限受限于systemUsage的设置,通常是用于cursor

eg:
<policyEntry queue=">" producerFlowControl="true" optimizedDispatch="true" memoryLimit="16mb">
其中,>是通配符,这里指所有queue队列,optimizedDispatch每个队列分配一个独立线程分发消息,memoryLimit每个队列的最大内存使用16M。

Tip:这些也可以通过Client端配置,可以参考:http://activemq.apache.org/per-destination-policies.html (详细说明了哪些是topic哪些是queue独有,共有的属性)

1. 限制策略(pendingMessageLimitStrategy)

针对存在Slow Consumer时,Topic,no-durable订阅者有效,当通道中有大量的消息积压时,broker可以pending在RAM中的消息量。

  • 为了防止Topic中有慢速消费者,导致整个通道消息积压,从而导致broker减缓producer的发送速度,从而影响正常的快速的消费者消费速度。
  • 对于Topic,一条消息只有所有的订阅者都消费才会被删除,当出现slow consumer时,超过pendingMessageLimitStrategy的消息会直接被删除,slow consumer将收不到这些消息。
  • 存在Slow Consumer时,会启用Cursor,超过limit的消息会采用MessageEvictionStrategy策略剔除Dispatch queue中积压的消息。

可选参数:
<constantPendingMessageLimitStrategy limit="1000"/>: 内存保留1000条消息。
<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>: 如果prefetchSize为1000,则保留2.5 * 1000条消息

参考:http://activemq.apache.org/slow-consumer-handling.html

2. 消息剔除策略(MessageEvictionStrategy)

针对Slow Consumer,Topic,nondurable订阅者有效,PendingMessage的数量超过限制时,broker该如何剔除多余的消息。
当一条Topic消息达到broker后,两步操作:1.若是持久化消息,先保存到磁盘;2.放入Dispatch queue,此时pendingMessageLimitStrategy策略会检测pending Message是否到达limit,如果是则采用MessageEvictionStrategy策略将内存中(dispatch queue)的消息剔除,并检测是否存在slow consumer,若是则启用Cursor策略,分页的方式将Cursor中的消息放入Dispatch queue,这个cursor策略由pendingSubscriberPolicypendingDurableSubscriberPolicy控制。

问:这个剔除的消息,Slow Consumer还会收到吗?
答:不会,通过一个简单的测试可以验证,验证测试逻辑如下:
首先,1.开启2个线程往A_TEST.TOPIC里publishe消息;2.开启多线程subscribe消息,模拟快速消费者;3.开启一个线程subscribe消息并每消费一条消息sleep 0.5s,模拟成慢速消费者;
然后,先开启2,3两个订阅者,再开启1;观察管理页面的Active Subscribers for A_TEST.TOPIC,可以观察到:一个Inflight为0或1且基本满足Enqueued = Dequeued,表示能及时消费所有的消息,另一个的Inflight为1~32767的数,其中Inflight<=Prefetch,说明当前存在一个快速消费者和一个slow consumer!
最后,统计两个subscriber消费的消息
结果: 一个subscriber消费了所有消息,另一个subscriber只消费了一部分消息。

结论:MessageEvictionStrategy的discard old messages是将Dispatch queue中的old Message删除,这些剔除的消息不再发给slow consumer。。这些被剔除的消息默认被放入ActiveMQ.DLQ队列中。

可选参数:
<oldestMessageEvictionStrategy/>: 移除旧消息,默认策略。
<oldestMessageWithLowestPriorityEvictionStrategy/>: 旧数据中权重较低的消息,将会被移除。权重在生产者端设置(producer.setPriority(0~9),默认4)
<uniquePropertyMessageEvictionStrategy propertyName="test" />: 移除具有指定property的旧消息。此属性值在生产者端设置,并移除较旧的。

3. Topic游标策略 pendingSubscriberPolicy和pendingDurableSubscriberPolicy

ActiveMQ 5.0之前所有non-persistent messaging保存在内存中,persistent messaging直接启用storeCursor,缺点是non-persistent所有消息都保存在内存中,当出现slow consumer会导致RAM到达上限,从而block或减缓producer;对于persistent,如果消费者足够快,那么消息持久化到磁盘后,消息发送是从storCursor到Dispatch queue,这样性能不是最好的!
ActiveMQ 5.0之后,non-persistent消息直接放入Dispatch queue,当slow consumer出现时启用Cursor策略,而不是block或减缓producer;persistent消息持久化到磁盘后直接将消息放入Dispatch queue,当出现slow consumer后,才启用Cursor策略.

  • pendingSubscriberPolicy:是针对nondurable subscriber,三种策略:storeCursor, vmCursorfileCursor
  • pendingDurableSubscriberPolicy:是针对durable subscriber,三种策略:storeDurableSubscriberCursor, vmDurableCursorfileDurableSubscriberCursor
    都默认是store,一般都使用默认,每个具体含义参见:http://activemq.apache.org/message-cursors.html

4. Queue游标策略 pendingQueuePolicy

同上,也有三种策略:fileQueueCursorstoreCursorvmQueueCursor

eg:对持久化和非持久化进行个性化配置:
持久化和非持久化采用vmQueueCursor:

1
2
3
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>

非持久化采用fileQueueCursor:

1
2
3
4
5
6
7
<pendingQueuePolicy>
<storeCursor>
<nonPersistent>
<fileQueueCursor/>
</nonPersistent>
</storeCursor>
</pendingQueuePolicy>

Tip: 无论是pendingQueuePolicy还是pendingSubscriberPolicy和pendingDurableSubscriberPolicy,都是解决由于消费者速度相对慢情景下,RAM的Dispatch queue无法保存所有积压的消息引入的中间层,解决速度不均衡导致的消息积压问题的方案。且与消息是否是persistent无关。

5. 转发策略dispatchPolicy

仅针对Topic有效,消息发送给多个subscriber的顺序问题。

可选参数:
<RoundRobinDispatchPolicy/>: 轮询,消息将依次发送给每个订阅者
<strictOrderDispatchPolicy/>: 严格有序,消息依次发送给每个订阅者,按照订阅的时间先后。
<PriorityDispatchPolicy/>: 基于property,subscriber初始化时可以指定priority,默认每个consumer的priority相同。
<SimpleDispatchPolicy/>: 默认值
<NoSubscriptionRecoveryPolicy/>: 关闭恢复机制。默认值,即不发送订阅开始之前的消息。
strictOrderDispatchPolicy的解释参考:http://activemq.apache.org/total-ordering.html

6. 消息恢复策略 subscriptionRecoveryPolicy

针对Topic,no-durable订阅者有效,表示no-durable上线后broker将其上线前的消息发送给subscriber的策略:
默认情况下,subscriber只能获取建立连接完成之后的消息,如果Retroactive=true(在publisher端设置),那么订阅者就可以获取其创建之前的消息列表。
subscriptionRecoveryPolicy就是用来控制“retroactive”的消息量的,也即broker保留多少已经消费了的消息。

可选参数:
<timedSubscriptionRecoveryPolicy recoverDuration="60000"/>:保留1分钟内的消息。
<fixedSizedSubscriptionRecoveryPolicy maximumSize="1024"/>:保留1Kb消息。
<fixedCountSubscriptionRecoveryPolicy maximumSize="100"/>:保留100条。
<LastImageSubscriptionRecoveryPolicy/>: 只保留最新的一条数据

7. 慢速消费者策略

1
2
3
<slowConsumerStrategy>
<abortSlowConsumerStrategy abortConnection="false" maxTimeSinceLastAck="30000"/>
</slowConsumerStrategy>

Broker将如何处理慢消费者。Broker将会启动一个后台线程用来检测所有的慢速消费者,并定期关闭关闭它们。
abortConnection表示是否关闭连接,但不关闭底层链接。maxTimeSinceLastAck表示最后一个ACK距离现在的时间间隔阀值。

8. “死信”策略 DLQ策略

1
2
3
4
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
<sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
</deadLetterStrategy>

IndividualDeadLetterStrategy: 把DeadLetter放入各自的死信通道中,queuePrefix自定义死信前缀,useQueueForQueueMessages使用队列保存死信,还有一个属性为“useQueueForTopicMessages”,此值表示是否将Topic的DeadLetter保存在Queue中,默认为true。
SharedDeadLetterStrategy: 将所有的DeadLetter保存在一个共享的队列中,这是ActiveMQ broker端默认的策略。共享队列默认为ActiveMQ.DLQ,可以通过deadLetterQueue属性来设定。还有2个很重要的可选参数,“processExpired”表示是否将过期消息放入死信队列,默认为true;processNonPersistent表示是否将“非持久化”消息放入死信队列,默认为false。
DiscardingDeadLetterStrategy: broker将直接抛弃DeadLeatter。如果开发者不需要关心DeadLetter,可以使用此策略。
AcitveMQ提供了一个便捷的插件:DiscardingDLQBrokerPlugin,来抛弃DeadLetter:
指定队列的所有消息,全部,正则匹配:

1
2
3
<discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.29 MY.EXAMPLE.QUEUE.87" reportInterval="1000" />
<discardingDLQBrokerPlugin dropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true" />
<discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.[0-9]{3} MY.EXAMPLE.QUEUE.[0-9]{3}"reportInterval="3000" />

Tip: 默认所有死信消息都会放入ActiveMQ.DLQ队列

三. policyEntry相关问题及配置

持久订阅者Durable Subscriberspending太多消息在broker的问题

如果持久订阅者下线了很长一段时间,或者持久订阅者不再订阅某topic,默认情况下,由于broker要将没有被持久订阅者消费的所有消息保存起来,以便在其上线后发给它;
然而,如果这些消息大量积压在broker,会导致broker的磁盘内存等资源耗尽,且policyEntry没有相关配置,另外,虽然我们可以通过管理页面进行处理,但不一定能及时处理,官方提供两种办法对这种情况进行处理:

1. 过期消息

我们可以在client端将指定destination的消息设置一个过期属性(setTimeToLive(timeToLive));
然后在指定队列中配置<policyEntry topic=">" expireMessagesPeriod="300000"/>,这里在broker配置对所有topic类型消息,每5分钟检查一次,将过期的消息从broker删除!

2. 移除不活动的持久订阅者

ActiveMq提供两个参数,来配置移除策略:
offlineDurableSubscriberTimeout,默认-1不移除,单位毫秒,表示Durable Subscriber不活动多久后,就移除这个Subscriber。
offlineDurableSubscriberTaskSchedule,默认300000,单位毫秒,即默认5分钟检查一次,表示broker每间隔多久去检查一次。
eg:
<broker name="localhost" offlineDurableSubscriberTimeout="86400000" offlineDurableSubscriberTaskSchedule="3600000">
表示每隔1小时去检测一次,将下线1天的持久订阅者删除掉!

参考:http://activemq.apache.org/manage-durable-subscribers.html

Tip:具体配置方法查看官方文档,每个参数可以在http://activemq.apache.org/xml-reference.html中找到。

参考

  1. http://activemq.apache.org/per-destination-policies.html
  2. http://activemq.apache.org/dispatch-policies.html
  3. https://my.oschina.net/coderedrain/blog/724943?utm_source=tuicool&utm_medium=referral
  4. http://shift-alt-ctrl.iteye.com/blog/2061859
  5. http://activemq.apache.org/xml-reference.html
  6. http://activemq.apache.org/xml-configuration.html
  7. http://activemq.apache.org/slow-consumer-handling.html