MediaCrawler/media_platform/xhs/client.py

266 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
from typing import Callable, Dict, List, Optional
from urllib.parse import urlencode
import httpx
from playwright.async_api import BrowserContext, Page
from tools import utils
from .exception import DataFetchError, IPBlockError
from .field import SearchNoteType, SearchSortType
from .help import get_search_id, sign
class XHSClient:
    """Async HTTP client for the xiaohongshu web API.

    Every request is signed first: a Playwright page evaluates the site's
    in-page signing function, and the result is combined with cookie and
    localStorage values by ``sign`` to build the signature headers.
    """

    def __init__(
            self,
            timeout=10,
            proxies=None,
            *,
            headers: Dict[str, str],
            playwright_page: Page,
            cookie_dict: Dict[str, str],
    ):
        """
        Store the connection settings and signing context.

        Args:
            timeout: timeout passed to every httpx request
            proxies: proxy configuration passed to ``httpx.AsyncClient``
            headers: base request headers; signature headers are merged into
                this same dict on every request
            playwright_page: page used to evaluate the in-browser signing code
            cookie_dict: cookies as a dict; the ``a1`` entry is used for signing
        """
        self.proxies = proxies
        self.timeout = timeout
        self.headers = headers
        self._host = "https://edith.xiaohongshu.com"
        # Response code/message pair that `request` maps to IPBlockError.
        self.IP_ERROR_STR = "网络连接异常,请检查网络设置或重启试试"
        self.IP_ERROR_CODE = 300012
        # NOTE(review): this pair is not referenced by any method of this
        # class; presumably kept for callers - confirm before removing.
        self.NOTE_ABNORMAL_STR = "笔记状态异常,请稍后查看"
        self.NOTE_ABNORMAL_CODE = -510001
        self.playwright_page = playwright_page
        self.cookie_dict = cookie_dict

    async def _pre_headers(self, url: str, data=None) -> Dict:
        """
        Compute the signature headers for a request and merge them into
        ``self.headers``.

        Args:
            url: request route (including the query string for GET requests)
            data: request body for POST requests, ``None`` otherwise

        Returns:
            ``self.headers`` itself (the shared dict, not a copy), updated
            with ``X-S``, ``X-T``, ``x-S-Common`` and ``X-B3-Traceid``.
        """
        # The page's own JS produces the raw X-s / X-t values.
        encrypt_params = await self.playwright_page.evaluate("([url, data]) => window._webmsxyw(url,data)", [url, data])
        local_storage = await self.playwright_page.evaluate("() => window.localStorage")
        signs = sign(
            a1=self.cookie_dict.get("a1", ""),
            b1=local_storage.get("b1", ""),
            x_s=encrypt_params.get("X-s", ""),
            x_t=str(encrypt_params.get("X-t", ""))
        )
        headers = {
            "X-S": signs["x-s"],
            "X-T": signs["x-t"],
            "x-S-Common": signs["x-s-common"],
            "X-B3-Traceid": signs["x-b3-traceid"]
        }
        self.headers.update(headers)
        return self.headers

    async def request(self, method, url, **kwargs) -> Dict:
        """
        Send one HTTP request with httpx and unwrap the API response envelope.

        Args:
            method: HTTP method
            url: full request URL
            **kwargs: extra arguments forwarded to ``httpx.AsyncClient.request``
                (headers, body, ...)

        Returns:
            The ``data`` field of a successful response, or the ``success``
            value itself when the response carries no ``data`` field.

        Raises:
            IPBlockError: the response code equals ``self.IP_ERROR_CODE``
            DataFetchError: the body is not a JSON object, or the API
                reported any other failure
        """
        async with httpx.AsyncClient(proxies=self.proxies) as client:
            response = await client.request(
                method, url, timeout=self.timeout,
                **kwargs
            )
        try:
            data = response.json()
        except ValueError as exc:
            # A non-JSON body (e.g. an HTML error page) would otherwise
            # surface as a bare JSONDecodeError with no request context.
            raise DataFetchError(
                f"non-JSON response (HTTP {response.status_code}): {response.text[:200]}"
            ) from exc
        if not isinstance(data, dict):
            raise DataFetchError(f"unexpected response payload: {data!r}")
        # .get() instead of indexing: an envelope missing "success" or "code"
        # is reported as a fetch failure rather than a KeyError.
        if data.get("success"):
            return data.get("data", data.get("success", {}))
        if data.get("code") == self.IP_ERROR_CODE:
            raise IPBlockError(self.IP_ERROR_STR)
        raise DataFetchError(data.get("msg", None))

    async def get(self, uri: str, params=None) -> Dict:
        """
        Send a signed GET request.

        Args:
            uri: request route
            params: query parameters; only a dict is encoded into the URL

        Returns:
            The unwrapped response payload from ``request``.
        """
        final_uri = uri
        if isinstance(params, dict):
            # The signature covers the route including its query string.
            final_uri = f"{uri}?{urlencode(params)}"
        headers = await self._pre_headers(final_uri)
        return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers)

    async def post(self, uri: str, data: dict) -> Dict:
        """
        Send a signed POST request with a compact JSON body.

        Args:
            uri: request route
            data: request body parameters

        Returns:
            The unwrapped response payload from ``request``.
        """
        headers = await self._pre_headers(uri, data)
        json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
        # The body is already serialised, so it is sent as raw content;
        # httpx deprecates passing text/bytes through `data=`.
        return await self.request(method="POST", url=f"{self._host}{uri}",
                                  content=json_str, headers=headers)

    async def pong(self) -> bool:
        """
        Check whether the login state is still valid by running a keyword
        search and looking for a non-empty ``items`` field.

        Returns:
            True if the search returned items; False if it returned none or
            raised any exception.
        """
        utils.logger.info("[XHSClient.pong] Begin to pong xhs...")
        ping_flag = False
        try:
            note_card: Dict = await self.get_note_by_keyword(keyword="小红书")
            if note_card.get("items"):
                ping_flag = True
        except Exception as e:
            # Deliberately broad: any failure here means "not logged in".
            utils.logger.error(f"[XHSClient.pong] Ping xhs failed: {e}, and try to login again...")
            ping_flag = False
        return ping_flag

    async def update_cookies(self, browser_context: BrowserContext):
        """
        Refresh the client's cookies from the browser context.

        Args:
            browser_context: browser context to read the cookies from

        Returns:
            None. Updates ``self.headers["Cookie"]`` and ``self.cookie_dict``.
        """
        cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies())
        self.headers["Cookie"] = cookie_str
        self.cookie_dict = cookie_dict

    async def get_note_by_keyword(
            self, keyword: str,
            page: int = 1, page_size: int = 20,
            sort: SearchSortType = SearchSortType.GENERAL,
            note_type: SearchNoteType = SearchNoteType.ALL
    ) -> Dict:
        """
        Search notes by keyword.

        Args:
            keyword: search keyword
            page: page number
            page_size: number of results per page
            sort: sort order of the search results
            note_type: type of notes to search for

        Returns:
            The unwrapped search response payload.
        """
        uri = "/api/sns/web/v1/search/notes"
        data = {
            "keyword": keyword,
            "page": page,
            "page_size": page_size,
            "search_id": get_search_id(),
            "sort": sort.value,
            "note_type": note_type.value
        }
        return await self.post(uri, data)

    async def get_note_by_id(self, note_id: str) -> Dict:
        """
        Fetch the detail of a single note.

        Args:
            note_id: note ID

        Returns:
            The ``note_card`` of the first returned item, or an empty dict
            (after logging an error) when the response has no items.
        """
        data = {"source_note_id": note_id}
        uri = "/api/sns/web/v1/feed"
        res = await self.post(uri, data)
        if res and res.get("items"):
            res_dict: Dict = res["items"][0]["note_card"]
            return res_dict
        utils.logger.error(f"[XHSClient.get_note_by_id] get note empty and res:{res}")
        return dict()

    async def get_note_comments(self, note_id: str, cursor: str = "") -> Dict:
        """
        Fetch one page of top-level comments of a note.

        Args:
            note_id: note ID
            cursor: pagination cursor

        Returns:
            The unwrapped response payload for that page.
        """
        uri = "/api/sns/web/v2/comment/page"
        params = {
            "note_id": note_id,
            "cursor": cursor
        }
        return await self.get(uri, params)

    async def get_note_sub_comments(self, note_id: str, root_comment_id: str, num: int = 30, cursor: str = ""):
        """
        Fetch one page of replies under a given root comment.

        Args:
            note_id: ID of the note the comments belong to
            root_comment_id: ID of the root comment
            num: number of replies per page
            cursor: pagination cursor

        Returns:
            The unwrapped response payload for that page.
        """
        uri = "/api/sns/web/v2/comment/sub/page"
        params = {
            "note_id": note_id,
            "root_comment_id": root_comment_id,
            "num": num,
            "cursor": cursor,
        }
        return await self.get(uri, params)

    async def get_note_all_comments(self, note_id: str, crawl_interval: float = 1.0,
                                    callback: Optional[Callable] = None) -> List[Dict]:
        """
        Fetch all top-level comments of a note by following the pagination
        cursor until the response reports no more pages.

        Args:
            note_id: note ID
            crawl_interval: delay in seconds after each fetched page
            callback: optional coroutine function awaited with
                ``(note_id, comments)`` after each page

        Returns:
            All comments collected across the fetched pages.
        """
        result = []
        comments_has_more = True
        comments_cursor = ""
        while comments_has_more:
            comments_res = await self.get_note_comments(note_id, comments_cursor)
            comments_has_more = comments_res.get("has_more", False)
            comments_cursor = comments_res.get("cursor", "")
            if "comments" not in comments_res:
                # Stop paging instead of failing on a page without comments.
                utils.logger.info(
                    f"[XHSClient.get_note_all_comments] No 'comments' key found in response: {comments_res}")
                break
            comments = comments_res["comments"]
            if callback:
                await callback(note_id, comments)
            await asyncio.sleep(crawl_interval)
            result.extend(comments)
        return result