影刀RPA店群自动化编排引擎实战:从线性脚本到DAG工作流的工程重构

前几篇文章,我们把调度、稳定性、分布式、数据一致性、变更管理都聊透了。

该有的能力都有了。

picture.image

然后一个更深层次的问题浮现出来——流程之间的关系怎么管理?

一个店铺的运营,不是只有“上架商品”这一个动作。

picture.image

它是几十个动作的组合:签到、领券、上架、改价、同步库存、处理订单、发货、售后……

这些动作之间有先后顺序、有依赖关系、有触发条件。

picture.image 如果把这些动作全部塞进一个线性脚本里,结果就是灾难性的——一个环节出问题,整个链条全部卡死。

今天这篇文章,我们聊聊自动化系统的“大脑”——编排引擎

从线性脚本到DAG工作流,从硬编码依赖到声明式编排——全是工程重构的真实复盘。

picture.image


一、先说说“面条式脚本”为什么活不长

picture.image 大多数团队的RPA流程,早期都是“面条式”的。

一个流程文件里,从上到下排了几百条指令。

打开网页→登录→跳转到商品页→填写标题→上传图片→填写价格→提交→等待审核→……

picture.image

看起来逻辑清晰,实际上脆弱得像纸糊的。

第一个问题:变更成本极高。

picture.image 运营说“上架之前先做个库存校验”。

你得在流程中间找个位置插一段新逻辑。

但前后的依赖关系错综复杂,牵一发而动全身。

picture.image

改一处,测全域——没人敢保证不会影响后面的步骤。

第二个问题:复用几乎不可能。

同样是“登录店铺”这个动作,上架流程里有,改价流程里有,订单同步流程里也有。

每次都是复制粘贴。

一旦登录逻辑变了——比如平台加了验证码——你得去所有流程里一个一个改。

第三个问题:失败处理完全缺失。

面条式脚本的典型特征是:任何一个步骤失败,整个流程就停了。

没有重试、没有降级、没有跳过。

一个商品上传失败,后面的几十个商品全部卡住。

这些问题,本质上指向同一个结论:

“面条式脚本”撑不起企业级的自动化运营。

我们需要把“流程”从“一串指令”升级为“一组可编排的任务”。


二、核心思想:把流程拆成“积木”

picture.image

编排引擎的核心思想,其实特别朴素——乐高积木

每一个独立的业务动作,封装成一个“任务块”。

任务块之间通过“依赖关系”连接。

编排引擎负责解析这些依赖关系,决定执行的顺序和并行策略。

举个例子:

一个“完整的上架流程”,可以被拆解成这些任务块:

  • login:登录店铺
  • check_stock:校验库存
  • upload_product:上传商品
  • set_price:设置价格
  • submit_review:提交审核
  • notify_complete:通知完成

它们的依赖关系是:

  • logincheck_stockupload_productset_pricesubmit_reviewnotify_complete

这是串行的。

但如果把场景放大到“批量上架100个商品”:

  • login → 并行执行100次 upload_product → 并行执行100次 set_pricesubmit_review(批量提交)

这就是并行的。

编排引擎的价值在于:你只需要声明“谁依赖谁”,引擎自动决定怎么跑。


三、DAG:编排引擎的数学基础

上面说的“任务块 + 依赖关系”,在计算机科学里有一个精确的数学模型——DAG(有向无环图)

picture.image

  • 有向:依赖关系是有方向的。A依赖B,意味着B必须先于A执行。
  • 无环:不能有循环依赖。A依赖B、B依赖C、C依赖A——这种死循环在编排引擎里是不允许的。

在DAG模型下,编排引擎的核心工作是两件事:

第一,拓扑排序。

根据依赖关系,计算出所有任务的执行顺序。

比如A依赖B、A依赖C、B依赖D——拓扑排序的结果是D→B→C→A。

第二,并行度计算。

没有依赖关系的任务,可以并行执行。

比如B和C都依赖D,但B和C之间没有依赖——那么D执行完之后,B和C可以同时跑。

DAG模型的好处是:你把“做什么”和“什么时候做”彻底解耦了。

你只需要定义任务和依赖,引擎自动处理执行顺序和并行调度。


四、工程实现:一个轻量级的编排引擎

下面是我们真实项目里的编排引擎核心代码。

名字叫FlowEngine——当然,这也是一个代号。

# flow_engine.py - 轻量级DAG编排引擎
from typing import Dict, List, Set, Optional, Any, Callable
from dataclasses import dataclass, field
from enum import Enum
import threading
import queue
import time
import uuid
from collections import defaultdict

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"

