326 lines
10 KiB
Python
326 lines
10 KiB
Python
import asyncio
|
||
import json
|
||
import re
|
||
from typing import Callable, Dict, List, Optional
|
||
from urllib.parse import urlencode
|
||
|
||
import httpx
|
||
from playwright.async_api import BrowserContext, Page
|
||
|
||
from tools import utils
|
||
|
||
from .exception import DataFetchError, IPBlockError
|
||
from .field import SearchNoteType, SearchSortType
|
||
from .help import get_search_id, sign
|
||
|
||
|
||
class XHSClient:
    """Async HTTP client for the Xiaohongshu web API.

    Request headers are signed with values obtained from a live Playwright
    page (see ``_pre_headers``) and requests are sent with ``httpx``.
    """

    def __init__(
            self,
            timeout=10,
            proxies=None,
            *,
            headers: Dict[str, str],
            playwright_page: Page,
            cookie_dict: Dict[str, str],
    ):
        """
        Store the connection settings and the browser state used for signing.

        Args:
            timeout: Timeout passed to every httpx request.
            proxies: Proxy configuration handed to ``httpx.AsyncClient``.
            headers: Base request headers; signature headers are merged into
                this same dict on every signed request.
            playwright_page: Page on which the signing JavaScript is evaluated.
            cookie_dict: Cookies as a name -> value mapping (``a1`` is read
                when signing).
        """
        self.proxies = proxies
        self.timeout = timeout
        self.headers = headers
        self._host = "https://edith.xiaohongshu.com"
        # Message / code pairs used when classifying API error payloads.
        self.IP_ERROR_STR = "网络连接异常,请检查网络设置或重启试试"
        self.IP_ERROR_CODE = 300012
        self.NOTE_ABNORMAL_STR = "笔记状态异常,请稍后查看"
        self.NOTE_ABNORMAL_CODE = -510001
        self.playwright_page = playwright_page
        self.cookie_dict = cookie_dict

    async def _pre_headers(self, url: str, data=None) -> Dict:
        """
        Sign a request and merge the signature headers into ``self.headers``.

        Args:
            url: Request path (including the query string for GET requests).
            data: Request body for POST requests, ``None`` otherwise.

        Returns:
            ``self.headers`` (the same dict object) after it has been updated
            with ``X-S``, ``X-T``, ``x-S-Common`` and ``X-B3-Traceid``.
        """
        # The page's own JavaScript produces the raw X-s / X-t values.
        encrypt_params = await self.playwright_page.evaluate(
            "([url, data]) => window._webmsxyw(url,data)", [url, data]
        )
        local_storage = await self.playwright_page.evaluate("() => window.localStorage")
        signs = sign(
            a1=self.cookie_dict.get("a1", ""),
            b1=local_storage.get("b1", ""),
            x_s=encrypt_params.get("X-s", ""),
            x_t=str(encrypt_params.get("X-t", "")),
        )

        self.headers.update({
            "X-S": signs["x-s"],
            "X-T": signs["x-t"],
            "x-S-Common": signs["x-s-common"],
            "X-B3-Traceid": signs["x-b3-traceid"],
        })
        return self.headers

    async def request(self, method, url, **kwargs) -> Dict:
        """
        Send an HTTP request with httpx and unwrap the API response envelope.

        Args:
            method: HTTP method.
            url: Absolute request URL.
            **kwargs: Extra arguments forwarded to ``httpx.AsyncClient.request``
                (headers, body, ...). The special key ``return_response``
                (default ``False``) makes the method return the raw response
                text instead of parsing it as JSON.

        Returns:
            The response text when ``return_response`` is true; otherwise the
            ``data`` field of a successful payload (falling back to the
            ``success`` value when ``data`` is absent).

        Raises:
            IPBlockError: The payload carries the IP-block error code.
            DataFetchError: The payload is not valid JSON, or it does not
                report success.
        """
        return_response = kwargs.pop('return_response', False)

        async with httpx.AsyncClient(proxies=self.proxies) as client:
            response = await client.request(
                method, url, timeout=self.timeout,
                **kwargs
            )

        if return_response:
            return response.text

        try:
            data: Dict = response.json()
        except ValueError as err:
            # Block pages / gateway errors are not JSON; report them as a
            # fetch failure rather than leaking a decoder exception.
            raise DataFetchError(
                f"non-JSON response (HTTP {response.status_code})"
            ) from err

        # Use .get so an unexpected payload surfaces as DataFetchError
        # instead of a bare KeyError.
        if data.get("success"):
            return data.get("data", data.get("success", {}))
        if data.get("code") == self.IP_ERROR_CODE:
            raise IPBlockError(self.IP_ERROR_STR)
        raise DataFetchError(data.get("msg", None))

    async def get(self, uri: str, params=None) -> Dict:
        """
        Send a signed GET request to the API host.

        Args:
            uri: Request path.
            params: Optional dict of query parameters; it is URL-encoded and
                appended to ``uri`` before signing.

        Returns:
            The unwrapped response data (see ``request``).
        """
        final_uri = uri
        if isinstance(params, dict):
            final_uri = f"{uri}?{urlencode(params)}"
        headers = await self._pre_headers(final_uri)
        return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers)

    async def post(self, uri: str, data: dict) -> Dict:
        """
        Send a signed POST request with a compact JSON body to the API host.

        Args:
            uri: Request path.
            data: Request body; it is signed and then serialized to JSON.

        Returns:
            The unwrapped response data (see ``request``).
        """
        headers = await self._pre_headers(uri, data)
        # Compact separators and raw (non-escaped) Unicode in the body.
        json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
        # NOTE(review): httpx documents `content=` as the way to send a raw
        # str/bytes body; confirm the pinned httpx version before switching.
        return await self.request(method="POST", url=f"{self._host}{uri}",
                                  data=json_str, headers=headers)

    async def pong(self) -> bool:
        """
        Check whether the login state is still usable.

        Runs a keyword search and treats a response with a non-empty
        ``items`` entry as success. Any exception is logged and reported as
        a failed check.

        Returns:
            ``True`` if the search returned items, ``False`` otherwise.
        """
        utils.logger.info("[XHSClient.pong] Begin to pong xhs...")
        ping_flag = False
        try:
            note_card: Dict = await self.get_note_by_keyword(keyword="小红书")
            if note_card.get("items"):
                ping_flag = True
        except Exception as e:
            # Deliberately broad: any failure here means "not logged in".
            utils.logger.error(f"[XHSClient.pong] Ping xhs failed: {e}, and try to login again...")
            ping_flag = False
        return ping_flag

    async def update_cookies(self, browser_context: BrowserContext):
        """
        Refresh the client's cookies from a browser context.

        Args:
            browser_context: Playwright browser context to read cookies from.

        Returns:
            None. ``self.headers["Cookie"]`` and ``self.cookie_dict`` are
            replaced with the converted cookies.
        """
        cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies())
        self.headers["Cookie"] = cookie_str
        self.cookie_dict = cookie_dict

    async def get_note_by_keyword(
            self, keyword: str,
            page: int = 1, page_size: int = 20,
            sort: SearchSortType = SearchSortType.GENERAL,
            note_type: SearchNoteType = SearchNoteType.ALL
    ) -> Dict:
        """
        Search notes by keyword.

        Args:
            keyword: Search keyword.
            page: Page number.
            page_size: Number of results per page.
            sort: Sort order of the search results.
            note_type: Type of notes to search for.

        Returns:
            The unwrapped search response data.
        """
        uri = "/api/sns/web/v1/search/notes"
        data = {
            "keyword": keyword,
            "page": page,
            "page_size": page_size,
            "search_id": get_search_id(),
            "sort": sort.value,
            "note_type": note_type.value,
        }
        return await self.post(uri, data)

    async def get_creator_info_and_notes(self, creator: str) -> Dict:
        """
        Fetch a creator's profile page and extract the embedded initial state.

        Args:
            creator: Creator (user) ID.

        Returns:
            A dict with the keys ``creator``, ``notes``, ``cursor`` and
            ``has_more_notes``, or an empty dict when the page does not
            contain a parsable ``window.__INITIAL_STATE__`` with a ``user``
            section.
        """
        profile_url = f"https://www.xiaohongshu.com/user/profile/{creator}"
        content = await self.request(method="GET", url=profile_url, return_response=True)

        # Non-greedy so the capture stops at the first closing tag even when
        # further <script> elements follow on the same line.
        match = re.search(r'<script>window\.__INITIAL_STATE__=(.+?)</script>', content)
        if match is None:
            return {}

        # The embedded state is a JS object literal; `undefined` is not JSON.
        info = json.loads(match.group(1).replace(':undefined', ':null'), strict=False)
        if not isinstance(info, dict):
            return {}

        user = info.get('user')
        if not isinstance(user, dict):
            return {}

        # Both collections are indexed at [0]; tolerate missing/empty ones.
        notes = user.get('notes') or []
        note_queries = user.get('noteQueries') or []
        first_query = note_queries[0] if note_queries else {}

        return {
            'creator': user.get('userPageData'),
            'notes': notes[0] if notes else [],
            'cursor': first_query.get('cursor'),
            'has_more_notes': first_query.get('hasMore'),
        }

    async def get_notes_by_creator(
            self, creator: str,
            cursor: str,
            page_size: int = 30
    ) -> Dict:
        """
        Fetch one page of a creator's notes.

        Args:
            creator: Creator (user) ID.
            cursor: ID of the last note of the previous page.
            page_size: Number of notes per page.

        Returns:
            The unwrapped response data.
        """
        uri = "/api/sns/web/v1/user_posted"
        data = {
            "user_id": creator,
            "cursor": cursor,
            "num": page_size,
            "image_formats": "jpg,webp,avif",
        }
        return await self.get(uri, data)

    async def get_note_by_id(self, note_id: str) -> Dict:
        """
        Fetch the detail of a single note.

        Args:
            note_id: Note ID.

        Returns:
            The ``note_card`` of the first returned item, or an empty dict
            (after logging an error) when the response has no items.
        """
        data = {"source_note_id": note_id}
        uri = "/api/sns/web/v1/feed"
        res = await self.post(uri, data)
        if res and res.get("items"):
            res_dict: Dict = res["items"][0]["note_card"]
            return res_dict
        utils.logger.error(f"[XHSClient.get_note_by_id] get note empty and res:{res}")
        return {}

    async def get_note_comments(self, note_id: str, cursor: str = "") -> Dict:
        """
        Fetch one page of top-level comments of a note.

        Args:
            note_id: Note ID.
            cursor: Pagination cursor.

        Returns:
            The unwrapped response data.
        """
        uri = "/api/sns/web/v2/comment/page"
        params = {
            "note_id": note_id,
            "cursor": cursor,
            "top_comment_id": "",
            "image_formats": "jpg,webp,avif",
        }
        return await self.get(uri, params)

    async def get_note_sub_comments(self, note_id: str, root_comment_id: str, num: int = 30, cursor: str = ""):
        """
        Fetch one page of replies under a given top-level comment.

        Args:
            note_id: ID of the note the comment belongs to.
            root_comment_id: ID of the root (top-level) comment.
            num: Number of replies per page.
            cursor: Pagination cursor.

        Returns:
            The unwrapped response data.
        """
        uri = "/api/sns/web/v2/comment/sub/page"
        params = {
            "note_id": note_id,
            "root_comment_id": root_comment_id,
            "num": num,
            "cursor": cursor,
        }
        return await self.get(uri, params)

    async def get_note_all_comments(self, note_id: str, crawl_interval: float = 1.0,
                                    callback: Optional[Callable] = None) -> List[Dict]:
        """
        Fetch all top-level comments of a note, page by page.

        Paging continues while the response reports ``has_more`` and stops
        early if a response lacks the ``comments`` key.

        Args:
            note_id: Note ID.
            crawl_interval: Delay in seconds after each fetched page.
            callback: Optional awaitable callable invoked as
                ``callback(note_id, comments)`` after each page.

        Returns:
            All collected comments.
        """
        result = []
        comments_has_more = True
        comments_cursor = ""
        while comments_has_more:
            comments_res = await self.get_note_comments(note_id, comments_cursor)
            comments_has_more = comments_res.get("has_more", False)
            comments_cursor = comments_res.get("cursor", "")
            if "comments" not in comments_res:
                utils.logger.info(
                    f"[XHSClient.get_note_all_comments] No 'comments' key found in response: {comments_res}")
                break
            comments = comments_res["comments"]
            if callback:
                await callback(note_id, comments)
            await asyncio.sleep(crawl_interval)
            result.extend(comments)
        return result
|