影刀RPA店群自动化工程实践:消息队列驱动的异步任务编排系统

影刀RPA店群自动化工程实践:消息队列驱动的异步任务编排系统

两年前我们第一版调度器是用轮询加数据库锁实现的。

每到高峰期,数据库连接池飙到200,任务积压严重。

picture.image 更糟的是,任务执行超时后,状态更新不及时,运营那边以为没跑完又手动触发一次。

结果同一个店铺的订单同步任务跑了三遍,数据全乱。

那次事故之后,我决定把调度核心从轮询改成消息驱动。

picture.image

这篇文章就讲讲我们基于消息队列重构的任务编排系统。

适用场景:拼多多/TEMU/TikTok Shop 店群自动化,50-500店铺规模。 核心组件:RabbitMQ + Redis + 影刀RPA + Python 执行器。

picture.image

一、轮询调度的三个硬伤

轮询方式简单,但规模上来之后问题很明显。

硬伤一:无效查询浪费资源。

picture.image 调度器每隔几秒扫一次数据库,看有没有待执行任务。

绝大部分时候没有新任务,但数据库还是在被频繁查询。

店铺越多,轮询频率越高,资源消耗线性增长。

硬伤二:任务优先级难以实现。

紧急任务(比如订单同步)和普通任务(比如商品刷新)在同一个队列里。

picture.image

想优先处理紧急任务?得额外写复杂的优先级逻辑。

硬伤三:延迟任务需要自己造轮子。

“失败后30秒重试”“每隔5分钟拉取一次平台消息”这类需求,用轮询做非常别扭。

picture.image 我们当时甚至为延迟任务单独建了一张表,存下次执行时间,然后轮询扫描。

数据量大了之后,这张表成了新的瓶颈。

消息队列天然解决了这三个问题。

我们来重新设计一版。


二、整体架构:Exchange + Queue + Binding

我们选了 RabbitMQ 作为消息中间件。

不是因为它比 Kafka 或 Redis Streams 强多少,而是因为它的死信队列、延迟插件、灵活的路由规则,非常适合任务编排场景。

核心拓扑如下:

任务生产者 → 任务交换机(Topic Exchange)→ 多个队列(按平台/优先级划分)→ 死信队列 → 延迟重试

每个执行节点消费指定队列,执行完成后根据结果决定 ack 或 nack 并重新入队。

# task_producer.py
import json
import pika
from typing import Dict, Any
from uuid import uuid4
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

class RPATaskProducer:
    """
    任务生产者 - 将店铺任务发送到消息队列
    支持优先级、延迟执行、平台路由
    """
    
    def __init__(self, rabbitmq_url: str):
        self.connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_url))
        self.channel = self.connection.channel()
        
        # 声明交换机
        self.exchange_name = "rpa_task_exchange"
        self.channel.exchange_declare(
            exchange=self.exchange_name,
            exchange_type="topic",
            durable=True
        )
        
        # 声明各优先级队列
        priorities = ["high", "normal", "low"]
        for pri in priorities:
            queue_name = f"rpa_tasks_{pri}"
            self.channel.queue_declare(
                queue=queue_name,
                durable=True,
                arguments={
                    "x-max-priority": 10,
                    "x-dead-letter-exchange": "rpa_dlx_exchange"
                }
            )
            # 绑定路由键: task.pdd.high, task.temu.normal 等
            self.channel.queue_bind(
                exchange=self.exchange_name,
                queue=queue_name,
                routing_key=f"task.*.{pri}"
            )
        
        # 声明死信交换机(用于延迟重试)
        self.channel.exchange_declare(
            exchange="rpa_dlx_exchange",
            exchange_type="direct",
            durable=True
        )
        # 死信队列,用于收集超过重试次数的任务
        self.channel.queue_declare(
            queue="rpa_dead_queue",
            durable=True
        )
        self.channel.queue_bind(
            exchange="rpa_dlx_exchange",
            queue="rpa_dead_queue",
            routing_key="dead"
        )
        
        logger.info("Task producer initialized with exchange and queues")
    
    def publish_task(
        self,
        shop_id: str,
        platform: str,  # pdd, temu, tiktok
        task_type: str,  # sync_order, refresh_goods, batch_reply
        payload: Dict[str, Any],
        priority: str = "normal",
        delay_seconds: int = 0
    ) -> str:
        """
        发布一个RPA任务
        priority: high(0-3秒内), normal(4-7), low(8-10)
        delay_seconds: 延迟执行秒数(利用死信队列实现)
        """
        task_id = f"{platform}_{task_type}_{shop_id}_{uuid4().hex[:8]}"
        message = {
            "task_id": task_id,
            "shop_id": shop_id,
            "platform": platform,
            "task_type": task_type,
            "payload": payload,
            "priority": priority,
            "created_at": datetime.now().isoformat(),
            "retry_count": 0,
            "max_retries": 3
        }
        
        routing_key = f"task.{platform}.{priority}"
        properties = pika.BasicProperties(
            delivery_mode=2,  # 持久化
            priority={"high": 9, "normal": 5, "low": 1}.get(priority, 5),
            expiration=str(delay_seconds * 1000) if delay_seconds > 0 else None
        )
        
        self.channel.basic_publish(
            exchange=self.exchange_name,
            routing_key=routing_key,
            body=json.dumps(message),
            properties=properties
        )
        
        logger.info(f"Task {task_id} published to {routing_key}, delay={delay_seconds}s")
        return task_id
    
    def publish_delayed_retry(self, original_message: dict, delay_seconds: int):
        """用于失败重试: 将任务发送到延迟队列"""
        # 利用死信交换机的TTL实现延迟
        queue_name = f"delay_{delay_seconds}s"
        self.channel.queue_declare(
            queue=queue_name,
            durable=True,
            arguments={
                "x-message-ttl": delay_seconds * 1000,
                "x-dead-letter-exchange": self.exchange_name,
                "x-dead-letter-routing-key": f"task.{original_message['platform']}.{original_message['priority']}"
            }
        )
        self.channel.basic_publish(
            exchange="",
            routing_key=queue_name,
            body=json.dumps(original_message),
            properties=pika.BasicProperties(delivery_mode=2)
        )
        logger.info(f"Delayed retry for task {original_message['task_id']} after {delay_seconds}s")
    
    def close(self):
        self.connection.close()

