diff --git a/db.py b/db.py index f83205c..2d0863d 100644 --- a/db.py +++ b/db.py @@ -8,6 +8,7 @@ async def init_db(create_db: bool = False) -> None: await Tortoise.init( db_url=RELATION_DB_URL, modules={'models': ['models']}, + # modules={'models': ['models.kuaishou']}, # generate special table _create_db=create_db ) diff --git a/media_platform/kuaishou/client.py b/media_platform/kuaishou/client.py index 9cfab0a..2c9f3c3 100644 --- a/media_platform/kuaishou/client.py +++ b/media_platform/kuaishou/client.py @@ -31,13 +31,6 @@ class KuaiShouClient: self.cookie_dict = cookie_dict self.graphql = KuaiShouGraphQL() - async def _pre_headers(self, url: str, data=None): - self.headers = { - "cookie":"kpf=PC_WEB; clientid=3; did=web_6e79b79fdeac627f5cf52f08cab4e6bd; kpn=KUAISHOU_VISION", - "content-type":"application/json" - } - return self.headers - async def request(self, method, url, **kwargs) -> Dict: async with httpx.AsyncClient(proxies=self.proxies) as client: response = await client.request( @@ -55,8 +48,7 @@ class KuaiShouClient: if isinstance(params, dict): final_uri = (f"{uri}?" f"{urlencode(params)}") - headers = await self._pre_headers(final_uri) - return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers) + return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=self.headers) async def post(self, uri: str, data: dict) -> Dict: json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False) @@ -81,12 +73,12 @@ class KuaiShouClient: async def search_info_by_keyword(self, keyword: str, pcursor: str): """ - KuaiShou Web Search API + KuaiShou web search api :param keyword: search keyword :param pcursor: limite page curson :return: """ - params = { + post_data = { "operationName": "visionSearchPhoto", "variables": { "keyword": keyword, @@ -95,8 +87,21 @@ class KuaiShouClient: }, "query": self.graphql.get("search_query") } - return await self.post("", params) + return await self.post("", post_data) - async def get_video_info(self, video_id: str) -> Dict: - pass \ No newline at end of file + async def get_video_info(self, photo_id: str) -> Dict: + """ + Kuaishou web video detail api + :param photo_id: + :return: + """ + post_data = { + "operationName": "visionVideoDetail", + "variables": { + "photoId": photo_id, + "page": "search" + }, + "query": self.graphql.get("video_detail") + } + return await self.post("", post_data) diff --git a/media_platform/kuaishou/core.py b/media_platform/kuaishou/core.py index b7afbbc..a2efd0f 100644 --- a/media_platform/kuaishou/core.py +++ b/media_platform/kuaishou/core.py @@ -10,6 +10,7 @@ from base.base_crawler import AbstractCrawler from base.proxy_account_pool import AccountPool from tools import utils from var import crawler_type_var +from models import kuaishou from .client import KuaiShouClient from .login import KuaishouLogin @@ -91,42 +92,38 @@ class KuaishouCrawler(AbstractCrawler): if not videos_res: utils.logger.error(f"search info by keyword:{keyword} not found data") continue - vision_search_photo = videos_res.get("visionSearchPhoto") - utils.logger.info(f"videos_res:{videos_res}") - semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) - task_list = [ - self.get_video_detail(feed_item.get("photo", {}).get("id"), semaphore) - for feed_item in vision_search_photo.get("feeds", {}) - ] - video_details = await asyncio.gather(*task_list) - for video_detail in video_details: - if video_detail is not None: - video_id_list.append(video_detail.get("id")) - page += 1 - utils.logger.info(f"Video details: {video_details}") - await self.batch_get_note_comments(video_id_list) - await asyncio.Event().wait() + vision_search_photo: Dict = videos_res.get("visionSearchPhoto") + if vision_search_photo.get("result") != 1: + utils.logger.error(f"search info by keyword:{keyword} not found data ") + continue + + for video_detail in vision_search_photo.get("feeds"): + video_id_list.append(video_detail.get("photo", {}).get("id")) + await kuaishou.update_kuaishou_video(video_item=video_detail) + + # batch fetch video comments + page += 1 + await self.batch_get_video_comments(video_id_list) async def get_specified_notes(self): pass - async def batch_get_note_comments(self, video_id_list: List[str]): - pass + async def batch_get_video_comments(self, video_id_list: List[str]): + utils.logger.info(f"[batch_get_video_comments] video ids:{video_id_list}") - async def get_video_detail(self, ): - pass - - async def get_video_detail(self, video_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]: - """Get note detail""" + async def get_video_info_task(self, video_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]: + """Get video detail task""" async with semaphore: try: - return await self.ks_client.get_video_info(video_id) + result = await self.ks_client.get_video_info(video_id) + utils.logger.info(f"Get video_id:{video_id} info result: {result} ...") + return result except DataFetchError as ex: - utils.logger.error(f"Get note detail error: {ex}") + utils.logger.error(f"Get video detail error: {ex}") return None except KeyError as ex: - utils.logger.error(f"have not fund note detail note_id:{note_id}, err: {ex}") + utils.logger.error(f"have not fund note detail video_id:{video_id}, err: {ex}") return None def create_proxy_info(self) -> Tuple[Optional[str], Optional[Dict], Optional[str]]: diff --git a/media_platform/kuaishou/graphql/video_detail.graphql b/media_platform/kuaishou/graphql/video_detail.graphql index e69de29..ffb5309 100644 --- a/media_platform/kuaishou/graphql/video_detail.graphql +++ b/media_platform/kuaishou/graphql/video_detail.graphql @@ -0,0 +1,80 @@ +query visionVideoDetail($photoId: String, $type: String, $page: String, $webPageArea: String) { + visionVideoDetail(photoId: $photoId, type: $type, page: $page, webPageArea: $webPageArea) { + status + type + author { + id + name + following + headerUrl + __typename + } + photo { + id + duration + caption + likeCount + realLikeCount + coverUrl + photoUrl + liked + timestamp + expTag + llsid + viewCount + videoRatio + stereoType + musicBlocked + manifest { + mediaType + businessType + version + adaptationSet { + id + duration + representation { + id + defaultSelect + backupUrl + codecs + url + height + width + avgBitrate + maxBitrate + m3u8Slice + qualityType + qualityLabel + frameRate + featureP2sp + hidden + disableAdaptive + __typename + } + __typename + } + __typename + } + manifestH265 + photoH265Url + coronaCropManifest + coronaCropManifestH265 + croppedPhotoH265Url + croppedPhotoUrl + videoResource + __typename + } + tags { + type + name + __typename + } + commentLimit { + canAddComment + __typename + } + llsid + danmakuSwitch + __typename + } +} diff --git a/media_platform/kuaishou/login.py b/media_platform/kuaishou/login.py index 3552d80..178225b 100644 --- a/media_platform/kuaishou/login.py +++ b/media_platform/kuaishou/login.py @@ -28,7 +28,16 @@ class KuaishouLogin(AbstractLogin): self.cookie_str = cookie_str async def begin(self): - pass + """Start login xiaohongshu""" + utils.logger.info("Begin login kuaishou ...") + if self.login_type == "qrcode": + await self.login_by_qrcode() + elif self.login_type == "phone": + await self.login_by_mobile() + elif self.login_type == "cookie": + await self.login_by_cookies() + else: + raise ValueError("Invalid Login Type Currently only supported qrcode or phone or cookie ...") async def login_by_qrcode(self): pass @@ -37,4 +46,11 @@ class KuaishouLogin(AbstractLogin): pass async def login_by_cookies(self): - pass + utils.logger.info("Begin login kuaishou by cookie ...") + for key, value in utils.convert_str_cookie_to_dict(self.cookie_str).items(): + await self.browser_context.add_cookies([{ + 'name': key, + 'value': value, + 'domain': ".douyin.com", + 'path': "/" + }]) diff --git a/models/kuaishou.py b/models/kuaishou.py new file mode 100644 index 0000000..00b4c97 --- /dev/null +++ b/models/kuaishou.py @@ -0,0 +1,160 @@ +import csv +import pathlib +from typing import Dict, List + +from tortoise import fields +from tortoise.contrib.pydantic import pydantic_model_creator +from tortoise.models import Model + +import config +from tools import utils +from var import crawler_type_var + + +class KuaishouBaseModel(Model): + id = fields.IntField(pk=True, autoincrement=True, description="自增ID") + user_id = fields.CharField(null=True, max_length=64, description="用户ID") + nickname = fields.CharField(null=True, max_length=64, description="用户昵称") + avatar = fields.CharField(null=True, max_length=255, description="用户头像地址") + add_ts = fields.BigIntField(description="记录添加时间戳") + last_modify_ts = fields.BigIntField(description="记录最后修改时间戳") + + class Meta: + abstract = True + + +class KuaishouVideo(KuaishouBaseModel): + video_id = fields.CharField(max_length=64, index=True, description="视频ID") + video_type = fields.CharField(max_length=16, description="视频类型") + title = fields.CharField(null=True, max_length=500, description="视频标题") + desc = fields.TextField(null=True, description="视频描述") + create_time = fields.BigIntField(description="视频发布时间戳", index=True) + liked_count = fields.CharField(null=True, max_length=16, description="视频点赞数") + video_count = fields.CharField(null=True, max_length=16, description="视频浏览数量") + video_url = fields.CharField(null=True, max_length=512, description="视频详情URL") + video_cover_url = fields.CharField(null=True, max_length=512, description="视频封面图 URL") + video_play_url = fields.CharField(null=True, max_length=512, description="视频播放 URL") + + class Meta: + table = "kuaishou_video" + table_description = "快手视频" + + def __str__(self): + return f"{self.video_id} - {self.title}" + + +class KuaishouVideoComment(KuaishouBaseModel): + comment_id = fields.CharField(max_length=64, index=True, description="评论ID") + video_id = fields.CharField(max_length=64, index=True, description="视频ID") + content = fields.TextField(null=True, description="评论内容") + create_time = fields.BigIntField(description="评论时间戳") + sub_comment_count = fields.CharField(max_length=16, description="评论回复数") + + class Meta: + table = "kuaishou_video_comment" + table_description = "快手视频评论" + + def __str__(self): + return f"{self.comment_id} - {self.content}" + + +async def update_kuaishou_video(video_item: Dict): + photo_info: Dict = video_item.get("photo", {}) + video_id = photo_info.get("id") + user_info = video_item.get("author", {}) + local_db_item = { + "video_id": video_id, + "video_type": video_item.get("type"), + "title": photo_info.get("caption", ""), + "desc": photo_info.get("caption", ""), + "create_time": photo_info.get("timestamp"), + "user_id": user_info.get("id"), + "nickname": user_info.get("name"), + "avatar": user_info.get("headerUrl", ""), + "liked_count": photo_info.get("realLikeCount"), + "viewd_count": photo_info.get("viewCount"), + "last_modify_ts": utils.get_current_timestamp(), + "video_url": f"https://www.kuaishou.com/short-video/{video_id}", + "video_cover_url": photo_info.get("coverUrl", ""), + "video_play_url": photo_info.get("photoUrl", ""), + } + print(f"Kuaishou video id:{video_id}, title:{local_db_item.get('title')}") + if config.IS_SAVED_DATABASED: + if not await KuaishouVideo.filter(video_id=video_id).exists(): + local_db_item["add_ts"] = utils.get_current_timestamp() + kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='KuaishouVideoCreate', exclude=('id',)) + kuaishou_data = kuaishou_video_pydantic(**local_db_item) + kuaishou_video_pydantic.model_validate(kuaishou_data) + await KuaishouVideo.create(**kuaishou_data.model_dump()) + else: + kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='KuaishouVideoUpdate', + exclude=('id', 'add_ts')) + kuaishou_data = kuaishou_video_pydantic(**local_db_item) + kuaishou_video_pydantic.model_validate(kuaishou_data) + await KuaishouVideo.filter(video_id=video_id).update(**kuaishou_data.model_dump()) + else: + # Below is a simple way to save it in CSV format. + pathlib.Path(f"data/kuaishou").mkdir(parents=True, exist_ok=True) + save_file_name = f"data/kuaishou/{crawler_type_var.get()}_videos_{utils.get_current_date()}.csv" + with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f: + writer = csv.writer(f) + if f.tell() == 0: + writer.writerow(local_db_item.keys()) + writer.writerow(local_db_item.values()) + + +async def batch_update_ks_video_comments(video_id: str, comments: List[Dict]): + if not comments: + return + for comment_item in comments: + await update_ks_video_comment(video_id, comment_item) + + +async def update_ks_video_comment(video_id: str, comment_item: Dict): + comment_video_id = comment_item.get("video_id") + if video_id != comment_video_id: + print(f"comment_video_id: {comment_video_id} != video_id: {video_id}") + return + user_info = comment_item.get("user", {}) + comment_id = comment_item.get("cid") + avatar_info = user_info.get("avatar_medium", {}) or user_info.get("avatar_300x300", {}) or user_info.get( + "avatar_168x168", {}) or user_info.get("avatar_thumb", {}) or {} + local_db_item = { + "comment_id": comment_id, + "create_time": comment_item.get("create_time"), + "ip_location": comment_item.get("ip_label", ""), + "video_id": video_id, + "content": comment_item.get("text"), + "user_id": user_info.get("uid"), + "sec_uid": user_info.get("sec_uid"), + "short_user_id": user_info.get("short_id"), + "user_unique_id": user_info.get("unique_id"), + "user_signature": user_info.get("signature"), + "nickname": user_info.get("nickname"), + "avatar": avatar_info.get("url_list", [""])[0], + "sub_comment_count": comment_item.get("reply_comment_total", 0), + "last_modify_ts": utils.get_current_timestamp(), + } + print(f"Kuaishou video comment: {comment_id}, content: {local_db_item.get('content')}") + if config.IS_SAVED_DATABASED: + if not await KuaishouVideoComment.filter(comment_id=comment_id).exists(): + local_db_item["add_ts"] = utils.get_current_timestamp() + comment_pydantic = pydantic_model_creator(KuaishouVideoComment, name='KuaishouVideoCommentCreate', + exclude=('id',)) + comment_data = comment_pydantic(**local_db_item) + comment_pydantic.validate(comment_data) + await KuaishouVideoComment.create(**comment_data.dict()) + else: + comment_pydantic = pydantic_model_creator(KuaishouVideoComment, name='KuaishouVideoCommentUpdate', + exclude=('id', 'add_ts')) + comment_data = comment_pydantic(**local_db_item) + comment_pydantic.validate(comment_data) + await KuaishouVideoComment.filter(comment_id=comment_id).update(**comment_data.dict()) + else: + pathlib.Path(f"data/kuaishou").mkdir(parents=True, exist_ok=True) + save_file_name = f"data/kuaishou/{crawler_type_var.get()}_comments_{utils.get_current_date()}.csv" + with open(save_file_name, mode='a+', encoding="utf-8-sig", newline="") as f: + writer = csv.writer(f) + if f.tell() == 0: + writer.writerow(local_db_item.keys()) + writer.writerow(local_db_item.values())