diff --git a/media_platform/bilibili/client.py b/media_platform/bilibili/client.py
index c92cb27..c97f110 100644
--- a/media_platform/bilibili/client.py
+++ b/media_platform/bilibili/client.py
@@ -101,10 +101,12 @@ class BilibiliClient(AbstractApiClient):
             check_login_uri = "/x/web-interface/nav"
             response = await self.get(check_login_uri)
             if response.get("isLogin"):
-                utils.logger.info("[BilibiliClient.pong] Use cache login state get web interface successfull!")
+                utils.logger.info(
+                    "[BilibiliClient.pong] Use cache login state get web interface successful!")
                 ping_flag = True
         except Exception as e:
-            utils.logger.error(f"[BilibiliClient.pong] Pong bilibili failed: {e}, and try to login again...")
+            utils.logger.error(
+                f"[BilibiliClient.pong] Pong bilibili failed: {e}, and try to login again...")
             ping_flag = False
         return ping_flag
 
@@ -185,13 +187,20 @@ class BilibiliClient(AbstractApiClient):
 
         result = []
         is_end = False
-        next_page =0
+        next_page = 0
         while not is_end:
             comments_res = await self.get_video_comments(video_id, CommentOrderType.DEFAULT, next_page)
-            curson_info: Dict = comments_res.get("cursor")
+            cursor_info: Dict = comments_res.get("cursor")
             comment_list: List[Dict] = comments_res.get("replies", [])
-            is_end = curson_info.get("is_end")
-            next_page = curson_info.get("next")
+            is_end = cursor_info.get("is_end")
+            next_page = cursor_info.get("next")
+            if is_fetch_sub_comments:
+                for comment in comment_list:
+                    comment_id = comment['rpid']
+                    # only root comments with replies (rcount > 0) have sub-comments
+                    if comment.get("rcount", 0) > 0:
+                        await self.get_video_all_level_two_comments(
+                            video_id, comment_id, CommentOrderType.DEFAULT, 10, crawl_interval, callback)
             if callback:  # 如果有回调函数,就执行回调函数
                 await callback(video_id, comment_list)
             await asyncio.sleep(crawl_interval)
@@ -200,3 +209,62 @@ class BilibiliClient(AbstractApiClient):
                 continue
         # todo handle get sub comments
         return result
+
+    async def get_video_all_level_two_comments(self,
+                                               video_id: str,
+                                               level_one_comment_id: int,
+                                               order_mode: CommentOrderType,
+                                               ps: int = 10,
+                                               crawl_interval: float = 1.0,
+                                               callback: Optional[Callable] = None,
+                                               ) -> Dict:
+        """
+        get video all level two comments for a level one comment
+        :param video_id: 视频 ID
+        :param level_one_comment_id: 一级评论 ID
+        :param order_mode: 排序方式
+        :param ps: 一页评论数
+        :param crawl_interval: 爬取间隔(秒)
+        :param callback: 一页评论爬取完成后的回调
+        :return:
+        """
+        # bilibili pages level-two replies starting from pn=1
+        pn = 1
+        while True:
+            result = await self.get_video_level_two_comments(
+                video_id, level_one_comment_id, pn, ps, order_mode)
+            comment_list: List[Dict] = result.get("replies", [])
+            if callback:  # 如果有回调函数,就执行回调函数
+                await callback(video_id, comment_list)
+            await asyncio.sleep(crawl_interval)
+            if int(result["page"]["count"]) <= pn * ps:
+                break
+
+            pn += 1
+
+    async def get_video_level_two_comments(self,
+                                           video_id: str,
+                                           level_one_comment_id: int,
+                                           pn: int,
+                                           ps: int,
+                                           order_mode: CommentOrderType,
+                                           ) -> Dict:
+        """get video level two comments
+        :param video_id: 视频 ID
+        :param level_one_comment_id: 一级评论 ID
+        :param pn: 页码(从 1 开始)
+        :param ps: 一页评论数
+        :param order_mode: 排序方式
+        :return:
+        """
+        uri = "/x/v2/reply/reply"
+        post_data = {
+            "oid": video_id,
+            "mode": order_mode.value,
+            "type": 1,
+            "ps": ps,
+            "pn": pn,
+            "root": level_one_comment_id,
+        }
+        result = await self.get(uri, post_data)
+        return result
diff --git a/media_platform/bilibili/core.py b/media_platform/bilibili/core.py
index 4a30e5f..4e038bb 100644
--- a/media_platform/bilibili/core.py
+++ b/media_platform/bilibili/core.py
@@ -49,7 +49,8 @@ class BilibiliCrawler(AbstractCrawler):
         if config.ENABLE_IP_PROXY:
             ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True)
             ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy()
