fix: xhs指定笔记ID获取方式增加解析html方式,原来的由于xsec_token导致失效

This commit is contained in:
Relakkes 2024-08-11 22:37:10 +08:00
parent ec47c230a9
commit f371675d47
3 changed files with 100 additions and 24 deletions

View File

@ -55,9 +55,6 @@ ENABLE_GET_SUB_COMMENTS = False
# 指定小红书需要爬虫的笔记ID列表 # 指定小红书需要爬虫的笔记ID列表
XHS_SPECIFIED_ID_LIST = [ XHS_SPECIFIED_ID_LIST = [
"6422c2750000000027000d88", "6422c2750000000027000d88",
"64ca1b73000000000b028dd2",
"630d5b85000000001203ab41",
"668fe13000000000030241fa", # 图文混合
# ........................ # ........................
] ]

View File

@ -6,6 +6,7 @@ from urllib.parse import urlencode
import httpx import httpx
from playwright.async_api import BrowserContext, Page from playwright.async_api import BrowserContext, Page
from tenacity import retry, stop_after_attempt, wait_fixed
import config import config
from base.base_crawler import AbstractApiClient from base.base_crawler import AbstractApiClient
@ -66,6 +67,7 @@ class XiaoHongShuClient(AbstractApiClient):
self.headers.update(headers) self.headers.update(headers)
return self.headers return self.headers
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
async def request(self, method, url, **kwargs) -> Union[str, Any]: async def request(self, method, url, **kwargs) -> Union[str, Any]:
""" """
封装httpx的公共请求方法对请求响应做一些处理 封装httpx的公共请求方法对请求响应做一些处理
@ -88,7 +90,6 @@ class XiaoHongShuClient(AbstractApiClient):
if return_response: if return_response:
return response.text return response.text
data: Dict = response.json() data: Dict = response.json()
if data["success"]: if data["success"]:
return data.get("data", data.get("success", {})) return data.get("data", data.get("success", {}))
@ -114,7 +115,7 @@ class XiaoHongShuClient(AbstractApiClient):
headers = await self._pre_headers(final_uri) headers = await self._pre_headers(final_uri)
return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers) return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers)
async def post(self, uri: str, data: dict) -> Dict: async def post(self, uri: str, data: dict, **kwargs) -> Dict:
""" """
POST请求对请求头签名 POST请求对请求头签名
Args: Args:
@ -127,7 +128,7 @@ class XiaoHongShuClient(AbstractApiClient):
headers = await self._pre_headers(uri, data) headers = await self._pre_headers(uri, data)
json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False) json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
return await self.request(method="POST", url=f"{self._host}{uri}", return await self.request(method="POST", url=f"{self._host}{uri}",
data=json_str, headers=headers) data=json_str, headers=headers, **kwargs)
async def get_note_media(self, url: str) -> Union[bytes, None]: async def get_note_media(self, url: str) -> Union[bytes, None]:
async with httpx.AsyncClient(proxies=self.proxies) as client: async with httpx.AsyncClient(proxies=self.proxies) as client:
@ -425,3 +426,60 @@ class XiaoHongShuClient(AbstractApiClient):
await asyncio.sleep(crawl_interval) await asyncio.sleep(crawl_interval)
result.extend(notes) result.extend(notes)
return result return result
async def get_note_short_url(self, note_id: str) -> Dict:
"""
获取笔记的短链接
Args:
note_id: 笔记ID
Returns:
"""
uri = f"/api/sns/web/short_url"
data = {
"original_url": f"{self._domain}/discovery/item/{note_id}"
}
return await self.post(uri, data=data, return_response=True)
async def get_note_by_id_from_html(self, note_id: str):
"""
通过解析网页版的笔记详情页HTML获取笔记详情
copy from https://github.com/ReaJason/xhs/blob/eb1c5a0213f6fbb592f0a2897ee552847c69ea2d/xhs/core.py#L217-L259
thanks for ReaJason
Args:
note_id:
Returns:
"""
def camel_to_underscore(key):
return re.sub(r"(?<!^)(?=[A-Z])", "_", key).lower()
def transform_json_keys(json_data):
data_dict = json.loads(json_data)
dict_new = {}
for key, value in data_dict.items():
new_key = camel_to_underscore(key)
if not value:
dict_new[new_key] = value
elif isinstance(value, dict):
dict_new[new_key] = transform_json_keys(json.dumps(value))
elif isinstance(value, list):
dict_new[new_key] = [
transform_json_keys(json.dumps(item))
if (item and isinstance(item, dict))
else item
for item in value
]
else:
dict_new[new_key] = value
return dict_new
url = "https://www.xiaohongshu.com/explore/" + note_id
html = await self.request(method="GET", url=url, return_response=True, headers=self.headers)
state = re.findall(r"window.__INITIAL_STATE__=({.*})</script>", html)[0].replace("undefined", '""')
if state != "{}":
note_dict = transform_json_keys(state)
return note_dict["note"]["note_detail_map"][note_id]["note"]
raise DataFetchError(html)

