该代码定义了一个名为 SearchEngine 的 Pydantic 模型,用于配置和执行不同搜索引擎的搜索操作。它支持多种搜索引擎类型(如 Google、Bing、DuckDuckGo 等),并能根据配置动态加载对应的搜索函数。核心功能是提供一个统一的异步接口来执行搜索查询,并处理错误和结果格式化。
graph TD
A[开始: 创建或配置 SearchEngine] --> B{是否指定 run_func?}
B -- 是 --> C[使用指定的 run_func]
B -- 否 --> D{根据 engine 类型选择模块}
D --> E[动态导入对应模块]
E --> F[实例化对应的 Wrapper 类]
F --> G[获取其 run 方法作为 run_func]
C --> H[调用 run 方法执行搜索]
G --> H
H --> I{搜索是否成功?}
I -- 是 --> J[根据 as_string 参数格式化结果]
I -- 否 --> K[记录错误日志]
K --> L{ignore_errors 为 True?}
L -- 是 --> M[返回空结果]
L -- 否 --> N[抛出异常]
J --> O[返回结果]
M --> O
SearchEngine (Pydantic 模型)
├── 字段: engine, run_func, api_key, proxy
├── 方法: validate_extra, _process_extra, from_search_config, from_search_func, run
└── 依赖: SearchEngineType (枚举), SearchConfig (配置类)要使用的搜索引擎类型,默认为 SERPER_GOOGLE。
类型:SearchEngineType
执行搜索的可调用对象;如果未提供,将根据 engine 字段自动确定。
类型:Optional[Callable[[str, int, bool], Coroutine[None, None, Union[str, list[str]]]]]
搜索引擎的可选 API 密钥。
类型:Optional[str]
搜索引擎请求的可选代理。
类型:Optional[str]
该方法是一个Pydantic模型验证器,在模型实例化后运行。它的核心功能是处理模型初始化时传入的额外字段(model_extra),并根据配置的搜索引擎类型(engine)动态加载并设置对应的搜索执行函数(run_func)。它通过整合模型的标准字段和额外字段,调用内部处理逻辑来确保run_func被正确初始化。
参数:
self:SearchEngine,SearchEngine模型实例自身。
返回值:SearchEngine,返回经过验证和处理后的SearchEngine模型实例自身。
flowchart TD
A[开始 validate_extra] --> B[获取模型标准字段<br>(排除engine)]
B --> C{是否存在额外字段<br>(model_extra)?}
C -- 是 --> D[将额外字段合并到数据中]
C -- 否 --> E[直接使用标准字段数据]
D --> E
E --> F[调用 _process_extra<br>处理合并后的数据]
F --> G[返回自身(self)]
G --> H[结束]
@model_validator(mode="after")
def validate_extra(self):
"""Validates extra fields provided to the model and updates the run function accordingly."""
# 1. 获取模型的标准字段数据,排除'engine'字段,并忽略值为None或默认值的字段。
data = self.model_dump(exclude={"engine"}, exclude_none=True, exclude_defaults=True)
# 2. 如果存在通过`__init__`传入但未被定义的额外字段(存储在`model_extra`中),将其合并到数据字典中。
if self.model_extra:
data.update(self.model_extra)
# 3. 调用内部方法`_process_extra`,传入合并后的数据,以根据`engine`类型设置`run_func`。
self._process_extra(**data)
# 4. 返回模型实例自身,这是Pydantic验证器的要求。
return self该方法根据SearchEngine实例的engine类型,动态导入对应的搜索引擎模块,并初始化相应的包装器(Wrapper)类,最终将包装器的run方法赋值给实例的run_func字段。如果引擎类型为CUSTOM_ENGINE,则直接使用传入的run_func参数。
参数:
run_func:Optional[Callable[[str, int, bool], Coroutine[None, None, Union[str, list[str]]]]],一个可选的、用于执行搜索的可调用对象。如果未提供,将根据engine类型自动确定。**kwargs:Any,额外的关键字参数,用于传递给搜索引擎包装器的构造函数。
返回值:None,该方法不返回任何值,其作用是为实例的run_func字段赋值。
flowchart TD
A[开始] --> B{检查 engine 类型}
B -->|SERPAPI_GOOGLE| C[导入 metagpt.tools.search_engine_serpapi<br>初始化 SerpAPIWrapper]
B -->|SERPER_GOOGLE| D[导入 metagpt.tools.search_engine_serper<br>初始化 SerperWrapper]
B -->|DIRECT_GOOGLE| E[导入 metagpt.tools.search_engine_googleapi<br>初始化 GoogleAPIWrapper]
B -->|DUCK_DUCK_GO| F[导入 metagpt.tools.search_engine_ddg<br>初始化 DDGAPIWrapper]
B -->|BING| G[导入 metagpt.tools.search_engine_bing<br>初始化 BingAPIWrapper]
B -->|CUSTOM_ENGINE| H[使用传入的 run_func]
B -->|其他| I[抛出 NotImplementedError 异常]
C --> J[获取包装器的 run 方法]
D --> J
E --> J
F --> J
G --> J
H --> J
J --> K[赋值给 self.run_func]
I --> L[结束]
K --> L
def _process_extra(
self,
run_func: Optional[Callable[[str, int, bool], Coroutine[None, None, Union[str, list[str]]]]] = None,
**kwargs,
):
"""Processes extra configuration and updates the run function based on the search engine type.
Args:
run_func: An optional callable for running the search. If not provided, it will be determined based on the engine.
"""
# 根据 engine 枚举类型,选择对应的搜索引擎实现模块和包装器类
if self.engine == SearchEngineType.SERPAPI_GOOGLE:
module = "metagpt.tools.search_engine_serpapi"
# 动态导入模块,初始化包装器,并获取其 run 方法
run_func = importlib.import_module(module).SerpAPIWrapper(**kwargs).run
elif self.engine == SearchEngineType.SERPER_GOOGLE:
module = "metagpt.tools.search_engine_serper"
run_func = importlib.import_module(module).SerperWrapper(**kwargs).run
elif self.engine == SearchEngineType.DIRECT_GOOGLE:
module = "metagpt.tools.search_engine_googleapi"
run_func = importlib.import_module(module).GoogleAPIWrapper(**kwargs).run
elif self.engine == SearchEngineType.DUCK_DUCK_GO:
module = "metagpt.tools.search_engine_ddg"
run_func = importlib.import_module(module).DDGAPIWrapper(**kwargs).run
elif self.engine == SearchEngineType.CUSTOM_ENGINE:
# 对于自定义引擎,直接使用传入的 run_func
run_func = self.run_func
elif self.engine == SearchEngineType.BING:
module = "metagpt.tools.search_engine_bing"
run_func = importlib.import_module(module).BingAPIWrapper(**kwargs).run
else:
# 如果 engine 类型未实现,抛出异常
raise NotImplementedError
# 将最终确定的 run 方法赋值给实例的 run_func 字段
self.run_func = run_func这是一个类方法,用于根据提供的 SearchConfig 配置对象创建一个 SearchEngine 实例。它负责将配置信息(如API类型、密钥、代理等)以及一个可选的搜索函数适配并传递给 SearchEngine 的构造函数。
参数:
config:SearchConfig,包含搜索引擎配置信息的对象,如API类型、密钥、代理等。**kwargs:Any,额外的关键字参数,将直接传递给SearchEngine的构造函数。
返回值:SearchEngine,一个根据给定配置初始化的 SearchEngine 实例。
flowchart TD
A[开始: from_search_config] --> B[从config提取数据<br>排除'api_type'和'search_func']
B --> C{config.search_func<br>是否不为None?}
C -- 是 --> D[将config.search_func<br>赋值给data['run_func']]
C -- 否 --> E[保持data不变]
D --> F
E --> F[调用SearchEngine构造函数<br>engine=config.api_type, **data, **kwargs]
F --> G[返回SearchEngine实例]
G --> H[结束]
@classmethod
def from_search_config(cls, config: SearchConfig, **kwargs):
"""Creates a SearchEngine instance from a SearchConfig.
Args:
config: The search configuration to use for creating the SearchEngine instance.
"""
# 1. 将SearchConfig对象转换为字典,但排除'api_type'和'search_func'字段。
# 因为这些字段需要特殊处理。
data = config.model_dump(exclude={"api_type", "search_func"})
# 2. 如果配置中提供了自定义的搜索函数(search_func),则将其添加到data字典中,
# 键为'run_func',以便后续传递给SearchEngine构造函数。
if config.search_func is not None:
data["run_func"] = config.search_func
# 3. 调用SearchEngine类的构造函数来创建实例。
# - `engine=config.api_type`: 使用配置中的API类型。
# - `**data`: 传入处理后的配置数据(可能包含run_func)。
# - `**kwargs`: 传入任何额外的关键字参数。
return cls(engine=config.api_type, **data, **kwargs)这是一个类方法,用于根据一个自定义的搜索函数创建一个 SearchEngine 实例。它简化了使用自定义搜索逻辑(而非预定义的搜索引擎)来配置 SearchEngine 的过程。
参数:
search_func:Callable[[str, int, bool], Coroutine[None, None, Union[str, list[str]]]],一个可调用对象(通常是异步函数),用于执行搜索。它接受查询字符串、最大结果数和是否以字符串形式返回结果作为参数,并返回一个协程,该协程最终解析为字符串或字符串列表。**kwargs:Any,其他关键字参数,这些参数将被传递给SearchEngine类的构造函数,用于设置额外的配置(如api_key,proxy等)。
返回值:SearchEngine,返回一个配置好的 SearchEngine 实例,其引擎类型被设置为 SearchEngineType.CUSTOM_ENGINE,并且 run_func 字段被设置为传入的 search_func。
flowchart TD
A[开始: from_search_func] --> B[接收参数: search_func, **kwargs]
B --> C[调用类构造函数 cls]
C --> D[设置 engine = SearchEngineType.CUSTOM_ENGINE]
D --> E[设置 run_func = search_func]
E --> F[传入其他参数 **kwargs]
F --> G[返回新创建的 SearchEngine 实例]
G --> H[结束]
@classmethod
def from_search_func(
cls, search_func: Callable[[str, int, bool], Coroutine[None, None, Union[str, list[str]]]], **kwargs
):
"""Creates a SearchEngine instance from a custom search function.
Args:
search_func: A callable that executes the search.
"""
# 调用类自身的构造函数 `cls(...)` 来创建实例。
# 将引擎类型明确指定为 CUSTOM_ENGINE。
# 将传入的自定义搜索函数赋值给 `run_func` 字段。
# 其他所有通过 `**kwargs` 传入的参数也一并传递给构造函数。
return cls(engine=SearchEngineType.CUSTOM_ENGINE, run_func=search_func, **kwargs)SearchEngine.run 是 SearchEngine 类的核心异步方法,用于执行搜索查询。它根据实例化时配置的搜索引擎(如 Google、Bing、DuckDuckGo 或自定义引擎)调用相应的底层 run_func 来获取结果。该方法提供了灵活的返回格式(字符串或字典列表)和错误处理选项,是统一不同搜索引擎接口的关键入口点。
参数:
query:str,要执行的搜索查询字符串。max_results:int,默认值为 8,指定返回的最大结果数量。as_string:bool,默认值为True,控制返回结果的格式。为True时返回格式化字符串,为False时返回字典列表。ignore_errors:bool,默认值为False,决定是否忽略搜索过程中发生的异常。若为True,发生错误时返回空值(空字符串或空列表)而不抛出异常。
返回值:Union[str, list[dict[str, str]]],根据 as_string 参数的值,返回搜索结果的格式化字符串或包含结果详情(标题、链接、摘要等)的字典列表。
flowchart TD
A[开始: 调用 run 方法] --> B{参数 as_string 为 True?}
B -- 是 --> C[设置期望返回类型为 str]
B -- 否 --> D[设置期望返回类型为 list[dict]]
C --> E
D --> E
subgraph E [执行搜索与错误处理]
F[调用 self.run_func<br>传入 query, max_results, as_string] --> G{执行成功?}
G -- 是 --> H[返回结果]
G -- 否<br>捕获异常 --> I[记录错误日志]
I --> J{ignore_errors 为 True?}
J -- 是 --> K[根据 as_string 返回空值<br>'' 或 []]
J -- 否 --> L[重新抛出异常]
end
H --> M[结束: 返回结果]
K --> M
L --> M
async def run(
self,
query: str,
max_results: int = 8,
as_string: bool = True,
ignore_errors: bool = False,
) -> Union[str, list[dict[str, str]]]:
"""Run a search query.
Args:
query: The search query.
max_results: The maximum number of results to return. Defaults to 8.
as_string: Whether to return the results as a string or a list of dictionaries. Defaults to True.
ignore_errors: Whether to ignore errors during the search. Defaults to False.
Returns:
The search results as a string or a list of dictionaries.
"""
try:
# 核心操作:调用在类初始化时根据 engine 类型确定的实际搜索函数。
# run_func 是一个协程,接受查询、最大结果数和格式标志。
return await self.run_func(query, max_results=max_results, as_string=as_string)
except Exception as e:
# 错误处理:记录详细的异常信息到日志。
logger.exception(f"fail to search {query} for {e}")
if not ignore_errors:
# 如果调用方要求不忽略错误,则重新抛出异常。
raise e
# 如果调用方要求忽略错误,则根据 as_string 参数返回一个安全的空值。
return "" if as_string else []SearchEngine 类作为搜索功能的统一抽象层,通过 Pydantic 模型管理不同搜索引擎(如 Google API、Serper、SerpAPI、DuckDuckGo、Bing)的配置和运行时行为。它支持通过配置对象或自定义函数进行初始化,并提供了一个统一的异步 run 方法接口。
_process_extra 方法实现了动态加载机制,根据 engine 字段指定的类型,在运行时导入对应的第三方搜索引擎包装器模块(如 SerperWrapper、GoogleAPIWrapper),并将其 run 方法适配为 SearchEngine 自身的 run_func。这遵循了适配器模式,将不同引擎的接口统一化。
run 方法使用了 @overload 装饰器,提供了强类型的异步执行接口。它根据 as_string 参数的不同值,清晰地定义了返回类型为 str 或 list[dict[str, str]],增强了代码的可读性和类型安全性。
提供了 from_search_config 和 from_search_func 两个类方法作为工厂方法,允许从统一的 SearchConfig 配置对象或直接从一个自定义的异步搜索函数来构建 SearchEngine 实例,提高了灵活性和可配置性。
在 run 方法中集成了统一的异常处理逻辑,通过 ignore_errors 参数控制是否抛出异常或返回空结果,并利用日志记录器记录错误详情,增强了系统的健壮性。
- 动态导入与紧耦合:
_process_extra方法通过硬编码的模块路径字符串动态导入具体的搜索引擎包装类。这导致了SearchEngine类与这些具体实现类之间的紧耦合,违反了依赖倒置原则。如果模块路径、类名或接口发生变化,SearchEngine类也需要同步修改,降低了代码的可维护性和可扩展性。 - 异常处理粒度较粗:
run方法在捕获异常时,仅通过ignore_errors参数决定是抛出异常还是返回空结果。这种处理方式较为单一,无法根据不同的异常类型(如网络错误、认证错误、API限制错误等)进行更精细化的处理或重试策略。 - 配置验证逻辑可能重复:
validate_extra验证器在模型初始化后执行,用于处理额外字段并调用_process_extra。然而,_process_extra的逻辑(特别是根据engine类型初始化run_func)也可能在对象构造时就需要。如果通过__init__直接创建对象,validate_extra的逻辑可能被绕过,导致run_func未正确初始化,存在状态不一致的风险。 - 类型注解与运行时行为可能不符:
run方法的返回值类型注解为Union[str, list[dict[str, str]]],但根据重载签名,当as_string=False时,应返回list[dict[str, str]]。然而,在异常情况下(ignore_errors=True),方法会返回空列表[],这与as_string=True时返回空字符串""的约定不一致,且当as_string=False时返回空列表是符合类型注解的,但调用者可能需要区分“无结果”和“出错”两种情况,当前设计无法区分。 run_func的默认值问题:run_func字段被标记为Field(exclude=True)且默认值为None。在_process_extra中,除了CUSTOM_ENGINE外,其他引擎类型都会重新赋值。如果用户创建SearchEngine实例时未提供run_func且engine为CUSTOM_ENGINE,则run_func将保持为None,导致后续run方法调用失败。
- 引入工厂模式或依赖注入:建议将具体搜索引擎包装类的创建逻辑抽象到一个独立的工厂类中。
SearchEngine类应依赖于一个抽象的搜索引擎接口,并通过工厂或依赖注入容器获取具体的实现实例。这样可以解耦SearchEngine与具体实现,提高代码的模块化和可测试性。 - 细化异常处理策略:在
run方法中,可以定义更具体的异常类(或使用现有库的异常),并实现不同的处理策略。例如,对于网络超时异常可以加入指数退避重试机制;对于认证失败异常可以记录日志并提示用户检查配置;对于API限额异常可以等待后重试或直接失败。可以提供一个可配置的异常处理器。 - 统一初始化路径:确保无论通过
__init__、model_validator还是类方法(如from_search_config)创建对象,run_func的初始化逻辑都能被执行且仅执行一次。可以考虑将_process_extra的核心逻辑移至__init__中,或确保所有构造路径最终都调用一个统一的初始化方法。 - 明确错误返回值契约:重新审视
run方法的返回值设计。考虑引入一个Result类或使用Union包含一个表示错误的状态,例如Union[str, list[dict], SearchError]。或者,在ignore_errors=True时,统一返回一个具有is_error标志和data/error_message字段的对象,使调用者能清晰区分成功、无结果和错误状态。 - 增强配置验证:在
_process_extra或初始化逻辑中,增加对engine和run_func组合的有效性检查。特别是当engine == SearchEngineType.CUSTOM_ENGINE时,应强制要求run_func不为None,并在验证失败时抛出清晰的ValueError。同时,可以为其他引擎类型提供默认的、合理的run_func初始化失败回退策略(如使用日志警告并禁用该引擎)。 - 考虑异步上下文管理:如果底层的搜索引擎包装器(如
SerperWrapper)持有需要清理的资源(如网络会话),建议让SearchEngine实现异步上下文管理器协议(__aenter__,__aexit__),或在run方法内部更妥善地管理这些资源的生命周期,避免资源泄漏。
该代码的设计目标是提供一个统一、可配置、可扩展的搜索引擎抽象层,允许用户通过简单的配置或函数注入,使用不同的后端搜索引擎(如 Google Serper、SerpAPI、Bing、DuckDuckGo 或自定义引擎)执行搜索。核心约束包括:1) 保持接口的简洁性和一致性,无论底层引擎如何变化;2) 支持运行时动态配置和引擎切换;3) 与 Pydantic 模型集成,以利用其数据验证和序列化能力;4) 提供异步执行能力以支持现代异步应用架构。
错误处理主要分为两个层面:
- 配置与初始化错误:在
_process_extra方法中,如果传入的engine枚举值未实现,会抛出NotImplementedError。在validate_extra验证器中,如果处理额外参数失败,会由 Pydantic 抛出验证错误。 - 运行时搜索错误:在
run方法中,通过try...except块捕获执行run_func时可能抛出的任何Exception。错误处理策略由ignore_errors参数控制:若为False,则记录异常日志后重新抛出异常,通知调用方;若为True,则记录异常日志并返回一个安全的默认值(空字符串或空列表),保证调用链不会因搜索失败而中断。这种设计提供了灵活性,允许调用者根据场景决定是严格处理错误还是优雅降级。
- 数据流:
- 输入:主要输入是
SearchConfig配置对象、或直接的engine枚举与run_func函数。运行时输入是query、max_results等参数。 - 处理:
SearchEngine实例根据engine类型,在初始化或验证阶段通过_process_extra方法动态加载对应的模块并绑定具体的run_func。run方法作为统一入口,将参数委托给绑定的run_func执行。 - 输出:输出由
as_string参数决定,可以是格式化的字符串摘要,或是结构化的字典列表。
- 输入:主要输入是
- 状态机:
SearchEngine对象本身状态简单。关键状态是run_func属性的绑定。其生命周期为:初始化/配置 -> 验证并绑定run_func-> 就绪(run_func已绑定) -> 执行搜索(run)。一旦run_func在_process_extra中绑定完成,对象即进入稳定的“就绪”状态,后续run调用均使用此函数。
- 外部依赖:
- Pydantic:用于数据模型定义、验证、序列化/反序列化。
- 搜索引擎包装模块:
metagpt.tools.search_engine_serpapi,metagpt.tools.search_engine_serper等。这些是可选运行时依赖,根据engine类型动态导入。 - 日志系统:
metagpt.logs.logger,用于记录错误信息。
- 接口契约:
run_func调用签名:必须符合Callable[[str, int, bool], Coroutine[None, None, Union[str, list[str]]]]。这是SearchEngine与具体搜索引擎实现之间的核心契约。- 搜索引擎包装类契约:各个
*Wrapper类(如SerperWrapper)必须提供一个run方法,其签名与上述run_func一致。_process_extra方法假设这些模块和类存在且符合约定。 - 配置契约:
SearchConfig类必须提供model_dump方法并能转换为与SearchEngine字段兼容的字典。