This commit is contained in:
2026-03-26 15:04:59 +08:00
commit e0af97ac7f
65 changed files with 7366 additions and 0 deletions

View File

View File

@@ -0,0 +1,99 @@
import json
import datetime
import requests
import urllib3
from bs4 import BeautifulSoup
from .crawler import Crawler
from ...core import cache
from ...db.mysql import News
urllib3.disable_warnings()
class BaiduNewsCrawler(Crawler):
    """Crawler for Baidu's realtime hot-search board (top.baidu.com)."""

    def fetch(self, date_str) -> list:
        """Fetch the realtime hot list via the board API.

        Args:
            date_str: hash key (date string) under which results are cached.

        Returns:
            List of news dicts (title/url/content/source/publish_time);
            empty list on request or payload failure.
        """
        # One timestamp applied to every item of this run.
        current_time = datetime.datetime.now()
        url = "https://top.baidu.com/api/board?platform=wise&tab=realtime"
        # BUG FIX: the header dict was passed as `params=`, which appended it
        # to the query string instead of sending real HTTP headers.
        resp = requests.get(url=url, headers=self.header, verify=False, timeout=self.timeout)
        if resp.status_code != 200:
            print(f"request failed, status: {resp.status_code}")
            return []
        try:
            json_data = resp.json()
            contents = json_data["data"]["cards"][0]["content"][0]["content"]
        except (ValueError, KeyError, IndexError, TypeError) as e:
            # The board payload occasionally changes shape; fail soft instead
            # of raising TypeError from the old .get()/[] mix.
            print(f"unexpected baidu payload: {e}")
            return []
        result = []
        cache_list = []
        for content in contents:
            title = content.get("word")
            url = content.get("url")
            desc = content.get("desc")
            # BUG FIX: rewrite only a *leading* mobile host to the desktop
            # host; the old blanket url.replace("m.", "www.") corrupted any
            # "m." elsewhere in the URL and crashed when "url" was missing.
            if url:
                url = url.replace("https://m.", "https://www.", 1)
            news = {
                'title': title,
                'url': url,
                'content': desc,
                'source': 'baidu',
                'publish_time': current_time.strftime('%Y-%m-%d %H:%M:%S')  # formatted time string
            }
            result.append(news)
            cache_list.append(news)  # serialized as one JSON document below
        cache.hset(date_str, self.crawler_name(), json.dumps(cache_list, ensure_ascii=False))
        return result

    def crawler_name(self):
        return "baidu"

    @staticmethod
    def fetch_v0():
        """Legacy fallback: scrape the rendered hot-board HTML page."""
        # Current time used as publish_time for every scraped entry.
        current_time = datetime.datetime.now()
        url = "https://top.baidu.com/board?tab=realtime"
        proxies = {
            # "http": "http://127.0.0.1:7890",
            # "https": "http://127.0.0.0:7890"
        }
        header = {
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,"
                      "*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            # BUG FIX: header values must be strings; the int 1 breaks
            # requests once the dict is sent as real headers.
            "upgrade-insecure-requests": "1",
            # NOTE: the old "host": "www.baidu.com" entry was removed — it is
            # the wrong host for top.baidu.com; requests sets Host itself.
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/86.0.4240.183 Safari/537.36"
        }
        # BUG FIX: send the dict as headers (was `params=`); add a timeout so
        # a stalled connection cannot hang the crawler forever.
        html = requests.get(url=url, headers=header, verify=False, proxies=proxies, timeout=10)
        html.encoding = "utf-8"
        html_text = html.text
        soup = BeautifulSoup(html_text, "html.parser")
        main_content = soup.find_all("main")[0]
        news_main_content = main_content.find("div", style='margin-bottom:20px')
        div_elements = news_main_content.find_all('div', class_='category-wrap_iQLoo horizontal_1eKyQ')
        result = []
        for div_element in div_elements:
            # hot_index is currently unused but the lookup also validates the
            # row's structure (raises if the board markup changed).
            hot_index = div_element.find(class_='hot-index_1Bl1a').text.strip()
            news_title = div_element.find(class_='c-single-text-ellipsis').text.strip()
            news_link = div_element.find('a', class_='title_dIF3B')['href']
            news = {
                'title': news_title,
                'url': news_link,
                'content': "",
                'source': 'baidu',
                'publish_time': current_time.strftime('%Y-%m-%d %H:%M:%S')  # formatted time string
            }
            result.append(news)
        return result

View File

@@ -0,0 +1,64 @@
import json
import datetime
import requests
import urllib3
from .crawler import Crawler
from ...core import cache
from ...db.mysql import News
urllib3.disable_warnings()
class BilibiliCrawler(Crawler):
    """Crawler for Bilibili's popular-videos ranking API."""

    def fetch(self, date_str):
        """Fetch the popular video list and cache it under ``date_str``.

        Returns:
            List of news dicts; empty list on HTTP or API-level error.
        """
        current_time = datetime.datetime.now()
        url = "https://api.bilibili.com/x/web-interface/popular"
        headers = {
            # BUG FIX: the fragments were concatenated in the wrong order,
            # producing "...Safari/537.36AppleWebKit..." — a malformed UA.
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/122.0.0.0 Safari/537.36"
            ),
            "Referer": "https://www.bilibili.com/",
        }
        resp = requests.get(url=url, headers=headers, verify=False, timeout=self.timeout)
        if resp.status_code != 200:
            print(f"request failed, status: {resp.status_code}")
            return []
        data = resp.json()
        # Bilibili signals application-level failure via a non-zero code.
        if data["code"] != 0:
            print(f"API error: {data['message']}")
            return []
        result = []
        cache_list = []
        for item in data["data"].get("list", []):
            title = item.get("title", "")
            bvid = item.get("bvid", "")
            desc = item.get("desc", "")
            # Canonical video permalink from the BV id.
            video_url = f"https://www.bilibili.com/video/{bvid}"
            news = {
                'title': title,
                'url': video_url,
                'content': desc,
                'source': 'bilibili',
                'publish_time': current_time.strftime('%Y-%m-%d %H:%M:%S')
            }
            result.append(news)
            cache_list.append(news)
        cache.hset(date_str, self.crawler_name(), json.dumps(cache_list, ensure_ascii=False))
        return result

    def crawler_name(self):
        return "bilibili"

100
app/services/sites/cls.py Normal file
View File

@@ -0,0 +1,100 @@
import json
import datetime
import requests
import urllib3
from .crawler import Crawler
from ...core import cache
urllib3.disable_warnings()
class CLSCrawler(Crawler):
    """财联社 (CLS / Cailianpress) featured-column crawler."""

    def fetch(self, date_str) -> list:
        """Fetch the featured column list (top 20) and cache it.

        Returns:
            List of ranked news dicts; empty list on any failure.
        """
        current_time = datetime.datetime.now()
        try:
            params = {
                'app': 'CailianpressWeb',
                'os': 'web',
                'sv': '8.4.6',
                'sign': '9f8797a1f4de66c2370f7a03990d2737'
            }
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Accept': 'application/json, text/plain, */*',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                'Referer': 'https://www.cls.cn/',
                'Origin': 'https://www.cls.cn'
            }
            response = requests.get(
                "https://www.cls.cn/featured/v1/column/list",
                params=params,
                headers=headers,
                timeout=self.timeout,
                verify=False
            )
            response.raise_for_status()
            data = response.json()
            if data.get('errno') != 0:
                return []
            column_list = data.get('data', {}).get('column_list', [])
            result = []
            cache_list = []
            for idx, column in enumerate(column_list[:20]):
                try:
                    title = column.get('title', '').strip()
                    # Skip empty / degenerate column titles.
                    if not title or len(title) < 2:
                        continue
                    # Defaults cover the "no attached article" case; the
                    # original's three identical branches collapse into this.
                    # (The unused jump_url lookup was dropped.)
                    display_title = title
                    content = column.get('brief', '').strip()
                    url = "https://www.cls.cn/telegraph"
                    article_list = column.get('article_list', {})
                    if article_list:
                        article_title = article_list.get('title', '').strip()
                        brief = article_list.get('brief', '').strip()
                        if article_title:
                            display_title = f"[{title}] {article_title}"
                            content = brief if brief else article_title
                    news = {
                        'title': display_title,
                        'url': url,
                        'content': content,
                        'source': 'cls',
                        'publish_time': current_time.strftime('%Y-%m-%d %H:%M:%S'),
                        'score': 1000 - idx,  # synthetic score: earlier rank = higher
                        'rank': idx + 1
                    }
                    result.append(news)
                    cache_list.append(news)
                except Exception:
                    # One malformed column must not abort the whole batch.
                    continue
            if cache_list:
                cache.hset(date_str, self.crawler_name(), json.dumps(cache_list, ensure_ascii=False))
            return result
        except Exception as e:
            # BUG FIX: previously swallowed silently; log before failing soft.
            print(f"cls fetch failed: {e}")
            return []

    def crawler_name(self):
        return "cls"

View File

@@ -0,0 +1,23 @@
from abc import ABC, abstractmethod
from typing import List, Dict, Any
class Crawler(ABC):
    """Common base class for all site crawlers.

    Provides a browser-like default header set and a request timeout;
    concrete crawlers implement ``fetch`` and ``crawler_name``.
    """

    # Shared header fragments; copied onto each instance in __init__.
    _ACCEPT = (
        "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,"
        "*/*;q=0.8,application/signed-exchange;v=b3;q=0.9"
    )
    _USER_AGENT = (
        "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/86.0.4240.183 Safari/537.36"
    )

    def __init__(self):
        # Browser-like request headers shared by subclasses.
        self.header = {
            "accept": self._ACCEPT,
            "upgrade-insecure-requests": "1",
            "user-agent": self._USER_AGENT,
        }
        # Default per-request timeout in seconds.
        self.timeout = 10

    @abstractmethod
    def fetch(self, date_str: str) -> List[Dict[str, Any]]:
        """Fetch the news list for the given date key."""
        pass

    @abstractmethod
    def crawler_name(self) -> str:
        """Return this crawler's registry name."""
        pass

