摘要
云上行情管道中,WebSocket 重连成功不意味着数据完整——连接恢复是传输层的事,K 线连续是业务层的事,两者之间隔着一套必须主动执行的缺口检测与回补流程。本文给出一个云上工程方案:断流后由定时任务基于本地 bar 时间轴检测缺口,通过 REST K 线分段回补缺失窗口,生成 gap_report 写入对象存储或数据库,并对 partial、failed 和 unrecoverable 状态触发监控告警。
你维护着一套云上行情管道:WebSocket 实时接收 K 线推送,落库后驱动看板、告警和 AI Agent 的查询。凌晨网关切换,连接断开。五秒后重连成功,watch dog 显示一切正常。你继续睡觉。
两周后策略回测出现偏差,排查发现那天凌晨的分钟线有一段是平的——不是没有波动,是那几分钟的数据根本没收到。重连后 WebSocket 推送的是重连时刻的快照,中间断流的几条 K 线没有被补回来。你的看板把缺失前后的数据直接连起来,画出了一根平滑的直线。
连接恢复 ≠ 数据完整。 watch dog 只能证明通道回来了,不能证明断流窗口内的数据一条不少。这个缺口如果不主动检测、回补、留痕并告警,就会永远留在你的数据库里,直到某次回测异常才被偶然发现。
一、云上行情管道为什么需要独立的 gap detection 层
在一个典型的云上行情管道中,数据流经五个环节:
| 环节 | 职责 | 断流场景下的风险 |
|---|---|---|
| 数据接入 | WebSocket 接收实时 K 线推送 | 断流期间数据全部缺失,重连后无自动回补 |
| 数据落库 | 订阅数据写入数据库或对象存储 | 断流窗口被静默跳过,留下时间线裂缝 |
| 定时任务 | 执行 gap detection,比对 bar 时间轴与实际收到的数据 | 如果未配置,缺口永远不会被主动发现 |
| 回补执行 | 用 REST K 线按段拉取缺失窗口 | 回补失败或被限流时需退避重试,不能静默跳过 |
| 监控告警 | 检查 gap_report 中的 partial/failed/unrecoverable 状态 | 如果告警规则只覆盖连接状态,数据缺口将长期不被发现 |
actual_times 从哪里来? 从本地聚合或入库后的 bar_time 提取。K 线数据落库时,每根 bar 都应该有自己的时间戳——这个字段是后续所有比对的基础。它不能等同于 WebSocket 消息的到达时间,也不能用当前系统时间替代。如果你的管道在落库时没有保留每根 bar 的原始时间戳,gap detection 就无法执行。
expected_times 从哪里来? 根据 K 线周期和交易日历推算。1 分钟 K 线、断流 10 分钟、且在交易时段内,就应该有 10 根 bar。交易日历必须排除非交易时段、午休和节假日,否则 expected_count 会虚高,回补窗口永远有一批“无法补上”的缺口。
二、缺口检测:用 expected_times 和 actual_times 做集合差
from typing import List
from datetime import datetime
def detect_gaps(expected_times: List[datetime],
actual_times: List[datetime]) -> dict:
"""
expected_times: 根据 K 线周期和交易日历推算的 bar 时间轴。
actual_times: 从本地入库 bar_time 提取的实际收到时间序列。
返回 missing(缺失)、overlap(重叠)、unexpected(意外出现)。
"""
expected_set = set(expected_times)
actual_set = set(actual_times)
missing = sorted(expected_set - actual_set)
overlap = sorted(expected_set & actual_set)
unexpected = sorted(actual_set - expected_set)
return {
"gap_count": len(missing),
"missing": missing,
"overlap_count": len(overlap),
"unexpected_count": len(unexpected),
"is_complete": len(missing) == 0 and len(unexpected) == 0
}
expected_times 的生成逻辑:根据 interval 和断流起止时间推算。断流窗口 + 交易时段过滤 = 应该存在的 bar 序列。交易日历的维护成本往往被低估——各市场的节假日、半天交易日需要持续更新,且必须与数据源的实际开盘时间对齐。
actual_times 的生成逻辑:从数据库或对象存储中提取已入库 bar 的 bar_time,精度需与 expected_times 一致。如果 expected 用分钟级精度,actual 也归一化到同一精度再做比对。两者精度不一致时,缺口检测结果会产生系统性的 false positive 或 false negative——不是数据缺失了,而是你比对的时间刻度没对齐。
三、回补:用 REST K 线分段拉取缺失窗口
WebSocket 面向实时推送,不提供历史回放——这是协议定位问题,不是服务端缺陷。断流期间缺失的 K 线必须通过 REST K 线接口按窗口按需查询。MCP 工具只适合 AI 对话式按需调用,不进入自动化断流回补链路。
def fetch_kline_gap(symbol: str,
interval: str,
start: datetime,
end: datetime) -> dict:
"""
用 REST K 线接口回补缺失窗口。
返回 gap_report 所需的核心字段。
重要:回补前先查 gap_report 表,用 gap_start + gap_end + symbol
+ interval 四个字段去重。如果已有同窗口的成功回补记录,跳过本次请求。
去重逻辑必须在请求前执行,避免浪费 API 配额。
"""
# 教学骨架,具体实现以数据源官方文档为准
return {
"symbol": symbol,
"interval": interval,
"start": start.isoformat(),
"end": end.isoformat(),
"klines": [],
"status": "success",
"raw_snapshot": {}
}
分段请求,不要一次拉太大窗口。 缺口跨越数天时,按天或按小时分段。一次拉太多容易触发限流,且单次失败影响整个窗口。分段请求即使某一段失败,也能定位到具体哪一段没补上。
full 状态不能只看 recovered_count >= expected_count。 必须检查 recovered_times 是否覆盖了 missing_times。如果 expected 缺了 10:03、10:04、10:05 三根 bar,回补返回的是 10:03、10:04、10:06——recovered_count 也是 3,但 missing_times 没有完全覆盖。只看 count 会漏掉这种“数量对但时间不对”的缺口。
四、留痕:gap_report 写入对象存储或数据库
每一次断流回补都生成 gap_report,不管成功还是失败。这是事后排查“那段行情为什么不对”的唯一依据。
| 字段名 | 来源 | 说明 |
|---|---|---|
symbol | 请求参数 | 回补的标的代码 |
interval | 请求参数 | K 线周期 |
gap_start | 检测结果 | 缺口起始时间 |
gap_end | 检测结果 | 缺口结束时间 |
expected_count | 检测结果 | 应有 K 线条数 |
recovered_count | 回补结果 | 实际回补条数 |
raw_snapshot_id | 客户端生成 | 回补请求原始响应体的哈希摘要 |
status | 回补结果 | full(全部补回且 missing_times 覆盖完整)/ partial(部分补回)/ unrecoverable(确认不可恢复)/ failed(请求失败) |
note | 自动或手动 | 缺失条数、失败原因等 |
reported_at | 客户端生成 | 报告生成时间 |
import hashlib
import json
from datetime import datetime, timezone
def write_gap_report(symbol: str,
interval: str,
gap_start: datetime,
gap_end: datetime,
expected_count: int,
recovered_klines: list,
raw_response: dict,
missing_times: list = None,
request_error: str = None) -> dict:
"""生成并返回 gap_report 记录。由调用方决定存储方式。"""
raw_id = hashlib.sha256(
json.dumps(raw_response, sort_keys=True, ensure_ascii=False, default=str).encode()
).hexdigest()[:16]
recovered_count = len(recovered_klines) if recovered_klines else 0
recovered_times = {bar.get("time") for bar in recovered_klines} if recovered_klines else set()
missing_set = set(missing_times) if missing_times else set()
if request_error:
status = "failed"
note = request_error
elif recovered_count == 0:
status = "empty"
note = "check if gap window is outside available history range"
elif missing_set and not missing_set.issubset(recovered_times):
# 数量够,但缺的 bar 时间没被完全覆盖
still_missing = missing_set - recovered_times
status = "partial"
note = f"expected {expected_count}, recovered {recovered_count}, still missing {len(still_missing)} bars: {sorted(still_missing)}"
elif recovered_count >= expected_count:
status = "full"
note = f"expected {expected_count}, recovered {recovered_count}"
else:
status = "partial"
note = f"expected {expected_count}, recovered {recovered_count}, still missing {expected_count - recovered_count}"
return {
"symbol": symbol,
"interval": interval,
"gap_start": gap_start.isoformat(),
"gap_end": gap_end.isoformat(),
"expected_count": expected_count,
"recovered_count": recovered_count,
"raw_snapshot_id": raw_id,
"status": status,
"note": note,
"reported_at": datetime.now(timezone.utc).isoformat()
}
区分 partial 和 unrecoverable。 REST 返回了部分数据但没拉全,是 partial。REST 返回空,且确认了查询参数、权限和历史覆盖范围后判定该时段数据不存在,标 unrecoverable。把 unrecoverable 标成 partial,下游会一直等待永远不会来的回补。
五、告警:对 partial、failed、unrecoverable 触发监控
定时任务扫描最近一段时间的 gap_report,发现以下状态触发告警:
| 状态 | 告警级别 | 处理动作 |
|---|---|---|
failed | 高 | 立即通知,排查网络、鉴权或限流原因 |
unrecoverable | 中 | 记录永久缺口,标记该时段数据不可用于回测 |
partial | 中 | 保留窗口,在下次定时任务中重新尝试回补,设定重试上限 |
empty | 低 | 人工确认是否升级为 unrecoverable |
六、TickDB 在云上管道中的 REST / WebSocket / MCP 边界
这套断流检测、REST 回补和 gap_report 留痕流程是通用方法,不绑定任何特定数据源。如果使用 TickDB 作为行情接入入口,三个通道的职责边界如下:
| 通道 | 职责 | 断流回补场景中的角色 |
|---|---|---|
| WebSocket | 持续推送实时行情 | 提供 actual_times 的数据来源;断流后不能回补历史 |
| REST K 线 | 按窗口查询历史 K 线 | 唯一合法的回补通道;分段拉取缺失窗口 |
| MCP | AI 工具按需查询 | 仅适合对话式查询,不进入自动化监控链路 |
三个通道各司其职,不能混用。WebSocket 不是历史数据源,REST K 线不是实时推送通道,MCP 不是自动化回补工具。所有端点、字段路径、timestamp 语义和时间戳口径,以 TickDB 官方文档和你自己的实测为准。
七、最小检查清单
- 重连后,有没有根据 interval 和交易日历生成 expected_times?
- actual_times 是否来自入库后的 bar_time,而非 WebSocket 消息到达时间?
- 回补前是否检查了 gap_report 表,防止重复写入同一缺口?
- full 状态是否检查了 recovered_times 覆盖 missing_times,而不仅仅是 recovered_count?
- partial 和 unrecoverable 是否正确区分?
- gap_report 是否写入对象存储或数据库,留存 raw_snapshot_id?
- 是否对 failed、unrecoverable、partial 状态配置了分级告警?
标签: 云上行情管道 / WebSocket 断流 / 数据回补 / gap_report / 可观测性 / TickDB / 火山引擎
📡 本文以 TickDB WebSocket 和 REST K 线接口作为行情接入示例。文中代码为 Python 教学骨架,所有端点、字段和参数以官方文档为准。本文仅讨论断流回补的工程方法,不构成投资建议。
