影刀RPA拼多多店群自动化实战:任务并发调度与生命周期管理

影刀RPA拼多多店群自动化实战:任务并发调度与生命周期管理

很多人以为店群自动化的难点在流程编写。
其实不是。

单个店铺的上架、改价、发消息,花几天时间磨一磨总能跑通。
真正把几十个拼多多店铺、TEMU 店铺一起挂到系统里的时候,你才会发现——
让几百个任务平稳、有序、可追踪地完成,才是真正的工程难题。

这篇文章不写流程。
我打算把过去一年在做多平台店群自动化时,围绕任务生命周期和并发调度踩过的坑、最终落地的方案,原原本本地复盘出来。
不涉及具体业务细节,只讲工程设计和系统思路。


一、任务不是“跑完就行”,它需要一条严格的生命线

最早我们用影刀跑任务,做法很简单:
Python 脚本通过 API 触发影刀流程,然后轮询等结果。
任务状态只有三种:成功、失败、没反应。

当店铺数超过 20 个,每天任务量超过 1000 条的时候,这种粗粒度的状态管理直接导致了运维噩梦。
你不知道一个“没反应”的任务到底是卡在页面加载,还是流程已经崩了,还是影刀服务死了。
也不知道一个“失败”的任务该不该重试,重试多少次算合理。

工程设计的第一件事,就是给任务定义一条严格的生命线。


我们把任务状态细化为七个节点:

CREATED → QUEUED → ASSIGNED → RUNNING → COMPLETED
                   ↓           ↓
                REJECTED   TIMEOUT / FAILED → RETRYING → (返回 QUEUED 或 最终 FAILED)
                
