接上,其实不知道要不要继续写这篇博客,对于知识的掌握程度、不能着急,脚踏实地、接着……也不是写,但是我也是加了自己的东西的比如加粗什么滴O(∩_∩)O哈哈~
原文:http://www.360doc.com/content/16/0406/18/203871_548376577.shtml
15、Consumer端消息接收模型
16、慢速消费者Slow Consumer
//Flow Control伪代码,参见Queue.send方法。
Message message = queue.receiveMessage();
//如果通道中memoryList已经满载
if(memoryLimit.isFull()){
//如果开启了flow Control,则阻塞
if(flowControll){
waitForSpace();//block
}
}
//无论如何,持久化消息都要存储。、
//假如磁盘足够
if(message.isPersistent()){
store.add(message);
}
//对于非持久化消息,将直接存入在pendingCursor中。不同的pendingCursor存储机制不同
//比如在storeQueueCursor中,如果broker指定了persistent=true,则会将非持久化信息写入临时文件
//否则,将直接存入内存,并导致systemMemoryUsage计数器增加,直到full阻塞。
pendingMessageCursor.add(message);
慢速,相对于producer而言;producer不断产生新的消息,broker端在内存中已经积压的足够多(比如cacheLimit已满),但在转发给某个consumer时,发现此consumer仍然有大量的消息尚没有消费(ACK),broker会认为此consumer是慢速的。在Queue中,如果已发送(dispatched)但没有消费(unAck)的消息条数 > prefetchSize时,此consumer被标记为Slow。在Topic中,如果cacheLimit已满,但是需要向此订阅者发送的消息量 > prefetchSize时,此订阅者被标记为Slow。简单描述为: 快速的producer生产的消息,不能被消费者及时的消费,而导致在broker端积压。
慢速消费者会给整个broker带来潜在的危险,积压消息可能耗尽broker内存,也可能会导致消息不断的从文件pageIn到内存然后swap到文件中(如果使用了临时文件),而消耗磁盘IO,最重要的一点是,它们还会拖累Producer,导致producer端的阻塞而降低消息生产的效率,从而牵连那些快速消费者;
建议开启Flow Control,以及为每个通道设定合适的memoryLimit。
<!-- 每个队列,内存使用上限为256M,userCache表示持久化的消息也可以缓存以提高转发效率 -->
<!-- advisoryForSlowConsumers表示将慢速消费者信息发布到advisory通道中 -->
<!-- advisoryWhenFull表示当memoryLimit慢载时,向advisory通道中发布通知。 -->
<policyEntry queue=">" producerFlowControl="true" memoryLimit="256mb" useCache="false" advisoryForSlowConsumers="true" advisoryWhenFull="true" />
<systemUsage>
<systemUsage sendFailIfNoSpace="true">
<memoryUsage>
<memoryUsage limit="8gb"/><!-- 所有的通道缓存消息的总内存大小,memoryLimit作为其子模块 -->
</memoryUsage>
</systemUsage>
</systemUsage>
保证消息吞吐量:
1、关闭Slow Consumer
<policyEntry queue=">" producerFlowControl="true" memoryLimit="512mb">
<slowConsumerStrategy>
//是否关闭底层的transport,默认false:通过transport向client发送指令,client接受后,调用consumer.close
//为true,底层transport链接关闭,如果client多个consumer共享一个connection,导致all的consumer关闭
<abortSlowConsumerStrategy abortConnection="false"/>
</slowConsumerStrategy>
</policyEntry>
broker发现slow consumer 会将其注册到慢速消费者列表,此后将有额外的线程扫描并关闭他们;
2、抛弃旧消息(仅对Topic有效,仅对nondurable订阅者有效)
<policyEntry topic=">" producerFlowControl="true" memoryLimit="512mb">
<pendingMessageLimitStrategy>
<!-- 对于慢速消费者,只保留最近100条未消费的消息,仅对topic有效 -->
<constantPendingMessageLimitStrategy limit="100"/>
<!-- 值保留 2.5 * prefetchSize条消息
<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
-->
</pendingMessageLimitStrategy>
</policyEntry>
pendingMessageLimitStrategy仅对Topic有效,可以保证慢速消费者不会拖累太久。如果滞留的消息大于了pendingMessageLimit,我们需要使用“移除策略”来移除多余的消息。
<policyEntry topic=">" producerFlowControl="true" memoryLimit="512mb">
<messageEvictionStrategy>
<oldestMessageEvictionStrategy />
</messageEvictionStrategy>
</policyEntry>
topic支持支持3种移除策略:
(1) oldestMessageEvictionStrategy表示移除最旧的消息
(2) uniquePropertyMessageEvictionStrategy表示移除根据属性值筛选消息并移除最旧的
(3) OldestMessageWithLowestPriorityEvictionStrategy表示在旧消息中移除权重最低的。
3、写入临时文件
//如果期望非持久化数据写入文件,必须将broker的持久性设置为true,(默认值为true),即
<broker persistent="true" brokerName="server1">
<policyEntry queue=">" producerFlowControl="true" memoryLimit="512mb">
<pendingQueuePolicy>
<storeCursor />
</pendingQueuePolicy>
</policyEntry>
<persistenceAdapter>
<!--
<kahaDB directory="${activemq.data}/kahadb"/>
-->
<levelDB directory="${activemq.data}/leveldb"/>
</persistenceAdapter>
<!-- 临时文件存储,默认不存储任何临时文件 -->
<tempDataStore>
<!--
<pListStoreImpl directory="${activemq.data}/tmp"/>
-->
<levelDB directory="${activemq.data}/leveldb/tmp"/>
</tempDataStore>
</broker>
对于Queue而言,支持storeCursor,vmQueueCursor , fileQueueCursor。其中storeCursor是一个“综合”策略,持久化消息使用fileQueueCurosr支持,非持久化消息使用vmQueueCursor支持。vmQueueCursor基于内存,fileQueueCursor表示将数据写入本地临时文件(由tempDataStore决定)。(具体参见:[ActiveMQ策略])
<policyEntry topic=">" producerFlowControl="false" memoryLimit="10mb">
<pendingSubscriberPolicy>
<!-- 对于非耐久的订阅者,非持久化消息: vmCursor,fileCursor -->
<fileCursor/>
</pendingSubscriberPolicy>
<pendingDurableSubscriberPolicy>
<!-- 对于耐久的订阅者,非持久化消息 -->
<!-- storeDurableSubscriberCursor -->
<!-- vmDurableCursor -->
<!-- fileDurableSubscriberCursor -->
<storeDurableSubscriberCursor/>
</pendingDurableSubscriberPolicy>
</policyEntry>
4、offlineDurableSubscriberTimeout
<broker name="localhost" offlineDurableSubscriberTimeout="86400000" offlineDurableSubscriberTaskSchedule="3600000">
durable订阅者,我 broker对你说:及时你永不再上线,我也一直保留着你的消息,及时让那消息积压,除非我如上被迫限制了你的离线时间,超时、泪眼婆娑地将你还有你的消息删除,虽然是这样但是请你相信、我是无心的,请你不要怪我;
17、过期消息
当消息到达consumer端时,会检测消息是否过期,对于过期的消息将不会传递到listener.onMessage方法或者通过receive返回,这些消息将会被直接ACK。
判断一个消息是否过期的方式很简单,就是比较消息的Expiration时间戳和当前时间戳。
18、重复消息Duplicate
因为网络问题或者consumer在集群中迁移的问题(迁移到其他broker中)等,有可能导致consumer接收到重复的消息;
“重复消息”就是当前consumer已经“遇到”过的消息被再次接收到。consumer在将消息消费之前,都会检测消息是否重复,对于重复消息,直接发送poisonAck(毒丸),broker端会将消息直接删除。(具体源码,请参见ConnectionAudit.isDuplicate(),使用了bitSet来标记某个消息ID是否出现过)
19.ActiveMQ Consumer与spring代码样例
<bean id="activeMQConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
<property name="userName" value=""/>
<property name="password" value=""/>
<property name="sendTimeout" value="3000" />
</bean>
<!-- order -->
<bean id="orderDestination" class="org.apache.activemq.command.ActiveMQQueue">
<property name="physicalName" value="order.queue"></property>
</bean>
<bean id="orderListener" class="com.test.service.listener.OrderListener"/>
<bean id="orderConsumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="activeMQConnectionFactory" />
<property name="destination" ref="orderDestination" />
<property name="messageListener" ref="orderListener" />
<property name="concurrentConsumers" value="10" />
<property name="maxConcurrentConsumers" value="20" />
</bean>
<!--
<bean id="orderMessageExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="2" />
<property name="maxPoolSize" value="5" />
<property name="daemon" value="true" />
<property name="keepAliveSeconds" value="120" />
</bean>
-->
DefaultMessageListenerContainer:可以指定其他选项、属性列表
1、messageListener: 消息侦听器,必选属性
2、taskExecutor: 任务调度器,可以使用线程池开并发的消费消息。默认TaskExecutor(SimpleAsyncTaskExecutor,类似于CachedThreadPool)。
3、concurrentConsumers: 消费者的最大个数,messageListener实例是单例,所以spring-jms不能自作主张的创建多个messageListener实例来并发消费。
spring在内部,创建了多个MessageConsumer实例,并使用consumer.receive()方法以阻塞的方式来获取消息,当获取消息后,再执行messageListener.onMessage()方法;
当messageConsumer实例被创建后,将会封装在一个Runner接口并交给taskExecutor来调度;如果consumer在一直没有收到消息,则会被置为“idle”并从consumer列表中移除;如果所有的consumer都处于active状态,则会创建新的consumer实例直到达到maxConcurrentConsumers个数上限。通常taskExecutor的线程池容量稍大于concurrentConsumer。
4、maxMessagesPerTask: 每个consumer所消费的消息个数,因为每个consumer都会独占一个Thread[consumer.receive()是阻塞的],当consumer消费maxMessagesPerTask个消息后,它就会退出线程,由taskExecutor重新调度。
5、receiveTimeout: 内部的consumer在receive方法中阻塞的时间。默认为1秒
6、recoveryInterval: 当消息消费时,底层connection异常而无法继续,listener需要等待恢复的时间间隔。默认为5000ms。
7、concurrency: “concurrentConsumers”与“maxConcurrentConsumers”两个参数的简写方式;格式为“5-10”,则表示concurrentConsumers为5,maxConcurrentConsumers为10。
8、 sessionTransacted: Session是否为事务类型。默认为false。
9、 messageSelector: 消息选择器。如果你希望此listener只接受某种特性的消息,可以通过指定selector的方式来过滤消息。
10、pubSubDomain: 此消费通道是否为Topic,默认为“false”。所有与Topic有关的属性,只有在pubSubDomain为true的情况下生效。
11、pubSubNoLocal: 对于Topic而言,此消费者是否消费本地消息。
所谓本地消息,就是当Consumer与Producer公用底层一个Connection时,那么Producer发送的消息,相对于此Consumer就是本地消息。在pubSubDomain为true时有效。
12、subscriptionDurable: 是否为“耐久性”订阅者。在pubSubDomain为true时有效。默认为false
13、durableSubscriptionName: 耐久订阅者名称,每个clientId下可以有多个耐久订阅者,但是他们必须有不同的名字。默认为className。
14、errorHandler: 当listener.onMessage方法抛出异常时,异常该如何处理。
15、autoStartup: 消费者是否自动启动,默认为true,那么在messageContainer实例化后,将会启动consumer(即调用Connection.start());如果为false,那些开发者需要在合适的时机手动启动。
16、clientId: 对于Topic订阅者而言,此参数必备。
17、sessionAcknowledgeMode: ACK MODE,默认为AUTO。
spring-jms做了一件非常遗憾的事情,如果指定了sessionTransacted为true,那么在调用listener.onMessage()方法之后,则会立即提交事务(session.commit()),即使开发者使用了sessionAwareMessageListener,所以开发者无法实现基于事务的“批量”确认机制。
如果开发者指定为CLIENT_ACK,那么spring-JMS将会在onMessage方法返回后立即调用message.acknowlege()方法,所以开发者自己是否确认以及何时确认,将没有意义,如果不希望spring来确认消息,只能在onMessage方法中通过抛出异常的方式。
其中“1”表示AUTO_ACKNOWLEDGE,“2”为CLIENT_ACKNOWLEDGE = 2,“3”为 DUPS_OK_ACKNOWLEDGE = 3。
18、OrderListener.java
……
原文:http://www.360doc.com/content/16/0406/18/203871_548376577.shtml
特别丰富的一篇文章,转载完了,谢谢 KILLKISS的分享