更新一些代码

This commit is contained in:
rm 2024-01-18 16:32:27 +08:00
parent 90f88a6dd6
commit 2b13729f19
3 changed files with 153 additions and 126 deletions

View File

@ -12,6 +12,8 @@ import random
import sys import sys
import time import time
from sqlalchemy import func
from qnloft_db import db_config as config from qnloft_db import db_config as config
import requests import requests
import toml import toml
@ -67,6 +69,7 @@ class PtGetData:
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36', 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
} }
self.db_main = SqliteDbMain(config.pt_website_db) self.db_main = SqliteDbMain(config.pt_website_db)
self.if_pass = False
def get_data(self, section_name, section_data): def get_data(self, section_name, section_data):
res_txt = f"开始对 [{section_name}] 进行操作...,抓取数据:" res_txt = f"开始对 [{section_name}] 进行操作...,抓取数据:"
@ -78,6 +81,15 @@ class PtGetData:
if len(html) == 0: if len(html) == 0:
return return
try: try:
# 取数据库中查询一下是否存在source_name=section_name的数据如果存在则不是初始化
count = self.db_main.pandas_query_by_condition(
model=func.count(PtWebsiteData.id),
query_condition=PtWebsiteData.source_name == section_name,
)
# 如果不存在,则是初始化数据
res = int(count['count_1'].iloc[0])
if res == 0:
self.if_pass = True
doc_html = lhtml.fromstring(html) doc_html = lhtml.fromstring(html)
# 解析网页内容 # 解析网页内容
self.get_common_analysis(section_name, doc_html) self.get_common_analysis(section_name, doc_html)
@ -211,22 +223,20 @@ class PtGetData:
download_link=f'/{download_link}', download_link=f'/{download_link}',
details_link=f'/{details_link}' details_link=f'/{details_link}'
) )
# 如果包含置顶,出现错误不管 if self.if_pass is False:
if "置顶" in html_content: # 如果包含置顶,出现错误不管
self.insert_entry(True, entry) if "置顶" in html_content:
else: self.if_pass = True
# todo 这里的逻辑明天补全 self.insert_entry(self.if_pass, entry)
# 取数据库中查询一下是否存在source_name=section_name的数据如果存在则不是初始化
# 如果不存在,则是初始化数据
pass
def insert_entry(self, if_pass, entry): def insert_entry(self, if_pass, entry):
# if_pass == true 则吃掉异常,代码继续存储
if if_pass: if if_pass:
try: try:
self.db_main.insert_entry(entry) self.db_main.insert_entry(entry)
except Exception as e: except Exception as e:
# 第一次初始化数据的时候为了防止数据没入库完成出现新增数据这里先设置成pass # 第一次初始化数据的时候为了防止数据没入库完成出现新增数据这里先设置成pass
logger.error(f"if_pass == {if_pass} 出现错误:{e}") logger.error(f"if_pass == {if_pass} 出现错误:{e}")
pass pass
else: else:
try: try:
@ -267,23 +277,22 @@ class PtGetData:
logger.error(f"{section_name} , 5次出现错误无法访问") logger.error(f"{section_name} , 5次出现错误无法访问")
return "" return ""
def opt(self):
def opt(self): toml_file = 'PT/pt_config.toml'
toml_file = 'PT/pt_config.toml' try:
try: with open(toml_file, 'r', encoding='utf-8') as file:
with open(toml_file, 'r', encoding='utf-8') as file: config_data = toml.load(file)
config_data = toml.load(file) # 迭代每个 section
# 迭代每个 section for section_name, section_data in config_data.items():
for section_name, section_data in config_data.items(): print(f"Processing section: {section_name} --- {section_data.get('url')}")
print(f"Processing section: {section_name} --- {section_data.get('url')}") url, cookie, flag = section_data.get('url'), section_data.get('cookie'), section_data.get('flag')
url, cookie, flag = section_data.get('url'), section_data.get('cookie'), section_data.get('flag') if flag != 1:
if flag != 1: # 拉取数据
# 拉取数据 self.get_data(section_name, section_data)
self.get_data(section_name, section_data) except FileNotFoundError:
except FileNotFoundError: logger.error(f"Error: The file '{toml_file}' was not found.")
logger.error(f"Error: The file '{toml_file}' was not found.") except toml.TomlDecodeError as e:
except toml.TomlDecodeError as e: logger.error(f"Error decoding TOML: {e}")
logger.error(f"Error decoding TOML: {e}")
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -3,13 +3,17 @@ from datetime import datetime
import pandas as pd import pandas as pd
import requests import requests
import sqlalchemy
import toml import toml
from lxml import html as lhtml from lxml import html as lhtml
from urllib.parse import urlparse, parse_qs from urllib.parse import urlparse, parse_qs
from sqlalchemy.orm import attributes
from sqlalchemy import func
from qnloft_db.sqlite_db_main import SqliteDbMain
from qnloft_db_model.PtWebsiteData import PtWebsiteData from qnloft_db_model.PtWebsiteData import PtWebsiteData
from dateutil import parser from dateutil import parser
from qnloft_db import db_config as config
def extract_id(url, field) -> bytes: def extract_id(url, field) -> bytes:
parsed_url = urlparse(url) parsed_url = urlparse(url)
query_params = parse_qs(parsed_url.query) query_params = parse_qs(parsed_url.query)
@ -89,23 +93,15 @@ data = {col: [] for col in columns}
# 创建空 DataFrame # 创建空 DataFrame
df = pd.DataFrame(data) df = pd.DataFrame(data)
db_main = SqliteDbMain(config.pt_website_db)
def is_date(s): count = db_main.pandas_query_by_condition(
try: model=func.count(PtWebsiteData.id),
datetime.strptime(s, '%Y-%m-%d %H:%M:%S') query_condition=PtWebsiteData.source_name == "1PTBar/壹PT",
return True )
except ValueError: print(int(count['count_1'].iloc[0]))
return False
my_list = ['置顶促销', '国语配音', '中文字幕', '2021-02-02 13:26:26','2021-02-02','2021-02-02 13:26'] """
for item in my_list:
try:
parsed_date = parser.parse(item)
print(parsed_date)
except ValueError:
pass
"""
主键id,pt资源id,来源名称,一级标题,二级标题,分类id分类名称 主键id,pt资源id,来源名称,一级标题,二级标题,分类id分类名称
种子状态,状态剩余时间,做种状态,评论数,资源上传时间,资源大小 种子状态,状态剩余时间,做种状态,评论数,资源上传时间,资源大小
做种数,下载数,完成数发布者豆瓣评分IMDB评分下载链接详情链接 做种数,下载数,完成数发布者豆瓣评分IMDB评分下载链接详情链接