View File

@@ -0,0 +1,79 @@
import json
import re
import datetime
import requests
import urllib3
from bs4 import BeautifulSoup
from ...core import cache
from .crawler import Crawler
urllib3.disable_warnings()
class DouBanCrawler(Crawler):
    """Douban group "explore" hot-topic crawler."""

    def fetch(self, date_str):
        """Scrape the explore page and return its topics as news dicts."""
        stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        page_url = "https://www.douban.com/group/explore"
        # Start from the shared browser headers, overlay page-specific ones.
        req_headers = self.header.copy()
        req_headers.update({
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
            "accept-encoding": "",
            "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
            "host": "www.douban.com",
            "referer": "https://www.douban.com/group/explore",
            "sec-fetch-dest": "document",
            "sec-fetch-mode": "navigate",
            "sec-fetch-site": "same-origin",
            "sec-fetch-user": "?1",
            "upgrade-insecure-requests": "1",
        })
        response = requests.get(url=page_url, headers=req_headers, verify=False, timeout=self.timeout)
        if response.status_code != 200:
            print(f"request failed, status: {response.status_code}")
            return []
        soup = BeautifulSoup(response.text, "html.parser")
        collected = []
        for channel_item in soup.find_all('div', class_='channel-item'):
            heading = channel_item.find('h3')
            anchor = heading.find('a') if heading else None
            if anchor is None:
                # Topic without a linked heading — nothing usable.
                continue
            summary_node = channel_item.find('div', class_='content')
            collected.append({
                'title': anchor.text.strip(),
                'url': anchor.get('href'),
                'content': summary_node.text.strip() if summary_node else "",
                'source': 'douban',
                'publish_time': stamp
            })
        cache.hset(date_str, self.crawler_name(), json.dumps(collected, ensure_ascii=False))
        return collected

    def crawler_name(self):
        return "douban"

View File

@@ -0,0 +1,111 @@
import json
import datetime
import time
import requests
from selenium.webdriver.common.by import By
from bs4 import BeautifulSoup
from ...core import cache
from ...db.mysql import News
from .crawler import Crawler
from ..browser_manager import BrowserManager
class DouYinCrawler(Crawler):
    """Douyin hot-search crawler."""

    def fetch(self, date_str):
        """Entry point — delegates to the API-based fetcher (v2)."""
        return self.fetch_v2(date_str)

    def fetch_v1(self, date_str):
        """Legacy browser-automation fetcher for the /hot page."""
        current_time = datetime.datetime.now()
        url = "https://www.douyin.com/hot"
        browser_manager = BrowserManager()
        try:
            # Render the page through the shared browser manager.
            page_source, driver = browser_manager.get_page_content(url, wait_time=5)
            result = []
            cache_list = []
            # Hot-board entries: <li> elements containing a /video/ link.
            items = driver.find_elements(By.XPATH, '//li[a[contains(@href, "/video/")]]')
            for item in items:
                try:
                    # Title: contains a "#" hashtag or is a longer text node.
                    title_elem = item.find_element(By.XPATH, './/div[contains(text(), "#") or string-length(text()) > 10]')
                    # Link to the video.
                    link_elem = item.find_element(By.XPATH, './/a[contains(@href, "/video/")]')
                    # Hot-score element.
                    hot_elem = item.find_element(By.XPATH, './/span[contains(text(), "") or contains(text(), "亿")]')
                    title = title_elem.text.strip()
                    # NOTE(review): selenium's get_attribute("href") usually
                    # returns an absolute URL, so this prefix may duplicate
                    # the host — confirm before relying on v1.
                    item_url = "https://www.douyin.com" + link_elem.get_attribute("href")
                    hot = hot_elem.text.strip()
                    news = {
                        'title': title,
                        'url': item_url,
                        'content': f"热度: {hot}",
                        'source': 'douyin',
                        'publish_time': current_time.strftime('%Y-%m-%d %H:%M:%S')
                    }
                    result.append(news)
                    cache_list.append(news)
                except Exception:
                    continue  # skip malformed entries
            # Cache and return.
            if cache_list:
                cache.hset(date_str, self.crawler_name(), json.dumps(cache_list, ensure_ascii=False))
            return result
        except Exception as e:
            # BUG FIX: previously swallowed silently; log before failing soft.
            print(f"douyin browser fetch failed: {e}")
            return []

    def fetch_v2(self, date_str):
        """Fetch the hot-search word list from the web API."""
        current_time = datetime.datetime.now()
        url = "https://www.douyin.com/aweme/v1/web/hot/search/list/?device_platform=webapp&aid=6383&channel=channel_pc_web&detail_list=1&source=6&pc_client_type=1&pc_libra_divert=Windows&support_h265=1&support_dash=1&version_code=170400&version_name=17.4.0&cookie_enabled=true&screen_width=1920&screen_height=1080&browser_language=zh-CN&browser_platform=Win32&browser_name=Chrome&browser_version=136.0.0.0&browser_online=true&engine_name=Blink&engine_version=136.0.0.0&os_name=Windows&os_version=10&cpu_core_num=16&device_memory=8&platform=PC&downlink=10&effective_type=4g&round_trip_time=50&webid=7490997798633555467"
        headers = {
            # BUG FIX: fragment order produced a malformed User-Agent
            # ("...Safari/537.36AppleWebKit...").
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/122.0.0.0 Safari/537.36"
            ),
            "Referer": "https://www.douyin.com/",
        }
        resp = requests.get(url=url, headers=headers, verify=False, timeout=self.timeout)
        if resp.status_code != 200:
            print(f"request failed, status: {resp.status_code}")
            return []
        data = resp.json()
        # Example detail link shape:
        # https://www.douyin.com/hot/2094286?&trending_topic=%E5%A4%8F...&previous_page=main_page&enter_method=trending_topic&modeFrom=hotDetail&tab_name=trend&position=1&hotValue=11892557
        result = []
        cache_list = []
        for item in data["data"]["word_list"]:
            title = item["word"]
            # NOTE(review): item['word'] is interpolated un-encoded; browsers
            # tolerate it, but consider urllib.parse.quote if strict
            # consumers need a valid URL — confirm before changing output.
            url = f"https://www.douyin.com/hot/{item['sentence_id']}?&trending_topic={item['word']}&previous_page=main_page&enter_method=trending_topic&modeFrom=hotDetail&tab_name=trend&position=1&hotValue={item['hot_value']}"
            news = {
                'title': title,
                'url': url,
                'content': title,
                'source': 'douyin',
                'publish_time': current_time.strftime('%Y-%m-%d %H:%M:%S')
            }
            result.append(news)
            cache_list.append(news)
        cache.hset(date_str, self.crawler_name(), json.dumps(cache_list, ensure_ascii=False))
        return result

    def crawler_name(self):
        return "douyin"

View File

@@ -0,0 +1,88 @@
import json
import datetime
import requests
import urllib3
from .crawler import Crawler
from ...core import cache
urllib3.disable_warnings()
class EastMoneyCrawler(Crawler):
    """Eastmoney (东方财富网) fast-news (7x24) crawler."""

    def fetch(self, date_str) -> list:
        """Fetch the latest fast-news items (top 20) and cache them.

        Returns:
            List of ranked news dicts; empty list on any failure.
        """
        current_time = datetime.datetime.now()
        try:
            params = {
                'client': 'web',
                'biz': 'web_724',
                'fastColumn': '102',
                'sortEnd': '',
                'pageSize': '50',
                # Trace id derived from the current timestamp in milliseconds.
                'req_trace': str(int(current_time.timestamp() * 1000))
            }
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Accept': 'application/json, text/plain, */*',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                'Referer': 'https://kuaixun.eastmoney.com/',
                'Origin': 'https://kuaixun.eastmoney.com'
            }
            response = requests.get(
                "https://np-weblist.eastmoney.com/comm/web/getFastNewsList",
                params=params,
                headers=headers,
                timeout=self.timeout,
                verify=False
            )
            response.raise_for_status()
            data = response.json()
            # This endpoint signals success with the *string* code '1'.
            if data.get('code') != '1':
                return []
            fast_news_list = data.get('data', {}).get('fastNewsList', [])
            result = []
            cache_list = []
            for idx, news_item in enumerate(fast_news_list[:20]):  # first 20 items
                try:
                    title = news_item.get('title', '').strip()
                    if not title:
                        continue
                    summary = news_item.get('summary', '').strip()
                    show_time = news_item.get('showTime', '').strip()
                    code = news_item.get('code', '').strip()
                    # An article code yields a permalink; otherwise link the board.
                    url = f"https://finance.eastmoney.com/a/{code}" if code else "https://kuaixun.eastmoney.com/"
                    news = {
                        'title': title,
                        'url': url,
                        'content': summary,
                        'source': 'eastmoney',
                        'publish_time': show_time if show_time else current_time.strftime('%Y-%m-%d %H:%M:%S'),
                        'score': 1000 - idx,  # synthetic score: earlier rank = higher
                        'rank': idx + 1
                    }
                    result.append(news)
                    cache_list.append(news)
                except Exception:
                    # One malformed item must not abort the whole batch.
                    continue
            if cache_list:
                cache.hset(date_str, self.crawler_name(), json.dumps(cache_list, ensure_ascii=False))
            return result
        except Exception as e:
            # BUG FIX: previously swallowed silently; log before failing soft.
            print(f"eastmoney fetch failed: {e}")
            return []

    def crawler_name(self):
        return "eastmoney"

View File

@@ -0,0 +1,64 @@
from typing import Dict, Type
from .baidu import BaiduNewsCrawler
from .bilibili import BilibiliCrawler
from .crawler import Crawler
from .douban import DouBanCrawler
from .douyin import DouYinCrawler
from .ftpojie import FtPoJieCrawler
from .github import GithubCrawler
from .hackernews import HackerNewsCrawler
from .hupu import HuPuCrawler
from .jinritoutiao import JinRiTouTiaoCrawler
from .juejin import JueJinCrawler
from .sspai import ShaoShuPaiCrawler
from .stackoverflow import StackOverflowCrawler
from .tenxunwang import TenXunWangCrawler
from .tieba import TieBaCrawler
from .tskr import TsKrCrawler
from .vtex import VtexCrawler
from .weibo import WeiboCrawler
from .weixin import WeiXinCrawler
from .zhihu import ZhiHuCrawler
from .sina_finance import SinaFinanceCrawler
from .eastmoney import EastMoneyCrawler
from .xueqiu import XueqiuCrawler
from .cls import CLSCrawler
class CrawlerRegister:
    """Registry mapping a site key to its crawler instance."""

    def __init__(self):
        # site key -> Crawler instance; populated by register()
        self.crawlers = {}

    def register(self) -> Dict[str, Crawler]:
        """Instantiate every known crawler and return the registry mapping."""
        crawler_classes = {
            "baidu": BaiduNewsCrawler,
            "shaoshupai": ShaoShuPaiCrawler,
            "weibo": WeiboCrawler,
            "zhihu": ZhiHuCrawler,
            "36kr": TsKrCrawler,
            "52pojie": FtPoJieCrawler,
            "bilibili": BilibiliCrawler,
            "douban": DouBanCrawler,
            "hupu": HuPuCrawler,
            "tieba": TieBaCrawler,
            "juejin": JueJinCrawler,
            "douyin": DouYinCrawler,
            "v2ex": VtexCrawler,
            "jinritoutiao": JinRiTouTiaoCrawler,
            "tenxunwang": TenXunWangCrawler,
            "stackoverflow": StackOverflowCrawler,
            "github": GithubCrawler,
            "hackernews": HackerNewsCrawler,
            "sina_finance": SinaFinanceCrawler,
            "eastmoney": EastMoneyCrawler,
            "xueqiu": XueqiuCrawler,
            "cls": CLSCrawler,
        }
        # Instantiate here so every register() call yields fresh objects.
        self.crawlers = {key: cls() for key, cls in crawler_classes.items()}
        return self.crawlers

    def get_crawlers(self):
        """Return the crawler instances (re-registers on each call)."""
        return self.register().values()

View File

@@ -0,0 +1,69 @@
import json
import datetime # 添加datetime导入
import re
import requests
import urllib3
from bs4 import BeautifulSoup
# 移除 SQLAlchemy 导入
# from sqlalchemy.sql.functions import now
from ...core import cache
from ...db.mysql import News
from .crawler import Crawler
urllib3.disable_warnings()
class FtPoJieCrawler(Crawler):
    """52pojie (吾爱破解) hot-thread crawler."""

    def fetch(self, date_str):
        """Scrape the forum hot list and return thread entries."""
        stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        page_url = "https://www.52pojie.cn/forum.php?mod=guide&view=hot"
        resp = requests.get(url=page_url, headers=self.header, verify=False, timeout=self.timeout)
        if resp.status_code != 200:
            print(f"request failed, status: {resp.status_code}")
            return []
        # The forum serves GBK-encoded pages.
        resp.encoding = 'gbk'
        soup = BeautifulSoup(resp.text, "html.parser")
        # Thread rows carry ids of the form "normalthread_<id>".
        threads = soup.find_all('tbody', id=lambda value: value and value.startswith('normalthread_'))
        entries = []
        for row in threads:
            link = row.find('a', class_='xst')
            if link is None:
                continue
            # Author / post metadata cell, when present.
            meta = row.find('td', class_='by')
            entries.append({
                'title': link.text.strip(),
                'url': "https://www.52pojie.cn/" + link.get('href'),
                'content': meta.text.strip() if meta else "",
                'source': '52pojie',
                'publish_time': stamp
            })
        cache.hset(date_str, self.crawler_name(), json.dumps(entries, ensure_ascii=False))
        return entries

    def crawler_name(self):
        return "52pojie"

View File

@@ -0,0 +1,58 @@
import json
import datetime
import requests
import urllib3
from .crawler import Crawler
from ...core import cache
from ...db.mysql import News
urllib3.disable_warnings()
class GithubCrawler(Crawler):
    """GitHub most-starred repositories crawler (search API)."""

    def fetch(self, date_str):
        """Fetch repositories sorted by stars and cache them.

        Returns:
            List of news dicts; empty list on HTTP failure.
        """
        current_time = datetime.datetime.now()
        url = "https://api.github.com/search/repositories?q=stars:%3E1&sort=stars"
        headers = {
            # BUG FIX: fragment order produced a malformed User-Agent
            # ("...Safari/537.36AppleWebKit...").
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/122.0.0.0 Safari/537.36"
            ),
            "Referer": "https://github.com/",
        }
        resp = requests.get(url=url, headers=headers, verify=False, timeout=self.timeout)
        if resp.status_code != 200:
            print(f"request failed, status: {resp.status_code}")
            return []
        data = resp.json()
        result = []
        cache_list = []
        for item in data["items"]:  # the index was unused — enumerate dropped
            title = item.get("full_name", "")
            url = item.get("html_url", "")
            # BUG FIX: "description" can be present-but-null in the API
            # response, so .get(..., "") could still yield None; normalize.
            desc = item.get("description") or ""
            news = {
                'title': title,
                'url': url,
                'content': desc,
                'source': self.crawler_name(),
                'publish_time': current_time.strftime('%Y-%m-%d %H:%M:%S')
            }
            result.append(news)
            cache_list.append(news)
        cache.hset(date_str, self.crawler_name(), json.dumps(cache_list, ensure_ascii=False))
        return result

    def crawler_name(self):
        return "github"

View File

@@ -0,0 +1,235 @@
import json
import datetime
import time
import requests
from bs4 import BeautifulSoup
import urllib3
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from ...core import cache
from ...db.mysql import News
from .crawler import Crawler
from ..browser_manager import BrowserManager
# 禁用SSL警告
urllib3.disable_warnings()
class HackerNewsCrawler(Crawler):
    """Hacker News front-page crawler.

    Tries a plain HTTP request first and falls back to a headless-browser
    fetch when the direct request yields nothing.
    """

    def fetch(self, date_str):
        """Fetch the HN front page; cache and return up to 30 entries."""
        try:
            # Fast path: plain HTTP request + BeautifulSoup parse.
            result = self._fetch_with_requests()
            if result and len(result) > 0:
                cache.hset(date_str, self.crawler_name(), json.dumps(result, ensure_ascii=False))
                return result
            # Fallback: drive a real browser when the direct fetch fails.
            browser_manager = BrowserManager()
            result = self._fetch_with_browser(browser_manager)
            if result and len(result) > 0:
                cache.hset(date_str, self.crawler_name(), json.dumps(result, ensure_ascii=False))
                return result
        except Exception as e:
            # Best-effort crawler: log and fail soft.
            print(f"hackernews fetch failed: {e}")
            return []
        # Every strategy failed.
        return []

    def _fetch_with_requests(self):
        """Fetch and parse news.ycombinator.com via requests + BeautifulSoup.

        Returns:
            Up to 30 news dicts; empty list on any failure.
        """
        url = "https://news.ycombinator.com/"
        try:
            response = requests.get(url, headers=self.header, timeout=self.timeout)
            if response.status_code != 200:
                return []
            soup = BeautifulSoup(response.text, 'html.parser')
            result = []
            current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            # Each story row carries class "athing".
            items = soup.select("tr.athing")
            for item in items:
                try:
                    # Row id links the story to its metadata sibling row.
                    item_id = item.get('id')
                    if not item_id:
                        continue
                    # Title + outbound link.
                    title_element = item.select_one(".titleline a")
                    if not title_element:
                        continue
                    title = title_element.text.strip()
                    url = title_element.get('href')
                    # Relative links (Ask HN etc.) become absolute.
                    if url and not url.startswith('http'):
                        url = f"https://news.ycombinator.com/{url}"
                    # Source site label, when present.
                    site_element = item.select_one(".sitestr")
                    site = site_element.text.strip() if site_element else ""
                    # The following sibling row holds score/user/comments.
                    metadata = item.find_next_sibling('tr')
                    if not metadata:
                        continue
                    score_element = metadata.select_one(".score")
                    score = score_element.text.strip() if score_element else "0 points"
                    user_element = metadata.select_one(".hnuser")
                    user = user_element.text.strip() if user_element else "unknown"
                    comments_element = metadata.select_one("a:last-child")
                    comments = comments_element.text.strip() if comments_element else "0 comments"
                    if "discuss" in comments:
                        # "discuss" means the story has no comments yet.
                        comments = "0 comments"
                    # Summarize the metadata as the content field.
                    content = f"来源: {site} | 得分: {score} | 作者: {user} | 评论: {comments}"
                    news = {
                        'title': title,
                        'url': url,
                        'content': content,
                        'source': 'hackernews',
                        'publish_time': current_time
                    }
                    result.append(news)
                    # Cap at the top 30 stories.
                    if len(result) >= 30:
                        break
                except Exception:
                    continue
            return result
        except Exception:
            return []

    def _fetch_with_browser(self, browser_manager):
        """Fetch the front page via a managed browser session.

        Returns:
            Up to 30 news dicts; empty list on any failure.
        """
        url = "https://news.ycombinator.com/"
        try:
            # Render the page.
            page_source, driver = browser_manager.get_page_content(url, wait_time=5)
            # Wait for the story rows to appear.
            try:
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, ".athing"))
                )
            except Exception:
                # BUG FIX: was a bare `except:` (also caught SystemExit /
                # KeyboardInterrupt). A timeout is non-fatal — parse whatever
                # has rendered so far.
                pass
            result = []
            current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            # All story rows.
            items = driver.find_elements(By.CSS_SELECTOR, "tr.athing")
            for item in items:
                try:
                    # Row id links the story to its metadata sibling row.
                    item_id = item.get_attribute("id")
                    if not item_id:
                        continue
                    # Title + outbound link.
                    title_element = item.find_element(By.CSS_SELECTOR, ".titleline a")
                    title = title_element.text.strip()
                    url = title_element.get_attribute("href")
                    # Source site label, when present.
                    site = ""
                    try:
                        site_element = item.find_element(By.CSS_SELECTOR, ".sitestr")
                        site = site_element.text.strip()
                    except Exception:
                        pass
                    # Sibling row holds score / user / comment count.
                    try:
                        metadata = driver.find_element(By.XPATH, f"//tr[@id='{item_id}']/following-sibling::tr[1]")
                        score = "0 points"
                        try:
                            score_element = metadata.find_element(By.CSS_SELECTOR, ".score")
                            score = score_element.text.strip()
                        except Exception:
                            pass
                        user = "unknown"
                        try:
                            user_element = metadata.find_element(By.CSS_SELECTOR, ".hnuser")
                            user = user_element.text.strip()
                        except Exception:
                            pass
                        comments = "0 comments"
                        try:
                            comments_element = metadata.find_element(By.XPATH, ".//a[last()]")
                            comments = comments_element.text.strip()
                            if "discuss" in comments:
                                comments = "0 comments"
                        except Exception:
                            pass
                        # Summarize the metadata as the content field.
                        content = f"来源: {site} | 得分: {score} | 作者: {user} | 评论: {comments}"
                    except Exception:
                        content = f"来源: {site}"
                    news = {
                        'title': title,
                        'url': url,
                        'content': content,
                        'source': 'hackernews',
                        'publish_time': current_time
                    }
                    result.append(news)
                    # Cap at the top 30 stories.
                    if len(result) >= 30:
                        break
                except Exception:
                    continue
            return result
        except Exception:
            return []

    def crawler_name(self):
        return "hackernews"

View File

@@ -0,0 +1,72 @@
import json
import datetime # 添加datetime导入
import re
import requests
import urllib3
from bs4 import BeautifulSoup
# 移除 SQLAlchemy 导入
# from sqlalchemy.sql.functions import now
from ...core import cache
from ...db.mysql import News
from .crawler import Crawler
urllib3.disable_warnings()
class HuPuCrawler(Crawler):
    """Hupu BBS hot-post crawler."""

    def fetch(self, date_str):
        """Scrape the all-gambia hot board into news dicts."""
        stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        board_url = "https://bbs.hupu.com/all-gambia"
        resp = requests.get(url=board_url, headers=self.header, verify=False, timeout=self.timeout)
        if resp.status_code != 200:
            print(f"request failed, status: {resp.status_code}")
            return []
        soup = BeautifulSoup(resp.text, "html.parser")
        entries = []
        for post in soup.find_all('div', class_='t-info'):
            title_node = post.find('span', class_='t-title')
            link_node = post.find('a')
            if title_node is None or link_node is None:
                # Not a regular post row — skip it.
                continue
            href = link_node.get('href')
            # Relative links get the site prefix; absolute ones pass through.
            if href.startswith('/'):
                href = "https://bbs.hupu.com" + href
            replies_node = post.find('span', class_='t-replies')
            entries.append({
                'title': title_node.text.strip(),
                'url': href,
                'content': replies_node.text.strip() if replies_node else "",
                'source': 'hupu',
                'publish_time': stamp
            })
        cache.hset(date_str, self.crawler_name(), json.dumps(entries, ensure_ascii=False))
        return entries

    def crawler_name(self):
        return "hupu"

View File

@@ -0,0 +1,63 @@
# -- coding: utf-8 --
import json
import datetime # 添加datetime导入
import requests
import urllib3
from bs4 import BeautifulSoup
# 移除 SQLAlchemy 导入
# from sqlalchemy.sql.functions import now
from ...core import cache
from .crawler import Crawler
urllib3.disable_warnings()
class JinRiTouTiaoCrawler(Crawler):
    """Toutiao (今日头条) hot-event board crawler."""

    def fetch(self, date_str):
        """Fetch the PC hot-board JSON feed and return news dicts."""
        stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        board_url = "https://www.toutiao.com/hot-event/hot-board/?origin=toutiao_pc"
        resp = requests.get(url=board_url, headers=self.header, verify=False, timeout=self.timeout)
        if resp.status_code != 200:
            print(f"request failed, status: {resp.status_code}")
            return []
        try:
            entries = [
                {
                    'title': item.get('Title', ''),
                    'url': item.get('Url', ''),
                    'content': f"热度: {item.get('HotValue', '')}",
                    'source': 'jinritoutiao',
                    'publish_time': stamp
                }
                for item in resp.json().get('data', [])
            ]
            cache.hset(date_str, self.crawler_name(), json.dumps(entries, ensure_ascii=False))
            return entries
        except Exception as err:
            print(f"Error parsing JSON: {err}")
            return []

    def crawler_name(self):
        return "jinritoutiao"

View File

@@ -0,0 +1,63 @@
import json
import datetime # 添加datetime导入
import requests
import urllib3
from bs4 import BeautifulSoup
# 移除 SQLAlchemy 导入
# from sqlalchemy.sql.functions import now
from .crawler import Crawler
from ...core import cache
from ...db.mysql import News
urllib3.disable_warnings()
class JueJinCrawler(Crawler):
    """Juejin hot-article ranking crawler."""

    def fetch(self, date_str):
        """Fetch the hot article ranking API and return news dicts."""
        stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        rank_url = "https://api.juejin.cn/content_api/v1/content/article_rank?category_id=1&type=hot"
        resp = requests.get(url=rank_url, headers=self.header, verify=False, timeout=self.timeout)
        if resp.status_code != 200:
            print(f"request failed, status: {resp.status_code}")
            return []
        try:
            entries = []
            for item in resp.json().get('data', []):
                info = item.get('content', {})
                headline = info.get('title', '')
                post_url = f"https://juejin.cn/post/{info.get('content_id', '')}"
                entries.append({
                    'title': headline,
                    'url': post_url,
                    'content': headline,  # API exposes no summary; title doubles as content
                    'source': 'juejin',
                    'publish_time': stamp
                })
            cache.hset(date_str, self.crawler_name(), json.dumps(entries, ensure_ascii=False))
            return entries
        except Exception as err:
            print(f"Error parsing JSON: {err}")
            return []

    def crawler_name(self):
        return "juejin"

View File

@@ -0,0 +1,20 @@
import datetime
from sqlalchemy import Column, String, Integer, DateTime
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class DailyNews(Base):
    """ORM model for the daily aggregated-news table."""
    __tablename__ = 'tab_daily_news'
    # Surrogate primary key.
    id = Column(Integer, primary_key=True)
    # Headline text.
    title = Column(String(255))
    # Short description / summary.
    desc = Column(String(255))
    # Link to the original article.
    link = Column(String(255))
    # Category/type discriminator; defaults to 0 — semantics defined by callers.
    type = Column(Integer, default=0)
    # Ranking score; defaults to 0.
    score = Column(Integer, default=0)
    # Counter column; presumably occurrence/hit count — confirm against callers.
    times = Column(Integer, default=0)
    # Row creation timestamp (set on insert).
    create_time = Column(DateTime, default=datetime.datetime.now)
    # Last-modified timestamp (refreshed automatically on update).
    update_time = Column(DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now)

View File

@@ -0,0 +1,75 @@
import json
import datetime
import requests
import urllib3
from ...core import cache
from .crawler import Crawler
urllib3.disable_warnings()
class SinaFinanceCrawler(Crawler):
    """Sina Finance 7x24 live-feed crawler."""

    def fetch(self, date_str):
        """Fetch the latest live-feed items and cache them.

        Returns:
            List of news dicts; empty list on any failure.
        """
        current_time = datetime.datetime.now()
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Accept': 'application/json, text/plain, */*',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                'Referer': 'https://finance.sina.com.cn/',
                'Origin': 'https://finance.sina.com.cn'
            }
            response = requests.get(
                "https://zhibo.sina.com.cn/api/zhibo/feed?page=1&page_size=20&zhibo_id=152&tag_id=0&dire=f&dpc=1&pagesize=20",
                headers=headers,
                timeout=self.timeout,
                verify=False
            )
            response.raise_for_status()
            data = response.json()
            if data.get('result', {}).get('status', {}).get('code') != 0:
                return []
            feed_list = data.get('result', {}).get('data', {}).get('feed', {}).get('list', [])
            result = []
            cache_list = []
            for item in feed_list:
                try:
                    title = item.get('rich_text', '').strip()
                    if not title:
                        continue
                    # "ext" is a JSON string embedded inside the item; fall
                    # back to the top-level docurl when it cannot be parsed.
                    ext_str = item.get('ext', '{}')
                    try:
                        ext_data = json.loads(ext_str)
                        doc_url = ext_data.get('docurl', '')
                    except (ValueError, TypeError, AttributeError):
                        # BUG FIX: was a bare `except:` which also swallowed
                        # SystemExit/KeyboardInterrupt; only parse failures
                        # (bad JSON, None ext, non-dict payload) are expected.
                        doc_url = item.get('docurl', '').strip(' "')
                    news = {
                        'title': title,
                        'url': doc_url,
                        'content': title,
                        'source': 'sina_finance',
                        'publish_time': current_time.strftime('%Y-%m-%d %H:%M:%S')
                    }
                    result.append(news)
                    cache_list.append(news)
                except Exception:
                    # One malformed feed item must not abort the batch.
                    continue
            if cache_list:
                cache.hset(date_str, self.crawler_name(), json.dumps(cache_list, ensure_ascii=False))
            return result
        except Exception as e:
            # BUG FIX: previously swallowed silently; log before failing soft.
            print(f"sina_finance fetch failed: {e}")
            return []

    def crawler_name(self):
        return "sina_finance"

