qnloft-stock/fp/日线数据入库.py

187 lines
6.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
from loguru import logger
from DB.model.StockDaily import get_stock_daily
from DB.sqlite_db_main import SqliteDbMain, config
from utils.tdxUtil import TdxUtil
from utils.comm import *
from fp.基本信息入库 import StockInfoMain
class StockDailyMain:
def __init__(self, ts_code=None, symbol=None, restart_id=0):
# 配置日志输出到文件和控制台
logger.add("../log/StockDailyMain.log", rotation="500 MB", level="INFO")
logger.add(sys.stderr, level="INFO")
self.code_res = StockInfoMain().get_stock_basic(ts_code=ts_code, symbol=symbol, restart_id=restart_id)
self.db_main = SqliteDbMain(config.stock_daily_db)
self.tdx_util = TdxUtil("")
def __filter_stock(self, code, name):
if 'ST' in name:
return False
if code.startswith('30'):
return False
if code.startswith('68'):
return False
return True
def init_data(self):
"""
全量入库操作
:return:
"""
for result in self.code_res:
tdx_util = TdxUtil("")
s_type = tdx_util.get_security_type(code=result.ts_code)
if s_type in tdx_util.SECURITY_TYPE and self.__filter_stock(result.ts_code, result.name):
logger.info(f"{result.id}{result.ts_code}{result.name} 开始获取 daily 和 daily_basic_ts 数据!")
self.save_by_code(result.ts_code)
def save_by_code(self, ts_code):
df, df_basic = self.get_daily_data(ts_code=ts_code)
if df is not None and df_basic is not None:
# 创建表
table_name = str(ts_code).split(".")[0] + "_daily"
new_table_class = get_stock_daily(table_name=table_name)
self.db_main.create_table(new_table_class)
entries = []
for index, row in df.iterrows():
if '停牌' != row['trade_status']:
basic_row = df_basic[df_basic['trade_date'] == row['trade_date']]
if len(basic_row) > 0:
basic_row = basic_row.drop(columns=['ts_code', 'trade_date', 'close'])
if not basic_row.empty:
row = pd.concat([row, basic_row.iloc[0]])
entry = new_table_class(**row)
entries.append(entry)
# 保存记录
self.db_main.insert_all_entry(entries)
def task_data(self, trade_date=datetime.now().strftime('%Y%m%d')):
"""
每日定时任务,补充数据操作
:param trade_date:
:return:
"""
df, df_basic = pd.DataFrame(), pd.DataFrame()
# 日线行情数据
for _ in range(5):
try:
df = xcsc_pro.daily(trade_date=trade_date)[::-1]
break
except Exception as e:
logger.error(f"获取 [{trade_date}] 日线行情 出现问题:", e)
time.sleep(1)
continue
else:
logger.error("daily 重试5次依然没有解决问题请重视~")
# 获取全部股票每日重要的基本面指标
for _ in range(5):
try:
df_basic = xcsc_pro.daily_basic_ts(trade_date=trade_date)[::-1]
break
except Exception as e:
logger.error(f"获取 [{trade_date}] daily_basic_ts 出现问题:", e)
time.sleep(1)
continue
else:
logger.error("daily_basic_ts 重试5次依然没有解决问题请重视~")
df_basic = df_basic.fillna(0)
print(f" 获取 daily 和 daily_basic_ts 数据完毕!")
for index, row in df.iterrows():
ts_code = row['ts_code']
tdx_util = TdxUtil("")
s_type = tdx_util.get_security_type(code=ts_code)
if s_type in tdx_util.SECURITY_TYPE and '交易' == row['trade_status']:
basic_row = df_basic[df_basic['ts_code'] == row['ts_code']]
basic_row = basic_row.drop(columns=['ts_code', 'trade_date', 'close'])
row = pd.concat([row, basic_row.iloc[0]])
table_name = str(ts_code).split(".")[0] + "_daily"
new_table_class = get_stock_daily(table_name=table_name)
entry = new_table_class(**row)
# 判断表是否存在
if self.db_main.has_table(entry):
self.db_main.insert_entry(entry)
# 批量创建trade_date唯一索引
def create_index(self):
for result in self.code_res:
tdx_util = TdxUtil("")
s_type = tdx_util.get_security_type(code=result.ts_code)
if s_type in tdx_util.SECURITY_TYPE and self.__filter_stock(result.ts_code, result.name):
print(f"{result.id}{result.ts_code}{result.name} 开始创建索引!")
table_name = str(result.ts_code).split(".")[0] + "_daily"
sql = f"CREATE UNIQUE INDEX idx_unique_trade_date_{table_name} ON `{table_name}`(trade_date)"
self.db_main.execute_sql(sql)
# 删除指定日期的数据
def del_data_by_date(self, trade_date):
for result in self.code_res:
tdx_util = TdxUtil("")
s_type = tdx_util.get_security_type(code=result.ts_code)
if s_type in tdx_util.SECURITY_TYPE and self.__filter_stock(result.ts_code, result.name):
table_name = str(result.ts_code).split(".")[0] + "_daily"
new_table_class = get_stock_daily(table_name=table_name)
# 判断表是否存在
if self.db_main.has_table(new_table_class):
delete_condition = new_table_class.trade_date == trade_date
self.db_main.delete_by_condition(new_table_class, delete_condition)
def get_daily_data(self, ts_code):
i = 0
df, df_basic = None, None
while True:
try:
if i > 6:
break
# 尝试执行请求操作
# 如果成功,可以跳出循环
df = xcsc_pro.daily(ts_code=ts_code)[::-1]
df_basic = xcsc_pro.daily_basic_ts(ts_code=ts_code)[::-1]
df_basic = df_basic.fillna(0)
logger.info(f"{ts_code} 获取 daily 和 daily_basic_ts 数据完毕!")
break
except Exception as e:
i += 1
# 捕获超时异常
logger.warning("请求超时等待2分钟后重试...")
time.sleep(120) # 休眠2分钟
return df, df_basic
def update_stock_basic(self):
"""
更新基础表,补齐股票表
:return:
"""
# 1. 取 中国A股基本资料
StockInfoMain().insert_stock_basic()
self.code_res = StockInfoMain().get_stock_basic()
for result in self.code_res:
s_type = self.tdx_util.get_security_type(code=result.ts_code, name=result.name)
if s_type not in self.tdx_util.SECURITY_TYPE:
continue
# 创建表
table_name = str(result.ts_code).split(".")[0] + "_daily"
new_table_class = get_stock_daily(table_name=table_name)
# 2. 判断表是否存在,不存在则创建
if not self.db_main.has_table(new_table_class):
logger.info(f"{result.ts_code} 表不存在,开始创建!")
# 3. 补齐数据
self.save_by_code(result.ts_code)
if __name__ == '__main__':
# current_date = datetime.now()
current_date = datetime.strptime('20231024', "%Y%m%d")
if if_run(current_date):
trade_date = current_date.strftime('%Y%m%d')
main = StockDailyMain()
# main.init_data() # 初始化全市场的数据
main.task_data(trade_date) # 指定日期数据入库
# main.create_index() # 创建唯一索引
# main.del_data_by_date('20230915') # 删除指定日期数据
# main.save_by_code(ts_code='000050.SZ') # 指定更新的股票
# main.update_stock_basic()