![picture.image](https://p6-volc-community-sign.byteimg.com/tos-cn-i-tlddhu82om/b379ca56ae2d4865bbabb70fadd2dce7~tplv-tlddhu82om-image.image?=&rk3s=8031ce6d&x-expires=1780970945&x-signature=sWeV4AvGG1A9nrhmHM9ynX9e2lE%3D)

picture.image 这不仅仅是多了几个标签。
每个状态转换都必须带上时间戳、执行节点 ID、错误码。
正是这些看似冗余的记录,后来让我们能快速定位到某一个节点在特定时间段的浏览器崩溃规律。

picture.image 状态机本身是用 Python 的 transitions 库实现的,但外部只通过一个薄薄的领域服务操作,不直接暴露状态变更。
为什么要包一层?
因为状态变更往往伴随着副作用:记录日志、发送通知、更新数据库、触发后续任务。
如果让调用方随便改状态,系统状态一定会乱。

class TaskLifecycleService:
    def __init__(self, task_repo, event_bus):
    
![picture.image](https://p6-volc-community-sign.byteimg.com/tos-cn-i-tlddhu82om/32370510d4a94a44b1786524c439b393~tplv-tlddhu82om-image.image?=&rk3s=8031ce6d&x-expires=1780970945&x-signature=rWkaDi%2Fko4FEDEvdTKcMxMew5UE%3D)
        self.repo = task_repo
        self.bus = event_bus

    def mark_assigned(self, task_id: str, node_id: str):
        task = self.repo.get(task_id)
        if task.status != TaskStatus.QUEUED:
            raise InvalidTransitionError(task.status, TaskStatus.ASSIGNED)
        task.status = TaskStatus.ASSIGNED
        task.assigned_node = node_id
        task.assigned_at = time.time()
        self.repo.save(task)
        self.bus.emit(TaskAssigned(task_id, node_id))

状态机的严谨,是系统稳定性的第一根骨架。


二、并发不是开线程,是控制资源

一个执行节点可以同时跑多少个影刀流程,这个问题我们反复调整了很久。

最初的想法是“多开几个线程,能跑多少跑多少”。
很快机器就变成幻灯片——每个影刀流程都会附着到一个浏览器实例,每个浏览器实例占几百兆内存,10 个并行任务直接吃光 16GB 内存,OOM Killer 开始无差别杀进程。

我们后来定了三条硬规矩:

  1. 并发数必须和物理资源绑定,不允许人工随意设置。
  2. 浏览器的获取和释放必须走池化模型,不能随用随开。
  3. 任何一个节点的并发槽用完,就拒绝新任务,让任务在队列里等待,绝不允许超额调度。

在 Python 守护进程里,我们用 asyncio.Semaphore 控制本地并发数:

class NodeExecutor:
    def __init__(self, max_concurrency: int, browser_pool: BrowserPool, yidao_client: YidaoClient):
        self.semaphore = asyncio.Semaphore(max_concurrency)
        self.browser_pool = browser_pool
        self.yidao = yidao_client

    async def execute(self, task: Task):
        async with self.semaphore:
            try:
                browser = await self.browser_pool.acquire(task.shop_id)
                result = await self.yidao.run_flow(task.action, task.params, browser.debug_port)
                # ... 处理结果
            finally:
                await self.browser_pool.release(task.shop_id)

这里有个容易被忽略的细节:browser_pool.acquirerelease 都需要是异步的,因为浏览器启动可能耗时数秒。
如果放在同步里做,整个事件循环会被卡住,其他任务的超时检测都无法运作。

真正跑到几十个店铺后,问题才会开始出现——
比如偶发的浏览器启动超时,如果没有独立的超时控制,一个任务卡住就会慢慢耗尽整个信号量,导致节点假死。

所以 acquire 必须带超时:

async def acquire_with_timeout(self, shop_id: str, timeout: float = 15.0):
    return await asyncio.wait_for(self.browser_pool.acquire(shop_id), timeout=timeout)

三、多节点并发下的任务分配策略

任务并发不只在单个节点内部,更在多个执行节点之间。
我们有三种类型的节点:拼多多执行机、TEMU 执行机、TikTok Shop 执行机。
每种节点跑不同的流程包,消费不同的 Redis Stream。

这就天然形成了一种“平台隔离”。
但隔离还不够,还需要解决同一个平台内多个节点的任务分配问题。

最简单的策略是广播,谁抢到谁执行。
这种方式在负载均衡上没问题,但有两个麻烦:

  • 同一个店铺的连续任务可能落在不同节点,导致浏览器实例被反复创建销毁,浪费资源。
  • 某个节点突发繁忙时,某些任务会被反复抢占又回滚,增加延迟。

我们的解法是:带权重的粘性分配
每个节点在消费消息时,会声明自己“当前负载”和“已持有店铺浏览器”的列表。
调度器在产生任务时,不是随机丢进队列,而是根据店铺当前是否有活跃的浏览器实例,优先发给持有该实例的节点。

但这不是硬分配。
如果那个节点已经满载,任务会进入公共队列,由其他节点竞争。

这个逻辑并不复杂,就是在任务入队前做一次预判:

async def dispatch_with_affinity(self, task: Task):
    preferred_node = self._find_node_with_active_browser(task.shop_id)
    if preferred_node and await self._node_has_capacity(preferred_node):
        await self._send_to_node(preferred_node, task)
    else:
        await self._send_to_public_queue(task.platform, task)

这种做法在实际运行中效果明显——
同一个店铺的日常任务,大部分都落在了同一台机器上,浏览器实例复用率超过 80%,整体任务延迟下降了约 30%。


四、异常和重试:临时故障和逻辑故障必须分开

任何自动化系统都会遇到失败。
问题在于,不是所有失败都应该重试。

我们花了很长时间把异常分成了两类:

  • 临时性故障:网络超时、页面加载慢、验证码弹出、浏览器卡死。
  • 业务性故障:商品数据不合法、店铺被限制、接口返回业务错误。

临时性故障可以重试,业务性故障重试没有意义。
判断标准是异常类型和返回的错误码,这部分规则维护在一个配置里,不同平台可以有自己的定义。

重试策略我们选择了指数退避,最大重试次数 3 次,每次间隔 30s、60s、120s。
超过 3 次任务进入“死信队列”,需要人工确认后才能重新激活。

这里有另一个经常被忽略的点:重试的时候,必须确保任务从头开始,而不是从断点继续
因为浏览器状态已经不可信了,你无法判断刚才到底执行到了哪一步。
最稳妥的做法是:直接标记任务失败,生成一个新的任务副本重新进入队列,而不是在原任务上循环。

我们用了一个简单的方法:

async def handle_failure(self, task: Task, error: Exception):
    if self._is_retryable(error) and task.retry_count < self.max_retries:
        new_task = task.copy_with_incremented_retry()
        self.bus.emit(TaskRetryScheduled(new_task))
    else:
        task.status = TaskStatus.FAILED
        self.bus.emit(TaskFailed(task, error))

这样做还有一个好处:保留完整的失败记录链。
你可以看到某个任务重试了三次,每次在哪个节点失败,原因是什么。
这在排查偶发性页面问题时非常重要。


五、任务超时与看门狗

有些任务不报错,也不完成,就卡在那里。
最常见的原因是:页面弹出了一个意料之外的对话框,脚本没有匹配到关闭逻辑,就一直等下去。

我们在执行流程时,必须设定一个绝对超时时间。
这个超时不是 HTTP 连接超时,而是业务逻辑超时——比如“商品上架”正常 2 分钟,我们设置硬超时 5 分钟。
一旦超过,守护进程主动调用影刀的停止流程 API,并强制回收浏览器实例。

但问题来了:影刀的停止 API 不一定能杀死卡死的流程。
有时候流程卡在浏览器内部的 JS 死循环里,API 调用也无法响应。

所以我们加了一层操作系统的看门狗:
每个执行中的任务都会记录进程 PID,超时后先尝试 API 停止,等待 5 秒,如果没反应,直接 SIGKILL。

async def kill_with_prejudice(self, execution_id: str, pid: int):
    try:
        await self.yidao.stop_execution(execution_id, timeout=5)
    except Exception:
        pass
    finally:
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            pass

这种方式虽然粗暴,但保证了节点不会因为一个僵尸任务拖死整个并发槽。
很多团队最开始都会忽略这里,直到半夜被报警电话吵醒。


六、流程包管理与多平台适配

影刀的流程文件是本地 .bot 包。
三个平台加起来有十几个流程包,版本迭代频繁。

如果每次更新流程都要手动登录服务器替换文件,运维成本会随着节点数量一起爆炸。

我们做了一个简单的流程包分发系统:
把流程包放在一个对象存储里,节点启动或定时检查时,自动拉取最新版本。
每个流程包的版本号写在一个 manifest 文件里,节点对比本地版本和远程版本,有差异就下载替换。

流程的调用则通过配置表映射:

actions:
  upload_product:
    pdd: "pdd_product_upload_v2.3.bot"
    temu: "temu_upload_v1.8.bot"
    tiktok: "tkshop_product_upload_v1.1.bot"

这样 Python 调度代码完全不用关心底层流程文件名,只认动作名和平台。
新上线一个平台,只需要加映射配置,不需要改任何调度逻辑。

这个设计让我们在后来紧急上线 TikTok Shop 时,两天就完成了集成,而不是两周。


七、可观测性:任务链路的透明化

自动化系统最怕的,不是故障,是沉默。
某个任务一直处于 RUNNING 状态,没有报错,也没有完成,就这么悬着。
这种情况比直接报错更危险,因为它会默默吃掉所有可用的并发槽。

我们为每条任务生成一个全局唯一的 trace_id,贯穿整个生命周期。
从调度器产生任务,到进入队列,到被节点拉取,到影刀开始执行,到流程内部的关键步骤——
每一步都带着这个 trace_id 上报日志。

日志不进文件,直接写入 Elasticsearch,通过 Kibana 可以按 trace_id 串起整个调用链。
每个步骤的耗时一目了然。

有了这一步之后,我们才发现:
TEMU 后台的商品图片上传,在下午 2 点到 4 点之间平均耗时比其他时段多出 1.5 秒。
这个数据直接驱动我们调整了那个时段的任务并发策略,避开了平台响应慢的窗口。

很多看似偶发的现象,在日志里会呈现出清晰的规律。


八、写在最后

这套任务调度和生命周期管理系统,现在支撑着超过 150 个店铺的日常自动化运营。
它不会让业务增长得更快,但它让业务增长时,系统不会垮。

回头来看,真正有价值的不是“跑通了脚本”,而是建立了一套可以持续演进的工程框架:
状态机让任务可追溯,并发控制让资源不超载,重试机制让偶发故障自愈,全链路追踪让问题无处遁形。

如果你也在做类似的事,建议尽早把任务当成一个严肃的软件对象来设计,而不是零散的流程触发。
这个投入,会在店铺数跨过某个阈值之后,加倍还给你。

作者:林焱

0
0
0
0
评论
未登录
暂无评论