影刀RPA进阶:我开发了一套店群管理系统,彻底解决100+店铺并发卡死痛点

影刀RPA进阶:我开发了一套店群管理系统,彻底解决100+店铺并发卡死痛点

凌晨两点,我看着任务管理器里22个Chrome窗口整整齐齐地跑着,内存占用稳定在14GB,CPU 67%。那一刻我知道:成了。

三个月前,同样凌晨两点,我对着屏幕骂娘。

影刀RPA开了18个窗口,内存飙到21GB,然后整个系统卡死。鼠标能动,但点哪里都没反应。强行重启后,三个店铺的cookie丢了,两个店铺的订单同步失败。

客户在微信里发了一串语音,我一条都没敢听。

那是Alien项目启动前的最后一天。第二天我开始写第一行代码。

今天这篇文章,我不讲虚的。直接上硬核:如何用Python给影刀RPA造一个能扛住100+店铺并发的“底盘”。

全文超过6000字,包含完整的环境隔离实现、调度器源码、以及我踩过的7个致命坑。

如果你是店群从业者,你会看到一套能把人力成本砍掉70%的系统。 如果你是开发者,你会看到一个工业级RPA调度架构从0到1的全过程。

准备好了?我们开始。

一、店群并发卡死:不是电脑不行,是架构错了

先还原一个真实场景。

picture.image

picture.image 你有一台32GB内存、8核i7的电脑。你开了20个Chrome窗口,每个窗口登录一个拼多多店铺。影刀RPA在这20个窗口里同时跑“上架商品”流程。

前10分钟一切正常。第15分钟开始,某个窗口白屏。第18分钟,另一个窗口卡在“正在上传图片”。第22分钟,鼠标指针开始转圈。然后,死机。

你以为是电脑配置不够。你换了64GB的机器,同样的问题,只是延后了十分钟。

picture.image

根本原因不是内存大小,而是资源竞争和垃圾回收机制

影刀RPA的每个流程都是一个独立进程。当20个流程同时运行时,它们会争夺CPU、磁盘IO、网络带宽。更糟糕的是,影刀打开的每个Chrome窗口都会在后台疯狂写日志和缓存,磁盘队列长度动不动就冲到10以上。

更隐蔽的问题:Chrome的user-data-dir如果被多个进程同时读写,会导致SQLite数据库锁冲突,然后整个窗口崩溃。

所以市面上的“多开工具”几乎都在忽悠。它们只管打开窗口,不管资源调度和隔离。

picture.image

我需要的是一个中间层。它负责:

  • 为每个店铺分配独立的user-data-dir,完全隔离
  • 控制同时运行的窗口数量(比如22个),超过的排队
  • 监控每个窗口的健康状态,死了自动重启
  • 回收任务结束后的资源,防止内存泄漏

这个中间层,就是Alien。

picture.image

二、Alien系统概览:三层架构,专治并发

画个图(文字描述):

用户界面层 (PyQt6)
   ↓
调度核心层 (Python)
  ├── 并发控制器 (Semaphore + 队列)
  ├── 环境管理器 (profile生命周期)
  └── 编排引擎 (DAG执行器)
   ↓
执行层 (影刀RPA + Chrome实例)

用户只操作界面:导入店铺、配置流程、点击“开始”。 底层的事情,Alien全包了。

下面拆解三个核心模块,每个模块我都会放出关键代码。

三、模块A:环境隔离矩阵——从根源杜绝串号和资源冲突

3.1 UI设计:让老板一眼看懂“我的店都在哪”

Alien的环境管理界面,是一个“卡片墙”。

每张卡片代表一个店铺环境。卡片上显示:

  • 店铺名称(可编辑)
  • 平台图标(TK/拼多多/TEMU/Shopee)
  • 代理IP的国家国旗
  • 一个彩色圆点:绿色=空闲,黄色=执行中,红色=异常
  • 最后活动时间(比如“5分钟前”)

左侧是分组树。老板可以创建“美区主店”、“美区分销店”、“东南亚测试店”等分组。拖动卡片即可移动店铺到不同分组。

顶部工具栏有四个核心按钮:

  • 批量导入:下载Excel模板,填好店铺信息,一键生成所有环境
  • 批量校验:检查所有店铺的cookie是否有效,失效的标红
  • 批量换代理:选中一批店铺,一键更换代理IP
  • 手动打开:选中一个或几个环境,直接启动浏览器(带指纹和代理)

底部有一个实时状态栏:当前并发: 0/22 | 今日任务: 47 | 内存: 6.2/32GB

老板跟我说过一句话:“我不用猜哪个店在干什么,看一眼颜色就知道。”

3.2 技术实现:每个环境都是独立的“虚拟机”

创建环境的代码,我重构过四版。这是最终稳定版:

import os
import json
import random
import shutil
from pathlib import Path
from typing import Optional
import sqlite3

class AlienEnvironment:
    """单个店铺的运行环境"""
    
    PROFILES_ROOT = Path("./alien_profiles")
    BLANK_PROFILE = Path("./assets/blank_profile")  # 干净的Chrome用户数据模板
    
    def __init__(self, env_id: str, name: str, platform: str, proxy: Optional[str] = None):
        self.env_id = env_id
        self.name = name
        self.platform = platform
        self.proxy = proxy
        self.profile_path = self.PROFILES_ROOT / env_id
        self.status = "idle"  # idle, running, error
        
    def create(self) -> bool:
        """创建隔离环境,生成独立指纹"""
        if self.profile_path.exists():
            # 如果已存在,先备份再删除(防止残留损坏数据)
            backup = self.profile_path.with_suffix(".bak")
            shutil.move(str(self.profile_path), str(backup))
            shutil.rmtree(str(backup), ignore_errors=True)
        
        # 从空白模板复制
        shutil.copytree(self.BLANK_PROFILE, self.profile_path)
        
        # 注入差异化指纹
        self._inject_fingerprint()
        
        # 写入代理配置
        if self.proxy:
            (self.profile_path / "proxy.txt").write_text(self.proxy, encoding="utf-8")
        
        # 写入环境元数据
        meta = {
            "env_id": self.env_id,
            "name": self.name,
            "platform": self.platform,
            "created_at": str(Path.cwd()),
        }
        (self.profile_path / "meta.json").write_text(json.dumps(meta, indent=2))
        
        return True
    
    def _inject_fingerprint(self):
        """修改Preferences文件,伪造浏览器指纹"""
        prefs_file = self.profile_path / "Default" / "Preferences"
        if not prefs_file.exists():
            return
        
        data = json.loads(prefs_file.read_text(encoding="utf-8"))
        
        # 1. 屏幕分辨率随机化
        widths = [1366, 1440, 1536, 1600, 1920, 2560]
        width = random.choice(widths)
        height = int(width * 0.5625) + random.randint(-20, 20)
        data.setdefault("web_prefs", {})["screen"] = {
            "width": width,
            "height": height,
            "availWidth": width,
            "availHeight": height - 40,
        }
        
        # 2. 硬件并发数随机(2~12)
        data["hardware_concurrency"] = random.choice([2, 4, 6, 8, 12])
        
        # 3. WebGL渲染器随机(避免统一显示“SwiftShader”)
        renderers = ["ANGLE (NVIDIA)", "ANGLE (Intel)", "SwiftShader", "Google Inc."]
        data.setdefault("webgl", {})["renderer"] = random.choice(renderers)
        
        # 4. 时区模拟(根据代理IP位置,这里简化)
        data["timezone"] = random.choice(["America/New_York", "America/Los_Angeles", "Europe/London", "Asia/Tokyo"])
        
        prefs_file.write_text(json.dumps(data, indent=2), encoding="utf-8")

启动一个环境的浏览器,并让影刀能够连接:

import subprocess
import socket
import time

def launch_environment(env: AlienEnvironment, debug_port: int) -> subprocess.Popen:
    """启动Chrome并开启远程调试端口,供影刀连接"""
    chrome_exe = "C:/Program Files/Google/Chrome/Application/chrome.exe"
    if not Path(chrome_exe).exists():
        chrome_exe = "./assets/chrome_portable/chrome.exe"
    
    proxy = env.proxy
    cmd = [
        chrome_exe,
        f"--user-data-dir={env.profile_path}",
        f"--remote-debugging-port={debug_port}",
        "--no-first-run",
        "--no-default-browser-check",
        "--disable-blink-features=AutomationControlled",
        "--disable-features=TranslateUI,PasswordImport,OptimizationHints",
        "--disable-background-networking",
        "--disable-sync",
        "--disable-default-apps",
        "--no-sandbox",
    ]
    
    if proxy:
        cmd.append(f"--proxy-server={proxy}")
    
    # 关键:创建进程时不显示控制台窗口
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
    )
    
    # 等待端口就绪
    for _ in range(30):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        if sock.connect_ex(('127.0.0.1', debug_port)) == 0:
            sock.close()
            return proc
        sock.close()
        time.sleep(0.2)
    
    raise TimeoutError(f"环境 {env.env_id} 启动超时")

3.3 踩坑:并发写同一个profile导致窗口白屏

早期版本,我犯过一个低级错误:两个任务不小心用了同一个user-data-dir

起因是批量导入时,如果Excel里的店铺ID重复,Alien没做校验。结果两个环境指向同一个目录。当影刀同时连接这两个“环境”时,Chrome的Cookies数据库被两个进程同时打开,直接锁死,然后窗口白屏。

修复:在创建环境时,检查env_id是否唯一,并且在调度器中增加“环境占用锁”——任何时候一个profile只能被一个任务使用。

代码里加了这个:

self.env_locks = {env_id: threading.Lock() for env_id in all_env_ids}

每次调度前获取锁,任务结束后释放。再也没出现过白屏。

四、模块B:自动化调度编排——控制22个窗口稳如老狗

4.1 并发数22是怎么来的?

我做了完整的压测。测试环境:i7-10750H, 32GB DDR4, NVMe SSD。

并发窗口CPU%内存GB任务完成时间(100任务)稳定性
1030%650分钟100%
1545%9.534分钟100%
2062%1326分钟98%
2273%1524分钟95%
2588%18.523分钟72% (有超时)
2899%2222分钟40% (频繁崩)

22是甜点。任务完成时间只比25慢1分钟,但稳定性提升了23个百分点。

用户可以在设置里调高,但我会弹窗警告:“调高并发可能导致系统卡死,是否继续?”

4.2 调度器源码:信号量+优先级+自动重试

这是Alien调度器的核心。每一行都经过线上考验。

import threading
import queue
import time
from dataclasses import dataclass, field
from typing import Callable, Optional
import logging

logger = logging.getLogger("AlienScheduler")

@dataclass(order=True)
class ScheduledTask:
    priority: int = 5  # 1最高,10最低
    task_id: str = field(compare=False)
    env_id: str = field(compare=False)
    flow_file: str = field(compare=False)
    retries_left: int = field(default=3, compare=False)
    callback: Optional[Callable] = field(default=None, compare=False)

