MQ消息丢失场景

丢数据一般分为三种,分别是:生产者(Producer)数据丢失MQ(Broker)消息丢失消费者(Consumer)数据丢失

RabbitMQ消息丢失场景

  • Producer消息丢失 生产者将数据发送到rabbitmq的时候,可能在传输过程中因为网络等问题而将数据弄丢了。

  • Broker消息丢失 如果没有开启rabbitmq的持久化,那么rabbitmq一旦重启,那么数据就丢了。所以必须开启持久化将消息持久化到磁盘,这样就算rabbitmq挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。除非极其罕见的情况,rabbitmq还没来得及持久化自己就挂了,这样可能导致一部分数据丢失。

  • Consumer消息丢失 主要是因为消费者消费时,刚消费到,还没有处理,结果消费者就挂了,这样你重启之后,rabbitmq就认为你已经消费过了,然后就丢了数据。

RabbitMQ是如何防止消息丢失?

Producer保证消息不丢失
  • ①、可以选择使用rabbitmq事务功能,就是生产者在发送数据之前开启事务,然后发送消息,如果消息没有成功被rabbitmq接收到,那么生产者会受到异常报错,这时就可以回滚事务,然后尝试重新发送;如果收到了消息,那么就可以提交事务。

<!---->

channel.txSelect();//开启事务
try{
    //发送消息
}catch(Exection e){
    channel.txRollback();//回滚事务
    //重新提交
}

缺点:rabbitmq事务已开启,就会变为同步阻塞操作,生产者会阻塞等待是否发送成功,太耗性能会造成吞吐量的下降。

  • ②、可以开启confirm模式。在生产者那里设置开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如何写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。

//开启confirm
channel.confirm();
//发送成功回调
public void ack(String messageId){
}
// 发送失败回调
public void nack(String messageId){
//重发该消息
}

二者不同 : 事务机制是同步的,你提交了一个事务之后会阻塞住,但是confirm机制是异步的,发送消息之后可以接着发送下一个消息,然后rabbitmq会回调告知成功与否。

一般在生产者这块避免丢失,都是用confirm机制。

  • Broker保证消息不丢失 设置消息持久化到磁盘。设置持久化有两个步骤:

  • ①、创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里面的数据。

  • ②、发送消息的时候将消息的deliveryMode设置为2,这样消息就会被设为持久化方式,此时rabbitmq就会将消息持久化到磁盘上。必须要同时开启这两个才可以。而且持久化可以跟生产的confirm机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者ack,这样就算是在持久化之前rabbitmq挂了,数据丢了,生产者收不到ack回调也会进行 消息重发。

  • Consumer保证消息不丢失 使用rabbitmq提供的ack机制,首先关闭rabbitmq的自动ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。

RocketMQ数据丢失场景

  • Producer消息丢失 生产者发送消息时,由于网络故障或broker的master节点宕机,导致消息丢失。。

  • Broker消息丢失 消息已经发送到RocketMQ了,消息暂存在JVM堆外内存或暂存在os cache中,而后台线程还没有将消息刷到磁盘中,此时JVM系统崩溃或Linux服务宕机,导致JVM堆外内存或os cache内存数据丢失,导致消息丢失。

  • Consumer消息丢失 消息已经发送到RocketMQ了,但是消息消费者在消费消息的时候,还未消费完就返回ack,此时消费者宕机了,但是MQ认为消费成功了,将消费消息offset跳到了下一个消息,此时也相当于出现消息丢失的情况。

RocketMQ是如何防止消息丢失?

  • Producer保证消息不丢失 RocketMQ发送消息有三种模式,即同步发送异步发送单向发送

  • 1、同步发送消息时会同步阻塞等待Broker返回发送结果,如果发送失败不会收到发送结果SendResult,这种是最可靠的发送方式。

  • 2、异步发送消息可以在回调方法中得知发送结果。

  • 3、单向发送是消息发送完之后就不管了,不管发送成功没成功,是最不可靠的一种方式。

    • @description: 单向发送

      • 这种方式主要用在不特别关心发送结果的场景,例如日志发送。

        public void sendMq() {
        for (int i = 0; i < 10; i++) {
           rocketMQTemplate.convertAndSend("xiaojie-test", "测试发送消息》》》》》》》》》" + i);
            }
        }
    • @description: 同步发送

      • 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

        public void sync() {
            SendResult sendResult = rocketMQTemplate.syncSend("xiaojie-test", "sync发送消息。。。。。。。。。。");
            log.info("发送结果{}", sendResult);
        }
    • @description: 异步发送

      • 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

        public void async() {
         String msg = "异步发送消息。。。。。。。。。。";
         log.info(">msg:<<" + msg);
         rocketMQTemplate.asyncSend("xiaojie-test", msg, new SendCallback() {
             @Override
             public void onSuccess(SendResult var1) {
             log.info("异步发送成功{}", var1);
             }
             @Override
             public void onException(Throwable var1) {
              //发送失败可以执行重试
              log.info("异步发送失败{}", var1);
             }
         });
        }

RocketMQ为生产者提供了失败重试机制,同步发送和异步发送默认都是失败重试两次当然可以修改重试次数,如果多次还是失败,那么可以采取记录这条信息,然后人工采取补偿机制。

  • Broker消息保证不丢失

  • 1、刷盘策略 RocketMq持久化消息有两种策略即同步刷盘和异步刷盘。默认情况下是异步刷盘,此模式下当生产者把消息发送到broker,消息存到内存之后就认为消息发送成功了,就会返回给生产者消息发送成功的结果。但是如果消息还没持久化到硬盘,服务器宕机了,那么消息就会丢失。同步刷盘是当Broker接收到消息并且持久化到硬盘之后才会返回消息发送成功的结果,这样就会保证消息不会丢失,但是同步刷盘相对于异步刷盘来说效率上有所降低,大概降低10%,具体情况根据业务需求设定吧。

修改配置文件中刷盘方式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘
  • 2、集群模式

    rocketmq的集群模式保证可rocketMQ高可用。利用多master多slave节点保证rocketmq的高可用。
    #主从复制方式ASYNC_MASTER异步复制,SYNC_MASTER同步复制
    brokerRole=SYNC_MASTER
    #刷盘方式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘
    flushDiskType=SYNC_FLUSH

此模式是broker保证消息不丢失的配置,主从复制同步复制,刷盘模式同步刷盘,但是这种模式下性能会有所降低。

  • Consumer消息保证不丢失

  • 1、手动ack

    • @description: 消费端确认消息消费成功的消费者

    @Component
    @Slf4j
    public class MqConsumerAck implements MessageListenerConcurrently {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List&lt;MessageExt&gt; msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg:msgs){
            log.info("接收到的消息是>>>>>>>{}",new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
  • 2、消费者消费失败重试机制 消费者消费失败会自动重试,如果消费失败没有手动ack则会自动重试15次。

kafka数据丢失场景

  • Producer消息丢失 目前 Kafka Producer 是异步发送音讯的,如果你的 Producer 客户端应用了 producer.send(msg) 办法来发送消息,办法会立刻返回,但此时并不能代表消息已经发送成功了。 如果消息再发送的过程中产生了网络抖动,那么消息可能没有传递到 Broker,那么消息可能会失落。 如果发送的音讯自身不合乎,如大小超过了 Broker 的承受能力等。

  • Broker消息丢失 比较常见的一个场景,就是kafka的某个broker宕机了,然后重新选举partition的leader时。如果此时follower还没来得及同步数据,leader就挂了,然后某个follower成为了leader,他就少了一部分数据。

  • Consumer消息丢失 消费者拉取了消息,并解决了消息,但解决消息异常了导致失败,并且提交了offset,消费者重启后,会从之前已提交的位移的下一个地位从新开始生产,生产失败的那些消息不会再次解决,即相当于消费者丢失了消息。

kafka是如何防止消息丢失?

  • Producer保证消息不丢失 如果按照上面设置了ack=all,则一定不会丢失数据,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

  • Broker消息保证不丢失 一般要求设置4个参数来保证消息不丢失:

    • ①、给topic设置 replication.factor参数:这个值必须大于1,表示要求每个partition必须至少有2个副本。

    • ②、在kafka服务端设置min.isync.replicas参数:这个值必须大于1,表示 要求一个leader至少感知到有至少一个follower在跟自己保持联系正常同步数据,这样才能保证leader挂了之后还有一个follower。

    • ③、在生产者端设置acks=all:表示 要求每条每条数据,必须是写入所有replica副本之后,才能认为是写入成功了

    • ④、在生产者端设置retries=MAX(很大的一个值,表示无限重试):表示 这个是要求一旦写入事败,就无限重试

  • Consumer消息保证不丢失 关闭自动提交offset,在自己处理完毕之后手动提交offset,这样就不会丢失数据。