View File

@@ -0,0 +1,60 @@
import json
import datetime # 添加datetime导入
import requests
import urllib3
from bs4 import BeautifulSoup
# 移除 SQLAlchemy 导入
# from sqlalchemy.sql.functions import now
from ...core import cache
from .crawler import Crawler
urllib3.disable_warnings()
class ShaoShuPaiCrawler(Crawler):
    """sspai.com (少数派) latest-article crawler."""

    def fetch(self, date_str):
        """Fetch the article index API and return news dicts."""
        stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        api_url = "https://sspai.com/api/v1/article/index/page/get?limit=20&offset=0&created_at=0"
        resp = requests.get(url=api_url, headers=self.header, verify=False, timeout=self.timeout)
        if resp.status_code != 200:
            print(f"request failed, status: {resp.status_code}")
            return []
        try:
            entries = []
            for item in resp.json().get('data', []):
                entries.append({
                    'title': item.get('title', ''),
                    'url': f"https://sspai.com/post/{item.get('id', '')}",
                    'content': item.get('summary', ''),
                    'source': 'sspai',
                    'publish_time': stamp
                })
            cache.hset(date_str, self.crawler_name(), json.dumps(entries, ensure_ascii=False))
            return entries
        except Exception as err:
            print(f"Error parsing JSON: {err}")
            return []

    def crawler_name(self):
        return "shaoshupai"

