一个上货流程跑了七步,在第八步失败了。
前七步已经改了价格、传了图片、提交了部分信息——这些“半成品”怎么办?
单体脚本时代,这个问题不存在。失败了就重跑,从头再来。
但当自动化流程被拆成多个子模块、分布式执行,尤其涉及写操作时,一个模块的失败可能导致前面多个已成功的模块产生脏数据。
我们曾经遇到过一个典型事故:商品上架流程中的“上传图片”步骤成功,“填写描述”步骤失败。脚本重试了三次全部失败。但此时图片已经占用了平台的存储配额,且无法通过正常界面删除。最终人工登录后台,一个个商品清理。
那之后,我们开始引入分布式事务中的 Saga 模式,为自动化流程的写操作构建了补偿机制。本文就完整展开这套系统的设计理念和工程实践。
一、为什么自动化流程需要“事务”
传统RPA脚本大多没有事务概念。失败了就重试,或者人工处理残留。
但当流程被模块化、并行化后,一个业务操作可能横跨多个独立的模块调用,每个模块都执行了部分不可逆的写操作。
比如跨境商品上架的 Saga:
- 创建商品草稿(TEMU)
- 上传商品主图
- 上传详情图
- 填写商品描述
- 填写价格与库存
- 提交审核
如果步骤5失败,步骤1~4已完成的操作就处于“悬挂”状态。
直接重试可能因幂等性问题导致图片重复上传或商品草稿重复创建。不做处理则留下一堆垃圾数据。
我们需要的是:要么全部成功,要么全部回滚到初始状态——即 Saga 模式。
二、Saga模式在自动化中的实现思路
Saga将一个大事务拆成多个本地事务,每个本地事务都有一个对应的“补偿动作”。
如果某个本地事务失败,Saga会按逆序依次执行已成功事务的补偿动作,将系统恢复原状。
在我们的自动化系统中:
- 每个写操作的模块,除了正向执行的
execute方法,还提供可选的compensate方法。 - Python编排引擎维护一个“已执行步骤栈”,当任何一步失败时,从栈顶开始逆序调用
compensate。 - 补偿动作本身必须幂等,可反复执行而无副作用。
from dataclasses import dataclass
from typing import Any, Callable, Awaitable, Optional

