""" 抓取PT数据 1. 创建数据库,sqlite 2. 创建表 3. 解析网站表格 4. 按照规则进行去重 5. 数据入库 数据如何展示呢?? """ import logging import random import sys import time from sqlalchemy import func from qnloft_db import db_config as config import requests import toml from bs4 import BeautifulSoup from loguru import logger from lxml import html as lhtml from urllib.parse import urlparse, parse_qs from qnloft_db.sqlite_db_main import SqliteDbMain from qnloft_db_model.PtWebsiteData import PtWebsiteData from dateutil import parser def extract_id(url, field): parsed_url = urlparse(url) query_params = parse_qs(parsed_url.query) return query_params.get(field, [None])[0] def contains_alpha_or_chinese(input_str): s = input_str.strip() # 判断是否包含字母 has_alpha = any(char.isalpha() for char in s) # 判断是否包含汉字 has_chinese = any('\u4e00' <= char <= '\u9fff' for char in s) # 返回结果 return s if has_alpha or has_chinese else None def check_seed_status(status): s = ["%", "Free", "free"] return status if any(keyword in status for keyword in s) else None class PtGetData: def __init__(self): logger.add("../log/PtGetData_{time:YYYY-MM-DD}.log", rotation="1 day", level="INFO") logger.add(sys.stderr, level="INFO") self.torrents_uri = "/torrents.php" self.headers = { 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', 'accept-language': 'zh,zh-CN;q=0.9', 'cache-control': 'max-age=0', 'sec-ch-ua': '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"macOS"', 'sec-fetch-dest': 'document', 'sec-fetch-mode': 'navigate', 'sec-fetch-site': 'same-origin', 'sec-fetch-user': '?1', 'upgrade-insecure-requests': '1', 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36', } self.db_main = SqliteDbMain(config.pt_website_db) self.if_pass = False def get_data(self, section_name, section_data): res_txt = f"开始对 [{section_name}] 进行操作...,抓取数据:" logger.info(res_txt) url, cookie = section_data.get('url'), section_data.get('cookie') if cookie is not None and len(cookie.strip()) > 0: self.headers["cookie"] = cookie if len(section_data.get("torrents")) > 1: self.torrents_uri = section_data.get("torrents") html = self.get_website_html(uri=self.torrents_uri + "?sort=0&type=desc", section_name=section_name, section_data=section_data) if len(html) == 0: return try: # 取数据库中查询一下,是否存在source_name=section_name的数据,如果存在,则不是初始化 count = self.db_main.pandas_query_by_condition( model=func.count(PtWebsiteData.id), query_condition=PtWebsiteData.source_name == section_name, ) # 如果不存在,则是初始化数据 res = int(count['count_1'].iloc[0]) if res == 0: self.if_pass = True doc_html = lhtml.fromstring(html) # 解析网页内容 self.get_common_analysis(section_name, doc_html) # 获取分页 pages = self.get_common_total_page(doc_html) for i in range(0, pages): sleep_time = random.uniform(1, 3) logger.info( f"总共 【{pages}】 页,开始抓取第 【{i}】 页数据,还剩 【{pages - i}】 页,不过要休眠 {sleep_time} 秒") time.sleep(sleep_time) # 对页面数据进行解析和存储 self.get_data_by_page(section_name, section_data, i) break except Exception as e: logger.error(f"页面无法解析,请知晓!!!{e}") return def get_data_by_page(self, section_name, section_data, page_num=0): if page_num >= 1: html = self.get_website_html(uri=f"{self.torrents_uri}&page={page_num}", section_name=section_name, section_data=section_data) if len(html) == 0: return doc_html = lhtml.fromstring(html) self.get_common_analysis(section_name, doc_html) def get_common_total_page(self, doc_html): 
        page_href = doc_html.xpath('//td[@class="embedded"]//p[@align="center"][1]//a[last()]/@href')[0]
        pages_str = extract_id(page_href, "page")
        return int(pages_str) if pages_str and pages_str.isdigit() else 0

    def get_common_analysis(self, section_name, doc_html):
        # Parse the listing table with lxml; skip the header row
        row_follow_tables = doc_html.xpath('//table[@class="torrents"]//tr[position() > 1]')
        for row_follow in row_follow_tables:
            html_content = lhtml.tostring(row_follow, encoding='unicode')
            # print(f"Row HTML: {html_content}")
            # Primary title
            first_title = row_follow.xpath('.//table[@class="torrentname"]//a[@title]/@title')[0]
            second_title_s = row_follow.xpath(
                './/table[@class="torrentname"]//td[@class="embedded"]/text()[normalize-space()]'
                '| .//table[@class="torrentname"]//td[@class="embedded"]//font[@title]/text()')
            # Secondary title: keep the last candidate that actually contains letters or Chinese characters
            second_title = ""
            for text in second_title_s:
                cleaned = contains_alpha_or_chinese(text)
                if cleaned is not None:
                    second_title = cleaned
            print(f"Title: {first_title}  Subtitle: {second_title}")
            type_id, type_name = "", ""
            type_html = row_follow.xpath('.//td[contains(@class, "rowfollow")][1]//a[@href]')
            for td_element in type_html:
                type_id = extract_id(td_element.xpath('./@href')[0], "cat")
                type_name = td_element.xpath('.//img[@title]/@title')[0]
                # html_content = lhtml.tostring(td_element, encoding='unicode')
            print(f"Category: {type_id} {type_name}")
            # Promotion / seed status
            seed_status = 1
            seed_status_html = row_follow.xpath(
                './/table[@class="torrentname"]//td[@class="embedded"]//img[@alt]/@alt')
            if len(seed_status_html) > 0:
                for seed in seed_status_html:
                    s = check_seed_status(seed)
                    if s is not None:
                        seed_status = s
            print(f"Seed status: {seed_status}")
            seeding_status = 0
            seeding_status_html = row_follow.xpath(
                './/table[@class="torrentname"]//div[@title]/@title')
            if len(seeding_status_html) > 0:
                seeding_status = 1
            print(f"Seeding status: {seeding_status}")
            comment_count = row_follow.xpath('.//td[@class="rowfollow"][2]//a/text()[normalize-space()]')[0]
            print(f"Comment count: {comment_count}")
            upload_time = ""
            upload_time_html = row_follow.xpath('.//span[@title][parent::td]/@title')
            for td_element in upload_time_html:
                try:
                    upload_time = parser.parse(td_element)
                except ValueError:
                    pass
            print(f"Upload time: {upload_time}")
            # Resource size: the number and the unit are separate text nodes
            size_html = row_follow.xpath('.//td[@class="rowfollow"][3]/text()[normalize-space()]')
            size = size_html[0].strip() + ' ' + size_html[1].strip()
            print(f"Size: {size}")
            seed_count = row_follow.xpath('.//td[@class="rowfollow"][4]')[0].text_content().strip()
            print(f"Seeder count: {seed_count}")
            download_count = row_follow.xpath('.//td[@class="rowfollow"][5]')[0].text_content().strip()
            print(f"Downloader count: {download_count}")
            completion_count = row_follow.xpath('.//td[@class="rowfollow"][6]')[0].text_content().strip()
            print(f"Completion count: {completion_count}")
            publisher = row_follow.xpath('.//td[@class="rowfollow"][7]')[0].text_content().strip()
            print(f"Publisher: {publisher}")
            download_link = row_follow.xpath(
                './/table[@class="torrentname"]//*[contains(@class, "download")]/parent::a/@href')[0]
            pt_id = extract_id(download_link, "id")
            # Details page link
            details_link = row_follow.xpath('.//table[@class="torrentname"]//a[@href]/@href')[0]
            print(f"PT_ID == {pt_id}  Download link: /{download_link}  Details link: /{details_link}")
            # douban_rating = doc.xpath('')
            # print(f"Douban rating: /{douban_rating[0]}")
            # imdb_rating = doc.xpath('')
            # print(f"imdb_rating: /{imdb_rating[0]}")
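            # Assemble one ORM row. seed_status keeps its integer default of 1 unless a
            # promotion label (a "%" discount or "Free") was found above, in which case
            # the label string itself is stored; douban/imdb ratings are not scraped yet.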
            entry = PtWebsiteData(
                pt_id=pt_id,
                source_name=section_name,
                first_title=first_title,
                second_title=second_title,
                type_id=type_id,
                type_name=type_name,
                seed_status=seed_status,
                status_remaining_time="",
                seeding_status=seeding_status,
                comment_count=comment_count,
                upload_time=upload_time,
                size=size,
                seed_count=seed_count,
                download_count=download_count,
                completion_count=completion_count,
                publisher=publisher,
                douban_rating=0.0,
                imdb_rating=0.0,
                download_link=f'/{download_link}',
                details_link=f'/{details_link}'
            )
            if self.if_pass is False:
                # Rows marked "置顶" (pinned/sticky) may fail to insert; ignore errors for them
                if "置顶" in html_content:
                    self.if_pass = True
            self.insert_entry(self.if_pass, entry)

    def insert_entry(self, if_pass, entry):
        # When if_pass is true, swallow exceptions and keep storing
        if if_pass:
            try:
                self.db_main.insert_entry(entry)
            except Exception as e:
                # During the initial import new rows may appear before the backfill has
                # finished, so errors are only logged here for now
                logger.error(f"Error while if_pass == {if_pass}: {e}")
        else:
            try:
                self.db_main.insert_entry(entry)
            except Exception as e:
                logger.error(f"Failed to store data, reason: {e}")
                raise

    def get_type(self, section_name, section_data):
        logger.info(f"Starting to process [{section_name}], fetching site categories:")
        url, cookie = section_data.get('url'), section_data.get('cookie')
        if cookie is not None and len(cookie.strip()) > 0:
            self.headers["cookie"] = cookie
        html = self.get_website_html(uri="/getrss.php", section_name=section_name, section_data=section_data)
        if len(html) == 0:
            return
        try:
            soup = BeautifulSoup(html, 'html.parser')
        except Exception as e:
            logger.error(f"{section_name}: page could not be parsed, please be aware!!! {e}")

    def get_website_html(self, uri, section_name, section_data):
        # When the cookie is set, visiting the page also counts as the daily check-in
        url = section_data.get('url') + uri
        for attempt in range(5):
            logger.info(f"Fetching {url}, attempt {attempt + 1}!")
            try:
                response = requests.get(url, headers=self.headers, timeout=5 * 60)
                if response.status_code == 200:
                    return response.text
                logger.error(f"{section_name}: request failed with status code {response.status_code}, {response.text}!!!")
                return ""
            except Exception:
                time.sleep(2)
        logger.error(f"{section_name}: failed 5 times, site unreachable!!!")
        return ""

    def opt(self):
        toml_file = 'PT/pt_config.toml'
        try:
            with open(toml_file, 'r', encoding='utf-8') as file:
                config_data = toml.load(file)
                # Iterate over every section
                for section_name, section_data in config_data.items():
                    print(f"Processing section: {section_name} --- {section_data.get('url')}")
                    url, cookie, flag = section_data.get('url'), section_data.get('cookie'), section_data.get('flag')
                    if flag != 1:
                        # Fetch the data
                        self.get_data(section_name, section_data)
        except FileNotFoundError:
            logger.error(f"Error: The file '{toml_file}' was not found.")
        except toml.TomlDecodeError as e:
            logger.error(f"Error decoding TOML: {e}")


if __name__ == '__main__':
    toml_file = 'pt_config.toml'
    with open(toml_file, 'r', encoding='utf-8') as file:
        config_data = toml.load(file)
        # Iterate over every section
        for section_name, section_data in config_data.items():
            print(f"Processing section: {section_name} --- {section_data.get('url')}")
            url, cookie, flag = section_data.get('url'), section_data.get('cookie'), section_data.get('flag')
            if flag != 1 and cookie is not None and len(cookie.strip()) > 0:
                # Fetch data for the first enabled section only (note the break below)
                PtGetData().get_data(section_name, section_data)
                break
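
# A minimal sketch of the pt_config.toml layout this script expects, inferred from the
# keys read above (url, cookie, flag, torrents). The section name and all values below
# are placeholder assumptions, not a real site configuration:
#
#   [some_pt_site]
#   url = "https://pt.example.com"    # site root; torrents_uri is appended to it
#   cookie = "uid=12345; pass=abcde"  # login cookie; sections with an empty cookie are skipped
#   flag = 0                          # 1 means "skip this section"
#   torrents = "/torrents.php"        # optional override of the listing page URI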