影刀RPA店群自动化教程:Python协同分布式任务补偿与Saga事务实战

影刀RPA店群自动化教程:Python协同分布式任务补偿与Saga事务实战

一个上货流程跑了七步,在第八步失败了。

前七步已经改了价格、传了图片、提交了部分信息——这些“半成品”怎么办?

单体脚本时代,这个问题不存在。失败了就重跑,从头再来。
但当自动化流程被拆成多个子模块、分布式执行,尤其涉及写操作时,一个模块的失败可能导致前面多个已成功的模块产生脏数据。

我们曾经遇到过一个典型事故:商品上架流程中的“上传图片”步骤成功,“填写描述”步骤失败。脚本重试了三次全部失败。但此时图片已经占用了平台的存储配额,且无法通过正常界面删除。最终人工登录后台,一个个商品清理。

那之后,我们开始引入分布式事务中的 Saga 模式,为自动化流程的写操作构建了补偿机制。本文就完整展开这套系统的设计理念和工程实践。


一、为什么自动化流程需要“事务”

传统RPA脚本大多没有事务概念。失败了就重试,或者人工处理残留。
但当流程被模块化、并行化后,一个业务操作可能横跨多个独立的模块调用,每个模块都执行了部分不可逆的写操作。

比如跨境商品上架的 Saga:

  1. 创建商品草稿(TEMU)
  2. 上传商品主图
  3. 上传详情图
  4. 填写商品描述
  5. 填写价格与库存
  6. 提交审核

如果步骤5失败,步骤1~4已完成的操作就处于“悬挂”状态。
直接重试可能因幂等性问题导致图片重复上传或商品草稿重复创建。不做处理则留下一堆垃圾数据。

我们需要的是:要么全部成功,要么全部回滚到初始状态——即 Saga 模式。


picture.image

二、Saga模式在自动化中的实现思路

Saga将一个大事务拆成多个本地事务,每个本地事务都有一个对应的“补偿动作”。
如果某个本地事务失败,Saga会按逆序依次执行已成功事务的补偿动作,将系统恢复原状。

在我们的自动化系统中:

picture.image

  • 每个写操作的模块,除了正向执行的 execute 方法,还提供可选的 compensate 方法。
  • Python编排引擎维护一个“已执行步骤栈”,当任何一步失败时,从栈顶开始逆序调用 compensate
  • 补偿动作本身必须幂等,可反复执行而无副作用。

picture.image

picture.image

from dataclasses import dataclass
from typing import Any, Callable, Awaitable, Optional

