qnloft-stock/fp/自选股数据.py

310 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
from sqlalchemy import and_
from DB.model.StockDaily import get_stock_daily
from DB.sqlite_db_main import SqliteDbMain, config
from utils.tdxUtil import TdxUtil
from fp.基本信息入库 import StockInfoMain
from utils.formula import *
from loguru import logger
from fp.板块数据入库 import SectorOpt
from utils.comm import *
class OptionalStock:
def __init__(self, ts_code=None, symbol=None, restart_id=0, trade_date=datetime.now()):
self.trade_date = trade_date.strftime('%Y%m%d')
# 配置日志输出到文件和控制台
logger.add("../log/OptionalStock.log", rotation="500 MB", level="INFO")
logger.add(sys.stderr, level="INFO")
self.code_res = StockInfoMain().get_stock_basic(ts_code=ts_code, symbol=symbol, restart_id=restart_id)
self.db_main = SqliteDbMain(config.stock_daily_db)
self.tdx_util = TdxUtil("")
self.sector = SectorOpt()
def stock_daily(self):
rm_strategy_df, dbj_strategy_df = pd.DataFrame(), pd.DataFrame()
for result in self.code_res:
s_type = self.tdx_util.get_security_type(code=result.ts_code, name=result.name)
if s_type in self.tdx_util.SECURITY_TYPE:
table_name = str(result.ts_code).split(".")[0] + "_daily"
new_table_class = get_stock_daily(table_name=table_name)
df = self.db_main.pandas_query_by_model(
model=new_table_class,
order_col=new_table_class.id.desc(),
page_number=1, page_size=60
)[::-1]
if len(df) > 0:
logger.info(
f"----------------<<{self.trade_date}{result.ts_code} 开始执行 N日新高策略>>----------------")
rm_strategy_df = rm_strategy_df.append(self.n_high_strategy(df=df))
logger.info(
f"----------------<<{self.trade_date}{result.ts_code} 开始执行 连板策略>>----------------")
dbj_strategy_df = dbj_strategy_df.append(self.conn_plate_strategy(df=df))
return rm_strategy_df, dbj_strategy_df
def n_high_strategy(self, df):
rm_strategy_df = pd.DataFrame()
rm_kdj = RM_KDJ(df['close'], df['high'], df['low'], 9, 3, 3)
df["KDJ_K"], df["KDJ_D"], df["KDJ_J"] = rm_kdj["KDJ_K"], rm_kdj["KDJ_D"], rm_kdj["KDJ_J"]
total, rise, fall, tomorrow_rise, tomorrow_fall = 0, 0, 0, 0, 0
for index, row in df.iterrows():
trade_date, code, close = row["trade_date"], row['ts_code'], row['close']
d, j = row['KDJ_D'], row['KDJ_J']
# 返回所要找的数据
if trade_date == self.trade_date:
# TODO 这里有问题d和J相同
logger.info(f'{code} -- >> {trade_date} -->> d={d} --> j={j}')
# 超买区间,未来跌幅
if d > 80 and j > 100:
rm_strategy_df = rm_strategy_df.append(row)
# rise, tomorrow_rise, total = self.back_testing(df, index, rise, tomorrow_rise, total, trade_date)
rm_strategy_df.reset_index(drop=True, inplace=True)
# if total > 0:
# console = f"满足策略:{total} 次,明日上涨概率:{round((tomorrow_rise / total) * 100, 2)} %" \
# f"三日后上涨概率:{round((rise / total) * 100, 2)} %"
# # logger.info(console)
# rm_strategy_df['history'] = console
return rm_strategy_df
def back_testing(self, df, index, rise, tomorrow_rise, total, trade_date):
if index > 3:
# 明天继续下跌的概率
tomorrow_pre = df.iloc[-index:-index + 1]['pct_chg'].values[0]
# 未来三天最大涨幅
max_h = df.iloc[-index:-index + 3]['pct_chg'].max().round(2)
# 计算最大回撤
h_max = df.iloc[-index:-index + 3]['high'].max()
l_min = df.iloc[-index:-index + 3]['low'].min()
mac_ret = f"{(h_max - l_min) / l_min:.2%}"
# 平均涨幅
mean_h = df.iloc[-index:-index + 3]['pct_chg'].mean().round(2)
total += 1
content = f"{trade_date} --> 明日涨幅: {tomorrow_pre}% " \
f"--> 3日最大涨幅{max_h}%" \
f"--> 3日最大回撤{mac_ret}" \
f"--> 3日平均涨幅{mean_h}"
# if trade_date > "20230101":
# logger.info(content)
# print(content)
if tomorrow_pre > 0:
tomorrow_rise += 1
if max_h > 0:
rise += 1
return rise, tomorrow_rise, total
def ma_strategy(self, df):
# MA5:当MA5在全部MA的最上面时
MA5, MA10, MA20, MA60 = MA(df['close'], 5), MA(df['close'], 10), MA(df['close'], 20), MA(
df['close'], 60)
pass
def conn_plate_strategy(self, df):
dbj_strategy_df = pd.DataFrame()
df['JXNH'] = JXNH(df['close'], df['open'], df['volume'])
total, rise, fall, tomorrow_rise, tomorrow_fall = 0, 0, 0, 0, 0
for index, row in df.iterrows():
trade_date, code, close, jxnh = row["trade_date"], row['ts_code'], row['close'], row['JXNH']
if jxnh:
# 返回所要找的数据
if trade_date == self.trade_date:
dbj_strategy_df = dbj_strategy_df.append(row)
rise, tomorrow_rise, total = self.back_testing(df, index, rise, tomorrow_rise, total, trade_date)
dbj_strategy_df.reset_index(drop=True, inplace=True)
if total > 0:
console = f"满足策略:{total} 次,明日上涨概率:{round((tomorrow_rise / total) * 100, 2)} %" \
f"三日后上涨概率:{round((rise / total) * 100, 2)} %"
# print(console)
# logger.info(console)
dbj_strategy_df['history'] = console
return dbj_strategy_df
def html_page_data(self):
rm_strategy_df, dbj_strategy_df = self.stock_daily()
# tdx_util = TdxUtil('D:\\new_tdx')
# tdx_util.set_zxg_file(cont=dbj_strategy_df['ts_code'])
# 策略1
id_name = "v-pills-rm"
rm_strategy_nav_pills = f'<button class="nav-link active" data-target="#{id_name}" type="button">RM策略自选</button>'
rm_strategy_nav_pills_content = self.__get_pills_content(rm_strategy_df, id_name, active=True)
# 策略2
id_name = "v-pills-dbj"
dbj_strategy_nav_pills = f'<button class="nav-link" data-target="#{id_name}" type="button">大保健策略自选</button>'
dbj_strategy_nav_pills_content = self.__get_pills_content(dbj_strategy_df, id_name)
res_html = f'<div class="row border p-3"><div class="col-3"><div class="nav flex-column nav-pills" id="v-pills-tab">' \
f'{rm_strategy_nav_pills}' \
f'{dbj_strategy_nav_pills}' \
f'</div></div>' \
f'<div class="col-9"><div class="tab-content" id="v-pills-tabContent">' \
f'{rm_strategy_nav_pills_content}' \
f'{dbj_strategy_nav_pills_content}' \
f'</div></div></div>'
return res_html
def __get_pills_content(self, df, id_name, active=False):
pills_content = ''
if len(df) > 0:
pills_content += f'<table class="table table-hover"><thead>' \
f'<tr>' \
f'<th scope="col">股票代码</th>' \
f'<th scope="col">策略表现</th>' \
f'<th scope="col">购买建议</th>' \
f'</tr>' \
f'</thead>' \
f'<tbody>'
for index, row in df.iterrows():
pills_content += f'<tr><th scope="row">{row["ts_code"]}</th>' \
f'<td>{row["history"]}</td>' \
f'<td>{row["close"]}</td>'
pills_content += '</tr>'
pills_content += f'</tbody></table>'
else:
pills_content += '<p class="text-center"><span>暂无数据...</span></p>'
pills_div = f'<div class="tab-pane fade show {"active" if active else ""}" id="{id_name}">{pills_content}</div>'
return pills_div
def process_data(self, sublist):
n = 16
start_date = return_trading_day(self.trade_date, -n)
res_df = pd.DataFrame(columns=["symbol"])
# 构建子查询字符串例如SELECT * FROM table1 UNION ALL SELECT * FROM table2 ...
subquery = " UNION ALL ".join(
[f"SELECT * FROM '{table_name}' where trade_date between '{start_date}' and '{self.trade_date}'" for
table_name in sublist])
# 将子查询字符串添加到主查询中
df = self.db_main.execute_sql_to_pandas(subquery)
# print(df)
# 使用 groupby 对数据进行分组
grouped = df.groupby('ts_code')
# 遍历分组后的数据
for name, group in grouped:
group = group.reset_index(drop=True)
# print(f"ts_code: {name}")
# print(group)
selected_indexes = group[group['trade_date'] == self.trade_date].index
if selected_indexes.empty or selected_indexes.values[0] != n:
continue
now_close, now_high, symbol, ts_code = group.loc[selected_indexes, 'close'].values[0], \
group.loc[selected_indexes, 'high'].values[0], \
group.loc[selected_indexes, 'ts_code'].values[0].split('.')[0], \
group.loc[selected_indexes, 'ts_code'].values[0]
max_high = group.iloc[0:n]['high'].max()
cond = [
now_close > max_high,
max_high > group.loc[selected_indexes - 1, 'high'].values[0]
]
if not all(cond):
continue
# 计算倾斜角度
skewness = round(sum(np.diff(group.iloc[0:n]['high'].tolist())), 3)
turnover_rate, volume_ratio = group.loc[selected_indexes, 'turnover_rate'].values[0], \
group.loc[selected_indexes, 'volume_ratio'].values[0]
cond = [
turnover_rate > 5,
volume_ratio > 1.5
]
if not all(cond):
continue
logger.info("====达成条件,开始进行进一步筛选====")
# 获取所属板块
gn_df = self.sector.get_gn_sector_by_stock_code(symbol)
hy_df = self.sector.get_hy_sector_by_stock_code(symbol)
logger.info(
f"code={symbol},now_close={now_close},last_high={max_high},"
f"换手率={turnover_rate},量比={volume_ratio}"
f"趋势={skewness},"
f"所属行业板块:{hy_df['sector_name'].to_numpy()},所属概念板块:{gn_df['sector_name'].to_numpy()}")
res_df = res_df.append({
"symbol": symbol,
"ts_code": ts_code,
"趋势": skewness,
"行业板块": hy_df['sector_name'].to_numpy(),
"概念板块": gn_df['sector_name'].to_numpy()
}, ignore_index=True)
return res_df
def more_formula(self, sublist):
# 获取删选出的结果集
df = self.process_data(sublist=sublist)
# 获取指标需要用到的日期数据
start_date = return_trading_day(self.trade_date, -250)
# 获取数据
for index, row in df.iterrows():
table_name = row["symbol"] + "_daily"
# 使用 and_ 和 between 执行范围查询
stock_daily = get_stock_daily(table_name=table_name)
res_df = self.db_main.pandas_query_by_condition(
model=stock_daily,
query_condition=and_(stock_daily.trade_date >= start_date))
selected_indexes = res_df[res_df['trade_date'] == self.trade_date].index
c, o, v = res_df['close'], res_df['open'], res_df['volume']
# 获取MA
res_df['MA5'], res_df['MA10'], res_df['MA20'], res_df['MA60'], res_df['MA120'], res_df['MA250'] = \
MA(c, 5), MA(c, 10), MA(c, 20), MA(c, 60), MA(c, 120), MA(c, 250)
# 出现 JXNH 买点
res_df['JXNH'] = JXNH(c, o, v)
# 未来三天的整体表现
pass
def junit_strategy(self):
# 测量代码块的执行时间
start_time_total = time.time()
table_array = []
for result in self.code_res:
s_type = self.tdx_util.get_security_type(code=result.ts_code, name=result.name)
if s_type in self.tdx_util.SECURITY_TYPE:
table_name = str(result.ts_code).split(".")[0] + "_daily"
table_array.append(table_name)
# 将 table_array 分成 num_sub_lists 个子列表
num_sub_lists = 10
chunk_size = len(table_array) // num_sub_lists
table_sub_lists = [table_array[i:i + chunk_size] for i in range(0, len(table_array), chunk_size)]
res_df = pd.DataFrame(columns=["symbol"])
# 对每个子列表执行查询并合并结果
for sublist in table_sub_lists:
res_df = res_df.append(self.process_data(sublist))
print(res_df.reset_index(drop=True))
# tdx_util = TdxUtil('D:\\new_tdx')
# tdx_util.set_zxg_file(cont=res_df['symbol'])
# 结束时间
logger.info(f"总执行时间: {time.time() - start_time_total / 60:.2f}")
self.future(res_df)
def future(self, df):
n = 3
end_date = return_trading_day(self.trade_date, n)
for index, row in df.iterrows():
stock_daily = get_stock_daily(table_name=row['symbol'] + "_daily")
future_df = self.db_main.pandas_query_by_condition(
model=stock_daily,
query_condition=stock_daily.trade_date.between(self.trade_date, end_date))
selected_indexes = future_df[future_df['trade_date'] == self.trade_date].index
print(selected_indexes)
if selected_indexes.empty:
continue
now_chg = future_df.loc[selected_indexes, 'pct_chg'].values[0]
tomorrow_chg = future_df.loc[selected_indexes + 1, 'pct_chg'].values[0]
sum_chg = round(future_df.iloc[1:n + 1]["pct_chg"].sum(), 2)
# todo 将结果输出到CSV文件中
# df.to_csv(f'/Users/renmeng/work_space/python_work/qnloft-get-web-everything/股票金融/量化交易/股票数据/{code}.csv',
# mode='a', header=False, index=False)
logger.info(f"未来走势-----》》》》")
logger.info(
f'{row["symbol"]} 】板块:【{row["行业板块"]}】,【{row["概念板块"]}】-- > \n'
f'当日涨幅:{now_chg},趋势:{row["趋势"]} \n'
f'{future_df["trade_date"][0]} 涨幅 -- > {tomorrow_chg} , 三日总涨幅:{sum_chg}'
)
if __name__ == '__main__':
    # Script entry: screen one fixed historical trading day.
    date_obj = datetime.strptime('20231016', "%Y%m%d")
    opt_stack = OptionalStock(trade_date=date_obj)
    opt_stack.junit_strategy()
    # print(opt_stack.html_page_data())