diff --git a/README.md b/README.md
index 817fe85..109fc08 100644
--- a/README.md
+++ b/README.md
@@ -4,15 +4,16 @@
# 仓库描述
-**小红书爬虫**,**抖音爬虫**, **快手爬虫**, **B站爬虫**...。
-目前能抓取小红书、抖音、快手、B站的视频、图片、评论、点赞、转发等信息。
+**小红书爬虫**,**抖音爬虫**, **快手爬虫**, **B站爬虫**, **微博爬虫**...。
+目前能抓取小红书、抖音、快手、B站、微博的视频、图片、评论、点赞、转发等信息。
原理:利用[playwright](https://playwright.dev/)搭桥,保留登录成功后的上下文浏览器环境,通过执行JS表达式获取一些加密参数
通过使用此方式,免去了复现核心加密JS代码,逆向难度大大降低
-爬虫技术交流群:[949715256](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=NFz-oY7Pek3gpG5zbLJFHARlB8lKL94f&authKey=FlxIQK99Uu90wddNV5W%2FBga6T6lXU5BRqyTTc26f2P2ZK5OW%2BDhHp7MwviX%2BbrPa&noverify=0&group_code=949715256),同时欢迎大家贡献代码提交PR
+
+爬虫技术交流群:[949715256](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=NFz-oY7Pek3gpG5zbLJFHARlB8lKL94f&authKey=FlxIQK99Uu90wddNV5W%2FBga6T6lXU5BRqyTTc26f2P2ZK5OW%2BDhHp7MwviX%2BbrPa&noverify=0&group_code=949715256),同时欢迎大家贡献代码提交PR
-## SPONSORED BY
+## 赞助商
目前爬虫正在用的IP代理:极速HTTP代理 新用户注册认证最高送12000IP,0元试用
diff --git a/base/base_crawler.py b/base/base_crawler.py
index d4b28fe..5e70163 100644
--- a/base/base_crawler.py
+++ b/base/base_crawler.py
@@ -1,7 +1,5 @@
from abc import ABC, abstractmethod
-from proxy.proxy_account_pool import AccountPool
-
class AbstractCrawler(ABC):
@abstractmethod
diff --git a/main.py b/main.py
index d9793c6..46ac6ea 100644
--- a/main.py
+++ b/main.py
@@ -9,7 +9,7 @@ from media_platform.bilibili import BilibiliCrawler
from media_platform.douyin import DouYinCrawler
from media_platform.kuaishou import KuaishouCrawler
from media_platform.xhs import XiaoHongShuCrawler
-from proxy import proxy_account_pool
+from media_platform.weibo import WeiboCrawler
class CrawlerFactory:
@@ -17,7 +17,8 @@ class CrawlerFactory:
"xhs": XiaoHongShuCrawler,
"dy": DouYinCrawler,
"ks": KuaishouCrawler,
- "bili": BilibiliCrawler
+ "bili": BilibiliCrawler,
+ "wb": WeiboCrawler
}
@staticmethod
@@ -31,8 +32,8 @@ class CrawlerFactory:
async def main():
# define command line params ...
parser = argparse.ArgumentParser(description='Media crawler program.')
- parser.add_argument('--platform', type=str, help='Media platform select (xhs | dy | ks | bili)',
- choices=["xhs", "dy", "ks", "bili"], default=config.PLATFORM)
+ parser.add_argument('--platform', type=str, help='Media platform select (xhs | dy | ks | bili | wb)',
+ choices=["xhs", "dy", "ks", "bili", "wb"], default=config.PLATFORM)
parser.add_argument('--lt', type=str, help='Login type (qrcode | phone | cookie)',
choices=["qrcode", "phone", "cookie"], default=config.LOGIN_TYPE)
parser.add_argument('--type', type=str, help='crawler type (search | detail)',
diff --git a/media_platform/bilibili/core.py b/media_platform/bilibili/core.py
index ccdc22b..618f9e7 100644
--- a/media_platform/bilibili/core.py
+++ b/media_platform/bilibili/core.py
@@ -86,8 +86,8 @@ class BilibiliCrawler(AbstractCrawler):
await self.get_specified_videos()
else:
pass
- utils.logger.info("Bilibili Crawler finished ...")
- pass
+ utils.logger.info("[BilibiliCrawler.start] Bilibili Crawler finished ...")
+
async def search(self):
"""
@@ -220,7 +220,7 @@ class BilibiliCrawler(AbstractCrawler):
async def create_bilibili_client(self, httpx_proxy: Optional[str]) -> BilibiliClient:
"""Create xhs client"""
- utils.logger.info("[BilibiliCrawler.create_bilibili_client] Begin create xiaohongshu API client ...")
+ utils.logger.info("[BilibiliCrawler.create_bilibili_client] Begin create bilibili API client ...")
cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies())
bilibili_client_obj = BilibiliClient(
proxies=httpx_proxy,
diff --git a/media_platform/bilibili/login.py b/media_platform/bilibili/login.py
index b58f218..4f646da 100644
--- a/media_platform/bilibili/login.py
+++ b/media_platform/bilibili/login.py
@@ -8,12 +8,10 @@ import functools
import sys
from typing import Optional
-import redis
from playwright.async_api import BrowserContext, Page
from tenacity import (RetryError, retry, retry_if_result, stop_after_attempt,
wait_fixed)
-import config
from base.base_crawler import AbstractLogin
from tools import utils
@@ -33,7 +31,7 @@ class BilibiliLogin(AbstractLogin):
self.cookie_str = cookie_str
async def begin(self):
- """Start login xiaohongshu"""
+ """Start login bilibili"""
utils.logger.info("[BilibiliLogin.begin] Begin login Bilibili ...")
if self.login_type == "qrcode":
await self.login_by_qrcode()
@@ -42,7 +40,8 @@ class BilibiliLogin(AbstractLogin):
elif self.login_type == "cookie":
await self.login_by_cookies()
else:
- raise ValueError("[BilibiliLogin.begin] Invalid Login Type Currently only supported qrcode or phone or cookie ...")
+ raise ValueError(
+ "[BilibiliLogin.begin] Invalid Login Type Currently only supported qrcode or phone or cookie ...")
@retry(stop=stop_after_attempt(20), wait=wait_fixed(1), retry=retry_if_result(lambda value: value is False))
async def check_login_state(self) -> bool:
@@ -89,7 +88,8 @@ class BilibiliLogin(AbstractLogin):
sys.exit()
wait_redirect_seconds = 5
- utils.logger.info(f"[BilibiliLogin.login_by_qrcode] Login successful then wait for {wait_redirect_seconds} seconds redirect ...")
+ utils.logger.info(
+ f"[BilibiliLogin.login_by_qrcode] Login successful then wait for {wait_redirect_seconds} seconds redirect ...")
await asyncio.sleep(wait_redirect_seconds)
async def login_by_mobile(self):
diff --git a/media_platform/weibo/__init__.py b/media_platform/weibo/__init__.py
new file mode 100644
index 0000000..82383d2
--- /dev/null
+++ b/media_platform/weibo/__init__.py
@@ -0,0 +1,7 @@
+# -*- coding: utf-8 -*-
+# @Author : relakkes@gmail.com
+# @Time : 2023/12/23 15:40
+# @Desc :
+from .core import WeiboCrawler
+from .login import WeiboLogin
+from .client import WeiboClient
\ No newline at end of file
diff --git a/media_platform/weibo/client.py b/media_platform/weibo/client.py
new file mode 100644
index 0000000..5507b24
--- /dev/null
+++ b/media_platform/weibo/client.py
@@ -0,0 +1,98 @@
+# -*- coding: utf-8 -*-
+# @Author : relakkes@gmail.com
+# @Time : 2023/12/23 15:40
+# @Desc : 微博爬虫 API 请求 client
+
+import asyncio
+import json
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
+from urllib.parse import urlencode
+
+import httpx
+from playwright.async_api import BrowserContext, Page
+
+from tools import utils
+
+from .exception import DataFetchError
+from .field import SearchType
+
+
+class WeiboClient:
+ def __init__(
+ self,
+ timeout=10,
+ proxies=None,
+ *,
+ headers: Dict[str, str],
+ playwright_page: Page,
+ cookie_dict: Dict[str, str],
+ ):
+ self.proxies = proxies
+ self.timeout = timeout
+ self.headers = headers
+ self._host = "https://m.weibo.cn"
+ self.playwright_page = playwright_page
+ self.cookie_dict = cookie_dict
+
+ async def request(self, method, url, **kwargs) -> Any:
+ async with httpx.AsyncClient(proxies=self.proxies) as client:
+ response = await client.request(
+ method, url, timeout=self.timeout,
+ **kwargs
+ )
+ data: Dict = response.json()
+ if data.get("ok") != 1:
+ utils.logger.error(f"[WeiboClient.request] request {method}:{url} err, res:{data}")
+ raise DataFetchError(data.get("msg", "unkonw error"))
+ else:
+ return data.get("data", {})
+
+ async def get(self, uri: str, params=None) -> Dict:
+ final_uri = uri
+ if isinstance(params, dict):
+ final_uri = (f"{uri}?"
+ f"{urlencode(params)}")
+ return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=self.headers)
+
+ async def post(self, uri: str, data: dict) -> Dict:
+ json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
+ return await self.request(method="POST", url=f"{self._host}{uri}",
+ data=json_str, headers=self.headers)
+
+ async def pong(self) -> bool:
+ """get a note to check if login state is ok"""
+ utils.logger.info("[WeiboClient.pong] Begin pong weibo...")
+ ping_flag = False
+ try:
+ pass
+ except Exception as e:
+ utils.logger.error(f"[BilibiliClient.pong] Pong weibo failed: {e}, and try to login again...")
+ ping_flag = False
+ return ping_flag
+
+ async def update_cookies(self, browser_context: BrowserContext):
+ cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies())
+ self.headers["Cookie"] = cookie_str
+ self.cookie_dict = cookie_dict
+
+ async def get_note_by_keyword(
+ self,
+ keyword: str,
+ page: int = 1,
+ search_type: SearchType = SearchType.DEFAULT
+ ) -> Dict:
+ """
+ search note by keyword
+ :param keyword: 微博搜搜的关键词
+ :param page: 分页参数 -当前页码
+ :param search_type: 搜索的类型,见 weibo/filed.py 中的枚举SearchType
+ :return:
+ """
+ uri = "/api/container/getIndex"
+ containerid = f"100103type={search_type.value}&q={keyword}"
+ params = {
+ "containerid": containerid,
+ "page_type": "searchall",
+ "page": page,
+ }
+ return await self.get(uri, params)
diff --git a/media_platform/weibo/core.py b/media_platform/weibo/core.py
new file mode 100644
index 0000000..cfb3cef
--- /dev/null
+++ b/media_platform/weibo/core.py
@@ -0,0 +1,177 @@
+# -*- coding: utf-8 -*-
+# @Author : relakkes@gmail.com
+# @Time : 2023/12/23 15:41
+# @Desc : 微博爬虫主流程代码
+
+
+import asyncio
+import os
+import random
+import time
+from asyncio import Task
+from typing import Dict, List, Optional, Tuple, Union
+
+from playwright.async_api import (BrowserContext, BrowserType, Page,
+ async_playwright)
+
+import config
+from base.base_crawler import AbstractCrawler
+from models import weibo
+from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
+from tools import utils
+from var import comment_tasks_var, crawler_type_var
+
+from .client import WeiboClient
+from .exception import DataFetchError
+from .login import WeiboLogin
+from .field import SearchType
+from .help import filter_search_result_card
+
+
+class WeiboCrawler(AbstractCrawler):
+ platform: str
+ login_type: str
+ crawler_type: str
+ context_page: Page
+ wb_client: WeiboClient
+ browser_context: BrowserContext
+
+ def __init__(self):
+ self.index_url = "https://m.weibo.cn"
+ self.user_agent = utils.get_user_agent()
+
+ def init_config(self, platform: str, login_type: str, crawler_type: str):
+ self.platform = platform
+ self.login_type = login_type
+ self.crawler_type = crawler_type
+
+ async def start(self):
+ playwright_proxy_format, httpx_proxy_format = None, None
+ if config.ENABLE_IP_PROXY:
+ ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True)
+ ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy()
+ playwright_proxy_format, httpx_proxy_format = self.format_proxy_info(ip_proxy_info)
+
+ async with async_playwright() as playwright:
+ # Launch a browser context.
+ chromium = playwright.chromium
+ self.browser_context = await self.launch_browser(
+ chromium,
+ None,
+ self.user_agent,
+ headless=config.HEADLESS
+ )
+ # stealth.min.js is a js script to prevent the website from detecting the crawler.
+ await self.browser_context.add_init_script(path="libs/stealth.min.js")
+ self.context_page = await self.browser_context.new_page()
+ await self.context_page.goto(self.index_url)
+
+ # Create a client to interact with the xiaohongshu website.
+ self.wb_client = await self.create_weibo_client(httpx_proxy_format)
+ if not await self.wb_client.pong():
+ login_obj = WeiboLogin(
+ login_type=self.login_type,
+ login_phone="", # your phone number
+ browser_context=self.browser_context,
+ context_page=self.context_page,
+ cookie_str=config.COOKIES
+ )
+ await login_obj.begin()
+ await self.wb_client.update_cookies(browser_context=self.browser_context)
+
+ crawler_type_var.set(self.crawler_type)
+ if self.crawler_type == "search":
+ # Search for video and retrieve their comment information.
+ await self.search()
+ elif self.crawler_type == "detail":
+ # Get the information and comments of the specified post
+ pass
+ else:
+ pass
+ utils.logger.info("[WeiboCrawler.start] Bilibili Crawler finished ...")
+
+ async def search(self):
+ """
+ search weibo note with keywords
+ :return:
+ """
+ utils.logger.info("[WeiboCrawler.search] Begin search weibo keywords")
+ weibo_limit_count = 10
+ for keyword in config.KEYWORDS.split(","):
+ utils.logger.info(f"[WeiboCrawler.search] Current search keyword: {keyword}")
+ page = 1
+ while page * weibo_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
+ search_res = await self.wb_client.get_note_by_keyword(
+ keyword=keyword,
+ page=page,
+ search_type=SearchType.DEFAULT
+ )
+ note_id_list: List[str] = []
+ note_list = filter_search_result_card(search_res.get("cards"))
+ for note_item in note_list:
+ if note_item :
+ mblog: Dict = note_item.get("mblog")
+ note_id_list.append(mblog.get("id"))
+ await weibo.update_weibo_note(note_item)
+
+ page += 1
+
+ async def create_weibo_client(self, httpx_proxy: Optional[str]) -> WeiboClient:
+ """Create xhs client"""
+ utils.logger.info("[WeiboCrawler.create_weibo_client] Begin create weibo API client ...")
+ cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies())
+ weibo_client_obj = WeiboClient(
+ proxies=httpx_proxy,
+ headers={
+ "User-Agent": self.user_agent,
+ "Cookie": cookie_str,
+ "Origin": "https://m.weibo.cn",
+ "Referer": "https://m.weibo.cn",
+ "Content-Type": "application/json;charset=UTF-8"
+ },
+ playwright_page=self.context_page,
+ cookie_dict=cookie_dict,
+ )
+ return weibo_client_obj
+
+ @staticmethod
+ def format_proxy_info(ip_proxy_info: IpInfoModel) -> Tuple[Optional[Dict], Optional[Dict]]:
+ """format proxy info for playwright and httpx"""
+ playwright_proxy = {
+ "server": f"{ip_proxy_info.protocol}{ip_proxy_info.ip}:{ip_proxy_info.port}",
+ "username": ip_proxy_info.user,
+ "password": ip_proxy_info.password,
+ }
+ httpx_proxy = {
+ f"{ip_proxy_info.protocol}{ip_proxy_info.ip}": f"{ip_proxy_info.protocol}{ip_proxy_info.user}:{ip_proxy_info.password}@{ip_proxy_info.ip}:{ip_proxy_info.port}"
+ }
+ return playwright_proxy, httpx_proxy
+
+ async def launch_browser(
+ self,
+ chromium: BrowserType,
+ playwright_proxy: Optional[Dict],
+ user_agent: Optional[str],
+ headless: bool = True
+ ) -> BrowserContext:
+ """Launch browser and create browser context"""
+ utils.logger.info("[WeiboCrawler.launch_browser] Begin create browser context ...")
+ if config.SAVE_LOGIN_STATE:
+ user_data_dir = os.path.join(os.getcwd(), "browser_data",
+ config.USER_DATA_DIR % self.platform) # type: ignore
+ browser_context = await chromium.launch_persistent_context(
+ user_data_dir=user_data_dir,
+ accept_downloads=True,
+ headless=headless,
+ proxy=playwright_proxy, # type: ignore
+ viewport={"width": 1920, "height": 1080},
+ user_agent=user_agent
+ )
+ return browser_context
+ else:
+ browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
+ browser_context = await browser.new_context(
+ viewport={"width": 1920, "height": 1080},
+ user_agent=user_agent
+ )
+ return browser_context
diff --git a/media_platform/weibo/exception.py b/media_platform/weibo/exception.py
new file mode 100644
index 0000000..9aecdf4
--- /dev/null
+++ b/media_platform/weibo/exception.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+# @Author : relakkes@gmail.com
+# @Time : 2023/12/2 18:44
+# @Desc :
+
+from httpx import RequestError
+
+
+class DataFetchError(RequestError):
+ """something error when fetch"""
+
+
+class IPBlockError(RequestError):
+ """fetch so fast that the server block us ip"""
diff --git a/media_platform/weibo/field.py b/media_platform/weibo/field.py
new file mode 100644
index 0000000..cbc9d27
--- /dev/null
+++ b/media_platform/weibo/field.py
@@ -0,0 +1,19 @@
+# -*- coding: utf-8 -*-
+# @Author : relakkes@gmail.com
+# @Time : 2023/12/23 15:41
+# @Desc :
+from enum import Enum
+
+
+class SearchType(Enum):
+ # 综合
+ DEFAULT = "1"
+
+ # 实时
+ REAL_TIME = "61"
+
+ # 热门
+ POPULAR = "60"
+
+ # 视频
+ VIDEO = "64"
diff --git a/media_platform/weibo/help.py b/media_platform/weibo/help.py
new file mode 100644
index 0000000..45d15c8
--- /dev/null
+++ b/media_platform/weibo/help.py
@@ -0,0 +1,25 @@
+# -*- coding: utf-8 -*-
+# @Author : relakkes@gmail.com
+# @Time : 2023/12/24 17:37
+# @Desc :
+
+from typing import List, Dict
+
+
+def filter_search_result_card(card_list: List[Dict]) -> List[Dict]:
+ """
+ 过滤微博搜索的结果,只保留card_type为9类型的数据
+ :param card_list:
+ :return:
+ """
+ note_list: List[Dict] = []
+ for card_item in card_list:
+ if card_item.get("card_type") == 9:
+ note_list.append(card_item)
+ if len(card_item.get("card_group", [])) > 0:
+ card_group = card_item.get("card_group")
+ for card_group_item in card_group:
+ if card_group_item.get("card_type") == 9:
+ note_list.append(card_group_item)
+
+ return note_list
diff --git a/media_platform/weibo/login.py b/media_platform/weibo/login.py
new file mode 100644
index 0000000..de80b02
--- /dev/null
+++ b/media_platform/weibo/login.py
@@ -0,0 +1,106 @@
+# -*- coding: utf-8 -*-
+# @Author : relakkes@gmail.com
+# @Time : 2023/12/23 15:42
+# @Desc : 微博登录实现
+
+import asyncio
+import functools
+import sys
+from typing import Optional
+
+from playwright.async_api import BrowserContext, Page
+from tenacity import (RetryError, retry, retry_if_result, stop_after_attempt,
+ wait_fixed)
+
+from base.base_crawler import AbstractLogin
+from tools import utils
+
+
+class WeiboLogin(AbstractLogin):
+ def __init__(self,
+ login_type: str,
+ browser_context: BrowserContext,
+ context_page: Page,
+ login_phone: Optional[str] = "",
+ cookie_str: str = ""
+ ):
+ self.login_type = login_type
+ self.browser_context = browser_context
+ self.context_page = context_page
+ self.login_phone = login_phone
+ self.cookie_str = cookie_str
+
+ async def begin(self):
+ """Start login weibo"""
+ utils.logger.info("[WeiboLogin.begin] Begin login Bilibili ...")
+ if self.login_type == "qrcode":
+ await self.login_by_qrcode()
+ elif self.login_type == "phone":
+ await self.login_by_mobile()
+ elif self.login_type == "cookie":
+ await self.login_by_cookies()
+ else:
+ raise ValueError(
+ "[WeiboLogin.begin] Invalid Login Type Currently only supported qrcode or phone or cookie ...")
+
+ @retry(stop=stop_after_attempt(20), wait=wait_fixed(1), retry=retry_if_result(lambda value: value is False))
+ async def check_login_state(self) -> bool:
+ """
+ Check if the current login status is successful and return True otherwise return False
+ retry decorator will retry 20 times if the return value is False, and the retry interval is 1 second
+ if max retry times reached, raise RetryError
+ """
+ current_cookie = await self.browser_context.cookies()
+ _, cookie_dict = utils.convert_cookies(current_cookie)
+ if cookie_dict.get("SESSDATA", "") or cookie_dict.get("DedeUserID"):
+ return True
+ return False
+
+ async def login_by_qrcode(self):
+ """login weibo website and keep webdriver login state"""
+ utils.logger.info("[WeiboLogin.login_by_qrcode] Begin login weibo by qrcode ...")
+
+ # click login button
+ login_button_ele = self.context_page.locator(
+ "xpath=//div[@class='right-entry__outside go-login-btn']//div"
+ )
+ await login_button_ele.click()
+
+ # find login qrcode
+ qrcode_img_selector = "//div[@class='login-scan-box']//img"
+ base64_qrcode_img = await utils.find_login_qrcode(
+ self.context_page,
+ selector=qrcode_img_selector
+ )
+ if not base64_qrcode_img:
+ utils.logger.info("[WeiboLogin.login_by_qrcode] login failed , have not found qrcode please check ....")
+ sys.exit()
+
+ # show login qrcode
+ partial_show_qrcode = functools.partial(utils.show_qrcode, base64_qrcode_img)
+ asyncio.get_running_loop().run_in_executor(executor=None, func=partial_show_qrcode)
+
+ utils.logger.info(f"[WeiboLogin.login_by_qrcode] Waiting for scan code login, remaining time is 20s")
+ try:
+ await self.check_login_state()
+ except RetryError:
+ utils.logger.info("[WeiboLogin.login_by_qrcode] Login weibo failed by qrcode login method ...")
+ sys.exit()
+
+ wait_redirect_seconds = 5
+ utils.logger.info(
+ f"[WeiboLogin.login_by_qrcode] Login successful then wait for {wait_redirect_seconds} seconds redirect ...")
+ await asyncio.sleep(wait_redirect_seconds)
+
+ async def login_by_mobile(self):
+ pass
+
+ async def login_by_cookies(self):
+ utils.logger.info("[WeiboLogin.login_by_qrcode] Begin login weibo by cookie ...")
+ for key, value in utils.convert_str_cookie_to_dict(self.cookie_str).items():
+ await self.browser_context.add_cookies([{
+ 'name': key,
+ 'value': value,
+ 'domain': ".weibo.cn",
+ 'path': "/"
+ }])
diff --git a/models/weibo.py b/models/weibo.py
new file mode 100644
index 0000000..2f433a9
--- /dev/null
+++ b/models/weibo.py
@@ -0,0 +1,171 @@
+# -*- coding: utf-8 -*-
+# @Author : relakkes@gmail.com
+# @Time : 2023/12/23 21:53
+# @Desc : 微博的模型类
+
+import csv
+import pathlib
+from typing import Dict, List
+
+from tortoise import fields
+from tortoise.contrib.pydantic import pydantic_model_creator
+from tortoise.models import Model
+
+import config
+from tools import utils
+from var import crawler_type_var
+
+
+class WeiboBaseModel(Model):
+ id = fields.IntField(pk=True, autoincrement=True, description="自增ID")
+ user_id = fields.CharField(null=True, max_length=64, description="用户ID")
+ nickname = fields.CharField(null=True, max_length=64, description="用户昵称")
+ avatar = fields.CharField(null=True, max_length=255, description="用户头像地址")
+ gender = fields.CharField(null=True, max_length=12, description="用户性别")
+ profile_url = fields.CharField(null=True, max_length=255, description="用户主页地址")
+ ip_location = fields.CharField(null=True, max_length=32, default="发布微博的地理信息")
+ add_ts = fields.BigIntField(description="记录添加时间戳")
+ last_modify_ts = fields.BigIntField(description="记录最后修改时间戳")
+
+ class Meta:
+ abstract = True
+
+
+class WeiboNote(WeiboBaseModel):
+ note_id = fields.CharField(max_length=64, index=True, description="帖子ID")
+ content = fields.TextField(null=True, description="帖子正文内容")
+ create_time = fields.BigIntField(description="帖子发布时间戳", index=True)
+ create_date_time = fields.BigIntField(description="帖子发布日期时间", index=True)
+ liked_count = fields.CharField(null=True, max_length=16, description="帖子点赞数")
+ comments_count = fields.CharField(null=True, max_length=16, description="帖子评论数量")
+ shared_count = fields.CharField(null=True, max_length=16, description="帖子转发数量")
+ note_url = fields.CharField(null=True, max_length=512, description="帖子详情URL")
+
+ class Meta:
+ table = "weibo_video"
+ table_description = "微博帖子"
+
+ def __str__(self):
+ return f"{self.note_id}"
+
+
+class WeiboComment(WeiboBaseModel):
+ comment_id = fields.CharField(max_length=64, index=True, description="评论ID")
+ note_id = fields.CharField(max_length=64, index=True, description="帖子ID")
+ content = fields.TextField(null=True, description="评论内容")
+ create_time = fields.BigIntField(description="评论时间戳")
+ create_date_time = fields.BigIntField(description="评论日期时间", index=True)
+ comment_like_count = fields.CharField(max_length=16, description="评论点赞数量")
+ sub_comment_count = fields.CharField(max_length=16, description="评论回复数")
+
+ class Meta:
+ table = "weibo_note_comment"
+ table_description = "微博帖子评论"
+
+ def __str__(self):
+ return f"{self.comment_id}"
+
+
+async def update_weibo_note(note_item: Dict):
+ mblog: Dict = note_item.get("mblog")
+ user_info: Dict = mblog.get("user")
+ note_id = mblog.get("id")
+ local_db_item = {
+ # 微博信息
+ "note_id": note_id,
+ "content": mblog.get("text"),
+ "create_time": utils.rfc2822_to_timestamp(mblog.get("created_at")),
+ "create_date_time": utils.rfc2822_to_china_datetime(mblog.get("created_at")),
+ "liked_count": mblog.get("attitudes_count", 0),
+ "comments_count": mblog.get("comments_count", 0),
+ "shared_count": mblog.get("reposts_count", 0),
+ "last_modify_ts": utils.get_current_timestamp(),
+ "note_url": f"https://m.weibo.cn/detail/{note_id}",
+ "ip_location": mblog.get("region_name", "").replace("发布于 ", ""),
+
+ # 用户信息
+ "user_id": user_info.get("id"),
+ "nickname": user_info.get("screen_name", ""),
+ "gender": user_info.get("gender", ""),
+ "profile_url": user_info.get("profile_url", ""),
+ "avatar": user_info.get("profile_image_url", ""),
+ }
+ utils.logger.info(
+ f"[models.weibo.update_weibo_video] weibo note id:{note_id}, title:{local_db_item.get('content')[:24]} ...")
+ if config.IS_SAVED_DATABASED:
+ if not await WeiboNote.filter(note_id=note_id).exists():
+ local_db_item["add_ts"] = utils.get_current_timestamp()
+ weibo_video_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteCreate', exclude=('id',))
+ weibo_data = weibo_video_pydantic(**local_db_item)
+ weibo_video_pydantic.model_validate(weibo_data)
+ await WeiboNote.create(**weibo_data.model_dump())
+ else:
+ weibo_video_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteUpdate',
+ exclude=('id', 'add_ts'))
+ weibo_data = weibo_video_pydantic(**local_db_item)
+ weibo_video_pydantic.model_validate(weibo_data)
+ await WeiboNote.filter(note_id=note_id).update(**weibo_data.model_dump())
+ else:
+ # Below is a simple way to save it in CSV format.
+ pathlib.Path(f"data/weibo").mkdir(parents=True, exist_ok=True)
+ save_file_name = f"data/weibo/{crawler_type_var.get()}_notes_{utils.get_current_date()}.csv"
+ with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
+ writer = csv.writer(f)
+ if f.tell() == 0:
+ writer.writerow(local_db_item.keys())
+ writer.writerow(local_db_item.values())
+
+
+async def batch_update_weibo_video_comments(video_id: str, comments: List[Dict]):
+ if not comments:
+ return
+ for comment_item in comments:
+ await update_weibo_video_comment(video_id, comment_item)
+
+
+async def update_weibo_video_comment(note_id: str, comment_item: Dict):
+ comment_id = str(comment_item.get("id"))
+ content: Dict = comment_item.get("text")
+ user_info: Dict = comment_item.get("member")
+ local_db_item = {
+ "comment_id": comment_id,
+ "create_time": utils.rfc2822_to_timestamp(comment_item.get("created_at")),
+ "create_date_time": utils.rfc2822_to_china_datetime(comment_item.get("created_at")),
+ "note_id": note_id,
+ "content": content.get("message"),
+ "sub_comment_count": str(comment_item.get("total_number", 0)),
+ "comment_like_count": str(comment_item.get("like_count", 0)),
+ "last_modify_ts": utils.get_current_timestamp(),
+ "ip_location": comment_item.get("source", "").replace("来自", ""),
+
+ # 用户信息
+ "user_id": user_info.get("id"),
+ "nickname": user_info.get("screen_name", ""),
+ "gender": user_info.get("gender", ""),
+ "profile_url": user_info.get("profile_url", ""),
+ "avatar": user_info.get("profile_image_url", ""),
+ }
+ utils.logger.info(
+ f"[models.weibo.update_weibo_video_comment] Weibo note comment: {comment_id}, content: {local_db_item.get('content','')[:24]} ...")
+ if config.IS_SAVED_DATABASED:
+ if not await WeiboComment.filter(comment_id=comment_id).exists():
+ local_db_item["add_ts"] = utils.get_current_timestamp()
+ comment_pydantic = pydantic_model_creator(WeiboComment, name='WeiboNoteCommentCreate',
+ exclude=('id',))
+ comment_data = comment_pydantic(**local_db_item)
+ comment_pydantic.validate(comment_data)
+ await WeiboComment.create(**comment_data.dict())
+ else:
+ comment_pydantic = pydantic_model_creator(WeiboComment, name='WeiboNoteCommentUpdate',
+ exclude=('id', 'add_ts'))
+ comment_data = comment_pydantic(**local_db_item)
+ comment_pydantic.validate(comment_data)
+ await WeiboComment.filter(comment_id=comment_id).update(**comment_data.dict())
+ else:
+ pathlib.Path(f"data/weibo").mkdir(parents=True, exist_ok=True)
+ save_file_name = f"data/weibo/{crawler_type_var.get()}_comments_{utils.get_current_date()}.csv"
+ with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
+ writer = csv.writer(f)
+ if f.tell() == 0:
+ writer.writerow(local_db_item.keys())
+ writer.writerow(local_db_item.values())
diff --git a/tools/time_util.py b/tools/time_util.py
index c21c25f..f8e6c6b 100644
--- a/tools/time_util.py
+++ b/tools/time_util.py
@@ -4,6 +4,7 @@
# @Desc : 时间相关的工具函数
import time
+from datetime import datetime, timezone, timedelta
def get_current_timestamp() -> int:
@@ -68,4 +69,38 @@ def get_unix_time_from_time_str(time_str):
def get_unix_timestamp():
- return int(time.time())
\ No newline at end of file
+ return int(time.time())
+
+
+def rfc2822_to_china_datetime(rfc2822_time):
+ # 定义RFC 2822格式
+ rfc2822_format = "%a %b %d %H:%M:%S %z %Y"
+
+ # 将RFC 2822时间字符串转换为datetime对象
+ dt_object = datetime.strptime(rfc2822_time, rfc2822_format)
+
+ # 将datetime对象的时区转换为中国时区
+ dt_object_china = dt_object.astimezone(timezone(timedelta(hours=8)))
+ return dt_object_china
+
+
+def rfc2822_to_timestamp(rfc2822_time):
+ # 定义RFC 2822格式
+ rfc2822_format = "%a %b %d %H:%M:%S %z %Y"
+
+ # 将RFC 2822时间字符串转换为datetime对象
+ dt_object = datetime.strptime(rfc2822_time, rfc2822_format)
+
+ # 将datetime对象转换为UTC时间
+ dt_utc = dt_object.replace(tzinfo=timezone.utc)
+
+ # 计算UTC时间对应的Unix时间戳
+ timestamp = int(dt_utc.timestamp())
+
+ return timestamp
+
+
+if __name__ == '__main__':
+ # 示例用法
+ _rfc2822_time = "Sat Dec 23 17:12:54 +0800 2023"
+ print(rfc2822_to_china_datetime(_rfc2822_time))