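"""Scheduled crawler job.

Runs every registered crawler from ``crawler_factory`` on a fixed interval,
retries failures once, caches the fetched news lists, kicks off the
data-analysis pipeline in a background thread, and reports outcomes via
``notification_manager``.
"""
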
import traceback
import threading
from datetime import datetime
from functools import wraps
from typing import List, Dict, Any, Callable

import pytz

from app.services import crawler_factory, _scheduler
from app.utils.logger import log
from app.core import cache
from app.core.config import get_crawler_config
from app.utils.notification import notification_manager

# Load the crawler configuration
crawler_config = get_crawler_config()

# Configuration constants
CRAWLER_INTERVAL = crawler_config.interval
CRAWLER_TIMEOUT = crawler_config.timeout
MAX_RETRY_COUNT = crawler_config.max_retry_count
SHANGHAI_TZ = pytz.timezone('Asia/Shanghai')


class CrawlerTimeoutError(Exception):
    """Raised when a crawler run exceeds the configured timeout."""
    pass


def timeout_handler(func: Callable, timeout: int = CRAWLER_TIMEOUT) -> Callable:
    """Timeout decorator that runs the wrapped function in a daemon thread.

    A thread-based implementation is used (rather than Unix signals) so the
    decorator also works off the main thread and on non-Unix platforms.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Mutable one-element containers let the worker thread report back.
        result = [None]
        exception = [None]
        completed = [False]

        def target():
            try:
                result[0] = func(*args, **kwargs)
            except Exception as e:
                exception[0] = e
            finally:
                completed[0] = True

        thread = threading.Thread(target=target)
        thread.daemon = True
        thread.start()
        thread.join(timeout)

        if not completed[0]:
            error_msg = f"Function {func.__name__} timed out after {timeout} seconds"
            log.error(error_msg)
            raise CrawlerTimeoutError(error_msg)

        if exception[0]:
            log.error(f"Function {func.__name__} raised an exception: {exception[0]}")
            raise exception[0]

        return result[0]
    return wrapper
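

# Usage sketch (illustrative names only): the bare decorator form below uses
# the default CRAWLER_TIMEOUT; for a custom limit, wrap a callable directly:
#
#     fetch_with_timeout = timeout_handler(crawler.fetch, timeout=30)
#     fetch_with_timeout("2024-01-01")
#
# Note that the worker thread is not killed on timeout, only abandoned, so
# wrapped functions should be safe to leave running in the background.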


def safe_fetch(crawler_name: str, crawler, date_str: str, is_retry: bool = False) -> List[Dict[str, Any]]:
    """Run one crawler fetch safely, handling exceptions and returning the result."""
    try:
        news_list = crawler.fetch(date_str)
        if news_list:
            cache_key = f"crawler:{crawler_name}:{date_str}"
            # NOTE: expire=0 is assumed to mean "no expiry" for cache.set_cache
            cache.set_cache(key=cache_key, value=news_list, expire=0)

            log.info(f"{crawler_name} fetch success, {len(news_list)} news fetched")
            return news_list
        else:
            log.info(f"{'Second time ' if is_retry else ''}crawler {crawler_name} failed. 0 news fetched")
            return []
    except Exception as e:
        error_msg = traceback.format_exc()
        log.error(f"{'Second time ' if is_retry else ''}crawler {crawler_name} error: {error_msg}")

        # Send a DingTalk notification
        try:
            notification_manager.notify_crawler_error(
                crawler_name=crawler_name,
                error_msg=str(e),
                date_str=date_str,
                is_retry=is_retry
            )
        except Exception as notify_error:
            log.error(f"Failed to send notification for crawler {crawler_name}: {notify_error}")

        return []


def run_data_analysis(date_str: str):
    """Run the data analysis pipeline and cache the results."""
    log.info(f"Starting data analysis for date {date_str}")
    try:
        # Import analysis modules here to avoid a circular dependency
        from app.analysis.trend_analyzer import TrendAnalyzer
        from app.analysis.predictor import TrendPredictor

        # Create analyzer instances
        analyzer = TrendAnalyzer()
        predictor = TrendPredictor()

        # 1. Generate and cache keyword cloud data
        log.info("Generating keyword cloud data...")
        analyzer.get_keyword_cloud(date_str, refresh=True)

        # 2. Generate and cache aggregated hot-topic analysis data
        log.info("Generating trend analysis data...")
        analyzer.get_analysis(date_str, analysis_type="main")

        # 3. Generate and cache cross-platform hot-topic analysis data
        log.info("Generating cross-platform analysis data...")
        analyzer.get_cross_platform_analysis(date_str, refresh=True)

        # 4. Generate and cache hot-topic trend prediction data
        log.info("Generating trend prediction data...")
        predictor.get_prediction(date_str)

        # 5. Generate and cache platform comparison data
        log.info("Generating platform comparison data...")
        analyzer.get_platform_comparison(date_str)

        # 6. Generate and cache advanced analysis data
        log.info("Generating advanced analysis data...")
        analyzer.get_advanced_analysis(date_str, refresh=True)

        # 7. Generate and cache data visualization analysis data
        log.info("Generating data visualization analysis...")
        analyzer.get_data_visualization(date_str, refresh=True)

        # 8. Generate and cache trend forecast analysis data
        log.info("Generating trend forecast data...")
        analyzer.get_trend_forecast(date_str, refresh=True)

        log.info(f"All data analysis completed for date {date_str}")
    except Exception as e:
        error_msg = traceback.format_exc()
        log.error(f"Error during data analysis: {str(e)}")
        log.error(error_msg)

        # Send a data-analysis error notification
        try:
            notification_manager.notify_analysis_error(
                error_msg=str(e),
                date_str=date_str
            )
        except Exception as notify_error:
            log.error(f"Failed to send analysis error notification: {notify_error}")


@_scheduler.scheduled_job('interval', id='crawlers_logic', seconds=CRAWLER_INTERVAL,
                          max_instances=crawler_config.max_instances,
                          misfire_grace_time=crawler_config.misfire_grace_time)
def crawlers_logic():
    """Main crawler logic, with timeout protection and error handling."""
    # Compute the date up front so the exception handlers below can use it
    # (previously it was local to crawler_work and raised a NameError there).
    date_str = datetime.now(SHANGHAI_TZ).strftime("%Y-%m-%d")

    @timeout_handler
    def crawler_work():
        now_time = datetime.now(SHANGHAI_TZ)
        log.info(f"Starting crawler job at {now_time.strftime('%Y-%m-%d %H:%M:%S')}")

        retry_crawler = []
        success_count = 0
        failed_crawlers = []

        for crawler_name, crawler in crawler_factory.items():
            news_list = safe_fetch(crawler_name, crawler, date_str)
            if news_list:
                success_count += 1
            else:
                retry_crawler.append(crawler_name)
                failed_crawlers.append(crawler_name)

        # Second crawl pass (retry the failed crawlers)
        if retry_crawler:
            log.info(f"Retrying {len(retry_crawler)} failed crawlers")
            for crawler_name in retry_crawler:
                news_list = safe_fetch(crawler_name, crawler_factory[crawler_name], date_str, is_retry=True)
                if news_list:
                    success_count += 1
                    # Remove crawlers that succeeded on retry from the failed list
                    failed_crawlers.remove(crawler_name)

        # Record the completion time
        end_time = datetime.now(SHANGHAI_TZ)
        duration = (end_time - now_time).total_seconds()
        log.info(f"Crawler job finished at {end_time.strftime('%Y-%m-%d %H:%M:%S')}, "
                 f"duration: {duration:.2f}s, success: {success_count}/{len(crawler_factory)}")

        # Send the summary notification
        try:
            notification_manager.notify_crawler_summary(
                success_count=success_count,
                total_count=len(crawler_factory),
                failed_crawlers=failed_crawlers,
                duration=duration,
                date_str=date_str
            )
        except Exception as notify_error:
            log.error(f"Failed to send crawler notification: {notify_error}")

        # Run the data analysis once crawling completes
        log.info("Crawler job completed, starting data analysis...")
        # Run the analysis in a new thread to avoid blocking the main thread
        threading.Thread(target=run_data_analysis, args=(date_str,), daemon=True).start()

        return success_count

    try:
        return crawler_work()
    except CrawlerTimeoutError as e:
        log.error(f"Crawler job timeout: {str(e)}")
        # Send a timeout notification
        try:
            notification_manager.notify_crawler_timeout(
                timeout_seconds=CRAWLER_TIMEOUT,
                date_str=date_str
            )
        except Exception as notify_error:
            log.error(f"Failed to send timeout notification: {notify_error}")
        return 0
    except Exception as e:
        log.error(f"Crawler job error: {str(e)}")
        log.error(traceback.format_exc())
        # Send a generic error notification
        try:
            notification_manager.notify_crawler_error(
                crawler_name="crawler_job",
                error_msg=str(e),
                date_str=date_str
            )
        except Exception as notify_error:
            log.error(f"Failed to send error notification: {notify_error}")
        return 0
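

# A minimal local-run sketch (not part of the scheduled module itself),
# assuming _scheduler is an APScheduler scheduler instance, which matches the
# scheduled_job('interval', ..., max_instances=..., misfire_grace_time=...)
# signature used above:
#
#     if __name__ == "__main__":
#         _scheduler.start()  # fires crawlers_logic every CRAWLER_INTERVAL seconds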