我们早期每个RPA流程都是硬编码在影刀编辑器里的。
运营提一个新需求,比如“同步订单前先检查店铺是否被限制发货”,开发要去改流程、测试、发布。
一套流程走下来,最快也要半天。
后来店铺数量多了,运营每天都会提小调整,开发和运维被拖得疲惫不堪。
真正的问题不是RPA跑不起来,而是业务流程变更的速度跟不上运营的需求。
我们花了两个月重构了一套配置驱动的流程编排系统。
这篇文章就讲这套系统的设计思路和落地细节。
核心思想:将RPA执行逻辑与业务流程配置分离,实现热更新、零代码调整。
一、从硬编码到声明式配置
先看一个典型的硬编码RPA流程:
打开浏览器 → 登录店铺 → 进入订单页面 → 点击同步按钮 → 导出订单 → 存入数据库
任何一步的顺序变化或条件判断,都需要改流程文件。
我们把它抽象成了配置化的步骤序列。
每个步骤是一个独立的能力单元,可以像搭积木一样组合。
# order_sync_flow.yaml
name: pdd_order_sync
version: 1.2.3
platform: pdd
steps:

- action: browser.launch
params:
headless: false
profile_id: "${shop.profile_id}"
- action: page.goto
params:
url: "https://mms.pinduoduo.com/orders"

