From c5b64fdbf5b44b047666179f6207305639532a3f Mon Sep 17 00:00:00 2001 From: Relakkes Date: Sun, 24 Dec 2023 17:57:48 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=BE=AE=E5=8D=9A=E7=88=AC=E8=99=AB?= =?UTF-8?q?=E5=B8=96=E5=AD=90=E6=90=9C=E7=B4=A2=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 9 +- base/base_crawler.py | 2 - main.py | 9 +- media_platform/bilibili/core.py | 6 +- media_platform/bilibili/login.py | 10 +- media_platform/weibo/__init__.py | 7 ++ media_platform/weibo/client.py | 98 +++++++++++++++++ media_platform/weibo/core.py | 177 ++++++++++++++++++++++++++++++ media_platform/weibo/exception.py | 14 +++ media_platform/weibo/field.py | 19 ++++ media_platform/weibo/help.py | 25 +++++ media_platform/weibo/login.py | 106 ++++++++++++++++++ models/weibo.py | 171 +++++++++++++++++++++++++++++ tools/time_util.py | 37 ++++++- 14 files changed, 671 insertions(+), 19 deletions(-) create mode 100644 media_platform/weibo/__init__.py create mode 100644 media_platform/weibo/client.py create mode 100644 media_platform/weibo/core.py create mode 100644 media_platform/weibo/exception.py create mode 100644 media_platform/weibo/field.py create mode 100644 media_platform/weibo/help.py create mode 100644 media_platform/weibo/login.py create mode 100644 models/weibo.py diff --git a/README.md b/README.md index 817fe85..109fc08 100644 --- a/README.md +++ b/README.md @@ -4,15 +4,16 @@ # 仓库描述 -**小红书爬虫**,**抖音爬虫**, **快手爬虫**, **B站爬虫**...。 -目前能抓取小红书、抖音、快手、B站的视频、图片、评论、点赞、转发等信息。 +**小红书爬虫**,**抖音爬虫**, **快手爬虫**, **B站爬虫**, **微博爬虫**...。 +目前能抓取小红书、抖音、快手、B站、微博的视频、图片、评论、点赞、转发等信息。 原理:利用[playwright](https://playwright.dev/)搭桥,保留登录成功后的上下文浏览器环境,通过执行JS表达式获取一些加密参数 通过使用此方式,免去了复现核心加密JS代码,逆向难度大大降低 -爬虫技术交流群:[949715256](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=NFz-oY7Pek3gpG5zbLJFHARlB8lKL94f&authKey=FlxIQK99Uu90wddNV5W%2FBga6T6lXU5BRqyTTc26f2P2ZK5OW%2BDhHp7MwviX%2BbrPa&noverify=0&group_code=949715256),同时欢迎大家贡献代码提交PR + +爬虫技术交流群:[949715256](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=NFz-oY7Pek3gpG5zbLJFHARlB8lKL94f&authKey=FlxIQK99Uu90wddNV5W%2FBga6T6lXU5BRqyTTc26f2P2ZK5OW%2BDhHp7MwviX%2BbrPa&noverify=0&group_code=949715256),同时欢迎大家贡献代码提交PR -## SPONSORED BY +## 赞助商 目前爬虫正在用的IP代理:极速HTTP代理 新用户注册认证最高送12000IP,0元试用
极速HTTP代理-官网图 diff --git a/base/base_crawler.py b/base/base_crawler.py index d4b28fe..5e70163 100644 --- a/base/base_crawler.py +++ b/base/base_crawler.py @@ -1,7 +1,5 @@ from abc import ABC, abstractmethod -from proxy.proxy_account_pool import AccountPool - class AbstractCrawler(ABC): @abstractmethod diff --git a/main.py b/main.py index d9793c6..46ac6ea 100644 --- a/main.py +++ b/main.py @@ -9,7 +9,7 @@ from media_platform.bilibili import BilibiliCrawler from media_platform.douyin import DouYinCrawler from media_platform.kuaishou import KuaishouCrawler from media_platform.xhs import XiaoHongShuCrawler -from proxy import proxy_account_pool +from media_platform.weibo import WeiboCrawler class CrawlerFactory: @@ -17,7 +17,8 @@ class CrawlerFactory: "xhs": XiaoHongShuCrawler, "dy": DouYinCrawler, "ks": KuaishouCrawler, - "bili": BilibiliCrawler + "bili": BilibiliCrawler, + "wb": WeiboCrawler } @staticmethod @@ -31,8 +32,8 @@ class CrawlerFactory: async def main(): # define command line params ... parser = argparse.ArgumentParser(description='Media crawler program.') - parser.add_argument('--platform', type=str, help='Media platform select (xhs | dy | ks | bili)', - choices=["xhs", "dy", "ks", "bili"], default=config.PLATFORM) + parser.add_argument('--platform', type=str, help='Media platform select (xhs | dy | ks | bili | wb)', + choices=["xhs", "dy", "ks", "bili", "wb"], default=config.PLATFORM) parser.add_argument('--lt', type=str, help='Login type (qrcode | phone | cookie)', choices=["qrcode", "phone", "cookie"], default=config.LOGIN_TYPE) parser.add_argument('--type', type=str, help='crawler type (search | detail)', diff --git a/media_platform/bilibili/core.py b/media_platform/bilibili/core.py index ccdc22b..618f9e7 100644 --- a/media_platform/bilibili/core.py +++ b/media_platform/bilibili/core.py @@ -86,8 +86,8 @@ class BilibiliCrawler(AbstractCrawler): await self.get_specified_videos() else: pass - utils.logger.info("Bilibili Crawler finished ...") - pass + utils.logger.info("[BilibiliCrawler.start] Bilibili Crawler finished ...") + async def search(self): """ @@ -220,7 +220,7 @@ class BilibiliCrawler(AbstractCrawler): async def create_bilibili_client(self, httpx_proxy: Optional[str]) -> BilibiliClient: """Create xhs client""" - utils.logger.info("[BilibiliCrawler.create_bilibili_client] Begin create xiaohongshu API client ...") + utils.logger.info("[BilibiliCrawler.create_bilibili_client] Begin create bilibili API client ...") cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies()) bilibili_client_obj = BilibiliClient( proxies=httpx_proxy, diff --git a/media_platform/bilibili/login.py b/media_platform/bilibili/login.py index b58f218..4f646da 100644 --- a/media_platform/bilibili/login.py +++ b/media_platform/bilibili/login.py @@ -8,12 +8,10 @@ import functools import sys from typing import Optional -import redis from playwright.async_api import BrowserContext, Page from tenacity import (RetryError, retry, retry_if_result, stop_after_attempt, wait_fixed) -import config from base.base_crawler import AbstractLogin from tools import utils @@ -33,7 +31,7 @@ class BilibiliLogin(AbstractLogin): self.cookie_str = cookie_str async def begin(self): - """Start login xiaohongshu""" + """Start login bilibili""" utils.logger.info("[BilibiliLogin.begin] Begin login Bilibili ...") if self.login_type == "qrcode": await self.login_by_qrcode() @@ -42,7 +40,8 @@ class BilibiliLogin(AbstractLogin): elif self.login_type == "cookie": await self.login_by_cookies() else: - raise ValueError("[BilibiliLogin.begin] Invalid Login Type Currently only supported qrcode or phone or cookie ...") + raise ValueError( + "[BilibiliLogin.begin] Invalid Login Type Currently only supported qrcode or phone or cookie ...") @retry(stop=stop_after_attempt(20), wait=wait_fixed(1), retry=retry_if_result(lambda value: value is False)) async def check_login_state(self) -> bool: @@ -89,7 +88,8 @@ class BilibiliLogin(AbstractLogin): sys.exit() wait_redirect_seconds = 5 - utils.logger.info(f"[BilibiliLogin.login_by_qrcode] Login successful then wait for {wait_redirect_seconds} seconds redirect ...") + utils.logger.info( + f"[BilibiliLogin.login_by_qrcode] Login successful then wait for {wait_redirect_seconds} seconds redirect ...") await asyncio.sleep(wait_redirect_seconds) async def login_by_mobile(self): diff --git a/media_platform/weibo/__init__.py b/media_platform/weibo/__init__.py new file mode 100644 index 0000000..82383d2 --- /dev/null +++ b/media_platform/weibo/__init__.py @@ -0,0 +1,7 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2023/12/23 15:40 +# @Desc : +from .core import WeiboCrawler +from .login import WeiboLogin +from .client import WeiboClient \ No newline at end of file diff --git a/media_platform/weibo/client.py b/media_platform/weibo/client.py new file mode 100644 index 0000000..5507b24 --- /dev/null +++ b/media_platform/weibo/client.py @@ -0,0 +1,98 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2023/12/23 15:40 +# @Desc : 微博爬虫 API 请求 client + +import asyncio +import json +from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from urllib.parse import urlencode + +import httpx +from playwright.async_api import BrowserContext, Page + +from tools import utils + +from .exception import DataFetchError +from .field import SearchType + + +class WeiboClient: + def __init__( + self, + timeout=10, + proxies=None, + *, + headers: Dict[str, str], + playwright_page: Page, + cookie_dict: Dict[str, str], + ): + self.proxies = proxies + self.timeout = timeout + self.headers = headers + self._host = "https://m.weibo.cn" + self.playwright_page = playwright_page + self.cookie_dict = cookie_dict + + async def request(self, method, url, **kwargs) -> Any: + async with httpx.AsyncClient(proxies=self.proxies) as client: + response = await client.request( + method, url, timeout=self.timeout, + **kwargs + ) + data: Dict = response.json() + if data.get("ok") != 1: + utils.logger.error(f"[WeiboClient.request] request {method}:{url} err, res:{data}") + raise DataFetchError(data.get("msg", "unkonw error")) + else: + return data.get("data", {}) + + async def get(self, uri: str, params=None) -> Dict: + final_uri = uri + if isinstance(params, dict): + final_uri = (f"{uri}?" + f"{urlencode(params)}") + return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=self.headers) + + async def post(self, uri: str, data: dict) -> Dict: + json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False) + return await self.request(method="POST", url=f"{self._host}{uri}", + data=json_str, headers=self.headers) + + async def pong(self) -> bool: + """get a note to check if login state is ok""" + utils.logger.info("[WeiboClient.pong] Begin pong weibo...") + ping_flag = False + try: + pass + except Exception as e: + utils.logger.error(f"[BilibiliClient.pong] Pong weibo failed: {e}, and try to login again...") + ping_flag = False + return ping_flag + + async def update_cookies(self, browser_context: BrowserContext): + cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies()) + self.headers["Cookie"] = cookie_str + self.cookie_dict = cookie_dict + + async def get_note_by_keyword( + self, + keyword: str, + page: int = 1, + search_type: SearchType = SearchType.DEFAULT + ) -> Dict: + """ + search note by keyword + :param keyword: 微博搜搜的关键词 + :param page: 分页参数 -当前页码 + :param search_type: 搜索的类型,见 weibo/filed.py 中的枚举SearchType + :return: + """ + uri = "/api/container/getIndex" + containerid = f"100103type={search_type.value}&q={keyword}" + params = { + "containerid": containerid, + "page_type": "searchall", + "page": page, + } + return await self.get(uri, params) diff --git a/media_platform/weibo/core.py b/media_platform/weibo/core.py new file mode 100644 index 0000000..cfb3cef --- /dev/null +++ b/media_platform/weibo/core.py @@ -0,0 +1,177 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2023/12/23 15:41 +# @Desc : 微博爬虫主流程代码 + + +import asyncio +import os +import random +import time +from asyncio import Task +from typing import Dict, List, Optional, Tuple, Union + +from playwright.async_api import (BrowserContext, BrowserType, Page, + async_playwright) + +import config +from base.base_crawler import AbstractCrawler +from models import weibo +from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool +from tools import utils +from var import comment_tasks_var, crawler_type_var + +from .client import WeiboClient +from .exception import DataFetchError +from .login import WeiboLogin +from .field import SearchType +from .help import filter_search_result_card + + +class WeiboCrawler(AbstractCrawler): + platform: str + login_type: str + crawler_type: str + context_page: Page + wb_client: WeiboClient + browser_context: BrowserContext + + def __init__(self): + self.index_url = "https://m.weibo.cn" + self.user_agent = utils.get_user_agent() + + def init_config(self, platform: str, login_type: str, crawler_type: str): + self.platform = platform + self.login_type = login_type + self.crawler_type = crawler_type + + async def start(self): + playwright_proxy_format, httpx_proxy_format = None, None + if config.ENABLE_IP_PROXY: + ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) + ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() + playwright_proxy_format, httpx_proxy_format = self.format_proxy_info(ip_proxy_info) + + async with async_playwright() as playwright: + # Launch a browser context. + chromium = playwright.chromium + self.browser_context = await self.launch_browser( + chromium, + None, + self.user_agent, + headless=config.HEADLESS + ) + # stealth.min.js is a js script to prevent the website from detecting the crawler. + await self.browser_context.add_init_script(path="libs/stealth.min.js") + self.context_page = await self.browser_context.new_page() + await self.context_page.goto(self.index_url) + + # Create a client to interact with the xiaohongshu website. + self.wb_client = await self.create_weibo_client(httpx_proxy_format) + if not await self.wb_client.pong(): + login_obj = WeiboLogin( + login_type=self.login_type, + login_phone="", # your phone number + browser_context=self.browser_context, + context_page=self.context_page, + cookie_str=config.COOKIES + ) + await login_obj.begin() + await self.wb_client.update_cookies(browser_context=self.browser_context) + + crawler_type_var.set(self.crawler_type) + if self.crawler_type == "search": + # Search for video and retrieve their comment information. + await self.search() + elif self.crawler_type == "detail": + # Get the information and comments of the specified post + pass + else: + pass + utils.logger.info("[WeiboCrawler.start] Bilibili Crawler finished ...") + + async def search(self): + """ + search weibo note with keywords + :return: + """ + utils.logger.info("[WeiboCrawler.search] Begin search weibo keywords") + weibo_limit_count = 10 + for keyword in config.KEYWORDS.split(","): + utils.logger.info(f"[WeiboCrawler.search] Current search keyword: {keyword}") + page = 1 + while page * weibo_limit_count <= config.CRAWLER_MAX_NOTES_COUNT: + search_res = await self.wb_client.get_note_by_keyword( + keyword=keyword, + page=page, + search_type=SearchType.DEFAULT + ) + note_id_list: List[str] = [] + note_list = filter_search_result_card(search_res.get("cards")) + for note_item in note_list: + if note_item : + mblog: Dict = note_item.get("mblog") + note_id_list.append(mblog.get("id")) + await weibo.update_weibo_note(note_item) + + page += 1 + + async def create_weibo_client(self, httpx_proxy: Optional[str]) -> WeiboClient: + """Create xhs client""" + utils.logger.info("[WeiboCrawler.create_weibo_client] Begin create weibo API client ...") + cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies()) + weibo_client_obj = WeiboClient( + proxies=httpx_proxy, + headers={ + "User-Agent": self.user_agent, + "Cookie": cookie_str, + "Origin": "https://m.weibo.cn", + "Referer": "https://m.weibo.cn", + "Content-Type": "application/json;charset=UTF-8" + }, + playwright_page=self.context_page, + cookie_dict=cookie_dict, + ) + return weibo_client_obj + + @staticmethod + def format_proxy_info(ip_proxy_info: IpInfoModel) -> Tuple[Optional[Dict], Optional[Dict]]: + """format proxy info for playwright and httpx""" + playwright_proxy = { + "server": f"{ip_proxy_info.protocol}{ip_proxy_info.ip}:{ip_proxy_info.port}", + "username": ip_proxy_info.user, + "password": ip_proxy_info.password, + } + httpx_proxy = { + f"{ip_proxy_info.protocol}{ip_proxy_info.ip}": f"{ip_proxy_info.protocol}{ip_proxy_info.user}:{ip_proxy_info.password}@{ip_proxy_info.ip}:{ip_proxy_info.port}" + } + return playwright_proxy, httpx_proxy + + async def launch_browser( + self, + chromium: BrowserType, + playwright_proxy: Optional[Dict], + user_agent: Optional[str], + headless: bool = True + ) -> BrowserContext: + """Launch browser and create browser context""" + utils.logger.info("[WeiboCrawler.launch_browser] Begin create browser context ...") + if config.SAVE_LOGIN_STATE: + user_data_dir = os.path.join(os.getcwd(), "browser_data", + config.USER_DATA_DIR % self.platform) # type: ignore + browser_context = await chromium.launch_persistent_context( + user_data_dir=user_data_dir, + accept_downloads=True, + headless=headless, + proxy=playwright_proxy, # type: ignore + viewport={"width": 1920, "height": 1080}, + user_agent=user_agent + ) + return browser_context + else: + browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore + browser_context = await browser.new_context( + viewport={"width": 1920, "height": 1080}, + user_agent=user_agent + ) + return browser_context diff --git a/media_platform/weibo/exception.py b/media_platform/weibo/exception.py new file mode 100644 index 0000000..9aecdf4 --- /dev/null +++ b/media_platform/weibo/exception.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2023/12/2 18:44 +# @Desc : + +from httpx import RequestError + + +class DataFetchError(RequestError): + """something error when fetch""" + + +class IPBlockError(RequestError): + """fetch so fast that the server block us ip""" diff --git a/media_platform/weibo/field.py b/media_platform/weibo/field.py new file mode 100644 index 0000000..cbc9d27 --- /dev/null +++ b/media_platform/weibo/field.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2023/12/23 15:41 +# @Desc : +from enum import Enum + + +class SearchType(Enum): + # 综合 + DEFAULT = "1" + + # 实时 + REAL_TIME = "61" + + # 热门 + POPULAR = "60" + + # 视频 + VIDEO = "64" diff --git a/media_platform/weibo/help.py b/media_platform/weibo/help.py new file mode 100644 index 0000000..45d15c8 --- /dev/null +++ b/media_platform/weibo/help.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2023/12/24 17:37 +# @Desc : + +from typing import List, Dict + + +def filter_search_result_card(card_list: List[Dict]) -> List[Dict]: + """ + 过滤微博搜索的结果,只保留card_type为9类型的数据 + :param card_list: + :return: + """ + note_list: List[Dict] = [] + for card_item in card_list: + if card_item.get("card_type") == 9: + note_list.append(card_item) + if len(card_item.get("card_group", [])) > 0: + card_group = card_item.get("card_group") + for card_group_item in card_group: + if card_group_item.get("card_type") == 9: + note_list.append(card_group_item) + + return note_list diff --git a/media_platform/weibo/login.py b/media_platform/weibo/login.py new file mode 100644 index 0000000..de80b02 --- /dev/null +++ b/media_platform/weibo/login.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2023/12/23 15:42 +# @Desc : 微博登录实现 + +import asyncio +import functools +import sys +from typing import Optional + +from playwright.async_api import BrowserContext, Page +from tenacity import (RetryError, retry, retry_if_result, stop_after_attempt, + wait_fixed) + +from base.base_crawler import AbstractLogin +from tools import utils + + +class WeiboLogin(AbstractLogin): + def __init__(self, + login_type: str, + browser_context: BrowserContext, + context_page: Page, + login_phone: Optional[str] = "", + cookie_str: str = "" + ): + self.login_type = login_type + self.browser_context = browser_context + self.context_page = context_page + self.login_phone = login_phone + self.cookie_str = cookie_str + + async def begin(self): + """Start login weibo""" + utils.logger.info("[WeiboLogin.begin] Begin login Bilibili ...") + if self.login_type == "qrcode": + await self.login_by_qrcode() + elif self.login_type == "phone": + await self.login_by_mobile() + elif self.login_type == "cookie": + await self.login_by_cookies() + else: + raise ValueError( + "[WeiboLogin.begin] Invalid Login Type Currently only supported qrcode or phone or cookie ...") + + @retry(stop=stop_after_attempt(20), wait=wait_fixed(1), retry=retry_if_result(lambda value: value is False)) + async def check_login_state(self) -> bool: + """ + Check if the current login status is successful and return True otherwise return False + retry decorator will retry 20 times if the return value is False, and the retry interval is 1 second + if max retry times reached, raise RetryError + """ + current_cookie = await self.browser_context.cookies() + _, cookie_dict = utils.convert_cookies(current_cookie) + if cookie_dict.get("SESSDATA", "") or cookie_dict.get("DedeUserID"): + return True + return False + + async def login_by_qrcode(self): + """login weibo website and keep webdriver login state""" + utils.logger.info("[WeiboLogin.login_by_qrcode] Begin login weibo by qrcode ...") + + # click login button + login_button_ele = self.context_page.locator( + "xpath=//div[@class='right-entry__outside go-login-btn']//div" + ) + await login_button_ele.click() + + # find login qrcode + qrcode_img_selector = "//div[@class='login-scan-box']//img" + base64_qrcode_img = await utils.find_login_qrcode( + self.context_page, + selector=qrcode_img_selector + ) + if not base64_qrcode_img: + utils.logger.info("[WeiboLogin.login_by_qrcode] login failed , have not found qrcode please check ....") + sys.exit() + + # show login qrcode + partial_show_qrcode = functools.partial(utils.show_qrcode, base64_qrcode_img) + asyncio.get_running_loop().run_in_executor(executor=None, func=partial_show_qrcode) + + utils.logger.info(f"[WeiboLogin.login_by_qrcode] Waiting for scan code login, remaining time is 20s") + try: + await self.check_login_state() + except RetryError: + utils.logger.info("[WeiboLogin.login_by_qrcode] Login weibo failed by qrcode login method ...") + sys.exit() + + wait_redirect_seconds = 5 + utils.logger.info( + f"[WeiboLogin.login_by_qrcode] Login successful then wait for {wait_redirect_seconds} seconds redirect ...") + await asyncio.sleep(wait_redirect_seconds) + + async def login_by_mobile(self): + pass + + async def login_by_cookies(self): + utils.logger.info("[WeiboLogin.login_by_qrcode] Begin login weibo by cookie ...") + for key, value in utils.convert_str_cookie_to_dict(self.cookie_str).items(): + await self.browser_context.add_cookies([{ + 'name': key, + 'value': value, + 'domain': ".weibo.cn", + 'path': "/" + }]) diff --git a/models/weibo.py b/models/weibo.py new file mode 100644 index 0000000..2f433a9 --- /dev/null +++ b/models/weibo.py @@ -0,0 +1,171 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2023/12/23 21:53 +# @Desc : 微博的模型类 + +import csv +import pathlib +from typing import Dict, List + +from tortoise import fields +from tortoise.contrib.pydantic import pydantic_model_creator +from tortoise.models import Model + +import config +from tools import utils +from var import crawler_type_var + + +class WeiboBaseModel(Model): + id = fields.IntField(pk=True, autoincrement=True, description="自增ID") + user_id = fields.CharField(null=True, max_length=64, description="用户ID") + nickname = fields.CharField(null=True, max_length=64, description="用户昵称") + avatar = fields.CharField(null=True, max_length=255, description="用户头像地址") + gender = fields.CharField(null=True, max_length=12, description="用户性别") + profile_url = fields.CharField(null=True, max_length=255, description="用户主页地址") + ip_location = fields.CharField(null=True, max_length=32, default="发布微博的地理信息") + add_ts = fields.BigIntField(description="记录添加时间戳") + last_modify_ts = fields.BigIntField(description="记录最后修改时间戳") + + class Meta: + abstract = True + + +class WeiboNote(WeiboBaseModel): + note_id = fields.CharField(max_length=64, index=True, description="帖子ID") + content = fields.TextField(null=True, description="帖子正文内容") + create_time = fields.BigIntField(description="帖子发布时间戳", index=True) + create_date_time = fields.BigIntField(description="帖子发布日期时间", index=True) + liked_count = fields.CharField(null=True, max_length=16, description="帖子点赞数") + comments_count = fields.CharField(null=True, max_length=16, description="帖子评论数量") + shared_count = fields.CharField(null=True, max_length=16, description="帖子转发数量") + note_url = fields.CharField(null=True, max_length=512, description="帖子详情URL") + + class Meta: + table = "weibo_video" + table_description = "微博帖子" + + def __str__(self): + return f"{self.note_id}" + + +class WeiboComment(WeiboBaseModel): + comment_id = fields.CharField(max_length=64, index=True, description="评论ID") + note_id = fields.CharField(max_length=64, index=True, description="帖子ID") + content = fields.TextField(null=True, description="评论内容") + create_time = fields.BigIntField(description="评论时间戳") + create_date_time = fields.BigIntField(description="评论日期时间", index=True) + comment_like_count = fields.CharField(max_length=16, description="评论点赞数量") + sub_comment_count = fields.CharField(max_length=16, description="评论回复数") + + class Meta: + table = "weibo_note_comment" + table_description = "微博帖子评论" + + def __str__(self): + return f"{self.comment_id}" + + +async def update_weibo_note(note_item: Dict): + mblog: Dict = note_item.get("mblog") + user_info: Dict = mblog.get("user") + note_id = mblog.get("id") + local_db_item = { + # 微博信息 + "note_id": note_id, + "content": mblog.get("text"), + "create_time": utils.rfc2822_to_timestamp(mblog.get("created_at")), + "create_date_time": utils.rfc2822_to_china_datetime(mblog.get("created_at")), + "liked_count": mblog.get("attitudes_count", 0), + "comments_count": mblog.get("comments_count", 0), + "shared_count": mblog.get("reposts_count", 0), + "last_modify_ts": utils.get_current_timestamp(), + "note_url": f"https://m.weibo.cn/detail/{note_id}", + "ip_location": mblog.get("region_name", "").replace("发布于 ", ""), + + # 用户信息 + "user_id": user_info.get("id"), + "nickname": user_info.get("screen_name", ""), + "gender": user_info.get("gender", ""), + "profile_url": user_info.get("profile_url", ""), + "avatar": user_info.get("profile_image_url", ""), + } + utils.logger.info( + f"[models.weibo.update_weibo_video] weibo note id:{note_id}, title:{local_db_item.get('content')[:24]} ...") + if config.IS_SAVED_DATABASED: + if not await WeiboNote.filter(note_id=note_id).exists(): + local_db_item["add_ts"] = utils.get_current_timestamp() + weibo_video_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteCreate', exclude=('id',)) + weibo_data = weibo_video_pydantic(**local_db_item) + weibo_video_pydantic.model_validate(weibo_data) + await WeiboNote.create(**weibo_data.model_dump()) + else: + weibo_video_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteUpdate', + exclude=('id', 'add_ts')) + weibo_data = weibo_video_pydantic(**local_db_item) + weibo_video_pydantic.model_validate(weibo_data) + await WeiboNote.filter(note_id=note_id).update(**weibo_data.model_dump()) + else: + # Below is a simple way to save it in CSV format. + pathlib.Path(f"data/weibo").mkdir(parents=True, exist_ok=True) + save_file_name = f"data/weibo/{crawler_type_var.get()}_notes_{utils.get_current_date()}.csv" + with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f: + writer = csv.writer(f) + if f.tell() == 0: + writer.writerow(local_db_item.keys()) + writer.writerow(local_db_item.values()) + + +async def batch_update_weibo_video_comments(video_id: str, comments: List[Dict]): + if not comments: + return + for comment_item in comments: + await update_weibo_video_comment(video_id, comment_item) + + +async def update_weibo_video_comment(note_id: str, comment_item: Dict): + comment_id = str(comment_item.get("id")) + content: Dict = comment_item.get("text") + user_info: Dict = comment_item.get("member") + local_db_item = { + "comment_id": comment_id, + "create_time": utils.rfc2822_to_timestamp(comment_item.get("created_at")), + "create_date_time": utils.rfc2822_to_china_datetime(comment_item.get("created_at")), + "note_id": note_id, + "content": content.get("message"), + "sub_comment_count": str(comment_item.get("total_number", 0)), + "comment_like_count": str(comment_item.get("like_count", 0)), + "last_modify_ts": utils.get_current_timestamp(), + "ip_location": comment_item.get("source", "").replace("来自", ""), + + # 用户信息 + "user_id": user_info.get("id"), + "nickname": user_info.get("screen_name", ""), + "gender": user_info.get("gender", ""), + "profile_url": user_info.get("profile_url", ""), + "avatar": user_info.get("profile_image_url", ""), + } + utils.logger.info( + f"[models.weibo.update_weibo_video_comment] Weibo note comment: {comment_id}, content: {local_db_item.get('content','')[:24]} ...") + if config.IS_SAVED_DATABASED: + if not await WeiboComment.filter(comment_id=comment_id).exists(): + local_db_item["add_ts"] = utils.get_current_timestamp() + comment_pydantic = pydantic_model_creator(WeiboComment, name='WeiboNoteCommentCreate', + exclude=('id',)) + comment_data = comment_pydantic(**local_db_item) + comment_pydantic.validate(comment_data) + await WeiboComment.create(**comment_data.dict()) + else: + comment_pydantic = pydantic_model_creator(WeiboComment, name='WeiboNoteCommentUpdate', + exclude=('id', 'add_ts')) + comment_data = comment_pydantic(**local_db_item) + comment_pydantic.validate(comment_data) + await WeiboComment.filter(comment_id=comment_id).update(**comment_data.dict()) + else: + pathlib.Path(f"data/weibo").mkdir(parents=True, exist_ok=True) + save_file_name = f"data/weibo/{crawler_type_var.get()}_comments_{utils.get_current_date()}.csv" + with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f: + writer = csv.writer(f) + if f.tell() == 0: + writer.writerow(local_db_item.keys()) + writer.writerow(local_db_item.values()) diff --git a/tools/time_util.py b/tools/time_util.py index c21c25f..f8e6c6b 100644 --- a/tools/time_util.py +++ b/tools/time_util.py @@ -4,6 +4,7 @@ # @Desc : 时间相关的工具函数 import time +from datetime import datetime, timezone, timedelta def get_current_timestamp() -> int: @@ -68,4 +69,38 @@ def get_unix_time_from_time_str(time_str): def get_unix_timestamp(): - return int(time.time()) \ No newline at end of file + return int(time.time()) + + +def rfc2822_to_china_datetime(rfc2822_time): + # 定义RFC 2822格式 + rfc2822_format = "%a %b %d %H:%M:%S %z %Y" + + # 将RFC 2822时间字符串转换为datetime对象 + dt_object = datetime.strptime(rfc2822_time, rfc2822_format) + + # 将datetime对象的时区转换为中国时区 + dt_object_china = dt_object.astimezone(timezone(timedelta(hours=8))) + return dt_object_china + + +def rfc2822_to_timestamp(rfc2822_time): + # 定义RFC 2822格式 + rfc2822_format = "%a %b %d %H:%M:%S %z %Y" + + # 将RFC 2822时间字符串转换为datetime对象 + dt_object = datetime.strptime(rfc2822_time, rfc2822_format) + + # 将datetime对象转换为UTC时间 + dt_utc = dt_object.replace(tzinfo=timezone.utc) + + # 计算UTC时间对应的Unix时间戳 + timestamp = int(dt_utc.timestamp()) + + return timestamp + + +if __name__ == '__main__': + # 示例用法 + _rfc2822_time = "Sat Dec 23 17:12:54 +0800 2023" + print(rfc2822_to_china_datetime(_rfc2822_time))