影刀RPA店群自动化系统:动态脚本编排与配置中心实战

影刀RPA店群自动化系统:动态脚本编排与配置中心实战

我见过很多店群自动化项目,脚本是写死的。

每次平台页面改版,运营就得找开发改脚本,然后一台一台机器手动更新。几十个节点跑下来,半天就没了。

更麻烦的是,不同店铺可能需要不同版本的脚本(比如灰度测试)。脚本硬编码在RPA文件里,版本管理根本没法做。

这篇文章不讲调度,也不讲资源回收。

聊聊我们在拼多多和TEMU店群项目中,如何构建一套动态脚本编排与配置中心,让影刀RPA脚本像微服务一样可配置、可灰度、可回滚。

适用场景:多店铺环境下的脚本统一管理、动态下发、版本控制。 技术栈:影刀RPA + Python + Git + 配置中心(etcd/Redis)+ 脚本热加载。


一、脚本硬编码的三座大山

先说痛点。

痛点一:脚本分散在各节点

每台执行机上都有一个影刀RPA脚本文件。改一个逻辑,要ssh到每台机器上替换文件。节点一多,手动操作必出错。

痛点二:版本不可控

同一个店铺,昨天跑的脚本和今天跑的脚本可能不一样(因为某节点忘记更新)。问题排查时根本不知道当前执行的是哪个版本。

痛点三:无法灰度

想测试一个新版上架脚本,只能在测试环境跑一遍。但测试环境和生产环境页面可能不同,上线后才发现问题。

picture.image 这三个问题本质上是一样的:脚本和配置没有中心化存储和动态下发机制

我们的解决思路是:

把影刀RPA脚本变成“空壳执行器”,实际的业务逻辑从配置中心拉取,然后动态执行。


picture.image

picture.image

二、整体设计:脚本即配置

我们把每个影刀RPA脚本拆成两部分:

壳脚本:固定不变,负责连接配置中心、拉取最新脚本代码、执行并上报结果。

picture.image

picture.image 业务逻辑:用Python或影刀自带的指令集描述,存储在Git仓库 + 配置中心。

执行流程:

  1. 节点上的影刀壳脚本启动
  2. 向配置中心请求最新的脚本代码(按店铺类型、任务类型、版本标签)
  3. 动态加载并执行
  4. 执行结果回调

这样,脚本更新只需要修改配置中心里的代码,所有节点下次执行时自动拉取最新版本。


三、配置中心的设计

我们选择etcd作为配置中心,主要因为它支持watch机制,可以实时推送配置变更。

但需要注意:etcd不适合存储大文件(脚本代码可能几万行)。所以我们用Git存原始脚本,etcd只存脚本的版本号和下载URL。

# config_center.py
import etcd3
import requests
import hashlib

class ScriptConfigCenter:
    def __init__(self, etcd_host="localhost", etcd_port=2379):
        self.client = etcd3.client(host=etcd_host, port=etcd_port)
        self.script_cache = {}
        
    def get_script(self, task_type: str, shop_type: str, version: str = None):
        """
        获取指定任务类型的脚本内容
        task_type: "upload_product", "process_order" 等
        shop_type: "pdd", "temu", "tiktok"
        version: 可选,不传则获取最新稳定版
        """
        key = f"/scripts/{shop_type}/{task_type}"
        if version:
            key = f"{key}/versions/{version}"
        else:
            key = f"{key}/stable"
            
        # 先查本地缓存(带TTL)
        cache_key = f"{shop_type}_{task_type}_{version}"
        if cache_key in self.script_cache:
            cached, expire = self.script_cache[cache_key]
            if time.time() < expire:
                return cached
                
        # 从etcd获取脚本元数据
        value, _ = self.client.get(key)
        if not value:
            return None
        meta = json.loads(value.decode())
        
        # 从对象存储(或Git Raw)下载脚本
        script_url = meta["download_url"]
        expected_md5 = meta["md5"]
        resp = requests.get(script_url)
        if hashlib.md5(resp.content).hexdigest() != expected_md5:
            raise Exception("Script integrity check failed")
            
        script_code = resp.text
        # 缓存5分钟
        self.script_cache[cache_key] = (script_code, time.time() + 300)
        return script_code
        
    def watch_script_updates(self, callback):
        """监听脚本变更,实时推送"""
        for event in self.client.watch_prefix("/scripts/"):
            # 当脚本版本更新时,清理缓存,触发回调
            self.script_cache.clear()
            callback(event.key)

