fix: optimize data fetching for the Xiaohongshu creator feature

Relakkes 2024-03-17 14:50:10 +08:00
parent 78a9bf9f38
commit 96309dcfee
5 changed files with 133 additions and 139 deletions

View File

@@ -18,10 +18,10 @@
Become a sponsor and have your product showcased here; contact the author: relakkes@gmail.com
## Feature List
| Platform | Cookie Login | QR Code Login | Phone Login | Keyword Search | Crawl by Post/Video ID | Login State Cache | Data Storage | IP Proxy Pool | Slider CAPTCHA |
|:---:|:---------:|:-----:|:-----:|:-----:|:-------------:|:------:|:----:|:------:|:-----:|
| Platform | Cookie Login | QR Code Login | Creator Homepage | Keyword Search | Crawl by Post/Video ID | Login State Cache | Data Storage | IP Proxy Pool | Slider CAPTCHA |
|:---:|:---------:|:-----:|:-------:|:-----:|:-------------:|:------:|:----:|:------:|:-----:|
| 小红书 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✕ |
| 抖音 | ✅ | ✅ | | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| 抖音 | ✅ | ✅ | | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| 快手 | ✅ | ✅ | ✕ | ✅ | ✅ | ✅ | ✅ | ✅ | ✕ |
| B 站 | ✅ | ✅ | ✕ | ✅ | ✅ | ✅ | ✅ | ✅ | ✕ |
| 微博 | ✅ | ✅ | ✕ | ✅ | ✅ | ✅ | ✅ | ✅ | ✕ |
@@ -94,7 +94,8 @@
PS: Please leave a note with your name when donating. If your donation is missing from the list, contact me and I will add it; with many messages some may slip through, my apologies.
| Donor | Amount | Date |
|-------------|-------|------------|
|------------|-------|------------|
| *诚 | 20 CNY | 2024-03-17 |
| Strem Gamer | 20 CNY | 2024-03-16 |
| *鑫 | 20 CNY | 2024-03-14 |
| Yuzu | 20 CNY | 2024-03-07 |

View File

@@ -4,7 +4,7 @@ KEYWORDS = "python,golang"
LOGIN_TYPE = "qrcode" # qrcode or phone or cookie
COOKIES = ""
SORT_TYPE = "popularity_descending" # for the concrete values see the enums under media_platform.xxx.field; only supported for Xiaohongshu
CRAWLER_TYPE = "search"
CRAWLER_TYPE = "search" # crawl type: search (keyword search) | detail (specified post detail) | creator (creator homepage data)
# whether to enable IP proxy
ENABLE_IP_PROXY = False
@@ -70,8 +70,6 @@ WEIBO_SPECIFIED_ID_LIST = [
# list of specified Xiaohongshu creator IDs
XHS_CREATOR_ID_LIST = [
"59d8cb33de5fb4696bf17217",
"61b87386000000001000b18b",
"5e8558100000000001005bc5",
"63e36c9a000000002703502b",
# ........................
]
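For context, a minimal sketch of how the options in this file combine to enable the new creator mode (only option names that appear in this file are used; everything else keeps its defaults):

```python
# Sketch of the relevant config values for crawling creator homepages.
LOGIN_TYPE = "qrcode"        # qrcode or phone or cookie
CRAWLER_TYPE = "creator"     # crawl the creator homepages listed below
XHS_CREATOR_ID_LIST = [
    "63e36c9a000000002703502b",   # creator ID taken from the list above
]
```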

View File

@@ -1,7 +1,7 @@
import asyncio
import json
import re
from typing import Callable, Dict, List, Optional
from typing import Callable, Dict, List, Optional, Union, Any
from urllib.parse import urlencode
import httpx
@@ -28,6 +28,7 @@ class XHSClient:
self.timeout = timeout
self.headers = headers
self._host = "https://edith.xiaohongshu.com"
self._domain = "https://www.xiaohongshu.com"
self.IP_ERROR_STR = "网络连接异常,请检查网络设置或重启试试"
self.IP_ERROR_CODE = 300012
self.NOTE_ABNORMAL_STR = "笔记状态异常,请稍后查看"
@@ -63,7 +64,7 @@
self.headers.update(headers)
return self.headers
async def request(self, method, url, **kwargs) -> Dict:
async def request(self, method, url, **kwargs) -> Union[str, Any]:
"""
Wrapper around httpx's request method; does some common post-processing on the response
Args:
@@ -186,56 +187,6 @@
}
return await self.post(uri, data)
async def get_creator_info_and_notes(self, creator: str) -> Dict:
"""
Get the creator's info and the first page of notes
Args:
creator: creator ID
Returns:
{"creator":{}, "notes":[]}
"""
path = '/user/profile/'+creator
content = await self.request(method="GET", url=f"https://www.xiaohongshu.com{path}", return_response=True)
match = re.search(r'<script>window.__INITIAL_STATE__=(.+)<\/script>', content, re.M)
if match == None:
return {}
info = json.loads(match.group(1).replace(':undefined', ':null'), strict=False)
if info == None:
return {}
return {
'creator': info.get('user').get('userPageData'),
'notes': info.get('user').get('notes')[0],
'cursor': info.get('user').get('noteQueries')[0].get('cursor'),
'has_more_notes': info.get('user').get('noteQueries')[0].get('hasMore')
}
async def get_notes_by_creator(
self, creator: str,
cursor: str,
page_size: int = 30
) -> Dict:
"""
Get the creator's notes
Args:
creator: creator ID
cursor: ID of the last note on the previous page
page_size: number of notes per page
Returns:
"""
uri = "/api/sns/web/v1/user_posted"
data = {
"user_id": creator,
"cursor": cursor,
"num": page_size,
"image_formats": "jpg,webp,avif"
}
return await self.get(uri, data)
async def get_note_by_id(self, note_id: str) -> Dict:
"""
Get note detail API
@@ -268,7 +219,7 @@ class XHSClient:
params = {
"note_id": note_id,
"cursor": cursor,
"top_comment_id":"",
"top_comment_id": "",
"image_formats": "jpg,webp,avif"
}
return await self.get(uri, params)
@@ -323,3 +274,76 @@ class XHSClient:
await asyncio.sleep(crawl_interval)
result.extend(comments)
return result
async def get_creator_info(self, user_id: str) -> Dict:
"""
Get brief user info by parsing the HTML of the web version of the user's homepage
The PC web homepage exposes the data on the window.__INITIAL_STATE__ variable; parsing it is enough
eg: https://www.xiaohongshu.com/user/profile/59d8cb33de5fb4696bf17217
"""
uri = f"/user/profile/{user_id}"
html_content = await self.request("GET", self._domain + uri, return_response=True, headers=self.headers)
match = re.search(r'<script>window.__INITIAL_STATE__=(.+)<\/script>', html_content, re.M)
if match is None:
return {}
info = json.loads(match.group(1).replace(':undefined', ':null'), strict=False)
if info is None:
return {}
return info.get('user').get('userPageData')
async def get_notes_by_creator(
self, creator: str,
cursor: str,
page_size: int = 30
) -> Dict:
"""
Get the creator's notes
Args:
creator: creator ID
cursor: ID of the last note on the previous page
page_size: number of notes per page
Returns:
"""
uri = "/api/sns/web/v1/user_posted"
data = {
"user_id": creator,
"cursor": cursor,
"num": page_size,
"image_formats": "jpg,webp,avif"
}
return await self.get(uri, data)
async def get_all_notes_by_creator(self, user_id: str, crawl_interval: float = 1.0,
callback: Optional[Callable] = None) -> List[Dict]:
"""
Get all posts published by the specified user; this method keeps paginating until every post of the user has been fetched
Args:
user_id: user ID
crawl_interval: delay between requests, in seconds
callback: callback invoked after each crawled page
Returns:
"""
result = []
notes_has_more = True
notes_cursor = ""
while notes_has_more:
notes_res = await self.get_notes_by_creator(user_id, notes_cursor)
notes_has_more = notes_res.get("has_more", False)
notes_cursor = notes_res.get("cursor", "")
if "notes" not in notes_res:
utils.logger.info(f"[XHSClient.get_all_notes_by_creator] No 'notes' key found in response: {notes_res}")
break
notes = notes_res["notes"]
utils.logger.info(f"[XHSClient.get_all_notes_by_creator] got user_id:{user_id} notes len : {len(notes)}")
if callback:
await callback(notes)
await asyncio.sleep(crawl_interval)
result.extend(notes)
return result
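A rough usage sketch of the two new client methods (assumes an already logged-in XHSClient instance named `client`; the `dump_creator` helper and `on_page` callback are illustrative only):

```python
async def dump_creator(client, user_id: str) -> None:
    # Brief profile parsed out of window.__INITIAL_STATE__ on the creator's homepage
    creator_info = await client.get_creator_info(user_id=user_id)
    print(creator_info)

    async def on_page(notes):
        # Invoked once per page returned by /api/sns/web/v1/user_posted
        print(f"fetched a page with {len(notes)} notes")

    # Paginates until the API stops reporting has_more
    all_notes = await client.get_all_notes_by_creator(
        user_id=user_id,
        crawl_interval=1.0,
        callback=on_page,
    )
    print(f"total notes collected: {len(all_notes)}")
```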

View File

@@ -126,65 +126,35 @@ class XiaoHongShuCrawler(AbstractCrawler):
async def get_creators_and_notes(self) -> None:
"""Get creator's notes and retrieve their comment information."""
utils.logger.info("[XiaoHongShuCrawler.get_creators_and_notes] Begin get xiaohongshu creators")
xhs_limit_count = 30
for creator in config.XHS_CREATOR_ID_LIST:
utils.logger.info(f"[XiaoHongShuCrawler.get_creators_and_notes] Current creator: {creator}")
page = 0
cursor = ''
has_more_notes = False
while page * xhs_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
note_id_list: List[str] = []
for user_id in config.XHS_CREATOR_ID_LIST:
# get creator detail info from web html content
creator_info: Dict = await self.xhs_client.get_creator_info(user_id=user_id)
if creator_info:
await xhs_store.save_creator(user_id, creator=creator_info)
if page == 0:
# get creator info and notes
creator_and_notes_info = await self.xhs_client.get_creator_info_and_notes(creator)
# Get all note information of the creator
all_notes_list = await self.xhs_client.get_all_notes_by_creator(
user_id=user_id,
crawl_interval=random.random(),
callback=self.fetch_creator_notes_detail
)
if creator_and_notes_info == None or not creator_and_notes_info:
utils.logger.error(f"[XiaoHongShuCrawler.get_creators_and_notes] get creator notes error")
continue
notes_res = creator_and_notes_info.get('notes')
# utils.logger.info(f"[XiaoHongShuCrawler.get_creators_and_notes] get creator and notes:{notes_res}")
cursor = creator_and_notes_info.get('cursor')
has_more_notes = creator_and_notes_info.get('has_more_notes')
# save creator info
await xhs_store.save_creator(creator, creator_and_notes_info.get('creator'))
utils.logger.info(
f"[XiaoHongShuCrawler.get_creators_and_notes] save creator info:{creator_and_notes_info.get('creator')}")
else:
# get notes
notes = await self.xhs_client.get_notes_by_creator(creator, cursor)
# utils.logger.info(f"[XiaoHongShuCrawler.get_creators_and_notes] get notes res:{notes_res}")
if notes == None or not notes:
utils.logger.error(f"[XiaoHongShuCrawler.get_creators_and_notes] get creator's notes error")
continue
cursor = notes.get('cursor')
has_more_notes = notes.get('has_more_notes')
notes_res = notes.get('notes')
utils.logger.info(
f"[XiaoHongShuCrawler.get_creators_and_notes] get creator's notes res:{notes_res}")
note_ids = [note_item.get("note_id") for note_item in all_notes_list]
await self.batch_get_note_comments(note_ids)
async def fetch_creator_notes_detail(self, note_list: List[Dict]):
"""
Concurrently obtain the specified post list and save the data
"""
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [
self.get_note_detail(post_item.get('id'), semaphore)
for post_item in notes_res
self.get_note_detail(post_item.get("note_id"), semaphore) for post_item in note_list
]
note_details = await asyncio.gather(*task_list)
for note_detail in note_details:
if note_detail is not None:
await xhs_store.update_xhs_note(note_detail)
note_id_list.append(note_detail.get('note_id'))
page += 1
utils.logger.info(f"[XiaoHongShuCrawler.get_creators_and_notes] Note details: {note_details}")
await self.batch_get_note_comments(note_id_list)
if not has_more_notes:
break
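Since removed and added lines are interleaved in the hunk above, the reworked flow reads more easily reassembled from just the new lines (a sketch, not the verbatim file):

```python
async def get_creators_and_notes(self) -> None:
    """Get creator's notes and retrieve their comment information."""
    utils.logger.info("[XiaoHongShuCrawler.get_creators_and_notes] Begin get xiaohongshu creators")
    for user_id in config.XHS_CREATOR_ID_LIST:
        # get creator detail info from web html content
        creator_info: Dict = await self.xhs_client.get_creator_info(user_id=user_id)
        if creator_info:
            await xhs_store.save_creator(user_id, creator=creator_info)

        # get all of the creator's notes, saving details page by page via the callback
        all_notes_list = await self.xhs_client.get_all_notes_by_creator(
            user_id=user_id,
            crawl_interval=random.random(),
            callback=self.fetch_creator_notes_detail,
        )

        note_ids = [note_item.get("note_id") for note_item in all_notes_list]
        await self.batch_get_note_comments(note_ids)
```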
async def get_specified_notes(self):
"""Get the information and comments of the specified post"""

View File

@@ -26,6 +26,7 @@ class XhsStoreFactory:
return store_class()
async def update_xhs_note(note_item: Dict):
note_id = note_item.get("note_id")
user_info = note_item.get("user", {})
@@ -116,7 +117,7 @@ async def save_creator(user_id: str, creator: Dict):
'follows': follows,
'fans': fans,
'interaction': interaction,
'tag_list': json.dumps({tag.get('tagType'): tag.get('name') for tag in creator.get('tags')}),
'tag_list': json.dumps({tag.get('tagType'): tag.get('name') for tag in creator.get('tags')}, ensure_ascii=False),
}
utils.logger.info(f"[store.xhs.save_creator] creator:{local_db_item}")
await XhsStoreFactory.create_store().store_creator(local_db_item)
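The behavioral change in this hunk is ensure_ascii=False, which keeps the creator's Chinese tag names readable in the stored JSON instead of ASCII-escaping them (the "profession" key below is hypothetical):

```python
import json

tags = {"profession": "摄影师"}          # hypothetical {tagType: name} entry
json.dumps(tags)                         # '{"profession": "\uXXXX\uXXXX\uXXXX"}'  (escaped)
json.dumps(tags, ensure_ascii=False)     # '{"profession": "摄影师"}'  (readable)
```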