如何保证消息恰好被消费一次?

全是干货的技术号: 本文已收录在github仓库 Java-Interview-Tutorial,欢迎 star!

0 前言

对系统增加MQ:

  • 对峰值写流量做削峰填谷
  • 对次要业务逻辑做异步
  • 对不同系统模块做解耦

因为业务逻辑从同步代码中移除,所以也要有相应队列处理程序处理消息、执行业务逻辑。随着业务逻辑复杂,会引入更多外部系统和服务,就会越来越多使用MQ,与外部系统解耦合以及提升系统性能。

如系统要加红包功能:用户在购买一定数量商品后,系统给用户发一个现金红包激励用户消费。由于发放红包这种次要业务过程不应在购买商品的主业务流程,所以考虑MQ异步。 但引入 MQ 就必然遇到如下问题:

  • 若消息在投递过程丢失 用户就会因没有得到红包而投诉到你这边
  • 消息在投递过程出现重复 就会因发送两个红包而导致公司资产损失

1 消息为何会丢失?

消息从被写入到MQ,到被消费者消费完成,该链路上的如下场景可能丢失消息:

  • 消息从生产者(后文简称为Pro)写入到MQ的过程
  • 消息在MQ中的存储场景
  • 消息被消费者(后文简称为Con)消费的过程

1.1 在消息生产的过程

消息的Pro一般是业务服务器,MQ独立部署在单独服务器。二者间网络虽是内网,但也存在抖动可能,一旦网络抖动,消息就可能因网络错误而丢失。

1.1.1 解决方案

推荐消息重传:当你发现发送超时后,重发一次消息,但也不能无限重发。一般若不是MQ故障或到MQ的网络断开了,重试2~3次即可。

但这种方案可能造成【消息重复】,从而在消费时重复消费同样的消息。 比方说消息生产时,由于MQ处理慢或网络抖动,导致虽最终写入MQ成功,但对于Pro却是超时的,于是Pro重传这条消息,导致重复消息,你收到了两个现金红包!

1.2 在MQ中

消息在Kafka是存在本地磁盘,为减少消息存储时对磁盘的随机I/O,一般会将消息先写到os的Page Cache,然后择机机刷盘。

如Kafka可配置异步刷盘时机:

  • 当达到某一时间间隔
  • 或累积一定消息数量

假如你经营一个图书馆,读者每还一本书你都要去把图书归位,不仅工作量大且效率低下,但若你能选择每隔3h或图书达到一定数量,再把图书归位,这就能把同一类型的书一起归位,节省查找图书位置的时间,提高人效。

不过若发生掉电或异常重启,Page Cache还没有来得及刷盘的消息就会丢失。这咋办?你可能会:

  • 把刷盘的间隔设置很短
  • 或设置累积一条消息

就刷盘,但频繁刷盘很影响性能,且宕机或掉电几率其实也不高,故不推荐!

若你的系统对消息丢失容忍度很低,可考虑集群部署Kafka,通过部署多个副本备份数据,保证尽量不丢消息。

ISR

Kafka集群中有个Leader,负责消息的写入和消费,可有多个Follower负责数据备份。

Follower中有个特殊集合 — ISR(in-sync replicas),当Leader故障,新选举出来的Leader会从ISR中选择。默认Leader的数据会异步复制给Follower,这样在Leader掉电或宕机时,Kafka会从Follower中消费消息,减少消息丢失的可能。

但因消息默认是异步从Leader复制到Follower,所以一旦Leader宕机,那些还没来得及复制到Follower的消息还是会丢失。为解决该问题,Kafka为生产者提供“acks”:

acks

当该选项被设置为“all”,Pro发的每条消息,除了发给Leader,还会发给所有ISR,且必须得到Leader和所有ISR的确认后,才被认为发送成功。这样,只有Leader和所有ISR都挂了消息才会丢失。

当设置“acks=all”,需同步执行1、3、4三个步骤,对消息生产的性能有很大影响,实际应用中需仔细权衡。

最佳实践

  • 若你就是要确保消息一条都不能丢,就你让开启MQ的同步刷盘,而应该用集群方案,可配置当所有ISR Follower都接收到消息,才返回成功
  • 若对丢消息有一定容忍度,则建议不部署集群,即使集群部署,也推荐配置只发送给一个Follower即可返回成功
  • 业务系统一般对消息丢失有一定容忍度,如红包系统,若红包消息丢了,只要后续给没发送红包的用户补偿发送即可!

1.3 在消费过程

一个Con消费消息的进度是记录在MQ集群中的,消费过程分为如下步骤:

  • 接收消息
  • 处理消息
  • 更新消费进度

接收消息,处理消息的过程都可能异常,如:

  • 接收消息时网络抖动,导致消息并未被正确接收
  • 处理消息时可能发生一些业务异常,导致处理流程未执行完成,这时若更新消费进度,这条失败的消息就永远不会被处理了,就算丢失了

