影刀RPA店群自动化运营系统:可视化任务编排引擎与DSL设计实战

影刀RPA店群自动化运营系统:可视化任务编排引擎与DSL设计实战

店群自动化做到一定规模,会面临一个尴尬的局面:

运营的需求永远在变,开发的速度永远跟不上。

今天要加一个“上架前检查库存”,明天要“根据利润自动调价”,后天又要求“订单发货后发短信提醒”。每个需求都要开发去改影刀脚本,测试、发布、验证,一套流程走下来两天过去了。

我们当时就在想:能不能让运营自己编排任务流程?

就像搭积木一样,把已有的影刀原子操作(登录店铺、上架商品、修改价格、处理订单)组合起来,加上条件判断、循环、等待,形成一个新的自动化流程。

这篇文章不讲底层的调度和资源管理。

专门聊聊我们是如何设计一套面向运营的可视化任务编排引擎,让店群自动化的能力从“开发专属”变成“运营自助”。

picture.image

适用场景:需要将自动化能力开放给非技术人员的企业级店群系统。 技术栈:Python + JSON Schema + React Flow + 影刀RPA原子脚本。


一、为什么需要编排引擎?

picture.image

先看一个真实场景。

运营想实现一个“智能上下架”策略:每天早上8点,检查库存低于10件的商品,自动下架;同时,从备选商品池里随机选择等量的商品上架。

如果用传统方式,开发需要写一个影刀脚本,包含:登录店铺、遍历商品列表、判断库存、执行下架、随机选择备选池商品、执行上架。每个店铺都要跑一遍。需求变更时又要改代码。

但如果有一个编排引擎,运营可以这样描述流程:

picture.image

触发器: 每天8:00
对于每个店铺:
  执行原子操作: 登录店铺
  执行原子操作: 获取低库存商品列表 (参数: 阈值=10)
  对列表中的每个商品:
    执行原子操作: 下架商品
    
