之前在写Spring Cloud Stream专题内容的时候,特地介绍了一下如何使用RabbitMQ的延迟消息来实现定时任务。最近正好因为开发碰到了使用过程中发现,延迟消息没有效果,消息直接就被消费了的情况。因此就继续深入研究了一下问题原因,在此记录下来,给碰到类似问题的童鞋们参考。
问题定位
因为不是所有的消息都出现了没有延迟消息效果的因素,通过有问题的消息特征,大致猜测可能是延迟时间过长导致了消息延迟失败。为了验证这个原因,先拿之前文章中的例子,来测试一下延迟时间是否与问题直接相关。
之前在写Spring Cloud Stream专题内容的时候,特地介绍了一下如何使用RabbitMQ的延迟消息来实现定时任务。最近正好因为开发碰到了使用过程中发现,延迟消息没有效果,消息直接就被消费了的情况。因此就继续深入研究了一下问题原因,在此记录下来,给碰到类似问题的童鞋们参考。
因为不是所有的消息都出现了没有延迟消息效果的因素,通过有问题的消息特征,大致猜测可能是延迟时间过长导致了消息延迟失败。为了验证这个原因,先拿之前文章中的例子,来测试一下延迟时间是否与问题直接相关。
我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次。然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,比如:编写博文时候,设置2小时之后发送。对于这些开始时间不确定的定时任务,我们也可以通过Spring Cloud Stream来很好的处理。
为了实现开始时间不确定的定时任务触发,我们将引入延迟消息的使用。RabbitMQ中提供了关于延迟消息的插件,所以本文就来具体介绍以下如何利用Spring Cloud Stream以及RabbitMQ轻松的处理上述问题。
有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的:
@StreamListener(value = TestTopic.INPUT)
public void receiveV1(String payload, @Header("version") String version) {
if("1.0".equals(version)) {
// Version 1.0
}
if("2.0".equals(version)) {
// Version 2.0
}
}
之前我们已经通过《Spring Cloud Stream消费失败后的处理策略(一):自动重试》一文介绍了Spring Cloud Stream默认的消息重试功能。本文将介绍RabbitMQ的binder提供的另外一种重试功能:重新入队。
准备一个会消费失败的例子,可以直接沿用前文的工程,也可以新建一个,然后创建如下代码的逻辑:
前两天我们已经介绍了两种Spring Cloud Stream对消息失败的处理策略:
上一篇《Spring Cloud Stream消费失败后的处理策略(一):自动重试》介绍了默认就会生效的消息重试功能。对于一些因环境原因、网络抖动等不稳定因素引发的问题可以起到比较好的作用。但是对于诸如代码本身存在的逻辑错误等,无论重试多少次都不可能成功的问题,是无法修复的。对于这样的情况,前文中说了可以利用日志记录消息内容,配合告警来做补救,但是很显然,这样做非常原始,并且太过笨拙,处理复杂度过高。所以,我们需要需求更好的办法,本文将介绍针对该类问题的一种处理方法:自定义错误处理逻辑。
之前写了几篇关于Spring Cloud Stream使用中的常见问题,比如:
在上一篇《Spring Cloud Stream如何处理消息重复消费》中,我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题。本文将继续说说在另外一个被经常问到的问题:如果微服务生产的消息自己也想要消费一份,应该如何实现呢?
在放出标准答案前,先放出一个常见的错误姿势和告警信息(以便您可以通过搜索引擎找到这里^_^)。以下错误基于Spring Boot 2.0.5、Spring Cloud Finchley SR1。
最近收到好几个类似的问题:使用Spring Cloud Stream操作RabbitMQ或Kafka的时候,出现消息重复消费的问题。通过沟通与排查下来主要还是用户对消费组的认识不够。其实,在之前的博文以及《Spring Cloud微服务实战》一书中都有提到关于消费组的概念以及作用。
那么什么是消费组呢?为什么要用消费组?它解决什么问题呢?摘录一段之前博文的内容,来解答这些疑问:
通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理(出现上述重复消费问题)。但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。
通过上一篇《消息驱动的微服务(消费组)》的学习,我们已经能够在多实例环境下,保证同一消息只被一个消费者实例进行接收和处理。但是,对于一些特殊场景,除了要保证单一实例消费之外,还希望那些具备相同特征的消息都能够被同一个实例进行消费。这时候我们就需要对消息进行分区处理。