from fastapi import APIRouter, Query from typing import Optional from datetime import datetime import pytz from app.analysis.trend_analyzer import TrendAnalyzer from app.analysis.predictor import TrendPredictor from app.utils.logger import log from app.core import cache router = APIRouter() @router.get( "/trend", summary="热点聚合分析", description="分析各平台热点数据的共性和差异,提取共同关键词、跨平台热点话题等", response_description="返回热点分析结果", responses={ 200: {"description": "成功获取热点聚合分析"}, 500: {"description": "分析过程出错"} } ) async def get_trend_analysis( date: Optional[str] = Query( default=None, description="指定日期,格式为 YYYY-MM-DD,默认为当天", example="2024-01-15" ), type: str = Query( default="main", description="分析类型:main(主题分析), platform(平台对比), cross(跨平台热点), advanced(高级分析)", enum=["main", "platform", "cross", "advanced"] ) ): """ **热点聚合分析** 对多个平台的热点数据进行聚合分析,识别共同话题和传播趋势。 **分析类型说明:** - `main`: 主题分析 - 提取核心热点话题和关键词 - `platform`: 平台对比 - 比较不同平台的特点和差异 - `cross`: 跨平台热点 - 识别在多个平台同时出现的话题 - `advanced`: 高级分析 - 提供更深层次的数据洞察 **应用场景:** - 舆情监控 - 热点追踪 - 内容运营决策 - 市场趋势分析 """ try: if not date: date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d") # 从缓存中获取数据 cache_key = f"analysis:trend:{date}:{type}" cached_data = cache.get_cache(cache_key) if cached_data: log.info(f"Retrieved trend analysis from cache for {date}, type: {type}") return cached_data # 如果缓存中没有,则生成新的分析数据 analyzer = TrendAnalyzer() result = analyzer.get_analysis(date, type) return result except Exception as e: log.error(f"Error in trend analysis: {e}") return { "status": "error", "message": str(e), "date": date or datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d") } @router.get( "/platform-comparison", summary="平台对比分析", description="分析各平台热点数据的特点、热度排行、更新频率等,比较不同平台间的异同", response_description="返回平台对比分析结果", responses={ 200: {"description": "成功获取平台对比分析"} } ) async def get_platform_comparison( date: Optional[str] = Query( default=None, description="指定日期,格式为 YYYY-MM-DD,默认为当天", example="2024-01-15" ) ): """ **平台对比分析** 对比分析不同平台的特点、热度分布和内容特征。 **分析维度:** - 数据统计:各平台新闻数量、平均热度 - 更新频率:数据更新速度 - 热度排行:按平台热度排序 - 特征分析:各平台的内容特点 **适用场景:** - 平台选择决策 - 投放策略制定 - 用户画像分析 """ try: if not date: date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d") # 从缓存中获取数据 cache_key = f"analysis:trend:{date}:platform_comparison" cached_data = cache.get_cache(cache_key) if cached_data: log.info(f"Retrieved platform comparison from cache for {date}") return cached_data # 如果缓存中没有,则生成新的分析数据 analyzer = TrendAnalyzer() result = analyzer.get_platform_comparison(date) return result except Exception as e: log.error(f"Error in platform comparison: {e}") return { "status": "error", "message": str(e), "date": date or datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d") } @router.get( "/cross-platform", summary="跨平台热点分析", description="分析在多个平台上出现的热点话题,以及热点的传播路径", response_description="返回跨平台热点分析结果", responses={ 200: { "description": "成功获取跨平台热点分析", "content": { "application/json": { "example": { "status": "success", "date": "2024-01-15", "cross_platform_topics": [ { "topic": "某明星事件", "platforms": ["weibo", "douyin", "baidu"], "first_platform": "weibo", "spread_path": ["weibo", "zhihu", "baidu", "douyin"], "heat_trend": "rising" } ], "topic_count": 15, "avg_platforms_per_topic": 3.5 } } } } } ) async def get_cross_platform_analysis( date: Optional[str] = Query( default=None, description="指定日期,格式为 YYYY-MM-DD,默认为当天", example="2024-01-15" ), refresh: bool = Query( default=False, description="是否强制刷新缓存" ) ): """ **跨平台热点分析** 识别在多个平台同时出现的热点话题,分析传播路径和发展趋势。 **分析内容:** - 跨平台话题识别 - 首发平台判断 - 传播路径追踪 - 热度趋势分析 **价值:** - 发现全网热点 - 追踪舆情走向 - 把握传播规律 """ try: if not date: date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d") # 从缓存中获取数据 cache_key = f"analysis:trend:{date}:cross_platform" # 如果不是强制刷新,尝试从缓存获取 if not refresh: cached_data = cache.get_cache(cache_key) if cached_data: log.info(f"Retrieved cross platform analysis from cache for {date}") return cached_data # 如果缓存中没有或需要刷新,则生成新的分析数据 analyzer = TrendAnalyzer() result = analyzer.get_cross_platform_analysis(date, refresh) return result except Exception as e: log.error(f"Error in cross platform analysis: {e}") return { "status": "error", "message": str(e), "date": date or datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d") } @router.get( "/advanced", summary="高级分析", description="提供更深入的热点分析,包括关键词云图、情感分析、热点演变趋势等", response_description="返回高级分析结果", responses={ 200: { "description": "成功获取高级分析", "content": { "application/json": { "example": { "status": "success", "date": "2024-01-15", "keyword_cloud": [{"word": "AI", "weight": 95}], "sentiment_analysis": { "positive": 45, "neutral": 40, "negative": 15 }, "evolution_trend": [ {"hour": "00:00", "heat": 5000}, {"hour": "12:00", "heat": 8500} ] } } } } } ) async def get_advanced_analysis( date: Optional[str] = Query( default=None, description="指定日期,格式为 YYYY-MM-DD,默认为当天", example="2024-01-15" ), refresh: bool = Query( default=False, description="是否强制刷新缓存" ) ): """ **高级分析** 提供深度的数据分析功能,包括词云、情感分析和演变趋势。 **分析模块:** - 关键词云图:可视化展示高频词汇 - 情感分析:正/中/负向情感分布 - 演变趋势:时间维度上的热度变化 - 深度洞察:数据背后的规律 **适用场景:** - 深度报告生成 - 趋势研究 - 舆情分析 """ try: if not date: date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d") # 从缓存中获取数据 cache_key = f"analysis:trend:{date}:advanced_analysis" # 如果不是强制刷新,尝试从缓存获取 if not refresh: cached_data = cache.get_cache(cache_key) if cached_data: log.info(f"Retrieved advanced analysis from cache for {date}") return cached_data # 如果缓存中没有或需要刷新,则生成新的分析数据 analyzer = TrendAnalyzer() result = analyzer.get_advanced_analysis(date, refresh) return result except Exception as e: log.error(f"Error in advanced analysis: {e}") return { "status": "error", "message": str(e), "date": date or datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d") } @router.get( "/prediction", summary="热点趋势预测", description="基于历史数据预测热点话题的发展趋势,包括上升趋势、下降趋势、持续热门话题等", response_description="返回热点预测结果", responses={ 200: { "description": "成功获取热点预测", "content": { "application/json": { "example": { "status": "success", "date": "2024-01-15", "rising_topics": [ {"topic": "新技术发布", "trend": "rising", "confidence": 0.85} ], "declining_topics": [ {"topic": "旧闻", "trend": "declining", "confidence": 0.75} ], "sustained_hot_topics": [ {"topic": "持续热点", "trend": "stable", "duration": "3 天"} ] } } } } } ) async def get_trend_prediction( date: Optional[str] = Query( default=None, description="指定日期,格式为 YYYY-MM-DD,默认为当天", example="2024-01-15" ) ): """ **热点趋势预测** 基于历史数据和算法模型预测热点话题的未来发展趋势。 **预测类型:** - 上升趋势:热度正在上涨的话题 - 下降趋势:热度逐渐消退的话题 - 持续热门:长期保持高热度的话题 **输出指标:** - 趋势方向:rising/declining/stable - 置信度:预测的可信程度 (0-1) - 持续时间:话题预计持续的时间 **应用价值:** - 提前布局内容 - 把握流量机会 - 规避过时话题 """ try: if not date: date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d") # 从缓存中获取数据 cache_key = f"analysis:prediction:{date}" cached_data = cache.get_cache(cache_key) if cached_data: log.info(f"Retrieved trend prediction from cache for {date}") return cached_data # 如果缓存中没有,则生成新的预测数据 predictor = TrendPredictor() result = predictor.get_prediction(date) return result except Exception as e: log.error(f"Error in trend prediction: {e}") return { "status": "error", "message": str(e), "date": date or datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d") } @router.get( "/keyword-cloud", summary="关键词云图", description="提取热点数据中的关键词,按不同类别(科技、娱乐、社会等)进行分类,用于生成词云", response_description="返回关键词云图数据", responses={ 200: { "description": "成功获取关键词云图", "content": { "application/json": { "example": { "status": "success", "date": "2024-01-15", "keyword_clouds": { "科技": [ {"word": "人工智能", "weight": 95}, {"word": "大模型", "weight": 88} ], "娱乐": [ {"word": "电影", "weight": 75}, {"word": "明星", "weight": 70} ] }, "total_keywords": 200 } } } } } ) async def get_keyword_cloud( date: Optional[str] = Query( default=None, description="指定日期,格式为 YYYY-MM-DD,默认为当天", example="2024-01-15" ), refresh: bool = Query( default=False, description="是否强制刷新缓存" ), platforms: Optional[str] = Query( default=None, description="指定平台,多个平台用逗号分隔,如 baidu,weibo", example="baidu,weibo" ), category: Optional[str] = Query( default=None, description="指定分类,如科技、娱乐等", example="科技" ), keyword_count: int = Query( default=200, ge=50, le=1000, description="返回的关键词数量,范围 50-1000" ) ): """ **关键词云图** 从热点数据中提取高频关键词,并按类别分类,可用于可视化展示。 **参数说明:** - `platforms`: 限定分析的平台范围 - `category`: 筛选特定分类的关键词 - `keyword_count`: 返回的关键词数量 **分类体系:** - 科技:互联网、数码、AI 等 - 娱乐:影视、明星、综艺等 - 社会:民生、时事等 - 体育:赛事、运动员等 - 财经:经济、金融、股市等 **数据格式:** 每个关键词包含: - `word`: 词语本身 - `weight`: 权重值(基于词频和重要性) **应用场景:** - 数据可视化 - 内容标签生成 - 主题挖掘 """ try: if not date: date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d") # 从缓存中获取数据 cache_key = f"analysis:keyword_cloud:{date}" # 如果不是强制刷新,尝试从缓存获取 if not refresh: cached_data = cache.get_cache(cache_key) if cached_data: log.info(f"Retrieved keyword cloud from cache for {date}") # 如果指定了分类,过滤结果 if category and cached_data.get("status") == "success" and "keyword_clouds" in cached_data: if category in cached_data["keyword_clouds"]: filtered_data = cached_data.copy() filtered_data["keyword_clouds"] = {category: cached_data["keyword_clouds"][category]} return filtered_data return cached_data # 如果缓存中没有或需要刷新,则生成新的关键词云数据 analyzer = TrendAnalyzer() result = analyzer.get_keyword_cloud(date, refresh, keyword_count) return result except Exception as e: log.error(f"Error in keyword cloud analysis: {e}") return { "status": "error", "message": str(e), "date": date or datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d") } @router.get( "/data-visualization", summary="数据可视化分析", description="提供热点数据的可视化分析,包括主题热度分布图", response_description="返回数据可视化分析结果", responses={ 200: { "description": "成功获取数据可视化分析", "content": { "application/json": { "example": { "status": "success", "date": "2024-01-15", "theme_distribution": { "科技": 35, "娱乐": 25, "社会": 20, "财经": 15, "体育": 5 }, "platform_heatmap": { "weibo": {"morning": 80, "afternoon": 95, "evening": 100}, "zhihu": {"morning": 60, "afternoon": 75, "evening": 85} }, "charts_data": {...} } } } } } ) async def get_data_visualization( date: Optional[str] = Query( default=None, description="指定日期,格式为 YYYY-MM-DD,默认为当天", example="2024-01-15" ), refresh: bool = Query( default=False, description="是否强制刷新缓存" ), platforms: str = Query( default=None, description="指定要分析的平台,多个平台用逗号分隔", example="baidu,weibo,douyin" ) ): """ **数据可视化分析** 提供用于前端可视化的结构化数据分析结果。 **可视化类型:** - 主题热度分布图:饼图/柱状图数据 - 平台热力图:时间段热度对比 - 趋势线图:时间序列数据 - 排行榜:TOP N 数据 **输出格式:** 数据格式适合常见图表库(ECharts、Chart.js 等)直接使用。 **支持的平台筛选:** 可以指定部分平台进行分析,减少数据量。 **典型应用:** - Dashboard 展示 - 数据报表 - 实时监控大屏 """ try: if not date: date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d") # 从缓存中获取数据 cache_key = f"analysis:data_visualization:{date}" # 如果不是强制刷新,尝试从缓存获取 if not refresh: cached_data = cache.get_cache(cache_key) if cached_data: log.info(f"Retrieved data visualization from cache for {date}") return cached_data # 解析平台参数 platform_list = None if platforms: platform_list = [p.strip() for p in platforms.split(",") if p.strip()] # 如果缓存中没有或需要刷新,则生成新的可视化数据 analyzer = TrendAnalyzer() result = analyzer.get_data_visualization(date, refresh, platform_list) return result except Exception as e: log.error(f"Error in data visualization: {e}") return { "status": "error", "message": str(e), "date": date or datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d") } @router.get( "/trend-forecast", summary="热点趋势预测分析", description="分析热点话题的演变趋势,预测热点的发展方向", response_description="返回热点趋势预测分析结果", responses={ 200: { "description": "成功获取热点趋势预测", "content": { "application/json": { "example": { "status": "success", "date": "2024-01-15", "time_range": "24h", "forecast": [ { "topic": "热门话题 A", "current_heat": 8500, "predicted_heat": 9500, "trend": "rising", "confidence": 0.82 }, { "topic": "热门话题 B", "current_heat": 6000, "predicted_heat": 4500, "trend": "declining", "confidence": 0.75 } ], "key_insights": "预计未来 24 小时内,科技类话题将持续升温..." } } } } } ) async def get_trend_forecast( date: Optional[str] = Query( default=None, description="指定日期,格式为 YYYY-MM-DD,默认为当天", example="2024-01-15" ), refresh: bool = Query( default=False, description="是否强制刷新缓存" ), time_range: str = Query( default="24h", description="预测时间范围:24h(24 小时), 7d(7 天), 30d(30 天)", enum=["24h", "7d", "30d"] ) ): """ **热点趋势预测分析** 基于时间序列分析预测热点话题的未来发展趋势。 **预测时间范围:** - `24h`: 短期预测,适合快速变化的话题 - `7d`: 中期预测,适合持续性话题 - `30d`: 长期预测,适合重大事件 **输出内容:** - 当前热度值 - 预测热度值 - 趋势方向(上升/下降/平稳) - 置信度评分 - 关键洞察总结 **预测维度:** - 整体热度走势 - 分话题发展趋势 - 平台表现预测 - 新兴话题发现 **业务价值:** - 内容策划参考 - 资源投放决策 - 风险预警 """ try: if not date: date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d") # 验证时间范围参数 valid_time_ranges = ["24h", "7d", "30d"] if time_range not in valid_time_ranges: time_range = "24h" # 默认使用 24 小时 # 从缓存中获取数据 cache_key = f"analysis:trend_forecast:{date}:{time_range}" # 如果不是强制刷新,尝试从缓存获取 if not refresh: cached_data = cache.get_cache(cache_key) if cached_data: log.info(f"Retrieved trend forecast from cache for {date}, time_range: {time_range}") return cached_data # 如果缓存中没有或需要刷新,则生成新的趋势预测数据 analyzer = TrendAnalyzer() result = analyzer.get_trend_forecast(date, refresh, time_range) return result except Exception as e: log.error(f"Error in trend forecast: {e}") return { "status": "error", "message": str(e), "date": date or datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d"), "time_range": time_range }