文章

RabbitMQ高级特性

RabbitMQ高级特性

1 消息的可靠性

1.1 发送方

  • 需要使用RabbitMQ发送端确认机制,确认消息成功发送到RabbitMQ并被处理
  • 需要使用RabbitMQ消息返回机制,若没有发现目标队列,中间件会通知发送方

1.2 消费端

  • 需要使用RabbitMQ消费端确认机制,确认消息没有发生异常
  • 需要使用RabbitMQ消费端限流机制,限制消息推送速度,保障接收端服务稳定

1.3 RabbitMQ

  • 大量的消息堆积会给rabbitMQ产生很大的压力,需要RabbitMQ消息过期时间,防止消息大量积压
  • 过期后会直接被丢弃,无法对系统运行发出警报,需要使用RabbitMQ死信队列,收集过期消息。

2 发送端

2.1 发送端确认机制

 1、消息发送后,发送端不知道RabbitMQ是否真的收到了消息
 2、若RabbitMQ异常,消息丢失后,订单处理流程停止,业务异常
 3、需要使用RabbitMQ发送端确认机制,确认消息发送**

什么是发送端确认机制

消息发送后,若中间件收到消息,会给发送端一个应答, 生产者接收应答,用来确认这条消息是否正常发送到中间件

三种确认机制