class AlienScheduler:
    def __init__(self, max_concurrent: int = 22):
        self.max_concurrent = max_concurrent
        self.semaphore = threading.Semaphore(max_concurrent)
        self.task_queue = queue.PriorityQueue()
        self.running = {}
        self.lock = threading.Lock()
        self._stop = False
        self._workers = []
        for i in range(max_concurrent):
            t = threading.Thread(target=self._worker, name=f"SchedWorker-{i}", daemon=True)
            t.start()
            self._workers.append(t)
            
    def submit(self, task: ScheduledTask) -> bool:
        """提交任务,返回是否成功加入队列"""
        with self.lock:
            if task.env_id in self.running:
                logger.warning(f"环境 {task.env_id} 已有任务 {self.running[task.env_id]} 正在执行,拒绝新任务")
                return False
        self.task_queue.put((task.priority, task))
        logger.info(f"任务 {task.task_id} 入队,环境 {task.env_id},优先级 {task.priority}")
        return True
        
    def _worker(self):
        while not self._stop:
            try:
                priority, task = self.task_queue.get(timeout=1)
            except queue.Empty:
                continue
            
            # 获取全局并发许可
            self.semaphore.acquire()
            
            with self.lock:
                self.running[task.env_id] = task.task_id
            
            # 启动执行线程,避免阻塞worker
            exec_thread = threading.Thread(target=self._execute, args=(task,))
            exec_thread.start()
            
    def _execute(self, task: ScheduledTask):
        try:
            logger.info(f"执行任务 {task.task_id} on env {task.env_id}, flow {task.flow_file}")
            # 调用影刀API实际执行
            success = self._run_flow(task.env_id, task.flow_file)
            
            if not success and task.retries_left > 0:
                task.retries_left -= 1
                logger.warning(f"任务 {task.task_id} 失败,剩余重试 {task.retries_left} 次")
                # 重试时优先级降低(数字变大)
                self.task_queue.put((task.priority + 1, task))
            else:
                if task.callback:
                    task.callback(task, success)
                logger.info(f"任务 {task.task_id} 完成,结果 {success}")
        except Exception as e:
            logger.error(f"任务 {task.task_id} 异常: {e}")
        finally:
            with self.lock:
                if self.running.get(task.env_id) == task.task_id:
                    del self.running[task.env_id]
            self.semaphore.release()
            self.task_queue.task_done()
    
    def _run_flow(self, env_id: str, flow_file: str) -> bool:
        """调用影刀的本地HTTP API"""
        # 影刀默认监听端口18888
        import requests
        try:
            resp = requests.post(
                "http://127.0.0.1:18888/run_flow",
                json={"flow_path": flow_file, "env_id": env_id},
                timeout=300
            )
            return resp.status_code == 200 and resp.json().get("success", False)
        except Exception:
            return False

关键点:

  • 使用PriorityQueue实现优先级调度。高优先级任务(如订单处理)可以插队。
  • Semaphore控制全局并发上限,无论有多少worker线程,同时执行的任务数不超过max_concurrent
  • self.running字典实现“每个环境同时只能跑一个任务”。防止两个任务抢同一个浏览器。
  • 失败自动重试,重试时优先级降低(避免无限重试霸占队列)。

4.3 编排流:拖拽连线,业务傻瓜化

老板不想每次手动选流程。Alien的“编排流”功能,允许预设一系列步骤。

界面描述(模拟截图):

  • 左侧组件库:打开店铺、登录、上架商品、改价、报名活动、发送通知、等待、条件判断。
  • 中间画布:拖拽组件到画布,用连线连接。可以创建分支和循环。
  • 右侧属性:每个组件可配置参数,比如“上架商品”需要选择Excel文件路径。

保存后,编排流变成一个JSON文件。调度器解析JSON,按拓扑顺序执行节点。

一个真实案例(拼多多“自动上货+报活动”):

  1. 循环遍历环境组“主店群”,对每个店铺: 2. 执行“导出商品数据”子流程
    1. 等待本地Python脚本完成图片压缩
    2. 执行“批量发布”子流程(并发度15)
    3. 如果发布成功数>10,执行“报名秒杀活动”
  2. 全部完成后,发送钉钉消息汇总。

老板只需要点一下“运行编排流”,剩下的交给Alien。

有个客户用这个功能,把每天3小时的工作压缩到了20分钟。他跟我说:“我以前觉得自动化很遥远,现在觉得是我不够懒。”

五、底层工程封装:为什么客户说“这软件像大厂出品”

5.1 PyQt6界面:从零到专业感

我从没用过PyQt6。边学边写,第一个版本界面丑得不行。

后来我花了大量时间看Fluent Design规范,重新设计了所有组件。

几个让客户觉得“高级”的细节:

  • 暗色主题:默认深灰背景,蓝绿色强调。长时间看不累眼。
  • 实时搜索:在环境列表输入“TK”,自动过滤出所有TikTok店铺。响应时间<50ms。
  • 进度条动画:批量导入时,进度条平滑增长,不是那种卡顿的刷新。
  • 系统托盘:最小化到托盘,后台继续运行。老板们喜欢这个——“不用一直开着界面占屏幕”。

