# qnloft-spider/PT/pt_get_data.py
"""
抓取PT数据
1. 创建数据库sqlite
2. 创建表
3. 解析网站表格
4. 按照规则进行去重
5. 数据入库
数据如何展示呢??
"""
import random
import sys
import time
from urllib.parse import urlparse, parse_qs

import requests
import toml
from bs4 import BeautifulSoup
from dateutil import parser
from loguru import logger
from lxml import html as lhtml
from sqlalchemy import func

from qnloft_db import db_config as config
from qnloft_db.sqlite_db_main import SqliteDbMain
from qnloft_db_model.PtWebsiteData import PtWebsiteData


def extract_id(url, field):
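    """Return the value of the given query-string field from the URL, or None if it is absent."""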
    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)
    return query_params.get(field, [None])[0]


def contains_alpha_or_chinese(input_str):
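    """Return the stripped string if it contains at least one letter or CJK character, otherwise None."""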
    s = input_str.strip()
    # Check whether the string contains letters
    has_alpha = any(char.isalpha() for char in s)
    # Check whether the string contains Chinese characters
    has_chinese = any('\u4e00' <= char <= '\u9fff' for char in s)
    # Return the result
    return s if has_alpha or has_chinese else None


def check_seed_status(status):
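    """Return the status text if it looks like a promotion marker ("%", "Free" or "free"), otherwise None."""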
s = ["%", "Free", "free"]
return status if any(keyword in status for keyword in s) else None
class PtGetData:
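    """Scrape torrent listings from the PT sites configured in pt_config.toml and store them in SQLite via SqliteDbMain."""
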
    def __init__(self):
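        """Set up file and stderr logging, browser-like default request headers, and the SQLite helper."""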
        logger.add("../log/PtGetData_{time:YYYY-MM-DD}.log", rotation="1 day", level="INFO")
        logger.add(sys.stderr, level="INFO")
        self.torrents_uri = "/torrents.php"
        self.headers = {
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'accept-language': 'zh,zh-CN;q=0.9',
            'cache-control': 'max-age=0',
            'sec-ch-ua': '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"macOS"',
            'sec-fetch-dest': 'document',
            'sec-fetch-mode': 'navigate',
            'sec-fetch-site': 'same-origin',
            'sec-fetch-user': '?1',
            'upgrade-insecure-requests': '1',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
        }
        self.db_main = SqliteDbMain(config.pt_website_db)
        self.if_pass = False

    def get_data(self, section_name, section_data):
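        """Fetch and parse the first torrents listing page for a section, then page through the rest and store every row."""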
res_txt = f"开始对 [{section_name}] 进行操作...,抓取数据:"
logger.info(res_txt)
url, cookie = section_data.get('url'), section_data.get('cookie')
if cookie is not None and len(cookie.strip()) > 0:
self.headers["cookie"] = cookie
if len(section_data.get("torrents")) > 1:
self.torrents_uri = section_data.get("torrents")
html = self.get_website_html(uri=self.torrents_uri + "?sort=0&type=desc", section_name=section_name,
section_data=section_data)
if len(html) == 0:
return
try:
# 取数据库中查询一下是否存在source_name=section_name的数据如果存在则不是初始化
count = self.db_main.pandas_query_by_condition(
model=func.count(PtWebsiteData.id),
query_condition=PtWebsiteData.source_name == section_name,
)
# 如果不存在,则是初始化数据
res = int(count['count_1'].iloc[0])
if res == 0:
self.if_pass = True
doc_html = lhtml.fromstring(html)
# 解析网页内容
self.get_common_analysis(section_name, doc_html)
# 获取分页
pages = self.get_common_total_page(doc_html)
            # Page 0 was parsed above; walk the remaining pages
            for i in range(1, pages + 1):
                sleep_time = random.uniform(1, 3)
                logger.info(
                    f"{pages} pages in total; scraping page {i}, {pages - i} pages left, sleeping {sleep_time}s first")
                time.sleep(sleep_time)
                # Parse and store the data on this page
                self.get_data_by_page(section_name, section_data, i)
        except Exception as e:
            logger.error(f"The page could not be parsed, please be aware!!! {e}")
            return

    def get_data_by_page(self, section_name, section_data, page_num=0):
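        """Fetch a single listing page (page_num >= 1) and feed it to get_common_analysis."""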
        if page_num >= 1:
            html = self.get_website_html(uri=f"{self.torrents_uri}&page={page_num}",
                                         section_name=section_name, section_data=section_data)
            if len(html) == 0:
                return
            doc_html = lhtml.fromstring(html)
            self.get_common_analysis(section_name, doc_html)

    def get_common_total_page(self, doc_html):
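        """Read the last pagination link and return its page parameter as the highest page index (0 if it cannot be determined)."""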
        page_href = doc_html.xpath('//td[@class="embedded"]//p[@align="center"][1]//a[last()]/@href')[0]
        pages_str = extract_id(page_href, "page")
        return int(pages_str) if pages_str and pages_str.isdigit() else 0

    def get_common_analysis(self, section_name, doc_html):
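        """Parse every row of the torrents table, build a PtWebsiteData entry per row, and insert it."""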
        # Parse the HTML with lxml
        row_follow_tables = doc_html.xpath('//table[@class="torrents"]//tr[position() > 1]')
        for row_follow in row_follow_tables:
            html_content = lhtml.tostring(row_follow, encoding='unicode')
            # print(f"row html: {html_content}")
            # Primary title
            first_title = row_follow.xpath('.//table[@class="torrentname"]//a[@title]/@title')[0]
            second_title_s = row_follow.xpath(
                './/table[@class="torrentname"]//td[@class="embedded"]/text()[normalize-space()]'
                '| .//table[@class="torrentname"]//td[@class="embedded"]//font[@title]/text()')
            # Secondary title: keep the last fragment that actually contains letters or Chinese characters
            second_title = ""
            for text in second_title_s:
                if contains_alpha_or_chinese(text) is not None:
                    second_title = contains_alpha_or_chinese(text)
            print(f"Title: {first_title}  Secondary title: {second_title}")
            # Torrent category
            type_id, type_name = "", ""
            type_html = row_follow.xpath('.//td[contains(@class, "rowfollow")][1]//a[@href]')
            for td_element in type_html:
                type_id = extract_id(td_element.xpath('./@href')[0], "cat")
                type_name = td_element.xpath('.//img[@title]/@title')[0]
                # html_content = lhtml.tostring(td_element, encoding='unicode')
            print(f"Type: {type_id} {type_name}")
            # Seed status
            seed_status = 1
            seed_status_html = row_follow.xpath(
                './/table[@class="torrentname"]//td[@class="embedded"]//img[@alt]/@alt')
            if len(seed_status_html) > 0:
                for seed in seed_status_html:
                    s = check_seed_status(seed)
                    if s is not None:
                        seed_status = s
            print(f"Seed status: {seed_status}")
            seeding_status = 0
            seeding_status_html = row_follow.xpath(
                './/table[@class="torrentname"]//div[@title]/@title')
            if len(seeding_status_html) > 0:
                seeding_status = 1
            print(f"Seeding status: {seeding_status}")
            comment_count = row_follow.xpath('.//td[@class="rowfollow"][2]//a/text()[normalize-space()]')[0]
            print(f"Comment count: {comment_count}")
            upload_time = ""
            upload_time_html = row_follow.xpath('.//span[@title][parent::td]/@title')
            for td_element in upload_time_html:
                try:
                    upload_time = parser.parse(td_element)
                except ValueError:
                    pass
            print(f"Upload time: {upload_time}")
            # Resource size
            size_html = row_follow.xpath('.//td[@class="rowfollow"][3]/text()[normalize-space()]')
            size = size_html[0].strip() + '' + size_html[1].strip()
            print(f"Size: {size}")
            seed_count = row_follow.xpath('.//td[@class="rowfollow"][4]')[0].text_content().strip()
            print(f"Seeder count: {seed_count}")
            download_count = row_follow.xpath('.//td[@class="rowfollow"][5]')[0].text_content().strip()
            print(f"Downloader count: {download_count}")
            completion_count = row_follow.xpath('.//td[@class="rowfollow"][6]')[0].text_content().strip()
            print(f"Completion count: {completion_count}")
            publisher = row_follow.xpath('.//td[@class="rowfollow"][7]')[0].text_content().strip()
            print(f"Publisher: {publisher}")
            download_link = row_follow.xpath(
                './/table[@class="torrentname"]//*[contains(@class, "download")]/parent::a/@href')[0]
            pt_id = extract_id(download_link, "id")
            # Details page link
            details_link = row_follow.xpath('.//table[@class="torrentname"]//a[@href]/@href')[0]
            print(
                f"PT_ID == {pt_id} download link: /{download_link} details link: /{details_link}")
            # douban_rating = doc.xpath('')
            # print(f"Douban rating: /{douban_rating[0]}")
            # imdb_rating = doc.xpath('')
            # print(f"IMDb rating: /{imdb_rating[0]}")
            entry = PtWebsiteData(
                pt_id=pt_id,
                source_name=section_name,
                first_title=first_title,
                second_title=second_title,
                type_id=type_id,
                type_name=type_name,
                seed_status=seed_status,
                status_remaining_time="",
                seeding_status=seeding_status,
                comment_count=comment_count,
                upload_time=upload_time,
                size=size,
                seed_count=seed_count,
                download_count=download_count,
                completion_count=completion_count,
                publisher=publisher,
                douban_rating=0.0,
                imdb_rating=0.0,
                download_link=f'/{download_link}',
                details_link=f'/{details_link}'
            )
            if self.if_pass is False:
                # If the row is a pinned ("置顶") torrent, ignore insert errors
                if "置顶" in html_content:
                    self.if_pass = True
            self.insert_entry(self.if_pass, entry)

    def insert_entry(self, if_pass, entry):
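        """Insert an entry; when if_pass is True database errors are swallowed, otherwise they are logged and re-raised."""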
        # When if_pass is True, swallow the exception and keep storing
        if if_pass:
            try:
                self.db_main.insert_entry(entry)
            except Exception as e:
                # During the initial load, new rows may show up before the previous batch has
                # finished inserting, so errors are simply ignored here for now.
                logger.error(f"Error while if_pass == {if_pass}: {e}")
                pass
        else:
            try:
                self.db_main.insert_entry(entry)
            except Exception as e:
                logger.error(f"Failed to store the data, reason: {e}")
                raise

    def get_type(self, section_name, section_data):
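        """Fetch /getrss.php for a section; parsing of the category list is not implemented yet."""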
res_txt = f"开始对 [{section_name}] 进行操作...,抓取网站分类:"
url, cookie = section_data.get('url'), section_data.get('cookie')
if cookie is not None and len(cookie.strip()) > 0:
self.headers["cookie"] = cookie
html = self.get_website_html(uri="/getrss.php", section_name=section_name, section_data=section_data)
if len(html) == 0:
return
try:
soup = BeautifulSoup(html, 'html.parser')
except Exception as e:
logger.error(f"{section_name} , 页面无法解析,请知晓!!!")
    def get_website_html(self, uri, section_name, section_data):
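        """GET the section url plus uri with the configured headers, retrying up to 5 times; return the HTML text, or an empty string on failure."""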
        # When the cookie is not empty, the site check-in can also be performed
        url = section_data.get('url') + uri
        for i in range(5):
            logger.info(f"Fetching {url}, attempt {i + 1}!")
            try:
                response = requests.get(url, headers=self.headers, timeout=5 * 60)
                if response.status_code == 200:
                    return response.text
                else:
                    logger.error(f"{section_name}: request failed with status code {response.status_code}, {response.text}")
                    return ""
            except Exception:
                time.sleep(2)
        logger.error(f"{section_name}: failed 5 times, the site could not be reached")
        return ""

    def opt(self):
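        """Load PT/pt_config.toml and scrape every section whose flag is not 1."""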
        toml_file = 'PT/pt_config.toml'
        try:
            with open(toml_file, 'r', encoding='utf-8') as file:
                config_data = toml.load(file)
            # Iterate over each section
            for section_name, section_data in config_data.items():
                print(f"Processing section: {section_name} --- {section_data.get('url')}")
                url, cookie, flag = section_data.get('url'), section_data.get('cookie'), section_data.get('flag')
                if flag != 1:
                    # Pull the data
                    self.get_data(section_name, section_data)
        except FileNotFoundError:
            logger.error(f"Error: The file '{toml_file}' was not found.")
        except toml.TomlDecodeError as e:
            logger.error(f"Error decoding TOML: {e}")

if __name__ == '__main__':
    toml_file = 'pt_config.toml'
    with open(toml_file, 'r', encoding='utf-8') as file:
        config_data = toml.load(file)
    # Iterate over each section
    for section_name, section_data in config_data.items():
        print(f"Processing section: {section_name} --- {section_data.get('url')}")
        url, cookie, flag = section_data.get('url'), section_data.get('cookie'), section_data.get('flag')
        if flag != 1 and cookie is not None and len(cookie.strip()) > 0:
            # Pull the data (only the first matching section is processed, then stop)
            PtGetData().get_data(section_name, section_data)
            break