import ftplib import os from datetime import datetime from ftplib import FTP, error_perm from pathlib import Path import akshare as ak import efinance as ef import qstock as qs import tushare as ts import pandas as pd import time import random from tabulate import tabulate import platform import xcsc_tushare as xc token = '0718534658b9d91b3f03dc8b220e4062193ebf6f6414d036505165e1' xc.set_token('bd4f26f1eca8d660bd23e229260df46002d630d6e1fb9226380edec8') # 仿真环境用这个方式可以连上,生产环境用这个连不上 xcsc_pro = xc.pro_api(env='prd', server='http://116.128.206.39:7172') # 显示所有列 pd.set_option('display.max_columns', None) # 显示所有行 pd.set_option('display.max_rows', None) # 输出不折行 pd.set_option('expand_frame_repr', False) # 最大列宽度 pd.set_option('display.max_colwidth', None) def sleep(): print("开始休眠,防止ip被拉黑或者进小黑屋") # 生成随机的休眠时间 sleep_time = random.uniform(0, 2) # 执行休眠 time.sleep(sleep_time) print("休眠结束!") def print_markdown(df: pd): print(tabulate(df, headers='keys', tablefmt='github')) def get_file(size): """ 获取不同系统下的文件路径 :param size: :return: """ sys_platform = platform.platform().lower() if "macos" in sys_platform: return f'/Users/renmeng/work_space/python_work/qnloft-get-web-everything/股票金融/量化交易/股票数据/{size}' elif "windows" in sys_platform: return f'E:\\Python_Workplace\qnloft-get-web-everything\\股票金融\\量化交易\\股票数据\\{size}' else: print("其他系统") def write_file(content, file_name, mode_type='a'): file = "" sys_platform = platform.platform().lower() if "macos" in sys_platform: file = f'/Users/renmeng/work_space/python_work/qnloft-get-web-everything/股票金融/量化交易/股票数据/{file_name}' elif "windows" in sys_platform: file = f'D:\\文档\\数据测试\\{file_name}' else: print("其他系统") path = Path(file) # 创建目录或文件 if not path.exists(): if path.is_dir(): path.mkdir(parents=True) # 创建目录及其父目录 else: path.touch() # 创建文件 with path.open(mode=mode_type) as file: file.write(content) def create_file(file_name): path = Path(file_name) # 创建目录或文件 if not path.exists(): path.mkdir(parents=True) # 创建目录及其父目录 def get_random_stock(size, n=0): if size == 'M': s_list = Path(get_file(size)).read_text().splitlines() elif size == 'S': s_list = Path(get_file(size)).read_text().splitlines() elif size == 'L': s_list = Path(get_file(size)).read_text().splitlines() else: raise ValueError("size输入错误!") if n > 0: return random.sample(s_list, n) else: return s_list def del_file_lines(size, to_delete): content = Path(get_file(size)).read_text() # 删除匹配的数字 for char in to_delete: content = content.replace(char, '') # 将更新后的内容写回文件 with Path(get_file(size)).open(mode='w') as file: file.writelines(content) def if_run(current_date=datetime.now()): """ 判断当前日期是否是交易日 :param current_date: :return: """ df = ak.tool_trade_date_hist_sina() # 将日期列转换为 datetime 类型 df['trade_date'] = pd.to_datetime(df['trade_date']) date_now_hour = current_date.hour # 将日期格式化为 "yyyy-mm-dd" 形式 formatted_date = current_date.strftime('%Y-%m-%d') # 检查 DataFrame 是否包含特定日期 contains_target_date = (df['trade_date'] == formatted_date).any() print(f'当日日期是:{formatted_date} ,今日是否开盘:{contains_target_date}') if contains_target_date: return True return False def return_trading_day(now_date, offset_date, format_type='%Y%m%d'): """ 获取当前时间的 前n个交易日期,后n个交易日期 :param format_type: :param offset_date: :param now_date: :return: """ df = ak.tool_trade_date_hist_sina() # 将日期列转换为 datetime 类型 df['trade_date'] = pd.to_datetime(df['trade_date']) if isinstance(now_date, datetime): formatted_date = now_date.strftime('%Y-%m-%d') else: formatted_date = now_date selected_indexes = df[df['trade_date'] == formatted_date].index return df.loc[selected_indexes + offset_date, 'trade_date'].dt.strftime(format_type).values[0] def upload_files_to_ftp(local_directory, remote_directory, file_name=None): """ ftp上传 :param local_directory: :param remote_directory: :param file_name: :return: """ try: ftp = FTP("qxu1142200198.my3w.com") ftp.login('qxu1142200198', '48XZ55MB') except error_perm as e: print(f"Network error: {e}") return False try: # 切换目录 ftp.cwd(remote_directory) except ftplib.error_perm: print("目录不存在, 开始创建...") ftp.mkd(remote_directory) try: path = Path(local_directory) # 判断是文件夹还是文件 if path.is_dir() and file_name is None: files = os.listdir(local_directory) # get list of files in local directory for file in files: if file.endswith(".html"): # upload only .txt files local_file = f"{local_directory}/{file}" remote_file = f"{remote_directory}/{file}" print(local_file, "----->>", remote_file) with open(local_file, 'rb') as f: try: ftp.delete(remote_file) except ftplib.error_perm: print("目录不存在, 或已经删除...") ftp.storbinary(f"STOR {remote_file}", f) # upload file to FTP server if file_name is not None and len(file_name) > 0: local_file = f"{local_directory}/{file_name}" remote_file = f"{remote_directory}/{file_name}" print(local_file, "----->>", remote_file) with open(local_file, 'rb') as f: try: ftp.delete(remote_file) except ftplib.error_perm: print("目录不存在, 或已经删除...") ftp.storbinary(f"STOR {remote_file}", f) # upload file to FTP server finally: ftp.quit() return True def back_testing(df, cond: str, n=3): """ 回撤测试 :param df: :param cond: :param n: :return: """ filtered_df = df.eval(cond) total, tomorrow_rise, tomorrow_fall = 0, 0, 0 res_df = pd.DataFrame( columns=['code', '日期', '明天涨跌幅', 'n日最大涨幅', 'n日平均涨跌幅', 'n日最大回撤']) for index, row in df.iterrows(): if filtered_df[index] and index > n: # print(f'{row["trade_date"]} --> {row["KDJ_D"]} --> {row["KDJ_J"]}') total += 1 # 明天涨跌幅情况 tomorrow_pre = df.iloc[-index:-index + 1]['pct_chg'].values[0].round(2) # n日 最大涨幅 max_h = df.iloc[-index:-index + n]['pct_chg'].max().round(2) # n日 平均涨跌幅 mean_h = df.iloc[-index:-index + n]['pct_chg'].mean().round(2) # n日 最大回撤 h_max = df.iloc[-index:-index + n]['high'].max() l_min = df.iloc[-index:-index + n]['low'].min() max_ret = f"{(h_max - l_min) / l_min:.2f}" res_df.loc[total, 'code'], res_df.loc[total, '日期'] = row['ts_code'], row["trade_date"] res_df.loc[total, '明天涨跌幅'] = tomorrow_pre res_df.loc[total, 'n日最大涨幅'] = max_h res_df.loc[total, 'n日平均涨跌幅'] = mean_h res_df.loc[total, 'n日最大回撤'] = max_ret return res_df def buying_and_selling_decisions(df, cond_buy: str, cond_sell: str, cond_buy_read=None, cond_sell_read=None): """ 买卖测试 :param df: :param cond_buy: :param cond_sell: :param cond_buy_read: :param cond_sell_read: :return: """ total, buy, sell, total_chg, buy_date, flag_ready_b, flag_ready_s = 0, 0, 0, 0, 0, False, False res_df = pd.DataFrame( columns=['code', '买入日期', '卖出日期', '盈利', '持仓天数']) for index, row in df.iterrows(): trade_date, c = row["trade_date"], row['close'] if (eval(cond_buy_read) or cond_buy_read is None) and buy == 0: # 准备买入 print(f"{trade_date} {cond_buy_read},准备买入!") flag_ready_b = True if eval(cond_buy) and buy == 0 and flag_ready_b: # 买入 print(f"{trade_date} {cond_buy},买入操作!") buy, buy_date = c, trade_date flag_ready_b = False if (eval(cond_sell_read) or cond_sell_read is None) and buy > 0: # 准备卖出 print(f"{trade_date} {cond_sell_read},准备卖出!") flag_ready_s = True if eval(cond_sell) and buy > 0 and flag_ready_s: print(f"{trade_date} {cond_sell},卖出操作!") total += 1 # 卖出 sell = c # 计算盈利 chg = round(((sell - buy) / buy) * 100, 2) # 计算持仓天数 start_date = datetime.strptime(buy_date, "%Y%m%d") end_date = datetime.strptime(trade_date, "%Y%m%d") # 计算间隔天数 interval_days = (end_date - start_date).days buy, flag_ready_s = 0, False res_df.loc[total, 'code'], res_df.loc[total, '买入日期'] = row['ts_code'], buy_date res_df.loc[total, '卖出日期'] = trade_date res_df.loc[total, '盈利'] = chg res_df.loc[total, '持仓天数'] = interval_days return res_df