from utils.comm import *
from snownlp import SnowNLP
import jieba
import jieba.posseg as pseg
import jieba.analyse


class News:
    """Fetch news feeds and run simple Chinese-text NLP helpers on them.

    ``datetime``, ``time``, ``pd`` and ``ak`` are not imported by name in this
    file; they are expected to be provided by the ``utils.comm`` wildcard
    import (``ak`` is presumably akshare -- confirm).
    """

    def __init__(self, trade_date=None):
        """Store the trade date as a ``YYYYMMDD`` string.

        Args:
            trade_date: A ``datetime`` to format. When ``None`` (the default)
                the current time is used. The default is resolved on every
                call; a ``datetime.now()`` default in the signature would be
                evaluated only once, when the class is defined.
        """
        if trade_date is None:
            trade_date = datetime.now()
        self.trade_date = trade_date.strftime('%Y%m%d')

    @staticmethod
    def preprocess(text):
        """Segment ``text`` with ``jieba.lcut`` and return the token list."""
        return jieba.lcut(text)

    @staticmethod
    def sentiment_analysis(text):
        """Return the SnowNLP sentiment score of ``text``.

        The score ranges over 0-1: closer to 1 means positive sentiment,
        closer to 0 means negative sentiment.
        """
        return SnowNLP(text).sentiments

    @staticmethod
    def extract_keywords(text, top_k=5):
        """Return the ``top_k`` keywords of ``text`` (five by default).

        Uses ``jieba.analyse.extract_tags`` without weights and without a
        part-of-speech filter.
        """
        return jieba.analyse.extract_tags(
            text, topK=top_k, withWeight=False, allowPOS=()
        )

    @staticmethod
    def named_entity_recognition(text):
        """Return ``(word, flag)`` pairs for the proper-noun tokens of ``text``.

        Tokens come from ``jieba.posseg`` and are kept when their flag is
        ``nr`` (person), ``ns`` (place), ``nt`` (organisation) or ``nz``
        (other proper noun).
        """
        entity_flags = ('nr', 'ns', 'nt', 'nz')
        return [
            (word, flag)
            for word, flag in pseg.cut(text)
            if flag in entity_flags
        ]

    def result(self, news):
        """Print the tokens, sentiment score, keywords and entities of ``news``.

        Args:
            news: The raw news text to analyse.
        """
        print("原文:", news)
        preprocessed_news = self.preprocess(news)
        sentiment_score = self.sentiment_analysis(news)
        keywords = self.extract_keywords(news, top_k=3)
        entities = self.named_entity_recognition(news)
        print("分词结果:", preprocessed_news)
        print("情感分析得分:", round(sentiment_score, 2))
        print("关键词提取:", keywords)
        print("命名实体识别:", entities)

    def news_cctv(self):
        """Fetch the CCTV news feed for ``self.trade_date``, retrying on error.

        Returns:
            The value returned by ``ak.news_cctv`` on the first successful
            attempt, or ``None`` after five failed attempts.
        """
        for _ in range(5):
            try:
                return ak.news_cctv(date=self.trade_date)
            except Exception as e:
                # Deliberate best-effort boundary: report and retry. Print the
                # exception itself -- a traceback object interpolated into a
                # string only shows "<traceback object at 0x...>".
                print(f"{self.trade_date} 日的新闻咨询拉取发生错误!{e!r}")
                time.sleep(1)
        print("中央新闻 5次出现错误,请关注!!!")
        return None

    def news_cls(self):
        """Fetch the CLS telegraph feed, retrying on error.

        Returns:
            The value returned by ``ak.stock_telegraph_cls`` on the first
            successful attempt, or ``None`` after five failed attempts.
        """
        for _ in range(5):
            try:
                return ak.stock_telegraph_cls()
            except Exception as e:
                # Same best-effort retry as news_cctv, including the pause
                # between attempts so a failing endpoint is not hammered.
                print(f"{self.trade_date} 财联社-电报 咨询拉取发生错误!{e!r}")
                time.sleep(1)
        print("财联社-电报 5次出现错误,请关注!!!")
        return None

    @staticmethod
    def news_stock_by_code(symbol, date_time=None):
        # Per-stock news
        stock_news_em_df = ak.stock_news_em(symbol=symbol)
        # NOTE(review): when date_time is given, the filter further down
        # compares `.dt.date` values against a formatted *string*, which can
        # never match -- confirm and compare against a `date` object instead.
        # Convert the publish-time column to a datetime type
stock_news_em_df['发布时间'] = pd.to_datetime(stock_news_em_df['发布时间']) # print(stock_news_em_df) if date_time is not None: if len(date_time.split("-")) == 1: str_date = datetime.strptime(str(date_time), '%Y%m%d').strftime('%Y-%m-%d') else: str_date = datetime.strptime(str(date_time), '%Y-%m-%d').strftime('%Y-%m-%d') else: # 获取今天的日期 str_date = datetime.now().date() # 使用日期过滤器筛选出今天的新闻 today_news = stock_news_em_df[stock_news_em_df['发布时间'].dt.date == str_date] # 打印今天的新闻 print(today_news['新闻标题']) print(today_news["新闻内容"]) # today_news['新闻标题'].apply(lambda x: result(x)) return today_news def html_page_data(self): cctv_df = self.news_cctv() cls_df = self.news_cls() cctv_content, cls_content = '', '' for index, row in cctv_df.iterrows(): cctv_content += f'