如何用 Python 对接实时股票 API 并构建预警系统?:实战分享

在量化交易和个人投资管理中,实时监控股票价格并设置自动预警是提升决策效率的关键。今天,我将手把手教你如何使用 Python 对接实时股票 API,并构建一个功能完整的股票价格预警系统。

picture.image

系统架构设计

我们的预警系统将包含以下核心模块:

  1. 数据获取模块 - 通过 WebSocket 连接实时股票 API
  2. 数据处理模块 - 解析和处理实时行情数据
  3. 预警规则引擎 - 根据预设条件触发预警
  4. 通知模块 - 通过多种渠道发送预警信息

技术栈选择

  • Python 3.8+ - 主要编程语言
  • WebSocket 客户端 - 实时数据接收
  • Pandas - 数据处理和分析
  • SQLite/Redis - 预警规则和状态存储
  • SMTP/Telegram API - 预警通知发送

第一步:配置开发环境

# requirements.txt
# 实时股票API客户端和数据处理库
websocket-client>=1.3.0
requests>=2.28.0
pandas>=1.5.0
numpy>=1.24.0
python-dotenv>=0.21.0
schedule>=1.1.0
redis>=4.5.0

# 可选的通知库
python-telegram-bot>=20.0.0
twilio>=8.0.0

第二步:实现基于 iTick WebSocket的实时数据连接器

# tick_data_connector.py
import websocket
import json
import threading
import time
import logging
from datetime import datetime
import pandas as pd
from typing import Dict, List, Callable, Optional

class ITickWebSocketClient:
    """
    iTick Stocks WebSocket 实时行情客户端
    """

    def __init__(self, token: str, symbols: List[str]):
        self.token = token
        self.symbols = symbols  # 例如 ["AAPL$US", "TSLA$US"]
        self.ws_url = "wss://api.itick.org/stock"
        self.ws = None
        self.is_connected = False
        self.is_authenticated = False
        self.data_callbacks = []
        self.error_callbacks = []

        # 数据缓存(以 symbol 如 "AAPL$US" 为 key)
        self.price_cache: Dict[str, Dict] = {}
        self.last_update: Dict[str, datetime] = {}

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def add_data_callback(self, callback: Callable):
        self.data_callbacks.append(callback)

    def add_error_callback(self, callback: Callable):
        self.error_callbacks.append(callback)

    def on_message(self, ws, message):
        try:
            data = json.loads(message)

            # 连接成功
            if data.get("msg") == "Connected Successfully":
                self.logger.info("WebSocket 连接成功")

            # 认证结果
            elif data.get("resAc") == "auth":
                if data.get("code") == 1:
                    self.is_authenticated = True
                    self.logger.info("认证成功")
                    self._subscribe()
                else:
                    self.logger.error("认证失败")
                    ws.close()

            # 订阅结果
            elif data.get("resAc") == "subscribe":
                if data.get("code") == 1:
                    self.logger.info("订阅成功")
                else:
                    self.logger.error(f"订阅失败: {data.get('msg')}")

            # 心跳响应
            elif data.get("resAc") == "pong":
                self.logger.debug("收到 pong")

            # 实际行情数据
            elif data.get("data"):
                market_data = data["data"]
                data_type = market_data.get("type", "")
                symbol = market_data.get("s") or market_data.get("s")  # tick/quote 使用 s

                if data_type == "tick":
                    tick = self._parse_tick_data(market_data)
                    self.price_cache[symbol] = tick
                    self.last_update[symbol] = datetime.now()
                    for callback in self.data_callbacks:
                        callback(tick)

                # 如需处理 quote、depth,可在此扩展

        except Exception as e:
            self.logger.error(f"消息处理异常: {e}")

    def _parse_tick_data(self, raw: Dict) -> Dict:
        """解析 iTick tick 数据"""
        return {
            'symbol': raw.get('s'),
            'price': raw.get('ld'),        # 最新成交价
            'volume': raw.get('v', 0),
            'timestamp': datetime.fromtimestamp(raw.get('t', 0) / 1000).isoformat(),
            'received_at': datetime.now().isoformat()
        }

    def on_error(self, ws, error):
        self.logger.error(f"WebSocket 错误: {error}")
        for cb in self.error_callbacks:
            cb(error)

    def on_close(self, ws, close_status_code, close_msg):
        self.is_connected = False
        self.logger.info("WebSocket 连接关闭")

    def on_open(self, ws):
        self.is_connected = True
        self.logger.info("WebSocket 已打开,等待认证...")

    def _subscribe(self):
        """发送订阅消息"""
        subscribe_msg = {
            "ac": "subscribe",
            "params": ",".join(self.symbols),
            "types": "tick,quote,depth"   # 可根据需要添加 K 数据
        }
        self.ws.send(json.dumps(subscribe_msg))
        self.logger.info(f"已发送订阅: {subscribe_msg['params']}")

    def _send_ping(self):
        """心跳线程"""
        while self.is_connected:
            time.sleep(30)
            if self.is_connected:
                ping_msg = {
                    "ac": "ping",
                    "params": str(int(time.time() * 1000))
                }
                self.ws.send(json.dumps(ping_msg))
                self.logger.debug("发送 ping")

    def connect(self):
        headers = {"token": self.token}
        self.ws = websocket.WebSocketApp(
            self.ws_url,
            header=headers,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )

        # 主线程运行 WebSocket
        wst = threading.Thread(target=self.ws.run_forever)
        wst.daemon = True
        wst.start()

        # 启动心跳线程
        ping_thread = threading.Thread(target=self._send_ping)
        ping_thread.daemon = True
        ping_thread.start()

        # 等待连接建立
        for _ in range(30):
            if self.is_authenticated:
                break
            time.sleep(1)

    def disconnect(self):
        if self.ws:
            self.ws.close()
        self.is_connected = False

    def get_current_price(self, symbol: str) -> Optional[float]:
        if symbol in self.price_cache:
            return self.price_cache[symbol].get('price')
        return None

