import asyncio import json import re from typing import Callable, Dict, List, Optional from urllib.parse import urlencode import httpx from playwright.async_api import BrowserContext, Page from tools import utils from .exception import DataFetchError, IPBlockError from .field import SearchNoteType, SearchSortType from .help import get_search_id, sign class XHSClient: def __init__( self, timeout=10, proxies=None, *, headers: Dict[str, str], playwright_page: Page, cookie_dict: Dict[str, str], ): self.proxies = proxies self.timeout = timeout self.headers = headers self._host = "https://edith.xiaohongshu.com" self.IP_ERROR_STR = "网络连接异常,请检查网络设置或重启试试" self.IP_ERROR_CODE = 300012 self.NOTE_ABNORMAL_STR = "笔记状态异常,请稍后查看" self.NOTE_ABNORMAL_CODE = -510001 self.playwright_page = playwright_page self.cookie_dict = cookie_dict async def _pre_headers(self, url: str, data=None) -> Dict: """ 请求头参数签名 Args: url: data: Returns: """ encrypt_params = await self.playwright_page.evaluate("([url, data]) => window._webmsxyw(url,data)", [url, data]) local_storage = await self.playwright_page.evaluate("() => window.localStorage") signs = sign( a1=self.cookie_dict.get("a1", ""), b1=local_storage.get("b1", ""), x_s=encrypt_params.get("X-s", ""), x_t=str(encrypt_params.get("X-t", "")) ) headers = { "X-S": signs["x-s"], "X-T": signs["x-t"], "x-S-Common": signs["x-s-common"], "X-B3-Traceid": signs["x-b3-traceid"] } self.headers.update(headers) return self.headers async def request(self, method, url, **kwargs) -> Dict: """ 封装httpx的公共请求方法,对请求响应做一些处理 Args: method: 请求方法 url: 请求的URL **kwargs: 其他请求参数,例如请求头、请求体等 Returns: """ # return response.text return_response = kwargs.pop('return_response', False) async with httpx.AsyncClient(proxies=self.proxies) as client: response = await client.request( method, url, timeout=self.timeout, **kwargs ) if return_response: return response.text data: Dict = response.json() if data["success"]: return data.get("data", data.get("success", {})) elif data["code"] == self.IP_ERROR_CODE: raise IPBlockError(self.IP_ERROR_STR) else: raise DataFetchError(data.get("msg", None)) async def get(self, uri: str, params=None) -> Dict: """ GET请求,对请求头签名 Args: uri: 请求路由 params: 请求参数 Returns: """ final_uri = uri if isinstance(params, dict): final_uri = (f"{uri}?" f"{urlencode(params)}") headers = await self._pre_headers(final_uri) return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers) async def post(self, uri: str, data: dict) -> Dict: """ POST请求,对请求头签名 Args: uri: 请求路由 data: 请求体参数 Returns: """ headers = await self._pre_headers(uri, data) json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False) return await self.request(method="POST", url=f"{self._host}{uri}", data=json_str, headers=headers) async def pong(self) -> bool: """ 用于检查登录态是否失效了 Returns: """ """get a note to check if login state is ok""" utils.logger.info("[XHSClient.pong] Begin to pong xhs...") ping_flag = False try: note_card: Dict = await self.get_note_by_keyword(keyword="小红书") if note_card.get("items"): ping_flag = True except Exception as e: utils.logger.error(f"[XHSClient.pong] Ping xhs failed: {e}, and try to login again...") ping_flag = False return ping_flag async def update_cookies(self, browser_context: BrowserContext): """ API客户端提供的更新cookies方法,一般情况下登录成功后会调用此方法 Args: browser_context: 浏览器上下文对象 Returns: """ cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies()) self.headers["Cookie"] = cookie_str self.cookie_dict = cookie_dict async def get_note_by_keyword( self, keyword: str, page: int = 1, page_size: int = 20, sort: SearchSortType = SearchSortType.GENERAL, note_type: SearchNoteType = SearchNoteType.ALL ) -> Dict: """ 根据关键词搜索笔记 Args: keyword: 关键词参数 page: 分页第几页 page_size: 分页数据长度 sort: 搜索结果排序指定 note_type: 搜索的笔记类型 Returns: """ uri = "/api/sns/web/v1/search/notes" data = { "keyword": keyword, "page": page, "page_size": page_size, "search_id": get_search_id(), "sort": sort.value, "note_type": note_type.value } return await self.post(uri, data) async def get_creator_info_and_notes(self, creator: str) -> Dict: """ 获取博主的信息和第一页的笔记 Args: creator: 博主ID Returns: {"creator":{}, "notes":[]} """ path = '/user/profile/'+creator content = await self.request(method="GET", url=f"https://www.xiaohongshu.com{path}", return_response=True) match = re.search(r'