店群自动化做到一定规模,会面临一个尴尬的局面:
运营的需求永远在变,开发的速度永远跟不上。
今天要加一个“上架前检查库存”,明天要“根据利润自动调价”,后天又要求“订单发货后发短信提醒”。每个需求都要开发去改影刀脚本,测试、发布、验证,一套流程走下来两天过去了。
我们当时就在想:能不能让运营自己编排任务流程?
就像搭积木一样,把已有的影刀原子操作(登录店铺、上架商品、修改价格、处理订单)组合起来,加上条件判断、循环、等待,形成一个新的自动化流程。
这篇文章不讲底层的调度和资源管理。
专门聊聊我们是如何设计一套面向运营的可视化任务编排引擎,让店群自动化的能力从“开发专属”变成“运营自助”。
适用场景:需要将自动化能力开放给非技术人员的企业级店群系统。 技术栈:Python + JSON Schema + React Flow + 影刀RPA原子脚本。
一、为什么需要编排引擎?
先看一个真实场景。
运营想实现一个“智能上下架”策略:每天早上8点,检查库存低于10件的商品,自动下架;同时,从备选商品池里随机选择等量的商品上架。
如果用传统方式,开发需要写一个影刀脚本,包含:登录店铺、遍历商品列表、判断库存、执行下架、随机选择备选池商品、执行上架。每个店铺都要跑一遍。需求变更时又要改代码。
但如果有一个编排引擎,运营可以这样描述流程:
触发器: 每天8:00
对于每个店铺:
执行原子操作: 登录店铺
执行原子操作: 获取低库存商品列表 (参数: 阈值=10)
对列表中的每个商品:
执行原子操作: 下架商品

