diff --git a/base/base_crawler.py b/base/base_crawler.py
index e307769..52ce177 100644
--- a/base/base_crawler.py
+++ b/base/base_crawler.py
@@ -1,9 +1,11 @@
 from abc import ABC, abstractmethod
 
+from base.proxy_account_pool import AccountPool
+
 
 class AbstractCrawler(ABC):
     @abstractmethod
-    def init_config(self, **kwargs):
+    def init_config(self, platform: str, login_type: str, account_pool: AccountPool):
         pass
 
     @abstractmethod
@@ -11,7 +13,7 @@ class AbstractCrawler(ABC):
         pass
 
     @abstractmethod
-    async def search_posts(self):
+    async def search(self):
         pass
 
diff --git a/base/proxy_account_pool.py b/base/proxy_account_pool.py
index e14a26b..1915092 100644
--- a/base/proxy_account_pool.py
+++ b/base/proxy_account_pool.py
@@ -1,4 +1,4 @@
-from typing import Tuple, Optional, List, Set
+from typing import List, Optional, Set, Tuple
 
 import config
 
diff --git a/config/__init__.py b/config/__init__.py
index 4721e8f..b3e5dbf 100644
--- a/config/__init__.py
+++ b/config/__init__.py
@@ -1,3 +1,3 @@
-from .base_config import *
 from .account_config import *
+from .base_config import *
 from .db_config import *
diff --git a/config/base_config.py b/config/base_config.py
index 541c898..5d98096 100644
--- a/config/base_config.py
+++ b/config/base_config.py
@@ -19,8 +19,8 @@ SAVE_LOGIN_STATE = True
 # save user data dir
 USER_DATA_DIR = "%s_user_data_dir"  # %s will be replaced by platform name
 
-# max page num
-MAX_PAGE_NUM = 20
+# crawler max notes count
+CRAWLER_MAX_NOTES_COUNT = 20
 
 # max concurrency num
 MAX_CONCURRENCY_NUM = 10
diff --git a/db.py b/db.py
index 27caf4a..f83205c 100644
--- a/db.py
+++ b/db.py
@@ -1,8 +1,6 @@
-from tortoise import Tortoise
-from tortoise import run_async
+from tortoise import Tortoise, run_async
 
 from config.db_config import *
-
 from tools import utils
 
 
diff --git a/main.py b/main.py
index d00cb5a..46c94cf 100644
--- a/main.py
+++ b/main.py
@@ -1,9 +1,9 @@
-import sys
-import asyncio
 import argparse
+import asyncio
+import sys
 
-import db
 import config
+import db
 from base import proxy_account_pool
 from media_platform.douyin import DouYinCrawler
 from media_platform.xhs import XiaoHongShuCrawler
@@ -17,14 +17,16 @@ class CrawlerFactory:
         elif platform == "dy":
             return DouYinCrawler()
         else:
-            raise ValueError("Invalid Media Platform Currently only supported xhs or douyin ...")
+            raise ValueError("Invalid media platform. Currently only xhs or dy are supported ...")
 
 
 async def main():
     # define command line params ...
     parser = argparse.ArgumentParser(description='Media crawler program.')
-    parser.add_argument('--platform', type=str, help='Media platform select (xhs|dy)...', default=config.PLATFORM)
-    parser.add_argument('--lt', type=str, help='Login type (qrcode | phone | cookie)', default=config.LOGIN_TYPE)
+    parser.add_argument('--platform', type=str, help='Media platform select (xhs|dy)', choices=["xhs", "dy"],
+                        default=config.PLATFORM)
+    parser.add_argument('--lt', type=str, help='Login type (qrcode | phone | cookie)',
+                        choices=["qrcode", "phone", "cookie"], default=config.LOGIN_TYPE)
 
     # init account pool
     account_pool = proxy_account_pool.create_account_pool()
@@ -34,9 +36,10 @@ async def main():
     await db.init_db()
 
     args = parser.parse_args()
-    crawler = CrawlerFactory().create_crawler(platform=args.platform)
+    crawler = CrawlerFactory.create_crawler(platform=args.platform)
     crawler.init_config(
-        command_args=args,
+        platform=args.platform,
+        login_type=args.lt,
         account_pool=account_pool
     )
     await crawler.start()
@@ -44,6 +47,7 @@
 
 if __name__ == '__main__':
     try:
-        asyncio.run(main())
+        # asyncio.run(main())
+        asyncio.get_event_loop().run_until_complete(main())
     except KeyboardInterrupt:
         sys.exit()
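The explicit `init_config(platform, login_type, account_pool)` signature above replaces the old `**kwargs` pass-through, so call sites become type-checkable. A minimal sketch of the new contract outside argparse (not part of the diff; it reuses the `create_account_pool()` helper that main.py already calls):

import asyncio

from base import proxy_account_pool
from media_platform.xhs import XiaoHongShuCrawler

pool = proxy_account_pool.create_account_pool()  # same helper main.py uses
crawler = XiaoHongShuCrawler()
crawler.init_config(platform="xhs", login_type="qrcode", account_pool=pool)
# asyncio.get_event_loop().run_until_complete(crawler.start())
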
diff --git a/media_platform/douyin/client.py b/media_platform/douyin/client.py
index 58d4888..ca56dc5 100644
--- a/media_platform/douyin/client.py
+++ b/media_platform/douyin/client.py
@@ -1,17 +1,17 @@
-import copy
 import asyncio
-from typing import Optional, Dict, Callable
-
-import httpx
-import execjs
+import copy
 import urllib.parse
-from playwright.async_api import Page
-from playwright.async_api import BrowserContext
+from typing import Callable, Dict, Optional
+
+import execjs
+import httpx
+from playwright.async_api import BrowserContext, Page
 
-from .field import *
-from .exception import *
 from tools import utils
+from .exception import *
+from .field import *
+
 
 class DOUYINClient:
     def __init__(
diff --git a/media_platform/douyin/core.py b/media_platform/douyin/core.py
index ba19abe..cba4aa5 100644
--- a/media_platform/douyin/core.py
+++ b/media_platform/douyin/core.py
@@ -1,38 +1,38 @@
-import os
 import asyncio
+import os
 from asyncio import Task
-from argparse import Namespace
-from typing import Optional, List, Dict, Tuple
+from typing import Dict, List, Optional, Tuple
 
-from playwright.async_api import async_playwright
-from playwright.async_api import BrowserType
-from playwright.async_api import BrowserContext
-from playwright.async_api import Page
+from playwright.async_api import (BrowserContext, BrowserType, Page,
+                                  async_playwright)
 
 import config
-from tools import utils
-from .client import DOUYINClient
-from .exception import DataFetchError
-from .login import DouYinLogin
 from base.base_crawler import AbstractCrawler
 from base.proxy_account_pool import AccountPool
 from models import douyin
+from tools import utils
+
+from .client import DOUYINClient
+from .exception import DataFetchError
+from .login import DouYinLogin
 
 
 class DouYinCrawler(AbstractCrawler):
+    platform: str
+    login_type: str
+    context_page: Page
     dy_client: DOUYINClient
+    account_pool: AccountPool
+    browser_context: BrowserContext
 
     def __init__(self) -> None:
-        self.browser_context: Optional[BrowserContext] = None  # type: ignore
-        self.context_page: Optional[Page] = None  # type: ignore
         self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36"  # fixed
         self.index_url = "https://www.douyin.com"
-        self.command_args: Optional[Namespace] = None  # type: ignore
-        self.account_pool: Optional[AccountPool] = None  # type: ignore
 
-    def init_config(self, **kwargs):
-        for key, value in kwargs.items():
-            setattr(self, key, value)
+    def init_config(self, platform: str, login_type: str, account_pool: AccountPool) -> None:
+        self.platform = platform
+        self.login_type = login_type
+        self.account_pool = account_pool
 
     async def start(self) -> None:
         account_phone, playwright_proxy, httpx_proxy = self.create_proxy_info()
@@ -53,7 +53,7 @@ class DouYinCrawler(AbstractCrawler):
             self.dy_client = await self.create_douyin_client(httpx_proxy)
             if not await self.dy_client.ping(browser_context=self.browser_context):
                 login_obj = DouYinLogin(
-                    login_type=self.command_args.lt,  # type: ignore
+                    login_type=self.login_type,
                     login_phone=account_phone,
                     browser_context=self.browser_context,
                     context_page=self.context_page,
@@ -63,25 +63,25 @@ class DouYinCrawler(AbstractCrawler):
                 await self.dy_client.update_cookies(browser_context=self.browser_context)
 
             # search_posts
-            await self.search_posts()
+            await self.search()
 
             utils.logger.info("Douyin Crawler finished ...")
 
-    async def search_posts(self) -> None:
+    async def search(self) -> None:
         utils.logger.info("Begin search douyin keywords")
         for keyword in config.KEYWORDS.split(","):
             utils.logger.info(f"Current keyword: {keyword}")
             aweme_list: List[str] = []
-            max_note_len = config.MAX_PAGE_NUM
+            dy_limit_count = 10  # douyin's search API returns a fixed page size of 10
             page = 0
-            while max_note_len > 0:
+            while (page + 1) * dy_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
                 try:
-                    posts_res = await self.dy_client.search_info_by_keyword(keyword=keyword, offset=page * 10)
+                    posts_res = await self.dy_client.search_info_by_keyword(keyword=keyword,
+                                                                            offset=page * dy_limit_count)
                 except DataFetchError:
                     utils.logger.error(f"search douyin keyword: {keyword} failed")
                     break
                 page += 1
-                max_note_len -= 10
                 for post_item in posts_res.get("data"):
                     try:
                         aweme_info: Dict = post_item.get("aweme_info") or \
@@ -93,15 +93,15 @@ class DouYinCrawler(AbstractCrawler):
             utils.logger.info(f"keyword:{keyword}, aweme_list:{aweme_list}")
             await self.batch_get_note_comments(aweme_list)
 
-    async def batch_get_note_comments(self, aweme_list: List[str]):
+    async def batch_get_note_comments(self, aweme_list: List[str]) -> None:
         task_list: List[Task] = []
-        _semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
+        semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
         for aweme_id in aweme_list:
-            task = asyncio.create_task(self.get_comments(aweme_id, _semaphore), name=aweme_id)
+            task = asyncio.create_task(self.get_comments(aweme_id, semaphore), name=aweme_id)
             task_list.append(task)
         await asyncio.wait(task_list)
 
-    async def get_comments(self, aweme_id: str, semaphore: "asyncio.Semaphore"):
+    async def get_comments(self, aweme_id: str, semaphore: asyncio.Semaphore) -> None:
         async with semaphore:
             try:
                 await self.dy_client.get_aweme_all_comments(
@@ -155,7 +155,7 @@ class DouYinCrawler(AbstractCrawler):
         """Launch browser and create browser context"""
         if config.SAVE_LOGIN_STATE:
             user_data_dir = os.path.join(os.getcwd(), "browser_data",
-                                         config.USER_DATA_DIR % self.command_args.platform)  # type: ignore
+                                         config.USER_DATA_DIR % self.platform)  # type: ignore
             browser_context = await chromium.launch_persistent_context(
                 user_data_dir=user_data_dir,
                 accept_downloads=True,
@@ -173,7 +173,7 @@ class DouYinCrawler(AbstractCrawler):
             )
             return browser_context
 
-    async def close(self):
+    async def close(self) -> None:
         """Close browser context"""
        await self.browser_context.close()
        utils.logger.info("Browser context closed ...")
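The new loop bound above is the substance of the MAX_PAGE_NUM → CRAWLER_MAX_NOTES_COUNT rename: the crawler budgets by total notes rather than pages, and stops before fetching a page that would exceed the budget. A standalone illustration of the stop condition (values mirror the defaults in this diff):

# Stop-condition sketch; page size and budget mirror the diff's defaults.
CRAWLER_MAX_NOTES_COUNT = 20  # config default above
dy_limit_count = 10           # douyin's fixed page size

page = 0
while (page + 1) * dy_limit_count <= CRAWLER_MAX_NOTES_COUNT:
    print(f"fetching offset {page * dy_limit_count}")  # prints 0, then 10
    page += 1
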
utils.logger.info("Browser context closed ...") diff --git a/media_platform/douyin/login.py b/media_platform/douyin/login.py index a16999b..7a2148a 100644 --- a/media_platform/douyin/login.py +++ b/media_platform/douyin/login.py @@ -1,22 +1,17 @@ -import sys import asyncio import functools +import sys from typing import Optional import aioredis -from tenacity import ( - retry, - stop_after_attempt, - wait_fixed, - retry_if_result, - RetryError -) -from playwright.async_api import Page, TimeoutError as PlaywrightTimeoutError -from playwright.async_api import BrowserContext +from playwright.async_api import BrowserContext, Page +from playwright.async_api import TimeoutError as PlaywrightTimeoutError +from tenacity import (RetryError, retry, retry_if_result, stop_after_attempt, + wait_fixed) import config -from tools import utils from base.base_crawler import AbstractLogin +from tools import utils class DouYinLogin(AbstractLogin): diff --git a/media_platform/xhs/client.py b/media_platform/xhs/client.py index 67fd113..f00904f 100644 --- a/media_platform/xhs/client.py +++ b/media_platform/xhs/client.py @@ -1,16 +1,16 @@ -import json import asyncio -from typing import Optional, Dict +import json +from typing import Dict, Optional import httpx -from playwright.async_api import Page -from playwright.async_api import BrowserContext +from playwright.async_api import BrowserContext, Page -from .help import sign, get_search_id -from .field import SearchSortType, SearchNoteType -from .exception import DataFetchError, IPBlockError from tools import utils +from .exception import DataFetchError, IPBlockError +from .field import SearchNoteType, SearchSortType +from .help import get_search_id, sign + class XHSClient: def __init__( diff --git a/media_platform/xhs/core.py b/media_platform/xhs/core.py index eb4ab1e..1c1d084 100644 --- a/media_platform/xhs/core.py +++ b/media_platform/xhs/core.py @@ -1,41 +1,41 @@ +import asyncio import os import random -import asyncio from asyncio import Task -from typing import Optional, List, Dict, Tuple -from argparse import Namespace +from typing import Dict, List, Optional, Tuple -from playwright.async_api import Page -from playwright.async_api import BrowserContext -from playwright.async_api import async_playwright -from playwright.async_api import BrowserType +from playwright.async_api import (BrowserContext, BrowserType, Page, + async_playwright) import config -from tools import utils -from .exception import * -from .login import XHSLogin -from .client import XHSClient -from models import xiaohongshu as xhs_model from base.base_crawler import AbstractCrawler from base.proxy_account_pool import AccountPool +from models import xiaohongshu as xhs_model +from tools import utils + +from .client import XHSClient +from .exception import DataFetchError +from .login import XHSLogin class XiaoHongShuCrawler(AbstractCrawler): + platform: str + login_type: str context_page: Page - browser_context: BrowserContext xhs_client: XHSClient account_pool: AccountPool + browser_context: BrowserContext - def __init__(self): + def __init__(self) -> None: self.index_url = "https://www.xiaohongshu.com" - self.command_args: Optional[Namespace] = None # type: ignore self.user_agent = utils.get_user_agent() - def init_config(self, **kwargs): - for key, value in kwargs.items(): - setattr(self, key, value) + def init_config(self, platform: str, login_type: str, account_pool: AccountPool) -> None: + self.platform = platform + self.login_type = login_type + self.account_pool = account_pool - async 
def start(self): + async def start(self) -> None: account_phone, playwright_proxy, httpx_proxy = self.create_proxy_info() async with async_playwright() as playwright: # Launch a browser context. @@ -62,7 +62,7 @@ class XiaoHongShuCrawler(AbstractCrawler): self.xhs_client = await self.create_xhs_client(httpx_proxy) if not await self.xhs_client.ping(): login_obj = XHSLogin( - login_type=self.command_args.lt, + login_type=self.login_type, login_phone=account_phone, browser_context=self.browser_context, context_page=self.context_page, @@ -72,28 +72,27 @@ class XiaoHongShuCrawler(AbstractCrawler): await self.xhs_client.update_cookies(browser_context=self.browser_context) # Search for notes and retrieve their comment information. - await self.search_posts() + await self.search() utils.logger.info("Xhs Crawler finished ...") - async def search_posts(self) -> None: + async def search(self) -> None: """Search for notes and retrieve their comment information.""" utils.logger.info("Begin search xiaohongshu keywords") - + xhs_limit_count = 20 # xhs limit page fixed value for keyword in config.KEYWORDS.split(","): utils.logger.info(f"Current keyword: {keyword}") - max_note_len = config.MAX_PAGE_NUM page = 1 - while max_note_len > 0: + while page * xhs_limit_count <= config.CRAWLER_MAX_NOTES_COUNT: note_id_list: List[str] = [] - posts_res = await self.xhs_client.get_note_by_keyword( + notes_res = await self.xhs_client.get_note_by_keyword( keyword=keyword, page=page, ) - _semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) + semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) task_list = [ - self.get_note_detail(post_item.get("id"), _semaphore) - for post_item in posts_res.get("items", {}) + self.get_note_detail(post_item.get("id"), semaphore) + for post_item in notes_res.get("items", {}) ] note_details = await asyncio.gather(*task_list) for note_detail in note_details: @@ -101,11 +100,10 @@ class XiaoHongShuCrawler(AbstractCrawler): await xhs_model.update_xhs_note(note_detail) note_id_list.append(note_detail.get("note_id")) page += 1 - max_note_len -= 20 utils.logger.info(f"Note details: {note_details}") await self.batch_get_note_comments(note_id_list) - async def get_note_detail(self, note_id: str, semaphore: "asyncio.Semaphore") -> Optional[Dict]: + async def get_note_detail(self, note_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]: """Get note detail""" async with semaphore: try: @@ -117,14 +115,14 @@ class XiaoHongShuCrawler(AbstractCrawler): async def batch_get_note_comments(self, note_list: List[str]): """Batch get note comments""" utils.logger.info(f"Begin batch get note comments, note list: {note_list}") - _semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) + semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) task_list: List[Task] = [] for note_id in note_list: - task = asyncio.create_task(self.get_comments(note_id, _semaphore), name=note_id) + task = asyncio.create_task(self.get_comments(note_id, semaphore), name=note_id) task_list.append(task) await asyncio.gather(*task_list) - async def get_comments(self, note_id: str, semaphore: "asyncio.Semaphore"): + async def get_comments(self, note_id: str, semaphore: asyncio.Semaphore): """Get note comments""" async with semaphore: utils.logger.info(f"Begin get note id comments {note_id}") @@ -147,7 +145,7 @@ class XiaoHongShuCrawler(AbstractCrawler): httpx_proxy = f"{config.IP_PROXY_PROTOCOL}{config.IP_PROXY_USER}:{config.IP_PROXY_PASSWORD}@{ip_proxy}" return phone, playwright_proxy, httpx_proxy - async def 
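Both crawlers now share the same bounded-concurrency shape: one `asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)` gates every detail and comment fetch. A self-contained sketch of that pattern (`fetch_one` is a hypothetical stand-in for the real client calls):

import asyncio

MAX_CONCURRENCY_NUM = 10  # mirrors config.MAX_CONCURRENCY_NUM

async def fetch_one(item_id: str, semaphore: asyncio.Semaphore) -> str:
    async with semaphore:         # at most MAX_CONCURRENCY_NUM bodies run at once
        await asyncio.sleep(0.1)  # stand-in for the real HTTP call
        return item_id

async def fetch_all(item_ids: list) -> list:
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY_NUM)
    tasks = [asyncio.create_task(fetch_one(i, semaphore), name=i) for i in item_ids]
    return await asyncio.gather(*tasks)

# asyncio.get_event_loop().run_until_complete(fetch_all([str(i) for i in range(30)]))
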
diff --git a/media_platform/xhs/login.py b/media_platform/xhs/login.py
index 1673eb9..f410ac2 100644
--- a/media_platform/xhs/login.py
+++ b/media_platform/xhs/login.py
@@ -1,21 +1,16 @@
-import sys
 import asyncio
 import functools
+import sys
+from typing import Optional
 
 import aioredis
-from tenacity import (
-    retry,
-    stop_after_attempt,
-    wait_fixed,
-    retry_if_result,
-    RetryError
-)
-from playwright.async_api import Page
-from playwright.async_api import BrowserContext
+from playwright.async_api import BrowserContext, Page
+from tenacity import (RetryError, retry, retry_if_result, stop_after_attempt,
+                      wait_fixed)
 
 import config
-from tools import utils
 from base.base_crawler import AbstractLogin
+from tools import utils
 
 
 class XHSLogin(AbstractLogin):
@@ -24,7 +19,7 @@ class XHSLogin(AbstractLogin):
         login_type: str,
         browser_context: BrowserContext,
         context_page: Page,
-        login_phone: str = "",
+        login_phone: Optional[str] = "",
         cookie_str: str = ""
     ):
         self.login_type = login_type
diff --git a/models/douyin.py b/models/douyin.py
index bd24b57..9d797cc 100644
--- a/models/douyin.py
+++ b/models/douyin.py
@@ -1,8 +1,8 @@
 import json
 from typing import Dict, List
 
-from tortoise.models import Model
 from tortoise import fields
+from tortoise.models import Model
 
 import config
 from tools import utils
diff --git a/models/xiaohongshu.py b/models/xiaohongshu.py
index c558084..40c750b 100644
--- a/models/xiaohongshu.py
+++ b/models/xiaohongshu.py
@@ -1,7 +1,7 @@
-from typing import List, Dict
+from typing import Dict, List
 
-from tortoise.models import Model
 from tortoise import fields
+from tortoise.models import Model
 
 import config
 from tools import utils
@@ -65,7 +65,7 @@ async def update_xhs_note(note_item: Dict):
     local_db_item = {
         "note_id": note_item.get("note_id"),
         "type": note_item.get("type"),
-        "title": note_item.get("title"),
+        "title": note_item.get("title") or note_item.get("desc", ""),
         "desc": note_item.get("desc", ""),
         "time": note_item.get("time"),
         "last_update_time": note_item.get("last_update_time", 0),
diff --git a/recv_sms_notification.py b/recv_sms_notification.py
index cb32c36..9718df0 100644
--- a/recv_sms_notification.py
+++ b/recv_sms_notification.py
@@ -1,7 +1,7 @@
 # Start an HTTP server to receive SMS forwarding notifications and store them in Redis.
-import re
-import json
 import asyncio
+import json
+import re
 from typing import List
 
 import aioredis
diff --git a/tools/easing.py b/tools/easing.py
index 52fc6d6..f6f810a 100644
--- a/tools/easing.py
+++ b/tools/easing.py
@@ -2,10 +2,11 @@
 # -*- coding: utf-8 -*-
 # copy from https://github.com/aneasystone/selenium-test/blob/master/12-slider-captcha.py
 # thanks to aneasystone for his great work
-import numpy as np
 import math
 from typing import List, Tuple
 
+import numpy as np
+
 
 # https://github.com/gdsmith/jquery.easing/blob/master/jquery.easing.js
 def ease_in_quad(x):
diff --git a/tools/utils.py b/tools/utils.py
index 88ab7f1..7bddfc4 100644
--- a/tools/utils.py
+++ b/tools/utils.py
@@ -1,19 +1,18 @@
-import re
-import os
-import time
-import random
 import base64
 import logging
+import os
+import random
+import re
+import time
 from io import BytesIO
+from typing import Dict, List, Optional, Tuple
 from urllib.parse import urlparse
-from typing import Optional, Dict, List, Tuple
 
 import cv2
 import httpx
 import numpy as np
 from PIL import Image, ImageDraw
-from playwright.async_api import Cookie
-from playwright.async_api import Page
+from playwright.async_api import Cookie, Page
 
 
 async def find_login_qrcode(page: Page, selector: str) -> str:
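One behavioral change above is easy to miss: in models/xiaohongshu.py the note title now falls back to the description, because `or` treats both None and the empty string as falsy. A quick check of the new mapping (the helper name and payloads are hypothetical, for illustration only):

def pick_title(note_item: dict) -> str:
    # Mirrors the updated "title" mapping from models/xiaohongshu.py.
    return note_item.get("title") or note_item.get("desc", "")

assert pick_title({"title": "t", "desc": "d"}) == "t"
assert pick_title({"title": "", "desc": "d"}) == "d"  # empty title falls back
assert pick_title({"desc": "d"}) == "d"               # missing title falls back
assert pick_title({}) == ""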