在量化交易和个人投资管理中,实时监控股票价格并设置自动预警是提升决策效率的关键。今天,我将手把手教你如何使用 Python 对接实时股票 API,并构建一个功能完整的股票价格预警系统。
系统架构设计
我们的预警系统将包含以下核心模块:
- 数据获取模块 - 通过 WebSocket 连接实时股票 API
- 数据处理模块 - 解析和处理实时行情数据
- 预警规则引擎 - 根据预设条件触发预警
- 通知模块 - 通过多种渠道发送预警信息
技术栈选择
- Python 3.8+ - 主要编程语言
- WebSocket 客户端 - 实时数据接收
- Pandas - 数据处理和分析
- SQLite/Redis - 预警规则和状态存储
- SMTP/Telegram API - 预警通知发送
第一步:配置开发环境
# requirements.txt
# 实时股票API客户端和数据处理库
websocket-client>=1.3.0
requests>=2.28.0
pandas>=1.5.0
numpy>=1.24.0
python-dotenv>=0.21.0
schedule>=1.1.0
redis>=4.5.0
# 可选的通知库
python-telegram-bot>=20.0.0
twilio>=8.0.0
第二步:实现基于 iTick WebSocket的实时数据连接器
# tick_data_connector.py
import websocket
import json
import threading
import time
import logging
from datetime import datetime
import pandas as pd
from typing import Dict, List, Callable, Optional
class ITickWebSocketClient:
"""
iTick Stocks WebSocket 实时行情客户端
"""
def __init__(self, token: str, symbols: List[str]):
self.token = token
self.symbols = symbols # 例如 ["AAPL$US", "TSLA$US"]
self.ws_url = "wss://api.itick.org/stock"
self.ws = None
self.is_connected = False
self.is_authenticated = False
self.data_callbacks = []
self.error_callbacks = []
# 数据缓存(以 symbol 如 "AAPL$US" 为 key)
self.price_cache: Dict[str, Dict] = {}
self.last_update: Dict[str, datetime] = {}
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
def add_data_callback(self, callback: Callable):
self.data_callbacks.append(callback)
def add_error_callback(self, callback: Callable):
self.error_callbacks.append(callback)
def on_message(self, ws, message):
try:
data = json.loads(message)
# 连接成功
if data.get("msg") == "Connected Successfully":
self.logger.info("WebSocket 连接成功")
# 认证结果
elif data.get("resAc") == "auth":
if data.get("code") == 1:
self.is_authenticated = True
self.logger.info("认证成功")
self._subscribe()
else:
self.logger.error("认证失败")
ws.close()
# 订阅结果
elif data.get("resAc") == "subscribe":
if data.get("code") == 1:
self.logger.info("订阅成功")
else:
self.logger.error(f"订阅失败: {data.get('msg')}")
# 心跳响应
elif data.get("resAc") == "pong":
self.logger.debug("收到 pong")
# 实际行情数据
elif data.get("data"):
market_data = data["data"]
data_type = market_data.get("type", "")
symbol = market_data.get("s") or market_data.get("s") # tick/quote 使用 s
if data_type == "tick":
tick = self._parse_tick_data(market_data)
self.price_cache[symbol] = tick
self.last_update[symbol] = datetime.now()
for callback in self.data_callbacks:
callback(tick)
# 如需处理 quote、depth,可在此扩展
except Exception as e:
self.logger.error(f"消息处理异常: {e}")
def _parse_tick_data(self, raw: Dict) -> Dict:
"""解析 iTick tick 数据"""
return {
'symbol': raw.get('s'),
'price': raw.get('ld'), # 最新成交价
'volume': raw.get('v', 0),
'timestamp': datetime.fromtimestamp(raw.get('t', 0) / 1000).isoformat(),
'received_at': datetime.now().isoformat()
}
def on_error(self, ws, error):
self.logger.error(f"WebSocket 错误: {error}")
for cb in self.error_callbacks:
cb(error)
def on_close(self, ws, close_status_code, close_msg):
self.is_connected = False
self.logger.info("WebSocket 连接关闭")
def on_open(self, ws):
self.is_connected = True
self.logger.info("WebSocket 已打开,等待认证...")
def _subscribe(self):
"""发送订阅消息"""
subscribe_msg = {
"ac": "subscribe",
"params": ",".join(self.symbols),
"types": "tick,quote,depth" # 可根据需要添加 K 数据
}
self.ws.send(json.dumps(subscribe_msg))
self.logger.info(f"已发送订阅: {subscribe_msg['params']}")
def _send_ping(self):
"""心跳线程"""
while self.is_connected:
time.sleep(30)
if self.is_connected:
ping_msg = {
"ac": "ping",
"params": str(int(time.time() * 1000))
}
self.ws.send(json.dumps(ping_msg))
self.logger.debug("发送 ping")
def connect(self):
headers = {"token": self.token}
self.ws = websocket.WebSocketApp(
self.ws_url,
header=headers,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
# 主线程运行 WebSocket
wst = threading.Thread(target=self.ws.run_forever)
wst.daemon = True
wst.start()
# 启动心跳线程
ping_thread = threading.Thread(target=self._send_ping)
ping_thread.daemon = True
ping_thread.start()
# 等待连接建立
for _ in range(30):
if self.is_authenticated:
break
time.sleep(1)
def disconnect(self):
if self.ws:
self.ws.close()
self.is_connected = False
def get_current_price(self, symbol: str) -> Optional[float]:
if symbol in self.price_cache:
return self.price_cache[symbol].get('price')
return None
第三步:构建预警规则引擎
# alert_engine.py
import sqlite3
from datetime import datetime, timedelta
from typing import Dict, List, Any
import json
import logging
class AlertEngine:
"""预警规则引擎"""
def __init__(self, db_path: str = "alerts.db"):
self.db_path = db_path
self.active_alerts = {}
self.logger = logging.getLogger(__name__)
self._init_database()
def _init_database(self):
"""初始化数据库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 创建预警规则表
cursor.execute('''
CREATE TABLE IF NOT EXISTS alert_rules (
id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT NOT NULL,
rule_type TEXT NOT NULL,
condition TEXT NOT NULL,
threshold REAL,
value TEXT,
is_active INTEGER DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_triggered TIMESTAMP
)
''')
# 创建预警历史表
cursor.execute('''
CREATE TABLE IF NOT EXISTS alert_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
alert_rule_id INTEGER,
symbol TEXT NOT NULL,
trigger_price REAL,
trigger_value TEXT,
triggered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (alert_rule_id) REFERENCES alert_rules (id)
)
''')
conn.commit()
conn.close()
# 从数据库加载活动预警
self._load_active_alerts()
def _load_active_alerts(self):
"""从数据库加载活动预警规则"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM alert_rules WHERE is_active = 1"
)
for row in cursor.fetchall():
rule_id = row[0]
self.active_alerts[rule_id] = {
'symbol': row[1],
'rule_type': row[2],
'condition': row[3],
'threshold': row[4],
'value': row[5],
'last_triggered': row[7]
}
conn.close()
self.logger.info(f"加载了 {len(self.active_alerts)} 个活动预警规则")
def add_alert_rule(self, rule_data: Dict) -> int:
"""添加预警规则"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO alert_rules
(symbol, rule_type, condition, threshold, value)
VALUES (?, ?, ?, ?, ?)
''', (
rule_data['symbol'],
rule_data['rule_type'],
rule_data['condition'],
rule_data.get('threshold'),
json.dumps(rule_data.get('value', {}))
))
rule_id = cursor.lastrowid
conn.commit()
conn.close()
# 添加到活动预警
self.active_alerts[rule_id] = {
'symbol': rule_data['symbol'],
'rule_type': rule_data['rule_type'],
'condition': rule_data['condition'],
'threshold': rule_data.get('threshold'),
'value': rule_data.get('value', {}),
'last_triggered': None
}
self.logger.info(f"添加预警规则: {rule_data['symbol']} - {rule_data['rule_type']}")
return rule_id
def check_price_alert(self, symbol: str, price: float) -> List[Dict]:
"""检查价格预警"""
triggered_alerts = []
for rule_id, rule in self.active_alerts.items():
if rule['symbol'] != symbol:
continue
if rule['rule_type'] == 'price':
triggered = self._evaluate_price_condition(
price,
rule['condition'],
rule['threshold']
)
if triggered:
# 检查是否在冷却期内(避免频繁触发)
if self._is_in_cooldown(rule['last_triggered']):
continue
alert_info = {
'rule_id': rule_id,
'symbol': symbol,
'rule_type': 'price',
'condition': rule['condition'],
'threshold': rule['threshold'],
'trigger_price': price,
'message': self._generate_alert_message(
symbol, price, rule['condition'], rule['threshold']
)
}
triggered_alerts.append(alert_info)
# 更新最后触发时间
self._update_last_triggered(rule_id)
return triggered_alerts
def _evaluate_price_condition(self, price: float,
condition: str, threshold: float) -> bool:
"""评估价格条件"""
if condition == 'above':
return price > threshold
elif condition == 'below':
return price < threshold
elif condition == 'cross_above':
# 需要历史价格数据,这里简化为单次比较
return price >= threshold
elif condition == 'cross_below':
return price <= threshold
elif condition == 'percentage_change':
# 需要历史价格计算百分比变化
# 实际应用中需要更复杂的实现
return False
return False
def _is_in_cooldown(self, last_triggered, cooldown_minutes: int = 5) -> bool:
"""检查是否在冷却期内"""
if not last_triggered:
return False
if isinstance(last_triggered, str):
last_triggered = datetime.fromisoformat(last_triggered)
cooldown_end = last_triggered + timedelta(minutes=cooldown_minutes)
return datetime.now() < cooldown_end
def _update_last_triggered(self, rule_id: int):
"""更新最后触发时间"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
UPDATE alert_rules
SET last_triggered = CURRENT_TIMESTAMP
WHERE id = ?
''', (rule_id,))
conn.commit()
conn.close()
# 更新内存中的记录
if rule_id in self.active_alerts:
self.active_alerts[rule_id]['last_triggered'] = datetime.now().isoformat()
def _generate_alert_message(self, symbol: str, price: float,
condition: str, threshold: float) -> str:
"""生成预警消息"""
if condition == 'above':
return f"🚨 {symbol} 价格突破 {threshold},当前价格: {price:.2f}"
elif condition == 'below':
return f"⚠️ {symbol} 价格跌破 {threshold},当前价格: {price:.2f}"
else:
return f"📈 {symbol} 触发预警条件,当前价格: {price:.2f}"
第四步:实现通知发送器
# notifier.py
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import logging
from typing import Dict
class AlertNotifier:
"""预警通知器"""
def __init__(self, config: Dict):
self.config = config
self.logger = logging.getLogger(__name__)
def send_email(self, to_email: str, subject: str, body: str) -> bool:
"""发送邮件通知"""
try:
msg = MIMEMultipart()
msg['From'] = self.config.get('email_from')
msg['To'] = to_email
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
with smtplib.SMTP(
self.config.get('smtp_server'),
self.config.get('smtp_port', 587)
) as server:
server.starttls()
server.login(
self.config.get('email_user'),
self.config.get('email_password')
)
server.send_message(msg)
self.logger.info(f"邮件发送成功: {to_email}")
return True
except Exception as e:
self.logger.error(f"邮件发送失败: {e}")
return False
def send_telegram(self, chat_id: str, message: str) -> bool:
"""发送Telegram通知(简化版)"""
try:
# 实际应用中需要使用python-telegram-bot库
# 这里使用requests模拟
import requests
bot_token = self.config.get('telegram_bot_token')
if not bot_token:
self.logger.warning("未配置Telegram Bot Token")
return False
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
payload = {
'chat_id': chat_id,
'text': message,
'parse_mode': 'HTML'
}
response = requests.post(url, json=payload)
if response.status_code == 200:
self.logger.info(f"Telegram消息发送成功: {chat_id}")
return True
else:
self.logger.error(f"Telegram发送失败: {response.text}")
return False
except ImportError:
self.logger.warning("未安装requests库,无法发送Telegram消息")
return False
except Exception as e:
self.logger.error(f"Telegram发送错误: {e}")
return False
def send_console_notification(self, alert_info: Dict):
"""控制台通知(开发调试用)"""
print("\n" + "="*50)
print("🚨 股票预警触发 🚨")
print(f"股票: {alert_info['symbol']}")
print(f"价格: {alert_info['trigger_price']:.2f}")
print(f"条件: {alert_info['condition']} {alert_info['threshold']}")
print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("="*50 + "\n")
第五步:整合主应用程序
# main.py
import json
import time
import signal
import sys
from datetime import datetime
from typing import List
import logging
from tick_data_connector import TickDataAPIClient
from alert_engine import AlertEngine
from notifier import AlertNotifier
class StockAlertSystem:
"""股票预警系统主程序"""
def __init__(self, config_path: str = "config.json"):
# 加载配置
with open(config_path, 'r') as f:
self.config = json.load(f)
# 设置日志
self._setup_logging()
self.logger = logging.getLogger(__name__)
# 初始化组件
self.api_client = TickDataAPIClient(
api_key=self.config['itick']['token'],
symbols=self.config['monitor_symbols']
)
self.alert_engine = AlertEngine(
db_path=self.config.get('database_path', 'alerts.db')
)
self.notifier = AlertNotifier(self.config['notifications'])
# 添加预定义预警规则
self._setup_default_alerts()
# 运行标志
self.running = False
def _setup_logging(self):
"""配置日志系统"""
log_config = self.config.get('logging', {})
log_level = getattr(logging, log_config.get('level', 'INFO'))
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_config.get('file', 'stock_alert.log')),
logging.StreamHandler()
]
)
def _setup_default_alerts(self):
"""设置默认预警规则"""
default_alerts = self.config.get('default_alerts', [])
for alert_rule in default_alerts:
self.alert_engine.add_alert_rule(alert_rule)
def handle_tick_data(self, tick_data: Dict):
"""处理实时tick数据"""
symbol = tick_data['symbol']
price = tick_data['price']
# 检查是否触发预警
triggered_alerts = self.alert_engine.check_price_alert(symbol, price)
# 发送通知
for alert in triggered_alerts:
self._send_alert_notifications(alert)
def _send_alert_notifications(self, alert_info: Dict):
"""发送预警通知"""
message = alert_info['message']
# 控制台通知(开发用)
self.notifier.send_console_notification(alert_info)
# 邮件通知
if self.config['notifications'].get('enable_email', False):
for email in self.config['notifications'].get('email_recipients', []):
self.notifier.send_email(
email,
f"股票预警: {alert_info['symbol']}",
message
)
# Telegram通知
if self.config['notifications'].get('enable_telegram', False):
for chat_id in self.config['notifications'].get('telegram_chat_ids', []):
self.notifier.send_telegram(chat_id, message)
# 记录到日志
self.logger.info(f"预警触发: {message}")
def run(self):
"""运行主程序"""
self.running = True
# 设置信号处理
signal.signal(signal.SIGINT, self.shutdown)
signal.signal(signal.SIGTERM, self.shutdown)
# 注册数据回调
self.api_client.add_data_callback(self.handle_tick_data)
# 连接API
self.logger.info("正在连接Tick API...")
self.api_client.connect()
# 主循环
self.logger.info("股票预警系统已启动")
try:
while self.running:
# 检查系统状态
self._monitor_system_health()
time.sleep(1)
except KeyboardInterrupt:
self.shutdown()
except Exception as e:
self.logger.error(f"系统运行错误: {e}")
self.shutdown()
def _monitor_system_health(self):
"""监控系统健康状态"""
# 这里可以添加更多的健康检查逻辑
# 例如:检查API连接状态、数据库连接、磁盘空间等
# 示例:检查最近数据更新时间
for symbol in self.config['monitor_symbols']:
last_update = self.api_client.last_update.get(symbol)
if last_update:
time_diff = (datetime.now() - last_update).total_seconds()
if time_diff > 60: # 超过60秒没有数据
self.logger.warning(f"{symbol} 数据更新延迟: {time_diff:.0f}秒")
def shutdown(self, signum=None, frame=None):
"""关闭系统"""
self.logger.info("正在关闭系统...")
self.running = False
# 断开API连接
self.api_client.disconnect()
self.logger.info("系统已关闭")
sys.exit(0)
if __name__ == "__main__":
# 创建配置文件(示例)
sample_config = {
"itick": {
"token": "your_iTick_api_key_here",
},
"monitor_symbols": ["AAPL$US", "TSLA$US", "GOOGL$US", "MSFT$US"],
"default_alerts": [
{
"symbol": "AAPL$US",
"rule_type": "price",
"condition": "above",
"threshold": 230.0,
"value": {}
},
{
"symbol": "TSLA$US",
"rule_type": "price",
"condition": "below",
"threshold": 300.0,
"value": {}
}
],
"notifications": {
"enable_email": False,
"email_user": "your_email@gmail.com",
"email_password": "your_app_password",
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"email_from": "your_email@gmail.com",
"email_recipients": ["recipient@example.com"],
"enable_telegram": False,
"telegram_bot_token": "your_bot_token",
"telegram_chat_ids": ["your_chat_id"]
},
"logging": {
"level": "INFO",
"file": "stock_alert.log"
},
"database_path": "alerts.db"
}
# 保存示例配置
with open("config_sample.json", "w") as f:
json.dump(sample_config, f, indent=2)
print("示例配置文件已生成: config_sample.json")
print("请复制并修改为 config.json,然后运行系统")
# 实际运行代码(取消注释以下行)
# system = StockAlertSystem("config.json")
# system.run()
第六步:扩展功能和优化建议
1. 添加更多预警规则类型
# 技术指标预警
def add_technical_alert(self, symbol: str, indicator: str,
condition: str, value: float):
"""添加技术指标预警"""
pass # 实现MACD、RSI、布林带等指标预警
# 成交量异常预警
def add_volume_alert(self, symbol: str, volume_multiplier: float = 2.0):
"""添加成交量异常预警"""
pass
2. 实现数据持久化和分析
def save_tick_data_to_database(self, tick_data: Dict):
"""保存tick数据到数据库"""
conn = sqlite3.connect("tick_data.db")
cursor = conn.cursor()
cursor.execute('''
INSERT INTO tick_data
(symbol, price, volume, timestamp, bid, ask, bid_size, ask_size)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
tick_data['symbol'],
tick_data['price'],
tick_data['volume'],
tick_data['timestamp'],
tick_data['bid'],
tick_data['ask'],
tick_data['bid_size'],
tick_data['ask_size']
))
conn.commit()
conn.close()
3. 添加 Web 界面(使用 Flask 或 FastAPI)
# web_interface.py
from flask import Flask, render_template, request, jsonify
app = Flask(__name__)
@app.route('/')
def dashboard():
"""监控仪表板"""
return render_template('dashboard.html')
@app.route('/api/alerts', methods=['GET'])
def get_alerts():
"""获取预警列表API"""
# 从数据库查询预警
return jsonify({"alerts": []})
@app.route('/api/add_alert', methods=['POST'])
def add_alert():
"""添加预警规则API"""
rule_data = request.json
# 添加到预警引擎
return jsonify({"success": True})
部署和运维建议
- 服务器选择:使用云服务器(AWS EC2、阿里云 ECS)确保 24 小时运行
- 进程管理:使用 Supervisor 或 systemd 管理进程
- 日志监控:配置日志轮转和监控告警
- 备份策略:定期备份数据库和配置文件
- 安全考虑:
- API 密钥使用环境变量存储
- 数据库加密
- 定期更新依赖库
故障排除和调试
# debug_utils.py
def check_api_connection():
"""检查API连接状态"""
pass
def simulate_tick_data():
"""模拟tick数据用于测试"""
pass
def validate_alert_rules():
"""验证预警规则配置"""
pass
总结
通过本教程,你已经学会了:
- 如何使用 Python 连接实时股票 API
- 构建一个可扩展的预警规则引擎
- 实现多种通知方式(邮件、Telegram 等)
- 设计一个完整的股票监控系统架构
下一步改进方向:
- 性能优化:使用异步 IO 提高并发处理能力
- 机器学习集成:添加基于机器学习的智能预警
- 多市场支持:扩展支持加密货币、外汇等市场
- 移动应用:开发手机 App 实时接收预警
参考文档:https://blog.itick.org/quant-trading/stock-api-integration-with-telegram-alerts
GitHub 项目地址:https://github.com/itick-org/
