feat: 添加基于 agno 框架的重构实验版本(experimental/agno_version/)#660
Conversation
在 experimental/agno_version/ 目录下新增 BettaFish 项目的一个 实验性重构版本,基于 agno (https://github.com/agno-agi/agno) 框架。 ## 完整保留的核心特性 - 段落级 Forum 反馈循环(BettaFish 最核心特性) - ForumHost 4 段式发言结构(事件梳理/观点整合/趋势预测/问题引导) - 三 Agent 并发执行(用 asyncio.gather 等价实现 Streamlit 子进程并发) - 原项目所有 SYSTEM_PROMPT(含 JSON Schema) ## 新增能力 - 海外数据源扩展:Hacker News / GitHub / YouTube / Reddit(12 个新工具) - 海外工具动态裁剪:未配置 key 的平台自动从 prompt 中移除 - ReportAgent 5 阶段综合报告生成(大纲 → 并发章节 → 跨源 → 摘要 → HTML) - 6 种专业可视化组件(KPI 卡片、Chart.js、Callout、信息源矩阵、时间线、Quote) ## 架构差异 - 进程模型:Flask + Streamlit 子进程 → 单进程 asyncio - Agent 间通信:logs/*.log 文件监控 → 内存 ForumState + asyncio.Lock - LLM 编排:自定义 Node 系统 → agno Agent - 报告生成:ReportEngine (1700 行) → ReportAgent (5 阶段) ## 完全不影响原项目 本 PR 只新增 experimental/agno_version/ 目录,不修改任何现有文件。 ## 当前状态 - ✅ 三个核心 Agent 完整迁移 - ✅ ForumHost + ForumState 段落级反馈 - ✅ ReportAgent 多阶段综合报告 - ✅ Chart.js + 6 种可视化组件 -⚠️ 用 SQLite mock 数据库代替 MindSpider 爬虫 -⚠️ 仅命令行入口,未实现 Web UI 详见 experimental/agno_version/CONTRIBUTION_NOTE.md 源仓库(含完整 git 历史):https://github.com/NextE-Moffatt/agno-mirofish
There was a problem hiding this comment.
Pull request overview
该 PR 在 experimental/agno_version/ 下新增一个基于 agno 多智能体框架的 BettaFish 实验性重构版本,旨在在不影响主项目的前提下,复刻“段落级 Forum 反馈循环 + 三 Agent 并发”的核心流程,并扩展海外数据源与 HTML 可视化报告能力。
Changes:
- 新增
agno_team/编排层:asyncio.gather三 Agent 并发 +ForumState/ForumHost段落级反馈循环。 - 新增
agno_agents/与agno_tools/:迁移/封装新闻搜索、媒体搜索、本地 DB 查询及海外平台工具,并提供 ReportAgent 的 HTML 渲染/自定义块解析。 - 新增可运行脚本与文档:mock DB 初始化、单/全流程 CLI、README/接口契约/前端接口说明及集成测试样例。
Reviewed changes
Copilot reviewed 42 out of 42 changed files in this pull request and generated 14 comments.
Show a summary per file
| File | Description |
|---|---|
| experimental/agno_version/test_integration.py | 新增端到端集成测试骨架(工具/Agent/Team/App 分层验收) |
| experimental/agno_version/scripts/init_mock_db.py | 初始化 SQLite mock 数据库与假数据,便于 InsightAgent 跑通流程 |
| experimental/agno_version/run_single_agent.py | 单 Agent 命令行入口(多步:结构→搜索→总结→反思→格式化) |
| experimental/agno_version/run_full_pipeline.py | 三 Agent 并发 + ForumHost 的完整流水线 CLI,落盘输出报告与日志 |
| experimental/agno_version/requirements.txt | 实验版本依赖集合(含 agno、Flask/SocketIO、各数据源与 ML 依赖) |
| experimental/agno_version/README.md | 实验版本说明、架构、运行方式、可视化组件与 FAQ |
| experimental/agno_version/main.py | Flask + SocketIO 的占位应用入口(健康检查 + mock 推送) |
| experimental/agno_version/interfaces.md | 三组协作的接口契约草案(tools/agents/team/socketio) |
| experimental/agno_version/frontend-api.md | 原 BettaFish 前端 HTTP/实时接口说明的对照文档 |
| experimental/agno_version/CONTRIBUTION_NOTE.md | 重构实验的取舍说明与快速验证步骤 |
| experimental/agno_version/agno_tools/init.py | 统一导出国内/海外工具与 overseas dispatch |
| experimental/agno_version/agno_tools/db_query_tools.py | InsightAgent 本地社媒库查询工具(SQLAlchemy async engine + sync wrapper) |
| experimental/agno_version/agno_tools/news_search_tools.py | QueryAgent 的 Tavily 新闻搜索工具与 dispatch |
| experimental/agno_version/agno_tools/media_search_tools.py | MediaAgent 的 Bocha 多模态搜索工具与 dispatch |
| experimental/agno_version/agno_tools/hackernews_tools.py | Hacker News(Algolia API)搜索工具 |
| experimental/agno_version/agno_tools/github_tools.py | GitHub 搜索(repos/issues/code)工具 |
| experimental/agno_version/agno_tools/youtube_tools.py | YouTube Data API 搜索与评论工具 |
| experimental/agno_version/agno_tools/reddit_tools.py | Reddit OAuth 搜索与评论工具 |
| experimental/agno_version/agno_tools/sentiment_tools.py | 多语言情感分析模型懒加载与批量分析输出 |
| experimental/agno_version/agno_tools/shared_utils.py | 工具层共享 util 占位(待迁移) |
| experimental/agno_version/agno_tools/keyword_tools.py | 关键词优化工具占位(待迁移) |
| experimental/agno_version/agno_tools/crawler_tools.py | 爬虫启动/状态工具占位(待迁移) |
| experimental/agno_version/agno_team/init.py | 编排层导出(ForumState/ForumHost/runner/pipeline) |
| experimental/agno_version/agno_team/_agno_setup.py | 启动时清代理 + patch agno 默认 httpx client(proxy=None) |
| experimental/agno_version/agno_team/forum_state.py | 内存论坛状态、Host 触发阈值与日志格式化 |
| experimental/agno_version/agno_team/forum_host.py | ForumHost(agno Agent)生成主持人引导发言 |
| experimental/agno_version/agno_team/agent_runner.py | 单 Agent 异步多步流程 + ForumState 集成 + 手写工具调度 |
| experimental/agno_version/agno_team/opinion_team.py | 三 Agent 并发主调度器(async + sync wrapper) |
| experimental/agno_version/agno_team/forum_agent.py | 论坛 Agent 占位(待迁移 LogMonitor 等) |
| experimental/agno_version/agno_agents/init.py | Agent 层导出(3 核心 agent + ReportAgent + 模型) |
| experimental/agno_version/agno_agents/media_agent.py | MediaAgent(agno Agent + Bocha 工具)与 prompt/schema |
| experimental/agno_version/agno_agents/query_agent.py | QueryAgent(agno Agent + Tavily 工具)与 prompt/schema |
| experimental/agno_version/agno_agents/models.py | 共享 pydantic 输出模型与解析函数 |
| experimental/agno_version/agno_agents/report_blocks.py | 自定义 HTML 标签块解析与渲染(Chart/KPI/Callout/Matrix/Timeline/Quote) |
| experimental/agno_version/agno_agents/report_styles.py | 报告 HTML/CSS/Chart.js CDN 样式与组件样式 |
| experimental/agno_version/.gitignore | 忽略报告产物、env、mock DB 等 |
| experimental/agno_version/.env.example | 实验版本环境变量示例(LLM/搜索/DB 等) |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| from agno_tools import ( | ||
| search_weibo, search_forum, search_news, | ||
| analyze_sentiment, optimize_keywords, | ||
| ) | ||
| assert all([search_weibo, search_forum, search_news, analyze_sentiment, optimize_keywords]) |
There was a problem hiding this comment.
agno_tools 当前并没有导出 search_weibo/search_forum/search_news(仓库里也未找到这三个函数定义),这里的 import 会直接失败,导致集成测试无法运行。建议改为导入现有工具(如 search_topic_globally/search_topic_on_platform/basic_search_news 等),或在 agno_tools 中补齐与接口文档一致的兼容函数。
| def test_sentiment_tool_basic(self): | ||
| """情感分析工具基础功能""" | ||
| from agno_tools import analyze_sentiment | ||
| results = analyze_sentiment(["今天天气真好", "这件事太糟糕了"]) | ||
| assert len(results) == 2 | ||
| assert results[0]["sentiment"] in ["positive", "negative", "neutral"] | ||
| assert 0 <= results[0]["confidence"] <= 1 |
There was a problem hiding this comment.
analyze_sentiment() 目前实现返回的是格式化字符串(供 LLM 消费),但此测试把它当作 List[Dict] 来断言 len() 和 results[0]["sentiment"],会必然失败。建议统一契约:要么把工具改为返回结构化 list(sentiment/confidence/language),要么调整测试按字符串输出做断言,并相应更新 interfaces.md 中的签名描述。
| def test_keyword_tool_basic(self): | ||
| """关键词优化工具基础功能""" | ||
| from agno_tools import optimize_keywords | ||
| keywords = optimize_keywords("特斯拉召回事件", num_keywords=3) | ||
| assert isinstance(keywords, list) | ||
| assert len(keywords) >= 1 |
There was a problem hiding this comment.
optimize_keywords() 在当前代码中还是 raise NotImplementedError(未迁移完成),这里直接调用会导致测试失败。建议先用 pytest.skip/xfail 标记该用例,或提供一个最小可用实现(哪怕是简单分词/规则扩展)以保证集成测试能通过。
| def test_team_instantiable(self): | ||
| """Opinion Team 可以无报错实例化""" | ||
| from agno_team.opinion_team import create_opinion_team | ||
| team = create_opinion_team() | ||
| assert team.name == "微舆舆情分析团队" |
There was a problem hiding this comment.
agno_team.opinion_team 中目前未实现 create_opinion_team(),这里的 import/调用会直接失败。建议:要么实现一个返回 Team/编排器的 create_opinion_team(与 README/接口契约一致),要么把测试改为验证现有的 run_opinion_analysis() 同步入口。
| def test_agent_stream_support(self): | ||
| """Agent 支持流式输出""" | ||
| from agno_agents import create_insight_agent | ||
| agent = create_insight_agent() | ||
| assert agent.stream is True |
There was a problem hiding this comment.
create_insight_agent() 创建的 Agent 未显式开启 stream,这里断言 agent.stream is True 会失败(除非 agno 默认就是 True)。建议要么在 Agent 工厂里显式设置 stream=True,要么将此断言改为根据实际需求检查(例如仅在启用流式场景下测试)。
| ```python | ||
| # 数据库查询 | ||
| search_weibo(keyword: str, limit: int = 20) -> List[Dict[str, Any]] | ||
| # 返回字段:content, author, publish_time, likes | ||
|
|
||
| search_forum(keyword: str, platform: str = "all", limit: int = 20) -> List[Dict[str, Any]] | ||
| # 返回字段:content, author, publish_time, replies | ||
|
|
||
| search_news(keyword: str, limit: int = 20) -> List[Dict[str, Any]] | ||
| # 返回字段:title, content, source, publish_time | ||
|
|
||
| # 情感分析 | ||
| analyze_sentiment(texts: List[str]) -> List[Dict[str, Any]] | ||
| # 返回字段:sentiment("positive"|"negative"|"neutral"), confidence(float), language(str) | ||
|
|
||
| # 关键词优化 | ||
| optimize_keywords(query: str, num_keywords: int = 5) -> List[str] | ||
|
|
||
| # 爬虫 | ||
| start_crawler(keyword: str, platforms: List[str], max_count: int = 100) -> Dict[str, Any] | ||
| # 返回字段:task_id(str), status("started"), estimated_seconds(int) | ||
|
|
||
| get_crawler_status(task_id: str) -> Dict[str, Any] | ||
| # 返回字段:task_id, status("running"|"done"|"failed"), progress(0~100), collected(int) | ||
| ``` | ||
|
|
||
| --- | ||
|
|
||
| ## B → C 接口(agent 运行函数签名) | ||
|
|
||
| B 组必须严格按照此签名实现,C 组按此签名调用。 | ||
|
|
||
| ```python | ||
| # 工厂函数 | ||
| create_insight_agent(config: Settings = None) -> Agent | ||
| create_media_agent(config: Settings = None) -> Agent | ||
| create_query_agent(config: Settings = None) -> Agent | ||
|
|
||
| # 运行函数 | ||
| run_insight_analysis(query: str, config: Settings = None) -> str # 返回 Markdown 报告 | ||
| run_media_analysis(query: str, config: Settings = None) -> str # 返回 Markdown 报告 | ||
| run_query(query: str, config: Settings = None) -> str # 返回 Markdown 结果 | ||
| ``` | ||
|
|
||
| --- | ||
|
|
||
| ## C 对外接口(Team 运行函数) | ||
|
|
||
| ```python | ||
| create_opinion_team(config: Settings = None) -> Team | ||
| run_opinion_analysis(topic: str, config: Settings = None) -> str # 返回报告路径或内容 | ||
| run_report_generation(insight_report: str, media_report: str, query_report: str, config: Settings = None) -> str | ||
| ``` |
There was a problem hiding this comment.
接口契约里约定了 search_weibo/search_forum/search_news/analyze_sentiment -> List[Dict] 以及 create_opinion_team/run_opinion_analysis(topic)->str,但当前实现(agno_tools/agno_team/opinion_team.py/run_full_pipeline.py)的函数集合与返回类型不一致,且 create_opinion_team 未实现,容易导致按文档对接时直接报错。建议更新此文档以匹配当前实现,或补齐兼容层并在测试中按同一契约校验。
| def run_opinion_analysis(query: str, host_threshold: int = 5, config=None) -> Dict[str, Any]: | ||
| """ | ||
| 同步入口:包装 run_opinion_pipeline,便于命令行/Flask 调用。 | ||
| """ | ||
| return asyncio.run(run_opinion_pipeline(query, host_threshold=host_threshold, config=config)) |
There was a problem hiding this comment.
run_opinion_analysis() 的实际签名是 (query: str, host_threshold: int = 5) -> Dict[str, Any],与接口契约中 run_opinion_analysis(topic: str, config: Settings=None) -> str 的约定不一致,也会让 test_integration.py 中的 Team 层测试无法按契约验证。建议补齐一个符合契约的包装函数(保持 topic/config 参数与返回值语义),或同步修改接口文档与测试用例。
| from agno_tools import ( | ||
| search_weibo, search_forum, search_news, | ||
| analyze_sentiment, optimize_keywords, | ||
| ) |
There was a problem hiding this comment.
这里还导入了 optimize_keywords,但 agno_tools/__init__.py 并未导出该符号(也没有找到 from .keyword_tools import optimize_keywords 之类的 re-export),因此即使补齐 search_weibo/search_forum/search_news,该 import 仍会失败。建议把 optimize_keywords 纳入 agno_tools.__init__ 的导出,或调整测试按实际导出内容导入。
| # 清理代理(避免 agno/httpx 走 SOCKS 代理) | ||
| for _k in ["http_proxy", "https_proxy", "all_proxy", "HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY"]: | ||
| os.environ.pop(_k, None) | ||
|
|
There was a problem hiding this comment.
脚本启动时无条件 pop 掉所有代理环境变量,会影响用户在必须通过代理访问外网/LLM API 的环境(也会影响脚本内除 agno/httpx 外的其他网络请求)。建议把“禁用代理”做成可选行为(例如通过 --no-proxy 参数或环境变量开关),默认尊重用户的代理设置,仅在检测到特定 SOCKS 代理导致问题时提示用户手动关闭。
| # ============== 第一步:清除代理环境变量 ============== | ||
| _PROXY_KEYS = [ | ||
| "http_proxy", "https_proxy", "all_proxy", | ||
| "HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY", | ||
| ] | ||
| for _k in _PROXY_KEYS: | ||
| os.environ.pop(_k, None) | ||
|
|
There was a problem hiding this comment.
_agno_setup 在 import 时会无条件清空所有代理环境变量并 monkey-patch agno 的默认 httpx client,这对库代码/被嵌入式使用场景影响面较大(例如:同一进程里其他模块确实需要代理)。建议将该行为做成显式初始化函数或受环境变量控制(默认不改全局环境),并在需要时由 CLI 入口显式调用。
简介
本 PR 在
experimental/agno_version/目录下添加了一个基于 agno 多智能体框架的 BettaFish 重构实验版本。完全不影响原项目:本 PR 只新增
experimental/agno_version/目录,不修改任何现有文件。✨ 是什么
一个完整可运行的 agno 版本,完整保留了原 BettaFish 最核心的「段落级 Forum 反馈循环」特性:
ForumStateForumHost主持人发言🔧 与原项目的差异
logs/*.log文件 + LogMonitor 轮询ForumState+asyncio.LockAgent🌍 新增海外数据源(4 个平台,12 个工具)
InsightAgent 在原有 6 个中文社交媒体工具基础上新增:
动态裁剪:未配置 key 的平台自动从 tool list 和 prompt 中移除,不会被 LLM 调用。对于技术类话题(如 "Claude Code"),HN 和 GitHub 能提供非常高质量的开发者观点。
🎨 新增专业可视化组件
ReportAgent 生成的 HTML 报告支持 6 种自定义可视化组件:
一份典型报告会包含 30+ 个可视化组件。
📊 ReportAgent 5 阶段流程
🚀 快速验证
```bash
cd experimental/agno_version
pip install -r requirements.txt
cp .env.example .env # 填入各 Agent 的 API key
python scripts/init_mock_db.py # 初始化 SQLite mock 数据库
python run_full_pipeline.py "Claude Code 在中文程序员社区的舆情分析"
```
输出见 `reports/full_pipeline/{主题}_{时间戳}/final_report.html`。
💡 关键设计取舍
为什么用 asyncio 而不是 agno Team?
agno 的 `Team` 是回合制(agent 轮流说话)或路由式(协调者选一个),**不支持「三 agent 真并发 + 共享公告板 + 外部观察者反馈」**这种模式。BettaFish 原版用 Streamlit 子进程 + 文件监控实现了这个模式,agno 版用 `asyncio.gather + ForumState + ForumHost 回调` 实现了等价功能。
为什么工具调用没用 agno 自主 dispatch?
agno 原生支持 `agent.run()` 自主选择并调用工具,但这样会失去段落级反馈循环的插入点——我们需要在「工具调用」和「段落总结」之间插入 HOST 引导读取。所以保留了手写的 6 步流程(搜索决策 → 工具调用 → 读 HOST → 总结 → 反思 → 深化),但每一步的 LLM 调用都走 agno Agent。
🎯 期待
希望这个实验版本能给原项目作者提供一个新的架构参考:
完整文档见 `experimental/agno_version/CONTRIBUTION_NOTE.md` 和 `experimental/agno_version/README.md`。
🔗 源仓库
本目录代码的完整 git 历史见:https://github.com/NextE-Moffatt/agno-mirofish
由于是大规模重构,本 PR 采用文件复制方式合并,没有保留单次 commit 历史。完整的开发过程(包括每个 Stage 的迁移 commit)请查看源仓库的 git log。