三、执行节点:消费消息 + 调用影刀RPA

执行节点不再主动拉取任务,而是作为消费者等待消息。

这样设计有几个好处:

  • 没有任务时,消费者阻塞,不消耗CPU
  • 消息队列自动负载均衡(多个消费者共享队列)
  • 通过prefetch_count控制每个节点的并发数

我们每个节点配置了5个并发槽位,每个槽位独立消费。

槽位之间不共享浏览器实例,避免了之前说的线程安全问题。

# task_consumer.py
import asyncio
import json
import pika
import subprocess
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
from browser_instance_pool import BrowserInstancePool  # 复用之前的池子

class RPATaskConsumer:
    """
    任务消费者 - 从消息队列获取任务,调用影刀RPA执行
    支持手动ACK、失败重试、死信处理
    """
    
    def __init__(
        self,
        rabbitmq_url: str,
        node_id: str,
        prefetch_count: int = 5,
        pool: Optional[BrowserInstancePool] = None
    ):
        self.rabbitmq_url = rabbitmq_url
        self.node_id = node_id
        self.prefetch_count = prefetch_count
        self.pool = pool or BrowserInstancePool()
        self.executor = ThreadPoolExecutor(max_workers=prefetch_count)
        self._running = False
    
    def start(self):
        """启动消费者,阻塞监听"""
        self._running = True
        self.connection = pika.BlockingConnection(pika.URLParameters(self.rabbitmq_url))
        self.channel = self.connection.channel()
        self.channel.basic_qos(prefetch_count=self.prefetch_count)
        
        # 消费三种优先级队列
        for priority in ["high", "normal", "low"]:
            queue_name = f"rpa_tasks_{priority}"
            self.channel.basic_consume(
                queue=queue_name,
                on_message_callback=self._on_message,
                auto_ack=False
            )
            print(f"[{self.node_id}] Listening on {queue_name}")
        
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.stop()
    
    def _on_message(self, channel, method, properties, body):
        """消息回调 - 在线程池中执行,避免阻塞IO"""
        message = json.loads(body)
        task_id = message["task_id"]
        shop_id = message["shop_id"]
        
        # 异步提交到线程池执行
        future = self.executor.submit(self._execute_rpa_task, message)
        future.add_done_callback(
            lambda f: self._handle_result(channel, method.delivery_tag, f.result(), message)
        )
    
    def _execute_rpa_task(self, message: dict) -> dict:
        """实际执行影刀RPA流程"""
        task_id = message["task_id"]
        shop_id = message["shop_id"]
        platform = message["platform"]
        task_type = message["task_type"]
        
        # 从浏览器实例池获取实例
        browser_inst = self.pool.acquire_sync(shop_id, platform)
        if not browser_inst:
            return {"success": False, "error": "No browser instance available", "retry": True}
        
        try:
            # 调用影刀RPA命令行
            cmd = [
                "youdao_rpa_cli",
                "run",
                "--flow", f"{platform}_{task_type}",
                "--param", json.dumps({
                    "debug_port": browser_inst.port,
                    "shop_id": shop_id,
                    "profile_id": browser_inst.profile_id,
                    "task_id": task_id,
                    **message.get("payload", {})
                }),
                "--timeout", "600"
            ]
            
            # 同步执行(因为在线程池中)
            proc = subprocess.run(
                cmd,
                capture_output=True,
                timeout=650
            )
            success = proc.returncode == 0
            error = proc.stderr.decode() if not success else None
            
            return {"success": success, "error": error, "retry": not success}
        except subprocess.TimeoutExpired:
            return {"success": False, "error": "Timeout", "retry": True}
        except Exception as e:
            return {"success": False, "error": str(e), "retry": True}
        finally:
            self.pool.release_sync(browser_inst.instance_id, clear_cookies=False)
    
    def _handle_result(self, channel, delivery_tag, result: dict, original_message: dict):
        """处理执行结果,决定ACK或重试"""
        if result["success"]:
            channel.basic_ack(delivery_tag)
            print(f"Task {original_message['task_id']} succeeded")
            return
        
        # 失败处理
        retry_count = original_message.get("retry_count", 0)
        max_retries = original_message.get("max_retries", 3)
        
        if result.get("retry", True) and retry_count < max_retries:
            # 延迟重试:指数退避 30s, 60s, 120s
            delay = 30 * (2 ** retry_count)
            original_message["retry_count"] = retry_count + 1
            producer = RPATaskProducer(self.rabbitmq_url)
            producer.publish_delayed_retry(original_message, delay)
            producer.close()
            channel.basic_ack(delivery_tag)  # 原消息已处理,不再投递
            print(f"Task {original_message['task_id']} scheduled retry {retry_count+1} after {delay}s")
        else:
            # 超过重试次数或不可重试错误 -> 死信队列
            channel.basic_ack(delivery_tag)
            self._send_to_dead_letter(original_message, result["error"])
            print(f"Task {original_message['task_id']} sent to dead letter")
    
    def _send_to_dead_letter(self, message: dict, error: str):
        """将失败任务发送到死信队列,供人工干预"""
        connection = pika.BlockingConnection(pika.URLParameters(self.rabbitmq_url))
        channel = connection.channel()
        message["error"] = error
        channel.basic_publish(
            exchange="rpa_dlx_exchange",
            routing_key="dead",
            body=json.dumps(message),
            properties=pika.BasicProperties(delivery_mode=2)
        )
        connection.close()
    
    def stop(self):
        self._running = False
        if self.connection and self.connection.is_open:
            self.connection.close()
        self.executor.shutdown(wait=True)

