云上行情管道断流回补:从缺口识别到 gap_report 留痕的工程闭环

摘要

云上行情管道中,WebSocket 重连成功不意味着数据完整——连接恢复是传输层的事,K 线连续是业务层的事,两者之间隔着一套必须主动执行的缺口检测与回补流程。本文给出一个云上工程方案:断流后由定时任务基于本地 bar 时间轴检测缺口,通过 REST K 线分段回补缺失窗口,生成 gap_report 写入对象存储或数据库,并对 partial、failed 和 unrecoverable 状态触发监控告警。


你维护着一套云上行情管道:WebSocket 实时接收 K 线推送,落库后驱动看板、告警和 AI Agent 的查询。凌晨网关切换,连接断开。五秒后重连成功,watch dog 显示一切正常。你继续睡觉。

两周后策略回测出现偏差,排查发现那天凌晨的分钟线有一段是平的——不是没有波动,是那几分钟的数据根本没收到。重连后 WebSocket 推送的是重连时刻的快照,中间断流的几条 K 线没有被补回来。你的看板把缺失前后的数据直接连起来,画出了一根平滑的直线。

连接恢复 ≠ 数据完整。 watch dog 只能证明通道回来了,不能证明断流窗口内的数据一条不少。这个缺口如果不主动检测、回补、留痕并告警,就会永远留在你的数据库里,直到某次回测异常才被偶然发现。


一、云上行情管道为什么需要独立的 gap detection 层

在一个典型的云上行情管道中,数据流经五个环节:

环节职责断流场景下的风险
数据接入WebSocket 接收实时 K 线推送断流期间数据全部缺失,重连后无自动回补
数据落库订阅数据写入数据库或对象存储断流窗口被静默跳过,留下时间线裂缝
定时任务执行 gap detection,比对 bar 时间轴与实际收到的数据如果未配置,缺口永远不会被主动发现
回补执行用 REST K 线按段拉取缺失窗口回补失败或被限流时需退避重试,不能静默跳过
监控告警检查 gap_report 中的 partial/failed/unrecoverable 状态如果告警规则只覆盖连接状态,数据缺口将长期不被发现

actual_times 从哪里来? 从本地聚合或入库后的 bar_time 提取。K 线数据落库时,每根 bar 都应该有自己的时间戳——这个字段是后续所有比对的基础。它不能等同于 WebSocket 消息的到达时间,也不能用当前系统时间替代。如果你的管道在落库时没有保留每根 bar 的原始时间戳,gap detection 就无法执行。

expected_times 从哪里来? 根据 K 线周期和交易日历推算。1 分钟 K 线、断流 10 分钟、且在交易时段内,就应该有 10 根 bar。交易日历必须排除非交易时段、午休和节假日,否则 expected_count 会虚高,回补窗口永远有一批“无法补上”的缺口。


二、缺口检测:用 expected_times 和 actual_times 做集合差

from typing import List
from datetime import datetime


def detect_gaps(expected_times: List[datetime],
                actual_times: List[datetime]) -> dict:
    """
    expected_times: 根据 K 线周期和交易日历推算的 bar 时间轴。
    actual_times: 从本地入库 bar_time 提取的实际收到时间序列。
    返回 missing(缺失)、overlap(重叠)、unexpected(意外出现)。
    """
    expected_set = set(expected_times)
    actual_set = set(actual_times)

    missing = sorted(expected_set - actual_set)
    overlap = sorted(expected_set & actual_set)
    unexpected = sorted(actual_set - expected_set)

    return {
        "gap_count": len(missing),
        "missing": missing,
        "overlap_count": len(overlap),
        "unexpected_count": len(unexpected),
        "is_complete": len(missing) == 0 and len(unexpected) == 0
    }

expected_times 的生成逻辑:根据 interval 和断流起止时间推算。断流窗口 + 交易时段过滤 = 应该存在的 bar 序列。交易日历的维护成本往往被低估——各市场的节假日、半天交易日需要持续更新,且必须与数据源的实际开盘时间对齐。

actual_times 的生成逻辑:从数据库或对象存储中提取已入库 bar 的 bar_time,精度需与 expected_times 一致。如果 expected 用分钟级精度,actual 也归一化到同一精度再做比对。两者精度不一致时,缺口检测结果会产生系统性的 false positive 或 false negative——不是数据缺失了,而是你比对的时间刻度没对齐。


三、回补:用 REST K 线分段拉取缺失窗口

WebSocket 面向实时推送,不提供历史回放——这是协议定位问题,不是服务端缺陷。断流期间缺失的 K 线必须通过 REST K 线接口按窗口按需查询。MCP 工具只适合 AI 对话式按需调用,不进入自动化断流回补链路。

def fetch_kline_gap(symbol: str,
                    interval: str,
                    start: datetime,
                    end: datetime) -> dict:
    """
    用 REST K 线接口回补缺失窗口。
    返回 gap_report 所需的核心字段。
    
    重要:回补前先查 gap_report 表,用 gap_start + gap_end + symbol
    + interval 四个字段去重。如果已有同窗口的成功回补记录,跳过本次请求。
    去重逻辑必须在请求前执行,避免浪费 API 配额。
    """
    # 教学骨架,具体实现以数据源官方文档为准
    return {
        "symbol": symbol,
        "interval": interval,
        "start": start.isoformat(),
        "end": end.isoformat(),
        "klines": [],
        "status": "success",
        "raw_snapshot": {}
    }

分段请求,不要一次拉太大窗口。 缺口跨越数天时,按天或按小时分段。一次拉太多容易触发限流,且单次失败影响整个窗口。分段请求即使某一段失败,也能定位到具体哪一段没补上。

