影刀RPA工程实战:构建API与RPA混合执行引擎及智能降级策略

影刀RPA工程实战:构建API与RPA混合执行引擎及智能降级策略

能用API一秒拉完的数据,为什么要让浏览器翻二十页去采集?
但API挂了的时候,流程能不能自己切回浏览器,当什么都没发生一样继续跑?

店群自动化做到深水区,我们开始盯上一个被很多人忽视的效率金矿——平台官方API。

TEMU、拼多多、TikTok Shop都有面向商家的开放接口。订单查询、商品上下架、物流发货、消息推送,这些操作通过API调用比RPA模拟点击快一个数量级,也稳定得多。但API有自己的限制:频次上限、功能覆盖不全、接口偶尔变更或临时降级。

我们在生产环境里跑了快一年的混合执行,把“API优先、RPA兜底”的策略做成了系统级的执行引擎。这篇文章还原这套混合引擎的设计思路和踩过的坑——所有关于API的使用都基于平台官方文档和正规授权,不涉及任何逆向或非公开接口。


一、API和RPA的分工边界

picture.image 首先要搞清楚:什么该走API,什么该走RPA。

API擅长的场景:

  • 批量数据读写:订单列表拉取、商品信息批量更新、物流单号回传
  • 高频实时操作:消息列表轮询、库存变更同步
  • 结构化数据交换:返回JSON,无需解析DOM

RPA擅长的场景:

  • API未开放的功能:某些平台的活动报名、特定报表导出、售后纠纷申诉
  • 页面复杂交互:需要拖拽、多步确认、富文本编辑的操作
  • API临时故障时的降级替代

混合执行的原则:能用API的优先API,API搞不定的才上浏览器,并且两者可以动态切换。

picture.image

API是快车道,RPA是越野车。
好的调度系统应该根据路况自动选车,而不是绑死某一种。

picture.image

二、混合执行引擎的架构

picture.image 引擎的核心是一个执行策略路由器。每个流程步骤在定义时,可以同时声明API执行方式和RPA执行方式,并给出优先级和降级条件。

流程定义里,一个“获取订单列表”的步骤长这样:


![picture.image](https://p6-volc-community-sign.byteimg.com/tos-cn-i-tlddhu82om/0e53c817f052439cbde0c0ad76d2e604~tplv-tlddhu82om-image.image?=&rk3s=8031ce6d&x-expires=1780159652&x-signature=RDkKdpuiYPdSwihbHFNIrm%2BBXB0%3D)
step:
  name: "fetch_order_list"
  primary:
    type: "api"
    endpoint: "/order/list"
    method: "GET"
    params:
      start_date: "{{start_date}}"
      end_date: "{{end_date}}"
    timeout: 10000
    max_retries: 1
  fallback:
    type: "rpa"
    component: "paginated_collect"
    timeout: 120000
  degrade_conditions:
    - api_response_time > 3000ms
    - api_error_code in ["RATE_LIMITED", "SERVICE_UNAVAILABLE"]
    - api_result_count < expected_min_count * 0.5
  decision_ttl: 300  # 降级决定缓存5分钟

picture.image 执行时,引擎先走primary。如果API返回正常且在时延阈值内,直接用结果。一旦触发degrade_conditions中的任一条件,引擎自动将这一步降级为fallback,用RPA重新执行。


三、API适配层:抹平不同平台的差异

三个平台的API风格各不相同。拼多多用自家SDK,TEMU的接口还在快速迭代,TikTok Shop的API有严格的app-scoped鉴权。如果每个流程都直接调裸接口,维护成本直接爆炸。

我们做了一个统一的API适配层,对外暴露标准化的业务接口,对内封装各平台的鉴权、签名、限流、重试逻辑。

class PlatformAPIAdapter:
    def __init__(self, shop_id, api_credentials):
        self.shop_id = shop_id
        self.platform = get_platform(shop_id)
        self.credentials = api_credentials
        self.rate_limiter = RateLimiter(shop_id)

    def get_order_list(self, start_date, end_date, page_size=100):
        self.rate_limiter.acquire()
        if self.platform == "temu":
            return self._temu_get_orders(start_date, end_date, page_size)
        elif self.platform == "pdd":
            return self._pdd_get_orders(start_date, end_date, page_size)
        elif self.platform == "tiktok_shop":
            return self._tiktok_get_orders(start_date, end_date, page_size)

    def _temu_get_orders(self, start_date, end_date, page_size):
        # 封装TEMU API的签名、分页游标、响应解析
        resp = requests.get(
            f"{TEMU_API_BASE}/order/list",
            headers=self._build_auth_headers(),
            params={
                "startDate": start_date,
                "endDate": end_date,
                "pageSize": page_size
            },
            timeout=10
        )
        return self._parse_response(resp)

适配层还做了响应标准化。不论哪个平台,订单列表返回的JSON都被转换为统一的数据模型,下游流程只认标准字段,不感知平台差异。


四、智能降级决策

降级条件不是一成不变的硬编码,而是结合了实时指标和历史数据的动态决策。

响应时延阈值动态化:API的响应时间基线来自过去1小时内的P95值。如果当前调用耗时超过基线的2倍,触发降级。这样做的好处是避免在平台整体变慢时,所有店铺同时降级导致RPA实例池被挤爆。

错误码分级:不是所有API错误都该降级。RATE_LIMITED降级到RPA也许更慢,因为RPA操作页面同样受频率约束。我们选择在限流时等待并重试,而不是降级。只有明确的服务不可用(5xx、连接超时)才降级。

局部降级而非全量切换:降级的粒度是步骤级,不是店铺级。一个店铺的“订单采集”降级到RPA了,“物流发货”可能仍然用API。引擎为每个步骤独立维护降级状态,互不干扰。

def should_degrade(step_name, api_response, baseline_metrics):
    # 检查响应时间
    if api_response.elapsed_ms > baseline_metrics.api_p95_latency * 2:
        logger.info(f"Step {step_name} degraded due to high latency")
        return True
    # 检查HTTP状态
    if api_response.status_code >= 500:
        logger.info(f"Step {step_name} degraded due to server error")
        return True
    # 检查数据完整性
    if api_response.result_count < baseline_metrics.expected_min_count * 0.5:
        logger.warning(f"Step {step_name} degraded due to data anomaly")
        return True
    return False

五、降级后的恢复策略

降级不是永久的。系统持续监测API的恢复情况。

引擎每隔5分钟(decision_ttl配置)会用一个轻量级的探测请求去试探API是否恢复正常。探测请求是一个极低成本的接口(通常是API健康检查或拉取最新一条数据),如果连续三次探测正常,引擎自动把步骤切回API模式。

def probe_api_recovery(step_name, shop_id):
    for i in range(3):
        try:
            resp = adapter.health_check()
            if resp.status_code != 200 or resp.elapsed_ms > RECOVERY_LATENCY_THRESHOLD:
                return False
            time.sleep(30)
        except Exception:
            return False
    logger.info(f"Step {step_name} API recovered, switching back")
    return True

这个恢复机制让我们避免了“降级后就一直走RPA,API好了也没人知道”的尴尬。几次平台大促后API恢复,系统自动切回了高效模式,运营和工程师都没感知到。


六、混合执行的成本核算

API调用和RPA执行的成本结构完全不同。

  • API:几乎没有服务器资源消耗,但可能有调用次数费用(部分平台API超过免费额度后收费)。
  • RPA:消耗执行节点的CPU、内存、浏览器实例,占用代理带宽,还有时间成本。

我们在每次执行后都记录实际使用的执行方式,成本归因脚本会把API费用和RPA资源消耗分别摊到店铺上。长期数据跑下来,一个清晰的结论是:API优先策略让单店铺的日均计算资源消耗下降了约40%,任务平均执行时长缩短了60%。

这也成了我们向管理层汇报自动化ROI时的重要论据——不是“自动化省了多少人力”,而是“用API进一步省了多少机器”。


七、限流器的工程实现

API调用必须严格控制频率,否则一个店铺的突发请求可能把整个IP出口的配额打满,影响同出口下的其他店铺。

限流器不是简单的“每秒N次”,而是基于令牌桶算法,支持按店铺、按API端点两级限流。

import time
import threading

class TokenBucket:
    def __init__(self, rate, capacity):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def acquire(self):
        with self.lock:
            now = time.time()
            elapsed = now - self.last_refill
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_refill = now
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

class RateLimiter:
    def __init__(self, shop_id):
        self.shop_bucket = TokenBucket(rate=10, capacity=20)  # 单店铺每秒10次
        self.api_buckets = {
            "order_list": TokenBucket(rate=5, capacity=10),
            "product_update": TokenBucket(rate=2, capacity=5),
        }

    def acquire(self, endpoint=None):
        if not self.shop_bucket.acquire():
            wait_time = 1.0 / self.shop_bucket.rate
            time.sleep(wait_time)
            return self.acquire(endpoint)
        if endpoint and endpoint in self.api_buckets:
            if not self.api_buckets[endpoint].acquire():
                time.sleep(0.2)
                return self.acquire(endpoint)
        return True

限流器在适配层自动调用,流程开发者不用关心细节。当限流触发等待时,日志里会记录等待时间,用于监控和优化。


八、混合执行的结果一致性校验

API返回的数据和RPA从页面采集的数据,理论上是同一份数据,但实际可能出现偏差——API有缓存延迟,页面数据更新比API快半分钟。

当降级发生时,意味着这一步从API切到了RPA。如果后面的步骤又切回了API,可能导致上下游数据不一致。比如用API拉到的订单列表是旧的,用RPA采集到的商品价格是新的,两者拼接后逻辑对不上。

我们做了一层结果快照校验。混合执行的关键步骤,会将API或RPA产出的数据做一份快照签名。下游步骤如果发现输入数据与快照版本不匹配,会触发一次重新同步。

def verify_snapshot_consistency(step_output, downstream_input):
    if step_output.get("snapshot_id") != downstream_input.get("expected_snapshot_id"):
        logger.warning("Snapshot mismatch detected, triggering re-sync")
        return False
    return True

这个机制在大多数时候是无感的,但在一次平台数据迁移期间帮了大忙——API返回的数据滞后了约10分钟,而RPA拿到的是实时的,两者差异被快照校验捕获,避免了后续分析流程基于旧数据做决策。


九、与DAG编排的联动

混合执行引擎和DAG工作流编排是天然打通的。

DAG里的每个节点(任务)都可以声明自己的执行策略。调度中心在分发任务时,会带上执行策略参数。执行节点拿到任务后,混合引擎根据策略决定走API还是RPA。

如果节点因为API故障降级到了RPA,这个信息会回传给调度中心。调度中心可以基于降级状态动态调整DAG的并行度——因为RPA的执行时间长,原本设计为并行的节点可能需要串行以避免浏览器池枯竭。

我们在一个复杂的每日运营DAG里验证了这种联动。API正常情况下,订单采集和商品采集并行跑,20秒收工。API异常时,订单采集降级为RPA(耗时5分钟),调度中心收到降级通知后,自动将原本并行的商品采集推迟1分钟启动,给订单采集留出足够的浏览器实例。整个过程没人介入,系统自己协调好了。


十、混合引擎的边界与不适用场景

不是所有操作都适合混合。以下场景我们明确禁用了混合策略,强制只用RPA:

  • 需要人机交互的操作:如扫码登录、验证码识别,API无法处理,也没有备用意义。
  • 强依赖页面渲染的操作:如截图取证、视觉元素检测,API拿不到视觉信息。
  • API合规限制的操作:部分平台API禁止用于某些业务场景,必须在浏览器里按规范操作。

另外,混合引擎的复杂度也带来了测试成本的上升。每一步都需要测两种执行路径。我们的沙箱测试自动覆盖了两条路径——正常模式走API,注入API故障后走RPA降级路径。


十一、接下来的方向

混合引擎已经在生产环境稳定运行了大半年。下一步我们在尝试的是跨平台数据融合。通过API拉取多个平台的订单和商品数据,在数据加工层做交叉分析,产出跨平台运营建议。API让这种跨平台的数据汇聚变得经济可行——如果用RPA去采集全量数据,机器成本和时间成本都太高了。

另一个方向是API变更的自动感知。平台API接口偶尔会调整字段或限频规则,目前靠人工巡检和告警。我们想做一个轻量的API合约测试,每次API调用后对比响应Schema与基线,发现差异自动告警并尝试适配。

API是自动化的另一条腿,和RPA配合起来才能跑得又快又稳。
不要把它们当成互斥的选择,工程化的艺术在于让最合适的工具在最合适的时机上场。

作者:林焱
一个致力于让API和RPA在同一个引擎里和谐共舞的工程师

0
0
0
0
评论
未登录
暂无评论