MediaCrawler/tools/utils.py

266 lines
8.7 KiB
Python
Raw Normal View History

import re
import os
import time
import random
import base64
import logging
from io import BytesIO
from urllib.parse import urlparse
from typing import Optional, Dict, List, Tuple
import cv2
import httpx
import numpy as np
from PIL import Image, ImageDraw
from playwright.async_api import Cookie
from playwright.async_api import Page
async def find_login_qrcode(page: Page, selector: str) -> str:
"""find login qrcode image from target selector"""
try:
elements = await page.wait_for_selector(
selector=selector,
)
login_qrcode_img = await elements.get_property("src")
return str(login_qrcode_img)
except Exception as e:
print(e)
return ""
def show_qrcode(qr_code: str):
"""parse base64 encode qrcode image and show it"""
qr_code = qr_code.split(",")[1]
qr_code = base64.b64decode(qr_code)
image = Image.open(BytesIO(qr_code))
# Add a square border around the QR code and display it within the border to improve scanning accuracy.
width, height = image.size
new_image = Image.new('RGB', (width + 20, height + 20), color=(255, 255, 255))
new_image.paste(image, (10, 10))
draw = ImageDraw.Draw(new_image)
draw.rectangle((0, 0, width + 19, height + 19), outline=(0, 0, 0), width=1)
new_image.show()
def get_user_agent() -> str:
ua_list = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.79 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.53 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36"
]
return random.choice(ua_list)
def convert_cookies(cookies: Optional[List[Cookie]]) -> Tuple[str, Dict]:
if not cookies:
return "", {}
cookies_str = ";".join([f"{cookie.get('name')}={cookie.get('value')}" for cookie in cookies])
cookie_dict = dict()
for cookie in cookies:
cookie_dict[cookie.get('name')] = cookie.get('value')
return cookies_str, cookie_dict
def convert_str_cookie_to_dict(cookie_str: str) -> Dict:
cookie_dict = dict()
if not cookie_str:
return cookie_dict
for cookie in cookie_str.split(";"):
cookie = cookie.strip()
if not cookie:
continue
cookie = cookie.split("=")
if len(cookie) != 2:
continue
cookie_value = cookie[1]
if isinstance(cookie_value, list):
cookie_value = "".join(cookie_value)
cookie_dict[cookie[0]] = cookie_value
return cookie_dict
def get_current_timestamp():
return int(time.time() * 1000)
def match_interact_info_count(count_str: str) -> int:
if not count_str:
return 0
match = re.search(r'\d+', count_str)
if match:
number = match.group()
return int(number)
else:
return 0
def init_loging_config():
level = logging.INFO
logging.basicConfig(
level=level,
format="%(asctime)s %(name)s %(levelname)s %(message)s ",
datefmt='%Y-%m-%d %H:%M:%S'
)
logging.Logger("Media Crawler")
class Slide:
"""
copy from https://blog.csdn.net/weixin_43582101 thanks for author
update: relakkes
"""
def __init__(self, gap, bg, gap_size=None, bg_size=None, out=None):
"""
:param gap: 缺口图片链接或者url
:param bg: 带缺口的图片链接或者url
"""
self.img_dir = os.path.join(os.getcwd(), 'temp_image')
if not os.path.exists(self.img_dir):
os.makedirs(self.img_dir)
bg_resize = bg_size if bg_size else (340, 212)
gap_size = gap_size if gap_size else (68, 68)
self.bg = self.check_is_img_path(bg, 'bg', resize=bg_resize)
self.gap = self.check_is_img_path(gap, 'gap', resize=gap_size)
self.out = out if out else os.path.join(self.img_dir, 'out.jpg')
@staticmethod
def check_is_img_path(img, img_type, resize):
if img.startswith('http'):
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;"
"q=0.8,application/signed-exchange;v=b3;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9,en-GB;q=0.8,en;q=0.7,ja;q=0.6",
"Cache-Control": "max-age=0",
"Connection": "keep-alive",
"Host": urlparse(img).hostname,
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/91.0.4472.164 Safari/537.36",
}
img_res = httpx.get(img, headers=headers)
if img_res.status_code == 200:
img_path = f'./temp_image/{img_type}.jpg'
image = np.asarray(bytearray(img_res.content), dtype="uint8")
image = cv2.imdecode(image, cv2.IMREAD_COLOR)
if resize:
image = cv2.resize(image, dsize=resize)
cv2.imwrite(img_path, image)
return img_path
else:
raise Exception(f"保存{img_type}图片失败")
else:
return img
@staticmethod
def clear_white(img):
"""清除图片的空白区域,这里主要清除滑块的空白"""
img = cv2.imread(img)
rows, cols, channel = img.shape
min_x = 255
min_y = 255
max_x = 0
max_y = 0
for x in range(1, rows):
for y in range(1, cols):
t = set(img[x, y])
if len(t) >= 2:
if x <= min_x:
min_x = x
elif x >= max_x:
max_x = x
if y <= min_y:
min_y = y
elif y >= max_y:
max_y = y
img1 = img[min_x:max_x, min_y: max_y]
return img1
def template_match(self, tpl, target):
th, tw = tpl.shape[:2]
result = cv2.matchTemplate(target, tpl, cv2.TM_CCOEFF_NORMED)
# 寻找矩阵(一维数组当作向量,用Mat定义) 中最小值和最大值的位置
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
tl = max_loc
br = (tl[0] + tw, tl[1] + th)
# 绘制矩形边框,将匹配区域标注出来
# target目标图像
# tl矩形定点
# br矩形的宽高
# (0,0,255):矩形边框颜色
# 1矩形边框大小
cv2.rectangle(target, tl, br, (0, 0, 255), 2)
cv2.imwrite(self.out, target)
return tl[0]
@staticmethod
def image_edge_detection(img):
edges = cv2.Canny(img, 100, 200)
return edges
def discern(self):
img1 = self.clear_white(self.gap)
img1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY)
slide = self.image_edge_detection(img1)
back = cv2.imread(self.bg, cv2.COLOR_RGB2GRAY)
back = self.image_edge_detection(back)
slide_pic = cv2.cvtColor(slide, cv2.COLOR_GRAY2RGB)
back_pic = cv2.cvtColor(back, cv2.COLOR_GRAY2RGB)
x = self.template_match(slide_pic, back_pic)
# 输出横坐标, 即 滑块在图片上的位置
return x
def get_track_simple(distance):
# 有的检测移动速度的 如果匀速移动会被识别出来,来个简单点的 渐进
# distance为传入的总距离
# 移动轨迹
track = []
# 当前位移
current = 0
# 减速阈值
mid = distance * 4 / 5
# 计算间隔
t = 0.2
# 初速度
v = 1
while current < distance:
if current < mid:
# 加速度为2
a = 4
else:
# 加速度为-2
a = -3
v0 = v
# 当前速度
v = v0 + a * t
# 移动距离
move = v0 * t + 1 / 2 * a * t * t
# 当前位移
current += move
# 加入轨迹
track.append(round(move))
return track
def get_tracks(distance: int, level: str = "easy") -> List[int]:
if level == "easy":
return get_track_simple(distance)
else:
from . import easing
_, tricks = easing.get_tracks(distance, seconds=2, ease_func="ease_out_expo")
return tricks