影刀RPA店群自动化性能调优实战:高吞吐量系统设计与瓶颈分析

影刀RPA店群自动化性能调优实战:高吞吐量系统设计与瓶颈分析

自动化系统从“能跑”到“跑得稳”,中间隔着一套运维体系。
从“跑得稳”到“跑得快”,中间又隔着一层性能工程。

我们是在店铺数突破 80 个之后,才开始认真做性能调优的。
当时的情况是:任务积压越来越频繁,单店铺的上架任务从创建到完成,平均耗时从 2 分钟慢慢爬到了 5 分钟。
机器资源还有余量,但任务的吞吐量就是上不去。

不是系统在报错,是它在“慢”中逐步失能。

这篇文章要写的,是我们如何一步步定位瓶颈、优化架构,最终把系统吞吐量提升了接近三倍的过程。
不涉及业务逻辑,全是工程和性能的实战复盘。


一、先定义什么是“性能好”

性能优化最怕的,是没有明确的度量标准。
“感觉变快了”在工程上没有意义。

我们定了三个核心性能指标:

  • 任务吞吐量:系统每小时能完成的任务数(按平台分别统计)。
  • 任务延迟 P50 / P99:从任务入队到完成的时间分布。
  • 资源利用率:执行节点的 CPU、内存、网络带宽使用率,以及浏览器槽位的占用率。

picture.image

picture.image 同时,我们还定了一条红线:
性能优化不能牺牲稳定性和环境隔离。
不能为了提高并发而让多个店铺共用浏览器,那是用安全换速度,划不来。


picture.image

二、找到真正的瓶颈:别靠猜,靠埋点

没有充分测量的优化,约等于凭直觉改代码。
我们在系统的每一个关键节点都加了耗时埋点,打点粒度精确到毫秒。

一条任务的完整时间线,被拆成了五个阶段:

入队 → 节点拉取 → 浏览器就绪 → 流程执行 → 结果回调

每个阶段的耗时都随着日志一起上报到 Elasticsearch。
统计一周数据后,我们发现了一个意料之外的结果:

浏览器就绪阶段(从任务被节点拉取到影刀开始执行流程), 在 P99 场景下占到了总耗时的 35%。

picture.image

picture.image 换句话说,三分之一的延迟,耗在了等浏览器准备好。

picture.image 进一步拆解“浏览器就绪”,又分成三个子阶段:
创建实例(冷启动)、清理环境、附着窗口。
其中冷启动是最大的开销——平均耗时 4.2 秒,P99 到了 11 秒。

这就是我们要砍的第一个瓶颈。


三、浏览器预加载池:把冷启动变成热复用

之前的浏览器实例池,是“被动创建”的——任务来了才创建,任务结束就回收。
这意味着每个任务的初始阶段,都可能撞上冷启动。

优化方案是引入预加载池

每个执行节点在启动后,就预先创建 N 个浏览器实例(数量等于并发槽数),
并让它们保持一个轻量页面(比如 about:blank)常驻。
任务到来时,直接从预加载池中取用,省掉冷启动时间。

class PrewarmedBrowserPool(BrowserPool):
    def __init__(self, max_instances: int = 10):
        super().__init__(max_instances)
        self.prewarm_task = asyncio.create_task(self._prewarm())

    async def _prewarm(self):
        for i in range(self.max_instances):
            inst = await self._create_instance(None, default_config)
            self.idle.put_nowait(inst)

    async def acquire(self, shop_id, config):
        inst = await self.idle.get()
        # 给实例绑定店铺配置
        inst.assign_shop(shop_id, config)
        self.active[shop_id] = inst
        return inst

改造之后,“浏览器就绪”的平均耗时从 4.5 秒降到了 0.7 秒。
这里需要注意的是,预加载的实例虽然没有绑定店铺,但已经占用了内存。
所以预加载的数量必须和节点并发槽数严格一致,不能贪多。

真正跑了几天后,我们发现预加载池还有一个隐藏的好处:
Chromium 的 V8 引擎会在空闲时做 JIT 预热,预加载的实例比冷启动的实例在后续页面执行 JS 时响应更快。
这是一笔意料之外的收益。


