店群自动化跑久了,一定会遇到一个问题:
同一个任务被重复执行。
比如订单发货任务,因为网络超时重试,结果发了两遍货。客户收到两个包裹,钱没多收,货没了。
再比如商品上架任务,节点宕机后任务转移到另一个节点,两边同时执行,同一款商品在店铺里出现了两个重复链接。
这些问题在单机时代不那么明显,因为任务队列是单进程的。但一旦上了分布式调度、多节点执行、自动重试,重复执行就成了大概率事件。
这篇文章不讲调度架构,也不讲资源回收。
专门聊聊我们在拼多多和TEMU店群项目中,如何设计幂等性和分布式锁,让每个任务无论被执行多少次,结果都像只执行了一次。
适用场景:任何需要防止重复执行的自动化任务(发货、上架、改价、发消息)。 技术栈:影刀RPA + Python + Redis 分布式锁 + 幂等表。
一、重复执行是怎么发生的?
先列举我们实际遇到过的几种场景。
场景一:超时重试
一个订单发货任务,调用影刀RPA脚本后,网络突然抖动,调度器没收到响应,触发了重试机制。
实际上第一个脚本正在慢慢执行,只是回包丢了。结果两个脚本同时发货。
场景二:节点宕机任务转移
A节点正在执行一个商品上架任务,突然宕机了。调度中心发现任务超时,重新分配给了B节点。
但A节点并没有真的死透,只是网络断了几秒,后来恢复了,继续执行任务。两个节点同时上架同一商品。
场景三:消息队列重复消费
我们用的Redis队列,在某些场景下(比如消费者rebalance)可能重复投递同一条消息。
场景四:影刀RPA脚本内部循环重试
脚本里写了“如果点击失败就重试”,但重试逻辑没有判断是否已经成功过。
这些场景的共同点是:系统设计了重试,但没有设计重试的安全边界。
解决方案只有两个字:幂等。
二、幂等性设计的三个层次
我们把幂等性分成三个层次来落地:
第一层:任务级幂等
保证同一个任务ID永远不会被执行两次。
第二层:业务级幂等
即使任务ID不同,也要防止对同一个业务对象(如同一订单、同一商品)的重复操作。
第三层:脚本级幂等
影刀RPA脚本内部的操作应该是可重入的,或者能够在重复执行时跳过已完成步骤。
下面分别说。
三、任务级幂等:分布式锁 + 幂等表
最直接的方式:每个任务执行前,先抢一个分布式锁,锁的key就是任务ID。抢到锁才能执行。
我们用的是Redis分布式锁,带自动续期机制:
# distributed_lock.py
import redis
import uuid
import time
import threading
class RedisDistributedLock:
def __init__(self, redis_client, lock_key, ttl=60):
self.redis = redis_client
self.lock_key = f"lock:{lock_key}"
self.ttl = ttl
self.lock_value = str(uuid.uuid4())
self.renewal_thread = None
self.locked = False
def acquire(self, timeout=10):
start = time.time()
while time.time() - start < timeout:
# SET NX EX 原子操作
if self.redis.set(self.lock_key, self.lock_value, nx=True, ex=self.ttl):
self.locked = True
self._start_renewal()
return True
time.sleep(0.1)
return False
def _start_renewal(self):
"""启动续期线程,每 ttl/3 秒续期一次"""
def renew():
while self.locked:
time.sleep(self.ttl // 3)
if self.locked:
self.redis.expire(self.lock_key, self.ttl)
self.renewal_thread = threading.Thread(target=renew, daemon=True)
self.renewal_thread.start()
def release(self):
self.locked = False
# 使用Lua脚本保证原子性:只有value匹配时才删除
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
self.redis.eval(lua_script, 1, self.lock_key, self.lock_value)
if self.renewal_thread:
self.renewal_thread.join(timeout=1)
在任务执行的最外层使用:
def execute_task_with_lock(task_id, task_func):
lock = RedisDistributedLock(redis_client, f"task:{task_id}", ttl=120)
if not lock.acquire(timeout=5):
# 没抢到锁,说明已经在执行了
return {"status": "skipped", "reason": "already running"}
try:
result = task_func()
return result
finally:
lock.release()
这个锁能解决大部分重复执行问题,但有一个边界情况:任务执行时间超过TTL,锁自动释放,另一个节点抢到锁又执行了一遍。
所以我们加了续期线程,每40秒续一次120秒的TTL,只要任务还在跑,锁就不会自动释放。
注意:续期线程必须和任务在同一个进程里。如果任务进程挂了,续期线程也会挂,锁到期自动释放,这是期望的行为。
四、业务级幂等:唯一约束 + 状态机
任务级锁能防止同一任务ID重复执行,但不同任务ID操作同一个业务对象呢?
比如:两个不同的任务(一个是手动创建的,一个是自动重试生成的)都要求给订单ORDER_123发货。
任务ID不同,锁不冲突,但业务上必须只发一次货。
解决方案:业务幂等键。
每个业务操作都定义一个唯一的幂等键,比如order_delivery:ORDER_123。执行操作前,先检查这个幂等键是否已经处理过。
我们用一个MySQL表来存储已处理的幂等键:
CREATE TABLE idempotency_keys (
idempotency_key VARCHAR(255) PRIMARY KEY,
task_id VARCHAR(64) NOT NULL,
status ENUM('processing', 'success', 'failed') DEFAULT 'processing',
result JSON,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_status_created (status, created_at)
);
业务代码中:
class BusinessIdempotency:
def __init__(self, db_conn, redis_client):
self.db = db_conn
self.redis = redis_client
def process_with_idempotency(self, idempotency_key, task_id, business_func):
# 先查MySQL
cursor = self.db.cursor()
cursor.execute("SELECT status FROM idempotency_keys WHERE idempotency_key = %s", (idempotency_key,))
row = cursor.fetchone()
if row:
if row[0] == 'success':
return {"status": "already_done", "message": "idempotency key already processed"}
elif row[0] == 'processing':
# 等待一小会儿,可能正在执行
time.sleep(2)
cursor.execute("SELECT status FROM idempotency_keys WHERE idempotency_key = %s", (idempotency_key,))
if cursor.fetchone()[0] == 'success':
return {"status": "already_done"}
# 插入processing记录,利用主键冲突防止并发
try:
cursor.execute(
"INSERT INTO idempotency_keys (idempotency_key, task_id, status) VALUES (%s, %s, 'processing')",
(idempotency_key, task_id)
)
self.db.commit()
except IntegrityError:
# 冲突,说明其他事务已经插入了
self.db.rollback()
return {"status": "already_done"}
try:
result = business_func()
cursor.execute(
"UPDATE idempotency_keys SET status = 'success', result = %s WHERE idempotency_key = %s",
(json.dumps(result), idempotency_key)
)
self.db.commit()
return result
except Exception as e:
cursor.execute(
"UPDATE idempotency_keys SET status = 'failed' WHERE idempotency_key = %s",
(idempotency_key,)
)
self.db.commit()
raise
这个方案的要点:
- 幂等键有明确的业务语义(
order_delivery:123) - 数据库主键保证不会重复插入
processing状态防止两个任务同时处理(第二个事务会等待或跳过)
我们把这个幂等机制封装成了一个装饰器,业务函数只需要声明幂等键的生成规则。
@idempotent(key_func=lambda order_id: f"order_delivery:{order_id}")
def deliver_order(order_id):
# 实际的发货逻辑
pass
五、影刀RPA脚本内部的幂等性
分布式锁和业务幂等表都是在调度层解决的。但影刀RPA脚本本身也可能被重复调用,比如同一个店铺的商品上架脚本,因为某种原因被手动触发了两次。
我们要求每个影刀RPA脚本在开始执行前,先检查目标对象的状态。
以商品上架为例:
# 影刀RPA脚本中的幂等检查逻辑(伪代码)
# 假设脚本接收参数 product_id
# 1. 先查询商品当前状态
current_status = call_api("get_product_status", product_id)
if current_status == "上架中":
# 已经在上架流程中,可能是重复执行
log("商品已在处理中,跳过")
return {"skipped": True}
if current_status == "已上架":
log("商品已经上架,无需重复操作")
return {"skipped": True}
# 2. 执行上架
do_upload()
# 3. 上架完成后再次确认
final_status = call_api("get_product_status", product_id)
if final_status != "已上架":
raise Exception("上架后状态异常")
这种“执行前检查、执行后确认”的模式,让影刀脚本具备了天然的幂等性。
我们甚至把这种模式做成了脚本模板,所有新脚本默认包含幂等检查逻辑。
六、真实踩坑:锁超时与死锁
分布式锁不是万能的,我们踩过两个大坑。
坑一:锁超时导致重复执行
前面提到的续期机制看似完美,但有一个漏洞:如果任务在续期间隔内被卡住(比如Python的GIL导致线程调度延迟),续期线程没有及时执行,锁过期了。另一个节点抢到锁,两个任务同时执行。
解决方式:缩短续期间隔,同时使用更可靠的续期方式。我们后来改成了每个任务独立进程,续期用单独的进程信号而不是线程,避免GIL影响。
坑二:Redis主从切换导致锁丢失
Redis Sentinel模式下,主节点宕机后从节点升主。如果主节点上的锁还没来得及同步到从节点,锁就丢了。
解决:使用Redlock算法(多个独立Redis实例)或者直接换成ZooKeeper/etcd。我们选择了etcd,它的Raft一致性保证不会丢锁。
# etcd 分布式锁实现更简洁
import etcd3
client = etcd3.client()
lock = client.lock("task:123")
if lock.acquire(timeout=10):
try:
do_task()
finally:
lock.release()
etcd的锁自带心跳续期,不用担心超时问题。只是比Redis重一些,但对于关键任务值得。
七、混合策略:本地缓存 + 分布式锁
每个任务都去Redis/etcd抢锁,在高并发下有一定开销。我们做了一个优化:本地进程内的任务先去一个本地缓存检查,如果同一个任务的锁已经被本进程持有,就不再请求分布式锁。
class LocalLockCache:
def __init__(self):
self.local_locks = set()
self.lock = threading.Lock()
def try_acquire_local(self, lock_key):
with self.lock:
if lock_key in self.local_locks:
return False
self.local_locks.add(lock_key)
return True
def release_local(self, lock_key):
with self.lock:
self.local_locks.discard(lock_key)
这样同一个进程内重复请求同一个任务锁时,直接返回false,避免网络开销。
这个优化在高频重试场景下效果明显,减少了约70%的Redis调用。
八、幂等性对影刀RPA脚本的要求
从工程实践来看,要让整个系统幂等,影刀RPA脚本需要做到:
要求一:脚本必须是纯函数风格
给定同样的输入(店铺ID、商品ID等),无论执行多少次,产生的外部效果应该相同。脚本内部不能依赖随机值、时间戳等不确定因素。
要求二:所有写操作前都要查询状态
上架前先查商品是否已在售,发货前先查订单是否已发货。这个查询和后续操作之间可能有竞态,所以还需要配合业务幂等表。
要求三:脚本要支持“试运行”模式
有些任务可以先执行但不提交(比如预览上架效果)。我们给每个影刀脚本增加了一个dry_run参数,在测试幂等逻辑时可以安全验证。
要求四:脚本要能返回确定的业务结果
执行完成后,脚本需要返回成功/失败状态,以及业务标识(如发货单号)。调度层根据这个结果来决定是否写入幂等表。
九、监控与审计:幂等性失败的发现
即使设计了幂等,也要监控是否真的有效。
我们加了两个监控指标:
指标一:幂等拒绝次数
因分布式锁或幂等表拦截而跳过的任务数量。如果这个指标突然升高,说明存在大量重试或重复调度,需要排查原因。
指标二:幂等键重复率
同一个幂等键被尝试处理的次数。正常情况应该是1,如果大于1说明有重复提交。我们有一个看板专门展示这个指标。
有一次我们发现某个订单的幂等键重复率达到了3,排查后发现是影刀脚本内部有一个循环调用了发货接口三次。修复后问题解决。
另外,所有幂等键的记录都保留在数据库中,可以用于事后审计。运营投诉“多发了一个包裹”时,我们直接查幂等表,看是否有重复执行的痕迹。
十、总结:幂等是分布式自动化的基石
很多做店群自动化的团队,早期不重视幂等性。
觉得“重试很少发生”、“节点宕机概率低”。等到出了线上事故,损失已经造成了。
我们现在的原则是:
任何对外部系统(订单、库存、商品)有写操作的任务,必须设计幂等。
这不是可选项,是必选项。
分布式锁解决任务级重复,业务幂等表解决业务级重复,脚本内状态检查解决脚本级重复。三层加起来,才敢说系统是“安全”的。
希望这篇文章能让你在设计自动化系统时,多一层对“重复”的敬畏。
作者:林焱
