凌晨两点,我看着任务管理器里22个Chrome窗口整整齐齐地跑着,内存占用稳定在14GB,CPU 67%。那一刻我知道:成了。
三个月前,同样凌晨两点,我对着屏幕骂娘。
影刀RPA开了18个窗口,内存飙到21GB,然后整个系统卡死。鼠标能动,但点哪里都没反应。强行重启后,三个店铺的cookie丢了,两个店铺的订单同步失败。
客户在微信里发了一串语音,我一条都没敢听。
那是Alien项目启动前的最后一天。第二天我开始写第一行代码。
今天这篇文章,我不讲虚的。直接上硬核:如何用Python给影刀RPA造一个能扛住100+店铺并发的“底盘”。
全文超过6000字,包含完整的环境隔离实现、调度器源码、以及我踩过的7个致命坑。
如果你是店群从业者,你会看到一套能把人力成本砍掉70%的系统。 如果你是开发者,你会看到一个工业级RPA调度架构从0到1的全过程。
准备好了?我们开始。
一、店群并发卡死:不是电脑不行,是架构错了
先还原一个真实场景。
你有一台32GB内存、8核i7的电脑。你开了20个Chrome窗口,每个窗口登录一个拼多多店铺。影刀RPA在这20个窗口里同时跑“上架商品”流程。
前10分钟一切正常。第15分钟开始,某个窗口白屏。第18分钟,另一个窗口卡在“正在上传图片”。第22分钟,鼠标指针开始转圈。然后,死机。
你以为是电脑配置不够。你换了64GB的机器,同样的问题,只是延后了十分钟。
根本原因不是内存大小,而是资源竞争和垃圾回收机制。
影刀RPA的每个流程都是一个独立进程。当20个流程同时运行时,它们会争夺CPU、磁盘IO、网络带宽。更糟糕的是,影刀打开的每个Chrome窗口都会在后台疯狂写日志和缓存,磁盘队列长度动不动就冲到10以上。
更隐蔽的问题:Chrome的user-data-dir如果被多个进程同时读写,会导致SQLite数据库锁冲突,然后整个窗口崩溃。
所以市面上的“多开工具”几乎都在忽悠。它们只管打开窗口,不管资源调度和隔离。
我需要的是一个中间层。它负责:
- 为每个店铺分配独立的
user-data-dir,完全隔离 - 控制同时运行的窗口数量(比如22个),超过的排队
- 监控每个窗口的健康状态,死了自动重启
- 回收任务结束后的资源,防止内存泄漏
这个中间层,就是Alien。
二、Alien系统概览:三层架构,专治并发
画个图(文字描述):
用户界面层 (PyQt6)
↓
调度核心层 (Python)
├── 并发控制器 (Semaphore + 队列)
├── 环境管理器 (profile生命周期)
└── 编排引擎 (DAG执行器)
↓
执行层 (影刀RPA + Chrome实例)
用户只操作界面:导入店铺、配置流程、点击“开始”。 底层的事情,Alien全包了。
下面拆解三个核心模块,每个模块我都会放出关键代码。
三、模块A:环境隔离矩阵——从根源杜绝串号和资源冲突
3.1 UI设计:让老板一眼看懂“我的店都在哪”
Alien的环境管理界面,是一个“卡片墙”。
每张卡片代表一个店铺环境。卡片上显示:
- 店铺名称(可编辑)
- 平台图标(TK/拼多多/TEMU/Shopee)
- 代理IP的国家国旗
- 一个彩色圆点:绿色=空闲,黄色=执行中,红色=异常
- 最后活动时间(比如“5分钟前”)
左侧是分组树。老板可以创建“美区主店”、“美区分销店”、“东南亚测试店”等分组。拖动卡片即可移动店铺到不同分组。
顶部工具栏有四个核心按钮:
- 批量导入:下载Excel模板,填好店铺信息,一键生成所有环境
- 批量校验:检查所有店铺的cookie是否有效,失效的标红
- 批量换代理:选中一批店铺,一键更换代理IP
- 手动打开:选中一个或几个环境,直接启动浏览器(带指纹和代理)
底部有一个实时状态栏:当前并发: 0/22 | 今日任务: 47 | 内存: 6.2/32GB
老板跟我说过一句话:“我不用猜哪个店在干什么,看一眼颜色就知道。”
3.2 技术实现:每个环境都是独立的“虚拟机”
创建环境的代码,我重构过四版。这是最终稳定版:
import os
import json
import random
import shutil
from pathlib import Path
from typing import Optional
import sqlite3
class AlienEnvironment:
"""单个店铺的运行环境"""
PROFILES_ROOT = Path("./alien_profiles")
BLANK_PROFILE = Path("./assets/blank_profile") # 干净的Chrome用户数据模板
def __init__(self, env_id: str, name: str, platform: str, proxy: Optional[str] = None):
self.env_id = env_id
self.name = name
self.platform = platform
self.proxy = proxy
self.profile_path = self.PROFILES_ROOT / env_id
self.status = "idle" # idle, running, error
def create(self) -> bool:
"""创建隔离环境,生成独立指纹"""
if self.profile_path.exists():
# 如果已存在,先备份再删除(防止残留损坏数据)
backup = self.profile_path.with_suffix(".bak")
shutil.move(str(self.profile_path), str(backup))
shutil.rmtree(str(backup), ignore_errors=True)
# 从空白模板复制
shutil.copytree(self.BLANK_PROFILE, self.profile_path)
# 注入差异化指纹
self._inject_fingerprint()
# 写入代理配置
if self.proxy:
(self.profile_path / "proxy.txt").write_text(self.proxy, encoding="utf-8")
# 写入环境元数据
meta = {
"env_id": self.env_id,
"name": self.name,
"platform": self.platform,
"created_at": str(Path.cwd()),
}
(self.profile_path / "meta.json").write_text(json.dumps(meta, indent=2))
return True
def _inject_fingerprint(self):
"""修改Preferences文件,伪造浏览器指纹"""
prefs_file = self.profile_path / "Default" / "Preferences"
if not prefs_file.exists():
return
data = json.loads(prefs_file.read_text(encoding="utf-8"))
# 1. 屏幕分辨率随机化
widths = [1366, 1440, 1536, 1600, 1920, 2560]
width = random.choice(widths)
height = int(width * 0.5625) + random.randint(-20, 20)
data.setdefault("web_prefs", {})["screen"] = {
"width": width,
"height": height,
"availWidth": width,
"availHeight": height - 40,
}
# 2. 硬件并发数随机(2~12)
data["hardware_concurrency"] = random.choice([2, 4, 6, 8, 12])
# 3. WebGL渲染器随机(避免统一显示“SwiftShader”)
renderers = ["ANGLE (NVIDIA)", "ANGLE (Intel)", "SwiftShader", "Google Inc."]
data.setdefault("webgl", {})["renderer"] = random.choice(renderers)
# 4. 时区模拟(根据代理IP位置,这里简化)
data["timezone"] = random.choice(["America/New_York", "America/Los_Angeles", "Europe/London", "Asia/Tokyo"])
prefs_file.write_text(json.dumps(data, indent=2), encoding="utf-8")
启动一个环境的浏览器,并让影刀能够连接:
import subprocess
import socket
import time
def launch_environment(env: AlienEnvironment, debug_port: int) -> subprocess.Popen:
"""启动Chrome并开启远程调试端口,供影刀连接"""
chrome_exe = "C:/Program Files/Google/Chrome/Application/chrome.exe"
if not Path(chrome_exe).exists():
chrome_exe = "./assets/chrome_portable/chrome.exe"
proxy = env.proxy
cmd = [
chrome_exe,
f"--user-data-dir={env.profile_path}",
f"--remote-debugging-port={debug_port}",
"--no-first-run",
"--no-default-browser-check",
"--disable-blink-features=AutomationControlled",
"--disable-features=TranslateUI,PasswordImport,OptimizationHints",
"--disable-background-networking",
"--disable-sync",
"--disable-default-apps",
"--no-sandbox",
]
if proxy:
cmd.append(f"--proxy-server={proxy}")
# 关键:创建进程时不显示控制台窗口
proc = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
)
# 等待端口就绪
for _ in range(30):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if sock.connect_ex(('127.0.0.1', debug_port)) == 0:
sock.close()
return proc
sock.close()
time.sleep(0.2)
raise TimeoutError(f"环境 {env.env_id} 启动超时")
3.3 踩坑:并发写同一个profile导致窗口白屏
早期版本,我犯过一个低级错误:两个任务不小心用了同一个user-data-dir。
起因是批量导入时,如果Excel里的店铺ID重复,Alien没做校验。结果两个环境指向同一个目录。当影刀同时连接这两个“环境”时,Chrome的Cookies数据库被两个进程同时打开,直接锁死,然后窗口白屏。
修复:在创建环境时,检查env_id是否唯一,并且在调度器中增加“环境占用锁”——任何时候一个profile只能被一个任务使用。
代码里加了这个:
self.env_locks = {env_id: threading.Lock() for env_id in all_env_ids}
每次调度前获取锁,任务结束后释放。再也没出现过白屏。
四、模块B:自动化调度编排——控制22个窗口稳如老狗
4.1 并发数22是怎么来的?
我做了完整的压测。测试环境:i7-10750H, 32GB DDR4, NVMe SSD。
| 并发窗口 | CPU% | 内存GB | 任务完成时间(100任务) | 稳定性 |
|---|---|---|---|---|
| 10 | 30% | 6 | 50分钟 | 100% |
| 15 | 45% | 9.5 | 34分钟 | 100% |
| 20 | 62% | 13 | 26分钟 | 98% |
| 22 | 73% | 15 | 24分钟 | 95% |
| 25 | 88% | 18.5 | 23分钟 | 72% (有超时) |
| 28 | 99% | 22 | 22分钟 | 40% (频繁崩) |
22是甜点。任务完成时间只比25慢1分钟,但稳定性提升了23个百分点。
用户可以在设置里调高,但我会弹窗警告:“调高并发可能导致系统卡死,是否继续?”
4.2 调度器源码:信号量+优先级+自动重试
这是Alien调度器的核心。每一行都经过线上考验。
import threading
import queue
import time
from dataclasses import dataclass, field
from typing import Callable, Optional
import logging
logger = logging.getLogger("AlienScheduler")
@dataclass(order=True)
class ScheduledTask:
priority: int = 5 # 1最高,10最低
task_id: str = field(compare=False)
env_id: str = field(compare=False)
flow_file: str = field(compare=False)
retries_left: int = field(default=3, compare=False)
callback: Optional[Callable] = field(default=None, compare=False)
class AlienScheduler:
def __init__(self, max_concurrent: int = 22):
self.max_concurrent = max_concurrent
self.semaphore = threading.Semaphore(max_concurrent)
self.task_queue = queue.PriorityQueue()
self.running = {}
self.lock = threading.Lock()
self._stop = False
self._workers = []
for i in range(max_concurrent):
t = threading.Thread(target=self._worker, name=f"SchedWorker-{i}", daemon=True)
t.start()
self._workers.append(t)
def submit(self, task: ScheduledTask) -> bool:
"""提交任务,返回是否成功加入队列"""
with self.lock:
if task.env_id in self.running:
logger.warning(f"环境 {task.env_id} 已有任务 {self.running[task.env_id]} 正在执行,拒绝新任务")
return False
self.task_queue.put((task.priority, task))
logger.info(f"任务 {task.task_id} 入队,环境 {task.env_id},优先级 {task.priority}")
return True
def _worker(self):
while not self._stop:
try:
priority, task = self.task_queue.get(timeout=1)
except queue.Empty:
continue
# 获取全局并发许可
self.semaphore.acquire()
with self.lock:
self.running[task.env_id] = task.task_id
# 启动执行线程,避免阻塞worker
exec_thread = threading.Thread(target=self._execute, args=(task,))
exec_thread.start()
def _execute(self, task: ScheduledTask):
try:
logger.info(f"执行任务 {task.task_id} on env {task.env_id}, flow {task.flow_file}")
# 调用影刀API实际执行
success = self._run_flow(task.env_id, task.flow_file)
if not success and task.retries_left > 0:
task.retries_left -= 1
logger.warning(f"任务 {task.task_id} 失败,剩余重试 {task.retries_left} 次")
# 重试时优先级降低(数字变大)
self.task_queue.put((task.priority + 1, task))
else:
if task.callback:
task.callback(task, success)
logger.info(f"任务 {task.task_id} 完成,结果 {success}")
except Exception as e:
logger.error(f"任务 {task.task_id} 异常: {e}")
finally:
with self.lock:
if self.running.get(task.env_id) == task.task_id:
del self.running[task.env_id]
self.semaphore.release()
self.task_queue.task_done()
def _run_flow(self, env_id: str, flow_file: str) -> bool:
"""调用影刀的本地HTTP API"""
# 影刀默认监听端口18888
import requests
try:
resp = requests.post(
"http://127.0.0.1:18888/run_flow",
json={"flow_path": flow_file, "env_id": env_id},
timeout=300
)
return resp.status_code == 200 and resp.json().get("success", False)
except Exception:
return False
关键点:
- 使用
PriorityQueue实现优先级调度。高优先级任务(如订单处理)可以插队。 Semaphore控制全局并发上限,无论有多少worker线程,同时执行的任务数不超过max_concurrent。self.running字典实现“每个环境同时只能跑一个任务”。防止两个任务抢同一个浏览器。- 失败自动重试,重试时优先级降低(避免无限重试霸占队列)。
4.3 编排流:拖拽连线,业务傻瓜化
老板不想每次手动选流程。Alien的“编排流”功能,允许预设一系列步骤。
界面描述(模拟截图):
- 左侧组件库:打开店铺、登录、上架商品、改价、报名活动、发送通知、等待、条件判断。
- 中间画布:拖拽组件到画布,用连线连接。可以创建分支和循环。
- 右侧属性:每个组件可配置参数,比如“上架商品”需要选择Excel文件路径。
保存后,编排流变成一个JSON文件。调度器解析JSON,按拓扑顺序执行节点。
一个真实案例(拼多多“自动上货+报活动”):
- 循环遍历环境组“主店群”,对每个店铺:
2. 执行“导出商品数据”子流程
- 等待本地Python脚本完成图片压缩
- 执行“批量发布”子流程(并发度15)
- 如果发布成功数>10,执行“报名秒杀活动”
- 全部完成后,发送钉钉消息汇总。
老板只需要点一下“运行编排流”,剩下的交给Alien。
有个客户用这个功能,把每天3小时的工作压缩到了20分钟。他跟我说:“我以前觉得自动化很遥远,现在觉得是我不够懒。”
五、底层工程封装:为什么客户说“这软件像大厂出品”
5.1 PyQt6界面:从零到专业感
我从没用过PyQt6。边学边写,第一个版本界面丑得不行。
后来我花了大量时间看Fluent Design规范,重新设计了所有组件。
几个让客户觉得“高级”的细节:
- 暗色主题:默认深灰背景,蓝绿色强调。长时间看不累眼。
- 实时搜索:在环境列表输入“TK”,自动过滤出所有TikTok店铺。响应时间<50ms。
- 进度条动画:批量导入时,进度条平滑增长,不是那种卡顿的刷新。
- 系统托盘:最小化到托盘,后台继续运行。老板们喜欢这个——“不用一直开着界面占屏幕”。
代码结构上,我用了MVC模式,把业务逻辑和UI分离。这样后期加功能不会牵一发动全身。
5.2 Nuitka打包:黑盒交付,双击即用
打包是独立开发者最容易忽略但又至关重要的环节。
客户不懂Python。你给他一个.py文件,他问你“这怎么打开”。你给他一个.exe但依赖一堆dll,他拷到另一台电脑就报错。
Nuitka解决了这个问题。它把Python代码编译成C++,再静态链接成单个exe。
我的打包脚本:
nuitka --standalone --onefile ^
--windows-console-mode=disable ^
--enable-plugin=pyqt6 ^
--include-data-dir=./assets=assets ^
--include-data-dir=./blank_profile=blank_profile ^
--windows-icon-from-ico=alien.ico ^
--output-file=Alien.exe ^
main.py
生成的Alien.exe约110MB。双击后2秒内出现登录窗口。没有控制台黑框,没有依赖缺失。
我还内置了便携版Chrome。如果系统没有Chrome,Alien会自动解压并使用内置版本。客户零配置。
5.3 安全验证:保护劳动成果
我做独立软件,不是做慈善。该有的保护要有。
验证机制:
- 首次运行,读取CPU序列号、主板序列号、MAC地址,生成设备指纹。
- 用户把指纹发给我,我用私钥签名生成license文件(JSON格式,含有效期)。
- 软件每次启动验证签名和有效期。
- 核心代码(调度器、环境管理)用Cython编译成
.pyd,阻止简单反编译。
这套方案花费了我两天时间实现,但之后再也没有人白嫖过。
六、血泪史:那些让Alien差点夭折的线上bug
bug 1:内存泄漏,跑两天就崩
现象:Alien运行48小时后,内存占用从500MB涨到8GB,然后崩溃。
排查:忘记在任务结束后关闭Chrome的调试连接。影刀虽然退出了,但Python程序里还保持着对浏览器的引用,导致垃圾回收失败。
修复:在_execute的finally块里,显式调用del proc和gc.collect()。
bug 2:代理切换时,新环境用了旧IP
现象:客户反馈A店铺的IP显示在香港,但代理配的是美国。
排查:Alien启动新环境时,前一个环境的Chrome进程还没完全退出。新进程启动时,--proxy-server参数被忽略了,因为它发现已有Chrome进程在运行(虽然即将退出),所以复用了旧进程的配置。
修复:启动新环境前,强制杀死所有使用同一user-data-dir的Chrome进程,等待500ms,再启动新进程。
bug 3:Windows更新后,所有环境打不开
现象:客户Windows自动更新重启后,Alien报错“无法启动Chrome”。
排查:更新后Chrome的路径变了(从Program Files变成了Program Files (x86))。我的代码里写死了路径。
修复:动态检测Chrome路径。优先从注册表读取,找不到再搜索常见目录。
每一个bug都让我通宵过。但也正是这些坑,让Alien变得越来越稳。
七、给后来者的建议
如果你也想做类似的系统,我给你三条建议:
- 先跑通最小闭环:不要一开始就想做“完美”的调度器。先让一个店铺能跑起来,再扩展到多个。
- 日志要详细:Alien的每个动作都有日志。客户出问题时,我让他发日志文件,几分钟就能定位。没有日志,你就像瞎子摸象。
- 永远假设会出问题:网络会断、代理会挂、平台会改版、影刀会升级。你的代码要能优雅降级和重试。
八、结语
Alien不是什么黑科技。它就是Python、PyQt、影刀RPA三个工具的“最优组合”。
但它解决了一个真实存在的、高频发生的、让人痛苦的业务问题:多店铺高并发自动化的资源管理和环境隔离。
这套系统现在跑在十几家工作室的电脑上,日均处理店铺数超过2000,累计执行任务超过50万次。
我做这件事的初衷很简单:不想看到那些老板们每天凌晨还在手动切号。
如果你也被同样的问题困扰,或者对这套架构感兴趣,欢迎交流。
我是林焱,一个还在写代码的独立开发者。
(全文完)
