125 lines
3.9 KiB
Python
125 lines
3.9 KiB
Python
from utils.comm import *
|
||
from snownlp import SnowNLP
|
||
import jieba
|
||
import jieba.posseg as pseg
|
||
import jieba.analyse
|
||
|
||
|
||
class News:
    """Fetch financial news and run lightweight Chinese NLP over the text.

    The NLP helpers are thin wrappers around ``jieba`` and ``SnowNLP``; the
    fetchers wrap ``akshare`` endpoints (``ak``) with a simple retry loop.
    """

    # jieba part-of-speech flags kept as named entities:
    # person names, place names, organization names, other proper nouns.
    _ENTITY_FLAGS = frozenset({'nr', 'ns', 'nt', 'nz'})

    # Retry policy shared by the remote fetchers.
    MAX_RETRIES = 5
    RETRY_DELAY_SECONDS = 1

    def __init__(self, trade_date=None):
        """Store the trade date as a ``YYYYMMDD`` string.

        Args:
            trade_date: Any object with ``strftime`` (e.g. ``datetime``).
                Defaults to the current time, resolved on every call rather
                than once at definition time.
        """
        if trade_date is None:
            trade_date = datetime.now()
        self.trade_date = trade_date.strftime('%Y%m%d')

    @staticmethod
    def preprocess(text):
        """Tokenize ``text`` with jieba and return the list of tokens."""
        return jieba.lcut(text)

    @staticmethod
    def sentiment_analysis(text):
        """Return the SnowNLP sentiment score of ``text``.

        The score lies in the range 0-1; values near 1 indicate positive
        sentiment and values near 0 indicate negative sentiment.
        """
        return SnowNLP(text).sentiments

    @staticmethod
    def extract_keywords(text, top_k=5):
        """Return the top ``top_k`` keywords of ``text`` (no weights, no POS filter)."""
        return jieba.analyse.extract_tags(text, topK=top_k, withWeight=False, allowPOS=())

    @staticmethod
    def named_entity_recognition(text):
        """Return ``(word, flag)`` pairs for tokens tagged as named entities.

        A token is kept when its jieba POS flag is one of ``nr``, ``ns``,
        ``nt`` or ``nz``.
        """
        return [
            (word, flag)
            for word, flag in pseg.cut(text)
            if flag in News._ENTITY_FLAGS
        ]

    def result(self, news):
        """Print the tokens, sentiment score, keywords and entities of ``news``."""
        print("原文:", news)
        preprocessed_news = self.preprocess(news)
        sentiment_score = self.sentiment_analysis(news)
        keywords = self.extract_keywords(news, top_k=3)
        entities = self.named_entity_recognition(news)

        print("分词结果:", preprocessed_news)
        print("情感分析得分:", round(sentiment_score, 2))
        print("关键词提取:", keywords)
        print("命名实体识别:", entities)

    def news_cctv(self):
        """Fetch CCTV news for ``self.trade_date``.

        Returns:
            Whatever ``ak.news_cctv`` returns, or ``None`` when every one of
            the ``MAX_RETRIES`` attempts raised.
        """
        for _ in range(self.MAX_RETRIES):
            try:
                # TODO: run sentiment analysis over the fetched titles.
                return ak.news_cctv(date=self.trade_date)
            except Exception as e:
                # Deliberate best-effort: any failure is reported and retried.
                # Print the exception itself; a traceback object's repr
                # carries no diagnostic information.
                print(f"{self.trade_date} 日的新闻咨询拉取发生错误!{e!r}")
                time.sleep(self.RETRY_DELAY_SECONDS)
        print(f"中央新闻 {self.MAX_RETRIES}次出现错误,请关注!!!")
        return None

    def news_cls(self):
        """Fetch the CLS (财联社) telegraph feed.

        Returns:
            Whatever ``ak.stock_telegraph_cls`` returns, or ``None`` when
            every one of the ``MAX_RETRIES`` attempts raised.
        """
        for _ in range(self.MAX_RETRIES):
            try:
                # TODO: run sentiment analysis over the fetched titles.
                return ak.stock_telegraph_cls()
            except Exception as e:
                print(f"{self.trade_date} 财联社-电报 咨询拉取发生错误!{e!r}")
                # Pause between attempts, matching news_cctv, instead of
                # retrying immediately.
                time.sleep(self.RETRY_DELAY_SECONDS)
        print(f"财联社-电报 {self.MAX_RETRIES}次出现错误,请关注!!!")
        return None

    @staticmethod
    def _parse_news_date(date_time):
        """Convert ``date_time`` into a ``date`` object usable as a filter.

        Accepts ``None`` (today), a ``datetime``, or anything whose string
        form is ``YYYYMMDD`` or ``YYYY-MM-DD``.
        """
        if date_time is None:
            return datetime.now().date()
        if isinstance(date_time, datetime):
            return date_time.date()
        text = str(date_time)
        fmt = '%Y-%m-%d' if '-' in text else '%Y%m%d'
        return datetime.strptime(text, fmt).date()

    @staticmethod
    def news_stock_by_code(symbol, date_time=None):
        """Fetch news for one stock and keep the items published on a given day.

        Args:
            symbol: Stock code passed through to ``ak.stock_news_em``.
            date_time: Day to keep, as ``YYYYMMDD`` / ``YYYY-MM-DD`` text or
                a ``datetime``; ``None`` means today.

        Returns:
            The rows of the fetched frame whose ``发布时间`` falls on that day.
        """
        stock_news_em_df = ak.stock_news_em(symbol=symbol)
        # Parse the publish-time column so it can be compared by calendar day.
        stock_news_em_df['发布时间'] = pd.to_datetime(stock_news_em_df['发布时间'])

        # The filter value must be a date object: ``.dt.date`` yields date
        # objects, and comparing those with a string never matches.
        target_date = News._parse_news_date(date_time)
        today_news = stock_news_em_df[stock_news_em_df['发布时间'].dt.date == target_date]

        print(today_news['新闻标题'])
        print(today_news["新闻内容"])
        return today_news

    @staticmethod
    def _render_rows(df, column, min_length=0):
        """Render one ``<tr>`` per value of ``df[column]`` and join them.

        Args:
            df: Frame holding the titles, or ``None`` when the fetch failed.
            column: Name of the title column.
            min_length: When positive, values that are not strings or are
                shorter than this are skipped.

        Returns:
            The concatenated HTML rows; an empty string for a missing or
            empty frame.
        """
        import html  # local import: only needed for escaping here

        if df is None or df.empty:
            return ''
        rows = []
        for title in df[column]:
            if min_length and (not isinstance(title, str) or len(title) < min_length):
                continue
            # Titles come from external feeds, so escape them before
            # embedding them in markup.
            cell = html.escape(str(title), quote=False)
            rows.append(f'<tr><th scope="row">{cell}</th><td>开发中...</td></tr>')
        return ''.join(rows)

    def html_page_data(self):
        """Build the HTML table rows for the CCTV and CLS news sections.

        Returns:
            A ``(cctv_content, cls_content)`` tuple of HTML strings. A
            section whose fetch failed is rendered as an empty string.
        """
        cctv_content = self._render_rows(self.news_cctv(), "title")
        # CLS entries are kept only when the title is longer than one character.
        cls_content = self._render_rows(self.news_cls(), "标题", min_length=2)
        return cctv_content, cls_content
|
||
|
||
|
||
if __name__ == '__main__':
    # Manual smoke test: fetch and print the CCTV news for today.
    # To query a specific day instead, pass a datetime, e.g.
    # News(datetime.strptime('20231019', "%Y%m%d")).
    print(News().news_cctv())
|