# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则: # 1. 不得用于任何商业用途。 # 2. 使用时应遵守目标平台的使用条款和robots.txt规则。 # 3. 不得进行大规模爬取或对平台造成运营干扰。 # 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。 # 5. 不得用于任何非法或不当的用途。 # # 详细许可条款请参阅项目根目录下的LICENSE文件。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # -*- coding: utf-8 -*- # @Author : relakkes@gmail.com # @Time : 2023/12/2 12:53 # @Desc : 爬虫相关的工具函数 import base64 import json import random import re import urllib import urllib.parse from io import BytesIO from typing import Dict, List, Optional, Tuple import httpx from PIL import Image, ImageDraw from playwright.async_api import Cookie, Page from . import utils async def find_login_qrcode(page: Page, selector: str) -> str: """find login qrcode image from target selector""" try: elements = await page.wait_for_selector( selector=selector, ) login_qrcode_img = str(await elements.get_property("src")) # type: ignore if "http://" in login_qrcode_img or "https://" in login_qrcode_img: async with httpx.AsyncClient(follow_redirects=True) as client: utils.logger.info(f"[find_login_qrcode] get qrcode by url:{login_qrcode_img}") resp = await client.get(login_qrcode_img, headers={"User-Agent": get_user_agent()}) if resp.status_code == 200: image_data = resp.content base64_image = base64.b64encode(image_data).decode('utf-8') return base64_image raise Exception(f"fetch login image url failed, response message:{resp.text}") return login_qrcode_img except Exception as e: print(e) return "" async def find_qrcode_img_from_canvas(page: Page, canvas_selector: str) -> str: """ find qrcode image from canvas element Args: page: canvas_selector: Returns: """ # 等待Canvas元素加载完成 canvas = await page.wait_for_selector(canvas_selector) # 截取Canvas元素的截图 screenshot = await canvas.screenshot() # 将截图转换为base64格式 base64_image = base64.b64encode(screenshot).decode('utf-8') return base64_image def show_qrcode(qr_code) -> None: # type: ignore """parse base64 encode qrcode image and show it""" if "," in qr_code: qr_code = qr_code.split(",")[1] qr_code = base64.b64decode(qr_code) image = Image.open(BytesIO(qr_code)) # Add a square border around the QR code and display it within the border to improve scanning accuracy. width, height = image.size new_image = Image.new('RGB', (width + 20, height + 20), color=(255, 255, 255)) new_image.paste(image, (10, 10)) draw = ImageDraw.Draw(new_image) draw.rectangle((0, 0, width + 19, height + 19), outline=(0, 0, 0), width=1) new_image.show() def get_user_agent() -> str: ua_list = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.79 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.53 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.5112.79 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.5060.53 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.4844.84 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.5112.79 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.5060.53 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.4844.84 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.5112.79 Safari/537.36" ] return random.choice(ua_list) def get_mobile_user_agent() -> str: ua_list = [ "Mozilla/5.0 (iPhone; CPU iPhone OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Mobile/15E148 Safari/604.1", "Mozilla/5.0 (iPad; CPU OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Mobile/15E148 Safari/604.1", "Mozilla/5.0 (iPhone; CPU iPhone OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/114.0.5735.99 Mobile/15E148 Safari/604.1", "Mozilla/5.0 (iPad; CPU OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/114.0.5735.124 Mobile/15E148 Safari/604.1", "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36", "Mozilla/5.0 (Linux; Android 13; SAMSUNG SM-S918B) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/21.0 Chrome/110.0.5481.154 Mobile Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 OPR/99.0.0.0", "Mozilla/5.0 (Linux; Android 10; JNY-LX1; HMSCore 6.11.0.302) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.88 HuaweiBrowser/13.0.5.303 Mobile Safari/537.36" ] return random.choice(ua_list) def convert_cookies(cookies: Optional[List[Cookie]]) -> Tuple[str, Dict]: if not cookies: return "", {} cookies_str = ";".join([f"{cookie.get('name')}={cookie.get('value')}" for cookie in cookies]) cookie_dict = dict() for cookie in cookies: cookie_dict[cookie.get('name')] = cookie.get('value') return cookies_str, cookie_dict def convert_str_cookie_to_dict(cookie_str: str) -> Dict: cookie_dict: Dict[str, str] = dict() if not cookie_str: return cookie_dict for cookie in cookie_str.split(";"): cookie = cookie.strip() if not cookie: continue cookie_list = cookie.split("=") if len(cookie_list) != 2: continue cookie_value = cookie_list[1] if isinstance(cookie_value, list): cookie_value = "".join(cookie_value) cookie_dict[cookie_list[0]] = cookie_value return cookie_dict def match_interact_info_count(count_str: str) -> int: if not count_str: return 0 match = re.search(r'\d+', count_str) if match: number = match.group() return int(number) else: return 0 def format_proxy_info(ip_proxy_info) -> Tuple[Optional[Dict], Optional[Dict]]: """format proxy info for playwright and httpx""" playwright_proxy = { "server": f"{ip_proxy_info.protocol}{ip_proxy_info.ip}:{ip_proxy_info.port}", "username": ip_proxy_info.user, "password": ip_proxy_info.password, } httpx_proxy = { f"{ip_proxy_info.protocol}": f"http://{ip_proxy_info.user}:{ip_proxy_info.password}@{ip_proxy_info.ip}:{ip_proxy_info.port}" } return playwright_proxy, httpx_proxy def extract_text_from_html(html: str) -> str: """Extract text from HTML, removing all tags.""" if not html: return "" # Remove script and style elements clean_html = re.sub(r'<(script|style)[^>]*>.*?', '', html, flags=re.DOTALL) # Remove all other tags clean_text = re.sub(r'<[^>]+>', '', clean_html).strip() return clean_text def extract_url_params_to_dict(url: str) -> Dict: """Extract URL parameters to dict""" url_params_dict = dict() if not url: return url_params_dict parsed_url = urllib.parse.urlparse(url) url_params_dict = dict(urllib.parse.parse_qsl(parsed_url.query)) return url_params_dict