四、页面加载策略:不该等的东西绝对不等

影刀流程在进入一个新页面时,默认会等待页面完全加载——包括所有图片、CSS、第三方追踪脚本。
在店群场景下,一个商品编辑页可能嵌了十多个统计脚本和广告像素,等它们全部完成,2-3 秒就没了。

我们改了两件事:

  1. 影刀流程层面:把页面加载策略从“等待完成”改为“等待 DOM 就绪”,即 document.readyState === 'interactive' 即可开始操作。
  2. 浏览器启动参数:通过 --block-new-web-contents 阻止页面弹出新标签,通过 uBlock Origin 扩展拦截已知的统计域名。

但扩展拦截要谨慎。
我们只拦截了非关键的第三方域名,平台自身的资源一个不敢动,生怕触发风控。

这两项调整,让“流程执行”阶段的平均耗时减少了约 20%。
对于一些图片密集型页面,效果更明显。


五、Python 调度层的异步化重构

优化完浏览器端之后,瓶颈开始上移到调度层。
早期的 Python 调度器,虽然用了 asyncio,但部分数据库操作和 Redis 调用是同步的,用 loop.run_in_executor 勉强撑着。

当每小时任务量超过 2000 条时,调度器的 CPU 使用率开始接近单核极限,任务分发出现可见延迟。

我们花了一周时间做了一次彻底的异步化改造:

  • 数据库驱动从同步 SQLAlchemy 换成 asyncpg + SQLAlchemy[asyncio]
  • Redis 客户端换成 redis-py 的异步模式。
  • 所有文件 I/O 操作统一走 aiofiles

重构之后,调度器在同等硬件上支撑的任务量翻了 2.5 倍。
CPU 不再是瓶颈,瓶颈重新回到了执行节点的浏览器吞吐能力。

class AsyncTaskDispatcher:
    def __init__(self, redis, db_session_factory):
        self.redis = redis
        self.db_factory = db_session_factory

    async def dispatch(self, task: Task):
        async with self.db_factory() as db:
            task.status = TaskStatus.QUEUED
            await db.commit()
        queue_key = f"rpa:queue:{task.platform}"
        await self.redis.xadd(queue_key, task.to_payload(), maxlen=10000)

异步化之后还有一个额外的好处:错误处理变得更清晰了。
async/await 的调用栈比回调地狱好读太多,代码维护成本反而下降了。


六、队列消费的并发模型优化

执行节点的守护进程最初是单协程消费 Redis Stream。
一条条拉取、执行、ACK,虽然稳定,但吞吐量被限制在单流程的串行速度上。

改成多协程并发消费后,吞吐量线性增长——直到浏览器槽位数成为瓶颈。

并发消费的实现基于 asyncio.Semaphore 控制并发度,和节点最大槽位数保持一致:

class StreamConsumer:
    def __init__(self, redis, executor, max_concurrency):
        self.redis = redis
        self.executor = executor
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def run(self, stream_key: str, consumer_group: str, consumer_name: str):
        while True:
            messages = await self.redis.xreadgroup(
                consumer_group, consumer_name, {stream_key: ">"}, count=10, block=1000
            )
            for msg_id, payload in messages.get(stream_key, []):
                asyncio.create_task(self._handle(msg_id, payload))

    async def _handle(self, msg_id, payload):
        async with self.semaphore:
            try:
                task = Task.from_payload(payload)
                result = await self.executor.execute(task)
                await self.redis.xack(stream_key, consumer_group, msg_id)
                await self._report_result(task, result)
            except Exception as e:
                # 失败不 ACK,消息会留在 pending 中等待重试
                logger.error(f"Task {msg_id} failed: {e}")

这里有一个容易被忽视的细节:count=10 一次拉取多条消息,用 create_task 并行处理。
但如果某个任务执行特别慢,其他任务已经在槽内排队,不会额外抢占资源——Semaphore_handle 里统一控流。


七、代理 IP 的连接预热

