96 lines
2.3 KiB
Python
96 lines
2.3 KiB
Python
#!/usr/bin/env python3
|
||
# _*_ coding:utf-8 _*_
|
||
import json
|
||
import threading
|
||
|
||
import requests
|
||
|
||
# 原先的 print 函数和主线程的锁
|
||
_print = print
|
||
mutex = threading.Lock()
|
||
|
||
|
||
# 定义新的 print 函数
|
||
def print(text, *args, **kw):
|
||
"""
|
||
使输出有序进行,不出现多线程同一时间输出导致错乱的问题。
|
||
"""
|
||
with mutex:
|
||
_print(text, *args, **kw)
|
||
|
||
|
||
# 通知服务
|
||
# fmt: off
|
||
push_config = {
|
||
'PUSH_PLUS_TOKEN': 'beacff77d6e44db9a803ced07f3a11cd', # push+ 微信推送的用户令牌
|
||
'PUSHME_KEY': 'eQ5X030UxENFGVojO7tf', # PushMe 酱的 PUSHME_KEY
|
||
}
|
||
|
||
|
||
def pushme(title: str, content: str) -> None:
|
||
"""
|
||
使用 PushMe 推送消息。
|
||
"""
|
||
if not push_config.get("PUSHME_KEY"):
|
||
print("PushMe 服务的 PUSHME_KEY 未设置!!\n取消推送")
|
||
return
|
||
print("PushMe 服务启动")
|
||
|
||
url = f'https://push.i-i.me/?push_key={push_config.get("PUSHME_KEY")}'
|
||
data = {
|
||
"title": title,
|
||
"content": content,
|
||
}
|
||
response = requests.post(url, data=data)
|
||
|
||
if response.status_code == 200 and response.text == "success":
|
||
print("PushMe 推送成功!")
|
||
else:
|
||
print(f"PushMe 推送失败!{response.status_code} {response.text}")
|
||
|
||
|
||
def pushplus_bot(title: str, content: str) -> None:
|
||
"""
|
||
通过 push+ 推送消息。
|
||
"""
|
||
if not push_config.get("PUSH_PLUS_TOKEN"):
|
||
print("PUSHPLUS 服务的 PUSH_PLUS_TOKEN 未设置!!\n取消推送")
|
||
return
|
||
print("PUSHPLUS 服务启动")
|
||
|
||
url = "http://www.pushplus.plus/send"
|
||
data = {
|
||
"token": push_config.get("PUSH_PLUS_TOKEN"),
|
||
"title": title,
|
||
"content": content,
|
||
"topic": push_config.get("PUSH_PLUS_USER"),
|
||
}
|
||
body = json.dumps(data).encode(encoding="utf-8")
|
||
headers = {"Content-Type": "application/json"}
|
||
response = requests.post(url=url, data=body, headers=headers).json()
|
||
|
||
if response["code"] == 200:
|
||
print("PUSHPLUS 推送成功!")
|
||
|
||
else:
|
||
url_old = "http://pushplus.hxtrip.com/send"
|
||
headers["Accept"] = "application/json"
|
||
response = requests.post(url=url_old, data=body, headers=headers).json()
|
||
|
||
if response["code"] == 200:
|
||
print("PUSHPLUS(hxtrip) 推送成功!")
|
||
|
||
else:
|
||
print("PUSHPLUS 推送失败!")
|
||
|
||
|
||
def main():
|
||
# send("title", "content")
|
||
# pushme("再测试一下", "这个是在关闭后台时候推送的!看看能收到吗?22222")
|
||
# pushplus_bot("", "")
|
||
pass
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|