From 986179b9c9dec7be2253f0574970261cd32a51e2 Mon Sep 17 00:00:00 2001 From: Relakkes Date: Sat, 2 Dec 2023 16:14:36 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=20IP=20=E4=BB=A3?= =?UTF-8?q?=E7=90=86=E7=9A=84=E6=9C=80=E6=96=B0=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- base/base_crawler.py | 2 +- main.py | 2 +- media_platform/douyin/core.py | 2 +- media_platform/kuaishou/core.py | 2 +- media_platform/kuaishou/graphql.py | 2 +- media_platform/xhs/core.py | 2 +- models/kuaishou.py | 2 + proxy/__init__.py | 4 + {base => proxy}/proxy_account_pool.py | 5 + proxy/proxy_ip_pool.py | 89 +++++++++ proxy/proxy_ip_provider.py | 111 +++++++++++ test/test_proxy_ip_pool.py | 17 ++ tools/crawler_util.py | 94 +++++++++ tools/slider_util.py | 164 ++++++++++++++++ tools/time_util.py | 67 +++++++ tools/utils.py | 264 +------------------------- 16 files changed, 562 insertions(+), 267 deletions(-) create mode 100644 proxy/__init__.py rename {base => proxy}/proxy_account_pool.py (95%) create mode 100644 proxy/proxy_ip_pool.py create mode 100644 proxy/proxy_ip_provider.py create mode 100644 test/test_proxy_ip_pool.py create mode 100644 tools/crawler_util.py create mode 100644 tools/slider_util.py create mode 100644 tools/time_util.py diff --git a/base/base_crawler.py b/base/base_crawler.py index f05ae67..b56be0a 100644 --- a/base/base_crawler.py +++ b/base/base_crawler.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod -from base.proxy_account_pool import AccountPool +from proxy.proxy_account_pool import AccountPool class AbstractCrawler(ABC): diff --git a/main.py b/main.py index 81ac53a..07fb75d 100644 --- a/main.py +++ b/main.py @@ -4,10 +4,10 @@ import sys import config import db -from base import proxy_account_pool from media_platform.douyin import DouYinCrawler from media_platform.kuaishou import KuaishouCrawler from media_platform.xhs import XiaoHongShuCrawler +from proxy import proxy_account_pool class CrawlerFactory: diff --git a/media_platform/douyin/core.py b/media_platform/douyin/core.py index a4e2ac4..45478a3 100644 --- a/media_platform/douyin/core.py +++ b/media_platform/douyin/core.py @@ -8,8 +8,8 @@ from playwright.async_api import (BrowserContext, BrowserType, Page, import config from base.base_crawler import AbstractCrawler -from base.proxy_account_pool import AccountPool from models import douyin +from proxy.proxy_account_pool import AccountPool from tools import utils from var import crawler_type_var diff --git a/media_platform/kuaishou/core.py b/media_platform/kuaishou/core.py index f95e635..82f2b9c 100644 --- a/media_platform/kuaishou/core.py +++ b/media_platform/kuaishou/core.py @@ -10,8 +10,8 @@ from playwright.async_api import (BrowserContext, BrowserType, Page, import config from base.base_crawler import AbstractCrawler -from base.proxy_account_pool import AccountPool from models import kuaishou +from proxy.proxy_account_pool import AccountPool from tools import utils from var import comment_tasks_var, crawler_type_var diff --git a/media_platform/kuaishou/graphql.py b/media_platform/kuaishou/graphql.py index 1b71917..4b14baf 100644 --- a/media_platform/kuaishou/graphql.py +++ b/media_platform/kuaishou/graphql.py @@ -1,6 +1,6 @@ # 快手的数据传输是基于GraphQL实现的 # 这个类负责获取一些GraphQL的schema -from typing import Dict +from typing import Dict class KuaiShouGraphQL: diff --git a/media_platform/xhs/core.py b/media_platform/xhs/core.py index 91611bf..378dd56 100644 --- a/media_platform/xhs/core.py +++ b/media_platform/xhs/core.py @@ -9,8 +9,8 @@ from playwright.async_api import (BrowserContext, BrowserType, Page, import config from base.base_crawler import AbstractCrawler -from base.proxy_account_pool import AccountPool from models import xiaohongshu as xhs_model +from proxy.proxy_account_pool import AccountPool from tools import utils from var import crawler_type_var diff --git a/models/kuaishou.py b/models/kuaishou.py index 1ab5a39..e2edefe 100644 --- a/models/kuaishou.py +++ b/models/kuaishou.py @@ -61,6 +61,8 @@ class KuaishouVideoComment(KuaishouBaseModel): async def update_kuaishou_video(video_item: Dict): photo_info: Dict = video_item.get("photo", {}) video_id = photo_info.get("id") + if not video_id: + return user_info = video_item.get("author", {}) local_db_item = { "video_id": video_id, diff --git a/proxy/__init__.py b/proxy/__init__.py new file mode 100644 index 0000000..ea02f37 --- /dev/null +++ b/proxy/__init__.py @@ -0,0 +1,4 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2023/12/2 14:37 +# @Desc : diff --git a/base/proxy_account_pool.py b/proxy/proxy_account_pool.py similarity index 95% rename from base/proxy_account_pool.py rename to proxy/proxy_account_pool.py index 1915092..e021e91 100644 --- a/base/proxy_account_pool.py +++ b/proxy/proxy_account_pool.py @@ -1,3 +1,8 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2023/12/2 11:18 +# @Desc : IP 和 手机号 一一配对的账号代理池 + from typing import List, Optional, Set, Tuple import config diff --git a/proxy/proxy_ip_pool.py b/proxy/proxy_ip_pool.py new file mode 100644 index 0000000..736aec4 --- /dev/null +++ b/proxy/proxy_ip_pool.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2023/12/2 13:45 +# @Desc : ip代理池实现 +import random +from typing import List + +import httpx +from tenacity import retry, stop_after_attempt, wait_fixed + +from tools import utils + +from .proxy_ip_provider import IpInfoModel, IpProxy + + +class ProxyIpPool: + def __init__(self, ip_pool_count: int, enable_validate_ip: bool) -> None: + self.valid_ip_url = "https://httpbin.org/ip" # 验证 IP 是否有效的地址 + self.ip_pool_count = ip_pool_count + self.enable_validate_ip = enable_validate_ip + self.proxy_list: List[IpInfoModel] = [] + + async def load_proxies(self) -> None: + """ + 从 HTTP 代理商获取 IP 列表 + :return: + """ + self.proxy_list = await IpProxy.get_proxies(self.ip_pool_count) + + @retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) + async def is_valid_proxy(self, proxy: IpInfoModel) -> bool: + """ + 验证代理IP是否有效 + :param proxy: + :return: + """ + utils.logger.info(f"[ProxyIpPool.is_valid_proxy] testing {proxy.ip} is it valid ") + try: + httpx_proxy = f"{proxy.protocol}{proxy.ip}:{proxy.port}" + proxy_auth = httpx.BasicAuth(proxy.user, proxy.password) + async with httpx.AsyncClient(proxies={proxy.protocol: httpx_proxy}, auth=proxy_auth) as client: + response = await client.get(self.valid_ip_url) + if response.status_code == 200: + return True + else: + return False + except Exception as e: + utils.logger.info(f"[ProxyIpPool.is_valid_proxy] testing {proxy.ip} err: {e}") + raise e + + @retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) + async def get_proxy(self) -> IpInfoModel: + """ + 从代理池中随机提取一个代理IP + :return: + """ + if len(self.proxy_list) == 0: + await self.reload_proxies() + + proxy = random.choice(self.proxy_list) + if self.enable_validate_ip: + if not await self.is_valid_proxy(proxy): + raise Exception("[ProxyIpPool.get_proxy] current ip invalid and again get it") + self.proxy_list.remove(proxy) + return proxy + + async def reload_proxies(self): + """ + # 重新加载代理池 + :return: + """ + self.proxy_list = [] + await self.load_proxies() + + +async def create_ip_pool(ip_pool_count: int, enable_validate_ip) -> ProxyIpPool: + """ + 创建 IP 代理池 + :param ip_pool_count: + :param enable_validate_ip: + :return: + """ + pool = ProxyIpPool(ip_pool_count, enable_validate_ip) + await pool.load_proxies() + return pool + + +if __name__ == '__main__': + pass diff --git a/proxy/proxy_ip_provider.py b/proxy/proxy_ip_provider.py new file mode 100644 index 0000000..e494f68 --- /dev/null +++ b/proxy/proxy_ip_provider.py @@ -0,0 +1,111 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2023/12/2 11:18 +# @Desc : 爬虫 IP 获取实现 +# @Url : 现在实现了极速HTTP的接口,官网地址:https://www.jisuhttp.com/?pl=mAKphQ&plan=ZY&kd=Yang + +import asyncio +import os +from abc import ABC, abstractmethod +from typing import Dict, List, Optional +from urllib.parse import urlencode + +import httpx +from pydantic import BaseModel, Field + +from tools import utils + + +class IpGetError(Exception): + """ ip get error""" + + +class IpInfoModel(BaseModel): + """Unified IP model""" + ip: str = Field(title="ip") + port: int = Field(title="端口") + user: str = Field(title="IP代理认证的用户名") + protocol: str = Field(default="https://", title="代理IP的协议") + password: str = Field(title="IP代理认证用户的密码") + expired_time_ts: Optional[int] = Field(title="IP 过期时间") + + +class ProxyProvider(ABC): + @abstractmethod + async def get_proxies(self, num: int) -> List[Dict]: + """ + 获取 IP 的抽象方法,不同的 HTTP 代理商需要实现该方法 + :param num: 提取的 IP 数量 + :return: + """ + pass + + +class JiSuHttpProxy(ProxyProvider): + def __init__(self, exract_type: str, key: str, crypto: str, res_type: str, protocol: int, time: int): + """ + 极速HTTP 代理IP实现 + 官网地址:https://www.jisuhttp.com/?pl=mAKphQ&plan=ZY&kd=Yang + :param exract_type: 提取方式 + :param key: 提取key值 (到上面链接的官网去注册后获取) + :param crypto: 加密签名 (到上面链接的官网去注册后获取) + :param res_type: 返回的数据格式:TXT、JSON + :param protocol: IP协议:1:HTTP、2:HTTPS、3:SOCKS5 + :param time: IP使用时长,支持3、5、10、15、30分钟时效 + """ + self.exract_type = exract_type + self.api_path = "https://api.jisuhttp.com" + self.params = { + "key": key, + "crypto": crypto, + "type": res_type, + "port": protocol, + "time": time, + "pw": "1", # 是否使用账密验证, 1:是,0:否,否表示白名单验证;默认为0 + "se": "1", # 返回JSON格式时是否显示IP过期时间, 1:显示,0:不显示;默认为0 + } + + async def get_proxies(self, num: int) -> List[IpInfoModel]: + """ + :param num: + :return: + """ + if self.exract_type == "API": + uri = "/fetchips" + self.params.update({"num": num}) + ip_infos = [] + async with httpx.AsyncClient() as client: + url = self.api_path + uri + '?' + urlencode(self.params) + utils.logger.info(f"[JiSuHttpProxy] get ip proxy url:{url}") + response = await client.get(url, headers={"User-Agent": "MediaCrawler"}) + res_dict: Dict = response.json() + if res_dict.get("code") == 0: + data: List[Dict] = res_dict.get("data") + for ip_item in data: + ip_info_model = IpInfoModel( + ip=ip_item.get("ip"), + port=ip_item.get("port"), + user=ip_item.get("user"), + password=ip_item.get("pass"), + expired_time_ts=utils.get_unix_time_from_time_str(ip_item.get("expire")) + ) + ip_infos.append(ip_info_model) + else: + raise IpGetError(res_dict.get("msg", "unkown err")) + return ip_infos + else: + pass + + + +IpProxy = JiSuHttpProxy( + key=os.getenv("jisu_key", ""), # 通过环境变量的方式获取极速HTTPIP提取key值 + crypto=os.getenv("jisu_crypto", ""), # 通过环境变量的方式获取极速HTTPIP提取加密签名 + res_type="json", + protocol=2, + time=30 +) + +if __name__ == '__main__': + _ip_infos = asyncio.run(IpProxy.get_proxies(1)) + print(_ip_infos) diff --git a/test/test_proxy_ip_pool.py b/test/test_proxy_ip_pool.py new file mode 100644 index 0000000..5530cbe --- /dev/null +++ b/test/test_proxy_ip_pool.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2023/12/2 14:42 +# @Desc : +from unittest import IsolatedAsyncioTestCase + +from proxy.proxy_ip_pool import create_ip_pool +from proxy.proxy_ip_provider import IpInfoModel + + +class TestIpPool(IsolatedAsyncioTestCase): + async def test_ip_pool(self): + pool = await create_ip_pool(ip_pool_count=30, enable_validate_ip=False) + for i in range(30): + ip_proxy_info: IpInfoModel = await pool.get_proxy() + self.assertIsNotNone(ip_proxy_info.ip, msg="验证 ip 是否获取成功") + print(ip_proxy_info) diff --git a/tools/crawler_util.py b/tools/crawler_util.py new file mode 100644 index 0000000..562f52d --- /dev/null +++ b/tools/crawler_util.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2023/12/2 12:53 +# @Desc : 爬虫相关的工具函数 + +import base64 +import random +import re +from io import BytesIO +from typing import Dict, List, Optional, Tuple + +from PIL import Image, ImageDraw +from playwright.async_api import Cookie, Page + + +async def find_login_qrcode(page: Page, selector: str) -> str: + """find login qrcode image from target selector""" + try: + elements = await page.wait_for_selector( + selector=selector, + ) + login_qrcode_img = await elements.get_property("src") # type: ignore + return str(login_qrcode_img) + + except Exception as e: + print(e) + return "" + + +def show_qrcode(qr_code) -> None: # type: ignore + """parse base64 encode qrcode image and show it""" + qr_code = qr_code.split(",")[1] + qr_code = base64.b64decode(qr_code) + image = Image.open(BytesIO(qr_code)) + + # Add a square border around the QR code and display it within the border to improve scanning accuracy. + width, height = image.size + new_image = Image.new('RGB', (width + 20, height + 20), color=(255, 255, 255)) + new_image.paste(image, (10, 10)) + draw = ImageDraw.Draw(new_image) + draw.rectangle((0, 0, width + 19, height + 19), outline=(0, 0, 0), width=1) + new_image.show() + + +def get_user_agent() -> str: + ua_list = [ + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.79 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36", + "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.53 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36" + ] + return random.choice(ua_list) + + +def convert_cookies(cookies: Optional[List[Cookie]]) -> Tuple[str, Dict]: + if not cookies: + return "", {} + cookies_str = ";".join([f"{cookie.get('name')}={cookie.get('value')}" for cookie in cookies]) + cookie_dict = dict() + for cookie in cookies: + cookie_dict[cookie.get('name')] = cookie.get('value') + return cookies_str, cookie_dict + + +def convert_str_cookie_to_dict(cookie_str: str) -> Dict: + cookie_dict: Dict[str, str] = dict() + if not cookie_str: + return cookie_dict + for cookie in cookie_str.split(";"): + cookie = cookie.strip() + if not cookie: + continue + cookie_list = cookie.split("=") + if len(cookie_list) != 2: + continue + cookie_value = cookie_list[1] + if isinstance(cookie_value, list): + cookie_value = "".join(cookie_value) + cookie_dict[cookie_list[0]] = cookie_value + return cookie_dict + + +def match_interact_info_count(count_str: str) -> int: + if not count_str: + return 0 + + match = re.search(r'\d+', count_str) + if match: + number = match.group() + return int(number) + else: + return 0 diff --git a/tools/slider_util.py b/tools/slider_util.py new file mode 100644 index 0000000..93bc9d2 --- /dev/null +++ b/tools/slider_util.py @@ -0,0 +1,164 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2023/12/2 12:55 +# @Desc : 滑块相关的工具包 +import os +from typing import List +from urllib.parse import urlparse + +import cv2 +import httpx +import numpy as np + + +class Slide: + """ + copy from https://blog.csdn.net/weixin_43582101 thanks for author + update: relakkes + """ + def __init__(self, gap, bg, gap_size=None, bg_size=None, out=None): + """ + :param gap: 缺口图片链接或者url + :param bg: 带缺口的图片链接或者url + """ + self.img_dir = os.path.join(os.getcwd(), 'temp_image') + if not os.path.exists(self.img_dir): + os.makedirs(self.img_dir) + + bg_resize = bg_size if bg_size else (340, 212) + gap_size = gap_size if gap_size else (68, 68) + self.bg = self.check_is_img_path(bg, 'bg', resize=bg_resize) + self.gap = self.check_is_img_path(gap, 'gap', resize=gap_size) + self.out = out if out else os.path.join(self.img_dir, 'out.jpg') + + @staticmethod + def check_is_img_path(img, img_type, resize): + if img.startswith('http'): + headers = { + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;" + "q=0.8,application/signed-exchange;v=b3;q=0.9", + "Accept-Encoding": "gzip, deflate, br", + "Accept-Language": "zh-CN,zh;q=0.9,en-GB;q=0.8,en;q=0.7,ja;q=0.6", + "Cache-Control": "max-age=0", + "Connection": "keep-alive", + "Host": urlparse(img).hostname, + "Upgrade-Insecure-Requests": "1", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/91.0.4472.164 Safari/537.36", + } + img_res = httpx.get(img, headers=headers) + if img_res.status_code == 200: + img_path = f'./temp_image/{img_type}.jpg' + image = np.asarray(bytearray(img_res.content), dtype="uint8") + image = cv2.imdecode(image, cv2.IMREAD_COLOR) + if resize: + image = cv2.resize(image, dsize=resize) + cv2.imwrite(img_path, image) + return img_path + else: + raise Exception(f"保存{img_type}图片失败") + else: + return img + + @staticmethod + def clear_white(img): + """清除图片的空白区域,这里主要清除滑块的空白""" + img = cv2.imread(img) + rows, cols, channel = img.shape + min_x = 255 + min_y = 255 + max_x = 0 + max_y = 0 + for x in range(1, rows): + for y in range(1, cols): + t = set(img[x, y]) + if len(t) >= 2: + if x <= min_x: + min_x = x + elif x >= max_x: + max_x = x + + if y <= min_y: + min_y = y + elif y >= max_y: + max_y = y + img1 = img[min_x:max_x, min_y: max_y] + return img1 + + def template_match(self, tpl, target): + th, tw = tpl.shape[:2] + result = cv2.matchTemplate(target, tpl, cv2.TM_CCOEFF_NORMED) + # 寻找矩阵(一维数组当作向量,用Mat定义) 中最小值和最大值的位置 + min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result) + tl = max_loc + br = (tl[0] + tw, tl[1] + th) + # 绘制矩形边框,将匹配区域标注出来 + # target:目标图像 + # tl:矩形定点 + # br:矩形的宽高 + # (0,0,255):矩形边框颜色 + # 1:矩形边框大小 + cv2.rectangle(target, tl, br, (0, 0, 255), 2) + cv2.imwrite(self.out, target) + return tl[0] + + @staticmethod + def image_edge_detection(img): + edges = cv2.Canny(img, 100, 200) + return edges + + def discern(self): + img1 = self.clear_white(self.gap) + img1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY) + slide = self.image_edge_detection(img1) + + back = cv2.imread(self.bg, cv2.COLOR_RGB2GRAY) + back = self.image_edge_detection(back) + + slide_pic = cv2.cvtColor(slide, cv2.COLOR_GRAY2RGB) + back_pic = cv2.cvtColor(back, cv2.COLOR_GRAY2RGB) + x = self.template_match(slide_pic, back_pic) + # 输出横坐标, 即 滑块在图片上的位置 + return x + + +def get_track_simple(distance) -> List[int]: + # 有的检测移动速度的 如果匀速移动会被识别出来,来个简单点的 渐进 + # distance为传入的总距离 + # 移动轨迹 + track: List[int] = [] + # 当前位移 + current = 0 + # 减速阈值 + mid = distance * 4 / 5 + # 计算间隔 + t = 0.2 + # 初速度 + v = 1 + + while current < distance: + if current < mid: + # 加速度为2 + a = 4 + else: + # 加速度为-2 + a = -3 + v0 = v + # 当前速度 + v = v0 + a * t # type: ignore + # 移动距离 + move = v0 * t + 1 / 2 * a * t * t + # 当前位移 + current += move # type: ignore + # 加入轨迹 + track.append(round(move)) + return track + + +def get_tracks(distance: int, level: str = "easy") -> List[int]: + if level == "easy": + return get_track_simple(distance) + else: + from . import easing + _, tricks = easing.get_tracks(distance, seconds=2, ease_func="ease_out_expo") + return tricks diff --git a/tools/time_util.py b/tools/time_util.py new file mode 100644 index 0000000..ceaf1b1 --- /dev/null +++ b/tools/time_util.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2023/12/2 12:52 +# @Desc : 时间相关的工具函数 + +import time + + +def get_current_timestamp() -> int: + """ + 获取当前的时间戳:1701493264496 + :return: + """ + return int(time.time() * 1000) + + +def get_current_time() -> str: + """ + 获取当前的时间:'2023-12-02 13:01:23' + :return: + """ + return time.strftime('%Y-%m-%d %X', time.localtime()) + + +def get_current_date() -> str: + """ + 获取当前的日期:'2023-12-02' + :return: + """ + return time.strftime('%Y-%m-%d', time.localtime()) + + +def get_time_str_from_unix_time(unixtime): + """ + unix 整数类型时间戳 ==> 字符串日期时间 + :param unixtime: + :return: + """ + if int(unixtime) > 1000000000000: + unixtime = int(unixtime) / 1000 + return time.strftime('%Y-%m-%d %X', time.localtime(unixtime)) + + +def get_date_str_from_unix_time(unixtime): + """ + unix 整数类型时间戳 ==> 字符串日期 + :param unixtime: + :return: + """ + if int(unixtime) > 1000000000000: + unixtime = int(unixtime) / 1000 + return time.strftime('%Y-%m-%d', time.localtime(unixtime)) + + +def get_unix_time_from_time_str(time_str): + """ + 字符串时间 ==> unix 整数类型时间戳,精确到秒 + :param time_str: + :return: + """ + try: + format_str = "%Y-%m-%d %H:%M:%S" + tm_object = time.strptime(str(time_str), format_str) + return int(time.mktime(tm_object)) + except Exception as e: + return 0 + pass diff --git a/tools/utils.py b/tools/utils.py index 250604a..248030e 100644 --- a/tools/utils.py +++ b/tools/utils.py @@ -1,103 +1,8 @@ -import base64 import logging -import os -import random -import re -import time -from io import BytesIO -from typing import Dict, List, Optional, Tuple -from urllib.parse import urlparse -import cv2 -import httpx -import numpy as np -from PIL import Image, ImageDraw -from playwright.async_api import Cookie, Page - - -async def find_login_qrcode(page: Page, selector: str) -> str: - """find login qrcode image from target selector""" - try: - elements = await page.wait_for_selector( - selector=selector, - ) - login_qrcode_img = await elements.get_property("src") # type: ignore - return str(login_qrcode_img) - - except Exception as e: - print(e) - return "" - - -def show_qrcode(qr_code) -> None: # type: ignore - """parse base64 encode qrcode image and show it""" - qr_code = qr_code.split(",")[1] - qr_code = base64.b64decode(qr_code) - image = Image.open(BytesIO(qr_code)) - - # Add a square border around the QR code and display it within the border to improve scanning accuracy. - width, height = image.size - new_image = Image.new('RGB', (width + 20, height + 20), color=(255, 255, 255)) - new_image.paste(image, (10, 10)) - draw = ImageDraw.Draw(new_image) - draw.rectangle((0, 0, width + 19, height + 19), outline=(0, 0, 0), width=1) - new_image.show() - - -def get_user_agent() -> str: - ua_list = [ - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36", - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.79 Safari/537.36", - "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36", - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36", - "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.53 Safari/537.36", - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36" - ] - return random.choice(ua_list) - - -def convert_cookies(cookies: Optional[List[Cookie]]) -> Tuple[str, Dict]: - if not cookies: - return "", {} - cookies_str = ";".join([f"{cookie.get('name')}={cookie.get('value')}" for cookie in cookies]) - cookie_dict = dict() - for cookie in cookies: - cookie_dict[cookie.get('name')] = cookie.get('value') - return cookies_str, cookie_dict - - -def convert_str_cookie_to_dict(cookie_str: str) -> Dict: - cookie_dict: Dict[str, str]= dict() - if not cookie_str: - return cookie_dict - for cookie in cookie_str.split(";"): - cookie = cookie.strip() - if not cookie: - continue - cookie_list = cookie.split("=") - if len(cookie_list) != 2: - continue - cookie_value = cookie_list[1] - if isinstance(cookie_value, list): - cookie_value = "".join(cookie_value) - cookie_dict[cookie_list[0]] = cookie_value - return cookie_dict - - -def get_current_timestamp(): - return int(time.time() * 1000) - - -def match_interact_info_count(count_str: str) -> int: - if not count_str: - return 0 - - match = re.search(r'\d+', count_str) - if match: - number = match.group() - return int(number) - else: - return 0 +from .crawler_util import * +from .slider_util import * +from .time_util import * def init_loging_config(): @@ -113,166 +18,3 @@ def init_loging_config(): logger = init_loging_config() - - -class Slide: - """ - copy from https://blog.csdn.net/weixin_43582101 thanks for author - update: relakkes - """ - - def __init__(self, gap, bg, gap_size=None, bg_size=None, out=None): - """ - :param gap: 缺口图片链接或者url - :param bg: 带缺口的图片链接或者url - """ - self.img_dir = os.path.join(os.getcwd(), 'temp_image') - if not os.path.exists(self.img_dir): - os.makedirs(self.img_dir) - - bg_resize = bg_size if bg_size else (340, 212) - gap_size = gap_size if gap_size else (68, 68) - self.bg = self.check_is_img_path(bg, 'bg', resize=bg_resize) - self.gap = self.check_is_img_path(gap, 'gap', resize=gap_size) - self.out = out if out else os.path.join(self.img_dir, 'out.jpg') - - @staticmethod - def check_is_img_path(img, img_type, resize): - if img.startswith('http'): - headers = { - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;" - "q=0.8,application/signed-exchange;v=b3;q=0.9", - "Accept-Encoding": "gzip, deflate, br", - "Accept-Language": "zh-CN,zh;q=0.9,en-GB;q=0.8,en;q=0.7,ja;q=0.6", - "Cache-Control": "max-age=0", - "Connection": "keep-alive", - "Host": urlparse(img).hostname, - "Upgrade-Insecure-Requests": "1", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/91.0.4472.164 Safari/537.36", - } - img_res = httpx.get(img, headers=headers) - if img_res.status_code == 200: - img_path = f'./temp_image/{img_type}.jpg' - image = np.asarray(bytearray(img_res.content), dtype="uint8") - image = cv2.imdecode(image, cv2.IMREAD_COLOR) - if resize: - image = cv2.resize(image, dsize=resize) - cv2.imwrite(img_path, image) - return img_path - else: - raise Exception(f"保存{img_type}图片失败") - else: - return img - - @staticmethod - def clear_white(img): - """清除图片的空白区域,这里主要清除滑块的空白""" - img = cv2.imread(img) - rows, cols, channel = img.shape - min_x = 255 - min_y = 255 - max_x = 0 - max_y = 0 - for x in range(1, rows): - for y in range(1, cols): - t = set(img[x, y]) - if len(t) >= 2: - if x <= min_x: - min_x = x - elif x >= max_x: - max_x = x - - if y <= min_y: - min_y = y - elif y >= max_y: - max_y = y - img1 = img[min_x:max_x, min_y: max_y] - return img1 - - def template_match(self, tpl, target): - th, tw = tpl.shape[:2] - result = cv2.matchTemplate(target, tpl, cv2.TM_CCOEFF_NORMED) - # 寻找矩阵(一维数组当作向量,用Mat定义) 中最小值和最大值的位置 - min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result) - tl = max_loc - br = (tl[0] + tw, tl[1] + th) - # 绘制矩形边框,将匹配区域标注出来 - # target:目标图像 - # tl:矩形定点 - # br:矩形的宽高 - # (0,0,255):矩形边框颜色 - # 1:矩形边框大小 - cv2.rectangle(target, tl, br, (0, 0, 255), 2) - cv2.imwrite(self.out, target) - return tl[0] - - @staticmethod - def image_edge_detection(img): - edges = cv2.Canny(img, 100, 200) - return edges - - def discern(self): - img1 = self.clear_white(self.gap) - img1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY) - slide = self.image_edge_detection(img1) - - back = cv2.imread(self.bg, cv2.COLOR_RGB2GRAY) - back = self.image_edge_detection(back) - - slide_pic = cv2.cvtColor(slide, cv2.COLOR_GRAY2RGB) - back_pic = cv2.cvtColor(back, cv2.COLOR_GRAY2RGB) - x = self.template_match(slide_pic, back_pic) - # 输出横坐标, 即 滑块在图片上的位置 - return x - - -def get_track_simple(distance) -> List[int]: - # 有的检测移动速度的 如果匀速移动会被识别出来,来个简单点的 渐进 - # distance为传入的总距离 - # 移动轨迹 - track: List[int]= [] - # 当前位移 - current = 0 - # 减速阈值 - mid = distance * 4 / 5 - # 计算间隔 - t = 0.2 - # 初速度 - v = 1 - - while current < distance: - if current < mid: - # 加速度为2 - a = 4 - else: - # 加速度为-2 - a = -3 - v0 = v - # 当前速度 - v = v0 + a * t # type: ignore - # 移动距离 - move = v0 * t + 1 / 2 * a * t * t - # 当前位移 - current += move # type: ignore - # 加入轨迹 - track.append(round(move)) - return track - - -def get_tracks(distance: int, level: str = "easy") -> List[int]: - if level == "easy": - return get_track_simple(distance) - else: - from . import easing - _, tricks = easing.get_tracks(distance, seconds=2, ease_func="ease_out_expo") - return tricks - - -def get_current_time(): - ISOTIMEFORMAT = '%Y-%m-%d %X' - return tme.strftime(ISOTIMEFORMAT, time.localtime()) - -def get_current_date(): - ISOTIMEFORMAT = '%Y-%m-%d' - return time.strftime(ISOTIMEFORMAT, time.localtime()) \ No newline at end of file