跟新全部文件

This commit is contained in:
rm 2023-11-24 14:42:22 +08:00
parent e2db3cf16e
commit 7cce592887
14 changed files with 1626 additions and 1 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
*.pyc
.idea/
*.log

View File

@ -4,7 +4,7 @@ from DB.model.StockDailyFreq import get_stock_daily_freq
from DB.sqlite_db_main import SqliteDbMain, config
from utils.tdxUtil import TdxUtil
from utils.comm import *
from 基本信息入库 import StockInfoMain
from fp.基本信息入库 import StockInfoMain
class StockDailyFreqMain:

186
fp/日线数据入库.py Normal file
View File

@ -0,0 +1,186 @@
import sys
from loguru import logger
from DB.model.StockDaily import get_stock_daily
from DB.sqlite_db_main import SqliteDbMain, config
from utils.tdxUtil import TdxUtil
from utils.comm import *
from fp.基本信息入库 import StockInfoMain
class StockDailyMain:
def __init__(self, ts_code=None, symbol=None, restart_id=0):
# 配置日志输出到文件和控制台
logger.add("../log/StockDailyMain.log", rotation="500 MB", level="INFO")
logger.add(sys.stderr, level="INFO")
self.code_res = StockInfoMain().get_stock_basic(ts_code=ts_code, symbol=symbol, restart_id=restart_id)
self.db_main = SqliteDbMain(config.stock_daily_db)
self.tdx_util = TdxUtil("")
def __filter_stock(self, code, name):
if 'ST' in name:
return False
if code.startswith('30'):
return False
if code.startswith('68'):
return False
return True
def init_data(self):
"""
全量入库操作
:return:
"""
for result in self.code_res:
tdx_util = TdxUtil("")
s_type = tdx_util.get_security_type(code=result.ts_code)
if s_type in tdx_util.SECURITY_TYPE and self.__filter_stock(result.ts_code, result.name):
logger.info(f"{result.id}{result.ts_code}{result.name} 开始获取 daily 和 daily_basic_ts 数据!")
self.save_by_code(result.ts_code)
def save_by_code(self, ts_code):
df, df_basic = self.get_daily_data(ts_code=ts_code)
if df is not None and df_basic is not None:
# 创建表
table_name = str(ts_code).split(".")[0] + "_daily"
new_table_class = get_stock_daily(table_name=table_name)
self.db_main.create_table(new_table_class)
entries = []
for index, row in df.iterrows():
if '停牌' != row['trade_status']:
basic_row = df_basic[df_basic['trade_date'] == row['trade_date']]
if len(basic_row) > 0:
basic_row = basic_row.drop(columns=['ts_code', 'trade_date', 'close'])
if not basic_row.empty:
row = pd.concat([row, basic_row.iloc[0]])
entry = new_table_class(**row)
entries.append(entry)
# 保存记录
self.db_main.insert_all_entry(entries)
def task_data(self, trade_date=datetime.now().strftime('%Y%m%d')):
"""
每日定时任务补充数据操作
:param trade_date:
:return:
"""
df, df_basic = pd.DataFrame(), pd.DataFrame()
# 日线行情数据
for _ in range(5):
try:
df = xcsc_pro.daily(trade_date=trade_date)[::-1]
break
except Exception as e:
logger.error(f"获取 [{trade_date}] 日线行情 出现问题:", e)
time.sleep(1)
continue
else:
logger.error("daily 重试5次依然没有解决问题请重视~")
# 获取全部股票每日重要的基本面指标
for _ in range(5):
try:
df_basic = xcsc_pro.daily_basic_ts(trade_date=trade_date)[::-1]
break
except Exception as e:
logger.error(f"获取 [{trade_date}] daily_basic_ts 出现问题:", e)
time.sleep(1)
continue
else:
logger.error("daily_basic_ts 重试5次依然没有解决问题请重视~")
df_basic = df_basic.fillna(0)
print(f" 获取 daily 和 daily_basic_ts 数据完毕!")
for index, row in df.iterrows():
ts_code = row['ts_code']
tdx_util = TdxUtil("")
s_type = tdx_util.get_security_type(code=ts_code)
if s_type in tdx_util.SECURITY_TYPE and '交易' == row['trade_status']:
basic_row = df_basic[df_basic['ts_code'] == row['ts_code']]
basic_row = basic_row.drop(columns=['ts_code', 'trade_date', 'close'])
row = pd.concat([row, basic_row.iloc[0]])
table_name = str(ts_code).split(".")[0] + "_daily"
new_table_class = get_stock_daily(table_name=table_name)
entry = new_table_class(**row)
# 判断表是否存在
if self.db_main.has_table(entry):
self.db_main.insert_entry(entry)
# 批量创建trade_date唯一索引
def create_index(self):
for result in self.code_res:
tdx_util = TdxUtil("")
s_type = tdx_util.get_security_type(code=result.ts_code)
if s_type in tdx_util.SECURITY_TYPE and self.__filter_stock(result.ts_code, result.name):
print(f"{result.id}{result.ts_code}{result.name} 开始创建索引!")
table_name = str(result.ts_code).split(".")[0] + "_daily"
sql = f"CREATE UNIQUE INDEX idx_unique_trade_date_{table_name} ON `{table_name}`(trade_date)"
self.db_main.execute_sql(sql)
# 删除指定日期的数据
def del_data_by_date(self, trade_date):
for result in self.code_res:
tdx_util = TdxUtil("")
s_type = tdx_util.get_security_type(code=result.ts_code)
if s_type in tdx_util.SECURITY_TYPE and self.__filter_stock(result.ts_code, result.name):
table_name = str(result.ts_code).split(".")[0] + "_daily"
new_table_class = get_stock_daily(table_name=table_name)
# 判断表是否存在
if self.db_main.has_table(new_table_class):
delete_condition = new_table_class.trade_date == trade_date
self.db_main.delete_by_condition(new_table_class, delete_condition)
def get_daily_data(self, ts_code):
i = 0
df, df_basic = None, None
while True:
try:
if i > 6:
break
# 尝试执行请求操作
# 如果成功,可以跳出循环
df = xcsc_pro.daily(ts_code=ts_code)[::-1]
df_basic = xcsc_pro.daily_basic_ts(ts_code=ts_code)[::-1]
df_basic = df_basic.fillna(0)
logger.info(f"{ts_code} 获取 daily 和 daily_basic_ts 数据完毕!")
break
except Exception as e:
i += 1
# 捕获超时异常
logger.warning("请求超时等待2分钟后重试...")
time.sleep(120) # 休眠2分钟
return df, df_basic
def update_stock_basic(self):
"""
更新基础表补齐股票表
:return:
"""
# 1. 取 中国A股基本资料
StockInfoMain().insert_stock_basic()
self.code_res = StockInfoMain().get_stock_basic()
for result in self.code_res:
s_type = self.tdx_util.get_security_type(code=result.ts_code, name=result.name)
if s_type not in self.tdx_util.SECURITY_TYPE:
continue
# 创建表
table_name = str(result.ts_code).split(".")[0] + "_daily"
new_table_class = get_stock_daily(table_name=table_name)
# 2. 判断表是否存在,不存在则创建
if not self.db_main.has_table(new_table_class):
logger.info(f"{result.ts_code} 表不存在,开始创建!")
# 3. 补齐数据
self.save_by_code(result.ts_code)
if __name__ == '__main__':
# current_date = datetime.now()
current_date = datetime.strptime('20231024', "%Y%m%d")
if if_run(current_date):
trade_date = current_date.strftime('%Y%m%d')
main = StockDailyMain()
# main.init_data() # 初始化全市场的数据
main.task_data(trade_date) # 指定日期数据入库
# main.create_index() # 创建唯一索引
# main.del_data_by_date('20230915') # 删除指定日期数据
# main.save_by_code(ts_code='000050.SZ') # 指定更新的股票
# main.update_stock_basic()

264
fp/板块数据入库.py Normal file
View File

@ -0,0 +1,264 @@
from pyecharts import options as opts
from pyecharts.charts import Bar
from DB.model.ConceptSector import ConceptSector
from DB.model.IndustrySector import IndustrySector
from DB.model.StockByConceptSector import StockByConceptSector
from DB.model.StockByIndustrySector import StockByIndustrySector
from DB.sqlite_db_main import SqliteDbMain, config
from utils.comm import *
from utils.stock_web_api import execute_stock_web_api_method, stock_web_api_industry_summary, \
stock_web_api_concept_name
class SectorOpt:
def __init__(self, trade_date=datetime.now()):
self.trade_date = trade_date.strftime('%Y%m%d')
self.db_main = SqliteDbMain(config.stock_sector_db)
def save_hy_sector(self):
"""
行业板块
:return:
"""
print("开始拉取行业板块数据--->>")
hy_sector_df = execute_stock_web_api_method(func=stock_web_api_industry_summary)
if hy_sector_df is None:
return None
entries = []
for index, row in hy_sector_df.iterrows():
entry = IndustrySector(
trade_date=self.trade_date,
sector_name=row['板块'],
pct_change=row['涨跌幅'],
total_volume=row['总成交量'],
total_turnover=row['总成交额'],
net_inflows=row['净流入'],
rising_count=row['上涨家数'],
falling_count=row['下跌家数'],
average_price=row['均价'],
leading_stock=row['领涨股'],
leading_stock_latest_price=row['领涨股-最新价'],
leading_stock_pct_change=row['领涨股-涨跌幅']
)
entries.append(entry)
self.db_main.insert_all_entry(entries)
print("<<---行业板块数据拉取结束")
self.db_main.get_db_size()
def save_gn_sector(self):
"""
概念板块
:return:
"""
print("开始拉取概念板块数据--->>")
# 概念当日的涨幅情况
ths_gn_df = execute_stock_web_api_method(func=stock_web_api_concept_name)
if ths_gn_df is None:
return None
entries = []
for index, row in ths_gn_df.iterrows():
entry = ConceptSector(
trade_date=self.trade_date,
sector_name=row['板块名称'],
sector_code=row['板块代码'],
pct_change=row['涨跌幅'],
total_market=row['总市值'],
rising_count=row['上涨家数'],
falling_count=row['下跌家数'],
new_price=row['最新价'],
leading_stock=row['领涨股票'],
leading_stock_pct_change=row['领涨股票-涨跌幅']
)
entries.append(entry)
self.db_main.insert_all_entry(entries)
print("<<---概念板块数据拉取结束")
self.db_main.get_db_size()
def stock_by_gn_sector(self):
"""
获取概念板块个股建议每天更新一次
:return:
"""
for _ in range(5):
try:
# 清空表数据
self.db_main.delete_all_table(StockByConceptSector)
ths_gn_df = ak.stock_board_concept_name_em()
break
except Exception as e:
print("概念板块个股数据拉取错误:", e)
time.sleep(1)
continue
else:
print("概念板块个股数据拉取失败!!!")
return None
entries = []
for index, row in ths_gn_df.iterrows():
bk, bk_code = row['板块名称'], row['板块代码']
print(f"----------- {bk} -------------")
for _ in range(5):
try:
stock_gn_df = ak.stock_board_concept_cons_em(symbol=bk)
break
except Exception as e:
print("概念板块个股-板块成份股 数据拉取错误:", e)
time.sleep(1)
continue
else:
print("概念板块个股-板块成份股 数据拉取失败!!!")
return None
print(stock_gn_df['代码'])
for s_index, s_row in stock_gn_df.iterrows():
entry = StockByConceptSector(
update_date=datetime.now().strftime('%Y%m%d'),
sector_name=bk,
sector_code=row['板块代码'],
stock_name=s_row['名称'],
stock_code=s_row['代码'],
)
entries.append(entry)
self.db_main.insert_all_entry(entries)
def stock_by_hy_sector(self):
"""
获取行业板块个股建议每周更新一次
:return:
"""
for _ in range(5):
try:
# 清空表数据
self.db_main.delete_all_table(StockByIndustrySector)
hy_sector_df = ak.stock_board_industry_summary_ths()
break
except Exception as e:
print("行业板块个股数据拉取错误:", e)
time.sleep(1)
continue
else:
print("行业板块个股数据拉取失败!!!")
return None
entries = []
for index, row in hy_sector_df.iterrows():
bk, bk_code = row['板块'], row['序号']
print(f"----------- {bk} -------------")
for _ in range(5):
try:
stock_bk_df = ak.stock_board_industry_cons_ths(symbol=bk)
break
except Exception as e:
print("行业板块个股数据拉取错误:", e)
time.sleep(1)
continue
else:
print("行业板块个股数据拉取失败!!!")
return None
for s_index, s_row in stock_bk_df.iterrows():
entry = StockByIndustrySector(
update_date=datetime.now().strftime('%Y%m%d'),
sector_name=bk,
sector_code=row['序号'],
stock_name=s_row['名称'],
stock_code=s_row['代码'],
)
entries.append(entry)
self.db_main.insert_all_entry(entries)
def get_hy_sector_by_stock_code(self, symbol):
"""
根据股票代码查询所属 行业板块
:param symbol:
:return:
"""
query = self.db_main.session.query(StockByIndustrySector) \
.filter(StockByIndustrySector.stock_code == symbol).order_by(StockByIndustrySector.id)
return self.db_main.pandas_query_by_sql(stmt=query.statement).reset_index()
def get_gn_sector_by_stock_code(self, symbol):
"""
根据股票代码查询所属 概念板块
:param symbol:
:return:
"""
query = self.db_main.session.query(StockByConceptSector) \
.filter(StockByConceptSector.stock_code == symbol).order_by(StockByConceptSector.id)
return self.db_main.pandas_query_by_sql(stmt=query.statement).reset_index()
def industry_generate_chart(self):
"""
行业板块
:return:
"""
industry_num_limit_sql = f"select sector_name from stock_industry_sector where trade_date = '{self.trade_date}' ORDER BY pct_change desc limit 15"
names_res = self.db_main.execute_sql(industry_num_limit_sql)
# 提取结果中的单个列数据(假设你要提取 "收盘价" 这一列)
sector_names = [result.sector_name for result in names_res]
data_sql = f"SELECT sec.trade_date,sec.pct_change from " \
f"stock_industry_sector sec , " \
f"({industry_num_limit_sql}) sec_n " \
f"where sec.sector_name = sec_n.sector_name " \
f"and trade_date in (select trade_date from stock_industry_sector GROUP BY trade_date order by trade_date desc LIMIT 3)"
data_result = self.db_main.execute_sql(data_sql)
df = pd.DataFrame(data_result, columns=['trade_date', 'pct_change'])
# print(df)
grouped = df.groupby('trade_date')['pct_change'].apply(list).reset_index()
self.__generate_chart("行业板块 —— 复盘", grouped, sector_names, 'industry_generate_chart')
def concept_generate_chart(self):
"""
概念板块
:return:
"""
concept_num_limit_sql = f"select sector_name from stock_concept_sector where trade_date = '{self.trade_date}' ORDER BY pct_change desc limit 15"
names_res = self.db_main.execute_sql(concept_num_limit_sql)
# 提取结果中的单个列数据(假设你要提取 "收盘价" 这一列)
sector_names = [result.sector_name for result in names_res]
data_sql = f"SELECT sec.trade_date,sec.pct_change from " \
f"stock_concept_sector sec , " \
f"({concept_num_limit_sql}) sec_n " \
f"where sec.sector_name = sec_n.sector_name " \
f"and trade_date in (select trade_date from stock_concept_sector GROUP BY trade_date order by trade_date desc LIMIT 3)"
data_result = self.db_main.execute_sql(data_sql)
df = pd.DataFrame(data_result, columns=['trade_date', 'pct_change'])
# print(df)
grouped = df.groupby('trade_date')['pct_change'].apply(list).reset_index()
self.__generate_chart("概念板块 —— 复盘", grouped, sector_names, 'concept_generate_chart')
def __generate_chart(self, title, grouped, sector_names, file_name):
c = (
Bar(init_opts=opts.InitOpts(width="100%", height="400px"))
.add_xaxis(sector_names)
.add_yaxis(grouped.loc[0, 'trade_date'], grouped.loc[0, 'pct_change'])
.add_yaxis(grouped.loc[1, 'trade_date'], grouped.loc[1, 'pct_change'])
.add_yaxis(grouped.loc[2, 'trade_date'], grouped.loc[2, 'pct_change'])
.set_global_opts(
title_opts=opts.TitleOpts(title=title, pos_left="center"),
yaxis_opts=opts.AxisOpts(offset=5),
xaxis_opts=opts.AxisOpts(offset=5, axislabel_opts=opts.LabelOpts(rotate=45, interval=0)),
legend_opts=opts.LegendOpts(pos_top="8%")
)
.render(f"../html/{self.trade_date}/{file_name}.html")
)
if __name__ == '__main__':
current_date = datetime.now()
if if_run(current_date):
sector = SectorOpt(current_date)
# 行业板块
# sector.save_hy_sector()
# 概念板块
# sector.save_gn_sector()
# 生成报表
# sector.industry_generate_chart()
# sector.concept_generate_chart()
# -------》 一下建议一周更新一次
# 行业板块个股
sector.stock_by_hy_sector()
# 感念板块个股
sector.stock_by_gn_sector()

98
fp/涨跌停数据.py Normal file
View File

@ -0,0 +1,98 @@
from pyecharts.charts import Line
from pyecharts import options as opts
from utils.comm import *
class LimitList:
def __init__(self, trade_date):
self.trade_date = trade_date.strftime('%Y%m%d')
self.start_date = return_trading_day(trade_date, -7)
self.end_date = self.trade_date
self.limit_df = pd.DataFrame()
for _ in range(5):
try:
# 涨跌停列表
self.limit_df = xcsc_pro.limit_list_d(start_date=self.start_date, end_date=self.end_date)
break
except Exception as e:
print("涨跌停列表数据获取失败,", e)
time.sleep(1)
continue
else:
print("涨跌停列表数据获取失败次数过多,请关注!!!")
def limit_list_chart(self):
# 按照 trade_date 和 limit 进行分组统计
grouped = self.limit_df.groupby(['trade_date', 'limit']).size().reset_index(name='count')
# 使用 pivot_table 将 limit 类型作为列,统计数量
pivot_table = grouped.pivot_table(index='trade_date', columns='limit', values='count', fill_value=0)
# print(pivot_table)
# D跌停U涨停Z炸板
# 获取不同 limit 类型的数据
u_list = pivot_table['U'].tolist()
d_list = pivot_table['D'].tolist()
z_list = pivot_table['Z'].tolist()
trade_dates = pivot_table.index.tolist()
c = (
Line(init_opts=opts.InitOpts(width="100%", height="380px"))
.add_xaxis(trade_dates)
.add_yaxis("跌停数", d_list)
.add_yaxis("涨停数", u_list)
.add_yaxis("炸板数", z_list)
.set_global_opts(
tooltip_opts=opts.TooltipOpts(trigger="axis"),
title_opts=opts.TitleOpts(title="涨跌停 —— 复盘", pos_left="center"),
yaxis_opts=opts.AxisOpts(offset=5),
xaxis_opts=opts.AxisOpts(offset=5, axislabel_opts=opts.LabelOpts(rotate=45, interval=0)),
legend_opts=opts.LegendOpts(pos_top="8%")
)
.render(f"../html/{self.trade_date}/limit_list_chart.html")
)
def html_page_data(self):
"""
生成HTML文件
:return:
"""
print(self.limit_df)
df = self.limit_df[self.limit_df['trade_date'] == self.end_date].fillna(0).sort_values(
by=["industry", "limit_times"],
ascending=False).reset_index(drop=True)
table_content = f'<table class="table table-hover"><thead>' \
f'<tr>' \
f'<th scope="col">股票代码</th>' \
f'<th scope="col">所属行业</th>' \
f'<th scope="col">股票名称</th>' \
f'<th scope="col">换手率</th>' \
f'<th scope="col">连板数</th>' \
f'<th scope="col">状态</th>' \
f'</tr>' \
f'</thead>' \
f'<tbody>'
for index, row in df.iterrows():
table_content += f'<tr><th scope="row">{row["ts_code"]}</th>' \
f'<td>{row["industry"]}</td>' \
f'<td>{row["name"]}</td>' \
f'<td>{row["turnover_ratio"]}</td>' \
f'<td>{row["limit_times"]}</td>'
if row['limit'] == 'D':
table_content += f'<td class="table-success">跌停</td>'
elif row['limit'] == 'U':
table_content += f'<td class="table-danger">涨停</td>'
elif row['limit'] == 'Z':
table_content += f'<td class="table-warning">炸板</td>'
table_content += '</tr>'
table_content += f'</tbody></table>'
return table_content
if __name__ == '__main__':
limit = LimitList(datetime.strptime('20231108', "%Y%m%d"))
# limit.limit_list_chart()
limit.html_page_data()

309
fp/自选股数据.py Normal file
View File

@ -0,0 +1,309 @@
import sys
from sqlalchemy import and_
from DB.model.StockDaily import get_stock_daily
from DB.sqlite_db_main import SqliteDbMain, config
from utils.tdxUtil import TdxUtil
from fp.基本信息入库 import StockInfoMain
from utils.formula import *
from loguru import logger
from fp.板块数据入库 import SectorOpt
from utils.comm import *
class OptionalStock:
def __init__(self, ts_code=None, symbol=None, restart_id=0, trade_date=datetime.now()):
self.trade_date = trade_date.strftime('%Y%m%d')
# 配置日志输出到文件和控制台
logger.add("../log/OptionalStock.log", rotation="500 MB", level="INFO")
logger.add(sys.stderr, level="INFO")
self.code_res = StockInfoMain().get_stock_basic(ts_code=ts_code, symbol=symbol, restart_id=restart_id)
self.db_main = SqliteDbMain(config.stock_daily_db)
self.tdx_util = TdxUtil("")
self.sector = SectorOpt()
def stock_daily(self):
rm_strategy_df, dbj_strategy_df = pd.DataFrame(), pd.DataFrame()
for result in self.code_res:
s_type = self.tdx_util.get_security_type(code=result.ts_code, name=result.name)
if s_type in self.tdx_util.SECURITY_TYPE:
table_name = str(result.ts_code).split(".")[0] + "_daily"
new_table_class = get_stock_daily(table_name=table_name)
df = self.db_main.pandas_query_by_model(
model=new_table_class,
order_col=new_table_class.id.desc(),
page_number=1, page_size=60
)[::-1]
if len(df) > 0:
logger.info(
f"----------------<<{self.trade_date}{result.ts_code} 开始执行 N日新高策略>>----------------")
rm_strategy_df = rm_strategy_df.append(self.n_high_strategy(df=df))
logger.info(
f"----------------<<{self.trade_date}{result.ts_code} 开始执行 连板策略>>----------------")
dbj_strategy_df = dbj_strategy_df.append(self.conn_plate_strategy(df=df))
return rm_strategy_df, dbj_strategy_df
def n_high_strategy(self, df):
rm_strategy_df = pd.DataFrame()
rm_kdj = RM_KDJ(df['close'], df['high'], df['low'], 9, 3, 3)
df["KDJ_K"], df["KDJ_D"], df["KDJ_J"] = rm_kdj["KDJ_K"], rm_kdj["KDJ_D"], rm_kdj["KDJ_J"]
total, rise, fall, tomorrow_rise, tomorrow_fall = 0, 0, 0, 0, 0
for index, row in df.iterrows():
trade_date, code, close = row["trade_date"], row['ts_code'], row['close']
d, j = row['KDJ_D'], row['KDJ_J']
# 返回所要找的数据
if trade_date == self.trade_date:
# TODO 这里有问题d和J相同
logger.info(f'{code} -- >> {trade_date} -->> d={d} --> j={j}')
# 超买区间,未来跌幅
if d > 80 and j > 100:
rm_strategy_df = rm_strategy_df.append(row)
# rise, tomorrow_rise, total = self.back_testing(df, index, rise, tomorrow_rise, total, trade_date)
rm_strategy_df.reset_index(drop=True, inplace=True)
# if total > 0:
# console = f"满足策略:{total} 次,明日上涨概率:{round((tomorrow_rise / total) * 100, 2)} %" \
# f"三日后上涨概率:{round((rise / total) * 100, 2)} %"
# # logger.info(console)
# rm_strategy_df['history'] = console
return rm_strategy_df
def back_testing(self, df, index, rise, tomorrow_rise, total, trade_date):
if index > 3:
# 明天继续下跌的概率
tomorrow_pre = df.iloc[-index:-index + 1]['pct_chg'].values[0]
# 未来三天最大涨幅
max_h = df.iloc[-index:-index + 3]['pct_chg'].max().round(2)
# 计算最大回撤
h_max = df.iloc[-index:-index + 3]['high'].max()
l_min = df.iloc[-index:-index + 3]['low'].min()
mac_ret = f"{(h_max - l_min) / l_min:.2%}"
# 平均涨幅
mean_h = df.iloc[-index:-index + 3]['pct_chg'].mean().round(2)
total += 1
content = f"{trade_date} --> 明日涨幅: {tomorrow_pre}% " \
f"--> 3日最大涨幅{max_h}%" \
f"--> 3日最大回撤{mac_ret}" \
f"--> 3日平均涨幅{mean_h}"
# if trade_date > "20230101":
# logger.info(content)
# print(content)
if tomorrow_pre > 0:
tomorrow_rise += 1
if max_h > 0:
rise += 1
return rise, tomorrow_rise, total
def ma_strategy(self, df):
# MA5:当MA5在全部MA的最上面时
MA5, MA10, MA20, MA60 = MA(df['close'], 5), MA(df['close'], 10), MA(df['close'], 20), MA(
df['close'], 60)
pass
def conn_plate_strategy(self, df):
dbj_strategy_df = pd.DataFrame()
df['JXNH'] = JXNH(df['close'], df['open'], df['volume'])
total, rise, fall, tomorrow_rise, tomorrow_fall = 0, 0, 0, 0, 0
for index, row in df.iterrows():
trade_date, code, close, jxnh = row["trade_date"], row['ts_code'], row['close'], row['JXNH']
if jxnh:
# 返回所要找的数据
if trade_date == self.trade_date:
dbj_strategy_df = dbj_strategy_df.append(row)
rise, tomorrow_rise, total = self.back_testing(df, index, rise, tomorrow_rise, total, trade_date)
dbj_strategy_df.reset_index(drop=True, inplace=True)
if total > 0:
console = f"满足策略:{total} 次,明日上涨概率:{round((tomorrow_rise / total) * 100, 2)} %" \
f"三日后上涨概率:{round((rise / total) * 100, 2)} %"
# print(console)
# logger.info(console)
dbj_strategy_df['history'] = console
return dbj_strategy_df
def html_page_data(self):
rm_strategy_df, dbj_strategy_df = self.stock_daily()
# tdx_util = TdxUtil('D:\\new_tdx')
# tdx_util.set_zxg_file(cont=dbj_strategy_df['ts_code'])
# 策略1
id_name = "v-pills-rm"
rm_strategy_nav_pills = f'<button class="nav-link active" data-target="#{id_name}" type="button">RM策略自选</button>'
rm_strategy_nav_pills_content = self.__get_pills_content(rm_strategy_df, id_name, active=True)
# 策略2
id_name = "v-pills-dbj"
dbj_strategy_nav_pills = f'<button class="nav-link" data-target="#{id_name}" type="button">大保健策略自选</button>'
dbj_strategy_nav_pills_content = self.__get_pills_content(dbj_strategy_df, id_name)
res_html = f'<div class="row border p-3"><div class="col-3"><div class="nav flex-column nav-pills" id="v-pills-tab">' \
f'{rm_strategy_nav_pills}' \
f'{dbj_strategy_nav_pills}' \
f'</div></div>' \
f'<div class="col-9"><div class="tab-content" id="v-pills-tabContent">' \
f'{rm_strategy_nav_pills_content}' \
f'{dbj_strategy_nav_pills_content}' \
f'</div></div></div>'
return res_html
def __get_pills_content(self, df, id_name, active=False):
pills_content = ''
if len(df) > 0:
pills_content += f'<table class="table table-hover"><thead>' \
f'<tr>' \
f'<th scope="col">股票代码</th>' \
f'<th scope="col">策略表现</th>' \
f'<th scope="col">购买建议</th>' \
f'</tr>' \
f'</thead>' \
f'<tbody>'
for index, row in df.iterrows():
pills_content += f'<tr><th scope="row">{row["ts_code"]}</th>' \
f'<td>{row["history"]}</td>' \
f'<td>{row["close"]}</td>'
pills_content += '</tr>'
pills_content += f'</tbody></table>'
else:
pills_content += '<p class="text-center"><span>暂无数据...</span></p>'
pills_div = f'<div class="tab-pane fade show {"active" if active else ""}" id="{id_name}">{pills_content}</div>'
return pills_div
def process_data(self, sublist):
n = 16
start_date = return_trading_day(self.trade_date, -n)
res_df = pd.DataFrame(columns=["symbol"])
# 构建子查询字符串例如SELECT * FROM table1 UNION ALL SELECT * FROM table2 ...
subquery = " UNION ALL ".join(
[f"SELECT * FROM '{table_name}' where trade_date between '{start_date}' and '{self.trade_date}'" for
table_name in sublist])
# 将子查询字符串添加到主查询中
df = self.db_main.execute_sql_to_pandas(subquery)
# print(df)
# 使用 groupby 对数据进行分组
grouped = df.groupby('ts_code')
# 遍历分组后的数据
for name, group in grouped:
group = group.reset_index(drop=True)
# print(f"ts_code: {name}")
# print(group)
selected_indexes = group[group['trade_date'] == self.trade_date].index
if selected_indexes.empty or selected_indexes.values[0] != n:
continue
now_close, now_high, symbol, ts_code = group.loc[selected_indexes, 'close'].values[0], \
group.loc[selected_indexes, 'high'].values[0], \
group.loc[selected_indexes, 'ts_code'].values[0].split('.')[0], \
group.loc[selected_indexes, 'ts_code'].values[0]
max_high = group.iloc[0:n]['high'].max()
cond = [
now_close > max_high,
max_high > group.loc[selected_indexes - 1, 'high'].values[0]
]
if not all(cond):
continue
# 计算倾斜角度
skewness = round(sum(np.diff(group.iloc[0:n]['high'].tolist())), 3)
turnover_rate, volume_ratio = group.loc[selected_indexes, 'turnover_rate'].values[0], \
group.loc[selected_indexes, 'volume_ratio'].values[0]
cond = [
turnover_rate > 5,
volume_ratio > 1.5
]
if not all(cond):
continue
logger.info("====达成条件,开始进行进一步筛选====")
# 获取所属板块
gn_df = self.sector.get_gn_sector_by_stock_code(symbol)
hy_df = self.sector.get_hy_sector_by_stock_code(symbol)
logger.info(
f"code={symbol},now_close={now_close},last_high={max_high},"
f"换手率={turnover_rate},量比={volume_ratio}"
f"趋势={skewness},"
f"所属行业板块:{hy_df['sector_name'].to_numpy()},所属概念板块:{gn_df['sector_name'].to_numpy()}")
res_df = res_df.append({
"symbol": symbol,
"ts_code": ts_code,
"趋势": skewness,
"行业板块": hy_df['sector_name'].to_numpy(),
"概念板块": gn_df['sector_name'].to_numpy()
}, ignore_index=True)
return res_df
def more_formula(self, sublist):
# 获取删选出的结果集
df = self.process_data(sublist=sublist)
# 获取指标需要用到的日期数据
start_date = return_trading_day(self.trade_date, -250)
# 获取数据
for index, row in df.iterrows():
table_name = row["symbol"] + "_daily"
# 使用 and_ 和 between 执行范围查询
stock_daily = get_stock_daily(table_name=table_name)
res_df = self.db_main.pandas_query_by_condition(
model=stock_daily,
query_condition=and_(stock_daily.trade_date >= start_date))
selected_indexes = res_df[res_df['trade_date'] == self.trade_date].index
c, o, v = res_df['close'], res_df['open'], res_df['volume']
# 获取MA
res_df['MA5'], res_df['MA10'], res_df['MA20'], res_df['MA60'], res_df['MA120'], res_df['MA250'] = \
MA(c, 5), MA(c, 10), MA(c, 20), MA(c, 60), MA(c, 120), MA(c, 250)
# 出现 JXNH 买点
res_df['JXNH'] = JXNH(c, o, v)
# 未来三天的整体表现
pass
def junit_strategy(self):
# 测量代码块的执行时间
start_time_total = time.time()
table_array = []
for result in self.code_res:
s_type = self.tdx_util.get_security_type(code=result.ts_code, name=result.name)
if s_type in self.tdx_util.SECURITY_TYPE:
table_name = str(result.ts_code).split(".")[0] + "_daily"
table_array.append(table_name)
# 将 table_array 分成 num_sub_lists 个子列表
num_sub_lists = 10
chunk_size = len(table_array) // num_sub_lists
table_sub_lists = [table_array[i:i + chunk_size] for i in range(0, len(table_array), chunk_size)]
res_df = pd.DataFrame(columns=["symbol"])
# 对每个子列表执行查询并合并结果
for sublist in table_sub_lists:
res_df = res_df.append(self.process_data(sublist))
print(res_df.reset_index(drop=True))
# tdx_util = TdxUtil('D:\\new_tdx')
# tdx_util.set_zxg_file(cont=res_df['symbol'])
# 结束时间
logger.info(f"总执行时间: {time.time() - start_time_total / 60:.2f}")
self.future(res_df)
def future(self, df):
n = 3
end_date = return_trading_day(self.trade_date, n)
for index, row in df.iterrows():
stock_daily = get_stock_daily(table_name=row['symbol'] + "_daily")
future_df = self.db_main.pandas_query_by_condition(
model=stock_daily,
query_condition=stock_daily.trade_date.between(self.trade_date, end_date))
selected_indexes = future_df[future_df['trade_date'] == self.trade_date].index
print(selected_indexes)
if selected_indexes.empty:
continue
now_chg = future_df.loc[selected_indexes, 'pct_chg'].values[0]
tomorrow_chg = future_df.loc[selected_indexes + 1, 'pct_chg'].values[0]
sum_chg = round(future_df.iloc[1:n + 1]["pct_chg"].sum(), 2)
# todo 将结果输出到CSV文件中
# df.to_csv(f'/Users/renmeng/work_space/python_work/qnloft-get-web-everything/股票金融/量化交易/股票数据/{code}.csv',
# mode='a', header=False, index=False)
logger.info(f"未来走势-----》》》》")
logger.info(
f'{row["symbol"]} 】板块:【{row["行业板块"]}】,【{row["概念板块"]}】-- > \n'
f'当日涨幅:{now_chg},趋势:{row["趋势"]} \n'
f'{future_df["trade_date"][0]} 涨幅 -- > {tomorrow_chg} , 三日总涨幅:{sum_chg}'
)
if __name__ == '__main__':
date_obj = datetime.strptime('20231016', "%Y%m%d")
opt_stack = OptionalStock(trade_date=date_obj)
opt_stack.junit_strategy()
# print(opt_stack.html_page_data())

