行情数据冷热接续方案:REST 回放与 WebSocket 实时推送的接缝处理

287 毫秒 · 47 笔成交 · 1 个全天最高价

09:31:00.000,REST 最后一根 1 分钟 K 线结束。09:31:00.287,WebSocket 第一条实时 tick 到达。中间这 287 毫秒里成交的 47 笔,策略一无所知。

这不是策略的错,是 REST 和 WebSocket 之间那道需要精确接合的缝。


读完这篇文章你能带走什么

你遇到的问题本文给出的方案你省下的时间
REST 和 WebSocket 字段名不统一两套字段体系精确对照,接续映射写一次永久复用5 天 → 半天
断线后不知道从哪里补数据锚点持久化 + 断点续传,先补历史再订实时避免漏单导致的实盘偏离
休市时段误判断线,反复重连业务层心跳判断 vs 传输层 keepalive 分离少排一个通宵
盘中冷启动,锚点落在未完成 K 线上方案 A 锚点偏移 vs 方案 B 预热缓冲,选择条件明确10 分钟上线,不用猜

一张图说清楚:冷热分层模型

冷数据(历史 K 线)          温数据(接缝)          热数据(实时 Tick)
◄───────────────────▶  ◄──287ms──▶  ◄─────────────────────────────▶
  REST 批量拉取              锚点              WebSocket 流式推送
  /v1/market/kline           ▲            wss://.../v1/realtime
     │                       │                       │
     │      最后已完成 K 线的 time + 周期             │
     │                       │                       │
     └───────────────────────┴───────────────────────┘
                   同一毫秒 UTC 时钟源

核心逻辑:取 REST 最后一条已完成 K 线的 time + K 线周期,作为锚点。WebSocket 只接收 timestamp 严格大于锚点的 tick。锚点定准,不重不漏;锚点偏了,要么漏信号,要么重复触发。

类比:这和数据库读写分离的同步延迟一样。从只读库切回主库的那几百毫秒,就是数据不一致的窗口。冷热接续和主从切换,本质上是同一个时序一致性问题。


最容易踩的四个坑

坑① 字段名不同,语义相同

kline 用 close / volume / time,ticker 用 last_price / volume_24h / timestamp。接续代码必须显式映射,混用直接报错。

坑② 断线后无断点

不知道最后一条成功接收的数据是什么时间。补 5 分钟怕漏,补 10 分钟怕重——猜着补就是赌。

坑③ 休市误判断线

A 股午休 WebSocket 无 tick 推送,但心跳正常。传输层没断,业务层没数据。误判就会触发不必要的重连。

坑④ 盘中启动,锚点落在未完成 K 线上

盘中 REST 最新 K 线的 close 是中间价,不是收盘价。锚点取错,接续瞬间出现错价。

预期行为 vs 实测行为:实际部署中常见的情况是,WebSocket 连接建立后的前几百毫秒内可能回放缓存的最近几条 tick。必须做 tick.timestamp > anchor_time 校验,否则历史数据会被当成新信号重复触发。


两种锚点策略,选择条件明确

方案 A:已完成 K 线偏移方案 B:tick 预热缓冲
怎么做REST 最新完成 K 线 time + 周期 = 锚点先短订 WebSocket 缓存几十条 tick,取最大 timestamp 为锚点
数据保障零漏单允许前几分钟做模式过渡
适用条件策略以 K 线驱动,回测与实盘严格对齐tick 级策略,或盘中紧急冷启动
选择逻辑你的回测完全基于 K 线 → 选 A不是基于 K 线,或需要尽快上线 → 选 B

没有“视情况而定”的第三种。


字段对照速查

// kline(历史 K 线)
{"time": 1778828160000, "open": "1332.14", "close": "1332.95", "volume": "293"}

// ticker(实时快照)
{"symbol": "600519.SH", "last_price": "1332.95", "volume_24h": "58184", "timestamp": 1778828402000}
数据含义kline 字段ticker 字段
价格closelast_price
成交量volume(K 线周期内)volume_24h(全天累计)
时间戳time(K 线起始时间)timestamp(快照生成时间)

牢记:同一毫秒 UTC 时钟源,直接比较,禁止 ×1000 转换。


代码实操

依赖安装:

pip install requests websocket-client

