# qnloft-stock/utils/tdxUtil.py
#
# 177 lines
# 5.1 KiB
# Python
# Raw Permalink Normal View History
#
# 2023-11-24 02:49:22 +00:00
import os
import struct
from pathlib import Path

import pandas as pd
"""
通达信操作工具类
"""
class TdxUtil:
    """Helper for reading and writing TongDaXin (TDX) client data files.

    Given the root directory of a TDX installation, this class can:

    * write / read the "self-selected stocks" block file (``ZXG.blk``);
    * list the A-share codes that have a daily-bar file on disk;
    * parse a binary daily-bar ``.day`` file into a ``pandas.DataFrame``.
    """

    SECURITY_EXCHANGE = ["sz", "sh"]
    # Per security type: [price multiplier, volume multiplier] applied to the
    # raw integers stored in a ``.day`` record (see ``get_day_k``).
    SECURITY_COEFFICIENT = {
        "SH_A_STOCK": [0.01, 0.01], "SH_B_STOCK": [0.001, 0.01], "SH_INDEX": [0.01, 1.0],
        "SH_FUND": [0.001, 1.0], "SH_BOND": [0.001, 1.0], "SZ_A_STOCK": [0.01, 0.01],
        "SZ_B_STOCK": [0.01, 0.01], "SZ_INDEX": [0.01, 1.0], "SZ_FUND": [0.001, 0.01],
        "SZ_BOND": [0.001, 0.01],
    }
    SECURITY_TYPE = ["SH_A_STOCK", "SH_B_STOCK", "SH_INDEX", "SH_FUND", "SH_BOND", "SZ_A_STOCK", "SZ_B_STOCK",
                     "SZ_INDEX", "SZ_FUND", "SZ_BOND"]

    # Code prefixes that get market digit '1' / '0' in ZXG.blk.  The '1' group
    # is tested first so that its longer prefixes ('132', '204') are not
    # swallowed by the shorter '13' / '20' entries of the '0' group.
    _SH_ZXG_PREFIXES = ('50', '51', '60', '688', '73', '90', '110', '113', '132', '204', '78')
    _SZ_ZXG_PREFIXES = ('00', '13', '18', '15', '16', '20', '30', '39', '115', '1318')

    # First two digits of a code -> security type, per exchange.
    _SZ_TYPE_BY_HEAD = {
        "00": "SZ_A_STOCK", "30": "SZ_A_STOCK",
        "20": "SZ_B_STOCK",
        "39": "SZ_INDEX",
        "15": "SZ_FUND", "16": "SZ_FUND",
        "10": "SZ_BOND", "11": "SZ_BOND", "12": "SZ_BOND", "13": "SZ_BOND", "14": "SZ_BOND",
    }
    _SH_TYPE_BY_HEAD = {
        "60": "SH_A_STOCK", "68": "SH_A_STOCK",  # 688XXX is the STAR market
        "90": "SH_B_STOCK",
        "00": "SH_INDEX", "88": "SH_INDEX", "99": "SH_INDEX",
        "50": "SH_FUND", "51": "SH_FUND",
        "01": "SH_BOND", "10": "SH_BOND", "11": "SH_BOND", "12": "SH_BOND",
        "13": "SH_BOND", "14": "SH_BOND", "20": "SH_BOND",
    }
    # Used when no (known) exchange is given: main-board A shares only.
    # TODO: ChiNext ("30") is deliberately excluded here.
    _DEFAULT_TYPE_BY_HEAD = {"00": "SZ_A_STOCK", "60": "SH_A_STOCK"}

    _DAY_K_COLUMNS = ['trade_date', 'open', 'high', 'low', 'close', 'amount', 'volume']
    # One daily record: date, open, high, low, close (uint32), amount
    # (float32), volume (uint32) and one trailing uint32 that is not used.
    _DAY_RECORD = struct.Struct('<IIIIIfII')

    def __init__(self, tdx_path: str):
        """Remember the TDX root directory and derive the data paths from it.

        :param tdx_path: root directory of the TDX installation.
        """
        # Path of the daily-bar file opened by the last ``get_day_k`` call.
        self.f_name = None
        self.path = tdx_path
        # Self-selected stock block file.
        self.zxg_path = os.path.join(tdx_path, "T0002", "blocknew", "ZXG.blk")
        # Daily-bar directories.  The final empty component keeps the trailing
        # separator so a file name can be appended directly.
        # Shenzhen daily bars.
        self.lday_sz_path = os.path.join(tdx_path, "vipdoc", "sz", "lday", "")
        # Shanghai daily bars.
        self.lday_sh_path = os.path.join(tdx_path, "vipdoc", "sh", "lday", "")

    def _lday_dir(self, exchange):
        """Return the daily-bar directory (with trailing separator) of *exchange*."""
        if exchange == self.SECURITY_EXCHANGE[0]:
            return self.lday_sz_path
        return self.lday_sh_path

    def set_zxg_file(self, cont):
        """Overwrite the TDX self-selected stock file with the given codes.

        Each item is converted to ``str`` and anything after the first ``.``
        (for example an exchange suffix such as ``.SH``) is dropped.  Codes
        with a recognised prefix are written with a leading market digit
        (``1`` or ``0``); other codes are written unchanged.  Items whose code
        part is at most one character long are skipped.

        :param cont: iterable of stock codes (e.g. a list or a pandas Series).
        :return: None
        """
        with open(self.zxg_path, "w") as file:
            for item in cont:
                code = str(item).split(".")[0]
                if len(code) <= 1:
                    continue
                if code.startswith(self._SH_ZXG_PREFIXES):
                    code = '1' + code
                elif code.startswith(self._SZ_ZXG_PREFIXES):
                    code = '0' + code
                file.write(code + '\n')

    def get_zxg_file(self) -> str:
        """Read the TDX self-selected stock file.

        :return: the raw text content of ``ZXG.blk``.
        """
        with open(self.zxg_path, 'r') as file:
            return file.read()

    def get_tdx_stock_data(self, code=None, freq=None, market=None):
        """Fetch K-line data (daily, or minute bars such as 1/5/30 minutes).

        :param code: stock code; when ``None`` the list of all A-share codes
            found on disk is returned instead of K-line data.
        :param freq: minute-bar period (1 means 1 minute, 5 means 5 minutes);
            ``None`` selects daily bars.
        :param market: Shanghai Stock Exchange: sh, Shenzhen Stock Exchange:
            sz, Beijing Stock Exchange: bj.
        :return: see ``get_all_stock``, ``get_day_k`` and ``get_freq_k``.
        """
        if code is None:
            return self.get_all_stock()
        if freq is None:
            return self.get_day_k(code, market)
        return self.get_freq_k(code, market, freq)

    def get_all_stock(self):
        """List the codes of all A-share stocks that have a daily-bar file.

        Scans the ``lday`` directory of every exchange in
        ``SECURITY_EXCHANGE`` below the configured TDX root.  File names are
        expected to look like ``sz000001.day``: the first two characters are
        the exchange, the rest (up to the first ``.``) is the code.

        :return: list of codes classified as ``SZ_A_STOCK`` or ``SH_A_STOCK``.
        :raises FileNotFoundError: if one of the ``lday`` directories is missing.
        """
        a_share_types = ("SZ_A_STOCK", "SH_A_STOCK")
        codes = []
        for exchange in self.SECURITY_EXCHANGE:
            for entry in Path(self._lday_dir(exchange)).iterdir():
                if not entry.is_file():
                    continue
                tdx_code = entry.name.split(".")[0]
                file_market, code = tdx_code[:2], tdx_code[2:]
                if self.get_security_type(code=code, market=file_market) in a_share_types:
                    codes.append(code)
        return codes

    def get_day_k(self, code, market) -> pd.DataFrame:
        """Parse the daily-bar file of one security into a DataFrame.

        The price fields are scaled by the first and the volume by the second
        entry of ``SECURITY_COEFFICIENT`` for the detected security type.
        Trailing bytes that do not form a complete record are ignored.  As a
        side effect ``self.f_name`` is set to the path of the file read.

        :param code: stock code, e.g. ``"002448"``.
        :param market: exchange (``"sz"`` / ``"sh"``) or ``None`` to guess a
            main-board A share from the code.
        :return: DataFrame with the columns ``trade_date`` (``YYYYMMDD``
            string), ``open``, ``high``, ``low``, ``close``, ``amount`` and
            ``volume``; one row per record.
        :raises NotImplementedError: if the security type cannot be determined.
        :raises FileNotFoundError: if the daily-bar file does not exist.
        """
        # ``market`` must go by keyword: the second positional parameter of
        # ``get_security_type`` is ``name``.
        security_type = self.get_security_type(code, market=market)
        if security_type not in self.SECURITY_TYPE:
            raise NotImplementedError(
                f"Unknown security type ! (code={code!r}, market={market!r})")

        # Every security type starts with its exchange ("SZ_..." / "SH_...").
        exchange = security_type[:2].lower()
        self.f_name = f"{self._lday_dir(exchange)}{exchange}{code}.day"
        file_path = Path(self.f_name)
        if not file_path.is_file():
            raise FileNotFoundError(f"{self.f_name} 不是一个文件!")

        price_scale, volume_scale = self.SECURITY_COEFFICIENT[security_type]
        content = file_path.read_bytes()
        # iter_unpack needs a whole number of records; drop a partial tail.
        usable = len(content) - len(content) % self._DAY_RECORD.size
        rows = [
            [
                str(record[0]),
                record[1] * price_scale,
                record[2] * price_scale,
                record[3] * price_scale,
                record[4] * price_scale,
                record[5],
                record[6] * volume_scale,
            ]
            for record in self._DAY_RECORD.iter_unpack(memoryview(content)[:usable])
        ]
        # Build the frame once instead of appending row by row.
        return pd.DataFrame(rows, columns=self._DAY_K_COLUMNS)

    def get_freq_k(self, code, market, freq):
        """Placeholder for minute-bar reading; not implemented, returns ``None``."""
        return None

    def get_security_type(self, code, name=None, market=None):
        """Classify a security from the first two digits of its code.

        :param code: stock code string.
        :param name: optional security name; names containing ``ST`` or ``*``
            are rejected.
        :param market: exchange, case-insensitive (``"sz"`` / ``"sh"``).  For
            any other value only main-board A shares (``00`` / ``60``) are
            recognised.
        :return: one of the ``SECURITY_TYPE`` strings, or ``None`` when the
            code is not recognised or the name is rejected.
        """
        exchange = str(market).lower()
        code_head = code[:2]
        if name is not None and ('ST' in name or '*' in name):
            return None
        if exchange == self.SECURITY_EXCHANGE[0]:
            type_by_head = self._SZ_TYPE_BY_HEAD
        elif exchange == self.SECURITY_EXCHANGE[1]:
            type_by_head = self._SH_TYPE_BY_HEAD
        else:
            type_by_head = self._DEFAULT_TYPE_BY_HEAD
        return type_by_head.get(code_head)
if __name__ == '__main__':
    # Manual smoke run: read the daily bars of code 002448 from a local TDX
    # installation under D:\new_tdx and print the result.
    demo_util = TdxUtil("D:\\new_tdx")
    daily_bars = demo_util.get_tdx_stock_data("002448")
    print(daily_bars)