RocketMQ - Consumer消息零丢失方案

时间:2023-11-21 10:34:40  热度:0°C

收藏小程序IT藏经楼大量资源免费分享

通过前面的方案,我们可以保证消息一定会达到MQ中,也确保了MQ中的消息不会丢失,只要做到这一点,我们就可以保证下游消费者系统一定可以获取到消息,但是即使下游消费者获取了消息,这条消息数据就一定不会丢失吗?

答案是未必的,假设下游消费者系统已经获取了消息,但是消息目前还在他的内存里,还没有执行业务逻辑,此时他就直接提交了这条消息的offset到broker去说自己已经处理过了,然后这个时候下游消费者系统突然就宕机了,内存里的消息没有了,业务逻辑也没有执行,结果broker已经收到他提交的消息offset了,还以为他已经处理完这个消息了。等消费者系统重启后,就不会再次消费这个消息了,还是会出现数据丢失。

consumer/registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List</MessageExt>/ msgs/ ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus/CONSUME_SUCCESS/ } })/

上面代码中的MessageListenerConcurrently这个类是注册一个***器,当获取到一批消息之后,就会回调你得到这个***器函数,让你来处理这一批消息。

然后当你处理完毕之后,返回consumeConcurrentlyStatus/CONSUME_SUCCESS作为消费成功的示意,告诉RocketMQ,这批消息我已经处理完毕了,RocketMQ会提交这批消息的offset到broker去。

所以如果对一批消息处理完毕了,同时提交消息的offset给broker,即使消费者系统宕机了,此时是不会丢失消息的。

如果一批消息还没处理完,没返回consumeConcurrentlyStatus/CONSUME_SUCCESS这个状态呢,此时消费者宕机了,RocketMQ其实会感知到这个个consumer已经挂了,会把没处理完的消息交给其他机器去处理,所以在这种情况下,消息也绝对不会丢失的。

需要警惕的地方:不能异步消费消息

我们不能在代码中对消息进行异步处理,因为一旦开启了异步处理,消费者的Listener就会返回ConsumeConcurrentlyStatus/CONSUME_SUCCESS状态。这就可能出现异步处理逻辑还没处理完消息,就已经提交这批消息的offset给broker了,认为已经处理结束了。这时如果异步处理逻辑报错,那消息同样会丢失了。

免责声明:
1. 《RocketMQ - Consumer消息零丢失方案》内容来源于互联网,版权归原著者或相关公司所有。
2. 若《36120132文库网》收录的文本内容侵犯了您的权益或隐私,请立即通知我们删除。