full 状态不能只看 recovered_count >= expected_count。 必须检查 recovered_times 是否覆盖了 missing_times。如果 expected 缺了 10:03、10:04、10:05 三根 bar,回补返回的是 10:03、10:04、10:06——recovered_count 也是 3,但 missing_times 没有完全覆盖。只看 count 会漏掉这种“数量对但时间不对”的缺口。


四、留痕:gap_report 写入对象存储或数据库

每一次断流回补都生成 gap_report,不管成功还是失败。这是事后排查“那段行情为什么不对”的唯一依据。

字段名来源说明
symbol请求参数回补的标的代码
interval请求参数K 线周期
gap_start检测结果缺口起始时间
gap_end检测结果缺口结束时间
expected_count检测结果应有 K 线条数
recovered_count回补结果实际回补条数
raw_snapshot_id客户端生成回补请求原始响应体的哈希摘要
status回补结果full(全部补回且 missing_times 覆盖完整)/ partial(部分补回)/ unrecoverable(确认不可恢复)/ failed(请求失败)
note自动或手动缺失条数、失败原因等
reported_at客户端生成报告生成时间
import hashlib
import json
from datetime import datetime, timezone


def write_gap_report(symbol: str,
                     interval: str,
                     gap_start: datetime,
                     gap_end: datetime,
                     expected_count: int,
                     recovered_klines: list,
                     raw_response: dict,
                     missing_times: list = None,
                     request_error: str = None) -> dict:
    """生成并返回 gap_report 记录。由调用方决定存储方式。"""
    raw_id = hashlib.sha256(
        json.dumps(raw_response, sort_keys=True, ensure_ascii=False, default=str).encode()
    ).hexdigest()[:16]

    recovered_count = len(recovered_klines) if recovered_klines else 0
    recovered_times = {bar.get("time") for bar in recovered_klines} if recovered_klines else set()
    missing_set = set(missing_times) if missing_times else set()

    if request_error:
        status = "failed"
        note = request_error
    elif recovered_count == 0:
        status = "empty"
        note = "check if gap window is outside available history range"
    elif missing_set and not missing_set.issubset(recovered_times):
        # 数量够,但缺的 bar 时间没被完全覆盖
        still_missing = missing_set - recovered_times
        status = "partial"
        note = f"expected {expected_count}, recovered {recovered_count}, still missing {len(still_missing)} bars: {sorted(still_missing)}"
    elif recovered_count >= expected_count:
        status = "full"
        note = f"expected {expected_count}, recovered {recovered_count}"
    else:
        status = "partial"
        note = f"expected {expected_count}, recovered {recovered_count}, still missing {expected_count - recovered_count}"

    return {
        "symbol": symbol,
        "interval": interval,
        "gap_start": gap_start.isoformat(),
        "gap_end": gap_end.isoformat(),
        "expected_count": expected_count,
        "recovered_count": recovered_count,
        "raw_snapshot_id": raw_id,
        "status": status,
        "note": note,
        "reported_at": datetime.now(timezone.utc).isoformat()
    }

区分 partial 和 unrecoverable。 REST 返回了部分数据但没拉全,是 partial。REST 返回空,且确认了查询参数、权限和历史覆盖范围后判定该时段数据不存在,标 unrecoverable。把 unrecoverable 标成 partial,下游会一直等待永远不会来的回补。


五、告警:对 partial、failed、unrecoverable 触发监控

定时任务扫描最近一段时间的 gap_report,发现以下状态触发告警:

状态告警级别处理动作
failed立即通知,排查网络、鉴权或限流原因
unrecoverable记录永久缺口,标记该时段数据不可用于回测
partial保留窗口,在下次定时任务中重新尝试回补,设定重试上限
empty人工确认是否升级为 unrecoverable

六、TickDB 在云上管道中的 REST / WebSocket / MCP 边界

这套断流检测、REST 回补和 gap_report 留痕流程是通用方法,不绑定任何特定数据源。如果使用 TickDB 作为行情接入入口,三个通道的职责边界如下:

通道职责断流回补场景中的角色
WebSocket持续推送实时行情提供 actual_times 的数据来源;断流后不能回补历史
REST K 线按窗口查询历史 K 线唯一合法的回补通道;分段拉取缺失窗口
MCPAI 工具按需查询仅适合对话式查询,不进入自动化监控链路

三个通道各司其职,不能混用。WebSocket 不是历史数据源,REST K 线不是实时推送通道,MCP 不是自动化回补工具。所有端点、字段路径、timestamp 语义和时间戳口径,以 TickDB 官方文档和你自己的实测为准。


七、最小检查清单

  • 重连后,有没有根据 interval 和交易日历生成 expected_times?
  • actual_times 是否来自入库后的 bar_time,而非 WebSocket 消息到达时间?
  • 回补前是否检查了 gap_report 表,防止重复写入同一缺口?
  • full 状态是否检查了 recovered_times 覆盖 missing_times,而不仅仅是 recovered_count?
  • partial 和 unrecoverable 是否正确区分?
  • gap_report 是否写入对象存储或数据库,留存 raw_snapshot_id?
  • 是否对 failed、unrecoverable、partial 状态配置了分级告警?

标签: 云上行情管道 / WebSocket 断流 / 数据回补 / gap_report / 可观测性 / TickDB / 火山引擎

📡 本文以 TickDB WebSocket 和 REST K 线接口作为行情接入示例。文中代码为 Python 教学骨架,所有端点、字段和参数以官方文档为准。本文仅讨论断流回补的工程方法,不构成投资建议。

0
0
0
0
评论
未登录
暂无评论