这样设计的好处是:

  • 脚本存储在Git仓库,有完整版本历史
  • etcd只存轻量级元数据,watch性能好
  • 节点本地缓存避免频繁下载
  • MD5校验保证脚本完整性

四、影刀RPA壳脚本的实现

影刀RPA本身不直接支持动态加载Python代码。但我们可以用“执行Python语句”指令配合eval来实现。

核心思路:影刀壳脚本从配置中心拉取Python代码字符串,然后用内置的Python执行功能跑起来。

壳脚本的关键步骤:

  1. 调用Python扩展命令,向配置中心请求脚本
  2. 将返回的脚本代码写入临时文件
  3. 使用subprocess执行该临时文件,并传递店铺参数
# yingdao_shell_script.py
# 这是一个被影刀RPA调用的Python脚本,但它本身不包含业务逻辑
import sys
import json
import requests
import tempfile
import os

def main():
    # 从影刀RPA传递过来的参数
    task_type = sys.argv[1]   # 例如 "upload_product"
    shop_id = sys.argv[2]
    shop_type = sys.argv[3]   # pdd / temu
    
    # 向配置中心获取业务脚本
    config_url = "http://config-center.internal:8080/scripts"
    resp = requests.get(f"{config_url}/{shop_type}/{task_type}")
    if resp.status_code != 200:
        print(f"Failed to fetch script: {resp.text}")
        return 1
        
    script_code = resp.json()["code"]
    
    # 写入临时文件
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
        f.write(script_code)
        temp_path = f.name
        
    # 执行业务脚本,传入shop_id等参数
    result = subprocess.run(
        ["python3", temp_path, shop_id, shop_type],
        capture_output=True,
        timeout=120
    )
    
    # 清理临时文件
    os.unlink(temp_path)
    
    # 上报执行结果到调度中心
    requests.post("http://scheduler/results", json={
        "task_id": os.environ.get("TASK_ID"),
        "success": result.returncode == 0,
        "output": result.stdout.decode(),
        "error": result.stderr.decode()
    })
    
    return result.returncode

if __name__ == "__main__":
    sys.exit(main())

影刀RPA中只需要用“运行Python脚本”指令调用这个壳脚本,并传入店铺ID和任务类型。

整个业务逻辑都在配置中心动态下发,改脚本不需要重新发布影刀流程文件。


五、版本管理与灰度发布

配置中心支持多版本后,我们可以实现灰度:

# 灰度控制逻辑,放在节点管理器里
class GrayscaleController:
    def __init__(self, redis_client):
        self.redis = redis_client
        
    def get_script_version(self, shop_id, task_type):
        # 灰度规则:特定店铺ID后缀使用beta版本
        if shop_id.endswith("test"):
            return "beta"
        # 百分比灰度:按店铺ID哈希取模
        hash_val = int(hashlib.md5(shop_id.encode()).hexdigest()[:8], 16)
        gray_percent = self.redis.get(f"gray:{task_type}:percent") or 0
        if hash_val % 100 < int(gray_percent):
            return "beta"
        # 默认稳定版
        return "stable"

灰度流程:

  1. 把新版本脚本上传到Git,打tag v2.0.0-beta
  2. 在配置中心注册beta版本,但stable仍然指向旧版
  3. 设置灰度比例5%(即5%的店铺流量走向beta)
  4. 观察监控指标(错误率、执行时长)
  5. 逐步提高灰度比例,直到100%,然后将stable指向新版本
  6. 旧版本保留,以备紧急回滚

我们有一次通过灰度发现新脚本在TEMU某些商品类型上会触发风控。因为只影响了3%的店铺,运营损失很小。如果全量发布,后果不敢想。

踩坑:灰度时要注意同一个店铺的任务不能混用不同版本的脚本,否则可能造成数据不一致。我们通过把灰度规则绑定店铺ID的哈希,保证同一店铺始终命中同一版本。


六、脚本热更新与中断处理

配置中心支持watch后,我们可以实现脚本的热更新:当脚本变更时,正在执行的任务不受影响,但下次拉取会拿到新版本。

这里有一个边界情况:长时间运行的任务(比如批量上架几百个商品),可能执行到一半时配置中心更新了。如果强行中断任务切换脚本,会造成数据不完整。