执行原子操作: 获取备选商品池
随机选择 N 个商品 (N = 下架数量)
对每个选中商品:
执行原子操作: 上架商品
运营不需要懂代码,只需要理解这些“原子操作”的含义和连接规则。
编排引擎的核心价值:
- 降低迭代成本:新策略上线从2天缩短到10分钟
- 减少错误:原子操作经过充分测试,组合出错概率低
- 可追溯:每次执行的流程图版本可以被记录和回放
二、整体架构:三层模型
我们把编排引擎分成三层:
第一层:原子操作层
最小的执行单元,对应一个影刀RPA脚本。比如“登录拼多多店铺”、“获取订单列表”、“上架商品”。
每个原子操作有明确的输入参数(如店铺ID、商品ID)和输出结果(如订单数量、操作状态)。
第二层:编排层
运行时的流程引擎,负责解析DSL(领域特定语言)、管理任务状态、调度原子操作、处理分支和循环。
第三层:可视化层
前端画布,运营通过拖拽节点、连接线条来创建流程。流程保存为JSON格式的DSL,下发给编排引擎执行。
三层之间解耦:可视化层只负责编辑和展示DSL,编排层只负责执行DSL,原子操作层只负责具体的浏览器自动化。
三、DSL设计:用JSON描述一切
为了让运营编辑的流程图可以被程序执行,我们需要一种结构化的流程描述语言。
我们选择了JSON Schema,因为它易于存储、解析和校验。
一个典型的流程DSL结构:
{
"version": "1.0",
"name": "智能上下架策略",
"trigger": {
"type": "cron",
"value": "0 8 * * *"
},
"graph": {
"nodes": [
{
"id": "node_1",
"type": "start",
"label": "开始"
},
{
"id": "node_2",
"type": "action",
"action_id": "login_shop",
"label": "登录店铺",
"inputs": {
"shop_id": "{{trigger.shop_id}}"
},
"outputs": {
"login_status": "success"
}
},
{
"id": "node_3",
"type": "action",
"action_id": "get_low_stock_products",
"label": "获取低库存商品",
"inputs": {
"threshold": 10
},
"outputs": {
"product_list": "low_stock_items"
}
},
{
"id": "node_4",
"type": "foreach",
"collection": "{{low_stock_items}}",
"item_var": "product",
"subgraph": {
"nodes": [
{
"id": "node_4_1",
"type": "action",
"action_id": "off_shelf",
"inputs": {
"product_id": "{{product.id}}"
}
}
]
}
},
{
"id": "node_5",
"type": "action",
"action_id": "get_backup_pool",
"label": "获取备选商品池",
"outputs": {
"pool": "backup_products"
}
},
{
"id": "node_6",
"type": "expression",
"script": "random.sample(backup_products, len(low_stock_items))",
"outputs": {
"selected": "to_upload"
}
},
{
"id": "node_7",
"type": "foreach",
"collection": "{{to_upload}}",
"item_var": "new_product",
"subgraph": {
"nodes": [
{
"id": "node_7_1",
"type": "action",
"action_id": "on_shelf",
"inputs": {
"product_data": "{{new_product}}"
}
}
]
}
},
{
"id": "node_8",
"type": "end"
}
],
"edges": [
{"from": "node_1", "to": "node_2"},
{"from": "node_2", "to": "node_3"},
{"from": "node_3", "to": "node_4"},
{"from": "node_4", "to": "node_5"},
{"from": "node_5", "to": "node_6"},
{"from": "node_6", "to": "node_7"},
{"from": "node_7", "to": "node_8"}
]
}
}
DSL支持的节点类型:
- start/end:流程起点和终点
- action:原子操作,调用影刀脚本
- condition:条件分支(if/else)
- foreach:循环遍历集合
- expression:执行Python表达式,用于数据转换
- wait:等待一段时间或直到某个条件满足
- subflow:调用子流程(复用)
变量引用使用{{variable.path}}语法,支持嵌套属性。
这个DSL的设计原则是:可读性强、容易序列化、便于前端渲染。
四、编排引擎的实现
编排引擎的核心是一个状态机驱动的解释器。
它读取DSL,维护当前执行位置和变量上下文,按顺序执行节点,处理跳转和循环。
# orchestration_engine.py
from typing import Dict, Any, List
import json
import time
from copy import deepcopy
class OrchestrationEngine:
def __init__(self, dsl: dict, context: dict = None):
self.dsl = dsl
self.context = context or {} # 全局变量存储
self.nodes = {node["id"]: node for node in dsl["graph"]["nodes"]}
self.edges = dsl["graph"]["edges"]
self.current_node_id = self._find_start_node()
self.node_status = {} # 记录每个节点的执行状态
def _find_start_node(self):
for node in self.nodes.values():
if node["type"] == "start":
return node["id"]
raise ValueError("No start node found")
def _get_next_node(self, current_id):
for edge in self.edges:
if edge["from"] == current_id:
return edge["to"]
return None
def _resolve_variables(self, value, context):
"""将字符串中的 {{var}} 替换为实际值"""
import re
def replacer(match):
path = match.group(1).strip()
parts = path.split('.')
val = context
for part in parts:
if isinstance(val, dict):
val = val.get(part)
elif isinstance(val, list):
try:
idx = int(part)
val = val[idx]
except:
val = None
else:
val = getattr(val, part, None)
return str(val) if val is not None else ""
return re.sub(r'\{\{\s*([^}]+)\s*\}\}', replacer, str(value))
def _execute_action(self, node: dict, context: dict) -> dict:
"""调用影刀RPA原子脚本"""
action_id = node["action_id"]
inputs = node.get("inputs", {})
# 解析输入参数中的变量引用
resolved_inputs = {}
for key, val in inputs.items():
if isinstance(val, str):
resolved_inputs[key] = self._resolve_variables(val, context)
else:
resolved_inputs[key] = val
# 调用原子操作(通过HTTP调用本地服务或直接运行脚本)
result = self._call_action(action_id, resolved_inputs)
# 将输出写入context
outputs = node.get("outputs", {})
for out_key, var_name in outputs.items():
if out_key in result:
context[var_name] = result[out_key]
return result
def _execute_foreach(self, node: dict, context: dict) -> str:
"""循环执行子图,返回循环结束后的下一个节点"""
collection_var = node["collection"].strip("{}")
collection = self._resolve_variable_path(collection_var, context)
if not isinstance(collection, list):
collection = []
item_var = node.get("item_var", "item")
subgraph = node["subgraph"]
for item in collection:
loop_context = deepcopy(context)
loop_context[item_var] = item
# 递归执行子图的节点
sub_engine = OrchestrationEngine(
{"graph": subgraph, "version": "1.0"},
loop_context
)
final_context = sub_engine.run()
# 将子图产生的变量合并回外层(非冲突的)
for key, val in final_context.items():
if key not in context:
context[key] = val
# 返回foreach节点后续的节点ID
return self._get_next_node(node["id"])
def run(self) -> dict:
"""执行整个流程,返回最终的context"""
current_id = self.current_node_id
while current_id:
node = self.nodes.get(current_id)
if not node:
break
node_type = node["type"]
self.node_status[current_id] = "running"
if node_type == "start":
pass
elif node_type == "action":
self._execute_action(node, self.context)
elif node_type == "foreach":
next_id = self._execute_foreach(node, self.context)
self.node_status[current_id] = "success"
current_id = next_id
continue
elif node_type == "condition":
condition = node.get("condition", "")
cond_result = self._evaluate_condition(condition, self.context)
branch = node["branches"]["true" if cond_result else "false"]
current_id = branch
continue
elif node_type == "expression":
script = node["script"]
result = self._evaluate_expression(script, self.context)
outputs = node.get("outputs", {})
for out_key, var_name in outputs.items():
self.context[var_name] = result.get(out_key, result)
elif node_type == "wait":
seconds = int(self._resolve_variables(node.get("seconds", "0"), self.context))
time.sleep(seconds)
elif node_type == "end":
break
self.node_status[current_id] = "success"
current_id = self._get_next_node(current_id)
return self.context
这个引擎的实现约300行代码,却支撑了我们80%的运营自动化需求。关键是它让流程逻辑和数据流变得透明。
五、可视化编辑器的设计
前端编辑器基于React Flow搭建。运营看到的是一个画布,左侧是原子操作列表,中间是流程图,右侧是属性面板。
原子操作的注册信息来自后端的一个API,包含:
- action_id:唯一标识
- 名称和描述
- 输入参数定义(类型、是否必填、默认值)
- 输出字段定义
运营拖拽一个原子操作到画布,填写输入参数(支持变量引用),连接输出端到下一个节点的输入端。
编辑器保存时,将整个画布转换成前面定义的JSON DSL,存储到数据库。
我们还在编辑器里加入了一个**“实时校验”**功能:
- 检查所有必需的输入参数是否已填写
- 检查引用的变量是否在之前的节点中有定义
- 检查是否存在循环依赖(A->B->A)
- 检查原子操作的版本兼容性
运营编辑时就能发现错误,不用等到运行时报错。
六、原子操作的粒度设计
原子操作的粒度直接影响编排的灵活性。
太细:运营需要拖拽几十个节点才能完成一个简单流程,太繁琐。
太粗:可组合性差,稍微变化的场景就需要新原子操作。
我们的原则是:以“业务实体操作”为单位。
比如“上架商品”是一个原子操作,而不是“点击上架按钮”、“填写标题”、“上传图片”拆成三个。
每个原子操作内部已经包含了完整的异常处理、重试、日志上报。运营不需要关心这些底层细节。
原子操作的定义示例(通过API注册):
{
"action_id": "pdd_off_shelf",
"name": "拼多多下架商品",
"description": "根据商品ID将商品从拼多多店铺下架",
"category": "商品管理",
"inputs": [
{"name": "shop_id", "type": "string", "required": true, "description": "店铺ID"},
{"name": "product_id", "type": "string", "required": true, "description": "商品ID"}
],
"outputs": [
{"name": "success", "type": "boolean", "description": "是否下架成功"},
{"name": "off_shelf_time", "type": "string", "description": "下架时间"}
],
"timeout_seconds": 60
}
我们维护了一个原子操作市场,运营可以根据需要浏览和添加。
七、运行时监控与断点调试
流程运行时,运营需要看到当前执行到哪一步,每个节点的输入输出是什么,如果失败在哪里失败。
编排引擎在每个节点执行前后,将节点ID、时间、输入参数、输出结果、异常信息写入Redis。前端通过WebSocket实时推送。
我们还在编辑器里实现了断点调试功能:运营可以在某个节点上设置断点,引擎执行到该节点时会暂停,等待人工点击“继续”。暂停期间,运营可以查看当前的变量上下文,甚至可以修改变量值后继续执行。
这个功能在调试复杂流程时非常有用,运营自己就能定位问题,不用再找开发看日志。
八、真实踩坑与教训
坑1:变量作用域混乱
早期版本只有全局context,循环内部的变量会影响外部,导致意料之外的覆盖。
解决:每个foreach创建独立的子context,子context可以读取外层变量,但写操作默认不污染外层(除非显式声明export)。实现深拷贝+写时复制。
坑2:循环性能问题
有一个运营写了一个流程,遍历500个商品,每个商品内部又调用了一个耗时的原子操作。流程跑了2个小时。
解决:在编辑器中增加“预估执行时间”提示。同时引擎支持循环内的并行执行(需要原子操作保证幂等),运营可以选择并行度。
坑3:DSL版本升级导致存量流程失效
我们迭代DSL schema时,旧版本的流程无法解析。
解决:引入版本兼容层,每个DSL都带有version字段,引擎根据版本号选择不同的解析器。同时提供“一键升级”工具,将老流程自动转换成新版语法。
坑4:运营编写的条件表达式过于复杂
有些运营在condition里写很长的Python表达式,难调试。
解决:限制条件表达式的复杂度,并提供“可视化条件编辑器”,用下拉框组合条件(如“商品库存 < 10”),自动生成表达式。
九、从编排引擎到智能推荐
随着运营在平台上创建了越来越多的流程,我们发现了一个有趣的现象:
很多流程是相似的。比如“当库存低于N时自动下架”这个模式,在不同的店铺和不同的商品分类中反复出现。
于是我们增加了一个流程模板推荐功能:当运营新建流程时,系统根据当前店铺的业务类型和历史行为,推荐相似的模板,运营可以一键复用。
更进一步,我们训练了一个简单的分类模型,根据运营填写的流程名称和描述,自动推荐最有可能用到的原子操作组合。
这个方向还在探索中,但已经显著降低了新运营的学习成本。
十、总结:自动化的终局是“让用户定义自动化”
店群自动化的最终形态,不是开发写出所有可能的脚本。
而是提供一套安全、灵活、易用的编排工具,让最了解业务的人——运营——自己去定义自动化策略。
编排引擎 + 原子操作库 + 可视化编辑器,这三者组合起来,形成了一条“自动化能力民主化”的路径。
我们从这套系统中获得的收益:
- 需求响应时间:从平均2天降到15分钟
- 脚本维护数量:从200+个减少到50个原子操作
- 运营自主上线策略数:每月超过40个
如果你正在为店群系统的需求变更速度发愁,不妨考虑引入编排引擎。
它前期投入有一定工作量(设计和实现DSL、编辑器、引擎),但长期回报极高。
希望这篇文章给你一些启发。
作者:林焱
