Skip to content

RabbitMQ

消息队列概述

消息队列作用

消息队列就是基础数据结构中的“先进先出”的一种数据机构。

应用解耦

传统模式:系统间耦合性太强,如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码。

使用消息队列中间件:将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。

以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合 调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。

异步处理

场景说明:用户注册后,需要发注册邮件和注册短信提示用户注册成功。注意:邮件,短信并不是必须的,它只是一个通知。传统的做法有两种

串行处理:

并行处理:

消息队列异步处理:将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度

流量削峰

传统模式:并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常

使用消息队列:将所有请求存放到消息队列中,系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。

消息队列模型

MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。

AMQP

AMQP是一种高级消息队列协议(Advanced Message Queuing Protocol),更准确的说是一种binary wire-level protocol(链接协议)。AMQP不从API层进行限定,而是直接定义网络交换的数据格式。

概念说明
连接Connection一个网络连接,比如TCP/IP套接字连接。
信道Channel多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。
客户端ClientAMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。
服务节点Broker消息中间件的服务节点;一般情况下可以将一个RabbitMQ Broker看作一台RabbitMQ 服务器。
端点AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。
消费者Consumer一个从消息队列里请求消息的客户端程序。
生产者Producer一个向交换机发布消息的客户端应用程序。

JMS

JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

关联

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
  • JMS规定了两种消息模式;而AMQP的消息模式更加丰富
  • 可以将JWT看做AMQP的一种实现。

常见消息队列产品

RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。

其他概念

QPS:每秒的响应请求数,也即是最大吞吐能力。

  • QPS = 并发量 / 平均响应时间

  • 并发量 = QPS * 平均响应时间

  • 原理:每天80%的访问集中在20%的时间里,这20%时间叫做峰值时间。

  • 公式:( 总PV数 * 80% ) / ( 每天秒数 * 20% ) = 峰值时间每秒请求数(QPS) 。

  • 机器:峰值时间每秒QPS / 单台机器的QPS = 需要的机器 。

PV(page view):即页面浏览量,或点击量,等于不重复的访问的IP数*平均访问的页面

uv(unique visitor):指访问某个站点或点击某条新闻的不同IP地址的人数。

PR(PageRank),网页的级别技术,用来标识网页的等级/重要性。级别从1到10级,10级为满分。PR值越高说明该网页越受欢迎。

RabbitMQ模式

概述

  • RabbitMQ基本框架

  • Broker:RabbitMQ的核心,它负责接收、存储和转发消息。可以把它想象成一个邮局,负责处理所有的信件(消息)

    • Virtual host:一个逻辑分区,用于隔离不同的应用或用户。可以把它想象成邮局里的不同信箱,每个信箱属于不同的用户或部门。每个用户在自己的 vhost 创建 exchange/queue 等
      • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point)topic (publish-subscribe)fanout (multicast)
      • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
      • Queue:存储消息的容器,消息最终被送到这里,等待 consumer 取走。
  • Connection:publisher/consumer(应用程序) 和 broker(rabbitmq) 之间的 TCP 连接,相当于连接池。

    • Channel:Channel是建立在Connection之上的虚拟连接。一个Connection可以包含多个Channel,每个Channel都是独立的,可以并行处理消息。

运转流程

  • 生产者发送消息
  1. 生产者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker;
  2. 声明队列并设置属性;如是否排它,是否持久化,是否自动删除;
  3. 将路由键(空字符串)与队列绑定起来;
  4. 发送消息至RabbitMQ Broker;
  5. 关闭信道;
  6. 关闭连接;
  • 消费者接收消息
  1. 消费者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker
  2. 向Broker 请求消费相应队列中的消息,设置相应的回调函数;
  3. 等待Broker投递响应队列中的消息,消费者接收消息;
  4. 确认(ack,自动确认)接收到的消息;
  5. RabbitMQ从队列中删除相应已经被确认的消息;
  6. 关闭信道;
  7. 关闭连接;

简单模式

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

工作模式

Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。表面看工作队列模式的生产方是面向队列发送消息,但实际上工作队列模式会将队列绑定到默认的交换机 。

应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

订阅模式

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • C:消费者,消息的接受者,会一直等待消息到来。
  • Queue:消息队列,接收消息、缓存消息。
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

Publish/Subscribe发布与订阅模式

1、每个消费者监听自己的队列。 2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

Routing路由模式

概述:队列与交换机的绑定,需要指定一个RoutingKey(路由key),消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

Topics通配符模式

通配符规则:

  • #:匹配零个或多个词,item.#:能够匹配item.insert.abc 或者 item.insert或者item
  • *:匹配不多不少恰好1个词,item.*:只能匹配item.insert

  • 红色Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到。
  • 黄色Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配。

Headers模式

headers类型的Exchange与Routing有点类似,不同的是,headers类型的Exchange取消了Routing Key,使用key/value来匹配队列

