我见过很多店群自动化项目,脚本是写死的。
每次平台页面改版,运营就得找开发改脚本,然后一台一台机器手动更新。几十个节点跑下来,半天就没了。
更麻烦的是,不同店铺可能需要不同版本的脚本(比如灰度测试)。脚本硬编码在RPA文件里,版本管理根本没法做。
这篇文章不讲调度,也不讲资源回收。
聊聊我们在拼多多和TEMU店群项目中,如何构建一套动态脚本编排与配置中心,让影刀RPA脚本像微服务一样可配置、可灰度、可回滚。
适用场景:多店铺环境下的脚本统一管理、动态下发、版本控制。 技术栈:影刀RPA + Python + Git + 配置中心(etcd/Redis)+ 脚本热加载。
一、脚本硬编码的三座大山
先说痛点。
痛点一:脚本分散在各节点
每台执行机上都有一个影刀RPA脚本文件。改一个逻辑,要ssh到每台机器上替换文件。节点一多,手动操作必出错。
痛点二:版本不可控
同一个店铺,昨天跑的脚本和今天跑的脚本可能不一样(因为某节点忘记更新)。问题排查时根本不知道当前执行的是哪个版本。
痛点三:无法灰度
想测试一个新版上架脚本,只能在测试环境跑一遍。但测试环境和生产环境页面可能不同,上线后才发现问题。
这三个问题本质上是一样的:脚本和配置没有中心化存储和动态下发机制。
我们的解决思路是:
把影刀RPA脚本变成“空壳执行器”,实际的业务逻辑从配置中心拉取,然后动态执行。
二、整体设计:脚本即配置
我们把每个影刀RPA脚本拆成两部分:
壳脚本:固定不变,负责连接配置中心、拉取最新脚本代码、执行并上报结果。
业务逻辑:用Python或影刀自带的指令集描述,存储在Git仓库 + 配置中心。
执行流程:
- 节点上的影刀壳脚本启动
- 向配置中心请求最新的脚本代码(按店铺类型、任务类型、版本标签)
- 动态加载并执行
- 执行结果回调
这样,脚本更新只需要修改配置中心里的代码,所有节点下次执行时自动拉取最新版本。
三、配置中心的设计
我们选择etcd作为配置中心,主要因为它支持watch机制,可以实时推送配置变更。
但需要注意:etcd不适合存储大文件(脚本代码可能几万行)。所以我们用Git存原始脚本,etcd只存脚本的版本号和下载URL。
# config_center.py
import etcd3
import requests
import hashlib
class ScriptConfigCenter:
def __init__(self, etcd_host="localhost", etcd_port=2379):
self.client = etcd3.client(host=etcd_host, port=etcd_port)
self.script_cache = {}
def get_script(self, task_type: str, shop_type: str, version: str = None):
"""
获取指定任务类型的脚本内容
task_type: "upload_product", "process_order" 等
shop_type: "pdd", "temu", "tiktok"
version: 可选,不传则获取最新稳定版
"""
key = f"/scripts/{shop_type}/{task_type}"
if version:
key = f"{key}/versions/{version}"
else:
key = f"{key}/stable"
# 先查本地缓存(带TTL)
cache_key = f"{shop_type}_{task_type}_{version}"
if cache_key in self.script_cache:
cached, expire = self.script_cache[cache_key]
if time.time() < expire:
return cached
# 从etcd获取脚本元数据
value, _ = self.client.get(key)
if not value:
return None
meta = json.loads(value.decode())
# 从对象存储(或Git Raw)下载脚本
script_url = meta["download_url"]
expected_md5 = meta["md5"]
resp = requests.get(script_url)
if hashlib.md5(resp.content).hexdigest() != expected_md5:
raise Exception("Script integrity check failed")
script_code = resp.text
# 缓存5分钟
self.script_cache[cache_key] = (script_code, time.time() + 300)
return script_code
def watch_script_updates(self, callback):
"""监听脚本变更,实时推送"""
for event in self.client.watch_prefix("/scripts/"):
# 当脚本版本更新时,清理缓存,触发回调
self.script_cache.clear()
callback(event.key)
这样设计的好处是:
- 脚本存储在Git仓库,有完整版本历史
- etcd只存轻量级元数据,watch性能好
- 节点本地缓存避免频繁下载
- MD5校验保证脚本完整性
四、影刀RPA壳脚本的实现
影刀RPA本身不直接支持动态加载Python代码。但我们可以用“执行Python语句”指令配合eval来实现。
核心思路:影刀壳脚本从配置中心拉取Python代码字符串,然后用内置的Python执行功能跑起来。
壳脚本的关键步骤:
- 调用Python扩展命令,向配置中心请求脚本
- 将返回的脚本代码写入临时文件
- 使用
subprocess执行该临时文件,并传递店铺参数
# yingdao_shell_script.py
# 这是一个被影刀RPA调用的Python脚本,但它本身不包含业务逻辑
import sys
import json
import requests
import tempfile
import os
def main():
# 从影刀RPA传递过来的参数
task_type = sys.argv[1] # 例如 "upload_product"
shop_id = sys.argv[2]
shop_type = sys.argv[3] # pdd / temu
# 向配置中心获取业务脚本
config_url = "http://config-center.internal:8080/scripts"
resp = requests.get(f"{config_url}/{shop_type}/{task_type}")
if resp.status_code != 200:
print(f"Failed to fetch script: {resp.text}")
return 1
script_code = resp.json()["code"]
# 写入临时文件
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(script_code)
temp_path = f.name
# 执行业务脚本,传入shop_id等参数
result = subprocess.run(
["python3", temp_path, shop_id, shop_type],
capture_output=True,
timeout=120
)
# 清理临时文件
os.unlink(temp_path)
# 上报执行结果到调度中心
requests.post("http://scheduler/results", json={
"task_id": os.environ.get("TASK_ID"),
"success": result.returncode == 0,
"output": result.stdout.decode(),
"error": result.stderr.decode()
})
return result.returncode
if __name__ == "__main__":
sys.exit(main())
影刀RPA中只需要用“运行Python脚本”指令调用这个壳脚本,并传入店铺ID和任务类型。
整个业务逻辑都在配置中心动态下发,改脚本不需要重新发布影刀流程文件。
五、版本管理与灰度发布
配置中心支持多版本后,我们可以实现灰度:
# 灰度控制逻辑,放在节点管理器里
class GrayscaleController:
def __init__(self, redis_client):
self.redis = redis_client
def get_script_version(self, shop_id, task_type):
# 灰度规则:特定店铺ID后缀使用beta版本
if shop_id.endswith("test"):
return "beta"
# 百分比灰度:按店铺ID哈希取模
hash_val = int(hashlib.md5(shop_id.encode()).hexdigest()[:8], 16)
gray_percent = self.redis.get(f"gray:{task_type}:percent") or 0
if hash_val % 100 < int(gray_percent):
return "beta"
# 默认稳定版
return "stable"
灰度流程:
- 把新版本脚本上传到Git,打tag
v2.0.0-beta - 在配置中心注册beta版本,但
stable仍然指向旧版 - 设置灰度比例5%(即5%的店铺流量走向beta)
- 观察监控指标(错误率、执行时长)
- 逐步提高灰度比例,直到100%,然后将
stable指向新版本 - 旧版本保留,以备紧急回滚
我们有一次通过灰度发现新脚本在TEMU某些商品类型上会触发风控。因为只影响了3%的店铺,运营损失很小。如果全量发布,后果不敢想。
踩坑:灰度时要注意同一个店铺的任务不能混用不同版本的脚本,否则可能造成数据不一致。我们通过把灰度规则绑定店铺ID的哈希,保证同一店铺始终命中同一版本。
六、脚本热更新与中断处理
配置中心支持watch后,我们可以实现脚本的热更新:当脚本变更时,正在执行的任务不受影响,但下次拉取会拿到新版本。
这里有一个边界情况:长时间运行的任务(比如批量上架几百个商品),可能执行到一半时配置中心更新了。如果强行中断任务切换脚本,会造成数据不完整。
我们的策略是:
- 脚本在执行前记录自己的版本号
- 执行过程中不检查更新
- 任务结束后,下一次拉取自然拿到新版本
但对于那种真正无限循环的任务(比如监控消息),我们在循环体内每次迭代前检查一个全局标记,如果检测到脚本已过期,则优雅退出。
# 在业务脚本内部支持优雅退出
import signal
import sys
class ScriptUpgradeHandler:
def __init__(self):
self.should_exit = False
signal.signal(signal.SIGUSR1, self._handle_upgrade)
def _handle_upgrade(self, signum, frame):
self.should_exit = True
def check_and_exit(self):
if self.should_exit:
# 保存当前进度,通知调度器重新调度
self.save_checkpoint()
sys.exit(0)
节点管理器在收到配置中心的更新推送后,会向正在运行的相关任务进程发送SIGUSR1信号,通知它们优雅退出。任务退出时会保存断点,调度器随后用新脚本重新调度。
七、业务脚本的编写规范
为了让业务脚本能被壳脚本统一加载,我们定义了一套规范。
每个脚本必须实现一个run(shop_id, context)函数,返回执行结果。
# 示例:拼多多上架脚本 pdd_upload_product.py
def run(shop_id, context):
"""
context 包含:代理配置、浏览器调试端口、任务参数等
"""
# 通过影刀RPA的API连接浏览器
browser = connect_to_browser(context['debug_port'])
# 登录店铺(如果cookies过期则重新登录)
login_if_needed(browser, shop_id)
# 执行上架逻辑
result = do_upload(browser, context['product_data'])
return {
"success": True,
"uploaded_count": result.count
}
壳脚本动态导入这个模块并调用run方法。这样业务开发人员只需要关注业务逻辑,不需要关心调度、配置、日志等基础设施。
我们还写了一个脚本模板生成器,新加一个任务类型时,自动生成带日志、异常处理、重试的脚手架代码。
八、实际踩坑与优化
坑1:动态加载的脚本依赖第三方库
有些业务脚本需要requests、PIL等库。如果节点上没有安装,就会报错。
解决:每个节点的基础镜像预装常用库。同时,脚本可以声明额外依赖,节点管理器执行前自动pip install到临时虚拟环境。但注意不要污染全局。
坑2:脚本执行效率问题
动态从配置中心拉取脚本,每次任务多了一次HTTP请求。虽然缓存减轻了压力,但冷启动时延迟依然存在。
优化:节点启动时预拉取所有可能用到的脚本到本地磁盘,配置中心只负责通知“脚本已更新”,实际从本地文件加载。这样就变成了文件IO,基本无延迟。
坑3:调试困难
脚本分散在配置中心,报错时看到的堆栈只有行号,但不知道对应哪个版本。
解决:每次执行业务脚本前,把脚本内容和版本号记录到日志中。这样复现场景时能精确对应代码。
# 壳脚本中增加调试日志
with open(f"/var/log/scripts/{task_id}.py", "w") as f:
f.write(script_code)
logger.info(f"Script source saved to /var/log/scripts/{task_id}.py")
线上出问题时,直接去节点上看生成的脚本文件,比翻Git历史快得多。
九、从配置中心到低代码编排
这套系统运行半年后,我们进一步抽象了业务脚本的编写方式。
运营人员可以在Web界面上拖拽动作(登录店铺、搜索商品、填写标题、点击提交),系统自动生成符合规范的Python脚本,提交到Git,再通过配置中心下发。
影刀RPA壳脚本完全不变,但业务逻辑变成了运营可配置的低代码流程。
这个方向超出了本文范围,但思路可以分享:把脚本当成配置,就能把自动化能力开放给非技术人员。
十、总结
店群自动化的脚本管理,往往是被忽视的工程环节。
大家更关注调度、并发、稳定性,但脚本的版本控制、灰度发布、热更新同样重要。
一套配置中心 + 壳脚本模式,能带来:
- 发布效率:改脚本不需要重新部署节点
- 风险控制:灰度发布,快速回滚
- 可观测性:每次执行都能追溯到脚本版本
- 协作能力:开发、运营可以并行迭代
如果你正在维护一个超过10个节点的店群系统,建议尽早引入脚本动态下发机制。否则,脚本管理会变成你运维成本的大头。
作者:林焱
