diff --git a/README.md b/README.md index 37127c9..e89aa9e 100644 --- a/README.md +++ b/README.md @@ -10,11 +10,6 @@ 原理:利用[playwright](https://playwright.dev/)搭桥,保留登录成功后的上下文浏览器环境,通过执行JS表达式获取一些加密参数 通过使用此方式,免去了复现核心加密JS代码,逆向难度大大降低。 -## 项目特点 -- 类型注解,代码结构清晰,方便阅读 -- 项目实现有点复杂,但是使用起来比较稳定 -- 这种利用浏览器搭桥的方式可以应用到其他平台的爬虫,只需要修改一些参数即可 - ## 已实现 @@ -25,6 +20,7 @@ - [x] 并发执行爬虫请求 - [x] 抖音登录(二维码、手机号、cookies) - [x] 抖音滑块(模拟滑动实现,准确率不太OK) +- [x] 支持登录成功后的上下文浏览器环境保留 ## 待实现 diff --git a/base/base_crawler.py b/base/base_crawler.py index 5ea12fe..08bdab8 100644 --- a/base/base_crawler.py +++ b/base/base_crawler.py @@ -14,10 +14,6 @@ class AbstractCrawler(ABC): async def search_posts(self): pass - @abstractmethod - async def get_comments(self, item_id: int): - pass - class AbstractLogin(ABC): @abstractmethod diff --git a/config/base_config.py b/config/base_config.py index c03743d..adad851 100644 --- a/config/base_config.py +++ b/config/base_config.py @@ -25,3 +25,6 @@ USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name # max page num MAX_PAGE_NUM = 20 + +# max concurrency num +MAX_CONCURRENCY_NUM = 10 diff --git a/media_platform/douyin/core.py b/media_platform/douyin/core.py index 4463ca0..8921008 100644 --- a/media_platform/douyin/core.py +++ b/media_platform/douyin/core.py @@ -1,6 +1,5 @@ import os import asyncio -import logging from asyncio import Task from argparse import Namespace from typing import Optional, List, Dict, Tuple @@ -78,7 +77,7 @@ class DouYinCrawler(AbstractCrawler): try: posts_res = await self.dy_client.search_info_by_keyword(keyword=keyword, offset=page * 10) except DataFetchError: - logging.error(f"search douyin keyword: {keyword} failed") + utils.logger.error(f"search douyin keyword: {keyword} failed") break page += 1 max_note_len -= 10 @@ -108,7 +107,7 @@ class DouYinCrawler(AbstractCrawler): ) utils.logger.info(f"aweme_id: {aweme_id} comments have all been obtained completed ...") except DataFetchError as e: - logging.error(f"aweme_id: {aweme_id} get comments failed, error: {e}") + utils.logger.error(f"aweme_id: {aweme_id} get comments failed, error: {e}") def create_proxy_info(self) -> Tuple[Optional[str], Optional[Dict], Optional[str]]: """Create proxy info for playwright and httpx""" diff --git a/media_platform/douyin/login.py b/media_platform/douyin/login.py index 7ee65f8..ae26d37 100644 --- a/media_platform/douyin/login.py +++ b/media_platform/douyin/login.py @@ -1,5 +1,4 @@ import sys -import logging import asyncio import functools @@ -89,7 +88,7 @@ class DouYinLogin(AbstractLogin): # check dialog box is auto popup and wait for 10 seconds await self.context_page.wait_for_selector(dialog_selector, timeout=1000 * 10) except Exception as e: - logging.error(f"login dialog box does not pop up automatically, error: {e}") + utils.logger.error(f"login dialog box does not pop up automatically, error: {e}") utils.logger.info("login dialog box does not pop up automatically, we will manually click the login button") login_button_ele = self.context_page.locator("xpath=//p[text() = '登录']") await login_button_ele.click() @@ -163,7 +162,7 @@ class DouYinLogin(AbstractLogin): slider_verify_success = False while not slider_verify_success: if max_slider_try_times <= 0: - logging.error("slider verify failed ...") + utils.logger.error("slider verify failed ...") sys.exit() try: await self.move_slider(back_selector, gap_selector, move_step, slider_level) @@ -182,7 +181,7 @@ class DouYinLogin(AbstractLogin): utils.logger.info("slider verify success ...") slider_verify_success = True except Exception as e: - logging.error(f"slider verify failed, error: {e}") + utils.logger.error(f"slider verify failed, error: {e}") await asyncio.sleep(1) max_slider_try_times -= 1 utils.logger.info(f"remaining slider try times: {max_slider_try_times}") diff --git a/media_platform/xhs/client.py b/media_platform/xhs/client.py index c3f6baa..37cc04d 100644 --- a/media_platform/xhs/client.py +++ b/media_platform/xhs/client.py @@ -1,5 +1,4 @@ import json -import logging import asyncio from typing import Optional, Dict @@ -100,21 +99,15 @@ class XHSClient: page: int = 1, page_size: int = 20, sort: SearchSortType = SearchSortType.GENERAL, note_type: SearchNoteType = SearchNoteType.ALL - ): + ) -> Dict: """search note by keyword :param keyword: what notes you want to search - :type keyword: str :param page: page number, defaults to 1 - :type page: int, optional :param page_size: page size, defaults to 20 - :type page_size: int, optional :param sort: sort ordering, defaults to SearchSortType.GENERAL - :type sort: SearchSortType, optional :param note_type: note type, defaults to SearchNoteType.ALL - :type note_type: SearchNoteType, optional :return: {has_more: true, items: []} - :rtype: dict """ uri = "/api/sns/web/v1/search/notes" data = { @@ -127,27 +120,21 @@ class XHSClient: } return await self.post(uri, data) - async def get_note_by_id(self, note_id: str): + async def get_note_by_id(self, note_id: str) -> Dict: """ :param note_id: note_id you want to fetch - :type note_id: str :return: {"time":1679019883000,"user":{"nickname":"nickname","avatar":"avatar","user_id":"user_id"},"image_list":[{"url":"https://sns-img-qc.xhscdn.com/c8e505ca-4e5f-44be-fe1c-ca0205a38bad","trace_id":"1000g00826s57r6cfu0005ossb1e9gk8c65d0c80","file_id":"c8e505ca-4e5f-44be-fe1c-ca0205a38bad","height":1920,"width":1440}],"tag_list":[{"id":"5be78cdfdb601f000100d0bc","name":"jk","type":"topic"}],"desc":"裙裙","interact_info":{"followed":false,"liked":false,"liked_count":"1732","collected":false,"collected_count":"453","comment_count":"30","share_count":"41"},"at_user_list":[],"last_update_time":1679019884000,"note_id":"6413cf6b00000000270115b5","type":"normal","title":"title"} - :rtype: dict """ data = {"source_note_id": note_id} uri = "/api/sns/web/v1/feed" res = await self.post(uri, data) return res["items"][0]["note_card"] - async def get_note_comments(self, note_id: str, cursor: str = ""): + async def get_note_comments(self, note_id: str, cursor: str = "") -> Dict: """get note comments - :param note_id: note id you want to fetch - :type note_id: str :param cursor: last you get cursor, defaults to "" - :type cursor: str, optional :return: {"has_more": true,"cursor": "6422442d000000000700dcdb",comments: [],"user_id": "63273a77000000002303cc9b","time": 1681566542930} - :rtype: dict """ uri = "/api/sns/web/v2/comment/page" params = { @@ -156,21 +143,18 @@ class XHSClient: } return await self.get(uri, params) - async def get_note_sub_comments(self, note_id: str, - root_comment_id: str, - num: int = 30, cursor: str = ""): - """get note sub comments - + async def get_note_sub_comments( + self, note_id: str, + root_comment_id: str, + num: int = 30, cursor: str = "" + ): + """ + get note sub comments :param note_id: note id you want to fetch - :type note_id: str :param root_comment_id: parent comment id - :type root_comment_id: str :param num: recommend 30, if num greater 30, it only return 30 comments - :type num: int :param cursor: last you get cursor, defaults to "" - :type cursor: str optional :return: {"has_more": true,"cursor": "6422442d000000000700dcdb",comments: [],"user_id": "63273a77000000002303cc9b","time": 1681566542930} - :rtype: dict """ uri = "/api/sns/web/v2/comment/sub/page" params = { diff --git a/media_platform/xhs/core.py b/media_platform/xhs/core.py index f354aa1..f7d5a8d 100644 --- a/media_platform/xhs/core.py +++ b/media_platform/xhs/core.py @@ -1,7 +1,6 @@ import os import random import asyncio -import logging from asyncio import Task from typing import Optional, List, Dict, Tuple from argparse import Namespace @@ -73,51 +72,64 @@ class XiaoHongShuCrawler(AbstractCrawler): async def search_posts(self): """Search for notes and retrieve their comment information.""" utils.logger.info("Begin search xiaohongshu keywords") + for keyword in config.KEYWORDS.split(","): utils.logger.info(f"Current keyword: {keyword}") - note_list: List[str] = [] max_note_len = config.MAX_PAGE_NUM page = 1 while max_note_len > 0: + note_id_list: List[str] = [] posts_res = await self.xhs_client.get_note_by_keyword( keyword=keyword, page=page, ) + _semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) + task_list = [ + self.get_note_detail(post_item.get("id"), _semaphore) + for post_item in posts_res.get("items") + ] + note_details = await asyncio.gather(*task_list) + for note_detail in note_details: + if note_detail is not None: + await xhs_model.update_xhs_note(note_detail) + note_id_list.append(note_detail.get("note_id")) page += 1 - for post_item in posts_res.get("items"): - max_note_len -= 1 - note_id = post_item.get("id") - try: - note_detail = await self.xhs_client.get_note_by_id(note_id) - except DataFetchError as ex: - utils.logger.error(f"Get note detail error: {ex}") - continue - await xhs_model.update_xhs_note(note_detail) - await asyncio.sleep(0.05) - note_list.append(note_id) - utils.logger.info(f"keyword:{keyword}, note_list:{note_list}") - await self.batch_get_note_comments(note_list) + max_note_len -= 20 + utils.logger.info(f"Note details: {note_details}") + await self.batch_get_note_comments(note_id_list) + + async def get_note_detail(self, note_id: str, semaphore: "asyncio.Semaphore") -> Optional[Dict]: + """Get note detail""" + async with semaphore: + try: + return await self.xhs_client.get_note_by_id(note_id) + except DataFetchError as ex: + utils.logger.error(f"Get note detail error: {ex}") + return None async def batch_get_note_comments(self, note_list: List[str]): """Batch get note comments""" + utils.logger.info(f"Begin batch get note comments, note list: {note_list}") + _semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) task_list: List[Task] = [] for note_id in note_list: - task = asyncio.create_task(self.get_comments(note_id), name=note_id) + task = asyncio.create_task(self.get_comments(note_id, _semaphore), name=note_id) task_list.append(task) - await asyncio.wait(task_list) + await asyncio.gather(*task_list) - async def get_comments(self, note_id: str): + async def get_comments(self, note_id: str, semaphore: "asyncio.Semaphore"): """Get note comments""" - utils.logger.info(f"Begin get note id comments {note_id}") - all_comments = await self.xhs_client.get_note_all_comments(note_id=note_id, crawl_interval=random.random()) - for comment in all_comments: - await xhs_model.update_xhs_note_comment(note_id=note_id, comment_item=comment) + async with semaphore: + utils.logger.info(f"Begin get note id comments {note_id}") + all_comments = await self.xhs_client.get_note_all_comments(note_id=note_id, crawl_interval=random.random()) + for comment in all_comments: + await xhs_model.update_xhs_note_comment(note_id=note_id, comment_item=comment) def create_proxy_info(self) -> Tuple[Optional[str], Optional[Dict], Optional[str]]: """Create proxy info for playwright and httpx""" if not config.ENABLE_IP_PROXY: return None, None, None - + utils.logger.info("Begin proxy info for playwright and httpx ...") # phone: 13012345671 ip_proxy: 111.122.xx.xx1:8888 phone, ip_proxy = self.account_pool.get_account() playwright_proxy = { @@ -130,6 +142,7 @@ class XiaoHongShuCrawler(AbstractCrawler): async def create_xhs_client(self, httpx_proxy: str) -> XHSClient: """Create xhs client""" + utils.logger.info("Begin create xiaohongshu API client ...") cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies()) xhs_client_obj = XHSClient( proxies=httpx_proxy, @@ -152,8 +165,8 @@ class XiaoHongShuCrawler(AbstractCrawler): user_agent: Optional[str], headless: bool = True ) -> BrowserContext: - utils.logger.info("Begin create browser context ...") """Launch browser and create browser context""" + utils.logger.info("Begin create browser context ...") if config.SAVE_LOGIN_STATE: # feat issue #14 # we will save login state to avoid login every time diff --git a/media_platform/xhs/login.py b/media_platform/xhs/login.py index 404a09d..6cdd691 100644 --- a/media_platform/xhs/login.py +++ b/media_platform/xhs/login.py @@ -1,6 +1,5 @@ import sys import asyncio -import logging import functools import aioredis