wait_until: networkidle
- action: condition.check_login
params:
check_selector: ".login-btn"
on_failed: goto login_flow
- action: click
params:
selector: "#sync-order-btn"
timeout: 10
- action: wait_for
params:
selector: ".sync-success"
timeout: 30
- action: extract.table
params:
selector: ".order-table"
fields:
- order_no
- amount
- status
- action: db.insert
params:
table: orders
data: "${extracted_data}"
error_handlers:
- error_type: timeout
retry: 3
retry_delay: 5
- error_type: login_expired
action: refresh_login
这个YAML文件可以直接被调度系统解析执行。
运营想调整等待时间、增加一个检查步骤,只需要改YAML文件,系统自动热加载。
二、编排引擎:将配置翻译成RPA调用
编排引擎的核心是一个动作解析器。
它读取YAML配置,将每个action映射到对应的影刀RPA流程或Python函数。
# flow_engine.py
import yaml
import json
import re
from typing import Dict, Any, List
from pathlib import Path
import subprocess
import logging
logger = logging.getLogger(__name__)
class ActionExecutor:
"""动作执行器 - 将配置中的action转换为实际调用"""
ACTION_MAP = {
"browser.launch": "rpa_browser_launch",
"page.goto": "rpa_page_goto",
"click": "rpa_click",
"wait_for": "rpa_wait_for_selector",
"extract.table": "rpa_extract_table",
"db.insert": "python_db_insert", # 本地Python执行,不调RPA
"condition.check_login": "rpa_check_element_exists"
}
def __init__(self, rpa_cli_path: str = "youdao_rpa_cli"):
self.rpa_cli = rpa_cli_path
def execute(self, action: str, params: Dict, context: Dict) -> Dict:
"""执行单个动作,返回结果"""
handler = self.ACTION_MAP.get(action)
if not handler:
raise ValueError(f"Unknown action: {action}")
# 替换上下文变量 ${xxx}
resolved_params = self._resolve_variables(params, context)
if handler.startswith("rpa_"):
return self._call_rpa(handler, resolved_params, context)
elif handler.startswith("python_"):
return self._call_python(handler, resolved_params, context)
else:
raise ValueError(f"Unknown handler type: {handler}")
def _call_rpa(self, flow_name: str, params: Dict, context: Dict) -> Dict:
"""调用影刀RPA流程"""
# 影刀RPA支持传入JSON参数
cmd = [
self.rpa_cli, "run",
"--flow", flow_name,
"--param", json.dumps({
"params": params,
"context": context.get("shop_context", {})
}),
"--timeout", str(params.get("timeout", 60))
]
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=params.get("timeout", 60) + 5)
if proc.returncode != 0:
raise RuntimeError(f"RPA flow {flow_name} failed: {proc.stderr}")
# 期望返回JSON
return json.loads(proc.stdout) if proc.stdout else {}
def _call_python(self, func_name: str, params: Dict, context: Dict) -> Dict:
"""执行本地Python函数(不需要浏览器的操作)"""
if func_name == "python_db_insert":
# 实际会调用数据库插入模块
return {"status": "inserted", "count": len(params.get("data", []))}
return {}
def _resolve_variables(self, obj: Any, context: Dict) -> Any:
"""递归替换 ${variable} 形式的变量"""
if isinstance(obj, str):
pattern = r'\$\{([^}]+)\}'
def replacer(match):
var_path = match.group(1)
return self._get_nested_value(context, var_path)
return re.sub(pattern, replacer, obj)
elif isinstance(obj, dict):
return {k: self._resolve_variables(v, context) for k, v in obj.items()}
elif isinstance(obj, list):
return [self._resolve_variables(item, context) for item in obj]
return obj
def _get_nested_value(self, data: Dict, path: str):
parts = path.split('.')
value = data
for p in parts:
if isinstance(value, dict):
value = value.get(p, "")
else:
return ""
return value
上面是单步执行,还需要一个流程编排器来管理步骤的顺序、条件跳转、循环、错误处理。
# flow_orchestrator.py
class FlowOrchestrator:
"""流程编排器 - 解析YAML并驱动执行流程"""
def __init__(self, executor: ActionExecutor):
self.executor = executor
self.step_results = []
def run(self, flow_config: Dict, initial_context: Dict) -> Dict:
"""执行整个流程"""
steps = flow_config.get("steps", [])
context = initial_context.copy()
context.setdefault("step_results", [])
pc = 0 # program counter
while pc < len(steps):
step = steps[pc]
action = step.get("action")
params = step.get("params", {})
try:
# 执行动作
result = self.executor.execute(action, params, context)
context["step_results"].append({
"step": pc,
"action": action,
"result": result
})
# 如果定义了 extract,将结果存入context
if "extract" in step:
for var_name, json_path in step["extract"].items():
context[var_name] = self._json_extract(result, json_path)
pc += 1
except Exception as e:
# 错误处理
error_handled = self._handle_error(flow_config, e, step, context)
if not error_handled:
raise
# 如果错误处理要求重试同一行,pc不变;否则pc++由handler决定
# 简化:假设重试后继续下一行
pc += 1
return context
def _handle_error(self, flow_config, error, step, context) -> bool:
"""根据配置的错误处理策略进行恢复"""
handlers = flow_config.get("error_handlers", [])
error_type = self._classify_error(error)
for handler in handlers:
if handler.get("error_type") == error_type:
retry_count = handler.get("retry", 0)
for i in range(retry_count):
try:
# 重试当前步骤
result = self.executor.execute(step["action"], step.get("params", {}), context)
return True
except Exception:
time.sleep(handler.get("retry_delay", 1))
# 重试失败,尝试执行补救action
if "action" in handler:
self.executor.execute(handler["action"], handler.get("params", {}), context)
return True
return False
def _classify_error(self, error):
if "timeout" in str(error).lower():
return "timeout"
if "login" in str(error).lower():
return "login_expired"
return "unknown"
三、热更新机制:不重启系统更新流程
配置驱动的最大价值是热更新。
我们用一个文件监听器,监控配置目录的变化。一旦YAML文件被修改,自动重新加载。
# config_watcher.py
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import yaml
class FlowConfigManager:
"""流程配置管理器 - 单例,维护所有流程配置的内存缓存"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.configs = {}
cls._instance.watcher = None
return cls._instance
def load_all(self, config_dir: str):
self.config_dir = config_dir
for yaml_file in Path(config_dir).glob("*.yaml"):
self._load_file(yaml_file)
# 启动监听
self._start_watching()
def _load_file(self, filepath: Path):
with open(filepath) as f:
config = yaml.safe_load(f)
name = config.get("name")
if name:
self.configs[name] = config
print(f"Loaded flow: {name} v{config.get('version')}")
def _start_watching(self):
event_handler = FlowFileHandler(self)
observer = Observer()
observer.schedule(event_handler, self.config_dir, recursive=False)
observer.start()
def reload_file(self, filepath: str):
self._load_file(Path(filepath))
def get_flow(self, name: str) -> dict:
return self.configs.get(name)
class FlowFileHandler(FileSystemEventHandler):
def __init__(self, manager):
self.manager = manager
def on_modified(self, event):
if not event.is_directory and event.src_path.endswith(".yaml"):
self.manager.reload_file(event.src_path)
print(f"[Hot reload] {event.src_path} reloaded")
配合编排器,执行任务时从FlowConfigManager获取最新配置,无需重启任何进程。
我们当时在线上环境里踩过一次很严重的配置热更新Bug。
原因是Python的yaml.load默认会构造自定义对象,而某个恶意YAML文件触发了安全漏洞。
解决方案:永远使用 yaml.safe_load。
四、店铺上下文管理
每个店铺有自己的上下文:shop_id、platform、cookie、上次运行状态等。
编排器执行时需要注入这些上下文,让配置中的${shop.profile_id}等变量生效。
# shop_context_manager.py
import redis
import json
class ShopContextManager:
def __init__(self, redis_client):
self.redis = redis_client
def get_context(self, shop_id: str) -> dict:
"""获取店铺的运行时上下文"""
key = f"shop_context:{shop_id}"
data = self.redis.get(key)
if data:
return json.loads(data)
return {
"shop_id": shop_id,
"platform": self._get_platform(shop_id),
"profile_id": self._get_profile_id(shop_id),
"last_sync_time": None,
"last_error": None
}
def update_context(self, shop_id: str, updates: dict):
key = f"shop_context:{shop_id}"
current = self.get_context(shop_id)
current.update(updates)
self.redis.setex(key, 86400, json.dumps(current))
def _get_platform(self, shop_id):
# 从数据库查询
return "pdd"
def _get_profile_id(self, shop_id):
# 根据店铺ID获取指纹浏览器配置ID
return f"profile_{shop_id}"
在编排器执行前,我们把上下文注入,执行过程中的结果(比如同步到的订单数量)再写回上下文,供后续步骤使用。
五、跨平台的流程复用
不同平台的操作流程大同小异,差异主要体现在URL、选择器、接口地址上。
我们用配置继承和平台变量覆盖来实现复用。
# base_order_sync.yaml (基类,不直接执行)
steps:
- action: browser.launch
- action: page.goto
params:
url: "${platform.urls.order_page}"
- action: click
params:
selector: "${platform.selectors.sync_btn}"
具体的平台配置继承基类,只覆盖差异部分:
# pdd_order_sync.yaml
extends: base_order_sync
platform: pdd
vars:
urls:
order_page: "https://mms.pinduoduo.com/orders"
selectors:
sync_btn: "#sync-order-btn"
编排器加载时,先递归解析继承链,合并配置。
def _merge_config(base_config, override_config):
for key, value in override_config.items():
if key == "steps" and key in base_config:
# 复杂合并逻辑:可能覆盖某些步骤
pass
elif isinstance(value, dict) and key in base_config:
base_config[key] = _merge_config(base_config[key], value)
else:
base_config[key] = value
return base_config
这套继承机制让新增平台只需要写几十行差异配置,而不是复制整个流程。
六、灰度发布与版本回滚
配置也有版本。我们给每个流程配置加上了版本号。
调度器执行任务时,可以指定使用某个版本的配置,默认使用最新。
灰度策略:
10%的店铺先使用新版本配置,观察24小时无异常再全量。
如果新版出问题,运维在配置中心点一下“回滚”,所有店铺立即切回上一版本。
class VersionedConfigManager:
def get_flow_version(self, flow_name: str, version: str = None):
if not version:
version = self.get_default_version(flow_name)
key = f"flow:{flow_name}:{version}"
return self.load_from_redis(key) # 配置存储在Redis中
配置文件和代码一样,应该纳入Git管理,但热更新时从Redis拉取。
七、实际踩过的坑
1. 变量解析的递归深度问题
我们的_resolve_variables简单递归,但有一次配置里出现了循环引用 ${a.b} -> ${c.d} -> ${a.b},导致递归栈溢出。
解决办法:加入访问路径记录,检测到循环立即报错。
2. RPA流程的入参长度限制
影刀RPA命令行传参有长度限制(某些系统是32KB),当店铺上下文很大时会截断。
解决方案:将大数据存入Redis,RPA流程通过key读取。
3. YAML的缩进错误导致运行时解析失败
运营手动编辑YAML经常搞错缩进,导致系统加载失败。
后来加了配置校验工具,在文件修改时自动运行yaml.load进行语法检查,失败则不发
布热更新。
4. 条件跳转的实现
最初设计没有条件分支,后来发现很多流程需要“如果订单为空则跳过同步”。
我们增加了condition字段,支持简单的表达式解析。
- action: conditional.skip
condition: "${last_sync_result.order_count} == 0"
skip_next: 2
八、总结
配置驱动的流程编排让我们的迭代速度提升了5倍以上。
运营可以自己调整超时时间、等待策略,不需要开发介入。
开发只需要维护原子动作(action)的实现,业务逻辑全部交给配置。
这套模式适合以下场景:
- 有大量相似但细节不同的业务流程
- 业务流程频繁变更
- 需要快速响应平台页面改版(改一个选择器就能恢复)
- 多个平台共享核心逻辑
如果你现在还在硬编码RPA流程,可以尝试先抽离出配置化的步骤序列,哪怕不支持热更新,光是分离逻辑和配置,维护成本就能降低不少。
从我们的经验来看,配置驱动是自动化系统走向成熟的一个重要里程碑。
作者:林焱
