自动化系统从十几台执行节点开始,配置管理就会变成一个隐形的泥潭。
超时时间改了,要通知所有节点重启。
代理 IP 白名单更新了,运维一台台登上去改配置文件。
流程版本回滚了,Python 调度器里硬编码的版本号忘了同步。
这些事情单独看都不大,但加起来能把团队拖垮。
我们是在一次“配置不一致导致全平台任务失败”的事故后,才下决心把配置中心单独拿出来做的。
这篇文章专门写配置中心的设计与落地——不涉及业务逻辑,只讲怎么让配置变成一种可管控、可追溯、可热更新的独立资产。
一、配置管理混乱的代价
事故的起因很简单:拼多多改版后,某个元素的选择器变了。
开发在影刀流程里修了,但需要把流程的 wait_timeout 从 5 秒改成 10 秒。
这个参数写在执行节点的本地 YAML 文件里。
运维在 12 台执行节点上手动改了配置,但漏了一台。
那台漏掉的节点继续用 5 秒超时跑新流程,所有任务在该步骤因为等不到元素而批量失败。
而那台节点正好分配了几个重要店铺的任务。
等发现的时候,已经积压了 200 多条失败任务。
这件事让我们意识到一个根本问题:
配置分散在文件里、环境变量里、数据库里、甚至硬编码在代码里——
它已经不是“配置”,而是一颗不定时炸弹。
二、配置中心的定位:不只是存储,更是管控
我们需要的不是一个更好用的配置文件,而是一个有生命周期的配置管理系统。
它必须做到:
- 所有配置集中存储,单一事实来源。
- 变更可追溯,谁改了、什么时候改的、改了什么。
- 支持灰度发布,新配置先在少量节点生效,验证后再全量。
- 支持热更新,配置变更后执行节点无需重启。
- 支持版本回滚,出问题一键回到上一个版本。
最终我们选用了 Git 仓库 + Redis + Python 配置客户端 的组合方案。
没有引入 Consul 或 Nacos——不是它们不好,而是我们的运维团队规模小,能少维护一个中间件就少一个。
三、配置的分类与存储结构
首先对所有配置做了分类,不同类型的配置有不同的更新策略。
第一类:静态配置
只读,启动时加载一次,变更需重启节点。
例如:Chromium 启动参数、Java 运行参数、日志级别。
第二类:动态配置
可热更新,节点定时拉取,变更无需重启。
例如:流程超时时间、重试次数、代理 IP 白名单、频率控制阈值。
第三类:敏感配置
加密存储,只在注入内存时解密,不落盘。
例如:数据库密码、代理凭据、API 密钥。
配置在 Git 仓库中的目录结构:
configs/
├── global/
│ ├── defaults.yaml # 全局默认配置
│ └── sensitive.yaml.enc # 加密的敏感配置
├── platforms/
│ ├── pdd/
│ │ ├── flows.yaml # 流程相关配置
│ │ ├── rate_limits.yaml # 频率控制
│ │ └── staging.yaml # 测试环境覆盖配置
│ ├── temu/
│ └── tiktok/
└── nodes/
├── node-pdd-01.yaml # 节点级覆盖
└── node-pdd-02.yaml
配置加载有优先级:节点级覆盖 > 平台级 > 全局默认。
这样我们可以为某些特殊节点设置不同的参数,而不影响整体。
四、配置客户端的核心实现
每个执行节点上有一个配置客户端 ConfigClient。
它负责从 Git 仓库拉取配置、解密敏感字段、合并优先级、监听变更。
import yaml
import asyncio
from pathlib import Path
from typing import Any
class ConfigClient:
def __init__(self, git_repo_url: str, local_path: str, node_id: str):
self.repo_url = git_repo_url
self.local_path = Path(local_path)
self.node_id = node_id
self.cache: dict[str, Any] = {}
self.watchers: dict[str, list[callable]] = {}
self.last_commit_hash = None
async def init(self):
# 克隆配置仓库到本地
if not self.local_path.exists():
await self._git_clone()
else:
await self._git_pull()
await self._load_all()
# 启动定时检查协程
asyncio.create_task(self._watch_loop())
async def _load_all(self):
# 按优先级加载配置
global_defaults = self._load_yaml("global/defaults.yaml")
platform = self._get_platform()
platform_config = self._load_yaml(f"platforms/{platform}/flows.yaml")
platform_rates = self._load_yaml(f"platforms/{platform}/rate_limits.yaml")
node_override = self._load_yaml(f"nodes/{self.node_id}.yaml")
# 深度合并
merged = self._deep_merge(global_defaults, platform_config)
merged = self._deep_merge(merged, platform_rates)
merged = self._deep_merge(merged, node_override)
# 解密敏感字段
merged = await self._decrypt_sensitive(merged)
self.cache = merged
self._notify_watchers("*", merged)
async def _watch_loop(self):
while True:
await asyncio.sleep(30) # 每 30 秒检查一次
try:
current_hash = await self._git_remote_hash()
if current_hash != self.last_commit_hash:
await self._git_pull()
await self._load_all()
self.last_commit_hash = current_hash
logger.info("配置更新已生效")
except Exception as e:
logger.error(f"配置更新检查失败: {e}")
def get(self, key: str, default=None):
keys = key.split(".")
value = self.cache
for k in keys:
if isinstance(value, dict):
value = value.get(k)
else:
return default
return value if value is not None else default
def on_change(self, key: str, callback):
self.watchers.setdefault(key, []).append(callback)
def _notify_watchers(self, key, new_value):
for pattern, callbacks in self.watchers.items():
if pattern == "*" or pattern == key:
for cb in callbacks:
asyncio.create_task(cb(key, new_value))
节点上的所有组件——守护进程、浏览器管理器、IP 池——都通过 ConfigClient 获取配置。
不再允许直接读取本地文件或环境变量。
五、热更新机制:不重启也能变
配置更新后,客户端会回调注册的监听器。
不同类型的配置,监听器的处理方式不同。
比如,频率控制阈值变化了:
class RateLimiter:
def __init__(self, config_client: ConfigClient):
self.config = config_client
self.limits = self.config.get("rate_limits", {})
# 注册监听
self.config.on_change("rate_limits", self._on_limits_change)
async def _on_limits_change(self, key, new_value):
self.limits = new_value
logger.info("频率控制配置已热更新")
流程超时时间变化了:
class FlowExecutor:
def __init__(self, config_client: ConfigClient):
self.config = config_client
self.timeouts = self.config.get("timeouts", {})
self.config.on_change("timeouts", self._on_timeout_change)
def _on_timeout_change(self, key, new_value):
self.timeouts = new_value
# 无需重启,下个任务自动使用新超时
敏感配置的更新稍特殊:
客户端拉取到新配置后,会重新解密,然后通知监听器。
数据库密码变了,守护进程不会主动断开已有连接,但下一个新建的连接会使用新密码。
所有热更新的前提是:配置变更必须兼容,不能改变字段结构,只改变值。
如果要改变结构(比如把 timeout 从整数变成对象),就需要走灰度发布流程,不能直接全量推送。
六、灰度发布:部分节点先吃螃蟹
不是所有配置都适合一键全量。
某个流程的等待时间从 5 秒改到 10 秒,看似简单,但可能在某些店铺上引发未知问题。
我们在配置仓库里使用了分支策略来实现灰度。
生产环境的 main 分支是全量配置。
灰度发布时,创建一个 staging 分支,灰度节点拉取 staging 分支的配置。
灰度节点通过节点标签来识别。
在节点信息注册时,标明自己是 stable 组还是 canary 组:
async def register_node(self, node_id: str, group: str = "stable"):
info = {"node_id": node_id, "group": group, ...}
await self.redis.hset("rpa:nodes:info", node_id, json.dumps(info))
配置客户端根据节点组决定拉取哪个分支:
async def _get_git_branch(self):
node_info = await self.redis.hget("rpa:nodes:info", self.node_id)
group = json.loads(node_info).get("group", "stable")
return "staging" if group == "canary" else "main"
灰度期间观察 2 小时,如果灰度节点一切正常,就把 staging 分支合并到 main,全量生效。
如果出问题,回滚 staging 分支或直接把灰度节点切回 stable 组。
整个流程不需要登录任何一台服务器。
七、配置变更的审计与回滚
配置仓库的每一次提交,都关联一个 JIRA 单号或变更说明。
我们规定,所有配置变更必须走 Pull Request,评审通过后才能合并。
合并后的 commit 哈希会记录在 Redis 中,配置客户端上报自己的当前版本号。
管理后台可以看到每个节点当前运行的配置版本,以及是否和 main 分支有差异。
回滚操作极度简单:
在 Git 仓库里 git revert 上一次变更,推送,等待 30 秒配置客户端自动拉取。
所有节点自动恢复到上一个版本。
这比之前手动改文件、漏改几台机器的噩梦,进步了不止一个时代。
八、配置校验:把错误挡在合并之前
配置的错误如果在线上才暴露,代价太大。
我们在 CI 流水线里加入了配置校验步骤。
校验内容包括:
- YAML 语法正确性。
- 必填字段完整性。
- 数值范围合理性(比如超时时间不能小于 1 秒,重试次数不能超过 10 次)。
- 敏感字段是否已加密,不能明文提交。
校验脚本用 Python 写,放在仓库的 ci/validate_config.py:
import yaml
import sys
REQUIRED_FIELDS = {
"platforms/*/flows.yaml": ["timeouts", "retry"],
"platforms/*/rate_limits.yaml": ["upload_product", "update_price"],
}
def validate(file_path: str):
with open(file_path) as f:
config = yaml.safe_load(f)
# 检查必填字段
for pattern, fields in REQUIRED_FIELDS.items():
if fnmatch(file_path, pattern):
for field in fields:
if field not in config:
raise ValueError(f"{file_path}: 缺少必填字段 {field}")
# 检查数值范围
if "timeouts" in config:
for k, v in config["timeouts"].items():
if v < 1:
raise ValueError(f"{file_path}: timeout {k} 不能小于 1 秒")
如果校验不通过,CI 直接失败,合并被阻止。
这层卡口省去了无数次半夜的告警电话。
九、配置中心带来的团队协作变化
配置中心上线后,团队的工作方式有了明显变化。
以前:开发在本地测完,告诉运维“把超时改成 10 秒”。
运维一台台登上去改,改完还不确定是否生效。
现在:开发在配置仓库提个 PR,评审合并。
30 秒内所有节点自动更新,管理后台能看到每台机器的配置版本。
以前:故障复盘时,没人能说清楚线上是什么配置。
现在:Git 历史完整记录每一次变更,谁改的、什么时候改的、改的什么值,一目了然。
以前:新成员加入,要花两天熟悉各种散落的配置文件。
现在:拉下配置仓库,看一遍目录结构和注释,就能理解整个系统的参数体系。
这些变化不是技术突破,但它们让系统的维护成本断崖式下降。
十、真实教训:一次不兼容的热更新
我们也踩过坑。
有一次运维直接在配置仓库里把一个字段从 timeout: 30 改成了 timeout: {default: 30, upload: 60}。
从整数变成了对象,但忘了通知开发。
配置客户端拉取后,FlowExecutor 按整数去读,类型错误导致流程全部启动失败。
因为配置是热更新的,所有在线节点瞬间全部瘫痪。
从那以后,我们在配置校验里加了严格的 结构兼容性检查:
任何字段的类型变更,必须在 Pull Request 里明确标注为 BREAKING CHANGE,并走灰度发布 + 全量确认流程。
类型检查脚本比对当前版本和 PR 版本的字段类型,有变化就告警。
这个机制让“热更新导致全平台瘫痪”的事故再也没有发生过。
十一、配置中心的可观测性
配置中心本身也需要被监控。
我们收集了以下指标:
- 配置拉取成功率与延迟。
- 各节点当前配置版本分布(看灰度是否生效、是否有节点掉队)。
- 配置变更频率(过高说明不稳定)。
这些指标汇入 Grafana,如果发现某个节点配置版本长时间未更新,说明它的配置拉取可能失败了,自动触发告警。
配置不再是角落里的几个文件,而是和其他核心组件一样,有完善的监控和告警。
十二、写在最后
配置中心在大多数自动化系统里,都是最后被照顾到的部分。
流程能跑了、调度能转了、监控能看了,大家就觉得差不多了。
但当系统真的跑起来,每天折磨你的往往不是大架构的缺陷,而是改个参数要提心吊胆、换个 IP 要手动操作、查个配置要翻好几台机器。
把配置管理做好,是自动化系统成熟度的一个标志。
它不会让你的系统吞吐量翻倍,但它会让你的运维负担减半。
如果你还在用散落的配置文件管理几十台节点,建议尽早搭一个配置中心。
从 Git 仓库 + 定时拉取开始,逐步加入灰度、校验、审计。
这套东西建成后,你会发现许多以前觉得头疼的运维问题,忽然就消失了。
作者:林焱
