From 894dabcf63565dd52e78330bd74f923cd7a8b577 Mon Sep 17 00:00:00 2001
From: Relakkes
Date: Sun, 14 Jan 2024 22:06:31 +0800
Subject: [PATCH] refactor: restructure data storage, separating the storage
 implementations for the different platforms
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
 base/base_crawler.py                          |  10 +
 config/base_config.py                         |   8 +-
 config/db_config.py                           |   3 -
 db.py                                         |  10 +-
 main.py                                       |   2 +-
 media_platform/bilibili/core.py               |  17 +-
 media_platform/bilibili/login.py              |   2 +-
 media_platform/douyin/core.py                 |   8 +-
 media_platform/kuaishou/client.py             |  16 +-
 media_platform/kuaishou/core.py               |   8 +-
 media_platform/kuaishou/graphql.py            |   2 +-
 .../kuaishou/graphql/vision_profile.graphql   |  16 ++
 media_platform/weibo/core.py                  |  32 +---
 media_platform/xhs/core.py                    |   8 +-
 models/__init__.py                            |   5 -
 models/bilibili.py                            | 156 ----------------
 models/douyin.py                              | 171 ------------------
 models/kuaishou.py                            | 151 ----------------
 models/weibo.py                               | 170 -----------------
 models/xiaohongshu.py                         | 150 ---------------
 requirements.txt                              |   1 +
 store/__init__.py                             |   4 +
 store/bilibili/__init__.py                    |  81 +++++++++
 store/bilibili/bilibili_store_db_types.py     |  55 ++++++
 store/bilibili/bilibili_store_impl.py         | 129 +++++++++++++
 store/douyin/__init__.py                      |  95 ++++++++++
 store/douyin/douyin_store_db_types.py         |  60 ++++++
 store/douyin/douyin_store_impl.py             | 130 +++++++++++++
 store/kuaishou/__init__.py                    |  77 ++++++++
 store/kuaishou/kuaishou_store_db_types.py     |  55 ++++++
 store/kuaishou/kuaishou_store_impl.py         | 128 +++++++++++++
 store/weibo/__init__.py                       |  88 +++++++++
 store/weibo/weibo_store_db_types.py           |  57 ++++++
 store/weibo/weibo_store_impl.py               | 128 +++++++++++++
 store/xhs/__init__.py                         |  74 ++++++++
 store/xhs/xhs_store_db_types.py               |  57 ++++++
 store/xhs/xhs_store_impl.py                   | 127 +++++++++++++
 37 files changed, 1427 insertions(+), 864 deletions(-)
 create mode 100644 media_platform/kuaishou/graphql/vision_profile.graphql
 delete mode 100644 models/__init__.py
 delete mode 100644 models/bilibili.py
 delete mode 100644 models/douyin.py
 delete mode 100644 models/kuaishou.py
 delete mode 100644 models/weibo.py
 delete mode 100644 models/xiaohongshu.py
 create mode 100644 store/__init__.py
 create mode 100644 store/bilibili/__init__.py
 create mode 100644 store/bilibili/bilibili_store_db_types.py
 create mode 100644 store/bilibili/bilibili_store_impl.py
 create mode 100644 store/douyin/__init__.py
 create mode 100644 store/douyin/douyin_store_db_types.py
 create mode 100644 store/douyin/douyin_store_impl.py
 create mode 100644 store/kuaishou/__init__.py
 create mode 100644 store/kuaishou/kuaishou_store_db_types.py
 create mode 100644 store/kuaishou/kuaishou_store_impl.py
 create mode 100644 store/weibo/__init__.py
 create mode 100644 store/weibo/weibo_store_db_types.py
 create mode 100644 store/weibo/weibo_store_impl.py
 create mode 100644 store/xhs/__init__.py
 create mode 100644 store/xhs/xhs_store_db_types.py
 create mode 100644 store/xhs/xhs_store_impl.py

diff --git a/base/base_crawler.py b/base/base_crawler.py
index 92d27c7..a7ab64b 100644
--- a/base/base_crawler.py
+++ b/base/base_crawler.py
@@ -39,3 +39,13 @@ class AbstractLogin(ABC):
     @abstractmethod
     async def login_by_cookies(self):
         pass
+
+
+class AbstractStore(ABC):
+    @abstractmethod
+    async def store_content(self, content_item: Dict):
+        pass
+
+    @abstractmethod
+    async def store_comment(self, comment_item: Dict):
+        pass
diff --git a/config/base_config.py b/config/base_config.py
index c72471f..ab5ec4e 100644
--- a/config/base_config.py
+++ b/config/base_config.py
@@ -20,6 +20,9 @@ HEADLESS = True

 # 是否保存登录状态
 SAVE_LOGIN_STATE = True

+# 数据保存类型选项配置,支持三种类型:csv、db、json
+SAVE_DATA_OPTION = "csv"  # csv or db or json
+
 # 用户浏览器缓存的浏览器文件配置
 USER_DATA_DIR = "%s_user_data_dir"  # %s will be replaced by platform name
@@ -54,7 +57,10 @@ DY_SPECIFIED_ID_LIST = [
 ]

 # 指定快手平台需要爬取的ID列表
-KS_SPECIFIED_ID_LIST = []
+KS_SPECIFIED_ID_LIST = [
+    "3xf8enb8dbj6uig",
+    "3x6zz972bchmvqe"
+]

 # 指定B站平台需要爬取的视频bvid列表
 BILI_SPECIFIED_ID_LIST = [
diff --git a/config/db_config.py b/config/db_config.py
index 270edd3..d3f1d14 100644
--- a/config/db_config.py
+++ b/config/db_config.py
@@ -7,6 +7,3 @@ REDIS_DB_PWD = os.getenv("REDIS_DB_PWD", "123456")  # your redis password
 # mysql config
 RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "123456")  # your relation db password
 RELATION_DB_URL = f"mysql://root:{RELATION_DB_PWD}@localhost:3306/media_crawler"
-
-# save data to database option
-IS_SAVED_DATABASED = False  # if you want to save data to database, set True
diff --git a/db.py b/db.py
index d25e436..8ec4d5d 100644
--- a/db.py
+++ b/db.py
@@ -1,14 +1,20 @@
+from typing import List
+
 from tortoise import Tortoise, run_async

 from config.db_config import *
 from tools import utils


+def get_platform_models() -> List[str]:
+    models = ["store.xhs", "store.douyin", "store.bilibili", "store.kuaishou", "store.weibo"]
+    return models
+
+
 async def init_db(create_db: bool = False) -> None:
     await Tortoise.init(
         db_url=RELATION_DB_URL,
-        modules={'models': ['models']},
-        # modules={'models': ['models.kuaishou']},  # generate special table
+        modules={'models': get_platform_models()},
         _create_db=create_db
     )

diff --git a/main.py b/main.py
index a6b7e0e..7c6a82b 100644
--- a/main.py
+++ b/main.py
@@ -40,7 +40,7 @@ async def main():
                         choices=["search", "detail"], default=config.CRAWLER_TYPE)

     # init db
-    if config.IS_SAVED_DATABASED:
+    if config.SAVE_DATA_OPTION == "db":
         await db.init_db()

     args = parser.parse_args()
diff --git a/media_platform/bilibili/core.py b/media_platform/bilibili/core.py
index 730b464..0a7a0ab 100644
--- a/media_platform/bilibili/core.py
+++ b/media_platform/bilibili/core.py
@@ -6,19 +6,18 @@
 import asyncio
 import os
 import random
-import time
 from asyncio import Task
-from typing import Dict, List, Optional, Tuple, Union
+from typing import Dict, List, Optional, Tuple

 from playwright.async_api import (BrowserContext, BrowserType, Page,
                                   async_playwright)

 import config
 from base.base_crawler import AbstractCrawler
-from models import bilibili
 from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
+from store import bilibili as bilibili_store
 from tools import utils
-from var import comment_tasks_var, crawler_type_var
+from var import crawler_type_var

 from .client import BilibiliClient
 from .exception import DataFetchError
@@ -88,7 +87,6 @@ class BilibiliCrawler(AbstractCrawler):
                 pass

         utils.logger.info("[BilibiliCrawler.start] Bilibili Crawler finished ...")
-
     async def search(self):
         """
         search bilibili video with keywords
@@ -118,7 +116,7 @@ class BilibiliCrawler(AbstractCrawler):
                 for video_item in video_items:
                     if video_item:
                         video_id_list.append(video_item.get("View").get("aid"))
-                        await bilibili.update_bilibili_video(video_item)
+                        await bilibili_store.update_bilibili_video(video_item)
                 page += 1

                 await self.batch_get_video_comments(video_id_list)
@@ -150,7 +148,7 @@ class BilibiliCrawler(AbstractCrawler):
                 await self.bili_client.get_video_all_comments(
                     video_id=video_id,
                     crawl_interval=random.random(),
-                    callback=bilibili.batch_update_bilibili_video_comments
+                    callback=bilibili_store.batch_update_bilibili_video_comments
                 )

             except DataFetchError as ex:
@@ -176,7 +174,7 @@ class BilibiliCrawler(AbstractCrawler):
             video_aid: str = video_item_view.get("aid")
             if video_aid:
                 video_aids_list.append(video_aid)
-            await bilibili.update_bilibili_video(video_detail)
+            await bilibili_store.update_bilibili_video(video_detail)
         await self.batch_get_video_comments(video_aids_list)

     async def get_video_info_task(self, aid: int, bvid: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
@@ -195,7 +193,8 @@ class BilibiliCrawler(AbstractCrawler):
             utils.logger.error(f"[BilibiliCrawler.get_video_info_task] Get video detail error: {ex}")
             return None
         except KeyError as ex:
-            utils.logger.error(f"[BilibiliCrawler.get_video_info_task] have not fund note detail video_id:{bvid}, err: {ex}")
+            utils.logger.error(
+                f"[BilibiliCrawler.get_video_info_task] have not found note detail video_id:{bvid}, err: {ex}")
             return None

     async def create_bilibili_client(self, httpx_proxy: Optional[str]) -> BilibiliClient:
diff --git a/media_platform/bilibili/login.py b/media_platform/bilibili/login.py
index 4f646da..ddf11bf 100644
--- a/media_platform/bilibili/login.py
+++ b/media_platform/bilibili/login.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 # @Author  : relakkes@gmail.com
 # @Time    : 2023/12/2 18:44
-# @Desc    : bilibli登录类实现
+# @Desc    : bilibili登录实现类

 import asyncio
 import functools
diff --git a/media_platform/douyin/core.py b/media_platform/douyin/core.py
index 4e1ee56..5aa1c9f 100644
--- a/media_platform/douyin/core.py
+++ b/media_platform/douyin/core.py
@@ -8,8 +8,8 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,

 import config
 from base.base_crawler import AbstractCrawler
-from models import douyin
 from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
+from store import douyin as douyin_store
 from tools import utils
 from var import crawler_type_var

@@ -99,7 +99,7 @@ class DouYinCrawler(AbstractCrawler):
                     except TypeError:
                         continue
                     aweme_list.append(aweme_info.get("aweme_id", ""))
-                    await douyin.update_douyin_aweme(aweme_item=aweme_info)
+                    await douyin_store.update_douyin_aweme(aweme_item=aweme_info)

             utils.logger.info(f"[DouYinCrawler.search] keyword:{keyword}, aweme_list:{aweme_list}")
             await self.batch_get_note_comments(aweme_list)
@@ -112,7 +112,7 @@ class DouYinCrawler(AbstractCrawler):
         aweme_details = await asyncio.gather(*task_list)
         for aweme_detail in aweme_details:
             if aweme_detail is not None:
-                await douyin.update_douyin_aweme(aweme_detail)
+                await douyin_store.update_douyin_aweme(aweme_detail)
         await self.batch_get_note_comments(config.DY_SPECIFIED_ID_LIST)

     async def get_aweme_detail(self, aweme_id: str, semaphore: asyncio.Semaphore) -> Any:
@@ -146,7 +146,7 @@ class DouYinCrawler(AbstractCrawler):
                     keywords=config.COMMENT_KEYWORDS  # 关键词列表
                 )
                 # 现在返回的 comments 已经是经过关键词筛选的
-                await douyin.batch_update_dy_aweme_comments(aweme_id, comments)
+                await douyin_store.batch_update_dy_aweme_comments(aweme_id, comments)
                 utils.logger.info(f"[DouYinCrawler.get_comments] aweme_id: {aweme_id} comments have all been obtained and filtered ...")
             except DataFetchError as e:
                 utils.logger.error(f"[DouYinCrawler.get_comments] aweme_id: {aweme_id} get comments failed, error: {e}")
diff --git a/media_platform/kuaishou/client.py b/media_platform/kuaishou/client.py
index 20b30fb..9693c70 100644
--- a/media_platform/kuaishou/client.py
+++ b/media_platform/kuaishou/client.py
@@ -10,7 +10,7 @@ from playwright.async_api import BrowserContext, Page

 import config
 from tools import utils

-from .exception import DataFetchError, IPBlockError
+from .exception import DataFetchError
 from .graphql import KuaiShouGraphQL

@@ -56,13 +56,21 @@ class KuaiShouClient:
         return await self.request(method="POST", url=f"{self._host}{uri}",
                                   data=json_str, headers=self.headers)

-    @staticmethod
-    async def pong() -> bool:
+    async def pong(self) -> bool:
         """get a note to check if login state is ok"""
         utils.logger.info("[KuaiShouClient.pong] Begin pong kuaishou...")
         ping_flag = False
         try:
-            pass
+            post_data = {
+                "operationName": "visionProfileUserList",
+                "variables": {
+                    "ftype": 1,
+                },
+                "query": self.graphql.get("vision_profile")
+            }
+            res = await self.post("", post_data)
+            if res.get("visionProfileUserList", {}).get("result") == 1:
+                ping_flag = True
         except Exception as e:
             utils.logger.error(f"[KuaiShouClient.pong] Pong kuaishou failed: {e}, and try to login again...")
             ping_flag = False
diff --git a/media_platform/kuaishou/core.py b/media_platform/kuaishou/core.py
index 28585c3..7a007c2 100644
--- a/media_platform/kuaishou/core.py
+++ b/media_platform/kuaishou/core.py
@@ -10,8 +10,8 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,

 import config
 from base.base_crawler import AbstractCrawler
-from models import kuaishou
 from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
+from store import kuaishou as kuaishou_store
 from tools import utils
 from var import comment_tasks_var, crawler_type_var

@@ -106,7 +106,7 @@ class KuaishouCrawler(AbstractCrawler):

                 for video_detail in vision_search_photo.get("feeds"):
                     video_id_list.append(video_detail.get("photo", {}).get("id"))
-                    await kuaishou.update_kuaishou_video(video_item=video_detail)
+                    await kuaishou_store.update_kuaishou_video(video_item=video_detail)

                 # batch fetch video comments
                 page += 1
@@ -121,7 +121,7 @@ class KuaishouCrawler(AbstractCrawler):
         video_details = await asyncio.gather(*task_list)
         for video_detail in video_details:
             if video_detail is not None:
-                await kuaishou.update_kuaishou_video(video_detail)
+                await kuaishou_store.update_kuaishou_video(video_detail)
         await self.batch_get_video_comments(config.KS_SPECIFIED_ID_LIST)

     async def get_video_info_task(self, video_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
@@ -167,7 +167,7 @@ class KuaishouCrawler(AbstractCrawler):
                 await self.ks_client.get_video_all_comments(
                     photo_id=video_id,
                     crawl_interval=random.random(),
-                    callback=kuaishou.batch_update_ks_video_comments
+                    callback=kuaishou_store.batch_update_ks_video_comments
                 )
             except DataFetchError as ex:
                 utils.logger.error(f"[KuaishouCrawler.get_comments] get video_id: {video_id} comment error: {ex}")
diff --git a/media_platform/kuaishou/graphql.py b/media_platform/kuaishou/graphql.py
index 4b14baf..215b57f 100644
--- a/media_platform/kuaishou/graphql.py
+++ b/media_platform/kuaishou/graphql.py
@@ -11,7 +11,7 @@ class KuaiShouGraphQL:
         self.load_graphql_queries()

     def load_graphql_queries(self):
-        graphql_files = ["search_query.graphql", "video_detail.graphql", "comment_list.graphql"]
+        graphql_files = ["search_query.graphql", "video_detail.graphql", "comment_list.graphql", "vision_profile.graphql"]

         for file in graphql_files:
             with open(self.graphql_dir + file, mode="r") as f:
diff --git a/media_platform/kuaishou/graphql/vision_profile.graphql b/media_platform/kuaishou/graphql/vision_profile.graphql
new file mode 100644
index 0000000..148165a
--- /dev/null
+++ b/media_platform/kuaishou/graphql/vision_profile.graphql
@@ -0,0 +1,16 @@
+query visionProfileUserList($pcursor: String, $ftype: Int) {
+  visionProfileUserList(pcursor: $pcursor, ftype: $ftype) {
+    result
+    fols {
+      user_name
+      headurl
+      user_text
+      isFollowing
+      user_id
+      __typename
+    }
+    hostName
+    pcursor
+    __typename
+  }
+}
diff --git a/media_platform/weibo/core.py b/media_platform/weibo/core.py
index 44d630f..318ff68 100644
--- a/media_platform/weibo/core.py
+++ b/media_platform/weibo/core.py
@@ -15,8 +15,8 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,

 import config
 from base.base_crawler import AbstractCrawler
-from models import weibo
 from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
+from store import weibo as weibo_store
 from tools import utils
 from var import crawler_type_var

@@ -120,7 +120,7 @@ class WeiboCrawler(AbstractCrawler):
                     if note_item:
                         mblog: Dict = note_item.get("mblog")
                         note_id_list.append(mblog.get("id"))
-                        await weibo.update_weibo_note(note_item)
+                        await weibo_store.update_weibo_note(note_item)
                 page += 1

                 await self.batch_get_notes_comments(note_id_list)
@@ -138,7 +138,7 @@ class WeiboCrawler(AbstractCrawler):
         video_details = await asyncio.gather(*task_list)
         for note_item in video_details:
             if note_item:
-                await weibo.update_weibo_note(note_item)
+                await weibo_store.update_weibo_note(note_item)
         await self.batch_get_notes_comments(config.WEIBO_SPECIFIED_ID_LIST)

     async def get_note_info_task(self, note_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
@@ -184,33 +184,11 @@ class WeiboCrawler(AbstractCrawler):
         async with semaphore:
             try:
                 utils.logger.info(f"[WeiboCrawler.get_note_comments] begin get note_id: {note_id} comments ...")
-
-                # Read keyword and quantity from config
-                keywords = config.COMMENT_KEYWORDS
-                max_comments = config.MAX_COMMENTS_PER_POST
-
-                # Download comments
-                all_comments = await self.wb_client.get_note_all_comments(
+                await self.wb_client.get_note_all_comments(
                     note_id=note_id,
                     crawl_interval=random.randint(1,10),  # 微博对API的限流比较严重,所以延时提高一些
+                    callback=weibo_store.batch_update_weibo_note_comments
                 )
-
-                # Filter comments by keyword
-                if keywords:
-                    filtered_comments = [
-                        comment for comment in all_comments if
-                        any(keyword in comment["content"]["message"] for keyword in keywords)
-                    ]
-                else:
-                    filtered_comments = all_comments
-
-                # Limit the number of comments
-                if max_comments > 0:
-                    filtered_comments = filtered_comments[:max_comments]
-
-                # Update weibo note comments
-                await weibo.batch_update_weibo_note_comments(note_id, filtered_comments)
-
             except DataFetchError as ex:
                 utils.logger.error(f"[WeiboCrawler.get_note_comments] get note_id: {note_id} comment error: {ex}")
             except Exception as e:
diff --git a/media_platform/xhs/core.py b/media_platform/xhs/core.py
index 56dd010..83366e8 100644
--- a/media_platform/xhs/core.py
+++ b/media_platform/xhs/core.py
@@ -9,8 +9,8 @@ from playwright.async_api import (BrowserContext, BrowserType, Page,

 import config
 from base.base_crawler import AbstractCrawler
-from models import xiaohongshu as xhs_model
 from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
+from store import xhs as xhs_store
 from tools import utils
 from var import crawler_type_var

@@ -112,7 +112,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
                 note_details = await asyncio.gather(*task_list)
                 for note_detail in note_details:
                     if note_detail is not None:
-                        await xhs_model.update_xhs_note(note_detail)
+                        await xhs_store.update_xhs_note(note_detail)
                         note_id_list.append(note_detail.get("note_id"))
                 page += 1
                 utils.logger.info(f"[XiaoHongShuCrawler.search] Note details: {note_details}")
@@ -127,7 +127,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
         note_details = await asyncio.gather(*task_list)
         for note_detail in note_details:
             if note_detail is not None:
-                await xhs_model.update_xhs_note(note_detail)
+                await xhs_store.update_xhs_note(note_detail)
         await self.batch_get_note_comments(config.XHS_SPECIFIED_ID_LIST)

     async def get_note_detail(self, note_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
@@ -174,7 +174,7 @@ class XiaoHongShuCrawler(AbstractCrawler):

             # 更新或保存过滤后的评论
             for comment in filtered_comments:
-                await xhs_model.update_xhs_note_comment(note_id=note_id, comment_item=comment)
+                await xhs_store.update_xhs_note_comment(note_id=note_id, comment_item=comment)

     @staticmethod
     def format_proxy_info(ip_proxy_info: IpInfoModel) -> Tuple[Optional[Dict], Optional[Dict]]:
diff --git a/models/__init__.py b/models/__init__.py
deleted file mode 100644
index d4c05a3..0000000
--- a/models/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from .bilibili import *
-from .douyin import *
-from .kuaishou import *
-from .weibo import *
-from .xiaohongshu import *
diff --git a/models/bilibili.py b/models/bilibili.py
deleted file mode 100644
index 63aff57..0000000
--- a/models/bilibili.py
+++ /dev/null
@@ -1,156 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Author  : relakkes@gmail.com
-# @Time    : 2023/12/3 16:16
-# @Desc    : B 站的模型类
-import csv
-import pathlib
-from typing import Dict, List
-
-from tortoise import fields
-from tortoise.contrib.pydantic import pydantic_model_creator
-from tortoise.models import Model
-
-import config
-from tools import utils
-from var import crawler_type_var
-
-
-class BilibiliBaseModel(Model):
-    id = fields.IntField(pk=True, autoincrement=True, description="自增ID")
-    user_id = fields.CharField(null=True, max_length=64, description="用户ID")
-    nickname = fields.CharField(null=True, max_length=64, description="用户昵称")
-    avatar = fields.CharField(null=True, max_length=255, description="用户头像地址")
-    add_ts = fields.BigIntField(description="记录添加时间戳")
-    last_modify_ts = fields.BigIntField(description="记录最后修改时间戳")
-
-    class Meta:
-        abstract = True
-
-
-class BilibiliVideo(BilibiliBaseModel):
-    video_id = fields.CharField(max_length=64, index=True, description="视频ID")
-    video_type = fields.CharField(max_length=16, description="视频类型")
-    title = fields.CharField(null=True, max_length=500, description="视频标题")
-    desc = fields.TextField(null=True, description="视频描述")
-    create_time = fields.BigIntField(description="视频发布时间戳", index=True)
-    liked_count = fields.CharField(null=True, max_length=16, description="视频点赞数")
-    video_play_count = fields.CharField(null=True, max_length=16, description="视频播放数量")
-    video_danmaku = fields.CharField(null=True, max_length=16, description="视频弹幕数量")
-    video_comment = fields.CharField(null=True, max_length=16, description="视频评论数量")
-    video_url = fields.CharField(null=True, max_length=512, description="视频详情URL")
-    video_cover_url = fields.CharField(null=True, max_length=512, description="视频封面图 URL")
-
-    class Meta:
-        table = "bilibili_video"
-        table_description = "B站视频"
-
-    def __str__(self):
-        return f"{self.video_id} - {self.title}"
-
-
-class BilibiliComment(BilibiliBaseModel):
-    comment_id = fields.CharField(max_length=64, index=True, description="评论ID")
-    video_id = fields.CharField(max_length=64, index=True, description="视频ID")
-    content = fields.TextField(null=True, description="评论内容")
-    create_time = fields.BigIntField(description="评论时间戳")
-    sub_comment_count = fields.CharField(max_length=16, description="评论回复数")
-
-    class Meta:
-        table = "bilibili_video_comment"
-        table_description = "B 站视频评论"
-
-    def __str__(self):
-        return f"{self.comment_id} - {self.content}"
-
-
-async def update_bilibili_video(video_item: Dict):
-    video_item_view: Dict = video_item.get("View")
-    video_user_info: Dict = video_item_view.get("owner")
-    video_item_stat: Dict = video_item_view.get("stat")
-    video_id = str(video_item_view.get("aid"))
-    local_db_item = {
-        "video_id": video_id,
-        "video_type": "video",
-        "title": video_item_view.get("title", "")[:500],
-        "desc": video_item_view.get("desc", "")[:500],
-        "create_time": video_item_view.get("pubdate"),
-        "user_id": str(video_user_info.get("mid")),
-        "nickname": video_user_info.get("name"),
-        "avatar": video_user_info.get("face", ""),
-        "liked_count": str(video_item_stat.get("like", "")),
-        "video_play_count": str(video_item_stat.get("view", "")),
-        "video_danmaku": str(video_item_stat.get("danmaku", "")),
-        "video_comment": str(video_item_stat.get("reply", "")),
-        "last_modify_ts": utils.get_current_timestamp(),
-        "video_url": f"https://www.bilibili.com/video/av{video_id}",
-        "video_cover_url": video_item_view.get("pic", ""),
-    }
-    utils.logger.info(f"[models.bilibili.update_bilibili_video] bilibili video id:{video_id}, title:{local_db_item.get('title')}")
-    if config.IS_SAVED_DATABASED:
-        if not await BilibiliVideo.filter(video_id=video_id).exists():
-            local_db_item["add_ts"] = utils.get_current_timestamp()
-            bilibili_video_pydantic = pydantic_model_creator(BilibiliVideo, name='BilibiliVideoCreate', exclude=('id',))
-            bilibili_data = bilibili_video_pydantic(**local_db_item)
-            bilibili_video_pydantic.model_validate(bilibili_data)
-            await BilibiliVideo.create(**bilibili_data.model_dump())
-        else:
-            bilibili_video_pydantic = pydantic_model_creator(BilibiliVideo, name='BilibiliVideoUpdate',
-                                                             exclude=('id', 'add_ts'))
-            bilibili_data = bilibili_video_pydantic(**local_db_item)
-            bilibili_video_pydantic.model_validate(bilibili_data)
-            await BilibiliVideo.filter(video_id=video_id).update(**bilibili_data.model_dump())
-    else:
-        # Below is a simple way to save it in CSV format.
-        pathlib.Path(f"data/bilibili").mkdir(parents=True, exist_ok=True)
-        save_file_name = f"data/bilibili/{crawler_type_var.get()}_videos_{utils.get_current_date()}.csv"
-        with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
-            writer = csv.writer(f)
-            if f.tell() == 0:
-                writer.writerow(local_db_item.keys())
-            writer.writerow(local_db_item.values())
-
-
-async def batch_update_bilibili_video_comments(video_id: str, comments: List[Dict]):
-    if not comments:
-        return
-    for comment_item in comments:
-        await update_bilibili_video_comment(video_id, comment_item)
-
-async def update_bilibili_video_comment(video_id: str, comment_item: Dict):
-    comment_id = str(comment_item.get("rpid"))
-    content: Dict = comment_item.get("content")
-    user_info: Dict = comment_item.get("member")
-    local_db_item = {
-        "comment_id": comment_id,
-        "create_time": comment_item.get("ctime"),
-        "video_id": str(video_id),
-        "content": content.get("message"),
-        "user_id": user_info.get("mid"),
-        "nickname": user_info.get("uname"),
-        "avatar": user_info.get("avatar"),
-        "sub_comment_count": str(comment_item.get("rcount", 0)),
-        "last_modify_ts": utils.get_current_timestamp(),
-    }
-    utils.logger.info(f"[models.bilibili.update_bilibili_video_comment] Bilibili video comment: {comment_id}, content: {local_db_item.get('content')}")
-    if config.IS_SAVED_DATABASED:
-        if not await BilibiliComment.filter(comment_id=comment_id).exists():
-            local_db_item["add_ts"] = utils.get_current_timestamp()
-            comment_pydantic = pydantic_model_creator(BilibiliComment, name='BilibiliVideoCommentCreate',
-                                                      exclude=('id',))
-            comment_data = comment_pydantic(**local_db_item)
-            comment_pydantic.validate(comment_data)
-            await BilibiliComment.create(**comment_data.dict())
-        else:
-            comment_pydantic = pydantic_model_creator(BilibiliComment, name='BilibiliVideoCommentUpdate',
-                                                      exclude=('id', 'add_ts'))
-            comment_data = comment_pydantic(**local_db_item)
-            comment_pydantic.validate(comment_data)
-            await BilibiliComment.filter(comment_id=comment_id).update(**comment_data.dict())
-    else:
-        pathlib.Path(f"data/bilibili").mkdir(parents=True, exist_ok=True)
-        save_file_name = f"data/bilibili/{crawler_type_var.get()}_comments_{utils.get_current_date()}.csv"
-        with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
-            writer = csv.writer(f)
-            if f.tell() == 0:
-                writer.writerow(local_db_item.keys())
-            writer.writerow(local_db_item.values())
diff --git a/models/douyin.py b/models/douyin.py
deleted file mode 100644
index a65d7a1..0000000
--- a/models/douyin.py
+++ /dev/null
@@ -1,171 +0,0 @@
-import csv
-import pathlib
-from typing import Dict, List
-
-from tortoise import fields
-from tortoise.contrib.pydantic import pydantic_model_creator
-from tortoise.models import Model
-
-import config
-from tools import utils
-from var import crawler_type_var
-
-
-class DouyinBaseModel(Model):
-    id = fields.IntField(pk=True, autoincrement=True, description="自增ID")
-    user_id = fields.CharField(null=True, max_length=64, description="用户ID")
-    sec_uid = fields.CharField(null=True, max_length=128, description="用户sec_uid")
-    short_user_id = fields.CharField(null=True, max_length=64, description="用户短ID")
-    user_unique_id = fields.CharField(null=True, max_length=64, description="用户唯一ID")
-    nickname = fields.CharField(null=True, max_length=64, description="用户昵称")
-    avatar = fields.CharField(null=True, max_length=255, description="用户头像地址")
-    user_signature = fields.CharField(null=True, max_length=500, description="用户签名")
-    ip_location = fields.CharField(null=True, max_length=255, description="评论时的IP地址")
-    add_ts = fields.BigIntField(description="记录添加时间戳")
-    last_modify_ts = fields.BigIntField(description="记录最后修改时间戳")
-
-    class Meta:
-        abstract = True
-
-
-class DouyinAweme(DouyinBaseModel):
-    aweme_id = fields.CharField(max_length=64, index=True, description="视频ID")
-    aweme_type = fields.CharField(max_length=16, description="视频类型")
-    title = fields.CharField(null=True, max_length=500, description="视频标题")
-    desc = fields.TextField(null=True, description="视频描述")
-    create_time = fields.BigIntField(description="视频发布时间戳", index=True)
-    liked_count = fields.CharField(null=True, max_length=16, description="视频点赞数")
-    comment_count = fields.CharField(null=True, max_length=16, description="视频评论数")
-    share_count = fields.CharField(null=True, max_length=16, description="视频分享数")
-    collected_count = fields.CharField(null=True, max_length=16, description="视频收藏数")
-    aweme_url = fields.CharField(null=True, max_length=255, description="视频详情页URL")
-
-    class Meta:
-        table = "douyin_aweme"
-        table_description = "抖音视频"
-
-    def __str__(self):
-        return f"{self.aweme_id} - {self.title}"
-
-
-class DouyinAwemeComment(DouyinBaseModel):
-    comment_id = fields.CharField(max_length=64, index=True, description="评论ID")
-    aweme_id = fields.CharField(max_length=64, index=True, description="视频ID")
-    content = fields.TextField(null=True, description="评论内容")
-    create_time = fields.BigIntField(description="评论时间戳")
-    sub_comment_count = fields.CharField(max_length=16, description="评论回复数")
-
-    class Meta:
-        table = "douyin_aweme_comment"
-        table_description = "抖音视频评论"
-
-    def __str__(self):
-        return f"{self.comment_id} - {self.content}"
-
-
-async def update_douyin_aweme(aweme_item: Dict):
-    aweme_id = aweme_item.get("aweme_id")
-    user_info = aweme_item.get("author", {})
-    interact_info = aweme_item.get("statistics", {})
-    local_db_item = {
-        "aweme_id": aweme_id,
-        "aweme_type": str(aweme_item.get("aweme_type")),
-        "title": aweme_item.get("desc", ""),
-        "desc": aweme_item.get("desc", ""),
-        "create_time": aweme_item.get("create_time"),
-        "user_id": user_info.get("uid"),
-        "sec_uid": user_info.get("sec_uid"),
-        "short_user_id": user_info.get("short_id"),
-        "user_unique_id": user_info.get("unique_id"),
-        "user_signature": user_info.get("signature"),
-        "nickname": user_info.get("nickname"),
-        "avatar": user_info.get("avatar_thumb", {}).get("url_list", [""])[0],
-        "liked_count": str(interact_info.get("digg_count")),
-        "collected_count": str(interact_info.get("collect_count")),
-        "comment_count": str(interact_info.get("comment_count")),
-        "share_count": str(interact_info.get("share_count")),
-        "ip_location": aweme_item.get("ip_label", ""),
-        "last_modify_ts": utils.get_current_timestamp(),
-        "aweme_url": f"https://www.douyin.com/video/{aweme_id}"
-    }
-    utils.logger.info(f"[models.douyin.update_douyin_aweme] douyin aweme id:{aweme_id}, title:{local_db_item.get('title')}")
-    if config.IS_SAVED_DATABASED:
-        if not await DouyinAweme.filter(aweme_id=aweme_id).exists():
-            local_db_item["add_ts"] = utils.get_current_timestamp()
-            douyin_aweme_pydantic = pydantic_model_creator(DouyinAweme, name='DouyinAwemeCreate', exclude=('id',))
-            douyin_data = douyin_aweme_pydantic(**local_db_item)
-            douyin_aweme_pydantic.validate(douyin_data)
-            await DouyinAweme.create(**douyin_data.dict())
-        else:
-            douyin_aweme_pydantic = pydantic_model_creator(DouyinAweme, name='DouyinAwemeUpdate',
-                                                           exclude=('id', 'add_ts'))
-            douyin_data = douyin_aweme_pydantic(**local_db_item)
-            douyin_aweme_pydantic.validate(douyin_data)
-            await DouyinAweme.filter(aweme_id=aweme_id).update(**douyin_data.dict())
-    else:
-        # Below is a simple way to save it in CSV format.
-        pathlib.Path(f"data/dy").mkdir(parents=True, exist_ok=True)
-        save_file_name = f"data/dy/{crawler_type_var.get()}_awemes_{utils.get_current_date()}.csv"
-        with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
-            writer = csv.writer(f)
-            if f.tell() == 0:
-                writer.writerow(local_db_item.keys())
-            writer.writerow(local_db_item.values())
-
-
-async def batch_update_dy_aweme_comments(aweme_id: str, comments: List[Dict]):
-    if not comments:
-        return
-    for comment_item in comments:
-        await update_dy_aweme_comment(aweme_id, comment_item)
-
-
-async def update_dy_aweme_comment(aweme_id: str, comment_item: Dict):
-    comment_aweme_id = comment_item.get("aweme_id")
-    if aweme_id != comment_aweme_id:
-        utils.logger.error(f"[models.douyin.update_dy_aweme_comment] comment_aweme_id: {comment_aweme_id} != aweme_id: {aweme_id}")
-        return
-    user_info = comment_item.get("user", {})
-    comment_id = comment_item.get("cid")
-    avatar_info = user_info.get("avatar_medium", {}) or user_info.get("avatar_300x300", {}) or user_info.get(
-        "avatar_168x168", {}) or user_info.get("avatar_thumb", {}) or {}
-    local_db_item = {
-        "comment_id": comment_id,
-        "create_time": comment_item.get("create_time"),
-        "ip_location": comment_item.get("ip_label", ""),
-        "aweme_id": aweme_id,
-        "content": comment_item.get("text"),
-        "user_id": user_info.get("uid"),
-        "sec_uid": user_info.get("sec_uid"),
-        "short_user_id": user_info.get("short_id"),
-        "user_unique_id": user_info.get("unique_id"),
-        "user_signature": user_info.get("signature"),
-        "nickname": user_info.get("nickname"),
-        "avatar": avatar_info.get("url_list", [""])[0],
-        "sub_comment_count": str(comment_item.get("reply_comment_total", 0)),
-        "last_modify_ts": utils.get_current_timestamp(),
-    }
-    utils.logger.info(f"[models.douyin.update_dy_aweme_comment] douyin aweme comment: {comment_id}, content: {local_db_item.get('content')}")
-    if config.IS_SAVED_DATABASED:
-        if not await DouyinAwemeComment.filter(comment_id=comment_id).exists():
-            local_db_item["add_ts"] = utils.get_current_timestamp()
-            comment_pydantic = pydantic_model_creator(DouyinAwemeComment, name='DouyinAwemeCommentCreate',
-                                                      exclude=('id',))
-            comment_data = comment_pydantic(**local_db_item)
-            comment_pydantic.validate(comment_data)
-            await DouyinAwemeComment.create(**comment_data.dict())
-        else:
-            comment_pydantic = pydantic_model_creator(DouyinAwemeComment, name='DouyinAwemeCommentUpdate',
-                                                      exclude=('id', 'add_ts'))
-            comment_data = comment_pydantic(**local_db_item)
-            comment_pydantic.validate(comment_data)
-            await DouyinAwemeComment.filter(comment_id=comment_id).update(**comment_data.dict())
-    else:
-
-        pathlib.Path(f"data/dy").mkdir(parents=True, exist_ok=True)
-        save_file_name = f"data/dy/{crawler_type_var.get()}_comments_{utils.get_current_date()}.csv"
-        with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
-            writer = csv.writer(f)
-            if f.tell() == 0:
-                writer.writerow(local_db_item.keys())
-            writer.writerow(local_db_item.values())
diff --git a/models/kuaishou.py b/models/kuaishou.py
deleted file mode 100644
index 823d670..0000000
--- a/models/kuaishou.py
+++ /dev/null
@@ -1,151 +0,0 @@
-import csv
-import pathlib
-from typing import Dict, List
-
-from tortoise import fields
-from tortoise.contrib.pydantic import pydantic_model_creator
-from tortoise.models import Model
-
-import config
-from tools import utils
-from var import crawler_type_var
-
-
-class KuaishouBaseModel(Model):
-    id = fields.IntField(pk=True, autoincrement=True, description="自增ID")
-    user_id = fields.CharField(null=True, max_length=64, description="用户ID")
-    nickname = fields.CharField(null=True, max_length=64, description="用户昵称")
-    avatar = fields.CharField(null=True, max_length=255, description="用户头像地址")
-    add_ts = fields.BigIntField(description="记录添加时间戳")
-    last_modify_ts = fields.BigIntField(description="记录最后修改时间戳")
-
-    class Meta:
-        abstract = True
-
-
-class KuaishouVideo(KuaishouBaseModel):
-    video_id = fields.CharField(max_length=64, index=True, description="视频ID")
-    video_type = fields.CharField(max_length=16, description="视频类型")
-    title = fields.CharField(null=True, max_length=500, description="视频标题")
-    desc = fields.TextField(null=True, description="视频描述")
-    create_time = fields.BigIntField(description="视频发布时间戳", index=True)
-    liked_count = fields.CharField(null=True, max_length=16, description="视频点赞数")
-    viewd_count = fields.CharField(null=True, max_length=16, description="视频浏览数量")
-    video_url = fields.CharField(null=True, max_length=512, description="视频详情URL")
-    video_cover_url = fields.CharField(null=True, max_length=512, description="视频封面图 URL")
-    video_play_url = fields.CharField(null=True, max_length=512, description="视频播放 URL")
-
-    class Meta:
-        table = "kuaishou_video"
-        table_description = "快手视频"
-
-    def __str__(self):
-        return f"{self.video_id} - {self.title}"
-
-
-class KuaishouVideoComment(KuaishouBaseModel):
-    comment_id = fields.CharField(max_length=64, index=True, description="评论ID")
-    video_id = fields.CharField(max_length=64, index=True, description="视频ID")
-    content = fields.TextField(null=True, description="评论内容")
-    create_time = fields.BigIntField(description="评论时间戳")
-    sub_comment_count = fields.CharField(max_length=16, description="评论回复数")
-
-    class Meta:
-        table = "kuaishou_video_comment"
-        table_description = "快手视频评论"
-
-    def __str__(self):
-        return f"{self.comment_id} - {self.content}"
-
-
-async def update_kuaishou_video(video_item: Dict):
-    photo_info: Dict = video_item.get("photo", {})
-    video_id = photo_info.get("id")
-    if not video_id:
-        return
-    user_info = video_item.get("author", {})
-    local_db_item = {
-        "video_id": video_id,
-        "video_type": str(video_item.get("type")),
-        "title": photo_info.get("caption", "")[:500],
-        "desc": photo_info.get("caption", "")[:500],
-        "create_time": photo_info.get("timestamp"),
-        "user_id": user_info.get("id"),
-        "nickname": user_info.get("name"),
-        "avatar": user_info.get("headerUrl", ""),
-        "liked_count": str(photo_info.get("realLikeCount")),
-        "viewd_count": str(photo_info.get("viewCount")),
-        "last_modify_ts": utils.get_current_timestamp(),
-        "video_url": f"https://www.kuaishou.com/short-video/{video_id}",
-        "video_cover_url": photo_info.get("coverUrl", ""),
-        "video_play_url": photo_info.get("photoUrl", ""),
-    }
-    utils.logger.info(f"[models.kuaishou.update_kuaishou_video] Kuaishou video id:{video_id}, title:{local_db_item.get('title')}")
-    if config.IS_SAVED_DATABASED:
-        if not await KuaishouVideo.filter(video_id=video_id).exists():
-            local_db_item["add_ts"] = utils.get_current_timestamp()
-            kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoCreate', exclude=('id',))
-            kuaishou_data = kuaishou_video_pydantic(**local_db_item)
-            kuaishou_video_pydantic.model_validate(kuaishou_data)
-            await KuaishouVideo.create(**kuaishou_data.model_dump())
-        else:
-            kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoUpdate',
-                                                             exclude=('id', 'add_ts'))
-            kuaishou_data = kuaishou_video_pydantic(**local_db_item)
-            kuaishou_video_pydantic.model_validate(kuaishou_data)
-            await KuaishouVideo.filter(video_id=video_id).update(**kuaishou_data.model_dump())
-    else:
-        # Below is a simple way to save it in CSV format.
-        pathlib.Path(f"data/kuaishou").mkdir(parents=True, exist_ok=True)
-        save_file_name = f"data/kuaishou/{crawler_type_var.get()}_videos_{utils.get_current_date()}.csv"
-        with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
-            writer = csv.writer(f)
-            if f.tell() == 0:
-                writer.writerow(local_db_item.keys())
-            writer.writerow(local_db_item.values())
-
-
-async def batch_update_ks_video_comments(video_id: str, comments: List[Dict]):
-    utils.logger.info(f"[KuaishouVideoComment.batch_update_ks_video_comments] video_id:{video_id}, comments:{comments}")
-    if not comments:
-        return
-    for comment_item in comments:
-        await update_ks_video_comment(video_id, comment_item)
-
-
-async def update_ks_video_comment(video_id: str, comment_item: Dict):
-    comment_id = comment_item.get("commentId")
-    local_db_item = {
-        "comment_id": comment_id,
-        "create_time": comment_item.get("timestamp"),
-        "video_id": video_id,
-        "content": comment_item.get("content"),
-        "user_id": comment_item.get("authorId"),
-        "nickname": comment_item.get("authorName"),
-        "avatar": comment_item.get("headurl"),
-        "sub_comment_count": str(comment_item.get("subCommentCount", 0)),
-        "last_modify_ts": utils.get_current_timestamp(),
-    }
-    utils.logger.info(f"[models.kuaishou.update_ks_video_comment] Kuaishou video comment: {comment_id}, content: {local_db_item.get('content')}")
-    if config.IS_SAVED_DATABASED:
-        if not await KuaishouVideoComment.filter(comment_id=comment_id).exists():
-            local_db_item["add_ts"] = utils.get_current_timestamp()
-            comment_pydantic = pydantic_model_creator(KuaishouVideoComment, name='KuaishouVideoCommentCreate',
-                                                      exclude=('id',))
-            comment_data = comment_pydantic(**local_db_item)
-            comment_pydantic.validate(comment_data)
-            await KuaishouVideoComment.create(**comment_data.dict())
-        else:
-            comment_pydantic = pydantic_model_creator(KuaishouVideoComment, name='KuaishouVideoCommentUpdate',
-                                                      exclude=('id', 'add_ts'))
-            comment_data = comment_pydantic(**local_db_item)
-            comment_pydantic.validate(comment_data)
-            await KuaishouVideoComment.filter(comment_id=comment_id).update(**comment_data.dict())
-    else:
-        pathlib.Path(f"data/kuaishou").mkdir(parents=True, exist_ok=True)
-        save_file_name = f"data/kuaishou/{crawler_type_var.get()}_comments_{utils.get_current_date()}.csv"
-        with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
-            writer = csv.writer(f)
-            if f.tell() == 0:
-                writer.writerow(local_db_item.keys())
-            writer.writerow(local_db_item.values())
diff --git a/models/weibo.py b/models/weibo.py
deleted file mode 100644
index 6b903a7..0000000
--- a/models/weibo.py
+++ /dev/null
@@ -1,170 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Author  : relakkes@gmail.com
-# @Time    : 2023/12/23 21:53
-# @Desc    : 微博的模型类
-
-import csv
-import pathlib
-from typing import Dict, List
-
-from tortoise import fields
-from tortoise.contrib.pydantic import pydantic_model_creator
-from tortoise.models import Model
-
-import config
-from tools import utils
-from var import crawler_type_var
-
-
-class WeiboBaseModel(Model):
-    id = fields.IntField(pk=True, autoincrement=True, description="自增ID")
-    user_id = fields.CharField(null=True, max_length=64, description="用户ID")
description="用户ID") - nickname = fields.CharField(null=True, max_length=64, description="用户昵称") - avatar = fields.CharField(null=True, max_length=255, description="用户头像地址") - gender = fields.CharField(null=True, max_length=12, description="用户性别") - profile_url = fields.CharField(null=True, max_length=255, description="用户主页地址") - ip_location = fields.CharField(null=True, max_length=32, default="发布微博的地理信息") - add_ts = fields.BigIntField(description="记录添加时间戳") - last_modify_ts = fields.BigIntField(description="记录最后修改时间戳") - - class Meta: - abstract = True - - -class WeiboNote(WeiboBaseModel): - note_id = fields.CharField(max_length=64, index=True, description="帖子ID") - content = fields.TextField(null=True, description="帖子正文内容") - create_time = fields.BigIntField(description="帖子发布时间戳", index=True) - create_date_time = fields.CharField(description="帖子发布日期时间",max_length=32, index=True) - liked_count = fields.CharField(null=True, max_length=16, description="帖子点赞数") - comments_count = fields.CharField(null=True, max_length=16, description="帖子评论数量") - shared_count = fields.CharField(null=True, max_length=16, description="帖子转发数量") - note_url = fields.CharField(null=True, max_length=512, description="帖子详情URL") - - class Meta: - table = "weibo_note" - table_description = "微博帖子" - - def __str__(self): - return f"{self.note_id}" - - -class WeiboComment(WeiboBaseModel): - comment_id = fields.CharField(max_length=64, index=True, description="评论ID") - note_id = fields.CharField(max_length=64, index=True, description="帖子ID") - content = fields.TextField(null=True, description="评论内容") - create_time = fields.BigIntField(description="评论时间戳") - create_date_time = fields.CharField(description="评论日期时间", max_length=32, index=True) - comment_like_count = fields.CharField(max_length=16, description="评论点赞数量") - sub_comment_count = fields.CharField(max_length=16, description="评论回复数") - - class Meta: - table = "weibo_note_comment" - table_description = "微博帖子评论" - - def __str__(self): - return f"{self.comment_id}" - - -async def update_weibo_note(note_item: Dict): - mblog: Dict = note_item.get("mblog") - user_info: Dict = mblog.get("user") - note_id = mblog.get("id") - local_db_item = { - # 微博信息 - "note_id": note_id, - "content": mblog.get("text"), - "create_time": utils.rfc2822_to_timestamp(mblog.get("created_at")), - "create_date_time": str(utils.rfc2822_to_china_datetime(mblog.get("created_at"))), - "liked_count": str(mblog.get("attitudes_count", 0)), - "comments_count": str(mblog.get("comments_count", 0)), - "shared_count": str(mblog.get("reposts_count", 0)), - "last_modify_ts": utils.get_current_timestamp(), - "note_url": f"https://m.weibo.cn/detail/{note_id}", - "ip_location": mblog.get("region_name", "").replace("发布于 ", ""), - - # 用户信息 - "user_id": str(user_info.get("id")), - "nickname": user_info.get("screen_name", ""), - "gender": user_info.get("gender", ""), - "profile_url": user_info.get("profile_url", ""), - "avatar": user_info.get("profile_image_url", ""), - } - utils.logger.info( - f"[models.weibo.update_weibo_note] weibo note id:{note_id}, title:{local_db_item.get('content')[:24]} ...") - if config.IS_SAVED_DATABASED: - if not await WeiboNote.filter(note_id=note_id).exists(): - local_db_item["add_ts"] = utils.get_current_timestamp() - weibo_note_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteCreate', exclude=('id',)) - weibo_data = weibo_note_pydantic(**local_db_item) - weibo_note_pydantic.model_validate(weibo_data) - await WeiboNote.create(**weibo_data.model_dump()) - else: - 
-            weibo_note_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteUpdate',
-                                                         exclude=('id', 'add_ts'))
-            weibo_data = weibo_note_pydantic(**local_db_item)
-            weibo_note_pydantic.model_validate(weibo_data)
-            await WeiboNote.filter(note_id=note_id).update(**weibo_data.model_dump())
-    else:
-        # Below is a simple way to save it in CSV format.
-        pathlib.Path(f"data/weibo").mkdir(parents=True, exist_ok=True)
-        save_file_name = f"data/weibo/{crawler_type_var.get()}_notes_{utils.get_current_date()}.csv"
-        with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
-            writer = csv.writer(f)
-            if f.tell() == 0:
-                writer.writerow(local_db_item.keys())
-            writer.writerow(local_db_item.values())
-
-
-async def batch_update_weibo_note_comments(note_id: str, comments: List[Dict]):
-    if not comments:
-        return
-    for comment_item in comments:
-        await update_weibo_note_comment(note_id, comment_item)
-
-
-async def update_weibo_note_comment(note_id: str, comment_item: Dict):
-    comment_id = str(comment_item.get("id"))
-    user_info: Dict = comment_item.get("user")
-    local_db_item = {
-        "comment_id": comment_id,
-        "create_time": utils.rfc2822_to_timestamp(comment_item.get("created_at")),
-        "create_date_time": str(utils.rfc2822_to_china_datetime(comment_item.get("created_at"))),
-        "note_id": note_id,
-        "content": comment_item.get("text"),
-        "sub_comment_count": str(comment_item.get("total_number", 0)),
-        "comment_like_count": str(comment_item.get("like_count", 0)),
-        "last_modify_ts": utils.get_current_timestamp(),
-        "ip_location": comment_item.get("source", "").replace("来自", ""),
-
-        # 用户信息
-        "user_id": str(user_info.get("id")),
-        "nickname": user_info.get("screen_name", ""),
-        "gender": user_info.get("gender", ""),
-        "profile_url": user_info.get("profile_url", ""),
-        "avatar": user_info.get("profile_image_url", ""),
-    }
-    utils.logger.info(
-        f"[models.weibo.update_weibo_note_comment] Weibo note comment: {comment_id}, content: {local_db_item.get('content','')[:24]} ...")
-    if config.IS_SAVED_DATABASED:
-        if not await WeiboComment.filter(comment_id=comment_id).exists():
-            local_db_item["add_ts"] = utils.get_current_timestamp()
-            comment_pydantic = pydantic_model_creator(WeiboComment, name='WeiboNoteCommentCreate',
-                                                      exclude=('id',))
-            comment_data = comment_pydantic(**local_db_item)
-            comment_pydantic.validate(comment_data)
-            await WeiboComment.create(**comment_data.dict())
-        else:
-            comment_pydantic = pydantic_model_creator(WeiboComment, name='WeiboNoteCommentUpdate',
-                                                      exclude=('id', 'add_ts'))
-            comment_data = comment_pydantic(**local_db_item)
-            comment_pydantic.validate(comment_data)
-            await WeiboComment.filter(comment_id=comment_id).update(**comment_data.dict())
-    else:
-        pathlib.Path(f"data/weibo").mkdir(parents=True, exist_ok=True)
-        save_file_name = f"data/weibo/{crawler_type_var.get()}_comments_{utils.get_current_date()}.csv"
-        with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
-            writer = csv.writer(f)
-            if f.tell() == 0:
-                writer.writerow(local_db_item.keys())
-            writer.writerow(local_db_item.values())
diff --git a/models/xiaohongshu.py b/models/xiaohongshu.py
deleted file mode 100644
index 4098cad..0000000
--- a/models/xiaohongshu.py
+++ /dev/null
@@ -1,150 +0,0 @@
-import csv
-import pathlib
-from typing import Dict, List
-
-from tortoise import fields
-from tortoise.contrib.pydantic import pydantic_model_creator
-from tortoise.models import Model
-
-import config
-from tools import utils
-from var import crawler_type_var
-
-
-class XhsBaseModel(Model):
-    id = fields.IntField(pk=True, autoincrement=True, description="自增ID")
-    user_id = fields.CharField(max_length=64, description="用户ID")
-    nickname = fields.CharField(null=True, max_length=64, description="用户昵称")
-    avatar = fields.CharField(null=True, max_length=255, description="用户头像地址")
-    ip_location = fields.CharField(null=True, max_length=255, description="评论时的IP地址")
-    add_ts = fields.BigIntField(description="记录添加时间戳")
-    last_modify_ts = fields.BigIntField(description="记录最后修改时间戳")
-
-    class Meta:
-        abstract = True
-
-
-class XHSNote(XhsBaseModel):
-    note_id = fields.CharField(max_length=64, index=True, description="笔记ID")
-    type = fields.CharField(null=True, max_length=16, description="笔记类型(normal | video)")
-    title = fields.CharField(null=True, max_length=255, description="笔记标题")
-    desc = fields.TextField(null=True, description="笔记描述")
-    time = fields.BigIntField(description="笔记发布时间戳", index=True)
-    last_update_time = fields.BigIntField(description="笔记最后更新时间戳")
-    liked_count = fields.CharField(null=True, max_length=16, description="笔记点赞数")
-    collected_count = fields.CharField(null=True, max_length=16, description="笔记收藏数")
-    comment_count = fields.CharField(null=True, max_length=16, description="笔记评论数")
-    share_count = fields.CharField(null=True, max_length=16, description="笔记分享数")
-    image_list = fields.TextField(null=True, description="笔记封面图片列表")
-    note_url = fields.CharField(null=True, max_length=255, description="笔记详情页的URL")
-
-    class Meta:
-        table = "xhs_note"
-        table_description = "小红书笔记"
-
-    def __str__(self):
-        return f"{self.note_id} - {self.title}"
-
-
-class XHSNoteComment(XhsBaseModel):
-    comment_id = fields.CharField(max_length=64, index=True, description="评论ID")
-    create_time = fields.BigIntField(index=True, description="评论时间戳")
-    note_id = fields.CharField(max_length=64, description="笔记ID")
-    content = fields.TextField(description="评论内容")
-    sub_comment_count = fields.IntField(description="子评论数量")
-
-    class Meta:
-        table = "xhs_note_comment"
-        table_description = "小红书笔记评论"
-
-    def __str__(self):
-        return f"{self.comment_id} - {self.content}"
-
-
-async def update_xhs_note(note_item: Dict):
-    note_id = note_item.get("note_id")
-    user_info = note_item.get("user", {})
-    interact_info = note_item.get("interact_info", {})
-    image_list: List[Dict] = note_item.get("image_list", [])
-
-    local_db_item = {
-        "note_id": note_item.get("note_id"),
-        "type": note_item.get("type"),
-        "title": note_item.get("title") or note_item.get("desc", "")[:255],
-        "desc": note_item.get("desc", ""),
-        "time": note_item.get("time"),
-        "last_update_time": note_item.get("last_update_time", 0),
-        "user_id": user_info.get("user_id"),
-        "nickname": user_info.get("nickname"),
-        "avatar": user_info.get("avatar"),
-        "liked_count": interact_info.get("liked_count"),
-        "collected_count": interact_info.get("collected_count"),
-        "comment_count": interact_info.get("comment_count"),
-        "share_count": interact_info.get("share_count"),
-        "ip_location": note_item.get("ip_location", ""),
-        "image_list": ','.join([img.get('url', '') for img in image_list]),
-        "last_modify_ts": utils.get_current_timestamp(),
-        "note_url": f"https://www.xiaohongshu.com/explore/{note_id}"
-    }
-    utils.logger.info(f"[models.xiaohongshu.update_xhs_note] xhs note: {local_db_item}")
-    if config.IS_SAVED_DATABASED:
-        if not await XHSNote.filter(note_id=note_id).first():
-            local_db_item["add_ts"] = utils.get_current_timestamp()
-            note_pydantic = pydantic_model_creator(XHSNote, name="XHSPydanticCreate", exclude=('id',))
-            note_data = note_pydantic(**local_db_item)
-            note_pydantic.validate(note_data)
-            await XHSNote.create(**note_data.dict())
-        else:
-            note_pydantic = pydantic_model_creator(XHSNote, name="XHSPydanticUpdate", exclude=('id', 'add_ts'))
-            note_data = note_pydantic(**local_db_item)
-            note_pydantic.validate(note_data)
-            await XHSNote.filter(note_id=note_id).update(**note_data.dict())
-    else:
-        # Below is a simple way to save it in CSV format.
-        pathlib.Path(f"data/xhs").mkdir(parents=True, exist_ok=True)
-        save_file_name = f"data/xhs/{crawler_type_var.get()}_notes_{utils.get_current_date()}.csv"
-        with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
-            writer = csv.writer(f)
-            if f.tell() == 0:
-                writer.writerow(local_db_item.keys())
-            writer.writerow(local_db_item.values())
-
-
-async def update_xhs_note_comment(note_id: str, comment_item: Dict):
-    user_info = comment_item.get("user_info", {})
-    comment_id = comment_item.get("id")
-    local_db_item = {
-        "comment_id": comment_id,
-        "create_time": comment_item.get("create_time"),
-        "ip_location": comment_item.get("ip_location"),
-        "note_id": note_id,
-        "content": comment_item.get("content"),
-        "user_id": user_info.get("user_id"),
-        "nickname": user_info.get("nickname"),
-        "avatar": user_info.get("image"),
-        "sub_comment_count": comment_item.get("sub_comment_count"),
-        "last_modify_ts": utils.get_current_timestamp(),
-    }
-    utils.logger.info(f"[models.xiaohongshu.update_xhs_note_comment] xhs note comment:{local_db_item}")
-    if config.IS_SAVED_DATABASED:
-        if not await XHSNoteComment.filter(comment_id=comment_id).first():
-            local_db_item["add_ts"] = utils.get_current_timestamp()
-            comment_pydantic = pydantic_model_creator(XHSNoteComment, name="CommentPydanticCreate", exclude=('id',))
-            comment_data = comment_pydantic(**local_db_item)
-            comment_pydantic.validate(comment_data)
-            await XHSNoteComment.create(**comment_data.dict())
-        else:
-            comment_pydantic = pydantic_model_creator(XHSNoteComment, name="CommentPydanticUpdate",
-                                                      exclude=('id', 'add_ts',))
-            comment_data = comment_pydantic(**local_db_item)
-            comment_pydantic.validate(comment_data)
-            await XHSNoteComment.filter(comment_id=comment_id).update(**comment_data.dict())
-    else:
-        # Below is a simple way to save it in CSV format.
- pathlib.Path(f"data/xhs").mkdir(parents=True, exist_ok=True) - save_file_name = f"data/xhs/{crawler_type_var.get()}_comment_{utils.get_current_date()}.csv" - with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f: - writer = csv.writer(f) - if f.tell() == 0: - writer.writerow(local_db_item.keys()) - writer.writerow(local_db_item.values()) diff --git a/requirements.txt b/requirements.txt index 46712e0..f53f350 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,4 @@ aerich==0.7.2 numpy~=1.24.4 redis~=4.6.0 pydantic==2.5.2 +aiofiles~=23.2.1 \ No newline at end of file diff --git a/store/__init__.py b/store/__init__.py new file mode 100644 index 0000000..2375b3d --- /dev/null +++ b/store/__init__.py @@ -0,0 +1,4 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/1/14 17:29 +# @Desc : diff --git a/store/bilibili/__init__.py b/store/bilibili/__init__.py new file mode 100644 index 0000000..a600ecb --- /dev/null +++ b/store/bilibili/__init__.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/1/14 19:34 +# @Desc : + +from typing import List + +import config + +from .bilibili_store_db_types import * +from .bilibili_store_impl import * + + +class BiliStoreFactory: + STORES = { + "csv": BiliCsvStoreImplement, + "db": BiliDbStoreImplement, + "json": BiliJsonStoreImplement + } + + @staticmethod + def create_store() -> AbstractStore: + store_class = BiliStoreFactory.STORES.get(config.SAVE_DATA_OPTION) + if not store_class: + raise ValueError( + "[BiliStoreFactory.create_store] Invalid save option only supported csv or db or json ...") + return store_class() + + +async def update_bilibili_video(video_item: Dict): + video_item_view: Dict = video_item.get("View") + video_user_info: Dict = video_item_view.get("owner") + video_item_stat: Dict = video_item_view.get("stat") + video_id = str(video_item_view.get("aid")) + save_content_item = { + "video_id": video_id, + "video_type": "video", + "title": video_item_view.get("title", "")[:500], + "desc": video_item_view.get("desc", "")[:500], + "create_time": video_item_view.get("pubdate"), + "user_id": str(video_user_info.get("mid")), + "nickname": video_user_info.get("name"), + "avatar": video_user_info.get("face", ""), + "liked_count": str(video_item_stat.get("like", "")), + "video_play_count": str(video_item_stat.get("view", "")), + "video_danmaku": str(video_item_stat.get("danmaku", "")), + "video_comment": str(video_item_stat.get("reply", "")), + "last_modify_ts": utils.get_current_timestamp(), + "video_url": f"https://www.bilibili.com/video/av{video_id}", + "video_cover_url": video_item_view.get("pic", ""), + } + utils.logger.info( + f"[store.bilibili.update_bilibili_video] bilibili video id:{video_id}, title:{save_content_item.get('title')}") + await BiliStoreFactory.create_store().store_content(content_item=save_content_item) + + +async def batch_update_bilibili_video_comments(video_id: str, comments: List[Dict]): + if not comments: + return + for comment_item in comments: + await update_bilibili_video_comment(video_id, comment_item) + + +async def update_bilibili_video_comment(video_id: str, comment_item: Dict): + comment_id = str(comment_item.get("rpid")) + content: Dict = comment_item.get("content") + user_info: Dict = comment_item.get("member") + save_comment_item = { + "comment_id": comment_id, + "create_time": comment_item.get("ctime"), + "video_id": str(video_id), + "content": content.get("message"), + "user_id": user_info.get("mid"), + 
"nickname": user_info.get("uname"), + "avatar": user_info.get("avatar"), + "sub_comment_count": str(comment_item.get("rcount", 0)), + "last_modify_ts": utils.get_current_timestamp(), + } + utils.logger.info( + f"[store.bilibili.update_bilibili_video_comment] Bilibili video comment: {comment_id}, content: {save_comment_item.get('content')}") + await BiliStoreFactory.create_store().store_comment(comment_item=save_comment_item) diff --git a/store/bilibili/bilibili_store_db_types.py b/store/bilibili/bilibili_store_db_types.py new file mode 100644 index 0000000..f3bc117 --- /dev/null +++ b/store/bilibili/bilibili_store_db_types.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/1/14 19:34 +# @Desc : B站存储到DB的模型类集合 + +from tortoise import fields +from tortoise.models import Model + + +class BilibiliBaseModel(Model): + id = fields.IntField(pk=True, autoincrement=True, description="自增ID") + user_id = fields.CharField(null=True, max_length=64, description="用户ID") + nickname = fields.CharField(null=True, max_length=64, description="用户昵称") + avatar = fields.CharField(null=True, max_length=255, description="用户头像地址") + add_ts = fields.BigIntField(description="记录添加时间戳") + last_modify_ts = fields.BigIntField(description="记录最后修改时间戳") + + class Meta: + abstract = True + + +class BilibiliVideo(BilibiliBaseModel): + video_id = fields.CharField(max_length=64, index=True, description="视频ID") + video_type = fields.CharField(max_length=16, description="视频类型") + title = fields.CharField(null=True, max_length=500, description="视频标题") + desc = fields.TextField(null=True, description="视频描述") + create_time = fields.BigIntField(description="视频发布时间戳", index=True) + liked_count = fields.CharField(null=True, max_length=16, description="视频点赞数") + video_play_count = fields.CharField(null=True, max_length=16, description="视频播放数量") + video_danmaku = fields.CharField(null=True, max_length=16, description="视频弹幕数量") + video_comment = fields.CharField(null=True, max_length=16, description="视频评论数量") + video_url = fields.CharField(null=True, max_length=512, description="视频详情URL") + video_cover_url = fields.CharField(null=True, max_length=512, description="视频封面图 URL") + + class Meta: + table = "bilibili_video" + table_description = "B站视频" + + def __str__(self): + return f"{self.video_id} - {self.title}" + + +class BilibiliComment(BilibiliBaseModel): + comment_id = fields.CharField(max_length=64, index=True, description="评论ID") + video_id = fields.CharField(max_length=64, index=True, description="视频ID") + content = fields.TextField(null=True, description="评论内容") + create_time = fields.BigIntField(description="评论时间戳") + sub_comment_count = fields.CharField(max_length=16, description="评论回复数") + + class Meta: + table = "bilibili_video_comment" + table_description = "B 站视频评论" + + def __str__(self): + return f"{self.comment_id} - {self.content}" diff --git a/store/bilibili/bilibili_store_impl.py b/store/bilibili/bilibili_store_impl.py new file mode 100644 index 0000000..1006bd5 --- /dev/null +++ b/store/bilibili/bilibili_store_impl.py @@ -0,0 +1,129 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/1/14 19:34 +# @Desc : B站存储实现类 + +import csv +import pathlib +from typing import Dict + +import aiofiles +from tortoise.contrib.pydantic import pydantic_model_creator + +from base.base_crawler import AbstractStore +from tools import utils +from var import crawler_type_var + + +class BiliCsvStoreImplement(AbstractStore): + csv_store_path: str = "data/bilibili" + + def 
make_save_file_name(self, store_type: str) -> str: + """ + make save file name by store type + Args: + store_type: contents or comments + + Returns: eg: data/bilibili/search_comments_20240114.csv ... + + """ + return f"{self.csv_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.csv" + + async def save_data_to_csv(self, save_item: Dict, store_type: str): + """ + Below is a simple way to save it in CSV format. + Args: + save_item: save content dict info + store_type: Save type contains content and comments(contents | comments) + + Returns: no returns + + """ + pathlib.Path(self.csv_store_path).mkdir(parents=True, exist_ok=True) + save_file_name = self.make_save_file_name(store_type=store_type) + async with aiofiles.open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f: + writer = csv.writer(f) + if await f.tell() == 0: + await writer.writerow(save_item.keys()) + await writer.writerow(save_item.values()) + + async def store_content(self, content_item: Dict): + """ + Bilibili content CSV storage implementation + Args: + content_item: note item dict + + Returns: + + """ + await self.save_data_to_csv(save_item=content_item, store_type="contents") + + async def store_comment(self, comment_item: Dict): + """ + Bilibili comment CSV storage implementation + Args: + comment_item: comment item dict + + Returns: + + """ + await self.save_data_to_csv(save_item=comment_item, store_type="comments") + + +class BiliDbStoreImplement(AbstractStore): + async def store_content(self, content_item: Dict): + """ + Bilibili content DB storage implementation + Args: + content_item: content item dict + + Returns: + + """ + from .bilibili_store_db_types import BilibiliVideo + video_id = content_item.get("video_id") + if not await BilibiliVideo.filter(video_id=video_id).exists(): + content_item["add_ts"] = utils.get_current_timestamp() + bilibili_video_pydantic = pydantic_model_creator(BilibiliVideo, name='BilibiliVideoCreate', exclude=('id',)) + bilibili_data = bilibili_video_pydantic(**content_item) + bilibili_video_pydantic.model_validate(bilibili_data) + await BilibiliVideo.create(**bilibili_data.model_dump()) + else: + bilibili_video_pydantic = pydantic_model_creator(BilibiliVideo, name='BilibiliVideoUpdate', + exclude=('id', 'add_ts')) + bilibili_data = bilibili_video_pydantic(**content_item) + bilibili_video_pydantic.model_validate(bilibili_data) + await BilibiliVideo.filter(video_id=video_id).update(**bilibili_data.model_dump()) + + async def store_comment(self, comment_item: Dict): + """ + Bilibili comment DB storage implementation + Args: + comment_item: comment item dict + + Returns: + + """ + from .bilibili_store_db_types import BilibiliComment + comment_id = comment_item.get("comment_id") + if not await BilibiliComment.filter(comment_id=comment_id).exists(): + comment_item["add_ts"] = utils.get_current_timestamp() + comment_pydantic = pydantic_model_creator(BilibiliComment, name='BilibiliVideoCommentCreate', + exclude=('id',)) + comment_data = comment_pydantic(**comment_item) + comment_pydantic.model_validate(comment_data) + await BilibiliComment.create(**comment_data.model_dump()) + else: + comment_pydantic = pydantic_model_creator(BilibiliComment, name='BilibiliVideoCommentUpdate', + exclude=('id', 'add_ts')) + comment_data = comment_pydantic(**comment_item) + comment_pydantic.model_validate(comment_data) + await BilibiliComment.filter(comment_id=comment_id).update(**comment_data.model_dump()) + + +class BiliJsonStoreImplement(AbstractStore): + async def 
store_content(self, content_item: Dict): + pass + + async def store_comment(self, comment_item: Dict): + pass diff --git a/store/douyin/__init__.py b/store/douyin/__init__.py new file mode 100644 index 0000000..db3a147 --- /dev/null +++ b/store/douyin/__init__.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/1/14 18:46 +# @Desc : +from typing import List + +import config + +from .douyin_store_db_types import * +from .douyin_store_impl import * + + +class DouyinStoreFactory: + STORES = { + "csv": DouyinCsvStoreImplement, + "db": DouyinDbStoreImplement, + "json": DouyinJsonStoreImplement + } + + @staticmethod + def create_store() -> AbstractStore: + store_class = DouyinStoreFactory.STORES.get(config.SAVE_DATA_OPTION) + if not store_class: + raise ValueError( + "[DouyinStoreFactory.create_store] Invalid save option only supported csv or db or json ...") + return store_class() + + +async def update_douyin_aweme(aweme_item: Dict): + aweme_id = aweme_item.get("aweme_id") + user_info = aweme_item.get("author", {}) + interact_info = aweme_item.get("statistics", {}) + save_content_item = { + "aweme_id": aweme_id, + "aweme_type": str(aweme_item.get("aweme_type")), + "title": aweme_item.get("desc", ""), + "desc": aweme_item.get("desc", ""), + "create_time": aweme_item.get("create_time"), + "user_id": user_info.get("uid"), + "sec_uid": user_info.get("sec_uid"), + "short_user_id": user_info.get("short_id"), + "user_unique_id": user_info.get("unique_id"), + "user_signature": user_info.get("signature"), + "nickname": user_info.get("nickname"), + "avatar": user_info.get("avatar_thumb", {}).get("url_list", [""])[0], + "liked_count": str(interact_info.get("digg_count")), + "collected_count": str(interact_info.get("collect_count")), + "comment_count": str(interact_info.get("comment_count")), + "share_count": str(interact_info.get("share_count")), + "ip_location": aweme_item.get("ip_label", ""), + "last_modify_ts": utils.get_current_timestamp(), + "aweme_url": f"https://www.douyin.com/video/{aweme_id}" + } + utils.logger.info( + f"[store.douyin.update_douyin_aweme] douyin aweme id:{aweme_id}, title:{save_content_item.get('title')}") + await DouyinStoreFactory.create_store().store_content(content_item=save_content_item) + + +async def batch_update_dy_aweme_comments(aweme_id: str, comments: List[Dict]): + if not comments: + return + for comment_item in comments: + await update_dy_aweme_comment(aweme_id, comment_item) + + +async def update_dy_aweme_comment(aweme_id: str, comment_item: Dict): + comment_aweme_id = comment_item.get("aweme_id") + if aweme_id != comment_aweme_id: + utils.logger.error( + f"[store.douyin.update_dy_aweme_comment] comment_aweme_id: {comment_aweme_id} != aweme_id: {aweme_id}") + return + user_info = comment_item.get("user", {}) + comment_id = comment_item.get("cid") + avatar_info = user_info.get("avatar_medium", {}) or user_info.get("avatar_300x300", {}) or user_info.get( + "avatar_168x168", {}) or user_info.get("avatar_thumb", {}) or {} + save_comment_item = { + "comment_id": comment_id, + "create_time": comment_item.get("create_time"), + "ip_location": comment_item.get("ip_label", ""), + "aweme_id": aweme_id, + "content": comment_item.get("text"), + "user_id": user_info.get("uid"), + "sec_uid": user_info.get("sec_uid"), + "short_user_id": user_info.get("short_id"), + "user_unique_id": user_info.get("unique_id"), + "user_signature": user_info.get("signature"), + "nickname": user_info.get("nickname"), + "avatar": avatar_info.get("url_list", 
[""])[0], + "sub_comment_count": str(comment_item.get("reply_comment_total", 0)), + "last_modify_ts": utils.get_current_timestamp(), + } + utils.logger.info( + f"[store.douyin.update_dy_aweme_comment] douyin aweme comment: {comment_id}, content: {save_comment_item.get('content')}") + + await DouyinStoreFactory.create_store().store_comment(comment_item=save_comment_item) diff --git a/store/douyin/douyin_store_db_types.py b/store/douyin/douyin_store_db_types.py new file mode 100644 index 0000000..a30fbee --- /dev/null +++ b/store/douyin/douyin_store_db_types.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/1/14 18:50 +# @Desc : 抖音存储到DB的模型类集合 + + +from tortoise import fields +from tortoise.models import Model + + +class DouyinBaseModel(Model): + id = fields.IntField(pk=True, autoincrement=True, description="自增ID") + user_id = fields.CharField(null=True, max_length=64, description="用户ID") + sec_uid = fields.CharField(null=True, max_length=128, description="用户sec_uid") + short_user_id = fields.CharField(null=True, max_length=64, description="用户短ID") + user_unique_id = fields.CharField(null=True, max_length=64, description="用户唯一ID") + nickname = fields.CharField(null=True, max_length=64, description="用户昵称") + avatar = fields.CharField(null=True, max_length=255, description="用户头像地址") + user_signature = fields.CharField(null=True, max_length=500, description="用户签名") + ip_location = fields.CharField(null=True, max_length=255, description="评论时的IP地址") + add_ts = fields.BigIntField(description="记录添加时间戳") + last_modify_ts = fields.BigIntField(description="记录最后修改时间戳") + + class Meta: + abstract = True + + +class DouyinAweme(DouyinBaseModel): + aweme_id = fields.CharField(max_length=64, index=True, description="视频ID") + aweme_type = fields.CharField(max_length=16, description="视频类型") + title = fields.CharField(null=True, max_length=500, description="视频标题") + desc = fields.TextField(null=True, description="视频描述") + create_time = fields.BigIntField(description="视频发布时间戳", index=True) + liked_count = fields.CharField(null=True, max_length=16, description="视频点赞数") + comment_count = fields.CharField(null=True, max_length=16, description="视频评论数") + share_count = fields.CharField(null=True, max_length=16, description="视频分享数") + collected_count = fields.CharField(null=True, max_length=16, description="视频收藏数") + aweme_url = fields.CharField(null=True, max_length=255, description="视频详情页URL") + + class Meta: + table = "douyin_aweme" + table_description = "抖音视频" + + def __str__(self): + return f"{self.aweme_id} - {self.title}" + + +class DouyinAwemeComment(DouyinBaseModel): + comment_id = fields.CharField(max_length=64, index=True, description="评论ID") + aweme_id = fields.CharField(max_length=64, index=True, description="视频ID") + content = fields.TextField(null=True, description="评论内容") + create_time = fields.BigIntField(description="评论时间戳") + sub_comment_count = fields.CharField(max_length=16, description="评论回复数") + + class Meta: + table = "douyin_aweme_comment" + table_description = "抖音视频评论" + + def __str__(self): + return f"{self.comment_id} - {self.content}" diff --git a/store/douyin/douyin_store_impl.py b/store/douyin/douyin_store_impl.py new file mode 100644 index 0000000..023897d --- /dev/null +++ b/store/douyin/douyin_store_impl.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/1/14 18:46 +# @Desc : 抖音存储实现类 + +import csv +import pathlib +from typing import Dict + +import aiofiles +from tortoise.contrib.pydantic import 
pydantic_model_creator + +from base.base_crawler import AbstractStore +from tools import utils +from var import crawler_type_var + + +class DouyinCsvStoreImplement(AbstractStore): + csv_store_path: str = "data/douyin" + + def make_save_file_name(self, store_type: str) -> str: + """ + make save file name by store type + Args: + store_type: contents or comments + + Returns: eg: data/douyin/search_comments_20240114.csv ... + + """ + return f"{self.csv_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.csv" + + async def save_data_to_csv(self, save_item: Dict, store_type: str): + """ + Below is a simple way to save it in CSV format. + Args: + save_item: save content dict info + store_type: Save type contains content and comments(contents | comments) + + Returns: no returns + + """ + pathlib.Path(self.csv_store_path).mkdir(parents=True, exist_ok=True) + save_file_name = self.make_save_file_name(store_type=store_type) + async with aiofiles.open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f: + writer = csv.writer(f) + if await f.tell() == 0: + await writer.writerow(save_item.keys()) + await writer.writerow(save_item.values()) + + async def store_content(self, content_item: Dict): + """ + Douyin content CSV storage implementation + Args: + content_item: note item dict + + Returns: + + """ + await self.save_data_to_csv(save_item=content_item, store_type="contents") + + async def store_comment(self, comment_item: Dict): + """ + Douyin comment CSV storage implementation + Args: + comment_item: comment item dict + + Returns: + + """ + await self.save_data_to_csv(save_item=comment_item, store_type="comments") + + +class DouyinDbStoreImplement(AbstractStore): + async def store_content(self, content_item: Dict): + """ + Douyin content DB storage implementation + Args: + content_item: content item dict + + Returns: + + """ + from .douyin_store_db_types import DouyinAweme + aweme_id = content_item.get("aweme_id") + if not await DouyinAweme.filter(aweme_id=aweme_id).exists(): + content_item["add_ts"] = utils.get_current_timestamp() + douyin_aweme_pydantic = pydantic_model_creator(DouyinAweme, name='DouyinAwemeCreate', exclude=('id',)) + douyin_data = douyin_aweme_pydantic(**content_item) + douyin_aweme_pydantic.model_validate(douyin_data) + await DouyinAweme.create(**douyin_data.model_dump()) + else: + douyin_aweme_pydantic = pydantic_model_creator(DouyinAweme, name='DouyinAwemeUpdate', + exclude=('id', 'add_ts')) + douyin_data = douyin_aweme_pydantic(**content_item) + douyin_aweme_pydantic.model_validate(douyin_data) + await DouyinAweme.filter(aweme_id=aweme_id).update(**douyin_data.model_dump()) + + async def store_comment(self, comment_item: Dict): + """ + Douyin comment DB storage implementation + Args: + comment_item: comment item dict + + Returns: + + """ + from .douyin_store_db_types import DouyinAwemeComment + comment_id = comment_item.get("comment_id") + if not await DouyinAwemeComment.filter(comment_id=comment_id).exists(): + comment_item["add_ts"] = utils.get_current_timestamp() + comment_pydantic = pydantic_model_creator(DouyinAwemeComment, name='DouyinAwemeCommentCreate', + exclude=('id',)) + comment_data = comment_pydantic(**comment_item) + comment_pydantic.model_validate(comment_data) + await DouyinAwemeComment.create(**comment_data.model_dump()) + else: + comment_pydantic = pydantic_model_creator(DouyinAwemeComment, name='DouyinAwemeCommentUpdate', + exclude=('id', 'add_ts')) + comment_data = comment_pydantic(**comment_item) + 
comment_pydantic.model_validate(comment_data) + await DouyinAwemeComment.filter(comment_id=comment_id).update(**comment_data.model_dump()) + + +class DouyinJsonStoreImplement(AbstractStore): + async def store_content(self, content_item: Dict): + pass + + async def store_comment(self, comment_item: Dict): + pass + diff --git a/store/kuaishou/__init__.py b/store/kuaishou/__init__.py new file mode 100644 index 0000000..5fac31f --- /dev/null +++ b/store/kuaishou/__init__.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/1/14 20:03 +# @Desc : +from typing import List + +import config + +from .kuaishou_store_db_types import * +from .kuaishou_store_impl import * + + +class KuaishouStoreFactory: + STORES = { + "csv": KuaishouCsvStoreImplement, + "db": KuaishouDbStoreImplement, + "json": KuaishouJsonStoreImplement + } + + @staticmethod + def create_store() -> AbstractStore: + store_class = KuaishouStoreFactory.STORES.get(config.SAVE_DATA_OPTION) + if not store_class: + raise ValueError( + "[KuaishouStoreFactory.create_store] Invalid save option only supported csv or db or json ...") + return store_class() + + +async def update_kuaishou_video(video_item: Dict): + photo_info: Dict = video_item.get("photo", {}) + video_id = photo_info.get("id") + if not video_id: + return + user_info = video_item.get("author", {}) + save_content_item = { + "video_id": video_id, + "video_type": str(video_item.get("type")), + "title": photo_info.get("caption", "")[:500], + "desc": photo_info.get("caption", "")[:500], + "create_time": photo_info.get("timestamp"), + "user_id": user_info.get("id"), + "nickname": user_info.get("name"), + "avatar": user_info.get("headerUrl", ""), + "liked_count": str(photo_info.get("realLikeCount")), + "viewd_count": str(photo_info.get("viewCount")), + "last_modify_ts": utils.get_current_timestamp(), + "video_url": f"https://www.kuaishou.com/short-video/{video_id}", + "video_cover_url": photo_info.get("coverUrl", ""), + "video_play_url": photo_info.get("photoUrl", ""), + } + utils.logger.info(f"[store.kuaishou.update_kuaishou_video] Kuaishou video id:{video_id}, title:{save_content_item.get('title')}") + await KuaishouStoreFactory.create_store().store_content(content_item=save_content_item) + + +async def batch_update_ks_video_comments(video_id: str, comments: List[Dict]): + utils.logger.info(f"[store.kuaishou.batch_update_ks_video_comments] video_id:{video_id}, comments:{comments}") + if not comments: + return + for comment_item in comments: + await update_ks_video_comment(video_id, comment_item) + + +async def update_ks_video_comment(video_id: str, comment_item: Dict): + comment_id = comment_item.get("commentId") + save_comment_item = { + "comment_id": comment_id, + "create_time": comment_item.get("timestamp"), + "video_id": video_id, + "content": comment_item.get("content"), + "user_id": comment_item.get("authorId"), + "nickname": comment_item.get("authorName"), + "avatar": comment_item.get("headurl"), + "sub_comment_count": str(comment_item.get("subCommentCount", 0)), + "last_modify_ts": utils.get_current_timestamp(), + } + utils.logger.info(f"[store.kuaishou.update_ks_video_comment] Kuaishou video comment: {comment_id}, content: {save_comment_item.get('content')}") + await KuaishouStoreFactory.create_store().store_comment(comment_item=save_comment_item) \ No newline at end of file diff --git a/store/kuaishou/kuaishou_store_db_types.py b/store/kuaishou/kuaishou_store_db_types.py new file mode 100644 index 0000000..256695e --- /dev/null +++ 
b/store/kuaishou/kuaishou_store_db_types.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/1/14 20:03 +# @Desc : 快手存储到DB的模型类集合 + + +from tortoise import fields +from tortoise.models import Model + + +class KuaishouBaseModel(Model): + id = fields.IntField(pk=True, autoincrement=True, description="自增ID") + user_id = fields.CharField(null=True, max_length=64, description="用户ID") + nickname = fields.CharField(null=True, max_length=64, description="用户昵称") + avatar = fields.CharField(null=True, max_length=255, description="用户头像地址") + add_ts = fields.BigIntField(description="记录添加时间戳") + last_modify_ts = fields.BigIntField(description="记录最后修改时间戳") + + class Meta: + abstract = True + + +class KuaishouVideo(KuaishouBaseModel): + video_id = fields.CharField(max_length=64, index=True, description="视频ID") + video_type = fields.CharField(max_length=16, description="视频类型") + title = fields.CharField(null=True, max_length=500, description="视频标题") + desc = fields.TextField(null=True, description="视频描述") + create_time = fields.BigIntField(description="视频发布时间戳", index=True) + liked_count = fields.CharField(null=True, max_length=16, description="视频点赞数") + viewd_count = fields.CharField(null=True, max_length=16, description="视频浏览数量") + video_url = fields.CharField(null=True, max_length=512, description="视频详情URL") + video_cover_url = fields.CharField(null=True, max_length=512, description="视频封面图 URL") + video_play_url = fields.CharField(null=True, max_length=512, description="视频播放 URL") + + class Meta: + table = "kuaishou_video" + table_description = "快手视频" + + def __str__(self): + return f"{self.video_id} - {self.title}" + + +class KuaishouVideoComment(KuaishouBaseModel): + comment_id = fields.CharField(max_length=64, index=True, description="评论ID") + video_id = fields.CharField(max_length=64, index=True, description="视频ID") + content = fields.TextField(null=True, description="评论内容") + create_time = fields.BigIntField(description="评论时间戳") + sub_comment_count = fields.CharField(max_length=16, description="评论回复数") + + class Meta: + table = "kuaishou_video_comment" + table_description = "快手视频评论" + + def __str__(self): + return f"{self.comment_id} - {self.content}" diff --git a/store/kuaishou/kuaishou_store_impl.py b/store/kuaishou/kuaishou_store_impl.py new file mode 100644 index 0000000..cf29433 --- /dev/null +++ b/store/kuaishou/kuaishou_store_impl.py @@ -0,0 +1,128 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/1/14 20:03 +# @Desc : 快手存储实现类 +import csv +import pathlib +from typing import Dict + +import aiofiles +from tortoise.contrib.pydantic import pydantic_model_creator + +from base.base_crawler import AbstractStore +from tools import utils +from var import crawler_type_var + + +class KuaishouCsvStoreImplement(AbstractStore): + csv_store_path: str = "data/kuaishou" + + def make_save_file_name(self, store_type: str) -> str: + """ + make save file name by store type + Args: + store_type: contents or comments + + Returns: eg: data/kuaishou/search_comments_20240114.csv ... + + """ + return f"{self.csv_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.csv" + + async def save_data_to_csv(self, save_item: Dict, store_type: str): + """ + Below is a simple way to save it in CSV format. 
+ Args: + save_item: save content dict info + store_type: Save type contains content and comments(contents | comments) + + Returns: no returns + + """ + pathlib.Path(self.csv_store_path).mkdir(parents=True, exist_ok=True) + save_file_name = self.make_save_file_name(store_type=store_type) + async with aiofiles.open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f: + writer = csv.writer(f) + if await f.tell() == 0: + await writer.writerow(save_item.keys()) + await writer.writerow(save_item.values()) + + async def store_content(self, content_item: Dict): + """ + Kuaishou content CSV storage implementation + Args: + content_item: note item dict + + Returns: + + """ + await self.save_data_to_csv(save_item=content_item, store_type="contents") + + async def store_comment(self, comment_item: Dict): + """ + Kuaishou comment CSV storage implementation + Args: + comment_item: comment item dict + + Returns: + + """ + await self.save_data_to_csv(save_item=comment_item, store_type="comments") + + +class KuaishouDbStoreImplement(AbstractStore): + async def store_content(self, content_item: Dict): + """ + Kuaishou content DB storage implementation + Args: + content_item: content item dict + + Returns: + + """ + from .kuaishou_store_db_types import KuaishouVideo + video_id = content_item.get("video_id") + if not await KuaishouVideo.filter(video_id=video_id).exists(): + content_item["add_ts"] = utils.get_current_timestamp() + kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='KuaishouVideoCreate', exclude=('id',)) + kuaishou_data = kuaishou_video_pydantic(**content_item) + kuaishou_video_pydantic.model_validate(kuaishou_data) + await KuaishouVideo.create(**kuaishou_data.model_dump()) + else: + kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='KuaishouVideoUpdate', + exclude=('id', 'add_ts')) + kuaishou_data = kuaishou_video_pydantic(**content_item) + kuaishou_video_pydantic.model_validate(kuaishou_data) + await KuaishouVideo.filter(video_id=video_id).update(**kuaishou_data.model_dump()) + + async def store_comment(self, comment_item: Dict): + """ + Kuaishou comment DB storage implementation + Args: + comment_item: comment item dict + + Returns: + + """ + from .kuaishou_store_db_types import KuaishouVideoComment + comment_id = comment_item.get("comment_id") + if not await KuaishouVideoComment.filter(comment_id=comment_id).exists(): + comment_item["add_ts"] = utils.get_current_timestamp() + comment_pydantic = pydantic_model_creator(KuaishouVideoComment, name='KuaishouVideoCommentCreate', + exclude=('id',)) + comment_data = comment_pydantic(**comment_item) + comment_pydantic.model_validate(comment_data) + await KuaishouVideoComment.create(**comment_data.model_dump()) + else: + comment_pydantic = pydantic_model_creator(KuaishouVideoComment, name='KuaishouVideoCommentUpdate', + exclude=('id', 'add_ts')) + comment_data = comment_pydantic(**comment_item) + comment_pydantic.model_validate(comment_data) + await KuaishouVideoComment.filter(comment_id=comment_id).update(**comment_data.model_dump()) + + +class KuaishouJsonStoreImplement(AbstractStore): + async def store_content(self, content_item: Dict): + pass + + async def store_comment(self, comment_item: Dict): + pass diff --git a/store/weibo/__init__.py b/store/weibo/__init__.py new file mode 100644 index 0000000..0e1ca56 --- /dev/null +++ b/store/weibo/__init__.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/1/14 21:34 +# @Desc : + +from typing import List + 
+import config + +from .weibo_store_db_types import * +from .weibo_store_impl import * + + +class WeiboStoreFactory: + STORES = { + "csv": WeiboCsvStoreImplement, + "db": WeiboDbStoreImplement, + "json": WeiboJsonStoreImplement + } + + @staticmethod + def create_store() -> AbstractStore: + store_class = WeiboStoreFactory.STORES.get(config.SAVE_DATA_OPTION) + if not store_class: + raise ValueError( + "[WeiboStoreFactory.create_store] Invalid save option only supported csv or db or json ...") + return store_class() + + +async def update_weibo_note(note_item: Dict): + mblog: Dict = note_item.get("mblog") + user_info: Dict = mblog.get("user") + note_id = mblog.get("id") + save_content_item = { + # 微博信息 + "note_id": note_id, + "content": mblog.get("text"), + "create_time": utils.rfc2822_to_timestamp(mblog.get("created_at")), + "create_date_time": str(utils.rfc2822_to_china_datetime(mblog.get("created_at"))), + "liked_count": str(mblog.get("attitudes_count", 0)), + "comments_count": str(mblog.get("comments_count", 0)), + "shared_count": str(mblog.get("reposts_count", 0)), + "last_modify_ts": utils.get_current_timestamp(), + "note_url": f"https://m.weibo.cn/detail/{note_id}", + "ip_location": mblog.get("region_name", "").replace("发布于 ", ""), + + # 用户信息 + "user_id": str(user_info.get("id")), + "nickname": user_info.get("screen_name", ""), + "gender": user_info.get("gender", ""), + "profile_url": user_info.get("profile_url", ""), + "avatar": user_info.get("profile_image_url", ""), + } + utils.logger.info( + f"[store.weibo.update_weibo_note] weibo note id:{note_id}, title:{save_content_item.get('content')[:24]} ...") + await WeiboStoreFactory.create_store().store_content(content_item=save_content_item) + + +async def batch_update_weibo_note_comments(note_id: str, comments: List[Dict]): + if not comments: + return + for comment_item in comments: + await update_weibo_note_comment(note_id, comment_item) + + +async def update_weibo_note_comment(note_id: str, comment_item: Dict): + comment_id = str(comment_item.get("id")) + user_info: Dict = comment_item.get("user") + save_comment_item = { + "comment_id": comment_id, + "create_time": utils.rfc2822_to_timestamp(comment_item.get("created_at")), + "create_date_time": str(utils.rfc2822_to_china_datetime(comment_item.get("created_at"))), + "note_id": note_id, + "content": comment_item.get("text"), + "sub_comment_count": str(comment_item.get("total_number", 0)), + "comment_like_count": str(comment_item.get("like_count", 0)), + "last_modify_ts": utils.get_current_timestamp(), + "ip_location": comment_item.get("source", "").replace("来自", ""), + + # 用户信息 + "user_id": str(user_info.get("id")), + "nickname": user_info.get("screen_name", ""), + "gender": user_info.get("gender", ""), + "profile_url": user_info.get("profile_url", ""), + "avatar": user_info.get("profile_image_url", ""), + } + utils.logger.info( + f"[store.weibo.update_weibo_note_comment] Weibo note comment: {comment_id}, content: {save_comment_item.get('content','')[:24]} ...") + await WeiboStoreFactory.create_store().store_comment(comment_item=save_comment_item) diff --git a/store/weibo/weibo_store_db_types.py b/store/weibo/weibo_store_db_types.py new file mode 100644 index 0000000..da672b5 --- /dev/null +++ b/store/weibo/weibo_store_db_types.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/1/14 21:35 +# @Desc : 微博存储到DB的模型类集合 + +from tortoise import fields +from tortoise.models import Model + + +class WeiboBaseModel(Model): + id = fields.IntField(pk=True, 
autoincrement=True, description="自增ID") + user_id = fields.CharField(null=True, max_length=64, description="用户ID") + nickname = fields.CharField(null=True, max_length=64, description="用户昵称") + avatar = fields.CharField(null=True, max_length=255, description="用户头像地址") + gender = fields.CharField(null=True, max_length=12, description="用户性别") + profile_url = fields.CharField(null=True, max_length=255, description="用户主页地址") + ip_location = fields.CharField(null=True, max_length=32, description="发布微博的地理信息") + add_ts = fields.BigIntField(description="记录添加时间戳") + last_modify_ts = fields.BigIntField(description="记录最后修改时间戳") + + class Meta: + abstract = True + + +class WeiboNote(WeiboBaseModel): + note_id = fields.CharField(max_length=64, index=True, description="帖子ID") + content = fields.TextField(null=True, description="帖子正文内容") + create_time = fields.BigIntField(description="帖子发布时间戳", index=True) + create_date_time = fields.CharField(description="帖子发布日期时间", max_length=32, index=True) + liked_count = fields.CharField(null=True, max_length=16, description="帖子点赞数") + comments_count = fields.CharField(null=True, max_length=16, description="帖子评论数量") + shared_count = fields.CharField(null=True, max_length=16, description="帖子转发数量") + note_url = fields.CharField(null=True, max_length=512, description="帖子详情URL") + + class Meta: + table = "weibo_note" + table_description = "微博帖子" + + def __str__(self): + return f"{self.note_id}" + + +class WeiboComment(WeiboBaseModel): + comment_id = fields.CharField(max_length=64, index=True, description="评论ID") + note_id = fields.CharField(max_length=64, index=True, description="帖子ID") + content = fields.TextField(null=True, description="评论内容") + create_time = fields.BigIntField(description="评论时间戳") + create_date_time = fields.CharField(description="评论日期时间", max_length=32, index=True) + comment_like_count = fields.CharField(max_length=16, description="评论点赞数量") + sub_comment_count = fields.CharField(max_length=16, description="评论回复数") + + class Meta: + table = "weibo_note_comment" + table_description = "微博帖子评论" + + def __str__(self): + return f"{self.comment_id}" diff --git a/store/weibo/weibo_store_impl.py b/store/weibo/weibo_store_impl.py new file mode 100644 index 0000000..65dabf0 --- /dev/null +++ b/store/weibo/weibo_store_impl.py @@ -0,0 +1,128 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/1/14 21:35 +# @Desc : 微博存储实现类 +import csv +import pathlib +from typing import Dict + +import aiofiles +from tortoise.contrib.pydantic import pydantic_model_creator + +from base.base_crawler import AbstractStore +from tools import utils +from var import crawler_type_var + + +class WeiboCsvStoreImplement(AbstractStore): + csv_store_path: str = "data/weibo" + + def make_save_file_name(self, store_type: str) -> str: + """ + make save file name by store type + Args: + store_type: contents or comments + + Returns: eg: data/weibo/search_comments_20240114.csv ... + + """ + return f"{self.csv_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.csv" + + async def save_data_to_csv(self, save_item: Dict, store_type: str): + """ + Below is a simple way to save it in CSV format. 
+ Args: + save_item: save content dict info + store_type: Save type contains content and comments(contents | comments) + + Returns: no returns + + """ + pathlib.Path(self.csv_store_path).mkdir(parents=True, exist_ok=True) + save_file_name = self.make_save_file_name(store_type=store_type) + async with aiofiles.open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f: + writer = csv.writer(f) + if await f.tell() == 0: + await writer.writerow(save_item.keys()) + await writer.writerow(save_item.values()) + + async def store_content(self, content_item: Dict): + """ + Weibo content CSV storage implementation + Args: + content_item: note item dict + + Returns: + + """ + await self.save_data_to_csv(save_item=content_item, store_type="contents") + + async def store_comment(self, comment_item: Dict): + """ + Weibo comment CSV storage implementation + Args: + comment_item: comment item dict + + Returns: + + """ + await self.save_data_to_csv(save_item=comment_item, store_type="comments") + + +class WeiboDbStoreImplement(AbstractStore): + async def store_content(self, content_item: Dict): + """ + Weibo content DB storage implementation + Args: + content_item: content item dict + + Returns: + + """ + from .weibo_store_db_types import WeiboNote + note_id = content_item.get("note_id") + if not await WeiboNote.filter(note_id=note_id).exists(): + content_item["add_ts"] = utils.get_current_timestamp() + weibo_note_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteCreate', exclude=('id',)) + weibo_data = weibo_note_pydantic(**content_item) + weibo_note_pydantic.model_validate(weibo_data) + await WeiboNote.create(**weibo_data.model_dump()) + else: + weibo_note_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteUpdate', + exclude=('id', 'add_ts')) + weibo_data = weibo_note_pydantic(**content_item) + weibo_note_pydantic.model_validate(weibo_data) + await WeiboNote.filter(note_id=note_id).update(**weibo_data.model_dump()) + + async def store_comment(self, comment_item: Dict): + """ + Weibo comment DB storage implementation + Args: + comment_item: comment item dict + + Returns: + + """ + from .weibo_store_db_types import WeiboComment + comment_id = comment_item.get("comment_id") + if not await WeiboComment.filter(comment_id=comment_id).exists(): + comment_item["add_ts"] = utils.get_current_timestamp() + comment_pydantic = pydantic_model_creator(WeiboComment, name='WeiboNoteCommentCreate', + exclude=('id',)) + comment_data = comment_pydantic(**comment_item) + comment_pydantic.model_validate(comment_data) + await WeiboComment.create(**comment_data.model_dump()) + else: + comment_pydantic = pydantic_model_creator(WeiboComment, name='WeiboNoteCommentUpdate', + exclude=('id', 'add_ts')) + comment_data = comment_pydantic(**comment_item) + comment_pydantic.model_validate(comment_data) + await WeiboComment.filter(comment_id=comment_id).update(**comment_data.model_dump()) + + +class WeiboJsonStoreImplement(AbstractStore): + async def store_content(self, content_item: Dict): + pass + + async def store_comment(self, comment_item: Dict): + pass diff --git a/store/xhs/__init__.py b/store/xhs/__init__.py new file mode 100644 index 0000000..8f4542b --- /dev/null +++ b/store/xhs/__init__.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/1/14 17:34 +# @Desc : +from typing import List + +import config + +
+from .xhs_store_db_types import * +from .xhs_store_impl import * + + +class XhsStoreFactory: + STORES = { + "csv": XhsCsvStoreImplement, + "db": XhsDbStoreImplement, + "json": XhsJsonStoreImplement + } + + @staticmethod + def create_store() -> AbstractStore: + store_class = XhsStoreFactory.STORES.get(config.SAVE_DATA_OPTION) + if not store_class: + raise ValueError("[XhsStoreFactory.create_store] Invalid save option only supported csv or db or json ...") + return store_class() + + +async def update_xhs_note(note_item: Dict): + note_id = note_item.get("note_id") + user_info = note_item.get("user", {}) + interact_info = note_item.get("interact_info", {}) + image_list: List[Dict] = note_item.get("image_list", []) + + local_db_item = { + "note_id": note_item.get("note_id"), + "type": note_item.get("type"), + "title": note_item.get("title") or note_item.get("desc", "")[:255], + "desc": note_item.get("desc", ""), + "time": note_item.get("time"), + "last_update_time": note_item.get("last_update_time", 0), + "user_id": user_info.get("user_id"), + "nickname": user_info.get("nickname"), + "avatar": user_info.get("avatar"), + "liked_count": interact_info.get("liked_count"), + "collected_count": interact_info.get("collected_count"), + "comment_count": interact_info.get("comment_count"), + "share_count": interact_info.get("share_count"), + "ip_location": note_item.get("ip_location", ""), + "image_list": ','.join([img.get('url', '') for img in image_list]), + "last_modify_ts": utils.get_current_timestamp(), + "note_url": f"https://www.xiaohongshu.com/explore/{note_id}" + } + utils.logger.info(f"[store.xhs.update_xhs_note] xhs note: {local_db_item}") + await XhsStoreFactory.create_store().store_content(local_db_item) + + +async def update_xhs_note_comment(note_id: str, comment_item: Dict): + user_info = comment_item.get("user_info", {}) + comment_id = comment_item.get("id") + local_db_item = { + "comment_id": comment_id, + "create_time": comment_item.get("create_time"), + "ip_location": comment_item.get("ip_location"), + "note_id": note_id, + "content": comment_item.get("content"), + "user_id": user_info.get("user_id"), + "nickname": user_info.get("nickname"), + "avatar": user_info.get("image"), + "sub_comment_count": comment_item.get("sub_comment_count"), + "last_modify_ts": utils.get_current_timestamp(), + } + utils.logger.info(f"[store.xhs.update_xhs_note_comment] xhs note comment:{local_db_item}") + await XhsStoreFactory.create_store().store_comment(local_db_item) diff --git a/store/xhs/xhs_store_db_types.py b/store/xhs/xhs_store_db_types.py new file mode 100644 index 0000000..5377bef --- /dev/null +++ b/store/xhs/xhs_store_db_types.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/1/14 17:31 +# @Desc : 小红书存储到DB的模型类集合 + +from tortoise import fields +from tortoise.models import Model + + +class XhsBaseModel(Model): + id = fields.IntField(pk=True, autoincrement=True, description="自增ID") + user_id = fields.CharField(max_length=64, description="用户ID") + nickname = fields.CharField(null=True, max_length=64, description="用户昵称") + avatar = fields.CharField(null=True, max_length=255, description="用户头像地址") + ip_location = fields.CharField(null=True, max_length=255, description="评论时的IP地址") + add_ts = fields.BigIntField(description="记录添加时间戳") + last_modify_ts = fields.BigIntField(description="记录最后修改时间戳") + + class Meta: + abstract = True + + +class XHSNote(XhsBaseModel): + note_id = fields.CharField(max_length=64, index=True, description="笔记ID") 
+ type = fields.CharField(null=True, max_length=16, description="笔记类型(normal | video)") + title = fields.CharField(null=True, max_length=255, description="笔记标题") + desc = fields.TextField(null=True, description="笔记描述") + time = fields.BigIntField(description="笔记发布时间戳", index=True) + last_update_time = fields.BigIntField(description="笔记最后更新时间戳") + liked_count = fields.CharField(null=True, max_length=16, description="笔记点赞数") + collected_count = fields.CharField(null=True, max_length=16, description="笔记收藏数") + comment_count = fields.CharField(null=True, max_length=16, description="笔记评论数") + share_count = fields.CharField(null=True, max_length=16, description="笔记分享数") + image_list = fields.TextField(null=True, description="笔记封面图片列表") + note_url = fields.CharField(null=True, max_length=255, description="笔记详情页的URL") + + class Meta: + table = "xhs_note" + table_description = "小红书笔记" + + def __str__(self): + return f"{self.note_id} - {self.title}" + + +class XHSNoteComment(XhsBaseModel): + comment_id = fields.CharField(max_length=64, index=True, description="评论ID") + create_time = fields.BigIntField(index=True, description="评论时间戳") + note_id = fields.CharField(max_length=64, description="笔记ID") + content = fields.TextField(description="评论内容") + sub_comment_count = fields.IntField(description="子评论数量") + + class Meta: + table = "xhs_note_comment" + table_description = "小红书笔记评论" + + def __str__(self): + return f"{self.comment_id} - {self.content}" diff --git a/store/xhs/xhs_store_impl.py b/store/xhs/xhs_store_impl.py new file mode 100644 index 0000000..82fd97c --- /dev/null +++ b/store/xhs/xhs_store_impl.py @@ -0,0 +1,127 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/1/14 16:58 +# @Desc : 小红书存储实现类 +import csv +import pathlib +from typing import Dict + +import aiofiles +from tortoise.contrib.pydantic import pydantic_model_creator + +from base.base_crawler import AbstractStore +from tools import utils +from var import crawler_type_var + + +class XhsCsvStoreImplement(AbstractStore): + csv_store_path: str = "data/xhs" + + def make_save_file_name(self, store_type: str) -> str: + """ + make save file name by store type + Args: + store_type: contents or comments + + Returns: eg: data/xhs/search_comments_20240114.csv ... + + """ + return f"{self.csv_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.csv" + + async def save_data_to_csv(self, save_item: Dict, store_type: str): + """ + Below is a simple way to save it in CSV format. 
+ Args: + save_item: save content dict info + store_type: Save type contains content and comments(contents | comments) + + Returns: no returns + + """ + pathlib.Path(self.csv_store_path).mkdir(parents=True, exist_ok=True) + save_file_name = self.make_save_file_name(store_type=store_type) + async with aiofiles.open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f: + writer = csv.writer(f) + if await f.tell() == 0: + await writer.writerow(save_item.keys()) + await writer.writerow(save_item.values()) + + async def store_content(self, content_item: Dict): + """ + Xiaohongshu content CSV storage implementation + Args: + content_item: note item dict + + Returns: + + """ + await self.save_data_to_csv(save_item=content_item, store_type="contents") + + async def store_comment(self, comment_item: Dict): + """ + Xiaohongshu comment CSV storage implementation + Args: + comment_item: comment item dict + + Returns: + + """ + await self.save_data_to_csv(save_item=comment_item, store_type="comments") + + +class XhsDbStoreImplement(AbstractStore): + async def store_content(self, content_item: Dict): + """ + Xiaohongshu content DB storage implementation + Args: + content_item: content item dict + + Returns: + + """ + from .xhs_store_db_types import XHSNote + note_id = content_item.get("note_id") + if not await XHSNote.filter(note_id=note_id).first(): + content_item["add_ts"] = utils.get_current_timestamp() + note_pydantic = pydantic_model_creator(XHSNote, name="XHSPydanticCreate", exclude=('id',)) + note_data = note_pydantic(**content_item) + note_pydantic.model_validate(note_data) + await XHSNote.create(**note_data.model_dump()) + else: + note_pydantic = pydantic_model_creator(XHSNote, name="XHSPydanticUpdate", exclude=('id', 'add_ts')) + note_data = note_pydantic(**content_item) + note_pydantic.model_validate(note_data) + await XHSNote.filter(note_id=note_id).update(**note_data.model_dump()) + + async def store_comment(self, comment_item: Dict): + """ + Xiaohongshu comment DB storage implementation + Args: + comment_item: comment item dict + + Returns: + + """ + from .xhs_store_db_types import XHSNoteComment + comment_id = comment_item.get("comment_id") + if not await XHSNoteComment.filter(comment_id=comment_id).first(): + comment_item["add_ts"] = utils.get_current_timestamp() + comment_pydantic = pydantic_model_creator(XHSNoteComment, name="CommentPydanticCreate", exclude=('id',)) + comment_data = comment_pydantic(**comment_item) + comment_pydantic.model_validate(comment_data) + await XHSNoteComment.create(**comment_data.model_dump()) + else: + comment_pydantic = pydantic_model_creator(XHSNoteComment, name="CommentPydanticUpdate", + exclude=('id', 'add_ts',)) + comment_data = comment_pydantic(**comment_item) + comment_pydantic.model_validate(comment_data) + await XHSNoteComment.filter(comment_id=comment_id).update(**comment_data.model_dump()) + + +class XhsJsonStoreImplement(AbstractStore): + async def store_content(self, content_item: Dict): + pass + + async def store_comment(self, comment_item: Dict): + pass
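
Usage note (not part of the patch): the sketch below shows how a crawler module is expected to drive the refactored store layer, based only on the interfaces introduced above (config.SAVE_DATA_OPTION, the per-platform store factories, and the update_* helpers). The note payload is a hypothetical stub, and the sketch assumes var.crawler_type_var is a standard ContextVar, as its .get() usage in the store implementations suggests.

import asyncio

import config
from store import xhs as xhs_store
from var import crawler_type_var


async def demo():
    # Select the storage backend; the factory raises ValueError for anything
    # other than "csv", "db" or "json".
    config.SAVE_DATA_OPTION = "csv"
    # crawler_type_var feeds the CSV file name,
    # e.g. data/xhs/search_contents_20240114.csv
    crawler_type_var.set("search")
    # Hypothetical minimal note item; real payloads come from the platform clients.
    note_item = {"note_id": "demo_note", "user": {}, "interact_info": {}, "image_list": []}
    await xhs_store.update_xhs_note(note_item)


asyncio.run(demo())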