之前在写Spring Cloud Stream专题内容的时候,特地介绍了一下如何使用RabbitMQ的延迟消息来实现定时任务。最近正好因为开发碰到了使用过程中发现,延迟消息没有效果,消息直接就被消费了的情况。因此就继续深入研究了一下问题原因,在此记录下来,给碰到类似问题的童鞋们参考。
问题定位
因为不是所有的消息都出现了没有延迟消息效果的因素,通过有问题的消息特征,大致猜测可能是延迟时间过长导致了消息延迟失败。为了验证这个原因,先拿之前文章中的例子,来测试一下延迟时间是否与问题直接相关。
之前在写Spring Cloud Stream专题内容的时候,特地介绍了一下如何使用RabbitMQ的延迟消息来实现定时任务。最近正好因为开发碰到了使用过程中发现,延迟消息没有效果,消息直接就被消费了的情况。因此就继续深入研究了一下问题原因,在此记录下来,给碰到类似问题的童鞋们参考。
因为不是所有的消息都出现了没有延迟消息效果的因素,通过有问题的消息特征,大致猜测可能是延迟时间过长导致了消息延迟失败。为了验证这个原因,先拿之前文章中的例子,来测试一下延迟时间是否与问题直接相关。
我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次。然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,比如:编写博文时候,设置2小时之后发送。对于这些开始时间不确定的定时任务,我们也可以通过Spring Cloud Stream来很好的处理。
为了实现开始时间不确定的定时任务触发,我们将引入延迟消息的使用。RabbitMQ中提供了关于延迟消息的插件,所以本文就来具体介绍以下如何利用Spring Cloud Stream以及RabbitMQ轻松的处理上述问题。
之前我们已经通过《Spring Cloud Stream消费失败后的处理策略(一):自动重试》一文介绍了Spring Cloud Stream默认的消息重试功能。本文将介绍RabbitMQ的binder提供的另外一种重试功能:重新入队。
准备一个会消费失败的例子,可以直接沿用前文的工程,也可以新建一个,然后创建如下代码的逻辑:
前两天我们已经介绍了两种Spring Cloud Stream对消息失败的处理策略:
最近收到好几个类似的问题:使用Spring Cloud Stream操作RabbitMQ或Kafka的时候,出现消息重复消费的问题。通过沟通与排查下来主要还是用户对消费组的认识不够。其实,在之前的博文以及《Spring Cloud微服务实战》一书中都有提到关于消费组的概念以及作用。
那么什么是消费组呢?为什么要用消费组?它解决什么问题呢?摘录一段之前博文的内容,来解答这些疑问:
通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理(出现上述重复消费问题)。但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。
有很多网友留言:公司要做消息中间件选型,该如何选?你觉得哪个比较好?消息选型的确是一个大论题,实则说来话长的事情又如何长话短说。对此笔者专门撰稿一篇内功心法:如何看待消息中间件的选型,不过这篇只表其意未表其行,为了弥补这种缺陷,笔者最近特意重新撰稿一篇,以供参考。温馨提示:本文一万多字,建议先马(关注)后看。
先回顾一下,在之前的Spring Cloud Config的介绍中,我们还留了一个悬念:如何实现对配置信息的实时更新。虽然,我们已经能够通过/refresh
接口和Git仓库的Web Hook来实现Git仓库中的内容修改触发应用程序的属性更新。但是,若所有触发操作均需要我们手工去维护Web Hook中的应用位置的话,这随着系统的不断扩张,会变的越来越难以维护,而消息代理中间件是解决该问题最为合适的方案。是否还记得我们在介绍消息代理中的特点时有提到过这样一个功能:消息代理中间件可以将消息路由到一个或多个目的地。利用这个功能,我们就能完美的解决该问题,下面我们来说说Spring Cloud Bus中的具体实现方案。