在性能分析时,我们发现网络连接建立的耗时在 P99 场景下被放大了。
代理 IP 和平台服务器之间的 TCP 握手 + TLS 协商,冷连接需要 800ms 到 1.5s。

我们在浏览器预加载的基础上,加了一层网络预热:
在浏览器实例绑定店铺后、流程正式执行前,先访问一个平台的静态资源(如 logo 图片),提前完成 TCP 和 TLS 握手。

这个操作被放在 BrowserInstance.assign_shop 方法里,异步执行,不阻塞任务调度。

async def warmup_connection(self, platform_url: str):
    # 用 headless 方式预加载一个轻量资源,预热 TLS 会话
    page = await self.browser.new_page()
    await page.goto(platform_url, wait_until="domcontentloaded", timeout=10000)
    await page.close()

增加预热后,流程执行阶段的首次网络请求延迟明显降低,P50 降低了 200ms,P99 降低了 600ms。
这个收益在高并发场景下会被放大——因为高并发时,网络连接的状态会更频繁地从冷启动开始。


八、结果回调的批量化

流程执行完毕后,Python 守护进程需要把结果写回数据库,并更新任务状态。
早期每个任务一结束就单独写一次数据库。
任务量一上来,数据库的写入压力就成了新的瓶颈,出现了连接等待和死锁。

我们把结果回调改成了批量写入。
守护进程维护一个内存缓冲队列,收集一批结果(最多 50 条或 500ms 时间窗口),然后一次性 INSERT ... ON CONFLICT UPDATE

class BatchResultWriter:
    def __init__(self, db_session_factory, batch_size=50, flush_interval=0.5):
        self.db_factory = db_session_factory
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer = []
        self.lock = asyncio.Lock()
        asyncio.create_task(self._periodic_flush())

    async def enqueue(self, task_id, result):
        async with self.lock:
            self.buffer.append((task_id, result))
            if len(self.buffer) >= self.batch_size:
                await self._flush()

    async def _periodic_flush(self):
        while True:
            await asyncio.sleep(self.flush_interval)
            async with self.lock:
                if self.buffer:
                    await self._flush()

    async def _flush(self):
        batch = self.buffer[:]
        self.buffer.clear()
        async with self.db_factory() as db:
            await db.execute(
                "INSERT INTO task_results (task_id, status, data) VALUES ... ON CONFLICT (task_id) DO UPDATE ...",
                batch
            )
            await db.commit()

这个改动让数据库的写入 TPS 下降了约 70%,任务状态更新的延迟反而从平均 200ms 降到了 50ms。
因为批量操作减少了事务开销。


九、真实收益:数据说话

把以上优化全部叠加上线之后,我们对比了优化前一周和优化后一周的数据:

指标优化前优化后提升
每小时任务吞吐量4201150+174%
任务延迟 P503.2min1.4min-56%
任务延迟 P9911.5min4.2min-63%
浏览器冷启动占比35%8%-
数据库写入延迟 P50200ms50ms-75%

机器数量没有增加,店铺数量反而从 80 增加到了 120。
系统扛住了,而且跑得更快。

很多团队的优化工作,都在出问题之后才匆忙开始。
我们比较幸运,是在系统还没崩溃之前就主动做了这轮调优。
但即使如此,仍然有几个深夜是盯着 Grafana 曲线度过的。


十、写在最后

性能优化不是一次性运动,而是一个持续观测、持续调整的过程。
每周我们都会看一次任务延迟的 P99 曲线,看看有没有哪类任务的耗时在悄悄变长。

有时候是平台改版导致页面变慢,有时候是代理 IP 池里混进了高延迟节点,有时候是新流程里忘了加随机等待导致重试风暴。
这些问题,如果没有性能基线和持续监控,永远发现不了。

如果你的系统现在还在“能跑”阶段,希望能从这篇文章里带走一个观点:
早点把耗时埋点、性能基线、瓶颈分析这三件事建立起来。
它们不会立刻让你的系统变快,但会让你知道,系统到底慢在哪里。

而知道问题在哪,问题就已经解决了一半。

作者:林焱

0
0
0
0
评论
未登录
暂无评论