qnloft-stock/utils/comm.py

286 lines
8.7 KiB
Python
Raw Permalink Normal View History

2023-11-24 02:49:22 +00:00
import ftplib
import os
from datetime import datetime
from ftplib import FTP, error_perm
from pathlib import Path
import akshare as ak
import efinance as ef
import qstock as qs
import tushare as ts
import pandas as pd
import time
import random
from tabulate import tabulate
import platform
import xcsc_tushare as xc
token = '0718534658b9d91b3f03dc8b220e4062193ebf6f6414d036505165e1'
xc.set_token('bd4f26f1eca8d660bd23e229260df46002d630d6e1fb9226380edec8')
# 仿真环境用这个方式可以连上,生产环境用这个连不上
xcsc_pro = xc.pro_api(env='prd', server='http://116.128.206.39:7172')
# 显示所有列
pd.set_option('display.max_columns', None)
# 显示所有行
pd.set_option('display.max_rows', None)
# 输出不折行
pd.set_option('expand_frame_repr', False)
# 最大列宽度
pd.set_option('display.max_colwidth', None)
def sleep():
print("开始休眠防止ip被拉黑或者进小黑屋")
# 生成随机的休眠时间
sleep_time = random.uniform(0, 2)
# 执行休眠
time.sleep(sleep_time)
print("休眠结束!")
def print_markdown(df: pd):
print(tabulate(df, headers='keys', tablefmt='github'))
def get_file(size):
"""
获取不同系统下的文件路径
:param size:
:return:
"""
sys_platform = platform.platform().lower()
if "macos" in sys_platform:
return f'/Users/renmeng/work_space/python_work/qnloft-get-web-everything/股票金融/量化交易/股票数据/{size}'
elif "windows" in sys_platform:
return f'E:\\Python_Workplace\qnloft-get-web-everything\\股票金融\\量化交易\\股票数据\\{size}'
else:
print("其他系统")
def write_file(content, file_name, mode_type='a'):
file = ""
sys_platform = platform.platform().lower()
if "macos" in sys_platform:
file = f'/Users/renmeng/work_space/python_work/qnloft-get-web-everything/股票金融/量化交易/股票数据/{file_name}'
elif "windows" in sys_platform:
file = f'D:\\文档\\数据测试\\{file_name}'
else:
print("其他系统")
path = Path(file)
# 创建目录或文件
if not path.exists():
if path.is_dir():
path.mkdir(parents=True) # 创建目录及其父目录
else:
path.touch() # 创建文件
with path.open(mode=mode_type) as file:
file.write(content)
def create_file(file_name):
path = Path(file_name)
# 创建目录或文件
if not path.exists():
path.mkdir(parents=True) # 创建目录及其父目录
def get_random_stock(size, n=0):
if size == 'M':
s_list = Path(get_file(size)).read_text().splitlines()
elif size == 'S':
s_list = Path(get_file(size)).read_text().splitlines()
elif size == 'L':
s_list = Path(get_file(size)).read_text().splitlines()
else:
raise ValueError("size输入错误")
if n > 0:
return random.sample(s_list, n)
else:
return s_list
def del_file_lines(size, to_delete):
content = Path(get_file(size)).read_text()
# 删除匹配的数字
for char in to_delete:
content = content.replace(char, '')
# 将更新后的内容写回文件
with Path(get_file(size)).open(mode='w') as file:
file.writelines(content)
def if_run(current_date=datetime.now()):
"""
判断当前日期是否是交易日
:param current_date:
:return:
"""
df = ak.tool_trade_date_hist_sina()
# 将日期列转换为 datetime 类型
df['trade_date'] = pd.to_datetime(df['trade_date'])
date_now_hour = current_date.hour
# 将日期格式化为 "yyyy-mm-dd" 形式
formatted_date = current_date.strftime('%Y-%m-%d')
# 检查 DataFrame 是否包含特定日期
contains_target_date = (df['trade_date'] == formatted_date).any()
print(f'当日日期是:{formatted_date} ,今日是否开盘:{contains_target_date}')
if contains_target_date:
return True
return False
def return_trading_day(now_date, offset_date, format_type='%Y%m%d'):
"""
获取当前时间的 前n个交易日期后n个交易日期
:param format_type:
:param offset_date:
:param now_date:
:return:
"""
df = ak.tool_trade_date_hist_sina()
# 将日期列转换为 datetime 类型
df['trade_date'] = pd.to_datetime(df['trade_date'])
if isinstance(now_date, datetime):
formatted_date = now_date.strftime('%Y-%m-%d')
else:
formatted_date = now_date
selected_indexes = df[df['trade_date'] == formatted_date].index
return df.loc[selected_indexes + offset_date, 'trade_date'].dt.strftime(format_type).values[0]
def upload_files_to_ftp(local_directory, remote_directory, file_name=None):
"""
ftp上传
:param local_directory:
:param remote_directory:
:param file_name:
:return:
"""
try:
ftp = FTP("qxu1142200198.my3w.com")
ftp.login('qxu1142200198', '48XZ55MB')
except error_perm as e:
print(f"Network error: {e}")
return False
try:
# 切换目录
ftp.cwd(remote_directory)
except ftplib.error_perm:
print("目录不存在, 开始创建...")
ftp.mkd(remote_directory)
try:
path = Path(local_directory)
# 判断是文件夹还是文件
if path.is_dir() and file_name is None:
files = os.listdir(local_directory) # get list of files in local directory
for file in files:
if file.endswith(".html"): # upload only .txt files
local_file = f"{local_directory}/{file}"
remote_file = f"{remote_directory}/{file}"
print(local_file, "----->>", remote_file)
with open(local_file, 'rb') as f:
try:
ftp.delete(remote_file)
except ftplib.error_perm:
print("目录不存在, 或已经删除...")
ftp.storbinary(f"STOR {remote_file}", f) # upload file to FTP server
if file_name is not None and len(file_name) > 0:
local_file = f"{local_directory}/{file_name}"
remote_file = f"{remote_directory}/{file_name}"
print(local_file, "----->>", remote_file)
with open(local_file, 'rb') as f:
try:
ftp.delete(remote_file)
except ftplib.error_perm:
print("目录不存在, 或已经删除...")
ftp.storbinary(f"STOR {remote_file}", f) # upload file to FTP server
finally:
ftp.quit()
return True
def back_testing(df, cond: str, n=3):
"""
回撤测试
:param df:
:param cond:
:param n:
:return:
"""
filtered_df = df.eval(cond)
total, tomorrow_rise, tomorrow_fall = 0, 0, 0
res_df = pd.DataFrame(
columns=['code', '日期', '明天涨跌幅', 'n日最大涨幅', 'n日平均涨跌幅', 'n日最大回撤'])
for index, row in df.iterrows():
if filtered_df[index] and index > n:
# print(f'{row["trade_date"]} --> {row["KDJ_D"]} --> {row["KDJ_J"]}')
total += 1
# 明天涨跌幅情况
tomorrow_pre = df.iloc[-index:-index + 1]['pct_chg'].values[0].round(2)
# n日 最大涨幅
max_h = df.iloc[-index:-index + n]['pct_chg'].max().round(2)
# n日 平均涨跌幅
mean_h = df.iloc[-index:-index + n]['pct_chg'].mean().round(2)
# n日 最大回撤
h_max = df.iloc[-index:-index + n]['high'].max()
l_min = df.iloc[-index:-index + n]['low'].min()
max_ret = f"{(h_max - l_min) / l_min:.2f}"
res_df.loc[total, 'code'], res_df.loc[total, '日期'] = row['ts_code'], row["trade_date"]
res_df.loc[total, '明天涨跌幅'] = tomorrow_pre
res_df.loc[total, 'n日最大涨幅'] = max_h
res_df.loc[total, 'n日平均涨跌幅'] = mean_h
res_df.loc[total, 'n日最大回撤'] = max_ret
return res_df
def buying_and_selling_decisions(df, cond_buy: str, cond_sell: str, cond_buy_read=None, cond_sell_read=None):
"""
买卖测试
:param df:
:param cond_buy:
:param cond_sell:
:param cond_buy_read:
:param cond_sell_read:
:return:
"""
total, buy, sell, total_chg, buy_date, flag_ready_b, flag_ready_s = 0, 0, 0, 0, 0, False, False
res_df = pd.DataFrame(
columns=['code', '买入日期', '卖出日期', '盈利', '持仓天数'])
for index, row in df.iterrows():
trade_date, c = row["trade_date"], row['close']
if (eval(cond_buy_read) or cond_buy_read is None) and buy == 0:
# 准备买入
print(f"{trade_date} {cond_buy_read},准备买入!")
flag_ready_b = True
if eval(cond_buy) and buy == 0 and flag_ready_b:
# 买入
print(f"{trade_date} {cond_buy},买入操作!")
buy, buy_date = c, trade_date
flag_ready_b = False
if (eval(cond_sell_read) or cond_sell_read is None) and buy > 0:
# 准备卖出
print(f"{trade_date} {cond_sell_read},准备卖出!")
flag_ready_s = True
if eval(cond_sell) and buy > 0 and flag_ready_s:
print(f"{trade_date} {cond_sell},卖出操作!")
total += 1
# 卖出
sell = c
# 计算盈利
chg = round(((sell - buy) / buy) * 100, 2)
# 计算持仓天数
start_date = datetime.strptime(buy_date, "%Y%m%d")
end_date = datetime.strptime(trade_date, "%Y%m%d")
# 计算间隔天数
interval_days = (end_date - start_date).days
buy, flag_ready_s = 0, False
res_df.loc[total, 'code'], res_df.loc[total, '买入日期'] = row['ts_code'], buy_date
res_df.loc[total, '卖出日期'] = trade_date
res_df.loc[total, '盈利'] = chg
res_df.loc[total, '持仓天数'] = interval_days
return res_df