# -*- coding: utf-8 -*- from typing import List import config from base.base_crawler import AbstractStore from model.m_zhihu import ZhihuComment, ZhihuContent from store.zhihu.zhihu_store_impl import (ZhihuCsvStoreImplement, ZhihuDbStoreImplement, ZhihuJsonStoreImplement) from tools import utils from var import source_keyword_var class ZhihuStoreFactory: STORES = { "csv": ZhihuCsvStoreImplement, "db": ZhihuDbStoreImplement, "json": ZhihuJsonStoreImplement } @staticmethod def create_store() -> AbstractStore: store_class = ZhihuStoreFactory.STORES.get(config.SAVE_DATA_OPTION) if not store_class: raise ValueError("[ZhihuStoreFactory.create_store] Invalid save option only supported csv or db or json ...") return store_class() async def update_zhihu_content(content_item: ZhihuContent): """ 更新知乎内容 Args: content_item: Returns: """ content_item.source_keyword = source_keyword_var.get() local_db_item = content_item.model_dump() local_db_item.update({"last_modify_ts": utils.get_current_timestamp()}) utils.logger.info(f"[store.zhihu.update_zhihu_content] zhihu content: {local_db_item}") await ZhihuStoreFactory.create_store().store_content(local_db_item) async def batch_update_zhihu_note_comments(comments: List[ZhihuComment]): """ 批量更新知乎内容评论 Args: comments: Returns: """ if not comments: return for comment_item in comments: await update_zhihu_content_comment(comment_item) async def update_zhihu_content_comment(comment_item: ZhihuComment): """ 更新知乎内容评论 Args: comment_item: Returns: """ local_db_item = comment_item.model_dump() local_db_item.update({"last_modify_ts": utils.get_current_timestamp()}) utils.logger.info(f"[store.zhihu.update_zhihu_note_comment] zhihu content comment:{local_db_item}") await ZhihuStoreFactory.create_store().store_comment(local_db_item)