我们的策略是:

  • 脚本在执行前记录自己的版本号
  • 执行过程中不检查更新
  • 任务结束后,下一次拉取自然拿到新版本

但对于那种真正无限循环的任务(比如监控消息),我们在循环体内每次迭代前检查一个全局标记,如果检测到脚本已过期,则优雅退出。

# 在业务脚本内部支持优雅退出
import signal
import sys

class ScriptUpgradeHandler:
    def __init__(self):
        self.should_exit = False
        signal.signal(signal.SIGUSR1, self._handle_upgrade)
        
    def _handle_upgrade(self, signum, frame):
        self.should_exit = True
        
    def check_and_exit(self):
        if self.should_exit:
            # 保存当前进度,通知调度器重新调度
            self.save_checkpoint()
            sys.exit(0)

节点管理器在收到配置中心的更新推送后,会向正在运行的相关任务进程发送SIGUSR1信号,通知它们优雅退出。任务退出时会保存断点,调度器随后用新脚本重新调度。


七、业务脚本的编写规范

为了让业务脚本能被壳脚本统一加载,我们定义了一套规范。

每个脚本必须实现一个run(shop_id, context)函数,返回执行结果。

# 示例:拼多多上架脚本 pdd_upload_product.py
def run(shop_id, context):
    """
    context 包含:代理配置、浏览器调试端口、任务参数等
    """
    # 通过影刀RPA的API连接浏览器
    browser = connect_to_browser(context['debug_port'])
    # 登录店铺(如果cookies过期则重新登录)
    login_if_needed(browser, shop_id)
    # 执行上架逻辑
    result = do_upload(browser, context['product_data'])
    return {
        "success": True,
        "uploaded_count": result.count
    }

壳脚本动态导入这个模块并调用run方法。这样业务开发人员只需要关注业务逻辑,不需要关心调度、配置、日志等基础设施。

我们还写了一个脚本模板生成器,新加一个任务类型时,自动生成带日志、异常处理、重试的脚手架代码。


八、实际踩坑与优化

坑1:动态加载的脚本依赖第三方库

有些业务脚本需要requestsPIL等库。如果节点上没有安装,就会报错。

解决:每个节点的基础镜像预装常用库。同时,脚本可以声明额外依赖,节点管理器执行前自动pip install到临时虚拟环境。但注意不要污染全局。

坑2:脚本执行效率问题

动态从配置中心拉取脚本,每次任务多了一次HTTP请求。虽然缓存减轻了压力,但冷启动时延迟依然存在。

优化:节点启动时预拉取所有可能用到的脚本到本地磁盘,配置中心只负责通知“脚本已更新”,实际从本地文件加载。这样就变成了文件IO,基本无延迟。

坑3:调试困难

脚本分散在配置中心,报错时看到的堆栈只有行号,但不知道对应哪个版本。

解决:每次执行业务脚本前,把脚本内容和版本号记录到日志中。这样复现场景时能精确对应代码。

# 壳脚本中增加调试日志
with open(f"/var/log/scripts/{task_id}.py", "w") as f:
    f.write(script_code)
logger.info(f"Script source saved to /var/log/scripts/{task_id}.py")

线上出问题时,直接去节点上看生成的脚本文件,比翻Git历史快得多。


九、从配置中心到低代码编排

这套系统运行半年后,我们进一步抽象了业务脚本的编写方式。

运营人员可以在Web界面上拖拽动作(登录店铺、搜索商品、填写标题、点击提交),系统自动生成符合规范的Python脚本,提交到Git,再通过配置中心下发。

影刀RPA壳脚本完全不变,但业务逻辑变成了运营可配置的低代码流程。

这个方向超出了本文范围,但思路可以分享:把脚本当成配置,就能把自动化能力开放给非技术人员。


十、总结

店群自动化的脚本管理,往往是被忽视的工程环节。

大家更关注调度、并发、稳定性,但脚本的版本控制、灰度发布、热更新同样重要。

一套配置中心 + 壳脚本模式,能带来:

  • 发布效率:改脚本不需要重新部署节点
  • 风险控制:灰度发布,快速回滚
  • 可观测性:每次执行都能追溯到脚本版本
  • 协作能力:开发、运营可以并行迭代

如果你正在维护一个超过10个节点的店群系统,建议尽早引入脚本动态下发机制。否则,脚本管理会变成你运维成本的大头。


作者:林焱

0
0
0
0
评论
未登录
暂无评论