两年前我们第一版调度器是用轮询加数据库锁实现的。
每到高峰期,数据库连接池飙到200,任务积压严重。
更糟的是,任务执行超时后,状态更新不及时,运营那边以为没跑完又手动触发一次。
结果同一个店铺的订单同步任务跑了三遍,数据全乱。
那次事故之后,我决定把调度核心从轮询改成消息驱动。
这篇文章就讲讲我们基于消息队列重构的任务编排系统。
适用场景:拼多多/TEMU/TikTok Shop 店群自动化,50-500店铺规模。 核心组件:RabbitMQ + Redis + 影刀RPA + Python 执行器。
一、轮询调度的三个硬伤
轮询方式简单,但规模上来之后问题很明显。
硬伤一:无效查询浪费资源。
调度器每隔几秒扫一次数据库,看有没有待执行任务。
绝大部分时候没有新任务,但数据库还是在被频繁查询。
店铺越多,轮询频率越高,资源消耗线性增长。
硬伤二:任务优先级难以实现。
紧急任务(比如订单同步)和普通任务(比如商品刷新)在同一个队列里。
想优先处理紧急任务?得额外写复杂的优先级逻辑。
硬伤三:延迟任务需要自己造轮子。
“失败后30秒重试”“每隔5分钟拉取一次平台消息”这类需求,用轮询做非常别扭。
我们当时甚至为延迟任务单独建了一张表,存下次执行时间,然后轮询扫描。
数据量大了之后,这张表成了新的瓶颈。
消息队列天然解决了这三个问题。
我们来重新设计一版。
二、整体架构:Exchange + Queue + Binding
我们选了 RabbitMQ 作为消息中间件。
不是因为它比 Kafka 或 Redis Streams 强多少,而是因为它的死信队列、延迟插件、灵活的路由规则,非常适合任务编排场景。
核心拓扑如下:
任务生产者 → 任务交换机(Topic Exchange)→ 多个队列(按平台/优先级划分)→ 死信队列 → 延迟重试
每个执行节点消费指定队列,执行完成后根据结果决定 ack 或 nack 并重新入队。
# task_producer.py
import json
import pika
from typing import Dict, Any
from uuid import uuid4
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class RPATaskProducer:
"""
任务生产者 - 将店铺任务发送到消息队列
支持优先级、延迟执行、平台路由
"""
def __init__(self, rabbitmq_url: str):
self.connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_url))
self.channel = self.connection.channel()
# 声明交换机
self.exchange_name = "rpa_task_exchange"
self.channel.exchange_declare(
exchange=self.exchange_name,
exchange_type="topic",
durable=True
)
# 声明各优先级队列
priorities = ["high", "normal", "low"]
for pri in priorities:
queue_name = f"rpa_tasks_{pri}"
self.channel.queue_declare(
queue=queue_name,
durable=True,
arguments={
"x-max-priority": 10,
"x-dead-letter-exchange": "rpa_dlx_exchange"
}
)
# 绑定路由键: task.pdd.high, task.temu.normal 等
self.channel.queue_bind(
exchange=self.exchange_name,
queue=queue_name,
routing_key=f"task.*.{pri}"
)
# 声明死信交换机(用于延迟重试)
self.channel.exchange_declare(
exchange="rpa_dlx_exchange",
exchange_type="direct",
durable=True
)
# 死信队列,用于收集超过重试次数的任务
self.channel.queue_declare(
queue="rpa_dead_queue",
durable=True
)
self.channel.queue_bind(
exchange="rpa_dlx_exchange",
queue="rpa_dead_queue",
routing_key="dead"
)
logger.info("Task producer initialized with exchange and queues")
def publish_task(
self,
shop_id: str,
platform: str, # pdd, temu, tiktok
task_type: str, # sync_order, refresh_goods, batch_reply
payload: Dict[str, Any],
priority: str = "normal",
delay_seconds: int = 0
) -> str:
"""
发布一个RPA任务
priority: high(0-3秒内), normal(4-7), low(8-10)
delay_seconds: 延迟执行秒数(利用死信队列实现)
"""
task_id = f"{platform}_{task_type}_{shop_id}_{uuid4().hex[:8]}"
message = {
"task_id": task_id,
"shop_id": shop_id,
"platform": platform,
"task_type": task_type,
"payload": payload,
"priority": priority,
"created_at": datetime.now().isoformat(),
"retry_count": 0,
"max_retries": 3
}
routing_key = f"task.{platform}.{priority}"
properties = pika.BasicProperties(
delivery_mode=2, # 持久化
priority={"high": 9, "normal": 5, "low": 1}.get(priority, 5),
expiration=str(delay_seconds * 1000) if delay_seconds > 0 else None
)
self.channel.basic_publish(
exchange=self.exchange_name,
routing_key=routing_key,
body=json.dumps(message),
properties=properties
)
logger.info(f"Task {task_id} published to {routing_key}, delay={delay_seconds}s")
return task_id
def publish_delayed_retry(self, original_message: dict, delay_seconds: int):
"""用于失败重试: 将任务发送到延迟队列"""
# 利用死信交换机的TTL实现延迟
queue_name = f"delay_{delay_seconds}s"
self.channel.queue_declare(
queue=queue_name,
durable=True,
arguments={
"x-message-ttl": delay_seconds * 1000,
"x-dead-letter-exchange": self.exchange_name,
"x-dead-letter-routing-key": f"task.{original_message['platform']}.{original_message['priority']}"
}
)
self.channel.basic_publish(
exchange="",
routing_key=queue_name,
body=json.dumps(original_message),
properties=pika.BasicProperties(delivery_mode=2)
)
logger.info(f"Delayed retry for task {original_message['task_id']} after {delay_seconds}s")
def close(self):
self.connection.close()
三、执行节点:消费消息 + 调用影刀RPA
执行节点不再主动拉取任务,而是作为消费者等待消息。
这样设计有几个好处:
- 没有任务时,消费者阻塞,不消耗CPU
- 消息队列自动负载均衡(多个消费者共享队列)
- 通过prefetch_count控制每个节点的并发数
我们每个节点配置了5个并发槽位,每个槽位独立消费。
槽位之间不共享浏览器实例,避免了之前说的线程安全问题。
# task_consumer.py
import asyncio
import json
import pika
import subprocess
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
from browser_instance_pool import BrowserInstancePool # 复用之前的池子
class RPATaskConsumer:
"""
任务消费者 - 从消息队列获取任务,调用影刀RPA执行
支持手动ACK、失败重试、死信处理
"""
def __init__(
self,
rabbitmq_url: str,
node_id: str,
prefetch_count: int = 5,
pool: Optional[BrowserInstancePool] = None
):
self.rabbitmq_url = rabbitmq_url
self.node_id = node_id
self.prefetch_count = prefetch_count
self.pool = pool or BrowserInstancePool()
self.executor = ThreadPoolExecutor(max_workers=prefetch_count)
self._running = False
def start(self):
"""启动消费者,阻塞监听"""
self._running = True
self.connection = pika.BlockingConnection(pika.URLParameters(self.rabbitmq_url))
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=self.prefetch_count)
# 消费三种优先级队列
for priority in ["high", "normal", "low"]:
queue_name = f"rpa_tasks_{priority}"
self.channel.basic_consume(
queue=queue_name,
on_message_callback=self._on_message,
auto_ack=False
)
print(f"[{self.node_id}] Listening on {queue_name}")
try:
self.channel.start_consuming()
except KeyboardInterrupt:
self.stop()
def _on_message(self, channel, method, properties, body):
"""消息回调 - 在线程池中执行,避免阻塞IO"""
message = json.loads(body)
task_id = message["task_id"]
shop_id = message["shop_id"]
# 异步提交到线程池执行
future = self.executor.submit(self._execute_rpa_task, message)
future.add_done_callback(
lambda f: self._handle_result(channel, method.delivery_tag, f.result(), message)
)
def _execute_rpa_task(self, message: dict) -> dict:
"""实际执行影刀RPA流程"""
task_id = message["task_id"]
shop_id = message["shop_id"]
platform = message["platform"]
task_type = message["task_type"]
# 从浏览器实例池获取实例
browser_inst = self.pool.acquire_sync(shop_id, platform)
if not browser_inst:
return {"success": False, "error": "No browser instance available", "retry": True}
try:
# 调用影刀RPA命令行
cmd = [
"youdao_rpa_cli",
"run",
"--flow", f"{platform}_{task_type}",
"--param", json.dumps({
"debug_port": browser_inst.port,
"shop_id": shop_id,
"profile_id": browser_inst.profile_id,
"task_id": task_id,
**message.get("payload", {})
}),
"--timeout", "600"
]
# 同步执行(因为在线程池中)
proc = subprocess.run(
cmd,
capture_output=True,
timeout=650
)
success = proc.returncode == 0
error = proc.stderr.decode() if not success else None
return {"success": success, "error": error, "retry": not success}
except subprocess.TimeoutExpired:
return {"success": False, "error": "Timeout", "retry": True}
except Exception as e:
return {"success": False, "error": str(e), "retry": True}
finally:
self.pool.release_sync(browser_inst.instance_id, clear_cookies=False)
def _handle_result(self, channel, delivery_tag, result: dict, original_message: dict):
"""处理执行结果,决定ACK或重试"""
if result["success"]:
channel.basic_ack(delivery_tag)
print(f"Task {original_message['task_id']} succeeded")
return
# 失败处理
retry_count = original_message.get("retry_count", 0)
max_retries = original_message.get("max_retries", 3)
if result.get("retry", True) and retry_count < max_retries:
# 延迟重试:指数退避 30s, 60s, 120s
delay = 30 * (2 ** retry_count)
original_message["retry_count"] = retry_count + 1
producer = RPATaskProducer(self.rabbitmq_url)
producer.publish_delayed_retry(original_message, delay)
producer.close()
channel.basic_ack(delivery_tag) # 原消息已处理,不再投递
print(f"Task {original_message['task_id']} scheduled retry {retry_count+1} after {delay}s")
else:
# 超过重试次数或不可重试错误 -> 死信队列
channel.basic_ack(delivery_tag)
self._send_to_dead_letter(original_message, result["error"])
print(f"Task {original_message['task_id']} sent to dead letter")
def _send_to_dead_letter(self, message: dict, error: str):
"""将失败任务发送到死信队列,供人工干预"""
connection = pika.BlockingConnection(pika.URLParameters(self.rabbitmq_url))
channel = connection.channel()
message["error"] = error
channel.basic_publish(
exchange="rpa_dlx_exchange",
routing_key="dead",
body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=2)
)
connection.close()
def stop(self):
self._running = False
if self.connection and self.connection.is_open:
self.connection.close()
self.executor.shutdown(wait=True)
四、浏览器实例池的同步适配
上面的消费者用了 acquire_sync 和 release_sync,因为在线程池中不方便跑异步代码。
我们把之前的异步池子改成了带锁的同步版本。
关键改动:
- 使用
threading.Lock而不是asyncio.Lock - 空闲队列改用
queue.Queue - 健康检查跑在后台守护线程里
这里贴一下改动后的核心片段:
# browser_pool_sync.py - 摘录
import threading
import queue
import time
class SyncBrowserInstancePool:
def __init__(self, min_idle=2, max_total=8):
self.min_idle = min_idle
self.max_total = max_total
self.instances = {}
self.idle_queue = queue.Queue()
self.lock = threading.Lock()
self._running = True
self._init_pool()
threading.Thread(target=self._maintain, daemon=True).start()
def acquire_sync(self, shop_id: str, platform: str):
with self.lock:
for inst in self.instances.values():
if inst.status == "idle" and inst.shop_id == shop_id:
inst.status = "busy"
return inst
# 阻塞等待空闲实例
instance_id = self.idle_queue.get(timeout=10)
with self.lock:
inst = self.instances[instance_id]
inst.status = "busy"
inst.shop_id = shop_id
return inst
def release_sync(self, instance_id: str, clear_cookies=False):
with self.lock:
inst = self.instances[instance_id]
if clear_cookies:
# 清除cookies逻辑
inst.shop_id = None
inst.status = "idle"
self.idle_queue.put(instance_id)
这个改动看起来简单,但线程安全问题花了两天调试。
一个坑:Python 的 queue.Queue 的 timeout 和 threading.Lock 组合使用时要小心死锁。
我们在 acquire_sync 中先释放锁再取队列,不然会阻塞其他线程。
五、失败重试与死信处理
消息队列模式下的重试,比轮询优雅太多。
重试策略:指数退避,最大延迟120秒。
第一次失败后30秒重试,第二次60秒,第三次120秒,之后进入死信队列。
死信队列里的任务我们做了一个简单的 Web 后台,支持:
- 查看失败原因和原始消息
- 手动重新发布任务(带新的 task_id)
- 忽略或归档
这个后台帮运营省了很多沟通成本。以前任务失败要开发去查日志,现在运营自己可以重试。
# dead_letter_handler.py
from flask import Flask, request, jsonify
import pika
import json
app = Flask(__name__)
@app.route("/dead_tasks", methods=["GET"])
def list_dead_tasks():
# 从死信队列中取出消息(但不删除)
connection = pika.BlockingConnection(...)
channel = connection.channel()
# 使用basic_get不删除消息
tasks = []
while True:
method, props, body = channel.basic_get(queue="rpa_dead_queue", auto_ack=False)
if not body:
break
tasks.append(json.loads(body))
channel.basic_ack(method.delivery_tag)
return jsonify(tasks)
@app.route("/retry_dead_task", methods=["POST"])
def retry_dead_task():
data = request.json
producer = RPATaskProducer(...)
producer.publish_task(
shop_id=data["shop_id"],
platform=data["platform"],
task_type=data["task_type"],
payload=data["payload"],
priority=data.get("priority", "normal")
)
return {"status": "retried"}
六、监控与运维视角的消息队列
消息队列的好处之一是监控指标天然丰富。
我们重点看这几个:
- Queue length:队列中待处理的消息数,突然暴涨说明执行节点不够用了
- Unacked count:已投递但未确认的消息,长时间有值说明消费者挂了
- Publish rate / Deliver rate:生产消费速率是否匹配
我们当时踩过一个坑:RabbitMQ 的流控机制。
当某个队列的 unacked 消息超过一定阈值(默认是内存高水位),RabbitMQ 会阻塞整个 connection 的 publish。
结果生产者发不出新任务,整个系统假死。
解决办法:监控 unacked 并自动触发节点扩容,同时优化每个任务的超时设置。
七、跨平台任务路由示例
利用 Topic Exchange 的路由键,我们可以轻松实现不同平台、不同店铺的任务分流。
例如:
task.pdd.high→ 所有拼多多高优先级任务task.temu.normal→ TEMU普通任务task.tiktok.low→ TikTok Shop低优先级任务
我们还可以为某个大店铺单独建一个队列,避免它抢占其他店铺的资源。
def bind_single_shop_queue(self, shop_id: str, platform: str):
queue_name = f"single_shop_{shop_id}"
self.channel.queue_declare(queue=queue_name, durable=True)
self.channel.queue_bind(
exchange=self.exchange_name,
queue=queue_name,
routing_key=f"task.{platform}.{shop_id}"
)
然后在发布任务时,额外多发送一份到单店铺队列(或者直接替换路由键)。
这种灵活性是轮询调度很难做到的。
八、踩坑与优化:消息幂等与重复消费
消息队列模式下,一个任务可能被重复消费(网络闪断导致 ack 丢失)。
我们在任务开始执行前,会用 Redis 做幂等检查:
def is_task_already_executed(task_id: str) -> bool:
return redis.exists(f"executed:{task_id}")
def mark_task_executed(task_id: str):
redis.setex(f"executed:{task_id}", 3600, "1")
# 在 _execute_rpa_task 开头
if is_task_already_executed(task_id):
return {"success": True, "skip": True}
另外,RPA流程本身也要设计成幂等的。比如同步订单时,先根据平台订单号查是否已存在,存在则更新而不是重复插入。
这个细节容易被忽略,但线上跑起来之后,重复消费的概率比想象中高。
九、总结:消息驱动让系统更健壮
从轮询改造成消息驱动后,系统稳定性提升了一个量级。
高峰期任务量翻三倍,队列长度短暂上涨,但执行节点自动消化,没有崩溃。
运维也轻松了,不用再写 cron 脚本去扫数据库超时任务。
几点建议:
- 先从最简单的 direct exchange 开始,不要一开始就搞复杂路由
- 务必开启队列持久化 + 消息持久化
- 死信队列一定要有,否则故障任务会无限重试
- 监控 unacked 和 queue length,设置告警
- 幂等设计从第一天就做
如果你目前还是基于数据库轮询,可以先用 RabbitMQ 替换掉调度器的拉取逻辑,其他部分不动。
一旦适应了消息驱动的思维,你会发现很多问题都变得简单了。
作者:林焱
