310 lines
13 KiB
Python
310 lines
13 KiB
Python
|
import sys
|
|||
|
|
|||
|
from sqlalchemy import and_
|
|||
|
|
|||
|
from DB.model.StockDaily import get_stock_daily
|
|||
|
from DB.sqlite_db_main import SqliteDbMain, config
|
|||
|
from utils.tdxUtil import TdxUtil
|
|||
|
from fp.基本信息入库 import StockInfoMain
|
|||
|
from utils.formula import *
|
|||
|
from loguru import logger
|
|||
|
|
|||
|
from fp.板块数据入库 import SectorOpt
|
|||
|
from utils.comm import *
|
|||
|
|
|||
|
|
|||
|
class OptionalStock:
    """Screen stocks with several strategies and report the candidates.

    The class reads per-stock daily tables (``<symbol>_daily``) through
    ``SqliteDbMain`` and applies:

    * ``n_high_strategy``     - KDJ based filter on ``self.trade_date``;
    * ``conn_plate_strategy`` - ``JXNH`` signal filter plus a small back-test;
    * ``process_data``        - breakout over the previous 16 rows, filtered by
      turnover rate and volume ratio.

    ``html_page_data`` renders the first two as an HTML fragment, while
    ``junit_strategy`` runs the breakout screen over every known stock and
    logs how the picks performed afterwards.
    """

    # loguru keeps every sink that is added, so the sinks must be registered
    # once per process instead of once per instance (otherwise each new
    # instance duplicates every log line).
    _log_sinks_added = False

    def __init__(self, ts_code=None, symbol=None, restart_id=0, trade_date=None):
        """Prepare the stock list, database handle and helpers.

        :param ts_code: forwarded to ``StockInfoMain.get_stock_basic``.
        :param symbol: forwarded to ``StockInfoMain.get_stock_basic``.
        :param restart_id: forwarded to ``StockInfoMain.get_stock_basic``.
        :param trade_date: ``datetime`` of the trading day to analyse.
            ``None`` means "now", resolved at call time (a ``datetime.now()``
            default in the signature would be frozen at import time).
        """
        if trade_date is None:
            trade_date = datetime.now()
        self.trade_date = trade_date.strftime('%Y%m%d')
        # Send log output to a rotating file and to stderr.
        if not OptionalStock._log_sinks_added:
            logger.add("../log/OptionalStock.log", rotation="500 MB", level="INFO")
            logger.add(sys.stderr, level="INFO")
            OptionalStock._log_sinks_added = True
        self.code_res = StockInfoMain().get_stock_basic(ts_code=ts_code, symbol=symbol, restart_id=restart_id)
        self.db_main = SqliteDbMain(config.stock_daily_db)
        self.tdx_util = TdxUtil("")
        self.sector = SectorOpt()

    @staticmethod
    def _concat_frames(frames, fallback_columns=None):
        """Concatenate the non-empty frames; return an empty frame if none.

        ``DataFrame.append`` no longer exists in pandas 2.x, so results are
        collected in a list and joined once with ``pd.concat``.

        :param frames: iterable of DataFrames.
        :param fallback_columns: columns of the empty frame that is returned
            when there is nothing to concatenate.
        :return: one DataFrame with a fresh 0..n-1 index.
        """
        non_empty = [frame for frame in frames if len(frame) > 0]
        if not non_empty:
            return pd.DataFrame(columns=fallback_columns)
        return pd.concat(non_empty, ignore_index=True)

    @staticmethod
    def _split_into_chunks(items, parts):
        """Split ``items`` into at most ``parts`` consecutive sub-lists.

        The chunk size is rounded up and never drops below 1, so short inputs
        (fewer items than ``parts``) no longer produce a zero ``range`` step.
        """
        parts = max(1, parts)
        chunk_size = max(1, -(-len(items) // parts))  # ceiling division
        return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

    def stock_daily(self):
        """Run both strategies on the latest 60 daily rows of every stock.

        :return: ``(rm_strategy_df, dbj_strategy_df)`` - the concatenated hits
            of ``n_high_strategy`` and ``conn_plate_strategy``.
        """
        rm_frames, dbj_frames = [], []
        for result in self.code_res:
            s_type = self.tdx_util.get_security_type(code=result.ts_code, name=result.name)
            if s_type not in self.tdx_util.SECURITY_TYPE:
                continue
            table_name = str(result.ts_code).split(".")[0] + "_daily"
            new_table_class = get_stock_daily(table_name=table_name)
            # Fetched newest-first, then reversed: rows end up oldest-first
            # while keeping their labels, so a row's label is the number of
            # rows that follow it. ``back_testing`` relies on this.
            df = self.db_main.pandas_query_by_model(
                model=new_table_class,
                order_col=new_table_class.id.desc(),
                page_number=1, page_size=60
            )[::-1]
            if len(df) == 0:
                continue
            logger.info(
                f"----------------<<{self.trade_date}:{result.ts_code} 开始执行 N日新高策略>>----------------")
            rm_frames.append(self.n_high_strategy(df=df))
            logger.info(
                f"----------------<<{self.trade_date}:{result.ts_code} 开始执行 连板策略>>----------------")
            dbj_frames.append(self.conn_plate_strategy(df=df))
        return self._concat_frames(rm_frames), self._concat_frames(dbj_frames)

    def n_high_strategy(self, df):
        """Return the rows of ``self.trade_date`` whose KDJ is overbought.

        Adds ``KDJ_K``/``KDJ_D``/``KDJ_J`` columns to ``df`` in place. A row
        is kept when its date equals ``self.trade_date`` and ``D > 80`` and
        ``J > 100``.

        :param df: daily rows with ``trade_date``, ``ts_code``, ``close``,
            ``high`` and ``low`` columns.
        :return: DataFrame of matching rows (empty when nothing matches).
        """
        rm_kdj = RM_KDJ(df['close'], df['high'], df['low'], 9, 3, 3)
        df["KDJ_K"], df["KDJ_D"], df["KDJ_J"] = rm_kdj["KDJ_K"], rm_kdj["KDJ_D"], rm_kdj["KDJ_J"]
        matched = []
        for _, row in df.iterrows():
            if row["trade_date"] != self.trade_date:
                continue
            code, d, j = row['ts_code'], row['KDJ_D'], row['KDJ_J']
            # TODO: something is off here - D and J come out identical.
            logger.info(f'{code} -- >> {row["trade_date"]} -->> d={d} --> j={j}')
            # Overbought zone.
            if d > 80 and j > 100:
                matched.append(row)
        return pd.DataFrame(matched).reset_index(drop=True)

    def back_testing(self, df, index, rise, tomorrow_rise, total, trade_date):
        """Update back-test counters with what followed one signal row.

        ``index`` is read as "number of rows after the signal row", so
        ``df.iloc[-index]`` is the next day and ``df.iloc[-index:-index + 3]``
        the next three days. Signals with ``index <= 3`` do not have three
        following rows and are ignored.

        :param df: daily rows, oldest first, with a ``pct_chg`` column.
        :param index: label of the signal row (see above).
        :param rise: count of signals whose 3-day best ``pct_chg`` was > 0.
        :param tomorrow_rise: count of signals whose next-day ``pct_chg`` > 0.
        :param total: count of signals evaluated so far.
        :param trade_date: date of the signal row; unused, kept so existing
            callers keep working.
        :return: updated ``(rise, tomorrow_rise, total)``.
        """
        if index <= 3:
            return rise, tomorrow_rise, total
        window = df['pct_chg'].iloc[-index:-index + 3]
        if window.empty:
            # ``index`` points before the start of ``df``: nothing to measure.
            return rise, tomorrow_rise, total
        tomorrow_pre = window.iloc[0]
        # Rounded to two decimals before the sign test, so a best change
        # below 0.005 does not count as a rise.
        max_h = round(float(window.max()), 2)
        total += 1
        if tomorrow_pre > 0:
            tomorrow_rise += 1
        if max_h > 0:
            rise += 1
        return rise, tomorrow_rise, total

    def ma_strategy(self, df):
        """Placeholder for a moving-average strategy; returns ``None``.

        TODO: the 5/10/20/60 averages are computed but not used yet. The
        intended rule is "MA5 above every other MA".
        """
        MA5, MA10, MA20, MA60 = MA(df['close'], 5), MA(df['close'], 10), MA(df['close'], 20), MA(
            df['close'], 60)

    def conn_plate_strategy(self, df):
        """Return the ``JXNH`` signal rows of ``self.trade_date``.

        Adds a ``JXNH`` column to ``df`` in place. Every signal row is fed to
        ``back_testing``; when at least one signal could be evaluated, the
        summary sentence is stored in a ``history`` column of the result.

        :param df: daily rows with ``trade_date``, ``ts_code``, ``close``,
            ``open``, ``volume`` and ``pct_chg`` columns.
        :return: DataFrame of matching rows (empty when nothing matches).
        """
        df['JXNH'] = JXNH(df['close'], df['open'], df['volume'])
        total, rise, tomorrow_rise = 0, 0, 0
        matched = []
        for index, row in df.iterrows():
            if not row['JXNH']:
                continue
            trade_date = row["trade_date"]
            # Keep the row that belongs to the requested trading day.
            if trade_date == self.trade_date:
                matched.append(row)
            # NOTE(review): the source indentation was lost; back-testing is
            # assumed to run for every signal row (not only for the requested
            # day), which is the only placement that gives usable statistics.
            rise, tomorrow_rise, total = self.back_testing(df, index, rise, tomorrow_rise, total, trade_date)
        dbj_strategy_df = pd.DataFrame(matched).reset_index(drop=True)
        if total > 0:
            console = f"满足策略:{total} 次,明日上涨概率:{round((tomorrow_rise / total) * 100, 2)} %," \
                      f"三日后上涨概率:{round((rise / total) * 100, 2)} %"
            dbj_strategy_df['history'] = console
        return dbj_strategy_df

    def html_page_data(self):
        """Render both strategy results as one HTML fragment.

        :return: HTML string with a nav-pill column and one tab pane per
            strategy.
        """
        rm_strategy_df, dbj_strategy_df = self.stock_daily()
        # Strategy 1 (shown as the active tab).
        rm_id = "v-pills-rm"
        rm_nav = f'<button class="nav-link active" data-target="#{rm_id}" type="button">RM策略自选</button>'
        rm_content = self.__get_pills_content(rm_strategy_df, rm_id, active=True)
        # Strategy 2.
        dbj_id = "v-pills-dbj"
        dbj_nav = f'<button class="nav-link" data-target="#{dbj_id}" type="button">大保健策略自选</button>'
        dbj_content = self.__get_pills_content(dbj_strategy_df, dbj_id)
        return "".join([
            '<div class="row border p-3"><div class="col-3"><div class="nav flex-column nav-pills" id="v-pills-tab">',
            rm_nav,
            dbj_nav,
            '</div></div>',
            '<div class="col-9"><div class="tab-content" id="v-pills-tabContent">',
            rm_content,
            dbj_content,
            '</div></div></div>',
        ])

    def __get_pills_content(self, df, id_name, active=False):
        """Render one tab pane: a table of ``df`` or a "no data" notice.

        :param df: rows with ``ts_code`` and ``close``; ``history`` is
            optional and rendered as an empty cell when absent or NaN (not
            every strategy fills it in).
        :param id_name: value of the pane's ``id`` attribute.
        :param active: whether the pane carries the ``active`` class.
        :return: HTML string of the pane.
        """
        if len(df) > 0:
            parts = [
                '<table class="table table-hover"><thead>'
                '<tr>'
                '<th scope="col">股票代码</th>'
                '<th scope="col">策略表现</th>'
                '<th scope="col">购买建议</th>'
                '</tr>'
                '</thead>'
                '<tbody>'
            ]
            for _, row in df.iterrows():
                history = row.get("history", "")
                # NaN shows up when frames with and without ``history`` were
                # concatenated.
                if history is None or (isinstance(history, float) and history != history):
                    history = ""
                parts.append(
                    f'<tr><th scope="row">{row["ts_code"]}</th>'
                    f'<td>{history}</td>'
                    f'<td>{row["close"]}</td>'
                    '</tr>'
                )
            parts.append('</tbody></table>')
            pills_content = "".join(parts)
        else:
            pills_content = '<p class="text-center"><span>暂无数据...</span></p>'
        active_cls = "active" if active else ""
        return f'<div class="tab-pane fade show {active_cls}" id="{id_name}">{pills_content}</div>'

    def process_data(self, sublist):
        """Screen the given daily tables for a breakout on ``self.trade_date``.

        A stock qualifies when, inside the queried date window:

        1. the row of ``self.trade_date`` is row number 16 of the stock
           (exactly 16 earlier rows are present);
        2. its close is above the highest high of those 16 rows, and that
           highest high is above the previous row's high;
        3. ``turnover_rate > 5`` and ``volume_ratio > 1.5``.

        :param sublist: table names (``<symbol>_daily``) to query together.
        :return: DataFrame with ``symbol``, ``ts_code``, ``趋势``, ``行业板块``
            and ``概念板块``; only a ``symbol`` column when nothing qualifies.
        """
        n = 16
        if not sublist:
            # An empty list would yield an empty SQL statement.
            return pd.DataFrame(columns=["symbol"])
        start_date = return_trading_day(self.trade_date, -n)
        # One UNION ALL query over all tables. Table names cannot be bound as
        # SQL parameters; they are built from stock codes by the callers.
        subquery = " UNION ALL ".join(
            f"SELECT * FROM '{table_name}' where trade_date between '{start_date}' and '{self.trade_date}'"
            for table_name in sublist)
        df = self.db_main.execute_sql_to_pandas(subquery)
        records = []
        for _, group in df.groupby('ts_code'):
            group = group.reset_index(drop=True)
            selected_indexes = group[group['trade_date'] == self.trade_date].index
            if selected_indexes.empty or selected_indexes.values[0] != n:
                continue
            # After the check above the trade-date row is row ``n``.
            today = group.iloc[n]
            now_close, ts_code = today['close'], today['ts_code']
            symbol = ts_code.split('.')[0]
            previous_highs = group.iloc[0:n]['high']
            max_high = previous_highs.max()
            if not (now_close > max_high and max_high > group.iloc[n - 1]['high']):
                continue
            # Sum of day-to-day differences of the previous highs, i.e. last
            # high minus first high.
            skewness = round(sum(np.diff(previous_highs.tolist())), 3)
            turnover_rate, volume_ratio = today['turnover_rate'], today['volume_ratio']
            if not (turnover_rate > 5 and volume_ratio > 1.5):
                continue
            logger.info("====达成条件,开始进行进一步筛选====")
            # Look up the sectors the stock belongs to.
            gn_df = self.sector.get_gn_sector_by_stock_code(symbol)
            hy_df = self.sector.get_hy_sector_by_stock_code(symbol)
            hy_names, gn_names = hy_df['sector_name'].to_numpy(), gn_df['sector_name'].to_numpy()
            logger.info(
                f"code={symbol},now_close={now_close},last_high={max_high},"
                f"换手率={turnover_rate},量比={volume_ratio},"
                f"趋势={skewness},"
                f"所属行业板块:{hy_names},所属概念板块:{gn_names}")
            records.append({
                "symbol": symbol,
                "ts_code": ts_code,
                "趋势": skewness,
                "行业板块": hy_names,
                "概念板块": gn_names,
            })
        if not records:
            return pd.DataFrame(columns=["symbol"])
        return pd.DataFrame(records)

    def more_formula(self, sublist):
        """Unfinished: compute MA and ``JXNH`` indicators for screened stocks.

        TODO: the indicator frames are built and then discarded; nothing is
        returned yet (the planned next step is the three-day performance).

        :param sublist: table names passed on to ``process_data``.
        """
        # Stocks that passed the breakout screen.
        df = self.process_data(sublist=sublist)
        # Earliest date needed by the longest indicator (250 trading days).
        start_date = return_trading_day(self.trade_date, -250)
        for _, row in df.iterrows():
            stock_daily = get_stock_daily(table_name=row["symbol"] + "_daily")
            res_df = self.db_main.pandas_query_by_condition(
                model=stock_daily,
                query_condition=and_(stock_daily.trade_date >= start_date))
            c, o, v = res_df['close'], res_df['open'], res_df['volume']
            # Moving averages.
            res_df['MA5'], res_df['MA10'], res_df['MA20'], res_df['MA60'], res_df['MA120'], res_df['MA250'] = \
                MA(c, 5), MA(c, 10), MA(c, 20), MA(c, 60), MA(c, 120), MA(c, 250)
            # JXNH buy signal.
            res_df['JXNH'] = JXNH(c, o, v)

    def junit_strategy(self):
        """Run the breakout screen over every stock, print and log the result.

        The table list is processed in up to 10 batches (one SQL query each);
        the combined picks are printed and handed to ``future``.
        """
        start_time_total = time.time()
        table_array = []
        for result in self.code_res:
            s_type = self.tdx_util.get_security_type(code=result.ts_code, name=result.name)
            if s_type in self.tdx_util.SECURITY_TYPE:
                table_array.append(str(result.ts_code).split(".")[0] + "_daily")
        num_sub_lists = 10
        table_sub_lists = self._split_into_chunks(table_array, num_sub_lists)
        # Query every batch and merge the picks.
        res_df = self._concat_frames(
            [self.process_data(sublist) for sublist in table_sub_lists],
            fallback_columns=["symbol"])
        print(res_df.reset_index(drop=True))
        # Elapsed time in minutes; the subtraction must happen before the
        # division.
        logger.info(f"总执行时间: {(time.time() - start_time_total) / 60:.2f} 分")
        self.future(res_df)

    def future(self, df):
        """Log how each picked stock moved after ``self.trade_date``.

        For every row of ``df`` the daily rows from ``self.trade_date`` to
        three trading days later are loaded; the log shows the change on the
        trade date, the next row's change and the sum of the changes of up to
        three following rows. Stocks without a row on the trade date, or
        without any later row, are skipped.

        :param df: picks as returned by ``process_data`` (``symbol``,
            ``趋势``, ``行业板块``, ``概念板块``).
        """
        n = 3
        end_date = return_trading_day(self.trade_date, n)
        for _, row in df.iterrows():
            stock_daily = get_stock_daily(table_name=row['symbol'] + "_daily")
            future_df = self.db_main.pandas_query_by_condition(
                model=stock_daily,
                query_condition=stock_daily.trade_date.between(self.trade_date, end_date))
            # NOTE(review): assumes the rows come back oldest-first - confirm
            # against ``pandas_query_by_condition``.
            positions = np.flatnonzero((future_df['trade_date'] == self.trade_date).to_numpy())
            logger.debug(f"{row['symbol']}: trade-date row positions {positions}")
            if len(positions) == 0:
                continue
            pos = int(positions[0])
            if pos + 1 >= len(future_df):
                # No trading day after the trade date is stored yet.
                logger.warning(f"【 {row['symbol']} 】{self.trade_date} 之后暂无行情数据,跳过")
                continue
            pct_chg = future_df['pct_chg']
            now_chg = pct_chg.iloc[pos]
            tomorrow_chg = pct_chg.iloc[pos + 1]
            # Label the next-day change with the date it belongs to.
            tomorrow_date = future_df['trade_date'].iloc[pos + 1]
            sum_chg = round(pct_chg.iloc[pos + 1:pos + 1 + n].sum(), 2)
            # TODO: write the result to a CSV file.
            logger.info("未来走势-----》》》》")
            logger.info(
                f'【 {row["symbol"]} 】板块:【{row["行业板块"]}】,【{row["概念板块"]}】-- > \n'
                f'当日涨幅:{now_chg},趋势:{row["趋势"]} \n'
                f'{tomorrow_date} 涨幅 -- > {tomorrow_chg} , 三日总涨幅:{sum_chg}'
            )
|
|||
|
|
|||
|
|
|||
|
if __name__ == '__main__':
    # Manual run: screen a fixed trading day and log how the picks performed.
    opt_stack = OptionalStock(trade_date=datetime.strptime('20231016', "%Y%m%d"))
    opt_stack.junit_strategy()
    # To render the HTML report instead:
    # print(opt_stack.html_page_data())
|