四、浏览器实例池的同步适配

上面的消费者用了 acquire_syncrelease_sync,因为在线程池中不方便跑异步代码。

我们把之前的异步池子改成了带锁的同步版本。

关键改动:

  • 使用 threading.Lock 而不是 asyncio.Lock
  • 空闲队列改用 queue.Queue
  • 健康检查跑在后台守护线程里

这里贴一下改动后的核心片段:

# browser_pool_sync.py - 摘录
import threading
import queue
import time

class SyncBrowserInstancePool:
    def __init__(self, min_idle=2, max_total=8):
        self.min_idle = min_idle
        self.max_total = max_total
        self.instances = {}
        self.idle_queue = queue.Queue()
        self.lock = threading.Lock()
        self._running = True
        self._init_pool()
        threading.Thread(target=self._maintain, daemon=True).start()
    
    def acquire_sync(self, shop_id: str, platform: str):
        with self.lock:
            for inst in self.instances.values():
                if inst.status == "idle" and inst.shop_id == shop_id:
                    inst.status = "busy"
                    return inst
        # 阻塞等待空闲实例
        instance_id = self.idle_queue.get(timeout=10)
        with self.lock:
            inst = self.instances[instance_id]
            inst.status = "busy"
            inst.shop_id = shop_id
            return inst
    
    def release_sync(self, instance_id: str, clear_cookies=False):
        with self.lock:
            inst = self.instances[instance_id]
            if clear_cookies:
                # 清除cookies逻辑
                inst.shop_id = None
            inst.status = "idle"
            self.idle_queue.put(instance_id)

这个改动看起来简单,但线程安全问题花了两天调试。

一个坑:Python 的 queue.Queue 的 timeout 和 threading.Lock 组合使用时要小心死锁。

我们在 acquire_sync 中先释放锁再取队列,不然会阻塞其他线程。


五、失败重试与死信处理

消息队列模式下的重试,比轮询优雅太多。

重试策略:指数退避,最大延迟120秒。