![picture.image](https://p6-volc-community-sign.byteimg.com/tos-cn-i-tlddhu82om/731ce372f7a84246bddd35d3a3528f11~tplv-tlddhu82om-image.image?=&rk3s=8031ce6d&x-expires=1780503529&x-signature=%2BXT88%2FmkvBIesJBUcEcpaCO9TRY%3D)
  执行原子操作: 获取备选商品池
  随机选择 N 个商品 (N = 下架数量)
  对每个选中商品:
    执行原子操作: 上架商品

运营不需要懂代码,只需要理解这些“原子操作”的含义和连接规则。

picture.image 编排引擎的核心价值:

picture.image

  • 降低迭代成本:新策略上线从2天缩短到10分钟
  • 减少错误:原子操作经过充分测试,组合出错概率低
  • 可追溯:每次执行的流程图版本可以被记录和回放

二、整体架构:三层模型

我们把编排引擎分成三层:

第一层:原子操作层

最小的执行单元,对应一个影刀RPA脚本。比如“登录拼多多店铺”、“获取订单列表”、“上架商品”。

每个原子操作有明确的输入参数(如店铺ID、商品ID)和输出结果(如订单数量、操作状态)。

第二层:编排层

运行时的流程引擎,负责解析DSL(领域特定语言)、管理任务状态、调度原子操作、处理分支和循环。

第三层:可视化层

前端画布,运营通过拖拽节点、连接线条来创建流程。流程保存为JSON格式的DSL,下发给编排引擎执行。

三层之间解耦:可视化层只负责编辑和展示DSL,编排层只负责执行DSL,原子操作层只负责具体的浏览器自动化。


三、DSL设计:用JSON描述一切

为了让运营编辑的流程图可以被程序执行,我们需要一种结构化的流程描述语言。

我们选择了JSON Schema,因为它易于存储、解析和校验。

一个典型的流程DSL结构:

{
  "version": "1.0",
  "name": "智能上下架策略",
  "trigger": {
    "type": "cron",
    "value": "0 8 * * *"
  },
  "graph": {
    "nodes": [
      {
        "id": "node_1",
        "type": "start",
        "label": "开始"
      },
      {
        "id": "node_2",
        "type": "action",
        "action_id": "login_shop",
        "label": "登录店铺",
        "inputs": {
          "shop_id": "{{trigger.shop_id}}"
        },
        "outputs": {
          "login_status": "success"
        }
      },
      {
        "id": "node_3",
        "type": "action",
        "action_id": "get_low_stock_products",
        "label": "获取低库存商品",
        "inputs": {
          "threshold": 10
        },
        "outputs": {
          "product_list": "low_stock_items"
        }
      },
      {
        "id": "node_4",
        "type": "foreach",
        "collection": "{{low_stock_items}}",
        "item_var": "product",
        "subgraph": {
          "nodes": [
            {
              "id": "node_4_1",
              "type": "action",
              "action_id": "off_shelf",
              "inputs": {
                "product_id": "{{product.id}}"
              }
            }
          ]
        }
      },
      {
        "id": "node_5",
        "type": "action",
        "action_id": "get_backup_pool",
        "label": "获取备选商品池",
        "outputs": {
          "pool": "backup_products"
        }
      },
      {
        "id": "node_6",
        "type": "expression",
        "script": "random.sample(backup_products, len(low_stock_items))",
        "outputs": {
          "selected": "to_upload"
        }
      },
      {
        "id": "node_7",
        "type": "foreach",
        "collection": "{{to_upload}}",
        "item_var": "new_product",
        "subgraph": {
          "nodes": [
            {
              "id": "node_7_1",
              "type": "action",
              "action_id": "on_shelf",
              "inputs": {
                "product_data": "{{new_product}}"
              }
            }
          ]
        }
      },
      {
        "id": "node_8",
        "type": "end"
      }
    ],
    "edges": [
      {"from": "node_1", "to": "node_2"},
      {"from": "node_2", "to": "node_3"},
      {"from": "node_3", "to": "node_4"},
      {"from": "node_4", "to": "node_5"},
      {"from": "node_5", "to": "node_6"},
      {"from": "node_6", "to": "node_7"},
      {"from": "node_7", "to": "node_8"}
    ]
  }
}

DSL支持的节点类型:

  • start/end:流程起点和终点
  • action:原子操作,调用影刀脚本
  • condition:条件分支(if/else)
  • foreach:循环遍历集合
  • expression:执行Python表达式,用于数据转换
  • wait:等待一段时间或直到某个条件满足
  • subflow:调用子流程(复用)

变量引用使用{{variable.path}}语法,支持嵌套属性。

这个DSL的设计原则是:可读性强、容易序列化、便于前端渲染


四、编排引擎的实现

编排引擎的核心是一个状态机驱动的解释器

它读取DSL,维护当前执行位置和变量上下文,按顺序执行节点,处理跳转和循环。

# orchestration_engine.py
from typing import Dict, Any, List
import json
import time
from copy import deepcopy

class OrchestrationEngine:
    def __init__(self, dsl: dict, context: dict = None):
        self.dsl = dsl
        self.context = context or {}  # 全局变量存储
        self.nodes = {node["id"]: node for node in dsl["graph"]["nodes"]}
        self.edges = dsl["graph"]["edges"]
        self.current_node_id = self._find_start_node()
        self.node_status = {}  # 记录每个节点的执行状态
        
    def _find_start_node(self):
        for node in self.nodes.values():
            if node["type"] == "start":
                return node["id"]
        raise ValueError("No start node found")
        
    def _get_next_node(self, current_id):
        for edge in self.edges:
            if edge["from"] == current_id:
                return edge["to"]
        return None
        
    def _resolve_variables(self, value, context):
        """将字符串中的 {{var}} 替换为实际值"""
        import re
        def replacer(match):
            path = match.group(1).strip()
            parts = path.split('.')
            val = context
            for part in parts:
                if isinstance(val, dict):
                    val = val.get(part)
                elif isinstance(val, list):
                    try:
                        idx = int(part)
                        val = val[idx]
                    except:
                        val = None
                else:
                    val = getattr(val, part, None)
            return str(val) if val is not None else ""
        return re.sub(r'\{\{\s*([^}]+)\s*\}\}', replacer, str(value))
        
    def _execute_action(self, node: dict, context: dict) -> dict:
        """调用影刀RPA原子脚本"""
        action_id = node["action_id"]
        inputs = node.get("inputs", {})
        # 解析输入参数中的变量引用
        resolved_inputs = {}
        for key, val in inputs.items():
            if isinstance(val, str):
                resolved_inputs[key] = self._resolve_variables(val, context)
            else:
                resolved_inputs[key] = val
                
        # 调用原子操作(通过HTTP调用本地服务或直接运行脚本)
        result = self._call_action(action_id, resolved_inputs)
        
        # 将输出写入context
        outputs = node.get("outputs", {})
        for out_key, var_name in outputs.items():
            if out_key in result:
                context[var_name] = result[out_key]
        return result
        
    def _execute_foreach(self, node: dict, context: dict) -> str:
        """循环执行子图,返回循环结束后的下一个节点"""
        collection_var = node["collection"].strip("{}")
        collection = self._resolve_variable_path(collection_var, context)
        if not isinstance(collection, list):
            collection = []
        
        item_var = node.get("item_var", "item")
        subgraph = node["subgraph"]
        
        for item in collection:
            loop_context = deepcopy(context)
            loop_context[item_var] = item
            # 递归执行子图的节点
            sub_engine = OrchestrationEngine(
                {"graph": subgraph, "version": "1.0"},
                loop_context
            )
            final_context = sub_engine.run()
            # 将子图产生的变量合并回外层(非冲突的)
            for key, val in final_context.items():
                if key not in context:
                    context[key] = val
        
        # 返回foreach节点后续的节点ID
        return self._get_next_node(node["id"])
        
    def run(self) -> dict:
        """执行整个流程,返回最终的context"""
        current_id = self.current_node_id
        while current_id:
            node = self.nodes.get(current_id)
            if not node:
                break
                
            node_type = node["type"]
            self.node_status[current_id] = "running"
            
            if node_type == "start":
                pass
            elif node_type == "action":
                self._execute_action(node, self.context)
            elif node_type == "foreach":
                next_id = self._execute_foreach(node, self.context)
                self.node_status[current_id] = "success"
                current_id = next_id
                continue
            elif node_type == "condition":
                condition = node.get("condition", "")
                cond_result = self._evaluate_condition(condition, self.context)
                branch = node["branches"]["true" if cond_result else "false"]
                current_id = branch
                continue
            elif node_type == "expression":
                script = node["script"]
                result = self._evaluate_expression(script, self.context)
                outputs = node.get("outputs", {})
                for out_key, var_name in outputs.items():
                    self.context[var_name] = result.get(out_key, result)
            elif node_type == "wait":
                seconds = int(self._resolve_variables(node.get("seconds", "0"), self.context))
                time.sleep(seconds)
            elif node_type == "end":
                break
                
            self.node_status[current_id] = "success"
            current_id = self._get_next_node(current_id)
            
        return self.context

这个引擎的实现约300行代码,却支撑了我们80%的运营自动化需求。关键是它让流程逻辑和数据流变得透明。


五、可视化编辑器的设计

前端编辑器基于React Flow搭建。运营看到的是一个画布,左侧是原子操作列表,中间是流程图,右侧是属性面板。

原子操作的注册信息来自后端的一个API,包含:

  • action_id:唯一标识
  • 名称和描述
  • 输入参数定义(类型、是否必填、默认值)
  • 输出字段定义

运营拖拽一个原子操作到画布,填写输入参数(支持变量引用),连接输出端到下一个节点的输入端。

编辑器保存时,将整个画布转换成前面定义的JSON DSL,存储到数据库。

我们还在编辑器里加入了一个**“实时校验”**功能:

  • 检查所有必需的输入参数是否已填写
  • 检查引用的变量是否在之前的节点中有定义
  • 检查是否存在循环依赖(A->B->A)
  • 检查原子操作的版本兼容性

运营编辑时就能发现错误,不用等到运行时报错。


六、原子操作的粒度设计

原子操作的粒度直接影响编排的灵活性。

太细:运营需要拖拽几十个节点才能完成一个简单流程,太繁琐。

太粗:可组合性差,稍微变化的场景就需要新原子操作。

我们的原则是:以“业务实体操作”为单位

比如“上架商品”是一个原子操作,而不是“点击上架按钮”、“填写标题”、“上传图片”拆成三个。

每个原子操作内部已经包含了完整的异常处理、重试、日志上报。运营不需要关心这些底层细节。

原子操作的定义示例(通过API注册):

{
  "action_id": "pdd_off_shelf",
  "name": "拼多多下架商品",
  "description": "根据商品ID将商品从拼多多店铺下架",
  "category": "商品管理",
  "inputs": [
    {"name": "shop_id", "type": "string", "required": true, "description": "店铺ID"},
    {"name": "product_id", "type": "string", "required": true, "description": "商品ID"}
  ],
  "outputs": [
    {"name": "success", "type": "boolean", "description": "是否下架成功"},
    {"name": "off_shelf_time", "type": "string", "description": "下架时间"}
  ],
  "timeout_seconds": 60
}

我们维护了一个原子操作市场,运营可以根据需要浏览和添加。


七、运行时监控与断点调试

流程运行时,运营需要看到当前执行到哪一步,每个节点的输入输出是什么,如果失败在哪里失败。

编排引擎在每个节点执行前后,将节点ID、时间、输入参数、输出结果、异常信息写入Redis。前端通过WebSocket实时推送。

我们还在编辑器里实现了断点调试功能:运营可以在某个节点上设置断点,引擎执行到该节点时会暂停,等待人工点击“继续”。暂停期间,运营可以查看当前的变量上下文,甚至可以修改变量值后继续执行。

这个功能在调试复杂流程时非常有用,运营自己就能定位问题,不用再找开发看日志。


八、真实踩坑与教训

坑1:变量作用域混乱

早期版本只有全局context,循环内部的变量会影响外部,导致意料之外的覆盖。

解决:每个foreach创建独立的子context,子context可以读取外层变量,但写操作默认不污染外层(除非显式声明export)。实现深拷贝+写时复制。

坑2:循环性能问题

有一个运营写了一个流程,遍历500个商品,每个商品内部又调用了一个耗时的原子操作。流程跑了2个小时。

解决:在编辑器中增加“预估执行时间”提示。同时引擎支持循环内的并行执行(需要原子操作保证幂等),运营可以选择并行度。

坑3:DSL版本升级导致存量流程失效

我们迭代DSL schema时,旧版本的流程无法解析。

解决:引入版本兼容层,每个DSL都带有version字段,引擎根据版本号选择不同的解析器。同时提供“一键升级”工具,将老流程自动转换成新版语法。

坑4:运营编写的条件表达式过于复杂

有些运营在condition里写很长的Python表达式,难调试。

解决:限制条件表达式的复杂度,并提供“可视化条件编辑器”,用下拉框组合条件(如“商品库存 < 10”),自动生成表达式。


九、从编排引擎到智能推荐

随着运营在平台上创建了越来越多的流程,我们发现了一个有趣的现象:

很多流程是相似的。比如“当库存低于N时自动下架”这个模式,在不同的店铺和不同的商品分类中反复出现。

于是我们增加了一个流程模板推荐功能:当运营新建流程时,系统根据当前店铺的业务类型和历史行为,推荐相似的模板,运营可以一键复用。

更进一步,我们训练了一个简单的分类模型,根据运营填写的流程名称和描述,自动推荐最有可能用到的原子操作组合。

这个方向还在探索中,但已经显著降低了新运营的学习成本。


十、总结:自动化的终局是“让用户定义自动化”

店群自动化的最终形态,不是开发写出所有可能的脚本。

而是提供一套安全、灵活、易用的编排工具,让最了解业务的人——运营——自己去定义自动化策略。

编排引擎 + 原子操作库 + 可视化编辑器,这三者组合起来,形成了一条“自动化能力民主化”的路径。

我们从这套系统中获得的收益:

  • 需求响应时间:从平均2天降到15分钟
  • 脚本维护数量:从200+个减少到50个原子操作
  • 运营自主上线策略数:每月超过40个

如果你正在为店群系统的需求变更速度发愁,不妨考虑引入编排引擎。

它前期投入有一定工作量(设计和实现DSL、编辑器、引擎),但长期回报极高。

希望这篇文章给你一些启发。


作者:林焱

0
0
0
0
评论
未登录
暂无评论