headers的匹配有两种方式,一种是all另一种是any。这两种方式是在接收端必须要用键值x-match来定义。

  • all:代表必须匹配Consumer端的所有键值对
  • any:代表只要匹配Consumer端的任意一个键值对即可

RPC远程调用模式

RPC模式是指远程调用,客户端通过RabbitMQ发送消息至服务器端,在服务器端调用各种函数对消息进行处理后,将处理结果通过另一消息队列返回给客户端。本质上可以认为是同步调用。

SpringBoot整合RabbitMQ

配置概述

  • 引入依赖

    xml
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  • 基本信息在yml中配置,队列交互机以及绑定关系在配置类中使用Bean的方式配置

    yml
    spring: 
      rabbitmq: 
        host: 192.168.6.100
        port: 5672
        username: admin
        password: 123456
        # Rabbitmq的虚拟主机逻辑分区
        virtual-host: /
  • 生产端直接注入RabbitTemplate完成消息发送

  • 消费端直接使用@RabbitListener完成消息接收

Simple/Work模式

创建Bean

java
import org.springframework.amqp.core.Queue;

//允许队列独占的是Simple模式,反之为工作模式(允许多个consumer监听)
@Configuration
public class Config_01_Work {

    /**
     * 定义一个队列
     * @return
     */
    @Bean("bootWorkQueue")
    public Queue bootWorkQueue() {
        /*
            第一种方式:
                durable():代表需要持久化
                exclusive(): 代表该队列独占(只允许有一个consumer监听)
                autoDelete(): 代表需要自动删除(没有consumer自动删除)
                withArgument(): 队列的其他参数,用于在创建队列时添加一些额外的参数。这些参数可以是任何 AMQP 队列支持的参数。例如,可以设置消息的存活时间、队列的优先级等。
        */
		return QueueBuilder.durable("boot_work_queue").exclusive().autoDelete().withArgument("key", "val").build();
        
        
        
        /*
            第二种方式:
                通过new Queue对象来创建队列
                参数1: 队列名称
                参数2: 是否持久化(默认:true)
                参数3: 是否独占(默认:false)
                参数4: 是否自动删除(默认:false)
                参数5: 队列的其他参数
         */
        return new Queue("boot_work_queue", true, false, false,null);
    }
}

生产者

java
 @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     *  work 模式
     * @throws Exception
     */
    @Test
    public void testWork() throws Exception{
        //第一个参数是消息队列的名称,第二个参数是发送的消息
        rabbitTemplate.convertAndSend("boot_work_queue","boot work...");
    }

消费者

java
@Component
public class Listener_01_Work {

    /**
     * 监听boot_work_queue队列的消息
     * @param message
     */
    @RabbitListener(queues = "boot_work_queue")
    public void receive(Message message){
        System.out.println("boot_work_queue: "+new String(message.getBody()));
    }
}

订阅模式

Publish/Subscribe发布与订阅模式

创建Bean
java
/**
 * Pub/Sub模式(交换机类型为Fanout)
 */

@Configuration
public class Config_03_Fanout {

    //创建队列1:boot_fanout_queue1
    @Bean("bootFanoutQueue1")
    public Queue bootFanoutQueue1(){
        return QueueBuilder.durable("boot_fanout_queue1").build();
    }

    //创建队列2:boot_fanout_queue2
    @Bean("bootFanoutQueue2")
    public Queue bootFanoutQueue2(){
        return QueueBuilder.durable("boot_fanout_queue2").build();
    }

    // 创建fanout类型交换机
    @Bean("bootFanoutExchange")
    public Exchange bootFanoutExchange(){
        return ExchangeBuilder.fanoutExchange("boot_fanout_exchange").durable(true).build();
    }

    /**
     * 交换机与队列1进行绑定
     */
    @Bean
    public Binding bindFanout1(){

        Queue bootFanoutQueue1 = bootFanoutQueue1();
        Exchange bootFanoutExchange = bootFanoutExchange();

        //方法一:
        //queue:这是一个 Queue 对象,表示你想要绑定的队列。  
		//Binding.DestinationType.QUEUE:这是一个枚举值,表示绑定的目标类型。在这个例子中,目标类型是队列。  
		//exchangeName:这是一个字符串,表示你想要绑定的交换器的名称。  
		//"":这是一个字符串,表示路由键。在这个例子中,路由键为空字符串,这意味着这个绑定将接收交换器上的所有消息。  
		//null:这是一个 Map 对象,用于传递一些额外的参数。在这个例子中,没有传递任何额外的参数
        // return new Binding(queue, Binding.DestinationType.QUEUE,exchangeName,"",null);
        
        
        //方法二:
        //bind(bootFanoutQueue1):接收一个 Queue 对象作为参数,表示你想要绑定的队列。
		//to(bootFanoutExchange):这是一个方法,接收一个 Exchange 对象作为参数,表示你想要队列要绑定到的交换器。
		//with(""):这是一个方法,接收一个字符串作为参数,表示路由键。
		//noargs():这是一个方法,表示在创建绑定时不需要额外的参数。 
        return BindingBuilder.bind(bootFanoutQueue1).to(bootFanoutExchange).with("").noargs();
    }

