diff --git a/jd_wskey.py b/jd_wskey.py index f0a71ad..1b973e3 100644 --- a/jd_wskey.py +++ b/jd_wskey.py @@ -1,20 +1,20 @@ # -*- coding: utf-8 -* ''' -定时自定义 -2 10 20 5 * jd_wskey.py new Env('wskey转换'); ''' - -import socket import base64 -import json -import os -import sys -import logging -import time -import re +import hashlib import hmac +import json +import logging +import os +import random +import re +import socket import struct +import sys +import time +import uuid WSKEY_MODE = 0 # 0 = Default / 1 = Debug! @@ -40,7 +40,7 @@ except Exception as err: logger.debug(str(err)) # 调试日志输出 logger.info("无推送文件") # 标准日志输出 -ver = 31207 # 版本号 +ver = 40904 # 版本号 def ttotp(key): @@ -52,6 +52,97 @@ def ttotp(key): return str(binary)[-6:].zfill(6) +def sign_core(par): + arr = [0x37, 0x92, 0x44, 0x68, 0xA5, 0x3D, 0xCC, 0x7F, 0xBB, 0xF, 0xD9, 0x88, 0xEE, 0x9A, 0xE9, 0x5A] + key2 = b"80306f4370b39fd5630ad0529f77adb6" + arr1 = [0 for _ in range(len(par))] + for i in range(len(par)): + r0 = int(par[i]) + r2 = arr[i & 0xf] + r4 = int(key2[i & 7]) + r0 = r2 ^ r0 + r0 = r0 ^ r4 + r0 = r0 + r2 + r2 = r2 ^ r0 + r1 = int(key2[i & 7]) + r2 = r2 ^ r1 + arr1[i] = r2 & 0xff + return bytes(arr1) + +def get_sign(functionId, body, uuid, client, clientVersion, st, sv): + all_arg = "functionId=%s&body=%s&uuid=%s&client=%s&clientVersion=%s&st=%s&sv=%s" % ( + functionId, body, uuid, client, clientVersion, st, sv) + ret_bytes = sign_core(str.encode(all_arg)) + info = hashlib.md5(base64.b64encode(ret_bytes)).hexdigest() + return info + +def base64Encode(string): + string1 = "KLMNOPQRSTABCDEFGHIJUVWXYZabcdopqrstuvwxefghijklmnyz0123456789+/" + string2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/" + return base64.b64encode(string.encode("utf-8")).decode('utf-8').translate(str.maketrans(string1, string2)) + +def base64Decode(string): + string1 = "KLMNOPQRSTABCDEFGHIJUVWXYZabcdopqrstuvwxefghijklmnyz0123456789+/" + string2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/" + stringbase = base64.b64decode(string.translate(str.maketrans(string1, string2))).decode('utf-8') + return stringbase + +def genJDUA(): + st = round(time.time() * 1000) + aid = base64Encode(''.join(str(uuid.uuid4()).split('-'))[16:]) + oaid = base64Encode(''.join(str(uuid.uuid4()).split('-'))[16:]) + ua = 'jdapp;android;11.1.4;;;appBuild/98176;ef/1;ep/{"hdid":"JM9F1ywUPwflvMIpYPok0tt5k9kW4ArJEU3lfLhxBqw=","ts":%s,"ridx":-1,"cipher":{"sv":"CJS=","ad":"%s","od":"%s","ov":"CzO=","ud":"%s"},"ciphertype":5,"version":"1.2.0","appname":"com.jingdong.app.mall"};Mozilla/5.0 (Linux; Android 12; M2102K1C Build/SKQ1.220303.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/97.0.4692.98 Mobile Safari/537.36' % (st, aid, oaid, aid) + return ua + +def genParams(): + suid = ''.join(str(uuid.uuid4()).split('-'))[16:] + buid = base64Encode(suid) + st = round(time.time() * 1000) + sv = random.choice(["102", "111", "120"]) + ep = json.dumps({ + "hdid": "JM9F1ywUPwflvMIpYPok0tt5k9kW4ArJEU3lfLhxBqw=", + "ts": st, + "ridx": -1, + "cipher": { + "area": "CV8yEJUzXzU0CNG0XzK=", + "d_model": "JWunCVVidRTr", + "wifiBssid": "dW5hbw93bq==", + "osVersion": "CJS=", + "d_brand": "WQvrb21f", + "screen": "CJuyCMenCNq=", + "uuid": buid, + "aid": buid, + "openudid": buid + }, + "ciphertype": 5, + "version": "1.2.0", + "appname": "com.jingdong.app.mall" + }).replace(" ", "") + body = '{"to":"https%3a%2f%2fplogin.m.jd.com%2fjd-mlogin%2fstatic%2fhtml%2fappjmp_blank.html"}' + sign = get_sign("genToken", body, suid, "android", "11.1.4", st, sv) + params = { + 'functionId': 'genToken', + 'clientVersion': '11.1.4', + 'build': '98176', + 'client': 'android', + 'partner': 'google', + 'oaid': suid, + 'sdkVersion': '31', + 'lang': 'zh_CN', + 'harmonyOs': '0', + 'networkType': 'UNKNOWN', + 'uemps': '0-2', + 'ext': '{"prstate": "0", "pvcStu": "1"}', + 'eid': 'eidAcef08121fds9MoeSDdMRQ1aUTyb1TyPr2zKHk5Asiauw+K/WvS1Ben1cH6N0UnBd7lNM50XEa2kfCcA2wwThkxZc1MuCNtfU/oAMGBqadgres4BU', + 'ef': '1', + 'ep': ep, + 'st': st, + 'sign': sign, + 'sv': sv + } + return params + + def ql_send(text): if "WSKEY_SEND" in os.environ and os.environ["WSKEY_SEND"] == 'disable': return True @@ -237,7 +328,7 @@ def check_ck(ck) -> bool: # 方法 检查 Cookie有效性 使用变量传递 headers = { 'Cookie': ck, 'Referer': 'https://home.m.jd.com/myJd/home.action', - 'user-agent': ua + 'user-agent': genJDUA() } # 设置 HTTP头 try: res = requests.get(url=url, headers=headers, verify=False, timeout=10, @@ -267,21 +358,22 @@ def check_ck(ck) -> bool: # 方法 检查 Cookie有效性 使用变量传递 # 返回值 bool jd_ck def getToken(wskey): # 方法 获取 Wskey转换使用的 Token 由 JD_API 返回 这里传递 wskey - try: - url = str(base64.b64decode(url_t).decode()) + 'api/genToken' # 设置云端服务器地址 路由为 genToken - header = {"User-Agent": ua} # 设置 HTTP头 - params = requests.get(url=url, headers=header, verify=False, timeout=20).json() # 设置 HTTP请求参数 超时 20秒 Json解析 - except Exception as err: - logger.info("Params参数获取失败") # 标准日志输出 - logger.debug(str(err)) # 调试日志输出 - # return False, wskey # 返回 -> False[Bool], Wskey - return False # 返回 -> False[Bool], Wskey + # try: + # url = str(base64.b64decode(url_t).decode()) + 'api/genToken' # 设置云端服务器地址 路由为 genToken + # header = {"User-Agent": ua} # 设置 HTTP头 + # params = requests.get(url=url, headers=header, verify=False, timeout=20).json() # 设置 HTTP请求参数 超时 20秒 Json解析 + # except Exception as err: + # logger.info("Params参数获取失败") # 标准日志输出 + # logger.debug(str(err)) # 调试日志输出 + # # return False, wskey # 返回 -> False[Bool], Wskey + # return False # 返回 -> False[Bool], Wskey + params = genParams() headers = { 'cookie': wskey, 'content-type': 'application/x-www-form-urlencoded; charset=UTF-8', 'charset': 'UTF-8', 'accept-encoding': 'br,gzip,deflate', - 'user-agent': ua + 'user-agent': genJDUA() } # 设置 HTTP头 url = 'https://api.m.jd.com/client.action' # 设置 URL地址 data = 'body=%7B%22to%22%3A%22https%253a%252f%252fplogin.m.jd.com%252fjd-mlogin%252fstatic%252fhtml%252fappjmp_blank.html%22%7D&' # 设置 POST 载荷 @@ -307,7 +399,7 @@ def appjmp(wskey, tokenKey): # 方法 传递 wskey & tokenKey # return False, wskey # 返回 -> False[Bool], Wskey return False # 返回 -> False[Bool], Wskey headers = { - 'User-Agent': ua, + 'User-Agent': genJDUA(), 'accept': 'accept:text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', 'x-requested-with': 'com.jingdong.app.mall' } # 设置 HTTP头 @@ -476,52 +568,52 @@ def ql_insert(i_ck): # 方法 插入新变量 logger.info("\n账号添加失败\n--------------------\n") # 标准日志输出 -def cloud_info(): # 方法 云端信息 - url = str(base64.b64decode(url_t).decode()) + 'api/check_api' # 设置 URL地址 路由 [check_api] - for i in range(3): # For循环 3次 - try: - headers = {"authorization": "Bearer Shizuku"} # 设置 HTTP头 - res = requests.get(url=url, verify=False, headers=headers, timeout=20).text # HTTP[GET] 请求 超时 20秒 - except requests.exceptions.ConnectTimeout: - logger.info("\n获取云端参数超时, 正在重试!" + str(i)) # 标准日志输出 - time.sleep(1) # 休眠 1秒 - continue # 循环继续 - except requests.exceptions.ReadTimeout: - logger.info("\n获取云端参数超时, 正在重试!" + str(i)) # 标准日志输出 - time.sleep(1) # 休眠 1秒 - continue # 循环继续 - except Exception as err: - logger.info("\n未知错误云端, 退出脚本!") # 标准日志输出 - logger.debug(str(err)) # 调试日志输出 - sys.exit(1) # 脚本退出 - else: - try: - c_info = json.loads(res) # json读取参数 - except Exception as err: - logger.info("云端参数解析失败") # 标准日志输出 - logger.debug(str(err)) # 调试日志输出 - sys.exit(1) # 脚本退出 - else: - return c_info # 返回 -> c_info +# def cloud_info(): # 方法 云端信息 +# url = str(base64.b64decode(url_t).decode()) + 'api/check_api' # 设置 URL地址 路由 [check_api] +# for i in range(3): # For循环 3次 +# try: +# headers = {"authorization": "Bearer Shizuku"} # 设置 HTTP头 +# res = requests.get(url=url, verify=False, headers=headers, timeout=20).text # HTTP[GET] 请求 超时 20秒 +# except requests.exceptions.ConnectTimeout: +# logger.info("\n获取云端参数超时, 正在重试!" + str(i)) # 标准日志输出 +# time.sleep(1) # 休眠 1秒 +# continue # 循环继续 +# except requests.exceptions.ReadTimeout: +# logger.info("\n获取云端参数超时, 正在重试!" + str(i)) # 标准日志输出 +# time.sleep(1) # 休眠 1秒 +# continue # 循环继续 +# except Exception as err: +# logger.info("\n未知错误云端, 退出脚本!") # 标准日志输出 +# logger.debug(str(err)) # 调试日志输出 +# sys.exit(1) # 脚本退出 +# else: +# try: +# c_info = json.loads(res) # json读取参数 +# except Exception as err: +# logger.info("云端参数解析失败") # 标准日志输出 +# logger.debug(str(err)) # 调试日志输出 +# sys.exit(1) # 脚本退出 +# else: +# return c_info # 返回 -> c_info -def check_cloud(): - url_list = ['aHR0cHM6Ly9hcGkubW9tb2UubGluay8=', 'aHR0cHM6Ly9hcGkubGltb2UuZXUub3JnLw==', - 'aHR0cHM6Ly9hcGkuaWxpeWEuY2Yv'] - for i in url_list: - url = str(base64.b64decode(i).decode()) # 设置 url地址 [str] - try: - requests.get(url=url, verify=False, timeout=10) # HTTP[GET]请求 超时 10秒 - except Exception as err: - logger.debug(str(err)) # 调试日志输出 - continue # 循环继续 - else: # 分支判断 - info = ['HTTPS', 'Eu_HTTPS', 'CloudFlare'] # 输出信息[List] - logger.info(str(info[url_list.index(i)]) + " Server Check OK\n--------------------\n") # 标准日志输出 - return i # 返回 ->i - logger.info("\n云端地址全部失效, 请检查网络!") # 标准日志输出 - ql_send('云端地址失效. 请联系作者或者检查网络.') # 推送消息 - sys.exit(1) # 脚本退出 +# def check_cloud(): +# url_list = ['aHR0cHM6Ly9hcGkubW9tb2UubGluay8=', 'aHR0cHM6Ly9hcGkubGltb2UuZXUub3JnLw==', +# 'aHR0cHM6Ly9hcGkuaWxpeWEuY2Yv'] +# for i in url_list: +# url = str(base64.b64decode(i).decode()) # 设置 url地址 [str] +# try: +# requests.get(url=url, verify=False, timeout=10) # HTTP[GET]请求 超时 10秒 +# except Exception as err: +# logger.debug(str(err)) # 调试日志输出 +# continue # 循环继续 +# else: # 分支判断 +# info = ['HTTPS', 'Eu_HTTPS', 'CloudFlare'] # 输出信息[List] +# logger.info(str(info[url_list.index(i)]) + " Server Check OK\n--------------------\n") # 标准日志输出 +# return i # 返回 ->i +# logger.info("\n云端地址全部失效, 请检查网络!") # 标准日志输出 +# ql_send('云端地址失效. 请联系作者或者检查网络.') # 推送消息 +# sys.exit(1) # 脚本退出 def check_port(): # 方法 检查变量传递端口 @@ -543,10 +635,10 @@ if __name__ == '__main__': # Python主函数执行入口 ql_session = requests.session() token = ql_login() # 调用方法 [ql_login] 并赋值 [token] ql_id = check_id() - url_t = check_cloud() - cloud_arg = cloud_info() - update() - ua = cloud_arg['User-Agent'] + # url_t = check_cloud() + # cloud_arg = cloud_info() + # update() + # ua = cloud_arg['User-Agent'] wslist = get_wskey() envlist = get_env() sleepTime = int(os.environ.get("WSKEY_SLEEP", "10") if str(os.environ.get("WSKEY_SLEEP")).isdigit() else "10")