消息重复

本页目录:

对于重复消息,最好的方法是消息可重入(消息重复消费对业务无影响)。做不到可重入时,需要在消费端去重。

重复消息出现的原因

notion image
网络异常、服务器宕机等原因都有可能导致消息丢失。CMQ 为了做到不丢消息、可靠交付,采用了消息生产、消费确认机制。
生产消息确认:生产者向 CMQ 发送消息后,等待 CMQ 回复确认;CMQ 将消息持久化到磁盘后,向生产者返回确认成功。否则在生产者请求超时、CMQ 返回失败等情况下,生产者需要向 CMQ 重发消息。
消费者确认:CMQ 向消费者交付消息后,将消息置为不可见;在消息不可见时间内,消费者使用句柄删除消息。如果消息未被删除,且不可见时间超时,消息将重新可见。
由于消息确认机制是“至少一次交付(at least once)”,在网络抖动、生产者/消费者异常等情况下,就会出现生产者重复生产、消费者重复消费的情况。

去重方案

要去重,先要识别重复消息。通常的做法是在生产消息时,业务方在消息体中插入去重 key,消费时通过该去重 key 来识别重复消息。去重 key 可以是由 <生产者 IP + 线程 ID + 时间戳 + 时间内递增值> 组成的唯一值。
只有一个消费者时,您可以将消费过的去重 key 缓存(如 KV 等),然后每次消费时检查去重 key 是否已消费过。去重 key 缓存可以根据消息最大有效时间来淘汰。CMQ 提供了队列当前最小未消费消息的时间(min_msg_time),您可以使用该时间和业务生产消息最大重试时间来确定缓存淘汰时间。 存在多个消费者时,去重 key 缓存就需要是分布式的。
根据消息最大有效时间,计算 key 过期时间点:
current_time + max_retention_time + max_retry_time + max_network_time (当前时间)+(最大有效时间)+(最大重试时间)+(最大网络时间)
根据 CMQ 最小未消费时间,计算 key 过期时间点:
min_msg_time + max_retry_time + max_network_time (最小未消费时间)+(最大重试时间)+(最大网络时间)
CMQ 可配置消息最大有效时间为15天,业务可根据实际情况调整。 CMQ 队列当前最小未消费消息时间,即下图中最远时间点。该时间之前的消息都已经被删除,之后的消息可能未被删除。
notion image

举例说明

避免重复提交:
场景:A 为生产者,B 为消费者,中间是 CMQ。A 已完成10元转账操作,且将消息发送给 CMQ,CMQ 也已成功收到。此时网络闪断或者客户端 A 宕机导致服务端应答给客户端 A 失败。A 会认为发送失败,从而再次生产消息。这会造成重复提交。
解决方法:A 在生产消息时,加入 time 时间戳等信息,生成唯一的去重 key。若生产者 A 由于网络问题判断当前发送失败,重试时,去重 key 沿用第一次发送的去重 key。此时消费者 B 可通过去重 key 判断并做去重。 该案例也说明了不能使用 CMQ 的 Message ID 进行去重,因为这两条消息有不同的 ID,但却有相同的 body。
注意事项:生产者A,在发送消息之前,要将去重 key 做持久化(写磁盘等,避免掉电后丢失)
避免多条相同 body 的消息被过滤:
场景:A 给 B 转账10元,一共发起5次,每一次提交的 body 内容是一样的。如果消费者粗暴用 body 做去重判断,就会把5次请求,当做1次请求来处理。
解决方法:A 在生产消息时,加入 time 时间戳等信息。此时哪怕消息 body 一样,生成的去重 key 都是不同的,这样就满足了多次发送同样内容的需求。
Loading...
文章列表
王小扬博客
云原生
Git
Elasticsearch
Apollo
产品
Think
生活技巧
软件开发
计算机网络
CI
DB
设计
缓存
Docker
Node
操作系统
Java
大前端
Nestjs
其他
PHP
AI