1. 生产者给搞丢了!

想象一种情况,当生产者将消息准备好后并通过 basicPublish 将消息发送出去后,由于网络的原因没有发送到 Broker(即 MQ 服务器),从而导致 Broker 没有转发到消费者手上。但是生产者却以为将消息发送出去了并且被处理了。怎么解决呢?有两种方法:

事务机制

该机制与数据库的类似,txSelect 方法将当前 channel 的模式设置为事务模式,当消息成功发送时通过 txCommit 提交事务。当消息发送失败我们可以捕获到异常并通过 txRollback 回滚事务:

channel.txSelect();  // 开启事务模式
try {
    channel.basicPublish("", "hello", null, "Hello World".getBytes());
    channel.txCommit();  // 提交事务
} catch (Exception e) {
    e.printStackTrace();
    channel.txRollback(); // 回滚事务
    // 可以进行重发消息
}

事务机制存在缺点,因为它每发送一个消息都是同步的,会严重影响到系统的吞吐量

confirm模式

如果将生产者的channel 设置为 confirm 模式后,所有在该 channel 上面发布的消息都会被指派一个唯一的 ID (deliveryTag)。一旦消息被发送给消息队列之后, broker 如果处理成功写入之后会回传给生产者 ack 消息,如果没能处理这个消息,则会回传一个 nack 消息。我们可以通过监听器来接收到确认消息的的回调:

channel.confirmSelect(); // 进入 confirm 模式
channel.addConfirmListener(new ConfirmListener() { // 添加确认消息的监听器
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("received ack : " + deliveryTag);
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("received nack : " + deliveryTag);
    }
});
// 发送消息,不需要同步等待
channel.basicPublish("", "hello", null, "Hello World".getBytes());

从代码上也可以看出,confirm 模式相对于事务机制是异步的,所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。

2. broker 给搞丢了!

虽然我们防止了生产者发送到 broker 的消息丢失,但是如果 broker 在处理完消息后并回传给生产者 ack 之后,服务器忽然断电了(好吧又是这么狗血,高并发极限条件——断电)!导致 broker 服务器挂掉了,等到重新归来时,broker 虽然记得答应了生产者的事儿,但是却找不到刚才还没发送的消息了。

解决方式开启 RabbitMQ 的持久化功能,在消息落地后写入磁盘,哪怕挂掉恢复之后也能读取之前接收到的消息,并可以重新发送。代码层面需要两个步骤: 第一我们在声明消息队列时,将 durable 设置为 true,保证 RabbitMQ 持久化 queue 的元数据:

// 参数顺序依次为:queueName, durable, exclusive, autoDelete, arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

单单这个设置还不够,因为仅仅持久化了元数据,如果要持久化 queue 里的数据的则需要在发布消息时将消息的 deliveryMode 设置为 2,这样RabbitMQ 就会将消息持久化到磁盘上去:

channel.basicPublish("", "hello",
                MessageProperties.PERSISTENT_BASIC, "Hello World".getBytes());

// MessageProperties.PERSISTENT_BASIC 定义如下:
public static final BasicProperties PERSISTENT_BASIC =
    new BasicProperties("application/octet-stream",
                        null,
                        null,
                        2,
                        0, null, null, null,
                        null, null, null, null,
                        null, null);

3. 消费给搞丢了!

现在我们的 MQ 应该比较健全了!应该不会啥问题。此时,broker 正确地将消息发送到了消费者,消费者成功消费后开始处理自己的业务:不巧!数据库插入失败了,导致业务无法继续进行,这条消息应该实现的业务并没有完成。但是 生产者、broker 都以为你处理完了。

但是 RabbitMQ 提供了 brok -> consumer 的 ack 机制,默认是自动确认的,我们只需要将它关闭,然后在消费消息并处理完业务之后再手动进行确认。就算出现特殊情况导致业务没有进行,这样消息也就没有确认,RabbitMQ 会把这个消费分配给别的 consumer 去处理。

// 第二个参数设置 autoAck
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String content = new String(body);
        // 处理业务 ...
        channel.basicAck(envelope.getDeliveryTag(), false); // 手动接受消息
    }
});