SpringBoot整合RabbitMQ 延时消息的实现
实现方式
死信消息(队列ttl+死信exchange)
延时插件 (rabbitmq-delayed-message-exchange)
延时插件实现
简述:延时消息不直接投递到队列中,而是先转储到本地Mnesia数据库中,然后定时器在消息到期后再将其投递到队列中。
关于用法可以直接看这个文档或者网上搜一搜,这里就不介绍了。
github地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
其大概原理就是:指定了延时的消息,会被先保存在 Mnesia (erlang编写的数据库管理系统)中,然后有一个定时器去查询最近需要被投递的消息,将其投递到目标队列中。
简单分析:
优点:
基本支持任意延迟时间(不能超过1个月)
缺点:
延时不可靠,存在消息数量较大或使用很久后延迟不准确(会推迟), 见 issues#72
无备份机制,延时消息存在单个节点磁盘中
不支持ram类型的节点 (数据得存磁盘里面)
增加大量内存的占用 (经测试发现,发送大量延时消息后,rabbitmq内存占用明显增高,比普通消息还要高很多那种。)
安装插件需要重启
适用场景:
如果不是无关紧要的小业务,不建议使用。
死信队列实现
简述:使用两个队列,一个队列接收消息不消费,等待指定时间后消息死亡,再由该队列绑定的死信exchange再次将其路由到另一个队列提供业务消费。本文也是主要以此方式实现延迟消息
使用介绍
当一个队列设置了死信exchange 后,这个队列的死信都会被投递到死信exchange中,然后可以再次路由到其他队列中(如果指定了死信routing key 则死信消息routing key 变为设置的routing key,未设置则为原始 routing key)。
步骤一般都是:
先声明一个消费队列 queue_dlx,用来接收死信消息,并提供消费;
然后声明一个死信exchange_dlx, 绑定 queue_dlx,接收消息后路由至queue_dlx;
声明一个延迟队列,queue_delay, 用来接收业务消息,但不提供消费,等待消息死亡后转至死信exchange。(即延迟)
声明一个exchange,由业务发送消息到exchange,然后转至queue_delay.
听着好像比较复杂,其实代码难度很简单,区别去网上大多数教程的繁琐,直接上代码demo,GitHub地址 ,它的一个消息的流程大概是:
简单分析:
缺点:
使用较复杂,得声明一堆队列&exchange
优点:
支持镜像队列复制,实现高可用
支持大量消息(成千上万)
适用场景:
任意场景,尤其是分布式场景下表现非常优越。
固定时长的场景
备注:对于高版本(3.6及以上)的rabbitmq建议使用lazy-mode作为延迟队列,防止大量延时消息堆积而占用大量内存,从而触发rabbitmq换页阻塞队列。 (如果使用spring的话,即使低版本rabbitmq也不用太担心:spring-amqp默认发送持久化消息,即使触发换页,也只是把消息从内存中逐出而已。)
代码实现:
1. 先安装相关依赖:
<!-- 安装mq依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency>
2. 然后配置下yaml
spring: application: name: delay-rabbitmq rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / # 路由失败回调 publisher-returns: true publisher-confirms: true
3. 去mq后台新建两个交换机
代码中新建一个枚举类:
package com.rabbitmqdelay.enums; import lombok.Getter; /** * 消息队列枚举类 */ @Getter public enum QueueEnum { /** * 消息通知队列 */ QUEUE_ORDER_CANCEL("order.direct", "order.cancel", "order.cancel"), /** * 消息通知ttl队列 */ QUEUE_TTL_ORDER_CANCEL("order.direct.ttl", "order.cancel.ttl", "order.cancel.ttl"); /** * 交换名称 */ private String exchange; /** * 队列名称 */ private String name; /** * 路由键 */ private String routeKey; QueueEnum(String exchange, String name, String routeKey) { this.exchange = exchange; this.name = name; this.routeKey = routeKey; } }
然后新建一个mq的配置接管
package com.rabbitmqdelay.config; import com.rabbitmqdelay.enums.QueueEnum; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 消息队列配置类 */ @Configuration public class RabbitMqConfig { /** * 订单消息实际消费队列所绑定的交换机 */ @Bean DirectExchange orderDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) .durable(true) .build(); } /** * 订单延迟队列队列所绑定的交换机 */ @Bean DirectExchange orderTtlDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()) .durable(true) .build(); } /** * 订单实际消费队列 */ @Bean public Queue orderQueue() { return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName()); } /** * 订单延迟队列(死信队列) */ @Bean public Queue orderTtlQueue() { return QueueBuilder .durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()) .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机 .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键 .build(); } /** * 将订单队列绑定到交换机 */ @Bean Binding orderBinding(DirectExchange orderDirect, Queue orderQueue){ return BindingBuilder .bind(orderQueue) .to(orderDirect) .with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey()); } /** * 将订单延迟队列绑定到交换机 */ @Bean Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){ return BindingBuilder .bind(orderTtlQueue) .to(orderTtlDirect) .with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey()); } }
至此,我们的配置已经基本完成,接下来创建一个生产者和消费者
package com.rabbitmqdelay.component; import com.rabbitmqdelay.enums.QueueEnum; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 取消订单消息的发出者 */ @Component @Slf4j public class CancelOrderSender { @Autowired private AmqpTemplate amqpTemplate; /** * 直接给队列发送消息 * * @param orderId */ public void sendMessage(Long orderId) { //直接给队列发送消息 amqpTemplate.convertAndSend(QueueEnum.QUEUE_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey(), orderId); log.debug("发出消息:{}", orderId); } /** * 给延迟队列发送消息 * * @param orderId * @param delayTimes */ public void sendMessage(Long orderId, final long delayTimes) { //给延迟队列发送消息 amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, (Message message) -> { //给消息设置延迟毫秒值 message.getMessageProperties().setExpiration(String.valueOf(delayTimes)); return message; }); log.debug("发出消息:{}", orderId); } }
package com.rabbitmqdelay.component; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 取消订单消息的处理者 */ @Component @RabbitListener(queues = "order.cancel") @Slf4j public class CancelOrderReceiver { @RabbitHandler public void handle(Long orderId){ log.info("接收消息:{}",orderId); // 订单业务处理逻辑 } }
OK,大工完成,我们测试下,新建一个controller
package com.rabbitmqdelay.controller; import com.rabbitmqdelay.component.CancelOrderSender; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController @Slf4j public class OmsPortalOrderController { @Autowired private CancelOrderSender cancelOrderSender; @GetMapping("/delay-message") public void sendDelayMessage() { // 生成订单 // 为订单设置超时时间 long delayTimes = 3 * 1000; // 发送延迟消息 cancelOrderSender.sendMessage(Long.parseLong("20102010201"), delayTimes); log.info("发送延迟消息成功"); } @GetMapping("/send-message") public void sendMessage() { // 生成订单 // 发送消息 cancelOrderSender.sendMessage(Long.parseLong("20102010201")); log.info("直接发送消息成功"); } }
然后发送一个直接即时消息看看:
curl http://localhost:8080/send-message
然后发送一个延迟3秒的消息:
curl http://localhost:8080/delay-message
评论