250
fp/行情指数数据.py Normal file
View File

@ -0,0 +1,250 @@
from datetime import timedelta
import numpy as np
import pandas
from utils.comm import *
class AllIndex:
# 中国的指数
ZH_INDEX = pd.DataFrame(
[["上证指数", "000001.SH"], ["深证成指", "399001.SZ"], ["创业板指", "399006.SZ"], ["中证100", "000903.SH"],
["中证100", "399903.SZ"]],
columns=['name', 'value'])
# 世界的指数
GLOBAL_INDEX = ['道琼斯', '标普500', '纳斯达克', '恒生指数', '日经225', '韩国KOSPI', '美元指数']
METAL_INDEX = pd.DataFrame(
[["黄金", "Au99.99"]
# , ["白银", "Ag99.99"]
], columns=['name', 'value'])
def __init__(self, trade_date):
self.trade_date = trade_date.strftime('%Y%m%d')
self.start_date = return_trading_day(trade_date, -29)
self.end_date = self.trade_date
def calendar_page_json(self):
"""
日历页面数据只要A股指数
获取1年内的全部数控生成一个json文件
:return:
"""
# 计算一年前的日期
start_date = datetime.strptime(self.end_date, '%Y%m%d') - timedelta(days=365)
start_date = start_date.strftime("%Y%m%d")
# 在首页显示 这三个的当日最新涨跌情况
index_page_display = ["000001.SH", "399001.SZ", "399006.SZ"]
res_pandas = pandas.DataFrame(columns=['title', 'start', 'textColor', 'backgroundColor'])
for stock in index_page_display:
for _ in range(5):
try:
# 获取单个指数日线行情
df = xcsc_pro.index_daily(
ts_code=stock,
start_date=start_date,
end_date=self.end_date,
fields="ts_code,trade_date,pct_chg")
break
except Exception as e:
print("获取单个指数日线行情 出现问题:", e)
time.sleep(1)
continue
else:
print("重试5次依然没有解决问题请重视~")
return None
# 获取对应的中文名称
name = self.ZH_INDEX.loc[self.ZH_INDEX['value'] == stock, 'name'].iloc[0]
df['pct_chg_format'] = (round(df['pct_chg'], 2)).astype("str") + "%"
df['title'] = name + " --> " + df['pct_chg_format']
# 将字符串转换为日期时间对象
df['trade_date'] = pd.to_datetime(df['trade_date'])
# 格式化日期时间列为 "YYYY-MM-DD" 形式
df['start'] = df['trade_date'].dt.strftime("%Y-%m-%d")
# 设置条件
conditions = [
(df['pct_chg'] > 1),
(df['pct_chg'] < -1),
((-1 <= df['pct_chg']) & (df['pct_chg'] < 0)),
((0 < df['pct_chg']) & (df['pct_chg'] <= 1))
]
# 字体颜色 当涨跌幅大于1的时候将字体颜色设置成白色 #e60000
text_colors = ['#ffffff', '#ffffff', '#5fcf57', '#e60000']
df['textColor'] = np.select(conditions, text_colors)
# 背景颜色
background_colors = ['#e60000', '#5fcf57', '#ffffff', '#ffffff']
df['backgroundColor'] = np.select(conditions, background_colors)
df = df[['title', 'start', 'textColor', 'backgroundColor']]
res_pandas = pd.concat([res_pandas, df], ignore_index=True)
return res_pandas
def html_page_data(self):
# A股指数
a_all_df, a_table_date, a_home_page_df = self.a_data()
# 全球部分指数
g_all_df, g_table_date, g_home_page_df = self.global_index()
# 贵金属行情
s_all_df, s_table_date, s_home_page_df = self.spot_hist_sge()
# 生成首页的徽章
badge = ""
badge += self.__create_badge(a_home_page_df)
badge += self.__create_badge(g_home_page_df)
badge += self.__create_badge(s_home_page_df)
# 生成 指数数据 页
card_deck_df = pd.concat([a_table_date, g_table_date], axis=0)
card_deck_df = pd.concat([card_deck_df, s_table_date], axis=0)
card_deck = self.__create_card_deck(card_deck_df)
return badge, card_deck
def __create_badge(self, df):
# 生成首页的徽章
badge = ""
for index, row in df.iterrows():
badge_color = "badge-success"
if row['pct_chg'] > 0:
badge_color = "badge-danger"
badge += f"<div class='col-md-auto'><span class='badge badge-pill {badge_color}'>{row['name']}{row['pct_chg']}</span></div>"
return badge
def __create_card_deck(self, df):
grouped = df.groupby('name')
card_deck = '<div class="card-deck">'
row = 1
for group_name, group_df in grouped:
card_deck_color = "border-success"
if group_df.head(1)["pct_chg"].values[0] > 0:
card_deck_color = "border-danger"
card_deck += f'<div class="card {card_deck_color} mb-3" style="max-width: 14rem;">' \
f'<div class="card-header">{group_name}</div>' \
f'<div class="card-body {"text-danger" if group_df.head(1)["pct_chg"].values[0] > 0 else "text-success"}">' \
f'<h4 class="card-title">{round(group_df.head(1)["close"].values[0], 2)}</h4>' \
f'<p class="card-text row">' \
f'<span class="{"text-danger" if group_df.head(2)["pct_chg"].values[1] > 0 else "text-success"} ml-3">' \
f'{round(group_df.head(2)["close"].values[1], 2)}' \
f'</span>' \
f'<span class="{"text-danger" if group_df.head(3)["pct_chg"].values[2] > 0 else "text-success"} ml-auto mr-3">' \
f'{round(group_df.head(3)["close"].values[2], 2)}' \
f'</span>' \
f'</p>' \
f'</div>' \
f'</div>'
if row % 5 == 0:
card_deck += '</div>'
print(f"{row - 1} -- >> {len(grouped)}")
if row != len(grouped):
card_deck += '<div class="card-deck">'
if row % 5 != 0 and row == len(grouped):
card_deck += '</div>'
row += 1
return card_deck
def a_data(self):
"""
A股指数数据
返回 1个月内31
:return:
"""
# 在首页显示 这三个的当日最新涨跌情况
index_page_display = ["000001.SH", "399001.SZ", "399006.SZ"]
all_df, home_page_df, table_df = pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
for index, row in self.ZH_INDEX.iterrows():
print(self.start_date, " --- >>", self.end_date)
# 获取单个指数日线行情
for _ in range(5):
try:
df = xcsc_pro.index_daily(ts_code=row['value'], start_date=self.start_date,
end_date=self.end_date,
fields="ts_code,trade_date,pct_chg,close")
break
except Exception as e:
print("获取单个指数日线行情 出现问题:", e)
continue
else:
print("重试5次依然没有解决问题请重视~")
return None
df['pct_chg'] = df['pct_chg'].round(2)
df['name'] = self.ZH_INDEX.loc[self.ZH_INDEX['value'] == row['value'], 'name'].iloc[0]
# 一个月的数据
all_df = pd.concat([all_df, df], axis=0)
if index_page_display.count(row['value']):
print(df)
# 一日的数据
home_page_date = df.head(1)
home_page_df = pd.concat([home_page_df, home_page_date], axis=0)
# 三日的数据
table_df_date = df.head(3)
table_df = pd.concat([table_df, table_df_date], axis=0)
return all_df, table_df, home_page_df
def global_index(self):
"""
国外部分指数数据
3日内的涨跌幅
1个月内的折线图
:return:
"""
all_df, table_df, home_page_df = pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
for i in self.GLOBAL_INDEX:
for _ in range(5):
try:
df = qs.get_data(i)
break
except Exception as e:
print(f"国外部分指数数据【{i}】拉取出现错误,", e)
continue
else:
print("重试5次依然没有解决问题请重视~")
return None
df['pct_chg'] = round(((df['close'] - df['open']) / df['open']) * 100, 2)
all_df_date, table_df_date, home_page_data = df.tail(30), df.tail(3), df.tail(1)
all_df = pd.concat([all_df, all_df_date], axis=0)
table_df = pd.concat([table_df, table_df_date], axis=0)
home_page_df = pd.concat([home_page_df, home_page_data], axis=0)
# pct_chg = global_df.groupby('name')['pct_chg'].apply(list).reset_index()
# print(pct_chg)
return all_df, table_df, home_page_df
def oil_index(self): # 油价
# 调价日期
energy_oil_hist_df = ak.energy_oil_hist()
print(energy_oil_hist_df)
formatted_date = energy_oil_hist_df['调整日期'].iloc[-3].strftime('%Y%m%d')
# 地区油价
energy_oil_detail_df = ak.energy_oil_detail(date=formatted_date)
energy_oil_detail_df = energy_oil_detail_df[energy_oil_detail_df['地区'] == '北京']
print(energy_oil_detail_df)
def spot_hist_sge(self):
all_df, table_df, home_page_df = pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
for index, row in self.METAL_INDEX.iterrows():
for _ in range(5):
try:
df = ak.spot_hist_sge(symbol=row['value'])
break
except Exception as e:
print(f"上海黄金交易所-数据资讯-行情走势-历史数据,", e)
continue
else:
print("重试5次依然没有解决问题请重视~")
return None
df['name'] = self.METAL_INDEX.loc[self.METAL_INDEX['value'] == row['value'], 'name'].iloc[0]
df['pct_chg'] = round(((df['close'] - df['open']) / df['open']) * 100, 2)
all_df_date, table_df_date, home_page_data = df.tail(30), df.tail(3), df.tail(1)
all_df = pd.concat([all_df, all_df_date], axis=0)
table_df = pd.concat([table_df, table_df_date], axis=0)
home_page_df = pd.concat([home_page_df, home_page_data], axis=0)
return all_df, table_df, home_page_df
if __name__ == '__main__':
# current_date = datetime.now().strftime('%Y%m%d')
current_date = datetime.strptime('20230918', "%Y%m%d")
# if if_run(current_date):
index = AllIndex(trade_date=current_date)
# index.a_index()
badge, card_deck = index.html_page_data()
print(badge)