@dataclass
class SagaStep:
name: str
execute: Callable[[dict], Awaitable[dict]]
compensate: Optional[Callable[[dict, dict], Awaitable[None]]] = None
# execute返回的结果会传给compensate作为上下文
class SagaOrchestrator:
def __init__(self):
self.executed_steps: list[tuple[str, dict]] = [] # (step_name, result)
async def run(self, steps: list[SagaStep], context: dict) -> dict:
self.executed_steps.clear()
for step in steps:
try:
result = await step.execute(context)
self.executed_steps.append((step.name, result))
# 将结果合并到上下文,供后续步骤使用
context[step.name] = result
except Exception as e:
logger.error(f"Saga step '{step.name}' failed: {e}")
await self._compensate(context)
raise SagaFailedError(step.name, e) from e
return context
async def _compensate(self, context: dict):
"""逆序执行已成功步骤的补偿动作"""
for step_name, result in reversed(self.executed_steps):
step = self._find_step(step_name)
if step and step.compensate:
try:
await step.compensate(context, result)
logger.info(f"Compensated step '{step_name}'")
except Exception as e:
logger.error(f"Compensation for '{step_name}' failed: {e}")
# 补偿失败不能中断其他补偿,记录并继续
每个店铺操作模块在注册时都需声明其补偿动作。比如“上传商品图片”的补偿就是“删除已上传的图片”。
三、补偿动作的设计:不止是简单删除
补偿不是简单地把数据删了就完事。需要考虑:
- 补偿失败怎么办:需要重试机制,且最终要能人工介入。
- 补偿的幂等性:如果补偿被重复调用(比如重试),不能报错,必须安全无操作或返回成功。
- 补偿的上下文:如何知道删哪张图片、取消哪个草稿?必须从正向执行的结果中获取关键标识。
我们为常见的写操作模块设计了标准补偿:
class UploadImageStep:
async def execute(self, ctx):
image_url = ctx["image_url"]
product_id = ctx["product_id"]
# 上传图片,返回图片ID
uploaded = await platform_api.upload_image(product_id, image_url)
return {"image_id": uploaded["id"]}
async def compensate(self, ctx, result):
image_id = result["image_id"]
# 删除图片,幂等:若已删除则忽略错误
try:
await platform_api.delete_image(image_id)
except ImageNotFoundError:
pass # 幂等,已删除则成功
class CreateProductDraftStep:
async def execute(self, ctx):
draft = await platform_api.create_draft(ctx["product_data"])
return {"draft_id": draft["id"]}
async def compensate(self, ctx, result):
draft_id = result["draft_id"]
try:
await platform_api.delete_draft(draft_id)
except DraftNotFoundError:
pass
所有补偿动作都要捕获“资源已不存在”的异常并视为成功,这是保证幂等的关键。
四、与现有流程编排系统的集成
Saga模式不需要改变我们已有的模块化流程编排。只需在配方配置中标记哪些步骤需要参与Saga。
flow_name: temu_upload_product
saga_mode: true
steps:
- name: create_draft
module: temu/create_draft
compensate: temu/delete_draft
- name: upload_main_image
module: temu/upload_image
compensate: temu/delete_image
- name: upload_detail_images
module: temu/upload_images_batch
compensate: temu/delete_images_batch
- name: fill_description
module: temu/fill_description
# 纯填写无补偿,失败时不影响数据一致性
- name: submit_review
module: temu/submit_review
compensate: temu/cancel_review
编排引擎读取 saga_mode: true 后,使用 SagaOrchestrator 而非普通的并行执行器来驱动这些步骤。
对于没有配置 compensate 的步骤,编排引擎假设它是无副作用的读操作或纯UI操作(如滚动页面、等待元素),失败无需回滚。
五、分布式Saga的协调
上面的Saga是单机顺序执行。但我们的系统是分布式的——不同步骤可能由不同的Worker执行。
比如“上传图片”可能在Worker-A,“填写描述”可能在Worker-B(由于资源调度)。
这时候需要有一个集中式的Saga协调器来追踪全局进度。
我们用Redis记录Saga的执行状态:
class DistributedSagaCoordinator:
def __init__(self, redis):
self.redis = redis
async def start_saga(self, saga_id: str, steps: list):
await self.redis.hset(f"saga:{saga_id}", mapping={
"status": "running",
"total_steps": len(steps),
"completed_steps": "0",
"created_at": time.time()
})
async def record_step_success(self, saga_id: str, step_name: str, result: dict):
await self.redis.hincrby(f"saga:{saga_id}", "completed_steps", 1)
await self.redis.lpush(f"saga:{saga_id}:history", json.dumps({
"step": step_name,
"result": result,
"timestamp": time.time()
}))
async def mark_saga_failed(self, saga_id: str, failed_step: str):
await self.redis.hset(f"saga:{saga_id}", "status", "compensating")
await self.redis.hset(f"saga:{saga_id}", "failed_step", failed_step)
async def get_compensation_plan(self, saga_id: str) -> list:
# 返回需要补偿的步骤列表(逆序)
history = await self.redis.lrange(f"saga:{saga_id}:history", 0, -1)
return [json.loads(h) for h in reversed(history)]
Master节点上的Saga协调器负责:接收Worker完成步骤的通知,在失败时决定补偿计划,并将补偿任务分发给对应的Worker。
由于补偿也是发到任务队列中执行的,即使某个Worker宕机,补偿任务也会被其他Worker接管,保证了分布式环境下的最终一致性。
六、补偿失败的重试与死信
补偿可能失败。比如删除图片时平台接口挂了。
我们为补偿动作设计了独立的重试策略:退避时间更长(1分钟、5分钟、15分钟),最大重试次数更多(5次)。
超过重试上限后,补偿任务进入死信队列,触发P1告警,等待人工处理。
class CompensationRetryManager:
async def execute_compensation_with_retry(self, saga_id, step_name, compensate_fn, context, result):
delays = [60, 300, 900, 1800, 3600] # 退避时间
for attempt, delay in enumerate(delays):
try:
await compensate_fn(context, result)
return
except Exception as e:
if attempt < len(delays) - 1:
logger.warning(f"Compensation retry {attempt+1} for {step_name}, next in {delay}s")
await asyncio.sleep(delay)
else:
# 进入死信
await self._move_to_dlq(saga_id, step_name, context, result)
raise CompensationExhaustedError(step_name) from e
async def _move_to_dlq(self, saga_id, step_name, context, result):
dlq_entry = {
"saga_id": saga_id,
"step_name": step_name,
"context": context,
"result": result,
"created_at": time.time()
}
await redis.rpush("saga:dlq", json.dumps(dlq_entry))
await alert_service.send_p1(f"Saga补偿失败进入死信: {saga_id}/{step_name}")
运营可以在管理后台看到死信队列中的补偿任务,手动执行或跳过。
七、与幂等机制的协同
Saga的补偿依赖每个步骤的幂等性。我们在之前的多篇文章中已构建了任务级别的幂等控制,这里重申与Saga的配合:
- 正向步骤通过业务唯一键保证幂等(如“创建草稿”通过 product_id + shop_id 去重)
- 补偿步骤通过捕获“资源不存在”异常保证幂等
- Saga重放(因Master切换等原因导致重复调度)时,先检查步骤是否已执行,已执行的跳过
八、监控与告警
Saga的关键指标:
- 每小时Saga启动数、成功数、失败数
- 各步骤补偿次数
- 死信队列长度
- 平均补偿耗时
如果某个步骤的补偿频率突然飙升,可能意味着该步骤的正向成功率下降,或平台接口发生了变化。
九、踩坑与经验
补偿步骤执行时上下文过期。 某次补偿任务是凌晨执行的,而平台在前一天晚上清理了临时图片存储,导致图片ID失效,删除操作报错。
我们为图片类补偿增加了“忽略资源不存在”的异常捕获,并在补偿上下文里记录了备用标识。
补偿风暴。 一次平台大面积故障导致大量Saga失败,补偿任务瞬间堆积。
我们限制了补偿任务的并发数(每个Worker最多同时执行2个补偿),防止补偿把系统压垮。
Saga超时。 某次Saga因等待人工审批卡住,超过Saga的全局超时(2小时),触发强制补偿。但审批在补偿完成后才通过,导致流程混乱。
我们为包含人工节点的Saga增加了“暂停计时”能力,审批期间不计入超时。
十、写在最后
自动化不是一路向前的直线,而是随时准备优雅撤回的退路。
Saga模式赋予了自动化流程“知错能改”的能力——当某一步失败时,它能清理之前留下的痕迹,让系统回到干净的状态,而不是留下一堆等待人工处理的烂摊子。
工程化的自动化,不是祈求每一步都成功,而是设计好失败之后的每一步。
这,就是Saga的意义。
作者:林焱