-            playwright_proxy_format, httpx_proxy_format = self.format_proxy_info(ip_proxy_info)
+            playwright_proxy_format, httpx_proxy_format = self.format_proxy_info(
+                ip_proxy_info)
 
         async with async_playwright() as playwright:
             # Launch a browser context.
@@ -87,27 +88,31 @@ class BilibiliCrawler(AbstractCrawler):
                 await self.get_specified_videos()
             else:
                 pass
-            utils.logger.info("[BilibiliCrawler.start] Bilibili Crawler finished ...")
+            utils.logger.info(
+                "[BilibiliCrawler.start] Bilibili Crawler finished ...")
 
     async def search(self):
         """
        search bilibili video with keywords
        :return:
        """
-        utils.logger.info("[BilibiliCrawler.search] Begin search bilibli keywords")
-        bili_limit_count =20  # bilibili limit page fixed value
+        utils.logger.info(
+            "[BilibiliCrawler.search] Begin search bilibili keywords")
+        bili_limit_count = 20  # bilibili limit page fixed value
         if config.CRAWLER_MAX_NOTES_COUNT < bili_limit_count:
             config.CRAWLER_MAX_NOTES_COUNT = bili_limit_count
         start_page = self.start_page  # start page number
         for keyword in self.keyword.split(","):
-            utils.logger.info(f"[BilibiliCrawler.search] Current search keyword: {keyword}")
+            utils.logger.info(
+                f"[BilibiliCrawler.search] Current search keyword: {keyword}")
             page = 1
             while (page - start_page + 1) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
                 if page < start_page:
-                    utils.logger.info(f"[BilibiliCrawler.search] Skip page: {page}")
+                    utils.logger.info(
+                        f"[BilibiliCrawler.search] Skip page: {page}")
                     page += 1
                     continue
-                
+
                 video_id_list: List[str] = []
                 videos_res = await self.bili_client.search_video_by_keyword(
                     keyword=keyword,
@@ -119,7 +124,8 @@ class BilibiliCrawler(AbstractCrawler):
 
                 semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
                 task_list = [
-                    self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore)
+                    self.get_video_info_task(aid=video_item.get(
+                        "aid"), bvid="", semaphore=semaphore)
                     for video_item in video_list
                 ]
                 video_items = await asyncio.gather(*task_list)
@@ -138,14 +144,17 @@ class BilibiliCrawler(AbstractCrawler):
        :return:
        """
         if not config.ENABLE_GET_COMMENTS:
-            utils.logger.info(f"[BilibiliCrawler.batch_get_note_comments] Crawling comment mode is not enabled")
+            utils.logger.info(
+                f"[BilibiliCrawler.batch_get_note_comments] Crawling comment mode is not enabled")
             return
 
-        utils.logger.info(f"[BilibiliCrawler.batch_get_video_comments] video ids:{video_id_list}")
+        utils.logger.info(
+            f"[BilibiliCrawler.batch_get_video_comments] video ids:{video_id_list}")
         semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
         task_list: List[Task] = []
         for video_id in video_id_list:
-            task = asyncio.create_task(self.get_comments(video_id, semaphore), name=video_id)
+            task = asyncio.create_task(self.get_comments(
+                video_id, semaphore), name=video_id)
             task_list.append(task)
 
         await asyncio.gather(*task_list)
@@ -158,17 +167,21 @@ class BilibiliCrawler(AbstractCrawler):
        """
         async with semaphore:
             try:
-                utils.logger.info(f"[BilibiliCrawler.get_comments] begin get video_id: {video_id} comments ...")
+                utils.logger.info(
+                    f"[BilibiliCrawler.get_comments] begin get video_id: {video_id} comments ...")
                 await self.bili_client.get_video_all_comments(
                     video_id=video_id,
                     crawl_interval=random.random(),
+                    is_fetch_sub_comments=config.ENABLE_GET_SUB_COMMENTS,
                     callback=bilibili_store.batch_update_bilibili_video_comments
                 )
 
             except DataFetchError as ex:
-                utils.logger.error(f"[BilibiliCrawler.get_comments] get video_id: {video_id} comment error: {ex}")
+                utils.logger.error(
+                    f"[BilibiliCrawler.get_comments] get video_id: {video_id} comment error: {ex}")
             except Exception as e:
-                utils.logger.error(f"[BilibiliCrawler.get_comments] may be been blocked, err:{e}")
+                utils.logger.error(
+                    f"[BilibiliCrawler.get_comments] may be been blocked, err:{e}")
 
     async def get_specified_videos(self):
         """
@@ -204,7 +217,8 @@ class BilibiliCrawler(AbstractCrawler):
                 result = await self.bili_client.get_video_info(aid=aid, bvid=bvid)
                 return result
             except DataFetchError as ex:
-                utils.logger.error(f"[BilibiliCrawler.get_video_info_task] Get video detail error: {ex}")
+                utils.logger.error(
+                    f"[BilibiliCrawler.get_video_info_task] Get video detail error: {ex}")
                 return None
             except KeyError as ex:
                 utils.logger.error(
@@ -213,7 +227,8 @@ class BilibiliCrawler(AbstractCrawler):
 
     async def create_bilibili_client(self, httpx_proxy: Optional[str]) -> BilibiliClient:
         """Create xhs client"""
-        utils.logger.info("[BilibiliCrawler.create_bilibili_client] Begin create bilibili API client ...")
+        utils.logger.info(
+            "[BilibiliCrawler.create_bilibili_client] Begin create bilibili API client ...")
         cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies())
         bilibili_client_obj = BilibiliClient(
             proxies=httpx_proxy,
@@ -250,7 +265,8 @@ class BilibiliCrawler(AbstractCrawler):
         headless: bool = True
     ) -> BrowserContext:
         """Launch browser and create browser context"""
-        utils.logger.info("[BilibiliCrawler.launch_browser] Begin create browser context ...")
+        utils.logger.info(
+            "[BilibiliCrawler.launch_browser] Begin create browser context ...")
         if config.SAVE_LOGIN_STATE:
             # feat issue #14
             # we will save login state to avoid login every time
@@ -266,7 +282,8 @@ class BilibiliCrawler(AbstractCrawler):
             )
             return browser_context
         else:
-            browser = await chromium.launch(headless=headless, proxy=playwright_proxy)  # type: ignore
+            browser = await chromium.launch(
+                headless=headless, proxy=playwright_proxy)  # type: ignore
             browser_context = await browser.new_context(
                 viewport={"width": 1920, "height": 1080},
                 user_agent=user_agent
diff --git a/store/bilibili/__init__.py b/store/bilibili/__init__.py
index ed6f5ac..a1fe0f4 100644
--- a/store/bilibili/__init__.py
+++ b/store/bilibili/__init__.py
@@ -62,10 +62,12 @@ async def batch_update_bilibili_video_comments(video_id: str, comments: List[Dic
 
 async def update_bilibili_video_comment(video_id: str, comment_item: Dict):
     comment_id = str(comment_item.get("rpid"))
+    parent_comment_id = str(comment_item.get("parent", 0))
     content: Dict = comment_item.get("content")
     user_info: Dict = comment_item.get("member")
     save_comment_item = {
         "comment_id": comment_id,
+        "parent_comment_id": parent_comment_id,
         "create_time": comment_item.get("ctime"),
         "video_id": str(video_id),
         "content": content.get("message"),