前言:通常在开发Mq时,我们有必要关注一个点:消息从producer发往broker的速度,consumer消息的消费速度.
而上面的点又涉及到:游标策略(Message Cursors), prefetchSize和producerWindowSize,生产者流控制(Producer Flow Control)等.
对于消费者的Flow Control是prefetchSize,消息消费速度涉及到Multi-thread(这点在《Client客户端的线程资源》中有提到).
本文主要focus生产者流控制!

一. 生产者流量控制(Producer Flow Control)

在4.x的版本里ActiveMq是通过基于TCP的流量控制,但是在当有多个消费者和生产者共用这个连接的时候可能会出现死锁;所以5.x开始,引入了针对某个producer的流量控制策略,通过broker返回producer Ack的方式实现.
流量控制触发条件:

  1. 当memoryUsage,storeUsage,tempUsage达到设置的阈值(即磁盘,内存容量触顶)时触发.
  2. 正常情况下broker不会控制producer的发送消息速度.
  3. 同步消息(持久化消息默认采用同步发送,除非启用useAsyncSend)默认支持Flow Control,异步消息(非持久化消息默认采用异步发送)由于发送一条消息时不用等到任何ACK回来,所以默认不支持Flow Control. 但是我们可以设置connctionFactory.setProducerWindowSize(1024000);实现ProducerWindowSize表示在等到broker的ACK回来之前,可以发送多少bytes的消息.如果想让非持久化的消息也能立即被通知(因为非持久化消息默认异步发送),可以在connectionFactory中设置alwaysSyncSend.

设置方法:

默认情况下,当broker发现满足Flow Control条件后,直接阻塞producer,直到memoryUsage,storeUsage,tempUsage有空余空间.

  1. 启用,禁用Flow Control
    启用:在activemq.xml文件的policyEntry中设置: <policyEntry queue=">" producerFlowControl="true" optimizedDispatch="true" memoryLimit="16mb">
    禁用:
    1
    2
    3
    4
    5
    6
    7
    <destinationPolicy>
    <policyMap>
    <policyEntries>
    <policyEntry topic=">" producerFlowControl="false"/>
    </policyEntries>
    </policyMap>
    </destinationPolicy>

另外一种方法,是启用Message Cursors策略,这个下面讲.

  1. 流量控制策略
    activemq.xml文件的systemUsage中设置: <systemUsage sendFailIfNoSpaceAfterTimeout="3000">sendFailIfNoSpace="true" Client端抛异常的方式.
    sendFailIfNoSpaceAfterTimeoutsendFailIfNoSpace区别在于,当systemUsage达到设置的limit后, 前者等待一段时间再抛异常, 后者直接抛异常.

  2. 异步消息设置
    connctionFactory.setProducerWindowSize(1024000); 异步模式下, 在broker没有全部返回ack之前, producer能发送给broker的数据大小.
    Tip:非持久化消息默认异步发送,异步发送模式下ProducerWindowSize参数才起作用,可以在ack没有回来时,继续想broker发送消息(不阻塞)的大小。
    Tip:默认情况下,非持久化消息、事务内的消息均采用异步发送;对于持久化消息,则采用同步发送。

游标(Message Cursors)

触发游标策略:消费者处理消息速度赶不上生产者。

storeCursor(store based cursor)

broker默认情况下不使用游标,默认是直接将持久化消息分发到对应的消费者的Dispatch队列,如下图:


上图,是5.x的默认方式:当消费者的消费速度能赶上生产者的速度,那么对于持久化消息会先持久化到磁盘,然后直接发给Dispatch Queue(左图)。
当消费者速度赶不上生产者时,会有部分消息pending在broker,此时broker默认的做法是启用游标(Cursor),由于还有很大旧消息未发送到consumer,且Dispatch Queue大小固定,新来的消息不能直接发给Dispatch Queue,此时会启用游标策略,将新来的消息持久化到磁盘后,发给Cursor。实际上这里Cursor起的是一个缓冲区作用,解决生产者和消费者速度不均衡的问题!
Tip:上图Message Store不一定是磁盘哈,当使用的是levelDB时,就是内存memstore,不过会持久化LOG到磁盘。
Tip:这里Pending Cursor的实现是磁盘文件,配置文件中的tempUsage的地方!
与非持久化消息处理对比:

默认情况下,非持久化消息由于不持久到磁盘,当消费者速度慢于生产者后,会启用游标策略,将一部分消息持久化到临时文件中。
注意这里不是在当内存空间不够时才去持久化到一个临时文件中。
参考:

Note that, since the introduction of the new file cursor in ActiveMQ 5.x, non-persisted messages are shunted into the temporary file store to reduce the amount of memory used for non-persistent messaging. As a result, you may find that a queue’s memoryLimit is never reached, as the cursor doesn’t use very much memory. If you really do want to keep all your non-persistent messages in memory, and stop producers when the limit is reached, you should configure the .

VM Cursor和File based Cursor

简单介绍下另外两张游标策略:VM CursorFile based Cursor

  1. VM Cursor策略是将pending Message全部放在磁盘中,如果memoryUsage已满则会阻塞生产者。在存在较慢消费者或消息消费耗时场景慎用!优点是速度快!
  2. File based Cursor策略是 VM Cursor的改进,当memoryUsage达到极限时,将消息持久化到磁盘临时文件。用于持久化消息较慢,且消费者消费消息较快的场景,且对于某些队列可能会有突发的大量消息产生的场景。

配置

每个队列都有一个独立的dispatch Queuepending Queue,所以这里配置pending Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="org.apache.>">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="Test.DLQ."/>
</deadLetterStrategy>
<pendingQueuePolicy>
<vmQueueCursor />
</pendingQueuePolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>

上述配置,使用vmQueueCursor,所有的pending消息都会保持在内存中(默认是将pending消息放到临时文件中),并且当内存达到限制时会阻塞生产者!

二. 消费者流控制

  1. 消费者比较简单通过设置prefetchSize控制:
    消费者消费消息有同步方式(通过receiver()方法同步获取),异步方式(设置Listener异步回调),Broker与Client间消息交互有pull,push两种方式;
    prefetchSize=0时Client每次调用receiver()方法去Broker啦取一条消息,当prefetchSize>=1时,Broker每次主动往prefetchSize的buffer中push数据。
    注:使用receive()时,prefetchSize可以为0和任意大于0的值,当使用MessageListener异步获取消息时,prefetchSize值必须 >=1,如果设置0,会抛异常。
  2. 通常消费者消费某个destination的消息是单线程的,这样会导致消息处理速度较慢,若要加快消息处理速度可以采用多线程方式,具体见开发中遇到的问题(八)