View File

@ -1,7 +1,7 @@
import traceback import traceback
import pandas import pandas
from sqlalchemy.orm import sessionmaker, scoped_session from sqlalchemy.orm import sessionmaker, scoped_session, attributes
from sqlalchemy import * from sqlalchemy import *
from qnloft_db import db_config as config from qnloft_db import db_config as config
@ -168,14 +168,29 @@ class DbMain:
self.session.close() self.session.close()
self.engine.dispose() self.engine.dispose()
def pandas_query_by_condition(self, model, query_condition): def pandas_query_by_condition(self, model, query_condition, sort_column=None, ascending=True):
try: try:
# 当需要根据多个条件进行查询操作时 # 当需要根据多个条件进行查询操作时
# query_condition = and_( # query_condition = and_(
# StockDaily.trade_date == '20230823', # StockDaily.trade_date == '20230823',
# StockDaily.symbol == 'ABC' # StockDaily.symbol == 'ABC'
# ) # )
query = self.session.query(model).filter(query_condition).order_by(model.id) print(model)
query = self.session.query(model).filter(query_condition)
# 如果模型中存在排序字段,则进行排序
if sort_column:
sort_attr = None
if isinstance(sort_column, attributes.InstrumentedAttribute):
# 如果包含点号,表示是类属性
sort_attr = sort_column
elif isinstance(sort_column, str):
# 否则,按照字符串处理
sort_attr = getattr(model, sort_column, None)
if sort_attr:
if ascending:
query = query.order_by(sort_attr)
else:
query = query.order_by(sort_attr.desc())
return self.pandas_query_by_sql(stmt=query.statement).reset_index() return self.pandas_query_by_sql(stmt=query.statement).reset_index()
except Exception as e: except Exception as e:
trace = traceback.extract_tb(e.__traceback__) trace = traceback.extract_tb(e.__traceback__)
@ -185,86 +200,93 @@ class DbMain:
finally: finally:
self.session.close() self.session.close()
# ===================== delete 方法=============================
def delete_by_id(self, model, db_id):
    """Delete the rows of ``model`` whose ``id`` equals ``db_id``, then commit.

    Exceptions are not propagated. For every frame of the failing traceback
    one report line is printed to stdout, and each of those lines also
    carries the full formatted traceback. The session is closed whether or
    not the delete succeeded.

    :param model: entity handed to ``self.session.query``.
    :param db_id: value matched against ``id`` through ``filter_by``.
    :return: ``None`` in every case.
    """
    try:
        matching_rows = self.session.query(model).filter_by(id=db_id)
        # Query-level delete of everything the filter selected.
        matching_rows.delete()
        self.session.commit()
    except Exception as err:
        # FrameSummary exposes the same four fields the tuple form unpacks.
        for frame in traceback.extract_tb(err.__traceback__):
            print(f"在文件 {frame.filename} 的第 {frame.lineno} 行发生错误 ,方法名称:{frame.name} 发生错误的源码: {frame.line}"
                  f"错误内容:{traceback.format_exc()}")
    finally:
        self.session.close()
