SpringBoot整合RabbitMQ 延时消息的实现

流氓凡 技术分享 2022-08-26 4.65 K 0

实现方式

死信消息(队列ttl+死信exchange)

延时插件 (rabbitmq-delayed-message-exchange)


延时插件实现

简述:延时消息不直接投递到队列中,而是先转储到本地Mnesia数据库中,然后定时器在消息到期后再将其投递到队列中。

关于用法可以直接看这个文档或者网上搜一搜,这里就不介绍了。

github地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

其大概原理就是:指定了延时的消息,会被先保存在 Mnesia (erlang编写的数据库管理系统)中,然后有一个定时器去查询最近需要被投递的消息,将其投递到目标队列中。

在这里插入图片描述

简单分析:

优点:

  • 基本支持任意延迟时间(不能超过1个月)

缺点:

  • 延时不可靠,存在消息数量较大或使用很久后延迟不准确(会推迟), 见 issues#72

  • 无备份机制,延时消息存在单个节点磁盘中

  • 不支持ram类型的节点 (数据得存磁盘里面)

  • 增加大量内存的占用 (经测试发现,发送大量延时消息后,rabbitmq内存占用明显增高,比普通消息还要高很多那种。)

  • 安装插件需要重启

适用场景:

  • 如果不是无关紧要的小业务,不建议使用。


死信队列实现

简述:使用两个队列,一个队列接收消息不消费,等待指定时间后消息死亡,再由该队列绑定的死信exchange再次将其路由到另一个队列提供业务消费。本文也是主要以此方式实现延迟消息

使用介绍

当一个队列设置了死信exchange 后,这个队列的死信都会被投递到死信exchange中,然后可以再次路由到其他队列中(如果指定了死信routing key 则死信消息routing key 变为设置的routing key,未设置则为原始 routing key)。

步骤一般都是:

  • 先声明一个消费队列 queue_dlx,用来接收死信消息,并提供消费;

  • 然后声明一个死信exchange_dlx, 绑定 queue_dlx,接收消息后路由至queue_dlx;

  • 声明一个延迟队列,queue_delay, 用来接收业务消息,但不提供消费,等待消息死亡后转至死信exchange。(即延迟)

  • 声明一个exchange,由业务发送消息到exchange,然后转至queue_delay.

听着好像比较复杂,其实代码难度很简单,区别去网上大多数教程的繁琐,直接上代码demo,GitHub地址 ,它的一个消息的流程大概是:








简单分析:

缺点:

  • 使用较复杂,得声明一堆队列&exchange

优点:

  • 支持镜像队列复制,实现高可用 

  • 支持大量消息(成千上万)

适用场景:

  • 任意场景,尤其是分布式场景下表现非常优越。

  • 固定时长的场景

备注:对于高版本(3.6及以上)的rabbitmq建议使用lazy-mode作为延迟队列,防止大量延时消息堆积而占用大量内存,从而触发rabbitmq换页阻塞队列。 (如果使用spring的话,即使低版本rabbitmq也不用太担心:spring-amqp默认发送持久化消息,即使触发换页,也只是把消息从内存中逐出而已。)


代码实现:

1. 先安装相关依赖:

<!--    安装mq依赖    -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
   <groupId>org.projectlombok</groupId>
   <artifactId>lombok</artifactId>
   <optional>true</optional>
</dependency>


2. 然后配置下yaml

spring:
   application:
      name: delay-rabbitmq
   rabbitmq:
      host: 127.0.0.1
      port: 5672
      username: guest
      password: guest
      virtual-host: /
      # 路由失败回调
      publisher-returns: true
      publisher-confirms: true

3. 去mq后台新建两个交换机

img_1.png


代码中新建一个枚举类:

package com.rabbitmqdelay.enums;

import lombok.Getter;

/**
 * 消息队列枚举类
 */
@Getter
public enum QueueEnum {

