从回测到实盘,最后一步也是最关键的一步。
主流券商接口:
config = {
'broker': 'xtp',
'account': 'your_account',
'password': 'your_password',
'server': '120.27.164.138:6001',
'initial_capital': 1000000
}
class PaperTrading:
"""模拟盘交易"""
def __init__(self, strategy, initial_capital=1000000):
self.strategy = strategy
self.capital = initial_capital
self.positions = {}
self.trades = []
def on_bar(self, bar):
"""处理行情"""
# 生成信号
signal = self.strategy.on_bar(bar)
# 执行交易
if signal == 1:
shares = self.capital * 0.1 / bar.close
self._buy(bar.code, bar.close, shares)
elif signal == -1:
self._sell(bar.code, bar.close)
def _buy(self, code, price, shares):
if price * shares <= self.capital:
self.capital -= price * shares
self.positions[code] = self.positions.get(code, 0) + shares
def _sell(self, code, price):
if code in self.positions:
self.capital += price * self.positions[code]
del self.positions[code]
import schedule
def run_strategy():
"""运行策略"""
# 获取实时数据
data = get_realtime_data()
# 生成信号
signal = strategy.generate_signal(data)
# 执行交易
execute_trade(signal)
# 每天9:30开盘运行
schedule.every().day.at("09:30").do(run_strategy)
# 循环执行
while True:
schedule.run_pending()
time.sleep(1)
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
# 定时任务
scheduler.add_job(run_strategy, 'cron', hour=9, minute=30)
scheduler.add_job(daily_report, 'cron', hour=15, minute=30)
scheduler.start()
FROM python:3.9
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "main.py"]
version: '3'
services:
trading:
build: .
volumes:
- ./data:/app/data
- ./logs:/app/logs
environment:
- TZ=Asia/Shanghai
restart: always
import logging
class TradingLogger:
def __init__(self):
self.logger = logging.getLogger('trading')
self.logger.setLevel(logging.INFO)
handler = logging.FileHandler('trading.log')
handler.setFormatter(
logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
)
self.logger.addHandler(handler)
def log_trade(self, trade):
self.logger.info(f"交易:{trade}")
def log_error(self, error):
self.logger.error(f"错误:{error}")
class AlertSystem:
def __init__(self, email_config):
self.email_config = email_config
def send_alert(self, message):
"""发送报警"""
# 邮件报警
self._send_email(message)
# 微信报警(可集成企业微信机器人)
self._send_wechat(message)
def _send_email(self, message):
import smtplib
from email.mime.text import MIMEText
msg = MIMEText(message)
msg['Subject'] = '交易系统报警'
msg['From'] = self.email_config['sender']
msg['To'] = self.email_config['receiver']
with smtplib.SMTP('smtp.gmail.com', 587) as server:
server.send_message(msg)
class HealthCheck:
def __init__(self, alert_system):
self.alert = alert_system
def check(self, system_status):
"""健康检查"""
issues = []
# 检查策略运行状态
if not system_status['strategy_running']:
issues.append("策略停止运行")
# 检查回撤
if system_status['drawdown'] < -0.1:
issues.append(f"回撤过大:{system_status['drawdown']:.2%}")
# 检查连接
if not system_status['broker_connected']:
issues.append("券商连接断开")
# 发送报警
if issues:
self.alert.send_alert("\n".join(issues))
量化交易存在风险,历史收益不代表未来表现。请根据自身风险承受能力谨慎投资。
🎉 恭喜你完成了全书的学习!
现在,开始你的量化交易之旅吧!
本文节选自《AI量化交易从入门到精通》第13章(完结)
完整内容请访问代码仓:bookwriting/part3practice/part13production/README.md
代码仓:https://github.com/charliedream1/aiquant_trade
社区:https://t.zsxq.com/dHt9l
还没有人回复