diff --git a/cmd_arg/arg.py b/cmd_arg/arg.py index 27854f7..65819a1 100644 --- a/cmd_arg/arg.py +++ b/cmd_arg/arg.py @@ -7,8 +7,8 @@ from tools.utils import str2bool async def parse_cmd(): # 读取command arg parser = argparse.ArgumentParser(description='Media crawler program.') - parser.add_argument('--platform', type=str, help='Media platform select (xhs | dy | ks | bili | wb)', - choices=["xhs", "dy", "ks", "bili", "wb"], default=config.PLATFORM) + parser.add_argument('--platform', type=str, help='Media platform select (xhs | dy | ks | bili | wb | tieba)', + choices=["xhs", "dy", "ks", "bili", "wb", "tieba"], default=config.PLATFORM) parser.add_argument('--lt', type=str, help='Login type (qrcode | phone | cookie)', choices=["qrcode", "phone", "cookie"], default=config.LOGIN_TYPE) parser.add_argument('--type', type=str, help='crawler type (search | detail | creator)', diff --git a/config/base_config.py b/config/base_config.py index 076003a..08dd421 100644 --- a/config/base_config.py +++ b/config/base_config.py @@ -88,6 +88,12 @@ WEIBO_SPECIFIED_ID_LIST = [ # ........................ ] +# 指定贴吧需要爬取的帖子列表 +TIEBA_SPECIFIED_ID_LIST = [ + +] + + # 指定小红书创作者ID列表 XHS_CREATOR_ID_LIST = [ "63e36c9a000000002703502b", diff --git a/main.py b/main.py index 27d84ad..e051b5e 100644 --- a/main.py +++ b/main.py @@ -8,6 +8,7 @@ from base.base_crawler import AbstractCrawler from media_platform.bilibili import BilibiliCrawler from media_platform.douyin import DouYinCrawler from media_platform.kuaishou import KuaishouCrawler +from media_platform.tieba import TieBaCrawler from media_platform.weibo import WeiboCrawler from media_platform.xhs import XiaoHongShuCrawler @@ -18,7 +19,8 @@ class CrawlerFactory: "dy": DouYinCrawler, "ks": KuaishouCrawler, "bili": BilibiliCrawler, - "wb": WeiboCrawler + "wb": WeiboCrawler, + "tieba": TieBaCrawler } @staticmethod @@ -28,6 +30,7 @@ class CrawlerFactory: raise ValueError("Invalid Media Platform Currently only supported xhs or dy or ks or bili ...") return crawler_class() + async def main(): # parse cmd await cmd_arg.parse_cmd() @@ -38,7 +41,7 @@ async def main(): crawler = CrawlerFactory.create_crawler(platform=config.PLATFORM) await crawler.start() - + if config.SAVE_DATA_OPTION == "db": await db.close() diff --git a/media_platform/tieba/__init__.py b/media_platform/tieba/__init__.py new file mode 100644 index 0000000..e7e2a44 --- /dev/null +++ b/media_platform/tieba/__init__.py @@ -0,0 +1,2 @@ +# -*- coding: utf-8 -*- +from .core import TieBaCrawler \ No newline at end of file diff --git a/media_platform/tieba/client.py b/media_platform/tieba/client.py new file mode 100644 index 0000000..a7ebaa1 --- /dev/null +++ b/media_platform/tieba/client.py @@ -0,0 +1,169 @@ +import asyncio +import json +import re +from typing import Any, Callable, Dict, List, Optional, Union +from urllib.parse import urlencode + +import httpx +from playwright.async_api import BrowserContext, Page + +import config +from base.base_crawler import AbstractApiClient +from tools import utils + +from .field import SearchNoteType, SearchSortType + + +class BaiduTieBaClient(AbstractApiClient): + def __init__( + self, + timeout=10, + proxies=None, + *, + headers: Dict[str, str], + playwright_page: Page, + cookie_dict: Dict[str, str], + ): + self.proxies = proxies + self.timeout = timeout + self.headers = headers + self.playwright_page = playwright_page + self.cookie_dict = cookie_dict + self._host = "https://tieba.baidu.com" + + async def request(self, method, url, **kwargs) -> Union[str, Any]: + """ + 
封装httpx的公共请求方法,对请求响应做一些处理 + Args: + method: 请求方法 + url: 请求的URL + **kwargs: 其他请求参数,例如请求头、请求体等 + + Returns: + + """ + # return response.text + return_response = kwargs.pop('return_response', False) + + async with httpx.AsyncClient(proxies=self.proxies) as client: + response = await client.request( + method, url, timeout=self.timeout, + **kwargs + ) + + if return_response: + return response.text + + return response.json() + + async def get(self, uri: str, params=None) -> Dict: + """ + GET请求,对请求头签名 + Args: + uri: 请求路由 + params: 请求参数 + + Returns: + + """ + final_uri = uri + if isinstance(params, dict): + final_uri = (f"{uri}?" + f"{urlencode(params)}") + return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=self.headers) + + async def post(self, uri: str, data: dict) -> Dict: + """ + POST请求,对请求头签名 + Args: + uri: 请求路由 + data: 请求体参数 + + Returns: + + """ + json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False) + return await self.request(method="POST", url=f"{self._host}{uri}", + data=json_str, headers=self.headers) + + async def pong(self) -> bool: + """ + 用于检查登录态是否失效了 + Returns: + + """ + utils.logger.info("[BaiduTieBaClient.pong] Begin to pong tieba...") + try: + uri = "/mo/q/sync" + res: Dict = await self.get(uri) + if res and res.get("no") == 0: + ping_flag = True + else: + utils.logger.info(f"[BaiduTieBaClient.pong] user not login, will try to login again...") + ping_flag = False + except Exception as e: + utils.logger.error(f"[BaiduTieBaClient.pong] Ping tieba failed: {e}, and try to login again...") + ping_flag = False + return ping_flag + + async def update_cookies(self, browser_context: BrowserContext): + """ + API客户端提供的更新cookies方法,一般情况下登录成功后会调用此方法 + Args: + browser_context: 浏览器上下文对象 + + Returns: + + """ + cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies()) + self.headers["Cookie"] = cookie_str + self.cookie_dict = cookie_dict + + async def get_note_by_keyword( + self, keyword: str, + page: int = 1, + page_size: int = 10, + sort: SearchSortType = SearchSortType.TIME_DESC, + note_type: SearchNoteType = SearchNoteType.FIXED_THREAD + ) -> Dict: + """ + 根据关键词搜索贴吧帖子 + Args: + keyword: 关键词 + page: 分页第几页 + page_size: 每页肠病毒 + sort: 结果排序方式 + note_type: 帖子类型(主题贴|主题+回复混合模式) + + Returns: + + """ + # todo impl it + return {} + + async def get_note_by_id(self, note_id: str) -> Dict: + """ + 根据帖子ID获取帖子详情 + Args: + note_id: + + Returns: + + """ + # todo impl it + return {} + + async def get_note_all_comments(self, note_id: str, crawl_interval: float = 1.0, + callback: Optional[Callable] = None) -> List[Dict]: + """ + 获取指定帖子下的所有一级评论,该方法会一直查找一个帖子下的所有评论信息 + Args: + note_id: 帖子ID + crawl_interval: 爬取一次笔记的延迟单位(秒) + callback: 一次笔记爬取结束后 + + Returns: + + """ + # todo impl it + return [] \ No newline at end of file diff --git a/media_platform/tieba/core.py b/media_platform/tieba/core.py new file mode 100644 index 0000000..c7a99d5 --- /dev/null +++ b/media_platform/tieba/core.py @@ -0,0 +1,265 @@ +import asyncio +import os +import random +from asyncio import Task +from typing import Dict, List, Optional, Tuple + +from playwright.async_api import (BrowserContext, BrowserType, Page, + async_playwright) + +import config +from base.base_crawler import AbstractCrawler +from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool +from store import tieba as tieba_store +from tools import utils +from var import crawler_type_var + +from .client import BaiduTieBaClient +from .field import SearchNoteType, SearchSortType +from .login import 
BaiduTieBaLogin
+
+
+class TieBaCrawler(AbstractCrawler):
+    context_page: Page
+    tieba_client: BaiduTieBaClient
+    browser_context: BrowserContext
+
+    def __init__(self) -> None:
+        self.index_url = "https://tieba.baidu.com"
+        self.user_agent = utils.get_user_agent()
+
+    async def start(self) -> None:
+        playwright_proxy_format, httpx_proxy_format = None, None
+        if config.ENABLE_IP_PROXY:
+            ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True)
+            ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy()
+            playwright_proxy_format, httpx_proxy_format = self.format_proxy_info(ip_proxy_info)
+
+        async with async_playwright() as playwright:
+            # Launch a browser context.
+            chromium = playwright.chromium
+            self.browser_context = await self.launch_browser(
+                chromium,
+                None,
+                self.user_agent,
+                headless=config.HEADLESS
+            )
+            # stealth.min.js is a js script to prevent the website from detecting the crawler.
+            await self.browser_context.add_init_script(path="libs/stealth.min.js")
+            self.context_page = await self.browser_context.new_page()
+            await self.context_page.goto(self.index_url)
+
+            # Create a client to interact with the baidutieba website.
+            self.tieba_client = await self.create_tieba_client(httpx_proxy_format)
+            if not await self.tieba_client.pong():
+                login_obj = BaiduTieBaLogin(
+                    login_type=config.LOGIN_TYPE,
+                    login_phone="",  # input your phone number
+                    browser_context=self.browser_context,
+                    context_page=self.context_page,
+                    cookie_str=config.COOKIES
+                )
+                await login_obj.begin()
+                await self.tieba_client.update_cookies(browser_context=self.browser_context)
+
+            crawler_type_var.set(config.CRAWLER_TYPE)
+            if config.CRAWLER_TYPE == "search":
+                # Search for notes and retrieve their comment information.
+                await self.search()
+            elif config.CRAWLER_TYPE == "detail":
+                # Get the information and comments of the specified post
+                await self.get_specified_notes()
+            else:
+                pass
+
+            utils.logger.info("[BaiduTieBaCrawler.start] TieBa Crawler finished ...")
+
+    async def search(self) -> None:
+        """Search for notes and retrieve their comment information."""
+        utils.logger.info("[BaiduTieBaCrawler.search] Begin search baidutieba keywords")
+        tieba_limit_count = 10  # tieba limit page fixed value
+        if config.CRAWLER_MAX_NOTES_COUNT < tieba_limit_count:
+            config.CRAWLER_MAX_NOTES_COUNT = tieba_limit_count
+        start_page = config.START_PAGE
+        for keyword in config.KEYWORDS.split(","):
+            utils.logger.info(f"[BaiduTieBaCrawler.search] Current search keyword: {keyword}")
+            page = 1
+            while (page - start_page + 1) * tieba_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
+                if page < start_page:
+                    utils.logger.info(f"[BaiduTieBaCrawler.search] Skip page {page}")
+                    page += 1
+                    continue
+                try:
+                    utils.logger.info(f"[BaiduTieBaCrawler.search] search tieba keyword: {keyword}, page: {page}")
+                    note_id_list: List[str] = []
+                    notes_res = await self.tieba_client.get_note_by_keyword(
+                        keyword=keyword,
+                        page=page,
+                        page_size=tieba_limit_count,
+                        sort=SearchSortType.TIME_DESC,
+                        note_type=SearchNoteType.FIXED_THREAD
+                    )
+                    utils.logger.info(f"[BaiduTieBaCrawler.search] Search notes res:{notes_res}")
+                    if not notes_res or not notes_res.get('has_more', False):
+                        utils.logger.info("No more content!")
+                        break
+                    semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
+                    task_list = [
+                        self.get_note_detail(
+                            note_id=post_item.get("id"),
+                            semaphore=semaphore
+                        )
+                        for post_item in notes_res.get("items", [])
+                        if post_item.get('model_type') not in ('rec_query', 'hot_query')
+                    ]
+                    note_details = await asyncio.gather(*task_list)
+                    for note_detail in note_details:
+                        if note_detail:
+                            await tieba_store.update_tieba_note(note_detail)
+                            note_id_list.append(note_detail.get("note_id"))
+                    page += 1
+                    utils.logger.info(f"[BaiduTieBaCrawler.search] Note details: {note_details}")
+                    await self.batch_get_note_comments(note_id_list)
+                except Exception as ex:
+                    utils.logger.error(f"[BaiduTieBaCrawler.search] Get note detail error, err: {ex}")
+                    break
+
+    async def fetch_creator_notes_detail(self, note_list: List[Dict]):
+        """
+        Concurrently obtain the specified post list and save the data
+        """
+        semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
+        task_list = [
+            self.get_note_detail(
+                note_id=post_item.get("note_id"),
+                semaphore=semaphore
+            )
+            for post_item in note_list
+        ]
+
+        note_details = await asyncio.gather(*task_list)
+        for note_detail in note_details:
+            if note_detail:
+                await tieba_store.update_tieba_note(note_detail)
+
+    async def get_specified_notes(self):
+        """Get the information and comments of the specified post"""
+        semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
+        task_list = [
+            self.get_note_detail(note_id=note_id, semaphore=semaphore) for note_id in config.TIEBA_SPECIFIED_ID_LIST
+        ]
+        note_details = await asyncio.gather(*task_list)
+        for note_detail in note_details:
+            if note_detail is not None:
+                await tieba_store.update_tieba_note(note_detail)
+        await self.batch_get_note_comments(config.TIEBA_SPECIFIED_ID_LIST)
+
+    async def get_note_detail(self, note_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
+        """Get note detail"""
+        async with semaphore:
+            try:
+                note_detail: Dict = await self.tieba_client.get_note_by_id(note_id)
+                if not note_detail:
+                    utils.logger.error(
+                        f"[BaiduTieBaCrawler.get_note_detail] Get note detail error, note_id: {note_id}")
+                    return None
+                return note_detail
+            except KeyError as ex:
+                utils.logger.error(
+                    f"[BaiduTieBaCrawler.get_note_detail] have not found note detail note_id:{note_id}, err: {ex}")
+                return None
+            except Exception as ex:
+                utils.logger.error(f"[BaiduTieBaCrawler.get_note_detail] Get note detail error: {ex}")
+                return None
+
+    async def batch_get_note_comments(self, note_list: List[str]):
+        """Batch get note comments"""
+        if not config.ENABLE_GET_COMMENTS:
+            utils.logger.info(f"[BaiduTieBaCrawler.batch_get_note_comments] Crawling comment mode is not enabled")
+            return
+
+        utils.logger.info(
+            f"[BaiduTieBaCrawler.batch_get_note_comments] Begin batch get note comments, note list: {note_list}")
+        semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
+        task_list: List[Task] = []
+        for note_id in note_list:
+            task = asyncio.create_task(self.get_comments(note_id, semaphore), name=note_id)
+            task_list.append(task)
+        await asyncio.gather(*task_list)
+
+    async def get_comments(self, note_id: str, semaphore: asyncio.Semaphore):
+        """Get note comments with keyword filtering and quantity limitation"""
+        async with semaphore:
+            utils.logger.info(f"[BaiduTieBaCrawler.get_comments] Begin get note id comments {note_id}")
+            await self.tieba_client.get_note_all_comments(
+                note_id=note_id,
+                crawl_interval=random.random(),
+                callback=tieba_store.batch_update_tieba_note_comments
+            )
+
+    @staticmethod
+    def format_proxy_info(ip_proxy_info: IpInfoModel) -> Tuple[Optional[Dict], Optional[Dict]]:
+        """format proxy info for playwright and httpx"""
+        playwright_proxy = {
+            "server": f"{ip_proxy_info.protocol}{ip_proxy_info.ip}:{ip_proxy_info.port}",
+            "username": ip_proxy_info.user,
+            "password": ip_proxy_info.password,
} + httpx_proxy = { + f"{ip_proxy_info.protocol}": f"http://{ip_proxy_info.user}:{ip_proxy_info.password}@{ip_proxy_info.ip}:{ip_proxy_info.port}" + } + return playwright_proxy, httpx_proxy + + async def create_tieba_client(self, httpx_proxy: Optional[str]) -> BaiduTieBaClient: + """Create tieba client""" + utils.logger.info("[BaiduTieBaCrawler.create_tieba_client] Begin create baidutieba API client ...") + cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies()) + tieba_client_obj = BaiduTieBaClient( + proxies=httpx_proxy, + headers={ + "User-Agent": self.user_agent, + "Cookie": cookie_str, + "Origin": "https://www.baidutieba.com", + "Referer": "https://www.baidutieba.com", + "Content-Type": "application/json;charset=UTF-8" + }, + playwright_page=self.context_page, + cookie_dict=cookie_dict, + ) + return tieba_client_obj + + async def launch_browser( + self, + chromium: BrowserType, + playwright_proxy: Optional[Dict], + user_agent: Optional[str], + headless: bool = True + ) -> BrowserContext: + """Launch browser and create browser context""" + utils.logger.info("[BaiduTieBaCrawler.launch_browser] Begin create browser context ...") + if config.SAVE_LOGIN_STATE: + # feat issue #14 + # we will save login state to avoid login every time + user_data_dir = os.path.join(os.getcwd(), "browser_data", + config.USER_DATA_DIR % config.PLATFORM) # type: ignore + browser_context = await chromium.launch_persistent_context( + user_data_dir=user_data_dir, + accept_downloads=True, + headless=headless, + proxy=playwright_proxy, # type: ignore + viewport={"width": 1920, "height": 1080}, + user_agent=user_agent + ) + return browser_context + else: + browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore + browser_context = await browser.new_context( + viewport={"width": 1920, "height": 1080}, + user_agent=user_agent + ) + return browser_context + + async def close(self): + """Close browser context""" + await self.browser_context.close() + utils.logger.info("[BaiduTieBaCrawler.close] Browser context closed ...") diff --git a/media_platform/tieba/field.py b/media_platform/tieba/field.py new file mode 100644 index 0000000..824fe88 --- /dev/null +++ b/media_platform/tieba/field.py @@ -0,0 +1,18 @@ +from enum import Enum + + +class SearchSortType(Enum): + """search sort type""" + # 按时间倒序 + TIME_DESC = "1" + # 按时间顺序 + TIME_ASC = "0" + # 按相关性顺序 + RELEVANCE_ORDER = "2" + + +class SearchNoteType(Enum): + # 只看主题贴 + MAIN_THREAD = "1" + # 混合模式(帖子+回复) + FIXED_THREAD = "0" diff --git a/media_platform/tieba/login.py b/media_platform/tieba/login.py new file mode 100644 index 0000000..8c1eb15 --- /dev/null +++ b/media_platform/tieba/login.py @@ -0,0 +1,112 @@ +import asyncio +import functools +import sys +from typing import Optional + +from playwright.async_api import BrowserContext, Page +from tenacity import (RetryError, retry, retry_if_result, stop_after_attempt, + wait_fixed) + +import config +from base.base_crawler import AbstractLogin +from tools import utils + + +class BaiduTieBaLogin(AbstractLogin): + + def __init__(self, + login_type: str, + browser_context: BrowserContext, + context_page: Page, + login_phone: Optional[str] = "", + cookie_str: str = "" + ): + config.LOGIN_TYPE = login_type + self.browser_context = browser_context + self.context_page = context_page + self.login_phone = login_phone + self.cookie_str = cookie_str + + @retry(stop=stop_after_attempt(600), wait=wait_fixed(1), retry=retry_if_result(lambda value: value is False)) + async def 
check_login_state(self) -> bool: + """ + 轮训检查登录状态是否成功,成功返回True否则返回False + + Returns: + + """ + current_cookie = await self.browser_context.cookies() + _, cookie_dict = utils.convert_cookies(current_cookie) + stoken = cookie_dict.get("STOKEN") + ptoken = cookie_dict.get("PTOKEN") + if stoken or ptoken: + return True + return False + + async def begin(self): + """Start login baidutieba""" + utils.logger.info("[BaiduTieBaLogin.begin] Begin login baidutieba ...") + if config.LOGIN_TYPE == "qrcode": + await self.login_by_qrcode() + elif config.LOGIN_TYPE == "phone": + await self.login_by_mobile() + elif config.LOGIN_TYPE == "cookie": + await self.login_by_cookies() + else: + raise ValueError("[BaiduTieBaLogin.begin]Invalid Login Type Currently only supported qrcode or phone or cookies ...") + + async def login_by_mobile(self): + """Login baidutieba by mobile""" + pass + + async def login_by_qrcode(self): + """login baidutieba website and keep webdriver login state""" + utils.logger.info("[BaiduTieBaLogin.login_by_qrcode] Begin login baidutieba by qrcode ...") + qrcode_img_selector = "xpath=//img[@class='tang-pass-qrcode-img']" + # find login qrcode + base64_qrcode_img = await utils.find_login_qrcode( + self.context_page, + selector=qrcode_img_selector + ) + if not base64_qrcode_img: + utils.logger.info("[BaiduTieBaLogin.login_by_qrcode] login failed , have not found qrcode please check ....") + # if this website does not automatically popup login dialog box, we will manual click login button + await asyncio.sleep(0.5) + login_button_ele = self.context_page.locator("xpath=//li[@class='u_login']") + await login_button_ele.click() + base64_qrcode_img = await utils.find_login_qrcode( + self.context_page, + selector=qrcode_img_selector + ) + if not base64_qrcode_img: + utils.logger.info("[BaiduTieBaLogin.login_by_qrcode] login failed , have not found qrcode please check ....") + sys.exit() + + # show login qrcode + # fix issue #12 + # we need to use partial function to call show_qrcode function and run in executor + # then current asyncio event loop will not be blocked + partial_show_qrcode = functools.partial(utils.show_qrcode, base64_qrcode_img) + asyncio.get_running_loop().run_in_executor(executor=None, func=partial_show_qrcode) + + utils.logger.info(f"[BaiduTieBaLogin.login_by_qrcode] waiting for scan code login, remaining time is 120s") + try: + await self.check_login_state() + except RetryError: + utils.logger.info("[BaiduTieBaLogin.login_by_qrcode] Login baidutieba failed by qrcode login method ...") + sys.exit() + + wait_redirect_seconds = 5 + utils.logger.info(f"[BaiduTieBaLogin.login_by_qrcode] Login successful then wait for {wait_redirect_seconds} seconds redirect ...") + await asyncio.sleep(wait_redirect_seconds) + + async def login_by_cookies(self): + """login baidutieba website by cookies""" + utils.logger.info("[BaiduTieBaLogin.login_by_cookies] Begin login baidutieba by cookie ...") + for key, value in utils.convert_str_cookie_to_dict(self.cookie_str).items(): + await self.browser_context.add_cookies([{ + 'name': key, + 'value': value, + 'domain': ".baidu.com", + 'path': "/" + }]) diff --git a/store/tieba/__init__.py b/store/tieba/__init__.py new file mode 100644 index 0000000..9605d58 --- /dev/null +++ b/store/tieba/__init__.py @@ -0,0 +1,91 @@ +# -*- coding: utf-8 -*- +from typing import List + +from . 
import tieba_store_impl +from .tieba_store_impl import * + + +class TieBaStoreFactory: + STORES = { + "csv": TieBaCsvStoreImplement, + "db": TieBaDbStoreImplement, + "json": TieBaJsonStoreImplement + } + + @staticmethod + def create_store() -> AbstractStore: + store_class = TieBaStoreFactory.STORES.get(config.SAVE_DATA_OPTION) + if not store_class: + raise ValueError( + "[TieBaStoreFactory.create_store] Invalid save option only supported csv or db or json ...") + return store_class() + + +async def update_tieba_note(note_item: Dict): + note_id = note_item.get("note_id") + user_info = note_item.get("user", {}) + interact_info = note_item.get("interact_info", {}) + tag_list: List[Dict] = note_item.get("tag_list", []) + + local_db_item = { + "note_id": note_id, + "type": note_item.get("type"), + "title": note_item.get("title") or note_item.get("desc", "")[:255], + "desc": note_item.get("desc", ""), + "time": note_item.get("time"), + "last_update_time": note_item.get("last_update_time", 0), + "user_id": user_info.get("user_id"), + "nickname": user_info.get("nickname"), + "avatar": user_info.get("avatar"), + "liked_count": interact_info.get("liked_count"), + "collected_count": interact_info.get("collected_count"), + "comment_count": interact_info.get("comment_count"), + "share_count": interact_info.get("share_count"), + "ip_location": note_item.get("ip_location", ""), + + "tag_list": ','.join([tag.get('name', '') for tag in tag_list if tag.get('type') == 'topic']), + "last_modify_ts": utils.get_current_timestamp(), + # todo: add note_url + "note_url": "" + } + utils.logger.info(f"[store.tieba.update_tieba_note] tieba note: {local_db_item}") + await TieBaStoreFactory.create_store().store_content(local_db_item) + + +async def batch_update_tieba_note_comments(note_id: str, comments: List[Dict]): + if not comments: + return + for comment_item in comments: + await update_tieba_note_comment(note_id, comment_item) + + +async def update_tieba_note_comment(note_id: str, comment_item: Dict): + """ + Update tieba note comment + Args: + note_id: + comment_item: + + Returns: + + """ + user_info = comment_item.get("user_info", {}) + comment_id = comment_item.get("id") + comment_pictures = [item.get("url_default", "") for item in comment_item.get("pictures", [])] + target_comment = comment_item.get("target_comment", {}) + local_db_item = { + "comment_id": comment_id, + "create_time": comment_item.get("create_time"), + "ip_location": comment_item.get("ip_location"), + "note_id": note_id, + "content": comment_item.get("content"), + "user_id": user_info.get("user_id"), + "nickname": user_info.get("nickname"), + "avatar": user_info.get("image"), + "sub_comment_count": comment_item.get("sub_comment_count", 0), + "pictures": ",".join(comment_pictures), + "parent_comment_id": target_comment.get("id", 0), + "last_modify_ts": utils.get_current_timestamp(), + } + utils.logger.info(f"[store.tieba.update_tieba_note_comment] tieba note comment:{local_db_item}") + await TieBaStoreFactory.create_store().store_comment(local_db_item) diff --git a/store/tieba/tieba_store_impl.py b/store/tieba/tieba_store_impl.py new file mode 100644 index 0000000..fe0ccbc --- /dev/null +++ b/store/tieba/tieba_store_impl.py @@ -0,0 +1,244 @@ +# -*- coding: utf-8 -*- +import asyncio +import csv +import json +import os +import pathlib +from typing import Dict + +import aiofiles + +import config +from base.base_crawler import AbstractStore +from tools import utils, words +from var import crawler_type_var + + +def 
calculate_number_of_files(file_store_path: str) -> int:
+    """计算数据保存文件的前部分排序数字,支持每次运行代码不写到同一个文件中
+    Args:
+        file_store_path:
+    Returns:
+        file nums
+    """
+    if not os.path.exists(file_store_path):
+        return 1
+    try:
+        return max([int(file_name.split("_")[0]) for file_name in os.listdir(file_store_path)]) + 1
+    except ValueError:
+        return 1
+
+
+class TieBaCsvStoreImplement(AbstractStore):
+    csv_store_path: str = "data/tieba"
+    file_count: int = calculate_number_of_files(csv_store_path)
+
+    def make_save_file_name(self, store_type: str) -> str:
+        """
+        make save file name by store type
+        Args:
+            store_type: contents or comments
+
+        Returns: eg: data/tieba/search_comments_20240114.csv ...
+
+        """
+        return f"{self.csv_store_path}/{self.file_count}_{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.csv"
+
+    async def save_data_to_csv(self, save_item: Dict, store_type: str):
+        """
+        Below is a simple way to save it in CSV format.
+        Args:
+            save_item: save content dict info
+            store_type: Save type contains content and comments(contents | comments)
+
+        Returns: no returns
+
+        """
+        pathlib.Path(self.csv_store_path).mkdir(parents=True, exist_ok=True)
+        save_file_name = self.make_save_file_name(store_type=store_type)
+        async with aiofiles.open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f:
+            writer = csv.writer(f)
+            if await f.tell() == 0:
+                await writer.writerow(save_item.keys())
+            await writer.writerow(save_item.values())
+
+    async def store_content(self, content_item: Dict):
+        """
+        Tieba content CSV storage implementation
+        Args:
+            content_item: note item dict
+
+        Returns:
+
+        """
+        await self.save_data_to_csv(save_item=content_item, store_type="contents")
+
+    async def store_comment(self, comment_item: Dict):
+        """
+        Tieba comment CSV storage implementation
+        Args:
+            comment_item: comment item dict
+
+        Returns:
+
+        """
+        await self.save_data_to_csv(save_item=comment_item, store_type="comments")
+
+    async def store_creator(self, creator: Dict):
+        """
+        Tieba creator CSV storage implementation
+        Args:
+            creator: creator dict
+
+        Returns:
+
+        """
+        await self.save_data_to_csv(save_item=creator, store_type="creator")
+
+
+class TieBaDbStoreImplement(AbstractStore):
+    async def store_content(self, content_item: Dict):
+        """
+        Tieba content DB storage implementation
+        Args:
+            content_item: content item dict
+
+        Returns:
+
+        """
+        from .tieba_store_sql import (add_new_content,
+                                      query_content_by_content_id,
+                                      update_content_by_content_id)
+        note_id = content_item.get("note_id")
+        note_detail: Dict = await query_content_by_content_id(content_id=note_id)
+        if not note_detail:
+            content_item["add_ts"] = utils.get_current_timestamp()
+            await add_new_content(content_item)
+        else:
+            await update_content_by_content_id(note_id, content_item=content_item)
+
+    async def store_comment(self, comment_item: Dict):
+        """
+        Tieba comment DB storage implementation
+        Args:
+            comment_item: comment item dict
+
+        Returns:
+
+        """
+        from .tieba_store_sql import (add_new_comment,
+                                      query_comment_by_comment_id,
+                                      update_comment_by_comment_id)
+        comment_id = comment_item.get("comment_id")
+        comment_detail: Dict = await query_comment_by_comment_id(comment_id=comment_id)
+        if not comment_detail:
+            comment_item["add_ts"] = utils.get_current_timestamp()
+            await add_new_comment(comment_item)
+        else:
+            await update_comment_by_comment_id(comment_id, comment_item=comment_item)
+
+    async def store_creator(self, creator: Dict):
+        """
+        Tieba creator DB storage 
implementation + Args: + creator: creator dict + + Returns: + + """ + from .tieba_store_sql import (add_new_creator, + query_creator_by_user_id, + update_creator_by_user_id) + user_id = creator.get("user_id") + user_detail: Dict = await query_creator_by_user_id(user_id) + if not user_detail: + creator["add_ts"] = utils.get_current_timestamp() + await add_new_creator(creator) + else: + await update_creator_by_user_id(user_id, creator) + + +class TieBaJsonStoreImplement(AbstractStore): + json_store_path: str = "data/tieba/json" + words_store_path: str = "data/tieba/words" + lock = asyncio.Lock() + file_count:int=calculate_number_of_files(json_store_path) + WordCloud = words.AsyncWordCloudGenerator() + + def make_save_file_name(self, store_type: str) -> (str,str): + """ + make save file name by store type + Args: + store_type: Save type contains content and comments(contents | comments) + + Returns: + + """ + + return ( + f"{self.json_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}.json", + f"{self.words_store_path}/{crawler_type_var.get()}_{store_type}_{utils.get_current_date()}" + ) + + async def save_data_to_json(self, save_item: Dict, store_type: str): + """ + Below is a simple way to save it in json format. + Args: + save_item: save content dict info + store_type: Save type contains content and comments(contents | comments) + + Returns: + + """ + pathlib.Path(self.json_store_path).mkdir(parents=True, exist_ok=True) + pathlib.Path(self.words_store_path).mkdir(parents=True, exist_ok=True) + save_file_name,words_file_name_prefix = self.make_save_file_name(store_type=store_type) + save_data = [] + + async with self.lock: + if os.path.exists(save_file_name): + async with aiofiles.open(save_file_name, 'r', encoding='utf-8') as file: + save_data = json.loads(await file.read()) + + save_data.append(save_item) + async with aiofiles.open(save_file_name, 'w', encoding='utf-8') as file: + await file.write(json.dumps(save_data, ensure_ascii=False)) + + if config.ENABLE_GET_COMMENTS and config.ENABLE_GET_WORDCLOUD: + try: + await self.WordCloud.generate_word_frequency_and_cloud(save_data, words_file_name_prefix) + except: + pass + async def store_content(self, content_item: Dict): + """ + content JSON storage implementation + Args: + content_item: + + Returns: + + """ + await self.save_data_to_json(content_item, "contents") + + async def store_comment(self, comment_item: Dict): + """ + comment JSON storage implementatio + Args: + comment_item: + + Returns: + + """ + await self.save_data_to_json(comment_item, "comments") + + async def store_creator(self, creator: Dict): + """ + Xiaohongshu content JSON storage implementation + Args: + creator: creator dict + + Returns: + + """ + await self.save_data_to_json(creator, "creator") diff --git a/store/tieba/tieba_store_sql.py b/store/tieba/tieba_store_sql.py new file mode 100644 index 0000000..9ec03a4 --- /dev/null +++ b/store/tieba/tieba_store_sql.py @@ -0,0 +1,144 @@ +# -*- coding: utf-8 -*- +from typing import Dict, List + +from db import AsyncMysqlDB +from var import media_crawler_db_var + + +async def query_content_by_content_id(content_id: str) -> Dict: + """ + 查询一条内容记录(xhs的帖子 | 抖音的视频 | 微博 | 快手视频 ...) 
+ Args: + content_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from baidu_tieba where note_id = '{content_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_content(content_item: Dict) -> int: + """ + 新增一条内容记录(xhs的帖子 | 抖音的视频 | 微博 | 快手视频 ...) + Args: + content_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("baidu_tieba", content_item) + return last_row_id + + +async def update_content_by_content_id(content_id: str, content_item: Dict) -> int: + """ + 更新一条记录(xhs的帖子 | 抖音的视频 | 微博 | 快手视频 ...) + Args: + content_id: + content_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("baidu_tieba", content_item, "note_id", content_id) + return effect_row + + + +async def query_comment_by_comment_id(comment_id: str) -> Dict: + """ + 查询一条评论内容 + Args: + comment_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from baidu_tieba_comment where comment_id = '{comment_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_comment(comment_item: Dict) -> int: + """ + 新增一条评论记录 + Args: + comment_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("baidu_tieba_comment", comment_item) + return last_row_id + + +async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> int: + """ + 更新增一条评论记录 + Args: + comment_id: + comment_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("baidu_tieba_comment", comment_item, "comment_id", comment_id) + return effect_row + + +async def query_creator_by_user_id(user_id: str) -> Dict: + """ + 查询一条创作者记录 + Args: + user_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from baidu_tieba_creator where user_id = '{user_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_creator(creator_item: Dict) -> int: + """ + 新增一条创作者信息 + Args: + creator_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("baidu_tieba_creator", creator_item) + return last_row_id + + +async def update_creator_by_user_id(user_id: str, creator_item: Dict) -> int: + """ + 更新一条创作者信息 + Args: + user_id: + creator_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("baidu_tieba_creator", creator_item, "user_id", user_id) + return effect_row \ No newline at end of file
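
Usage note (not part of the patch): with the new `tieba` entries registered in `cmd_arg/arg.py` and `main.py` above, the crawler can be exercised end to end. The sketch below is a minimal, hypothetical smoke test built only on symbols visible in this diff (`CrawlerFactory.create_crawler`, `config.PLATFORM`, `TieBaCrawler.start`); it assumes `SAVE_DATA_OPTION` is left at `csv` or `json`, so the database setup that `main()` performs for the `db` option is skipped.

```python
# Minimal smoke-test sketch (hypothetical, not part of this patch).
# It checks that the "tieba" key added to CrawlerFactory resolves to
# TieBaCrawler and runs a keyword search with the existing config module.
import asyncio

import config
from main import CrawlerFactory


async def run_tieba_search_demo():
    # Equivalent CLI: python main.py --platform tieba --lt qrcode --type search
    config.PLATFORM = "tieba"
    config.LOGIN_TYPE = "qrcode"
    config.CRAWLER_TYPE = "search"
    crawler = CrawlerFactory.create_crawler(platform=config.PLATFORM)
    await crawler.start()


if __name__ == "__main__":
    asyncio.run(run_tieba_search_demo())
```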