This commit is contained in:
2024 2024-09-04 19:58:58 +08:00
parent 10c7b35b6b
commit a52c77b16e
1 changed files with 163 additions and 71 deletions

View File

@ -1,20 +1,20 @@
# -*- coding: utf-8 -*
'''
定时自定义
2 10 20 5 * jd_wskey.py
new Env('wskey转换');
'''
import socket
import base64
import json
import os
import sys
import logging
import time
import re
import hashlib
import hmac
import json
import logging
import os
import random
import re
import socket
import struct
import sys
import time
import uuid
WSKEY_MODE = 0
# 0 = Default / 1 = Debug!
@ -40,7 +40,7 @@ except Exception as err:
logger.debug(str(err)) # 调试日志输出
logger.info("无推送文件") # 标准日志输出
ver = 31207 # 版本号
ver = 40904 # 版本号
def ttotp(key):
@ -52,6 +52,97 @@ def ttotp(key):
return str(binary)[-6:].zfill(6)
def sign_core(par):
arr = [0x37, 0x92, 0x44, 0x68, 0xA5, 0x3D, 0xCC, 0x7F, 0xBB, 0xF, 0xD9, 0x88, 0xEE, 0x9A, 0xE9, 0x5A]
key2 = b"80306f4370b39fd5630ad0529f77adb6"
arr1 = [0 for _ in range(len(par))]
for i in range(len(par)):
r0 = int(par[i])
r2 = arr[i & 0xf]
r4 = int(key2[i & 7])
r0 = r2 ^ r0
r0 = r0 ^ r4
r0 = r0 + r2
r2 = r2 ^ r0
r1 = int(key2[i & 7])
r2 = r2 ^ r1
arr1[i] = r2 & 0xff
return bytes(arr1)
def get_sign(functionId, body, uuid, client, clientVersion, st, sv):
all_arg = "functionId=%s&body=%s&uuid=%s&client=%s&clientVersion=%s&st=%s&sv=%s" % (
functionId, body, uuid, client, clientVersion, st, sv)
ret_bytes = sign_core(str.encode(all_arg))
info = hashlib.md5(base64.b64encode(ret_bytes)).hexdigest()
return info
def base64Encode(string):
string1 = "KLMNOPQRSTABCDEFGHIJUVWXYZabcdopqrstuvwxefghijklmnyz0123456789+/"
string2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
return base64.b64encode(string.encode("utf-8")).decode('utf-8').translate(str.maketrans(string1, string2))
def base64Decode(string):
string1 = "KLMNOPQRSTABCDEFGHIJUVWXYZabcdopqrstuvwxefghijklmnyz0123456789+/"
string2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
stringbase = base64.b64decode(string.translate(str.maketrans(string1, string2))).decode('utf-8')
return stringbase
def genJDUA():
st = round(time.time() * 1000)
aid = base64Encode(''.join(str(uuid.uuid4()).split('-'))[16:])
oaid = base64Encode(''.join(str(uuid.uuid4()).split('-'))[16:])
ua = 'jdapp;android;11.1.4;;;appBuild/98176;ef/1;ep/{"hdid":"JM9F1ywUPwflvMIpYPok0tt5k9kW4ArJEU3lfLhxBqw=","ts":%s,"ridx":-1,"cipher":{"sv":"CJS=","ad":"%s","od":"%s","ov":"CzO=","ud":"%s"},"ciphertype":5,"version":"1.2.0","appname":"com.jingdong.app.mall"};Mozilla/5.0 (Linux; Android 12; M2102K1C Build/SKQ1.220303.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/97.0.4692.98 Mobile Safari/537.36' % (st, aid, oaid, aid)
return ua
def genParams():
suid = ''.join(str(uuid.uuid4()).split('-'))[16:]
buid = base64Encode(suid)
st = round(time.time() * 1000)
sv = random.choice(["102", "111", "120"])
ep = json.dumps({
"hdid": "JM9F1ywUPwflvMIpYPok0tt5k9kW4ArJEU3lfLhxBqw=",
"ts": st,
"ridx": -1,
"cipher": {
"area": "CV8yEJUzXzU0CNG0XzK=",
"d_model": "JWunCVVidRTr",
"wifiBssid": "dW5hbw93bq==",
"osVersion": "CJS=",
"d_brand": "WQvrb21f",
"screen": "CJuyCMenCNq=",
"uuid": buid,
"aid": buid,
"openudid": buid
},
"ciphertype": 5,
"version": "1.2.0",
"appname": "com.jingdong.app.mall"
}).replace(" ", "")
body = '{"to":"https%3a%2f%2fplogin.m.jd.com%2fjd-mlogin%2fstatic%2fhtml%2fappjmp_blank.html"}'
sign = get_sign("genToken", body, suid, "android", "11.1.4", st, sv)
params = {
'functionId': 'genToken',
'clientVersion': '11.1.4',
'build': '98176',
'client': 'android',
'partner': 'google',
'oaid': suid,
'sdkVersion': '31',
'lang': 'zh_CN',
'harmonyOs': '0',
'networkType': 'UNKNOWN',
'uemps': '0-2',
'ext': '{"prstate": "0", "pvcStu": "1"}',
'eid': 'eidAcef08121fds9MoeSDdMRQ1aUTyb1TyPr2zKHk5Asiauw+K/WvS1Ben1cH6N0UnBd7lNM50XEa2kfCcA2wwThkxZc1MuCNtfU/oAMGBqadgres4BU',
'ef': '1',
'ep': ep,
'st': st,
'sign': sign,
'sv': sv
}
return params
def ql_send(text):
if "WSKEY_SEND" in os.environ and os.environ["WSKEY_SEND"] == 'disable':
return True
@ -237,7 +328,7 @@ def check_ck(ck) -> bool: # 方法 检查 Cookie有效性 使用变量传递
headers = {
'Cookie': ck,
'Referer': 'https://home.m.jd.com/myJd/home.action',
'user-agent': ua
'user-agent': genJDUA()
} # 设置 HTTP头
try:
res = requests.get(url=url, headers=headers, verify=False, timeout=10,
@ -267,21 +358,22 @@ def check_ck(ck) -> bool: # 方法 检查 Cookie有效性 使用变量传递
# 返回值 bool jd_ck
def getToken(wskey): # 方法 获取 Wskey转换使用的 Token 由 JD_API 返回 这里传递 wskey
try:
url = str(base64.b64decode(url_t).decode()) + 'api/genToken' # 设置云端服务器地址 路由为 genToken
header = {"User-Agent": ua} # 设置 HTTP头
params = requests.get(url=url, headers=header, verify=False, timeout=20).json() # 设置 HTTP请求参数 超时 20秒 Json解析
except Exception as err:
logger.info("Params参数获取失败") # 标准日志输出
logger.debug(str(err)) # 调试日志输出
# return False, wskey # 返回 -> False[Bool], Wskey
return False # 返回 -> False[Bool], Wskey
# try:
# url = str(base64.b64decode(url_t).decode()) + 'api/genToken' # 设置云端服务器地址 路由为 genToken
# header = {"User-Agent": ua} # 设置 HTTP头
# params = requests.get(url=url, headers=header, verify=False, timeout=20).json() # 设置 HTTP请求参数 超时 20秒 Json解析
# except Exception as err:
# logger.info("Params参数获取失败") # 标准日志输出
# logger.debug(str(err)) # 调试日志输出
# # return False, wskey # 返回 -> False[Bool], Wskey
# return False # 返回 -> False[Bool], Wskey
params = genParams()
headers = {
'cookie': wskey,
'content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
'charset': 'UTF-8',
'accept-encoding': 'br,gzip,deflate',
'user-agent': ua
'user-agent': genJDUA()
} # 设置 HTTP头
url = 'https://api.m.jd.com/client.action' # 设置 URL地址
data = 'body=%7B%22to%22%3A%22https%253a%252f%252fplogin.m.jd.com%252fjd-mlogin%252fstatic%252fhtml%252fappjmp_blank.html%22%7D&' # 设置 POST 载荷
@ -307,7 +399,7 @@ def appjmp(wskey, tokenKey): # 方法 传递 wskey & tokenKey
# return False, wskey # 返回 -> False[Bool], Wskey
return False # 返回 -> False[Bool], Wskey
headers = {
'User-Agent': ua,
'User-Agent': genJDUA(),
'accept': 'accept:text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'x-requested-with': 'com.jingdong.app.mall'
} # 设置 HTTP头
@ -476,52 +568,52 @@ def ql_insert(i_ck): # 方法 插入新变量
logger.info("\n账号添加失败\n--------------------\n") # 标准日志输出
def cloud_info(): # 方法 云端信息
url = str(base64.b64decode(url_t).decode()) + 'api/check_api' # 设置 URL地址 路由 [check_api]
for i in range(3): # For循环 3次
try:
headers = {"authorization": "Bearer Shizuku"} # 设置 HTTP头
res = requests.get(url=url, verify=False, headers=headers, timeout=20).text # HTTP[GET] 请求 超时 20秒
except requests.exceptions.ConnectTimeout:
logger.info("\n获取云端参数超时, 正在重试!" + str(i)) # 标准日志输出
time.sleep(1) # 休眠 1秒
continue # 循环继续
except requests.exceptions.ReadTimeout:
logger.info("\n获取云端参数超时, 正在重试!" + str(i)) # 标准日志输出
time.sleep(1) # 休眠 1秒
continue # 循环继续
except Exception as err:
logger.info("\n未知错误云端, 退出脚本!") # 标准日志输出
logger.debug(str(err)) # 调试日志输出
sys.exit(1) # 脚本退出
else:
try:
c_info = json.loads(res) # json读取参数
except Exception as err:
logger.info("云端参数解析失败") # 标准日志输出
logger.debug(str(err)) # 调试日志输出
sys.exit(1) # 脚本退出
else:
return c_info # 返回 -> c_info
# def cloud_info(): # 方法 云端信息
# url = str(base64.b64decode(url_t).decode()) + 'api/check_api' # 设置 URL地址 路由 [check_api]
# for i in range(3): # For循环 3次
# try:
# headers = {"authorization": "Bearer Shizuku"} # 设置 HTTP头
# res = requests.get(url=url, verify=False, headers=headers, timeout=20).text # HTTP[GET] 请求 超时 20秒
# except requests.exceptions.ConnectTimeout:
# logger.info("\n获取云端参数超时, 正在重试!" + str(i)) # 标准日志输出
# time.sleep(1) # 休眠 1秒
# continue # 循环继续
# except requests.exceptions.ReadTimeout:
# logger.info("\n获取云端参数超时, 正在重试!" + str(i)) # 标准日志输出
# time.sleep(1) # 休眠 1秒
# continue # 循环继续
# except Exception as err:
# logger.info("\n未知错误云端, 退出脚本!") # 标准日志输出
# logger.debug(str(err)) # 调试日志输出
# sys.exit(1) # 脚本退出
# else:
# try:
# c_info = json.loads(res) # json读取参数
# except Exception as err:
# logger.info("云端参数解析失败") # 标准日志输出
# logger.debug(str(err)) # 调试日志输出
# sys.exit(1) # 脚本退出
# else:
# return c_info # 返回 -> c_info
def check_cloud():
url_list = ['aHR0cHM6Ly9hcGkubW9tb2UubGluay8=', 'aHR0cHM6Ly9hcGkubGltb2UuZXUub3JnLw==',
'aHR0cHM6Ly9hcGkuaWxpeWEuY2Yv']
for i in url_list:
url = str(base64.b64decode(i).decode()) # 设置 url地址 [str]
try:
requests.get(url=url, verify=False, timeout=10) # HTTP[GET]请求 超时 10秒
except Exception as err:
logger.debug(str(err)) # 调试日志输出
continue # 循环继续
else: # 分支判断
info = ['HTTPS', 'Eu_HTTPS', 'CloudFlare'] # 输出信息[List]
logger.info(str(info[url_list.index(i)]) + " Server Check OK\n--------------------\n") # 标准日志输出
return i # 返回 ->i
logger.info("\n云端地址全部失效, 请检查网络!") # 标准日志输出
ql_send('云端地址失效. 请联系作者或者检查网络.') # 推送消息
sys.exit(1) # 脚本退出
# def check_cloud():
# url_list = ['aHR0cHM6Ly9hcGkubW9tb2UubGluay8=', 'aHR0cHM6Ly9hcGkubGltb2UuZXUub3JnLw==',
# 'aHR0cHM6Ly9hcGkuaWxpeWEuY2Yv']
# for i in url_list:
# url = str(base64.b64decode(i).decode()) # 设置 url地址 [str]
# try:
# requests.get(url=url, verify=False, timeout=10) # HTTP[GET]请求 超时 10秒
# except Exception as err:
# logger.debug(str(err)) # 调试日志输出
# continue # 循环继续
# else: # 分支判断
# info = ['HTTPS', 'Eu_HTTPS', 'CloudFlare'] # 输出信息[List]
# logger.info(str(info[url_list.index(i)]) + " Server Check OK\n--------------------\n") # 标准日志输出
# return i # 返回 ->i
# logger.info("\n云端地址全部失效, 请检查网络!") # 标准日志输出
# ql_send('云端地址失效. 请联系作者或者检查网络.') # 推送消息
# sys.exit(1) # 脚本退出
def check_port(): # 方法 检查变量传递端口
@ -543,10 +635,10 @@ if __name__ == '__main__': # Python主函数执行入口
ql_session = requests.session()
token = ql_login() # 调用方法 [ql_login] 并赋值 [token]
ql_id = check_id()
url_t = check_cloud()
cloud_arg = cloud_info()
update()
ua = cloud_arg['User-Agent']
# url_t = check_cloud()
# cloud_arg = cloud_info()
# update()
# ua = cloud_arg['User-Agent']
wslist = get_wskey()
envlist = get_env()
sleepTime = int(os.environ.get("WSKEY_SLEEP", "10") if str(os.environ.get("WSKEY_SLEEP")).isdigit() else "10")