MediaCrawler/media_platform/zhihu/client.py

488 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import asyncio
import json
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import urlencode
import httpx
from httpx import Response
from playwright.async_api import BrowserContext, Page
from tenacity import retry, stop_after_attempt, wait_fixed
import config
from base.base_crawler import AbstractApiClient
from constant import zhihu as zhihu_constant
from model.m_zhihu import ZhihuComment, ZhihuContent, ZhihuCreator
from tools import utils
from .exception import DataFetchError, ForbiddenError
from .field import SearchSort, SearchTime, SearchType
from .help import ZhihuExtractor, sign
class ZhiHuClient(AbstractApiClient):
def __init__(
self,
timeout=10,
proxies=None,
*,
headers: Dict[str, str],
playwright_page: Page,
cookie_dict: Dict[str, str],
):
self.proxies = proxies
self.timeout = timeout
self.default_headers = headers
self.cookie_dict = cookie_dict
self._extractor = ZhihuExtractor()
async def _pre_headers(self, url: str) -> Dict:
"""
请求头参数签名
Args:
url: 请求的URL需要包含请求的参数
Returns:
"""
d_c0 = self.cookie_dict.get("d_c0")
if not d_c0:
raise Exception("d_c0 not found in cookies")
sign_res = sign(url, self.default_headers["cookie"])
headers = self.default_headers.copy()
headers['x-zst-81'] = sign_res["x-zst-81"]
headers['x-zse-96'] = sign_res["x-zse-96"]
return headers
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
async def request(self, method, url, **kwargs) -> Union[str, Any]:
"""
封装httpx的公共请求方法对请求响应做一些处理
Args:
method: 请求方法
url: 请求的URL
**kwargs: 其他请求参数,例如请求头、请求体等
Returns:
"""
# return response.text
return_response = kwargs.pop('return_response', False)
async with httpx.AsyncClient(proxies=self.proxies, ) as client:
response = await client.request(
method, url, timeout=self.timeout,
**kwargs
)
if response.status_code != 200:
utils.logger.error(f"[ZhiHuClient.request] Requset Url: {url}, Request error: {response.text}")
if response.status_code == 403:
raise ForbiddenError(response.text)
elif response.status_code == 404: # 如果一个content没有评论也是404
return {}
raise DataFetchError(response.text)
if return_response:
return response.text
try:
data: Dict = response.json()
if data.get("error"):
utils.logger.error(f"[ZhiHuClient.request] Request error: {data}")
raise DataFetchError(data.get("error", {}).get("message"))
return data
except json.JSONDecodeError:
utils.logger.error(f"[ZhiHuClient.request] Request error: {response.text}")
raise DataFetchError(response.text)
async def get(self, uri: str, params=None, **kwargs) -> Union[Response, Dict, str]:
"""
GET请求对请求头签名
Args:
uri: 请求路由
params: 请求参数
Returns:
"""
final_uri = uri
if isinstance(params, dict):
final_uri += '?' + urlencode(params)
headers = await self._pre_headers(final_uri)
return await self.request(method="GET", url=zhihu_constant.ZHIHU_URL + final_uri, headers=headers, **kwargs)
async def pong(self) -> bool:
"""
用于检查登录态是否失效了
Returns:
"""
utils.logger.info("[ZhiHuClient.pong] Begin to pong zhihu...")
ping_flag = False
try:
res = await self.get_current_user_info()
if res.get("uid") and res.get("name"):
ping_flag = True
utils.logger.info("[ZhiHuClient.pong] Ping zhihu successfully")
else:
utils.logger.error(f"[ZhiHuClient.pong] Ping zhihu failed, response data: {res}")
except Exception as e:
utils.logger.error(f"[ZhiHuClient.pong] Ping zhihu failed: {e}, and try to login again...")
ping_flag = False
return ping_flag
async def update_cookies(self, browser_context: BrowserContext):
"""
API客户端提供的更新cookies方法一般情况下登录成功后会调用此方法
Args:
browser_context: 浏览器上下文对象
Returns:
"""
cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies())
self.default_headers["cookie"] = cookie_str
self.cookie_dict = cookie_dict
async def get_current_user_info(self) -> Dict:
"""
获取当前登录用户信息
Returns:
"""
params = {
"include": "email,is_active,is_bind_phone"
}
return await self.get("/api/v4/me", params)
async def get_note_by_keyword(
self, keyword: str,
page: int = 1,
page_size: int = 20,
sort: SearchSort = SearchSort.DEFAULT,
note_type: SearchType = SearchType.DEFAULT,
search_time: SearchTime = SearchTime.DEFAULT
) -> List[ZhihuContent]:
"""
根据关键词搜索
Args:
keyword: 关键词
page: 第几页
page_size: 分页size
sort: 排序
note_type: 搜索结果类型
search_time: 搜索多久时间的结果
Returns:
"""
uri = "/api/v4/search_v3"
params = {
"gk_version": "gz-gaokao",
"t": "general",
"q": keyword,
"correction": 1,
"offset": (page - 1) * page_size,
"limit": page_size,
"filter_fields": "",
"lc_idx": (page - 1) * page_size,
"show_all_topics": 0,
"search_source": "Filter",
"time_interval": search_time.value,
"sort": sort.value,
"vertical": note_type.value,
}
search_res = await self.get(uri, params)
utils.logger.info(f"[ZhiHuClient.get_note_by_keyword] Search result: {search_res}")
return self._extractor.extract_contents_from_search(search_res)
async def get_root_comments(self, content_id: str, content_type: str, offset: str = "", limit: int = 10,
order_by: str = "sort") -> Dict:
"""
获取内容的一级评论
Args:
content_id: 内容ID
content_type: 内容类型(answer, article, zvideo)
offset:
limit:
order_by:
Returns:
"""
uri = f"/api/v4/{content_type}s/{content_id}/root_comments"
params = {
"order": order_by,
"offset": offset,
"limit": limit
}
return await self.get(uri, params)
async def get_child_comments(self, root_comment_id: str, offset: str = "", limit: int = 10,
order_by: str = "sort") -> Dict:
"""
获取一级评论下的子评论
Args:
root_comment_id:
offset:
limit:
order_by:
Returns:
"""
uri = f"/api/v4/comment_v5/comment/{root_comment_id}/child_comment"
params = {
"order": order_by,
"offset": offset,
"limit": limit
}
return await self.get(uri, params)
async def get_note_all_comments(self, content: ZhihuContent, crawl_interval: float = 1.0,
callback: Optional[Callable] = None) -> List[ZhihuComment]:
"""
获取指定帖子下的所有一级评论,该方法会一直查找一个帖子下的所有评论信息
Args:
content: 内容详情对象(问题|文章|视频)
crawl_interval: 爬取一次笔记的延迟单位(秒)
callback: 一次笔记爬取结束后
Returns:
"""
result: List[ZhihuComment] = []
is_end: bool = False
offset: str = ""
limit: int = 10
while not is_end:
root_comment_res = await self.get_root_comments(content.content_id, content.content_type, offset, limit)
if not root_comment_res:
break
paging_info = root_comment_res.get("paging", {})
is_end = paging_info.get("is_end")
offset = self._extractor.extract_offset(paging_info)
comments = self._extractor.extract_comments(content, root_comment_res.get("data"))
if not comments:
break
if callback:
await callback(comments)
result.extend(comments)
await self.get_comments_all_sub_comments(content, comments, crawl_interval=crawl_interval, callback=callback)
await asyncio.sleep(crawl_interval)
return result
async def get_comments_all_sub_comments(self, content: ZhihuContent, comments: List[ZhihuComment], crawl_interval: float = 1.0,
callback: Optional[Callable] = None) -> List[ZhihuComment]:
"""
获取指定评论下的所有子评论
Args:
content: 内容详情对象(问题|文章|视频)
comments: 评论列表
crawl_interval: 爬取一次笔记的延迟单位(秒)
callback: 一次笔记爬取结束后
Returns:
"""
if not config.ENABLE_GET_SUB_COMMENTS:
return []
all_sub_comments: List[ZhihuComment] = []
for parment_comment in comments:
if parment_comment.sub_comment_count == 0:
continue
is_end: bool = False
offset: str = ""
limit: int = 10
while not is_end:
child_comment_res = await self.get_child_comments(parment_comment.comment_id, offset, limit)
if not child_comment_res:
break
paging_info = child_comment_res.get("paging", {})
is_end = paging_info.get("is_end")
offset = self._extractor.extract_offset(paging_info)
sub_comments = self._extractor.extract_comments(content, child_comment_res.get("data"))
if not sub_comments:
break
if callback:
await callback(sub_comments)
all_sub_comments.extend(sub_comments)
await asyncio.sleep(crawl_interval)
return all_sub_comments
async def get_creator_info(self, url_token: str) -> Optional[ZhihuCreator]:
"""
获取创作者信息
Args:
url_token:
Returns:
"""
uri = f"/people/{url_token}"
html_content: str = await self.get(uri, return_response=True)
return self._extractor.extract_creator(url_token, html_content)
async def get_creator_answers(self, url_token: str, offset: int = 0, limit: int = 20) -> Dict:
"""
获取创作者的回答
Args:
url_token:
offset:
limit:
Returns:
"""
uri = f"/api/v4/members/{url_token}/answers"
params = {
"include":"data[*].is_normal,admin_closed_comment,reward_info,is_collapsed,annotation_action,annotation_detail,collapse_reason,collapsed_by,suggest_edit,comment_count,can_comment,content,editable_content,attachment,voteup_count,reshipment_settings,comment_permission,created_time,updated_time,review_info,excerpt,paid_info,reaction_instruction,is_labeled,label_info,relationship.is_authorized,voting,is_author,is_thanked,is_nothelp;data[*].vessay_info;data[*].author.badge[?(type=best_answerer)].topics;data[*].author.vip_info;data[*].question.has_publishing_draft,relationship",
"offset": offset,
"limit": limit,
"order_by": "created"
}
return await self.get(uri, params)
async def get_creator_articles(self, url_token: str, offset: int = 0, limit: int = 20) -> Dict:
"""
获取创作者的文章
Args:
url_token:
offset:
limit:
Returns:
"""
uri = f"/api/v4/members/{url_token}/articles"
params = {
"include":"data[*].comment_count,suggest_edit,is_normal,thumbnail_extra_info,thumbnail,can_comment,comment_permission,admin_closed_comment,content,voteup_count,created,updated,upvoted_followees,voting,review_info,reaction_instruction,is_labeled,label_info;data[*].vessay_info;data[*].author.badge[?(type=best_answerer)].topics;data[*].author.vip_info;",
"offset": offset,
"limit": limit,
"order_by": "created"
}
return await self.get(uri, params)
async def get_creator_videos(self, url_token: str, offset: int = 0, limit: int = 20) -> Dict:
"""
获取创作者的视频
Args:
url_token:
offset:
limit:
Returns:
"""
uri = f"/api/v4/members/{url_token}/zvideos"
params = {
"include":"similar_zvideo,creation_relationship,reaction_instruction",
"offset": offset,
"limit": limit,
"similar_aggregation": "true"
}
return await self.get(uri, params)
async def get_all_anwser_by_creator(self, creator: ZhihuCreator, crawl_interval: float = 1.0,
callback: Optional[Callable] = None) -> List[ZhihuContent]:
"""
获取创作者的所有回答
Args:
creator: 创作者信息
crawl_interval: 爬取一次笔记的延迟单位(秒)
callback: 一次笔记爬取结束后
Returns:
"""
all_contents: List[ZhihuContent] = []
is_end: bool = False
offset: int = 0
limit: int = 20
while not is_end:
res = await self.get_creator_answers(creator.url_token, offset, limit)
if not res:
break
utils.logger.info(f"[ZhiHuClient.get_all_anwser_by_creator] Get creator {creator.url_token} answers: {res}")
paging_info = res.get("paging", {})
is_end = paging_info.get("is_end")
contents = self._extractor.extract_content_list_from_creator(res.get("data"))
if callback:
await callback(contents)
all_contents.extend(contents)
offset += limit
await asyncio.sleep(crawl_interval)
return all_contents
async def get_all_articles_by_creator(self, creator: ZhihuCreator, crawl_interval: float = 1.0,
callback: Optional[Callable] = None) -> List[ZhihuContent]:
"""
获取创作者的所有文章
Args:
creator:
crawl_interval:
callback:
Returns:
"""
all_contents: List[ZhihuContent] = []
is_end: bool = False
offset: int = 0
limit: int = 20
while not is_end:
res = await self.get_creator_articles(creator.url_token, offset, limit)
if not res:
break
paging_info = res.get("paging", {})
is_end = paging_info.get("is_end")
contents = self._extractor.extract_content_list_from_creator(res.get("data"))
if callback:
await callback(contents)
all_contents.extend(contents)
offset += limit
await asyncio.sleep(crawl_interval)
return all_contents
async def get_all_videos_by_creator(self, creator: ZhihuCreator, crawl_interval: float = 1.0,
callback: Optional[Callable] = None) -> List[ZhihuContent]:
"""
获取创作者的所有视频
Args:
creator:
crawl_interval:
callback:
Returns:
"""
all_contents: List[ZhihuContent] = []
is_end: bool = False
offset: int = 0
limit: int = 20
while not is_end:
res = await self.get_creator_videos(creator.url_token, offset, limit)
if not res:
break
paging_info = res.get("paging", {})
is_end = paging_info.get("is_end")
contents = self._extractor.extract_content_list_from_creator(res.get("data"))
if callback:
await callback(contents)
all_contents.extend(contents)
offset += limit
await asyncio.sleep(crawl_interval)
return all_contents