diff --git a/.gitignore b/.gitignore index 604f7ea..2a4a8da 100644 --- a/.gitignore +++ b/.gitignore @@ -163,3 +163,5 @@ cython_debug/ *.iml .idea /temp_image/ +/xhs_user_data_dir/ +/dy_user_data_dir/ diff --git a/config/base_config.py b/config/base_config.py index 4c1c4b1..c03743d 100644 --- a/config/base_config.py +++ b/config/base_config.py @@ -1,8 +1,7 @@ +# Desc: base config PLATFORM = "xhs" KEYWORDS = "健身,旅游" LOGIN_TYPE = "qrcode" # qrcode or phone or cookies -# If it's on the Xiaohongshu platform, only the web_session cookie will be kept. -# xhs cookie format -> web_session=040069b2acxxxxxxxxxxxxxxxxxxxx; COOKIES = "" # redis config @@ -17,3 +16,12 @@ RETRY_INTERVAL = 60 * 30 # 30 minutes # playwright headless HEADLESS = True + +# save login state +SAVE_LOGIN_STATE = True + +# save user data dir +USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name + +# max page num +MAX_PAGE_NUM = 20 diff --git a/media_platform/xhs/client.py b/media_platform/xhs/client.py index 4f45974..b110825 100644 --- a/media_platform/xhs/client.py +++ b/media_platform/xhs/client.py @@ -1,13 +1,16 @@ import json +import logging import asyncio from typing import Optional, Dict import httpx from playwright.async_api import Page +from playwright.async_api import BrowserContext from .help import sign, get_search_id from .field import SearchSortType, SearchNoteType from .exception import DataFetchError, IPBlockError +from tools import utils class XHSClient: @@ -77,6 +80,21 @@ class XHSClient: return await self.request(method="POST", url=f"{self._host}{uri}", data=json_str, headers=headers) + async def ping(self) -> bool: + """get a note to check if login state is ok""" + logging.info("begin to ping xhs...") + note_id = "5e5cb38a000000000100185e" + try: + note_card: Dict = await self.get_note_by_id(note_id) + return note_card.get("note_id") == note_id + except DataFetchError: + return False + + async def update_cookies(self, browser_context: BrowserContext): + cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies()) + self.headers["Cookie"] = cookie_str + self.cookie_dict = cookie_dict + async def get_note_by_keyword( self, keyword: str, page: int = 1, page_size: int = 20, diff --git a/media_platform/xhs/core.py b/media_platform/xhs/core.py index 0a46c89..20a92e3 100644 --- a/media_platform/xhs/core.py +++ b/media_platform/xhs/core.py @@ -6,7 +6,6 @@ from typing import Optional, List, Dict, Tuple from argparse import Namespace from playwright.async_api import Page -from playwright.async_api import Cookie from playwright.async_api import BrowserContext from playwright.async_api import async_playwright @@ -21,8 +20,8 @@ from base.proxy_account_pool import AccountPool class XiaoHongShuCrawler(AbstractCrawler): + def __init__(self): - self.cookies: Optional[List[Cookie]] = None # cookies from browser context self.browser_context: Optional[BrowserContext] = None self.context_page: Optional[Page] = None self.user_agent = utils.get_user_agent() @@ -35,87 +34,47 @@ class XiaoHongShuCrawler(AbstractCrawler): for key in kwargs.keys(): setattr(self, key, kwargs[key]) - async def update_cookies(self): - self.cookies = await self.browser_context.cookies() - - def create_proxy_info(self) -> Tuple[str, Dict, str]: - """Create proxy info for playwright and httpx""" - # phone: 13012345671 - # ip_proxy: 111.122.xx.xx1:8888 - # 手机号和IP代理都是从账号池中获取的,并且它们是固定绑定的 - phone, ip_proxy = self.account_pool.get_account() - playwright_proxy = { - "server": f"{config.IP_PROXY_PROTOCOL}{ip_proxy}", - "username": config.IP_PROXY_USER, - "password": config.IP_PROXY_PASSWORD, - } - httpx_proxy = f"{config.IP_PROXY_PROTOCOL}{config.IP_PROXY_USER}:{config.IP_PROXY_PASSWORD}@{ip_proxy}" - return phone, playwright_proxy, httpx_proxy - async def start(self): account_phone, playwright_proxy, httpx_proxy = self.create_proxy_info() - if not config.ENABLE_IP_PROXY: - playwright_proxy, httpx_proxy = None, None - async with async_playwright() as playwright: - # launch browser and create single browser context + # Launch a browser context. chromium = playwright.chromium - browser = await chromium.launch(headless=config.HEADLESS, proxy=playwright_proxy) - self.browser_context = await browser.new_context( - viewport={"width": 1920, "height": 1080}, - user_agent=self.user_agent + self.browser_context = await self.launch_browser( + chromium, + playwright_proxy, + self.user_agent, + headless=config.HEADLESS ) - - # execute JS to bypass anti automation/crawler detection + # stealth.min.js is a js script to prevent the website from detecting the crawler. await self.browser_context.add_init_script(path="libs/stealth.min.js") self.context_page = await self.browser_context.new_page() await self.context_page.goto(self.index_url) - # begin login - login_obj = XHSLogin( - login_type=self.command_args.lt, - login_phone=account_phone, - browser_context=self.browser_context, - context_page=self.context_page, - cookie_str=config.COOKIES - ) - await login_obj.begin() - - # update cookies - await self.update_cookies() - - # init request client - cookie_str, cookie_dict = utils.convert_cookies(self.cookies) - self.xhs_client = XHSClient( - proxies=httpx_proxy, - headers={ - "User-Agent": self.user_agent, - "Cookie": cookie_str, - "Origin": "https://www.xiaohongshu.com", - "Referer": "https://www.xiaohongshu.com", - "Content-Type": "application/json;charset=UTF-8" - }, - playwright_page=self.context_page, - cookie_dict=cookie_dict, - ) + # Create a client to interact with the xiaohongshu website. + self.xhs_client = await self.create_xhs_client(httpx_proxy) + if not await self.xhs_client.ping(): + login_obj = XHSLogin( + login_type=self.command_args.lt, + login_phone=account_phone, + browser_context=self.browser_context, + context_page=self.context_page, + cookie_str=config.COOKIES + ) + await login_obj.begin() + await self.xhs_client.update_cookies(browser_context=self.browser_context) # Search for notes and retrieve their comment information. await self.search_posts() - # block main crawler coroutine - await asyncio.Event().wait() - - async def close(self): - await self.browser_context.close() - await self.browser_context.close() - logging.info("Browser context closed ...") + logging.info("Xhs Crawler finished ...") async def search_posts(self): + """Search for notes and retrieve their comment information.""" logging.info("Begin search xiaohongshu keywords") for keyword in config.KEYWORDS.split(","): logging.info(f"Current keyword: {keyword}") note_list: List[str] = [] - max_note_len = 10 + max_note_len = config.MAX_PAGE_NUM page = 1 while max_note_len > 0: posts_res = await self.xhs_client.get_note_by_keyword( @@ -129,14 +88,16 @@ class XiaoHongShuCrawler(AbstractCrawler): try: note_detail = await self.xhs_client.get_note_by_id(note_id) except DataFetchError as ex: + logging.error(f"Get note detail error: {ex}") continue await xhs_model.update_xhs_note(note_detail) await asyncio.sleep(0.05) note_list.append(note_id) logging.info(f"keyword:{keyword}, note_list:{note_list}") - await self.batch_get_note_comments(note_list) + # await self.batch_get_note_comments(note_list) async def batch_get_note_comments(self, note_list: List[str]): + """Batch get note comments""" task_list: List[Task] = [] for note_id in note_list: task = asyncio.create_task(self.get_comments(note_id), name=note_id) @@ -144,7 +105,66 @@ class XiaoHongShuCrawler(AbstractCrawler): await asyncio.wait(task_list) async def get_comments(self, note_id: str): + """Get note comments""" logging.info(f"Begin get note id comments {note_id}") all_comments = await self.xhs_client.get_note_all_comments(note_id=note_id, crawl_interval=random.random()) for comment in all_comments: await xhs_model.update_xhs_note_comment(note_id=note_id, comment_item=comment) + + def create_proxy_info(self) -> Tuple[Optional[str], Optional[Dict], Optional[str]]: + """Create proxy info for playwright and httpx""" + if not config.ENABLE_IP_PROXY: + return None, None, None + + # phone: 13012345671 ip_proxy: 111.122.xx.xx1:8888 + phone, ip_proxy = self.account_pool.get_account() + playwright_proxy = { + "server": f"{config.IP_PROXY_PROTOCOL}{ip_proxy}", + "username": config.IP_PROXY_USER, + "password": config.IP_PROXY_PASSWORD, + } + httpx_proxy = f"{config.IP_PROXY_PROTOCOL}{config.IP_PROXY_USER}:{config.IP_PROXY_PASSWORD}@{ip_proxy}" + return phone, playwright_proxy, httpx_proxy + + async def create_xhs_client(self, httpx_proxy: str) -> XHSClient: + """Create xhs client""" + cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies()) + xhs_client_obj = XHSClient( + proxies=httpx_proxy, + headers={ + "User-Agent": self.user_agent, + "Cookie": cookie_str, + "Origin": "https://www.xiaohongshu.com", + "Referer": "https://www.xiaohongshu.com", + "Content-Type": "application/json;charset=UTF-8" + }, + playwright_page=self.context_page, + cookie_dict=cookie_dict, + ) + return xhs_client_obj + + async def launch_browser(self, chromium, playwright_proxy, user_agent, headless=True) -> BrowserContext: + """Launch browser and create browser context""" + if config.SAVE_LOGIN_STATE: + # feat issue #14 + browser_context = await chromium.launch_persistent_context( + user_data_dir=config.USER_DATA_DIR % self.command_args.platform, + accept_downloads=True, + headless=headless, + proxy=playwright_proxy, + viewport={"width": 1920, "height": 1080}, + user_agent=user_agent + ) + return browser_context + else: + browser = await chromium.launch(headless=headless, proxy=playwright_proxy) + browser_context = await browser.new_context( + viewport={"width": 1920, "height": 1080}, + user_agent=user_agent + ) + return browser_context + + async def close(self): + """Close browser context""" + await self.browser_context.close() + logging.info("Browser context closed ...") diff --git a/media_platform/xhs/login.py b/media_platform/xhs/login.py index 255649c..523bd1c 100644 --- a/media_platform/xhs/login.py +++ b/media_platform/xhs/login.py @@ -8,7 +8,8 @@ from tenacity import ( retry, stop_after_attempt, wait_fixed, - retry_if_result + retry_if_result, + RetryError ) from playwright.async_api import Page from playwright.async_api import BrowserContext @@ -35,7 +36,11 @@ class XHSLogin(AbstractLogin): @retry(stop=stop_after_attempt(20), wait=wait_fixed(1), retry=retry_if_result(lambda value: value is False)) async def check_login_state(self, no_logged_in_session: str) -> bool: - """Check if the current login status is successful and return True otherwise return False""" + """ + Check if the current login status is successful and return True otherwise return False + retry decorator will retry 20 times if the return value is False, and the retry interval is 1 second + if max retry times reached, raise RetryError + """ current_cookie = await self.browser_context.cookies() _, cookie_dict = utils.convert_cookies(current_cookie) current_web_session = cookie_dict.get("web_session") @@ -44,6 +49,8 @@ class XHSLogin(AbstractLogin): return False async def begin(self): + """Start login xiaohongshu""" + logging.info("Begin login xiaohongshu ...") if self.login_type == "qrcode": await self.login_by_qrcode() elif self.login_type == "phone": @@ -54,6 +61,7 @@ class XHSLogin(AbstractLogin): raise ValueError("Invalid Login Type Currently only supported qrcode or phone or cookies ...") async def login_by_mobile(self): + """Login xiaohongshu by mobile""" logging.info("Begin login xiaohongshu by mobile ...") await asyncio.sleep(1) try: @@ -108,9 +116,10 @@ class XHSLogin(AbstractLogin): # todo ... 应该还需要检查验证码的正确性有可能输入的验证码不正确 break - login_flag: bool = await self.check_login_state(no_logged_in_session) - if not login_flag: - logging.info("login failed please confirm ...") + try: + await self.check_login_state(no_logged_in_session) + except RetryError: + logging.info("Login xiaohongshu failed by mobile login method ...") sys.exit() wait_redirect_seconds = 5 @@ -147,14 +156,17 @@ class XHSLogin(AbstractLogin): no_logged_in_session = cookie_dict.get("web_session") # show login qrcode - # utils.show_qrcode(base64_qrcode_img) + # fix issue #12 + # we need to use partial function to call show_qrcode function and run in executor + # then current asyncio event loop will not be blocked partial_show_qrcode = functools.partial(utils.show_qrcode, base64_qrcode_img) asyncio.get_running_loop().run_in_executor(executor=None, func=partial_show_qrcode) logging.info(f"waiting for scan code login, remaining time is 20s") - login_flag: bool = await self.check_login_state(no_logged_in_session) - if not login_flag: - logging.info("login failed please confirm ...") + try: + await self.check_login_state(no_logged_in_session) + except RetryError: + logging.info("Login xiaohongshu failed by qrcode login method ...") sys.exit() wait_redirect_seconds = 5 @@ -162,6 +174,7 @@ class XHSLogin(AbstractLogin): await asyncio.sleep(wait_redirect_seconds) async def login_by_cookies(self): + """login xiaohongshu website by cookies""" logging.info("Begin login xiaohongshu by cookie ...") for key, value in utils.convert_str_cookie_to_dict(self.cookie_str).items(): await self.browser_context.add_cookies([{ diff --git a/recv_sms_notification.py b/recv_sms_notification.py index 42e686c..cbac5a5 100644 --- a/recv_sms_notification.py +++ b/recv_sms_notification.py @@ -46,7 +46,7 @@ class RecvSmsNotificationHandler(tornado.web.RequestHandler): request_body = self.request.body.decode("utf-8") req_body_dict = json.loads(request_body) print("recv sms notification and body content: ", req_body_dict) - redis_obj = aioredis.from_url(url=config.redis_db_host, password=config.redis_db_pwd, decode_responses=True) + redis_obj = aioredis.from_url(url=config.REDIS_DB_HOST, password=config.REDIS_DB_PWD, decode_responses=True) sms_content = req_body_dict.get("sms_content") sms_code = extract_verification_code(sms_content) if sms_code: