diff --git a/async_db.py b/async_db.py new file mode 100644 index 0000000..409e910 --- /dev/null +++ b/async_db.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/4/6 14:21 +# @Desc : 异步Aiomysql的增删改查封装 +from typing import Any, Dict, List, Union + +import aiomysql + + +class AsyncMysqlDB: + def __init__(self, pool: aiomysql.Pool) -> None: + self.__pool = pool + + async def query(self, sql: str, *args: Union[str, int]) -> List[Dict[str, Any]]: + """ + 从给定的 SQL 中查询记录,返回的是一个列表 + :param sql: 查询的sql + :param args: sql中传递动态参数列表 + :return: + """ + async with self.__pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(sql, args) + data = await cur.fetchall() + return data or [] + + async def get_first(self, sql: str, *args: Union[str, int]) -> Union[Dict[str, Any], None]: + """ + 从给定的 SQL 中查询记录,返回的是符合条件的第一个结果 + :param sql: 查询的sql + :param args:sql中传递动态参数列表 + :return: + """ + async with self.__pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(sql, args) + data = await cur.fetchone() + return data + + async def item_to_table(self, table_name: str, item: Dict[str, Any]) -> int: + """ + 表中插入数据 + :param table_name: 表名 + :param item: 一条记录的字典信息 + :return: + """ + fields = list(item.keys()) + values = list(item.values()) + fields = [f'`{field}`' for field in fields] + fieldstr = ','.join(fields) + valstr = ','.join(['%s'] * len(item)) + sql = "INSERT INTO %s (%s) VALUES(%s)" % (table_name, fieldstr, valstr) + async with self.__pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(sql, values) + lastrowid = cur.lastrowid + return lastrowid + + async def update_table(self, table_name: str, updates: Dict[str, Any], field_where: str, + value_where: Union[str, int, float]) -> int: + """ + 更新指定表的记录 + :param table_name: 表名 + :param updates: 需要更新的字段和值的 key - value 映射 + :param field_where: update 语句 where 条件中的字段名 + :param value_where: update 语句 where 条件中的字段值 + :return: + """ + upsets = [] + values = [] + for k, v in updates.items(): + s = '`%s`=%%s' % k + upsets.append(s) + values.append(v) + upsets = ','.join(upsets) + sql = 'UPDATE %s SET %s WHERE %s="%s"' % ( + table_name, + upsets, + field_where, value_where, + ) + async with self.__pool.acquire() as conn: + async with conn.cursor() as cur: + rows = await cur.execute(sql, values) + return rows + + async def execute(self, sql: str, *args: Union[str, int]) -> int: + """ + 需要更新、写入等操作的 excute 执行语句 + :param sql: + :param args: + :return: + """ + async with self.__pool.acquire() as conn: + async with conn.cursor() as cur: + rows = await cur.execute(sql, args) + return rows diff --git a/db.py b/db.py index b1bc15e..b9d8382 100644 --- a/db.py +++ b/db.py @@ -1,31 +1,78 @@ -from typing import List +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/4/6 14:54 +# @Desc : mediacrawler db 管理 +import asyncio +from typing import Dict +from urllib.parse import urlparse -from tortoise import Tortoise, run_async +import aiomysql -from config.db_config import * +import config +from async_db import AsyncMysqlDB from tools import utils +from var import db_conn_pool_var, media_crawler_db_var -def get_platform_models() -> List[str]: - models = ["store.xhs", "store.douyin", "store.bilibili", "store.kuaishou", "store.weibo"] - return models +def parse_mysql_url(mysql_url) -> Dict: + """ + 从配置文件中解析db链接url,给到aiomysql用,因为aiomysql不支持直接以URL的方式传递链接信息。 + Args: + mysql_url: mysql://root:{RELATION_DB_PWD}@localhost:3306/media_crawler + + Returns: + + """ + parsed_url = urlparse(mysql_url) + db_params = { + 'host': parsed_url.hostname, + 'port': parsed_url.port or 3306, + 'user': parsed_url.username, + 'password': parsed_url.password, + 'db': parsed_url.path.lstrip('/') + } + return db_params -async def init_db(create_db: bool = False) -> None: - await Tortoise.init( - db_url=RELATION_DB_URL, - modules={'models': get_platform_models()}, - _create_db=create_db +async def init_mediacrawler_db(): + """ + 初始化数据库链接池对象,并将该对象塞给media_crawler_db_var上下文变量 + Returns: + + """ + db_conn_params = parse_mysql_url(config.RELATION_DB_URL) + pool = await aiomysql.create_pool( + autocommit=True, + **db_conn_params ) + async_db_obj = AsyncMysqlDB(pool) -async def close() -> None: - await Tortoise.close_connections() + # 将连接池对象和封装的CRUD sql接口对象放到上下文变量中 + db_conn_pool_var.set(pool) + media_crawler_db_var.set(async_db_obj) -async def init(): - await init_db(create_db=True) - await Tortoise.generate_schemas() - utils.logger.info("[db.init] Init DB Success!") + +async def init_db(): + """ + 初始化db连接池 + Returns: + + """ + utils.logger.info("[init_db] start init mediacrawler db connect object") + await init_mediacrawler_db() + utils.logger.info("[init_db] end init mediacrawler db connect object") + +async def close(): + """ + 关闭连接池 + Returns: + + """ + utils.logger.info("[close] close mediacrawler db pool") + db_pool: aiomysql.Pool = db_conn_pool_var.get() + if db_pool is not None: + db_pool.close() if __name__ == '__main__': - run_async(init()) + asyncio.run(init_db()) diff --git a/store/bilibili/bilibili_store_impl.py b/store/bilibili/bilibili_store_impl.py index d041575..cdcf948 100644 --- a/store/bilibili/bilibili_store_impl.py +++ b/store/bilibili/bilibili_store_impl.py @@ -82,20 +82,17 @@ class BiliDbStoreImplement(AbstractStore): Returns: """ - from .bilibili_store_db_types import BilibiliVideo + + from .bilibili_store_sql import (add_new_content, + query_content_by_content_id, + update_content_by_content_id) video_id = content_item.get("video_id") - if not await BilibiliVideo.filter(video_id=video_id).exists(): + video_detail: Dict = await query_content_by_content_id(content_id=video_id) + if not video_detail: content_item["add_ts"] = utils.get_current_timestamp() - bilibili_video_pydantic = pydantic_model_creator(BilibiliVideo, name='BilibiliVideoCreate', exclude=('id',)) - bilibili_data = bilibili_video_pydantic(**content_item) - bilibili_video_pydantic.model_validate(bilibili_data) - await BilibiliVideo.create(**bilibili_data.model_dump()) + await add_new_content(content_item) else: - bilibili_video_pydantic = pydantic_model_creator(BilibiliVideo, name='BilibiliVideoUpdate', - exclude=('id', 'add_ts')) - bilibili_data = bilibili_video_pydantic(**content_item) - bilibili_video_pydantic.model_validate(bilibili_data) - await BilibiliVideo.filter(video_id=video_id).update(**bilibili_data.model_dump()) + await update_content_by_content_id(video_id, content_item=content_item) async def store_comment(self, comment_item: Dict): """ @@ -106,21 +103,17 @@ class BiliDbStoreImplement(AbstractStore): Returns: """ - from .bilibili_store_db_types import BilibiliComment + + from .bilibili_store_sql import (add_new_comment, + query_comment_by_comment_id, + update_comment_by_comment_id) comment_id = comment_item.get("comment_id") - if not await BilibiliComment.filter(comment_id=comment_id).exists(): + comment_detail: Dict = await query_comment_by_comment_id(comment_id=comment_id) + if not comment_detail: comment_item["add_ts"] = utils.get_current_timestamp() - comment_pydantic = pydantic_model_creator(BilibiliComment, name='BilibiliVideoCommentCreate', - exclude=('id',)) - comment_data = comment_pydantic(**comment_item) - comment_pydantic.model_validate(comment_data) - await BilibiliComment.create(**comment_data.model_dump()) + await add_new_comment(comment_item) else: - comment_pydantic = pydantic_model_creator(BilibiliComment, name='BilibiliVideoCommentUpdate', - exclude=('id', 'add_ts')) - comment_data = comment_pydantic(**comment_item) - comment_pydantic.model_validate(comment_data) - await BilibiliComment.filter(comment_id=comment_id).update(**comment_data.model_dump()) + await update_comment_by_comment_id(comment_id, comment_item=comment_item) class BiliJsonStoreImplement(AbstractStore): diff --git a/store/bilibili/bilibili_store_sql.py b/store/bilibili/bilibili_store_sql.py new file mode 100644 index 0000000..b521f29 --- /dev/null +++ b/store/bilibili/bilibili_store_sql.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/4/6 15:30 +# @Desc : sql接口集合 + +from typing import Dict, List + +from db import AsyncMysqlDB +from var import media_crawler_db_var + + +async def query_content_by_content_id(content_id: str) -> Dict: + """ + 查询一条内容记录(xhs的帖子 | 抖音的视频 | 微博 | 快手视频 ...) + Args: + content_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from bilibili_video where video_id = '{content_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_content(content_item: Dict) -> int: + """ + 新增一条内容记录(xhs的帖子 | 抖音的视频 | 微博 | 快手视频 ...) + Args: + content_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("bilibili_video", content_item) + return last_row_id + + +async def update_content_by_content_id(content_id: str, content_item: Dict) -> int: + """ + 更新一条记录(xhs的帖子 | 抖音的视频 | 微博 | 快手视频 ...) + Args: + content_id: + content_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("bilibili_video", content_item, "video_id", content_id) + return effect_row + + + +async def query_comment_by_comment_id(comment_id: str) -> Dict: + """ + 查询一条评论内容 + Args: + comment_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from bilibili_video_comment where comment_id = '{comment_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_comment(comment_item: Dict) -> int: + """ + 新增一条评论记录 + Args: + comment_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("bilibili_video_comment", comment_item) + return last_row_id + + +async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> int: + """ + 更新增一条评论记录 + Args: + comment_id: + comment_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("bilibili_video_comment", comment_item, "comment_id", comment_id) + return effect_row diff --git a/store/douyin/douyin_store_impl.py b/store/douyin/douyin_store_impl.py index a649407..a57609b 100644 --- a/store/douyin/douyin_store_impl.py +++ b/store/douyin/douyin_store_impl.py @@ -82,20 +82,19 @@ class DouyinDbStoreImplement(AbstractStore): Returns: """ - from .douyin_store_db_types import DouyinAweme + + from .douyin_store_sql import (add_new_content, + query_content_by_content_id, + update_content_by_content_id) aweme_id = content_item.get("aweme_id") - if not await DouyinAweme.filter(aweme_id=aweme_id).exists(): + aweme_detail: Dict = await query_content_by_content_id(content_id=aweme_id) + if not aweme_detail: content_item["add_ts"] = utils.get_current_timestamp() - douyin_aweme_pydantic = pydantic_model_creator(DouyinAweme, name='DouyinAwemeCreate', exclude=('id',)) - douyin_data = douyin_aweme_pydantic(**content_item) - douyin_aweme_pydantic.model_validate(douyin_data) - await DouyinAweme.create(**douyin_data.dict()) + if aweme_detail.get("title"): + await add_new_content(content_item) else: - douyin_aweme_pydantic = pydantic_model_creator(DouyinAweme, name='DouyinAwemeUpdate', - exclude=('id', 'add_ts')) - douyin_data = douyin_aweme_pydantic(**content_item) - douyin_aweme_pydantic.model_validate(douyin_data) - await DouyinAweme.filter(aweme_id=aweme_id).update(**douyin_data.model_dump()) + await update_content_by_content_id(aweme_id, content_item=content_item) + async def store_comment(self, comment_item: Dict): """ @@ -106,21 +105,16 @@ class DouyinDbStoreImplement(AbstractStore): Returns: """ - from .douyin_store_db_types import DouyinAwemeComment + from .douyin_store_sql import (add_new_comment, + query_comment_by_comment_id, + update_comment_by_comment_id) comment_id = comment_item.get("comment_id") - if not await DouyinAwemeComment.filter(comment_id=comment_id).exists(): + comment_detail: Dict = await query_comment_by_comment_id(comment_id=comment_id) + if not comment_detail: comment_item["add_ts"] = utils.get_current_timestamp() - comment_pydantic = pydantic_model_creator(DouyinAwemeComment, name='DouyinAwemeCommentCreate', - exclude=('id',)) - comment_data = comment_pydantic(**comment_item) - comment_pydantic.model_validate(comment_data) - await DouyinAwemeComment.create(**comment_data.model_dump()) + await add_new_comment(comment_item) else: - comment_pydantic = pydantic_model_creator(DouyinAwemeComment, name='DouyinAwemeCommentUpdate', - exclude=('id', 'add_ts')) - comment_data = comment_pydantic(**comment_item) - comment_pydantic.model_validate(comment_data) - await DouyinAwemeComment.filter(comment_id=comment_id).update(**comment_data.model_dump()) + await update_comment_by_comment_id(comment_id, comment_item=comment_item) class DouyinJsonStoreImplement(AbstractStore): diff --git a/store/douyin/douyin_store_sql.py b/store/douyin/douyin_store_sql.py new file mode 100644 index 0000000..2cc4cc6 --- /dev/null +++ b/store/douyin/douyin_store_sql.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/4/6 15:30 +# @Desc : sql接口集合 + +from typing import Dict, List + +from db import AsyncMysqlDB +from var import media_crawler_db_var + + +async def query_content_by_content_id(content_id: str) -> Dict: + """ + 查询一条内容记录(xhs的帖子 | 抖音的视频 | 微博 | 快手视频 ...) + Args: + content_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from douyin_aweme where aweme_id = '{content_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_content(content_item: Dict) -> int: + """ + 新增一条内容记录(xhs的帖子 | 抖音的视频 | 微博 | 快手视频 ...) + Args: + content_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("douyin_aweme", content_item) + return last_row_id + + +async def update_content_by_content_id(content_id: str, content_item: Dict) -> int: + """ + 更新一条记录(xhs的帖子 | 抖音的视频 | 微博 | 快手视频 ...) + Args: + content_id: + content_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("douyin_aweme", content_item, "aweme_id", content_id) + return effect_row + + + +async def query_comment_by_comment_id(comment_id: str) -> Dict: + """ + 查询一条评论内容 + Args: + comment_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from douyin_aweme_comment where comment_id = '{comment_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_comment(comment_item: Dict) -> int: + """ + 新增一条评论记录 + Args: + comment_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("douyin_aweme_comment", comment_item) + return last_row_id + + +async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> int: + """ + 更新增一条评论记录 + Args: + comment_id: + comment_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("douyin_aweme_comment", comment_item, "comment_id", comment_id) + return effect_row diff --git a/store/kuaishou/kuaishou_store_impl.py b/store/kuaishou/kuaishou_store_impl.py index a53dfc8..35c7f57 100644 --- a/store/kuaishou/kuaishou_store_impl.py +++ b/store/kuaishou/kuaishou_store_impl.py @@ -82,20 +82,18 @@ class KuaishouDbStoreImplement(AbstractStore): Returns: """ - from .kuaishou_store_db_types import KuaishouVideo + + + from .kuaishou_store_sql import (add_new_content, + query_content_by_content_id, + update_content_by_content_id) video_id = content_item.get("video_id") - if not await KuaishouVideo.filter(video_id=video_id).exists(): + video_detail: Dict = await query_content_by_content_id(content_id=video_id) + if not video_detail: content_item["add_ts"] = utils.get_current_timestamp() - kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoCreate', exclude=('id',)) - kuaishou_data = kuaishou_video_pydantic(**content_item) - kuaishou_video_pydantic.model_validate(kuaishou_data) - await KuaishouVideo.create(**kuaishou_data.model_dump()) + await add_new_content(content_item) else: - kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoUpdate', - exclude=('id', 'add_ts')) - kuaishou_data = kuaishou_video_pydantic(**content_item) - kuaishou_video_pydantic.model_validate(kuaishou_data) - await KuaishouVideo.filter(video_id=video_id).update(**kuaishou_data.model_dump()) + await update_content_by_content_id(video_id, content_item=content_item) async def store_comment(self, comment_item: Dict): """ @@ -106,21 +104,17 @@ class KuaishouDbStoreImplement(AbstractStore): Returns: """ - from .kuaishou_store_db_types import KuaishouVideoComment + from .kuaishou_store_sql import (add_new_comment, + query_comment_by_comment_id, + update_comment_by_comment_id) comment_id = comment_item.get("comment_id") - if not await KuaishouVideoComment.filter(comment_id=comment_id).exists(): + comment_detail: Dict = await query_comment_by_comment_id(comment_id=comment_id) + if not comment_detail: comment_item["add_ts"] = utils.get_current_timestamp() - comment_pydantic = pydantic_model_creator(KuaishouVideoComment, name='KuaishouVideoCommentCreate', - exclude=('id',)) - comment_data = comment_pydantic(**comment_item) - comment_pydantic.model_validate(comment_data) - await KuaishouVideoComment.create(**comment_data.model_dump()) + await add_new_comment(comment_item) else: - comment_pydantic = pydantic_model_creator(KuaishouVideoComment, name='KuaishouVideoCommentUpdate', - exclude=('id', 'add_ts')) - comment_data = comment_pydantic(**comment_item) - comment_pydantic.model_validate(comment_data) - await KuaishouVideoComment.filter(comment_id=comment_id).update(**comment_data.model_dump()) + await update_comment_by_comment_id(comment_id, comment_item=comment_item) + class KuaishouJsonStoreImplement(AbstractStore): diff --git a/store/kuaishou/kuaishou_store_sql.py b/store/kuaishou/kuaishou_store_sql.py new file mode 100644 index 0000000..d50dc3f --- /dev/null +++ b/store/kuaishou/kuaishou_store_sql.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/4/6 15:30 +# @Desc : sql接口集合 + +from typing import Dict, List + +from db import AsyncMysqlDB +from var import media_crawler_db_var + + +async def query_content_by_content_id(content_id: str) -> Dict: + """ + 查询一条内容记录(xhs的帖子 | 抖音的视频 | 微博 | 快手视频 ...) + Args: + content_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from kuaishou_video where video_id = '{content_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_content(content_item: Dict) -> int: + """ + 新增一条内容记录(xhs的帖子 | 抖音的视频 | 微博 | 快手视频 ...) + Args: + content_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("kuaishou_video", content_item) + return last_row_id + + +async def update_content_by_content_id(content_id: str, content_item: Dict) -> int: + """ + 更新一条记录(xhs的帖子 | 抖音的视频 | 微博 | 快手视频 ...) + Args: + content_id: + content_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("kuaishou_video", content_item, "video_id", content_id) + return effect_row + + + +async def query_comment_by_comment_id(comment_id: str) -> Dict: + """ + 查询一条评论内容 + Args: + comment_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from kuaishou_video_comment where comment_id = '{comment_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_comment(comment_item: Dict) -> int: + """ + 新增一条评论记录 + Args: + comment_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("kuaishou_video_comment", comment_item) + return last_row_id + + +async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> int: + """ + 更新增一条评论记录 + Args: + comment_id: + comment_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("kuaishou_video_comment", comment_item, "comment_id", comment_id) + return effect_row diff --git a/store/weibo/weibo_store_impl.py b/store/weibo/weibo_store_impl.py index d915a47..05b9e52 100644 --- a/store/weibo/weibo_store_impl.py +++ b/store/weibo/weibo_store_impl.py @@ -82,20 +82,17 @@ class WeiboDbStoreImplement(AbstractStore): Returns: """ - from .weibo_store_db_types import WeiboNote + + from .weibo_store_sql import (add_new_content, + query_content_by_content_id, + update_content_by_content_id) note_id = content_item.get("note_id") - if not await WeiboNote.filter(note_id=note_id).exists(): + note_detail: Dict = await query_content_by_content_id(content_id=note_id) + if not note_detail: content_item["add_ts"] = utils.get_current_timestamp() - weibo_note_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteCreate', exclude=('id',)) - weibo_data = weibo_note_pydantic(**content_item) - weibo_note_pydantic.model_validate(weibo_data) - await WeiboNote.create(**weibo_data.model_dump()) + await add_new_content(content_item) else: - weibo_note_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteUpdate', - exclude=('id', 'add_ts')) - weibo_data = weibo_note_pydantic(**content_item) - weibo_note_pydantic.model_validate(weibo_data) - await WeiboNote.filter(note_id=note_id).update(**weibo_data.model_dump()) + await update_content_by_content_id(note_id, content_item=content_item) async def store_comment(self, comment_item: Dict): """ @@ -106,21 +103,16 @@ class WeiboDbStoreImplement(AbstractStore): Returns: """ - from .weibo_store_db_types import WeiboComment + from .weibo_store_sql import (add_new_comment, + query_comment_by_comment_id, + update_comment_by_comment_id) comment_id = comment_item.get("comment_id") - if not await WeiboComment.filter(comment_id=comment_id).exists(): + comment_detail: Dict = await query_comment_by_comment_id(comment_id=comment_id) + if not comment_detail: comment_item["add_ts"] = utils.get_current_timestamp() - comment_pydantic = pydantic_model_creator(WeiboComment, name='WeiboNoteCommentCreate', - exclude=('id',)) - comment_data = comment_pydantic(**comment_item) - comment_pydantic.model_validate(comment_data) - await WeiboComment.create(**comment_data.model_dump()) + await add_new_comment(comment_item) else: - comment_pydantic = pydantic_model_creator(WeiboComment, name='WeiboNoteCommentUpdate', - exclude=('id', 'add_ts')) - comment_data = comment_pydantic(**comment_item) - comment_pydantic.model_validate(comment_data) - await WeiboComment.filter(comment_id=comment_id).update(**comment_data.model_dump()) + await update_comment_by_comment_id(comment_id, comment_item=comment_item) class WeiboJsonStoreImplement(AbstractStore): @@ -161,7 +153,6 @@ class WeiboJsonStoreImplement(AbstractStore): async with aiofiles.open(save_file_name, 'w', encoding='utf-8') as file: await file.write(json.dumps(save_data, ensure_ascii=False)) - async def store_content(self, content_item: Dict): """ content JSON storage implementation diff --git a/store/weibo/weibo_store_sql.py b/store/weibo/weibo_store_sql.py new file mode 100644 index 0000000..d7f73e7 --- /dev/null +++ b/store/weibo/weibo_store_sql.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/4/6 15:30 +# @Desc : sql接口集合 + +from typing import Dict, List + +from db import AsyncMysqlDB +from var import media_crawler_db_var + + +async def query_content_by_content_id(content_id: str) -> Dict: + """ + 查询一条内容记录(xhs的帖子 | 抖音的视频 | 微博 | 快手视频 ...) + Args: + content_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from weibo_note where note_id = '{content_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_content(content_item: Dict) -> int: + """ + 新增一条内容记录(xhs的帖子 | 抖音的视频 | 微博 | 快手视频 ...) + Args: + content_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("weibo_note", content_item) + return last_row_id + + +async def update_content_by_content_id(content_id: str, content_item: Dict) -> int: + """ + 更新一条记录(xhs的帖子 | 抖音的视频 | 微博 | 快手视频 ...) + Args: + content_id: + content_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("weibo_note", content_item, "note_id", content_id) + return effect_row + + + +async def query_comment_by_comment_id(comment_id: str) -> Dict: + """ + 查询一条评论内容 + Args: + comment_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from weibo_note_comment where comment_id = '{comment_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_comment(comment_item: Dict) -> int: + """ + 新增一条评论记录 + Args: + comment_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("weibo_note_comment", comment_item) + return last_row_id + + +async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> int: + """ + 更新增一条评论记录 + Args: + comment_id: + comment_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("weibo_note_comment", comment_item, "comment_id", comment_id) + return effect_row diff --git a/store/xhs/__init__.py b/store/xhs/__init__.py index 7cc7ba2..890ff9d 100644 --- a/store/xhs/__init__.py +++ b/store/xhs/__init__.py @@ -118,6 +118,7 @@ async def save_creator(user_id: str, creator: Dict): 'fans': fans, 'interaction': interaction, 'tag_list': json.dumps({tag.get('tagType'): tag.get('name') for tag in creator.get('tags')}, ensure_ascii=False), + "last_modify_ts": utils.get_current_timestamp(), } utils.logger.info(f"[store.xhs.save_creator] creator:{local_db_item}") await XhsStoreFactory.create_store().store_creator(local_db_item) diff --git a/store/xhs/xhs_store_impl.py b/store/xhs/xhs_store_impl.py index e5860db..80eeede 100644 --- a/store/xhs/xhs_store_impl.py +++ b/store/xhs/xhs_store_impl.py @@ -13,8 +13,9 @@ import aiofiles from tortoise.contrib.pydantic import pydantic_model_creator from base.base_crawler import AbstractStore +from db import AsyncMysqlDB from tools import utils -from var import crawler_type_var +from var import crawler_type_var, media_crawler_db_var class XhsCsvStoreImplement(AbstractStore): @@ -94,19 +95,16 @@ class XhsDbStoreImplement(AbstractStore): Returns: """ - from .xhs_store_db_types import XHSNote + from .xhs_store_sql import (add_new_content, + query_content_by_content_id, + update_content_by_content_id) note_id = content_item.get("note_id") - if not await XHSNote.filter(note_id=note_id).first(): + note_detail: Dict = await query_content_by_content_id(content_id=note_id) + if not note_detail: content_item["add_ts"] = utils.get_current_timestamp() - note_pydantic = pydantic_model_creator(XHSNote, name="XHSPydanticCreate", exclude=('id',)) - note_data = note_pydantic(**content_item) - note_pydantic.model_validate(note_data) - await XHSNote.create(**note_data.model_dump()) + await add_new_content(content_item) else: - note_pydantic = pydantic_model_creator(XHSNote, name="XHSPydanticUpdate", exclude=('id', 'add_ts')) - note_data = note_pydantic(**content_item) - note_pydantic.model_validate(note_data) - await XHSNote.filter(note_id=note_id).update(**note_data.model_dump()) + await update_content_by_content_id(note_id, content_item=content_item) async def store_comment(self, comment_item: Dict): """ @@ -117,20 +115,16 @@ class XhsDbStoreImplement(AbstractStore): Returns: """ - from .xhs_store_db_types import XHSNoteComment + from .xhs_store_sql import (add_new_comment, + query_comment_by_comment_id, + update_comment_by_comment_id) comment_id = comment_item.get("comment_id") - if not await XHSNoteComment.filter(comment_id=comment_id).first(): + comment_detail: Dict = await query_comment_by_comment_id(comment_id=comment_id) + if not comment_detail: comment_item["add_ts"] = utils.get_current_timestamp() - comment_pydantic = pydantic_model_creator(XHSNoteComment, name="CommentPydanticCreate", exclude=('id',)) - comment_data = comment_pydantic(**comment_item) - comment_pydantic.model_validate(comment_data) - await XHSNoteComment.create(**comment_data.model_dump()) + await add_new_comment(comment_item) else: - comment_pydantic = pydantic_model_creator(XHSNoteComment, name="CommentPydanticUpdate", - exclude=('id', 'add_ts',)) - comment_data = comment_pydantic(**comment_item) - comment_pydantic.model_validate(comment_data) - await XHSNoteComment.filter(comment_id=comment_id).update(**comment_data.model_dump()) + await update_comment_by_comment_id(comment_id, comment_item=comment_item) async def store_creator(self, creator: Dict): """ @@ -141,21 +135,15 @@ class XhsDbStoreImplement(AbstractStore): Returns: """ - from .xhs_store_db_types import XhsCreator + from .xhs_store_sql import (add_new_creator, query_creator_by_user_id, + update_creator_by_user_id) user_id = creator.get("user_id") - if not await XhsCreator.filter(user_id=user_id).first(): + user_detail: Dict = await query_creator_by_user_id(user_id) + if not user_detail: creator["add_ts"] = utils.get_current_timestamp() - creator["last_modify_ts"] = creator["add_ts"] - creator_pydantic = pydantic_model_creator(XhsCreator, name="CreatorPydanticCreate", exclude=('id',)) - creator_data = creator_pydantic(**creator) - creator_pydantic.model_validate(creator_data) - await XhsCreator.create(**creator_data.model_dump()) + await add_new_creator(creator) else: - creator["last_modify_ts"] = utils.get_current_timestamp() - creator_pydantic = pydantic_model_creator(XhsCreator, name="CreatorPydanticUpdate", exclude=('id', 'add_ts',)) - creator_data = creator_pydantic(**creator) - creator_pydantic.model_validate(creator_data) - await XhsCreator.filter(user_id=user_id).update(**creator_data.model_dump()) + await update_creator_by_user_id(user_id, creator) class XhsJsonStoreImplement(AbstractStore): @@ -217,7 +205,7 @@ class XhsJsonStoreImplement(AbstractStore): """ await self.save_data_to_json(comment_item, "comments") - + async def store_creator(self, creator: Dict): """ Xiaohongshu content JSON storage implementation @@ -227,4 +215,4 @@ class XhsJsonStoreImplement(AbstractStore): Returns: """ - await self.save_data_to_json(creator, "creator") \ No newline at end of file + await self.save_data_to_json(creator, "creator") diff --git a/store/xhs/xhs_store_sql.py b/store/xhs/xhs_store_sql.py new file mode 100644 index 0000000..d6c4929 --- /dev/null +++ b/store/xhs/xhs_store_sql.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2024/4/6 15:30 +# @Desc : sql接口集合 + +from typing import Dict, List + +from db import AsyncMysqlDB +from var import media_crawler_db_var + + +async def query_content_by_content_id(content_id: str) -> Dict: + """ + 查询一条内容记录(xhs的帖子 | 抖音的视频 | 微博 | 快手视频 ...) + Args: + content_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from xhs_note where note_id = '{content_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_content(content_item: Dict) -> int: + """ + 新增一条内容记录(xhs的帖子 | 抖音的视频 | 微博 | 快手视频 ...) + Args: + content_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("xhs_note", content_item) + return last_row_id + + +async def update_content_by_content_id(content_id: str, content_item: Dict) -> int: + """ + 更新一条记录(xhs的帖子 | 抖音的视频 | 微博 | 快手视频 ...) + Args: + content_id: + content_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("xhs_note", content_item, "note_id", content_id) + return effect_row + + + +async def query_comment_by_comment_id(comment_id: str) -> Dict: + """ + 查询一条评论内容 + Args: + comment_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from xhs_note_comment where comment_id = '{comment_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_comment(comment_item: Dict) -> int: + """ + 新增一条评论记录 + Args: + comment_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("xhs_note_comment", comment_item) + return last_row_id + + +async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> int: + """ + 更新增一条评论记录 + Args: + comment_id: + comment_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("xhs_note_comment", comment_item, "comment_id", comment_id) + return effect_row + + +async def query_creator_by_user_id(user_id: str) -> Dict: + """ + 查询一条创作者记录 + Args: + user_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from xhs_creator where user_id = '{user_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_creator(creator_item: Dict) -> int: + """ + 新增一条创作者信息 + Args: + creator_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("xhs_creator", creator_item) + return last_row_id + + +async def update_creator_by_user_id(user_id: str, creator_item: Dict) -> int: + """ + 更新一条创作者信息 + Args: + user_id: + creator_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("xhs_creator", creator_item, "user_id", user_id) + return effect_row \ No newline at end of file diff --git a/var.py b/var.py index e929f8e..9107e52 100644 --- a/var.py +++ b/var.py @@ -2,6 +2,12 @@ from asyncio.tasks import Task from contextvars import ContextVar from typing import List +import aiomysql + +from async_db import AsyncMysqlDB + request_keyword_var: ContextVar[str] = ContextVar("request_keyword", default="") crawler_type_var: ContextVar[str] = ContextVar("crawler_type", default="") comment_tasks_var: ContextVar[List[Task]] = ContextVar("comment_tasks", default=[]) +media_crawler_db_var: ContextVar[AsyncMysqlDB] = ContextVar("media_crawler_db_var") +db_conn_pool_var: ContextVar[aiomysql.Pool] = ContextVar("db_conn_pool_var")