MediaCrawler/recv_sms_notification.py

87 lines
2.9 KiB
Python
Raw Normal View History

# Start an HTTP server to receive SMS forwarding notifications and store them in Redis.
import re
import json
import asyncio
from typing import List
import aioredis
import tornado.web
import config
def extract_verification_code(message) -> str:
"""
Extract verification code of 6 digits from the SMS.
"""
pattern = re.compile(r'\b[0-9]{6}\b')
codes: List[str]= pattern.findall(message)
return codes[0] if codes and len(codes) > 0 else ""
class RecvSmsNotificationHandler(tornado.web.RequestHandler):
async def get(self):
self.set_status(404)
self.write("404")
async def post(self):
# GitHub address for the SMS forwarding functionhttps://github.com/pppscn/SmsForwarder
# Document address:https://gitee.com/pp/SmsForwarder/wikis/pages?sort_id=6040999&doc_id=1821427
# Forwarding channel definition
# {
# "platform": "xhs",
# "current_number": "138xxxxxxxx",
# "from_number": "[from]",
# "sms_content": "[org_content]",
# "timestamp": "[timestamp]"
# }
# SMS message body
# {
# 'platform': 'xhs', # or dy
# 'current_number': '138xxxxxxxx',
# 'from_number': '1069421xxx134',
# 'sms_content': '【小红书】您的验证码是: 171959 3分钟内有效。请勿向他人泄漏。如非本人操作可忽略本消息。',
# 'timestamp': '1686720601614'
# }
request_body = self.request.body.decode("utf-8")
req_body_dict = json.loads(request_body)
print("recv sms notification and body content: ", req_body_dict)
redis_obj = aioredis.from_url(url=config.REDIS_DB_HOST, password=config.REDIS_DB_PWD, decode_responses=True)
sms_content = req_body_dict.get("sms_content")
sms_code = extract_verification_code(sms_content)
if sms_code:
# Save the verification code in Redis and set the expiration time to 3 minutes.
# Use Redis string data structure, in the following format:
# xhs_138xxxxxxxx -> 171959
key = f"{req_body_dict.get('platform')}_{req_body_dict.get('current_number')}"
await redis_obj.set(name=key, value=sms_code, ex=60 * 3)
self.set_status(200)
self.write("ok")
class Application(tornado.web.Application):
def __init__(self):
handlers = [(r'/', RecvSmsNotificationHandler)]
settings = dict(
gzip=True,
autoescape=None,
autoreload=True
)
super(Application, self).__init__(handlers, **settings)
async def main():
app = Application()
app.listen(port=9435)
print("Recv sms notification app running ...")
shutdown_event = tornado.locks.Event()
await shutdown_event.wait()
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
pass