feat: 微博爬虫帖子搜索完成

This commit is contained in:
Relakkes 2023-12-24 17:57:48 +08:00
parent 9785abba5d
commit c5b64fdbf5
14 changed files with 671 additions and 19 deletions

View File

@ -4,15 +4,16 @@
# 仓库描述
**小红书爬虫****抖音爬虫** **快手爬虫** **B站爬虫**...。
目前能抓取小红书、抖音、快手、B站的视频、图片、评论、点赞、转发等信息。
**小红书爬虫****抖音爬虫** **快手爬虫** **B站爬虫** **微博爬虫**...。
目前能抓取小红书、抖音、快手、B站、微博的视频、图片、评论、点赞、转发等信息。
原理:利用[playwright](https://playwright.dev/)搭桥保留登录成功后的上下文浏览器环境通过执行JS表达式获取一些加密参数
通过使用此方式免去了复现核心加密JS代码逆向难度大大降低
爬虫技术交流群:[949715256](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=NFz-oY7Pek3gpG5zbLJFHARlB8lKL94f&authKey=FlxIQK99Uu90wddNV5W%2FBga6T6lXU5BRqyTTc26f2P2ZK5OW%2BDhHp7MwviX%2BbrPa&noverify=0&group_code=949715256),同时欢迎大家贡献代码提交PR
爬虫技术交流群:[949715256](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=NFz-oY7Pek3gpG5zbLJFHARlB8lKL94f&authKey=FlxIQK99Uu90wddNV5W%2FBga6T6lXU5BRqyTTc26f2P2ZK5OW%2BDhHp7MwviX%2BbrPa&noverify=0&group_code=949715256)同时欢迎大家贡献代码提交PR
## SPONSORED BY
## 赞助商
目前爬虫正在用的IP代理<a href="https://www.jisuhttp.com/?pl=mAKphQ&plan=ZY&kd=Yang">极速HTTP代理</a> 新用户注册认证最高送12000IP0元试用<br>
<a href="https://www.jisuhttp.com/?pl=mAKphQ&plan=ZY&kd=Yang" target="_blank"><img src="https://s2.loli.net/2023/11/30/RapQtL8A2w6TGfj.png" alt="极速HTTP代理-官网图"></a>

View File

@ -1,7 +1,5 @@
from abc import ABC, abstractmethod
from proxy.proxy_account_pool import AccountPool
class AbstractCrawler(ABC):
@abstractmethod

View File

@ -9,7 +9,7 @@ from media_platform.bilibili import BilibiliCrawler
from media_platform.douyin import DouYinCrawler
from media_platform.kuaishou import KuaishouCrawler
from media_platform.xhs import XiaoHongShuCrawler
from proxy import proxy_account_pool
from media_platform.weibo import WeiboCrawler
class CrawlerFactory:
@ -17,7 +17,8 @@ class CrawlerFactory:
"xhs": XiaoHongShuCrawler,
"dy": DouYinCrawler,
"ks": KuaishouCrawler,
"bili": BilibiliCrawler
"bili": BilibiliCrawler,
"wb": WeiboCrawler
}
@staticmethod
@ -31,8 +32,8 @@ class CrawlerFactory:
async def main():
# define command line params ...
parser = argparse.ArgumentParser(description='Media crawler program.')
parser.add_argument('--platform', type=str, help='Media platform select (xhs | dy | ks | bili)',
choices=["xhs", "dy", "ks", "bili"], default=config.PLATFORM)
parser.add_argument('--platform', type=str, help='Media platform select (xhs | dy | ks | bili | wb)',
choices=["xhs", "dy", "ks", "bili", "wb"], default=config.PLATFORM)
parser.add_argument('--lt', type=str, help='Login type (qrcode | phone | cookie)',
choices=["qrcode", "phone", "cookie"], default=config.LOGIN_TYPE)
parser.add_argument('--type', type=str, help='crawler type (search | detail)',

View File

@ -86,8 +86,8 @@ class BilibiliCrawler(AbstractCrawler):
await self.get_specified_videos()
else:
pass
utils.logger.info("Bilibili Crawler finished ...")
pass
utils.logger.info("[BilibiliCrawler.start] Bilibili Crawler finished ...")
async def search(self):
"""
@ -220,7 +220,7 @@ class BilibiliCrawler(AbstractCrawler):
async def create_bilibili_client(self, httpx_proxy: Optional[str]) -> BilibiliClient:
"""Create xhs client"""
utils.logger.info("[BilibiliCrawler.create_bilibili_client] Begin create xiaohongshu API client ...")
utils.logger.info("[BilibiliCrawler.create_bilibili_client] Begin create bilibili API client ...")
cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies())
bilibili_client_obj = BilibiliClient(
proxies=httpx_proxy,

View File

@ -8,12 +8,10 @@ import functools
import sys
from typing import Optional
import redis
from playwright.async_api import BrowserContext, Page
from tenacity import (RetryError, retry, retry_if_result, stop_after_attempt,
wait_fixed)
import config
from base.base_crawler import AbstractLogin
from tools import utils
@ -33,7 +31,7 @@ class BilibiliLogin(AbstractLogin):
self.cookie_str = cookie_str
async def begin(self):
"""Start login xiaohongshu"""
"""Start login bilibili"""
utils.logger.info("[BilibiliLogin.begin] Begin login Bilibili ...")
if self.login_type == "qrcode":
await self.login_by_qrcode()
@ -42,7 +40,8 @@ class BilibiliLogin(AbstractLogin):
elif self.login_type == "cookie":
await self.login_by_cookies()
else:
raise ValueError("[BilibiliLogin.begin] Invalid Login Type Currently only supported qrcode or phone or cookie ...")
raise ValueError(
"[BilibiliLogin.begin] Invalid Login Type Currently only supported qrcode or phone or cookie ...")
@retry(stop=stop_after_attempt(20), wait=wait_fixed(1), retry=retry_if_result(lambda value: value is False))
async def check_login_state(self) -> bool:
@ -89,7 +88,8 @@ class BilibiliLogin(AbstractLogin):
sys.exit()
wait_redirect_seconds = 5
utils.logger.info(f"[BilibiliLogin.login_by_qrcode] Login successful then wait for {wait_redirect_seconds} seconds redirect ...")
utils.logger.info(
f"[BilibiliLogin.login_by_qrcode] Login successful then wait for {wait_redirect_seconds} seconds redirect ...")
await asyncio.sleep(wait_redirect_seconds)
async def login_by_mobile(self):

View File

@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/23 15:40
# @Desc :
from .core import WeiboCrawler
from .login import WeiboLogin
from .client import WeiboClient

View File

@ -0,0 +1,98 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/23 15:40
# @Desc : 微博爬虫 API 请求 client
import asyncio
import json
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from urllib.parse import urlencode
import httpx
from playwright.async_api import BrowserContext, Page
from tools import utils
from .exception import DataFetchError
from .field import SearchType
class WeiboClient:
def __init__(
self,
timeout=10,
proxies=None,
*,
headers: Dict[str, str],
playwright_page: Page,
cookie_dict: Dict[str, str],
):
self.proxies = proxies
self.timeout = timeout
self.headers = headers
self._host = "https://m.weibo.cn"
self.playwright_page = playwright_page
self.cookie_dict = cookie_dict
async def request(self, method, url, **kwargs) -> Any:
async with httpx.AsyncClient(proxies=self.proxies) as client:
response = await client.request(
method, url, timeout=self.timeout,
**kwargs
)
data: Dict = response.json()
if data.get("ok") != 1:
utils.logger.error(f"[WeiboClient.request] request {method}:{url} err, res:{data}")
raise DataFetchError(data.get("msg", "unkonw error"))
else:
return data.get("data", {})
async def get(self, uri: str, params=None) -> Dict:
final_uri = uri
if isinstance(params, dict):
final_uri = (f"{uri}?"
f"{urlencode(params)}")
return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=self.headers)
async def post(self, uri: str, data: dict) -> Dict:
json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
return await self.request(method="POST", url=f"{self._host}{uri}",
data=json_str, headers=self.headers)
async def pong(self) -> bool:
"""get a note to check if login state is ok"""
utils.logger.info("[WeiboClient.pong] Begin pong weibo...")
ping_flag = False
try:
pass
except Exception as e:
utils.logger.error(f"[BilibiliClient.pong] Pong weibo failed: {e}, and try to login again...")
ping_flag = False
return ping_flag
async def update_cookies(self, browser_context: BrowserContext):
cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies())
self.headers["Cookie"] = cookie_str
self.cookie_dict = cookie_dict
async def get_note_by_keyword(
self,
keyword: str,
page: int = 1,
search_type: SearchType = SearchType.DEFAULT
) -> Dict:
"""
search note by keyword
:param keyword: 微博搜搜的关键词
:param page: 分页参数 -当前页码
:param search_type: 搜索的类型 weibo/filed.py 中的枚举SearchType
:return:
"""
uri = "/api/container/getIndex"
containerid = f"100103type={search_type.value}&q={keyword}"
params = {
"containerid": containerid,
"page_type": "searchall",
"page": page,
}
return await self.get(uri, params)

View File

@ -0,0 +1,177 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/23 15:41
# @Desc : 微博爬虫主流程代码
import asyncio
import os
import random
import time
from asyncio import Task
from typing import Dict, List, Optional, Tuple, Union
from playwright.async_api import (BrowserContext, BrowserType, Page,
async_playwright)
import config
from base.base_crawler import AbstractCrawler
from models import weibo
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from tools import utils
from var import comment_tasks_var, crawler_type_var
from .client import WeiboClient
from .exception import DataFetchError
from .login import WeiboLogin
from .field import SearchType
from .help import filter_search_result_card
class WeiboCrawler(AbstractCrawler):
platform: str
login_type: str
crawler_type: str
context_page: Page
wb_client: WeiboClient
browser_context: BrowserContext
def __init__(self):
self.index_url = "https://m.weibo.cn"
self.user_agent = utils.get_user_agent()
def init_config(self, platform: str, login_type: str, crawler_type: str):
self.platform = platform
self.login_type = login_type
self.crawler_type = crawler_type
async def start(self):
playwright_proxy_format, httpx_proxy_format = None, None
if config.ENABLE_IP_PROXY:
ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True)
ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy()
playwright_proxy_format, httpx_proxy_format = self.format_proxy_info(ip_proxy_info)
async with async_playwright() as playwright:
# Launch a browser context.
chromium = playwright.chromium
self.browser_context = await self.launch_browser(
chromium,
None,
self.user_agent,
headless=config.HEADLESS
)
# stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js")
self.context_page = await self.browser_context.new_page()
await self.context_page.goto(self.index_url)
# Create a client to interact with the xiaohongshu website.
self.wb_client = await self.create_weibo_client(httpx_proxy_format)
if not await self.wb_client.pong():
login_obj = WeiboLogin(
login_type=self.login_type,
login_phone="", # your phone number
browser_context=self.browser_context,
context_page=self.context_page,
cookie_str=config.COOKIES
)
await login_obj.begin()
await self.wb_client.update_cookies(browser_context=self.browser_context)
crawler_type_var.set(self.crawler_type)
if self.crawler_type == "search":
# Search for video and retrieve their comment information.
await self.search()
elif self.crawler_type == "detail":
# Get the information and comments of the specified post
pass
else:
pass
utils.logger.info("[WeiboCrawler.start] Bilibili Crawler finished ...")
async def search(self):
"""
search weibo note with keywords
:return:
"""
utils.logger.info("[WeiboCrawler.search] Begin search weibo keywords")
weibo_limit_count = 10
for keyword in config.KEYWORDS.split(","):
utils.logger.info(f"[WeiboCrawler.search] Current search keyword: {keyword}")
page = 1
while page * weibo_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
search_res = await self.wb_client.get_note_by_keyword(
keyword=keyword,
page=page,
search_type=SearchType.DEFAULT
)
note_id_list: List[str] = []
note_list = filter_search_result_card(search_res.get("cards"))
for note_item in note_list:
if note_item :
mblog: Dict = note_item.get("mblog")
note_id_list.append(mblog.get("id"))
await weibo.update_weibo_note(note_item)
page += 1
async def create_weibo_client(self, httpx_proxy: Optional[str]) -> WeiboClient:
"""Create xhs client"""
utils.logger.info("[WeiboCrawler.create_weibo_client] Begin create weibo API client ...")
cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies())
weibo_client_obj = WeiboClient(
proxies=httpx_proxy,
headers={
"User-Agent": self.user_agent,
"Cookie": cookie_str,
"Origin": "https://m.weibo.cn",
"Referer": "https://m.weibo.cn",
"Content-Type": "application/json;charset=UTF-8"
},
playwright_page=self.context_page,
cookie_dict=cookie_dict,
)
return weibo_client_obj
@staticmethod
def format_proxy_info(ip_proxy_info: IpInfoModel) -> Tuple[Optional[Dict], Optional[Dict]]:
"""format proxy info for playwright and httpx"""
playwright_proxy = {
"server": f"{ip_proxy_info.protocol}{ip_proxy_info.ip}:{ip_proxy_info.port}",
"username": ip_proxy_info.user,
"password": ip_proxy_info.password,
}
httpx_proxy = {
f"{ip_proxy_info.protocol}{ip_proxy_info.ip}": f"{ip_proxy_info.protocol}{ip_proxy_info.user}:{ip_proxy_info.password}@{ip_proxy_info.ip}:{ip_proxy_info.port}"
}
return playwright_proxy, httpx_proxy
async def launch_browser(
self,
chromium: BrowserType,
playwright_proxy: Optional[Dict],
user_agent: Optional[str],
headless: bool = True
) -> BrowserContext:
"""Launch browser and create browser context"""
utils.logger.info("[WeiboCrawler.launch_browser] Begin create browser context ...")
if config.SAVE_LOGIN_STATE:
user_data_dir = os.path.join(os.getcwd(), "browser_data",
config.USER_DATA_DIR % self.platform) # type: ignore
browser_context = await chromium.launch_persistent_context(
user_data_dir=user_data_dir,
accept_downloads=True,
headless=headless,
proxy=playwright_proxy, # type: ignore
viewport={"width": 1920, "height": 1080},
user_agent=user_agent
)
return browser_context
else:
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
browser_context = await browser.new_context(
viewport={"width": 1920, "height": 1080},
user_agent=user_agent
)
return browser_context

View File

@ -0,0 +1,14 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/2 18:44
# @Desc :
from httpx import RequestError
class DataFetchError(RequestError):
"""something error when fetch"""
class IPBlockError(RequestError):
"""fetch so fast that the server block us ip"""

View File

@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/23 15:41
# @Desc :
from enum import Enum
class SearchType(Enum):
# 综合
DEFAULT = "1"
# 实时
REAL_TIME = "61"
# 热门
POPULAR = "60"
# 视频
VIDEO = "64"

View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/24 17:37
# @Desc :
from typing import List, Dict
def filter_search_result_card(card_list: List[Dict]) -> List[Dict]:
"""
过滤微博搜索的结果只保留card_type为9类型的数据
:param card_list:
:return:
"""
note_list: List[Dict] = []
for card_item in card_list:
if card_item.get("card_type") == 9:
note_list.append(card_item)
if len(card_item.get("card_group", [])) > 0:
card_group = card_item.get("card_group")
for card_group_item in card_group:
if card_group_item.get("card_type") == 9:
note_list.append(card_group_item)
return note_list

View File

@ -0,0 +1,106 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/23 15:42
# @Desc : 微博登录实现
import asyncio
import functools
import sys
from typing import Optional
from playwright.async_api import BrowserContext, Page
from tenacity import (RetryError, retry, retry_if_result, stop_after_attempt,
wait_fixed)
from base.base_crawler import AbstractLogin
from tools import utils
class WeiboLogin(AbstractLogin):
def __init__(self,
login_type: str,
browser_context: BrowserContext,
context_page: Page,
login_phone: Optional[str] = "",
cookie_str: str = ""
):
self.login_type = login_type
self.browser_context = browser_context
self.context_page = context_page
self.login_phone = login_phone
self.cookie_str = cookie_str
async def begin(self):
"""Start login weibo"""
utils.logger.info("[WeiboLogin.begin] Begin login Bilibili ...")
if self.login_type == "qrcode":
await self.login_by_qrcode()
elif self.login_type == "phone":
await self.login_by_mobile()
elif self.login_type == "cookie":
await self.login_by_cookies()
else:
raise ValueError(
"[WeiboLogin.begin] Invalid Login Type Currently only supported qrcode or phone or cookie ...")
@retry(stop=stop_after_attempt(20), wait=wait_fixed(1), retry=retry_if_result(lambda value: value is False))
async def check_login_state(self) -> bool:
"""
Check if the current login status is successful and return True otherwise return False
retry decorator will retry 20 times if the return value is False, and the retry interval is 1 second
if max retry times reached, raise RetryError
"""
current_cookie = await self.browser_context.cookies()
_, cookie_dict = utils.convert_cookies(current_cookie)
if cookie_dict.get("SESSDATA", "") or cookie_dict.get("DedeUserID"):
return True
return False
async def login_by_qrcode(self):
"""login weibo website and keep webdriver login state"""
utils.logger.info("[WeiboLogin.login_by_qrcode] Begin login weibo by qrcode ...")
# click login button
login_button_ele = self.context_page.locator(
"xpath=//div[@class='right-entry__outside go-login-btn']//div"
)
await login_button_ele.click()
# find login qrcode
qrcode_img_selector = "//div[@class='login-scan-box']//img"
base64_qrcode_img = await utils.find_login_qrcode(
self.context_page,
selector=qrcode_img_selector
)
if not base64_qrcode_img:
utils.logger.info("[WeiboLogin.login_by_qrcode] login failed , have not found qrcode please check ....")
sys.exit()
# show login qrcode
partial_show_qrcode = functools.partial(utils.show_qrcode, base64_qrcode_img)
asyncio.get_running_loop().run_in_executor(executor=None, func=partial_show_qrcode)
utils.logger.info(f"[WeiboLogin.login_by_qrcode] Waiting for scan code login, remaining time is 20s")
try:
await self.check_login_state()
except RetryError:
utils.logger.info("[WeiboLogin.login_by_qrcode] Login weibo failed by qrcode login method ...")
sys.exit()
wait_redirect_seconds = 5
utils.logger.info(
f"[WeiboLogin.login_by_qrcode] Login successful then wait for {wait_redirect_seconds} seconds redirect ...")
await asyncio.sleep(wait_redirect_seconds)
async def login_by_mobile(self):
pass
async def login_by_cookies(self):
utils.logger.info("[WeiboLogin.login_by_qrcode] Begin login weibo by cookie ...")
for key, value in utils.convert_str_cookie_to_dict(self.cookie_str).items():
await self.browser_context.add_cookies([{
'name': key,
'value': value,
'domain': ".weibo.cn",
'path': "/"
}])

171
models/weibo.py Normal file
View File

@ -0,0 +1,171 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/23 21:53
# @Desc : 微博的模型类
import csv
import pathlib
from typing import Dict, List
from tortoise import fields
from tortoise.contrib.pydantic import pydantic_model_creator
from tortoise.models import Model
import config
from tools import utils
from var import crawler_type_var
class WeiboBaseModel(Model):
id = fields.IntField(pk=True, autoincrement=True, description="自增ID")
user_id = fields.CharField(null=True, max_length=64, description="用户ID")
nickname = fields.CharField(null=True, max_length=64, description="用户昵称")
avatar = fields.CharField(null=True, max_length=255, description="用户头像地址")
gender = fields.CharField(null=True, max_length=12, description="用户性别")
profile_url = fields.CharField(null=True, max_length=255, description="用户主页地址")
ip_location = fields.CharField(null=True, max_length=32, default="发布微博的地理信息")
add_ts = fields.BigIntField(description="记录添加时间戳")
last_modify_ts = fields.BigIntField(description="记录最后修改时间戳")
class Meta:
abstract = True
class WeiboNote(WeiboBaseModel):
note_id = fields.CharField(max_length=64, index=True, description="帖子ID")
content = fields.TextField(null=True, description="帖子正文内容")
create_time = fields.BigIntField(description="帖子发布时间戳", index=True)
create_date_time = fields.BigIntField(description="帖子发布日期时间", index=True)
liked_count = fields.CharField(null=True, max_length=16, description="帖子点赞数")
comments_count = fields.CharField(null=True, max_length=16, description="帖子评论数量")
shared_count = fields.CharField(null=True, max_length=16, description="帖子转发数量")
note_url = fields.CharField(null=True, max_length=512, description="帖子详情URL")
class Meta:
table = "weibo_video"
table_description = "微博帖子"
def __str__(self):
return f"{self.note_id}"
class WeiboComment(WeiboBaseModel):
comment_id = fields.CharField(max_length=64, index=True, description="评论ID")
note_id = fields.CharField(max_length=64, index=True, description="帖子ID")
content = fields.TextField(null=True, description="评论内容")
create_time = fields.BigIntField(description="评论时间戳")
create_date_time = fields.BigIntField(description="评论日期时间", index=True)
comment_like_count = fields.CharField(max_length=16, description="评论点赞数量")
sub_comment_count = fields.CharField(max_length=16, description="评论回复数")
class Meta:
table = "weibo_note_comment"
table_description = "微博帖子评论"
def __str__(self):
return f"{self.comment_id}"
async def update_weibo_note(note_item: Dict):
mblog: Dict = note_item.get("mblog")
user_info: Dict = mblog.get("user")
note_id = mblog.get("id")
local_db_item = {
# 微博信息
"note_id": note_id,
"content": mblog.get("text"),
"create_time": utils.rfc2822_to_timestamp(mblog.get("created_at")),
"create_date_time": utils.rfc2822_to_china_datetime(mblog.get("created_at")),
"liked_count": mblog.get("attitudes_count", 0),
"comments_count": mblog.get("comments_count", 0),
"shared_count": mblog.get("reposts_count", 0),
"last_modify_ts": utils.get_current_timestamp(),
"note_url": f"https://m.weibo.cn/detail/{note_id}",
"ip_location": mblog.get("region_name", "").replace("发布于 ", ""),
# 用户信息
"user_id": user_info.get("id"),
"nickname": user_info.get("screen_name", ""),
"gender": user_info.get("gender", ""),
"profile_url": user_info.get("profile_url", ""),
"avatar": user_info.get("profile_image_url", ""),
}
utils.logger.info(
f"[models.weibo.update_weibo_video] weibo note id:{note_id}, title:{local_db_item.get('content')[:24]} ...")
if config.IS_SAVED_DATABASED:
if not await WeiboNote.filter(note_id=note_id).exists():
local_db_item["add_ts"] = utils.get_current_timestamp()
weibo_video_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteCreate', exclude=('id',))
weibo_data = weibo_video_pydantic(**local_db_item)
weibo_video_pydantic.model_validate(weibo_data)
await WeiboNote.create(**weibo_data.model_dump())
else:
weibo_video_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteUpdate',
exclude=('id', 'add_ts'))
weibo_data = weibo_video_pydantic(**local_db_item)
weibo_video_pydantic.model_validate(weibo_data)
await WeiboNote.filter(note_id=note_id).update(**weibo_data.model_dump())
else:
# Below is a simple way to save it in CSV format.
pathlib.Path(f"data/weibo").mkdir(parents=True, exist_ok=True)
save_file_name = f"data/weibo/{crawler_type_var.get()}_notes_{utils.get_current_date()}.csv"
with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)
if f.tell() == 0:
writer.writerow(local_db_item.keys())
writer.writerow(local_db_item.values())
async def batch_update_weibo_video_comments(video_id: str, comments: List[Dict]):
if not comments:
return
for comment_item in comments:
await update_weibo_video_comment(video_id, comment_item)
async def update_weibo_video_comment(note_id: str, comment_item: Dict):
comment_id = str(comment_item.get("id"))
content: Dict = comment_item.get("text")
user_info: Dict = comment_item.get("member")
local_db_item = {
"comment_id": comment_id,
"create_time": utils.rfc2822_to_timestamp(comment_item.get("created_at")),
"create_date_time": utils.rfc2822_to_china_datetime(comment_item.get("created_at")),
"note_id": note_id,
"content": content.get("message"),
"sub_comment_count": str(comment_item.get("total_number", 0)),
"comment_like_count": str(comment_item.get("like_count", 0)),
"last_modify_ts": utils.get_current_timestamp(),
"ip_location": comment_item.get("source", "").replace("来自", ""),
# 用户信息
"user_id": user_info.get("id"),
"nickname": user_info.get("screen_name", ""),
"gender": user_info.get("gender", ""),
"profile_url": user_info.get("profile_url", ""),
"avatar": user_info.get("profile_image_url", ""),
}
utils.logger.info(
f"[models.weibo.update_weibo_video_comment] Weibo note comment: {comment_id}, content: {local_db_item.get('content','')[:24]} ...")
if config.IS_SAVED_DATABASED:
if not await WeiboComment.filter(comment_id=comment_id).exists():
local_db_item["add_ts"] = utils.get_current_timestamp()
comment_pydantic = pydantic_model_creator(WeiboComment, name='WeiboNoteCommentCreate',
exclude=('id',))
comment_data = comment_pydantic(**local_db_item)
comment_pydantic.validate(comment_data)
await WeiboComment.create(**comment_data.dict())
else:
comment_pydantic = pydantic_model_creator(WeiboComment, name='WeiboNoteCommentUpdate',
exclude=('id', 'add_ts'))
comment_data = comment_pydantic(**local_db_item)
comment_pydantic.validate(comment_data)
await WeiboComment.filter(comment_id=comment_id).update(**comment_data.dict())
else:
pathlib.Path(f"data/weibo").mkdir(parents=True, exist_ok=True)
save_file_name = f"data/weibo/{crawler_type_var.get()}_comments_{utils.get_current_date()}.csv"
with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)
if f.tell() == 0:
writer.writerow(local_db_item.keys())
writer.writerow(local_db_item.values())

View File

@ -4,6 +4,7 @@
# @Desc : 时间相关的工具函数
import time
from datetime import datetime, timezone, timedelta
def get_current_timestamp() -> int:
@ -68,4 +69,38 @@ def get_unix_time_from_time_str(time_str):
def get_unix_timestamp():
return int(time.time())
return int(time.time())
def rfc2822_to_china_datetime(rfc2822_time):
# 定义RFC 2822格式
rfc2822_format = "%a %b %d %H:%M:%S %z %Y"
# 将RFC 2822时间字符串转换为datetime对象
dt_object = datetime.strptime(rfc2822_time, rfc2822_format)
# 将datetime对象的时区转换为中国时区
dt_object_china = dt_object.astimezone(timezone(timedelta(hours=8)))
return dt_object_china
def rfc2822_to_timestamp(rfc2822_time):
# 定义RFC 2822格式
rfc2822_format = "%a %b %d %H:%M:%S %z %Y"
# 将RFC 2822时间字符串转换为datetime对象
dt_object = datetime.strptime(rfc2822_time, rfc2822_format)
# 将datetime对象转换为UTC时间
dt_utc = dt_object.replace(tzinfo=timezone.utc)
# 计算UTC时间对应的Unix时间戳
timestamp = int(dt_utc.timestamp())
return timestamp
if __name__ == '__main__':
# 示例用法
_rfc2822_time = "Sat Dec 23 17:12:54 +0800 2023"
print(rfc2822_to_china_datetime(_rfc2822_time))