feat: add the new IP proxy implementation

Relakkes 2023-12-02 16:14:36 +08:00
parent a8a4d34d2a
commit 986179b9c9
16 changed files with 562 additions and 267 deletions


@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod

-from base.proxy_account_pool import AccountPool
+from proxy.proxy_account_pool import AccountPool


class AbstractCrawler(ABC):


@@ -4,10 +4,10 @@ import sys

import config
import db
-from base import proxy_account_pool
from media_platform.douyin import DouYinCrawler
from media_platform.kuaishou import KuaishouCrawler
from media_platform.xhs import XiaoHongShuCrawler
+from proxy import proxy_account_pool


class CrawlerFactory:


@@ -8,8 +8,8 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,

import config
from base.base_crawler import AbstractCrawler
-from base.proxy_account_pool import AccountPool
from models import douyin
+from proxy.proxy_account_pool import AccountPool
from tools import utils
from var import crawler_type_var


@@ -10,8 +10,8 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,

import config
from base.base_crawler import AbstractCrawler
-from base.proxy_account_pool import AccountPool
from models import kuaishou
+from proxy.proxy_account_pool import AccountPool
from tools import utils
from var import comment_tasks_var, crawler_type_var


@@ -1,6 +1,6 @@
# Kuaishou's data transport is implemented on top of GraphQL
# This class is responsible for loading some of the GraphQL schemas
from typing import Dict


class KuaiShouGraphQL:


@@ -9,8 +9,8 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,

import config
from base.base_crawler import AbstractCrawler
-from base.proxy_account_pool import AccountPool
from models import xiaohongshu as xhs_model
+from proxy.proxy_account_pool import AccountPool
from tools import utils
from var import crawler_type_var


@@ -61,6 +61,8 @@ class KuaishouVideoComment(KuaishouBaseModel):
async def update_kuaishou_video(video_item: Dict):
    photo_info: Dict = video_item.get("photo", {})
    video_id = photo_info.get("id")
+    if not video_id:
+        return
    user_info = video_item.get("author", {})
    local_db_item = {
        "video_id": video_id,

proxy/__init__.py (new file)

@@ -0,0 +1,4 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/2 14:37
# @Desc :


@@ -1,3 +1,8 @@
+# -*- coding: utf-8 -*-
+# @Author : relakkes@gmail.com
+# @Time : 2023/12/2 11:18
+# @Desc : account proxy pool that pairs an IP with a phone number one-to-one
from typing import List, Optional, Set, Tuple

import config

proxy/proxy_ip_pool.py (new file)

@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/2 13:45
# @Desc : IP proxy pool implementation
import random
from typing import List

import httpx
from tenacity import retry, stop_after_attempt, wait_fixed

from tools import utils

from .proxy_ip_provider import IpInfoModel, IpProxy


class ProxyIpPool:
    def __init__(self, ip_pool_count: int, enable_validate_ip: bool) -> None:
        self.valid_ip_url = "https://httpbin.org/ip"  # URL used to check whether an IP is still usable
        self.ip_pool_count = ip_pool_count
        self.enable_validate_ip = enable_validate_ip
        self.proxy_list: List[IpInfoModel] = []

    async def load_proxies(self) -> None:
        """
        Fetch the IP list from the HTTP proxy vendor
        :return:
        """
        self.proxy_list = await IpProxy.get_proxies(self.ip_pool_count)

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
    async def is_valid_proxy(self, proxy: IpInfoModel) -> bool:
        """
        Check whether a proxy IP is valid
        :param proxy:
        :return:
        """
        utils.logger.info(f"[ProxyIpPool.is_valid_proxy] testing whether {proxy.ip} is valid")
        try:
            httpx_proxy = f"{proxy.protocol}{proxy.ip}:{proxy.port}"
            proxy_auth = httpx.BasicAuth(proxy.user, proxy.password)
            async with httpx.AsyncClient(proxies={proxy.protocol: httpx_proxy}, auth=proxy_auth) as client:
                response = await client.get(self.valid_ip_url)
                if response.status_code == 200:
                    return True
                else:
                    return False
        except Exception as e:
            utils.logger.info(f"[ProxyIpPool.is_valid_proxy] testing {proxy.ip} err: {e}")
            raise e

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
    async def get_proxy(self) -> IpInfoModel:
        """
        Randomly take one proxy IP out of the pool
        :return:
        """
        if len(self.proxy_list) == 0:
            await self.reload_proxies()
        proxy = random.choice(self.proxy_list)
        if self.enable_validate_ip:
            if not await self.is_valid_proxy(proxy):
                raise Exception("[ProxyIpPool.get_proxy] current ip is invalid, fetching another one")
        self.proxy_list.remove(proxy)
        return proxy

    async def reload_proxies(self):
        """
        Reload the proxy pool
        :return:
        """
        self.proxy_list = []
        await self.load_proxies()


async def create_ip_pool(ip_pool_count: int, enable_validate_ip: bool) -> ProxyIpPool:
    """
    Create an IP proxy pool
    :param ip_pool_count:
    :param enable_validate_ip:
    :return:
    """
    pool = ProxyIpPool(ip_pool_count, enable_validate_ip)
    await pool.load_proxies()
    return pool


if __name__ == '__main__':
    pass
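
To show how the pool is meant to be consumed, here is a minimal usage sketch (not part of the commit; it assumes the jisu_key/jisu_crypto environment variables are set so that load_proxies can reach the vendor). Note that get_proxy removes the returned entry from the pool, so each IP is handed out at most once before reload_proxies refills the list:

import asyncio

import httpx

from proxy.proxy_ip_pool import create_ip_pool


async def fetch_with_proxy() -> None:
    # Small pool, no per-IP validation, to keep the example cheap.
    pool = await create_ip_pool(ip_pool_count=2, enable_validate_ip=False)
    proxy = await pool.get_proxy()
    proxy_url = f"{proxy.protocol}{proxy.ip}:{proxy.port}"
    auth = httpx.BasicAuth(proxy.user, proxy.password)
    async with httpx.AsyncClient(proxies={proxy.protocol: proxy_url}, auth=auth) as client:
        response = await client.get("https://httpbin.org/ip")
        print(response.text)  # should echo the proxy's exit IP


if __name__ == '__main__':
    asyncio.run(fetch_with_proxy())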

proxy/proxy_ip_provider.py (new file)

@@ -0,0 +1,111 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/2 11:18
# @Desc : crawler IP acquisition implementation
# @Url : currently implements the JiSu HTTP API; official site: https://www.jisuhttp.com/?pl=mAKphQ&plan=ZY&kd=Yang
import asyncio
import os
from abc import ABC, abstractmethod
from typing import Dict, List, Optional
from urllib.parse import urlencode

import httpx
from pydantic import BaseModel, Field

from tools import utils


class IpGetError(Exception):
    """ip get error"""


class IpInfoModel(BaseModel):
    """Unified IP model"""
    ip: str = Field(title="ip")
    port: int = Field(title="port")
    user: str = Field(title="username for proxy authentication")
    protocol: str = Field(default="https://", title="protocol of the proxy IP")
    password: str = Field(title="password for proxy authentication")
    expired_time_ts: Optional[int] = Field(title="IP expiry timestamp")


class ProxyProvider(ABC):
    @abstractmethod
    async def get_proxies(self, num: int) -> List[IpInfoModel]:
        """
        Abstract method for fetching IPs; every HTTP proxy vendor implements it
        :param num: number of IPs to extract
        :return:
        """
        pass


class JiSuHttpProxy(ProxyProvider):
    def __init__(self, extract_type: str, key: str, crypto: str, res_type: str, protocol: int, time: int):
        """
        JiSu HTTP proxy IP implementation
        Official site: https://www.jisuhttp.com/?pl=mAKphQ&plan=ZY&kd=Yang
        :param extract_type: extraction method
        :param key: extraction key (register at the site above to obtain it)
        :param crypto: crypto signature (register at the site above to obtain it)
        :param res_type: response data format: TXT or JSON
        :param protocol: IP protocol: 1: HTTP, 2: HTTPS, 3: SOCKS5
        :param time: IP lifetime in minutes: 3, 5, 10, 15 and 30 are supported
        """
        self.extract_type = extract_type
        self.api_path = "https://api.jisuhttp.com"
        self.params = {
            "key": key,
            "crypto": crypto,
            "type": res_type,
            "port": protocol,
            "time": time,
            "pw": "1",  # whether to use account/password auth; 1: yes, 0: no (whitelist auth); defaults to 0
            "se": "1",  # whether JSON responses include the IP expiry time; 1: show, 0: hide; defaults to 0
        }

    async def get_proxies(self, num: int) -> List[IpInfoModel]:
        """
        :param num:
        :return:
        """
        if self.extract_type == "API":
            uri = "/fetchips"
            self.params.update({"num": num})
            ip_infos = []
            async with httpx.AsyncClient() as client:
                url = self.api_path + uri + '?' + urlencode(self.params)
                utils.logger.info(f"[JiSuHttpProxy] get ip proxy url:{url}")
                response = await client.get(url, headers={"User-Agent": "MediaCrawler"})
                res_dict: Dict = response.json()
                if res_dict.get("code") == 0:
                    data: List[Dict] = res_dict.get("data")
                    for ip_item in data:
                        ip_info_model = IpInfoModel(
                            ip=ip_item.get("ip"),
                            port=ip_item.get("port"),
                            user=ip_item.get("user"),
                            password=ip_item.get("pass"),
                            expired_time_ts=utils.get_unix_time_from_time_str(ip_item.get("expire")),
                        )
                        ip_infos.append(ip_info_model)
                else:
                    raise IpGetError(res_dict.get("msg", "unknown err"))
            return ip_infos
        # only the API extraction method is implemented for now
        return []


IpProxy = JiSuHttpProxy(
    extract_type="API",  # required by the constructor; only "API" is handled above
    key=os.getenv("jisu_key", ""),  # JiSu HTTP extraction key, read from the environment
    crypto=os.getenv("jisu_crypto", ""),  # JiSu HTTP crypto signature, read from the environment
    res_type="json",
    protocol=2,
    time=30,
)

if __name__ == '__main__':
    _ip_infos = asyncio.run(IpProxy.get_proxies(1))
    print(_ip_infos)
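
The ProxyProvider ABC is the extension point for additional vendors. As a sketch only (the vendor endpoint and response field names below are invented, and the snippet reuses the imports already at the top of this file), a second provider just maps its API payload onto IpInfoModel:

class ExampleHttpProxy(ProxyProvider):
    """Hypothetical vendor, shown only to illustrate the ProxyProvider contract."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.api_url = "https://proxy.example.com/fetch"  # invented endpoint

    async def get_proxies(self, num: int) -> List[IpInfoModel]:
        async with httpx.AsyncClient() as client:
            response = await client.get(self.api_url, params={"key": self.api_key, "num": num})
            items: List[Dict] = response.json().get("proxies", [])
        # Map the vendor-specific field names onto the unified model.
        return [
            IpInfoModel(
                ip=item["host"],
                port=item["port"],
                user=item["username"],
                password=item["password"],
                expired_time_ts=item.get("expire_ts"),
            )
            for item in items
        ]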


@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/2 14:42
# @Desc :
from unittest import IsolatedAsyncioTestCase

from proxy.proxy_ip_pool import create_ip_pool
from proxy.proxy_ip_provider import IpInfoModel


class TestIpPool(IsolatedAsyncioTestCase):
    async def test_ip_pool(self):
        pool = await create_ip_pool(ip_pool_count=30, enable_validate_ip=False)
        for _ in range(30):
            ip_proxy_info: IpInfoModel = await pool.get_proxy()
            self.assertIsNotNone(ip_proxy_info.ip, msg="check that an ip was fetched")
            print(ip_proxy_info)
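
The test runs under the standard unittest runner; assuming the module is saved as, say, test_proxy_ip_pool.py on the import path (the diff does not show the file name, so that name is a guess):

python -m unittest test_proxy_ip_pool -v

Note that even with enable_validate_ip=False, create_ip_pool still calls the real JiSu HTTP API through IpProxy, so jisu_key and jisu_crypto must be exported first.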

tools/crawler_util.py (new file)

@@ -0,0 +1,94 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/2 12:53
# @Desc : crawler-related utility functions
import base64
import random
import re
from io import BytesIO
from typing import Dict, List, Optional, Tuple

from PIL import Image, ImageDraw
from playwright.async_api import Cookie, Page


async def find_login_qrcode(page: Page, selector: str) -> str:
    """find login qrcode image from target selector"""
    try:
        elements = await page.wait_for_selector(
            selector=selector,
        )
        login_qrcode_img = await elements.get_property("src")  # type: ignore
        return str(login_qrcode_img)
    except Exception as e:
        print(e)
        return ""


def show_qrcode(qr_code) -> None:  # type: ignore
    """parse base64 encode qrcode image and show it"""
    qr_code = qr_code.split(",")[1]
    qr_code = base64.b64decode(qr_code)
    image = Image.open(BytesIO(qr_code))
    # Add a square border around the QR code and display it within the border to improve scanning accuracy.
    width, height = image.size
    new_image = Image.new('RGB', (width + 20, height + 20), color=(255, 255, 255))
    new_image.paste(image, (10, 10))
    draw = ImageDraw.Draw(new_image)
    draw.rectangle((0, 0, width + 19, height + 19), outline=(0, 0, 0), width=1)
    new_image.show()


def get_user_agent() -> str:
    ua_list = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.79 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.53 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36"
    ]
    return random.choice(ua_list)


def convert_cookies(cookies: Optional[List[Cookie]]) -> Tuple[str, Dict]:
    if not cookies:
        return "", {}
    cookies_str = ";".join([f"{cookie.get('name')}={cookie.get('value')}" for cookie in cookies])
    cookie_dict = dict()
    for cookie in cookies:
        cookie_dict[cookie.get('name')] = cookie.get('value')
    return cookies_str, cookie_dict


def convert_str_cookie_to_dict(cookie_str: str) -> Dict:
    cookie_dict: Dict[str, str] = dict()
    if not cookie_str:
        return cookie_dict
    for cookie in cookie_str.split(";"):
        cookie = cookie.strip()
        if not cookie:
            continue
        cookie_list = cookie.split("=")
        if len(cookie_list) != 2:
            continue
        cookie_value = cookie_list[1]
        if isinstance(cookie_value, list):
            cookie_value = "".join(cookie_value)
        cookie_dict[cookie_list[0]] = cookie_value
    return cookie_dict


def match_interact_info_count(count_str: str) -> int:
    if not count_str:
        return 0
    match = re.search(r'\d+', count_str)
    if match:
        number = match.group()
        return int(number)
    else:
        return 0
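
A short sketch of how the cookie helpers compose (the cookie values are made up; plain dicts stand in for Playwright Cookie objects, which works because the code only calls .get on them):

from tools import crawler_util

cookie_str, cookie_dict = crawler_util.convert_cookies([
    {"name": "sessionid", "value": "abc123"},
    {"name": "webid", "value": "42"},
])
print(cookie_str)   # sessionid=abc123;webid=42
print(cookie_dict)  # {'sessionid': 'abc123', 'webid': '42'}

# The string form parses back to the same dict.
assert crawler_util.convert_str_cookie_to_dict(cookie_str) == cookie_dict

# Note: only the first run of digits is matched, so "1.2万" yields 1.
print(crawler_util.match_interact_info_count("1.2万 comments"))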

tools/slider_util.py (new file)

@@ -0,0 +1,164 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/2 12:55
# @Desc : slider-captcha related utilities
import os
from typing import List
from urllib.parse import urlparse

import cv2
import httpx
import numpy as np


class Slide:
    """
    copy from https://blog.csdn.net/weixin_43582101 thanks for author
    update: relakkes
    """

    def __init__(self, gap, bg, gap_size=None, bg_size=None, out=None):
        """
        :param gap: local path or URL of the gap (slider piece) image
        :param bg: local path or URL of the background image that contains the gap
        """
        self.img_dir = os.path.join(os.getcwd(), 'temp_image')
        if not os.path.exists(self.img_dir):
            os.makedirs(self.img_dir)

        bg_resize = bg_size if bg_size else (340, 212)
        gap_size = gap_size if gap_size else (68, 68)
        self.bg = self.check_is_img_path(bg, 'bg', resize=bg_resize)
        self.gap = self.check_is_img_path(gap, 'gap', resize=gap_size)
        self.out = out if out else os.path.join(self.img_dir, 'out.jpg')

    @staticmethod
    def check_is_img_path(img, img_type, resize):
        if img.startswith('http'):
            headers = {
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;"
                          "q=0.8,application/signed-exchange;v=b3;q=0.9",
                "Accept-Encoding": "gzip, deflate, br",
                "Accept-Language": "zh-CN,zh;q=0.9,en-GB;q=0.8,en;q=0.7,ja;q=0.6",
                "Cache-Control": "max-age=0",
                "Connection": "keep-alive",
                "Host": urlparse(img).hostname,
                "Upgrade-Insecure-Requests": "1",
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                              "Chrome/91.0.4472.164 Safari/537.36",
            }
            img_res = httpx.get(img, headers=headers)
            if img_res.status_code == 200:
                img_path = f'./temp_image/{img_type}.jpg'
                image = np.asarray(bytearray(img_res.content), dtype="uint8")
                image = cv2.imdecode(image, cv2.IMREAD_COLOR)
                if resize:
                    image = cv2.resize(image, dsize=resize)
                cv2.imwrite(img_path, image)
                return img_path
            else:
                raise Exception(f"failed to save the {img_type} image")
        else:
            return img

    @staticmethod
    def clear_white(img):
        """Trim the blank area of an image; mainly used to trim the blank around the slider piece"""
        img = cv2.imread(img)
        rows, cols, channel = img.shape
        min_x = 255
        min_y = 255
        max_x = 0
        max_y = 0
        for x in range(1, rows):
            for y in range(1, cols):
                t = set(img[x, y])
                if len(t) >= 2:
                    if x <= min_x:
                        min_x = x
                    elif x >= max_x:
                        max_x = x
                    if y <= min_y:
                        min_y = y
                    elif y >= max_y:
                        max_y = y
        img1 = img[min_x:max_x, min_y:max_y]
        return img1

    def template_match(self, tpl, target):
        th, tw = tpl.shape[:2]
        result = cv2.matchTemplate(target, tpl, cv2.TM_CCOEFF_NORMED)
        # locate the positions of the minimum and maximum values in the result matrix
        min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
        tl = max_loc
        br = (tl[0] + tw, tl[1] + th)
        # Draw a rectangle to mark the matched region:
        # target: the target image
        # tl: top-left corner of the rectangle
        # br: bottom-right corner of the rectangle
        # (0, 0, 255): border color
        # 2: border width
        cv2.rectangle(target, tl, br, (0, 0, 255), 2)
        cv2.imwrite(self.out, target)
        return tl[0]

    @staticmethod
    def image_edge_detection(img):
        edges = cv2.Canny(img, 100, 200)
        return edges

    def discern(self):
        img1 = self.clear_white(self.gap)
        img1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY)
        slide = self.image_edge_detection(img1)

        back = cv2.imread(self.bg, cv2.COLOR_RGB2GRAY)
        back = self.image_edge_detection(back)

        slide_pic = cv2.cvtColor(slide, cv2.COLOR_GRAY2RGB)
        back_pic = cv2.cvtColor(back, cv2.COLOR_GRAY2RGB)
        x = self.template_match(slide_pic, back_pic)
        # return the x coordinate, i.e. where the gap sits in the background image
        return x


def get_track_simple(distance) -> List[int]:
    # Some sites detect the drag speed; moving at constant speed is easy to flag,
    # so use a simple accelerate-then-decelerate curve.
    # distance is the total distance to travel.
    # movement track
    track: List[int] = []
    # current displacement
    current = 0
    # deceleration threshold
    mid = distance * 4 / 5
    # time step
    t = 0.2
    # initial velocity
    v = 1

    while current < distance:
        if current < mid:
            # acceleration of 4
            a = 4
        else:
            # deceleration of -3
            a = -3
        v0 = v
        # current velocity
        v = v0 + a * t  # type: ignore
        # distance moved in this step
        move = v0 * t + 1 / 2 * a * t * t
        # accumulated displacement
        current += move  # type: ignore
        # append this step to the track
        track.append(round(move))
    return track


def get_tracks(distance: int, level: str = "easy") -> List[int]:
    if level == "easy":
        return get_track_simple(distance)
    else:
        from . import easing
        _, tricks = easing.get_tracks(distance, seconds=2, ease_func="ease_out_expo")
        return tricks
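
For orientation, a minimal sketch of how the two pieces compose (the image URLs are placeholders; opencv-python must be installed): Slide.discern() resolves the horizontal offset of the gap, and get_tracks turns that distance into a humanized drag track:

from tools.slider_util import Slide, get_tracks

# Placeholder captcha images; in the crawlers these URLs come from the page.
slide = Slide(
    gap="https://example.com/captcha/gap.png",
    bg="https://example.com/captcha/bg.png",
)
distance = slide.discern()  # x offset of the gap in the background image
track = get_tracks(distance, level="easy")
print(distance, sum(track))  # the summed steps approximate the distance (may overshoot slightly)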

tools/time_util.py (new file)

@@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/2 12:52
# @Desc : time-related utility functions
import time


def get_current_timestamp() -> int:
    """
    Get the current millisecond timestamp, e.g. 1701493264496
    :return:
    """
    return int(time.time() * 1000)


def get_current_time() -> str:
    """
    Get the current time string, e.g. '2023-12-02 13:01:23'
    :return:
    """
    return time.strftime('%Y-%m-%d %X', time.localtime())


def get_current_date() -> str:
    """
    Get the current date string, e.g. '2023-12-02'
    :return:
    """
    return time.strftime('%Y-%m-%d', time.localtime())


def get_time_str_from_unix_time(unixtime):
    """
    integer unix timestamp ==> datetime string
    :param unixtime:
    :return:
    """
    if int(unixtime) > 1000000000000:
        unixtime = int(unixtime) / 1000
    return time.strftime('%Y-%m-%d %X', time.localtime(unixtime))


def get_date_str_from_unix_time(unixtime):
    """
    integer unix timestamp ==> date string
    :param unixtime:
    :return:
    """
    if int(unixtime) > 1000000000000:
        unixtime = int(unixtime) / 1000
    return time.strftime('%Y-%m-%d', time.localtime(unixtime))


def get_unix_time_from_time_str(time_str):
    """
    time string ==> integer unix timestamp (second precision)
    :param time_str:
    :return:
    """
    try:
        format_str = "%Y-%m-%d %H:%M:%S"
        tm_object = time.strptime(str(time_str), format_str)
        return int(time.mktime(tm_object))
    except Exception:
        return 0
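
A quick round trip through these helpers (a sketch; the exact output depends on the local timezone and locale, since '%X' is locale-dependent):

from tools import time_util

ts = time_util.get_unix_time_from_time_str("2023-12-02 13:01:23")
print(ts)                                                # second-precision unix timestamp
print(time_util.get_time_str_from_unix_time(ts))         # '2023-12-02 13:01:23'
print(time_util.get_date_str_from_unix_time(ts * 1000))  # millisecond input is detected: '2023-12-02'
print(time_util.get_current_timestamp())                 # e.g. 1701493264496 (milliseconds)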


@@ -1,103 +1,8 @@
-import base64
import logging
-import os
-import random
-import re
-import time
-from io import BytesIO
-from typing import Dict, List, Optional, Tuple
-from urllib.parse import urlparse

-import cv2
-import httpx
-import numpy as np
-from PIL import Image, ImageDraw
-from playwright.async_api import Cookie, Page
+from .crawler_util import *
+from .slider_util import *
+from .time_util import *
-
-
-async def find_login_qrcode(page: Page, selector: str) -> str:
-    """find login qrcode image from target selector"""
-    try:
-        elements = await page.wait_for_selector(
-            selector=selector,
-        )
-        login_qrcode_img = await elements.get_property("src")  # type: ignore
-        return str(login_qrcode_img)
-    except Exception as e:
-        print(e)
-        return ""
-
-
-def show_qrcode(qr_code) -> None:  # type: ignore
-    """parse base64 encode qrcode image and show it"""
-    qr_code = qr_code.split(",")[1]
-    qr_code = base64.b64decode(qr_code)
-    image = Image.open(BytesIO(qr_code))
-    # Add a square border around the QR code and display it within the border to improve scanning accuracy.
-    width, height = image.size
-    new_image = Image.new('RGB', (width + 20, height + 20), color=(255, 255, 255))
-    new_image.paste(image, (10, 10))
-    draw = ImageDraw.Draw(new_image)
-    draw.rectangle((0, 0, width + 19, height + 19), outline=(0, 0, 0), width=1)
-    new_image.show()
-
-
-def get_user_agent() -> str:
-    ua_list = [
-        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
-        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.79 Safari/537.36",
-        "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",
-        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",
-        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.53 Safari/537.36",
-        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36"
-    ]
-    return random.choice(ua_list)
-
-
-def convert_cookies(cookies: Optional[List[Cookie]]) -> Tuple[str, Dict]:
-    if not cookies:
-        return "", {}
-    cookies_str = ";".join([f"{cookie.get('name')}={cookie.get('value')}" for cookie in cookies])
-    cookie_dict = dict()
-    for cookie in cookies:
-        cookie_dict[cookie.get('name')] = cookie.get('value')
-    return cookies_str, cookie_dict
-
-
-def convert_str_cookie_to_dict(cookie_str: str) -> Dict:
-    cookie_dict: Dict[str, str] = dict()
-    if not cookie_str:
-        return cookie_dict
-    for cookie in cookie_str.split(";"):
-        cookie = cookie.strip()
-        if not cookie:
-            continue
-        cookie_list = cookie.split("=")
-        if len(cookie_list) != 2:
-            continue
-        cookie_value = cookie_list[1]
-        if isinstance(cookie_value, list):
-            cookie_value = "".join(cookie_value)
-        cookie_dict[cookie_list[0]] = cookie_value
-    return cookie_dict
-
-
-def get_current_timestamp():
-    return int(time.time() * 1000)
-
-
-def match_interact_info_count(count_str: str) -> int:
-    if not count_str:
-        return 0
-    match = re.search(r'\d+', count_str)
-    if match:
-        number = match.group()
-        return int(number)
-    else:
-        return 0

def init_loging_config():

@@ -113,166 +18,3 @@ def init_loging_config():
logger = init_loging_config()
-
-
-class Slide:
-    """
-    copy from https://blog.csdn.net/weixin_43582101 thanks for author
-    update: relakkes
-    """
-
-    def __init__(self, gap, bg, gap_size=None, bg_size=None, out=None):
-        """
-        :param gap: local path or URL of the gap (slider piece) image
-        :param bg: local path or URL of the background image that contains the gap
-        """
-        self.img_dir = os.path.join(os.getcwd(), 'temp_image')
-        if not os.path.exists(self.img_dir):
-            os.makedirs(self.img_dir)
-
-        bg_resize = bg_size if bg_size else (340, 212)
-        gap_size = gap_size if gap_size else (68, 68)
-        self.bg = self.check_is_img_path(bg, 'bg', resize=bg_resize)
-        self.gap = self.check_is_img_path(gap, 'gap', resize=gap_size)
-        self.out = out if out else os.path.join(self.img_dir, 'out.jpg')
-
-    @staticmethod
-    def check_is_img_path(img, img_type, resize):
-        if img.startswith('http'):
-            headers = {
-                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;"
-                          "q=0.8,application/signed-exchange;v=b3;q=0.9",
-                "Accept-Encoding": "gzip, deflate, br",
-                "Accept-Language": "zh-CN,zh;q=0.9,en-GB;q=0.8,en;q=0.7,ja;q=0.6",
-                "Cache-Control": "max-age=0",
-                "Connection": "keep-alive",
-                "Host": urlparse(img).hostname,
-                "Upgrade-Insecure-Requests": "1",
-                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
-                              "Chrome/91.0.4472.164 Safari/537.36",
-            }
-            img_res = httpx.get(img, headers=headers)
-            if img_res.status_code == 200:
-                img_path = f'./temp_image/{img_type}.jpg'
-                image = np.asarray(bytearray(img_res.content), dtype="uint8")
-                image = cv2.imdecode(image, cv2.IMREAD_COLOR)
-                if resize:
-                    image = cv2.resize(image, dsize=resize)
-                cv2.imwrite(img_path, image)
-                return img_path
-            else:
-                raise Exception(f"failed to save the {img_type} image")
-        else:
-            return img
-
-    @staticmethod
-    def clear_white(img):
-        """Trim the blank area of an image; mainly used to trim the blank around the slider piece"""
-        img = cv2.imread(img)
-        rows, cols, channel = img.shape
-        min_x = 255
-        min_y = 255
-        max_x = 0
-        max_y = 0
-        for x in range(1, rows):
-            for y in range(1, cols):
-                t = set(img[x, y])
-                if len(t) >= 2:
-                    if x <= min_x:
-                        min_x = x
-                    elif x >= max_x:
-                        max_x = x
-                    if y <= min_y:
-                        min_y = y
-                    elif y >= max_y:
-                        max_y = y
-        img1 = img[min_x:max_x, min_y:max_y]
-        return img1
-
-    def template_match(self, tpl, target):
-        th, tw = tpl.shape[:2]
-        result = cv2.matchTemplate(target, tpl, cv2.TM_CCOEFF_NORMED)
-        # locate the positions of the minimum and maximum values in the result matrix
-        min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
-        tl = max_loc
-        br = (tl[0] + tw, tl[1] + th)
-        # draw a rectangle to mark the matched region
-        cv2.rectangle(target, tl, br, (0, 0, 255), 2)
-        cv2.imwrite(self.out, target)
-        return tl[0]
-
-    @staticmethod
-    def image_edge_detection(img):
-        edges = cv2.Canny(img, 100, 200)
-        return edges
-
-    def discern(self):
-        img1 = self.clear_white(self.gap)
-        img1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY)
-        slide = self.image_edge_detection(img1)
-
-        back = cv2.imread(self.bg, cv2.COLOR_RGB2GRAY)
-        back = self.image_edge_detection(back)
-
-        slide_pic = cv2.cvtColor(slide, cv2.COLOR_GRAY2RGB)
-        back_pic = cv2.cvtColor(back, cv2.COLOR_GRAY2RGB)
-        x = self.template_match(slide_pic, back_pic)
-        # return the x coordinate, i.e. where the gap sits in the background image
-        return x
-
-
-def get_track_simple(distance) -> List[int]:
-    # Some sites detect the drag speed; moving at constant speed is easy to flag,
-    # so use a simple accelerate-then-decelerate curve.
-    # distance is the total distance to travel.
-    # movement track
-    track: List[int] = []
-    # current displacement
-    current = 0
-    # deceleration threshold
-    mid = distance * 4 / 5
-    # time step
-    t = 0.2
-    # initial velocity
-    v = 1
-
-    while current < distance:
-        if current < mid:
-            # acceleration of 4
-            a = 4
-        else:
-            # deceleration of -3
-            a = -3
-        v0 = v
-        # current velocity
-        v = v0 + a * t  # type: ignore
-        # distance moved in this step
-        move = v0 * t + 1 / 2 * a * t * t
-        # accumulated displacement
-        current += move  # type: ignore
-        # append this step to the track
-        track.append(round(move))
-    return track
-
-
-def get_tracks(distance: int, level: str = "easy") -> List[int]:
-    if level == "easy":
-        return get_track_simple(distance)
-    else:
-        from . import easing
-        _, tricks = easing.get_tracks(distance, seconds=2, ease_func="ease_out_expo")
-        return tricks
-
-
-def get_current_time():
-    ISOTIMEFORMAT = '%Y-%m-%d %X'
-    return tme.strftime(ISOTIMEFORMAT, time.localtime())
-
-
-def get_current_date():
-    ISOTIMEFORMAT = '%Y-%m-%d'
-    return time.strftime(ISOTIMEFORMAT, time.localtime())