View File

@@ -0,0 +1,58 @@
import json
import datetime
import requests
import urllib3
from .crawler import Crawler
from ...core import cache
from ...db.mysql import News
urllib3.disable_warnings()
class StackOverflowCrawler(Crawler):
    """Stack Overflow hot-questions crawler (Stack Exchange API)."""

    def fetch(self, date_str):
        """Fetch hot questions and cache them under ``date_str``.

        Returns:
            List of news dicts; empty list on HTTP failure.
        """
        current_time = datetime.datetime.now()
        url = "https://api.stackexchange.com/2.3/questions?order=desc&sort=hot&site=stackoverflow"
        headers = {
            # BUG FIX: fragment order produced a malformed User-Agent
            # ("...Safari/537.36AppleWebKit...").
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/122.0.0.0 Safari/537.36"
            ),
            "Referer": "https://stackoverflow.com/",
        }
        resp = requests.get(url=url, headers=headers, verify=False, timeout=self.timeout)
        if resp.status_code != 200:
            print(f"request failed, status: {resp.status_code}")
            return []
        data = resp.json()
        result = []
        cache_list = []
        for item in data["items"]:  # the index was unused — enumerate dropped
            # NOTE(review): API titles are HTML-entity encoded (&amp; etc.);
            # consider html.unescape() if consumers need plain text — confirm.
            title = item.get("title", "")
            url = item.get("link", "")
            desc = item.get("title", "")  # no excerpt in this payload; title doubles as content
            news = {
                'title': title,
                'url': url,
                'content': desc,
                'source': 'stackoverflow',
                'publish_time': current_time.strftime('%Y-%m-%d %H:%M:%S')
            }
            result.append(news)
            cache_list.append(news)
        cache.hset(date_str, self.crawler_name(), json.dumps(cache_list, ensure_ascii=False))
        return result

    def crawler_name(self):
        return "stackoverflow"