View File

@ -116,7 +116,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
break break
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [ task_list = [
self.get_note_detail( self.get_note_detail_async_task(
note_id=post_item.get("id"), note_id=post_item.get("id"),
xsec_source=post_item.get("xsec_source"), xsec_source=post_item.get("xsec_source"),
xsec_token=post_item.get("xsec_token"), xsec_token=post_item.get("xsec_token"),
@ -163,7 +163,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
""" """
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [ task_list = [
self.get_note_detail( self.get_note_detail_async_task(
note_id=post_item.get("note_id"), note_id=post_item.get("note_id"),
xsec_source=post_item.get("xsec_source"), xsec_source=post_item.get("xsec_source"),
xsec_token=post_item.get("xsec_token"), xsec_token=post_item.get("xsec_token"),
@ -179,20 +179,41 @@ class XiaoHongShuCrawler(AbstractCrawler):
async def get_specified_notes(self): async def get_specified_notes(self):
"""Get the information and comments of the specified post""" """Get the information and comments of the specified post"""
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
fixed_xsec_token = "ABtXiOIX98byLlu-ju5dDq3tIc6uikcJrd3t7OYyqUbE4"
task_list = [
self.get_note_detail(note_id=note_id, xsec_source="pc_search", xsec_token=fixed_xsec_token,
semaphore=semaphore) for note_id in config.XHS_SPECIFIED_ID_LIST
]
note_details = await asyncio.gather(*task_list)
for note_detail in note_details:
if note_detail is not None:
await xhs_store.update_xhs_note(note_detail)
await self.get_notice_media(note_detail)
await self.batch_get_note_comments(config.XHS_SPECIFIED_ID_LIST)
async def get_note_detail(self, note_id: str, xsec_source: str, xsec_token: str, semaphore: asyncio.Semaphore) -> \ async def get_note_detail_from_html_task(note_id: str, semaphore: asyncio.Semaphore) -> Dict:
async with semaphore:
try:
_note_detail: Dict = await self.xhs_client.get_note_by_id_from_html(note_id)
print("------------------------")
print(_note_detail)
print("------------------------")
if not _note_detail:
utils.logger.error(
f"[XiaoHongShuCrawler.get_note_detail_from_html] Get note detail error, note_id: {note_id}")
return {}
return _note_detail
except DataFetchError as ex:
utils.logger.error(f"[XiaoHongShuCrawler.get_note_detail_from_html] Get note detail error: {ex}")
return {}
except KeyError as ex:
utils.logger.error(
f"[XiaoHongShuCrawler.get_note_detail_from_html] have not fund note detail note_id:{note_id}, err: {ex}")
return {}
get_note_detail_task_list = [
get_note_detail_from_html_task(note_id=note_id, semaphore=asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)) for
note_id in config.XHS_SPECIFIED_ID_LIST
]
need_get_comment_note_ids = []
note_details = await asyncio.gather(*get_note_detail_task_list)
for note_detail in note_details:
if note_detail:
need_get_comment_note_ids.append(note_detail.get("note_id"))
await xhs_store.update_xhs_note(note_detail)
await self.batch_get_note_comments(need_get_comment_note_ids)
async def get_note_detail_async_task(self, note_id: str, xsec_source: str, xsec_token: str, semaphore: asyncio.Semaphore) -> \
Optional[Dict]: Optional[Dict]:
"""Get note detail""" """Get note detail"""
async with semaphore: async with semaphore:
@ -200,16 +221,16 @@ class XiaoHongShuCrawler(AbstractCrawler):
note_detail: Dict = await self.xhs_client.get_note_by_id(note_id, xsec_source, xsec_token) note_detail: Dict = await self.xhs_client.get_note_by_id(note_id, xsec_source, xsec_token)
if not note_detail: if not note_detail:
utils.logger.error( utils.logger.error(
f"[XiaoHongShuCrawler.get_note_detail] Get note detail error, note_id: {note_id}") f"[XiaoHongShuCrawler.get_note_detail_async_task] Get note detail error, note_id: {note_id}")
return None return None
note_detail.update({"xsec_token": xsec_token, "xsec_source": xsec_source}) note_detail.update({"xsec_token": xsec_token, "xsec_source": xsec_source})
return note_detail return note_detail
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error(f"[XiaoHongShuCrawler.get_note_detail] Get note detail error: {ex}") utils.logger.error(f"[XiaoHongShuCrawler.get_note_detail_async_task] Get note detail error: {ex}")
return None return None
except KeyError as ex: except KeyError as ex:
utils.logger.error( utils.logger.error(
f"[XiaoHongShuCrawler.get_note_detail] have not fund note detail note_id:{note_id}, err: {ex}") f"[XiaoHongShuCrawler.get_note_detail_async_task] have not fund note detail note_id:{note_id}, err: {ex}")
return None return None
async def batch_get_note_comments(self, note_list: List[str]): async def batch_get_note_comments(self, note_list: List[str]):