    /**
     * 交换机与队列2进行绑定
     * @return
     */
    @Bean
    public Binding bindFanout2(@Qualifier("boot_fanout_queue1") Queue bootFanoutQueue2,
                                  @Qualifier("boot_fanout_queue2") Exchange bootFanoutExchange){
        return BindingBuilder.bind(bootFanoutQueue2).to(bootFanoutExchange).with("").noargs();
    }

}
生产者
java
/**
  * Pub/Sub 模式(交换机类型为Fanout)
  * @throws Exception
*/
@Test
public void testFanout() throws Exception{
    rabbitTemplate.convertAndSend("boot_fanout_exchange","","boot fanout....");
}
消费者
java
@Component
public class Listener_02_Direct {

    @RabbitListener(queues = "boot_direct_queue1")
    public void boot_direct_queue1(Message message){
        System.out.println("boot_direct_queue1: "+new String(message.getBody()));
    }

    @RabbitListener(queues = "boot_direct_queue2")
    public void boot_direct_queue2(Message message){
        System.out.println("boot_direct_queue2: "+new String(message.getBody()));
    }

}

Routing路由模式

创建Bean
java
/**
 * routing模式(交换机类型为Direct)
 */
@Configuration
public class Config_02_Direct {

    /**
     * 准备两个队列boot_direct_queue1、boot_direct_queue2
     * @return
     */
    @Bean("bootDirectQueue1")
    public Queue bootDirectQueue1(){
        return QueueBuilder.durable("boot_direct_queue1").build();
    }

    @Bean("bootDirectQueue2")
    public Queue bootDirectQueue2(){
        return QueueBuilder.durable("boot_direct_queue2").build();
    }

    // direct类型交换机
    @Bean("bootDirectExchange")
    public Exchange bootDirectExchange(){
        /*
            第一种方式: 通过ExchangeBuilder构建交换机
                durable: 是否持久化
                autoDelete: 是否自动删除
                withArgument: 交换机其他参数
         */
       return  ExchangeBuilder.directExchange("boot_direct_exchange").durable(true)
                              .autoDelete().withArgument("key","val").build();

        /*
            第二种方式:
                参数1: 是否持久化(默认false)
                参数2: 是否自动删除(默认false)
                参数3: 其他参数
         */
        return new DirectExchange("boot_direct_exchange",true,false,null);
    }

    /**
     * 交换机与队列进行绑定
     */
    @Bean
    public Binding bindDirect1(@Qualifier("bootDirectQueue1") Queue bootDirectQueue1,
                                  @Qualifier("bootDirectQueue2") Exchange bootDirectExchange){

        /*
            第一种方式:
                bind(Queue): 需要绑定的queue
                to(Exchange): 需要绑定到哪个交换机
                with(String): routing key
                noargs(): 进行构建
         */
//        return BindingBuilder.bind(bootDirectQueue1).to(bootDirectExchange).with("article").noargs();

        /*
            第一种方式:
                参数1: 绑定的队列
                参数2: 绑定的类型 Binding.DestinationType.QUEUE: 绑定的类型为queue(交换机不仅可以绑定queue还可以绑定exchange)
                参数3: 哪个交换机需要绑定
                参数4: routing key
                参数5: 其他参数
         */
         return new Binding("boot_direct_queue1",Binding.DestinationType.QUEUE,"boot_direct_exchange","red",null);
    }

    /**
     * 交换机与队列进行绑定
     * @return
     */
    @Bean
    public Binding bindDirect2(@Qualifier("bootDirectQueue1") Queue bootDirectQueue2,
                                  @Qualifier("bootDirectQueue2") Exchange bootDirectExchange){
        return BindingBuilder.bind(bootDirectQueue2).to(bootDirectExchange).with("green").noargs();
    }
}
生产者
java
/**
  * Routing 模式(交换机类型为Direct)
  * @throws Exception
*/
@Test
public void testDirect() throws Exception{
    //第一个参数是交换机名称,第二个参数是路由名称,第三个参数是消息内容
    rabbitTemplate.convertAndSend("boot_direct_exchange","red","boot direct red...");
    rabbitTemplate.convertAndSend("boot_direct_exchange","green","boot direct green...");
}
消费者
java
@Component
public class Listener_02_Direct {

    @RabbitListener(queues = "boot_direct_queue1")
    public void boot_direct_queue1(Message message){
        System.out.println("boot_direct_queue1: "+new String(message.getBody()));
    }