第一次失败后30秒重试,第二次60秒,第三次120秒,之后进入死信队列。

死信队列里的任务我们做了一个简单的 Web 后台,支持:

  • 查看失败原因和原始消息
  • 手动重新发布任务(带新的 task_id)
  • 忽略或归档

这个后台帮运营省了很多沟通成本。以前任务失败要开发去查日志,现在运营自己可以重试。

# dead_letter_handler.py
from flask import Flask, request, jsonify
import pika
import json

app = Flask(__name__)

@app.route("/dead_tasks", methods=["GET"])
def list_dead_tasks():
    # 从死信队列中取出消息(但不删除)
    connection = pika.BlockingConnection(...)
    channel = connection.channel()
    # 使用basic_get不删除消息
    tasks = []
    while True:
        method, props, body = channel.basic_get(queue="rpa_dead_queue", auto_ack=False)
        if not body:
            break
        tasks.append(json.loads(body))
        channel.basic_ack(method.delivery_tag)
    return jsonify(tasks)

@app.route("/retry_dead_task", methods=["POST"])
def retry_dead_task():
    data = request.json
    producer = RPATaskProducer(...)
    producer.publish_task(
        shop_id=data["shop_id"],
        platform=data["platform"],
        task_type=data["task_type"],
        payload=data["payload"],
        priority=data.get("priority", "normal")
    )
    return {"status": "retried"}

六、监控与运维视角的消息队列

消息队列的好处之一是监控指标天然丰富。

我们重点看这几个:

  • Queue length:队列中待处理的消息数,突然暴涨说明执行节点不够用了
  • Unacked count:已投递但未确认的消息,长时间有值说明消费者挂了
  • Publish rate / Deliver rate:生产消费速率是否匹配

我们当时踩过一个坑:RabbitMQ 的流控机制。

当某个队列的 unacked 消息超过一定阈值(默认是内存高水位),RabbitMQ 会阻塞整个 connection 的 publish。

结果生产者发不出新任务,整个系统假死。

解决办法:监控 unacked 并自动触发节点扩容,同时优化每个任务的超时设置。


七、跨平台任务路由示例

利用 Topic Exchange 的路由键,我们可以轻松实现不同平台、不同店铺的任务分流。

例如:

  • task.pdd.high → 所有拼多多高优先级任务
  • task.temu.normal → TEMU普通任务
  • task.tiktok.low → TikTok Shop低优先级任务

我们还可以为某个大店铺单独建一个队列,避免它抢占其他店铺的资源。

def bind_single_shop_queue(self, shop_id: str, platform: str):
    queue_name = f"single_shop_{shop_id}"
    self.channel.queue_declare(queue=queue_name, durable=True)
    self.channel.queue_bind(
        exchange=self.exchange_name,
        queue=queue_name,
        routing_key=f"task.{platform}.{shop_id}"
    )

然后在发布任务时,额外多发送一份到单店铺队列(或者直接替换路由键)。

这种灵活性是轮询调度很难做到的。


八、踩坑与优化:消息幂等与重复消费

消息队列模式下,一个任务可能被重复消费(网络闪断导致 ack 丢失)。

我们在任务开始执行前,会用 Redis 做幂等检查:

def is_task_already_executed(task_id: str) -> bool:
    return redis.exists(f"executed:{task_id}")

def mark_task_executed(task_id: str):
    redis.setex(f"executed:{task_id}", 3600, "1")

# 在 _execute_rpa_task 开头
if is_task_already_executed(task_id):
    return {"success": True, "skip": True}

另外,RPA流程本身也要设计成幂等的。比如同步订单时,先根据平台订单号查是否已存在,存在则更新而不是重复插入。

这个细节容易被忽略,但线上跑起来之后,重复消费的概率比想象中高。


九、总结:消息驱动让系统更健壮

从轮询改造成消息驱动后,系统稳定性提升了一个量级。

高峰期任务量翻三倍,队列长度短暂上涨,但执行节点自动消化,没有崩溃。

运维也轻松了,不用再写 cron 脚本去扫数据库超时任务。

几点建议:

  1. 先从最简单的 direct exchange 开始,不要一开始就搞复杂路由
  2. 务必开启队列持久化 + 消息持久化
  3. 死信队列一定要有,否则故障任务会无限重试
  4. 监控 unacked 和 queue length,设置告警
  5. 幂等设计从第一天就做

如果你目前还是基于数据库轮询,可以先用 RabbitMQ 替换掉调度器的拉取逻辑,其他部分不动。

一旦适应了消息驱动的思维,你会发现很多问题都变得简单了。


作者:林焱

0
0
0
0
评论
未登录
暂无评论