def delete_by_condition(self, model, delete_condition): # ===================== delete 方法=============================
try: def delete_by_id(self, model, db_id):
# 使用 delete() 方法删除符合条件的记录 try:
# 定义要删除的记录的条件 # 使用 delete() 方法删除符合条件的记录
# 例如,假设你要删除 trade_date 为 '20230823' 的记录 self.session.query(model).filter_by(id=db_id).delete()
# delete_condition = StockDaily.trade_date == '20230823' self.session.commit()
# 当需要根据多个条件进行删除操作时 except Exception as e:
# delete_condition = and_( trace = traceback.extract_tb(e.__traceback__)
# StockDaily.trade_date == '20230823', for filename, lineno, funcname, source in trace:
# StockDaily.symbol == 'ABC' print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}"
# ) f"错误内容:{traceback.format_exc()}")
self.session.query(model).filter(delete_condition).delete() finally:
self.session.commit()
except Exception as e:
trace = traceback.extract_tb(e.__traceback__)
for filename, lineno, funcname, source in trace:
print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}"
f"错误内容:{traceback.format_exc()}")
finally:
self.session.close()
def delete_all_table(self, model): # 清空表数据
try:
self.session.query(model).delete()
self.session.commit()
except Exception as e:
trace = traceback.extract_tb(e.__traceback__)
for filename, lineno, funcname, source in trace:
print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}"
f"错误内容:{traceback.format_exc()}")
finally:
self.session.close()
# ===================== 其它 方法=============================
def has_table(self, entries):
if isinstance(entries, list):
table_name = entries[0].__tablename__
else:
table_name = entries.__tablename__
# 检查表是否存在,如果不存在则创建
return self.inspector.has_table(table_name)
def execute_sql(self, s):
try:
sql_text = text(s)
return self.session.execute(sql_text)
except Exception as e:
trace = traceback.extract_tb(e.__traceback__)
for filename, lineno, funcname, source in trace:
print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}"
f"错误内容:{traceback.format_exc()}")
finally:
self.session.close()
def execute_sql_to_pandas(self, s):
try:
sql_text = text(s)
res = self.session.execute(sql_text)
return pandas.DataFrame(res.fetchall(), columns=res.keys())
except Exception as e:
trace = traceback.extract_tb(e.__traceback__)
for filename, lineno, funcname, source in trace:
print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}"
f"错误内容:{traceback.format_exc()}")
finally:
self.session.close()
def close(self):
self.session.close() self.session.close()
def delete_by_condition(self, model, delete_condition):
    """Delete every row of ``model`` that matches ``delete_condition``, then commit.

    ``delete_condition`` is passed unchanged to ``Query.filter``. A single
    criterion looks like::

        delete_condition = StockDaily.trade_date == '20230823'

    and several criteria can be combined with ``and_``::

        delete_condition = and_(
            StockDaily.trade_date == '20230823',
            StockDaily.symbol == 'ABC'
        )

    Exceptions are not propagated. For every frame of the failing traceback
    one report line is printed to stdout, each followed by the full
    formatted traceback. The session is closed in every case.

    :param model: entity handed to ``self.session.query``.
    :param delete_condition: filter expression selecting the rows to remove.
    :return: ``None`` in every case.
    """
    try:
        doomed_rows = self.session.query(model).filter(delete_condition)
        doomed_rows.delete()
        self.session.commit()
    except Exception as err:
        # FrameSummary exposes the same four fields the tuple form unpacks.
        for frame in traceback.extract_tb(err.__traceback__):
            print(f"在文件 {frame.filename} 的第 {frame.lineno} 行发生错误 ,方法名称:{frame.name} 发生错误的源码: {frame.line}"
                  f"错误内容:{traceback.format_exc()}")
    finally:
        self.session.close()
