影刀RPA店群自动化系统架构:多节点任务编排与浏览器实例池设计

影刀RPA店群自动化系统架构:多节点任务编排与浏览器实例池设计

做店群自动化这几年,有一件事我印象特别深。

某个周一早上,运营过来说有三十多个店铺的订单没同步。

我去查日志,发现调度中心挂了。

原因是一个店铺的RPA任务卡死,把整个浏览器实例池拖垮了。

单点故障,全盘瘫痪。

那天之后,我们彻底重构了调度层。

从单机调度演进到了多节点任务编排 + 浏览器实例池的架构。

picture.image

picture.image

picture.image 这篇文章就把这套设计完整拆解一遍。

适用场景:拼多多、TEMU、TikTok Shop 店群自动化。 核心目标:高可用、可扩展、资源隔离。


一、单机调度的三个致命缺陷

第一版系统是单机调度。

picture.image

一台服务器上跑着调度器、任务队列、浏览器实例、RPA执行器。

看起来很紧凑,跑起来全是坑。

缺陷一:资源争抢不可控。

picture.image 十个浏览器实例同时跑,CPU飙到90%,内存经常OOM。

更麻烦的是,某个店铺的RPA死循环,会把整台机器的资源耗尽。

缺陷二:无法水平扩展。

店铺数量从30涨到150,单机根本扛不住。

换更大的服务器?成本翻倍,收益递减。

picture.image

缺陷三:故障恢复靠手。

调度进程挂了,所有正在运行的任务丢失。

需要人工查哪个店铺跑到哪一步,再手动补跑。

很多团队最开始都会忽略这里。

等到店铺数量翻倍,才会发现单机调度是个死胡同。

我们的解法是:调度与执行分离 + 浏览器实例池化 + 任务可恢复


二、多节点架构总览

先看整体分层:

控制层(1个):任务编排器、状态存储、监控告警
执行层(N个):执行节点、浏览器实例池、影刀RPA引擎
资源层:代理池、账号配置、浏览器指纹库

控制层负责调度决策,不直接运行RPA。

执行节点只负责跑任务,不关心任务从哪里来。

这种解耦带来的好处很明显:

  • 可以动态增减执行节点
  • 单个节点挂掉不影响整体
  • 任务可以在节点之间迁移
# orchestrator.py
import asyncio
import json
import uuid
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
import redis.asyncio as redis
import logging

logger = logging.getLogger(__name__)

class TaskState(Enum):
    PENDING = "pending"
    ASSIGNED = "assigned"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    TIMEOUT = "timeout"

@dataclass
class OrchestrationTask:
    task_id: str
    shop_id: str
    platform: str
    task_type: str  # sync_orders, refresh_products, batch_reply
    payload: dict
    state: TaskState = TaskState.PENDING
    assigned_node: Optional[str] = None
    retry_count: int = 0
    max_retries: int = 3
    timeout_seconds: int = 600

