qnloft-spider/main_notify.py

96 lines
2.3 KiB
Python
Raw Permalink Normal View History

2023-12-28 09:44:24 +00:00
#!/usr/bin/env python3
# _*_ coding:utf-8 _*_
import json
import threading
import requests
# 原先的 print 函数和主线程的锁
_print = print
mutex = threading.Lock()
# 定义新的 print 函数
def print(text, *args, **kw):
"""
使输出有序进行不出现多线程同一时间输出导致错乱的问题
"""
with mutex:
_print(text, *args, **kw)
# 通知服务
# fmt: off
push_config = {
2023-12-29 02:14:40 +00:00
'PUSH_PLUS_TOKEN': 'beacff77d6e44db9a803ced07f3a11cd', # push+ 微信推送的用户令牌
'PUSHME_KEY': 'eQ5X030UxENFGVojO7tf', # PushMe 酱的 PUSHME_KEY
2023-12-28 09:44:24 +00:00
}
2024-01-01 15:42:45 +00:00
def pushme(title: str, content: str) -> None:
2023-12-28 09:44:24 +00:00
"""
2024-01-01 15:42:45 +00:00
使用 PushMe 推送消息
2023-12-28 09:44:24 +00:00
"""
2024-01-01 15:42:45 +00:00
if not push_config.get("PUSHME_KEY"):
print("PushMe 服务的 PUSHME_KEY 未设置!!\n取消推送")
2023-12-28 09:44:24 +00:00
return
2024-01-01 15:42:45 +00:00
print("PushMe 服务启动")
2023-12-28 09:44:24 +00:00
2024-01-01 15:42:45 +00:00
url = f'https://push.i-i.me/?push_key={push_config.get("PUSHME_KEY")}'
2023-12-28 09:44:24 +00:00
data = {
"title": title,
2024-01-01 15:42:45 +00:00
"content": content,
2023-12-28 09:44:24 +00:00
}
response = requests.post(url, data=data)
2024-01-01 15:42:45 +00:00
if response.status_code == 200 and response.text == "success":
print("PushMe 推送成功!")
2023-12-28 09:44:24 +00:00
else:
2024-01-01 15:42:45 +00:00
print(f"PushMe 推送失败!{response.status_code} {response.text}")
2023-12-28 09:44:24 +00:00
def pushplus_bot(title: str, content: str) -> None:
"""
通过 push+ 推送消息
"""
if not push_config.get("PUSH_PLUS_TOKEN"):
print("PUSHPLUS 服务的 PUSH_PLUS_TOKEN 未设置!!\n取消推送")
return
print("PUSHPLUS 服务启动")
url = "http://www.pushplus.plus/send"
data = {
"token": push_config.get("PUSH_PLUS_TOKEN"),
"title": title,
"content": content,
"topic": push_config.get("PUSH_PLUS_USER"),
}
body = json.dumps(data).encode(encoding="utf-8")
headers = {"Content-Type": "application/json"}
response = requests.post(url=url, data=body, headers=headers).json()
if response["code"] == 200:
print("PUSHPLUS 推送成功!")
else:
url_old = "http://pushplus.hxtrip.com/send"
headers["Accept"] = "application/json"
response = requests.post(url=url_old, data=body, headers=headers).json()
if response["code"] == 200:
print("PUSHPLUS(hxtrip) 推送成功!")
else:
print("PUSHPLUS 推送失败!")
def main():
2023-12-29 02:14:40 +00:00
# send("title", "content")
2024-01-01 15:42:45 +00:00
# pushme("再测试一下", "这个是在关闭后台时候推送的看看能收到吗22222")
# pushplus_bot("", "")
pass
2023-12-28 09:44:24 +00:00
if __name__ == "__main__":
main()