From ab7d8142af0bc1668f85cc8f4db12707df8d3613 Mon Sep 17 00:00:00 2001 From: Relakkes Date: Sat, 24 Aug 2024 05:52:11 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20weibo=E6=94=AF=E6=8C=81=E6=8C=87?= =?UTF-8?q?=E5=AE=9A=E5=88=9B=E4=BD=9C=E8=80=85=E4=B8=BB=E9=A1=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- config/base_config.py | 6 ++ media_platform/weibo/client.py | 139 ++++++++++++++++++++++++++++++-- media_platform/weibo/core.py | 38 +++++++++ media_platform/xhs/core.py | 3 - schema/tables.sql | 19 +++++ store/weibo/__init__.py | 87 +++++++++++++++++++- store/weibo/weibo_store_impl.py | 44 ++++++++-- store/weibo/weibo_store_sql.py | 46 +++++++++++ 9 files changed, 368 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 20c93b1..bde5082 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ | 抖音 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | | 快手 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | | B 站 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | -| 微博 | ✅ | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ | +| 微博 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | | 贴吧 | ✅ | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ | diff --git a/config/base_config.py b/config/base_config.py index 690dbab..6456727 100644 --- a/config/base_config.py +++ b/config/base_config.py @@ -85,6 +85,12 @@ WEIBO_SPECIFIED_ID_LIST = [ # ........................ ] +# 指定weibo创作者ID列表 +WEIBO_CREATOR_ID_LIST = [ + "5533390220", + # ........................ +] + # 指定贴吧需要爬取的帖子列表 TIEBA_SPECIFIED_ID_LIST = [ diff --git a/media_platform/weibo/client.py b/media_platform/weibo/client.py index 5e77394..ec0e049 100644 --- a/media_platform/weibo/client.py +++ b/media_platform/weibo/client.py @@ -7,10 +7,11 @@ import asyncio import copy import json import re -from typing import Any, Callable, Dict, List, Optional -from urllib.parse import urlencode +from typing import Callable, Dict, List, Optional, Union +from urllib.parse import parse_qs, unquote, urlencode import httpx +from httpx import Response from playwright.async_api import BrowserContext, Page import config @@ -38,20 +39,26 @@ class WeiboClient: self.cookie_dict = cookie_dict self._image_agent_host = "https://i1.wp.com/" - async def request(self, method, url, **kwargs) -> Any: + async def request(self, method, url, **kwargs) -> Union[Response, Dict]: + enable_return_response = kwargs.pop("return_response", False) async with httpx.AsyncClient(proxies=self.proxies) as client: response = await client.request( method, url, timeout=self.timeout, **kwargs ) + + if enable_return_response: + return response + data: Dict = response.json() - if data.get("ok") != 1: + ok_code = data.get("ok") + if ok_code not in [0, 1]: utils.logger.error(f"[WeiboClient.request] request {method}:{url} err, res:{data}") raise DataFetchError(data.get("msg", "unkonw error")) else: return data.get("data", {}) - async def get(self, uri: str, params=None, headers=None) -> Dict: + async def get(self, uri: str, params=None, headers=None, **kwargs) -> Union[Response, Dict]: final_uri = uri if isinstance(params, dict): final_uri = (f"{uri}?" @@ -59,7 +66,7 @@ class WeiboClient: if headers is None: headers = self.headers - return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers) + return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers, **kwargs) async def post(self, uri: str, data: dict) -> Dict: json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False) @@ -229,3 +236,123 @@ class WeiboClient: return None else: return response.content + + + + async def get_creator_container_info(self, creator_id: str) -> Dict: + """ + 获取用户的容器ID, 容器信息代表着真实请求的API路径 + fid_container_id:用户的微博详情API的容器ID + lfid_container_id:用户的微博列表API的容器ID + Args: + creator_id: + + Returns: { + + """ + response = await self.get(f"/u/{creator_id}", return_response=True) + m_weibocn_params = response.cookies.get("M_WEIBOCN_PARAMS") + if not m_weibocn_params: + raise DataFetchError("get containerid failed") + m_weibocn_params_dict = parse_qs(unquote(m_weibocn_params)) + return { + "fid_container_id": m_weibocn_params_dict.get("fid", [""])[0], + "lfid_container_id": m_weibocn_params_dict.get("lfid", [""])[0] + } + + async def get_creator_info_by_id(self, creator_id: str) -> Dict: + """ + 根据用户ID获取用户详情 + Args: + creator_id: + + Returns: + + """ + uri = "/api/container/getIndex" + container_info = await self.get_creator_container_info(creator_id) + if container_info.get("fid_container_id") == "" or container_info.get("lfid_container_id") == "": + utils.logger.error(f"[WeiboClient.get_creator_info_by_id] get containerid failed") + raise DataFetchError("get containerid failed") + params = { + "jumpfrom": "weibocom", + "type": "uid", + "value": creator_id, + "containerid": container_info["fid_container_id"], + } + + user_res = await self.get(uri, params) + + if user_res.get("tabsInfo"): + tabs: List[Dict] = user_res.get("tabsInfo", {}).get("tabs", []) + for tab in tabs: + if tab.get("tabKey") == "weibo": + container_info["lfid_container_id"] = tab.get("containerid") + break + + user_res.update(container_info) + return user_res + + async def get_notes_by_creator(self, creator: str, container_id: str, since_id: str = "0", ) -> Dict: + """ + 获取博主的笔记 + Args: + creator: 博主ID + container_id: 容器ID + since_id: 上一页最后一条笔记的ID + Returns: + + """ + + uri = "/api/container/getIndex" + params = { + "jumpfrom": "weibocom", + "type": "uid", + "value": creator, + "containerid": container_id, + "since_id": since_id, + } + return await self.get(uri, params) + + async def get_all_notes_by_creator_id(self, creator_id: str, container_id: str, crawl_interval: float = 1.0, + callback: Optional[Callable] = None) -> List[Dict]: + """ + 获取指定用户下的所有发过的帖子,该方法会一直查找一个用户下的所有帖子信息 + Args: + creator_id: + container_id: + crawl_interval: + callback: + + Returns: + + """ + result = [] + notes_has_more = True + since_id = "" + crawler_total_count = 0 + while notes_has_more: + notes_res = await self.get_notes_by_creator(creator_id, container_id, since_id) + if not notes_res: + utils.logger.error( + f"[WeiboClient.get_notes_by_creator] The current creator may have been banned by xhs, so they cannot access the data.") + break + + notes_has_more = notes_res.get("cardlistInfo", {}).get("total", 0) > crawler_total_count + since_id = notes_res.get("cardlistInfo", {}).get("since_id", "0") + notes_has_more += 10 + if "cards" not in notes_res: + utils.logger.info( + f"[WeiboClient.get_all_notes_by_creator] No 'notes' key found in response: {notes_res}") + break + + notes = notes_res["cards"] + utils.logger.info( + f"[WeiboClient.get_all_notes_by_creator] got user_id:{creator_id} notes len : {len(notes)}") + notes = [note for note in notes if note.get("card_type") == 9] + if callback: + await callback(notes) + await asyncio.sleep(crawl_interval) + result.extend(notes) + return result + diff --git a/media_platform/weibo/core.py b/media_platform/weibo/core.py index 47c4fff..996f030 100644 --- a/media_platform/weibo/core.py +++ b/media_platform/weibo/core.py @@ -84,6 +84,9 @@ class WeiboCrawler(AbstractCrawler): elif config.CRAWLER_TYPE == "detail": # Get the information and comments of the specified post await self.get_specified_notes() + elif config.CRAWLER_TYPE == "creator": + # Get creator's information and their notes and comments + await self.get_creators_and_notes() else: pass utils.logger.info("[WeiboCrawler.start] Weibo Crawler finished ...") @@ -221,6 +224,41 @@ class WeiboCrawler(AbstractCrawler): extension_file_name = url.split(".")[-1] await weibo_store.update_weibo_note_image(pic["pid"], content, extension_file_name) + + async def get_creators_and_notes(self) -> None: + """ + Get creator's information and their notes and comments + Returns: + + """ + utils.logger.info("[WeiboCrawler.get_creators_and_notes] Begin get weibo creators") + for user_id in config.WEIBO_CREATOR_ID_LIST: + createor_info_res: Dict = await self.wb_client.get_creator_info_by_id(creator_id=user_id) + if createor_info_res: + createor_info: Dict = createor_info_res.get("userInfo", {}) + utils.logger.info(f"[WeiboCrawler.get_creators_and_notes] creator info: {createor_info}") + if not createor_info: + raise DataFetchError("Get creator info error") + await weibo_store.save_creator(user_id, user_info=createor_info) + + # Get all note information of the creator + all_notes_list = await self.wb_client.get_all_notes_by_creator_id( + creator_id=user_id, + container_id=createor_info_res.get("lfid_container_id"), + crawl_interval=0, + callback=weibo_store.batch_update_weibo_notes + ) + + note_ids = [note_item.get("mlog", {}).get("id") for note_item in all_notes_list if + note_item.get("mlog", {}).get("id")] + await self.batch_get_notes_comments(note_ids) + + else: + utils.logger.error( + f"[WeiboCrawler.get_creators_and_notes] get creator info error, creator_id:{user_id}") + + + async def create_weibo_client(self, httpx_proxy: Optional[str]) -> WeiboClient: """Create xhs client""" utils.logger.info("[WeiboCrawler.create_weibo_client] Begin create weibo API client ...") diff --git a/media_platform/xhs/core.py b/media_platform/xhs/core.py index f176240..18952f2 100644 --- a/media_platform/xhs/core.py +++ b/media_platform/xhs/core.py @@ -185,9 +185,6 @@ class XiaoHongShuCrawler(AbstractCrawler): async with semaphore: try: _note_detail: Dict = await self.xhs_client.get_note_by_id_from_html(note_id) - print("------------------------") - print(_note_detail) - print("------------------------") if not _note_detail: utils.logger.error( f"[XiaoHongShuCrawler.get_note_detail_from_html] Get note detail error, note_id: {note_id}") diff --git a/schema/tables.sql b/schema/tables.sql index da9e855..0dec6dd 100644 --- a/schema/tables.sql +++ b/schema/tables.sql @@ -406,3 +406,22 @@ alter table kuaishou_video add column `source_keyword` varchar(255) default '' c alter table weibo_note add column `source_keyword` varchar(255) default '' comment '搜索来源关键字'; alter table xhs_note add column `source_keyword` varchar(255) default '' comment '搜索来源关键字'; alter table tieba_note add column `source_keyword` varchar(255) default '' comment '搜索来源关键字'; + + +DROP TABLE IF EXISTS `weibo_creator`; +CREATE TABLE `weibo_creator` +( + `id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID', + `user_id` varchar(64) NOT NULL COMMENT '用户ID', + `nickname` varchar(64) DEFAULT NULL COMMENT '用户昵称', + `avatar` varchar(255) DEFAULT NULL COMMENT '用户头像地址', + `ip_location` varchar(255) DEFAULT NULL COMMENT '评论时的IP地址', + `add_ts` bigint NOT NULL COMMENT '记录添加时间戳', + `last_modify_ts` bigint NOT NULL COMMENT '记录最后修改时间戳', + `desc` longtext COMMENT '用户描述', + `gender` varchar(1) DEFAULT NULL COMMENT '性别', + `follows` varchar(16) DEFAULT NULL COMMENT '关注数', + `fans` varchar(16) DEFAULT NULL COMMENT '粉丝数', + `tag_list` longtext COMMENT '标签列表', + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='微博博主'; diff --git a/store/weibo/__init__.py b/store/weibo/__init__.py index 2e86a5d..65e75dd 100644 --- a/store/weibo/__init__.py +++ b/store/weibo/__init__.py @@ -7,6 +7,7 @@ import re from typing import List from var import source_keyword_var + from .weibo_store_image import * from .weibo_store_impl import * @@ -27,7 +28,33 @@ class WeibostoreFactory: return store_class() +async def batch_update_weibo_notes(note_list: List[Dict]): + """ + Batch update weibo notes + Args: + note_list: + + Returns: + + """ + if not note_list: + return + for note_item in note_list: + await update_weibo_note(note_item) + + async def update_weibo_note(note_item: Dict): + """ + Update weibo note + Args: + note_item: + + Returns: + + """ + if not note_item: + return + mblog: Dict = note_item.get("mblog") user_info: Dict = mblog.get("user") note_id = mblog.get("id") @@ -61,6 +88,15 @@ async def update_weibo_note(note_item: Dict): async def batch_update_weibo_note_comments(note_id: str, comments: List[Dict]): + """ + Batch update weibo note comments + Args: + note_id: + comments: + + Returns: + + """ if not comments: return for comment_item in comments: @@ -68,6 +104,17 @@ async def batch_update_weibo_note_comments(note_id: str, comments: List[Dict]): async def update_weibo_note_comment(note_id: str, comment_item: Dict): + """ + Update weibo note comment + Args: + note_id: weibo note id + comment_item: weibo comment item + + Returns: + + """ + if not comment_item or not note_id: + return comment_id = str(comment_item.get("id")) user_info: Dict = comment_item.get("user") content_text = comment_item.get("text") @@ -95,5 +142,43 @@ async def update_weibo_note_comment(note_id: str, comment_item: Dict): f"[store.weibo.update_weibo_note_comment] Weibo note comment: {comment_id}, content: {save_comment_item.get('content', '')[:24]} ...") await WeibostoreFactory.create_store().store_comment(comment_item=save_comment_item) + async def update_weibo_note_image(picid: str, pic_content, extension_file_name): - await WeiboStoreImage().store_image({"pic_id": picid, "pic_content": pic_content, "extension_file_name": extension_file_name}) \ No newline at end of file + """ + Save weibo note image to local + Args: + picid: + pic_content: + extension_file_name: + + Returns: + + """ + await WeiboStoreImage().store_image( + {"pic_id": picid, "pic_content": pic_content, "extension_file_name": extension_file_name}) + + +async def save_creator(user_id: str, user_info: Dict): + """ + Save creator information to local + Args: + user_id: + user_info: + + Returns: + + """ + local_db_item = { + 'user_id': user_id, + 'nickname': user_info.get('screen_name'), + 'gender': '女' if user_info.get('gender') == "f" else '男', + 'avatar': user_info.get('avatar_hd'), + 'desc': user_info.get('description'), + 'ip_location': user_info.get("source", "").replace("来自", ""), + 'follows': user_info.get('follow_count', ''), + 'fans': user_info.get('followers_count', ''), + 'tag_list': '', + "last_modify_ts": utils.get_current_timestamp(), + } + utils.logger.info(f"[store.weibo.save_creator] creator:{local_db_item}") + await WeibostoreFactory.create_store().store_creator(local_db_item) diff --git a/store/weibo/weibo_store_impl.py b/store/weibo/weibo_store_impl.py index 188c016..08cc24e 100644 --- a/store/weibo/weibo_store_impl.py +++ b/store/weibo/weibo_store_impl.py @@ -33,9 +33,6 @@ def calculate_number_of_files(file_store_path: str) -> int: class WeiboCsvStoreImplement(AbstractStore): - async def store_creator(self, creator: Dict): - pass - csv_store_path: str = "data/weibo" file_count: int = calculate_number_of_files(csv_store_path) @@ -91,6 +88,17 @@ class WeiboCsvStoreImplement(AbstractStore): """ await self.save_data_to_csv(save_item=comment_item, store_type="comments") + async def store_creator(self, creator: Dict): + """ + Weibo creator CSV storage implementation + Args: + creator: + + Returns: + + """ + await self.save_data_to_csv(save_item=creator, store_type="creators") + class WeiboDbStoreImplement(AbstractStore): @@ -136,7 +144,25 @@ class WeiboDbStoreImplement(AbstractStore): await update_comment_by_comment_id(comment_id, comment_item=comment_item) async def store_creator(self, creator: Dict): - pass + """ + Weibo creator DB storage implementation + Args: + creator: + + Returns: + + """ + + from .weibo_store_sql import (add_new_creator, + query_creator_by_user_id, + update_creator_by_user_id) + user_id = creator.get("user_id") + user_detail: Dict = await query_creator_by_user_id(user_id) + if not user_detail: + creator["add_ts"] = utils.get_current_timestamp() + await add_new_creator(creator) + else: + await update_creator_by_user_id(user_id, creator) class WeiboJsonStoreImplement(AbstractStore): @@ -214,4 +240,12 @@ class WeiboJsonStoreImplement(AbstractStore): await self.save_data_to_json(comment_item, "comments") async def store_creator(self, creator: Dict): - pass + """ + creator JSON storage implementation + Args: + creator: + + Returns: + + """ + await self.save_data_to_json(creator, "creators") diff --git a/store/weibo/weibo_store_sql.py b/store/weibo/weibo_store_sql.py index d7f73e7..ad83f6a 100644 --- a/store/weibo/weibo_store_sql.py +++ b/store/weibo/weibo_store_sql.py @@ -100,3 +100,49 @@ async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> i async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() effect_row: int = await async_db_conn.update_table("weibo_note_comment", comment_item, "comment_id", comment_id) return effect_row + + +async def query_creator_by_user_id(user_id: str) -> Dict: + """ + 查询一条创作者记录 + Args: + user_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from weibo_creator where user_id = '{user_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_creator(creator_item: Dict) -> int: + """ + 新增一条创作者信息 + Args: + creator_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("weibo_creator", creator_item) + return last_row_id + + +async def update_creator_by_user_id(user_id: str, creator_item: Dict) -> int: + """ + 更新一条创作者信息 + Args: + user_id: + creator_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("weibo_creator", creator_item, "user_id", user_id) + return effect_row \ No newline at end of file