Files
hot-news-api/app/api/v1/analysis.py
2026-03-26 15:42:37 +08:00

705 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import APIRouter, Query
from typing import Optional
from datetime import datetime
import pytz
from app.analysis.trend_analyzer import TrendAnalyzer
from app.analysis.predictor import TrendPredictor
from app.utils.logger import log
from app.core import cache
router = APIRouter()
@router.get(
"/trend",
summary="热点聚合分析",
description="分析各平台热点数据的共性和差异,提取共同关键词、跨平台热点话题等",
response_description="返回热点分析结果",
responses={
200: {"description": "成功获取热点聚合分析"},
500: {"description": "分析过程出错"}
}
)
async def get_trend_analysis(
date: Optional[str] = Query(
default=None,
description="指定日期,格式为 YYYY-MM-DD默认为当天",
example="2024-01-15"
),
type: str = Query(
default="main",
description="分析类型main(主题分析), platform(平台对比), cross(跨平台热点), advanced(高级分析)",
enum=["main", "platform", "cross", "advanced"]
)
):
"""
**热点聚合分析**
对多个平台的热点数据进行聚合分析,识别共同话题和传播趋势。
**分析类型说明:**
- `main`: 主题分析 - 提取核心热点话题和关键词
- `platform`: 平台对比 - 比较不同平台的特点和差异
- `cross`: 跨平台热点 - 识别在多个平台同时出现的话题
- `advanced`: 高级分析 - 提供更深层次的数据洞察
**应用场景:**
- 舆情监控
- 热点追踪
- 内容运营决策
- 市场趋势分析
"""
try:
if not date:
date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d")
# 从缓存中获取数据
cache_key = f"analysis:trend:{date}:{type}"
cached_data = cache.get_cache(cache_key)
if cached_data:
log.info(f"Retrieved trend analysis from cache for {date}, type: {type}")
return cached_data
# 如果缓存中没有,则生成新的分析数据
analyzer = TrendAnalyzer()
result = analyzer.get_analysis(date, type)
return result
except Exception as e:
log.error(f"Error in trend analysis: {e}")
return {
"status": "error",
"message": str(e),
"date": date or datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d")
}
@router.get(
"/platform-comparison",
summary="平台对比分析",
description="分析各平台热点数据的特点、热度排行、更新频率等,比较不同平台间的异同",
response_description="返回平台对比分析结果",
responses={
200: {"description": "成功获取平台对比分析"}
}
)
async def get_platform_comparison(
date: Optional[str] = Query(
default=None,
description="指定日期,格式为 YYYY-MM-DD默认为当天",
example="2024-01-15"
)
):
"""
**平台对比分析**
对比分析不同平台的特点、热度分布和内容特征。
**分析维度:**
- 数据统计:各平台新闻数量、平均热度
- 更新频率:数据更新速度
- 热度排行:按平台热度排序
- 特征分析:各平台的内容特点
**适用场景:**
- 平台选择决策
- 投放策略制定
- 用户画像分析
"""
try:
if not date:
date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d")
# 从缓存中获取数据
cache_key = f"analysis:trend:{date}:platform_comparison"
cached_data = cache.get_cache(cache_key)
if cached_data:
log.info(f"Retrieved platform comparison from cache for {date}")
return cached_data
# 如果缓存中没有,则生成新的分析数据
analyzer = TrendAnalyzer()
result = analyzer.get_platform_comparison(date)
return result
except Exception as e:
log.error(f"Error in platform comparison: {e}")
return {
"status": "error",
"message": str(e),
"date": date or datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d")
}
@router.get(
"/cross-platform",
summary="跨平台热点分析",
description="分析在多个平台上出现的热点话题,以及热点的传播路径",
response_description="返回跨平台热点分析结果",
responses={
200: {
"description": "成功获取跨平台热点分析",
"content": {
"application/json": {
"example": {
"status": "success",
"date": "2024-01-15",
"cross_platform_topics": [
{
"topic": "某明星事件",
"platforms": ["weibo", "douyin", "baidu"],
"first_platform": "weibo",
"spread_path": ["weibo", "zhihu", "baidu", "douyin"],
"heat_trend": "rising"
}
],
"topic_count": 15,
"avg_platforms_per_topic": 3.5
}
}
}
}
}
)
async def get_cross_platform_analysis(
date: Optional[str] = Query(
default=None,
description="指定日期,格式为 YYYY-MM-DD默认为当天",
example="2024-01-15"
),
refresh: bool = Query(
default=False,
description="是否强制刷新缓存"
)
):
"""
**跨平台热点分析**
识别在多个平台同时出现的热点话题,分析传播路径和发展趋势。
**分析内容:**
- 跨平台话题识别
- 首发平台判断
- 传播路径追踪
- 热度趋势分析
**价值:**
- 发现全网热点
- 追踪舆情走向
- 把握传播规律
"""
try:
if not date:
date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d")
# 从缓存中获取数据
cache_key = f"analysis:trend:{date}:cross_platform"
# 如果不是强制刷新,尝试从缓存获取
if not refresh:
cached_data = cache.get_cache(cache_key)
if cached_data:
log.info(f"Retrieved cross platform analysis from cache for {date}")
return cached_data
# 如果缓存中没有或需要刷新,则生成新的分析数据
analyzer = TrendAnalyzer()
result = analyzer.get_cross_platform_analysis(date, refresh)
return result
except Exception as e:
log.error(f"Error in cross platform analysis: {e}")
return {
"status": "error",
"message": str(e),
"date": date or datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d")
}
@router.get(
"/advanced",
summary="高级分析",
description="提供更深入的热点分析,包括关键词云图、情感分析、热点演变趋势等",
response_description="返回高级分析结果",
responses={
200: {
"description": "成功获取高级分析",
"content": {
"application/json": {
"example": {
"status": "success",
"date": "2024-01-15",
"keyword_cloud": [{"word": "AI", "weight": 95}],
"sentiment_analysis": {
"positive": 45,
"neutral": 40,
"negative": 15
},
"evolution_trend": [
{"hour": "00:00", "heat": 5000},
{"hour": "12:00", "heat": 8500}
]
}
}
}
}
}
)
async def get_advanced_analysis(
date: Optional[str] = Query(
default=None,
description="指定日期,格式为 YYYY-MM-DD默认为当天",
example="2024-01-15"
),
refresh: bool = Query(
default=False,
description="是否强制刷新缓存"
)
):
"""
**高级分析**
提供深度的数据分析功能,包括词云、情感分析和演变趋势。
**分析模块:**
- 关键词云图:可视化展示高频词汇
- 情感分析:正/中/负向情感分布
- 演变趋势:时间维度上的热度变化
- 深度洞察:数据背后的规律
**适用场景:**
- 深度报告生成
- 趋势研究
- 舆情分析
"""
try:
if not date:
date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d")
# 从缓存中获取数据
cache_key = f"analysis:trend:{date}:advanced_analysis"
# 如果不是强制刷新,尝试从缓存获取
if not refresh:
cached_data = cache.get_cache(cache_key)
if cached_data:
log.info(f"Retrieved advanced analysis from cache for {date}")
return cached_data
# 如果缓存中没有或需要刷新,则生成新的分析数据
analyzer = TrendAnalyzer()
result = analyzer.get_advanced_analysis(date, refresh)
return result
except Exception as e:
log.error(f"Error in advanced analysis: {e}")
return {
"status": "error",
"message": str(e),
"date": date or datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d")
}
@router.get(
"/prediction",
summary="热点趋势预测",
description="基于历史数据预测热点话题的发展趋势,包括上升趋势、下降趋势、持续热门话题等",
response_description="返回热点预测结果",
responses={
200: {
"description": "成功获取热点预测",
"content": {
"application/json": {
"example": {
"status": "success",
"date": "2024-01-15",
"rising_topics": [
{"topic": "新技术发布", "trend": "rising", "confidence": 0.85}
],
"declining_topics": [
{"topic": "旧闻", "trend": "declining", "confidence": 0.75}
],
"sustained_hot_topics": [
{"topic": "持续热点", "trend": "stable", "duration": "3 天"}
]
}
}
}
}
}
)
async def get_trend_prediction(
date: Optional[str] = Query(
default=None,
description="指定日期,格式为 YYYY-MM-DD默认为当天",
example="2024-01-15"
)
):
"""
**热点趋势预测**
基于历史数据和算法模型预测热点话题的未来发展趋势。
**预测类型:**
- 上升趋势:热度正在上涨的话题
- 下降趋势:热度逐渐消退的话题
- 持续热门:长期保持高热度的话题
**输出指标:**
- 趋势方向rising/declining/stable
- 置信度:预测的可信程度 (0-1)
- 持续时间:话题预计持续的时间
**应用价值:**
- 提前布局内容
- 把握流量机会
- 规避过时话题
"""
try:
if not date:
date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d")
# 从缓存中获取数据
cache_key = f"analysis:prediction:{date}"
cached_data = cache.get_cache(cache_key)
if cached_data:
log.info(f"Retrieved trend prediction from cache for {date}")
return cached_data
# 如果缓存中没有,则生成新的预测数据
predictor = TrendPredictor()
result = predictor.get_prediction(date)
return result
except Exception as e:
log.error(f"Error in trend prediction: {e}")
return {
"status": "error",
"message": str(e),
"date": date or datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d")
}
@router.get(
"/keyword-cloud",
summary="关键词云图",
description="提取热点数据中的关键词,按不同类别(科技、娱乐、社会等)进行分类,用于生成词云",
response_description="返回关键词云图数据",
responses={
200: {
"description": "成功获取关键词云图",
"content": {
"application/json": {
"example": {
"status": "success",
"date": "2024-01-15",
"keyword_clouds": {
"科技": [
{"word": "人工智能", "weight": 95},
{"word": "大模型", "weight": 88}
],
"娱乐": [
{"word": "电影", "weight": 75},
{"word": "明星", "weight": 70}
]
},
"total_keywords": 200
}
}
}
}
}
)
async def get_keyword_cloud(
date: Optional[str] = Query(
default=None,
description="指定日期,格式为 YYYY-MM-DD默认为当天",
example="2024-01-15"
),
refresh: bool = Query(
default=False,
description="是否强制刷新缓存"
),
platforms: Optional[str] = Query(
default=None,
description="指定平台,多个平台用逗号分隔,如 baidu,weibo",
example="baidu,weibo"
),
category: Optional[str] = Query(
default=None,
description="指定分类,如科技、娱乐等",
example="科技"
),
keyword_count: int = Query(
default=200,
ge=50,
le=1000,
description="返回的关键词数量,范围 50-1000"
)
):
"""
**关键词云图**
从热点数据中提取高频关键词,并按类别分类,可用于可视化展示。
**参数说明:**
- `platforms`: 限定分析的平台范围
- `category`: 筛选特定分类的关键词
- `keyword_count`: 返回的关键词数量
**分类体系:**
- 科技互联网、数码、AI 等
- 娱乐:影视、明星、综艺等
- 社会:民生、时事等
- 体育:赛事、运动员等
- 财经:经济、金融、股市等
**数据格式:**
每个关键词包含:
- `word`: 词语本身
- `weight`: 权重值(基于词频和重要性)
**应用场景:**
- 数据可视化
- 内容标签生成
- 主题挖掘
"""
try:
if not date:
date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d")
# 从缓存中获取数据
cache_key = f"analysis:keyword_cloud:{date}"
# 如果不是强制刷新,尝试从缓存获取
if not refresh:
cached_data = cache.get_cache(cache_key)
if cached_data:
log.info(f"Retrieved keyword cloud from cache for {date}")
# 如果指定了分类,过滤结果
if category and cached_data.get("status") == "success" and "keyword_clouds" in cached_data:
if category in cached_data["keyword_clouds"]:
filtered_data = cached_data.copy()
filtered_data["keyword_clouds"] = {category: cached_data["keyword_clouds"][category]}
return filtered_data
return cached_data
# 如果缓存中没有或需要刷新,则生成新的关键词云数据
analyzer = TrendAnalyzer()
result = analyzer.get_keyword_cloud(date, refresh, keyword_count)
return result
except Exception as e:
log.error(f"Error in keyword cloud analysis: {e}")
return {
"status": "error",
"message": str(e),
"date": date or datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d")
}
@router.get(
"/data-visualization",
summary="数据可视化分析",
description="提供热点数据的可视化分析,包括主题热度分布图",
response_description="返回数据可视化分析结果",
responses={
200: {
"description": "成功获取数据可视化分析",
"content": {
"application/json": {
"example": {
"status": "success",
"date": "2024-01-15",
"theme_distribution": {
"科技": 35,
"娱乐": 25,
"社会": 20,
"财经": 15,
"体育": 5
},
"platform_heatmap": {
"weibo": {"morning": 80, "afternoon": 95, "evening": 100},
"zhihu": {"morning": 60, "afternoon": 75, "evening": 85}
},
"charts_data": {...}
}
}
}
}
}
)
async def get_data_visualization(
date: Optional[str] = Query(
default=None,
description="指定日期,格式为 YYYY-MM-DD默认为当天",
example="2024-01-15"
),
refresh: bool = Query(
default=False,
description="是否强制刷新缓存"
),
platforms: str = Query(
default=None,
description="指定要分析的平台,多个平台用逗号分隔",
example="baidu,weibo,douyin"
)
):
"""
**数据可视化分析**
提供用于前端可视化的结构化数据分析结果。
**可视化类型:**
- 主题热度分布图:饼图/柱状图数据
- 平台热力图:时间段热度对比
- 趋势线图:时间序列数据
- 排行榜TOP N 数据
**输出格式:**
数据格式适合常见图表库ECharts、Chart.js 等)直接使用。
**支持的平台筛选:**
可以指定部分平台进行分析,减少数据量。
**典型应用:**
- Dashboard 展示
- 数据报表
- 实时监控大屏
"""
try:
if not date:
date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d")
# 从缓存中获取数据
cache_key = f"analysis:data_visualization:{date}"
# 如果不是强制刷新,尝试从缓存获取
if not refresh:
cached_data = cache.get_cache(cache_key)
if cached_data:
log.info(f"Retrieved data visualization from cache for {date}")
return cached_data
# 解析平台参数
platform_list = None
if platforms:
platform_list = [p.strip() for p in platforms.split(",") if p.strip()]
# 如果缓存中没有或需要刷新,则生成新的可视化数据
analyzer = TrendAnalyzer()
result = analyzer.get_data_visualization(date, refresh, platform_list)
return result
except Exception as e:
log.error(f"Error in data visualization: {e}")
return {
"status": "error",
"message": str(e),
"date": date or datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d")
}
@router.get(
"/trend-forecast",
summary="热点趋势预测分析",
description="分析热点话题的演变趋势,预测热点的发展方向",
response_description="返回热点趋势预测分析结果",
responses={
200: {
"description": "成功获取热点趋势预测",
"content": {
"application/json": {
"example": {
"status": "success",
"date": "2024-01-15",
"time_range": "24h",
"forecast": [
{
"topic": "热门话题 A",
"current_heat": 8500,
"predicted_heat": 9500,
"trend": "rising",
"confidence": 0.82
},
{
"topic": "热门话题 B",
"current_heat": 6000,
"predicted_heat": 4500,
"trend": "declining",
"confidence": 0.75
}
],
"key_insights": "预计未来 24 小时内,科技类话题将持续升温..."
}
}
}
}
}
)
async def get_trend_forecast(
date: Optional[str] = Query(
default=None,
description="指定日期,格式为 YYYY-MM-DD默认为当天",
example="2024-01-15"
),
refresh: bool = Query(
default=False,
description="是否强制刷新缓存"
),
time_range: str = Query(
default="24h",
description="预测时间范围24h(24 小时), 7d(7 天), 30d(30 天)",
enum=["24h", "7d", "30d"]
)
):
"""
**热点趋势预测分析**
基于时间序列分析预测热点话题的未来发展趋势。
**预测时间范围:**
- `24h`: 短期预测,适合快速变化的话题
- `7d`: 中期预测,适合持续性话题
- `30d`: 长期预测,适合重大事件
**输出内容:**
- 当前热度值
- 预测热度值
- 趋势方向(上升/下降/平稳)
- 置信度评分
- 关键洞察总结
**预测维度:**
- 整体热度走势
- 分话题发展趋势
- 平台表现预测
- 新兴话题发现
**业务价值:**
- 内容策划参考
- 资源投放决策
- 风险预警
"""
try:
if not date:
date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d")
# 验证时间范围参数
valid_time_ranges = ["24h", "7d", "30d"]
if time_range not in valid_time_ranges:
time_range = "24h" # 默认使用 24 小时
# 从缓存中获取数据
cache_key = f"analysis:trend_forecast:{date}:{time_range}"
# 如果不是强制刷新,尝试从缓存获取
if not refresh:
cached_data = cache.get_cache(cache_key)
if cached_data:
log.info(f"Retrieved trend forecast from cache for {date}, time_range: {time_range}")
return cached_data
# 如果缓存中没有或需要刷新,则生成新的趋势预测数据
analyzer = TrendAnalyzer()
result = analyzer.get_trend_forecast(date, refresh, time_range)
return result
except Exception as e:
log.error(f"Error in trend forecast: {e}")
return {
"status": "error",
"message": str(e),
"date": date or datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d"),
"time_range": time_range
}