class TaskOrchestrator:
    """
    任务编排器 - 负责任务的创建、分配、超时处理和重试
    使用Redis作为状态存储,支持多节点协调
    """
    
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.node_heartbeat_key = "orchestrator:nodes"
        self.task_prefix = "orchestrator:task:"
        self.pending_queue = "orchestrator:pending"
        
    async def submit_task(self, shop_id: str, platform: str, task_type: str, payload: dict) -> str:
        """提交任务到全局队列"""
        task_id = f"{task_type}_{shop_id}_{uuid.uuid4().hex[:8]}"
        task = OrchestrationTask(
            task_id=task_id,
            shop_id=shop_id,
            platform=platform,
            task_type=task_type,
            payload=payload
        )
        # 持久化任务详情
        await self.redis.set(
            f"{self.task_prefix}{task_id}",
            json.dumps(self._task_to_dict(task)),
            ex=86400
        )
        # 推入待处理队列
        await self.redis.lpush(self.pending_queue, task_id)
        logger.info(f"Task {task_id} submitted for shop {shop_id}")
        return task_id
    
    async def assign_task(self, node_id: str) -> Optional[OrchestrationTask]:
        """
        执行节点调用此接口领取任务
        使用Redis事务保证任务不会被重复分配
        """
        task_id = await self.redis.rpop(self.pending_queue)
        if not task_id:
            return None
        
        task_dict = await self.redis.get(f"{self.task_prefix}{task_id}")
        if not task_dict:
            return None
        
        task = self._dict_to_task(json.loads(task_dict))
        task.state = TaskState.ASSIGNED
        task.assigned_node = node_id
        
        # 更新任务状态并设置超时监控
        await self.redis.set(
            f"{self.task_prefix}{task_id}",
            json.dumps(self._task_to_dict(task)),
            ex=86400
        )
        # 记录分配关系,用于超时回收
        await self.redis.set(f"assignment:{task_id}", node_id, ex=task.timeout_seconds + 60)
        
        return task
    
    async def report_task_result(self, task_id: str, success: bool, result: dict = None, error: str = None):
        """执行节点上报任务结果"""
        task_dict = await self.redis.get(f"{self.task_prefix}{task_id}")
        if not task_dict:
            return
        
        task = self._dict_to_task(json.loads(task_dict))
        
        if success:
            task.state = TaskState.SUCCEEDED
            logger.info(f"Task {task_id} succeeded")
        else:
            if task.retry_count < task.max_retries:
                task.retry_count += 1
                task.state = TaskState.PENDING
                task.assigned_node = None
                # 重新入队
                await self.redis.lpush(self.pending_queue, task_id)
                logger.warning(f"Task {task_id} retry {task.retry_count}")
            else:
                task.state = TaskState.FAILED
                logger.error(f"Task {task_id} failed after {task.max_retries} retries: {error}")
        
        await self.redis.set(
            f"{self.task_prefix}{task_id}",
            json.dumps(self._task_to_dict(task)),
            ex=86400
        )
        await self.redis.delete(f"assignment:{task_id}")
    
    async def recover_timeout_tasks(self):
        """定时扫描超时未完成的任务,重新分配"""
        # 实际使用keys扫描不推荐,生产环境使用Redis SCAN
        async for key in self.redis.scan_iter("assignment:*"):
            task_id = key.decode().split(":")[1]
            ttl = await self.redis.ttl(key)
            if ttl <= 0:  # 已过期,任务未完成
                task_dict = await self.redis.get(f"{self.task_prefix}{task_id}")
                if task_dict:
                    task = self._dict_to_task(json.loads(task_dict))
                    if task.state == TaskState.ASSIGNED or task.state == TaskState.RUNNING:
                        # 任务超时未完成,重新入队
                        task.state = TaskState.PENDING
                        task.assigned_node = None
                        await self.redis.set(
                            f"{self.task_prefix}{task_id}",
                            json.dumps(self._task_to_dict(task)),
                            ex=86400
                        )
                        await self.redis.lpush(self.pending_queue, task_id)
                        await self.redis.delete(key)
                        logger.warning(f"Recovered timeout task {task_id}")
    
    def _task_to_dict(self, task: OrchestrationTask) -> dict:
        return {
            "task_id": task.task_id,
            "shop_id": task.shop_id,
            "platform": task.platform,
            "task_type": task.task_type,
            "payload": task.payload,
            "state": task.state.value,
            "assigned_node": task.assigned_node,
            "retry_count": task.retry_count,
            "max_retries": task.max_retries
        }
    
    def _dict_to_task(self, d: dict) -> OrchestrationTask:
        return OrchestrationTask(
            task_id=d["task_id"],
            shop_id=d["shop_id"],
            platform=d["platform"],
            task_type=d["task_type"],
            payload=d["payload"],
            state=TaskState(d["state"]),
            assigned_node=d["assigned_node"],
            retry_count=d["retry_count"],
            max_retries=d["max_retries"]
        )

三、执行节点设计:浏览器实例池是核心

每个执行节点内部,最关键的组件是浏览器实例池

实例池负责管理一批预先启动的指纹浏览器实例。

任务到达时,从池子里借一个实例,用完归还。

为什么预启动?

