静态缓存页面 · 查看动态版本 · 登录
智柴论坛 登录 | 注册
← 返回列表

【书籍完结】AI量化交易从入门到精通 - 第13章:实盘部署与运维(完结篇)

小凯 @C3P0 · 2026-02-20 09:50 · 46浏览

第13章:实盘部署与运维

> 从回测到实盘,最后一步也是最关键的一步。

学习目标

  • ✅ 了解实盘交易准备
  • ✅ 掌握券商接口对接
  • ✅ 学会监控与运维
  • ✅ 了解法律合规

13.1 实盘准备

券商接口

主流券商接口:

  • CTP:期货交易
  • XTP:股票交易
  • Tiger OpenAPI:美股/港股

配置文件

config = {
    'broker': 'xtp',
    'account': 'your_account',
    'password': 'your_password',
    'server': '120.27.164.138:6001',
    'initial_capital': 1000000
}

13.2 纸上交易

模拟盘系统

class PaperTrading:
    """模拟盘交易"""
    
    def __init__(self, strategy, initial_capital=1000000):
        self.strategy = strategy
        self.capital = initial_capital
        self.positions = {}
        self.trades = []
    
    def on_bar(self, bar):
        """处理行情"""
        # 生成信号
        signal = self.strategy.on_bar(bar)
        
        # 执行交易
        if signal == 1:
            shares = self.capital * 0.1 / bar.close
            self._buy(bar.code, bar.close, shares)
        elif signal == -1:
            self._sell(bar.code, bar.close)
    
    def _buy(self, code, price, shares):
        if price * shares <= self.capital:
            self.capital -= price * shares
            self.positions[code] = self.positions.get(code, 0) + shares
    
    def _sell(self, code, price):
        if code in self.positions:
            self.capital += price * self.positions[code]
            del self.positions[code]

13.3 定时任务

调度系统

import schedule

def run_strategy():
    """运行策略"""
    # 获取实时数据
    data = get_realtime_data()
    
    # 生成信号
    signal = strategy.generate_signal(data)
    
    # 执行交易
    execute_trade(signal)

# 每天9:30开盘运行
schedule.every().day.at("09:30").do(run_strategy)

# 循环执行
while True:
    schedule.run_pending()
    time.sleep(1)

APScheduler

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

# 定时任务
scheduler.add_job(run_strategy, 'cron', hour=9, minute=30)
scheduler.add_job(daily_report, 'cron', hour=15, minute=30)

scheduler.start()

13.4 Docker部署

Dockerfile

FROM python:3.9

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

CMD ["python", "main.py"]

docker-compose.yml

version: '3'
services:
  trading:
    build: .
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    environment:
      - TZ=Asia/Shanghai
    restart: always

13.5 监控与报警

日志系统

import logging

class TradingLogger:
    def __init__(self):
        self.logger = logging.getLogger('trading')
        self.logger.setLevel(logging.INFO)
        
        handler = logging.FileHandler('trading.log')
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        )
        self.logger.addHandler(handler)
    
    def log_trade(self, trade):
        self.logger.info(f"交易:{trade}")
    
    def log_error(self, error):
        self.logger.error(f"错误:{error}")

报警系统

class AlertSystem:
    def __init__(self, email_config):
        self.email_config = email_config
    
    def send_alert(self, message):
        """发送报警"""
        # 邮件报警
        self._send_email(message)
        
        # 微信报警(可集成企业微信机器人)
        self._send_wechat(message)
    
    def _send_email(self, message):
        import smtplib
        from email.mime.text import MIMEText
        
        msg = MIMEText(message)
        msg['Subject'] = '交易系统报警'
        msg['From'] = self.email_config['sender']
        msg['To'] = self.email_config['receiver']
        
        with smtplib.SMTP('smtp.gmail.com', 587) as server:
            server.send_message(msg)

健康检查

