影刀RPA电商矩阵自动化架构:配置驱动的流程编排与热更新实践

影刀RPA电商矩阵自动化架构:配置驱动的流程编排与热更新实践

我们早期每个RPA流程都是硬编码在影刀编辑器里的。

运营提一个新需求,比如“同步订单前先检查店铺是否被限制发货”,开发要去改流程、测试、发布。

一套流程走下来,最快也要半天。

后来店铺数量多了,运营每天都会提小调整,开发和运维被拖得疲惫不堪。

真正的问题不是RPA跑不起来,而是业务流程变更的速度跟不上运营的需求。

我们花了两个月重构了一套配置驱动的流程编排系统

这篇文章就讲这套系统的设计思路和落地细节。

picture.image

核心思想:将RPA执行逻辑与业务流程配置分离,实现热更新、零代码调整。


一、从硬编码到声明式配置

picture.image 先看一个典型的硬编码RPA流程:

打开浏览器 → 登录店铺 → 进入订单页面 → 点击同步按钮 → 导出订单 → 存入数据库

任何一步的顺序变化或条件判断,都需要改流程文件。

我们把它抽象成了配置化的步骤序列。

picture.image 每个步骤是一个独立的能力单元,可以像搭积木一样组合。

# order_sync_flow.yaml
name: pdd_order_sync
version: 1.2.3
platform: pdd
steps:

![picture.image](https://p6-volc-community-sign.byteimg.com/tos-cn-i-tlddhu82om/96b2cc0e4edf4d8081365bd7e3ad25cf~tplv-tlddhu82om-image.image?=&rk3s=8031ce6d&x-expires=1781319031&x-signature=yl9ykR3pI1ypfYiS6sczQQi54ww%3D)
  - action: browser.launch
    params:
      headless: false
      profile_id: "${shop.profile_id}"
  
  - action: page.goto
    params:
      url: "https://mms.pinduoduo.com/orders"
      
![picture.image](https://p6-volc-community-sign.byteimg.com/tos-cn-i-tlddhu82om/33f3fc97db984205a3f2e495850dea98~tplv-tlddhu82om-image.image?=&rk3s=8031ce6d&x-expires=1781319031&x-signature=XDRI13EOEbPH8LFKiYbPfsbsHfI%3D)
      wait_until: networkidle
  
  - action: condition.check_login
    params:
      check_selector: ".login-btn"
    on_failed: goto login_flow
  
  - action: click
    params:
      selector: "#sync-order-btn"
      timeout: 10
  
  - action: wait_for
    params:
      selector: ".sync-success"
      timeout: 30
  
  - action: extract.table
    params:
      selector: ".order-table"
      fields:
        - order_no
        - amount
        - status
  - action: db.insert
    params:
      table: orders
      data: "${extracted_data}"

error_handlers:
  - error_type: timeout
    retry: 3
    retry_delay: 5
  - error_type: login_expired
    action: refresh_login

这个YAML文件可以直接被调度系统解析执行。

运营想调整等待时间、增加一个检查步骤,只需要改YAML文件,系统自动热加载。


二、编排引擎:将配置翻译成RPA调用

编排引擎的核心是一个动作解析器

它读取YAML配置,将每个action映射到对应的影刀RPA流程或Python函数。

# flow_engine.py
import yaml
import json
import re
from typing import Dict, Any, List
from pathlib import Path
import subprocess
import logging

logger = logging.getLogger(__name__)

class ActionExecutor:
    """动作执行器 - 将配置中的action转换为实际调用"""
    
    ACTION_MAP = {
        "browser.launch": "rpa_browser_launch",
        "page.goto": "rpa_page_goto", 
        "click": "rpa_click",
        "wait_for": "rpa_wait_for_selector",
        "extract.table": "rpa_extract_table",
        "db.insert": "python_db_insert",  # 本地Python执行,不调RPA
        "condition.check_login": "rpa_check_element_exists"
    }
    
    def __init__(self, rpa_cli_path: str = "youdao_rpa_cli"):
        self.rpa_cli = rpa_cli_path
    
    def execute(self, action: str, params: Dict, context: Dict) -> Dict:
        """执行单个动作,返回结果"""
        handler = self.ACTION_MAP.get(action)
        if not handler:
            raise ValueError(f"Unknown action: {action}")
        
        # 替换上下文变量 ${xxx}
        resolved_params = self._resolve_variables(params, context)
        
        if handler.startswith("rpa_"):
            return self._call_rpa(handler, resolved_params, context)
        elif handler.startswith("python_"):
            return self._call_python(handler, resolved_params, context)
        else:
            raise ValueError(f"Unknown handler type: {handler}")
    
    def _call_rpa(self, flow_name: str, params: Dict, context: Dict) -> Dict:
        """调用影刀RPA流程"""
        # 影刀RPA支持传入JSON参数
        cmd = [
            self.rpa_cli, "run",
            "--flow", flow_name,
            "--param", json.dumps({
                "params": params,
                "context": context.get("shop_context", {})
            }),
            "--timeout", str(params.get("timeout", 60))
        ]
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=params.get("timeout", 60) + 5)
        if proc.returncode != 0:
            raise RuntimeError(f"RPA flow {flow_name} failed: {proc.stderr}")
        # 期望返回JSON
        return json.loads(proc.stdout) if proc.stdout else {}
    
    def _call_python(self, func_name: str, params: Dict, context: Dict) -> Dict:
        """执行本地Python函数(不需要浏览器的操作)"""
        if func_name == "python_db_insert":
            # 实际会调用数据库插入模块
            return {"status": "inserted", "count": len(params.get("data", []))}
        return {}
    
    def _resolve_variables(self, obj: Any, context: Dict) -> Any:
        """递归替换 ${variable} 形式的变量"""
        if isinstance(obj, str):
            pattern = r'\$\{([^}]+)\}'
            def replacer(match):
                var_path = match.group(1)
                return self._get_nested_value(context, var_path)
            return re.sub(pattern, replacer, obj)
        elif isinstance(obj, dict):
            return {k: self._resolve_variables(v, context) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [self._resolve_variables(item, context) for item in obj]
        return obj
    
    def _get_nested_value(self, data: Dict, path: str):
        parts = path.split('.')
        value = data
        for p in parts:
            if isinstance(value, dict):
                value = value.get(p, "")
            else:
                return ""
        return value

上面是单步执行,还需要一个流程编排器来管理步骤的顺序、条件跳转、循环、错误处理。

# flow_orchestrator.py
class FlowOrchestrator:
    """流程编排器 - 解析YAML并驱动执行流程"""
    
    def __init__(self, executor: ActionExecutor):
        self.executor = executor
        self.step_results = []
    
    def run(self, flow_config: Dict, initial_context: Dict) -> Dict:
        """执行整个流程"""
        steps = flow_config.get("steps", [])
        context = initial_context.copy()
        context.setdefault("step_results", [])
        
        pc = 0  # program counter
        while pc < len(steps):
            step = steps[pc]
            action = step.get("action")
            params = step.get("params", {})
            
            try:
                # 执行动作
                result = self.executor.execute(action, params, context)
                context["step_results"].append({
                    "step": pc,
                    "action": action,
                    "result": result
                })
                # 如果定义了 extract,将结果存入context
                if "extract" in step:
                    for var_name, json_path in step["extract"].items():
                        context[var_name] = self._json_extract(result, json_path)
                pc += 1
                
            except Exception as e:
                # 错误处理
                error_handled = self._handle_error(flow_config, e, step, context)
                if not error_handled:
                    raise
                # 如果错误处理要求重试同一行,pc不变;否则pc++由handler决定
                # 简化:假设重试后继续下一行
                pc += 1
        
        return context
    
    def _handle_error(self, flow_config, error, step, context) -> bool:
        """根据配置的错误处理策略进行恢复"""
        handlers = flow_config.get("error_handlers", [])
        error_type = self._classify_error(error)
        for handler in handlers:
            if handler.get("error_type") == error_type:
                retry_count = handler.get("retry", 0)
                for i in range(retry_count):
                    try:
                        # 重试当前步骤
                        result = self.executor.execute(step["action"], step.get("params", {}), context)
                        return True
                    except Exception:
                        time.sleep(handler.get("retry_delay", 1))
                # 重试失败,尝试执行补救action
                if "action" in handler:
                    self.executor.execute(handler["action"], handler.get("params", {}), context)
                return True
        return False
    
    def _classify_error(self, error):
        if "timeout" in str(error).lower():
            return "timeout"
        if "login" in str(error).lower():
            return "login_expired"
        return "unknown"

三、热更新机制:不重启系统更新流程

配置驱动的最大价值是热更新

我们用一个文件监听器,监控配置目录的变化。一旦YAML文件被修改,自动重新加载。

# config_watcher.py
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import yaml

class FlowConfigManager:
    """流程配置管理器 - 单例,维护所有流程配置的内存缓存"""
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.configs = {}
            cls._instance.watcher = None
        return cls._instance
    
    def load_all(self, config_dir: str):
        self.config_dir = config_dir
        for yaml_file in Path(config_dir).glob("*.yaml"):
            self._load_file(yaml_file)
        # 启动监听
        self._start_watching()
    
    def _load_file(self, filepath: Path):
        with open(filepath) as f:
            config = yaml.safe_load(f)
        name = config.get("name")
        if name:
            self.configs[name] = config
            print(f"Loaded flow: {name} v{config.get('version')}")
    
    def _start_watching(self):
        event_handler = FlowFileHandler(self)
        observer = Observer()
        observer.schedule(event_handler, self.config_dir, recursive=False)
        observer.start()
    
    def reload_file(self, filepath: str):
        self._load_file(Path(filepath))
    
    def get_flow(self, name: str) -> dict:
        return self.configs.get(name)

class FlowFileHandler(FileSystemEventHandler):
    def __init__(self, manager):
        self.manager = manager
    
    def on_modified(self, event):
        if not event.is_directory and event.src_path.endswith(".yaml"):
            self.manager.reload_file(event.src_path)
            print(f"[Hot reload] {event.src_path} reloaded")

配合编排器,执行任务时从FlowConfigManager获取最新配置,无需重启任何进程。

我们当时在线上环境里踩过一次很严重的配置热更新Bug。

原因是Python的yaml.load默认会构造自定义对象,而某个恶意YAML文件触发了安全漏洞。

解决方案:永远使用 yaml.safe_load


四、店铺上下文管理

每个店铺有自己的上下文:shop_id、platform、cookie、上次运行状态等。

编排器执行时需要注入这些上下文,让配置中的${shop.profile_id}等变量生效。

# shop_context_manager.py
import redis
import json

class ShopContextManager:
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def get_context(self, shop_id: str) -> dict:
        """获取店铺的运行时上下文"""
        key = f"shop_context:{shop_id}"
        data = self.redis.get(key)
        if data:
            return json.loads(data)
        return {
            "shop_id": shop_id,
            "platform": self._get_platform(shop_id),
            "profile_id": self._get_profile_id(shop_id),
            "last_sync_time": None,
            "last_error": None
        }
    
    def update_context(self, shop_id: str, updates: dict):
        key = f"shop_context:{shop_id}"
        current = self.get_context(shop_id)
        current.update(updates)
        self.redis.setex(key, 86400, json.dumps(current))
    
    def _get_platform(self, shop_id):
        # 从数据库查询
        return "pdd"
    
    def _get_profile_id(self, shop_id):
        # 根据店铺ID获取指纹浏览器配置ID
        return f"profile_{shop_id}"

在编排器执行前,我们把上下文注入,执行过程中的结果(比如同步到的订单数量)再写回上下文,供后续步骤使用。


五、跨平台的流程复用

不同平台的操作流程大同小异,差异主要体现在URL、选择器、接口地址上。

我们用配置继承平台变量覆盖来实现复用。

# base_order_sync.yaml (基类,不直接执行)
steps:
  - action: browser.launch
  - action: page.goto
    params:
      url: "${platform.urls.order_page}"
  - action: click
    params:
      selector: "${platform.selectors.sync_btn}"

具体的平台配置继承基类,只覆盖差异部分:

# pdd_order_sync.yaml
extends: base_order_sync
platform: pdd
vars:
  urls:
    order_page: "https://mms.pinduoduo.com/orders"
  selectors:
    sync_btn: "#sync-order-btn"

编排器加载时,先递归解析继承链,合并配置。

def _merge_config(base_config, override_config):
    for key, value in override_config.items():
        if key == "steps" and key in base_config:
            # 复杂合并逻辑:可能覆盖某些步骤
            pass
        elif isinstance(value, dict) and key in base_config:
            base_config[key] = _merge_config(base_config[key], value)
        else:
            base_config[key] = value
    return base_config

这套继承机制让新增平台只需要写几十行差异配置,而不是复制整个流程。


六、灰度发布与版本回滚

配置也有版本。我们给每个流程配置加上了版本号。

调度器执行任务时,可以指定使用某个版本的配置,默认使用最新。

灰度策略:

10%的店铺先使用新版本配置,观察24小时无异常再全量。

如果新版出问题,运维在配置中心点一下“回滚”,所有店铺立即切回上一版本。

class VersionedConfigManager:
    def get_flow_version(self, flow_name: str, version: str = None):
        if not version:
            version = self.get_default_version(flow_name)
        key = f"flow:{flow_name}:{version}"
        return self.load_from_redis(key)  # 配置存储在Redis中

配置文件和代码一样,应该纳入Git管理,但热更新时从Redis拉取。


七、实际踩过的坑

1. 变量解析的递归深度问题

我们的_resolve_variables简单递归,但有一次配置里出现了循环引用 ${a.b} -> ${c.d} -> ${a.b},导致递归栈溢出。

解决办法:加入访问路径记录,检测到循环立即报错。

2. RPA流程的入参长度限制

影刀RPA命令行传参有长度限制(某些系统是32KB),当店铺上下文很大时会截断。

解决方案:将大数据存入Redis,RPA流程通过key读取。

3. YAML的缩进错误导致运行时解析失败

运营手动编辑YAML经常搞错缩进,导致系统加载失败。

后来加了配置校验工具,在文件修改时自动运行yaml.load进行语法检查,失败则不发

布热更新。

4. 条件跳转的实现

最初设计没有条件分支,后来发现很多流程需要“如果订单为空则跳过同步”。

我们增加了condition字段,支持简单的表达式解析。

- action: conditional.skip
  condition: "${last_sync_result.order_count} == 0"
  skip_next: 2

八、总结

配置驱动的流程编排让我们的迭代速度提升了5倍以上。

运营可以自己调整超时时间、等待策略,不需要开发介入。

开发只需要维护原子动作(action)的实现,业务逻辑全部交给配置。

这套模式适合以下场景:

  • 有大量相似但细节不同的业务流程
  • 业务流程频繁变更
  • 需要快速响应平台页面改版(改一个选择器就能恢复)
  • 多个平台共享核心逻辑

如果你现在还在硬编码RPA流程,可以尝试先抽离出配置化的步骤序列,哪怕不支持热更新,光是分离逻辑和配置,维护成本就能降低不少。

从我们的经验来看,配置驱动是自动化系统走向成熟的一个重要里程碑。


作者:林焱

0
0
0
0
评论
未登录
暂无评论