View File

@@ -0,0 +1,65 @@
import json
import datetime
import requests
import urllib3
from .crawler import Crawler
from ...core import cache
from ...db.mysql import News
urllib3.disable_warnings()
class TenXunWangCrawler(Crawler):
    """Tencent News (news.qq.com) hot-ranking crawler."""

    def fetch(self, date_str):
        """Fetch the Tencent News hot ranking and cache it under ``date_str``.

        Returns a list of news dicts; empty list on any failure.
        """
        current_time = datetime.datetime.now()
        url = "https://i.news.qq.com/gw/event/pc_hot_ranking_list?ids_hash=&offset=0&page_size=51&appver=15.5_qqnews_7.1.60&rank_id=hot"
        headers = {
            # Fixed: the UA fragments were concatenated out of order
            # ("Chrome/... Safari/..." before "AppleWebKit/..."), yielding a
            # malformed User-Agent header.
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/122.0.0.0 Safari/537.36"
            ),
            "Referer": "https://news.qq.com/",
        }
        resp = requests.get(url=url, headers=headers, verify=False, timeout=self.timeout)
        if resp.status_code != 200:
            print(f"request failed, status: {resp.status_code}")
            return []
        try:
            id_list = resp.json().get("idlist") or []
        except Exception as e:
            print(f"Error parsing JSON: {e}")
            return []
        if not id_list:
            # Robustness fix: the original indexed data["idlist"][0] blindly
            # and crashed with KeyError/IndexError on unexpected payloads.
            return []
        result = []
        for i, item in enumerate(id_list[0].get("newslist", [])):
            if i == 0:
                # The first entry is the ranking header (refreshed every 10
                # minutes), not an actual article — skip it.
                continue
            news = {
                'title': item.get("title", ""),
                'url': item.get("url", ""),
                'content': item.get("abstract", ""),
                'source': 'tenxunwang',
                'publish_time': current_time.strftime('%Y-%m-%d %H:%M:%S')
            }
            result.append(news)
        cache.hset(date_str, self.crawler_name(), json.dumps(result, ensure_ascii=False))
        return result

    def crawler_name(self):
        """Return the source identifier used as the cache hash field."""
        return "tenxunwang"

View File

@@ -0,0 +1,65 @@
import json
import datetime # 添加datetime导入
import requests
import urllib3
from bs4 import BeautifulSoup
# 移除 SQLAlchemy 导入
# from sqlalchemy.sql.functions import now
from .crawler import Crawler
from ...core import cache
from ...db.mysql import News
urllib3.disable_warnings()
class TieBaCrawler(Crawler):
    """Baidu Tieba hot-topic crawler."""

    def fetch(self, date_str):
        """Fetch the Tieba hot-topic board and cache it under ``date_str``.

        Returns a list of news dicts; empty list on request or parse failure.
        """
        stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        endpoint = "http://tieba.baidu.com/hottopic/browse/topicList"
        resp = requests.get(url=endpoint, headers=self.header, verify=False, timeout=self.timeout)
        if resp.status_code != 200:
            print(f"request failed, status: {resp.status_code}")
            return []
        try:
            payload = resp.json()
            topics = payload.get('data', {}).get('bang_topic', {}).get('topic_list', [])
            collected = []
            for topic in topics:
                link = topic.get('topic_url', '')
                if link and not link.startswith('http'):
                    # Relative links are rooted at the Tieba domain.
                    link = f"http://tieba.baidu.com{link}"
                collected.append({
                    'title': topic.get('topic_name', ''),
                    'url': link,
                    'content': topic.get('topic_desc', ''),
                    'source': 'tieba',
                    'publish_time': stamp
                })
            cache.hset(date_str, self.crawler_name(), json.dumps(collected, ensure_ascii=False))
            return collected
        except Exception as e:
            print(f"Error parsing JSON: {e}")
            return []

    def crawler_name(self):
        """Return the source identifier used as the cache hash field."""
        return "tieba"

View File

@@ -0,0 +1,83 @@
import json
import datetime
import time
import requests
import urllib3
from .crawler import Crawler
from ...core import cache
urllib3.disable_warnings()
class TsKrCrawler(Crawler):
    """36Kr hot-article-ranking crawler."""

    def fetch(self, date_str):
        """Fetch the 36Kr hot ranking and cache it under ``date_str``.

        Returns a list of news dicts; empty list on any failure.
        """
        stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        endpoint = "https://gateway.36kr.com/api/mis/nav/home/nav/rank/hot"
        req_headers = {
            "Content-Type": "application/json; charset=utf-8",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
        }
        payload = {
            "partner_id": "wap",
            "param": {
                "siteId": 1,
                "platformId": 2,
            },
            "timestamp": int(time.time() * 1000),
        }
        try:
            resp = requests.post(
                url=endpoint,
                headers=req_headers,
                json=payload,
                verify=False,
                timeout=self.timeout
            )
            if resp.status_code != 200:
                print(f"request failed, status: {resp.status_code}")
                return []
            ranking = resp.json().get("data", {}).get("hotRankList", [])
            collected = []
            for entry in ranking:
                material = entry.get("templateMaterial", {})
                headline = material.get("widgetTitle", "")
                collected.append({
                    'title': headline,
                    # The API only gives an item id; build the article URL.
                    'url': f"https://www.36kr.com/p/{entry.get('itemId', '')}",
                    'content': headline,
                    'source': '36kr',
                    'publish_time': stamp,
                })
            cache.hset(date_str, self.crawler_name(), json.dumps(collected, ensure_ascii=False))
            return collected
        except Exception as e:
            print(f"Error fetching 36kr data: {e}")
            return []

    def crawler_name(self):
        """Return the source identifier used as the cache hash field."""
        return "36kr"

View File

@@ -0,0 +1,71 @@
import json
import datetime # 添加datetime导入
import requests
import urllib3
from bs4 import BeautifulSoup
# 移除 SQLAlchemy 导入
# from sqlalchemy.sql.functions import now
from .crawler import Crawler
from ...core import cache
from ...db.mysql import News
urllib3.disable_warnings()
class VtexCrawler(Crawler):
    """V2EX hot-topics crawler (scrapes the ?tab=hot HTML page)."""

    def fetch(self, date_str):
        """Scrape the V2EX hot tab and cache the topics under ``date_str``.

        Returns a list of news dicts; empty list on request failure.
        """
        stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        resp = requests.get(url="https://www.v2ex.com/?tab=hot", headers=self.header,
                            verify=False, timeout=self.timeout)
        if resp.status_code != 200:
            print(f"request failed, status: {resp.status_code}")
            return []
        soup = BeautifulSoup(resp.text, "html.parser")
        collected = []
        # Each hot topic lives in a <div class="cell item"> cell.
        for cell in soup.find_all('div', class_='cell item'):
            title_span = cell.find('span', class_='item_title')
            anchor = title_span.find('a') if title_span else None
            if anchor is None:
                # Skip cells without a titled link.
                continue
            info_span = cell.find('span', class_='topic_info')
            collected.append({
                'title': anchor.text.strip(),
                'url': "https://www.v2ex.com" + anchor.get('href'),
                'content': info_span.text.strip() if info_span else "",
                'source': 'v2ex',
                'publish_time': stamp
            })
        cache.hset(date_str, self.crawler_name(), json.dumps(collected, ensure_ascii=False))
        return collected

    def crawler_name(self):
        """Return the source identifier used as the cache hash field."""
        return "v2ex"