所以,务必等到消息接收、处理完成后,才能更新消费进度,但这也会造成消息重复,比如某条消息在处理后,Con恰好宕机,就因未更新消费进度,所以当该Con重启后,还会重复消费这条消息。

2 保证消息只被消费一次

经过上面分析发现,为避免消息丢失,我们需要付出代价:

  • 性能损耗
  • 可能造成消息重复消费

性能损耗还能接受,因为一般业务系统只有在写请求时,才有发送MQ的操作,而一般系统的写请求的量级并不高。但消息一旦被重复消费,就会造成业务逻辑处理错误,如何避免消息重复消费问题呢?

完全避免消息重复发生真的很难,因为网络抖动、机器宕机和处理异常都难以避免,业界也并无成熟方法,只能将要求放宽,只要保证即使消费到了重复消息,从消费的最终结果来看和只消费一次是等同即可,即保证在消息的生产和消费的过程“幂等”。

2.1 幂等

多次执行同一个操作和执行一次操作,最终得到的结果是相同的。

若消费一条消息,要将库存-1,则若消费两条相同消息,库存-2,这就非幂等的。 而若:

  • 消费一条消息后,处理逻辑是将库存数置0
  • 或若当前库存数是10,则减1

这样消费多条消息时,所得结果相同,这就是幂等。

2.1.1 生产过程增加消息幂等

消息在生产、消费过程中都可能重复,所以要在生产、消费过程增加消息幂等性保证,这就能认为从“最终结果看”,消息实际上是只被消费一次

消息生产过程中,Kafka0.11和Pulsar都支持“producer idempotency”,即生产过程幂等性,这保证消息虽然可能在生产端产生重复,但最终在MQ存储时只会存一份。

实现原理

给每个Pro一个唯一ID,并为生产的每条消息赋予一个唯一ID,MQ服务端会存储:

生产者ID=》最后一条消息ID的映射。 当某Pro产生新消息,Broker比对消息ID是否与存储的最后一条ID一致:

  • 若一致,就认为是重复消息,Broker自动丢弃

2.1.2 消费过程增加消息幂等

消费端幂等性保证稍微复杂,可从通用层和业务*两个层面考虑:

通用层面

消息被生产时,使用发号器给其生成一个全局唯一消息ID。消息被处理后,将该ID存储在DB,在处理下一条消息前,先从DB查询该全局ID是否被消费:

  • 若被消费过,就放弃消费

生产端幂等保证 && 消费端通用层面的幂等保证,都是为每个消息生成唯一ID,然后在使用该消息时,先判断ID是否已存在,若存在,则认为消息已被使用。 这其实是一种标准的幂等实现方案:

// 判断ID是否存在
boolean isIDExisted = selectByID(ID);

if(isIDExisted) {
  // 存在则直接返回
  return;
} else {
  // 不存在,则处理消息
  process(message);
  // 存储ID
  saveID(ID);
}

但若消息在处理后,还没有来得及写DB,Con宕机了,重启后发现DB并无这条消息,还是会重复执行两次消费逻辑,这就要引入事务,保证消息处理和写入DB必须同时成功或失败,但这样消息处理成本更高,所以若对消息重复无苛刻要求,可直接使用这种通用方案,而不考虑引入事务。

业务层面

有很多处理方式,有种是使用乐观锁,如你的消息处理程序要给一个人的账户加钱: 给每个人的账户数据加个版本号:

  • Pro生产消息时,先查询该账户版本号,将版本号连同消息一起发给Broker
  • Con拿到消息和版本号后,在执行更新账户金额SQL时,带上版本号:
update user 
set amount = amount + 20,
	version=version+1
where userId=1
and version=1;

更新数据时,给数据加乐观锁,这样在消费第一条消息时,version值为1,SQL可执行成功且同时把version值改为2。 在执行第二条相同消息时,由于version值不再是1,所以该SQL不会再成功执行,这就实现了消息幂等。

3 总结

消息丢失可通过如下方案解决:

  • 生产端重试
  • 消息队列配置集群模式
  • 消费端合理处理消费进度

为解决消息丢失问题,通常会造成:

  • 性能损失
  • 消息重复

通过保证消息处理的幂等性可以解决消息的重复问题。

并非说消息丢失一定不能接受,在允许消息丢失情况下,MQ性能更好,方案实现复杂度也最低。像是日志处理等场景,日志意义在于排查系统问题,而系统问题几率不高,偶发丢几条日志也能接受。

方案设计都看业务要求,你不能把所有MQ都配置成防止消息丢失方式,也不能要求所有业务处理逻辑都要支持幂等性,这会给开发和运维带来额外负担。

原文地址:https://cloud.tencent.com/developer/article/2181139

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340