1、单条同步

  static String EXCHANGE = "exchange.order.restaurant";
    @Test
    public void mqTestConnect() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("192.168.6.131");
        factory.setPort(5672);
        try (
                Connection conn = factory.newConnection();
                Channel channel = conn.openChannel().get()
        ){
            channel.confirmSelect();//开启确认模式
            String payload = "user order ...";
            
            channel.basicPublish(EXCHANGE,"key.order",null,payload.getBytes());
            log.info("msg send ...");
            if (channel.waitForConfirms(1000)) {//单条同步确认
                log.info("msg confirm success");
            }else {
                log.info("msg confirm failed");
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

2、多条同步确认

  static String EXCHANGE = "exchange.order.restaurant";
    @Test
    public void mqTestConnect() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("192.168.6.131");
        factory.setPort(5672);
        try (
                Connection conn = factory.newConnection();
                Channel channel = conn.openChannel().get()
        ){
            channel.confirmSelect();//开启确认模式
            String payload = "user order ...";

            for (int i = 0; i < 10; i++) {
                channel.basicPublish(EXCHANGE,"key.order",null,payload.getBytes());
                log.info("msg send ...");
            }
            if (channel.waitForConfirms(1000)) {//同步确认
                log.info("msg confirm success");
            }else {
                log.info("msg confirm failed");
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

3、异步确认

 public void mqACK() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("192.168.6.131");
        factory.setPort(5672);
        try (
                Connection conn = factory.newConnection();
                Channel channel = conn.openChannel().get()
        ){
            channel.confirmSelect();//开启确认模式
            ConfirmListener listener = new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    log.info("ack deliveryTag: {},multiple: {}",deliveryTag,multiple);
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    log.info("nack deliveryTag: {},multiple: {}",deliveryTag,multiple);
                }
            };
            channel.addConfirmListener(listener);
            String payload = "user order ...";
            for (int i = 0; i < 100; i++) {
                channel.basicPublish(EXCHANGE,"key.order",null,payload.getBytes());
            }
            log.info("msg send ...");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

2.2 消息确认返回机制

1、消息发送后,发送端不知道消息是否被正确路由,若路由异常,消息会被丢弃
2、消息丢弃后,订单处理流程停止,业务异常
3、需要使用RabbitMQ消息返回机制,确认消息被正确路由

原理

消息发送后,中间件会对消息进行路由,若没有发现目标队列,中间件会通知发送方,Return Listener会被调用

消息返回的开启方法
在RabbitMQ基础配置中有一个关键配置项:Mandatory

  • Mandatory若为false,RabbitMQ将直接丢弃无法路由的消息
  • Mandatory若为true,RabbitMQ才会处理无法路由的消息

实践

    @Test
    public void mqRoutingAck() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("192.168.6.131");
        factory.setPort(5672);
        try (
                Connection conn = factory.newConnection();
                Channel channel = conn.openChannel().get()
        ){
            ReturnListener listener = (replyCode, replyText, exchange, routingKey, properties, body) ->
                    log.info("replyCode:{},replyText:{},exchange:{}," +
                                    "routingKey:{},properties:{},body:{}",
                    replyCode,replyText,exchange,routingKey,properties,new String(body));
            channel.addReturnListener(listener);
            String payload = "user order ...";
            channel.basicPublish(EXCHANGE,"key.order1",true,null,payload.getBytes());
            log.info("msg send ...");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

3 消费端

3.1 消费端确认机制

默认情况下,消费端接收消息时,消息会被自动确认(ACK)
消费端消息处理异常时,发送端与消息中间件无法得知消息处理情况
需要使用RabbitMQ消费端确认机制,确认消息被正确处理

确认机制

  • 自动ACK:消费端收到消息后,会自动签收消息
  • 手动ACK:消费端收到消息后,不会自动签收消息,需要我们在业务代码中显式签收消息(单条手动ACK 多条手动ACK)

但是我们是实际使用中,我们需要知道具体那条消息出现异常,推荐使用单条ACK;

重回队列

若设置了重回队列,消息被NACK之后,会返回队列末尾,等待进一步被处理.

一般不建议开启重回队列,因为第一次处理异常的消息,再次处理,基本上也是异常.

代码

    Channel channel;
    @Test
    public void mqConsumer() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("192.168.6.131");
        factory.setPort(5672);
        try (
                Connection conn = factory.newConnection();
                Channel channel = conn.openChannel().get()
        ){
            this.channel =channel;
            log.info("start listening message...");
            channel.basicConsume(QUEUE, false,deliverCallback ,consumerTag -> {
                log.info("消息消费被中断");
            });
            while (true) {
                Thread.sleep(100000);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    DeliverCallback deliverCallback = ((consumerTag, message) -> {
        String messageBody = new String(message.getBody());
        channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        log.info("deliverCallback:messageBody:{}", messageBody);
    });

重回队列代码

channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);

3.2 消费端限流机制

​ 业务高峰期,可能出现发送端与接收端性能不一致,大量消息被同时推送给接收端,造成接收端服务崩溃,需要使用RabbitMQ消费端限流机制,限制消息推送速度,保障接收端服务稳定。

场景

1、业务高峰期,有个微服务崩溃了,崩溃期间队列挤压了大量消息,微服务上线后,收到大量并发消息

2、将同样多的消息推给能力不同的副本,会导致部分副本异常

RabbitMQ Qos

针对以上问题,RabbitMQ开发了QoS(服务质量保证)功能,QoS功能保证了在一定数目的消息未被确认前,不消费新的消息,QoS功能的前提是不使用自动确认

原理:QoS原理是当消费端有一定数量的消息未被ACK确认时,RabbitMQ不给消费端推送新的消息,RabbitMQ使用QoS机制实现了消费端限流

消费端限流机制参数设置

 prefetchCount:针对一个消费端最多推送多少未确认消息
 global: true:针对整个消费端限流  false:针对当前channel
 prefetchSize : 0(单个消息大小限制,一般为0)
 prefetchSize与global两项,RabbitMQ暂时未实现

代码

 Channel channel;
    @Test
    public void mqConsumer() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("192.168.6.131");
        factory.setPort(5672);
        try (
                Connection conn = factory.newConnection();
                Channel channel = conn.openChannel().get()
        ){
            this.channel =channel;
            log.info("start listening message...");
            channel.basicQos(2);
            channel.basicConsume(QUEUE, false,deliverCallback ,consumerTag -> {
                log.info("消息消费被中断");
            });
            while (true) {
                Thread.sleep(100000);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    DeliverCallback deliverCallback = ((consumerTag, message) -> {
        String messageBody = new String(message.getBody());

        channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        log.info("deliverCallback:messageBody:{}", messageBody);
    });

rabbitmq控制台

image-20240506173102562

可以看到rabbitMq不会一下把消息全都推送到消费端,而是最多会推送我们指定的数值。

4 RabbitMQ自身

4.1 消息过期机制

一、队列爆满怎么办?
1、默认情况下,消息进入队列,会永远存在,直到被消费
2、大量堆积的消息会给RabbitMQ产生很大的压力
3、需要使用RabbitMQ消息过期时间,防止消息大量积压

二、RabbitMQ的过期时间(TTL)
RabbitMQ的过期时间称为TTL (Time to Live),生存时间
RabbitMQ的过期时间分为消息TTL和队列TTL
消息TTL设置了单条消息的过期时间
队列TTL设置了队列中所有消息的过期时间

三、如何找到适合自己的TTL?
1、TTL的设置主要考虑技术架构与业务
2、TTL应该明显长于服务的平均重启时间
3、建议TTL长于业务高峰期时间

代码

    @Test
    public void mqACK() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("192.168.6.131");
        factory.setPort(5672);
        try (
                Connection conn = factory.newConnection();
                Channel channel = conn.openChannel().get()
        ){
            channel.confirmSelect();//开启确认模式
            String payload = "user order ...";
            AMQP.BasicProperties props = new AMQP.BasicProperties().builder().expiration("15000").build();
            channel.basicPublish(EXCHANGE,"key.order",props,payload.getBytes());
            log.info("msg send ...");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

rabbitmq控制台查看

image-20240506173742499

过期时间为 15秒,过期之后会自动丢弃.

队列设置过期代码

 HashMap<String, Object> args = new HashMap<>(16);
            args.put("x-message-ttl",15000);

            channel.queueDeclare(QUEUE+"ttl",
                    true,
                    false,
                    false,
                    args);

image-20240506174001621

4.2 死信队列

一、如何转移过期消息?
1、消息被设置了过期时间,过期后会直接被丢弃
2、直接被丢弃的消息,无法对系统运行异常发出警报
3、需要使用RabbitMQ死信队列,收集过期消息,以供分析
二、什么是死信队列
死信队列:队列被配置了DLX属性(Dead-Letter-Exchange)
当一个消息变成死信(dead message)后,能重新被发布到另一个Exchange,这个Exchange也是一个普通交换机
死信被死信交换机路由后,一般进入一个固定队列
image-20240506180840875

三、怎样变成死信

1、消息被拒绝(reject/nack)并且requeue=false
2、消息过期(TTL到期)
3、队列达到最大长度

四、死信队列设置方法
1、设置转发、接收死信的交换机和队列:

 Exchange: dlx.exchange
 Queue: dlx.queue
 RoutingKey: #

2、在需要设置死信的队列加入参数: x-dead-letter-exchange = dlx.exchange
代码

            //声明接收死信的交换机
            channel.exchangeDeclare(
                    DLX_EXCHANGE,
                    BuiltinExchangeType.TOPIC,
                    true,
                    false,
                    null
            );
            //声明接收死信的队列
            channel.queueDeclare(DLX_QUEUE,
                    true,
                    false,
                    false,
                    null);
            channel.queueBind(DLX_QUEUE,DLX_EXCHANGE,"#");
            HashMap<String, Object> args = new HashMap<>(16);
            args.put("x-message-ttl",15000);
            args.put("x-dead-letter-exchange",DLX_EXCHANGE);

            channel.queueDeclare(QUEUE+"ttl",
                    true,
                    false,
                    false,
                    args);

rabbitmq控制台:

image-20240506180413062

15秒后,可以看到 消息 转移到接收死信消息的队列中。

image-20240506180433574

License:  CC BY 4.0