View File

@@ -0,0 +1,68 @@
import json
import datetime # 添加datetime导入
import requests
import urllib3
from bs4 import BeautifulSoup
# 移除 SQLAlchemy 导入
# from sqlalchemy.sql.functions import now
from ...core import cache
from .crawler import Crawler
urllib3.disable_warnings()
class WeiboCrawler(Crawler):
    """Weibo realtime hot-search crawler."""

    def fetch(self, date_str):
        """Fetch the Weibo realtime hot-search board and cache it.

        Results are cached under the ``date_str`` hash key and returned as a
        list of news dicts; empty list on any failure.
        """
        from urllib.parse import quote  # stdlib; only needed in this method

        current_time = datetime.datetime.now()
        header = self.header.copy()
        header.update({
            "accept": "application/json, text/javascript, */*; q=0.01",
            "host": "weibo.com",
            "Referer": "https://weibo.com",
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        })
        url = "https://weibo.com/ajax/side/hotSearch"
        resp = requests.get(url=url, headers=header, verify=False, timeout=self.timeout)
        if resp.status_code != 200:
            print(f"request failed, status: {resp.status_code}")
            return []
        try:
            json_data = resp.json()
            data = json_data.get('data', {}).get('realtime', [])
            result = []
            for item in data:
                title = item.get('word', '')
                # Fixed: the raw title was interpolated into the search URL
                # unescaped; percent-encode it so non-ASCII and reserved
                # characters produce a valid link (%23 is the literal '#').
                url = f"https://s.weibo.com/weibo?q=%23{quote(title)}%23"
                news = {
                    'title': title,
                    'url': url,
                    'content': title,
                    'source': 'weibo',
                    'publish_time': current_time.strftime('%Y-%m-%d %H:%M:%S')
                }
                result.append(news)
            cache.hset(date_str, self.crawler_name(), json.dumps(result, ensure_ascii=False))
            return result
        except Exception as e:
            print(f"Error parsing JSON: {e}")
            return []

    def crawler_name(self):
        """Return the source identifier used as the cache hash field."""
        return "weibo"

View File

@@ -0,0 +1,228 @@
import json
import datetime
import time
import requests
from bs4 import BeautifulSoup
import urllib3
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from ...core import cache
from ...db.mysql import News
from .crawler import Crawler
from ..browser_manager import BrowserManager
# 禁用SSL警告
urllib3.disable_warnings()
class WeiXinCrawler(Crawler):
    """
    WeChat trending-content crawler.

    Primary source is the WeChat "Kan Yi Kan" page (k.weixin.qq.com),
    scraped through a Selenium browser supplied by BrowserManager; if that
    yields nothing, it falls back to WeRead (weread.qq.com) popular books.
    """
    def fetch(self, date_str):
        """Fetch WeChat trending items and cache them under ``date_str``.

        Returns a list of news dicts, or an empty list on any failure.
        """
        # NOTE(review): current_time is computed but never used here — the
        # helper methods build their own timestamps.
        current_time = datetime.datetime.now()
        browser_manager = BrowserManager()
        try:
            # First try the WeChat "Kan Yi Kan" trending page.
            result = self._fetch_from_weixin_kankan(browser_manager)
            if result and len(result) > 0:
                # Cache the data.
                cache.hset(date_str, self.crawler_name(), json.dumps(result, ensure_ascii=False))
                return result
            # If that failed, fall back to WeRead popular-book listings.
            result = self._fetch_from_weixin_dushu(browser_manager)
            if result and len(result) > 0:
                # Cache the data.
                cache.hset(date_str, self.crawler_name(), json.dumps(result, ensure_ascii=False))
                return result
        except Exception as e:
            # On any error, give up and return an empty list.
            return []
        # Every source failed — return an empty list.
        return []
    def _fetch_from_weixin_kankan(self, browser_manager):
        """Scrape trending articles from the WeChat "Kan Yi Kan" page."""
        url = "https://k.weixin.qq.com/"
        try:
            # Load the page through the managed Selenium browser.
            page_source, driver = browser_manager.get_page_content(url, wait_time=10)
            # Wait for the trending section to render.
            try:
                WebDriverWait(driver, 15).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, ".hot"))
                )
            except:
                # Timed out — still attempt to read whatever has loaded.
                pass
            # Click the "hot" tab (XPath matches the Chinese UI label) to
            # switch to trending content.
            try:
                hot_tab = driver.find_element(By.XPATH, "//div[contains(text(), '热点') and @class='tab']")
                hot_tab.click()
                time.sleep(3)  # wait for the tab content to load
            except:
                # Tab not found — keep scraping the current page instead.
                pass
            result = []
            current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            # Collect article elements, probing selectors from most to least
            # specific since the page markup is not stable.
            articles = driver.find_elements(By.CSS_SELECTOR, ".article-item")
            if not articles:
                # Try an alternative selector.
                articles = driver.find_elements(By.CSS_SELECTOR, ".doc-item")
            if not articles:
                # Last-resort generic selector.
                articles = driver.find_elements(By.CSS_SELECTOR, ".item")
            for article in articles:
                try:
                    # Article title.
                    title_elem = article.find_element(By.CSS_SELECTOR, "h3, .title")
                    title = title_elem.text.strip()
                    # Try to resolve the article link.
                    link = None
                    try:
                        link_elem = article.find_element(By.TAG_NAME, "a")
                        link = link_elem.get_attribute("href")
                    except:
                        # No direct <a>: build a link from the element id.
                        try:
                            article_id = article.get_attribute("data-id") or article.get_attribute("id")
                            link = f"https://k.weixin.qq.com/article?id={article_id}"
                        except:
                            link = "https://k.weixin.qq.com/"
                    # Publisher / account name, if present.
                    source = ""
                    try:
                        source_elem = article.find_element(By.CSS_SELECTOR, ".account, .source")
                        source = source_elem.text.strip()
                    except:
                        pass
                    # Article summary, if present.
                    summary = ""
                    try:
                        summary_elem = article.find_element(By.CSS_SELECTOR, ".desc, .summary, p")
                        summary = summary_elem.text.strip()
                    except:
                        pass
                    news = {
                        'title': title,
                        'url': link,
                        'content': f"来源: {source} | 摘要: {summary[:50] if summary else '无摘要'}",
                        'source': 'weixin',
                        'publish_time': current_time
                    }
                    result.append(news)
                    # Cap the result at the first 20 items.
                    if len(result) >= 20:
                        break
                except Exception as e:
                    continue
            return result
        except Exception as e:
            return []
    def _fetch_from_weixin_dushu(self, browser_manager):
        """Scrape popular books from WeRead as a fallback source."""
        url = "https://weread.qq.com/web/category/all"
        try:
            # Load the page through the managed Selenium browser.
            page_source, driver = browser_manager.get_page_content(url, wait_time=8)
            result = []
            current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            # Try to switch to the ranking tab (Chinese UI label).
            try:
                rank_tab = driver.find_element(By.XPATH, "//a[contains(text(), '排行榜')]")
                rank_tab.click()
                time.sleep(3)  # wait for the tab content to load
            except:
                # Tab not found — keep scraping the current page instead.
                pass
            # Collect book entries.
            books = driver.find_elements(By.CSS_SELECTOR, ".shelf-item, .book-item")
            for book in books:
                try:
                    # Book title.
                    title_elem = book.find_element(By.CSS_SELECTOR, ".title, h3")
                    title = title_elem.text.strip()
                    # Try to resolve a link; default to the category page.
                    link = "https://weread.qq.com/web/category/all"
                    try:
                        link_elem = book.find_element(By.TAG_NAME, "a")
                        link = link_elem.get_attribute("href")
                    except:
                        book_id = book.get_attribute("data-bid") or book.get_attribute("id")
                        if book_id:
                            link = f"https://weread.qq.com/web/reader/{book_id}"
                    # Author name, if present.
                    author = ""
                    try:
                        author_elem = book.find_element(By.CSS_SELECTOR, ".author, .writer")
                        author = author_elem.text.strip()
                    except:
                        pass
                    # Short description, if present.
                    intro = ""
                    try:
                        intro_elem = book.find_element(By.CSS_SELECTOR, ".intro, .desc")
                        intro = intro_elem.text.strip()
                    except:
                        pass
                    news = {
                        'title': f"热门书籍: {title}",
                        'url': link,
                        'content': f"作者: {author} | 简介: {intro[:50] if intro else '无简介'}",
                        'source': 'weixin',
                        'publish_time': current_time
                    }
                    result.append(news)
                    # Cap the result at the first 20 items.
                    if len(result) >= 20:
                        break
                except Exception as e:
                    continue
            return result
        except Exception as e:
            return []
    def crawler_name(self):
        """Return the source identifier used as the cache hash field."""
        return "weixin"