@dataclass
class FlowTask:
    """编排引擎中的任务节点"""
    task_id: str
    name: str
    func: Callable[[Dict[str, Any]], Dict[str, Any]]
    depends_on: List[str] = field(default_factory=list)
    timeout: int = 300
    retry_count: int = 0
    max_retries: int = 3
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    started_at: Optional[float] = None
    completed_at: Optional[float] = None

class FlowEngine:
    """
    轻量级DAG编排引擎
    核心功能:解析依赖关系、拓扑排序、并行调度
    """
    
    def __init__(self, max_parallel: int = 10):
        self.max_parallel = max_parallel
        self._tasks: Dict[str, FlowTask] = {}
        self._semaphore = threading.Semaphore(max_parallel)
        self._results: Dict[str, Any] = {}
        self._lock = threading.Lock()
        
    def add_task(self, name: str, func: Callable, 
                 depends_on: List[str] = None,
                 timeout: int = 300,
                 max_retries: int = 3) -> str:
        """添加一个任务到编排引擎"""
        task_id = f"{name}_{uuid.uuid4().hex[:6]}"
        task = FlowTask(
            task_id=task_id,
            name=name,
            func=func,
            depends_on=depends_on or [],
            timeout=timeout,
            max_retries=max_retries
        )
        self._tasks[task_id] = task
        return task_id
    
    def run(self) -> Dict[str, Any]:
        """
        执行整个DAG
        返回所有任务的执行结果
        """
        # 1. 构建依赖图并检测循环
        self._validate_dag()
        
        # 2. 拓扑排序 - 计算执行顺序
        execution_order = self._topological_sort()
        
        # 3. 并行执行
        self._execute_dag(execution_order)
        
        return self._results
    
    def _validate_dag(self):
        """验证DAG合法性 - 检测循环依赖"""
        visited = set()
        rec_stack = set()
        
        def dfs(task_id: str):
            visited.add(task_id)
            rec_stack.add(task_id)
            
            task = self._tasks.get(task_id)
            if not task:
                return
            
            for dep in task.depends_on:
                if dep not in self._tasks:
                    raise ValueError(f"任务 '{task_id}' 依赖的任务 '{dep}' 不存在")
                if dep in rec_stack:
                    raise ValueError(f"检测到循环依赖: {task_id} -> {dep}")
                if dep not in visited:
                    dfs(dep)
            
            rec_stack.remove(task_id)
        
        for task_id in self._tasks:
            if task_id not in visited:
                dfs(task_id)
    
    def _topological_sort(self) -> List[str]:
        """拓扑排序 - Kahn算法"""
        # 计算入度
        in_degree: Dict[str, int] = defaultdict(int)
        for task_id, task in self._tasks.items():
            for dep in task.depends_on:
                in_degree[task_id] += 1
        
        # 入度为0的任务可以立即执行
        ready = [t for t in self._tasks if in_degree[t] == 0]
        result = []
        
        while ready:
            # 按优先级排序(可选)
            ready.sort()
            task_id = ready.pop(0)
            result.append(task_id)
            
            # 更新依赖该任务的其他任务的入度
            for other_id, other in self._tasks.items():
                if task_id in other.depends_on:
                    in_degree[other_id] -= 1
                    if in_degree[other_id] == 0:
                        ready.append(other_id)
        
        if len(result) != len(self._tasks):
            raise ValueError("DAG中存在循环依赖")
        
        return result
    
    def _execute_dag(self, execution_order: List[str]):
        """执行DAG - 按拓扑顺序并行调度"""
        # 记录每个任务依赖的任务是否已完成
        completed = set()
        pending = set(execution_order)
        
        # 线程池 + 队列
        task_queue = queue.Queue()
        for task_id in execution_order:
            task_queue.put(task_id)
        
        workers = []
        for _ in range(min(self.max_parallel, len(execution_order))):
            worker = threading.Thread(
                target=self._worker_loop,
                args=(task_queue, completed, pending),
                daemon=True
            )
            worker.start()
            workers.append(worker)
        
        # 等待所有任务完成
        for worker in workers:
            worker.join()
    
    def _worker_loop(self, task_queue: queue.Queue, 
                     completed: Set[str], pending: Set[str]):
        """工作线程 - 从队列取任务并执行"""
        while True:
            try:
                task_id = task_queue.get(timeout=1)
            except queue.Empty:
                # 检查是否所有任务都已完成
                with self._lock:
                    if not pending:
                        return
                continue
            
            # 检查依赖是否已完成
            task = self._tasks[task_id]
            deps_ready = all(dep in completed for dep in task.depends_on)
            
            if not deps_ready:
                # 依赖未完成,放回队列尾部
                task_queue.put(task_id)
                continue
            
            # 获取信号量 - 控制并发数
            self._semaphore.acquire()
            
            try:
                # 执行任务
                self._execute_task(task_id)
                
                with self._lock:
                    completed.add(task_id)
                    pending.remove(task_id)
                    
            finally:
                self._semaphore.release()
    
    def _execute_task(self, task_id: str):
        """执行单个任务 - 带重试和超时"""
        task = self._tasks[task_id]
        
        # 准备上下文 - 依赖任务的结果
        context = {}
        for dep in task.depends_on:
            if dep in self._results:
                context[dep] = self._results[dep]
        
        # 带重试的执行
        for attempt in range(task.max_retries + 1):
            try:
                task.status = TaskStatus.RUNNING
                task.started_at = time.time()
                
                # 执行任务函数
                result = task.func(context)
                
                task.result = result
                task.status = TaskStatus.COMPLETED
                task.completed_at = time.time()
                
                # 存储结果供后续任务使用
                with self._lock:
                    self._results[task_id] = result
                
                return
                
            except Exception as e:
                task.error = str(e)
                if attempt < task.max_retries:
                    # 重试前等待(退避)
                    time.sleep(2 ** attempt)
                else:
                    task.status = TaskStatus.FAILED
                    task.completed_at = time.time()
                    raise
    
    def get_status(self) -> dict:
        """获取编排引擎状态"""
        status_counts = defaultdict(int)
        for task in self._tasks.values():
            status_counts[task.status.value] += 1
        
        return {
            "total_tasks": len(self._tasks),
            "status_counts": dict(status_counts),
            "completed": sum(1 for t in self._tasks.values() 
                            if t.status == TaskStatus.COMPLETED)
        }