    @RabbitListener(queues = "boot_direct_queue2")
    public void boot_direct_queue2(Message message){
        System.out.println("boot_direct_queue2: "+new String(message.getBody()));
    }

}

Topics通配符模式

创建Bean
java
/**
 * Topics模式(交换机类型为Topic)
 */
@Configuration
public class Config_04_Topic {

    @Bean("bootTopicQueue1")
    public Queue bootTopicQueue1(){
        return QueueBuilder.durable("boot_topic_queue1").build();
    }

    @Bean("bootTopicQueue2")
    public Queue bootTopicQueue2(){
        return QueueBuilder.durable("boot_topic_queue2").build();
    }

    // topic类型交换机
    @Bean("bootTopicExchange")
    public Exchange bootTopicExchange(){

        return ExchangeBuilder.topicExchange("boot_topic_exchange").durable(true).build();
    }

    /**
     * 交换机与队列进行绑定
     */
    @Bean
    public Binding bindTopic1(@Qualifier("bootTopicQueue1") Queue bootTopicQueue1,
                                  @Qualifier("bootTopicExchange") Exchange bootTopicExchange){
        return BindingBuilder.bind(bootTopicQueue1).to(bootTopicExchange).with("red.#").noargs();
    }

    /**
     * 交换机与队列进行绑定
     * @return
     */
    @Bean
    public Binding bindTopic2(@Qualifier("bootTopicQueue1") Queue bootTopicQueue2,
                                  @Qualifier("bootTopicExchange") Exchange bootTopicExchange){
        return BindingBuilder.bind(bootTopicQueue2).to(bootTopicExchange).with("green.*").noargs();
    }
}
生产者
java
/**
  * Topics模式(交换机类型为Topic)
  * @throws Exception
*/
@Test
public void tesTopic() throws Exception{
    rabbitTemplate.convertAndSend("boot_topic_exchange","red.xxx.xxx","boot topic red.# ...");
    rabbitTemplate.convertAndSend("boot_topic_exchange","green.xxx","boot topic green.* ....");
}
消费者
java
@Component
public class Listener_02_Direct {

    @RabbitListener(queues = "boot_direct_queue1")
    public void boot_direct_queue1(Message message){
        System.out.println("boot_direct_queue1: "+new String(message.getBody()));
    }

    @RabbitListener(queues = "boot_direct_queue2")
    public void boot_direct_queue2(Message message){
        System.out.println("boot_direct_queue2: "+new String(message.getBody()));
    }

}

Heads模式

java
/**
 * header模式
 */
@Configuration
public class Config_05_Header {
    // 声明一个queue
    @Bean("bootHeaderQueue")
    public Queue bootHeaderQueue(){
        return QueueBuilder.durable("boot_header_queue").build();
    }

    // header类型交换机
    @Bean("bootHeaderExchange")
    public Exchange bootHeaderExchange(){
        return ExchangeBuilder.headersExchange("boot_header_exchange").durable(true).build();
    }

    // 将exchange和queue进行绑定
    @Bean
        public Binding bindHeader(@Qualifier("bootHeaderQueue") Queue queue,
                                  @Qualifier("bootHeaderExchange") Exchange exchange){

        Map<String, Object> headers = new HashMap<>();
        /*
            all:Producer必须匹配所有的键值对
            any:只要Producer匹配任意一个键值对即可
         */
            headers.put("x-match", "any");
            headers.put("key1", "147");
            headers.put("key2", "258");
            headers.put("key3", "369");

        // routing key 为空
        return BindingBuilder.bind(queue).to(exchange).with("").and(headers);
    }
}

生产者

java
@Test
public void tesHeader() throws Exception {

    // 准备header参数
    Map<String, Object> headers = new HashMap<>();
    headers.put("key1", "147");
    headers.put("key2", "258");
    headers.put("key3", "369");

    // 使用的是rabbitMessagingTemplate 而不是 rabbitTemplate
    rabbitMessagingTemplate.convertAndSend("boot_header_exchange", "", "boot header....", headers);
}

消费者

java
@Component
public class Listener_05_Header {
    @RabbitListener(queues = "boot_header_queue")
    public void boot_topic_queue1(Message message) {
        System.out.println("boot_header_queue: " + new String(message.getBody()));
    }
}

RabbitMQ高级特性

消息的可靠投递

概述

  • 消息投递过程:producer—>rabbitmq broker—>exchange—>queue—>consumer,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式

  • 我们将利用这两个 callback 控制消息的可靠性投递

    • confirm 确认模式:消息从 producer 到 exchange 则会返回一个 confirmCallback 。

    • return退回模式:消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。

确认模式

  • 设置 ConnectionFactorypublisher-confirms="true" 开启确认模式。
  • 使用 rabbitTemplate.setConfirmCallback 设置回调函数。当消息发送到 exchange 后回调 confirm 方法。在方法中判断 ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
