feat: support saving crawled data to CSV

Relakkes 2023-08-16 19:49:41 +08:00
parent c1a3f06c7a
commit 9177c38521
10 changed files with 70 additions and 16 deletions

.gitignore

@@ -164,3 +164,4 @@ cython_debug/
 .idea
 /temp_image/
 /browser_data/
+/data/

README.md

@@ -21,7 +21,8 @@
 - [x] Douyin login (QR code, phone number, cookies)
 - [x] Douyin slider captcha: simulated sliding implemented, accuracy is not great yet
 - [x] Keep the logged-in browser context after a successful login
-- [x] Persist data to disk (relational database)
+- [x] Save data to CSV (default)
+- [x] Save data to a database (optional)
 ## Usage
@@ -54,6 +55,9 @@
 5. Open the corresponding app and scan the QR code to log in
+6. Wait for the crawler to finish; the data is saved under the `data/xhs` directory
 ## Project structure
 ```
@@ -61,10 +65,12 @@ MediaCrawler
 ├── base
 │   ├── base_crawler.py        # abstract classes of the project
 │   └── proxy_account_pool.py  # account and IP proxy pool
+├── browser_data               # browser data directory
 ├── config
 │   ├── account_config.py      # account and proxy pool config
 │   ├── base_config.py         # base config
 │   └── db_config.py           # database config
+├── data                       # saved data directory
 ├── libs
 │   ├── douyin.js              # Douyin Sign function
 │   └── stealth.min.js         # JS that hides browser automation fingerprints

config/base_config.py

@@ -1,6 +1,6 @@
 # Desc: base config
 PLATFORM = "xhs"
-KEYWORDS = "健身,旅游"
+KEYWORDS = "python,golang"
 LOGIN_TYPE = "qrcode"  # qrcode or phone or cookie
 COOKIES = ""  # login by cookie, if login_type is cookie, you must set this value

config/db_config.py

@@ -9,4 +9,4 @@ RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "123456")  # your relation db pas
 RELATION_DB_URL = f"mysql://root:{RELATION_DB_PWD}@localhost:3306/media_crawler"
 # save data to database option
-IS_SAVED_DATABASED = True  # if you want to save data to database, set True
+IS_SAVED_DATABASED = False  # if you want to save data to database, set True
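Note: with CSV export now the default, database persistence is opt-in via this flag. A minimal sketch of how save logic can branch on it; the function name `save_aweme` is hypothetical, the project's real branching lives in the `models` package:

```python
# Hypothetical sketch of branching on config.IS_SAVED_DATABASED.
# `save_aweme` is illustrative only; the real logic is in models/douyin.py and models/xiaohongshu.py.
import config  # assumes the repo's config package is importable


async def save_aweme(aweme_item: dict) -> None:
    if config.IS_SAVED_DATABASED:
        ...  # upsert the record through the tortoise-orm models
    else:
        ...  # append a row to a CSV file under data/
```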

media_platform/douyin/client.py

@@ -8,6 +8,7 @@ import httpx
 from playwright.async_api import BrowserContext, Page
 from tools import utils
+from var import request_keyword_var
 from .exception import *
 from .field import *
@@ -142,7 +143,7 @@ class DOUYINClient:
         del headers["Origin"]
         return await self.get("/aweme/v1/web/aweme/detail/", params, headers)
-    async def get_aweme_comments(self, aweme_id: str, cursor: int = 0, keywords: str = ""):
+    async def get_aweme_comments(self, aweme_id: str, cursor: int = 0):
         """get note comments
         """
@@ -153,6 +154,7 @@ class DOUYINClient:
             "count": 20,
             "item_type": 0
         }
+        keywords = request_keyword_var.get()
         referer_url = "https://www.douyin.com/search/" + keywords + '?aid=3a3cec5a-9e27-4040-b6aa-ef548c2c1138&publish_time=0&sort_type=0&source=search_history&type=general'
         headers = copy.copy(self.headers)
         headers["Referer"] = urllib.parse.quote(referer_url, safe=':/')
@@ -164,7 +166,6 @@
             crawl_interval: float = 1.0,
             is_fetch_sub_comments=False,
             callback: Optional[Callable] = None,
-            keywords: str = ""
     ):
         """
         get note all comments include sub comments
@@ -172,14 +173,13 @@
         :param crawl_interval:
         :param is_fetch_sub_comments:
         :param callback:
-        :param keywords:
         :return:
         """
         result = []
         comments_has_more = 1
         comments_cursor = 0
         while comments_has_more:
-            comments_res = await self.get_aweme_comments(aweme_id, comments_cursor, keywords)
+            comments_res = await self.get_aweme_comments(aweme_id, comments_cursor)
             comments_has_more = comments_res.get("has_more", 0)
             comments_cursor = comments_res.get("cursor", comments_cursor + 20)
             comments = comments_res.get("comments")
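The search keyword used to build the comments Referer is now read from the `request_keyword_var` context variable instead of being threaded through every call. A small standalone sketch of that lookup and of what `urllib.parse.quote(..., safe=':/')` does (the URL is shortened for illustration):

```python
import urllib.parse
from contextvars import ContextVar

# stand-in for the project's var.request_keyword_var
request_keyword_var: ContextVar[str] = ContextVar("request_keyword", default="")

request_keyword_var.set("健身")
keywords = request_keyword_var.get()  # returns "" if the crawler never set a keyword
referer_url = "https://www.douyin.com/search/" + keywords + "?publish_time=0&sort_type=0"
# quote() percent-encodes the non-ASCII keyword (and '?', '=', '&') while leaving ':' and '/' intact
print(urllib.parse.quote(referer_url, safe=':/'))
```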

media_platform/douyin/core.py

@@ -11,6 +11,7 @@ from base.base_crawler import AbstractCrawler
 from base.proxy_account_pool import AccountPool
 from models import douyin
 from tools import utils
+from var import request_keyword_var
 from .client import DOUYINClient
 from .exception import DataFetchError
@@ -70,14 +71,15 @@ class DouYinCrawler(AbstractCrawler):
     async def search(self) -> None:
         utils.logger.info("Begin search douyin keywords")
         for keyword in config.KEYWORDS.split(","):
+            request_keyword_var.set(keyword)
             utils.logger.info(f"Current keyword: {keyword}")
             aweme_list: List[str] = []
-            dy_limite_count = 10  # douyin fixed limit page 10
+            dy_limit_count = 10  # douyin fixed limit page 10
             page = 0
-            while (page + 1) * dy_limite_count <= config.CRAWLER_MAX_NOTES_COUNT:
+            while (page + 1) * dy_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
                 try:
                     posts_res = await self.dy_client.search_info_by_keyword(keyword=keyword,
-                                                                            offset=page * dy_limite_count)
+                                                                            offset=page * dy_limit_count)
                 except DataFetchError:
                     utils.logger.error(f"search douyin keyword: {keyword} failed")
                     break
@@ -91,23 +93,22 @@
                 aweme_list.append(aweme_info.get("aweme_id", ""))
                 await douyin.update_douyin_aweme(aweme_item=aweme_info)
             utils.logger.info(f"keyword:{keyword}, aweme_list:{aweme_list}")
-            await self.batch_get_note_comments(aweme_list, keyword)
+            await self.batch_get_note_comments(aweme_list)
-    async def batch_get_note_comments(self, aweme_list: List[str], keywords: str) -> None:
+    async def batch_get_note_comments(self, aweme_list: List[str]) -> None:
         task_list: List[Task] = []
         semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
         for aweme_id in aweme_list:
-            task = asyncio.create_task(self.get_comments(aweme_id, semaphore, keywords), name=aweme_id)
+            task = asyncio.create_task(self.get_comments(aweme_id, semaphore), name=aweme_id)
             task_list.append(task)
         await asyncio.wait(task_list)
-    async def get_comments(self, aweme_id: str, semaphore: asyncio.Semaphore, keywords: str) -> None:
+    async def get_comments(self, aweme_id: str, semaphore: asyncio.Semaphore) -> None:
         async with semaphore:
             try:
                 await self.dy_client.get_aweme_all_comments(
                     aweme_id=aweme_id,
                     callback=douyin.batch_update_dy_aweme_comments,
-                    keywords=keywords
                 )
                 utils.logger.info(f"aweme_id: {aweme_id} comments have all been obtained completed ...")
             except DataFetchError as e:
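For reference, a self-contained sketch of the bounded-concurrency pattern used by `batch_get_note_comments` above: an `asyncio.Semaphore` shared by tasks created with `create_task` and awaited with `asyncio.wait`. The sleep stands in for the real comment request:

```python
import asyncio


async def get_comments(aweme_id: str, semaphore: asyncio.Semaphore) -> None:
    async with semaphore:          # at most N coroutines enter this block at the same time
        await asyncio.sleep(0.1)   # placeholder for the real HTTP request
        print(f"fetched comments for aweme {aweme_id}")


async def main() -> None:
    semaphore = asyncio.Semaphore(2)  # plays the role of config.MAX_CONCURRENCY_NUM
    tasks = [asyncio.create_task(get_comments(str(i), semaphore), name=str(i)) for i in range(5)]
    await asyncio.wait(tasks)


asyncio.run(main())
```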

media_platform/xhs/core.py

@@ -12,6 +12,7 @@ from base.base_crawler import AbstractCrawler
 from base.proxy_account_pool import AccountPool
 from models import xiaohongshu as xhs_model
 from tools import utils
+from var import request_keyword_var
 from .client import XHSClient
 from .exception import DataFetchError
@@ -81,6 +82,8 @@ class XiaoHongShuCrawler(AbstractCrawler):
         utils.logger.info("Begin search xiaohongshu keywords")
         xhs_limit_count = 20  # xhs limit page fixed value
         for keyword in config.KEYWORDS.split(","):
+            # set keyword to context var
+            request_keyword_var.set(keyword)
             utils.logger.info(f"Current search keyword: {keyword}")
             page = 1
             while page * xhs_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:

models/douyin.py

@@ -1,4 +1,5 @@
-import json
+import csv
+import pathlib
 from typing import Dict, List
 from tortoise import fields
@@ -7,6 +8,7 @@ from tortoise.contrib.pydantic import pydantic_model_creator
 import config
 from tools import utils
+from var import request_keyword_var
 class DouyinBaseModel(Model):
@@ -98,6 +100,15 @@ async def update_douyin_aweme(aweme_item: Dict):
             douyin_data = douyin_aweme_pydantic(**local_db_item)
             douyin_aweme_pydantic.validate(douyin_data)
             await DouyinAweme.filter(aweme_id=aweme_id).update(**douyin_data.dict())
+    else:
+        # Below is a simple way to save it in CSV format.
+        source_keywords = request_keyword_var.get()
+        pathlib.Path(f"data/dy").mkdir(parents=True, exist_ok=True)
+        with open(f"data/dy/aweme_{source_keywords}.csv", mode='a+', encoding="utf-8-sig", newline="") as f:
+            writer = csv.writer(f)
+            if f.tell() == 0:
+                writer.writerow(local_db_item.keys())
+            writer.writerow(local_db_item.values())
@@ -147,3 +158,11 @@ async def update_dy_aweme_comment(aweme_id: str, comment_item: Dict):
             comment_data = comment_pydantic(**local_db_item)
             comment_pydantic.validate(comment_data)
             await DouyinAwemeComment.filter(comment_id=comment_id).update(**comment_data.dict())
+    else:
+        source_keywords = request_keyword_var.get()
+        pathlib.Path(f"data/dy").mkdir(parents=True, exist_ok=True)
+        with open(f"data/dy/comment_{source_keywords}.csv", mode='a+', encoding="utf-8-sig", newline="") as f:
+            writer = csv.writer(f)
+            if f.tell() == 0:
+                writer.writerow(local_db_item.keys())
+            writer.writerow(local_db_item.values())
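The header-only-once trick works because opening a file with mode `'a+'` positions the stream at the end, so `f.tell()` is 0 exactly when the file is empty; `utf-8-sig` writes a BOM so spreadsheet tools detect the encoding of Chinese text correctly. A standalone sketch of the same pattern (path and fields are illustrative):

```python
import csv
import pathlib


def append_csv_row(path: str, row: dict) -> None:
    pathlib.Path(path).parent.mkdir(parents=True, exist_ok=True)
    # 'a+' appends and leaves the position at the end of the file,
    # so tell() == 0 means the file is new and still needs a header row
    with open(path, mode="a+", encoding="utf-8-sig", newline="") as f:
        writer = csv.writer(f)
        if f.tell() == 0:
            writer.writerow(row.keys())
        writer.writerow(row.values())


# illustrative call; the crawler itself writes files like data/dy/aweme_<keyword>.csv
append_csv_row("data/dy/aweme_demo.csv", {"aweme_id": "123", "title": "demo"})
```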

models/xiaohongshu.py

@@ -1,3 +1,5 @@
+import csv
+import pathlib
 from typing import Dict, List
 from tortoise import fields
@@ -6,6 +8,7 @@ from tortoise.models import Model
 import config
 from tools import utils
+from var import request_keyword_var
 class XhsBaseModel(Model):
@@ -94,6 +97,15 @@ async def update_xhs_note(note_item: Dict):
             note_data = note_pydantic(**local_db_item)
             note_pydantic.validate(note_data)
             await XHSNote.filter(note_id=note_id).update(**note_data.dict())
+    else:
+        # Below is a simple way to save it in CSV format.
+        source_keywords = request_keyword_var.get()
+        pathlib.Path(f"data/xhs").mkdir(parents=True, exist_ok=True)
+        with open(f"data/xhs/notes_{source_keywords}.csv", mode='a+', encoding="utf-8-sig", newline="") as f:
+            writer = csv.writer(f)
+            if f.tell() == 0:
+                writer.writerow(local_db_item.keys())
+            writer.writerow(local_db_item.values())
@@ -125,3 +137,12 @@ async def update_xhs_note_comment(note_id: str, comment_item: Dict):
             comment_data = comment_pydantic(**local_db_item)
             comment_pydantic.validate(comment_data)
             await XHSNoteComment.filter(comment_id=comment_id).update(**comment_data.dict())
+    else:
+        # Below is a simple way to save it in CSV format.
+        source_keywords = request_keyword_var.get()
+        pathlib.Path(f"data/xhs").mkdir(parents=True, exist_ok=True)
+        with open(f"data/xhs/comment_{source_keywords}.csv", mode='a+', encoding="utf-8-sig", newline="") as f:
+            writer = csv.writer(f)
+            if f.tell() == 0:
+                writer.writerow(local_db_item.keys())
+            writer.writerow(local_db_item.values())
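Reading the exported files back only needs the standard csv module; a small illustrative sketch (the file name and the `note_id`/`title` columns are assumptions based on the fields the crawler stores):

```python
import csv

# hypothetical read-back of one exported file; adjust the path to a file that actually exists
with open("data/xhs/notes_python.csv", encoding="utf-8-sig", newline="") as f:
    for row in csv.DictReader(f):
        # column names come from local_db_item; note_id and title are assumed here
        print(row.get("note_id"), row.get("title"))
```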

var.py (new file)

@@ -0,0 +1,3 @@
+from contextvars import ContextVar
+request_keyword_var: ContextVar[str] = ContextVar("request_keyword", default="")
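var.py holds a single ContextVar that carries the current search keyword. A self-contained sketch of why this is safe under asyncio: every task runs in a copy of the current context, so a keyword set in one task is never seen by another:

```python
import asyncio
from contextvars import ContextVar

request_keyword_var: ContextVar[str] = ContextVar("request_keyword", default="")


async def crawl(keyword: str) -> None:
    # each asyncio task runs in its own copy of the context,
    # so this set() is visible only inside this task
    request_keyword_var.set(keyword)
    await asyncio.sleep(0.1)
    print(keyword, "->", request_keyword_var.get())


async def main() -> None:
    await asyncio.gather(crawl("python"), crawl("golang"))


asyncio.run(main())
```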