下载MD:
https://www.yuque.com/llj2/trade/tv0g3oqa9kwe7ggy
聚宽量化交易平台API完整使用指南
前言
本指南是基于聚宽(JoinQuant)量化交易平台API的完整使用手册,旨在为初学者提供从零开始学习量化交易策略开发的完整路径。聚宽是中国领先的量化交易平台,采用Python语言,为投资者提供了丰富的金融数据API和完整的策略开发环境。
通过本指南的学习,您将能够:
- 掌握聚宽平台的核心API使用方法
- 理解量化交易策略的开发流程
- 学会数据获取、处理和分析技巧
- 实现交易执行和风险管理
- 开发属于自己的量化交易策略
目录
1. 平台概述与基础概念
1.1 聚宽平台简介
聚宽(JoinQuant)是中国专业的量化交易研究平台,提供完整的Python编程环境用于量化策略开发。平台支持股票、基金、ETF、期货等多种金融产品的策略回测和模拟交易。[^1_1][^1_2][^1_3][^1_4][^1_5]
核心特色:
- 丰富的历史数据和实时数据
- 完整的Python量化开发环境
- 专业的回测引擎和风险分析工具
- 支持多种金融产品交易
- 提供因子库和技术分析工具
1.2 基础概念解释
在开始学习API之前,需要理解以下核心概念:
- context: 策略上下文对象,包含策略运行状态、账户信息、持仓等
- security: 证券代码,如'000001.XSHE'表示平安银行
- g: 全局变量对象,用于存储策略参数和状态
- data: 当前时点的市场数据对象
- order: 订单对象,包含下单相关信息
1.3 证券代码规范
聚宽平台使用标准化的证券代码格式:
# A股股票代码格式
'000001.XSHE' # 深交所股票(平安银行)
'600000.XSHG' # 上交所股票(浦发银行)
# 指数代码格式
'000300.XSHG' # 沪深300指数
'000905.XSHG' # 中证500指数
'399006.XSHE' # 创业板指数
# ETF基金代码格式
'510300.XSHG' # 沪深300ETF
'159915.XSHE' # 创业板ETF
# 期货代码格式(主力合约)
'RB9999.XSGE' # 螺纹钢主力合约
2. 策略框架结构
2.1 基本框架
每个聚宽策略都遵循标准的生命周期模式:
# 导入必要的库
from jqdata import *
import numpy as np
import pandas as pd
def initialize(context):
"""
策略初始化函数,只在策略开始时运行一次
用于设置基准、手续费、运行时间等
"""
pass
def before_trading_start(context):
"""每日开盘前运行(可选)"""
pass
def handle_data(context, data):
"""主要交易逻辑,按设定频率运行(可选)"""
pass
def after_trading_end(context):
"""每日收盘后运行(可选)"""
pass
2.2 函数说明
必需函数:
-
initialize(context)
: 策略初始化函数,系统自动调用,仅运行一次
可选函数:
-
before_trading_start(context)
: 开盘前执行 -
handle_data(context, data)
: 每个bar执行 -
after_trading_end(context)
: 收盘后执行 - 自定义定时执行函数
2.3 策略开发流程
- 策略设计: 确定投资理念和交易逻辑
- 数据准备: 获取所需的历史数据和实时数据
- 信号生成: 基于数据分析生成买卖信号
- 风险控制: 设置止损止盈和仓位管理
- 回测验证: 验证策略的有效性和稳定性
- 参数优化: 调整策略参数提高表现
- 模拟交易: 在模拟环境中验证策略
3. 核心配置API
3.1 基准设置
API函数:
set_benchmark(security)
设置策略基准指数,用于性能比较。
# 基本用法
set_benchmark('000300.XSHG') # 设置沪深300为基准
# 常用基准指数
set_benchmark('000300.XSHG') # 沪深300指数
set_benchmark('000905.XSHG') # 中证500指数
set_benchmark('399006.XSHE') # 创业板指数
set_benchmark('000001.XSHG') # 上证综指
set_benchmark('399101.XSHE') # 深证成指
3.2 系统选项配置
API函数:
set_option(option_name, value)
配置策略运行的系统参数。
# 重要配置(强烈推荐设置)
set_option('use_real_price', True) # 使用真实价格交易,避免回测偏差
set_option('avoid_future_data', True) # 避免未来数据泄露
# 可选配置
set_option('order_volume_ratio', 0.25) # 成交量比例限制(单笔订单不超过5分钟成交量的25%)
set_option('match_with_order_book', True) # 开启盘口撮合(更精确的成交模拟)
重要参数说明:
-
use_real_price
: 确保使用真实价格进行交易,避免不切实际的回测结果 -
avoid_future_data
: 防止在策略中意外使用未来数据,确保策略的真实性
3.3 交易成本设置
API函数:
set_order_cost(OrderCost(**kwargs), type)
设置不同类型证券的交易成本。
# 股票交易成本设置
set_order_cost(OrderCost(
open_tax=0, # 买入印花税(通常为0)
close_tax=0.001, # 卖出印花税(千分之1)
open_commission=0.0003, # 买入佣金(万分之3)
close_commission=0.0003, # 卖出佣金(万分之3)
close_today_commission=0, # 平今仓佣金
min_commission=5 # 最低佣金5元
), type='stock')
# ETF/基金交易成本设置(无印花税)
set_order_cost(OrderCost(
open_tax=0,
close_tax=0, # ETF无印花税
open_commission=0.0003,
close_commission=0.0003,
min_commission=5
), type='fund')
# 期货交易成本设置
set_order_cost(OrderCost(
open_tax=0,
close_tax=0,
open_commission=0.0002, # 期货佣金相对较低
close_commission=0.0002,
close_today_commission=0.0002,
min_commission=5
), type='futures')
3.4 滑点设置
API函数:
set_slippage(slippage_type)
设置交易滑点,模拟实际交易中的价格冲击。
# 固定滑点(推荐)
set_slippage(FixedSlippage(0.002)) # 固定0.2%滑点
# 价格相关滑点
set_slippage(PriceRelatedSlippage(0.002)) # 价格相关0.2%滑点
# 零滑点(理想情况,不建议在实际策略中使用)
set_slippage(FixedSlippage(0))
# 步长相关滑点(主要用于期货)
set_slippage(StepRelatedSlippage(2)) # 2个最小变动单位
3.5 完整初始化示例
def initialize(context):
"""完整的策略初始化示例"""
# 设置基准
set_benchmark('000300.XSHG')
# 系统配置
set_option('use_real_price', True)
set_option('avoid_future_data', True)
set_option('order_volume_ratio', 0.25)
# 交易成本
set_order_cost(OrderCost(
open_tax=0,
close_tax=0.001,
open_commission=0.0003,
close_commission=0.0003,
min_commission=5
), type='stock')
# 滑点设置
set_slippage(FixedSlippage(0.002))
# 策略参数
g.stock_num = 10 # 持仓股票数量
g.rebalance_period = 20 # 调仓周期(天)
g.benchmark = '000300.XSHG' # 基准指数
# 初始化变量
g.current_positions = [] # 当前持仓
g.target_stocks = [] # 目标股票池
g.trade_count = 0 # 交易次数统计
# 设置定时运行
run_daily(before_market_open, time='9:00')
run_daily(trade_stocks, time='9:30')
run_daily(after_market_close, time='15:30')
4. 数据获取API详解
4.1 价格数据获取
4.1.1 get_price() – 核心价格数据API
API函数:
get_price(security, start_date, end_date, frequency, fields, skip_paused, fq, count, panel, fill_paused)
获取历史价格数据的主要函数。
# 基础用法 - 获取单只股票数据
df = get_price(
security='000001.XSHE', # 股票代码
start_date='2023-01-01', # 开始日期
end_date='2023-12-31', # 结束日期
frequency='daily', # 数据频率
fields=['open', 'close', 'high', 'low', 'volume', 'money'], # 数据字段
skip_paused=True, # 跳过停牌日
fq='pre' # 复权方式:'pre'前复权/'post'后复权/None不复权
)
# 使用count参数获取最近N天数据
df = get_price('000001.XSHE', count=30, fields=['close', 'volume'])
# 获取多只股票数据
stock_list = ['000001.XSHE', '000002.XSHE', '600000.XSHG']
df = get_price(stock_list, count=20, frequency='daily', panel=False)
# 获取分钟级数据
df = get_price('000001.XSHE', count=240, frequency='1m', fields=['close'])
# 获取不同频率数据
df_5m = get_price('000001.XSHE', count=100, frequency='5m') # 5分钟数据
df_15m = get_price('000001.XSHE', count=50, frequency='15m') # 15分钟数据
df_1h = get_price('000001.XSHE', count=30, frequency='60m') # 1小时数据
参数详解:
-
security
: 股票代码或代码列表 -
start_date/end_date
: 开始和结束日期 -
frequency
: 数据频率-
'daily'
或'1d'
: 日线数据 -
'1m'
,'5m'
,'15m'
,'30m'
,'60m'
: 分钟级数据
-
-
fields
: 数据字段['open', 'close', 'high', 'low', 'volume', 'money']
-
fq
: 复权类型-
'pre'
: 前复权(推荐) -
'post'
: 后复权 -
None
: 不复权
-
-
panel
: 返回格式(False
推荐,返回DataFrame)
4.1.2 attribute_history() – 简化历史数据获取
API函数:
attribute_history(security, count, unit, fields, skip_paused, df, fq)
获取单只股票历史数据的便捷函数。
# 获取最近20天的收盘价
closes = attribute_history('000001.XSHE', 20, '1d', ['close'])
# 获取完整OHLCV数据
data = attribute_history(
security='000001.XSHE',
count=30, # 获取30天数据
unit='1d', # 时间单位
fields=['open', 'close', 'high', 'low', 'volume'],
skip_paused=True,
df=True, # 返回DataFrame格式
fq='pre' # 前复权
)
# 计算技术指标
ma5 = closes['close'].rolling(5).mean() # 5日均线
ma20 = closes['close'].rolling(20).mean() # 20日均线
4.1.3 history() – 多股票历史数据
API函数:
history(count, unit, field, security_list, df, skip_paused)
获取多只股票指定字段的历史数据。
# 获取多只股票的收盘价
stock_list = ['000001.XSHE', '000002.XSHE', '600000.XSHG']
prices = history(
count=10, # 最近10天
unit='1d', # 日线数据
field='close', # 只获取收盘价
security_list=stock_list,
df=True # 返回DataFrame
)
# 获取实时数据(在handle_data中使用)
def handle_data(context, data):
current_prices = history(1, '1m', 'close', g.stock_pool, df=False)
for stock in g.stock_pool:
print(f"{stock}: {current_prices[stock][-1]}")
4.2 实时数据获取
4.2.1 get_current_data() – 当前市场数据
API函数:
get_current_data()
获取所有股票的实时市场数据。
def check_market_status(context):
"""检查市场状态示例"""
current_data = get_current_data()
for stock in g.stock_pool:
stock_data = current_data[stock]
# 基础价格信息
last_price = stock_data.last_price # 最新价格
day_open = stock_data.day_open # 开盘价
high_limit = stock_data.high_limit # 涨停价
low_limit = stock_data.low_limit # 跌停价
# 状态信息
is_paused = stock_data.paused # 是否停牌
is_st = stock_data.is_st # 是否ST股票
name = stock_data.name # 股票名称
# 交易决策
if not is_paused and not is_st:
# 检查涨跌停状态
if last_price < high_limit and last_price > low_limit:
# 可以正常交易
log.info(f"{name}({stock}): 价格 {last_price}, 可交易")
else:
log.warning(f"{name}({stock}): 触及涨跌停,无法交易")
4.3 基本面数据获取
4.3.1 get_fundamentals() – 财务数据查询
API函数:
get_fundamentals(query_object, date, statDate)
获取上市公司基本面数据的核心函数。
# 基础财务数据查询
def get_fundamental_data(context):
"""获取基本面数据示例"""
q = query(
# 基础信息
valuation.code, # 股票代码
valuation.market_cap, # 总市值(亿元)
valuation.circulating_market_cap, # 流通市值(亿元)
# 估值指标
valuation.pe_ratio, # 市盈率(PE)
valuation.pb_ratio, # 市净率(PB)
valuation.ps_ratio, # 市销率(PS)
valuation.pcf_ratio, # 市现率(PCF)
# 盈利指标
indicator.roe, # 净资产收益率(ROE)
indicator.roa, # 总资产收益率(ROA)
indicator.eps, # 每股收益
indicator.operating_profit, # 营业利润
# 财务数据
income.total_operating_revenue, # 营业总收入
income.net_profit, # 净利润
balance.total_assets, # 总资产
balance.total_liability, # 总负债
cash_flow.net_operate_cash_flow # 经营性现金流净额
).filter(
# 筛选条件
valuation.market_cap.between(50, 2000), # 市值在50-2000亿之间
valuation.pe_ratio > 0, # PE大于0
valuation.pe_ratio < 50, # PE小于50
indicator.roe > 0.1, # ROE大于10%
income.net_profit > 0 # 净利润为正
).order_by(
valuation.market_cap.asc() # 按市值升序排列
).limit(100) # 限制返回100只股票
# 执行查询
df = get_fundamentals(q, date=context.previous_date)
return df
# 动态筛选示例
def select_quality_stocks(context, stock_pool):
"""选择优质股票"""
q = query(
valuation.code,
valuation.market_cap,
valuation.pe_ratio,
indicator.roe,
indicator.inc_revenue_year_on_year, # 营收同比增长率
indicator.inc_net_profit_year_on_year # 净利润同比增长率
).filter(
valuation.code.in_(stock_pool), # 在指定股票池内筛选
valuation.pe_ratio.between(5, 30), # PE在合理区间
indicator.roe > 0.15, # ROE大于15%
indicator.inc_revenue_year_on_year > 0.1, # 营收增长大于10%
indicator.inc_net_profit_year_on_year > 0.1 # 净利润增长大于10%
).order_by(
indicator.roe.desc() # 按ROE降序排列
).limit(50)
df = get_fundamentals(q)
return list(df.code)
4.3.2 get_valuation() – 估值数据
API函数:
get_valuation(security, start_date, end_date, fields)
获取股票估值数据的时间序列。
# 获取单只股票估值历史
valuation_data = get_valuation(
security='000001.XSHE',
start_date='2023-01-01',
end_date='2023-12-31',
fields=['market_cap', 'pe_ratio', 'pb_ratio', 'turnover_ratio']
)
# 获取多只股票当前估值
stocks = ['000001.XSHE', '000002.XSHE']
current_valuation = get_valuation(
security=stocks,
start_date=context.previous_date,
end_date=context.previous_date
)
4.4 股票基础信息
4.4.1 get_all_securities() – 获取证券列表
API函数:
get_all_securities(types, date)
获取所有或指定类型的证券信息。
# 获取所有股票
all_stocks = get_all_securities(types=['stock'])
stock_codes = list(all_stocks.index)
# 获取不同类型证券
etfs = get_all_securities(types=['etf']) # ETF
funds = get_all_securities(types=['fund']) # 基金
bonds = get_all_securities(types=['bond']) # 债券
# 获取指定日期的证券列表
stocks_2023 = get_all_securities(types=['stock'], date='2023-12-31')
# 获取混合类型
all_traded = get_all_securities(types=['stock', 'etf', 'fund'])
4.4.2 get_index_stocks() – 指数成分股
API函数:
get_index_stocks(index_symbol, date)
获取指数的成分股列表。
# 主要指数成分股
hs300_stocks = get_index_stocks('000300.XSHG') # 沪深300
csi500_stocks = get_index_stocks('000905.XSHG') # 中证500
sz50_stocks = get_index_stocks('000016.XSHG') # 上证50
gem_stocks = get_index_stocks('399006.XSHE') # 创业板指
# 获取历史成分股
hs300_2023 = get_index_stocks('000300.XSHG', date='2023-01-01')
# 行业指数成分股
bank_stocks = get_index_stocks('000036.XSHG') # 银行指数
tech_stocks = get_index_stocks('000933.XSHG') # 计算机指数
healthcare_stocks = get_index_stocks('000831.XSHG') # 医药指数
4.4.3 get_security_info() – 证券详细信息
API函数:
get_security_info(security, date)
获取单只证券的详细信息。
# 获取股票基本信息
stock_info = get_security_info('000001.XSHE')
print(f"股票名称: {stock_info.display_name}") # 平安银行
print(f"上市日期: {stock_info.start_date}") # 上市日期
print(f"退市日期: {stock_info.end_date}") # 退市日期(如果已退市)
print(f"证券类型: {stock_info.type}") # 证券类型
# 批量获取股票信息
def get_stocks_info(stock_list):
"""批量获取股票信息"""
stocks_info = {}
for stock in stock_list:
info = get_security_info(stock)
stocks_info[stock] = {
'name': info.display_name,
'start_date': info.start_date,
'type': info.type
}
return stocks_info
4.5 其他重要数据API
4.5.1 get_trade_days() – 交易日历
API函数:
get_trade_days(start_date, end_date, count)
获取交易日历信息。
# 获取指定期间的交易日
trade_days = get_trade_days(start_date='2023-01-01', end_date='2023-12-31')
# 获取最近N个交易日
recent_days = get_trade_days(end_date='2023-12-31', count=252) # 最近252个交易日
# 实用函数
def is_trading_day(date):
"""判断是否为交易日"""
trade_days = get_trade_days(start_date=date, end_date=date)
return len(trade_days) > 0
def get_previous_trading_day(date, n=1):
"""获取前N个交易日"""
trade_days = get_trade_days(end_date=date, count=n+1)
return trade_days[^1_0] if len(trade_days) > n else None
4.5.2 get_extras() – 特殊数据
API函数:
get_extras(info, security_list, start_date, end_date, df, count)
获取股票的特殊属性数据。
# 获取ST股票信息
st_data = get_extras('is_st', stock_list, count=1, end_date=context.previous_date)
# 获取停牌信息
paused_data = get_extras('paused', stock_list, count=1, end_date=context.previous_date)
# 获取涨跌停价格
limit_data = get_extras(['high_limit', 'low_limit'], stock_list, count=1, end_date=context.previous_date)
# 获取分红送股信息
dividend_data = get_extras('factor', stock_list, start_date='2023-01-01', end_date='2023-12-31')
5. 交易执行API指南
5.1 基础下单函数
5.1.1 order() – 基础下单
API函数:
order(security, amount, style, side)
最基础的下单函数,按股数下单。
# 基础用法
order('000001.XSHE', 1000) # 买入1000股
order('000001.XSHE', -500) # 卖出500股
# 使用不同订单类型
order('000001.XSHE', 1000, MarketOrderStyle()) # 市价单(默认)
order('000001.XSHE', 1000, LimitOrderStyle(12.50)) # 限价单,限价12.50元
# 下单示例函数
def place_order_example(security, amount, price=None):
"""下单示例函数"""
if price is None:
# 市价单
order_obj = order(security, amount)
else:
# 限价单
order_obj = order(security, amount, LimitOrderStyle(price))
if order_obj:
log.info(f"下单成功: {security}, 数量: {amount}")
return order_obj
else:
log.warning(f"下单失败: {security}")
return None
5.1.2 order_value() – 按金额下单
API函数:
order_value(security, value, style)
按金额下单,系统自动计算股数。
# 按金额买入
order_value('000001.XSHE', 10000) # 买入1万元的股票
order_value('000001.XSHE', -5000) # 卖出5000元的股票
# 动态资金分配
def allocate_funds(context, stock_list):
"""动态资金分配示例"""
if not stock_list:
return
# 计算每只股票分配的资金
available_cash = context.portfolio.available_cash
cash_per_stock = available_cash * 0.8 / len(stock_list) # 80%资金投入
for stock in stock_list:
order_value(stock, cash_per_stock)
log.info(f"买入 {stock}: {cash_per_stock:.2f}元")
5.1.3 order_target() – 目标持仓下单
API函数:
order_target(security, amount, style)
调整持仓到目标数量,推荐使用。
# 调整到目标股数
order_target('000001.XSHE', 1000) # 持仓调整到1000股
order_target('000001.XSHE', 0) # 清仓
# 智能调仓示例
def smart_rebalance(context, target_positions):
"""智能调仓函数"""
current_positions = context.portfolio.positions
# 处理卖出(先卖后买,释放资金)
for stock in current_positions:
if stock not in target_positions:
order_target(stock, 0)
log.info(f"清仓: {stock}")
# 处理买入和调整
for stock, target_amount in target_positions.items():
current_amount = current_positions.get(stock, type('obj', (object,), {'total_amount': 0})).total_amount
if current_amount != target_amount:
order_target(stock, target_amount)
log.info(f"调整 {stock}: {current_amount} -> {target_amount}")
5.1.4 order_target_value() – 目标市值下单
API函数:
order_target_value(security, value, style)
调整持仓到目标市值,最常用的下单函数。
# 调整到目标市值
order_target_value('000001.XSHE', 20000) # 持仓调整到2万元
order_target_value('000001.XSHE', 0) # 清仓
# 等权重投资组合
def equal_weight_portfolio(context, stock_list):
"""等权重投资组合"""
if not stock_list:
return
# 计算每只股票的目标市值
total_value = context.portfolio.total_value
target_value_per_stock = total_value * 0.9 / len(stock_list) # 90%资金投入
# 先清理不在目标列表的股票
for stock in context.portfolio.positions:
if stock not in stock_list:
order_target_value(stock, 0)
# 调整目标股票到等权重
for stock in stock_list:
order_target_value(stock, target_value_per_stock)
log.info(f"调整 {stock} 到目标市值: {target_value_per_stock:.2f}")
5.1.5 order_target_percent() – 按比例下单
API函数:
order_target_percent(security, percent, style)
按总资产比例调整持仓。
# 按比例调整持仓
order_target_percent('000001.XSHE', 0.1) # 调整到总资产的10%
order_target_percent('000001.XSHE', 0.05) # 调整到总资产的5%
# 风险控制的投资组合
def risk_controlled_portfolio(context, stock_weights):
"""风险控制的投资组合"""
# stock_weights: {股票代码: 权重}
# 检查权重总和
total_weight = sum(stock_weights.values())
if total_weight > 1.0:
log.warning(f"权重总和超过100%: {total_weight:.2%}, 进行等比缩放")
stock_weights = {k: v/total_weight for k, v in stock_weights.items()}
# 按权重调整持仓
for stock, weight in stock_weights.items():
order_target_percent(stock, weight)
log.info(f"调整 {stock} 到权重: {weight:.2%}")
5.2 高级下单功能
5.2.1 安全下单函数
def safe_order_target_value(security, value, max_retry=3):
"""安全的目标市值下单函数"""
current_data = get_current_data()
# 基础检查
if current_data[security].paused:
log.warning(f"{security} 停牌,无法交易")
return None
if current_data[security].is_st:
log.warning(f"{security} ST股票,建议谨慎交易")
# 涨跌停检查
last_price = current_data[security].last_price
high_limit = current_data[security].high_limit
low_limit = current_data[security].low_limit
if value > 0: # 买入检查
if last_price >= high_limit:
log.warning(f"{security} 涨停,无法买入")
return None
else: # 卖出检查
if last_price <= low_limit:
log.warning(f"{security} 跌停,可能无法卖出")
# 执行下单
for attempt in range(max_retry):
try:
order_obj = order_target_value(security, value)
if order_obj:
action = "买入" if value > 0 else "卖出" if value == 0 else "调整"
log.info(f"{action} {security}: 目标市值 {value:.2f}")
return order_obj
else:
log.warning(f"下单失败 {security}, 尝试 {attempt + 1}/{max_retry}")
except Exception as e:
log.error(f"下单异常 {security}: {str(e)}")
if attempt < max_retry - 1:
import time
time.sleep(1) # 等待1秒后重试
return None
def batch_safe_order(context, target_positions):
"""批量安全下单"""
success_count = 0
total_count = len(target_positions)
# 先处理卖出
for stock in context.portfolio.positions:
if stock not in target_positions:
if safe_order_target_value(stock, 0):
success_count += 1
# 再处理买入和调整
for stock, value in target_positions.items():
if safe_order_target_value(stock, value):
success_count += 1
success_rate = success_count / (total_count + len(context.portfolio.positions))
log.info(f"批量下单完成,成功率: {success_rate:.2%}")
return success_rate > 0.8 # 成功率超过80%认为成功
5.3 订单管理
5.3.1 订单查询
API函数:
get_orders()
,
get_open_orders()
,
get_trades()
def manage_orders(context):
"""订单管理示例"""
# 获取所有订单
all_orders = get_orders()
log.info(f"总订单数: {len(all_orders)}")
# 获取未完成订单
open_orders = get_open_orders()
log.info(f"未完成订单数: {len(open_orders)}")
# 检查订单状态
for order_id, order_obj in open_orders.items():
log.info(f"订单 {order_id}:")
log.info(f" 股票: {order_obj.security}")
log.info(f" 委托数量: {order_obj.amount}")
log.info(f" 成交数量: {order_obj.filled}")
log.info(f" 订单状态: {order_obj.status}")
log.info(f" 委托时间: {order_obj.add_time}")
# 取消超时订单(示例:30分钟未成交)
time_diff = (context.current_dt - order_obj.add_time).total_seconds()
if time_diff > 1800: # 30分钟 = 1800秒
cancel_order(order_obj)
log.info(f"取消超时订单: {order_id}")
# 获取成交记录
trades = get_trades()
total_trade_value = 0
for trade_id, trade_obj in trades.items():
trade_value = abs(trade_obj.price * trade_obj.amount)
total_trade_value += trade_value
log.info(f"成交: {trade_obj.security}, 价格: {trade_obj.price}, 数量: {trade_obj.amount}")
log.info(f"当日总交易金额: {total_trade_value:.2f}")
5.3.2 订单状态处理
from jqdata import OrderStatus
def check_order_status(order_obj):
"""检查订单状态"""
if order_obj is None:
return "订单创建失败"
if order_obj.status == OrderStatus.open:
return "订单已提交,等待成交"
elif order_obj.status == OrderStatus.filled:
return "订单已完全成交"
elif order_obj.status == OrderStatus.canceled:
return "订单已取消"
elif order_obj.status == OrderStatus.rejected:
return "订单被拒绝"
elif order_obj.status == OrderStatus.held:
return "订单部分成交"
else:
return f"未知状态: {order_obj.status}"
def handle_order_result(context, security, order_obj):
"""处理下单结果"""
status_msg = check_order_status(order_obj)
log.info(f"{security} 下单结果: {status_msg}")
if order_obj and order_obj.status == OrderStatus.filled:
# 订单完全成交,更新记录
g.trade_count += 1
log.info(f"成交确认: {security}, 成交价: {order_obj.price}, 成交量: {order_obj.filled}")
return True
elif order_obj and order_obj.status == OrderStatus.rejected:
# 订单被拒绝,记录原因
log.error(f"订单被拒绝: {security}, 可能原因: 资金不足或股票停牌")
return False
return None # 订单正在处理中
5.4 仓位管理
5.4.1 仓位信息查询
def analyze_portfolio(context):
"""投资组合分析"""
portfolio = context.portfolio
# 基础信息
log.info("=== 投资组合分析 ===")
log.info(f"总资产: {portfolio.total_value:.2f}")
log.info(f"可用现金: {portfolio.available_cash:.2f}")
log.info(f"持仓市值: {portfolio.positions_value:.2f}")
log.info(f"当日收益: {portfolio.daily_returns:.4f}")
log.info(f"累计收益率: {(portfolio.total_value / portfolio.starting_cash - 1):.4f}")
# 现金比例
cash_ratio = portfolio.available_cash / portfolio.total_value
log.info(f"现金比例: {cash_ratio:.2%}")
# 持仓详情
log.info("=== 持仓详情 ===")
positions = portfolio.positions
if not positions:
log.info("无持仓")
return
total_profit = 0
for stock, position in positions.items():
# 计算盈亏
profit = position.value - position.avg_cost * position.total_amount
profit_rate = (position.price / position.avg_cost - 1) * 100
# 持仓权重
weight = position.value / portfolio.total_value
log.info(f"{stock}:")
log.info(f" 持仓数量: {position.total_amount}")
log.info(f" 可卖数量: {position.closeable_amount}")
log.info(f" 平均成本: {position.avg_cost:.2f}")
log.info(f" 当前价格: {position.price:.2f}")
log.info(f" 持仓市值: {position.value:.2f}")
log.info(f" 持仓权重: {weight:.2%}")
log.info(f" 盈亏金额: {profit:.2f}")
log.info(f" 盈亏比例: {profit_rate:.2f}%")
total_profit += profit
log.info(f"总盈亏: {total_profit:.2f}")
5.4.2 动态仓位管理
def dynamic_position_management(context):
"""动态仓位管理"""
# 根据市场状况调整总仓位
market_condition = assess_market_condition(context)
if market_condition == "强势":
max_position = 0.95 # 95%仓位
elif market_condition == "正常":
max_position = 0.80 # 80%仓位
elif market_condition == "弱势":
max_position = 0.60 # 60%仓位
else: # 极弱
max_position = 0.30 # 30%仓位
# 当前仓位比例
current_position = context.portfolio.positions_value / context.portfolio.total_value
log.info(f"市场状况: {market_condition}")
log.info(f"当前仓位: {current_position:.2%}")
log.info(f"目标仓位: {max_position:.2%}")
# 如果当前仓位超过目标仓位,进行减仓
if current_position > max_position:
reduce_ratio = max_position / current_position
log.info(f"需要减仓,减仓比例: {(1-reduce_ratio):.2%}")
# 对所有持仓按比例减仓
for stock, position in context.portfolio.positions.items():
new_value = position.value * reduce_ratio
order_target_value(stock, new_value)
log.info(f"减仓 {stock}: {position.value:.2f} -> {new_value:.2f}")
def assess_market_condition(context):
"""评估市场状况"""
# 获取大盘指数数据
index_data = get_price('000300.XSHG', count=20, end_date=context.previous_date, fields=['close'])
if len(index_data) < 20:
return "正常"
# 计算指标
current_price = index_data['close'].iloc[-1]
ma5 = index_data['close'].tail(5).mean()
ma20 = index_data['close'].tail(20).mean()
# 简单的市场状况判断
if current_price > ma5 > ma20 and current_price > ma20 * 1.05:
return "强势"
elif current_price > ma20:
return "正常"
elif current_price > ma20 * 0.95:
return "弱势"
else:
return "极弱"
6. 时间调度管理
6.1 定时执行API
6.1.1 run_daily() – 每日执行
API函数:
run_daily(func, time, reference_security)
设置每日定时执行的函数。
def initialize(context):
"""定时任务设置示例"""
# 开盘前准备工作
run_daily(before_market_open, time='09:00')
run_daily(prepare_data, time='before_open') # 使用系统预定义时间
# 开盘后交易执行
run_daily(trade_stocks, time='09:30')
run_daily(monitor_positions, time='10:30') # 开盘后监控
# 收盘前最后检查
run_daily(final_check, time='14:50')
# 收盘后分析
run_daily(after_market_close, time='after_close')
run_daily(daily_report, time='16:00')
# 预定义时间选项
# 'before_open' - 开盘前
# 'open' - 开盘时
# 'after_close' - 收盘后
# '09:30' - 具体时间点(24小时制)
# 'every_bar' - 每个数据周期
def before_market_open(context):
"""开盘前执行的函数"""
log.info("=== 开盘前准备 ===")
# 更新股票池
g.target_stocks = select_target_stocks(context)
log.info(f"今日目标股票池: {len(g.target_stocks)}只")
# 检查持仓状态
check_current_positions(context)
# 预计算交易计划
g.trade_plan = generate_trade_plan(context)
def trade_stocks(context):
"""主要交易执行函数"""
log.info("=== 执行交易 ===")
# 执行预定的交易计划
execute_trade_plan(context, g.trade_plan)
# 记录交易结果
record_trade_results(context)
6.1.2 run_weekly() – 每周执行
API函数:
run_weekly(func, weekday, time, reference_security)
设置每周定时执行的函数。
def initialize(context):
"""每周执行示例"""
# 每周一开盘时重新选股和调仓
run_weekly(weekly_rebalance, weekday=1, time='09:30')
# 每周三进行风险检查
run_weekly(risk_assessment, weekday=3, time='14:00')
# 每周五生成周报
run_weekly(weekly_report, weekday=5, time='after_close')
# weekday参数:1=周一, 2=周二, 3=周三, 4=周四, 5=周五
def weekly_rebalance(context):
"""每周调仓"""
log.info("=== 每周调仓 ===")
# 重新筛选股票
new_stock_pool = comprehensive_stock_selection(context)
# 计算新的权重分配
target_weights = calculate_target_weights(context, new_stock_pool)
# 执行调仓
rebalance_portfolio(context, target_weights)
# 记录调仓信息
log.info(f"调仓完成,新持仓数量: {len(target_weights)}")
def comprehensive_stock_selection(context):
"""综合选股函数"""
# 获取基础股票池
base_pool = get_index_stocks('000300.XSHG')
# 基础过滤
filtered_pool = filter_stocks_comprehensive(context, base_pool)
# 基本面筛选
fundamental_pool = fundamental_screening(context, filtered_pool)
# 技术面筛选
technical_pool = technical_screening(context, fundamental_pool)
return technical_pool[:20] # 返回前20只股票
6.1.3 run_monthly() – 每月执行
API函数:
run_monthly(func, monthday, time, reference_security)
设置每月定时执行的函数。
def initialize(context):
"""每月执行示例"""
# 每月第一个交易日进行大调仓
run_monthly(monthly_major_rebalance, monthday=1, time='09:30')
# 每月15日进行策略回顾
run_monthly(strategy_review, monthday=15, time='after_close')
# 每月最后一个交易日生成月报
run_monthly(monthly_report, monthday=-1, time='15:30')
# monthday参数:1-31表示每月第几日,-1表示最后一日
def monthly_major_rebalance(context):
"""每月大调仓"""
log.info("=== 每月大调仓 ===")
# 全面的策略参数调整
adjust_strategy_parameters(context)
# 重新构建投资组合
rebuild_portfolio(context)
# 风险预算重分配
reallocate_risk_budget(context)
def strategy_review(context):
"""策略回顾"""
log.info("=== 策略月度回顾 ===")
# 计算月度业绩
monthly_performance = calculate_monthly_performance(context)
# 分析持仓贡献
position_contribution = analyze_position_contribution(context)
# 风险指标分析
risk_metrics = calculate_risk_metrics(context)
# 记录分析结果
record(
月度收益率=monthly_performance['return'],
最大回撤=risk_metrics['max_drawdown'],
夏普比率=risk_metrics['sharpe_ratio']
)
6.2 时间管理工具
6.2.1 时间相关属性
def time_management_example(context):
"""时间管理示例"""
# 获取当前时间信息
current_time = context.current_dt
previous_date = context.previous_date
log.info(f"当前时间: {current_time}")
log.info(f"前一交易日: {previous_date}")
# 时间格式转换
current_date = current_time.date() # 转换为日期
current_time_only = current_time.time() # 转换为时间
current_str = str(current_time) # 转换为字符串
# 时间计算
from datetime import timedelta
one_week_ago = current_date - timedelta(days=7)
# 判断交易时段
if current_time_only < pd.Timestamp('09:30').time():
log.info("开盘前")
elif current_time_only < pd.Timestamp('11:30').time():
log.info("上午交易时段")
elif current_time_only < pd.Timestamp('13:00').time():
log.info("午间休息")
elif current_time_only < pd.Timestamp('15:00').time():
log.info("下午交易时段")
else:
log.info("收盘后")
def is_month_end(context):
"""判断是否为月末"""
current_date = context.current_dt.date()
# 获取下一个交易日
next_trade_days = get_trade_days(start_date=current_date, count=2)
if len(next_trade_days) < 2:
return True # 如果没有下一个交易日,说明是最后一天
next_day = next_trade_days[^1_1]
# 如果下一个交易日是下个月,则今天是月末
return current_date.month != next_day.month
def is_quarter_end(context):
"""判断是否为季末"""
current_date = context.current_dt.date()
return current_date.month % 3 == 0 and is_month_end(context)
6.2.2 条件触发执行
def conditional_execution_example(context):
"""条件触发执行示例"""
# 基于时间的条件执行
if context.current_dt.day == 1:
monthly_tasks(context)
# 基于市场条件的执行
market_volatility = calculate_market_volatility(context)
if market_volatility > 0.3: # 波动率超过30%
high_volatility_strategy(context)
# 基于组合状态的执行
drawdown = calculate_current_drawdown(context)
if drawdown > 0.1: # 回撤超过10%
emergency_risk_control(context)
# 基于持仓时间的执行
check_position_holding_period(context)
def check_position_holding_period(context):
"""检查持仓时间"""
max_holding_days = 60 # 最大持仓天数
for stock, position in context.portfolio.positions.items():
holding_days = (context.current_dt.date() - position.init_time.date()).days
if holding_days > max_holding_days:
log.info(f"{stock} 持仓时间过长({holding_days}天),考虑减仓")
# 可以选择减仓或清仓
current_value = position.value
order_target_value(stock, current_value * 0.5) # 减仓50%
def emergency_risk_control(context):
"""紧急风险控制"""
log.warning("触发紧急风控措施")
# 降低仓位到50%
for stock, position in context.portfolio.positions.items():
new_value = position.value * 0.5
order_target_value(stock, new_value)
# 记录风控触发
g.emergency_control_triggered = True
g.emergency_trigger_time = context.current_dt
6.3 自定义调度策略
def advanced_scheduling_strategy(context):
"""高级调度策略"""
# 动态调度频率
volatility = calculate_market_volatility(context)
if volatility > 0.4:
# 高波动期间,增加监控频率
if not hasattr(g, 'high_vol_monitoring'):
g.high_vol_monitoring = True
# 注意:在运行时不能添加新的定时任务,这里仅作为示例
log.info("进入高波动监控模式")
# 基于业绩的调度
recent_performance = calculate_recent_performance(context)
if recent_performance < -0.05: # 近期表现不佳
if not hasattr(g, 'last_strategy_review'):
g.last_strategy_review = context.current_dt.date()
days_since_review = (context.current_dt.date() - g.last_strategy_review).days
if days_since_review > 5: # 5天后重新评估策略
review_and_adjust_strategy(context)
g.last_strategy_review = context.current_dt.date()
def calculate_recent_performance(context, days=10):
"""计算近期表现"""
current_value = context.portfolio.total_value
if not hasattr(g, 'value_history'):
g.value_history = []
# 记录每日净值
g.value_history.append(current_value)
# 保持最近的记录
if len(g.value_history) > days:
g.value_history.pop(0)
if len(g.value_history) >= days:
return (current_value / g.value_history[^1_0]) - 1
return 0
def review_and_adjust_strategy(context):
"""策略回顾和调整"""
log.info("=== 策略回顾和调整 ===")
# 分析近期表现不佳的原因
analyze_poor_performance(context)
# 调整策略参数
adjust_strategy_parameters(context)
# 可能的操作:
# 1. 降低仓位
# 2. 调整选股条件
# 3. 修改止损止盈参数
# 4. 暂停交易等待更好时机
log.info("策略调整完成")
7. 因子分析与技术指标
7.1 JQFactor因子库
7.1.1 get_factor_values() – 因子数据获取
API函数:
get_factor_values(securities, factors, start_date, end_date, count)
聚宽内置因子库的核心函数。
from jqfactor import get_factor_values
# 基础用法
def get_factor_data_example(context, stock_list):
"""因子数据获取示例"""
# 获取单个因子
factor_data = get_factor_values(
securities=stock_list,
factors=['ROE'], # 注意:因子名称大小写敏感
start_date='2023-01-01',
end_date='2023-12-31',
count=20 # 或使用count获取最近N期数据
)
# 获取多个因子
multi_factor_data = get_factor_values(
securities=stock_list,
factors=['ROE', 'ROA', 'PE', 'PB', 'market_cap'],
end_date=context.previous_date,
count=1 # 获取最新数据
)
return multi_factor_data
# 常用因子分类和名称对照
FACTOR_CATEGORIES = {
# 估值因子
'valuation': [
'PE', # 市盈率
'PB', # 市净率
'PS', # 市销率
'PCF', # 市现率
'PEG', # 市盈增长比
'EV_EBITDA' # 企业价值倍数
],
# 盈利因子
'profitability': [
'ROE', # 净资产收益率
'ROA', # 总资产收益率
'gross_profit_margin', # 毛利率
'net_profit_margin', # 净利率
'operating_profit_margin', # 营业利润率
'EBITDA_margin' # EBITDA利润率
],
# 成长因子
'growth': [
'sales_growth', # 营业收入增长率
'net_profit_growth_rate', # 净利润增长率
'total_asset_growth_rate', # 总资产增长率
'operating_revenue_growth_rate', # 营业收入增长率
'eps_growth' # 每股收益增长率
],
# 质量因子
'quality': [
'current_ratio', # 流动比率
'quick_ratio', # 速动比率
'debt_to_equity', # 负债权益比
'asset_turnover', # 资产周转率
'inventory_turnover' # 存货周转率
],
# 技术因子
'technical': [
'RSI', # 相对强弱指数
'MACD', # MACD指标
'BIAS', # 乖离率
'momentum_1m', # 1个月动量
'momentum_3m', # 3个月动量
'volatility_1m' # 1个月波动率
],
# 规模因子
'size': [
'market_cap', # 总市值
'circulating_market_cap', # 流通市值
'total_market_value' # 总市值
]
}
7.1.2 因子处理和分析
def factor_analysis_comprehensive(context, stock_pool):
"""综合因子分析"""
# 选择要分析的因子
selected_factors = [
'ROE', 'ROA', 'PE', 'PB',
'sales_growth', 'net_profit_growth_rate',
'current_ratio', 'market_cap'
]
# 获取因子数据
factor_data = get_factor_values(
securities=stock_pool,
factors=selected_factors,
end_date=context.previous_date,
count=1
)
# 转换为DataFrame格式便于处理
df = pd.DataFrame(index=stock_pool)
for factor in selected_factors:
if factor in factor_data:
df[factor] = factor_data[factor].iloc[^1_0]
# 数据清洗
df = clean_factor_data(df)
# 因子处理
df_processed = process_factors(df)
# 因子合成
composite_score = calculate_composite_score(df_processed)
return composite_score.sort_values(ascending=False)
def clean_factor_data(df):
"""因子数据清洗"""
# 处理缺失值
df_cleaned = df.copy()
# 删除缺失值过多的股票(缺失超过50%)
missing_ratio = df_cleaned.isnull().sum(axis=1) / df_cleaned.shape[^1_1]
df_cleaned = df_cleaned[missing_ratio < 0.5]
# 对剩余缺失值进行填充
for column in df_cleaned.columns:
if df_cleaned[column].dtype in ['float64', 'int64']:
# 数值型数据用中位数填充
median_value = df_cleaned[column].median()
df_cleaned[column].fillna(median_value, inplace=True)
return df_cleaned
def process_factors(df):
"""因子处理:去极值、标准化"""
df_processed = df.copy()
for column in df_processed.columns:
if df_processed[column].dtype in ['float64', 'int64']:
# 去极值(3倍标准差)
mean_val = df_processed[column].mean()
std_val = df_processed[column].std()
upper_limit = mean_val + 3 * std_val
lower_limit = mean_val - 3 * std_val
df_processed[column] = df_processed[column].clip(
lower=lower_limit, upper=upper_limit
)
# 标准化(Z-score)
df_processed[column] = (
df_processed[column] - df_processed[column].mean()
) / df_processed[column].std()
return df_processed
def calculate_composite_score(df_processed):
"""计算综合得分"""
# 定义因子权重
factor_weights = {
'ROE': 0.2,
'ROA': 0.15,
'PE': -0.1, # PE越低越好,负权重
'PB': -0.05, # PB越低越好,负权重
'sales_growth': 0.2,
'net_profit_growth_rate': 0.2,
'current_ratio': 0.1,
'market_cap': -0.1 # 小市值效应,负权重
}
# 计算加权得分
composite_scores = pd.Series(0, index=df_processed.index)
for factor, weight in factor_weights.items():
if factor in df_processed.columns:
composite_scores += df_processed[factor] * weight
return composite_scores
7.2 技术指标计算
7.2.1 使用talib库
import talib
def technical_indicators_talib(context, security):
"""使用talib计算技术指标"""
# 获取价格数据
price_data = attribute_history(security, 60, '1d',
['open', 'high', 'low', 'close', 'volume'])
if len(price_data) < 30:
return None
# 提取价格序列
open_prices = price_data['open'].values
high_prices = price_data['high'].values
low_prices = price_data['low'].values
close_prices = price_data['close'].values
volume = price_data['volume'].values
indicators = {}
# 移动平均线
indicators['MA5'] = talib.SMA(close_prices, timeperiod=5)[-1]
indicators['MA20'] = talib.SMA(close_prices, timeperiod=20)[-1]
indicators['MA60'] = talib.SMA(close_prices, timeperiod=60)[-1]
indicators['EMA12'] = talib.EMA(close_prices, timeperiod=12)[-1]
indicators['EMA26'] = talib.EMA(close_prices, timeperiod=26)[-1]
# 震荡指标
indicators['RSI'] = talib.RSI(close_prices, timeperiod=14)[-1]
indicators['STOCH_K'], indicators['STOCH_D'] = talib.STOCH(
high_prices, low_prices, close_prices,
fastk_period=9, slowk_period=3, slowd_period=3
)
indicators['STOCH_K'] = indicators['STOCH_K'][-1]
indicators['STOCH_D'] = indicators['STOCH_D'][-1]
# MACD指标
macd, macd_signal, macd_hist = talib.MACD(close_prices,
fastperiod=12,
slowperiod=26,
signalperiod=9)
indicators['MACD'] = macd[-1]
indicators['MACD_SIGNAL'] = macd_signal[-1]
indicators['MACD_HIST'] = macd_hist[-1]
# 布林带
bb_upper, bb_middle, bb_lower = talib.BBANDS(close_prices,
timeperiod=20,
nbdevup=2,
nbdevdn=2)
indicators['BB_UPPER'] = bb_upper[-1]
indicators['BB_MIDDLE'] = bb_middle[-1]
indicators['BB_LOWER'] = bb_lower[-1]
indicators['BB_POSITION'] = (close_prices[-1] - bb_lower[-1]) / (bb_upper[-1] - bb_lower[-1])
# 成交量指标
indicators['OBV'] = talib.OBV(close_prices, volume)[-1]
indicators['AD'] = talib.AD(high_prices, low_prices, close_prices, volume)[-1]
# 波动率指标
indicators['ATR'] = talib.ATR(high_prices, low_prices, close_prices, timeperiod=14)[-1]
# 形态识别(返回最近几天的信号)
indicators['HAMMER'] = talib.CDLHAMMER(open_prices, high_prices, low_prices, close_prices)[-3:]
indicators['DOJI'] = talib.CDLDOJI(open_prices, high_prices, low_prices, close_prices)[-3:]
return indicators
def generate_technical_signals(indicators):
"""基于技术指标生成交易信号"""
signals = {
'ma_signal': 0, # 均线信号
'rsi_signal': 0, # RSI信号
'macd_signal': 0, # MACD信号
'bb_signal': 0, # 布林带信号
'overall_signal': 0 # 综合信号
}
# 均线信号
if indicators['MA5'] > indicators['MA20'] > indicators['MA60']:
signals['ma_signal'] = 1 # 多头排列
elif indicators['MA5'] < indicators['MA20'] < indicators['MA60']:
signals['ma_signal'] = -1 # 空头排列
# RSI信号
if indicators['RSI'] < 30:
signals['rsi_signal'] = 1 # 超卖
elif indicators['RSI'] > 70:
signals['rsi_signal'] = -1 # 超买
# MACD信号
if indicators['MACD'] > indicators['MACD_SIGNAL'] and indicators['MACD_HIST'] > 0:
signals['macd_signal'] = 1 # 金叉且在零轴上方
elif indicators['MACD'] < indicators['MACD_SIGNAL'] and indicators['MACD_HIST'] < 0:
signals['macd_signal'] = -1 # 死叉且在零轴下方
# 布林带信号
if indicators['BB_POSITION'] < 0.2:
signals['bb_signal'] = 1 # 接近下轨
elif indicators['BB_POSITION'] > 0.8:
signals['bb_signal'] = -1 # 接近上轨
# 综合信号
signal_sum = sum([signals['ma_signal'], signals['rsi_signal'],
signals['macd_signal'], signals['bb_signal']])
if signal_sum >= 2:
signals['overall_signal'] = 1 # 买入信号
elif signal_sum <= -2:
signals['overall_signal'] = -1 # 卖出信号
return signals
7.2.2 使用jqlib技术分析
from jqlib.technical_analysis import *
def technical_indicators_jqlib(context, stock_list):
"""使用jqlib计算技术指标"""
check_date = context.previous_date
indicators_data = {}
for security in stock_list:
try:
indicators = {}
# 移动平均线
ma_result = MA(security, check_date=check_date, timeperiod=20, unit='1d')
indicators['MA20'] = ma_result.get(security, 0)
# MACD
macd_result = MACD(security, check_date=check_date,
SHORT=12, LONG=26, MID=9, unit='1d')
indicators['MACD'] = macd_result[^1_0].get(security, 0) # DIF
indicators['MACD_DEA'] = macd_result[^1_1].get(security, 0) # DEA
indicators['MACD_HIST'] = indicators['MACD'] - indicators['MACD_DEA']
# RSI
rsi_result = RSI(security, check_date=check_date, N1=14, unit='1d')
indicators['RSI'] = rsi_result.get(security, 50)
# KDJ
kdj_result = KDJ(security, check_date=check_date, N=9, M1=3, M2=3, unit='1d')
indicators['KDJ_K'] = kdj_result[^1_0].get(security, 50)
indicators['KDJ_D'] = kdj_result[^1_1].get(security, 50)
indicators['KDJ_J'] = 3 * indicators['KDJ_K'] - 2 * indicators['KDJ_D']
# 布林带
boll_result = Bollinger_Bands(security, check_date=check_date,
timeperiod=20, nbdevup=2, nbdevdn=2, unit='1d')
indicators['BOLL_UPPER'] = boll_result[^1_0].get(security, 0)
indicators['BOLL_MIDDLE'] = boll_result[^1_1].get(security, 0)
indicators['BOLL_LOWER'] = boll_result[^1_2].get(security, 0)
indicators_data[security] = indicators
except Exception as e:
log.warning(f"计算 {security} 技术指标时出错: {str(e)}")
continue
return indicators_data
def create_technical_score(indicators_data):
"""基于技术指标创建综合得分"""
scores = {}
for security, indicators in indicators_data.items():
score = 0
# 获取当前价格
current_data = get_current_data()
current_price = current_data[security].last_price
# 均线得分
if current_price > indicators.get('MA20', current_price):
score += 1
# MACD得分
if indicators.get('MACD', 0) > indicators.get('MACD_DEA', 0):
score += 1
# RSI得分
rsi = indicators.get('RSI', 50)
if 30 < rsi < 70: # RSI在合理区间
score += 1
elif rsi < 30: # 超卖
score += 2
elif rsi > 70: # 超买
score -= 1
# KDJ得分
kdj_k = indicators.get('KDJ_K', 50)
kdj_d = indicators.get('KDJ_D', 50)
if kdj_k > kdj_d and kdj_k < 80: # 金叉且不在超买区
score += 1
# 布林带得分
boll_upper = indicators.get('BOLL_UPPER', 0)
boll_lower = indicators.get('BOLL_LOWER', 0)
if boll_upper > 0 and boll_lower > 0:
bb_position = (current_price - boll_lower) / (boll_upper - boll_lower)
if bb_position < 0.3: # 靠近下轨
score += 1
elif bb_position > 0.7: # 靠近上轨
score -= 1
scores[security] = score
return scores
7.3 自定义技术指标
def custom_technical_indicators(context, security, period=20):
"""自定义技术指标计算"""
# 获取足够的历史数据
data = attribute_history(security, period * 3, '1d',
['open', 'high', 'low', 'close', 'volume'])
if len(data) < period:
return None
indicators = {}
# 自定义动量指标
indicators['momentum'] = calculate_momentum(data['close'], period)
# 自定义波动率指标
indicators['volatility'] = calculate_volatility(data['close'], period)
# 自定义趋势强度指标
indicators['trend_strength'] = calculate_trend_strength(data['close'], period)
# 自定义成交量价格指标
indicators['volume_price_trend'] = calculate_vpt(data['close'], data['volume'])
# 自定义支撑阻力指标
indicators['support_resistance'] = calculate_support_resistance(
data['high'], data['low'], data['close'], period
)
return indicators
def calculate_momentum(prices, period=20):
"""计算动量指标"""
if len(prices) < period + 1:
return 0
current_price = prices.iloc[-1]
past_price = prices.iloc[-period-1]
return (current_price / past_price - 1) * 100
def calculate_volatility(prices, period=20):
"""计算波动率(年化)"""
if len(prices) < period:
return 0
returns = prices.pct_change().dropna()
if len(returns) < period:
return 0
recent_returns = returns.tail(period)
return recent_returns.std() * (252 ** 0.5) # 年化波动率
def calculate_trend_strength(prices, period=20):
"""计算趋势强度(线性回归R²)"""
if len(prices) < period:
return 0
recent_prices = prices.tail(period)
x = np.arange(len(recent_prices))
y = recent_prices.values
# 线性回归
try:
slope, intercept = np.polyfit(x, y, 1)
y_pred = slope * x + intercept
# 计算R²
ss_res = np.sum((y - y_pred) ** 2)
ss_tot = np.sum((y - np.mean(y)) ** 2)
if ss_tot == 0:
return 0
r_squared = 1 - (ss_res / ss_tot)
return r_squared * (1 if slope > 0 else -1) # 上升趋势为正,下降为负
except:
return 0
def calculate_vpt(prices, volumes):
"""计算成交量价格趋势指标"""
if len(prices) < 2 or len(volumes) < 2:
return 0
price_changes = prices.pct_change().fillna(0)
vpt = (price_changes * volumes).cumsum()
return vpt.iloc[-1] if len(vpt) > 0 else 0
def calculate_support_resistance(highs, lows, closes, period=20):
"""计算支撑阻力指标"""
if len(highs) < period:
return {'support': 0, 'resistance': 0, 'position': 0.5}
recent_highs = highs.tail(period)
recent_lows = lows.tail(period)
current_price = closes.iloc[-1]
# 计算支撑阻力位
resistance = recent_highs.max()
support = recent_lows.min()
# 计算当前价格位置
if resistance > support:
position = (current_price - support) / (resistance - support)
else:
position = 0.5
return {
'support': support,
'resistance': resistance,
'position': position
}
def comprehensive_technical_analysis(context, stock_list):
"""综合技术分析"""
technical_scores = {}
for security in stock_list:
try:
# 获取各种技术指标
talib_indicators = technical_indicators_talib(context, security)
custom_indicators = custom_technical_indicators(context, security)
if talib_indicators is None or custom_indicators is None:
continue
# 综合评分
score = 0
# 基于talib指标评分
if talib_indicators['RSI'] < 30:
score += 2 # 超卖
elif talib_indicators['RSI'] > 70:
score -= 1 # 超买
if talib_indicators['MACD_HIST'] > 0:
score += 1 # MACD金叉
if talib_indicators['BB_POSITION'] < 0.2:
score += 1 # 靠近布林下轨
# 基于自定义指标评分
if custom_indicators['momentum'] > 5:
score += 1 # 正动量
elif custom_indicators['momentum'] < -5:
score -= 1 # 负动量
if custom_indicators['trend_strength'] > 0.5:
score += 1 # 强趋势
if custom_indicators['support_resistance']['position'] < 0.3:
score += 1 # 接近支撑位
technical_scores[security] = score
except Exception as e:
log.warning(f"分析 {security} 技术指标时出错: {str(e)}")
continue
# 按得分排序
sorted_scores = sorted(technical_scores.items(), key=lambda x: x[^1_1], reverse=True)
return dict(sorted_scores)
8. 数据过滤与风险管理
8.1 股票过滤函数
8.1.1 基础过滤器
def filter_basic_stocks(stock_list, context=None):
"""基础股票过滤器"""
if not stock_list:
return []
current_data = get_current_data()
filtered_stocks = []
for stock in stock_list:
try:
stock_data = current_data[stock]
# 基础过滤条件
if (
<div style="text-align: center">⁂</div>
[^1_1]: 4.md
[^1_2]: 5.md
[^1_3]: API2.md
[^1_4]: API3.md
[^1_5]: Ju-Kuan-Liang-Hua-Jiao-Yi-Ping-Tai-API.md
---
# 聚宽量化交易平台API完整使用指南(续)
## 8. 数据过滤与风险管理(续)
### 8.1 股票过滤函数(续)
```python
def comprehensive_stock_filter(context, stock_list):
"""综合股票过滤函数"""
# 获取当前市场数据
current_data = get_current_data()
filtered_stocks = []
for stock in stock_list:
stock_data = current_data[stock]
# 基础过滤条件
if (not stock_data.paused and # 非停牌
not stock_data.is_st and # 非ST
'ST' not in stock_data.name and # 名称中无ST
'*' not in stock_data.name and # 无星标
'退' not in stock_data.name): # 非退市
# 涨跌停过滤
if (stock_data.low_limit < stock_data.last_price < stock_data.high_limit):
# 成交量过滤(防止流动性不足)
volume_data = attribute_history(stock, 5, '1d', ['volume'])
if len(volume_data) >= 5:
avg_volume = volume_data['volume'].mean()
if avg_volume > 1000000: # 5日平均成交量大于100万股
filtered_stocks.append(stock)
return filtered_stocks
def filter_by_financial_health(context, stock_list):
"""基于财务健康度过滤"""
# 查询财务数据
q = query(
valuation.code,
valuation.pe_ratio,
valuation.pb_ratio,
indicator.roe,
indicator.current_ratio, # 流动比率
indicator.quick_ratio, # 速动比率
balance.total_liability, # 总负债
balance.total_assets # 总资产
).filter(
valuation.code.in_(stock_list)
)
df = get_fundamentals(q, date=context.previous_date)
# 财务健康度筛选
healthy_stocks = df[
(df['pe_ratio'] > 0) & (df['pe_ratio'] < 50) & # 合理PE
(df['pb_ratio'] > 0) & (df['pb_ratio'] < 8) & # 合理PB
(df['roe'] > 0.05) & # ROE大于5%
(df['current_ratio'] > 1.2) & # 流动比率大于1.2
(df['quick_ratio'] > 0.8) & # 速动比率大于0.8
(df['total_liability'] / df['total_assets'] < 0.7) # 资产负债率小于70%
]
return list(healthy_stocks['code'])
def filter_by_momentum(context, stock_list, momentum_days=20):
"""基于动量过滤股票"""
momentum_stocks = []
for stock in stock_list:
try:
prices = get_price(stock, count=momentum_days+1,
end_date=context.previous_date, fields=['close'])
if len(prices) >= momentum_days:
# 计算动量(相对强度)
momentum = (prices['close'].iloc[-1] / prices['close'].iloc[^2_0]) - 1
# 只选择正动量的股票
if momentum > 0.05: # 动量大于5%
momentum_stocks.append(stock)
except:
continue
return momentum_stocks
8.2 行业和板块分析
def get_industry_distribution(stock_list):
"""获取股票的行业分布"""
industry_dict = {}
for stock in stock_list:
try:
# 获取行业信息
info = get_security_info(stock)
industry = getattr(info, 'industry', '未知')
if industry not in industry_dict:
industry_dict[industry] = []
industry_dict[industry].append(stock)
except:
continue
return industry_dict
def limit_industry_concentration(stock_list, max_stocks_per_industry=3):
"""限制单个行业的股票数量"""
industry_dist = get_industry_distribution(stock_list)
balanced_stocks = []
for industry, stocks in industry_dist.items():
# 每个行业最多选择指定数量的股票
selected_stocks = stocks[:max_stocks_per_industry]
balanced_stocks.extend(selected_stocks)
return balanced_stocks
def analyze_sector_rotation(context):
"""行业轮动分析"""
sectors = {
'银行': '801780',
'医药': '801150',
'科技': '801750',
'消费': '801200',
'地产': '801180'
}
sector_performance = {}
for sector_name, sector_code in sectors.items():
try:
# 获取行业指数数据
sector_data = get_price(sector_code, count=20,
end_date=context.previous_date, fields=['close'])
if len(sector_data) >= 20:
# 计算近期表现
recent_return = (sector_data['close'].iloc[-1] /
sector_data['close'].iloc[-5]) - 1 # 5日收益
long_return = (sector_data['close'].iloc[-1] /
sector_data['close'].iloc[^2_0]) - 1 # 20日收益
sector_performance[sector_name] = {
'recent_return': recent_return,
'long_return': long_return,
'momentum': recent_return - long_return
}
except:
continue
return sector_performance
8.3 风险预警系统
def risk_warning_system(context):
"""风险预警系统"""
warnings = []
# 1. 仓位集中度检查
total_value = context.portfolio.total_value
positions = context.portfolio.positions
for stock, position in positions.items():
weight = position.value / total_value
if weight > 0.15: # 单股权重超过15%
warnings.append(f"持仓集中度警告:{stock} 权重 {weight:.2%}")
# 2. 行业集中度检查
industry_weights = calculate_industry_weights(context)
for industry, weight in industry_weights.items():
if weight > 0.4: # 单行业权重超过40%
warnings.append(f"行业集中度警告:{industry} 权重 {weight:.2%}")
# 3. 回撤检查
current_drawdown = calculate_current_drawdown(context)
if current_drawdown > 0.1: # 回撤超过10%
warnings.append(f"回撤警告:当前回撤 {current_drawdown:.2%}")
# 4. 波动率检查
volatility = calculate_portfolio_volatility(context)
if volatility > 0.3: # 年化波动率超过30%
warnings.append(f"波动率警告:当前年化波动率 {volatility:.2%}")
# 输出警告信息
for warning in warnings:
log.warning(warning)
return len(warnings) > 0
def calculate_industry_weights(context):
"""计算行业权重分布"""
industry_weights = {}
total_value = context.portfolio.total_value
for stock, position in context.portfolio.positions.items():
try:
info = get_security_info(stock)
industry = getattr(info, 'industry', '未知')
weight = position.value / total_value
if industry not in industry_weights:
industry_weights[industry] = 0
industry_weights[industry] += weight
except:
continue
return industry_weights
def calculate_current_drawdown(context):
"""计算当前回撤"""
if not hasattr(g, 'max_value_history'):
g.max_value_history = context.portfolio.total_value
current_value = context.portfolio.total_value
g.max_value_history = max(g.max_value_history, current_value)
drawdown = (g.max_value_history - current_value) / g.max_value_history
return drawdown
def calculate_portfolio_volatility(context, days=20):
"""计算组合波动率"""
if not hasattr(g, 'return_history'):
g.return_history = []
# 记录每日收益率
daily_return = context.portfolio.returns
g.return_history.append(daily_return)
# 保持指定天数的历史
if len(g.return_history) > days:
g.return_history.pop(0)
if len(g.return_history) >= days:
# 计算年化波动率
import numpy as np
returns_array = np.array(g.return_history)
volatility = np.std(returns_array) * np.sqrt(252)
return volatility
return 0
9. 实战策略开发
9.1 经典量化策略实现
9.1.1 海龟交易策略
def initialize(context):
"""海龟交易策略初始化"""
set_benchmark('000300.XSHG')
set_option('use_real_price', True)
set_option('avoid_future_data', True)
# 海龟策略参数
g.turtle_params = {
'entry_period': 20, # 入场突破周期
'exit_period': 10, # 出场突破周期
'atr_period': 20, # ATR计算周期
'risk_per_trade': 0.02 # 每笔交易风险比例
}
# 股票池
g.stock_pool = get_index_stocks('000300.XSHG')[:50]
# 设置定时任务
run_daily(turtle_strategy, time='9:30')
def turtle_strategy(context):
"""海龟策略主逻辑"""
for stock in g.stock_pool:
try:
# 获取历史数据
data = get_price(stock, count=g.turtle_params['entry_period']+1,
end_date=context.previous_date,
fields=['high', 'low', 'close'])
if len(data) < g.turtle_params['entry_period']:
continue
# 计算入场和出场信号
entry_signal = check_entry_signal(stock, data)
exit_signal = check_exit_signal(stock, data)
# 执行交易逻辑
execute_turtle_trade(context, stock, data, entry_signal, exit_signal)
except Exception as e:
log.error(f"处理{stock}时发生错误: {e}")
continue
def check_entry_signal(stock, data):
"""检查入场信号"""
current_price = data['high'].iloc[-1]
highest_high = data['high'].iloc[:-1].max() # 前N日最高价
# 突破前N日最高价时入场
return current_price > highest_high
def check_exit_signal(stock, data):
"""检查出场信号"""
current_price = data['low'].iloc[-1]
lowest_low = data['low'].iloc[-g.turtle_params['exit_period']:].min()
# 跌破前N日最低价时出场
return current_price < lowest_low
def calculate_atr(data, period):
"""计算平均真实波幅"""
high = data['high']
low = data['low']
close = data['close']
# 计算真实波幅
tr1 = high - low
tr2 = abs(high - close.shift(1))
tr3 = abs(low - close.shift(1))
true_range = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
atr = true_range.rolling(window=period).mean()
return atr.iloc[-1]
def calculate_position_size(context, stock, data, atr):
"""计算仓位大小"""
# 基于ATR的风险管理
account_value = context.portfolio.total_value
risk_amount = account_value * g.turtle_params['risk_per_trade']
current_price = data['close'].iloc[-1]
# 每股风险 = 2 * ATR
risk_per_share = 2 * atr
if risk_per_share > 0:
shares = int(risk_amount / risk_per_share)
position_value = shares * current_price
return min(position_value, account_value * 0.1) # 限制单股最大10%仓位
return 0
def execute_turtle_trade(context, stock, data, entry_signal, exit_signal):
"""执行海龟交易"""
current_position = context.portfolio.positions.get(stock)
if entry_signal and not current_position:
# 入场逻辑
atr = calculate_atr(data, g.turtle_params['atr_period'])
if atr > 0:
position_value = calculate_position_size(context, stock, data, atr)
if position_value > 1000: # 最小交易金额
order_target_value(stock, position_value)
log.info(f"海龟策略买入: {stock}, 金额: {position_value:.2f}")
elif exit_signal and current_position:
# 出场逻辑
order_target_value(stock, 0)
log.info(f"海龟策略卖出: {stock}")
9.1.2 布林带均值回归策略
def initialize(context):
"""布林带均值回归策略"""
set_benchmark('000300.XSHG')
set_option('use_real_price', True)
# 策略参数
g.bollinger_params = {
'period': 20, # 布林带周期
'std_multiplier': 2.0, # 标准差倍数
'rsi_period': 14, # RSI周期
'rsi_oversold': 30, # RSI超卖阈值
'rsi_overbought': 70 # RSI超买阈值
}
g.stock_pool = get_index_stocks('000300.XSHG')
g.max_positions = 10
run_daily(bollinger_strategy, time='9:30')
def bollinger_strategy(context):
"""布林带策略主逻辑"""
current_positions = len(context.portfolio.positions)
for stock in g.stock_pool:
try:
# 获取历史数据
data = get_price(stock, count=50, end_date=context.previous_date,
fields=['high', 'low', 'close', 'volume'])
if len(data) < g.bollinger_params['period']:
continue
# 计算技术指标
bb_upper, bb_middle, bb_lower = calculate_bollinger_bands(
data['close'], g.bollinger_params['period'],
g.bollinger_params['std_multiplier']
)
rsi = calculate_rsi(data['close'], g.bollinger_params['rsi_period'])
# 生成交易信号
signals = generate_bollinger_signals(
stock, data['close'].iloc[-1], bb_upper, bb_lower, rsi
)
# 执行交易
execute_bollinger_trade(context, stock, signals, current_positions)
except Exception as e:
log.error(f"处理{stock}布林带策略时错误: {e}")
continue
def calculate_bollinger_bands(prices, period, std_multiplier):
"""计算布林带"""
middle = prices.rolling(window=period).mean()
std = prices.rolling(window=period).std()
upper = middle + (std * std_multiplier)
lower = middle - (std * std_multiplier)
return upper.iloc[-1], middle.iloc[-1], lower.iloc[-1]
def calculate_rsi(prices, period):
"""计算RSI指标"""
delta = prices.diff()
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
avg_gain = gain.rolling(window=period).mean()
avg_loss = loss.rolling(window=period).mean()
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
return rsi.iloc[-1]
def generate_bollinger_signals(stock, current_price, bb_upper, bb_lower, rsi):
"""生成布林带交易信号"""
signals = {'buy': False, 'sell': False}
# 买入信号:价格触及下轨且RSI超卖
if (current_price <= bb_lower and
rsi <= g.bollinger_params['rsi_oversold']):
signals['buy'] = True
log.info(f"{stock} 布林带买入信号: 价格{current_price:.2f}, 下轨{bb_lower:.2f}, RSI{rsi:.2f}")
# 卖出信号:价格触及上轨且RSI超买
elif (current_price >= bb_upper and
rsi >= g.bollinger_params['rsi_overbought']):
signals['sell'] = True
log.info(f"{stock} 布林带卖出信号: 价格{current_price:.2f}, 上轨{bb_upper:.2f}, RSI{rsi:.2f}")
return signals
def execute_bollinger_trade(context, stock, signals, current_positions):
"""执行布林带交易"""
position = context.portfolio.positions.get(stock)
if signals['buy'] and not position and current_positions < g.max_positions:
# 买入逻辑
position_value = context.portfolio.total_value / g.max_positions
order_target_value(stock, position_value)
log.info(f"布林带策略买入: {stock}")
elif signals['sell'] and position:
# 卖出逻辑
order_target_value(stock, 0)
log.info(f"布林带策略卖出: {stock}")
9.2 机器学习增强策略
9.2.1 因子挖掘与特征工程
def initialize(context):
"""机器学习增强策略初始化"""
set_benchmark('000300.XSHG')
set_option('use_real_price', True)
set_option('avoid_future_data', True)
# ML策略参数
g.ml_params = {
'lookback_days': 60, # 历史数据天数
'prediction_days': 5, # 预测未来天数
'retrain_freq': 20, # 模型重训练频率
'feature_num': 15 # 特征数量
}
g.stock_pool = get_index_stocks('000300.XSHG')[:100]
g.model = None
g.last_train_date = None
run_weekly(ml_strategy, weekday=1, time='9:30')
def extract_features(stock, end_date, lookback_days=60):
"""提取股票特征"""
# 获取价格数据
price_data = get_price(stock, count=lookback_days, end_date=end_date,
fields=['open', 'high', 'low', 'close', 'volume'])
if len(price_data) < lookback_days:
return None
features = {}
# 技术特征
features.update(extract_technical_features(price_data))
# 基本面特征
features.update(extract_fundamental_features(stock, end_date))
# 市场特征
features.update(extract_market_features(end_date))
return features
def extract_technical_features(data):
"""提取技术分析特征"""
features = {}
close = data['close']
volume = data['volume']
high = data['high']
low = data['low']
# 价格动量特征
features['return_5d'] = (close.iloc[-1] / close.iloc[-6]) - 1
features['return_10d'] = (close.iloc[-1] / close.iloc[-11]) - 1
features['return_20d'] = (close.iloc[-1] / close.iloc[-21]) - 1
# 移动平均特征
features['ma5_ratio'] = close.iloc[-1] / close.tail(5).mean()
features['ma20_ratio'] = close.iloc[-1] / close.tail(20).mean()
features['ma60_ratio'] = close.iloc[-1] / close.tail(60).mean()
# 波动率特征
returns = close.pct_change().dropna()
features['volatility_10d'] = returns.tail(10).std()
features['volatility_20d'] = returns.tail(20).std()
# 成交量特征
features['volume_ratio'] = volume.iloc[-1] / volume.tail(20).mean()
features['volume_price_trend'] = calculate_volume_price_trend(close, volume)
# RSI特征
features['rsi'] = calculate_rsi(close, 14)
# 布林带位置
bb_upper, bb_middle, bb_lower = calculate_bollinger_bands(close, 20, 2)
features['bb_position'] = (close.iloc[-1] - bb_lower) / (bb_upper - bb_lower)
return features
def extract_fundamental_features(stock, end_date):
"""提取基本面特征"""
features = {}
try:
# 获取基本面数据
q = query(
valuation.code,
valuation.pe_ratio,
valuation.pb_ratio,
valuation.market_cap,
indicator.roe,
indicator.roa
).filter(valuation.code == stock)
df = get_fundamentals(q, date=end_date)
if not df.empty:
features['pe_ratio'] = df['pe_ratio'].iloc[^2_0]
features['pb_ratio'] = df['pb_ratio'].iloc[^2_0]
features['market_cap'] = np.log(df['market_cap'].iloc[^2_0]) if df['market_cap'].iloc[^2_0] > 0 else 0
features['roe'] = df['roe'].iloc[^2_0]
features['roa'] = df['roa'].iloc[^2_0]
except:
# 设置默认值
features.update({
'pe_ratio': 0, 'pb_ratio': 0, 'market_cap': 0,
'roe': 0, 'roa': 0
})
return features
def extract_market_features(end_date):
"""提取市场特征"""
features = {}
try:
# 获取大盘指数数据
market_data = get_price('000300.XSHG', count=20, end_date=end_date,
fields=['close'])
if len(market_data) >= 20:
market_return = (market_data['close'].iloc[-1] /
market_data['close'].iloc[-6]) - 1
features['market_return_5d'] = market_return
market_volatility = market_data['close'].pct_change().tail(20).std()
features['market_volatility'] = market_volatility
except:
features.update({
'market_return_5d': 0,
'market_volatility': 0
})
return features
def calculate_volume_price_trend(prices, volumes):
"""计算成交量价格趋势"""
price_changes = prices.pct_change().fillna(0)
vpt = (price_changes * volumes).cumsum()
return vpt.iloc[-1] / vpt.iloc[-20] - 1 if len(vpt) >= 20 else 0
def prepare_training_data(context, stock_list):
"""准备训练数据"""
X, y = [], []
# 获取历史数据构建训练集
for days_back in range(30, 200, 5): # 从30天前到200天前,每5天取一个样本
sample_date = get_trade_days(end_date=context.previous_date,
count=days_back)[^2_0]
for stock in stock_list:
try:
# 提取特征
features = extract_features(stock, sample_date,
g.ml_params['lookback_days'])
if not features:
continue
# 计算未来收益作为标签
future_return = calculate_future_return(
stock, sample_date, g.ml_params['prediction_days']
)
if future_return is not None:
X.append(list(features.values()))
y.append(1 if future_return > 0.02 else 0) # 二分类:涨幅>2%为正类
except Exception as e:
continue
return np.array(X), np.array(y)
def calculate_future_return(stock, start_date, days):
"""计算未来收益率"""
try:
future_dates = get_trade_days(start_date=start_date, count=days+1)
if len(future_dates) < days+1:
return None
start_price = get_price(stock, start_date=start_date,
end_date=start_date, fields=['close'])
end_price = get_price(stock, start_date=future_dates[days],
end_date=future_dates[days], fields=['close'])
if not start_price.empty and not end_price.empty:
return (end_price['close'].iloc[^2_0] / start_price['close'].iloc[^2_0]) - 1
except:
pass
return None
def train_model(X, y):
"""训练机器学习模型"""
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
if len(X) < 100: # 样本数量不足
return None, None
# 数据预处理
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 训练模型
model = RandomForestClassifier(
n_estimators=100,
max_depth=10,
random_state=42
)
model.fit(X_scaled, y)
return model, scaler
def ml_strategy(context):
"""机器学习策略主逻辑"""
# 检查是否需要重新训练模型
if (g.model is None or
g.last_train_date is None or
(context.current_dt.date() - g.last_train_date).days >= g.ml_params['retrain_freq']):
log.info("开始训练机器学习模型...")
# 准备训练数据
X, y = prepare_training_data(context, g.stock_pool)
# 训练模型
g.model, g.scaler = train_model(X, y)
g.last_train_date = context.current_dt.date()
if g.model is None:
log.warning("模型训练失败")
return
log.info("模型训练完成")
# 使用模型进行预测和选股
if g.model is not None:
predictions = predict_stocks(context)
execute_ml_strategy(context, predictions)
def predict_stocks(context):
"""使用模型预测股票"""
predictions = {}
for stock in g.stock_pool:
try:
# 提取特征
features = extract_features(stock, context.previous_date,
g.ml_params['lookback_days'])
if not features:
continue
# 模型预测
X = np.array([list(features.values())])
X_scaled = g.scaler.transform(X)
# 获取预测概率
prob = g.model.predict_proba(X_scaled)[^2_0][^2_1] # 正类概率
predictions[stock] = prob
except Exception as e:
log.error(f"预测{stock}时出错: {e}")
continue
# 按预测概率排序
sorted_predictions = sorted(predictions.items(),
key=lambda x: x[^2_1], reverse=True)
return sorted_predictions
def execute_ml_strategy(context, predictions):
"""执行ML策略交易"""
# 选择预测概率最高的股票
top_stocks = [stock for stock, prob in predictions[:15]
if prob > 0.6] # 只选择概率>0.6的股票
# 基础过滤
top_stocks = comprehensive_stock_filter(context, top_stocks)
# 最终选择
final_stocks = top_stocks[:10]
# 调仓
current_positions = list(context.portfolio.positions.keys())
# 卖出不在目标列表的股票
for stock in current_positions:
if stock not in final_stocks:
order_target_value(stock, 0)
log.info(f"ML策略卖出: {stock}")
# 买入目标股票
if final_stocks:
position_value = context.portfolio.total_value / len(final_stocks)
for stock in final_stocks:
order_target_value(stock, position_value)
log.info(f"ML策略买入: {stock}, 预测概率: {dict(predictions)[stock]:.3f}")
10. 最佳实践与优化
10.1 策略组合与资产配置
def initialize(context):
"""多策略组合初始化"""
set_benchmark('000300.XSHG')
set_option('use_real_price', True)
set_option('avoid_future_data', True)
# 策略权重配置
g.strategy_weights = {
'momentum': 0.3, # 动量策略30%
'mean_reversion': 0.2, # 均值回归20%
'factor': 0.3, # 因子策略30%
'ml': 0.2 # 机器学习20%
}
g.rebalance_freq = 5 # 每5天调仓一次
g.max_positions = 20
run_daily(multi_strategy_system, time='9:30')
def multi_strategy_system(context):
"""多策略系统"""
# 检查调仓时间
if context.current_dt.day % g.rebalance_freq != 0:
return
# 获取各策略的股票推荐
strategy_recommendations = {}
# 动量策略推荐
strategy_recommendations['momentum'] = momentum_stock_selection(context)
# 均值回归策略推荐
strategy_recommendations['mean_reversion'] = mean_reversion_selection(context)
# 因子策略推荐
strategy_recommendations['factor'] = factor_stock_selection(context)
# 机器学习策略推荐
strategy_recommendations['ml'] = ml_stock_selection(context)
# 组合优化
final_portfolio = optimize_portfolio_allocation(
context, strategy_recommendations
)
# 执行调仓
execute_portfolio_rebalance(context, final_portfolio)
def momentum_stock_selection(context):
"""动量策略选股"""
stock_pool = get_index_stocks('000300.XSHG')
momentum_scores = {}
for stock in stock_pool[:50]: # 限制计算量
try:
prices = get_price(stock, count=21, end_date=context.previous_date,
fields=['close'])
if len(prices) >= 21:
momentum = (prices['close'].iloc[-1] / prices['close'].iloc[^2_0]) - 1
momentum_scores[stock] = momentum
except:
continue
# 返回动量最强的前10只股票
sorted_stocks = sorted(momentum_scores.items(),
key=lambda x: x[^2_1], reverse=True)
return [stock for stock, score in sorted_stocks[:10] if score > 0]
def mean_reversion_selection(context):
"""均值回归策略选股"""
stock_pool = get_index_stocks('000905.XSHG')
reversion_scores = {}
for stock in stock_pool[:50]:
try:
prices = get_price(stock, count=40, end_date=context.previous_date,
fields=['close'])
if len(prices) >= 40:
# 计算相对于长期均线的偏离度
ma20 = prices['close'].tail(20).mean()
current_price = prices['close'].iloc[-1]
deviation = (current_price - ma20) / ma20
# 选择被低估的股票(负偏离度)
if deviation < -0.05: # 相对均线下跌超过5%
reversion_scores[stock] = -deviation # 转为正值用于排序
except:
continue
sorted_stocks = sorted(reversion_scores.items(),
key=lambda x: x[^2_1], reverse=True)
return [stock for stock, score in sorted_stocks[:8]]
def factor_stock_selection(context):
"""因子策略选股"""
try:
# 获取因子数据
stock_pool = get_index_stocks('000300.XSHG')
factor_data = get_factor_values(
stock_pool[:100], ['ROE', 'ROA', 'sales_growth'],
end_date=context.previous_date, count=1
)
# 计算综合得分
scores = {}
for stock in stock_pool[:100]:
try:
roe = factor_data['ROE'].iloc[^2_0].get(stock, 0)
roa = factor_data['ROA'].iloc[^2_0].get(stock, 0)
growth = factor_data['sales_growth'].iloc[^2_0].get(stock, 0)
# 综合得分
composite_score = roe * 0.4 + roa * 0.3 + growth * 0.3
scores[stock] = composite_score
except:
continue
sorted_stocks = sorted(scores.items(),
key=lambda x: x[^2_1], reverse=True)
return [stock for stock, score in sorted_stocks[:12] if score > 0]
except:
return []
def ml_stock_selection(context):
"""机器学习策略选股(简化版)"""
# 这里使用简化的ML逻辑,实际应用中可以更复杂
stock_pool = get_index_stocks('000300.XSHG')[:30]
selected_stocks = []
for stock in stock_pool:
try:
# 简单的特征:近期收益率和成交量
prices = get_price(stock, count=10, end_date=context.previous_date,
fields=['close', 'volume'])
if len(prices) >= 10:
recent_return = (prices['close'].iloc[-1] /
prices['close'].iloc[-6]) - 1
volume_trend = (prices['volume'].tail(3).mean() /
prices['volume'].head(3).mean()) - 1
# 简单的选股规则
if 0.02 < recent_return < 0.08 and volume_trend > 0.1:
selected_stocks.append(stock)
except:
continue
return selected_stocks[:6]
def optimize_portfolio_allocation(context, strategy_recommendations):
"""组合优化分配"""
final_portfolio = {}
# 按策略权重分配资金
total_value = context.portfolio.total_value
for strategy_name, stocks in strategy_recommendations.items():
if not stocks:
continue
strategy_weight = g.strategy_weights[strategy_name]
strategy_value = total_value * strategy_weight
value_per_stock = strategy_value / len(stocks)
for stock in stocks:
if stock in final_portfolio:
final_portfolio[stock] += value_per_stock
else:
final_portfolio[stock] = value_per_stock
# 限制最大持仓数量
if len(final_portfolio) > g.max_positions:
sorted_portfolio = sorted(final_portfolio.items(),
key=lambda x: x[^2_1], reverse=True)
final_portfolio = dict(sorted_portfolio[:g.max_positions])
return final_portfolio
def execute_portfolio_rebalance(context, target_portfolio):
"""执行组合调仓"""
current_positions = set(context.portfolio.positions.keys())
target_positions = set(target_portfolio.keys())
# 过滤目标股票
filtered_portfolio = {}
for stock, value in target_portfolio.items():
if is_tradable_stock(stock):
filtered_portfolio[stock] = value
# 卖出不在目标组合的股票
for stock in current_positions:
if stock not in filtered_portfolio:
order_target_value(stock, 0)
log.info(f"多策略系统卖出: {stock}")
# 买入/调整目标股票
for stock, target_value in filtered_portfolio.items():
order_target_value(stock, target_value)
log.info(f"多策略系统调仓: {stock}, 目标金额: {target_value:.2f}")
def is_tradable_stock(stock):
"""检查股票是否可交易"""
try:
current_data = get_current_data()
stock_data = current_data[stock]
return (not stock_data.paused and
not stock_data.is_st and
'ST' not in stock_data.name and
stock_data.low_limit < stock_data.last_price < stock_data.high_limit)
except:
return False
10.2 策略评估与监控
def performance_monitor(context):
"""策略表现监控"""
# 记录核心指标
record_strategy_metrics(context)
# 风险监控
risk_check_result = comprehensive_risk_check(context)
# 如果风险过高,执行应急措施
if risk_check_result['high_risk']:
emergency_risk_control(context, risk_check_result)
def record_strategy_metrics(context):
"""记录策略指标"""
portfolio = context.portfolio
# 基础指标
total_value = portfolio.total_value
daily_return = portfolio.returns
positions_count = len(portfolio.positions)
cash_ratio = portfolio.available_cash / total_value
# 记录到系统
record(
总资产=total_value,
日收益率=daily_return,
持仓数量=positions_count,
现金比例=cash_ratio
)
# 计算累计收益率
cumulative_return = (total_value / portfolio.starting_cash) - 1
# 计算最大回撤
max_drawdown = calculate_max_drawdown(context)
# 计算年化收益率和夏普比率
annual_return, sharpe_ratio = calculate_annual_metrics(context)
# 输出关键信息
if context.current_dt.day % 5 == 0: # 每5天输出一次
log.info(f"策略表现总结:")
log.info(f" 累计收益率: {cumulative_return:.2%}")
log.info(f" 年化收益率: {annual_return:.2%}")
log.info(f" 最大回撤: {max_drawdown:.2%}")
log.info(f" 夏普比率: {sharpe_ratio:.2f}")
log.info(f" 当前持仓: {positions_count}只")
def calculate_max_drawdown(context):
"""计算最大回撤"""
if not hasattr(g, 'net_value_history'):
g.net_value_history = []
current_net_value = context.portfolio.total_value / context.portfolio.starting_cash
g.net_value_history.append(current_net_value)
# 保留最近252个交易日数据
if len(g.net_value_history) > 252:
g.net_value_history.pop(0)
if len(g.net_value_history) < 2:
return 0
# 计算最大回撤
peak = g.net_value_history[^2_0]
max_drawdown = 0
for value in g.net_value_history:
if value > peak:
peak = value
drawdown = (peak - value) / peak
max_drawdown = max(max_drawdown, drawdown)
return max_drawdown
def calculate_annual_metrics(context):
"""计算年化指标"""
if not hasattr(g, 'daily_returns'):
g.daily_returns = []
g.daily_returns.append(context.portfolio.returns)
# 保留最近252个交易日数据
if len(g.daily_returns) > 252:
g.daily_returns.pop(0)
if len(g.daily_returns) < 20:
return 0, 0
import numpy as np
# 年化收益率
mean_return = np.mean(g.daily_returns)
annual_return = (1 + mean_return) ** 252 - 1
# 年化波动率
volatility = np.std(g.daily_returns) * np.sqrt(252)
# 夏普比率(假设无风险利率为3%)
risk_free_rate = 0.03
sharpe_ratio = (annual_return - risk_free_rate) / volatility if volatility > 0 else 0
return annual_return, sharpe_ratio
def comprehensive_risk_check(context):
"""综合风险检查"""
risk_result = {
'high_risk': False,
'risk_factors': [],
'risk_level': 'low'
}
# 1. 回撤风险检查
max_drawdown = calculate_max_drawdown(context)
if max_drawdown > 0.15: # 回撤超过15%
risk_result['risk_factors'].append(f'高回撤风险: {max_drawdown:.2%}')
risk_result['high_risk'] = True
# 2. 集中度风险检查
concentration_risk = check_concentration_risk(context)
if concentration_risk['high_concentration']:
risk_result['risk_factors'].extend(concentration_risk['details'])
risk_result['high_risk'] = True
# 3. 波动率风险检查
if hasattr(g, 'daily_returns') and len(g.daily_returns) >= 20:
recent_volatility = np.std(g.daily_returns[-20:]) * np.sqrt(252)
if recent_volatility > 0.35: # 年化波动率超过35%
risk_result['risk_factors'].append(f'高波动率风险: {recent_volatility:.2%}')
risk_result['high_risk'] = True
# 4. 流动性风险检查
liquidity_risk = check_liquidity_risk(context)
if liquidity_risk['high_risk']:
risk_result['risk_factors'].extend(liquidity_risk['details'])
risk_result['high_risk'] = True
# 确定风险等级
risk_count = len(risk_result['risk_factors'])
if risk_count >= 3:
risk_result['risk_level'] = 'high'
elif risk_count >= 1:
risk_result['risk_level'] = 'medium'
return risk_result
def check_concentration_risk(context):
"""检查集中度风险"""
result = {'high_concentration': False, 'details': []}
total_value = context.portfolio.total_value
positions = context.portfolio.positions
# 检查单股集中度
for stock, position in positions.items():
weight = position.value / total_value
if weight > 0.15: # 单股权重超过15%
result['high_concentration'] = True
result['details'].append(f'单股集中度过高: {stock} {weight:.2%}')
# 检查行业集中度
industry_weights = calculate_industry_weights(context)
for industry, weight in industry_weights.items():
if weight > 0.4: # 单行业权重超过40%
result['high_concentration'] = True
result['details'].append(f'行业集中度过高: {industry} {weight:.2%}')
return result
def check_liquidity_risk(context):
"""检查流动性风险"""
result = {'high_risk': False, 'details': []}
for stock, position in context.portfolio.positions.items():
try:
# 获取近期成交量数据
volume_data = attribute_history(stock, 5, '1d', ['volume'])
if len(volume_data) >= 5:
avg_volume = volume_data['volume'].mean()
position_volume = position.total_amount
# 如果持仓量超过日均成交量的20%,认为流动性风险较高
if position_volume > avg_volume * 0.2:
result['high_risk'] = True
ratio = position_volume / avg_volume
result['details'].append(f'流动性风险: {stock} 持仓占日均成交量 {ratio:.1%}')
except:
continue
return result
def emergency_risk_control(context, risk_result):
"""应急风险控制"""
log.warning("触发应急风险控制措施")
log.warning(f"风险因素: {risk_result['risk_factors']}")
if risk_result['risk_level'] == 'high':
# 高风险:大幅减仓
reduce_positions(context, 0.5) # 减仓50%
log.warning("执行50%减仓操作")
elif risk_result['risk_level'] == 'medium':
# 中等风险:适度减仓
reduce_positions(context, 0.3) # 减仓30%
log.warning("执行30%减仓操作")
# 记录风控触发
record(风控触发=1)
def reduce_positions(context, reduce_ratio):
"""减仓操作"""
for stock, position in context.portfolio.positions.items():
current_value = position.value
target_value = current_value * (1 - reduce_ratio)
order_target_value(stock, target_value)
log.info(f"减仓 {stock}: {current_value:.2f} -> {target_value:.2f}")
总结
通过本完整指南的学习,您现在已经掌握了聚宽量化交易平台的全部核心功能:
关键收获:
- 平台基础:深入理解了聚宽平台的架构、API设计理念和最佳实践
- 数据驾驭:掌握了价格数据、基本面数据、因子数据的获取和处理方法
- 策略开发:学会了从简单到复杂的策略开发流程和实现技巧
- 风险管理:建立了完整的风险控制体系和监控机制
- 实战应用:通过多个经典策略案例,具备了实际策略开发能力
进阶建议:
- 持续学习:量化交易是一个不断发展的领域,要保持学习新的理论和技术
- 实战验证:将所学策略在实际市场中进行验证,积累实战经验
- 风险第一:始终把风险控制放在首位,稳健比激进更重要
- 系统思维:将策略开发看作一个系统工程,注重各环节的协调配合
下一步行动:
- 选择一个您感兴趣的策略类型,从简单版本开始实现
- 在回测环境中充分验证策略的有效性和稳健性
- 逐步增加策略的复杂度和精细度
- 建立自己的策略库和风控体系
量化交易的成功需要理论学习、实践验证和持续优化的完美结合。相信通过本指南的学习和不断的实践,您将能够在聚宽平台上开发出属于自己的优秀量化策略。
记住:市场永远在变化,唯有不断学习和适应,才能在量化交易的道路上走得更远。