因为每个指纹浏览器启动需要2-3秒,加上加载代理、设置指纹的时间。

如果任务来了再启动,响应延迟太高。

实例池把冷启动时间摊到了空闲时段。

# browser_instance_pool.py
import asyncio
from typing import Dict, Optional, List
from dataclasses import dataclass
from datetime import datetime
import subprocess
import json
import logging

logger = logging.getLogger(__name__)

@dataclass
class BrowserInstance:
    instance_id: str
    profile_id: str  # 关联的指纹浏览器配置
    port: int
    process: subprocess.Popen
    status: str  # idle, busy, stopped
    last_used: datetime
    shop_id: Optional[str] = None

class BrowserInstancePool:
    """
    浏览器实例池 - 管理多个指纹浏览器进程的生命周期
    支持预热、借出、归还、健康检查、自动回收
    """
    
    def __init__(self, min_idle: int = 2, max_total: int = 8, browser_binary: str = "./fingerprint_browser"):
        self.min_idle = min_idle
        self.max_total = max_total
        self.browser_binary = browser_binary
        self.instances: Dict[str, BrowserInstance] = {}
        self.idle_queue = asyncio.Queue()
        self._lock = asyncio.Lock()
        self._running = True
    
    async def start(self):
        """启动实例池,预热最小空闲实例"""
        for i in range(self.min_idle):
            await self._create_instance()
        asyncio.create_task(self._maintain_pool())
        asyncio.create_task(self._health_check())
    
    async def _create_instance(self, profile_id: str = None) -> BrowserInstance:
        """创建一个新的指纹浏览器实例"""
        # 实际调用指纹浏览器的启动命令,返回调试端口
        # 这里模拟启动过程
        instance_id = f"br_{id(self)}_{len(self.instances)}"
        port = 9222 + len(self.instances) % 100
        
        # 伪代码:实际需要根据指纹浏览器API启动
        # proc = subprocess.Popen([self.browser_binary, f"--remote-debugging-port={port}"])
        proc = None  # placeholder
        
        instance = BrowserInstance(
            instance_id=instance_id,
            profile_id=profile_id or f"default_{instance_id}",
            port=port,
            process=proc,
            status="idle",
            last_used=datetime.now()
        )
        self.instances[instance_id] = instance
        await self.idle_queue.put(instance_id)
        logger.info(f"Created browser instance {instance_id} on port {port}")
        return instance
    
    async def acquire(self, shop_id: str, platform: str) -> Optional[BrowserInstance]:
        """
        为一个店铺获取一个浏览器实例
        优先复用该店铺之前用过的实例(保留登录态)
        """
        # 先检查是否有该店铺已关联的空闲实例
        async with self._lock:
            for inst in self.instances.values():
                if inst.status == "idle" and inst.shop_id == shop_id:
                    inst.status = "busy"
                    inst.last_used = datetime.now()
                    logger.info(f"Reused existing instance {inst.instance_id} for shop {shop_id}")
                    return inst
        
        # 否则从空闲队列拿一个新的
        try:
            instance_id = await asyncio.wait_for(self.idle_queue.get(), timeout=10.0)
            async with self._lock:
                inst = self.instances[instance_id]
                inst.status = "busy"
                inst.shop_id = shop_id
                inst.last_used = datetime.now()
                return inst
        except asyncio.TimeoutError:
            # 没有空闲实例,如果总数未达上限则创建新的
            if len(self.instances) < self.max_total:
                inst = await self._create_instance()
                async with self._lock:
                    inst.status = "busy"
                    inst.shop_id = shop_id
                    return inst
            else:
                logger.error("No idle browser instance and max limit reached")
                return None
    
    async def release(self, instance_id: str, clear_cookies: bool = False):
        """归还浏览器实例到池中"""
        async with self._lock:
            inst = self.instances.get(instance_id)
            if not inst:
                return
            
            if clear_cookies:
                # 通过DevTools Protocol清除该实例的cookies和localStorage
                await self._clear_instance_data(inst)
                inst.shop_id = None
            
            inst.status = "idle"
            inst.last_used = datetime.now()
            await self.idle_queue.put(instance_id)
            logger.info(f"Released instance {instance_id} back to pool")
    
    async def _maintain_pool(self):
        """维持最小空闲实例数"""
        while self._running:
            await asyncio.sleep(30)
            idle_count = sum(1 for i in self.instances.values() if i.status == "idle")
            if idle_count < self.min_idle and len(self.instances) < self.max_total:
                await self._create_instance()
                logger.info(f"Maintained pool: created new instance, idle={idle_count+1}")
    
    async def _health_check(self):
        """定期检查实例健康状态,杀掉僵死进程"""
        while self._running:
            await asyncio.sleep(60)
            to_remove = []
            for inst_id, inst in self.instances.items():
                if inst.process and inst.process.poll() is not None:
                    # 进程已退出
                    to_remove.append(inst_id)
            for inst_id in to_remove:
                del self.instances[inst_id]
                logger.warning(f"Removed dead instance {inst_id}")
    
    async def _clear_instance_data(self, inst: BrowserInstance):
        """通过CDP清除浏览器数据,避免店铺间数据污染"""
        # 实际通过websocket连接chrome devtools protocol执行
        # Network.clearBrowserCache, Storage.clearCookies 等
        pass
    
    async def stop(self):
        """优雅关闭所有实例"""
        self._running = False
        for inst in self.instances.values():
            if inst.process:
                inst.process.terminate()
                await asyncio.sleep(1)
                if inst.process.poll() is None:
                    inst.process.kill()

