From 81a9946afdc80bab2be48126c162c27aead3d946 Mon Sep 17 00:00:00 2001 From: leantli Date: Thu, 11 Apr 2024 17:16:13 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E7=88=AC=E5=8F=96?= =?UTF-8?q?=E5=B0=8F=E7=BA=A2=E4=B9=A6=E4=BA=8C=E7=BA=A7=E8=AF=84=E8=AE=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- base/base_crawler.py | 6 ++++ config/base_config.py | 3 ++ media_platform/xhs/client.py | 35 +++++++++++++++++++++- media_platform/xhs/core.py | 56 ++++++++++++++++++++++++++++++++---- schema/tables.sql | 23 +++++++++++++++ store/xhs/__init__.py | 28 ++++++++++++++++++ store/xhs/xhs_store_impl.py | 42 +++++++++++++++++++++++++++ store/xhs/xhs_store_sql.py | 46 +++++++++++++++++++++++++++++ 8 files changed, 232 insertions(+), 7 deletions(-) diff --git a/base/base_crawler.py b/base/base_crawler.py index 2a5b69f..fffd812 100644 --- a/base/base_crawler.py +++ b/base/base_crawler.py @@ -55,6 +55,12 @@ class AbstractStore(ABC): # @abstractmethod async def store_creator(self, creator: Dict): pass + + # TODO support all platform + # only xhs is supported, so @abstractmethod is commented + # @abstractmethod + async def store_sub_comment(self, sub_comment_items: Dict): + pass class AbstractStoreImage(ABC): #TODO: support all platform diff --git a/config/base_config.py b/config/base_config.py index 26761f8..2296c54 100644 --- a/config/base_config.py +++ b/config/base_config.py @@ -39,6 +39,9 @@ ENABLE_GET_IMAGES = False # 是否开启爬评论模式, 默认不开启爬评论 ENABLE_GET_COMMENTS = False +# 是否开启爬二级评论模式, 默认不开启爬二级评论, 目前仅支持 xhs +ENABLE_GET_SUB_COMMENTS = False + # 指定小红书需要爬虫的笔记ID列表 XHS_SPECIFIED_ID_LIST = [ "6422c2750000000027000d88", diff --git a/media_platform/xhs/client.py b/media_platform/xhs/client.py index 6b8a393..be91f8a 100644 --- a/media_platform/xhs/client.py +++ b/media_platform/xhs/client.py @@ -225,7 +225,7 @@ class XiaoHongShuClient(AbstactApiClient): } return await self.get(uri, params) - async def get_note_sub_comments(self, note_id: str, root_comment_id: str, num: int = 30, cursor: str = ""): + async def get_note_sub_comments(self, note_id: str, root_comment_id: str, num: int = 10, cursor: str = ""): """ 获取指定父评论下的子评论的API Args: @@ -275,6 +275,39 @@ class XiaoHongShuClient(AbstactApiClient): await asyncio.sleep(crawl_interval) result.extend(comments) return result + + async def get_comment_all_sub_comments(self, note_id: str, root_comment_id: str, sub_comment_cursor: str, + sub_comments: List[Dict], crawl_interval: float = 1.0, + callback: Optional[Callable] = None) -> List[Dict]: + """ + 获取指定笔记下指定一级评论下的所有二级评论, 该方法会一直查找一个一级评论下的所有二级评论信息 + Args: + note_id: 笔记ID + root_comment_id: 一级评论ID + sub_comment_cursor: 二级评论的初始分页游标 + sub_comments: 爬取一级评论默认携带的二级评论列表 + crawl_interval: 爬取一次评论的延迟单位(秒) + callback: 一次评论爬取结束后 + + Returns: + + """ + result = [] + sub_comment_has_more = True + while sub_comment_has_more: + comments_res = await self.get_note_sub_comments(note_id, root_comment_id, 10, sub_comment_cursor) + sub_comment_has_more = comments_res.get("has_more", False) + sub_comment_cursor = comments_res.get("cursor", "") + if "comments" not in comments_res: + utils.logger.info( + f"[XiaoHongShuClient.get_comment_all_sub_comments] No 'comments' key found in response: {comments_res}") + break + comments = comments_res["comments"] + if callback: + await callback(note_id, sub_comments) + await asyncio.sleep(crawl_interval) + result.extend(comments) + return result async def get_creator_info(self, user_id: str) -> Dict: """ diff --git a/media_platform/xhs/core.py b/media_platform/xhs/core.py index 5b67d6d..5597438 100644 --- a/media_platform/xhs/core.py +++ b/media_platform/xhs/core.py @@ -123,7 +123,9 @@ class XiaoHongShuCrawler(AbstractCrawler): note_id_list.append(note_detail.get("note_id")) page += 1 utils.logger.info(f"[XiaoHongShuCrawler.search] Note details: {note_details}") - await self.batch_get_note_comments(note_id_list) + note_comments = await self.batch_get_note_comments(note_id_list) + await self.batch_get_sub_comments(note_comments) + async def get_creators_and_notes(self) -> None: """Get creator's notes and retrieve their comment information.""" @@ -183,11 +185,11 @@ class XiaoHongShuCrawler(AbstractCrawler): f"[XiaoHongShuCrawler.get_note_detail] have not fund note detail note_id:{note_id}, err: {ex}") return None - async def batch_get_note_comments(self, note_list: List[str]): + async def batch_get_note_comments(self, note_list: List[str]) -> List[Dict]: """Batch get note comments""" if not config.ENABLE_GET_COMMENTS: utils.logger.info(f"[XiaoHongShuCrawler.batch_get_note_comments] Crawling comment mode is not enabled") - return + return None utils.logger.info( f"[XiaoHongShuCrawler.batch_get_note_comments] Begin batch get note comments, note list: {note_list}") @@ -196,17 +198,59 @@ class XiaoHongShuCrawler(AbstractCrawler): for note_id in note_list: task = asyncio.create_task(self.get_comments(note_id, semaphore), name=note_id) task_list.append(task) - await asyncio.gather(*task_list) + note_comments = await asyncio.gather(*task_list) + # 这里是每个note_id的评论列表,可以将其合并成一个列表, 方便后续操作 + note_comments = [comment for comments in note_comments for comment in comments] + return note_comments - async def get_comments(self, note_id: str, semaphore: asyncio.Semaphore): + async def get_comments(self, note_id: str, semaphore: asyncio.Semaphore) -> List[Dict]: """Get note comments with keyword filtering and quantity limitation""" async with semaphore: utils.logger.info(f"[XiaoHongShuCrawler.get_comments] Begin get note id comments {note_id}") - await self.xhs_client.get_note_all_comments( + return await self.xhs_client.get_note_all_comments( note_id=note_id, crawl_interval=random.random(), callback=xhs_store.batch_update_xhs_note_comments ) + + async def batch_get_sub_comments(self, note_comments: List[Dict]): + """Batch get note sub_comments""" + if not config.ENABLE_GET_SUB_COMMENTS: + utils.logger.info(f"[XiaoHongShuCrawler.batch_get_sub_comments] Crawling sub_comment mode is not enabled") + return + + semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) + task_list: List[Task] = [] + for note_comment in note_comments: + note_id = note_comment.get("note_id") + sub_comments = note_comment.get("sub_comments") + if sub_comments: + await xhs_store.batch_update_xhs_note_sub_comments(note_id, sub_comments) + + sub_comment_has_more = note_comment.get("sub_comment_has_more") + if not sub_comment_has_more: + continue + + root_comment_id = note_comment.get("id") + sub_comment_cursor = note_comment.get("sub_comment_cursor") + task = asyncio.create_task(self.get_sub_comments(note_id, root_comment_id, + sub_comment_cursor, sub_comments, semaphore), name=note_id) + task_list.append(task) + await asyncio.gather(*task_list) + + async def get_sub_comments(self, note_id: str, root_comment_id: str, sub_comment_cursor: str, + sub_comments: List[Dict], semaphore: asyncio.Semaphore) -> List[Dict]: + """Get note sub_comments with keyword filtering and quantity limitation""" + async with semaphore: + utils.logger.info(f"[XiaoHongShuCrawler.get_sub_comments] Begin get note id sub_comments {note_id}") + await self.xhs_client.get_comment_all_sub_comments( + note_id=note_id, + root_comment_id=root_comment_id, + sub_comment_cursor=sub_comment_cursor, + sub_comments=sub_comments, + crawl_interval=random.random(), + callback=xhs_store.batch_update_xhs_note_sub_comments + ) @staticmethod def format_proxy_info(ip_proxy_info: IpInfoModel) -> Tuple[Optional[Dict], Optional[Dict]]: diff --git a/schema/tables.sql b/schema/tables.sql index 5ba45e5..21e1024 100644 --- a/schema/tables.sql +++ b/schema/tables.sql @@ -281,4 +281,27 @@ CREATE TABLE `xhs_note_comment` ( KEY `idx_xhs_note_co_create__204f8d` (`create_time`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='小红书笔记评论'; +-- ---------------------------- +-- Table structure for xhs_note_sub_comment +-- ---------------------------- +DROP TABLE IF EXISTS `xhs_note_sub_comment`; +CREATE TABLE `xhs_note_sub_comment` ( + `id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID', + `user_id` varchar(64) NOT NULL COMMENT '用户ID', + `nickname` varchar(64) DEFAULT NULL COMMENT '用户昵称', + `avatar` varchar(255) DEFAULT NULL COMMENT '用户头像地址', + `ip_location` varchar(255) DEFAULT NULL COMMENT '评论时的IP地址', + `add_ts` bigint NOT NULL COMMENT '记录添加时间戳', + `last_modify_ts` bigint NOT NULL COMMENT '记录最后修改时间戳', + `comment_id` varchar(64) NOT NULL COMMENT '评论ID', + `target_comment_id` varchar(64) NOT NULL COMMENT '被评论的评论ID', + `create_time` bigint NOT NULL COMMENT '评论时间戳', + `note_id` varchar(64) NOT NULL COMMENT '笔记ID', + `content` longtext NOT NULL COMMENT '评论内容', + `pictures` varchar(512) DEFAULT NULL, + PRIMARY KEY (`id`), + KEY `idx_xhs_note_co_comment_8e8349` (`comment_id`), + KEY `idx_xhs_note_co_create__204f8d` (`create_time`) +) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='小红书笔记二级评论'; + SET FOREIGN_KEY_CHECKS = 1; diff --git a/store/xhs/__init__.py b/store/xhs/__init__.py index f10d0ef..2deee07 100644 --- a/store/xhs/__init__.py +++ b/store/xhs/__init__.py @@ -89,6 +89,34 @@ async def update_xhs_note_comment(note_id: str, comment_item: Dict): } utils.logger.info(f"[store.xhs.update_xhs_note_comment] xhs note comment:{local_db_item}") await XhsStoreFactory.create_store().store_comment(local_db_item) + +async def batch_update_xhs_note_sub_comments(note_id: str, comments: List[Dict]): + if not comments: + return + for comment_item in comments: + await update_xhs_note_sub_comment(note_id, comment_item) + + +async def update_xhs_note_sub_comment(note_id: str, comment_item: Dict): + user_info = comment_item.get("user_info", {}) + target_comment = comment_item.get("target_comment") + comment_id = comment_item.get("id") + comment_pictures = [item.get("url_default", "") for item in comment_item.get("pictures", [])] + local_db_item = { + "comment_id": comment_id, + "target_comment_id": target_comment.get('id'), + "create_time": comment_item.get("create_time"), + "ip_location": comment_item.get("ip_location"), + "note_id": note_id, + "content": comment_item.get("content"), + "user_id": user_info.get("user_id"), + "nickname": user_info.get("nickname"), + "avatar": user_info.get("image"), + "pictures": ",".join(comment_pictures), + "last_modify_ts": utils.get_current_timestamp(), + } + utils.logger.info(f"[store.xhs.update_xhs_note_sub_comment] xhs note comment:{local_db_item}") + await XhsStoreFactory.create_store().store_sub_comment(local_db_item) async def save_creator(user_id: str, creator: Dict): diff --git a/store/xhs/xhs_store_impl.py b/store/xhs/xhs_store_impl.py index 26fc43e..1fa75b5 100644 --- a/store/xhs/xhs_store_impl.py +++ b/store/xhs/xhs_store_impl.py @@ -81,6 +81,17 @@ class XhsCsvStoreImplement(AbstractStore): """ await self.save_data_to_csv(save_item=creator, store_type="creator") + + async def store_sub_comment(self, sub_comment_items: Dict): + """ + Xiaohongshu sub_comments CSV storage implementation + Args: + sub_comment_items: sub_comments item dict + + Returns: + + """ + await self.save_data_to_csv(save_item=sub_comment_items, store_type="sub_comments") class XhsDbStoreImplement(AbstractStore): @@ -142,6 +153,26 @@ class XhsDbStoreImplement(AbstractStore): await add_new_creator(creator) else: await update_creator_by_user_id(user_id, creator) + + async def store_sub_comment(self, sub_comment_items: Dict): + """ + Xiaohongshu content DB storage implementation + Args: + sub_comment_items: comment item dict + + Returns: + + """ + from .xhs_store_sql import (add_new_sub_comment, + query_sub_comment_by_comment_id, + update_sub_comment_by_comment_id) + comment_id = sub_comment_items.get("comment_id") + comment_detail: Dict = await query_sub_comment_by_comment_id(comment_id=comment_id) + if not comment_detail: + sub_comment_items["add_ts"] = utils.get_current_timestamp() + await add_new_sub_comment(sub_comment_items) + else: + await update_sub_comment_by_comment_id(comment_id, comment_item=sub_comment_items) class XhsJsonStoreImplement(AbstractStore): @@ -214,3 +245,14 @@ class XhsJsonStoreImplement(AbstractStore): """ await self.save_data_to_json(creator, "creator") + + async def store_sub_comment(self, sub_comment_items: Dict): + """ + sub_comment JSON storage implementatio + Args: + sub_comment_items: + + Returns: + + """ + await self.save_data_to_json(sub_comment_items, "sub_comments") diff --git a/store/xhs/xhs_store_sql.py b/store/xhs/xhs_store_sql.py index d6c4929..499f33d 100644 --- a/store/xhs/xhs_store_sql.py +++ b/store/xhs/xhs_store_sql.py @@ -145,4 +145,50 @@ async def update_creator_by_user_id(user_id: str, creator_item: Dict) -> int: """ async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() effect_row: int = await async_db_conn.update_table("xhs_creator", creator_item, "user_id", user_id) + return effect_row + + +async def query_sub_comment_by_comment_id(comment_id: str) -> Dict: + """ + 查询一条二级评论内容 + Args: + comment_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from xhs_note_sub_comment where comment_id = '{comment_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_sub_comment(comment_item: Dict) -> int: + """ + 新增一条二级评论记录 + Args: + comment_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("xhs_note_sub_comment", comment_item) + return last_row_id + + +async def update_sub_comment_by_comment_id(comment_id: str, comment_item: Dict) -> int: + """ + 更新增一条二级评论记录 + Args: + comment_id: + comment_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("xhs_note_sub_comment", comment_item, "comment_id", comment_id) return effect_row \ No newline at end of file