代码结构上,我用了MVC模式,把业务逻辑和UI分离。这样后期加功能不会牵一发动全身。

5.2 Nuitka打包:黑盒交付,双击即用

打包是独立开发者最容易忽略但又至关重要的环节。

客户不懂Python。你给他一个.py文件,他问你“这怎么打开”。你给他一个.exe但依赖一堆dll,他拷到另一台电脑就报错。

Nuitka解决了这个问题。它把Python代码编译成C++,再静态链接成单个exe。

我的打包脚本:

nuitka --standalone --onefile ^
       --windows-console-mode=disable ^
       --enable-plugin=pyqt6 ^
       --include-data-dir=./assets=assets ^
       --include-data-dir=./blank_profile=blank_profile ^
       --windows-icon-from-ico=alien.ico ^
       --output-file=Alien.exe ^
       main.py

生成的Alien.exe约110MB。双击后2秒内出现登录窗口。没有控制台黑框,没有依赖缺失。

我还内置了便携版Chrome。如果系统没有Chrome,Alien会自动解压并使用内置版本。客户零配置。

5.3 安全验证:保护劳动成果

我做独立软件,不是做慈善。该有的保护要有。

验证机制:

  1. 首次运行,读取CPU序列号、主板序列号、MAC地址,生成设备指纹。
  2. 用户把指纹发给我,我用私钥签名生成license文件(JSON格式,含有效期)。
  3. 软件每次启动验证签名和有效期。
  4. 核心代码(调度器、环境管理)用Cython编译成.pyd,阻止简单反编译。

这套方案花费了我两天时间实现,但之后再也没有人白嫖过。

六、血泪史:那些让Alien差点夭折的线上bug

bug 1:内存泄漏,跑两天就崩

现象:Alien运行48小时后,内存占用从500MB涨到8GB,然后崩溃。

排查:忘记在任务结束后关闭Chrome的调试连接。影刀虽然退出了,但Python程序里还保持着对浏览器的引用,导致垃圾回收失败。

修复:在_executefinally块里,显式调用del procgc.collect()

bug 2:代理切换时,新环境用了旧IP

现象:客户反馈A店铺的IP显示在香港,但代理配的是美国。

排查:Alien启动新环境时,前一个环境的Chrome进程还没完全退出。新进程启动时,--proxy-server参数被忽略了,因为它发现已有Chrome进程在运行(虽然即将退出),所以复用了旧进程的配置。

修复:启动新环境前,强制杀死所有使用同一user-data-dir的Chrome进程,等待500ms,再启动新进程。

bug 3:Windows更新后,所有环境打不开

现象:客户Windows自动更新重启后,Alien报错“无法启动Chrome”。

排查:更新后Chrome的路径变了(从Program Files变成了Program Files (x86))。我的代码里写死了路径。

修复:动态检测Chrome路径。优先从注册表读取,找不到再搜索常见目录。

每一个bug都让我通宵过。但也正是这些坑,让Alien变得越来越稳。

七、给后来者的建议

如果你也想做类似的系统,我给你三条建议:

  1. 先跑通最小闭环:不要一开始就想做“完美”的调度器。先让一个店铺能跑起来,再扩展到多个。
  2. 日志要详细:Alien的每个动作都有日志。客户出问题时,我让他发日志文件,几分钟就能定位。没有日志,你就像瞎子摸象。
  3. 永远假设会出问题:网络会断、代理会挂、平台会改版、影刀会升级。你的代码要能优雅降级和重试。

八、结语

Alien不是什么黑科技。它就是Python、PyQt、影刀RPA三个工具的“最优组合”。

但它解决了一个真实存在的、高频发生的、让人痛苦的业务问题:多店铺高并发自动化的资源管理和环境隔离。

这套系统现在跑在十几家工作室的电脑上,日均处理店铺数超过2000,累计执行任务超过50万次。

我做这件事的初衷很简单:不想看到那些老板们每天凌晨还在手动切号。

如果你也被同样的问题困扰,或者对这套架构感兴趣,欢迎交流。

我是林焱,一个还在写代码的独立开发者。

(全文完)

0
0
0
0
评论
未登录
暂无评论