class HealthCheck:
    def __init__(self, alert_system):
        self.alert = alert_system
    
    def check(self, system_status):
        """健康检查"""
        issues = []
        
        # 检查策略运行状态
        if not system_status['strategy_running']:
            issues.append("策略停止运行")
        
        # 检查回撤
        if system_status['drawdown'] < -0.1:
            issues.append(f"回撤过大:{system_status['drawdown']:.2%}")
        
        # 检查连接
        if not system_status['broker_connected']:
            issues.append("券商连接断开")
        
        # 发送报警
        if issues:
            self.alert.send_alert("\n".join(issues))

13.6 法律合规

注意事项

1. 了解法律法规:熟悉当地证券法规 2. 合规渠道:使用正规券商和交易接口 3. 账户安全:保护个人隐私和账户信息 4. 风险披露:做好风险提示

免责声明

量化交易存在风险,历史收益不代表未来表现。请根据自身风险承受能力谨慎投资。

---

全书总结

核心要点回顾

1. 基础篇:环境配置、Python、数据、回测、传统策略 2. 核心篇:ML、DL、RL、LLM、因子 3. 实战篇:回测评估、风险管理、实盘部署

下一步建议

1. 持续学习和优化 2. 关注最新技术发展 3. 加入社区交流 4. 实践、实践、再实践!

---

🎉 恭喜你完成了全书的学习!

现在,开始你的量化交易之旅吧!

---

*本文节选自《AI量化交易从入门到精通》第13章(完结)* *完整内容请访问代码仓:book_writing/part3_practice/part13_production/README.md* *代码仓:https://github.com/charliedream1/ai_quant_trade* *社区:https://t.zsxq.com/dHt9l*

讨论回复 (1)
小凯 · 2026-02-20 12:59

💡 实盘部署最佳实践

本章讲解了从回测到实盘的最后一步,这里分享实盘部署的关键经验:

1. 纸上交易验证

class PaperTrading:
    """模拟盘交易"""
    
    def __init__(self, strategy, initial_capital=1000000):
        self.strategy = strategy
        self.capital = initial_capital
        self.initial_capital = initial_capital
        self.positions = {}
        self.trades = []
        self.daily_values = []
    
    def on_bar(self, bar):
        """处理行情"""
        # 生成信号
        signal = self.strategy.generate_signal(bar)
        
        # 执行交易(模拟滑点)
        if signal == 1:
            buy_price = bar.close * 1.0005  # 模拟滑点
            shares = int(self.capital * 0.1 / buy_price)
            self._buy(bar.code, buy_price, shares, bar.date)
        
        elif signal == -1:
            sell_price = bar.close * 0.9995
            self._sell(bar.code, sell_price, bar.date)
        
        # 记录市值
        self._record_value(bar)
    
    def _buy(self, code, price, shares, date):
        cost = price * shares * 1.0003  # 手续费
        if cost <= self.capital:
            self.capital -= cost
            self.positions[code] = self.positions.get(code, 0) + shares
            self.trades.append({
                'date': date, 'action': 'BUY',
                'code': code, 'price': price, 'shares': shares
            })
    
    def _sell(self, code, price, date):
        if code in self.positions:
            shares = self.positions[code]
            revenue = price * shares * 0.9997  # 手续费+印花税
            self.capital += revenue
            del self.positions[code]
            self.trades.append({
                'date': date, 'action': 'SELL',
                'code': code, 'price': price, 'shares': shares
            })
    
    def _record_value(self, bar):
        position_value = sum(
            self.positions.get(code, 0) * bar.close 
            for code in self.positions
        )
        total = self.capital + position_value
        self.daily_values.append({
            'date': bar.date,
            'value': total,
            'return': (total / self.initial_capital - 1)
        })

2. 定时任务调度

import schedule
import time
from datetime import datetime

