import sys

from loguru import logger

from DB.model.StockDaily import get_stock_daily
from DB.sqlite_db_main import SqliteDbMain, config
from utils.tdxUtil import TdxUtil
from utils.comm import *
from fp.基本信息入库 import StockInfoMain


class StockDailyMain:
    """Load daily quotes and daily basic indicators into per-stock tables.

    Each stock gets its own table named ``<code>_daily`` (the part of
    ``ts_code`` before the first dot plus the ``_daily`` suffix). Data is
    fetched through ``xcsc_pro`` and written through ``SqliteDbMain``.
    """

    # Set once the log sinks are registered, so creating several instances
    # does not register the same sinks again and duplicate every log line.
    _log_sinks_added = False

    # Columns removed from a daily_basic row before it is merged into the
    # daily row (presumably because the daily row already carries them).
    _BASIC_DUPLICATE_COLUMNS = ['ts_code', 'trade_date', 'close']

    # get_daily_data retry policy: number of attempts and pause between them.
    _CODE_FETCH_ATTEMPTS = 7
    _CODE_FETCH_DELAY_SECONDS = 120

    def __init__(self, ts_code=None, symbol=None, restart_id=0):
        """Set up logging, load the stock list and open the daily database.

        :param ts_code: forwarded to ``StockInfoMain.get_stock_basic``.
        :param symbol: forwarded to ``StockInfoMain.get_stock_basic``.
        :param restart_id: forwarded to ``StockInfoMain.get_stock_basic``.
        """
        # Send log output to both a file and the console (only once per process).
        if not StockDailyMain._log_sinks_added:
            logger.add("../log/StockDailyMain.log", rotation="500 MB", level="INFO")
            logger.add(sys.stderr, level="INFO")
            StockDailyMain._log_sinks_added = True
        self.code_res = StockInfoMain().get_stock_basic(ts_code=ts_code, symbol=symbol, restart_id=restart_id)
        self.db_main = SqliteDbMain(config.stock_daily_db)
        self.tdx_util = TdxUtil("")

    def __filter_stock(self, code, name):
        """Return True when the stock should be processed.

        Stocks whose name contains ``ST`` and stocks whose code starts with
        ``30`` or ``68`` are excluded.
        """
        if 'ST' in name:
            return False
        return not code.startswith(('30', '68'))

    @staticmethod
    def _daily_table_name(ts_code):
        """Return the per-stock table name, e.g. ``000050.SZ`` -> ``000050_daily``."""
        return str(ts_code).split(".")[0] + "_daily"

    def _iter_target_stocks(self):
        """Yield the entries of ``self.code_res`` that pass both stock filters."""
        for result in self.code_res:
            s_type = self.tdx_util.get_security_type(code=result.ts_code)
            if s_type in self.tdx_util.SECURITY_TYPE and self.__filter_stock(result.ts_code, result.name):
                yield result

    def _merge_basic(self, row, basic_rows):
        """Append the first matching daily_basic row to ``row``.

        ``row`` is returned unchanged when ``basic_rows`` has no rows or has
        nothing left after the duplicate columns are dropped.
        """
        if len(basic_rows) > 0:
            basic_rows = basic_rows.drop(columns=self._BASIC_DUPLICATE_COLUMNS)
            if not basic_rows.empty:
                return pd.concat([row, basic_rows.iloc[0]])
        return row

    @staticmethod
    def _fetch_by_trade_date(fetch, trade_date, label, api_name, attempts=5):
        """Call ``fetch(trade_date=...)`` with retries and reverse the row order.

        :param fetch: callable that accepts ``trade_date`` and returns a DataFrame.
        :param label: description used in the per-attempt error message.
        :param api_name: name used in the final "gave up" error message.
        :return: the reversed DataFrame, or ``None`` when every attempt failed.
        """
        for _ in range(attempts):
            try:
                return fetch(trade_date=trade_date)[::-1]
            except Exception as e:
                # Explicit placeholders: loguru drops positional arguments
                # that the message has no "{}" for.
                logger.error("获取 [{}] {} 出现问题:{!r}", trade_date, label, e)
                time.sleep(1)
        logger.error(f"{api_name} 重试{attempts}次依然没有解决问题,请重视~!!!")
        return None

    def init_data(self):
        """Full load: fetch and store the complete history of every target stock.

        :return: None
        """
        for result in self._iter_target_stocks():
            logger.info(f"{result.id},{result.ts_code},{result.name} 开始获取 daily 和 daily_basic_ts 数据!")
            self.save_by_code(result.ts_code)

    def save_by_code(self, ts_code):
        """Fetch the history of one stock and bulk-insert it into its table.

        The table is created first. Rows whose ``trade_status`` is ``停牌``
        are skipped. Nothing is written when the fetch failed.

        :param ts_code: stock code such as ``000050.SZ``.
        """
        df, df_basic = self.get_daily_data(ts_code=ts_code)
        if df is None or df_basic is None:
            return

        # Create the table.
        new_table_class = get_stock_daily(table_name=self._daily_table_name(ts_code))
        self.db_main.create_table(new_table_class)

        entries = []
        for _, row in df.iterrows():
            if row['trade_status'] == '停牌':
                continue
            basic_rows = df_basic[df_basic['trade_date'] == row['trade_date']]
            row = self._merge_basic(row, basic_rows)
            entries.append(new_table_class(**row))

        # Save the records.
        self.db_main.insert_all_entry(entries)

    def task_data(self, trade_date=None):
        """Daily scheduled job: add one trading day of data for all stocks.

        Only rows whose ``trade_status`` is ``交易`` are stored, and only into
        tables that already exist. The job stops without writing anything
        when either remote fetch fails on every attempt.

        :param trade_date: date as ``YYYYMMDD``; defaults to today, resolved
            at call time rather than once at import time.
        :return: None
        """
        if trade_date is None:
            trade_date = datetime.now().strftime('%Y%m%d')

        # Daily quote data.
        df = self._fetch_by_trade_date(xcsc_pro.daily, trade_date, "日线行情", "daily")
        if df is None:
            return
        # Daily basic indicators for all stocks.
        df_basic = self._fetch_by_trade_date(
            xcsc_pro.daily_basic_ts, trade_date, "daily_basic_ts", "daily_basic_ts")
        if df_basic is None:
            return
        df_basic = df_basic.fillna(0)
        print(f" 获取 daily 和 daily_basic_ts 数据完毕!")

        for _, row in df.iterrows():
            ts_code = row['ts_code']
            s_type = self.tdx_util.get_security_type(code=ts_code)
            if s_type not in self.tdx_util.SECURITY_TYPE or row['trade_status'] != '交易':
                continue
            # A stock without a daily_basic row is stored with quote columns
            # only, the same way save_by_code handles it.
            basic_rows = df_basic[df_basic['ts_code'] == ts_code]
            row = self._merge_basic(row, basic_rows)
            new_table_class = get_stock_daily(table_name=self._daily_table_name(ts_code))
            entry = new_table_class(**row)
            # Only insert when the table exists.
            if self.db_main.has_table(entry):
                self.db_main.insert_entry(entry)

    def create_index(self):
        """Create the unique ``trade_date`` index on every target stock table.

        ``IF NOT EXISTS`` lets the method be run again without failing on
        indexes that were already created.
        """
        for result in self._iter_target_stocks():
            print(f"{result.id},{result.ts_code},{result.name} 开始创建索引!")
            table_name = self._daily_table_name(result.ts_code)
            sql = (f"CREATE UNIQUE INDEX IF NOT EXISTS idx_unique_trade_date_{table_name} "
                   f"ON `{table_name}`(trade_date)")
            self.db_main.execute_sql(sql)

    def del_data_by_date(self, trade_date):
        """Delete the rows of one trading date from every existing target table.

        :param trade_date: value compared against the ``trade_date`` column.
        """
        for result in self._iter_target_stocks():
            new_table_class = get_stock_daily(table_name=self._daily_table_name(result.ts_code))
            # Only delete when the table exists.
            if self.db_main.has_table(new_table_class):
                delete_condition = new_table_class.trade_date == trade_date
                self.db_main.delete_by_condition(new_table_class, delete_condition)

    def get_daily_data(self, ts_code):
        """Fetch daily quotes and daily basics for one stock, with retries.

        Up to seven attempts are made, pausing two minutes between them.

        :param ts_code: stock code such as ``000050.SZ``.
        :return: ``(df, df_basic)``, both in reversed row order and with NaN
            in ``df_basic`` replaced by 0; ``(None, None)`` when every
            attempt failed.
        """
        for attempt in range(1, self._CODE_FETCH_ATTEMPTS + 1):
            try:
                df = xcsc_pro.daily(ts_code=ts_code)[::-1]
                df_basic = xcsc_pro.daily_basic_ts(ts_code=ts_code)[::-1]
                df_basic = df_basic.fillna(0)
            except Exception as e:
                # Log the real cause; not every failure is a timeout.
                logger.warning("{} 第 {}/{} 次请求失败:{!r}",
                               ts_code, attempt, self._CODE_FETCH_ATTEMPTS, e)
                # No pause after the last attempt: nothing is left to retry.
                if attempt < self._CODE_FETCH_ATTEMPTS:
                    logger.warning("请求超时,等待2分钟后重试...")
                    time.sleep(self._CODE_FETCH_DELAY_SECONDS)
                continue
            logger.info(f"{ts_code} 获取 daily 和 daily_basic_ts 数据完毕!")
            return df, df_basic
        # Never hand back a half-fetched pair.
        return None, None

    def update_stock_basic(self):
        """Refresh the stock master table and backfill stocks that lack a table.

        :return: None
        """
        # 1. Refresh the A-share basic information.
        StockInfoMain().insert_stock_basic()
        self.code_res = StockInfoMain().get_stock_basic()
        for result in self.code_res:
            s_type = self.tdx_util.get_security_type(code=result.ts_code, name=result.name)
            if s_type not in self.tdx_util.SECURITY_TYPE:
                continue
            new_table_class = get_stock_daily(table_name=self._daily_table_name(result.ts_code))
            # 2. Check whether the table exists; create and fill it when it does not.
            if not self.db_main.has_table(new_table_class):
                logger.info(f"{result.ts_code} 表不存在,开始创建!")
                # 3. Backfill the data.
                self.save_by_code(result.ts_code)


if __name__ == '__main__':
    # NOTE(review): the date is hard-coded; the live variant is kept below.
    # current_date = datetime.now()
    current_date = datetime.strptime('20231024', "%Y%m%d")
    # if_run comes from utils.comm; presumably it checks for a trading day - confirm.
    if if_run(current_date):
        trade_date = current_date.strftime('%Y%m%d')
        main = StockDailyMain()
        # main.init_data()  # initialise data for the whole market
        main.task_data(trade_date)  # store data for the given date
        # main.create_index()  # create the unique indexes
        # main.del_data_by_date('20230915')  # delete data for the given date
        # main.save_by_code(ts_code='000050.SZ')  # update one specific stock
        # main.update_stock_basic()