影刀RPA拼多多店群自动化:幂等性设计与分布式锁防重实战

影刀RPA拼多多店群自动化:幂等性设计与分布式锁防重实战

店群自动化跑久了,一定会遇到一个问题:

同一个任务被重复执行。

比如订单发货任务,因为网络超时重试,结果发了两遍货。客户收到两个包裹,钱没多收,货没了。

再比如商品上架任务,节点宕机后任务转移到另一个节点,两边同时执行,同一款商品在店铺里出现了两个重复链接。

这些问题在单机时代不那么明显,因为任务队列是单进程的。但一旦上了分布式调度、多节点执行、自动重试,重复执行就成了大概率事件

这篇文章不讲调度架构,也不讲资源回收。

专门聊聊我们在拼多多和TEMU店群项目中,如何设计幂等性分布式锁,让每个任务无论被执行多少次,结果都像只执行了一次。

适用场景:任何需要防止重复执行的自动化任务(发货、上架、改价、发消息)。 技术栈:影刀RPA + Python + Redis 分布式锁 + 幂等表。


一、重复执行是怎么发生的?

先列举我们实际遇到过的几种场景。

picture.image

picture.image 场景一:超时重试

一个订单发货任务,调用影刀RPA脚本后,网络突然抖动,调度器没收到响应,触发了重试机制。

实际上第一个脚本正在慢慢执行,只是回包丢了。结果两个脚本同时发货。

picture.image 场景二:节点宕机任务转移

A节点正在执行一个商品上架任务,突然宕机了。调度中心发现任务超时,重新分配给了B节点。

但A节点并没有真的死透,只是网络断了几秒,后来恢复了,继续执行任务。两个节点同时上架同一商品。

场景三:消息队列重复消费

picture.image

我们用的Redis队列,在某些场景下(比如消费者rebalance)可能重复投递同一条消息。

场景四:影刀RPA脚本内部循环重试

脚本里写了“如果点击失败就重试”,但重试逻辑没有判断是否已经成功过。

这些场景的共同点是:系统设计了重试,但没有设计重试的安全边界

picture.image

picture.image 解决方案只有两个字:幂等


二、幂等性设计的三个层次

我们把幂等性分成三个层次来落地:

第一层:任务级幂等

保证同一个任务ID永远不会被执行两次。

第二层:业务级幂等

即使任务ID不同,也要防止对同一个业务对象(如同一订单、同一商品)的重复操作。

第三层:脚本级幂等

影刀RPA脚本内部的操作应该是可重入的,或者能够在重复执行时跳过已完成步骤。

下面分别说。


三、任务级幂等:分布式锁 + 幂等表

最直接的方式:每个任务执行前,先抢一个分布式锁,锁的key就是任务ID。抢到锁才能执行。

我们用的是Redis分布式锁,带自动续期机制:

# distributed_lock.py
import redis
import uuid
import time
import threading