五、声明式编排:让非技术人员也能看懂流程

有了编排引擎的底层能力,下一步就是把它包装成可读的、可配置的形式。

我们设计了一套声明式的DSL(领域特定语言),用YAML定义工作流:

# daily_operation_flow.yaml - 店铺日常运营工作流
name: "每日店铺运营流水线"
version: "1.2.3"

tasks:
  - id: login
    name: "登录店铺"
    type: "rpa"
    flow: "shop_login"
    timeout: 60
    
  - id: check_stock
    name: "校验库存"
    type: "rpa"
    flow: "stock_check"
    depends_on: ["login"]
    timeout: 120
    
  - id: sync_orders
    name: "同步订单"
    type: "rpa"
    flow: "order_sync"
    depends_on: ["login"]
    timeout: 180
    
  - id: update_price
    name: "更新价格"
    type: "rpa"
    flow: "price_update"
    depends_on: ["login", "check_stock"]
    timeout: 90
    condition: "{{ check_stock.low_stock }} == true"
    
  - id: notify_report
    name: "发送运营日报"
    type: "webhook"
    url: "https://hooks.feishu.cn/xxx"
    depends_on: ["sync_orders", "update_price"]
    timeout: 30

这种声明式设计的好处是:

运营人员可以看懂流程的完整结构——先做什么、后做什么、什么条件下做什么。

开发人员可以专注于实现每个type对应的执行器。

流程的变更,只需要修改YAML文件,不需要改动任何代码。


六、条件分支与动态编排

真实的业务场景中,不是所有任务都会被执行的。

比如上面的例子——update_price只在check_stock检测到低库存时才执行。

这就是条件分支

我们的编排引擎支持两种条件:

前置条件:任务是否执行,取决于前置任务的结果。

# condition_evaluator.py - 条件评估器
import re
from typing import Any, Dict

class ConditionEvaluator:
    """条件表达式评估器 - 支持简单的模板语法"""
    
    def __init__(self, context: Dict[str, Any]):
        self.context = context
    
    def evaluate(self, condition: str) -> bool:
        """
        评估条件表达式
        支持格式: "{{ task_id.field }} == value"
        """
        if not condition:
            return True
        
        # 解析模板变量
        pattern = r'\{\{\s*([\w\.]+)\s*\}\}'
        matches = re.findall(pattern, condition)
        
        # 替换变量为实际值
        expr = condition
        for match in matches:
            value = self._get_nested_value(match)
            expr = expr.replace(f"{{{{{match}}}}}", repr(value))
        
        # 安全评估表达式
        try:
            return eval(expr, {"__builtins__": {}}, {})
        except:
            return False
    
    def _get_nested_value(self, path: str) -> Any:
        """从上下文中获取嵌套值 task_id.field"""
        parts = path.split(".")
        if len(parts) != 2:
            return None
        
        task_id, field = parts
        task_result = self.context.get(task_id, {})
        return task_result.get(field)

