from pyecharts import options as opts from pyecharts.charts import Bar from DB.model.ConceptSector import ConceptSector from DB.model.IndustrySector import IndustrySector from DB.model.StockByConceptSector import StockByConceptSector from DB.model.StockByIndustrySector import StockByIndustrySector from DB.sqlite_db_main import SqliteDbMain, config from utils.comm import * from utils.stock_web_api import execute_stock_web_api_method, stock_web_api_industry_summary, \ stock_web_api_concept_name class SectorOpt: def __init__(self, trade_date=datetime.now()): self.trade_date = trade_date.strftime('%Y%m%d') self.db_main = SqliteDbMain(config.stock_sector_db) def save_hy_sector(self): """ 行业板块 :return: """ print("开始拉取行业板块数据--->>") hy_sector_df = execute_stock_web_api_method(func=stock_web_api_industry_summary) if hy_sector_df is None: return None entries = [] for index, row in hy_sector_df.iterrows(): entry = IndustrySector( trade_date=self.trade_date, sector_name=row['板块'], pct_change=row['涨跌幅'], total_volume=row['总成交量'], total_turnover=row['总成交额'], net_inflows=row['净流入'], rising_count=row['上涨家数'], falling_count=row['下跌家数'], average_price=row['均价'], leading_stock=row['领涨股'], leading_stock_latest_price=row['领涨股-最新价'], leading_stock_pct_change=row['领涨股-涨跌幅'] ) entries.append(entry) self.db_main.insert_all_entry(entries) print("<<---行业板块数据拉取结束") self.db_main.get_db_size() def save_gn_sector(self): """ 概念板块 :return: """ print("开始拉取概念板块数据--->>") # 概念当日的涨幅情况 ths_gn_df = execute_stock_web_api_method(func=stock_web_api_concept_name) if ths_gn_df is None: return None entries = [] for index, row in ths_gn_df.iterrows(): entry = ConceptSector( trade_date=self.trade_date, sector_name=row['板块名称'], sector_code=row['板块代码'], pct_change=row['涨跌幅'], total_market=row['总市值'], rising_count=row['上涨家数'], falling_count=row['下跌家数'], new_price=row['最新价'], leading_stock=row['领涨股票'], leading_stock_pct_change=row['领涨股票-涨跌幅'] ) entries.append(entry) self.db_main.insert_all_entry(entries) print("<<---概念板块数据拉取结束") self.db_main.get_db_size() def stock_by_gn_sector(self): """ 获取概念板块个股(建议每天更新一次) :return: """ for _ in range(5): try: # 清空表数据 self.db_main.delete_all_table(StockByConceptSector) ths_gn_df = ak.stock_board_concept_name_em() break except Exception as e: print("概念板块个股数据拉取错误:", e) time.sleep(1) continue else: print("概念板块个股数据拉取失败!!!") return None entries = [] for index, row in ths_gn_df.iterrows(): bk, bk_code = row['板块名称'], row['板块代码'] print(f"----------- {bk} -------------") for _ in range(5): try: stock_gn_df = ak.stock_board_concept_cons_em(symbol=bk) break except Exception as e: print("概念板块个股-板块成份股 数据拉取错误:", e) time.sleep(1) continue else: print("概念板块个股-板块成份股 数据拉取失败!!!") return None print(stock_gn_df['代码']) for s_index, s_row in stock_gn_df.iterrows(): entry = StockByConceptSector( update_date=datetime.now().strftime('%Y%m%d'), sector_name=bk, sector_code=row['板块代码'], stock_name=s_row['名称'], stock_code=s_row['代码'], ) entries.append(entry) self.db_main.insert_all_entry(entries) def stock_by_hy_sector(self): """ 获取行业板块个股(建议每周更新一次) :return: """ for _ in range(5): try: # 清空表数据 self.db_main.delete_all_table(StockByIndustrySector) hy_sector_df = ak.stock_board_industry_summary_ths() break except Exception as e: print("行业板块个股数据拉取错误:", e) time.sleep(1) continue else: print("行业板块个股数据拉取失败!!!") return None entries = [] for index, row in hy_sector_df.iterrows(): bk, bk_code = row['板块'], row['序号'] print(f"----------- {bk} -------------") for _ in range(5): try: stock_bk_df = ak.stock_board_industry_cons_ths(symbol=bk) break except Exception as e: print("行业板块个股数据拉取错误:", e) time.sleep(1) continue else: print("行业板块个股数据拉取失败!!!") return None for s_index, s_row in stock_bk_df.iterrows(): entry = StockByIndustrySector( update_date=datetime.now().strftime('%Y%m%d'), sector_name=bk, sector_code=row['序号'], stock_name=s_row['名称'], stock_code=s_row['代码'], ) entries.append(entry) self.db_main.insert_all_entry(entries) def get_hy_sector_by_stock_code(self, symbol): """ 根据股票代码查询所属 行业板块 :param symbol: :return: """ query = self.db_main.session.query(StockByIndustrySector) \ .filter(StockByIndustrySector.stock_code == symbol).order_by(StockByIndustrySector.id) return self.db_main.pandas_query_by_sql(stmt=query.statement).reset_index() def get_gn_sector_by_stock_code(self, symbol): """ 根据股票代码查询所属 概念板块 :param symbol: :return: """ query = self.db_main.session.query(StockByConceptSector) \ .filter(StockByConceptSector.stock_code == symbol).order_by(StockByConceptSector.id) return self.db_main.pandas_query_by_sql(stmt=query.statement).reset_index() def industry_generate_chart(self): """ 行业板块 :return: """ industry_num_limit_sql = f"select sector_name from stock_industry_sector where trade_date = '{self.trade_date}' ORDER BY pct_change desc limit 15" names_res = self.db_main.execute_sql(industry_num_limit_sql) # 提取结果中的单个列数据(假设你要提取 "收盘价" 这一列) sector_names = [result.sector_name for result in names_res] data_sql = f"SELECT sec.trade_date,sec.pct_change from " \ f"stock_industry_sector sec , " \ f"({industry_num_limit_sql}) sec_n " \ f"where sec.sector_name = sec_n.sector_name " \ f"and trade_date in (select trade_date from stock_industry_sector GROUP BY trade_date order by trade_date desc LIMIT 3)" data_result = self.db_main.execute_sql(data_sql) df = pd.DataFrame(data_result, columns=['trade_date', 'pct_change']) # print(df) grouped = df.groupby('trade_date')['pct_change'].apply(list).reset_index() self.__generate_chart("行业板块 —— 复盘", grouped, sector_names, 'industry_generate_chart') def concept_generate_chart(self): """ 概念板块 :return: """ concept_num_limit_sql = f"select sector_name from stock_concept_sector where trade_date = '{self.trade_date}' ORDER BY pct_change desc limit 15" names_res = self.db_main.execute_sql(concept_num_limit_sql) # 提取结果中的单个列数据(假设你要提取 "收盘价" 这一列) sector_names = [result.sector_name for result in names_res] data_sql = f"SELECT sec.trade_date,sec.pct_change from " \ f"stock_concept_sector sec , " \ f"({concept_num_limit_sql}) sec_n " \ f"where sec.sector_name = sec_n.sector_name " \ f"and trade_date in (select trade_date from stock_concept_sector GROUP BY trade_date order by trade_date desc LIMIT 3)" data_result = self.db_main.execute_sql(data_sql) df = pd.DataFrame(data_result, columns=['trade_date', 'pct_change']) # print(df) grouped = df.groupby('trade_date')['pct_change'].apply(list).reset_index() self.__generate_chart("概念板块 —— 复盘", grouped, sector_names, 'concept_generate_chart') def __generate_chart(self, title, grouped, sector_names, file_name): c = ( Bar(init_opts=opts.InitOpts(width="100%", height="400px")) .add_xaxis(sector_names) .add_yaxis(grouped.loc[0, 'trade_date'], grouped.loc[0, 'pct_change']) .add_yaxis(grouped.loc[1, 'trade_date'], grouped.loc[1, 'pct_change']) .add_yaxis(grouped.loc[2, 'trade_date'], grouped.loc[2, 'pct_change']) .set_global_opts( title_opts=opts.TitleOpts(title=title, pos_left="center"), yaxis_opts=opts.AxisOpts(offset=5), xaxis_opts=opts.AxisOpts(offset=5, axislabel_opts=opts.LabelOpts(rotate=45, interval=0)), legend_opts=opts.LegendOpts(pos_top="8%") ) .render(f"../html/{self.trade_date}/{file_name}.html") ) if __name__ == '__main__': current_date = datetime.now() if if_run(current_date): sector = SectorOpt(current_date) # 行业板块 # sector.save_hy_sector() # 概念板块 # sector.save_gn_sector() # 生成报表 # sector.industry_generate_chart() # sector.concept_generate_chart() # -------》 一下建议一周更新一次 # 行业板块个股 sector.stock_by_hy_sector() # 感念板块个股 sector.stock_by_gn_sector()