配置文件
yml
spring:
  rabbitmq:
    host: 192.168.40.141
    port: 5672
    username: lscl
    password: admin
    virtual-host: /lscl
    # 开启消息确认模式,在SPringBoot低版本中使用,后来被废弃
    # publisher-confirms: true    
    
    # 高版本中消息确认模式使用
    #none:不启用发布确认。
	#correlated:对每个发布的消息进行确认。这是最慢的选项,因为它需要等待服务器对每个消息的确认。
	#simple:只确认每个批次的最后一个消息。这是默认值,它提供了一个折衷的解决方案,既能提供确认的保证,又能保持较高的吞吐量。
    publisher-confirm-type: simple
配置类
java
/**
 * 测试消息确认机制中的确认模式
 */
@Configuration
public class Config_01_ConfirmCallback implements RabbitTemplate.ConfirmCallback {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * @param correlationData: 配置相关信息
     * @param ack:             交换机是否成功收到消息
     * @param cause:           失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        System.out.println("confirm executed...");

        if (ack) {
            System.out.println("success: " + cause);
        }else{
            ReturnedMessage returned = correlationData.getReturned();
            //重试……
        	rabbitTemplate.send(returned.getMessage());
            //通知相关人员……
        }
    }
}

回退模式

  • 设置 ConnectionFactorypublisher-returns="true" 开启 退回模式。
  • 使用 rabbitTemplate.setReturnCallback 设置退回函数,当消息从exchangequeue 失败后,若设置了rabbitTemplate.setMandatory(true) 参数,则会将消息退回给 producer并执行回调函数returnedMessage
配置文件
yml
spring:
  rabbitmq:
    host: 192.168.6.100
    port: 5672
    username: admin
    password: 123456
    # Rabbitmq的虚拟主机逻辑分区
    virtual-host: /
    publisher-returns: true
配置类
java
@Configuration
public class Config_02_ReturnCallback implements RabbitTemplate.ReturnCallback {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
        // 开启消息回退
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * @param message    消息对象
     * @param replyCode  错误码
     * @param replyText  错误信息
     * @param exchange   交换机
     * @param routingKey 路由键
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        //相关处理操作………
    }
}
xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- 定义rabbitmq connectionFactory
     确认模式开启:publisher-confirms="true"
    -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"
                               publisher-confirms="true"
                               publisher-returns="true"
    />
    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

    <!--消息可靠性投递(生产端)-->
    <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
    <rabbit:direct-exchange name="test_exchange_confirm">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

</beans>

提供端代码

java
//确认模式
@Test
public void testConfirm() {
    //2. 定义回调
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack){
                //接收成功
                System.out.println("接收成功消息" + cause);
            }else {
                //接收失败
                System.out.println("接收失败消息" + cause);
                //做一些处理,让消息再次发送。
            }
        }
    });

    //3. 发送消息
    rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");//成功
    //rabbitTemplate.convertAndSend("test_exchange_confirm000", "confirm", "message confirm....");//失败
}



//回退模式
@Autowired
private RabbitTemplate rabbitTemplate;

/**
     * 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败时 才会执行 ReturnCallBack
     * 步骤:
     * 1. 开启回退模式:publisher-returns="true"
     * 2. 设置ReturnCallBack
     * 3. 设置Exchange处理消息的模式:
     *      1). 如果消息没有路由到Queue,则丢弃消息(默认)
     *      2). 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
     *            rabbitTemplate.setMandatory(true);
     */
@Test
public void testReturn() {

    //设置交换机处理失败消息的模式
    rabbitTemplate.setMandatory(true);

    //2.设置ReturnCallBack
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        /**
             * @param message   消息对象
             * @param replyCode 错误码
             * @param replyText 错误信息
             * @param exchange  交换机
             * @param routingKey 路由键
             */
        @Override
        public void returnedMessage(Message message, int replyCode,String replyText,String exchange,String routingKey) {
            System.out.println("return 执行了....");

            System.out.println(message);
            System.out.println(replyCode);
            System.out.println(replyText);
            System.out.println(exchange);
            System.out.println(routingKey);

            //处理
        }
    });

    //3. 发送消息
    rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
}

备份交换机

可以为队列设置死信交换机来存储那些处理队列失败的消息,而对于exchange–>queue失败的消息则不能通过死信交换机处理。

处理这些消息,除了回退returnedMessage函数外,我们还可以通过配置备份交换机实现这些失败消息的处理。当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout

备份交换机优先级高于回退函数

java
@Configuration
public class ConfirmConfig {
    // 交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    // 队列
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    // 备份交换机
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    // 备份队列
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    // 告警队列
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    // 路由键
    public static final String CONFIRM_ROUTING_KEY = "key1";