90
fp/资金出入概览.py Normal file
View File

@ -0,0 +1,90 @@
from pyecharts.charts import Bar, Line
import pyecharts.options as opts
from utils.stock_web_api import execute_stock_web_api_method, stock_web_api_money_flow
from utils.comm import *
class MoneyFlow:
def __init__(self, trade_date):
self.trade_date = trade_date.strftime('%Y%m%d')
self.start_date = return_trading_day(trade_date, -6)
self.end_date = self.trade_date
print(self.start_date, "---", self.end_date)
def hsgt_top10(self):
"""
当日沪深股通十大成交股
:return:
"""
df = xcsc_pro.hsgt_top10(trade_date=self.end_date)
print(df)
def money_flow_chart(self):
df = execute_stock_web_api_method(
func=stock_web_api_money_flow,
start_date=self.start_date, end_date=self.end_date)
x_data = df['trade_date'].tolist()
# 沪股通
y_axis_hgt = (round(df['hgt'] / 100, 2)).tolist()
# 深股通
y_axis_sgt = (round(df['sgt'] / 100, 2)).tolist()
# 北上资金
y_axis_north_money = (round(df['north_money'] / 100, 2)).tolist()
bar = (
Bar(init_opts=opts.InitOpts(width="100%", height="400px"))
.add_xaxis(xaxis_data=x_data)
.add_yaxis(
series_name="沪股通",
y_axis=y_axis_hgt,
label_opts=opts.LabelOpts(is_show=False),
)
.add_yaxis(
series_name="深股通",
y_axis=y_axis_sgt,
label_opts=opts.LabelOpts(is_show=False),
)
.extend_axis(
yaxis=opts.AxisOpts(
name="金额",
type_="value",
interval=2000,
axislabel_opts=opts.LabelOpts(formatter="{value} 亿元"),
)
)
.set_global_opts(
tooltip_opts=opts.TooltipOpts(
is_show=True, trigger="axis", axis_pointer_type="cross"
),
xaxis_opts=opts.AxisOpts(
type_="category",
axispointer_opts=opts.AxisPointerOpts(is_show=True, type_="shadow"),
),
yaxis_opts=opts.AxisOpts(
name="金额",
type_="value",
interval=3000,
axislabel_opts=opts.LabelOpts(formatter="{value} 亿元"),
axistick_opts=opts.AxisTickOpts(is_show=True),
splitline_opts=opts.SplitLineOpts(is_show=True),
),
)
)
line = (
Line()
.add_xaxis(xaxis_data=x_data)
.add_yaxis(
series_name="北上资金",
# yaxis_index 0折线图使用柱状图的y轴 1使用extend_axis中的Y轴
yaxis_index=0,
y_axis=y_axis_north_money,
label_opts=opts.LabelOpts(is_show=False),
)
)
bar.overlap(line).render(f"../复盘指标/html/{self.trade_date}/mixed_bar_and_line.html")
if __name__ == '__main__':
# money_flow = MoneyFlow(trade_date=datetime.now().strftime('%Y%m%d'))
money_flow = MoneyFlow(trade_date=datetime.now())
money_flow.money_flow_chart()

130
fp_main.py Normal file
View File

