qnloft-stock/utils/formula.py

692 lines
17 KiB
Python
Raw Permalink Normal View History

2023-11-24 02:49:22 +00:00
#!/usr/bin/python
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
def EMA(number, n):
return pd.Series(number).ewm(alpha=2 / (n + 1), adjust=True).mean()
def MA(number, n):
return pd.Series.rolling(number, n).mean()
def SMA(number, n, m=1):
_df = number.fillna(0)
return pd.Series(_df).ewm(com=n - m, adjust=True).mean()
def RM_SMA(DF, N, M):
DF = DF.fillna(0)
z = len(DF)
var = np.zeros(z)
var[0] = DF[0]
for i in range(1, z):
var[i] = (DF[i] * M + var[i - 1] * (N - M)) / N
for i in range(z):
DF[i] = var[i]
return DF
def ATR(close, high, low, n):
"""
真实波幅
:param close:
:param high:
:param low:
:param n:
:return:
"""
c, h, l_ = close, high, low
mtr = MAX(MAX((h - l_), ABS(REF(c, 1) - h)), ABS(REF(c, 1) - l_))
atr = MA(mtr, n)
return pd.DataFrame({'MTR': mtr, 'ATR': atr})
def HHV(number, n):
return pd.Series.rolling(number, n).max()
def LLV(number, n):
return pd.Series.rolling(number, n).min()
def SUM(number, n):
return pd.Series.rolling(number, n).sum()
def ABS(number):
return np.abs(number)
def MAX(A, B):
return np.maximum(A, B)
def MIN(A, B):
var = IF(A < B, A, B)
return var
def IF(COND, V1, V2):
var = np.flip(np.where(COND, V1, V2))
return pd.Series(var)[::-1]
def REF(DF, N):
var = DF.diff(N)
var = DF - var
return var
def STD(number, n):
return pd.Series.rolling(number, n).std()
def MACD(close, f, s, m):
"""
:param close:
:param f:
:param s:
:param m:
:return:
"""
EMAFAST = EMA(close, f)
EMASLOW = EMA(close, s)
DIFF = EMAFAST - EMASLOW
DEA = EMA(DIFF, m)
MACD = (DIFF - DEA) * 2
return pd.DataFrame({
'DIFF': round(DIFF, 2),
'DEA': round(DEA, 2), 'MACD': round(MACD, 2)})
def KDJ(close, high, low, n, m1, m2):
"""
:param close:
:param high:
:param low:
:param n:
:param m1:
:param m2:
:return:
"""
c, h, l = close, high, low
RSV = (c - LLV(l, n)) / (HHV(h, n) - LLV(l, n)) * 100
K = SMA(RSV, m1, 1)
D = SMA(K, m2, 1)
J = 3 * K - 2 * D
return pd.DataFrame({'KDJ_K': round(K, 2), 'KDJ_D': round(D, 2), 'KDJ_J': round(J, 2)})
def OSC(close, n, m):
"""
变动速率线
:param close:
:param n:
:param m:
:return:
"""
c = close
OS = (c - MA(c, n)) * 100
MAOSC = EMA(OS, m)
return pd.DataFrame({'OSC': OS, 'MAOSC': MAOSC})
def BBI(close, N1, N2, N3, N4):
"""
多空指标
:param close:
:param N1:
:param N2:
:param N3:
:param N4:
:return:
"""
bbi = (MA(close, N1) + MA(close, N2) + MA(close, N3) + MA(close, N4)) / 4
return pd.DataFrame({'BBI': round(bbi, 2)})
def BBIBOLL(close, n, m, n1=3, n2=6, n3=12, n4=24):
"""
多空布林线
:param close:
:param n1:
:param n2:
:param n3:
:param n4:
:param n:
:param m:
:return:
"""
bbi_boll = BBI(close, n1, n2, n3, n4)['BBI']
UPER = bbi_boll + m * STD(bbi_boll, n)
DOWN = bbi_boll - m * STD(bbi_boll, n)
return pd.DataFrame({'BBIBOLL': round(bbi_boll, 2), 'UPER': round(UPER, 2), 'DOWN': round(DOWN, 2)})
def PBX(close, n1, n2, n3, n4, n5, n6):
"""
瀑布线
:param close:
:param n1:
:param n2:
:param n3:
:param n4:
:param n5:
:param n6:
:return:
"""
c = close
PBX1 = (EMA(c, n1) + MA(c, 2 * n1) + MA(c, 4 * n1)) / 3
PBX2 = (EMA(c, n2) + MA(c, 2 * n2) + MA(c, 4 * n2)) / 3
PBX3 = (EMA(c, n3) + MA(c, 2 * n3) + MA(c, 4 * n3)) / 3
PBX4 = (EMA(c, n4) + MA(c, 2 * n4) + MA(c, 4 * n4)) / 3
PBX5 = (EMA(c, n5) + MA(c, 2 * n5) + MA(c, 4 * n5)) / 3
PBX6 = (EMA(c, n6) + MA(c, 2 * n6) + MA(c, 4 * n6)) / 3
return pd.DataFrame(
{'PBX1': round(PBX1, 2), 'PBX2': round(PBX2, 2), 'PBX3': round(PBX3, 2),
'PBX4': round(PBX4, 2), 'PBX5': round(PBX5, 2), 'PBX6': round(PBX6, 2)}
)
def BOLL(close, N): # 布林线
boll = MA(close, N)
UB = boll + 2 * STD(close, N)
LB = boll - 2 * STD(close, N)
return pd.DataFrame({'BOLL': round(boll, 2), 'UB': round(UB, 2), 'LB': round(LB, 2)})
def ROC(close, n, m):
"""
变动率指标
:param close:
:param n:
:param m:
:return:
"""
c = close
roc = 100 * (c - REF(c, n)) / REF(c, n)
maroc = MA(roc, m)
return pd.DataFrame({'ROC': round(roc, 2), 'MAROC': round(maroc, 2)})
def MTM(close, n, m):
"""
动量线
:param close:
:param n:
:param m:
:return:
"""
c = close
mtm = c - REF(c, n)
mtm_ma = MA(mtm, m)
return pd.DataFrame({'MTM': round(mtm, 2), 'MTMMA': round(mtm_ma, 2)})
def MFI(close, high, low, vol, n):
"""
资金指标
:param close:
:param high:
:param low:
:param vol:
:param n:
:return:
"""
c, h, l, v = close, high, low, vol
TYP = (c + h + l) / 3
V1 = SUM(IF(TYP > REF(TYP, 1), TYP * v, 0), n) / \
SUM(IF(TYP < REF(TYP, 1), TYP * v, 0), n)
mfi = 100 - (100 / (1 + V1))
return pd.DataFrame({'MFI': round(mfi, 2)})
def SKDJ(close, high, low, N, M):
c = close
LOWV = LLV(low, N)
HIGHV = HHV(high, N)
RSV = EMA((c - LOWV) / (HIGHV - LOWV) * 100, M)
K = EMA(RSV, M)
D = MA(K, M)
return pd.DataFrame({'SKDJ_K': round(K, 2), 'SKDJ_D': round(D, 2)})
def WR(close, high, low, N, N1):
"""
威廉指标
:param close:
:param high:
:param low:
:param N:
:param N1:
:return:
"""
c, h, l = close, high, low
WR1 = round(100 * (HHV(h, N) - c) / (HHV(h, N) - LLV(l, N)), 2)
WR2 = round(100 * (HHV(h, N1) - c) / (HHV(h, N1) - LLV(l, N1)), 2)
return pd.DataFrame({'WR1': round(WR1, 2), 'WR2': round(WR2, 2)})
def BIAS(DF, N1, N2, N3): # 乖离率
CLOSE = DF
BIAS1 = (CLOSE - MA(CLOSE, N1)) / MA(CLOSE, N1) * 100
BIAS2 = (CLOSE - MA(CLOSE, N2)) / MA(CLOSE, N2) * 100
BIAS3 = (CLOSE - MA(CLOSE, N3)) / MA(CLOSE, N3) * 100
DICT = {'BIAS1': BIAS1, 'BIAS2': BIAS2, 'BIAS3': BIAS3}
VAR = pd.DataFrame(DICT)
return VAR
def RSI(c, N1, N2, N3): # 相对强弱指标RSI1:SMA(MAX(CLOSE-LC,0),N1,1)/SMA(ABS(CLOSE-LC),N1,1)*100;
DIF = c - REF(c, 1)
RSI1 = round((SMA(MAX(DIF, 0), N1) / round(SMA(ABS(DIF), N1) * 100, 3)) * 10000, 2)
RSI2 = round((SMA(MAX(DIF, 0), N2) / round(SMA(ABS(DIF), N2) * 100, 3)) * 10000, 2)
RSI3 = round((SMA(MAX(DIF, 0), N3) / round(SMA(ABS(DIF), N3) * 100, 3)) * 10000, 2)
return pd.DataFrame({'RSI1': RSI1, 'RSI2': RSI2, 'RSI3': RSI3})
def ADTM(DF, N, M): # 动态买卖气指标
HIGH = DF['high']
LOW = DF['low']
OPEN = DF['open']
DTM = IF(OPEN <= REF(OPEN, 1), 0, MAX(
(HIGH - OPEN), (OPEN - REF(OPEN, 1))))
DBM = IF(OPEN >= REF(OPEN, 1), 0, MAX((OPEN - LOW), (OPEN - REF(OPEN, 1))))
STM = SUM(DTM, N)
SBM = SUM(DBM, N)
ADTM1 = IF(STM > SBM, (STM - SBM) / STM,
IF(STM == SBM, 0, (STM - SBM) / SBM))
MAADTM = MA(ADTM1, M)
DICT = {'ADTM': ADTM1, 'MAADTM': MAADTM}
VAR = pd.DataFrame(DICT)
return VAR
def DDI(DF, N, N1, M, M1): # 方向标准离差指数
H = DF['high']
L = DF['low']
DMZ = IF((H + L) <= (REF(H, 1) + REF(L, 1)), 0,
MAX(ABS(H - REF(H, 1)), ABS(L - REF(L, 1))))
DMF = IF((H + L) >= (REF(H, 1) + REF(L, 1)), 0,
MAX(ABS(H - REF(H, 1)), ABS(L - REF(L, 1))))
DIZ = SUM(DMZ, N) / (SUM(DMZ, N) + SUM(DMF, N))
DIF = SUM(DMF, N) / (SUM(DMF, N) + SUM(DMZ, N))
ddi = DIZ - DIF
ADDI = SMA(ddi, N1, M)
AD = MA(ADDI, M1)
DICT = {'DDI': ddi, 'ADDI': ADDI, 'AD': AD}
VAR = pd.DataFrame(DICT)
return VAR
ZIG_STATE_START = 0
ZIG_STATE_RISE = 1
ZIG_STATE_FALL = 2
def ZIG(d, k, n):
"""
之字转向指标当前价格变化超过 x% 时候变化
:param d: 交易日期
:param k: 价格
:param n: 系数
:return:
"""
x = round(n / 100, 2)
peer_i = 0
candidate_i = None
scan_i = 0
peers = [0]
z = np.zeros(len(k))
state = ZIG_STATE_START
while True:
scan_i += 1
if scan_i == len(k) - 1:
# 扫描到尾部
if candidate_i is None:
peer_i = scan_i
peers.append(peer_i)
else:
if state == ZIG_STATE_RISE:
if k[scan_i] >= k[candidate_i]:
print(d[scan_i], "1 --->>>", d[candidate_i])
peer_i = scan_i
peers.append(peer_i)
else:
peer_i = candidate_i
peers.append(peer_i)
peer_i = scan_i
peers.append(peer_i)
elif state == ZIG_STATE_FALL:
if k[scan_i] <= k[candidate_i]:
print(d[scan_i], "2 --->>>", d[candidate_i])
peer_i = scan_i
peers.append(peer_i)
else:
peer_i = candidate_i
peers.append(peer_i)
peer_i = scan_i
peers.append(peer_i)
break
if state == ZIG_STATE_START:
if k[scan_i] >= k[peer_i] * (1 + x):
print(d[scan_i], "3 --->>>", d[peer_i])
candidate_i = scan_i
state = ZIG_STATE_RISE
elif k[scan_i] <= k[peer_i] * (1 - x):
print(d[scan_i], "4 --->>>", d[peer_i])
candidate_i = scan_i
state = ZIG_STATE_FALL
elif state == ZIG_STATE_RISE:
if k[scan_i] >= k[candidate_i]:
candidate_i = scan_i
elif k[scan_i] <= k[candidate_i] * (1 - x):
print(d[scan_i], "5 --->>>", d[candidate_i])
peer_i = candidate_i
peers.append(peer_i)
state = ZIG_STATE_FALL
candidate_i = scan_i
elif state == ZIG_STATE_FALL:
if k[scan_i] <= k[candidate_i]:
print(d[scan_i], "6 --->>>", d[candidate_i])
candidate_i = scan_i
elif k[scan_i] >= k[candidate_i] * (1 + x):
print(d[scan_i], "7 --->>>", d[candidate_i])
peer_i = candidate_i
peers.append(peer_i)
state = ZIG_STATE_RISE
candidate_i = scan_i
for i in range(len(peers) - 1):
peer_start_i = peers[i]
peer_end_i = peers[i + 1]
start_value = k[peer_start_i]
end_value = k[peer_end_i]
a = (end_value - start_value) / (peer_end_i - peer_start_i) # 斜率
for j in range(peer_end_i - peer_start_i + 1):
z[j + peer_start_i] = start_value + a * j
return pd.Series(z), peers
def TROUGHBARS(z, p, m):
"""
m zig 波谷到当前的距离
:param z: zig 指标
:param p: zip 转折点
:param m: 系数
:return:
"""
trough_bars = np.zeros(len(z))
if len(z) > 3:
j = 1
# 判断第一个是谷还是峰 ,峰则取偶数,如果是谷,则取奇数
if z[0] > z[1]:
# 第一个是波谷
for i in range(len(p)):
peer = p[i]
j = i + m * 2
if 0 < i and len(p) > j and i % 2 == 1:
num = p[j] - peer - 1
trough_bars[p[j] - 1] = num
trough_bars[p[j]] = 1
if z[0] < z[1]:
# 第一个是波峰
for i in range(len(p)):
peer = p[i]
j = i + m * 2
if 0 < i and len(p) > j and i % 2 == 0:
num = p[j] - peer - 1
trough_bars[p[j] - 1] = num
trough_bars[p[j]] = 1
return pd.Series(trough_bars)
def CROSS(a, b):
"""
穿越信号
当a向上穿越b时标记1当a向下穿越b时标记-1没穿越标记0
:param obj:
:param ref:
:return:
"""
assert len(a) == len(b), '穿越信号输入维度不相等'
assert len(a) > 1, '穿越信号长度至少为2'
res = np.zeros(len(a))
for i in range(len(a) - 2, -1, -1):
if a[i + 1] <= b[i + 1] and a[i] > b[i] and a[i + 1] < a[i]:
# 向上穿越时标记1
res[i] = 1
elif a[i + 1] >= b[i + 1] and a[i] < b[i] and a[i + 1] > a[i]:
res[i] = -1
else:
res[i] = 0
# print(f"a+1 = {a[i + 1]}, b+1 = {b[i + 1]} , a = {a[i]} , b = {b[i]} , res = {res[i]}")
return pd.Series(res)
def _calc_slope(x):
return np.polyfit(range(len(x)), x, 1)[0]
def rolling_window(a, window):
shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
strides = a.values.strides + (a.values.strides[-1],)
return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
def SLOPE(series, n):
"""
SLOPE(X,N) 返回线性回归斜率,N支持变量
参考https://blog.csdn.net/luhouxiang/article/details/113816062
"""
a = rolling_window(series, n)
obj = np.array([_calc_slope(x) for x in a])
new_obj = np.pad(obj, (len(series) - len(obj), 0), 'constant', constant_values=(np.nan, np.nan))
return new_obj
def GOLD_MACD(df_data: pd.DataFrame):
"""
黄金MACD指标
:param df_data:
:return:
"""
df = df_data[::-1].reset_index(drop=True)
CLOSE = df["close"]
d = df["trade_date"]
MACD = (EMA(CLOSE, 30) - REF(EMA(CLOSE, 30), 1)) / REF(EMA(CLOSE, 30), 1) * 100
DIF = EMA(SUM(MACD, 2), 5)
buy_1 = DIF > REF(DIF, 1)
buy_2 = DIF < REF(DIF, 1)
DEA = MA(DIF, 5)
return pd.DataFrame(
{'code': df['ts_code'], 'date': d, 'MACD': MACD, 'DIF': DIF, 'DEA': DEA, 'buy1': buy_1, "buy2": buy_2})
def DJCPX(df_data: pd.DataFrame):
"""
顶级操盘线 指标
:param df_data:
:return:
"""
df = df_data[::-1].reset_index(drop=True)
k = df["close"]
d = df["trade_date"]
# print(f'{d[i]} -->> {buy_1[i]} -->> {buy_2[i]} -->> {B[i]}')
VAR_200 = round((100 - ((90 * (HHV(df["high"], 20) - df["close"])) / (
HHV(df["high"], 20) - LLV(df["low"], 20)))), 2)
VAR_300 = round((100 - MA(
((100 * (HHV(df["high"], 5) - df["close"])) / (HHV(df["high"], 5) - LLV(df["low"], 5))),
34)), 2)
VAR_300_MA_5 = MA(VAR_300, 5)
# F:IF(CROSS(VAR200,MA(VAR300,5)),LOW * 0.98,DRAWNULL),CROSSDOT,LINETHICK3,COLORFF00FF;
# CROSS 上穿函数 CROSS(A,B)表示当A从下方向上穿过B时返回1,否则返回0
F = np.zeros(df.shape[0])
VAR_CROSS = CROSS(VAR_200, VAR_300_MA_5)
for i in range(df.shape[0]):
if VAR_CROSS[i] == 1:
F[i] = round(df["low"][i] * 0.98, 2)
# 重心:=(C+0.618*REF(C,1)+0.382*REF(C,1)+0.236*REF(C,3)+0.146*REF(C,4))/2.382;
ZX = round((k + (0.618 * REF(k, 1)) + (0.382 * REF(k, 1)) + (0.236 * REF(k, 3)) + (
0.146 * REF(k, 4))) / 2.382, 2)
# 【操盘线】:EMA(((SLOPE(C,22)*20)+C),55),COLORYELLOW,LINETHICK4;
CPX = round(EMA(((SLOPE(k, 22) * 20) + k), 55), 2)
# 【黄金线】:IF(重心>=【操盘线】,【操盘线】,DRAWNULL),COLORRED,LINETHICK2;
HJX = np.zeros(df.shape[0])
# 【空仓线】:IF(重心<【操盘线】,【操盘线】,DRAWNULL),COLORCYAN,LINETHICK2;
KCX = np.zeros(df.shape[0])
for i in range(df.shape[0]):
if ZX[i] >= CPX[i]:
HJX[i] = CPX[i]
else:
KCX[i] = CPX[i]
return pd.DataFrame(
{'code': df['ts_code'], 'date': d, 'F': F, '黄金线': HJX, '空仓线': KCX})
def CCI(DF, n: int = 14):
TP = (DF['low'] + DF['high'] + DF['close']) / 3
MA = TP.rolling(window=n).mean()
MD = TP.rolling(window=n).apply(lambda x: abs(x - x.mean()).mean(), raw=False)
return round((TP - MA) / (0.015 * MD), 2)
def bullish(DF, N):
"""
多头指标N项的递增序列
:param DF:
:param N:
:return:
"""
return pd.Series.rolling(DF, N).apply(lambda x: x.is_monotonic_increasing)
def bearish(DF, N):
"""
空头指标N项的递减序列
:param DF:
:param N:
:return:
"""
return pd.Series.rolling(DF, N).apply(lambda x: x.is_monotonic_decreasing)
def OBV(c, v, M):
# diff 计算相邻元素的差值
change = np.diff(c)
# sign 用于获取数组元素的符号的函数,对于正数,返回 1对于负数返回 -1对于零返回 0
# hstack 用于水平(按列)连接数组的函数
sig = np.hstack([[1], np.sign(change)])
# cumsum 计算累积和的方法。它将给定数组中的元素逐个累加
obv_ = np.cumsum(v * sig)
OBV = pd.Series(obv_)
MAOBV = MA(OBV, M)
return pd.DataFrame({'OBV': OBV, 'MAOBV': MAOBV})
def OBV_PLUS(DF, M):
"""
OBV策略升级TODO 还没完成
1. 增加价格相距大的那一天成交量的权重这可以更突出上升趋势和下降趋势
2. 当天的成交量以一定比例加入OBV中而不是将全天的成交量全部加入OBV中
:param DF:
:param M:
:return:
"""
CLOSE = DF['close']
VOL = DF['vol']
ref = REF(CLOSE, 1)
var_total = 0
for index, row in DF.iterrows():
if np.isnan(ref[index]):
var_total += VOL[index]
continue
if row["close"] > ref[index]:
vol = VOL[index] * 1
elif row["close"] == ref[index]:
vol = 0
else:
vol = VOL[index] * -1
var_total += vol
def ASI(OPEN, CLOSE, HIGH, LOW, M1=26, M2=10):
"""
# 振动升降指标
:param OPEN:
:param CLOSE:
:param HIGH:
:param LOW:
:param M1:
:param M2:
:return:
"""
LC = REF(CLOSE, 1)
AA = ABS(HIGH - LC)
BB = ABS(LOW - LC)
CC = ABS(HIGH - REF(LOW, 1))
DD = ABS(LC - REF(OPEN, 1))
R = IF((AA > BB) & (AA > CC), AA + BB / 2 + DD / 4, IF((BB > CC) & (BB > AA), BB + AA / 2 + DD / 4, CC + DD / 4))
X = (CLOSE - LC + (CLOSE - OPEN) / 2 + LC - REF(OPEN, 1))
SI = 16 * X / R * MAX(AA, BB)
ASI = SUM(SI, M1)
ASIT = MA(ASI, M2)
return {'ASI': ASI, 'ASIT': ASIT}
def DMI(CLOSE, HIGH, LOW, M1=14, M2=6): # 动向指标:结果和同花顺,通达信完全一致
TR = SUM(MAX(MAX(HIGH - LOW, ABS(HIGH - REF(CLOSE, 1))), ABS(LOW - REF(CLOSE, 1))), M1)
HD = HIGH - REF(HIGH, 1)
LD = REF(LOW, 1) - LOW
DMP = SUM(IF((HD > 0) & (HD > LD), HD, 0), M1)
DMM = SUM(IF((LD > 0) & (LD > HD), LD, 0), M1)
PDI = (DMP * 100) / TR
MDI = (DMM * 100) / TR
ADX = MA(ABS(MDI - PDI) / (PDI + MDI) * 100, M2)
ADXR = (ADX + REF(ADX, M2)) / 2
return {'PDI': round(PDI.fillna(0), 2), 'MDI': round(MDI.fillna(0), 2), 'ADX': round(ADX.fillna(0), 2),
'ADXR': round(ADXR.fillna(0), 2)}
def RM_KDJ(C, H, L, N, M1, M2):
RSV = (C - LLV(L, N)) / (HHV(H, N) - LLV(L, N)) * 100
K = RM_SMA(RSV, M1, 1)
D = RM_SMA(K, M2, 1)
J = 3 * K - 2 * D
return pd.DataFrame({'KDJ_K': round(K, 2), 'KDJ_D': round(D, 2), 'KDJ_J': round(J, 2)})
def INTPART(number):
number = number.fillna(0)
return number.astype(int)
def JXNH(CLOSE, OPEN, VOL):
VAR1 = MA(CLOSE, 5)
VAR2 = MA(CLOSE, 10)
VAR3 = MA(CLOSE, 30)
VARB = SUM(CLOSE * VOL * 100, 28) / SUM(VOL * 100, 28)
VARC = INTPART(VARB * 100) / 100
VARD = EMA(CLOSE, 5) - EMA(CLOSE, 10)
VARE = EMA(VARD, 9)
VAR13 = REF(VARE, 1)
VAR14 = VARE
VAR15 = VAR14 - VAR13
VAR16 = REF(VARD, 1)
VAR17 = VARD
VAR18 = VAR17 - VAR16
VAR19 = OPEN
VAR1A = CLOSE
JXNH = (VAR19 <= VAR1) & \
(VAR19 <= VAR2) & \
(VAR19 <= VAR3) & \
(VAR1A >= VAR1) & \
(VAR1A >= VARC) & (VAR15 > 0) & (VAR18 > 0)
return pd.DataFrame({'JXNH': JXNH})