影刀RPA工程实战:店群自动化流程的DAG编排与跨任务依赖调度引擎

影刀RPA工程实战:店群自动化流程的DAG编排与跨任务依赖调度引擎

一个店铺的日常运营,从来不是一个流程在战斗。
采集、分析、上货、调价、回复,它们之间有先后、有依赖、有数据传递。把这些关系交给人工去协调,规模化就是一句空话。

我们之前的文章,把单个流程的执行、调度、环境、数据都讲透了。但真正跑过店群的人知道,一个店铺每天要跑的自动化流程不是孤立的。以TEMU店铺为例,早上的采集流程跑完,白天的分析流程才能知道哪些商品需要优化,下午的上货流程依赖分析结果,晚上的调价流程又依赖白天的销售数据。

在只有几个店铺的时候,我们用最原始的方法串起这些流程——在调度中心里给每个流程设一个定时触发,时间间隔靠人工估算。采集跑30分钟,分析就设在30分钟后启动。看起来能工作,但一旦采集因为数据量大跑超时了,分析流程就会在一个空数据集上运行,产出全是错的,然后上货流程把错误数据推到前台,一整天的运营节奏全乱。

不是流程错了,是流程之间的协调机制缺位了。

这篇文章还原我们怎么在调度层之上,构建一套轻量级的DAG工作流编排引擎,让多个影刀流程能够按照依赖关系自动编排执行,并在失败时做出正确的传播和恢复决策。


一、定时触发是自动化协调的最大谎言

定时触发适用于独立任务,但把它用在有依赖关系的任务链上,本质上是用时间顺序来假装依赖关系。这在系统负载稳定、数据量稳定的情况下勉强可行,但店群运营根本不具备这两个“稳定”。

平台活动期间数据量翻倍,采集时间拉长;某个店铺被临时限流,操作流程执行变慢;代理出口偶尔抖动,某个流程多花了十分钟。每一个微小波动都会让精心设计的时间差失效。

picture.image 更要命的是,如果前序流程失败了,后续流程照常启动,就会基于过期数据或空数据执行,污染业务数据库。我们有过一次教训:TEMU采集流程因为页面改版全部失败,分析流程还是在30分钟后准时启动,拿上周的旧数据跑了一遍分析,生成了“昨日销量暴跌90%”的错误报警,运营差点启动紧急预案。

picture.image

用时间来表达依赖,就像用目测来判断距离——平时还行,关键时刻一定出问题。


picture.image

二、引入DAG:让依赖关系显式化

在后端系统里,有依赖关系的任务编排早就用DAG(有向无环图)来表达了。Airflow、Dagster这些工具的核心思想就是把任务节点和它们之间的依赖边定义清楚,调度引擎根据依赖状态来决定执行顺序。

我们把同样的思想搬到RPA流程编排上。一个店铺的日常运营被抽象为一个工作流DAG,每个节点对应一个已有的影刀流程任务,边表示依赖——A执行成功后才能执行B。

picture.image

workflow:
  name: "temu_daily_ops"
  shop_id: "temu_045"
  nodes:
    - id: "collect_orders"
      flow_name: "order_collect"
      
