287 毫秒 · 47 笔成交 · 1 个全天最高价
09:31:00.000,REST 最后一根 1 分钟 K 线结束。09:31:00.287,WebSocket 第一条实时 tick 到达。中间这 287 毫秒里成交的 47 笔,策略一无所知。
这不是策略的错,是 REST 和 WebSocket 之间那道需要精确接合的缝。
读完这篇文章你能带走什么
| 你遇到的问题 | 本文给出的方案 | 你省下的时间 |
|---|---|---|
| REST 和 WebSocket 字段名不统一 | 两套字段体系精确对照,接续映射写一次永久复用 | 5 天 → 半天 |
| 断线后不知道从哪里补数据 | 锚点持久化 + 断点续传,先补历史再订实时 | 避免漏单导致的实盘偏离 |
| 休市时段误判断线,反复重连 | 业务层心跳判断 vs 传输层 keepalive 分离 | 少排一个通宵 |
| 盘中冷启动,锚点落在未完成 K 线上 | 方案 A 锚点偏移 vs 方案 B 预热缓冲,选择条件明确 | 10 分钟上线,不用猜 |
一张图说清楚:冷热分层模型
冷数据(历史 K 线) 温数据(接缝) 热数据(实时 Tick)
◄───────────────────▶ ◄──287ms──▶ ◄─────────────────────────────▶
REST 批量拉取 锚点 WebSocket 流式推送
/v1/market/kline ▲ wss://.../v1/realtime
│ │ │
│ 最后已完成 K 线的 time + 周期 │
│ │ │
└───────────────────────┴───────────────────────┘
同一毫秒 UTC 时钟源
核心逻辑:取 REST 最后一条已完成 K 线的 time + K 线周期,作为锚点。WebSocket 只接收 timestamp 严格大于锚点的 tick。锚点定准,不重不漏;锚点偏了,要么漏信号,要么重复触发。
类比:这和数据库读写分离的同步延迟一样。从只读库切回主库的那几百毫秒,就是数据不一致的窗口。冷热接续和主从切换,本质上是同一个时序一致性问题。
最容易踩的四个坑
坑① 字段名不同,语义相同
kline 用 close / volume / time,ticker 用 last_price / volume_24h / timestamp。接续代码必须显式映射,混用直接报错。
坑② 断线后无断点
不知道最后一条成功接收的数据是什么时间。补 5 分钟怕漏,补 10 分钟怕重——猜着补就是赌。
坑③ 休市误判断线
A 股午休 WebSocket 无 tick 推送,但心跳正常。传输层没断,业务层没数据。误判就会触发不必要的重连。
坑④ 盘中启动,锚点落在未完成 K 线上
盘中 REST 最新 K 线的 close 是中间价,不是收盘价。锚点取错,接续瞬间出现错价。
预期行为 vs 实测行为:实际部署中常见的情况是,WebSocket 连接建立后的前几百毫秒内可能回放缓存的最近几条 tick。必须做
tick.timestamp > anchor_time校验,否则历史数据会被当成新信号重复触发。
两种锚点策略,选择条件明确
| 方案 A:已完成 K 线偏移 | 方案 B:tick 预热缓冲 | |
|---|---|---|
| 怎么做 | REST 最新完成 K 线 time + 周期 = 锚点 | 先短订 WebSocket 缓存几十条 tick,取最大 timestamp 为锚点 |
| 数据保障 | 零漏单 | 允许前几分钟做模式过渡 |
| 适用条件 | 策略以 K 线驱动,回测与实盘严格对齐 | tick 级策略,或盘中紧急冷启动 |
| 选择逻辑 | 你的回测完全基于 K 线 → 选 A | 不是基于 K 线,或需要尽快上线 → 选 B |
没有“视情况而定”的第三种。
字段对照速查
// kline(历史 K 线)
{"time": 1778828160000, "open": "1332.14", "close": "1332.95", "volume": "293"}
// ticker(实时快照)
{"symbol": "600519.SH", "last_price": "1332.95", "volume_24h": "58184", "timestamp": 1778828402000}
| 数据含义 | kline 字段 | ticker 字段 |
|---|---|---|
| 价格 | close | last_price |
| 成交量 | volume(K 线周期内) | volume_24h(全天累计) |
| 时间戳 | time(K 线起始时间) | timestamp(快照生成时间) |
牢记:同一毫秒 UTC 时钟源,直接比较,禁止 ×1000 转换。
代码实操
依赖安装:
pip install requests websocket-client
Step 1:拉历史 K 线 + 定锚点
import os, time, requests, math
from typing import Dict, List
API_KEY = os.getenv("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai/v1"
SYMBOLS = [
"600519.SH", # 贵州茅台
"300750.SZ", # 宁德时代
"700.HK", # 腾讯控股(无前导零)
"AAPL.US", # 美股建议带 .US 后缀
"BTCUSDT", # 加密货币交易对
]
def _fetch_symbol_kline(sym: str, interval: str = "1m", limit: int = 100) -> dict:
"""
单品种 K 线请求。显式处理 3001(限流)/ 1001(鉴权)/ 非0(未知)三种错误。
限流可恢复,鉴权和未知错误直接阻断。
"""
url = f"{BASE_URL}/market/kline"
params = {"symbol": sym, "interval": interval, "limit": limit}
resp = requests.get(url, headers={"X-API-Key": API_KEY}, params=params, timeout=10)
data = resp.json()
if data["code"] == 3001: # 限流:读 Retry-After,抛出可恢复异常
retry_after = resp.headers.get("Retry-After")
wait = int(retry_after) if retry_after else 1
raise RuntimeError(f"Rate limited for {sym}, retry after {wait}s")
if data["code"] == 1001: # 鉴权/参数:阻断,永不重试
raise PermissionError(f"Auth or param error for {sym}: {data.get('message')}")
if data["code"] != 0: # 非预期错误码:显式 raise,禁止静默吞掉
raise RuntimeError(f"Unexpected error for {sym}: code={data['code']}")
return data
def fetch_latest_klines(symbols: List[str], interval: str = "1m", limit: int = 100) -> Dict[str, Dict]:
"""
返回每个品种的接续锚点。
锚点 = 最后已完成 K 线的 time + K 线周期(方案 A)。
"""
period_ms = {"1m": 60000, "5m": 300000, "1h": 3600000}.get(interval, 60000)
anchors = {}
for sym in symbols:
backoff = 1
while True:
try:
data = _fetch_symbol_kline(sym, interval, limit)
klines = data["data"]["klines"] # 嵌套路径:data.klines
if klines:
last = klines[-1]
anchors[sym] = {
"anchor_time": last["time"] + period_ms,
"last_close": float(last["close"]),
"interval": interval,
}
break
except PermissionError:
print(f"跳过 {sym}(权限错误)")
break
except RuntimeError as e:
if "Rate limited" in str(e):
wait = min(backoff, 8)
time.sleep(wait)
backoff = min(backoff * 2, 8)
continue # 限流退避后重试当前品种
raise # 非限流错误直接向上抛
except Exception as e:
print(f"{sym} 网络异常: {e}")
break
return anchors
核心是锚点 =
time+ 周期,不是拉取速度。 这行减法决定了后续所有接续逻辑的正确性。
Step 2:WebSocket 实时订阅 + 时间戳校验
import json, threading
from websocket import WebSocketApp
# WebSocket 回调线程写入,重连线程读取——非原子操作必须加锁保护
last_received_timestamps = {}
ts_lock = threading.Lock()
def start_websocket_stream(anchors: Dict[str, Dict]):
"""从锚点时间开始订阅 ticker,校验每条 tick,持久化断点。"""
ws_url = f"wss://api.tickdb.ai/v1/realtime?api_key={API_KEY}"
symbols = list(anchors.keys())
def on_open(ws):
# 订阅格式以 docs.tickdb.ai 最新文档为准
ws.send(json.dumps({
"type": "subscribe",
"channel": "ticker",
"symbols": symbols
}))
def on_message(ws, message):
tick = json.loads(message) # 推送为扁平 JSON,无 data 包装层
if tick.get("channel") != "ticker":
return
sym = tick.get("symbol")
tick_ts = tick.get("timestamp")
if sym not in anchors or tick_ts is None:
return
# 核心校验:只有严格大于锚点的数据才进入策略队列
if tick_ts <= anchors[sym]["anchor_time"]:
return
with ts_lock:
last_received_timestamps[sym] = tick_ts # 持久化断点
print(f"[{sym}] 最新价: {tick['last_price']} | 时间: {tick_ts}")
ws = WebSocketApp(
ws_url,
on_open=on_open,
on_message=on_message,
on_close=lambda ws, *args: print("WebSocket 断开,触发重连机制"),
on_error=lambda ws, err: print(f"WebSocket 错误: {err}"),
ping_interval=1, # 底层自动心跳,无需手动发 ping
ping_timeout=5,
)
ws.run_forever()
核心是
tick_ts > anchor_time这行比较,不是连接速度。 它保证策略收到的第一条实时数据一定是锚点之后的新成交。
Step 3:断线重连 + 断点续传
def reconnect_with_backoff(anchors: Dict[str, Dict], max_retries: int = 10):
"""
指数退避重连,最大退避 30s。
先补数据,再重新订阅——和文件断点续传一个逻辑。
"""
retry_count = 0
while retry_count < max_retries:
try:
for sym, info in anchors.items():
with ts_lock:
resume_time = last_received_timestamps.get(sym)
if resume_time is None:
continue
# 补拉断线期间的 K 线
resp = requests.get(
f"{BASE_URL}/market/kline",
headers={"X-API-Key": API_KEY},
params={"symbol": sym, "interval": "1m",
"start_time": str(resume_time), "limit": 50},
timeout=10,
)
data = resp.json()
if data["code"] == 0:
print(f"{sym} 补拉 {len(data['data']['klines'])} 根 K 线")
else:
raise RuntimeError(f"补数据失败 {sym}: {data}")
anchors[sym]["anchor_time"] = resume_time
start_websocket_stream(anchors)
break
except Exception as e:
retry_count += 1
wait_time = min(math.pow(2, retry_count - 1), 30) # 1s→2s→4s…上限 30s
print(f"重连失败 ({retry_count}/{max_retries}),{wait_time}s 后重试: {e}")
time.sleep(wait_time)
if retry_count >= max_retries:
raise ConnectionError("达到最大重试次数,请检查网络或 API Key")
核心是断点持久化 + 指数退避,不是重连速度。 每次 tick 到达时加锁写入
last_received_timestamps,重连后从这个时间点继续——不从头开始。
全流程时序一览
| 阶段 | REST(冷数据) | WebSocket(热数据) | 关键动作 |
|---|---|---|---|
| T0 初始化 | 拉最近 100 根 K 线 | — | 历史数据入库 |
| T1 定锚 | time + 周期 = 锚点 | — | 存入 anchors |
| T2 订阅 | — | 发送 ticker 订阅 | 实时数据开始流入 |
| T3 校验 | — | timestamp > 锚点 才入列 | 接续完成,不重不漏 |
| T4 断线 | — | last_received_timestamps 已保存 | 断点安全 |
| T5 补数据 | 从断点补拉 K 线 | — | 填补缺口 |
| T6 重订阅 | — | 重新订阅,从断点继续 | 无缝恢复 |
你真正在维护的,是两套数据源的翻译层
没有统一 API 时,你面对的是:REST 一套鉴权,WebSocket 另一套鉴权;kline 返回 close / time,ticker 推送 last_price / timestamp,每个字段名都要写映射函数;断线后不知道从哪补,补多怕重、补少怕漏;A 股午休 tick 停推、心跳正常,到底断没断线要写一堆判断。
TickDB 在这个背景下作为一个接入层出现:一个 API 同时提供 REST 历史 K 线和 WebSocket 实时 Tick,同一套 API Key(REST 在 Header,WebSocket 在 URL 参数),同一个毫秒 UTC 时钟源。kline 和 ticker 字段对照关系全部在 docs.tickdb.ai 可查。你不用再维护两套客户端和一套“猜断点”的补数据代码。
你的实盘曲线从第几分钟开始偏离回测?
断线重连那一刻。如果 last_received_timestamps 没持久化,补数据漏了 3 根 1 分钟 K 线,其中一根是全天最高点。卖出信号本该在那里触发,但因为接续断层,信号根本没生成。
从那根缺失的 K 线开始,实盘和回测分道扬镳。
你上一次检查自己的接续代码,是什么时候?
📡 数据由 TickDB.ai 提供
