#!/usr/bin/env python3 # _*_ coding:utf-8 _*_ import json import threading import requests # 原先的 print 函数和主线程的锁 _print = print mutex = threading.Lock() # 定义新的 print 函数 def print(text, *args, **kw): """ 使输出有序进行,不出现多线程同一时间输出导致错乱的问题。 """ with mutex: _print(text, *args, **kw) # 通知服务 # fmt: off push_config = { 'PUSH_PLUS_TOKEN': 'beacff77d6e44db9a803ced07f3a11cd', # push+ 微信推送的用户令牌 'PUSHME_KEY': 'eQ5X030UxENFGVojO7tf', # PushMe 酱的 PUSHME_KEY } def pushme(title: str, content: str) -> None: """ 使用 PushMe 推送消息。 """ if not push_config.get("PUSHME_KEY"): print("PushMe 服务的 PUSHME_KEY 未设置!!\n取消推送") return print("PushMe 服务启动") url = f'https://push.i-i.me/?push_key={push_config.get("PUSHME_KEY")}' data = { "title": title, "content": content, } response = requests.post(url, data=data) if response.status_code == 200 and response.text == "success": print("PushMe 推送成功!") else: print(f"PushMe 推送失败!{response.status_code} {response.text}") def pushplus_bot(title: str, content: str) -> None: """ 通过 push+ 推送消息。 """ if not push_config.get("PUSH_PLUS_TOKEN"): print("PUSHPLUS 服务的 PUSH_PLUS_TOKEN 未设置!!\n取消推送") return print("PUSHPLUS 服务启动") url = "http://www.pushplus.plus/send" data = { "token": push_config.get("PUSH_PLUS_TOKEN"), "title": title, "content": content, "topic": push_config.get("PUSH_PLUS_USER"), } body = json.dumps(data).encode(encoding="utf-8") headers = {"Content-Type": "application/json"} response = requests.post(url=url, data=body, headers=headers).json() if response["code"] == 200: print("PUSHPLUS 推送成功!") else: url_old = "http://pushplus.hxtrip.com/send" headers["Accept"] = "application/json" response = requests.post(url=url_old, data=body, headers=headers).json() if response["code"] == 200: print("PUSHPLUS(hxtrip) 推送成功!") else: print("PUSHPLUS 推送失败!") def main(): # send("title", "content") # pushme("再测试一下", "这个是在关闭后台时候推送的!看看能收到吗?22222") # pushplus_bot("", "") pass if __name__ == "__main__": main()