diff --git a/base/base_crawler.py b/base/base_crawler.py index fffd812..656b924 100644 --- a/base/base_crawler.py +++ b/base/base_crawler.py @@ -55,12 +55,7 @@ class AbstractStore(ABC): # @abstractmethod async def store_creator(self, creator: Dict): pass - - # TODO support all platform - # only xhs is supported, so @abstractmethod is commented - # @abstractmethod - async def store_sub_comment(self, sub_comment_items: Dict): - pass + class AbstractStoreImage(ABC): #TODO: support all platform diff --git a/config/base_config.py b/config/base_config.py index 2296c54..7edee27 100644 --- a/config/base_config.py +++ b/config/base_config.py @@ -40,6 +40,7 @@ ENABLE_GET_IMAGES = False ENABLE_GET_COMMENTS = False # 是否开启爬二级评论模式, 默认不开启爬二级评论, 目前仅支持 xhs +# 老版本项目使用了 db, 则需参考 schema/tables.sql line 287 增加表字段 ENABLE_GET_SUB_COMMENTS = False # 指定小红书需要爬虫的笔记ID列表 diff --git a/media_platform/xhs/client.py b/media_platform/xhs/client.py index be91f8a..b682b11 100644 --- a/media_platform/xhs/client.py +++ b/media_platform/xhs/client.py @@ -7,6 +7,7 @@ from urllib.parse import urlencode import httpx from playwright.async_api import BrowserContext, Page +import config from base.base_crawler import AbstactApiClient from tools import utils @@ -274,39 +275,53 @@ class XiaoHongShuClient(AbstactApiClient): await callback(note_id, comments) await asyncio.sleep(crawl_interval) result.extend(comments) + sub_comments = await self.get_comments_all_sub_comments(comments, crawl_interval, callback) + result.extend(sub_comments) return result - async def get_comment_all_sub_comments(self, note_id: str, root_comment_id: str, sub_comment_cursor: str, - sub_comments: List[Dict], crawl_interval: float = 1.0, + async def get_comments_all_sub_comments(self, comments: List[Dict], crawl_interval: float = 1.0, callback: Optional[Callable] = None) -> List[Dict]: """ - 获取指定笔记下指定一级评论下的所有二级评论, 该方法会一直查找一个一级评论下的所有二级评论信息 + 获取指定一级评论下的所有二级评论, 该方法会一直查找一级评论下的所有二级评论信息 Args: - note_id: 笔记ID - root_comment_id: 一级评论ID - 
sub_comment_cursor: 二级评论的初始分页游标 - sub_comments: 爬取一级评论默认携带的二级评论列表 + comments: 评论列表 crawl_interval: 爬取一次评论的延迟单位(秒) callback: 一次评论爬取结束后 Returns: """ + if not config.ENABLE_GET_SUB_COMMENTS: + utils.logger.info(f"[XiaoHongShuClient.get_comments_all_sub_comments] Crawling sub_comment mode is not enabled") + return [] + result = [] - sub_comment_has_more = True - while sub_comment_has_more: - comments_res = await self.get_note_sub_comments(note_id, root_comment_id, 10, sub_comment_cursor) - sub_comment_has_more = comments_res.get("has_more", False) - sub_comment_cursor = comments_res.get("cursor", "") - if "comments" not in comments_res: - utils.logger.info( - f"[XiaoHongShuClient.get_comment_all_sub_comments] No 'comments' key found in response: {comments_res}") - break - comments = comments_res["comments"] - if callback: + for comment in comments: + note_id = comment.get("note_id") + sub_comments = comment.get("sub_comments") + if sub_comments and callback: await callback(note_id, sub_comments) - await asyncio.sleep(crawl_interval) - result.extend(comments) + + sub_comment_has_more = comment.get("sub_comment_has_more") + if not sub_comment_has_more: + continue + + root_comment_id = comment.get("id") + sub_comment_cursor = comment.get("sub_comment_cursor") + + while sub_comment_has_more: + comments_res = await self.get_note_sub_comments(note_id, root_comment_id, 10, sub_comment_cursor) + sub_comment_has_more = comments_res.get("has_more", False) + sub_comment_cursor = comments_res.get("cursor", "") + if "comments" not in comments_res: + utils.logger.info( + f"[XiaoHongShuClient.get_comments_all_sub_comments] No 'comments' key found in response: {comments_res}") + break + comments = comments_res["comments"] + if callback: + await callback(note_id, comments) + await asyncio.sleep(crawl_interval) + result.extend(comments) return result async def get_creator_info(self, user_id: str) -> Dict: diff --git a/media_platform/xhs/core.py b/media_platform/xhs/core.py index 
5597438..6fc2660 100644 --- a/media_platform/xhs/core.py +++ b/media_platform/xhs/core.py @@ -123,9 +123,7 @@ class XiaoHongShuCrawler(AbstractCrawler): note_id_list.append(note_detail.get("note_id")) page += 1 utils.logger.info(f"[XiaoHongShuCrawler.search] Note details: {note_details}") - note_comments = await self.batch_get_note_comments(note_id_list) - await self.batch_get_sub_comments(note_comments) - + await self.batch_get_note_comments(note_id_list) async def get_creators_and_notes(self) -> None: """Get creator's notes and retrieve their comment information.""" @@ -185,11 +183,11 @@ class XiaoHongShuCrawler(AbstractCrawler): f"[XiaoHongShuCrawler.get_note_detail] have not fund note detail note_id:{note_id}, err: {ex}") return None - async def batch_get_note_comments(self, note_list: List[str]) -> List[Dict]: + async def batch_get_note_comments(self, note_list: List[str]): """Batch get note comments""" if not config.ENABLE_GET_COMMENTS: utils.logger.info(f"[XiaoHongShuCrawler.batch_get_note_comments] Crawling comment mode is not enabled") - return None + return utils.logger.info( f"[XiaoHongShuCrawler.batch_get_note_comments] Begin batch get note comments, note list: {note_list}") @@ -198,59 +196,17 @@ class XiaoHongShuCrawler(AbstractCrawler): for note_id in note_list: task = asyncio.create_task(self.get_comments(note_id, semaphore), name=note_id) task_list.append(task) - note_comments = await asyncio.gather(*task_list) - # 这里是每个note_id的评论列表,可以将其合并成一个列表, 方便后续操作 - note_comments = [comment for comments in note_comments for comment in comments] - return note_comments + await asyncio.gather(*task_list) - async def get_comments(self, note_id: str, semaphore: asyncio.Semaphore) -> List[Dict]: + async def get_comments(self, note_id: str, semaphore: asyncio.Semaphore): """Get note comments with keyword filtering and quantity limitation""" async with semaphore: utils.logger.info(f"[XiaoHongShuCrawler.get_comments] Begin get note id comments {note_id}") - return await 
self.xhs_client.get_note_all_comments( + await self.xhs_client.get_note_all_comments( note_id=note_id, crawl_interval=random.random(), callback=xhs_store.batch_update_xhs_note_comments ) - - async def batch_get_sub_comments(self, note_comments: List[Dict]): - """Batch get note sub_comments""" - if not config.ENABLE_GET_SUB_COMMENTS: - utils.logger.info(f"[XiaoHongShuCrawler.batch_get_sub_comments] Crawling sub_comment mode is not enabled") - return - - semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) - task_list: List[Task] = [] - for note_comment in note_comments: - note_id = note_comment.get("note_id") - sub_comments = note_comment.get("sub_comments") - if sub_comments: - await xhs_store.batch_update_xhs_note_sub_comments(note_id, sub_comments) - - sub_comment_has_more = note_comment.get("sub_comment_has_more") - if not sub_comment_has_more: - continue - - root_comment_id = note_comment.get("id") - sub_comment_cursor = note_comment.get("sub_comment_cursor") - task = asyncio.create_task(self.get_sub_comments(note_id, root_comment_id, - sub_comment_cursor, sub_comments, semaphore), name=note_id) - task_list.append(task) - await asyncio.gather(*task_list) - - async def get_sub_comments(self, note_id: str, root_comment_id: str, sub_comment_cursor: str, - sub_comments: List[Dict], semaphore: asyncio.Semaphore) -> List[Dict]: - """Get note sub_comments with keyword filtering and quantity limitation""" - async with semaphore: - utils.logger.info(f"[XiaoHongShuCrawler.get_sub_comments] Begin get note id sub_comments {note_id}") - await self.xhs_client.get_comment_all_sub_comments( - note_id=note_id, - root_comment_id=root_comment_id, - sub_comment_cursor=sub_comment_cursor, - sub_comments=sub_comments, - crawl_interval=random.random(), - callback=xhs_store.batch_update_xhs_note_sub_comments - ) @staticmethod def format_proxy_info(ip_proxy_info: IpInfoModel) -> Tuple[Optional[Dict], Optional[Dict]]: @@ -317,4 +273,4 @@ class XiaoHongShuCrawler(AbstractCrawler): 
async def close(self): """Close browser context""" await self.browser_context.close() - utils.logger.info("[XiaoHongShuCrawler.close] Browser context closed ...") + utils.logger.info("[XiaoHongShuCrawler.close] Browser context closed ...") \ No newline at end of file diff --git a/schema/tables.sql b/schema/tables.sql index 21e1024..007a55e 100644 --- a/schema/tables.sql +++ b/schema/tables.sql @@ -282,26 +282,10 @@ CREATE TABLE `xhs_note_comment` ( ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='小红书笔记评论'; -- ---------------------------- --- Table structure for xhs_note_sub_comment +-- alter table xhs_note_comment to support parent_comment_id -- ---------------------------- -DROP TABLE IF EXISTS `xhs_note_sub_comment`; -CREATE TABLE `xhs_note_sub_comment` ( - `id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID', - `user_id` varchar(64) NOT NULL COMMENT '用户ID', - `nickname` varchar(64) DEFAULT NULL COMMENT '用户昵称', - `avatar` varchar(255) DEFAULT NULL COMMENT '用户头像地址', - `ip_location` varchar(255) DEFAULT NULL COMMENT '评论时的IP地址', - `add_ts` bigint NOT NULL COMMENT '记录添加时间戳', - `last_modify_ts` bigint NOT NULL COMMENT '记录最后修改时间戳', - `comment_id` varchar(64) NOT NULL COMMENT '评论ID', - `target_comment_id` varchar(64) NOT NULL COMMENT '被评论的评论ID', - `create_time` bigint NOT NULL COMMENT '评论时间戳', - `note_id` varchar(64) NOT NULL COMMENT '笔记ID', - `content` longtext NOT NULL COMMENT '评论内容', - `pictures` varchar(512) DEFAULT NULL, - PRIMARY KEY (`id`), - KEY `idx_xhs_note_co_comment_8e8349` (`comment_id`), - KEY `idx_xhs_note_co_create__204f8d` (`create_time`) -) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='小红书笔记二级评论'; +ALTER TABLE `xhs_note_comment` +ADD COLUMN `parent_comment_id` VARCHAR(64) DEFAULT NULL COMMENT '父评论ID'; + SET FOREIGN_KEY_CHECKS = 1; diff --git a/store/xhs/__init__.py b/store/xhs/__init__.py index 2deee07..ab13482 100644 --- a/store/xhs/__init__.py +++ 
b/store/xhs/__init__.py @@ -74,6 +74,7 @@ async def update_xhs_note_comment(note_id: str, comment_item: Dict): user_info = comment_item.get("user_info", {}) comment_id = comment_item.get("id") comment_pictures = [item.get("url_default", "") for item in comment_item.get("pictures", [])] + target_comment = comment_item.get("target_comment", {}) local_db_item = { "comment_id": comment_id, "create_time": comment_item.get("create_time"), @@ -83,40 +84,13 @@ async def update_xhs_note_comment(note_id: str, comment_item: Dict): "user_id": user_info.get("user_id"), "nickname": user_info.get("nickname"), "avatar": user_info.get("image"), - "sub_comment_count": comment_item.get("sub_comment_count"), + "sub_comment_count": comment_item.get("sub_comment_count", 0), "pictures": ",".join(comment_pictures), + "parent_comment_id": target_comment.get("id", 0), "last_modify_ts": utils.get_current_timestamp(), } utils.logger.info(f"[store.xhs.update_xhs_note_comment] xhs note comment:{local_db_item}") await XhsStoreFactory.create_store().store_comment(local_db_item) - -async def batch_update_xhs_note_sub_comments(note_id: str, comments: List[Dict]): - if not comments: - return - for comment_item in comments: - await update_xhs_note_sub_comment(note_id, comment_item) - - -async def update_xhs_note_sub_comment(note_id: str, comment_item: Dict): - user_info = comment_item.get("user_info", {}) - target_comment = comment_item.get("target_comment") - comment_id = comment_item.get("id") - comment_pictures = [item.get("url_default", "") for item in comment_item.get("pictures", [])] - local_db_item = { - "comment_id": comment_id, - "target_comment_id": target_comment.get('id'), - "create_time": comment_item.get("create_time"), - "ip_location": comment_item.get("ip_location"), - "note_id": note_id, - "content": comment_item.get("content"), - "user_id": user_info.get("user_id"), - "nickname": user_info.get("nickname"), - "avatar": user_info.get("image"), - "pictures": ",".join(comment_pictures), 
- "last_modify_ts": utils.get_current_timestamp(), - } - utils.logger.info(f"[store.xhs.update_xhs_note_sub_comment] xhs note comment:{local_db_item}") - await XhsStoreFactory.create_store().store_sub_comment(local_db_item) async def save_creator(user_id: str, creator: Dict): diff --git a/store/xhs/xhs_store_impl.py b/store/xhs/xhs_store_impl.py index 1fa75b5..026d527 100644 --- a/store/xhs/xhs_store_impl.py +++ b/store/xhs/xhs_store_impl.py @@ -81,17 +81,6 @@ class XhsCsvStoreImplement(AbstractStore): """ await self.save_data_to_csv(save_item=creator, store_type="creator") - - async def store_sub_comment(self, sub_comment_items: Dict): - """ - Xiaohongshu sub_comments CSV storage implementation - Args: - sub_comment_items: sub_comments item dict - - Returns: - - """ - await self.save_data_to_csv(save_item=sub_comment_items, store_type="sub_comments") class XhsDbStoreImplement(AbstractStore): @@ -154,26 +143,6 @@ class XhsDbStoreImplement(AbstractStore): else: await update_creator_by_user_id(user_id, creator) - async def store_sub_comment(self, sub_comment_items: Dict): - """ - Xiaohongshu content DB storage implementation - Args: - sub_comment_items: comment item dict - - Returns: - - """ - from .xhs_store_sql import (add_new_sub_comment, - query_sub_comment_by_comment_id, - update_sub_comment_by_comment_id) - comment_id = sub_comment_items.get("comment_id") - comment_detail: Dict = await query_sub_comment_by_comment_id(comment_id=comment_id) - if not comment_detail: - sub_comment_items["add_ts"] = utils.get_current_timestamp() - await add_new_sub_comment(sub_comment_items) - else: - await update_sub_comment_by_comment_id(comment_id, comment_item=sub_comment_items) - class XhsJsonStoreImplement(AbstractStore): json_store_path: str = "data/xhs" @@ -245,14 +214,4 @@ class XhsJsonStoreImplement(AbstractStore): """ await self.save_data_to_json(creator, "creator") - - async def store_sub_comment(self, sub_comment_items: Dict): - """ - sub_comment JSON storage 
implementatio - Args: - sub_comment_items: - - Returns: - - """ - await self.save_data_to_json(sub_comment_items, "sub_comments") + \ No newline at end of file diff --git a/store/xhs/xhs_store_sql.py b/store/xhs/xhs_store_sql.py index 499f33d..28c0a2c 100644 --- a/store/xhs/xhs_store_sql.py +++ b/store/xhs/xhs_store_sql.py @@ -146,49 +146,3 @@ async def update_creator_by_user_id(user_id: str, creator_item: Dict) -> int: async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() effect_row: int = await async_db_conn.update_table("xhs_creator", creator_item, "user_id", user_id) return effect_row - - -async def query_sub_comment_by_comment_id(comment_id: str) -> Dict: - """ - 查询一条二级评论内容 - Args: - comment_id: - - Returns: - - """ - async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() - sql: str = f"select * from xhs_note_sub_comment where comment_id = '{comment_id}'" - rows: List[Dict] = await async_db_conn.query(sql) - if len(rows) > 0: - return rows[0] - return dict() - - -async def add_new_sub_comment(comment_item: Dict) -> int: - """ - 新增一条二级评论记录 - Args: - comment_item: - - Returns: - - """ - async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() - last_row_id: int = await async_db_conn.item_to_table("xhs_note_sub_comment", comment_item) - return last_row_id - - -async def update_sub_comment_by_comment_id(comment_id: str, comment_item: Dict) -> int: - """ - 更新增一条二级评论记录 - Args: - comment_id: - comment_item: - - Returns: - - """ - async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() - effect_row: int = await async_db_conn.update_table("xhs_note_sub_comment", comment_item, "comment_id", comment_id) - return effect_row \ No newline at end of file