@ -0,0 +1,130 @@
import logging
import sys
from jinja2 import Environment, FileSystemLoader
from loguru import logger
from fp.新闻资讯 import News
from fp.日线数据入库 import StockDailyMain
from fp.板块数据入库 import SectorOpt
from fp.涨跌停数据 import LimitList
from fp.自选股数据 import OptionalStock
from fp.行情指数数据 import AllIndex
from fp.资金出入概览 import MoneyFlow
from utils.comm import *
class AppMain:
def __init__(self, trade_date=datetime.now()):
# 配置日志输出到文件和控制台
logger.add("log/FPAppMain.log", rotation="500 MB", level="INFO")
logger.add(sys.stderr, level="INFO")
self.trade_date = trade_date.strftime('%Y%m%d')
if if_run(trade_date):
# 创建一个文件夹
self.file_name = f'html/{self.trade_date}'
create_file(self.file_name)
# 各大指数数据
self.index = AllIndex(trade_date=trade_date)
# 板块数据
self.sector_opt = SectorOpt(trade_date=trade_date)
# 涨跌停 炸板
self.limit_list = LimitList(trade_date=trade_date)
# 资金流入流出
self.money_flow = MoneyFlow(trade_date=trade_date)
# 日线数据
self.stock_daily = StockDailyMain()
# 新闻咨询
self.news = News(trade_date=trade_date)
# 我的自选
self.optional_stock = OptionalStock(trade_date=trade_date)
def calendar_page_data(self):
logger.info(f"{self.trade_date} start--> 开始 JSON 文件生成!")
res_df = self.index.calendar_page_json()
# 将 DataFrame 转换为 JSON并写入文件
res_df.to_json('html/json/events.json', orient='records')
logger.info(f"{self.trade_date} --> JSON 文件已更新")
def init_data(self):
"""
每日数据初始化
:return:
"""
logger.info(f"{self.trade_date} start--> 开始数据初始化!")
# 日线数据入库
self.stock_daily.task_data(self.trade_date)
# 行业板块 入库
self.sector_opt.save_hy_sector()
# 概念板块 入库
self.sector_opt.save_gn_sector()
# 行业板块个股
self.sector_opt.stock_by_hy_sector()
# 概念板块个股
self.sector_opt.stock_by_gn_sector()
logger.info(f"{self.trade_date} end--> 数据初始化完毕!")
def create_chart(self):
logger.info(f"{self.trade_date} start--> 开始图表页面生成!")
# 板块图表
self.sector_opt.industry_generate_chart()
self.sector_opt.concept_generate_chart()
# 涨跌停 炸板 图表
self.limit_list.limit_list_chart()
# 资金出入图表
self.money_flow.money_flow_chart()
logger.info(f"{self.trade_date} end--> 图表页面生成完毕!")
def create_page(self):
logger.info(f"{self.trade_date} start--> 开始数据静态页面生成!")
badge, card_deck = self.index.html_page_data()
limit_list_table_content = self.limit_list.html_page_data()
# 新闻数据
cctv_content, cls_content = self.news.html_page_data()
# 首页数据 页面
self.__write_html(template_name="index_template.html", write_html_name="index.html",
badge=badge, news_cctv=cctv_content, news_cls=cls_content)
# 指数行情 页面
self.__write_html(template_name="zs_data_template.html", write_html_name="zs_data.html", card_deck=card_deck)
# 涨跌停 炸板 页面
self.__write_html(template_name="limit_list_template.html", write_html_name="limit_list.html",
limit_table=limit_list_table_content)
# 我的自选 页面 (这个时间会很长)
# optional_stock_pills_tab_content = self.optional_stock.html_page_data()
# self.__write_html(template_name="zx_data_template.html", write_html_name="zx_data.html",
# pills_tabContent=optional_stock_pills_tab_content)
logger.info(f"{self.trade_date} --> 数据静态页面生成完毕!")
def __write_html(self, template_name, write_html_name, *args, **kwargs):
print(template_name, "---", write_html_name)
print(dict(*args, **kwargs))
# 设置 Jinja2 环境
env = Environment(loader=FileSystemLoader('template/'))
template = env.get_template(template_name)
# 将 DataFrame 数据传递给模板并渲染
rendered_output = template.render(dict(*args, **kwargs))
# 将渲染后的内容写入 HTML 文件
with open(self.file_name + f'/{write_html_name}', 'w', encoding='utf-8') as f:
f.write(rendered_output)
print("HTML rendered and saved as 'output.html'")
def upload_ftp(self):
logger.info(f"{self.trade_date} start--> 开始FTP上传")
# ftp上传
# 上传json文件
upload_files_to_ftp(
'html/json', '/htdocs/stock/json', "events.json")
# 上传html文件
upload_files_to_ftp(
f'html/{self.trade_date}', f'/htdocs/stock/{self.trade_date}')
logger.info(f"{self.trade_date} end--> FTP上传完毕")
if __name__ == '__main__':
# date_obj = datetime.strptime('20231110', "%Y%m%d")
app = AppMain()
# app.calendar_page_data()
# app.init_data()
# app.create_chart()
# app.create_page()
# app.upload_ftp()

View File

@ -0,0 +1,80 @@
## 元素颜色
| 颜色代码 | 说明 |
|-----------|-----|
| primary | 蓝色 |
| secondary | 灰色 |
| success | 绿色 |
| danger | 红色 |
| warning | 黄色 |
| info | 浅蓝色 |
| light | 白色 |
| dark | 黑色 |
## 表格添加颜色
<!-- On cells (`tr` or `td`) -->
| 颜色代码 | 说明 |
|-----------------|-------|
| table-active | 选中的浅灰 |
| table-primary ||
| table-secondary ||
| table-success ||
| table-danger ||
| table-warning ||
| table-info ||
| table-light ||
| table-dark ||
## 元素之间添加间隔
| 间隔代码 | 说明 |
|--------------------------|--------|
| mb-*margin-bottom | 内部下边距 |
| mt-*margin-top | 内部上边距 |
| ml-*margin-left | 内部左边距 |
| mr-*margin-right | 内部右边距 |
| px-*padding-horizontal | 外部水平边距 |
| py-*padding-vertical | 外部垂直边距 |
## 元素位置
`ml-auto``mr-auto` 分别用于使元素在左侧和右侧具有自动外边距,从而在容器中实现水平居左或居右。
`mx-auto` 用于使元素在水平方向上具有自动外边距,从而实现水平居中对齐。
`my-auto` 用于使元素在垂直方向上具有自动外边距,从而实现垂直居中对齐。
`text-center` 用于文本元素,使文本水平居中对齐。
`text-left``text-right` 用于文本元素,分别用于使文本左对齐或右对齐。
justify-content-start、justify-content-center 和 justify-content-end 用于 Flexbox 布局中的容器,分别用于在水平方向上使内容开始、居中或结束对齐。
align-items-start、align-items-center 和 align-items-end 用于 Flexbox 布局中的容器,分别用于在垂直方向上使内容顶部、居中或底部对齐。
align-self-start、align-self-center 和 align-self-end 用于 Flexbox 布局中的项目,分别用于在垂直方向上使单个项目的内容顶部、居中或底部对齐。
## 字体大小
`.display-1``.display-4`:这些类用于设置显示文本的不同字体大小,其中 `.display-1` 是最大的,`.display-4` 是最小的。
`.h1``.h6`:这些类用于设置标题的字体大小,其中 `.h1` 是最大的,`.h6`是最小的。
`.lead`:这个类用于设置段落或文本的字体大小,使其稍微大一些,适合用作引导文本。
`.small`:这个类用于设置文本的较小字体大小。
`.text-[size]`:这是一个自定义类,你可以用具体的字体大小名称替换 `[size]`。例如,`.text-sm` 用于设置小号字体,`.text-lg` 用于设置大号字体。
```html
<p class="display-1">这是display-1文本</p>
<p class="display-2">这是display-2文本</p>
<p class="display-3">这是display-3文本</p>
<p class="display-4">这是display-4文本</p>
<h1 class="h1">这是标题</h1>
<p class="lead">这是引导文本</p>
<p class="small">这是small号文本</p>
<p class="text-lg">这是lg号文本</p>
```

71
html/calendar.html Normal file
View File