动态编排则是更高级的能力——在运行过程中,根据实际情况动态调整后续任务的参数或路径。

比如:如果某个商品在上架过程中被平台拦截了,编排引擎可以自动跳过该商品的后续处理,而不是让整个流程卡死。


七、事件驱动:让编排“活”起来

上面的编排引擎是“拉模式”的——你提交一个工作流,引擎把它跑完。

但真实的业务场景中,很多流程是“推模式”的——某个事件发生了,触发一系列动作。

比如:

  • 新订单产生 → 触发订单处理工作流
  • 库存低于阈值 → 触发补货工作流
  • 平台活动开始 → 触发批量改价工作流

我们的事件驱动架构是这样的:

# event_driven_flow.py - 事件驱动的工作流触发器
from typing import Dict, Any, Callable, List
from dataclasses import dataclass
import threading
import queue
import json

@dataclass
class Event:
    """事件对象"""
    event_type: str
    payload: Dict[str, Any]
    source: str
    timestamp: float

class EventBus:
    """事件总线 - 发布/订阅模式"""
    
    def __init__(self):
        self._subscribers: Dict[str, List[Callable]] = {}
        self._queue = queue.Queue()
        self._running = True
        
        # 启动事件分发线程
        threading.Thread(target=self._dispatch_loop, daemon=True).start()
    
    def subscribe(self, event_type: str, handler: Callable):
        """订阅事件"""
        if event_type not in self._subscribers:
            self._subscribers[event_type] = []
        self._subscribers[event_type].append(handler)
    
    def publish(self, event: Event):
        """发布事件"""
        self._queue.put(event)
    
    def _dispatch_loop(self):
        """事件分发主循环"""
        while self._running:
            try:
                event = self._queue.get(timeout=1)
                handlers = self._subscribers.get(event.event_type, [])
                for handler in handlers:
                    threading.Thread(target=handler, args=(event,), daemon=True).start()
            except queue.Empty:
                continue

class FlowTrigger:
    """工作流触发器 - 将事件映射到工作流"""
    
    def __init__(self, event_bus: EventBus, flow_engine: FlowEngine):
        self.event_bus = event_bus
        self.flow_engine = flow_engine
        self._mappings: Dict[str, str] = {}  # event_type -> flow_id
    
    def register(self, event_type: str, flow_id: str):
        """注册事件到工作流的映射"""
        self._mappings[event_type] = flow_id
        self.event_bus.subscribe(event_type, self._on_event)
    
    def _on_event(self, event: Event):
        """事件触发时执行对应的工作流"""
        flow_id = self._mappings.get(event.event_type)
        if not flow_id:
            return
        
        # 将事件payload作为工作流的输入参数
        self.flow_engine.run(flow_id, event.payload)

事件驱动的价值在于:

编排不再是“一次性”的——你提交一个工作流,它跑完就结束了。

而是“常驻”的——系统持续监听事件,一旦触发条件满足,自动拉起对应的工作流。

这才是真正意义上的“自动化运营”。


八、可观测性:让编排过程“看得见”

编排引擎比单任务调度更复杂的一个地方是——你不仅要看单个任务的状态,还要看整个DAG的执行进度

我们在编排引擎中内置了完整的可观测性能力:

实时进度追踪:

每个任务的状态变化都通过WebSocket推送到前端。

运营人员可以看到一个实时更新的DAG可视化图——绿色的是已完成、黄色的是运行中、红色的是失败、灰色的是等待中。

执行历史回溯:

每个工作流实例都有唯一的flow_instance_id

所有任务的执行日志、耗时、结果——全部关联到这个ID上。

排查问题时,只需要输入flow_instance_id,整个DAG的执行轨迹一目了然。

性能瓶颈分析:

编排引擎自动记录每个任务的执行耗时。

汇总之后可以生成一份“耗时分布报告”——哪个任务最慢、哪个环节是瓶颈。

优化不再靠猜,而是靠数据。


九、写在最后

从“面条式脚本”到“DAG编排引擎”,我们走了大概半年的时间。

这半年里最大的体会是:

编排不是“把流程拆碎”,而是“把流程变聪明”。

拆碎只是第一步。

真正的价值在于——让流程具备依赖感知、并行调度、条件分支、事件驱动、可观测性这些能力。

当这些能力全部到位之后,自动化系统就不再是一个“执行工具”了。

它变成了一个“运营大脑”。

运营人员只需要在编排面板上拖拽任务块、配置依赖关系。

系统自动处理执行顺序、并发调度、异常恢复、结果汇总。

这才是企业级自动化系统该有的样子。

不是让人去适应机器——而是让机器去理解人的意图。


作者:林焱

0
0
0
0
评论
未登录
暂无评论