做店群自动化这几年,有一件事我印象特别深。
某个周一早上,运营过来说有三十多个店铺的订单没同步。
我去查日志,发现调度中心挂了。
原因是一个店铺的RPA任务卡死,把整个浏览器实例池拖垮了。
单点故障,全盘瘫痪。
那天之后,我们彻底重构了调度层。
从单机调度演进到了多节点任务编排 + 浏览器实例池的架构。
这篇文章就把这套设计完整拆解一遍。
适用场景:拼多多、TEMU、TikTok Shop 店群自动化。 核心目标:高可用、可扩展、资源隔离。
一、单机调度的三个致命缺陷
第一版系统是单机调度。
一台服务器上跑着调度器、任务队列、浏览器实例、RPA执行器。
看起来很紧凑,跑起来全是坑。
缺陷一:资源争抢不可控。
十个浏览器实例同时跑,CPU飙到90%,内存经常OOM。
更麻烦的是,某个店铺的RPA死循环,会把整台机器的资源耗尽。
缺陷二:无法水平扩展。
店铺数量从30涨到150,单机根本扛不住。
换更大的服务器?成本翻倍,收益递减。
缺陷三:故障恢复靠手。
调度进程挂了,所有正在运行的任务丢失。
需要人工查哪个店铺跑到哪一步,再手动补跑。
很多团队最开始都会忽略这里。
等到店铺数量翻倍,才会发现单机调度是个死胡同。
我们的解法是:调度与执行分离 + 浏览器实例池化 + 任务可恢复。
二、多节点架构总览
先看整体分层:
控制层(1个):任务编排器、状态存储、监控告警
执行层(N个):执行节点、浏览器实例池、影刀RPA引擎
资源层:代理池、账号配置、浏览器指纹库
控制层负责调度决策,不直接运行RPA。
执行节点只负责跑任务,不关心任务从哪里来。
这种解耦带来的好处很明显:
- 可以动态增减执行节点
- 单个节点挂掉不影响整体
- 任务可以在节点之间迁移
# orchestrator.py
import asyncio
import json
import uuid
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
import redis.asyncio as redis
import logging
logger = logging.getLogger(__name__)
class TaskState(Enum):
PENDING = "pending"
ASSIGNED = "assigned"
RUNNING = "running"
SUCCEEDED = "succeeded"
FAILED = "failed"
TIMEOUT = "timeout"
@dataclass
class OrchestrationTask:
task_id: str
shop_id: str
platform: str
task_type: str # sync_orders, refresh_products, batch_reply
payload: dict
state: TaskState = TaskState.PENDING
assigned_node: Optional[str] = None
retry_count: int = 0
max_retries: int = 3
timeout_seconds: int = 600
class TaskOrchestrator:
"""
任务编排器 - 负责任务的创建、分配、超时处理和重试
使用Redis作为状态存储,支持多节点协调
"""
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
self.node_heartbeat_key = "orchestrator:nodes"
self.task_prefix = "orchestrator:task:"
self.pending_queue = "orchestrator:pending"
async def submit_task(self, shop_id: str, platform: str, task_type: str, payload: dict) -> str:
"""提交任务到全局队列"""
task_id = f"{task_type}_{shop_id}_{uuid.uuid4().hex[:8]}"
task = OrchestrationTask(
task_id=task_id,
shop_id=shop_id,
platform=platform,
task_type=task_type,
payload=payload
)
# 持久化任务详情
await self.redis.set(
f"{self.task_prefix}{task_id}",
json.dumps(self._task_to_dict(task)),
ex=86400
)
# 推入待处理队列
await self.redis.lpush(self.pending_queue, task_id)
logger.info(f"Task {task_id} submitted for shop {shop_id}")
return task_id
async def assign_task(self, node_id: str) -> Optional[OrchestrationTask]:
"""
执行节点调用此接口领取任务
使用Redis事务保证任务不会被重复分配
"""
task_id = await self.redis.rpop(self.pending_queue)
if not task_id:
return None
task_dict = await self.redis.get(f"{self.task_prefix}{task_id}")
if not task_dict:
return None
task = self._dict_to_task(json.loads(task_dict))
task.state = TaskState.ASSIGNED
task.assigned_node = node_id
# 更新任务状态并设置超时监控
await self.redis.set(
f"{self.task_prefix}{task_id}",
json.dumps(self._task_to_dict(task)),
ex=86400
)
# 记录分配关系,用于超时回收
await self.redis.set(f"assignment:{task_id}", node_id, ex=task.timeout_seconds + 60)
return task
async def report_task_result(self, task_id: str, success: bool, result: dict = None, error: str = None):
"""执行节点上报任务结果"""
task_dict = await self.redis.get(f"{self.task_prefix}{task_id}")
if not task_dict:
return
task = self._dict_to_task(json.loads(task_dict))
if success:
task.state = TaskState.SUCCEEDED
logger.info(f"Task {task_id} succeeded")
else:
if task.retry_count < task.max_retries:
task.retry_count += 1
task.state = TaskState.PENDING
task.assigned_node = None
# 重新入队
await self.redis.lpush(self.pending_queue, task_id)
logger.warning(f"Task {task_id} retry {task.retry_count}")
else:
task.state = TaskState.FAILED
logger.error(f"Task {task_id} failed after {task.max_retries} retries: {error}")
await self.redis.set(
f"{self.task_prefix}{task_id}",
json.dumps(self._task_to_dict(task)),
ex=86400
)
await self.redis.delete(f"assignment:{task_id}")
async def recover_timeout_tasks(self):
"""定时扫描超时未完成的任务,重新分配"""
# 实际使用keys扫描不推荐,生产环境使用Redis SCAN
async for key in self.redis.scan_iter("assignment:*"):
task_id = key.decode().split(":")[1]
ttl = await self.redis.ttl(key)
if ttl <= 0: # 已过期,任务未完成
task_dict = await self.redis.get(f"{self.task_prefix}{task_id}")
if task_dict:
task = self._dict_to_task(json.loads(task_dict))
if task.state == TaskState.ASSIGNED or task.state == TaskState.RUNNING:
# 任务超时未完成,重新入队
task.state = TaskState.PENDING
task.assigned_node = None
await self.redis.set(
f"{self.task_prefix}{task_id}",
json.dumps(self._task_to_dict(task)),
ex=86400
)
await self.redis.lpush(self.pending_queue, task_id)
await self.redis.delete(key)
logger.warning(f"Recovered timeout task {task_id}")
def _task_to_dict(self, task: OrchestrationTask) -> dict:
return {
"task_id": task.task_id,
"shop_id": task.shop_id,
"platform": task.platform,
"task_type": task.task_type,
"payload": task.payload,
"state": task.state.value,
"assigned_node": task.assigned_node,
"retry_count": task.retry_count,
"max_retries": task.max_retries
}
def _dict_to_task(self, d: dict) -> OrchestrationTask:
return OrchestrationTask(
task_id=d["task_id"],
shop_id=d["shop_id"],
platform=d["platform"],
task_type=d["task_type"],
payload=d["payload"],
state=TaskState(d["state"]),
assigned_node=d["assigned_node"],
retry_count=d["retry_count"],
max_retries=d["max_retries"]
)
三、执行节点设计:浏览器实例池是核心
每个执行节点内部,最关键的组件是浏览器实例池。
实例池负责管理一批预先启动的指纹浏览器实例。
任务到达时,从池子里借一个实例,用完归还。
为什么预启动?
因为每个指纹浏览器启动需要2-3秒,加上加载代理、设置指纹的时间。
如果任务来了再启动,响应延迟太高。
实例池把冷启动时间摊到了空闲时段。
# browser_instance_pool.py
import asyncio
from typing import Dict, Optional, List
from dataclasses import dataclass
from datetime import datetime
import subprocess
import json
import logging
logger = logging.getLogger(__name__)
@dataclass
class BrowserInstance:
instance_id: str
profile_id: str # 关联的指纹浏览器配置
port: int
process: subprocess.Popen
status: str # idle, busy, stopped
last_used: datetime
shop_id: Optional[str] = None
class BrowserInstancePool:
"""
浏览器实例池 - 管理多个指纹浏览器进程的生命周期
支持预热、借出、归还、健康检查、自动回收
"""
def __init__(self, min_idle: int = 2, max_total: int = 8, browser_binary: str = "./fingerprint_browser"):
self.min_idle = min_idle
self.max_total = max_total
self.browser_binary = browser_binary
self.instances: Dict[str, BrowserInstance] = {}
self.idle_queue = asyncio.Queue()
self._lock = asyncio.Lock()
self._running = True
async def start(self):
"""启动实例池,预热最小空闲实例"""
for i in range(self.min_idle):
await self._create_instance()
asyncio.create_task(self._maintain_pool())
asyncio.create_task(self._health_check())
async def _create_instance(self, profile_id: str = None) -> BrowserInstance:
"""创建一个新的指纹浏览器实例"""
# 实际调用指纹浏览器的启动命令,返回调试端口
# 这里模拟启动过程
instance_id = f"br_{id(self)}_{len(self.instances)}"
port = 9222 + len(self.instances) % 100
# 伪代码:实际需要根据指纹浏览器API启动
# proc = subprocess.Popen([self.browser_binary, f"--remote-debugging-port={port}"])
proc = None # placeholder
instance = BrowserInstance(
instance_id=instance_id,
profile_id=profile_id or f"default_{instance_id}",
port=port,
process=proc,
status="idle",
last_used=datetime.now()
)
self.instances[instance_id] = instance
await self.idle_queue.put(instance_id)
logger.info(f"Created browser instance {instance_id} on port {port}")
return instance
async def acquire(self, shop_id: str, platform: str) -> Optional[BrowserInstance]:
"""
为一个店铺获取一个浏览器实例
优先复用该店铺之前用过的实例(保留登录态)
"""
# 先检查是否有该店铺已关联的空闲实例
async with self._lock:
for inst in self.instances.values():
if inst.status == "idle" and inst.shop_id == shop_id:
inst.status = "busy"
inst.last_used = datetime.now()
logger.info(f"Reused existing instance {inst.instance_id} for shop {shop_id}")
return inst
# 否则从空闲队列拿一个新的
try:
instance_id = await asyncio.wait_for(self.idle_queue.get(), timeout=10.0)
async with self._lock:
inst = self.instances[instance_id]
inst.status = "busy"
inst.shop_id = shop_id
inst.last_used = datetime.now()
return inst
except asyncio.TimeoutError:
# 没有空闲实例,如果总数未达上限则创建新的
if len(self.instances) < self.max_total:
inst = await self._create_instance()
async with self._lock:
inst.status = "busy"
inst.shop_id = shop_id
return inst
else:
logger.error("No idle browser instance and max limit reached")
return None
async def release(self, instance_id: str, clear_cookies: bool = False):
"""归还浏览器实例到池中"""
async with self._lock:
inst = self.instances.get(instance_id)
if not inst:
return
if clear_cookies:
# 通过DevTools Protocol清除该实例的cookies和localStorage
await self._clear_instance_data(inst)
inst.shop_id = None
inst.status = "idle"
inst.last_used = datetime.now()
await self.idle_queue.put(instance_id)
logger.info(f"Released instance {instance_id} back to pool")
async def _maintain_pool(self):
"""维持最小空闲实例数"""
while self._running:
await asyncio.sleep(30)
idle_count = sum(1 for i in self.instances.values() if i.status == "idle")
if idle_count < self.min_idle and len(self.instances) < self.max_total:
await self._create_instance()
logger.info(f"Maintained pool: created new instance, idle={idle_count+1}")
async def _health_check(self):
"""定期检查实例健康状态,杀掉僵死进程"""
while self._running:
await asyncio.sleep(60)
to_remove = []
for inst_id, inst in self.instances.items():
if inst.process and inst.process.poll() is not None:
# 进程已退出
to_remove.append(inst_id)
for inst_id in to_remove:
del self.instances[inst_id]
logger.warning(f"Removed dead instance {inst_id}")
async def _clear_instance_data(self, inst: BrowserInstance):
"""通过CDP清除浏览器数据,避免店铺间数据污染"""
# 实际通过websocket连接chrome devtools protocol执行
# Network.clearBrowserCache, Storage.clearCookies 等
pass
async def stop(self):
"""优雅关闭所有实例"""
self._running = False
for inst in self.instances.values():
if inst.process:
inst.process.terminate()
await asyncio.sleep(1)
if inst.process.poll() is None:
inst.process.kill()
四、影刀RPA与执行节点的集成
执行节点拿到任务和浏览器实例后,需要调用影刀RPA执行具体流程。
我们采用的方式是:影刀RPA流程以命令行方式触发,传入浏览器调试端口和任务参数。
每个RPA流程都是独立的进程,跑完即销毁。
这样即使某个RPA崩溃,也不影响执行节点本身的稳定性。
# execution_node.py
import asyncio
import subprocess
import json
import uuid
from typing import Optional
import aiohttp
class ExecutionNode:
"""
执行节点 - 注册到编排器,领取任务,调用影刀RPA
"""
def __init__(self, node_id: str, orchestrator_url: str, pool: BrowserInstancePool):
self.node_id = node_id
self.orchestrator_url = orchestrator_url
self.pool = pool
self.current_task_id: Optional[str] = None
self.running = False
async def register_and_work(self):
"""注册节点并开始工作循环"""
self.running = True
# 注册心跳
asyncio.create_task(self._heartbeat())
# 工作循环
while self.running:
task = await self._fetch_task()
if task:
await self._execute_task(task)
else:
await asyncio.sleep(2)
async def _fetch_task(self):
"""从编排器领取任务"""
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.orchestrator_url}/assign",
json={"node_id": self.node_id}
) as resp:
if resp.status == 200:
data = await resp.json()
return data.get("task")
return None
async def _execute_task(self, task_data: dict):
"""执行一个任务"""
task_id = task_data["task_id"]
shop_id = task_data["shop_id"]
platform = task_data["platform"]
task_type = task_data["task_type"]
# 从池中获取浏览器实例
browser_inst = await self.pool.acquire(shop_id, platform)
if not browser_inst:
await self._report_result(task_id, False, error="No browser instance available")
return
try:
# 调用影刀RPA命令行执行
# 影刀提供了命令行工具,可以传入参数启动指定流程
cmd = [
"youdao_rpa_cli",
"run",
"--flow", f"{platform}_{task_type}", # 预先在影刀中录制的流程
"--param", json.dumps({
"debug_port": browser_inst.port,
"shop_id": shop_id,
"profile_id": browser_inst.profile_id,
"payload": task_data.get("payload", {})
}),
"--timeout", "600"
]
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=650)
success = proc.returncode == 0
result = {"stdout": stdout.decode()}
error = stderr.decode() if not success else None
except asyncio.TimeoutError:
proc.kill()
success = False
error = "RPA execution timeout"
await self._report_result(task_id, success, result, error)
finally:
# 归还浏览器实例,不清除cookies以保持登录态
await self.pool.release(browser_inst.instance_id, clear_cookies=False)
async def _report_result(self, task_id: str, success: bool, result: dict = None, error: str = None):
async with aiohttp.ClientSession() as session:
await session.post(
f"{self.orchestrator_url}/result",
json={
"task_id": task_id,
"success": success,
"result": result,
"error": error,
"node_id": self.node_id
}
)
async def _heartbeat(self):
"""定期发送心跳,表示节点存活"""
while self.running:
await asyncio.sleep(15)
async with aiohttp.ClientSession() as session:
try:
await session.post(
f"{self.orchestrator_url}/heartbeat",
json={"node_id": self.node_id},
timeout=5
)
except:
pass
五、任务状态可恢复:解决“挂了就丢”的痛点
有了编排器+Redis存储,任务就不会丢失了。
恢复流程:
- 编排器启动时,扫描Redis中所有状态为ASSIGNED或RUNNING的任务
- 检查这些任务对应的节点是否还有心跳
- 如果节点心跳超时,则将任务状态重置为PENDING,重新入队
我们当时在线上环境里踩过一次很严重的编排器主节点切换问题。
原因是心跳检测的超时时间设置得太短。
节点网络抖动一下,编排器就认为节点挂了,把任务重新分配。
结果同一个任务在两个节点上同时跑,造成数据重复。
教训:心跳超时时间至少要大于任务最长执行时间。
我们最终把心跳超时设为90秒,节点判定死亡前连续三次心跳丢失才回收任务。
六、监控与告警:没有这两个,系统会悄悄死掉
执行节点挂了,你不会第一时间知道。
浏览器实例池泄漏,内存慢慢涨上去,你会等到服务器报警才发现。
我们监控的核心指标:
- 每个节点的任务成功率、平均耗时
- 浏览器实例池的空闲实例数、总实例数
- 待处理任务队列长度(超过阈值说明执行能力不足)
- 各店铺最后一次成功执行时间(超过24小时未执行则告警)
告警通过飞书机器人推送。
【执行节点告警】 节点 node-03 心跳丢失超过90秒 该节点最后执行任务:shop_1024 订单同步 建议:登录该节点服务器查看RPA进程状态
这种告警文案,运营和开发都能看懂。
七、踩坑清单(真实经验)
最后列几个实际踩过的坑,希望对你有用。
1. 浏览器实例的端口冲突
多个实例启动时,端口需要动态分配并且确保回收后不残留进程。
解决方案:启动前检查端口是否可用,启动后记录PID,节点关闭时遍历kill。
2. 影刀RPA流程的全局变量污染
影刀RPA的全局变量在不同调用之间会残留。
必须在每个流程开始前重置所有全局变量,或者在流程结束时清理。
3. 网络代理切换导致浏览器崩溃
切换代理时,浏览器可能会触发某些安全检测导致崩溃。
最好在创建实例时就绑定代理,整个生命周期不变。
4. Redis连接池耗尽
高并发下,每个任务都操作Redis,连接不够用。
配置合理的最大连接数,使用连接池,避免每次请求都新建连接。
5. 任务日志无限增长
每个任务的stdout/stderr都保存下来,磁盘很快就会满。
只保存最近7天的日志,重要任务单独归档。
八、总结
这套多节点架构从设计到上线,前后迭代了三个月。
最核心的感悟是:调度和控制要集中,执行和资源要分散。
集中控制方便管理状态、做全局决策。
分散执行让系统可以水平扩展,单个节点故障不影响整体。
如果你现在正在从单机调度向多节点演进,建议先从“调度与执行分离”这一步做起。
先把任务队列和RPA执行器拆开,再引入浏览器实例池。
不要一次性做太多改动,容易翻车。
希望这篇文章能给你一些真实的参考。
作者:林焱