@ -0,0 +1,71 @@
<!doctype html>
<html lang="zh-CN">
<head>
<!-- 必须的 meta 标签 -->
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<!-- Bootstrap 的 CSS 文件 -->
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@4.6.2/dist/css/bootstrap.min.css"
crossorigin="anonymous">
<link href='https://cdn.jsdelivr.net/npm/@fortawesome/fontawesome-free@5.13.1/css/all.css' rel='stylesheet'>
<title>日历</title>
</head>
<body>
<div id="head"></div>
<div class="container">
<div id="calendar"></div>
</div>
<!-- 选项 1jQuery 和 Bootstrap 集成包(集成了 Popper -->
<!-- jQuery and JavaScript Bundle with Popper -->
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script src="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/4.6.2/js/bootstrap.bundle.min.js"
crossorigin="anonymous"></script>
<script src='https://cdn.jsdelivr.net/npm/fullcalendar@6.1.8/index.global.min.js'></script>
<script src="https://cdn.jsdelivr.net/npm/fullcalendar@5.10.1/locales/zh-cn.js"></script>
<script>
document.addEventListener('DOMContentLoaded', function() {
var calendarEl = document.getElementById('calendar');
$.getJSON('json/events.json?nocache=' + new Date().getTime(), function(data) {
var events = data;
var calendar = new FullCalendar.Calendar(calendarEl, {
initialView: 'dayGridMonth',
themeSystem: 'bootstrap',
locale: 'zh-cn',
selectable: true, // 允许选择日期
select: function(info) {
// 处理点击日期的事件
var clickedDate = info.start;
// 创建一个日期对象
var date = new Date(clickedDate);
// 转换为北京时间
date.setMinutes(date.getMinutes() - date.getTimezoneOffset() + 480); // 480 分钟是北京时区的偏移量
// 格式化日期
var year = date.getFullYear();
var month = (date.getMonth() + 1).toString().padStart(2, '0'); // 月份从 0 开始,所以要加 1
var day = date.getDate().toString().padStart(2, '0');
// 格式化后的日期字符串
var formattedDate = `${year}${month}${day}`;
console.log(formattedDate); // 输出格式化后的日期
// 进行页面跳转,例如跳转到指定日期的详情页面
window.location.href = formattedDate + '/index.html';
},
// textColor 在全天时间中才生效
events: events
});
calendar.render();
});
});
</script>
<!-- 选项 2Popper 和 Bootstrap 的 JS 插件各自独立 -->
<!--
<script src="https://cdn.jsdelivr.net/npm/jquery@3.5.1/dist/jquery.slim.min.js" integrity="sha384-DfXdz2htPH0lsSSs5nCTpuj/zy4C+OGpamoFVy38MVBnE+IbbVYUew+OrCXaRkfj" crossorigin="anonymous"></script>
<script src="https://cdn.jsdelivr.net/npm/popper.js@1.16.1/dist/umd/popper.min.js" integrity="sha384-9/reFTGAW83EW2RDu2S0VKaIzap3H66lZH81PoYlFhbGU+6BZp6G7niu735Sk7lN" crossorigin="anonymous"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@4.6.2/dist/js/bootstrap.min.js" integrity="sha384-Lge2E2XotzMiwH69/MXB72yLpwyENMiOKX8zS8Qo7LDCvaBIWGL+GlRQEKIpYR04" crossorigin="anonymous"></script>
-->
</body>
</html>

34
html/head.html Normal file
View File

@ -0,0 +1,34 @@
<nav class="navbar navbar-expand-lg navbar-light bg-light">
<div class="container collapse navbar-collapse">
<a class="navbar-brand" href="#">
<img src="http://qnloft.com/img/logo.png" width="80" height="35" class="d-inline-block align-top" alt="">
</a>
<button class="navbar-toggler" type="button" data-toggle="collapse" data-target="#navbarNav"
aria-controls="navbarNav" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<ul class="navbar-nav ml-auto">
<li class="nav-item">
<a class="nav-link" href="index.html">首页</span></a>
</li>
<li class="nav-item">
<a class="nav-link" href="zs_data.html">指数复盘</a>
</li>
<li class="nav-item">
<a class="nav-link" href="#">资金复盘</a>
</li>
<li class="nav-item">
<a class="nav-link" href="limit_list.html">板块复盘</a>
</li>
<li class="nav-item">
<a class="nav-link" href="zx_data.html">我的自选</a>
</li>
<li class="nav-item">
<a class="nav-link" href="http://page.tdx.com.cn:7615/site/kggx/tk_yzlhb_yz.html?color=#" target="_blank">龙虎榜</a>
</li>
<li class="nav-item">
<a class="nav-link" href="http://pro.gw.com.cn/newfpsq/index.html#/" target="_blank">盘中情况</a>
</li>
</ul>
</div>
</nav>

43
html/zs.html Normal file
View File

@ -0,0 +1,43 @@
<!doctype html>
<html lang="zh-CN">
<head>
<!-- 必须的 meta 标签 -->
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<!-- Bootstrap 的 CSS 文件 -->
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@4.6.2/dist/css/bootstrap.min.css" integrity="sha384-xOolHFLEh07PJGoPkLv1IbcEPTNtaed2xpHsD9ESMhqIYd0nLMwNLD69Npy4HI+N" crossorigin="anonymous">
<title>测试页面</title>
</head>
<body>
<p class="display-1">这是display-1文本</p>
<p class="display-2">这是display-2文本</p>
<p class="display-3">这是display-3文本</p>
<p class="display-4">这是display-4文本</p>
<h1 class="h1">这是标题</h1>
<p class="lead">这是引导文本</p>
<p class="small">这是small号文本</p>
<p class="text-lg">这是lg号文本</p>
<!-- 选项 1jQuery 和 Bootstrap 集成包(集成了 Popper -->
<script src="https://cdn.jsdelivr.net/npm/jquery@3.5.1/dist/jquery.slim.min.js" integrity="sha384-DfXdz2htPH0lsSSs5nCTpuj/zy4C+OGpamoFVy38MVBnE+IbbVYUew+OrCXaRkfj" crossorigin="anonymous">
</script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@4.6.2/dist/js/bootstrap.bundle.min.js" integrity="sha384-7ymO4nGrkm372HoSbq1OY2DP4pEZnMiA+E0F3zPr+JQQtQ82gQ1HPY3QIVtztVua" crossorigin="anonymous">
</script>
<script>
// 使用 jQuery 来实现卡片切换和内容变换
$(document).ready(function() {
});
</script>
<!-- 选项 2Popper 和 Bootstrap 的 JS 插件各自独立 -->
<!--
<script src="https://cdn.jsdelivr.net/npm/jquery@3.5.1/dist/jquery.slim.min.js" integrity="sha384-DfXdz2htPH0lsSSs5nCTpuj/zy4C+OGpamoFVy38MVBnE+IbbVYUew+OrCXaRkfj" crossorigin="anonymous"></script>
<script src="https://cdn.jsdelivr.net/npm/popper.js@1.16.1/dist/umd/popper.min.js" integrity="sha384-9/reFTGAW83EW2RDu2S0VKaIzap3H66lZH81PoYlFhbGU+6BZp6G7niu735Sk7lN" crossorigin="anonymous"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@4.6.2/dist/js/bootstrap.min.js" integrity="sha384-Lge2E2XotzMiwH69/MXB72yLpwyENMiOKX8zS8Qo7LDCvaBIWGL+GlRQEKIpYR04" crossorigin="anonymous"></script>
-->
</body>
</html>

67
utils/stock_web_api.py Normal file
View File

@ -0,0 +1,67 @@
from utils.comm import *
def stock_web_api_method_decorator(func):
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
@stock_web_api_method_decorator
def stock_web_api_money_flow(start_date=None, end_date=None):
# 沪深港通资金流向
df = xcsc_pro.moneyflow_hsgt(start_date=start_date, end_date=end_date)[::-1]
if df[df["trade_date"] == end_date].empty:
# df 如果里面不包含当天日期的数据,则使用备用接口
ak_df = ak.stock_hsgt_fund_flow_summary_em()
northward = ak_df[ak_df['资金方向'] == '北向']
hgt = round(northward[northward['板块'] == '沪股通']['成交净买额'].sum(), 2) * 100
sgt = round(northward[northward['板块'] == '深股通']['成交净买额'].sum(), 2) * 100
north_money = round(hgt + sgt, 2)
substitute_df = pd.DataFrame(
[{
"trade_date": northward["交易日"].iloc[0].strftime('%Y%m%d'),
"hgt": hgt,
"sgt": sgt,
"north_money": north_money,
}],
index=[0],
)
result = pd.concat([df, substitute_df], ignore_index=True)
return result
if df is not None and not df.empty:
return df
raise Exception("沪深港通资金流向数据拉取失败!")
@stock_web_api_method_decorator
def stock_web_api_industry_summary(): # 同花顺 - 行业板块数据
df = ak.stock_board_industry_summary_ths()
if df is not None and not df.empty:
return df
raise Exception("同花顺 - 行业板块数据 数据拉取失败!")
@stock_web_api_method_decorator
def stock_web_api_concept_name(): # 同花顺 - 概念板块数据
df = ak.stock_board_concept_name_em()
if df is not None and not df.empty:
return df
raise Exception("同花顺 - 概念板块数据 数据拉取失败!")
def execute_stock_web_api_method(func, *args, **kwargs):
for _ in range(5):
try:
return func(*args, **kwargs)
except Exception as e:
print(f"{func}】 -- 拉取出现问题 【{e}】, 开始进行重试!")
time.sleep(1)
else:
print(f"{func}】 5次出现错误请关注")
return None
if __name__ == '__main__':
print(execute_stock_web_api_method(func=stock_web_api_money_flow, start_date='20231123', end_date='20231123'))