    // 声明确认交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                .durable(true)
                //在此处声明备份交换机,当路由失败后消息会转发到此交换机
                .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build();
    }
    // 声明备份交换机
    @Bean("backupExchange")
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }
    // 声明确认队列
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }
    // 声明备份队列
    @Bean("backupQueue")
    public Queue backupQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }
    // 声明报警队列
    @Bean("warningQueue")
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    // 绑定确认队列到确认交换机上
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmExchange")DirectExchange exchange,
                                        @Qualifier("confirmQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);
    }

    // 绑定备份队列到备份交换机上
    @Bean
    public Binding queueBindingBackupExchange(@Qualifier("backupExchange")FanoutExchange exchange,
                                        @Qualifier("backupQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    //绑定警告队列到备份交换机上
    @Bean
    public Binding queueBindingBackupExchange2(@Qualifier("backupExchange")FanoutExchange exchange,
                                              @Qualifier("warningQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange);
    }
}

Consumer Ack

消费端收到消息后的确认方式(ACK:Acknowledge)。

  • 自动确认:acknowledge=“none” 默认

    • 消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
  • auto确认:acknowledge=“auto”

    • 根据情况Rabbitmq自动选择确认模式。如果消息监听器抛出异常,那么消息将被拒绝(requeue或者discard)。如果消息监听器正常返回,那么消息将被确认。
  • 手动确认:acknowledge=“manual”

    • 和auto差不多,但是需要手动在业务处理成功后,调用channel.basicAck()签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
配置文件
yml
spring:
  rabbitmq:
    host: 192.168.6.100
    port: 5672
    username: admin
    password: 123456
    # Rabbitmq的虚拟主机逻辑分区
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: auto

自动确认

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

@Component
public class AckListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        System.out.println(new String(message.getBody()));
    }
}

手动确认

java
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

/**
 * Consumer ACK机制:
 *  1. 设置手动签收。acknowledge="manual"
 *  2. 让监听器类实现ChannelAwareMessageListener接口
 *  3. 如果消息成功处理,则调用channel的 basicAck()签收
 *  4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
*/
@Component
public class AckListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        Thread.sleep(1000);
        // 获取消息传递标记
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // ① 接收消息
            System.out.println(new String(message.getBody()));
            // ② 处理业务逻辑
            System.out.println("处理业务逻辑");
            int i = 3/0;//出现错误
            // ③ 手动签收
            /**
             * 第一个参数:表示收到的标签
             * 第二个参数:如果为true表示可以签收所有的消息
             */
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            e.printStackTrace();
            // ④ 拒绝签收
            /*
            第三个参数:requeue:重回队列。
            设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */
            channel.basicNack(deliveryTag,true,true);
        }
    }
}

消费端限流

当RabbitMQ有大流量的消息来到队列时,我们希望消息可以在Consumer的承受范围内进行逐个消费,例如每次消费100条、1000条

配置文件

配置 prefetch 属性设置消费端一次拉取多少条消息。消费端的必须确认才会继续处理其他消息。

yml
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1       # 每次从队列中拉取一条消息进行消费

TTL(过期时间)

TTL 全称 Time To Live(存活时间/过期时间)。

  • 带有过期时间的队列,当过期时间到达后消息如果没有被消费,那么自动丢弃

  • RabbitMQ对队列和消息均可设置过期时间,当队列和消息同时设置有过期时间时,以最先过期的单位时间为准

  • 在RabbitMQ中并不是轮询方式去判断消息是否过期,而是只判断在最顶部的消息,因此消息如果不是在最顶部技术到达了过期时间也不会被移除队列;

对队列所有消息设置过期时间

rabbitMQ管理页面设置存活时间,单位为毫秒

配置类修改过期时间

java
@Configuration
public class Config_04_TTL {
    
    @Bean("testQueueTtl")
    public Queue testQueueTtl() {
        Map<String, Object> param = new HashMap<>();
        param.put("x-message-ttl", 5000);

        return QueueBuilder.durable("test_queue_ttl").withArguments(param).build();
    }
}

单独对消息设置过期时间

java
/**
     * TTL:过期时间
     *  1. 队列统一过期
     *  2. 消息单独过期
     * 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
     */
@Test
public void testMessageTtl() {
    //方式一:
    // 消息后处理对象,设置一些消息的参数信息
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            //1.设置message的信息
            // 第二个方法:消息的过期时间 ,5秒之后过期
            message.getMessageProperties().setExpiration("5000");
            //2.返回该消息
            return message;
        }
    };

    //消息单独过期
    rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl....",messagePostProcessor);
    
    
    //方式二
        rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl....",message -> {
            message.getMessageProperties().setExpiration("5000");
            return message;
        });
    
    //方式三
    MessageProperties properties=new MessageProperties();
    properties.setExpiration("2000");
    Message message=new Message("ttl 单独消息过期".getBytes(),properties);
    rabbitTemplate.convertAndSend("test_queue_ttl", message);
}

