hot-news-api/app/services/crawler.py

import traceback
import threading
from datetime import datetime
from functools import wraps
import pytz
from typing import List, Dict, Any, Callable

from app.services import crawler_factory, _scheduler
from app.utils.logger import log
from app.core import db, cache
from app.core.config import get_crawler_config
from app.utils.notification import notification_manager

# Load the crawler configuration
crawler_config = get_crawler_config()

# Configuration constants
CRAWLER_INTERVAL = crawler_config.interval
CRAWLER_TIMEOUT = crawler_config.timeout
MAX_RETRY_COUNT = crawler_config.max_retry_count
SHANGHAI_TZ = pytz.timezone('Asia/Shanghai')


class CrawlerTimeoutError(Exception):
    """Raised when a crawler run exceeds its time limit."""
    pass

def timeout_handler(func: Callable, timeout: int = CRAWLER_TIMEOUT) -> Callable:
    """Timeout decorator implemented with a daemon worker thread.

    The wrapped function runs in a worker thread; if it does not finish
    within `timeout` seconds, CrawlerTimeoutError is raised in the caller.
    Note that the worker thread itself keeps running in the background.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Thread-based timeout mechanism
        result = [None]
        exception = [None]
        completed = [False]

        def target():
            try:
                result[0] = func(*args, **kwargs)
            except Exception as e:
                exception[0] = e
            finally:
                completed[0] = True

        thread = threading.Thread(target=target)
        thread.daemon = True
        thread.start()
        thread.join(timeout)

        if not completed[0]:
            error_msg = f"Function {func.__name__} timed out after {timeout} seconds"
            log.error(error_msg)
            raise CrawlerTimeoutError(error_msg)
        if exception[0]:
            log.error(f"Function {func.__name__} raised an exception: {exception[0]}")
            raise exception[0]
        return result[0]
    return wrapper
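
# Illustrative usage of timeout_handler (a sketch kept in comments;
# `slow_fetch` is a hypothetical function, not part of this module):
#
#     @timeout_handler
#     def slow_fetch():
#         ...  # long-running work
#
#     try:
#         slow_fetch()
#     except CrawlerTimeoutError:
#         ...  # the call exceeded CRAWLER_TIMEOUT seconds
#
# An explicit limit can be set by calling the decorator directly, e.g.
# guarded = timeout_handler(some_func, timeout=5); `timeout` is a plain
# keyword argument here, not a decorator-factory parameter, so the
# @timeout_handler(timeout=5) form would not work.
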
def safe_fetch(crawler_name: str, crawler, date_str: str, is_retry: bool = False) -> List[Dict[str, Any]]:
    """Run one crawler safely: cache successful results, log and notify on failure."""
    try:
        news_list = crawler.fetch(date_str)
        if news_list and len(news_list) > 0:
            cache_key = f"crawler:{crawler_name}:{date_str}"
            cache.set_cache(key=cache_key, value=news_list, expire=0)
            log.info(f"{crawler_name} fetch success, {len(news_list)} news fetched")
            return news_list
        else:
            log.info(f"{'Second-time ' if is_retry else ''}crawler {crawler_name} failed, 0 news fetched")
            return []
    except Exception as e:
        error_msg = traceback.format_exc()
        log.error(f"{'Second-time ' if is_retry else ''}crawler {crawler_name} error: {error_msg}")
        # Send a DingTalk notification
        try:
            notification_manager.notify_crawler_error(
                crawler_name=crawler_name,
                error_msg=str(e),
                date_str=date_str,
                is_retry=is_retry
            )
        except Exception as notify_error:
            log.error(f"Failed to send notification for crawler {crawler_name}: {notify_error}")
        return []
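
# Reading a cached result back (a sketch; assumes app.core.cache exposes a
# `get_cache` counterpart to the `set_cache` call above, which this module
# does not itself confirm; "weibo" is a hypothetical crawler name):
#
#     cached = cache.get_cache(f"crawler:weibo:{date_str}")
#     if cached:
#         ...  # reuse today's fetch without re-crawling
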
def run_data_analysis(date_str: str):
    """Run all post-crawl analyses for the given date and cache their results."""
    log.info(f"Starting data analysis for date {date_str}")
    try:
        # Import the analysis modules here to avoid a circular dependency
        from app.analysis.trend_analyzer import TrendAnalyzer
        from app.analysis.predictor import TrendPredictor

        # Create analyzer instances
        analyzer = TrendAnalyzer()
        predictor = TrendPredictor()

        # 1. Generate and cache keyword cloud data
        log.info("Generating keyword cloud data...")
        analyzer.get_keyword_cloud(date_str, refresh=True)

        # 2. Generate and cache aggregated hot-topic analysis
        log.info("Generating trend analysis data...")
        analyzer.get_analysis(date_str, analysis_type="main")

        # 3. Generate and cache cross-platform hot-topic analysis
        log.info("Generating cross-platform analysis data...")
        analyzer.get_cross_platform_analysis(date_str, refresh=True)

        # 4. Generate and cache hot-topic trend predictions
        log.info("Generating trend prediction data...")
        predictor.get_prediction(date_str)

        # 5. Generate and cache platform comparison analysis
        log.info("Generating platform comparison data...")
        analyzer.get_platform_comparison(date_str)

        # 6. Generate and cache advanced analysis data
        log.info("Generating advanced analysis data...")
        analyzer.get_advanced_analysis(date_str, refresh=True)

        # 7. Generate and cache data visualization analysis
        log.info("Generating data visualization analysis...")
        analyzer.get_data_visualization(date_str, refresh=True)

        # 8. Generate and cache trend forecast analysis
        log.info("Generating trend forecast data...")
        analyzer.get_trend_forecast(date_str, refresh=True)

        log.info(f"All data analysis completed for date {date_str}")
    except Exception as e:
        error_msg = traceback.format_exc()
        log.error(f"Error during data analysis: {str(e)}")
        log.error(error_msg)
        # Send a data-analysis failure notification
        try:
            notification_manager.notify_analysis_error(
                error_msg=str(e),
                date_str=date_str
            )
        except Exception as notify_error:
            log.error(f"Failed to send analysis error notification: {notify_error}")

@_scheduler.scheduled_job('interval', id='crawlers_logic', seconds=CRAWLER_INTERVAL,
                          max_instances=crawler_config.max_instances,
                          misfire_grace_time=crawler_config.misfire_grace_time)
def crawlers_logic():
    """Main crawler job with timeout protection and error handling."""
    # Compute now_time/date_str in the outer scope so the exception handlers
    # below can reference date_str even when crawler_work fails or times out.
    now_time = datetime.now(SHANGHAI_TZ)
    date_str = now_time.strftime("%Y-%m-%d")

    @timeout_handler
    def crawler_work():
        log.info(f"Starting crawler job at {now_time.strftime('%Y-%m-%d %H:%M:%S')}")
        retry_crawler = []
        success_count = 0
        failed_crawlers = []

        # First pass over all registered crawlers
        for crawler_name, crawler in crawler_factory.items():
            news_list = safe_fetch(crawler_name, crawler, date_str)
            if news_list:
                success_count += 1
            else:
                retry_crawler.append(crawler_name)
                failed_crawlers.append(crawler_name)

        # Second pass: retry the crawlers that failed
        if retry_crawler:
            log.info(f"Retrying {len(retry_crawler)} failed crawlers")
            retry_failed = []
            for crawler_name in retry_crawler:
                news_list = safe_fetch(crawler_name, crawler_factory[crawler_name], date_str, is_retry=True)
                if news_list:
                    success_count += 1
                    # Remove crawlers that succeeded on retry from the failure list
                    if crawler_name in failed_crawlers:
                        failed_crawlers.remove(crawler_name)
                else:
                    retry_failed.append(crawler_name)

        # Record the completion time
        end_time = datetime.now(SHANGHAI_TZ)
        duration = (end_time - now_time).total_seconds()
        log.info(f"Crawler job finished at {end_time.strftime('%Y-%m-%d %H:%M:%S')}, "
                 f"duration: {duration:.2f}s, success: {success_count}/{len(crawler_factory)}")

        # Send a summary notification
        try:
            notification_manager.notify_crawler_summary(
                success_count=success_count,
                total_count=len(crawler_factory),
                failed_crawlers=failed_crawlers,
                duration=duration,
                date_str=date_str
            )
        except Exception as notify_error:
            log.error(f"Failed to send crawler notification: {notify_error}")

        # Run the data analysis once crawling is done
        log.info("Crawler job completed, starting data analysis...")
        # Run the analysis on a new thread to avoid blocking this one
        threading.Thread(target=run_data_analysis, args=(date_str,), daemon=True).start()
        return success_count

    try:
        return crawler_work()
    except CrawlerTimeoutError as e:
        log.error(f"Crawler job timeout: {str(e)}")
        # Send a timeout notification
        try:
            notification_manager.notify_crawler_timeout(
                timeout_seconds=CRAWLER_TIMEOUT,
                date_str=date_str
            )
        except Exception as notify_error:
            log.error(f"Failed to send timeout notification: {notify_error}")
        return 0
    except Exception as e:
        log.error(f"Crawler job error: {str(e)}")
        log.error(traceback.format_exc())
        # Send a generic error notification
        try:
            notification_manager.notify_crawler_error(
                crawler_name="crawler_job",
                error_msg=str(e),
                date_str=date_str
            )
        except Exception as notify_error:
            log.error(f"Failed to send error notification: {notify_error}")
        return 0
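
# Manual one-off run for local debugging (a sketch, assuming the app's
# configuration and cache/DB connections are available outside the scheduler).
# APScheduler's scheduled_job decorator returns the original function, so
# crawlers_logic() can be called inline without starting the scheduler.
if __name__ == "__main__":
    succeeded = crawlers_logic()
    log.info(f"Manual crawler run finished with {succeeded} successful crawlers")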