class RedisDistributedLock:
    def __init__(self, redis_client, lock_key, ttl=60):
        self.redis = redis_client
        self.lock_key = f"lock:{lock_key}"
        self.ttl = ttl
        self.lock_value = str(uuid.uuid4())
        self.renewal_thread = None
        self.locked = False

    def acquire(self, timeout=10):
        start = time.time()
        while time.time() - start < timeout:
            # SET NX EX 原子操作
            if self.redis.set(self.lock_key, self.lock_value, nx=True, ex=self.ttl):
                self.locked = True
                self._start_renewal()
                return True
            time.sleep(0.1)
        return False

    def _start_renewal(self):
        """启动续期线程,每 ttl/3 秒续期一次"""
        def renew():
            while self.locked:
                time.sleep(self.ttl // 3)
                if self.locked:
                    self.redis.expire(self.lock_key, self.ttl)
        self.renewal_thread = threading.Thread(target=renew, daemon=True)
        self.renewal_thread.start()

    def release(self):
        self.locked = False
        # 使用Lua脚本保证原子性:只有value匹配时才删除
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        self.redis.eval(lua_script, 1, self.lock_key, self.lock_value)
        if self.renewal_thread:
            self.renewal_thread.join(timeout=1)

在任务执行的最外层使用:

def execute_task_with_lock(task_id, task_func):
    lock = RedisDistributedLock(redis_client, f"task:{task_id}", ttl=120)
    if not lock.acquire(timeout=5):
        # 没抢到锁,说明已经在执行了
        return {"status": "skipped", "reason": "already running"}
    try:
        result = task_func()
        return result
    finally:
        lock.release()

这个锁能解决大部分重复执行问题,但有一个边界情况:任务执行时间超过TTL,锁自动释放,另一个节点抢到锁又执行了一遍。

所以我们加了续期线程,每40秒续一次120秒的TTL,只要任务还在跑,锁就不会自动释放。

注意:续期线程必须和任务在同一个进程里。如果任务进程挂了,续期线程也会挂,锁到期自动释放,这是期望的行为。


四、业务级幂等:唯一约束 + 状态机

任务级锁能防止同一任务ID重复执行,但不同任务ID操作同一个业务对象呢?

比如:两个不同的任务(一个是手动创建的,一个是自动重试生成的)都要求给订单ORDER_123发货。

任务ID不同,锁不冲突,但业务上必须只发一次货。

解决方案:业务幂等键

每个业务操作都定义一个唯一的幂等键,比如order_delivery:ORDER_123。执行操作前,先检查这个幂等键是否已经处理过。

我们用一个MySQL表来存储已处理的幂等键:

CREATE TABLE idempotency_keys (
    idempotency_key VARCHAR(255) PRIMARY KEY,
    task_id VARCHAR(64) NOT NULL,
    status ENUM('processing', 'success', 'failed') DEFAULT 'processing',
    result JSON,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_status_created (status, created_at)
);

业务代码中:

class BusinessIdempotency:
    def __init__(self, db_conn, redis_client):
        self.db = db_conn
        self.redis = redis_client

    def process_with_idempotency(self, idempotency_key, task_id, business_func):
        # 先查MySQL
        cursor = self.db.cursor()
        cursor.execute("SELECT status FROM idempotency_keys WHERE idempotency_key = %s", (idempotency_key,))
        row = cursor.fetchone()
        if row:
            if row[0] == 'success':
                return {"status": "already_done", "message": "idempotency key already processed"}
            elif row[0] == 'processing':
                # 等待一小会儿,可能正在执行
                time.sleep(2)
                cursor.execute("SELECT status FROM idempotency_keys WHERE idempotency_key = %s", (idempotency_key,))
                if cursor.fetchone()[0] == 'success':
                    return {"status": "already_done"}
                
        # 插入processing记录,利用主键冲突防止并发
        try:
            cursor.execute(
                "INSERT INTO idempotency_keys (idempotency_key, task_id, status) VALUES (%s, %s, 'processing')",
                (idempotency_key, task_id)
            )
            self.db.commit()
        except IntegrityError:
            # 冲突,说明其他事务已经插入了
            self.db.rollback()
            return {"status": "already_done"}

        try:
            result = business_func()
            cursor.execute(
                "UPDATE idempotency_keys SET status = 'success', result = %s WHERE idempotency_key = %s",
                (json.dumps(result), idempotency_key)
            )
            self.db.commit()
            return result
        except Exception as e:
            cursor.execute(
                "UPDATE idempotency_keys SET status = 'failed' WHERE idempotency_key = %s",
                (idempotency_key,)
            )
            self.db.commit()
            raise

这个方案的要点:

  • 幂等键有明确的业务语义(order_delivery:123
  • 数据库主键保证不会重复插入
  • processing状态防止两个任务同时处理(第二个事务会等待或跳过)

我们把这个幂等机制封装成了一个装饰器,业务函数只需要声明幂等键的生成规则。

@idempotent(key_func=lambda order_id: f"order_delivery:{order_id}")
def deliver_order(order_id):
    # 实际的发货逻辑
    pass

五、影刀RPA脚本内部的幂等性

分布式锁和业务幂等表都是在调度层解决的。但影刀RPA脚本本身也可能被重复调用,比如同一个店铺的商品上架脚本,因为某种原因被手动触发了两次。

我们要求每个影刀RPA脚本在开始执行前,先检查目标对象的状态。

以商品上架为例:

# 影刀RPA脚本中的幂等检查逻辑(伪代码)
# 假设脚本接收参数 product_id

# 1. 先查询商品当前状态
current_status = call_api("get_product_status", product_id)

if current_status == "上架中":
    # 已经在上架流程中,可能是重复执行
    log("商品已在处理中,跳过")
    return {"skipped": True}

if current_status == "已上架":
    log("商品已经上架,无需重复操作")
    return {"skipped": True}

# 2. 执行上架
do_upload()

# 3. 上架完成后再次确认
final_status = call_api("get_product_status", product_id)
if final_status != "已上架":
    raise Exception("上架后状态异常")

这种“执行前检查、执行后确认”的模式,让影刀脚本具备了天然的幂等性。

我们甚至把这种模式做成了脚本模板,所有新脚本默认包含幂等检查逻辑。


六、真实踩坑:锁超时与死锁

分布式锁不是万能的,我们踩过两个大坑。

坑一:锁超时导致重复执行

前面提到的续期机制看似完美,但有一个漏洞:如果任务在续期间隔内被卡住(比如Python的GIL导致线程调度延迟),续期线程没有及时执行,锁过期了。另一个节点抢到锁,两个任务同时执行。

解决方式:缩短续期间隔,同时使用更可靠的续期方式。我们后来改成了每个任务独立进程,续期用单独的进程信号而不是线程,避免GIL影响。

坑二:Redis主从切换导致锁丢失

Redis Sentinel模式下,主节点宕机后从节点升主。如果主节点上的锁还没来得及同步到从节点,锁就丢了。

解决:使用Redlock算法(多个独立Redis实例)或者直接换成ZooKeeper/etcd。我们选择了etcd,它的Raft一致性保证不会丢锁。

# etcd 分布式锁实现更简洁
import etcd3

client = etcd3.client()
lock = client.lock("task:123")
if lock.acquire(timeout=10):
    try:
        do_task()
    finally:
        lock.release()

etcd的锁自带心跳续期,不用担心超时问题。只是比Redis重一些,但对于关键任务值得。


七、混合策略:本地缓存 + 分布式锁

每个任务都去Redis/etcd抢锁,在高并发下有一定开销。我们做了一个优化:本地进程内的任务先去一个本地缓存检查,如果同一个任务的锁已经被本进程持有,就不再请求分布式锁。

class LocalLockCache:
    def __init__(self):
        self.local_locks = set()
        self.lock = threading.Lock()
        
    def try_acquire_local(self, lock_key):
        with self.lock:
            if lock_key in self.local_locks:
                return False
            self.local_locks.add(lock_key)
            return True
            
    def release_local(self, lock_key):
        with self.lock:
            self.local_locks.discard(lock_key)

这样同一个进程内重复请求同一个任务锁时,直接返回false,避免网络开销。

这个优化在高频重试场景下效果明显,减少了约70%的Redis调用。


八、幂等性对影刀RPA脚本的要求

从工程实践来看,要让整个系统幂等,影刀RPA脚本需要做到:

要求一:脚本必须是纯函数风格

给定同样的输入(店铺ID、商品ID等),无论执行多少次,产生的外部效果应该相同。脚本内部不能依赖随机值、时间戳等不确定因素。

要求二:所有写操作前都要查询状态

上架前先查商品是否已在售,发货前先查订单是否已发货。这个查询和后续操作之间可能有竞态,所以还需要配合业务幂等表。

要求三:脚本要支持“试运行”模式

有些任务可以先执行但不提交(比如预览上架效果)。我们给每个影刀脚本增加了一个dry_run参数,在测试幂等逻辑时可以安全验证。

要求四:脚本要能返回确定的业务结果

执行完成后,脚本需要返回成功/失败状态,以及业务标识(如发货单号)。调度层根据这个结果来决定是否写入幂等表。


九、监控与审计:幂等性失败的发现

即使设计了幂等,也要监控是否真的有效。

我们加了两个监控指标:

指标一:幂等拒绝次数

因分布式锁或幂等表拦截而跳过的任务数量。如果这个指标突然升高,说明存在大量重试或重复调度,需要排查原因。

指标二:幂等键重复率

同一个幂等键被尝试处理的次数。正常情况应该是1,如果大于1说明有重复提交。我们有一个看板专门展示这个指标。

有一次我们发现某个订单的幂等键重复率达到了3,排查后发现是影刀脚本内部有一个循环调用了发货接口三次。修复后问题解决。

另外,所有幂等键的记录都保留在数据库中,可以用于事后审计。运营投诉“多发了一个包裹”时,我们直接查幂等表,看是否有重复执行的痕迹。


十、总结:幂等是分布式自动化的基石

很多做店群自动化的团队,早期不重视幂等性。

觉得“重试很少发生”、“节点宕机概率低”。等到出了线上事故,损失已经造成了。

我们现在的原则是:

任何对外部系统(订单、库存、商品)有写操作的任务,必须设计幂等。

这不是可选项,是必选项。

分布式锁解决任务级重复,业务幂等表解决业务级重复,脚本内状态检查解决脚本级重复。三层加起来,才敢说系统是“安全”的。

希望这篇文章能让你在设计自动化系统时,多一层对“重复”的敬畏。


作者:林焱

0
0
0
0
评论
未登录
暂无评论