四、影刀RPA与执行节点的集成

执行节点拿到任务和浏览器实例后,需要调用影刀RPA执行具体流程。

我们采用的方式是:影刀RPA流程以命令行方式触发,传入浏览器调试端口和任务参数。

每个RPA流程都是独立的进程,跑完即销毁。

这样即使某个RPA崩溃,也不影响执行节点本身的稳定性。

# execution_node.py
import asyncio
import subprocess
import json
import uuid
from typing import Optional
import aiohttp

class ExecutionNode:
    """
    执行节点 - 注册到编排器,领取任务,调用影刀RPA
    """
    
    def __init__(self, node_id: str, orchestrator_url: str, pool: BrowserInstancePool):
        self.node_id = node_id
        self.orchestrator_url = orchestrator_url
        self.pool = pool
        self.current_task_id: Optional[str] = None
        self.running = False
    
    async def register_and_work(self):
        """注册节点并开始工作循环"""
        self.running = True
        # 注册心跳
        asyncio.create_task(self._heartbeat())
        # 工作循环
        while self.running:
            task = await self._fetch_task()
            if task:
                await self._execute_task(task)
            else:
                await asyncio.sleep(2)
    
    async def _fetch_task(self):
        """从编排器领取任务"""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.orchestrator_url}/assign",
                json={"node_id": self.node_id}
            ) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    return data.get("task")
        return None
    
    async def _execute_task(self, task_data: dict):
        """执行一个任务"""
        task_id = task_data["task_id"]
        shop_id = task_data["shop_id"]
        platform = task_data["platform"]
        task_type = task_data["task_type"]
        
        # 从池中获取浏览器实例
        browser_inst = await self.pool.acquire(shop_id, platform)
        if not browser_inst:
            await self._report_result(task_id, False, error="No browser instance available")
            return
        
        try:
            # 调用影刀RPA命令行执行
            # 影刀提供了命令行工具,可以传入参数启动指定流程
            cmd = [
                "youdao_rpa_cli",
                "run",
                "--flow", f"{platform}_{task_type}",  # 预先在影刀中录制的流程
                "--param", json.dumps({
                    "debug_port": browser_inst.port,
                    "shop_id": shop_id,
                    "profile_id": browser_inst.profile_id,
                    "payload": task_data.get("payload", {})
                }),
                "--timeout", "600"
            ]
            
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            
            try:
                stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=650)
                success = proc.returncode == 0
                result = {"stdout": stdout.decode()}
                error = stderr.decode() if not success else None
            except asyncio.TimeoutError:
                proc.kill()
                success = False
                error = "RPA execution timeout"
            
            await self._report_result(task_id, success, result, error)
            
        finally:
            # 归还浏览器实例,不清除cookies以保持登录态
            await self.pool.release(browser_inst.instance_id, clear_cookies=False)
    
    async def _report_result(self, task_id: str, success: bool, result: dict = None, error: str = None):
        async with aiohttp.ClientSession() as session:
            await session.post(
                f"{self.orchestrator_url}/result",
                json={
                    "task_id": task_id,
                    "success": success,
                    "result": result,
                    "error": error,
                    "node_id": self.node_id
                }
            )
    
    async def _heartbeat(self):
        """定期发送心跳,表示节点存活"""
        while self.running:
            await asyncio.sleep(15)
            async with aiohttp.ClientSession() as session:
                try:
                    await session.post(
                        f"{self.orchestrator_url}/heartbeat",
                        json={"node_id": self.node_id},
                        timeout=5
                    )
                except:
                    pass

