From 2b13729f195ee9d6d8866d08662bc99f8c363ac7 Mon Sep 17 00:00:00 2001 From: rm Date: Thu, 18 Jan 2024 16:32:27 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E4=B8=80=E4=BA=9B=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- PT/pt_get_data.py | 61 ++++++++------ PT/test.py | 28 +++---- qnloft_db/db_main.py | 190 ++++++++++++++++++++++++------------------- 3 files changed, 153 insertions(+), 126 deletions(-) diff --git a/PT/pt_get_data.py b/PT/pt_get_data.py index e41284c..f1829d4 100644 --- a/PT/pt_get_data.py +++ b/PT/pt_get_data.py @@ -12,6 +12,8 @@ import random import sys import time +from sqlalchemy import func + from qnloft_db import db_config as config import requests import toml @@ -67,6 +69,7 @@ class PtGetData: 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36', } self.db_main = SqliteDbMain(config.pt_website_db) + self.if_pass = False def get_data(self, section_name, section_data): res_txt = f"开始对 [{section_name}] 进行操作...,抓取数据:" @@ -78,6 +81,15 @@ class PtGetData: if len(html) == 0: return try: + # 取数据库中查询一下,是否存在source_name=section_name的数据,如果存在,则不是初始化 + count = self.db_main.pandas_query_by_condition( + model=func.count(PtWebsiteData.id), + query_condition=PtWebsiteData.source_name == section_name, + ) + # 如果不存在,则是初始化数据 + res = int(count['count_1'].iloc[0]) + if res == 0: + self.if_pass = True doc_html = lhtml.fromstring(html) # 解析网页内容 self.get_common_analysis(section_name, doc_html) @@ -211,22 +223,20 @@ class PtGetData: download_link=f'/{download_link}', details_link=f'/{details_link}' ) - # 如果包含置顶,出现错误不管 - if "置顶" in html_content: - self.insert_entry(True, entry) - else: - # todo 这里的逻辑明天补全 - # 取数据库中查询一下,是否存在source_name=section_name的数据,如果存在,则不是初始化 - # 如果不存在,则是初始化数据 - pass + if self.if_pass is False: + # 如果包含置顶,出现错误不管 + if "置顶" in html_content: + self.if_pass = True + self.insert_entry(self.if_pass, entry) def insert_entry(self, if_pass, entry): + # if_pass == true 则吃掉异常,代码继续存储 if if_pass: try: self.db_main.insert_entry(entry) except Exception as e: # 第一次初始化数据的时候,为了防止数据没入库完成,出现新增数据,这里先设置成pass - logger.error(f"if_pass == {if_pass} 是出现错误:{e}") + logger.error(f"if_pass == {if_pass} 时出现错误:{e}") pass else: try: @@ -267,23 +277,22 @@ class PtGetData: logger.error(f"{section_name} , 5次出现错误,无法访问!!!") return "" - -def opt(self): - toml_file = 'PT/pt_config.toml' - try: - with open(toml_file, 'r', encoding='utf-8') as file: - config_data = toml.load(file) - # 迭代每个 section - for section_name, section_data in config_data.items(): - print(f"Processing section: {section_name} --- {section_data.get('url')}") - url, cookie, flag = section_data.get('url'), section_data.get('cookie'), section_data.get('flag') - if flag != 1: - # 拉取数据 - self.get_data(section_name, section_data) - except FileNotFoundError: - logger.error(f"Error: The file '{toml_file}' was not found.") - except toml.TomlDecodeError as e: - logger.error(f"Error decoding TOML: {e}") + def opt(self): + toml_file = 'PT/pt_config.toml' + try: + with open(toml_file, 'r', encoding='utf-8') as file: + config_data = toml.load(file) + # 迭代每个 section + for section_name, section_data in config_data.items(): + print(f"Processing section: {section_name} --- {section_data.get('url')}") + url, cookie, flag = section_data.get('url'), section_data.get('cookie'), section_data.get('flag') + if flag != 1: + # 拉取数据 + self.get_data(section_name, section_data) + except FileNotFoundError: + logger.error(f"Error: The file '{toml_file}' was not found.") + except toml.TomlDecodeError as e: + logger.error(f"Error decoding TOML: {e}") if __name__ == '__main__': diff --git a/PT/test.py b/PT/test.py index e281344..187b5ac 100644 --- a/PT/test.py +++ b/PT/test.py @@ -3,13 +3,17 @@ from datetime import datetime import pandas as pd import requests +import sqlalchemy import toml from lxml import html as lhtml from urllib.parse import urlparse, parse_qs +from sqlalchemy.orm import attributes +from sqlalchemy import func +from qnloft_db.sqlite_db_main import SqliteDbMain from qnloft_db_model.PtWebsiteData import PtWebsiteData from dateutil import parser - +from qnloft_db import db_config as config def extract_id(url, field) -> bytes: parsed_url = urlparse(url) query_params = parse_qs(parsed_url.query) @@ -89,23 +93,15 @@ data = {col: [] for col in columns} # 创建空 DataFrame df = pd.DataFrame(data) - -def is_date(s): - try: - datetime.strptime(s, '%Y-%m-%d %H:%M:%S') - return True - except ValueError: - return False +db_main = SqliteDbMain(config.pt_website_db) +count = db_main.pandas_query_by_condition( + model=func.count(PtWebsiteData.id), + query_condition=PtWebsiteData.source_name == "1PTBar/壹PT", +) +print(int(count['count_1'].iloc[0])) -my_list = ['置顶促销', '国语配音', '中文字幕', '2021-02-02 13:26:26','2021-02-02','2021-02-02 13:26'] -for item in my_list: - try: - parsed_date = parser.parse(item) - print(parsed_date) - except ValueError: - pass - """ +""" 主键id,pt资源id,来源名称,一级标题,二级标题,分类id,分类名称 种子状态,状态剩余时间,做种状态,评论数,资源上传时间,资源大小, 做种数,下载数,完成数,发布者,豆瓣评分,IMDB评分,下载链接,详情链接 diff --git a/qnloft_db/db_main.py b/qnloft_db/db_main.py index 2ea12d2..459b792 100644 --- a/qnloft_db/db_main.py +++ b/qnloft_db/db_main.py @@ -1,7 +1,7 @@ import traceback import pandas -from sqlalchemy.orm import sessionmaker, scoped_session +from sqlalchemy.orm import sessionmaker, scoped_session, attributes from sqlalchemy import * from qnloft_db import db_config as config @@ -168,14 +168,29 @@ class DbMain: self.session.close() self.engine.dispose() - def pandas_query_by_condition(self, model, query_condition): + def pandas_query_by_condition(self, model, query_condition, sort_column=None, ascending=True): try: # 当需要根据多个条件进行查询操作时 # query_condition = and_( # StockDaily.trade_date == '20230823', # StockDaily.symbol == 'ABC' # ) - query = self.session.query(model).filter(query_condition).order_by(model.id) + print(model) + query = self.session.query(model).filter(query_condition) + # 如果模型中存在排序字段,则进行排序 + if sort_column: + sort_attr = None + if isinstance(sort_column, attributes.InstrumentedAttribute): + # 如果包含点号,表示是类属性 + sort_attr = sort_column + elif isinstance(sort_column, str): + # 否则,按照字符串处理 + sort_attr = getattr(model, sort_column, None) + if sort_attr: + if ascending: + query = query.order_by(sort_attr) + else: + query = query.order_by(sort_attr.desc()) return self.pandas_query_by_sql(stmt=query.statement).reset_index() except Exception as e: trace = traceback.extract_tb(e.__traceback__) @@ -185,86 +200,93 @@ class DbMain: finally: self.session.close() - # ===================== delete 方法============================= - def delete_by_id(self, model, db_id): - try: - # 使用 delete() 方法删除符合条件的记录 - self.session.query(model).filter_by(id=db_id).delete() - self.session.commit() - except Exception as e: - trace = traceback.extract_tb(e.__traceback__) - for filename, lineno, funcname, source in trace: - print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}" - f"错误内容:{traceback.format_exc()}") - finally: - self.session.close() - def delete_by_condition(self, model, delete_condition): - try: - # 使用 delete() 方法删除符合条件的记录 - # 定义要删除的记录的条件 - # 例如,假设你要删除 trade_date 为 '20230823' 的记录 - # delete_condition = StockDaily.trade_date == '20230823' - # 当需要根据多个条件进行删除操作时 - # delete_condition = and_( - # StockDaily.trade_date == '20230823', - # StockDaily.symbol == 'ABC' - # ) - self.session.query(model).filter(delete_condition).delete() - self.session.commit() - except Exception as e: - trace = traceback.extract_tb(e.__traceback__) - for filename, lineno, funcname, source in trace: - print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}" - f"错误内容:{traceback.format_exc()}") - finally: - self.session.close() - - def delete_all_table(self, model): # 清空表数据 - try: - self.session.query(model).delete() - self.session.commit() - except Exception as e: - trace = traceback.extract_tb(e.__traceback__) - for filename, lineno, funcname, source in trace: - print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}" - f"错误内容:{traceback.format_exc()}") - finally: - self.session.close() - - # ===================== 其它 方法============================= - def has_table(self, entries): - if isinstance(entries, list): - table_name = entries[0].__tablename__ - else: - table_name = entries.__tablename__ - # 检查表是否存在,如果不存在则创建 - return self.inspector.has_table(table_name) - - def execute_sql(self, s): - try: - sql_text = text(s) - return self.session.execute(sql_text) - except Exception as e: - trace = traceback.extract_tb(e.__traceback__) - for filename, lineno, funcname, source in trace: - print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}" - f"错误内容:{traceback.format_exc()}") - finally: - self.session.close() - - def execute_sql_to_pandas(self, s): - try: - sql_text = text(s) - res = self.session.execute(sql_text) - return pandas.DataFrame(res.fetchall(), columns=res.keys()) - except Exception as e: - trace = traceback.extract_tb(e.__traceback__) - for filename, lineno, funcname, source in trace: - print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}" - f"错误内容:{traceback.format_exc()}") - finally: - self.session.close() - - def close(self): +# ===================== delete 方法============================= +def delete_by_id(self, model, db_id): + try: + # 使用 delete() 方法删除符合条件的记录 + self.session.query(model).filter_by(id=db_id).delete() + self.session.commit() + except Exception as e: + trace = traceback.extract_tb(e.__traceback__) + for filename, lineno, funcname, source in trace: + print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}" + f"错误内容:{traceback.format_exc()}") + finally: self.session.close() + + +def delete_by_condition(self, model, delete_condition): + try: + # 使用 delete() 方法删除符合条件的记录 + # 定义要删除的记录的条件 + # 例如,假设你要删除 trade_date 为 '20230823' 的记录 + # delete_condition = StockDaily.trade_date == '20230823' + # 当需要根据多个条件进行删除操作时 + # delete_condition = and_( + # StockDaily.trade_date == '20230823', + # StockDaily.symbol == 'ABC' + # ) + self.session.query(model).filter(delete_condition).delete() + self.session.commit() + except Exception as e: + trace = traceback.extract_tb(e.__traceback__) + for filename, lineno, funcname, source in trace: + print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}" + f"错误内容:{traceback.format_exc()}") + finally: + self.session.close() + + +def delete_all_table(self, model): # 清空表数据 + try: + self.session.query(model).delete() + self.session.commit() + except Exception as e: + trace = traceback.extract_tb(e.__traceback__) + for filename, lineno, funcname, source in trace: + print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}" + f"错误内容:{traceback.format_exc()}") + finally: + self.session.close() + + +# ===================== 其它 方法============================= +def has_table(self, entries): + if isinstance(entries, list): + table_name = entries[0].__tablename__ + else: + table_name = entries.__tablename__ + # 检查表是否存在,如果不存在则创建 + return self.inspector.has_table(table_name) + + +def execute_sql(self, s): + try: + sql_text = text(s) + return self.session.execute(sql_text) + except Exception as e: + trace = traceback.extract_tb(e.__traceback__) + for filename, lineno, funcname, source in trace: + print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}" + f"错误内容:{traceback.format_exc()}") + finally: + self.session.close() + + +def execute_sql_to_pandas(self, s): + try: + sql_text = text(s) + res = self.session.execute(sql_text) + return pandas.DataFrame(res.fetchall(), columns=res.keys()) + except Exception as e: + trace = traceback.extract_tb(e.__traceback__) + for filename, lineno, funcname, source in trace: + print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}" + f"错误内容:{traceback.format_exc()}") + finally: + self.session.close() + + +def close(self): + self.session.close()