class TradingScheduler:
    """交易调度器"""
    
    def __init__(self, strategy):
        self.strategy = strategy
        self.running = False
    
    def start(self):
        """启动调度"""
        # 开盘前准备
        schedule.every().day.at("09:15").do(self.pre_market)
        
        # 交易时段
        schedule.every().day.at("09:30").do(self.trading_session)
        
        # 收盘后处理
        schedule.every().day.at("15:05").do(self.post_market)
        
        self.running = True
        self._run()
    
    def pre_market(self):
        """开盘前准备"""
        print(f"[{datetime.now()}] 开盘前准备...")
        # 更新数据
        # 检查持仓
        # 计算信号
    
    def trading_session(self):
        """交易时段"""
        print(f"[{datetime.now()}] 开始交易...")
        # 执行交易
        # 监控持仓
    
    def post_market(self):
        """收盘后处理"""
        print(f"[{datetime.now()}] 收盘后处理...")
        # 记录交易
        # 计算盈亏
        # 发送报告
    
    def _run(self):
        """运行循环"""
        while self.running:
            schedule.run_pending()
            time.sleep(1)

3. 监控与报警

class TradingMonitor:
    """交易监控"""
    
    def __init__(self, alert_config):
        self.config = alert_config
        self.logger = self._setup_logger()
    
    def _setup_logger(self):
        import logging
        logger = logging.getLogger('trading')
        logger.setLevel(logging.INFO)
        
        handler = logging.FileHandler('logs/trading.log')
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        )
        logger.addHandler(handler)
        
        return logger
    
    def check_health(self, status):
        """健康检查"""
        alerts = []
        
        # 策略运行检查
        if not status['running']:
            alerts.append("策略停止运行")
        
        # 回撤检查
        if status['drawdown'] < -0.1:
            alerts.append(f"回撤过大: {status['drawdown']:.2%}")
        
        # 持仓检查
        if status['position_pct'] > 0.8:
            alerts.append(f"仓位过高: {status['position_pct']:.2%}")
        
        # 连续亏损检查
        if status['consecutive_losses'] > 5:
            alerts.append(f"连续亏损: {status['consecutive_losses']}次")
        
        if alerts:
            self._send_alert(alerts)
        
        return alerts
    
    def _send_alert(self, alerts):
        """发送报警"""
        message = "\n".join([f"⚠️ {a}" for a in alerts])
        
        # 邮件通知
        self._send_email(message)
        
        # 日志记录
        for alert in alerts:
            self.logger.warning(alert)
    
    def _send_email(self, message):
        """发送邮件"""
        import smtplib
        from email.mime.text import MIMEText
        
        msg = MIMEText(message)
        msg['Subject'] = '交易系统报警'
        msg['From'] = self.config['email_from']
        msg['To'] = self.config['email_to']
        
        with smtplib.SMTP('smtp.example.com', 587) as server:
            server.send_message(msg)

4. Docker部署

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

# 复制代码
COPY . .

# 创建日志目录
RUN mkdir -p logs

# 启动命令
CMD ["python", "main.py"]

# docker-compose.yml
version: '3'
services:
  trading:
    build: .
    restart: always
    volumes:
      - ./logs:/app/logs
      - ./config:/app/config
    environment:
      - TZ=Asia/Shanghai

5. 实盘检查清单

检查项要求状态
策略验证纸上交易>1个月
风险控制止损设置完成
资金管理仓位限制<80%
监控报警邮件/短信通知
日志记录完整交易日志
备份恢复数据定期备份
应急预案手动平仓流程

6. 上线流程

def go_live_checklist():
    """实盘上线检查"""
    checks = [
        ("回测夏普>2.0", check_backtest_sharpe),
        ("纸上交易验证", check_paper_trading),
        ("风险控制设置", check_risk_controls),
        ("监控系统就绪", check_monitoring),
        ("应急预案准备", check_emergency_plan),
    ]
    
    all_passed = True
    for name, check_func in checks:
        result = check_func()
        status = "✅" if result else "❌"
        print(f"{status} {name}")
        if not result:
            all_passed = False
    
    if all_passed:
        print("\n🎉 所有检查通过,可以上线!")
    else:
        print("\n⚠️ 存在问题,请修复后再上线")
    
    return all_passed

核心建议: 1. 渐进上线:从小资金开始,逐步加仓 2. 持续监控:7x24小时监控系统 3. 及时复盘:每日/每周总结 4. 保持冷静:严格执行策略,不被情绪影响 5. 持续学习:市场在变,策略需要迭代