From 57437719bff502ab2becaa96371f842898d0c863 Mon Sep 17 00:00:00 2001 From: Relakkes Date: Thu, 29 Jun 2023 16:22:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=8A=96=E9=9F=B3=E4=B8=89=E7=A7=8D?= =?UTF-8?q?=E6=96=B9=E5=BC=8F=E7=99=BB=E5=BD=95=E5=AE=9E=E7=8E=B0=20&=20?= =?UTF-8?q?=E6=8A=96=E9=9F=B3=E6=BB=91=E5=9D=97=E6=A8=A1=E6=8B=9F=E6=BB=91?= =?UTF-8?q?=E5=8A=A8=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 +- README.md | 50 +++--- media_platform/douyin/core.py | 20 ++- media_platform/douyin/login.py | 268 ++++++++++++++++++++++++++------- requirements.txt | 3 +- tools/easing.py | 68 +++++++++ tools/utils.py | 161 ++++++++++++++++++++ 7 files changed, 488 insertions(+), 85 deletions(-) create mode 100644 tools/easing.py diff --git a/.gitignore b/.gitignore index c9a8617..604f7ea 100644 --- a/.gitignore +++ b/.gitignore @@ -161,4 +161,5 @@ cython_debug/ *.xml *.iml -.idea \ No newline at end of file +.idea +/temp_image/ diff --git a/README.md b/README.md index 5038599..e0df70f 100644 --- a/README.md +++ b/README.md @@ -23,10 +23,11 @@ - [x] 抖音Sign请求签名 - [x] 代理池实现(手机号+IP) - [x] 并发执行爬虫请求 +- [x] 抖音登录(二维码、手机号、cookies) +- [x] 抖音滑块(模拟滑动实现,准确率不太OK) ## 待实现 -- [ ] 抖音登录(滑块) - [ ] 数据持久化到硬盘 ## 使用方法 @@ -44,40 +45,40 @@ ``` MediaCrawler ├── base -│ ├── base_crawler.py # 项目的抽象类 -│ └── proxy_account_pool.py # 账号与IP代理池 +│ ├── base_crawler.py # 项目的抽象类 +│ └── proxy_account_pool.py # 账号与IP代理池 ├── config -│ ├── account_config.py # 基础配置 -│ └── base_config.py # 账号池配置 +│ ├── account_config.py # 基础配置 +│ └── base_config.py # 账号池配置 ├── images │ ├── douyin.gif │ └── xiaohongshu.git ├── libs -│ ├── douyin.js # 抖音Sign函数 -│ └── stealth.min.js # 去除浏览器自动化特征的JS +│ ├── douyin.js # 抖音Sign函数 +│ └── stealth.min.js # 去除浏览器自动化特征的JS ├── media_platform -│ ├── douyin # 抖音crawler实现 -│ │ ├── client.py # 抖音API httpx 请求封装 -│ │ ├── core.py # 抖音crawler的核心实现 -│ │ ├── exception.py # 抖音crawler的异常处理 -│ │ ├── field.py # 抖音crawler的字段定义 -│ │ └── login.py # 抖音crawler的登录实现 -│ └── xiaohongshu # 小红书crawler实现 -│ ├── client.py # 小红书API httpx 请求封装 -│ ├── core.py # 小红书crawler的核心实现 -│ ├── exception.py # 小红书crawler的异常处理 -│ ├── field.py # 小红书crawler的字段定义 -│ ├── help.py # 小红书crawler的辅助函数 -│ └── login.py # 小红书crawler的登录实现 +│ ├── douyin # 抖音crawler实现 +│ │ ├── client.py # httpx 请求封装 +│ │ ├── core.py # 核心实现 +│ │ ├── exception.py # 异常处理 +│ │ ├── field.py # 字段定义 +│ │ └── login.py # 登录实现 +│ └── xiaohongshu # 小红书crawler实现 +│ ├── client.py # API httpx 请求封装 +│ ├── core.py # 核心实现 +│ ├── exception.py # 异常处理 +│ ├── field.py # 字段定义 +│ ├── help.py # 辅助函数 +│ └── login.py # 登录实现 ├── modles │ ├── douyin │ │ └── m_douyin.py │ └── xhs │ └── m_xhs.py ├── tools -│ └── utils.py # 工具函数 -├── main.py # 程序入口 -└── recv_sms_notification.py # 短信转发器的HTTP SERVER接口 +│ └── utils.py # 工具函数 +├── main.py # 程序入口 +└── recv_sms_notification.py # 短信转发器的HTTP SERVER接口 ``` ## 小红书运行截图 @@ -116,8 +117,7 @@ MediaCrawler 备注: - 小红书这边一个手机号一天只能发10条短信(悠着点),目前在发验证码时还未触发滑块验证,估计多了之后也会有~ -- -短信转发软件会不会监控自己手机上其他短信内容?(理论上应该不会,因为[短信转发仓库](https://github.com/pppscn/SmsForwarder) +- 短信转发软件会不会监控自己手机上其他短信内容?(理论上应该不会,因为[短信转发仓库](https://github.com/pppscn/SmsForwarder) star还是蛮多的) diff --git a/media_platform/douyin/core.py b/media_platform/douyin/core.py index 126331b..520628e 100644 --- a/media_platform/douyin/core.py +++ b/media_platform/douyin/core.py @@ -49,13 +49,25 @@ class DouYinCrawler(AbstractCrawler): return phone, playwright_proxy, httpx_proxy async def start(self): - account_phone, playwright_proxy, httpx_proxy = self.create_proxy_info() + # phone: 1340xxxx, ip_proxy: 47.xxx.xxx.xxx:8888 + account_phone, ip_proxy = self.account_pool.get_account() + + # 抖音平台如果开启代理登录的话,会被风控,所以这里不开启代理 + playwright_proxy = None + # playwright_proxy = { + # "server": f"{config.ip_proxy_protocol}{ip_proxy}", + # "username": config.ip_proxy_user, + # "password": config.ip_proxy_password, + # } + + httpx_proxy = f"{config.IP_PROXY_PROTOCOL}{config.IP_PROXY_USER}:{config.IP_PROXY_PASSWORD}@{ip_proxy}" if not config.ENABLE_IP_PROXY: - playwright_proxy, httpx_proxy = None, None + playwright_proxy = None + httpx_proxy = None async with async_playwright() as playwright: chromium = playwright.chromium - browser = await chromium.launch(headless=True, proxy=playwright_proxy) + browser = await chromium.launch(headless=config.HEADLESS, proxy=playwright_proxy) self.browser_context = await browser.new_context( viewport={"width": 1800, "height": 900}, user_agent=self.user_agent, @@ -74,7 +86,7 @@ class DouYinCrawler(AbstractCrawler): context_page=self.context_page, cookie_str=config.COOKIES ) - # await login_obj.begin() + await login_obj.begin() # update cookies await self.update_cookies() diff --git a/media_platform/douyin/login.py b/media_platform/douyin/login.py index ac9f83e..9c9ab10 100644 --- a/media_platform/douyin/login.py +++ b/media_platform/douyin/login.py @@ -1,16 +1,23 @@ import sys +import logging import asyncio -from playwright.async_api import Page +import aioredis +from tenacity import ( + retry, + stop_after_attempt, + wait_fixed, + retry_if_result +) +from playwright.async_api import Page, TimeoutError as PlaywrightTimeoutError from playwright.async_api import BrowserContext -from tools import utils +import config +from tools import utils, easing from base.base_crawler import AbstractLogin class DouYinLogin(AbstractLogin): - async def login_by_cookies(self): - pass def __init__(self, login_type: str, @@ -23,59 +30,19 @@ class DouYinLogin(AbstractLogin): self.browser_context = browser_context self.context_page = context_page self.login_phone = login_phone - self.cookie_str = cookie_str self.scan_qrcode_time = 60 - - async def check_login_state(self): - """Check if the current login status is successful and return True otherwise return False""" - current_cookie = await self.browser_context.cookies() - _, cookie_dict = utils.convert_cookies(current_cookie) - if cookie_dict.get("LOGIN_STATUS") == "1": - return True - return False - - async def login_by_qrcode(self): - """login douyin website and keep webdriver login state""" - print("Begin login douyin ...") - # find login qrcode - base64_qrcode_img = await utils.find_login_qrcode( - self.context_page, - selector="xpath=//article[@class='web-login']//img" - ) - if not base64_qrcode_img: - if await self.check_login_state(): - return - # todo ...if this website does not automatically popup login dialog box, we will manual click login button - print("login failed , have not found qrcode please check ....") - sys.exit() - - # show login qrcode - utils.show_qrcode(base64_qrcode_img) - - while self.scan_qrcode_time > 0: - await asyncio.sleep(1) - self.scan_qrcode_time -= 1 - print(f"waiting for scan code login, remaining time is {self.scan_qrcode_time} seconds") - # get login state from browser - if await self.check_login_state(): - # If the QR code login is successful, you need to wait for a moment. - # Because there will be a second redirection after successful login - # executing JS during this period may be performed in a Page that has already been destroyed. - wait_for_seconds = 5 - print(f"Login successful then wait for {wait_for_seconds} seconds redirect ...") - while wait_for_seconds > 0: - await asyncio.sleep(1) - print(f"remaining wait {wait_for_seconds} seconds ...") - wait_for_seconds -= 1 - break - else: - sys.exit() - - async def login_by_mobile(self): - # todo implement login by mobile - pass + self.cookie_str = cookie_str async def begin(self): + """ + Start login douyin website + 滑块中间页面的验证准确率不太OK... 如果没有特俗要求,建议不开抖音登录,或者使用cookies登录 + """ + + # popup login dialog + await self.popup_login_dialog() + + # select login type if self.login_type == "qrcode": await self.login_by_qrcode() elif self.login_type == "phone": @@ -84,3 +51,196 @@ class DouYinLogin(AbstractLogin): await self.login_by_cookies() else: raise ValueError("Invalid Login Type Currently only supported qrcode or phone ...") + + # 如果页面重定向到滑动验证码页面,需要再次滑动滑块 + await asyncio.sleep(3) + current_page_title = await self.context_page.title() + if "验证码中间页" in current_page_title: + await self.check_page_display_slider(move_step=3, slider_level="hard") + + # check login state + logging.info(f"login finished then check login state ...") + login_flag: bool = await self.check_login_state() + if not login_flag: + logging.info("login failed please confirm ...") + sys.exit() + + # wait for redirect + wait_redirect_seconds = 5 + logging.info(f"Login successful then wait for {wait_redirect_seconds} seconds redirect ...") + await asyncio.sleep(wait_redirect_seconds) + + @retry(stop=stop_after_attempt(20), wait=wait_fixed(1), retry=retry_if_result(lambda value: value is False)) + async def check_login_state(self): + """Check if the current login status is successful and return True otherwise return False""" + current_cookie = await self.browser_context.cookies() + _, cookie_dict = utils.convert_cookies(current_cookie) + if cookie_dict.get("LOGIN_STATUS") == "1": + return True + return False + + async def popup_login_dialog(self): + """If the login dialog box does not pop up automatically, we will manually click the login button""" + dialog_selector = "xpath=//div[@id='login-pannel']" + try: + # check dialog box is auto popup and wait for 10 seconds + await self.context_page.wait_for_selector(dialog_selector, timeout=1000 * 10) + except Exception as e: + logging.error(f"login dialog box does not pop up automatically, error: {e}") + logging.info("login dialog box does not pop up automatically, we will manually click the login button") + login_button_ele = self.context_page.locator("xpath=//p[text() = '登录']") + await login_button_ele.click() + await asyncio.sleep(0.5) + + async def login_by_qrcode(self): + logging.info("Begin login douyin by qrcode...") + qrcode_img_selector = "xpath=//article[@class='web-login']//img" + base64_qrcode_img = await utils.find_login_qrcode( + self.context_page, + selector=qrcode_img_selector + ) + if not base64_qrcode_img: + logging.info("login qrcode not found please confirm ...") + sys.exit() + + # show login qrcode + utils.show_qrcode(base64_qrcode_img) + await asyncio.sleep(2) + + async def login_by_mobile(self): + logging.info("Begin login douyin by mobile ...") + mobile_tap_ele = self.context_page.locator("xpath=//li[text() = '验证码登录']") + await mobile_tap_ele.click() + await self.context_page.wait_for_selector("xpath=//article[@class='web-login-mobile-code']") + mobile_input_ele = self.context_page.locator("xpath=//input[@placeholder='手机号']") + await mobile_input_ele.fill(self.login_phone) + await asyncio.sleep(0.5) + send_sms_code_btn = self.context_page.locator("xpath=//span[text() = '获取验证码']") + await send_sms_code_btn.click() + + # 检查是否有滑动验证码 + await self.check_page_display_slider(move_step=10, slider_level="easy") + + redis_obj = aioredis.from_url(url=config.REDIS_DB_HOST, password=config.REDIS_DB_PWD, decode_responses=True) + max_get_sms_code_time = 60 * 2 # 最长获取验证码的时间为2分钟 + while max_get_sms_code_time > 0: + logging.info(f"get douyin sms code from redis remaining time {max_get_sms_code_time}s ...") + await asyncio.sleep(1) + sms_code_key = f"dy_{self.login_phone}" + sms_code_value = await redis_obj.get(sms_code_key) + if not sms_code_value: + max_get_sms_code_time -= 1 + continue + + sms_code_input_ele = self.context_page.locator("xpath=//input[@placeholder='请输入验证码']") + await sms_code_input_ele.fill(value=sms_code_value) + await asyncio.sleep(0.5) + submit_btn_ele = self.context_page.locator("xpath=//button[@class='web-login-button']") + await submit_btn_ele.click() # 点击登录 + # todo ... 应该还需要检查验证码的正确性有可能输入的验证码不正确 + break + + async def check_page_display_slider(self, move_step: int = 10, slider_level: str = "easy"): + """ + 检查页面是否出现滑动验证码 + :return: + """ + # 等待滑动验证码的出现 + back_selector = "#captcha-verify-image" + try: + await self.context_page.wait_for_selector(selector=back_selector, state="visible", timeout=30 * 1000) + except PlaywrightTimeoutError: # 没有滑动验证码,直接返回 + return + + gap_selector = 'xpath=//*[@id="captcha_container"]/div/div[2]/img[2]' + max_slider_try_times = 20 + slider_verify_success = False + while not slider_verify_success: + if max_slider_try_times <= 0: + logging.error("slider verify failed ...") + sys.exit() + try: + await self.move_slider(back_selector, gap_selector, move_step, slider_level) + await asyncio.sleep(1) + + # 如果滑块滑动慢了,或者验证失败了,会提示操作过慢,这里点一下刷新按钮 + page_content = await self.context_page.content() + if "操作过慢" in page_content or "提示重新操作" in page_content: + logging.info("slider verify failed, retry ...") + await self.context_page.click(selector="//a[contains(@class, 'secsdk_captcha_refresh')]") + continue + + # 滑动成功后,等待滑块消失 + await self.context_page.wait_for_selector(selector=back_selector, state="hidden", timeout=1000) + # 如果滑块消失了,说明验证成功了,跳出循环,如果没有消失,说明验证失败了,上面这一行代码会抛出异常被捕获后继续循环滑动验证码 + logging.info("slider verify success ...") + slider_verify_success = True + except Exception as e: + logging.error(f"slider verify failed, error: {e}") + await asyncio.sleep(1) + max_slider_try_times -= 1 + logging.info(f"remaining slider try times: {max_slider_try_times}") + continue + + async def move_slider(self, back_selector: str, gap_selector: str, move_step: int = 10, slider_level="easy"): + """ + Move the slider to the right to complete the verification + :param back_selector: 滑动验证码背景图片的选择器 + :param gap_selector: 滑动验证码的滑块选择器 + :param move_step: 是控制单次移动速度的比例是1/10 默认是1 相当于 传入的这个距离不管多远0.1秒钟移动完 越大越慢 + :param slider_level: 滑块难度 easy hard,分别对应手机验证码的滑块和验证码中间的滑块 + :return: + """ + + # get slider background image + slider_back_elements = await self.context_page.wait_for_selector( + selector=back_selector, + timeout=1000 * 10, # wait 10 seconds + ) + slide_back = str(await slider_back_elements.get_property("src")) + + # get slider gap image + gap_elements = await self.context_page.wait_for_selector( + selector=gap_selector, + timeout=1000 * 10, # wait 10 seconds + ) + gap_src = str(await gap_elements.get_property("src")) + + # 识别滑块位置 + slide_app = utils.Slide(gap=gap_src, bg=slide_back) + distance = slide_app.discern() + + # 获取移动轨迹 + tracks = utils.get_tracks(distance, slider_level) + new_1 = tracks[-1] - (sum(tracks) - distance) + tracks.pop() + tracks.append(new_1) + + # 根据轨迹拖拽滑块到指定位置 + element = await self.context_page.query_selector(gap_selector) + bounding_box = await element.bounding_box() + + await self.context_page.mouse.move(bounding_box["x"] + bounding_box["width"] / 2, + bounding_box["y"] + bounding_box["height"] / 2) + # 这里获取到x坐标中心点位置 + x = bounding_box["x"] + bounding_box["width"] / 2 + # 模拟滑动操作 + await element.hover() + await self.context_page.mouse.down() + + for track in tracks: + # 循环鼠标按照轨迹移动 + # steps 是控制单次移动速度的比例是1/10 默认是1 相当于 传入的这个距离不管多远0.1秒钟移动完 越大越慢 + await self.context_page.mouse.move(x + track, 0, steps=move_step) + x += track + await self.context_page.mouse.up() + + async def login_by_cookies(self): + logging.info("Begin login douyin by cookie ...") + for key, value in utils.convert_str_cookie_to_dict(self.cookie_str).items(): + await self.browser_context.add_cookies([{ + 'name': key, + 'value': value, + 'domain': ".douyin.com", + 'path': "/" + }]) diff --git a/requirements.txt b/requirements.txt index 422f037..25b142b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ playwright==1.33.0 aioredis==2.0.1 tenacity==8.2.2 tornado==6.3.2 -PyExecJS==1.5.1 \ No newline at end of file +PyExecJS==1.5.1 +opencv-python==4.7.0.72 diff --git a/tools/easing.py b/tools/easing.py new file mode 100644 index 0000000..fa3a017 --- /dev/null +++ b/tools/easing.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# copy from https://github.com/aneasystone/selenium-test/blob/master/12-slider-captcha.py +# thanks to aneasystone for his great work +import numpy as np +import math + + +# https://github.com/gdsmith/jquery.easing/blob/master/jquery.easing.js +def ease_in_quad(x): + return x * x + + +def ease_out_quad(x): + return 1 - (1 - x) * (1 - x) + + +def ease_out_quart(x): + return 1 - pow(1 - x, 4) + + +def ease_out_expo(x): + if x == 1: + return 1 + else: + return 1 - pow(2, -10 * x) + + +def ease_out_bounce(x): + n1 = 7.5625 + d1 = 2.75 + if x < 1 / d1: + return n1 * x * x + elif x < 2 / d1: + x -= 1.5 / d1 + return n1 * x * x + 0.75 + elif x < 2.5 / d1: + x -= 2.25 / d1 + return n1 * x * x + 0.9375 + else: + x -= 2.625 / d1 + return n1 * x * x + 0.984375 + + +def ease_out_elastic(x): + if x == 0: + return 0 + elif x == 1: + return 1 + else: + c4 = (2 * math.pi) / 3 + return pow(2, -10 * x) * math.sin((x * 10 - 0.75) * c4) + 1 + + +def get_tracks(distance, seconds, ease_func): + tracks = [0] + offsets = [0] + for t in np.arange(0.0, seconds, 0.1): + ease = globals()[ease_func] + offset = round(ease(t / seconds) * distance) + tracks.append(offset - offsets[-1]) + offsets.append(offset) + return offsets, tracks + + +if __name__ == '__main__': + o, tl = get_tracks(129, 3, "ease_out_expo") + print(tl) diff --git a/tools/utils.py b/tools/utils.py index 650edd7..4f709f8 100644 --- a/tools/utils.py +++ b/tools/utils.py @@ -1,11 +1,16 @@ import re +import os import time import random import base64 import logging from io import BytesIO +from urllib.parse import urlparse from typing import Optional, Dict, List, Tuple +import cv2 +import httpx +import numpy as np from PIL import Image, ImageDraw from playwright.async_api import Cookie from playwright.async_api import Page @@ -71,6 +76,8 @@ def convert_str_cookie_to_dict(cookie_str: str) -> Dict: if not cookie: continue cookie = cookie.split("=") + if len(cookie) != 2: + continue cookie_value = cookie[1] if isinstance(cookie_value, list): cookie_value = "".join(cookie_value) @@ -102,3 +109,157 @@ def init_loging_config(): datefmt='%Y-%m-%d %H:%M:%S' ) logging.Logger("Media Crawler") + + +class Slide: + """ + copy from https://blog.csdn.net/weixin_43582101 thanks for author + update: relakkes + """ + + def __init__(self, gap, bg, gap_size=None, bg_size=None, out=None): + """ + :param gap: 缺口图片链接或者url + :param bg: 带缺口的图片链接或者url + """ + self.img_dir = os.path.join(os.getcwd(), 'temp_image') + if not os.path.exists(self.img_dir): + os.makedirs(self.img_dir) + + bg_resize = bg_size if bg_size else (340, 212) + gap_size = gap_size if gap_size else (68, 68) + self.bg = self.check_is_img_path(bg, 'bg', resize=bg_resize) + self.gap = self.check_is_img_path(gap, 'gap', resize=gap_size) + self.out = out if out else os.path.join(self.img_dir, 'out.jpg') + + @staticmethod + def check_is_img_path(img, img_type, resize): + if img.startswith('http'): + headers = { + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;" + "q=0.8,application/signed-exchange;v=b3;q=0.9", + "Accept-Encoding": "gzip, deflate, br", + "Accept-Language": "zh-CN,zh;q=0.9,en-GB;q=0.8,en;q=0.7,ja;q=0.6", + "Cache-Control": "max-age=0", + "Connection": "keep-alive", + "Host": urlparse(img).hostname, + "Upgrade-Insecure-Requests": "1", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/91.0.4472.164 Safari/537.36", + } + img_res = httpx.get(img, headers=headers) + if img_res.status_code == 200: + img_path = f'./temp_image/{img_type}.jpg' + image = np.asarray(bytearray(img_res.content), dtype="uint8") + image = cv2.imdecode(image, cv2.IMREAD_COLOR) + if resize: + image = cv2.resize(image, dsize=resize) + cv2.imwrite(img_path, image) + return img_path + else: + raise Exception(f"保存{img_type}图片失败") + else: + return img + + @staticmethod + def clear_white(img): + """清除图片的空白区域,这里主要清除滑块的空白""" + img = cv2.imread(img) + rows, cols, channel = img.shape + min_x = 255 + min_y = 255 + max_x = 0 + max_y = 0 + for x in range(1, rows): + for y in range(1, cols): + t = set(img[x, y]) + if len(t) >= 2: + if x <= min_x: + min_x = x + elif x >= max_x: + max_x = x + + if y <= min_y: + min_y = y + elif y >= max_y: + max_y = y + img1 = img[min_x:max_x, min_y: max_y] + return img1 + + def template_match(self, tpl, target): + th, tw = tpl.shape[:2] + result = cv2.matchTemplate(target, tpl, cv2.TM_CCOEFF_NORMED) + # 寻找矩阵(一维数组当作向量,用Mat定义) 中最小值和最大值的位置 + min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result) + tl = max_loc + br = (tl[0] + tw, tl[1] + th) + # 绘制矩形边框,将匹配区域标注出来 + # target:目标图像 + # tl:矩形定点 + # br:矩形的宽高 + # (0,0,255):矩形边框颜色 + # 1:矩形边框大小 + cv2.rectangle(target, tl, br, (0, 0, 255), 2) + cv2.imwrite(self.out, target) + return tl[0] + + @staticmethod + def image_edge_detection(img): + edges = cv2.Canny(img, 100, 200) + return edges + + def discern(self): + img1 = self.clear_white(self.gap) + img1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY) + slide = self.image_edge_detection(img1) + + back = cv2.imread(self.bg, cv2.COLOR_RGB2GRAY) + back = self.image_edge_detection(back) + + slide_pic = cv2.cvtColor(slide, cv2.COLOR_GRAY2RGB) + back_pic = cv2.cvtColor(back, cv2.COLOR_GRAY2RGB) + x = self.template_match(slide_pic, back_pic) + # 输出横坐标, 即 滑块在图片上的位置 + return x + + +def get_track_simple(distance): + # 有的检测移动速度的 如果匀速移动会被识别出来,来个简单点的 渐进 + # distance为传入的总距离 + # 移动轨迹 + track = [] + # 当前位移 + current = 0 + # 减速阈值 + mid = distance * 4 / 5 + # 计算间隔 + t = 0.2 + # 初速度 + v = 1 + + while current < distance: + if current < mid: + # 加速度为2 + a = 4 + else: + # 加速度为-2 + a = -3 + v0 = v + # 当前速度 + v = v0 + a * t + # 移动距离 + move = v0 * t + 1 / 2 * a * t * t + # 当前位移 + current += move + # 加入轨迹 + track.append(round(move)) + return track + + +def get_tracks(distance: int, level: str = "easy") -> List[int]: + if level == "easy": + return get_track_simple(distance) + else: + from . import easing + _, tricks = easing.get_tracks(distance, seconds=2, ease_func="ease_out_expo") + return tricks