    /**
     * 消息通知队列
     */
    QUEUE_ORDER_CANCEL("order.direct", "order.cancel", "order.cancel"),
    /**
     * 消息通知ttl队列
     */
    QUEUE_TTL_ORDER_CANCEL("order.direct.ttl", "order.cancel.ttl", "order.cancel.ttl");

    /**
     * 交换名称
     */
    private String exchange;
    /**
     * 队列名称
     */
    private String name;
    /**
     * 路由键
     */
    private String routeKey;

    QueueEnum(String exchange, String name, String routeKey) {
        this.exchange = exchange;
        this.name = name;
        this.routeKey = routeKey;
    }

}


然后新建一个mq的配置接管

package com.rabbitmqdelay.config;

import com.rabbitmqdelay.enums.QueueEnum;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 消息队列配置类
 */
@Configuration
public class RabbitMqConfig {

    /**
     * 订单消息实际消费队列所绑定的交换机
     */
    @Bean
    DirectExchange orderDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 订单延迟队列队列所绑定的交换机
     */
    @Bean
    DirectExchange orderTtlDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 订单实际消费队列
     */
    @Bean
    public Queue orderQueue() {
        return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());
    }

    /**
     * 订单延迟队列(死信队列)
     */
    @Bean
    public Queue orderTtlQueue() {
        return QueueBuilder
                .durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName())
                .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机
                .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键
                .build();
    }

    /**
     * 将订单队列绑定到交换机
     */
    @Bean
    Binding orderBinding(DirectExchange orderDirect, Queue orderQueue){
        return BindingBuilder
                .bind(orderQueue)
                .to(orderDirect)
                .with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
    }

    /**
     * 将订单延迟队列绑定到交换机
     */
    @Bean
    Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){
        return BindingBuilder
                .bind(orderTtlQueue)
                .to(orderTtlDirect)
                .with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
    }

}


至此,我们的配置已经基本完成,接下来创建一个生产者和消费者

package com.rabbitmqdelay.component;

import com.rabbitmqdelay.enums.QueueEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 取消订单消息的发出者
 */
@Component
@Slf4j
public class CancelOrderSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * 直接给队列发送消息
     *
     * @param orderId
     */
    public void sendMessage(Long orderId) {
        //直接给队列发送消息
        amqpTemplate.convertAndSend(QueueEnum.QUEUE_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey(), orderId);
        log.debug("发出消息:{}", orderId);
    }

    /**
     * 给延迟队列发送消息
     *
     * @param orderId
     * @param delayTimes
     */
    public void sendMessage(Long orderId, final long delayTimes) {
        //给延迟队列发送消息
        amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, (Message message) -> {
            //给消息设置延迟毫秒值
            message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
            return message;
        });
        log.debug("发出消息:{}", orderId);
    }
}
package com.rabbitmqdelay.component;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 取消订单消息的处理者
 */
@Component
@RabbitListener(queues = "order.cancel")
@Slf4j
public class CancelOrderReceiver {


    @RabbitHandler
    public void handle(Long orderId){
        log.info("接收消息:{}",orderId);
        // 订单业务处理逻辑
    }

}


OK,大工完成,我们测试下,新建一个controller

package com.rabbitmqdelay.controller;

import com.rabbitmqdelay.component.CancelOrderSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class OmsPortalOrderController {

    @Autowired
    private CancelOrderSender cancelOrderSender;

    @GetMapping("/delay-message")
    public void sendDelayMessage() {
        // 生成订单
        // 为订单设置超时时间
        long delayTimes = 3 * 1000;
        // 发送延迟消息
        cancelOrderSender.sendMessage(Long.parseLong("20102010201"), delayTimes);
        log.info("发送延迟消息成功");
    }

    @GetMapping("/send-message")
    public void sendMessage() {
        // 生成订单
        // 发送消息
        cancelOrderSender.sendMessage(Long.parseLong("20102010201"));
        log.info("直接发送消息成功");
    }
}


然后发送一个直接即时消息看看:

curl http://localhost:8080/send-message


然后发送一个延迟3秒的消息:

curl http://localhost:8080/delay-message

img_2.png

评论