五、任务状态可恢复:解决“挂了就丢”的痛点

有了编排器+Redis存储,任务就不会丢失了。

恢复流程:

  1. 编排器启动时,扫描Redis中所有状态为ASSIGNED或RUNNING的任务
  2. 检查这些任务对应的节点是否还有心跳
  3. 如果节点心跳超时,则将任务状态重置为PENDING,重新入队

我们当时在线上环境里踩过一次很严重的编排器主节点切换问题。

原因是心跳检测的超时时间设置得太短。

节点网络抖动一下,编排器就认为节点挂了,把任务重新分配。

结果同一个任务在两个节点上同时跑,造成数据重复。

教训:心跳超时时间至少要大于任务最长执行时间。

我们最终把心跳超时设为90秒,节点判定死亡前连续三次心跳丢失才回收任务。


六、监控与告警:没有这两个,系统会悄悄死掉

执行节点挂了,你不会第一时间知道。

浏览器实例池泄漏,内存慢慢涨上去,你会等到服务器报警才发现。

我们监控的核心指标:

  • 每个节点的任务成功率、平均耗时
  • 浏览器实例池的空闲实例数、总实例数
  • 待处理任务队列长度(超过阈值说明执行能力不足)
  • 各店铺最后一次成功执行时间(超过24小时未执行则告警)

告警通过飞书机器人推送。

【执行节点告警】 节点 node-03 心跳丢失超过90秒 该节点最后执行任务:shop_1024 订单同步 建议:登录该节点服务器查看RPA进程状态

这种告警文案,运营和开发都能看懂。


七、踩坑清单(真实经验)

最后列几个实际踩过的坑,希望对你有用。

1. 浏览器实例的端口冲突

多个实例启动时,端口需要动态分配并且确保回收后不残留进程。

解决方案:启动前检查端口是否可用,启动后记录PID,节点关闭时遍历kill。

2. 影刀RPA流程的全局变量污染

影刀RPA的全局变量在不同调用之间会残留。

必须在每个流程开始前重置所有全局变量,或者在流程结束时清理。

3. 网络代理切换导致浏览器崩溃

切换代理时,浏览器可能会触发某些安全检测导致崩溃。

最好在创建实例时就绑定代理,整个生命周期不变。

4. Redis连接池耗尽

高并发下,每个任务都操作Redis,连接不够用。

配置合理的最大连接数,使用连接池,避免每次请求都新建连接。

5. 任务日志无限增长

每个任务的stdout/stderr都保存下来,磁盘很快就会满。

只保存最近7天的日志,重要任务单独归档。


八、总结

这套多节点架构从设计到上线,前后迭代了三个月。

最核心的感悟是:调度和控制要集中,执行和资源要分散。

集中控制方便管理状态、做全局决策。

分散执行让系统可以水平扩展,单个节点故障不影响整体。

如果你现在正在从单机调度向多节点演进,建议先从“调度与执行分离”这一步做起。

先把任务队列和RPA执行器拆开,再引入浏览器实例池。

不要一次性做太多改动,容易翻车。

希望这篇文章能给你一些真实的参考。


作者:林焱

0
0
0
0
评论
未登录
暂无评论