Step 1:拉历史 K 线 + 定锚点

import os, time, requests, math
from typing import Dict, List

API_KEY = os.getenv("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai/v1"

SYMBOLS = [
    "600519.SH",    # 贵州茅台
    "300750.SZ",    # 宁德时代
    "700.HK",       # 腾讯控股(无前导零)
    "AAPL.US",      # 美股建议带 .US 后缀
    "BTCUSDT",      # 加密货币交易对
]


def _fetch_symbol_kline(sym: str, interval: str = "1m", limit: int = 100) -> dict:
    """
    单品种 K 线请求。显式处理 3001(限流)/ 1001(鉴权)/ 非0(未知)三种错误。
    限流可恢复,鉴权和未知错误直接阻断。
    """
    url = f"{BASE_URL}/market/kline"
    params = {"symbol": sym, "interval": interval, "limit": limit}
    resp = requests.get(url, headers={"X-API-Key": API_KEY}, params=params, timeout=10)
    data = resp.json()

    if data["code"] == 3001:           # 限流:读 Retry-After,抛出可恢复异常
        retry_after = resp.headers.get("Retry-After")
        wait = int(retry_after) if retry_after else 1
        raise RuntimeError(f"Rate limited for {sym}, retry after {wait}s")

    if data["code"] == 1001:           # 鉴权/参数:阻断,永不重试
        raise PermissionError(f"Auth or param error for {sym}: {data.get('message')}")

    if data["code"] != 0:              # 非预期错误码:显式 raise,禁止静默吞掉
        raise RuntimeError(f"Unexpected error for {sym}: code={data['code']}")

    return data


def fetch_latest_klines(symbols: List[str], interval: str = "1m", limit: int = 100) -> Dict[str, Dict]:
    """
    返回每个品种的接续锚点。
    锚点 = 最后已完成 K 线的 time + K 线周期(方案 A)。
    """
    period_ms = {"1m": 60000, "5m": 300000, "1h": 3600000}.get(interval, 60000)
    anchors = {}

    for sym in symbols:
        backoff = 1
        while True:
            try:
                data = _fetch_symbol_kline(sym, interval, limit)
                klines = data["data"]["klines"]          # 嵌套路径:data.klines
                if klines:
                    last = klines[-1]
                    anchors[sym] = {
                        "anchor_time": last["time"] + period_ms,
                        "last_close": float(last["close"]),
                        "interval": interval,
                    }
                break
            except PermissionError:
                print(f"跳过 {sym}(权限错误)")
                break
            except RuntimeError as e:
                if "Rate limited" in str(e):
                    wait = min(backoff, 8)
                    time.sleep(wait)
                    backoff = min(backoff * 2, 8)
                    continue        # 限流退避后重试当前品种
                raise               # 非限流错误直接向上抛
            except Exception as e:
                print(f"{sym} 网络异常: {e}")
                break
    return anchors

核心是锚点 = time + 周期,不是拉取速度。 这行减法决定了后续所有接续逻辑的正确性。


Step 2:WebSocket 实时订阅 + 时间戳校验

import json, threading
from websocket import WebSocketApp

# WebSocket 回调线程写入,重连线程读取——非原子操作必须加锁保护
last_received_timestamps = {}
ts_lock = threading.Lock()


def start_websocket_stream(anchors: Dict[str, Dict]):
    """从锚点时间开始订阅 ticker,校验每条 tick,持久化断点。"""
    ws_url = f"wss://api.tickdb.ai/v1/realtime?api_key={API_KEY}"
    symbols = list(anchors.keys())

    def on_open(ws):
        # 订阅格式以 docs.tickdb.ai 最新文档为准
        ws.send(json.dumps({
            "type": "subscribe",
            "channel": "ticker",
            "symbols": symbols
        }))

    def on_message(ws, message):
        tick = json.loads(message)          # 推送为扁平 JSON,无 data 包装层
        if tick.get("channel") != "ticker":
            return

        sym = tick.get("symbol")
        tick_ts = tick.get("timestamp")
        if sym not in anchors or tick_ts is None:
            return

        # 核心校验:只有严格大于锚点的数据才进入策略队列
        if tick_ts <= anchors[sym]["anchor_time"]:
            return

        with ts_lock:
            last_received_timestamps[sym] = tick_ts   # 持久化断点

        print(f"[{sym}] 最新价: {tick['last_price']} | 时间: {tick_ts}")

    ws = WebSocketApp(
        ws_url,
        on_open=on_open,
        on_message=on_message,
        on_close=lambda ws, *args: print("WebSocket 断开,触发重连机制"),
        on_error=lambda ws, err: print(f"WebSocket 错误: {err}"),
        ping_interval=1,            # 底层自动心跳,无需手动发 ping
        ping_timeout=5,
    )
    ws.run_forever()

核心是 tick_ts > anchor_time 这行比较,不是连接速度。 它保证策略收到的第一条实时数据一定是锚点之后的新成交。


Step 3:断线重连 + 断点续传

def reconnect_with_backoff(anchors: Dict[str, Dict], max_retries: int = 10):
    """
    指数退避重连,最大退避 30s。
    先补数据,再重新订阅——和文件断点续传一个逻辑。
    """
    retry_count = 0
    while retry_count < max_retries:
        try:
            for sym, info in anchors.items():
                with ts_lock:
                    resume_time = last_received_timestamps.get(sym)
                if resume_time is None:
                    continue

                # 补拉断线期间的 K 线
                resp = requests.get(
                    f"{BASE_URL}/market/kline",
                    headers={"X-API-Key": API_KEY},
                    params={"symbol": sym, "interval": "1m",
                            "start_time": str(resume_time), "limit": 50},
                    timeout=10,
                )
                data = resp.json()
                if data["code"] == 0:
                    print(f"{sym} 补拉 {len(data['data']['klines'])} 根 K 线")
                else:
                    raise RuntimeError(f"补数据失败 {sym}: {data}")

                anchors[sym]["anchor_time"] = resume_time

            start_websocket_stream(anchors)
            break
        except Exception as e:
            retry_count += 1
            wait_time = min(math.pow(2, retry_count - 1), 30)   # 1s→2s→4s…上限 30s
            print(f"重连失败 ({retry_count}/{max_retries}),{wait_time}s 后重试: {e}")
            time.sleep(wait_time)

    if retry_count >= max_retries:
        raise ConnectionError("达到最大重试次数,请检查网络或 API Key")

核心是断点持久化 + 指数退避,不是重连速度。 每次 tick 到达时加锁写入 last_received_timestamps,重连后从这个时间点继续——不从头开始。


全流程时序一览

阶段REST(冷数据)WebSocket(热数据)关键动作
T0 初始化拉最近 100 根 K 线历史数据入库
T1 定锚time + 周期 = 锚点存入 anchors
T2 订阅发送 ticker 订阅实时数据开始流入
T3 校验timestamp > 锚点 才入列接续完成,不重不漏
T4 断线last_received_timestamps 已保存断点安全
T5 补数据从断点补拉 K 线填补缺口
T6 重订阅重新订阅,从断点继续无缝恢复

你真正在维护的,是两套数据源的翻译层

没有统一 API 时,你面对的是:REST 一套鉴权,WebSocket 另一套鉴权;kline 返回 close / time,ticker 推送 last_price / timestamp,每个字段名都要写映射函数;断线后不知道从哪补,补多怕重、补少怕漏;A 股午休 tick 停推、心跳正常,到底断没断线要写一堆判断。

TickDB 在这个背景下作为一个接入层出现:一个 API 同时提供 REST 历史 K 线和 WebSocket 实时 Tick,同一套 API Key(REST 在 Header,WebSocket 在 URL 参数),同一个毫秒 UTC 时钟源。kline 和 ticker 字段对照关系全部在 docs.tickdb.ai 可查。你不用再维护两套客户端和一套“猜断点”的补数据代码。


你的实盘曲线从第几分钟开始偏离回测?

断线重连那一刻。如果 last_received_timestamps 没持久化,补数据漏了 3 根 1 分钟 K 线,其中一根是全天最高点。卖出信号本该在那里触发,但因为接续断层,信号根本没生成。

从那根缺失的 K 线开始,实盘和回测分道扬镳。

你上一次检查自己的接续代码,是什么时候?

📡 数据由 TickDB.ai 提供

0
0
0
0
评论
未登录
暂无评论