import struct from pathlib import Path import pandas as pd """ 通达信操作工具类 """ class TdxUtil: SECURITY_EXCHANGE = ["sz", "sh"] SECURITY_COEFFICIENT = {"SH_A_STOCK": [0.01, 0.01], "SH_B_STOCK": [0.001, 0.01], "SH_INDEX": [0.01, 1.0], "SH_FUND": [0.001, 1.0], "SH_BOND": [0.001, 1.0], "SZ_A_STOCK": [0.01, 0.01], "SZ_B_STOCK": [0.01, 0.01], "SZ_INDEX": [0.01, 1.0], "SZ_FUND": [0.001, 0.01], "SZ_BOND": [0.001, 0.01]} SECURITY_TYPE = ["SH_A_STOCK", "SH_B_STOCK", "SH_INDEX", "SH_FUND", "SH_BOND", "SZ_A_STOCK", "SZ_B_STOCK", "SZ_INDEX", "SZ_FUND", "SZ_BOND"] def __init__(self, tdx_path: str): self.f_name = None self.path = tdx_path # 通达信自选股路径 self.zxg_path = tdx_path + "\\T0002\\blocknew\\ZXG.blk" # 上证日线 self.lday_sz_path = tdx_path + "\\vipdoc\\sz\\lday\\" # 沪深日线 self.lday_sh_path = tdx_path + "\\vipdoc\\sh\\lday\\" def set_zxg_file(self, cont: pd): """ 通达信自选股写入 :param cont: :return: """ # 将 DataFrame 内容逐行写入文本文件 with open(self.zxg_path, "w") as file: for item in cont: stock_code = str(item) code = stock_code.split(".")[0] if len(code) > 1: if stock_code.startswith(('50', '51', '60', '688', '73', '90', '110', '113', '132', '204', '78')): code = '1' + stock_code if stock_code.startswith(('00', '13', '18', '15', '16', '18', '20', '30', '39', '115', '1318')): code = '0' + stock_code file.write(code + '\n') # 关闭文件 file.close() def get_zxg_file(self): """ 读取通达信自选股 :return: """ f = open(self.zxg_path, 'r') z = f.read() f.close() return z def get_tdx_stock_data(self, code=None, freq=None, market=None): """ 获取K线数据(日线,1分钟线,5分钟线,30分钟线) :param code: 股票代码 :param freq: 分钟数据,1:代表1分钟,5:代表5分钟 :param market: 上海证券交易所: sh, 深证证券交易所: sz, 北京证券交易所: bj :return: """ # 获取全部的数据 if code is None: return self.get_all_stock() if freq is None: return self.get_day_k(code, market) else: return self.get_freq_k(code, market, freq) def get_all_stock(self): """ 获取全部股票 :return: """ codes = [] for market in self.SECURITY_EXCHANGE: files = Path(f'\\vipdoc\\{market}\\lday\\') for file in files.iterdir(): if not file.is_file(): continue tdx_code = file.name.split(".")[0] market = tdx_code[:2] code = tdx_code[2:] res = self.get_security_type(code=code, market=market) filter_stock = ["SZ_A_STOCK", "SH_A_STOCK"] if res not in filter_stock: continue codes.append(code) return codes def get_day_k(self, code, market): security_type = self.get_security_type(code, market) if security_type not in self.SECURITY_TYPE: print("Unknown security type !\n") raise NotImplementedError if security_type == "SZ_A_STOCK": self.f_name = f"{self.lday_sz_path}sz{code}.day" if security_type == "SH_A_STOCK": self.f_name = f"{self.lday_sh_path}sh{code}.day" print(self.f_name) df = pd.DataFrame(columns=['trade_date', 'open', 'high', 'low', 'close', 'amount', 'volume']) file_path = Path(self.f_name) coefficient = [0.01, 0.01] if not file_path.is_file(): raise f"{self.f_name} 不是一个文件!" content = file_path.read_bytes() record_struct = struct.Struct('