![picture.image](https://p6-volc-community-sign.byteimg.com/tos-cn-i-tlddhu82om/62886c92fae647fba09b6af50bdd5aa2~tplv-tlddhu82om-image.image?=&rk3s=8031ce6d&x-expires=1780794867&x-signature=Q%2FGHjdUUwSHaqAslKICudpoVXU0%3D)
@dataclass
class SagaStep:
    name: str
    execute: Callable[[dict], Awaitable[dict]]
    compensate: Optional[Callable[[dict, dict], Awaitable[None]]] = None
    # execute返回的结果会传给compensate作为上下文

class SagaOrchestrator:
    def __init__(self):
        self.executed_steps: list[tuple[str, dict]] = []  # (step_name, result)

    async def run(self, steps: list[SagaStep], context: dict) -> dict:
        self.executed_steps.clear()
        for step in steps:
            try:
                result = await step.execute(context)
                self.executed_steps.append((step.name, result))
                # 将结果合并到上下文,供后续步骤使用
                context[step.name] = result
            except Exception as e:
                logger.error(f"Saga step '{step.name}' failed: {e}")
                await self._compensate(context)
                raise SagaFailedError(step.name, e) from e
        return context

    async def _compensate(self, context: dict):
        """逆序执行已成功步骤的补偿动作"""
        for step_name, result in reversed(self.executed_steps):
            step = self._find_step(step_name)
            if step and step.compensate:
                try:
                    await step.compensate(context, result)
                    logger.info(f"Compensated step '{step_name}'")
                except Exception as e:
                    logger.error(f"Compensation for '{step_name}' failed: {e}")
                    # 补偿失败不能中断其他补偿,记录并继续

每个店铺操作模块在注册时都需声明其补偿动作。比如“上传商品图片”的补偿就是“删除已上传的图片”。


三、补偿动作的设计:不止是简单删除

补偿不是简单地把数据删了就完事。需要考虑:

  • 补偿失败怎么办:需要重试机制,且最终要能人工介入。
  • 补偿的幂等性:如果补偿被重复调用(比如重试),不能报错,必须安全无操作或返回成功。
  • 补偿的上下文:如何知道删哪张图片、取消哪个草稿?必须从正向执行的结果中获取关键标识。

我们为常见的写操作模块设计了标准补偿:

class UploadImageStep:
    async def execute(self, ctx):
        image_url = ctx["image_url"]
        product_id = ctx["product_id"]
        # 上传图片,返回图片ID
        uploaded = await platform_api.upload_image(product_id, image_url)
        return {"image_id": uploaded["id"]}

    async def compensate(self, ctx, result):
        image_id = result["image_id"]
        # 删除图片,幂等:若已删除则忽略错误
        try:
            await platform_api.delete_image(image_id)
        except ImageNotFoundError:
            pass  # 幂等,已删除则成功
class CreateProductDraftStep:
    async def execute(self, ctx):
        draft = await platform_api.create_draft(ctx["product_data"])
        return {"draft_id": draft["id"]}

    async def compensate(self, ctx, result):
        draft_id = result["draft_id"]
        try:
            await platform_api.delete_draft(draft_id)
        except DraftNotFoundError:
            pass

所有补偿动作都要捕获“资源已不存在”的异常并视为成功,这是保证幂等的关键。


四、与现有流程编排系统的集成

Saga模式不需要改变我们已有的模块化流程编排。只需在配方配置中标记哪些步骤需要参与Saga。

flow_name: temu_upload_product
saga_mode: true
steps:
  - name: create_draft
    module: temu/create_draft
    compensate: temu/delete_draft
  - name: upload_main_image
    module: temu/upload_image
    compensate: temu/delete_image
  - name: upload_detail_images
    module: temu/upload_images_batch
    compensate: temu/delete_images_batch
  - name: fill_description
    module: temu/fill_description
    # 纯填写无补偿,失败时不影响数据一致性
  - name: submit_review
    module: temu/submit_review
    compensate: temu/cancel_review

编排引擎读取 saga_mode: true 后,使用 SagaOrchestrator 而非普通的并行执行器来驱动这些步骤。

对于没有配置 compensate 的步骤,编排引擎假设它是无副作用的读操作或纯UI操作(如滚动页面、等待元素),失败无需回滚。


五、分布式Saga的协调

上面的Saga是单机顺序执行。但我们的系统是分布式的——不同步骤可能由不同的Worker执行。

比如“上传图片”可能在Worker-A,“填写描述”可能在Worker-B(由于资源调度)。
这时候需要有一个集中式的Saga协调器来追踪全局进度。

我们用Redis记录Saga的执行状态:

class DistributedSagaCoordinator:
    def __init__(self, redis):
        self.redis = redis

    async def start_saga(self, saga_id: str, steps: list):
        await self.redis.hset(f"saga:{saga_id}", mapping={
            "status": "running",
            "total_steps": len(steps),
            "completed_steps": "0",
            "created_at": time.time()
        })

    async def record_step_success(self, saga_id: str, step_name: str, result: dict):
        await self.redis.hincrby(f"saga:{saga_id}", "completed_steps", 1)
        await self.redis.lpush(f"saga:{saga_id}:history", json.dumps({
            "step": step_name,
            "result": result,
            "timestamp": time.time()
        }))

    async def mark_saga_failed(self, saga_id: str, failed_step: str):
        await self.redis.hset(f"saga:{saga_id}", "status", "compensating")
        await self.redis.hset(f"saga:{saga_id}", "failed_step", failed_step)

    async def get_compensation_plan(self, saga_id: str) -> list:
        # 返回需要补偿的步骤列表(逆序)
        history = await self.redis.lrange(f"saga:{saga_id}:history", 0, -1)
        return [json.loads(h) for h in reversed(history)]

Master节点上的Saga协调器负责:接收Worker完成步骤的通知,在失败时决定补偿计划,并将补偿任务分发给对应的Worker。

由于补偿也是发到任务队列中执行的,即使某个Worker宕机,补偿任务也会被其他Worker接管,保证了分布式环境下的最终一致性。


六、补偿失败的重试与死信

补偿可能失败。比如删除图片时平台接口挂了。

我们为补偿动作设计了独立的重试策略:退避时间更长(1分钟、5分钟、15分钟),最大重试次数更多(5次)。
超过重试上限后,补偿任务进入死信队列,触发P1告警,等待人工处理。

class CompensationRetryManager:
    async def execute_compensation_with_retry(self, saga_id, step_name, compensate_fn, context, result):
        delays = [60, 300, 900, 1800, 3600]  # 退避时间
        for attempt, delay in enumerate(delays):
            try:
                await compensate_fn(context, result)
                return
            except Exception as e:
                if attempt < len(delays) - 1:
                    logger.warning(f"Compensation retry {attempt+1} for {step_name}, next in {delay}s")
                    await asyncio.sleep(delay)
                else:
                    # 进入死信
                    await self._move_to_dlq(saga_id, step_name, context, result)
                    raise CompensationExhaustedError(step_name) from e

    async def _move_to_dlq(self, saga_id, step_name, context, result):
        dlq_entry = {
            "saga_id": saga_id,
            "step_name": step_name,
            "context": context,
            "result": result,
            "created_at": time.time()
        }
        await redis.rpush("saga:dlq", json.dumps(dlq_entry))
        await alert_service.send_p1(f"Saga补偿失败进入死信: {saga_id}/{step_name}")

运营可以在管理后台看到死信队列中的补偿任务,手动执行或跳过。


七、与幂等机制的协同

Saga的补偿依赖每个步骤的幂等性。我们在之前的多篇文章中已构建了任务级别的幂等控制,这里重申与Saga的配合:

  • 正向步骤通过业务唯一键保证幂等(如“创建草稿”通过 product_id + shop_id 去重)
  • 补偿步骤通过捕获“资源不存在”异常保证幂等
  • Saga重放(因Master切换等原因导致重复调度)时,先检查步骤是否已执行,已执行的跳过

八、监控与告警

Saga的关键指标:

  • 每小时Saga启动数、成功数、失败数
  • 各步骤补偿次数
  • 死信队列长度
  • 平均补偿耗时

如果某个步骤的补偿频率突然飙升,可能意味着该步骤的正向成功率下降,或平台接口发生了变化。


九、踩坑与经验

补偿步骤执行时上下文过期。 某次补偿任务是凌晨执行的,而平台在前一天晚上清理了临时图片存储,导致图片ID失效,删除操作报错。
我们为图片类补偿增加了“忽略资源不存在”的异常捕获,并在补偿上下文里记录了备用标识。

补偿风暴。 一次平台大面积故障导致大量Saga失败,补偿任务瞬间堆积。
我们限制了补偿任务的并发数(每个Worker最多同时执行2个补偿),防止补偿把系统压垮。

Saga超时。 某次Saga因等待人工审批卡住,超过Saga的全局超时(2小时),触发强制补偿。但审批在补偿完成后才通过,导致流程混乱。
我们为包含人工节点的Saga增加了“暂停计时”能力,审批期间不计入超时。


十、写在最后

自动化不是一路向前的直线,而是随时准备优雅撤回的退路。

Saga模式赋予了自动化流程“知错能改”的能力——当某一步失败时,它能清理之前留下的痕迹,让系统回到干净的状态,而不是留下一堆等待人工处理的烂摊子。

工程化的自动化,不是祈求每一步都成功,而是设计好失败之后的每一步。
这,就是Saga的意义。


作者:林焱

0
0
0
0
评论
未登录
暂无评论