Add database operations

rm 2024-01-16 23:33:13 +08:00
parent d1207d1bfb
commit f3e2a05e34
8 changed files with 495 additions and 32 deletions

View File

@ -11,6 +11,7 @@
import sys
import time
from qnloft_db import db_config as config
import requests
import toml
from bs4 import BeautifulSoup
@ -18,6 +19,9 @@ from loguru import logger
from lxml import html as lhtml
from urllib.parse import urlparse, parse_qs
from qnloft_db.sqlite_db_main import SqliteDbMain
from qnloft_db_model.PtWebsiteData import PtWebsiteData
def extract_id(url, field):
parsed_url = urlparse(url)
@ -46,6 +50,7 @@ class PtGetData:
logger.add("../log/PtGetData_{time:YYYY-MM-DD}.log", rotation="1 day", level="INFO")
logger.add(sys.stderr, level="INFO")
self.toml_file = 'PT/pt_config.toml'
self.torrents_uri = "/torrents.php?sort=0&type=desc"
self.headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'accept-language': 'zh,zh-CN;q=0.9',
@ -60,19 +65,21 @@ class PtGetData:
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
}
self.db_main = SqliteDbMain(config.pt_website_db)
def get_data(self, section_name, section_data):
res_txt = f"开始对 [{section_name}] 进行操作...,抓取数据:"
print(res_txt)
url, cookie = section_data.get('url'), section_data.get('cookie')
if cookie is not None and len(cookie.strip()) > 0:
self.headers["cookie"] = cookie
html = self.get_website_html(uri="/torrents.php", section_name=section_name, section_data=section_data)
html = self.get_website_html(uri=self.torrents_uri, section_name=section_name, section_data=section_data)
if len(html) == 0:
return
try:
doc_html = lhtml.fromstring(html)
# Parse the page content
self.get_common_analysis(doc_html)
self.get_common_analysis(section_name, doc_html)
# Fetch pagination info
pages = self.get_common_total_page(doc_html)
for i in range(0, pages):
@ -84,28 +91,28 @@ class PtGetData:
def get_data_by_page(self, section_name, section_data, page_num=0):
if page_num > 1:
html = self.get_website_html(uri=f"/torrents.php?incldead=1&spstate=0&page={page_num}",
html = self.get_website_html(uri=f"{self.torrents_uri}&incldead=1&spstate=0&page={page_num}",
section_name=section_name, section_data=section_data)
if len(html) == 0:
return
doc_html = lhtml.fromstring(html)
self.get_common_analysis(doc_html)
self.get_common_analysis(section_name, doc_html)
def get_common_total_page(self, doc_html):
page_href = doc_html.xpath('//td[@class="embedded"]//p[@align="center"][1]//a[last()]/@href')[0]
pages_str = extract_id(page_href, "page")
return int(pages_str) if pages_str.isdigit() else 0
def get_common_analysis(self, doc_html):
def get_common_analysis(self, section_name, doc_html):
entries = []
# Parse the HTML with lxml
row_follow_tables = doc_html.xpath('//table[@class="torrents"]//tr[position() > 1]')
for row_follow in row_follow_tables:
html_content = lhtml.tostring(row_follow, encoding='unicode')
# html_content = lhtml.tostring(row_follow, encoding='unicode')
# print(f"html内容{html_content}")
print("=" * 100)
# First-level (primary) title
first_title = row_follow.xpath('.//table[@class="torrentname"]//a[@title]/@title')[0]
print(f"标题:{first_title}")
second_title_s = row_follow.xpath(
'.//table[@class="torrentname"]//td[@class="embedded"]/text()[normalize-space()]'
'| .//table[@class="torrentname"]//td[@class="embedded"]//font[@title]/text()')
@ -114,14 +121,13 @@ class PtGetData:
for text in second_title_s:
second_title = contains_alpha_or_chinese(text) if contains_alpha_or_chinese(
text) is not None else None
print(f"二级标题:{second_title}")
type_id, type_name = "", ""
type_html = row_follow.xpath('.//td[contains(@class, "rowfollow")][1]//a[@href]')
for td_element in type_html:
type_id = extract_id(td_element.xpath('./@href')[0], "cat")
type_name = td_element.xpath('.//img[@title]/@title')[0]
html_content = lhtml.tostring(td_element, encoding='unicode')
# html_content = lhtml.tostring(td_element, encoding='unicode')
print(f"类型是:{type_id} + ' ' + {type_name}")
# Seed status
seed_status = 1
@ -168,12 +174,34 @@ class PtGetData:
print(f"发布者:{publisher}")
download_link = row_follow.xpath(
'.//table[@class="torrentname"]//*[contains(@class, "download")]/parent::a/@href')[0]
print(f"下载链接:/{download_link}")
pt_id = extract_id(download_link, "id")
print(f"PT_ID == {pt_id}")
# Details page link
details_link = row_follow.xpath('.//table[@class="torrentname"]//a[@href]/@href')[0]
print(f"详情链接:/{details_link}")
print(f"PT_ID == {pt_id} 标题:{first_title} 二级标题:{second_title} 下载链接:/{download_link} 详情链接:/{details_link}")
entry = PtWebsiteData(
pt_id=pt_id,
source_name=section_name,
first_title=first_title,
second_title=second_title,
type_id=type_id,
type_name=type_name,
seed_status=seed_status,
status_remaining_time="",
seeding_status=seeding_status,
comment_count=comment_count,
upload_time=upload_time,
size=size,
seed_count=seed_count,
download_count=download_count,
completion_count=completion_count,
publisher=publisher,
douban_rating=0.0,
imdb_rating=0.0,
download_link=f'/{download_link}',
details_link=f'/{details_link}'
)
entries.append(entry)
self.db_main.insert_all_entry(entries)
# break
# douban_rating = doc.xpath('')
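To show how this hunk's output is persisted end to end, here is a minimal usage sketch against the classes added in this commit; the field values are hypothetical:

from qnloft_db import db_config as config
from qnloft_db.sqlite_db_main import SqliteDbMain
from qnloft_db_model.PtWebsiteData import PtWebsiteData

db_main = SqliteDbMain(config.pt_website_db)
entry = PtWebsiteData(
    pt_id=12345,                     # hypothetical example values
    source_name="demo_site",
    first_title="Example torrent title",
    download_link="/download.php?id=12345",
    details_link="/details.php?id=12345",
)
db_main.insert_all_entry([entry])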

View File

@ -1,10 +1,13 @@
import time
import pandas as pd
import requests
import toml
from lxml import html as lhtml
from urllib.parse import urlparse, parse_qs
from qnloft_db_model.PtWebsiteData import PtWebsiteData
def extract_id(url, field) -> str:
parsed_url = urlparse(url)
@ -53,29 +56,46 @@ cookie = "cf_clearance=jF00JzyDrgabpWNeI.lugDevna4ve32DaZmrQRdaCT0-1704082342-0-
headers["cookie"] = cookie
url = url + "/torrents.php"
html = ""
for _ in range(5):
try:
response = requests.get(url, headers=headers, timeout=5 * 60)
if response.status_code == 200:
html = response.text
break
else:
print(f"{section_name} , 出现错误code码是{response.status_code}, {response.text}")
break
except Exception as e:
time.sleep(2)
else:
print(f"{section_name} , 5次出现错误无法访问")
# for _ in range(5):
# try:
# response = requests.get(url, headers=headers, timeout=5 * 60)
# if response.status_code == 200:
# html = response.text
# break
# else:
# print(f"{section_name} , 出现错误code码是{response.status_code}, {response.text}")
# break
# except Exception as e:
# time.sleep(2)
# else:
# print(f"{section_name} , 5次出现错误无法访问")
# with open('test.html', 'r', encoding='utf-8') as file:
# html = file.read()
# print(html)
doc_html = lhtml.fromstring(html)
page_href = doc_html.xpath('//td[@class="embedded"]//p[@align="center"][1]//a[last()]/@href')[0]
pages_str = extract_id(page_href, "page")
pages = int(pages_str) if pages_str.isdigit() else 0
for i in range(0, pages):
print(i)
# doc_html = lhtml.fromstring(html)
# page_href = doc_html.xpath('//td[@class="embedded"]//p[@align="center"][1]//a[last()]/@href')[0]
# pages_str = extract_id(page_href, "page")
# pages = int(pages_str) if pages_str.isdigit() else 0
# for i in range(0, pages):
# print(i)
# Inspect the model's table structure (column names)
columns = PtWebsiteData.__table__.columns.keys()
# Build a dict of empty lists keyed by the column names
data = {col: [] for col in columns}
# Create an empty DataFrame
df = pd.DataFrame(data)
for i in range(0, 10):
    # Build a single row of data
    row_data = {'pt_id': i}
    # Append the row to the DataFrame (DataFrame.append was removed in pandas 2.x)
    df = pd.concat([df, pd.DataFrame([row_data])], ignore_index=True)
print(df)
"""
Primary key id, PT resource id, source name, first-level title, second-level title, category id, category name
Seed status, status remaining time, seeding status, comment count, resource upload time, resource size

qnloft_db/db_config.py Normal file
View File

@ -0,0 +1,22 @@
mysql = {
"url": "127.0.0.1",
"port": "3306",
"username": "root",
"password": "qeadzc123",
"database": "qnloft_hospital",
}
# PT website data
pt_website_db = "pt_website.db"
# Sector-related data
stock_sector_db = "stock_sector.db"
# Per-stock daily bar data
stock_daily_db = "stock_daily.db"
# Minute-level bar data
stock_daily_freq_db = "stock_daily_freq.db"
# redis_info = ['10.10.XXX', XXX]
# mongo_info = ['10.10.XXX', XXX]
# es_info = ['10.10.XXX', XXX]
# sqlserver_info = ['10.10.XXX:XXX', 'XXX', 'XXX', 'XXX']
# db2_info = ['10.10.XXX', XXX, 'XXX', 'XXX']
# postgre_info = ['XXX', 'XXX', 'XXX', '10.10.XXX', XXX]
# ck_info = ['10.10.XXX', XXX]
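A short sketch of how these constants are consumed by the rest of this commit (SqliteDbMain takes one of the file names; MysqlDbMain reads the mysql dict):

from qnloft_db import db_config as config
from qnloft_db.sqlite_db_main import SqliteDbMain

pt_db = SqliteDbMain(config.pt_website_db)        # -> pt_website.db
sector_db = SqliteDbMain(config.stock_sector_db)  # -> stock_sector.db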

qnloft_db/db_main.py Normal file
View File

@ -0,0 +1,269 @@
import traceback
import pandas
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import *
from qnloft_db import db_config as config
class DbMain:
def __init__(self):
# clear_mappers()
self.inspector = None
self.session = None
self.engine = None
def get_session(self):
# Bind the engine
session_factory = sessionmaker(bind=self.engine)
# Create the session factory; scoped_session hands each thread its own connection object
# (isolation is handled internally via threading.local)
Session = scoped_session(session_factory)
return Session()
# ===================== insert / update methods =============================
def insert_all_entry(self, entries):
try:
self.create_table(entries)
self.session.add_all(entries)
self.session.commit()
except Exception as e:
print(e)
finally:
self.session.close()
def insert_entry(self, entry):
try:
self.create_table(entry)
self.session.add(entry)
self.session.commit()
except Exception as e:
trace = traceback.extract_tb(e.__traceback__)
for filename, lineno, funcname, source in trace:
print(f"在文件 {filename} 的第 {lineno} 行发生错误")
finally:
self.session.close()
def insert_or_update(self, entry, query_conditions):
"""
insert_or_update 的是用需要在 对象中新增to_dict方法将需要更新的字段转成字典
:param entry:
:param query_conditions:
:return:
"""
try:
self.create_table(entry)
conditions = " AND ".join(f"{key} = '{value}'" for key, value in query_conditions.items())
select_sql = text(f"select count(1) from {entry.__tablename__} where 1=1 and {conditions}")
result = self.session.execute(select_sql)
# If the query finds a matching row, perform an UPDATE
if result.scalar() > 0:
if hasattr(entry, 'to_dict'):
formatted_attributes = ", ".join([f'{attr} = "{value}"' for attr, value in entry.to_dict().items()])
update_sql = text(f"UPDATE {entry.__tablename__} SET {formatted_attributes} WHERE {conditions}")
self.session.execute(update_sql)
else:
raise Exception("对象 不包含 to_dict 方法,请添加!")
else:
# Otherwise perform an INSERT
self.insert_entry(entry)
self.session.commit()
except Exception as e:
trace = traceback.extract_tb(e.__traceback__)
for filename, lineno, funcname, source in trace:
print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}"
f"错误内容:{traceback.format_exc()}")
finally:
self.session.expire_all()
self.session.close()
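# Usage sketch (an assumption, not part of this commit): the model has to define
# to_dict itself, e.g. on PtWebsiteData:
#     def to_dict(self):
#         return {"second_title": self.second_title, "seed_count": self.seed_count}
# and the caller passes the columns that identify the row:
#     db_main.insert_or_update(entry, {"pt_id": entry.pt_id, "source_name": entry.source_name})
# Note that both the conditions and the to_dict values are interpolated directly
# into the SQL text, so they are expected to be trusted values.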
def create_table(self, entries):
if isinstance(entries, list):
table_name = entries[0].__tablename__
else:
table_name = entries.__tablename__
# Check whether the table exists; create it if it does not
if not self.inspector.has_table(table_name):
if isinstance(entries, list):
entries[0].metadata.create_all(self.engine)
else:
entries.metadata.create_all(self.engine)
def pandas_insert(self, data: pandas.DataFrame = None, table_name=""):
"""
Insert data supplied as a pandas DataFrame.
:param data:
:param table_name:
:return:
"""
try:
data.to_sql(table_name, con=self.engine.connect(), if_exists='append', index=True, index_label='id')
except Exception as e:
trace = traceback.extract_tb(e.__traceback__)
for filename, lineno, funcname, source in trace:
print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}"
f"错误内容:{traceback.format_exc()}")
finally:
self.session.close()
self.engine.dispose()
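# Usage sketch (table name is illustrative): if_exists='append' appends to the
# table, creating it from the DataFrame dtypes when it does not exist yet, and
# index_label='id' writes the DataFrame index into the id column.
#     df = pandas.DataFrame({"pt_id": [1, 2], "source_name": ["demo", "demo"]})
#     db_main.pandas_insert(data=df, table_name="pt_website_data")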
# ===================== select methods =============================
def query_by_id(self, model, db_id):
try:
return self.session.query(model).filter_by(id=db_id).all()
except Exception as e:
trace = traceback.extract_tb(e.__traceback__)
for filename, lineno, funcname, source in trace:
print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}"
f"错误内容:{traceback.format_exc()}")
finally:
self.session.close()
def pandas_query_by_model(self, model, order_col=None, page_number=None, page_size=None):
"""
使用pandas的sql引擎执行
:param page_size:每页的记录数
:param page_number:第N页
:param model:
:param order_col:
:return:
"""
try:
# Check whether the table exists
if self.has_table(model):
query = self.session.query(model)
if order_col is not None:
query = query.order_by(order_col)
if page_number is not None and page_size is not None:
offset = (page_number - 1) * page_size
query = query.offset(offset).limit(page_size)
return pandas.read_sql_query(query.statement, self.engine.connect())
return pandas.DataFrame()
except Exception as e:
trace = traceback.extract_tb(e.__traceback__)
for filename, lineno, funcname, source in trace:
print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}"
f"错误内容:{traceback.format_exc()}")
finally:
self.session.close()
self.engine.dispose()
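# Usage sketch: pagination is plain OFFSET/LIMIT, offset = (page_number - 1) * page_size,
# and an empty DataFrame comes back when the table does not exist yet, e.g.:
#     df = db_main.pandas_query_by_model(PtWebsiteData, order_col=PtWebsiteData.id,
#                                        page_number=2, page_size=50)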
def pandas_query_by_sql(self, stmt=""):
"""
使用pandas的sql引擎执行
:param stmt:
:return:
"""
try:
return pandas.read_sql_query(sql=stmt, con=self.engine.connect())
except Exception as e:
trace = traceback.extract_tb(e.__traceback__)
for filename, lineno, funcname, source in trace:
print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}"
f"错误内容:{traceback.format_exc()}")
finally:
self.session.close()
self.engine.dispose()
def pandas_query_by_condition(self, model, query_condition):
try:
# When filtering on multiple conditions, combine them with and_(), e.g.:
# query_condition = and_(
#     StockDaily.trade_date == '20230823',
#     StockDaily.symbol == 'ABC'
# )
query = self.session.query(model).filter(query_condition).order_by(model.id)
return self.pandas_query_by_sql(stmt=query.statement).reset_index()
except Exception as e:
trace = traceback.extract_tb(e.__traceback__)
for filename, lineno, funcname, source in trace:
print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}"
f"错误内容:{traceback.format_exc()}")
finally:
self.session.close()
# ===================== delete methods =============================
def delete_by_id(self, model, db_id):
try:
# Use delete() to remove the matching record
self.session.query(model).filter_by(id=db_id).delete()
self.session.commit()
except Exception as e:
trace = traceback.extract_tb(e.__traceback__)
for filename, lineno, funcname, source in trace:
print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}"
f"错误内容:{traceback.format_exc()}")
finally:
self.session.close()
def delete_by_condition(self, model, delete_condition):
try:
# Use delete() to remove every record matching the condition.
# Example: delete the records whose trade_date is '20230823':
# delete_condition = StockDaily.trade_date == '20230823'
# For multiple conditions, combine them with and_():
# delete_condition = and_(
#     StockDaily.trade_date == '20230823',
#     StockDaily.symbol == 'ABC'
# )
self.session.query(model).filter(delete_condition).delete()
self.session.commit()
except Exception as e:
trace = traceback.extract_tb(e.__traceback__)
for filename, lineno, funcname, source in trace:
print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}"
f"错误内容:{traceback.format_exc()}")
finally:
self.session.close()
def delete_all_table(self, model):  # Clear all rows in the table
try:
self.session.query(model).delete()
self.session.commit()
except Exception as e:
trace = traceback.extract_tb(e.__traceback__)
for filename, lineno, funcname, source in trace:
print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}"
f"错误内容:{traceback.format_exc()}")
finally:
self.session.close()
# ===================== other methods =============================
def has_table(self, entries):
if isinstance(entries, list):
table_name = entries[0].__tablename__
else:
table_name = entries.__tablename__
# Check whether the table exists
return self.inspector.has_table(table_name)
def execute_sql(self, s):
try:
sql_text = text(s)
return self.session.execute(sql_text)
except Exception as e:
trace = traceback.extract_tb(e.__traceback__)
for filename, lineno, funcname, source in trace:
print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}"
f"错误内容:{traceback.format_exc()}")
finally:
self.session.close()
def execute_sql_to_pandas(self, s):
try:
sql_text = text(s)
res = self.session.execute(sql_text)
return pandas.DataFrame(res.fetchall(), columns=res.keys())
except Exception as e:
trace = traceback.extract_tb(e.__traceback__)
for filename, lineno, funcname, source in trace:
print(f"在文件 {filename} 的第 {lineno} 行发生错误 ,方法名称:{funcname} 发生错误的源码: {source}"
f"错误内容:{traceback.format_exc()}")
finally:
self.session.close()
def close(self):
self.session.close()

View File

@ -0,0 +1,26 @@
from qnloft_db.db_main import *
def create_mysql_engine():
# Create the engine
return create_engine(
f"mysql+pymysql://{config.mysql['username']}:{config.mysql['password']}@{config.mysql['url']}:{config.mysql['port']}/{config.mysql['database']}?charset=utf8mb4",
# "mysql+pymysql://tom@127.0.0.1:3306/db1?charset=utf8mb4",  # when the account has no password
# Maximum number of connections allowed beyond the pool size
max_overflow=0,
# Connection pool size
pool_size=5,
# Seconds to wait for a free connection before raising an error
pool_timeout=10,
# Seconds after which pooled connections are recycled
pool_recycle=1,
# Echo the raw (unformatted) SQL statements
echo=True
)
class MysqlDbMain(DbMain):
def __init__(self):
super().__init__()
self.engine = create_mysql_engine()
self.session = self.get_session()
# Needed by create_table()/has_table(); without this self.inspector stays None
self.inspector = inspect(self.engine)
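A hedged usage sketch, assuming this module is importable as qnloft_db.mysql_db_main and the MySQL instance from db_config is reachable:

from qnloft_db.mysql_db_main import MysqlDbMain

db_main = MysqlDbMain()
df = db_main.pandas_query_by_sql("SELECT 1 AS ok")
print(df)
db_main.close()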

View File

@ -0,0 +1,44 @@
from pathlib import Path
from qnloft_db.db_main import *
import platform
class SqliteDbMain(DbMain):
def __init__(self, database_name):
self.database_name = database_name
super().__init__()
self.engine = self.__create_sqlite_engine()
self.engine_path = self.__get_path()
self.session = self.get_session()
self.inspector = inspect(self.engine)
def __get_path(self):
sys_platform = platform.platform().lower()
print(f'当前操作系统:{platform.platform()}')
__engine = ''
if 'windows' in sys_platform.lower():
__engine = f"E:\\sqlite_db\\stock_db\\{self.database_name}"
elif 'macos' in sys_platform.lower():
__engine = f"/Users/renmeng/Documents/sqlite_db/{self.database_name}"
else:
__engine = f"{self.database_name}"
return __engine
def __create_sqlite_engine(self):
sys_platform = platform.platform().lower()
__engine = ''
if 'windows' in sys_platform.lower():
__engine = f"sqlite:///E:\\sqlite_db\\stock_db\\{self.database_name}"
elif 'macos' in sys_platform.lower():
__engine = f"sqlite:////Users/renmeng/Documents/sqlite_db/{self.database_name}"
else:
__engine = f"sqlite:///{self.database_name}"
print(f"当前__engine是{__engine}")
return create_engine(__engine, pool_size=10, pool_timeout=10, echo=True)
def get_db_size(self):
file_size = Path(self.engine_path).stat().st_size
total = f"{file_size / (1024 * 1024):.2f} MB"
print(f"文件大小: {file_size / (1024 * 1024):.2f} MB")
return total
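A usage sketch; on systems other than the two hard-coded Windows/macOS paths the database file is created in the working directory:

from qnloft_db import db_config as config
from qnloft_db.sqlite_db_main import SqliteDbMain

db_main = SqliteDbMain(config.pt_website_db)
# once the file exists on disk (after the first insert), report its size
print(db_main.get_db_size())
db_main.close()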

View File

@ -0,0 +1,54 @@
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String, Float, UniqueConstraint
class PtWebsiteData(declarative_base()):
__tablename__ = 'pt_website_data'
id = Column(Integer, primary_key=True)
# PT resource id
pt_id = Column(Integer, nullable=False)
# Source (site) name
source_name = Column(String, nullable=False)
# First-level (primary) title
first_title = Column(String, nullable=False)
# Second-level (sub) title
second_title = Column(String)
# Category id
type_id = Column(Integer)
# Category name
type_name = Column(String)
# Seed status
seed_status = Column(String)
# Remaining time of the current status
status_remaining_time = Column(String)
# Seeding status
seeding_status = Column(String)
# Comment count
comment_count = Column(Integer)
# Upload time of the resource
upload_time = Column(String)
# Resource size
size = Column(String)
# Seeder count
seed_count = Column(Integer)
# Downloader count
download_count = Column(Integer)
# Completion (snatched) count
completion_count = Column(Integer)
# Publisher
publisher = Column(String)
# Douban rating
douban_rating = Column(Float)
# IMDb rating
imdb_rating = Column(Float)
# Download link
download_link = Column(String)
# Details link
details_link = Column(String)
# Unique constraints
__table_args__ = (
UniqueConstraint('pt_id', 'source_name', name='uq_pt_id_source_name'),
UniqueConstraint('source_name', 'first_title', 'second_title', name='uq_source_titles'),
)
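To close the loop with db_main.py, a sketch of querying this model through the helpers above; the values are hypothetical, and (pt_id, source_name) identifies a single row per the first constraint:

from sqlalchemy import and_
from qnloft_db import db_config as config
from qnloft_db.sqlite_db_main import SqliteDbMain
from qnloft_db_model.PtWebsiteData import PtWebsiteData

db_main = SqliteDbMain(config.pt_website_db)
condition = and_(PtWebsiteData.pt_id == 12345,
                 PtWebsiteData.source_name == "demo_site")
df = db_main.pandas_query_by_condition(PtWebsiteData, condition)
print(df)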