第三步:构建预警规则引擎

# alert_engine.py
import sqlite3
from datetime import datetime, timedelta
from typing import Dict, List, Any
import json
import logging

class AlertEngine:
    """预警规则引擎"""

    def __init__(self, db_path: str = "alerts.db"):
        self.db_path = db_path
        self.active_alerts = {}
        self.logger = logging.getLogger(__name__)
        self._init_database()

    def _init_database(self):
        """初始化数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # 创建预警规则表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS alert_rules (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                symbol TEXT NOT NULL,
                rule_type TEXT NOT NULL,
                condition TEXT NOT NULL,
                threshold REAL,
                value TEXT,
                is_active INTEGER DEFAULT 1,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_triggered TIMESTAMP
            )
        ''')

        # 创建预警历史表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS alert_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                alert_rule_id INTEGER,
                symbol TEXT NOT NULL,
                trigger_price REAL,
                trigger_value TEXT,
                triggered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (alert_rule_id) REFERENCES alert_rules (id)
            )
        ''')

        conn.commit()
        conn.close()

        # 从数据库加载活动预警
        self._load_active_alerts()

    def _load_active_alerts(self):
        """从数据库加载活动预警规则"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute(
            "SELECT * FROM alert_rules WHERE is_active = 1"
        )

        for row in cursor.fetchall():
            rule_id = row[0]
            self.active_alerts[rule_id] = {
                'symbol': row[1],
                'rule_type': row[2],
                'condition': row[3],
                'threshold': row[4],
                'value': row[5],
                'last_triggered': row[7]
            }

        conn.close()
        self.logger.info(f"加载了 {len(self.active_alerts)} 个活动预警规则")

    def add_alert_rule(self, rule_data: Dict) -> int:
        """添加预警规则"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            INSERT INTO alert_rules
            (symbol, rule_type, condition, threshold, value)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            rule_data['symbol'],
            rule_data['rule_type'],
            rule_data['condition'],
            rule_data.get('threshold'),
            json.dumps(rule_data.get('value', {}))
        ))

        rule_id = cursor.lastrowid
        conn.commit()
        conn.close()

        # 添加到活动预警
        self.active_alerts[rule_id] = {
            'symbol': rule_data['symbol'],
            'rule_type': rule_data['rule_type'],
            'condition': rule_data['condition'],
            'threshold': rule_data.get('threshold'),
            'value': rule_data.get('value', {}),
            'last_triggered': None
        }

        self.logger.info(f"添加预警规则: {rule_data['symbol']} - {rule_data['rule_type']}")
        return rule_id

    def check_price_alert(self, symbol: str, price: float) -> List[Dict]:
        """检查价格预警"""
        triggered_alerts = []

        for rule_id, rule in self.active_alerts.items():
            if rule['symbol'] != symbol:
                continue

            if rule['rule_type'] == 'price':
                triggered = self._evaluate_price_condition(
                    price,
                    rule['condition'],
                    rule['threshold']
                )

                if triggered:
                    # 检查是否在冷却期内(避免频繁触发)
                    if self._is_in_cooldown(rule['last_triggered']):
                        continue

                    alert_info = {
                        'rule_id': rule_id,
                        'symbol': symbol,
                        'rule_type': 'price',
                        'condition': rule['condition'],
                        'threshold': rule['threshold'],
                        'trigger_price': price,
                        'message': self._generate_alert_message(
                            symbol, price, rule['condition'], rule['threshold']
                        )
                    }

                    triggered_alerts.append(alert_info)

                    # 更新最后触发时间
                    self._update_last_triggered(rule_id)

        return triggered_alerts

    def _evaluate_price_condition(self, price: float,
                                 condition: str, threshold: float) -> bool:
        """评估价格条件"""
        if condition == 'above':
            return price > threshold
        elif condition == 'below':
            return price < threshold
        elif condition == 'cross_above':
            # 需要历史价格数据,这里简化为单次比较
            return price >= threshold
        elif condition == 'cross_below':
            return price <= threshold
        elif condition == 'percentage_change':
            # 需要历史价格计算百分比变化
            # 实际应用中需要更复杂的实现
            return False

        return False

    def _is_in_cooldown(self, last_triggered, cooldown_minutes: int = 5) -> bool:
        """检查是否在冷却期内"""
        if not last_triggered:
            return False

        if isinstance(last_triggered, str):
            last_triggered = datetime.fromisoformat(last_triggered)

        cooldown_end = last_triggered + timedelta(minutes=cooldown_minutes)
        return datetime.now() < cooldown_end

    def _update_last_triggered(self, rule_id: int):
        """更新最后触发时间"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            UPDATE alert_rules
            SET last_triggered = CURRENT_TIMESTAMP
            WHERE id = ?
        ''', (rule_id,))

        conn.commit()
        conn.close()

        # 更新内存中的记录
        if rule_id in self.active_alerts:
            self.active_alerts[rule_id]['last_triggered'] = datetime.now().isoformat()

    def _generate_alert_message(self, symbol: str, price: float,
                               condition: str, threshold: float) -> str:
        """生成预警消息"""
        if condition == 'above':
            return f"🚨 {symbol} 价格突破 {threshold},当前价格: {price:.2f}"
        elif condition == 'below':
            return f"⚠️ {symbol} 价格跌破 {threshold},当前价格: {price:.2f}"
        else:
            return f"📈 {symbol} 触发预警条件,当前价格: {price:.2f}"

第四步:实现通知发送器

# notifier.py
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import logging
from typing import Dict

class AlertNotifier:
    """预警通知器"""

    def __init__(self, config: Dict):
        self.config = config
        self.logger = logging.getLogger(__name__)

    def send_email(self, to_email: str, subject: str, body: str) -> bool:
        """发送邮件通知"""
        try:
            msg = MIMEMultipart()
            msg['From'] = self.config.get('email_from')
            msg['To'] = to_email
            msg['Subject'] = subject

            msg.attach(MIMEText(body, 'plain'))

            with smtplib.SMTP(
                self.config.get('smtp_server'),
                self.config.get('smtp_port', 587)
            ) as server:
                server.starttls()
                server.login(
                    self.config.get('email_user'),
                    self.config.get('email_password')
                )
                server.send_message(msg)

            self.logger.info(f"邮件发送成功: {to_email}")
            return True

        except Exception as e:
            self.logger.error(f"邮件发送失败: {e}")
            return False

    def send_telegram(self, chat_id: str, message: str) -> bool:
        """发送Telegram通知(简化版)"""
        try:
            # 实际应用中需要使用python-telegram-bot库
            # 这里使用requests模拟
            import requests

            bot_token = self.config.get('telegram_bot_token')
            if not bot_token:
                self.logger.warning("未配置Telegram Bot Token")
                return False

            url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
            payload = {
                'chat_id': chat_id,
                'text': message,
                'parse_mode': 'HTML'
            }

            response = requests.post(url, json=payload)

            if response.status_code == 200:
                self.logger.info(f"Telegram消息发送成功: {chat_id}")
                return True
            else:
                self.logger.error(f"Telegram发送失败: {response.text}")
                return False

        except ImportError:
            self.logger.warning("未安装requests库,无法发送Telegram消息")
            return False
        except Exception as e:
            self.logger.error(f"Telegram发送错误: {e}")
            return False

    def send_console_notification(self, alert_info: Dict):
        """控制台通知(开发调试用)"""
        print("\n" + "="*50)
        print("🚨 股票预警触发 🚨")
        print(f"股票: {alert_info['symbol']}")
        print(f"价格: {alert_info['trigger_price']:.2f}")
        print(f"条件: {alert_info['condition']} {alert_info['threshold']}")
        print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("="*50 + "\n")

第五步:整合主应用程序

# main.py
import json
import time
import signal
import sys
from datetime import datetime
from typing import List
import logging

from tick_data_connector import TickDataAPIClient
from alert_engine import AlertEngine
from notifier import AlertNotifier

class StockAlertSystem:
    """股票预警系统主程序"""

    def __init__(self, config_path: str = "config.json"):
        # 加载配置
        with open(config_path, 'r') as f:
            self.config = json.load(f)

        # 设置日志
        self._setup_logging()
        self.logger = logging.getLogger(__name__)

        # 初始化组件
        self.api_client = TickDataAPIClient(
            api_key=self.config['itick']['token'],
            symbols=self.config['monitor_symbols']
        )

        self.alert_engine = AlertEngine(
            db_path=self.config.get('database_path', 'alerts.db')
        )

        self.notifier = AlertNotifier(self.config['notifications'])

        # 添加预定义预警规则
        self._setup_default_alerts()

        # 运行标志
        self.running = False

    def _setup_logging(self):
        """配置日志系统"""
        log_config = self.config.get('logging', {})
        log_level = getattr(logging, log_config.get('level', 'INFO'))

        logging.basicConfig(
            level=log_level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_config.get('file', 'stock_alert.log')),
                logging.StreamHandler()
            ]
        )

    def _setup_default_alerts(self):
        """设置默认预警规则"""
        default_alerts = self.config.get('default_alerts', [])

        for alert_rule in default_alerts:
            self.alert_engine.add_alert_rule(alert_rule)

    def handle_tick_data(self, tick_data: Dict):
        """处理实时tick数据"""
        symbol = tick_data['symbol']
        price = tick_data['price']

        # 检查是否触发预警
        triggered_alerts = self.alert_engine.check_price_alert(symbol, price)

        # 发送通知
        for alert in triggered_alerts:
            self._send_alert_notifications(alert)

    def _send_alert_notifications(self, alert_info: Dict):
        """发送预警通知"""
        message = alert_info['message']

        # 控制台通知(开发用)
        self.notifier.send_console_notification(alert_info)

        # 邮件通知
        if self.config['notifications'].get('enable_email', False):
            for email in self.config['notifications'].get('email_recipients', []):
                self.notifier.send_email(
                    email,
                    f"股票预警: {alert_info['symbol']}",
                    message
                )

        # Telegram通知
        if self.config['notifications'].get('enable_telegram', False):
            for chat_id in self.config['notifications'].get('telegram_chat_ids', []):
                self.notifier.send_telegram(chat_id, message)

        # 记录到日志
        self.logger.info(f"预警触发: {message}")

    def run(self):
        """运行主程序"""
        self.running = True

        # 设置信号处理
        signal.signal(signal.SIGINT, self.shutdown)
        signal.signal(signal.SIGTERM, self.shutdown)

        # 注册数据回调
        self.api_client.add_data_callback(self.handle_tick_data)

        # 连接API
        self.logger.info("正在连接Tick API...")
        self.api_client.connect()

        # 主循环
        self.logger.info("股票预警系统已启动")
        try:
            while self.running:
                # 检查系统状态
                self._monitor_system_health()
                time.sleep(1)

        except KeyboardInterrupt:
            self.shutdown()
        except Exception as e:
            self.logger.error(f"系统运行错误: {e}")
            self.shutdown()

    def _monitor_system_health(self):
        """监控系统健康状态"""
        # 这里可以添加更多的健康检查逻辑
        # 例如:检查API连接状态、数据库连接、磁盘空间等

        # 示例:检查最近数据更新时间
        for symbol in self.config['monitor_symbols']:
            last_update = self.api_client.last_update.get(symbol)
            if last_update:
                time_diff = (datetime.now() - last_update).total_seconds()
                if time_diff > 60:  # 超过60秒没有数据
                    self.logger.warning(f"{symbol} 数据更新延迟: {time_diff:.0f}秒")

    def shutdown(self, signum=None, frame=None):
        """关闭系统"""
        self.logger.info("正在关闭系统...")
        self.running = False

        # 断开API连接
        self.api_client.disconnect()

        self.logger.info("系统已关闭")
        sys.exit(0)

if __name__ == "__main__":
    # 创建配置文件(示例)
    sample_config = {
        "itick": {
            "token": "your_iTick_api_key_here",
        },
        "monitor_symbols": ["AAPL$US", "TSLA$US", "GOOGL$US", "MSFT$US"],
        "default_alerts": [
            {
                "symbol": "AAPL$US",
                "rule_type": "price",
                "condition": "above",
                "threshold": 230.0,
                "value": {}
            },
            {
                "symbol": "TSLA$US",
                "rule_type": "price",
                "condition": "below",
                "threshold": 300.0,
                "value": {}
            }
        ],
        "notifications": {
            "enable_email": False,
            "email_user": "your_email@gmail.com",
            "email_password": "your_app_password",
            "smtp_server": "smtp.gmail.com",
            "smtp_port": 587,
            "email_from": "your_email@gmail.com",
            "email_recipients": ["recipient@example.com"],
            "enable_telegram": False,
            "telegram_bot_token": "your_bot_token",
            "telegram_chat_ids": ["your_chat_id"]
        },
        "logging": {
            "level": "INFO",
            "file": "stock_alert.log"
        },
        "database_path": "alerts.db"
    }

    # 保存示例配置
    with open("config_sample.json", "w") as f:
        json.dump(sample_config, f, indent=2)

    print("示例配置文件已生成: config_sample.json")
    print("请复制并修改为 config.json,然后运行系统")

    # 实际运行代码(取消注释以下行)
    # system = StockAlertSystem("config.json")
    # system.run()

第六步:扩展功能和优化建议

1. 添加更多预警规则类型

# 技术指标预警
def add_technical_alert(self, symbol: str, indicator: str,
                       condition: str, value: float):
    """添加技术指标预警"""
    pass  # 实现MACD、RSI、布林带等指标预警

# 成交量异常预警
def add_volume_alert(self, symbol: str, volume_multiplier: float = 2.0):
    """添加成交量异常预警"""
    pass

2. 实现数据持久化和分析

def save_tick_data_to_database(self, tick_data: Dict):
    """保存tick数据到数据库"""
    conn = sqlite3.connect("tick_data.db")
    cursor = conn.cursor()

    cursor.execute('''
        INSERT INTO tick_data
        (symbol, price, volume, timestamp, bid, ask, bid_size, ask_size)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    ''', (
        tick_data['symbol'],
        tick_data['price'],
        tick_data['volume'],
        tick_data['timestamp'],
        tick_data['bid'],
        tick_data['ask'],
        tick_data['bid_size'],
        tick_data['ask_size']
    ))

    conn.commit()
    conn.close()

3. 添加 Web 界面(使用 Flask 或 FastAPI)

# web_interface.py
from flask import Flask, render_template, request, jsonify

app = Flask(__name__)

@app.route('/')
def dashboard():
    """监控仪表板"""
    return render_template('dashboard.html')

@app.route('/api/alerts', methods=['GET'])
def get_alerts():
    """获取预警列表API"""
    # 从数据库查询预警
    return jsonify({"alerts": []})

@app.route('/api/add_alert', methods=['POST'])
def add_alert():
    """添加预警规则API"""
    rule_data = request.json
    # 添加到预警引擎
    return jsonify({"success": True})

部署和运维建议

  1. 服务器选择:使用云服务器(AWS EC2、阿里云 ECS)确保 24 小时运行
  2. 进程管理:使用 Supervisor 或 systemd 管理进程
  3. 日志监控:配置日志轮转和监控告警
  4. 备份策略:定期备份数据库和配置文件
  5. 安全考虑
    • API 密钥使用环境变量存储
    • 数据库加密
    • 定期更新依赖库

故障排除和调试

# debug_utils.py
def check_api_connection():
    """检查API连接状态"""
    pass

def simulate_tick_data():
    """模拟tick数据用于测试"""
    pass

def validate_alert_rules():
    """验证预警规则配置"""
    pass

总结

通过本教程,你已经学会了:

  1. 如何使用 Python 连接实时股票 API
  2. 构建一个可扩展的预警规则引擎
  3. 实现多种通知方式(邮件、Telegram 等)
  4. 设计一个完整的股票监控系统架构

下一步改进方向

  1. 性能优化:使用异步 IO 提高并发处理能力
  2. 机器学习集成:添加基于机器学习的智能预警
  3. 多市场支持:扩展支持加密货币、外汇等市场
  4. 移动应用:开发手机 App 实时接收预警

参考文档:https://blog.itick.org/quant-trading/stock-api-integration-with-telegram-alerts
GitHub 项目地址https://github.com/itick-org/

0
0
0
0
评论
未登录
暂无评论