def delete_all_table(self, model):
    """Remove every row of ``model`` (empty the table) and commit.

    Exceptions are not propagated. For every frame of the failing traceback
    one report line is printed to stdout, each followed by the full
    formatted traceback. The session is closed in every case.

    :param model: entity handed to ``self.session.query``.
    :return: ``None`` in every case.
    """
    try:
        # No filter is applied, so the delete covers the whole query.
        every_row = self.session.query(model)
        every_row.delete()
        self.session.commit()
    except Exception as err:
        # FrameSummary exposes the same four fields the tuple form unpacks.
        for frame in traceback.extract_tb(err.__traceback__):
            print(f"在文件 {frame.filename} 的第 {frame.lineno} 行发生错误 ,方法名称:{frame.name} 发生错误的源码: {frame.line}"
                  f"错误内容:{traceback.format_exc()}")
    finally:
        self.session.close()
# ===================== 其它 方法=============================
def has_table(self, entries):
    """Report whether the table behind ``entries`` exists.

    ``entries`` is either one object or a list of objects carrying a
    ``__tablename__`` attribute; for a list only the first element is
    consulted, so an empty list raises ``IndexError``. This method only
    performs the existence check through ``self.inspector`` and does not
    create anything itself.

    :param entries: a single entry or a list of entries.
    :return: whatever ``self.inspector.has_table`` returns for the table name.
    """
    sample = entries[0] if isinstance(entries, list) else entries
    return self.inspector.has_table(sample.__tablename__)
def execute_sql(self, s):
    """Run the raw SQL string ``s`` on the session and return the result.

    The string is wrapped with ``text`` before execution. Exceptions are not
    propagated: for every frame of the failing traceback one report line is
    printed to stdout, each followed by the full formatted traceback, and the
    method then returns ``None``. The session is closed in every case.

    :param s: SQL statement as a string.
    :return: the object returned by ``self.session.execute``, or ``None``
        after a failure.
    """
    # NOTE(review): the result is handed back while ``finally`` closes the
    # session, and nothing here commits — confirm callers can still consume
    # the result and that write statements are committed elsewhere.
    try:
        return self.session.execute(text(s))
    except Exception as err:
        # FrameSummary exposes the same four fields the tuple form unpacks.
        for frame in traceback.extract_tb(err.__traceback__):
            print(f"在文件 {frame.filename} 的第 {frame.lineno} 行发生错误 ,方法名称:{frame.name} 发生错误的源码: {frame.line}"
                  f"错误内容:{traceback.format_exc()}")
    finally:
        self.session.close()
def execute_sql_to_pandas(self, s):
    """Run the raw SQL string ``s`` and return all rows as a DataFrame.

    The string is wrapped with ``text`` and executed on the session; the
    fetched rows become the frame's data and the result's keys become its
    column labels. Exceptions are not propagated: for every frame of the
    failing traceback one report line is printed to stdout, each followed by
    the full formatted traceback, and the method then returns ``None``. The
    session is closed in every case.

    :param s: SQL statement as a string.
    :return: ``pandas.DataFrame`` with the query result, or ``None`` after a
        failure.
    """
    try:
        outcome = self.session.execute(text(s))
        # Rows are fetched inside the ``try``, before ``finally`` closes the session.
        rows = outcome.fetchall()
        column_names = outcome.keys()
        return pandas.DataFrame(rows, columns=column_names)
    except Exception as err:
        # FrameSummary exposes the same four fields the tuple form unpacks.
        for frame in traceback.extract_tb(err.__traceback__):
            print(f"在文件 {frame.filename} 的第 {frame.lineno} 行发生错误 ,方法名称:{frame.name} 发生错误的源码: {frame.line}"
                  f"错误内容:{traceback.format_exc()}")
    finally:
        self.session.close()
def close(self):
    """Close the session held by this object."""
    active_session = self.session
    active_session.close()