定时任务设了一个固定超时,短流程绰绰有余,长流程频频误杀。
任务的执行时间不是常数,它是一个随数据量、网络状态、店铺规模变化的波动曲线。
我们的调度系统在很长一段时间里,对任务执行时间的认知非常粗糙。流程配置里写一个固定的 timeout,比如订单采集设1800秒,消息回复设300秒。超时了就强行终止,标记失败,触发重试。
日常情况下这个机制没大问题。但一旦平台搞促销、数据量翻倍、网络质量波动,固定的超时阈值就开始出各种幺蛾子。采集流程原本30分钟跑完,大促期间需要50分钟,结果刚到30分钟就被调度中心无情地kill了。重试之后还是一样被杀,三次重试全部用完,任务彻底失败,人工介入。
另一面,一些本该3分钟跑完的轻量流程,因为一个罕见的页面卡顿被设了15分钟的超时,迟迟不释放浏览器实例,白白占用资源。
我们开始意识到,任务超时不应该是一个写死的数字,而应该是一个基于历史数据和实时上下文的动态预测值。 调度系统如果能预估每个任务大概需要跑多久,不仅能让超时设置更合理,还能做很多事情——更精准的资源分配、更早的异常发现、更合理的SLA承诺。
这篇文章讲我们怎么从零建立任务执行时间的预估模型,并把它用到调度的各个环节里去。
一、执行时间的变量来源
一个影刀RPA流程的执行时间,受多重因素影响。
固定因素:流程类型(采集比回复慢)、目标平台(TEMU页面比拼多多重)、店铺所在区域(海外网络延迟)。
可变因素:数据量(今天要采集的订单数是昨天的一倍)、翻页深度、页面加载速度(受平台服务端负载影响)、代理网络质量、执行节点当前负载。
随机因素:偶发的页面弹窗、平台临时限流、浏览器内存回收导致的卡顿。
固定因素让同类任务有一个基础耗时基线,可变因素让每次执行的实际时间在基线上波动,随机因素则是偶尔出现的尖刺。
预估的目标不是精准到秒,而是给出一个合理的区间,让调度系统能在大部分情况下做出正确的判断。
二、时间预估的特征工程
我们翻出了过去三个月所有任务的执行日志,提取每一条任务的以下特征:
flow_name:流程名
platform:平台shop_id:店铺(隐含店铺规模和区域)data_volume:本次执行涉及的预估数据量(采集前无法精确知道,但可以用历史平均或前次实际值做粗略估算)time_of_day:执行时段(平台负载有时段特征)node_load:执行节点当前的CPU和内存使用率proxy_health_score:代理健康评分previous_run_duration:同一店铺同一流程上一次执行耗时
标签就是本次任务的实际执行时长。过滤掉因异常中断导致数据不完整的记录,最终得到大约12万条有效样本。
我们没有一上来就上复杂模型,先用统计方法分析了各特征与执行时长的相关性。
最强的三个特征依次是:流程类型(不同流程耗时差异巨大,这是最主要的因子)、预估数据量(翻页深度与耗时近似线性关系)、上一次执行耗时(同一店铺同一流程的耗时具有强自相关性,只要数据量波动不大,上一次跑多久这一次大概率也差不多)。
代理健康评分和节点负载的影响相对小,但它们在极端值下会显著拖慢执行——代理延迟超过500ms时,页面交互耗时成倍增加。
三、从简单基线到分层预测
我们采用渐进式的方法构建预估模型。
第一阶段:历史均值基线。
对于每种流程类型,直接取最近30次成功执行的平均耗时作为预估值。超时阈值设为预估值的1.5倍。
这个极简版本上线后,超时误杀率立刻下降了40%。但问题是它对数据量波动不敏感——一个平常只采200条订单的店铺突然采2000条,预估还是按平均的15分钟,实际跑了45分钟,照样被1.5倍(22.5分钟)的超时误杀。
第二阶段:按数据量分桶的查表模型。
我们根据流程日志中的翻页次数或采集条数,把每个流程的样本按数据量分桶(0100条、100500条、500~2000条、2000条以上),每个桶单独计算均值和标准差。
调度时,任务携带一个预估数据量参数(来自上一次采集的实际数据量或业务系统传入的目标范围),引擎查表获得对应桶的平均耗时和P95耗时。超时阈值设为 P95 + 20% 缓冲。
class DurationEstimator:
def __init__(self):
self.bucket_stats = self._load_bucket_stats()
def _load_bucket_stats(self):
# 从历史数据库加载按流程和数据量分桶的统计
return {
"order_collect": {
"0-100": {"mean": 180, "p95": 240},
"100-500": {"mean": 420, "p95": 580},
"500-2000": {"mean": 900, "p95": 1350},
"2000+": {"mean": 2100, "p95": 3200}
}
}
def estimate(self, flow_name, estimated_data_volume):
buckets = self.bucket_stats.get(flow_name, {})
for bucket_range, stats in buckets.items():
low, high = self._parse_range(bucket_range)
if low <= estimated_data_volume <= high:
return stats
# fallback到最大桶
return buckets.get(list(buckets.keys())[-1], {"mean": 600, "p95": 900})
分桶模型让预估准确度进一步提升。数据量突增时,超时阈值自动放宽,误杀率再降了30%。
第三阶段:加入店铺个性的修正因子。
有些店铺的页面加载就是比别人慢——可能是商品数量多导致后台页面更重,可能是代理线路质量一般。我们在分桶均值基础上,给每个店铺计算了一个耗时偏离因子。
偏离因子 = 该店铺该流程最近10次执行的实际耗时均值 / 全平台同流程同数据量区间的均值。
预估值 = 分桶均值 × 店铺偏离因子。
def get_shop_deviation_factor(shop_id, flow_name):
recent_runs = get_recent_runs(shop_id, flow_name, limit=10)
if not recent_runs:
return 1.0
shop_avg = sum(r.duration for r in recent_runs) / len(recent_runs)
global_avg = get_global_average(flow_name, recent_runs[0].data_volume_bucket)
return shop_avg / global_avg if global_avg > 0 else 1.0
这个因子让模型从“通用预估”走向“个性化预估”。每个店铺有自己的节奏,模型尊重这种差异。
四、实时修正:当前上下文的微调
静态预估基于历史数据,但当前执行环境可能与历史差异巨大。我们在任务执行过程中加入了实时修正。
流程在关键步骤(翻页、大批量写入)执行时,Python插件会记录当前步骤的实际耗时,并与预估步骤耗时对比。如果偏差超过50%,引擎会动态调整剩余步骤的预估时间,并更新总超时阈值。
def adjust_timeout_on_progress(task_id, step_name, step_elapsed, step_estimated):
deviation = step_elapsed / step_estimated
if deviation > 1.5:
remaining_steps = get_remaining_steps(task_id)
remaining_estimated = sum(s.estimated_duration for s in remaining_steps)
new_remaining_estimate = remaining_estimated * deviation
new_total_timeout = get_elapsed(task_id) + new_remaining_estimate * 1.2
update_task_timeout(task_id, new_total_timeout)
logger.info(f"Task {task_id} timeout adjusted due to step deviation: {deviation:.2f}x")
这个动态调整机制确保了即使预估阶段低估了难度,任务也不会在接近终点时被误杀。
五、预估在调度中的应用
时间预估不只是用来设超时的。它在调度的多个环节发挥了作用。
资源分配:调度中心在分配浏览器实例时,会考虑当前运行任务的预估剩余时间。如果两个任务竞争同一资源,优先给预估耗时短的任务分配,减少整体等待时间。
DAG关键路径优化:DAG编排引擎根据每个节点的预估耗时,计算工作流的关键路径。非关键路径上的节点可以适当延迟启动或降低优先级,把资源留给关键路径节点。
SLA预警:如果某个任务的预估耗时已经超过了它的SLA窗口,调度中心会在任务开始前就发出预警,告知运营“今天的采集可能比平时慢,数据产出会延迟”。
异常早期检测:如果任务的实际耗时大幅超过P95预估(比如超出2倍),且实时修正已经触发过但仍在恶化,系统会在超时之前就生成告警,提醒可能出现了非正常的卡顿。
六、预估模型的自适应更新
业务在变,平台在变,耗时基线也在变。一个月前的分桶统计,可能已经无法反映当前的实际情况。
我们给预估模型加了自适应更新机制。每周自动重新计算一次分桶统计和店铺偏离因子,基于最近30天的数据。更新后的模型自动替换旧模型,调度中心无需重启。
同时,如果某类流程的执行时间在短期内(比如3天内)发生了持续漂移——比如P50耗时从300秒连续上升到了450秒——系统会触发一个提醒,建议工程师检查是否存在页面改版或代理质量劣化。
def detect_duration_drift(flow_name, window_days=3):
recent = get_durations(flow_name, days=window_days)
baseline = get_baseline_durations(flow_name, days=30, exclude_recent=window_days)
recent_median = median(recent)
baseline_median = median(baseline)
if recent_median > baseline_median * 1.3:
alert(f"Duration drift detected for {flow_name}: "
f"recent median {recent_median}s vs baseline {baseline_median}s")
这个机制让时间预估从一个静态模型变成了一个活的、持续校准的系统组件。
七、这套体系的实际收益
时间预估模型上线后,我们拿到了几组实打实的改进数据:
- 超时误杀率(任务实际能跑完但被超时kill)从8.3%降到了1.1%。
- 由于超时设置更合理,无效重试次数下降了约55%,省出了大量浏览器实例时间。
- DAG工作流的平均完成时间缩短了12%,因为关键路径被更精准地识别和优先调度。
- 运营关于“数据产出时间不确定”的咨询量明显减少,因为预警系统会提前告知可能的延迟。
还有一个意料之外的收益:时间预估数据成了我们向业务方承诺SLA的依据。业务问“早上8点触发的采集,几点能出数据”,我们可以给出“预计9点15分,最晚不超过9点40分”这样的回答,而不是“看情况”。
可预测性是自动化系统从“工具”走向“服务”的关键一步。
工具只需要跑出结果,服务需要给出承诺。
八、继续改进的方向
当前模型最薄弱的环节是预估数据量的准确性。采集流程的数据量是在执行过程中才能确定的——翻到最后一页才知道一共有多少条。我们目前用上一次的实际采集量或业务系统的粗略估计作为输入,偏差有时较大。
下一步想做的,是在流程执行到前几页时,根据页均数据量和分页总数(如果能从DOM获取),快速修正预估数据量,并联动修正剩余时间预估。这需要更紧密的流程内嵌点,但不涉及架构改动。
另一个方向是把预估的不确定性可视化。在DAG看板上,每个节点不仅显示“预计耗时X分钟”,还显示一个置信区间。让运营和运维看到预估的可靠性,而不仅仅是一个点值。
时间预估不是玄学,是可以用工程方法逐步逼近的科学。
每一次任务的执行,都在为下一次预估贡献数据,让系统越来越了解自己。
作者:林焱
一个深信自动化系统应该知道自己要跑多久的工程师
