qnloft-stock/fp/板块数据入库.py

265 lines
8.9 KiB
Python
Raw Permalink Normal View History

2023-11-24 06:42:22 +00:00
from pyecharts import options as opts
from pyecharts.charts import Bar
from DB.model.ConceptSector import ConceptSector
from DB.model.IndustrySector import IndustrySector
from DB.model.StockByConceptSector import StockByConceptSector
from DB.model.StockByIndustrySector import StockByIndustrySector
from DB.sqlite_db_main import SqliteDbMain, config
from utils.comm import *
from utils.stock_web_api import execute_stock_web_api_method, stock_web_api_industry_summary, \
stock_web_api_concept_name
class SectorOpt:
def __init__(self, trade_date=datetime.now()):
self.trade_date = trade_date.strftime('%Y%m%d')
self.db_main = SqliteDbMain(config.stock_sector_db)
def save_hy_sector(self):
"""
行业板块
:return:
"""
print("开始拉取行业板块数据--->>")
hy_sector_df = execute_stock_web_api_method(func=stock_web_api_industry_summary)
if hy_sector_df is None:
return None
entries = []
for index, row in hy_sector_df.iterrows():
entry = IndustrySector(
trade_date=self.trade_date,
sector_name=row['板块'],
pct_change=row['涨跌幅'],
total_volume=row['总成交量'],
total_turnover=row['总成交额'],
net_inflows=row['净流入'],
rising_count=row['上涨家数'],
falling_count=row['下跌家数'],
average_price=row['均价'],
leading_stock=row['领涨股'],
leading_stock_latest_price=row['领涨股-最新价'],
leading_stock_pct_change=row['领涨股-涨跌幅']
)
entries.append(entry)
self.db_main.insert_all_entry(entries)
print("<<---行业板块数据拉取结束")
self.db_main.get_db_size()
def save_gn_sector(self):
"""
概念板块
:return:
"""
print("开始拉取概念板块数据--->>")
# 概念当日的涨幅情况
ths_gn_df = execute_stock_web_api_method(func=stock_web_api_concept_name)
if ths_gn_df is None:
return None
entries = []
for index, row in ths_gn_df.iterrows():
entry = ConceptSector(
trade_date=self.trade_date,
sector_name=row['板块名称'],
sector_code=row['板块代码'],
pct_change=row['涨跌幅'],
total_market=row['总市值'],
rising_count=row['上涨家数'],
falling_count=row['下跌家数'],
new_price=row['最新价'],
leading_stock=row['领涨股票'],
leading_stock_pct_change=row['领涨股票-涨跌幅']
)
entries.append(entry)
self.db_main.insert_all_entry(entries)
print("<<---概念板块数据拉取结束")
self.db_main.get_db_size()
def stock_by_gn_sector(self):
"""
获取概念板块个股建议每天更新一次
:return:
"""
for _ in range(5):
try:
# 清空表数据
self.db_main.delete_all_table(StockByConceptSector)
ths_gn_df = ak.stock_board_concept_name_em()
break
except Exception as e:
print("概念板块个股数据拉取错误:", e)
time.sleep(1)
continue
else:
print("概念板块个股数据拉取失败!!!")
return None
entries = []
for index, row in ths_gn_df.iterrows():
bk, bk_code = row['板块名称'], row['板块代码']
print(f"----------- {bk} -------------")
for _ in range(5):
try:
stock_gn_df = ak.stock_board_concept_cons_em(symbol=bk)
break
except Exception as e:
print("概念板块个股-板块成份股 数据拉取错误:", e)
time.sleep(1)
continue
else:
print("概念板块个股-板块成份股 数据拉取失败!!!")
return None
print(stock_gn_df['代码'])
for s_index, s_row in stock_gn_df.iterrows():
entry = StockByConceptSector(
update_date=datetime.now().strftime('%Y%m%d'),
sector_name=bk,
sector_code=row['板块代码'],
stock_name=s_row['名称'],
stock_code=s_row['代码'],
)
entries.append(entry)
self.db_main.insert_all_entry(entries)
def stock_by_hy_sector(self):
"""
获取行业板块个股建议每周更新一次
:return:
"""
for _ in range(5):
try:
# 清空表数据
self.db_main.delete_all_table(StockByIndustrySector)
hy_sector_df = ak.stock_board_industry_summary_ths()
break
except Exception as e:
print("行业板块个股数据拉取错误:", e)
time.sleep(1)
continue
else:
print("行业板块个股数据拉取失败!!!")
return None
entries = []
for index, row in hy_sector_df.iterrows():
bk, bk_code = row['板块'], row['序号']
print(f"----------- {bk} -------------")
for _ in range(5):
try:
stock_bk_df = ak.stock_board_industry_cons_ths(symbol=bk)
break
except Exception as e:
print("行业板块个股数据拉取错误:", e)
time.sleep(1)
continue
else:
print("行业板块个股数据拉取失败!!!")
return None
for s_index, s_row in stock_bk_df.iterrows():
entry = StockByIndustrySector(
update_date=datetime.now().strftime('%Y%m%d'),
sector_name=bk,
sector_code=row['序号'],
stock_name=s_row['名称'],
stock_code=s_row['代码'],
)
entries.append(entry)
self.db_main.insert_all_entry(entries)
def get_hy_sector_by_stock_code(self, symbol):
"""
根据股票代码查询所属 行业板块
:param symbol:
:return:
"""
query = self.db_main.session.query(StockByIndustrySector) \
.filter(StockByIndustrySector.stock_code == symbol).order_by(StockByIndustrySector.id)
return self.db_main.pandas_query_by_sql(stmt=query.statement).reset_index()
def get_gn_sector_by_stock_code(self, symbol):
"""
根据股票代码查询所属 概念板块
:param symbol:
:return:
"""
query = self.db_main.session.query(StockByConceptSector) \
.filter(StockByConceptSector.stock_code == symbol).order_by(StockByConceptSector.id)
return self.db_main.pandas_query_by_sql(stmt=query.statement).reset_index()
def industry_generate_chart(self):
"""
行业板块
:return:
"""
industry_num_limit_sql = f"select sector_name from stock_industry_sector where trade_date = '{self.trade_date}' ORDER BY pct_change desc limit 15"
names_res = self.db_main.execute_sql(industry_num_limit_sql)
# 提取结果中的单个列数据(假设你要提取 "收盘价" 这一列)
sector_names = [result.sector_name for result in names_res]
data_sql = f"SELECT sec.trade_date,sec.pct_change from " \
f"stock_industry_sector sec , " \
f"({industry_num_limit_sql}) sec_n " \
f"where sec.sector_name = sec_n.sector_name " \
f"and trade_date in (select trade_date from stock_industry_sector GROUP BY trade_date order by trade_date desc LIMIT 3)"
data_result = self.db_main.execute_sql(data_sql)
df = pd.DataFrame(data_result, columns=['trade_date', 'pct_change'])
# print(df)
grouped = df.groupby('trade_date')['pct_change'].apply(list).reset_index()
self.__generate_chart("行业板块 —— 复盘", grouped, sector_names, 'industry_generate_chart')
def concept_generate_chart(self):
"""
概念板块
:return:
"""
concept_num_limit_sql = f"select sector_name from stock_concept_sector where trade_date = '{self.trade_date}' ORDER BY pct_change desc limit 15"
names_res = self.db_main.execute_sql(concept_num_limit_sql)
# 提取结果中的单个列数据(假设你要提取 "收盘价" 这一列)
sector_names = [result.sector_name for result in names_res]
data_sql = f"SELECT sec.trade_date,sec.pct_change from " \
f"stock_concept_sector sec , " \
f"({concept_num_limit_sql}) sec_n " \
f"where sec.sector_name = sec_n.sector_name " \
f"and trade_date in (select trade_date from stock_concept_sector GROUP BY trade_date order by trade_date desc LIMIT 3)"
data_result = self.db_main.execute_sql(data_sql)
df = pd.DataFrame(data_result, columns=['trade_date', 'pct_change'])
# print(df)
grouped = df.groupby('trade_date')['pct_change'].apply(list).reset_index()
self.__generate_chart("概念板块 —— 复盘", grouped, sector_names, 'concept_generate_chart')
def __generate_chart(self, title, grouped, sector_names, file_name):
c = (
Bar(init_opts=opts.InitOpts(width="100%", height="400px"))
.add_xaxis(sector_names)
.add_yaxis(grouped.loc[0, 'trade_date'], grouped.loc[0, 'pct_change'])
.add_yaxis(grouped.loc[1, 'trade_date'], grouped.loc[1, 'pct_change'])
.add_yaxis(grouped.loc[2, 'trade_date'], grouped.loc[2, 'pct_change'])
.set_global_opts(
title_opts=opts.TitleOpts(title=title, pos_left="center"),
yaxis_opts=opts.AxisOpts(offset=5),
xaxis_opts=opts.AxisOpts(offset=5, axislabel_opts=opts.LabelOpts(rotate=45, interval=0)),
legend_opts=opts.LegendOpts(pos_top="8%")
)
2023-11-24 15:29:22 +00:00
.render(f"html/{self.trade_date}/{file_name}.html")
2023-11-24 06:42:22 +00:00
)
if __name__ == '__main__':
current_date = datetime.now()
if if_run(current_date):
sector = SectorOpt(current_date)
# 行业板块
# sector.save_hy_sector()
# 概念板块
# sector.save_gn_sector()
# 生成报表
# sector.industry_generate_chart()
2023-11-24 15:29:22 +00:00
sector.concept_generate_chart()
2023-11-24 06:42:22 +00:00
# -------》 一下建议一周更新一次
# 行业板块个股
2023-11-24 15:29:22 +00:00
# sector.stock_by_hy_sector()
2023-11-24 06:42:22 +00:00
# 感念板块个股
2023-11-24 15:29:22 +00:00
# sector.stock_by_gn_sector()