diff --git a/README.md b/README.md index 963d13c..933dc0e 100644 --- a/README.md +++ b/README.md @@ -24,8 +24,8 @@ | 小红书 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✕ | | 抖音 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | | 快手 | ✅ | ✅ | ✕ | ✅ | ✅ | ✅ | ✅ | ✅ | ✕ | -| B 站 | ✅ | ✅ | ✕ | ✅ | ✅ | ✅ | ✅ | ✅ | ✕ | -| 微博 | ✅ | ✕ | ✕ | ✅ | ✕ | ✅ | ✅ | ✅ | ✕ | +| B 站 | ✅ | ✅ | ✕ | ✅ | ✅ | ✅ | ✅ | ✅ | ✕ | +| 微博 | ✅ | ✕ | ✕ | ✅ | ✅ | ✅ | ✅ | ✅ | ✕ | ## 使用方法 diff --git a/base/base_crawler.py b/base/base_crawler.py index 5e70163..92d27c7 100644 --- a/base/base_crawler.py +++ b/base/base_crawler.py @@ -1,4 +1,7 @@ from abc import ABC, abstractmethod +from typing import Dict, Optional + +from playwright.async_api import BrowserContext, BrowserType class AbstractCrawler(ABC): @@ -14,6 +17,11 @@ class AbstractCrawler(ABC): async def search(self): pass + @abstractmethod + async def launch_browser(self, chromium: BrowserType, playwright_proxy: Optional[Dict], user_agent: Optional[str], + headless: bool = True) -> BrowserContext: + pass + class AbstractLogin(ABC): @abstractmethod diff --git a/config/base_config.py b/config/base_config.py index b99a979..d037737 100644 --- a/config/base_config.py +++ b/config/base_config.py @@ -27,10 +27,10 @@ USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name CRAWLER_MAX_NOTES_COUNT = 20 # 并发爬虫数量控制 -MAX_CONCURRENCY_NUM = 10 +MAX_CONCURRENCY_NUM = 4 # 每个视频/帖子抓取评论最大条数 (为0则不限制) -MAX_COMMENTS_PER_POST = 10 +MAX_COMMENTS_PER_POST = 0 # 评论关键词筛选(只会留下包含关键词的评论,为空不限制) COMMENT_KEYWORDS = [ @@ -63,3 +63,9 @@ BILI_SPECIFIED_ID_LIST = [ "BV14Q4y1n7jz", # ........................ ] + +# 指定微博平台需要爬取的帖子列表 +WEIBO_SPECIFIED_ID_LIST = [ + "4982041758140155", + # ........................ +] \ No newline at end of file diff --git a/main.py b/main.py index 46ac6ea..a6b7e0e 100644 --- a/main.py +++ b/main.py @@ -8,8 +8,8 @@ from base.base_crawler import AbstractCrawler from media_platform.bilibili import BilibiliCrawler from media_platform.douyin import DouYinCrawler from media_platform.kuaishou import KuaishouCrawler -from media_platform.xhs import XiaoHongShuCrawler from media_platform.weibo import WeiboCrawler +from media_platform.xhs import XiaoHongShuCrawler class CrawlerFactory: diff --git a/media_platform/weibo/__init__.py b/media_platform/weibo/__init__.py index 82383d2..a033cbf 100644 --- a/media_platform/weibo/__init__.py +++ b/media_platform/weibo/__init__.py @@ -2,6 +2,6 @@ # @Author : relakkes@gmail.com # @Time : 2023/12/23 15:40 # @Desc : +from .client import WeiboClient from .core import WeiboCrawler from .login import WeiboLogin -from .client import WeiboClient \ No newline at end of file diff --git a/media_platform/weibo/client.py b/media_platform/weibo/client.py index 5507b24..a65f24f 100644 --- a/media_platform/weibo/client.py +++ b/media_platform/weibo/client.py @@ -4,7 +4,9 @@ # @Desc : 微博爬虫 API 请求 client import asyncio +import copy import json +import re from typing import Any, Callable, Dict, List, Optional, Tuple, Union from urllib.parse import urlencode @@ -47,12 +49,15 @@ class WeiboClient: else: return data.get("data", {}) - async def get(self, uri: str, params=None) -> Dict: + async def get(self, uri: str, params=None, headers=None) -> Dict: final_uri = uri if isinstance(params, dict): final_uri = (f"{uri}?" f"{urlencode(params)}") - return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=self.headers) + + if headers is None: + headers = self.headers + return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers) async def post(self, uri: str, data: dict) -> Dict: json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False) @@ -96,3 +101,78 @@ class WeiboClient: "page": page, } return await self.get(uri, params) + + async def get_note_comments(self, mid_id: str, max_id: int) -> Dict: + """get notes comments + :param mid_id: 微博ID + :param max_id: 分页参数ID + :return: + """ + uri = "/comments/hotflow" + params = { + "id": mid_id, + "mid": mid_id, + "max_id_type": 0, + } + if max_id > 0: + params.update({"max_id": max_id}) + + referer_url = f"https://m.weibo.cn/detail/{mid_id}" + headers = copy.copy(self.headers) + headers["Referer"] = referer_url + + return await self.get(uri, params, headers=headers) + + async def get_note_all_comments(self, note_id: str, crawl_interval: float = 1.0, is_fetch_sub_comments=False, + callback: Optional[Callable] = None, ): + """ + get note all comments include sub comments + :param note_id: + :param crawl_interval: + :param is_fetch_sub_comments: + :param callback: + :return: + """ + + result = [] + is_end = False + max_id = -1 + while not is_end: + comments_res = await self.get_note_comments(note_id, max_id) + max_id: int = comments_res.get("max_id") + comment_list: List[Dict] = comments_res.get("data", []) + is_end = max_id == 0 + if callback: # 如果有回调函数,就执行回调函数 + await callback(note_id, comment_list) + await asyncio.sleep(crawl_interval) + if not is_fetch_sub_comments: + result.extend(comment_list) + continue + # todo handle get sub comments + return result + + async def get_note_info_by_id(self, note_id: str) -> Dict: + """ + 根据帖子ID获取详情 + :param note_id: + :return: + """ + url = f"{self._host}/detail/{note_id}" + async with httpx.AsyncClient(proxies=self.proxies) as client: + response = await client.request( + "GET", url, timeout=self.timeout, headers=self.headers + ) + if response.status_code != 200: + raise DataFetchError(f"get weibo detail err: {response.text}") + match = re.search(r'var \$render_data = (\[.*?\])\[0\]', response.text, re.DOTALL) + if match: + render_data_json = match.group(1) + render_data_dict = json.loads(render_data_json) + note_detail = render_data_dict[0].get("status") + note_item = { + "mblog": note_detail + } + return note_item + else: + utils.logger.info(f"[WeiboClient.get_note_info_by_id] 未找到$render_data的值") + return dict() diff --git a/media_platform/weibo/core.py b/media_platform/weibo/core.py index cfb3cef..01174f9 100644 --- a/media_platform/weibo/core.py +++ b/media_platform/weibo/core.py @@ -23,9 +23,9 @@ from var import comment_tasks_var, crawler_type_var from .client import WeiboClient from .exception import DataFetchError -from .login import WeiboLogin from .field import SearchType from .help import filter_search_result_card +from .login import WeiboLogin class WeiboCrawler(AbstractCrawler): @@ -38,7 +38,7 @@ class WeiboCrawler(AbstractCrawler): def __init__(self): self.index_url = "https://m.weibo.cn" - self.user_agent = utils.get_user_agent() + self.user_agent = utils.get_mobile_user_agent() def init_config(self, platform: str, login_type: str, crawler_type: str): self.platform = platform @@ -85,7 +85,7 @@ class WeiboCrawler(AbstractCrawler): await self.search() elif self.crawler_type == "detail": # Get the information and comments of the specified post - pass + await self.get_specified_notes() else: pass utils.logger.info("[WeiboCrawler.start] Bilibili Crawler finished ...") @@ -109,12 +109,104 @@ class WeiboCrawler(AbstractCrawler): note_id_list: List[str] = [] note_list = filter_search_result_card(search_res.get("cards")) for note_item in note_list: - if note_item : + if note_item: mblog: Dict = note_item.get("mblog") note_id_list.append(mblog.get("id")) await weibo.update_weibo_note(note_item) page += 1 + await self.batch_get_notes_comments(note_id_list) + + async def get_specified_notes(self): + """ + get specified notes info + :return: + """ + semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) + task_list = [ + self.get_note_info_task(note_id=note_id, semaphore=semaphore) for note_id in + config.WEIBO_SPECIFIED_ID_LIST + ] + video_details = await asyncio.gather(*task_list) + for note_item in video_details: + if note_item: + await weibo.update_weibo_note(note_item) + await self.batch_get_notes_comments(config.WEIBO_SPECIFIED_ID_LIST) + + async def get_note_info_task(self, note_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]: + """ + Get note detail task + :param note_id: + :param semaphore: + :return: + """ + async with semaphore: + try: + result = await self.wb_client.get_note_info_by_id(note_id) + return result + except DataFetchError as ex: + utils.logger.error(f"[WeiboCrawler.get_note_info_task] Get note detail error: {ex}") + return None + except KeyError as ex: + utils.logger.error( + f"[WeiboCrawler.get_note_info_task] have not fund note detail note_id:{note_id}, err: {ex}") + return None + + async def batch_get_notes_comments(self, note_id_list: List[str]): + """ + batch get notes comments + :param note_id_list: + :return: + """ + utils.logger.info(f"[WeiboCrawler.batch_get_notes_comments] note ids:{note_id_list}") + semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) + task_list: List[Task] = [] + for note_id in note_id_list: + task = asyncio.create_task(self.get_note_comments(note_id, semaphore), name=note_id) + task_list.append(task) + await asyncio.gather(*task_list) + + async def get_note_comments(self, note_id: str, semaphore: asyncio.Semaphore): + """ + get comment for note id + :param note_id: + :param semaphore: + :return: + """ + async with semaphore: + try: + utils.logger.info(f"[WeiboCrawler.get_note_comments] begin get note_id: {note_id} comments ...") + + # Read keyword and quantity from config + keywords = config.COMMENT_KEYWORDS + max_comments = config.MAX_COMMENTS_PER_POST + + # Download comments + all_comments = await self.wb_client.get_note_all_comments( + note_id=note_id, + crawl_interval=random.random(), + ) + + # Filter comments by keyword + if keywords: + filtered_comments = [ + comment for comment in all_comments if + any(keyword in comment["content"]["message"] for keyword in keywords) + ] + else: + filtered_comments = all_comments + + # Limit the number of comments + if max_comments > 0: + filtered_comments = filtered_comments[:max_comments] + + # Update weibo note comments + await weibo.batch_update_weibo_note_comments(note_id, filtered_comments) + + except DataFetchError as ex: + utils.logger.error(f"[WeiboCrawler.get_note_comments] get note_id: {note_id} comment error: {ex}") + except Exception as e: + utils.logger.error(f"[WeiboCrawler.get_note_comments] may be been blocked, err:{e}") async def create_weibo_client(self, httpx_proxy: Optional[str]) -> WeiboClient: """Create xhs client""" diff --git a/media_platform/weibo/help.py b/media_platform/weibo/help.py index 45d15c8..1838555 100644 --- a/media_platform/weibo/help.py +++ b/media_platform/weibo/help.py @@ -3,7 +3,7 @@ # @Time : 2023/12/24 17:37 # @Desc : -from typing import List, Dict +from typing import Dict, List def filter_search_result_card(card_list: List[Dict]) -> List[Dict]: diff --git a/models/__init__.py b/models/__init__.py index 09a27c2..d4c05a3 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -1,5 +1,5 @@ from .bilibili import * from .douyin import * from .kuaishou import * +from .weibo import * from .xiaohongshu import * -from .weibo import * \ No newline at end of file diff --git a/models/weibo.py b/models/weibo.py index 3aa3f62..6b903a7 100644 --- a/models/weibo.py +++ b/models/weibo.py @@ -91,19 +91,19 @@ async def update_weibo_note(note_item: Dict): "avatar": user_info.get("profile_image_url", ""), } utils.logger.info( - f"[models.weibo.update_weibo_video] weibo note id:{note_id}, title:{local_db_item.get('content')[:24]} ...") + f"[models.weibo.update_weibo_note] weibo note id:{note_id}, title:{local_db_item.get('content')[:24]} ...") if config.IS_SAVED_DATABASED: if not await WeiboNote.filter(note_id=note_id).exists(): local_db_item["add_ts"] = utils.get_current_timestamp() - weibo_video_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteCreate', exclude=('id',)) - weibo_data = weibo_video_pydantic(**local_db_item) - weibo_video_pydantic.model_validate(weibo_data) + weibo_note_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteCreate', exclude=('id',)) + weibo_data = weibo_note_pydantic(**local_db_item) + weibo_note_pydantic.model_validate(weibo_data) await WeiboNote.create(**weibo_data.model_dump()) else: - weibo_video_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteUpdate', + weibo_note_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteUpdate', exclude=('id', 'add_ts')) - weibo_data = weibo_video_pydantic(**local_db_item) - weibo_video_pydantic.model_validate(weibo_data) + weibo_data = weibo_note_pydantic(**local_db_item) + weibo_note_pydantic.model_validate(weibo_data) await WeiboNote.filter(note_id=note_id).update(**weibo_data.model_dump()) else: # Below is a simple way to save it in CSV format. @@ -116,23 +116,22 @@ async def update_weibo_note(note_item: Dict): writer.writerow(local_db_item.values()) -async def batch_update_weibo_video_comments(video_id: str, comments: List[Dict]): +async def batch_update_weibo_note_comments(note_id: str, comments: List[Dict]): if not comments: return for comment_item in comments: - await update_weibo_video_comment(video_id, comment_item) + await update_weibo_note_comment(note_id, comment_item) -async def update_weibo_video_comment(note_id: str, comment_item: Dict): +async def update_weibo_note_comment(note_id: str, comment_item: Dict): comment_id = str(comment_item.get("id")) - content: Dict = comment_item.get("text") - user_info: Dict = comment_item.get("member") + user_info: Dict = comment_item.get("user") local_db_item = { "comment_id": comment_id, "create_time": utils.rfc2822_to_timestamp(comment_item.get("created_at")), "create_date_time": str(utils.rfc2822_to_china_datetime(comment_item.get("created_at"))), "note_id": note_id, - "content": content.get("message"), + "content": comment_item.get("text"), "sub_comment_count": str(comment_item.get("total_number", 0)), "comment_like_count": str(comment_item.get("like_count", 0)), "last_modify_ts": utils.get_current_timestamp(), @@ -146,7 +145,7 @@ async def update_weibo_video_comment(note_id: str, comment_item: Dict): "avatar": user_info.get("profile_image_url", ""), } utils.logger.info( - f"[models.weibo.update_weibo_video_comment] Weibo note comment: {comment_id}, content: {local_db_item.get('content','')[:24]} ...") + f"[models.weibo.update_weibo_note_comment] Weibo note comment: {comment_id}, content: {local_db_item.get('content','')[:24]} ...") if config.IS_SAVED_DATABASED: if not await WeiboComment.filter(comment_id=comment_id).exists(): local_db_item["add_ts"] = utils.get_current_timestamp() diff --git a/tools/crawler_util.py b/tools/crawler_util.py index 562f52d..361fc75 100644 --- a/tools/crawler_util.py +++ b/tools/crawler_util.py @@ -54,6 +54,20 @@ def get_user_agent() -> str: return random.choice(ua_list) +def get_mobile_user_agent() -> str: + ua_list = [ + "Mozilla/5.0 (iPhone; CPU iPhone OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Mobile/15E148 Safari/604.1", + "Mozilla/5.0 (iPad; CPU OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Mobile/15E148 Safari/604.1", + "Mozilla/5.0 (iPhone; CPU iPhone OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/114.0.5735.99 Mobile/15E148 Safari/604.1", + "Mozilla/5.0 (iPad; CPU OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/114.0.5735.124 Mobile/15E148 Safari/604.1", + "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36", + "Mozilla/5.0 (Linux; Android 13; SAMSUNG SM-S918B) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/21.0 Chrome/110.0.5481.154 Mobile Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 OPR/99.0.0.0", + "Mozilla/5.0 (Linux; Android 10; JNY-LX1; HMSCore 6.11.0.302) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.88 HuaweiBrowser/13.0.5.303 Mobile Safari/537.36" + ] + return random.choice(ua_list) + + def convert_cookies(cookies: Optional[List[Cookie]]) -> Tuple[str, Dict]: if not cookies: return "", {} diff --git a/tools/time_util.py b/tools/time_util.py index f8e6c6b..65a4288 100644 --- a/tools/time_util.py +++ b/tools/time_util.py @@ -4,7 +4,7 @@ # @Desc : 时间相关的工具函数 import time -from datetime import datetime, timezone, timedelta +from datetime import datetime, timedelta, timezone def get_current_timestamp() -> int: