317 lines
11 KiB
Python
317 lines
11 KiB
Python
"""
|
||
抓取PT数据
|
||
1. 创建数据库,sqlite
|
||
2. 创建表
|
||
3. 解析网站表格
|
||
4. 按照规则进行去重
|
||
5. 数据入库
|
||
|
||
数据如何展示呢??
|
||
"""
|
||
import logging
|
||
import random
|
||
import sys
|
||
import time
|
||
|
||
from sqlalchemy import func
|
||
|
||
from qnloft_db import db_config as config
|
||
import requests
|
||
import toml
|
||
from bs4 import BeautifulSoup
|
||
from loguru import logger
|
||
from lxml import html as lhtml
|
||
from urllib.parse import urlparse, parse_qs
|
||
|
||
from qnloft_db.sqlite_db_main import SqliteDbMain
|
||
from qnloft_db_model.PtWebsiteData import PtWebsiteData
|
||
from dateutil import parser
|
||
|
||
|
||
def extract_id(url, field):
|
||
parsed_url = urlparse(url)
|
||
query_params = parse_qs(parsed_url.query)
|
||
return query_params.get(field, [None])[0]
|
||
|
||
|
||
def contains_alpha_or_chinese(input_str):
|
||
s = input_str.strip()
|
||
# 判断是否包含字母
|
||
has_alpha = any(char.isalpha() for char in s)
|
||
# 判断是否包含汉字
|
||
has_chinese = any('\u4e00' <= char <= '\u9fff' for char in s)
|
||
# 返回结果
|
||
return s if has_alpha or has_chinese else None
|
||
|
||
|
||
def check_seed_status(status):
|
||
s = ["%", "Free", "free"]
|
||
return status if any(keyword in status for keyword in s) else None
|
||
|
||
|
||
class PtGetData:
|
||
|
||
def __init__(self):
|
||
logger.add("../log/PtGetData_{time:YYYY-MM-DD}.log", rotation="1 day", level="INFO")
|
||
logger.add(sys.stderr, level="INFO")
|
||
self.torrents_uri = "/torrents.php"
|
||
self.headers = {
|
||
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
|
||
'accept-language': 'zh,zh-CN;q=0.9',
|
||
'cache-control': 'max-age=0',
|
||
'sec-ch-ua': '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
|
||
'sec-ch-ua-mobile': '?0',
|
||
'sec-ch-ua-platform': '"macOS"',
|
||
'sec-fetch-dest': 'document',
|
||
'sec-fetch-mode': 'navigate',
|
||
'sec-fetch-site': 'same-origin',
|
||
'sec-fetch-user': '?1',
|
||
'upgrade-insecure-requests': '1',
|
||
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
|
||
}
|
||
self.db_main = SqliteDbMain(config.pt_website_db)
|
||
self.if_pass = False
|
||
|
||
def get_data(self, section_name, section_data):
|
||
res_txt = f"开始对 [{section_name}] 进行操作...,抓取数据:"
|
||
logger.info(res_txt)
|
||
url, cookie = section_data.get('url'), section_data.get('cookie')
|
||
if cookie is not None and len(cookie.strip()) > 0:
|
||
self.headers["cookie"] = cookie
|
||
if len(section_data.get("torrents")) > 1:
|
||
self.torrents_uri = section_data.get("torrents")
|
||
html = self.get_website_html(uri=self.torrents_uri + "?sort=0&type=desc", section_name=section_name,
|
||
section_data=section_data)
|
||
if len(html) == 0:
|
||
return
|
||
try:
|
||
# 取数据库中查询一下,是否存在source_name=section_name的数据,如果存在,则不是初始化
|
||
count = self.db_main.pandas_query_by_condition(
|
||
model=func.count(PtWebsiteData.id),
|
||
query_condition=PtWebsiteData.source_name == section_name,
|
||
)
|
||
# 如果不存在,则是初始化数据
|
||
res = int(count['count_1'].iloc[0])
|
||
if res == 0:
|
||
self.if_pass = True
|
||
doc_html = lhtml.fromstring(html)
|
||
# 解析网页内容
|
||
self.get_common_analysis(section_name, doc_html)
|
||
# 获取分页
|
||
pages = self.get_common_total_page(doc_html)
|
||
for i in range(0, pages):
|
||
sleep_time = random.uniform(1, 3)
|
||
logger.info(
|
||
f"总共 【{pages}】 页,开始抓取第 【{i}】 页数据,还剩 【{pages - i}】 页,不过要休眠 {sleep_time} 秒")
|
||
time.sleep(sleep_time)
|
||
# 对页面数据进行解析和存储
|
||
self.get_data_by_page(section_name, section_data, i)
|
||
break
|
||
except Exception as e:
|
||
logger.error(f"页面无法解析,请知晓!!!{e}")
|
||
return
|
||
|
||
def get_data_by_page(self, section_name, section_data, page_num=0):
|
||
if page_num >= 1:
|
||
html = self.get_website_html(uri=f"{self.torrents_uri}&page={page_num}",
|
||
section_name=section_name, section_data=section_data)
|
||
if len(html) == 0:
|
||
return
|
||
doc_html = lhtml.fromstring(html)
|
||
self.get_common_analysis(section_name, doc_html)
|
||
|
||
def get_common_total_page(self, doc_html):
|
||
page_href = doc_html.xpath('//td[@class="embedded"]//p[@align="center"][1]//a[last()]/@href')[0]
|
||
pages_str = extract_id(page_href, "page")
|
||
return int(pages_str) if pages_str.isdigit() else 0
|
||
|
||
def get_common_analysis(self, section_name, doc_html):
|
||
# 使用lxml解析HTML
|
||
row_follow_tables = doc_html.xpath('//table[@class="torrents"]//tr[position() > 1]')
|
||
for row_follow in row_follow_tables:
|
||
html_content = lhtml.tostring(row_follow, encoding='unicode')
|
||
# print(f"html内容:{html_content}")
|
||
# 一级标题
|
||
first_title = row_follow.xpath('.//table[@class="torrentname"]//a[@title]/@title')[0]
|
||
second_title_s = row_follow.xpath(
|
||
'.//table[@class="torrentname"]//td[@class="embedded"]/text()[normalize-space()]'
|
||
'| .//table[@class="torrentname"]//td[@class="embedded"]//font[@title]/text()')
|
||
# 二级标题
|
||
second_title = ""
|
||
for text in second_title_s:
|
||
second_title = contains_alpha_or_chinese(text) if contains_alpha_or_chinese(
|
||
text) is not None else None
|
||
print(f"标题:{first_title} 二级标题:{second_title}")
|
||
type_id, type_name = "", ""
|
||
type_html = row_follow.xpath('.//td[contains(@class, "rowfollow")][1]//a[@href]')
|
||
for td_element in type_html:
|
||
type_id = extract_id(td_element.xpath('./@href')[0], "cat")
|
||
type_name = td_element.xpath('.//img[@title]/@title')[0]
|
||
# html_content = lhtml.tostring(td_element, encoding='unicode')
|
||
print(f"类型是:{type_id} + ' ' + {type_name}")
|
||
# 种子状态
|
||
seed_status = 1
|
||
seed_status_html = row_follow.xpath(
|
||
'.//table[@class="torrentname"]//td[@class="embedded"]//img[@alt]/@alt')
|
||
if len(seed_status_html) > 0:
|
||
for seed in seed_status_html:
|
||
s = check_seed_status(seed)
|
||
if s is not None:
|
||
seed_status = s
|
||
print(f"种子状态:{seed_status}")
|
||
|
||
seeding_status = 0
|
||
seeding_status_html = row_follow.xpath(
|
||
'.//table[@class="torrentname"]//div[@title]/@title')
|
||
if len(seeding_status_html) > 0:
|
||
seeding_status = 1
|
||
print(f"做种状态:{seeding_status}")
|
||
|
||
comment_count = row_follow.xpath('.//td[@class="rowfollow"][2]//a/text()[normalize-space()]')[0]
|
||
print(f"评论数:{comment_count}")
|
||
|
||
upload_time = ""
|
||
upload_time_html = row_follow.xpath('.//span[@title][parent::td]/@title')
|
||
for td_element in upload_time_html:
|
||
try:
|
||
upload_time = parser.parse(td_element)
|
||
except ValueError:
|
||
pass
|
||
print(f"资源上传时间:{upload_time}")
|
||
|
||
# 资源大小
|
||
size_html = row_follow.xpath('.//td[@class="rowfollow"][3]/text()[normalize-space()]')
|
||
size = size_html[0].strip() + '' + size_html[1].strip()
|
||
print(f"资源大小:{size}")
|
||
|
||
seed_count = row_follow.xpath('.//td[@class="rowfollow"][4]')[0].text_content().strip()
|
||
print(f"做种数:{seed_count}")
|
||
|
||
download_count = row_follow.xpath('.//td[@class="rowfollow"][5]')[0].text_content().strip()
|
||
print(f"下载数:{download_count}")
|
||
|
||
completion_count = row_follow.xpath('.//td[@class="rowfollow"][6]')[0].text_content().strip()
|
||
print(f"完成数:{completion_count}")
|
||
|
||
publisher = row_follow.xpath('.//td[@class="rowfollow"][7]')[0].text_content().strip()
|
||
print(f"发布者:{publisher}")
|
||
download_link = row_follow.xpath(
|
||
'.//table[@class="torrentname"]//*[contains(@class, "download")]/parent::a/@href')[0]
|
||
pt_id = extract_id(download_link, "id")
|
||
# 详情链接地址
|
||
details_link = row_follow.xpath('.//table[@class="torrentname"]//a[@href]/@href')[0]
|
||
print(
|
||
f"PT_ID == {pt_id} 下载链接:/{download_link} 详情链接:/{details_link}")
|
||
# douban_rating = doc.xpath('')
|
||
# print(f"豆瓣评分:/{douban_rating[0]}")
|
||
|
||
# imdb_rating = doc.xpath('')
|
||
# print(f"imdb_rating:/{imdb_rating[0]}")
|
||
entry = PtWebsiteData(
|
||
pt_id=pt_id,
|
||
source_name=section_name,
|
||
first_title=first_title,
|
||
second_title=second_title,
|
||
type_id=type_id,
|
||
type_name=type_name,
|
||
seed_status=seed_status,
|
||
status_remaining_time="",
|
||
seeding_status=seeding_status,
|
||
comment_count=comment_count,
|
||
upload_time=upload_time,
|
||
size=size,
|
||
seed_count=seed_count,
|
||
download_count=download_count,
|
||
completion_count=completion_count,
|
||
publisher=publisher,
|
||
douban_rating=0.0,
|
||
imdb_rating=0.0,
|
||
download_link=f'/{download_link}',
|
||
details_link=f'/{details_link}'
|
||
)
|
||
if self.if_pass is False:
|
||
# 如果包含置顶,出现错误不管
|
||
if "置顶" in html_content:
|
||
self.if_pass = True
|
||
self.insert_entry(self.if_pass, entry)
|
||
|
||
def insert_entry(self, if_pass, entry):
|
||
# if_pass == true 则吃掉异常,代码继续存储
|
||
if if_pass:
|
||
try:
|
||
self.db_main.insert_entry(entry)
|
||
except Exception as e:
|
||
# 第一次初始化数据的时候,为了防止数据没入库完成,出现新增数据,这里先设置成pass
|
||
logger.error(f"if_pass == {if_pass} 时出现错误:{e}")
|
||
pass
|
||
else:
|
||
try:
|
||
self.db_main.insert_entry(entry)
|
||
except Exception as e:
|
||
logger.error(f"数据存储失败,原因:{e}")
|
||
raise
|
||
|
||
def get_type(self, section_name, section_data):
|
||
res_txt = f"开始对 [{section_name}] 进行操作...,抓取网站分类:"
|
||
url, cookie = section_data.get('url'), section_data.get('cookie')
|
||
if cookie is not None and len(cookie.strip()) > 0:
|
||
self.headers["cookie"] = cookie
|
||
html = self.get_website_html(uri="/getrss.php", section_name=section_name, section_data=section_data)
|
||
if len(html) == 0:
|
||
return
|
||
try:
|
||
soup = BeautifulSoup(html, 'html.parser')
|
||
except Exception as e:
|
||
logger.error(f"{section_name} , 页面无法解析,请知晓!!!")
|
||
|
||
def get_website_html(self, uri, section_name, section_data):
|
||
# cookie不为空时候,可以签到
|
||
url = section_data.get('url') + uri
|
||
i = 0
|
||
for _ in range(5):
|
||
logger.info(f"开始对:{url} 进行 第 {i} 抓取!")
|
||
i = i + 1
|
||
try:
|
||
response = requests.get(url, headers=self.headers, timeout=5 * 60)
|
||
if response.status_code == 200:
|
||
return response.text
|
||
else:
|
||
logger.error(f"{section_name} , 出现错误,code码是:{response.status_code}, {response.text}!!!")
|
||
return ""
|
||
except Exception as e:
|
||
time.sleep(2)
|
||
else:
|
||
logger.error(f"{section_name} , 5次出现错误,无法访问!!!")
|
||
return ""
|
||
|
||
def opt(self):
|
||
toml_file = 'PT/pt_config.toml'
|
||
try:
|
||
with open(toml_file, 'r', encoding='utf-8') as file:
|
||
config_data = toml.load(file)
|
||
# 迭代每个 section
|
||
for section_name, section_data in config_data.items():
|
||
print(f"Processing section: {section_name} --- {section_data.get('url')}")
|
||
url, cookie, flag = section_data.get('url'), section_data.get('cookie'), section_data.get('flag')
|
||
if flag != 1:
|
||
# 拉取数据
|
||
self.get_data(section_name, section_data)
|
||
except FileNotFoundError:
|
||
logger.error(f"Error: The file '{toml_file}' was not found.")
|
||
except toml.TomlDecodeError as e:
|
||
logger.error(f"Error decoding TOML: {e}")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
toml_file = 'pt_config.toml'
|
||
with open(toml_file, 'r', encoding='utf-8') as file:
|
||
config_data = toml.load(file)
|
||
# 迭代每个 section
|
||
for section_name, section_data in config_data.items():
|
||
print(f"Processing section: {section_name} --- {section_data.get('url')}")
|
||
url, cookie, flag = section_data.get('url'), section_data.get('cookie'), section_data.get('flag')
|
||
if flag != 1 and cookie is not None and len(cookie.strip()) > 0:
|
||
# 拉取数据
|
||
PtGetData().get_data(section_name, section_data)
|
||
break
|