死信队列

死信,即无法被消费的消息。一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;

死信队列和普通队列没有区别,只不过是用于处理不同信息而已

消息成为死信的三种情况:

  1. 队列消息数量到达限制;比如队列最大只能存储10条消息,而发了11条消息,根据先进先出,最先发的消息会进入死信队列。

  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

死性队列的处理方式

① 丢弃,如果不是很重要 ,可以选择丢弃

② 记录死信入库,然后做后续的业务分析或处理

通过死信队列,由负责监听死信的应用程序进行处理

死信队列配置

1)声明正常的队列,并且在队列参数中设置死信交换机(消息成为死信后再次被利用起来只能绑定交换机,不能直接绑定Queue)

2)声明死信交换机(和正常交换机一样)

3)声明死信队列(和正常队列一样)

4)死信队列绑定死信交换机

java
/**
 * 测试死信队列
 */
@Configuration
public class Config_05_Dlx {

    // 声明正常的队列,设置其死信转发给的死信交换机和对应路由key
    @Bean("testQueueDlx")
    public Queue testQueueDlx() {

        Map<String,Object> params=new HashMap();
        // 设置死信交换机
        params.put("x-dead-letter-exchange","exchange_dlx");

        // 转发给死信交换机的routingKey
        params.put("x-dead-letter-routing-key","dlx");

        // 队列的过期时间
//        params.put("x-message-ttl",5000);

        // 设置队列的最大长度限制
//        params.put("x-max-length",10);

        return QueueBuilder.durable("test_queue_dlx").withArguments(params).build();
    }

    // 声明死信队列
    @Bean("queue_dlx")
    public Queue queueDlx() {
        return QueueBuilder.durable("queue_dlx").build();
    }


    // 声明死信交换机
    @Bean("test_exchange_dlx")
    public Exchange testExchangeDlx() {
        return ExchangeBuilder.directExchange("exchange_dlx").durable(true).build();
    }

    // 死信交换机绑定死信队列
    @Bean
    public Binding binding(@Qualifier("queue_dlx") Queue bootTopicQueue2,
                                  @Qualifier("test_exchange_dlx") Exchange bootTopicExchange){

        return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();
    }
}

延迟队列

延迟队列存储的对象是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

场景:

在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行取消处理。这时就可以使用延时队列将订单信息发送到延时队列。

需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。
  2. 新用户注册成功30分钟后,发送短信问候。

实现:

在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列 组合实现延迟队列的效果,然后使用定时任务定期检查死信队列中的订单即可。

  1. 为队列消息设置过期时间,过期后自动投递到死信队列,这样做可能会导致不同的延期时间需要创建不同的队列。

  2. 为消息设置过期时间,消息过期后自动投递到死信队列,由于Rabbitmq只会检查第一条消息,所以可能而投递消息的过期时间不同,可能导致一些消息一直无法被消费。

  3. 安装rabbitmq_delayed_message_exchange 插件

    shell
    # RabbitMQ 的安装目录
    cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins
    # 将下载的插件放到 RabbitMQ 安装目录的 plugins 目录下
    cp /usr/local/rabbitmq/rabbitmq_delayed_message_exchange-3.10.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins
    # 安装插件
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    # 重启 RabbitMQ 
    systemctl restart rabbitmq-serve
java
/**
 * 测试死信队列
 */
@Configuration
public class Config_06_Delay {

    // 声明正常的队列,设置死信交换机参数
    @Bean("testQueueDlx")
    public Queue testQueueDlx() {

        Map<String, Object> params = new HashMap();
        // 设置死信交换机
        params.put("x-dead-letter-exchange", "exchange_dlx");

        // 转发给死信交换机的routingKey
        params.put("x-dead-letter-routing-key", "dlx");

        // 队列的过期时间
		//params.put("x-message-ttl",5000);

        // 设置队列的最大长度限制
		//params.put("x-max-length",10);

        return QueueBuilder.durable("test_queue_dlx").withArguments(params).build();
    }

    // 声明死信队列
    @Bean("queue_dlx")
    public Queue queueDlx() {
        return QueueBuilder.durable("queue_dlx").build();
    }


    // 声明死信交换机
    @Bean("test_exchange_dlx")
    public Exchange testExchangeDlx() {
        return ExchangeBuilder.directExchange("exchange_dlx").durable(true).build();
    }

    // 死信交换机绑定死信队列
    @Bean
    public Binding binding() {
        Queue queue = queueDlx();
        Exchange exchange = testExchangeDlx();

        return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();
    }
}

优先队列

订单催付的场景中,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒。对此,需要进行区分大小客户,创造很大的利润商家的订单必须得到优先处理,如果使用 redis 来存放的定时轮询,就无法实现该操作。采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级, 否则就是默认优先级。

优先级的值越大,消息的优先级越高。

配置类

  • 队列需要设置为优先级队列,
  • 消息需要设置消息的优先级,
  • 消费者需要等待消息已经发送到队列中才去消费,因为这样才有机会对消息进行排序。(否则发一条消费一条,打不到效果了。)
java
//将队列设置为优先级队列
@Bean
public Queue myQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-max-priority", 10); // 设置队列的最大优先级为10
    return new Queue("myQueue", true, false, false, args);
}
   
//发送消息时手动为消息添加优先级
rabbitTemplate.convertAndSend("myQueue", (Object) "Hello, RabbitMQ!", message -> {
            message.getMessageProperties().setPriority(5); // 设置消息的优先级为5
            return message;
        });

惰性队列

惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中设计目标是能够支持更长的队列(即更多的消息存储)。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。

java
@Bean
public Queue queueDlx() {
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-queue-mode", "lazy");
    return QueueBuilder.durable("queue_dlx").withArguments(args).build();
}

消息的百分百投递

Step 1: 首先把业务数据存储到数据库中,紧接着,我们再把这个RabbitMq发送的消息也存储到一张消息记录表里

Step 2:发送消息到MQ Broker节点(采用confirm方式发送,会有异步的返回结果)

Step 3、4:生产者端接受MQ Broker节点返回的Confirm确认消息结果,然后进行更新消息记录表里的消息状态。比如默认Status = 0 当收到消息确认成功后,更新为1即可!

Step 5:但是在消息确认这个过程中可能由于网络闪断、MQ Broker端异常等原因导致 回送消息失败或者异常。这个时候就需要发送方(生产者)对消息进行可靠性投递了,保障消息不丢失。(有一种极限情况是闪断,Broker返回的成功确认消息,但是生产端由于网络闪断没收到,这个时候重新投递可能会造成消息重复,需要消费端去做幂等处理)所以我们需要有一个定时任务,(比如每5分钟拉取一下处于中间状态的消息,当然这个消息可以设置一个超时时间,比如超过1分钟 Status = 0 ,也就说明了1分钟这个时间窗口内,我们的消息没有被确认,那么会被定时任务拉取出来)

Step 6:接下来我们把中间状态的消息进行重新投递 retry send,继续发送消息到MQ ,当然也可能有多种原因导致发送失败

Step 7:我们可以采用设置最大努力尝试次数,比如投递了3次,还是失败,那么我们可以将最终状态设置为Status = 2 ,最后 交由人工解决处理此类问题(或者把消息转储到失败表中)。

数据库文件

mysql
-- ----------------------------
-- Table structure for broker_message_log
-- ----------------------------
DROP TABLE IF EXISTS `broker_message_log`;
CREATE TABLE `broker_message_log` (
  `message_id` varchar(255) NOT NULL COMMENT '消息唯一ID',
  `message` varchar(4000) NOT NULL COMMENT '消息内容',
  `try_count` int(4) DEFAULT '0' COMMENT '重试次数',
  `status` varchar(10) DEFAULT '' COMMENT '消息投递状态 0投递中,1投递成功,2投递失败',
  `next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP COMMENT '下一次重试时间',
  `create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
  `update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for t_order
-- ----------------------------
DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `message_id` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2018091102 DEFAULT CHARSET=utf8;

消息幂等

幂等性

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

生产者发送消息:

id=1,money=500,version=1

消费者接收消息

id=1,money=500,version=1
id=1,money=500,version=1

消费者需要保证幂等性:第一次执行SQL语句

第一次执行:version=1
update account set money = money - 500 , version = version + 1
where id = 1 and version = 1

消费者需要保证幂等性:第二次执行SQL语句

第二次执行:version=2
update account set money = money - 500 , version = version + 1
where id = 1 and version = 1

实现消息幂等

  • 全局消息ID:发送消息时给消息分配一个全局ID,每次消费消息时判断该ID是否存在过,如果存在过则已经消费过,如果不存在则说明是第一次消费;

  • Redis标识:发送消息时,给消息分配一个全局的唯一ID,消费消息时,将id与消息以K-V的形式写入Redis<id,meessage>,如果能写入成功代表第一次消费消息,如果写入不成功代表消费已经被消费过了

java
@Component
public class TestListener {

    @Autowired
    private RedisTemplate redisTemplate;

    @RabbitListener(queues = "test_queue")
    public void test_queue_confirm(Message message) {

        String messageId = message.getMessageProperties().getMessageId();

        if (null == messageId) {
            System.out.println("消息id为null!");
            return;
        }
		//设置恰当的过期时间
        if (redisTemplate.opsForValue().setIfAbsent(messageId, new String(message.getBody()))) {
            // 代表第一次消费消息
            System.out.println("消息消费成功: " + new String(message.getBody()));
            
        } else {
            System.out.println("消息已经被消费过了!");
        }
    }
}