View File

@@ -0,0 +1,155 @@
import json
import datetime
import requests
import urllib3
import re
from requests.sessions import Session
from .crawler import Crawler
from ...core import cache
urllib3.disable_warnings()
class XueqiuCrawler(Crawler):
    """Xueqiu (Snowball, xueqiu.com) hot-event crawler.

    Uses a persistent requests Session because the hot-event JSON API
    requires cookies obtained by first browsing the site.
    """
    def __init__(self):
        super().__init__()
        # Persistent session so cookies gathered in _init_session carry
        # over to the API call made in fetch().
        self.session = Session()
        self._init_session()
    def _init_session(self):
        """Warm up the session: visit the home page and the hot-event page
        so the session collects the cookies the JSON API expects."""
        try:
            # Step 1: hit the home page to obtain the base cookies.
            main_url = "https://xueqiu.com"
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                'Cache-Control': 'no-cache',
                'Pragma': 'no-cache',
                'Sec-Fetch-Dest': 'document',
                'Sec-Fetch-Mode': 'navigate',
                'Sec-Fetch-Site': 'none',
                'Sec-Fetch-User': '?1',
                'Upgrade-Insecure-Requests': '1'
            }
            resp = self.session.get(main_url, headers=headers, verify=False, timeout=self.timeout)
            if resp.status_code == 200:
                html_content = resp.text
                # Try to extract the session token embedded in the page
                # (window.SNB = { ... token: "..." ... }).
                token_match = re.search(r'window\.SNB\s*=\s*\{[^}]*token["\']?\s*:\s*["\']([^"\']+)["\']', html_content)
                if token_match:
                    # NOTE(review): the extracted token is never sent
                    # anywhere; finding it only triggers the XHR header
                    # update below — confirm whether the API needs it.
                    token = token_match.group(1)
                    self.session.headers.update({'X-Requested-With': 'XMLHttpRequest'})
                # Step 2: visit the hot-event HTML page so the session also
                # picks up any page-specific cookies.
                hot_page_url = "https://xueqiu.com/hot_event"
                hot_headers = {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36',
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
                    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                    'Referer': 'https://xueqiu.com/',
                    'Sec-Fetch-Dest': 'document',
                    'Sec-Fetch-Mode': 'navigate',
                    'Sec-Fetch-Site': 'same-origin',
                    'Sec-Fetch-User': '?1',
                    'Upgrade-Insecure-Requests': '1'
                }
                hot_resp = self.session.get(hot_page_url, headers=hot_headers, verify=False, timeout=self.timeout)
                if hot_resp.status_code == 200:
                    print("雪球热门页面访问成功,已获取完整认证信息")
                else:
                    print(f"雪球热门页面访问失败: {hot_resp.status_code}")
            else:
                print(f"雪球主页访问失败: {resp.status_code}")
        except Exception as e:
            print(f"初始化雪球会话失败: {e}")
    def fetch(self, date_str) -> list:
        """Fetch the Xueqiu hot-event list and cache it under ``date_str``.

        Retries once (after re-warming the session) if the first request
        fails. Returns a list of news dicts; empty list on any failure.
        """
        current_time = datetime.datetime.now()
        url = "https://xueqiu.com/hot_event/list.json?count=10"
        headers = {
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Pragma': 'no-cache',
            'Referer': 'https://xueqiu.com/',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36',
            'X-Requested-With': 'XMLHttpRequest'
        }
        try:
            resp = self.session.get(url=url, headers=headers, verify=False, timeout=self.timeout)
            if resp.status_code != 200:
                print(f"雪球请求失败, status: {resp.status_code}")
                # Re-warm the session once and retry the request.
                self._init_session()
                resp = self.session.get(url=url, headers=headers, verify=False, timeout=self.timeout)
                if resp.status_code != 200:
                    print(f"雪球重试后仍失败, status: {resp.status_code}")
                    return []
            json_data = resp.json()
            if 'list' not in json_data:
                print("雪球响应格式异常")
                return []
            result = []
            cache_list = []
            for idx, item in enumerate(json_data['list'][:10]):  # take the first 10 entries
                try:
                    # Topic tags come wrapped as "#topic#" — strip the marks.
                    tag = item.get('tag', '').strip()
                    if tag.startswith('#') and tag.endswith('#'):
                        title = tag[1:-1]
                    else:
                        title = tag
                    if not title:
                        continue
                    # NOTE(review): item_id is extracted but unused, and
                    # url_link always points at the site root — confirm
                    # whether it should be a per-event URL built from id.
                    item_id = item.get('id')
                    url_link = f"https://xueqiu.com/"
                    content = item.get('content', '').strip()
                    if len(content) > 200:
                        # Trim long bodies for storage.
                        content = content[:200] + '...'
                    # NOTE(review): hot_value is read but never used.
                    status_count = item.get('status_count', 0)
                    hot_value = item.get('hot', 0)
                    news = {
                        'title': title,
                        'url': url_link,
                        'content': content,
                        'source': 'xueqiu',
                        'publish_time': current_time.strftime('%Y-%m-%d %H:%M:%S'),
                        # Fall back to a rank-based score when the API gives
                        # no discussion count.
                        'score': status_count if status_count > 0 else 1000 - idx,
                        'rank': idx + 1
                    }
                    result.append(news)
                    cache_list.append(news)
                except Exception as e:
                    print(f"解析雪球新闻项失败: {e}")
                    continue
            cache.hset(date_str, self.crawler_name(), json.dumps(cache_list, ensure_ascii=False))
            return result
        except Exception as e:
            print(f"获取雪球数据失败: {e}")
            return []
    def crawler_name(self):
        """Return the source identifier used as the cache hash field."""
        return "xueqiu"

View File

@@ -0,0 +1,64 @@
import json
import datetime # 添加datetime导入
import requests
import urllib3
from bs4 import BeautifulSoup
# 移除 SQLAlchemy 导入
# from sqlalchemy.sql.functions import now
from .crawler import Crawler
from ...core import cache
from ...db.mysql import News
urllib3.disable_warnings()
class ZhiHuCrawler(Crawler):
    """Zhihu guest explore-feed crawler."""

    def fetch(self, date_str):
        """Fetch the Zhihu guest explore feed and cache it under ``date_str``.

        Returns a list of news dicts; empty list on request or parse failure.
        """
        stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        endpoint = "https://www.zhihu.com/api/v3/explore/guest/feeds?limit=30&ws_qiangzhisafe=0"
        resp = requests.get(url=endpoint, headers=self.header, verify=False, timeout=self.timeout)
        if resp.status_code != 200:
            print(f"request failed, status: {resp.status_code}")
            return []
        try:
            feed = resp.json().get('data', [])
            collected = []
            for entry in feed:
                target = entry.get('target', {})
                question = target.get('question', {})
                collected.append({
                    'title': question.get('title', ''),
                    # Build the public question URL from the question id.
                    'url': f"https://www.zhihu.com/question/{question.get('id')}",
                    'content': target.get('excerpt', ''),
                    'source': 'zhihu',
                    'publish_time': stamp
                })
            cache.hset(date_str, self.crawler_name(), json.dumps(collected, ensure_ascii=False))
            return collected
        except Exception as e:
            print(f"Error parsing JSON: {e}")
            return []

    def crawler_name(self):
        """Return the source identifier used as the cache hash field."""
        return "zhihu"