286 lines
9.5 KiB
Python
286 lines
9.5 KiB
Python
import json
|
|
import time
|
|
import hmac
|
|
import hashlib
|
|
import base64
|
|
import urllib.parse
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Optional, List
|
|
import requests
|
|
import pytz
|
|
|
|
from app.utils.logger import log
|
|
from app.core.config import get_notification_config
|
|
|
|
|
|
class DingTalkNotifier:
|
|
"""钉钉机器人通知器"""
|
|
|
|
def __init__(self):
|
|
self.config = get_notification_config()
|
|
self.webhook_url = self.config.get('dingtalk', {}).get('webhook_url', '')
|
|
self.secret = self.config.get('dingtalk', {}).get('secret', '')
|
|
self.enabled = self.config.get('dingtalk', {}).get('enabled', False)
|
|
self.timeout = self.config.get('dingtalk', {}).get('timeout', 10)
|
|
self.notify_success = self.config.get('dingtalk', {}).get('notify_success', False)
|
|
self.shanghai_tz = pytz.timezone('Asia/Shanghai')
|
|
|
|
if not self.webhook_url and self.enabled:
|
|
log.warning("DingTalk webhook URL not configured, notifications will be disabled")
|
|
self.enabled = False
|
|
|
|
def _generate_sign(self, timestamp: int) -> str:
|
|
"""生成钉钉机器人签名"""
|
|
if not self.secret:
|
|
return ""
|
|
|
|
string_to_sign = f'{timestamp}\n{self.secret}'
|
|
hmac_code = hmac.new(
|
|
self.secret.encode('utf-8'),
|
|
string_to_sign.encode('utf-8'),
|
|
digestmod=hashlib.sha256
|
|
).digest()
|
|
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
|
|
return sign
|
|
|
|
def _send_message(self, message: Dict[str, Any]) -> bool:
|
|
"""发送消息到钉钉"""
|
|
if not self.enabled:
|
|
log.debug("DingTalk notifications are disabled")
|
|
return False
|
|
|
|
try:
|
|
# 生成时间戳和签名
|
|
timestamp = int(round(time.time() * 1000))
|
|
sign = self._generate_sign(timestamp)
|
|
|
|
# 构建请求URL
|
|
url = self.webhook_url
|
|
if sign:
|
|
url += f"×tamp={timestamp}&sign={sign}"
|
|
|
|
# 发送请求
|
|
response = requests.post(
|
|
url,
|
|
json=message,
|
|
timeout=self.timeout,
|
|
headers={'Content-Type': 'application/json'}
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
result = response.json()
|
|
if result.get('errcode') == 0:
|
|
log.info("DingTalk notification sent successfully")
|
|
return True
|
|
else:
|
|
log.error(f"DingTalk API error: {result.get('errmsg', 'Unknown error')}")
|
|
return False
|
|
else:
|
|
log.error(f"DingTalk HTTP error: {response.status_code}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
log.error(f"Failed to send DingTalk notification: {str(e)}")
|
|
return False
|
|
|
|
def send_text_message(self, content: str, at_mobiles: Optional[List[str]] = None,
|
|
at_all: bool = False) -> bool:
|
|
"""发送文本消息"""
|
|
message = {
|
|
"msgtype": "text",
|
|
"text": {
|
|
"content": content
|
|
}
|
|
}
|
|
|
|
if at_mobiles or at_all:
|
|
message["at"] = {
|
|
"atMobiles": at_mobiles or [],
|
|
"isAtAll": at_all
|
|
}
|
|
|
|
return self._send_message(message)
|
|
|
|
def send_markdown_message(self, title: str, text: str,
|
|
at_mobiles: Optional[List[str]] = None,
|
|
at_all: bool = False) -> bool:
|
|
"""发送Markdown消息"""
|
|
message = {
|
|
"msgtype": "markdown",
|
|
"markdown": {
|
|
"title": title,
|
|
"text": text
|
|
}
|
|
}
|
|
|
|
if at_mobiles or at_all:
|
|
message["at"] = {
|
|
"atMobiles": at_mobiles or [],
|
|
"isAtAll": at_all
|
|
}
|
|
|
|
return self._send_message(message)
|
|
|
|
def send_crawler_error(self, crawler_name: str, error_msg: str,
|
|
date_str: str, is_retry: bool = False) -> bool:
|
|
"""发送爬虫错误通知"""
|
|
current_time = datetime.now(self.shanghai_tz).strftime("%Y-%m-%d %H:%M:%S")
|
|
retry_text = "重试失败" if is_retry else "首次失败"
|
|
|
|
title = f"🚨 爬虫异常通知 - {crawler_name}"
|
|
content = f"""
|
|
## {title}
|
|
|
|
**时间**: {current_time}\n
|
|
**爬虫**: {crawler_name}\n
|
|
**日期**: {date_str}\n
|
|
**状态**: {retry_text}\n
|
|
**错误信息**:
|
|
```
|
|
{error_msg}
|
|
```
|
|
|
|
请及时检查爬虫状态!
|
|
""".strip()
|
|
|
|
# 异常时@所有人
|
|
return self.send_markdown_message(title, content, at_all=True)
|
|
|
|
def send_crawler_timeout(self, timeout_seconds: int, date_str: str) -> bool:
|
|
"""发送爬虫超时通知"""
|
|
current_time = datetime.now(self.shanghai_tz).strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
title = "⏰ 爬虫超时通知"
|
|
content = f"""
|
|
## {title}
|
|
|
|
**时间**: {current_time}\n
|
|
**日期**: {date_str}\n
|
|
**超时时长**: {timeout_seconds}秒\n
|
|
**状态**: 爬虫任务执行超时被强制终止
|
|
|
|
请检查爬虫性能或调整超时配置!
|
|
""".strip()
|
|
|
|
# 超时异常时@所有人
|
|
return self.send_markdown_message(title, content, at_all=True)
|
|
|
|
def send_crawler_summary(self, success_count: int, total_count: int,
|
|
failed_crawlers: List[str], duration: float,
|
|
date_str: str) -> bool:
|
|
"""发送爬虫执行摘要通知"""
|
|
# 全部成功且未启用正常通知时,不发送
|
|
if success_count == total_count and not self.notify_success:
|
|
return True
|
|
|
|
current_time = datetime.now(self.shanghai_tz).strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
# 构建失败爬虫列表
|
|
failed_list = "\n".join([f"- {name}" for name in failed_crawlers]) if failed_crawlers else ""
|
|
|
|
if failed_crawlers:
|
|
title = f"🚨 爬虫执行摘要 - {date_str}"
|
|
else:
|
|
title = f"📊 爬虫执行摘要 - {date_str}"
|
|
|
|
# 根据是否有失败构建不同的内容
|
|
if failed_crawlers:
|
|
content = f"""
|
|
## {title}
|
|
|
|
**时间**: {current_time}\n
|
|
**日期**: {date_str}\n
|
|
**执行时长**: {duration:.2f}秒\n
|
|
**成功**: {success_count}/{total_count}\n
|
|
**失败**: {len(failed_crawlers)}
|
|
|
|
**失败的爬虫**:
|
|
{failed_list}
|
|
|
|
请关注失败的爬虫状态!
|
|
""".strip()
|
|
else:
|
|
content = f"""
|
|
## {title}
|
|
|
|
**时间**: {current_time}\n
|
|
**日期**: {date_str}\n
|
|
**执行时长**: {duration:.2f}秒\n
|
|
**成功**: {success_count}/{total_count}\n
|
|
**失败**: {len(failed_crawlers)}
|
|
|
|
所有爬虫执行成功!
|
|
""".strip()
|
|
|
|
# 有失败时@所有人,没失败时不@
|
|
at_all = len(failed_crawlers) > 0
|
|
return self.send_markdown_message(title, content, at_all=at_all)
|
|
|
|
def send_analysis_error(self, error_msg: str, date_str: str) -> bool:
|
|
"""发送数据分析错误通知"""
|
|
current_time = datetime.now(self.shanghai_tz).strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
title = "🔍 数据分析异常通知"
|
|
content = f"""
|
|
## {title}
|
|
|
|
**时间**: {current_time}\n
|
|
**日期**: {date_str}\n
|
|
**错误信息**:
|
|
```
|
|
{error_msg}
|
|
```
|
|
|
|
数据分析任务执行失败,请检查分析模块!
|
|
""".strip()
|
|
|
|
# 分析异常时@所有人
|
|
return self.send_markdown_message(title, content, at_all=True)
|
|
|
|
class NotificationManager:
|
|
"""通知管理器,支持多种通知方式"""
|
|
|
|
def __init__(self):
|
|
self.dingtalk = DingTalkNotifier()
|
|
# 可以在这里添加其他通知方式,如企业微信、邮件等
|
|
|
|
def is_enabled(self) -> bool:
|
|
"""检查通知是否启用"""
|
|
return self.dingtalk.enabled
|
|
|
|
@property
|
|
def webhook_url(self) -> str:
|
|
"""获取webhook URL"""
|
|
return self.dingtalk.webhook_url
|
|
|
|
def send_text(self, content: str, at_all: bool = False) -> bool:
|
|
"""发送文本消息"""
|
|
return self.dingtalk.send_text_message(content, at_all=at_all)
|
|
|
|
def send_markdown(self, title: str, text: str, at_all: bool = False) -> bool:
|
|
"""发送Markdown消息"""
|
|
return self.dingtalk.send_markdown_message(title, text, at_all=at_all)
|
|
|
|
def notify_crawler_error(self, crawler_name: str, error_msg: str,
|
|
date_str: str, is_retry: bool = False):
|
|
"""通知爬虫错误"""
|
|
self.dingtalk.send_crawler_error(crawler_name, error_msg, date_str, is_retry)
|
|
|
|
def notify_crawler_timeout(self, timeout_seconds: int, date_str: str):
|
|
"""通知爬虫超时"""
|
|
self.dingtalk.send_crawler_timeout(timeout_seconds, date_str)
|
|
|
|
def notify_crawler_summary(self, success_count: int, total_count: int,
|
|
failed_crawlers: List[str], duration: float,
|
|
date_str: str):
|
|
"""通知爬虫执行摘要"""
|
|
self.dingtalk.send_crawler_summary(success_count, total_count,
|
|
failed_crawlers, duration, date_str)
|
|
|
|
def notify_analysis_error(self, error_msg: str, date_str: str):
|
|
"""通知数据分析错误"""
|
|
self.dingtalk.send_analysis_error(error_msg, date_str)
|
|
|
|
|
|
# 全局通知管理器实例
|
|
notification_manager = NotificationManager() |