![picture.image](https://p6-volc-community-sign.byteimg.com/tos-cn-i-tlddhu82om/3a204ff9b02048ed969cf81e7471c102~tplv-tlddhu82om-image.image?=&rk3s=8031ce6d&x-expires=1780158255&x-signature=MHATBHKVZs3%2FBF6FFHkVuY6yeZw%3D)
      priority: "P2"
    - id: "collect_products"
      flow_name: "product_collect"
      priority: "P2"
    - id: "analyze_sales"
      flow_name: "sales_analysis"
      priority: "P2"
      depends_on: ["collect_orders"]
    - id: "analyze_inventory"
      flow_name: "inventory_analysis"
      priority: "P2"
      depends_on: ["collect_products"]
    - id: "adjust_price"
      flow_name: "price_adjust"
      priority: "P1"
      depends_on: ["analyze_sales"]
    - id: "replenish_stock"
      flow_name: "stock_replenish"
      priority: "P1"
      depends_on: ["analyze_inventory"]
    - id: "upload_products"
      flow_name: "product_upload"
      priority: "P2"
      depends_on: ["analyze_sales", "analyze_inventory"]

picture.image 这个DAG里,采集订单和采集商品可以并行执行,两者互不依赖。销售分析等待订单采集,库存分析等待商品采集,这又是并行的。调价和补货各自依赖一个分析结果,最后的商品上架则同时依赖两个分析——因为上架需要结合销售趋势和库存情况决定上什么货。

这种编排能力,用定时触发永远做不到。


三、DAG调度引擎的核心设计

我们的DAG引擎是一个独立的Python服务,它不替代已有的任务调度中心,而是在其之上做编排。引擎负责解析工作流定义、管理节点状态、根据依赖触发下游任务,并与调度中心交互来实际执行每个节点的影刀流程。

3.1 工作流实例与节点状态机

每次工作流被触发,引擎创建一个工作流实例,包含所有节点的实例。每个节点实例有自己的状态:

  • pending:节点已创建,但依赖未满足,等待中
  • ready:所有依赖节点已成功完成,可以执行
  • running:已下发给调度中心,正在执行
  • success:执行成功
  • failed:执行失败
  • skipped:因上游失败导致本节点被跳过
  • cancelled:被人工取消

节点之间的状态流转完全由依赖关系驱动。引擎每隔几秒扫描所有running的工作流实例,检查是否有节点变成successfailed,然后更新下游节点状态。

class WorkflowEngine:
    def __init__(self, scheduler_client, redis_client):
        self.scheduler = scheduler_client
        self.redis = redis_client

    def on_node_completed(self, workflow_instance_id: str, node_id: str, status: str):
        wf = self._load_workflow_instance(workflow_instance_id)
        node = wf.get_node(node_id)
        node.status = status
        self._save_workflow_instance(wf)

        if status == "success":
            self._propagate_success(wf, node)
        elif status == "failed":
            self._propagate_failure(wf, node)

    def _propagate_success(self, wf, completed_node):
        for downstream_id in completed_node.downstream_nodes:
            downstream = wf.get_node(downstream_id)
            if all(wf.get_node(dep).status == "success" for dep in downstream.depends_on):
                downstream.status = "ready"
                self._submit_task(wf, downstream)

3.2 并行执行与资源协调

DAG里并行的节点会被同时提交到调度中心,但它们争抢的是同一个店铺的浏览器实例资源。我们在调度中心里已经做了店铺级别的资源管理——同一个店铺同一时间只能有一个流程在执行(采集和回复消息这种不同安全等级的场景除外)。

这意味着,collect_orderscollect_products虽然逻辑上可以并行,但如果它们操作的是同一个店铺后台,页面上不可能同时打开两个会话。现实里,这两个采集任务操作的是不同页面模块,可以安全并行,我们通过分配两个浏览器实例来支持。但如果并行任务都试图操作同一个模块,就会冲突。

我们在DAG引擎里增加了一个资源标签机制。每个节点可以声明自己需要的资源标签,引擎在提交任务前,检查该店铺的可用资源是否满足。如果冲突,即使依赖已满足,节点也会保持在ready状态等待资源释放。

nodes:
  - id: "collect_orders"
    flow_name: "order_collect"
    resource_tags: ["module:orders"]
  - id: "collect_products"
    flow_name: "product_collect"
    resource_tags: ["module:products"]

这种标签让并行安全的节点快速执行,有冲突的节点自动排队,而不用人工去猜测哪些流程可以一起跑。


四、失败传播策略:不是所有失败都该终止一切

DAG里一个节点失败,下游该怎么办?没有一刀切的答案。不同的失败类型,不同的业务场景,需要不同的传播策略。

我们定义了四种失败处理模式:

模式一:直接终止(fail-fast)。适用于关键路径节点,失败意味着整个工作流失去意义。例如销售分析失败,后续调价和上架都无源之水,工作流直接标记为失败,所有下游节点置为skipped

模式二:跳过下游继续(skip-downstream)。适用于可降级的节点。例如库存分析失败,补货可以暂停,但订单采集和销售分析的结果仍然可用,商品上架可以基于销售分析单独决策。此时库存分析的下游补货被跳过,但上架节点因为还依赖销售分析(已成功),可以继续执行。

模式三:重试后决定(retry-then-decide)。适用于临时性故障。节点失败后,引擎自动重试最多2次,每次间隔递增。如果重试成功,下游正常触发;如果全部失败,再按fail-fast或skip-downstream策略处理。这个重试是在DAG引擎层面的,和调度中心的任务级重试互相独立。

模式四:等待人工决策(wait-for-human)。适用于需要人工判断的失败。节点失败后,工作流挂起,向运营发送告警工单,附上失败截图和日志。运营可以选择“忽略失败继续下游”、“重跑该节点”或“终止工作流”。引擎等待指令再继续。

传播策略在工作流定义时为每个节点指定:

nodes:
  - id: "analyze_sales"
    flow_name: "sales_analysis"
    depends_on: ["collect_orders"]
    on_failure: "fail-fast"
    max_retries: 2

五、工作流与断点续跑的协同

单个流程有Checkpoint机制,那工作流层面的进度如何保存?如果整个工作流执行到一半,调度中心挂了或机器重启了,恢复后怎么知道工作流跑到哪一步了?

工作流实例的所有状态都持久化在Redis和数据库中,引擎本身无状态。恢复时,引擎重新加载所有处于running状态的工作流实例,检查每个正在执行的节点的实际任务状态(通过查询调度中心),然后更新节点状态并继续推进DAG。

这个过程依赖一个关键设计:工作流实例的ID和每个节点的任务ID是强关联的。调度中心完成任务后,不仅更新任务状态,还回调DAG引擎的on_node_completed接口。如果回调丢失(比如引擎当时挂了),恢复时引擎主动拉取任务状态做补偿同步。

这个设计让工作流编排也具有了断点续跑的特性——不是流程内部的断点,而是流程之间的断点


六、工作流触发方式与周期性管理

工作流需要被触发才能开始执行。我们支持三种触发方式:

手动触发:运营在后台点击“启动今日运营工作流”,适合临时性任务或测试。

定时触发:定义一个cron表达式,每天固定时间启动。但和以前粗放式的定时不同,现在的定时只触发工作流的第一个节点(没有依赖的节点),后续完全由DAG引擎根据依赖自动推进。

事件触发:由外部系统发布事件来触发。比如“采集流程检测到差评”事件触发一个“差评处理工作流”。事件触发让自动化从定时驱动转向事件驱动,实时性大幅提升。

周期性工作流每天都会创建一个新的实例。我们保留最近30天的实例日志和状态,用于回溯和审计。


七、DAG的可视化与运营自助编排

工作流定义最初是工程师手写YAML,运营完全看不懂。后来我们做了一个简单的可视化编排界面,运营可以拖拽节点来组合工作流,并配置依赖关系和失败策略。

后台把可视化编排的结果自动转换为YAML,经过版本管理和审批后生效。这让运营团队可以自己设计日常运营流水线,工程师只负责底层流程节点的开发和维护。

可视化的另一个好处是,运营可以在工作流运行中实时看到DAG图,哪个节点跑完了、哪个在跑、哪个失败了,一目了然。不再需要追着工程师问“今天的数据怎么还没出来”。


八、那些实际编排中才暴露的问题

问题一:循环依赖的隐蔽性

有一次运营在编排界面里,给A节点设置了依赖B,又给B设置了依赖A,形成了一个环。DAG校验模块在提交时检测到了循环依赖,拒绝了发布。但更隐蔽的情况是间接循环——A依赖B,B依赖C,C依赖A。我们实现了基于拓扑排序的环检测,在每次工作流定义保存时运行,有环直接报错并高亮问题节点。

问题二:跨店铺工作流的资源死锁

店铺A的工作流节点等待一个浏览器实例,店铺B的工作流也在等,两个工作流分别占着对方需要的资源标签不放。这种情况虽然少见,但确实发生过一次。我们通过给资源等待加超时机制来解决——如果节点在ready状态等待资源超过30分钟,自动失败并释放已有资源,避免死锁。

问题三:工作流超时与僵尸实例

有些工作流因为某个节点执行时间异常长(比如采集全量历史订单跑了一整天还没完),整个工作流悬在那里。我们给每个工作流实例设了一个全局超时时间,超时后自动终止所有运行中的节点,标记工作流为timeout,并告警。


九、编排带来的本质变化

DAG编排上线后的第一个月,我们对比了前后的业务指标。店铺日运营流程的完整执行率(所有必要流程全部成功完成)从之前的82%提升到了96%。流程之间的数据传递不再依赖人工估算时间,上游完成后下游立刻启动,平均每天节省了约45分钟的等待空窗期。

更重要的变化是隐性的。运营不再需要记住“哪个流程跑完要手动触发哪个”,一切由引擎保证。工程师也不再需要为每个店铺维护一套定时任务配置,新店铺的运营流水线直接从模板复制一份DAG定义即可。

依赖关系不该住在人的脑子里,它应该被写进系统里,被引擎忠实地执行。
自动化不是替代人操作,是替代人协调。


十、继续演进的方向

目前我们的DAG引擎只支持店铺级别的工作流编排,跨店铺的协同——比如多个TEMU店铺的价格对比分析、跨平台库存调拨——还依赖外部脚本手动触发。下一步想把编排范围扩大到店群矩阵级别,让跨店铺的工作流也能被定义和调度。

另一个方向是动态DAG——工作流不是事先定义死的,而是根据上游节点的输出动态决定下游应该执行哪些节点。这会让自动化更贴近实际业务决策流程,但也对引擎的灵活性和安全性提出更高要求。

编排是自动化的神经系统。
单个流程再强壮,没有编排,也只是一群各自为战的肌肉。

作者:林焱
一个执着于让自动化流程像交响乐团一样协调演奏的工程师

0
0
0
0
评论
未登录
暂无评论