Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=protected-access,bad-continuation,missing-function-docstring

from tests.unit.vertexai.genai.replays import pytest_helper
from vertexai._genai import types
from google.genai import errors
import pytest


def test_get_a2a_task(client):
# Use the autopush environment.
client._api_client._http_options.base_url = (
"https://us-central1-autopush-aiplatform.sandbox.googleapis.com/"
)
agent_engine = client.agent_engines.create()
assert isinstance(agent_engine, types.AgentEngine)
assert isinstance(agent_engine.api_resource, types.ReasoningEngine)
# Use the internal API version for internal API access.
client._api_client._http_options.api_version = "internal"

created_task = client.agent_engines.a2a_tasks.create(
name=agent_engine.api_resource.name,
a2a_task_id="task123",
config=types.CreateAgentEngineTaskConfig(context_id="context123"),
)
assert isinstance(created_task, types.A2aTask)

# Test validity check.
task = client.agent_engines.a2a_tasks.get(
name=created_task.name,
)
assert task.name == f"{agent_engine.api_resource.name}/a2aTasks/task123"

client.agent_engines.a2a_tasks.delete(name=created_task.name)

with pytest.raises(errors.ClientError, match="404 NOT_FOUND"):
client.agent_engines.a2a_tasks.get(name=created_task.name)

# Clean up resources.
client.agent_engines.delete(name=agent_engine.api_resource.name, force=True)


pytestmark = pytest_helper.setup(
file=__file__,
globals_for_file=globals(),
)


pytest_plugins = ("pytest_asyncio",)


@pytest.mark.asyncio
async def test_get_a2a_task_async(client):
# Use the autopush environment.
client.aio._api_client._http_options.base_url = (
"https://us-central1-autopush-aiplatform.sandbox.googleapis.com/"
)
agent_engine = client.agent_engines.create()
assert isinstance(agent_engine, types.AgentEngine)
assert isinstance(agent_engine.api_resource, types.ReasoningEngine)
# Use the internal API version for internal API access.
client.aio._api_client._http_options.api_version = "internal"

created_task = await client.aio.agent_engines.a2a_tasks.create(
name=agent_engine.api_resource.name,
a2a_task_id="task123",
config=types.CreateAgentEngineTaskConfig(context_id="context123"),
)
assert isinstance(created_task, types.A2aTask)

# Test validity check.
task = await client.aio.agent_engines.a2a_tasks.get(
name=created_task.name,
)
assert task.name == f"{agent_engine.api_resource.name}/a2aTasks/task123"

await client.aio.agent_engines.a2a_tasks.delete(name=created_task.name)

with pytest.raises(errors.ClientError, match="404 NOT_FOUND"):
await client.aio.agent_engines.a2a_tasks.get(name=created_task.name)

# Clean up resources.
client.agent_engines.delete(name=agent_engine.api_resource.name, force=True)
123 changes: 123 additions & 0 deletions vertexai/_genai/a2a_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,17 @@ def _CreateAgentEngineTaskRequestParameters_to_vertex(
return to_object


def _DeleteAgentEngineTaskRequestParameters_to_vertex(
from_object: Union[dict[str, Any], object],
parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
to_object: dict[str, Any] = {}
if getv(from_object, ["name"]) is not None:
setv(to_object, ["_url", "name"], getv(from_object, ["name"]))

return to_object


def _GetAgentEngineTaskRequestParameters_to_vertex(
from_object: Union[dict[str, Any], object],
parent_object: Optional[dict[str, Any]] = None,
Expand Down Expand Up @@ -128,6 +139,62 @@ def _ListAgentEngineTasksRequestParameters_to_vertex(

class A2aTasks(_api_module.BaseModule):

def delete(
self,
*,
name: str,
config: Optional[types.DeleteAgentEngineTaskConfigOrDict] = None,
) -> None:
"""
Deletes an agent engine task.

Args:
name (str): Required. The name of the Agent Engine task to delete. Format:
`projects/{project}/locations/{location}/reasoningEngines/{resource_id}/a2aTasks/{task_id}`.
config (DeleteAgentEngineTaskConfig):
Optional. Additional configurations for deleting the Agent Engine task.

Returns:
None

"""

parameter_model = types._DeleteAgentEngineTaskRequestParameters(
name=name,
config=config,
)

request_url_dict: Optional[dict[str, str]]
if not self._api_client.vertexai:
raise ValueError("This method is only supported in the Vertex AI client.")
else:
request_dict = _DeleteAgentEngineTaskRequestParameters_to_vertex(
parameter_model
)
request_url_dict = request_dict.get("_url")
if request_url_dict:
path = "{name}".format_map(request_url_dict)
else:
path = "{name}"

query_params = request_dict.get("_query")
if query_params:
path = f"{path}?{urlencode(query_params)}"
# TODO: remove the hack that pops config.
request_dict.pop("config", None)

http_options: Optional[types.HttpOptions] = None
if (
parameter_model.config is not None
and parameter_model.config.http_options is not None
):
http_options = parameter_model.config.http_options

request_dict = _common.convert_to_dict(request_dict)
request_dict = _common.encode_unserializable_types(request_dict)

self._api_client.request("delete", path, request_dict, http_options)

def get(
self,
*,
Expand Down Expand Up @@ -372,6 +439,62 @@ def events(self) -> "a2a_task_events_module.A2aTaskEvents":

class AsyncA2aTasks(_api_module.BaseModule):

async def delete(
self,
*,
name: str,
config: Optional[types.DeleteAgentEngineTaskConfigOrDict] = None,
) -> None:
"""
Deletes an agent engine task.

Args:
name (str): Required. The name of the Agent Engine task to delete. Format:
`projects/{project}/locations/{location}/reasoningEngines/{resource_id}/a2aTasks/{task_id}`.
config (DeleteAgentEngineTaskConfig):
Optional. Additional configurations for deleting the Agent Engine task.

Returns:
None

"""

parameter_model = types._DeleteAgentEngineTaskRequestParameters(
name=name,
config=config,
)

request_url_dict: Optional[dict[str, str]]
if not self._api_client.vertexai:
raise ValueError("This method is only supported in the Vertex AI client.")
else:
request_dict = _DeleteAgentEngineTaskRequestParameters_to_vertex(
parameter_model
)
request_url_dict = request_dict.get("_url")
if request_url_dict:
path = "{name}".format_map(request_url_dict)
else:
path = "{name}"

query_params = request_dict.get("_query")
if query_params:
path = f"{path}?{urlencode(query_params)}"
# TODO: remove the hack that pops config.
request_dict.pop("config", None)

http_options: Optional[types.HttpOptions] = None
if (
parameter_model.config is not None
and parameter_model.config.http_options is not None
):
http_options = parameter_model.config.http_options

request_dict = _common.convert_to_dict(request_dict)
request_dict = _common.encode_unserializable_types(request_dict)

await self._api_client.async_request("delete", path, request_dict, http_options)

async def get(
self,
*,
Expand Down
8 changes: 8 additions & 0 deletions vertexai/_genai/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from .common import _DeleteAgentEngineRequestParameters
from .common import _DeleteAgentEngineSandboxRequestParameters
from .common import _DeleteAgentEngineSessionRequestParameters
from .common import _DeleteAgentEngineTaskRequestParameters
from .common import _DeleteDatasetRequestParameters
from .common import _DeleteMultimodalDatasetRequestParameters
from .common import _DeletePromptVersionRequestParameters
Expand Down Expand Up @@ -301,6 +302,9 @@
from .common import DeleteAgentEngineSessionOperation
from .common import DeleteAgentEngineSessionOperationDict
from .common import DeleteAgentEngineSessionOperationOrDict
from .common import DeleteAgentEngineTaskConfig
from .common import DeleteAgentEngineTaskConfigDict
from .common import DeleteAgentEngineTaskConfigOrDict
from .common import DeletePromptConfig
from .common import DeletePromptConfigDict
from .common import DeletePromptConfigOrDict
Expand Down Expand Up @@ -1148,6 +1152,9 @@
from .common import WorkerPoolSpecOrDict

__all__ = [
"DeleteAgentEngineTaskConfig",
"DeleteAgentEngineTaskConfigDict",
"DeleteAgentEngineTaskConfigOrDict",
"GetAgentEngineTaskConfig",
"GetAgentEngineTaskConfigDict",
"GetAgentEngineTaskConfigOrDict",
Expand Down Expand Up @@ -2172,6 +2179,7 @@
"MessageDict",
"Importance",
"ParsedResponseUnion",
"_DeleteAgentEngineTaskRequestParameters",
"_GetAgentEngineTaskRequestParameters",
"_ListAgentEngineTasksRequestParameters",
"_CreateAgentEngineTaskRequestParameters",
Expand Down
46 changes: 46 additions & 0 deletions vertexai/_genai/types/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,52 @@ class OptimizationMethod(_common.CaseInSensitiveEnum):
"""The data driven prompt optimizer designer for prompts from Android core API."""


class DeleteAgentEngineTaskConfig(_common.BaseModel):
"""Config for deleting an Agent Engine Task."""

http_options: Optional[genai_types.HttpOptions] = Field(
default=None, description="""Used to override HTTP request options."""
)


class DeleteAgentEngineTaskConfigDict(TypedDict, total=False):
"""Config for deleting an Agent Engine Task."""

http_options: Optional[genai_types.HttpOptionsDict]
"""Used to override HTTP request options."""


DeleteAgentEngineTaskConfigOrDict = Union[
DeleteAgentEngineTaskConfig, DeleteAgentEngineTaskConfigDict
]


class _DeleteAgentEngineTaskRequestParameters(_common.BaseModel):
"""Parameters for deleting an agent engine task."""

name: Optional[str] = Field(
default=None, description="""Name of the agent engine task."""
)
config: Optional[DeleteAgentEngineTaskConfig] = Field(
default=None, description=""""""
)


class _DeleteAgentEngineTaskRequestParametersDict(TypedDict, total=False):
"""Parameters for deleting an agent engine task."""

name: Optional[str]
"""Name of the agent engine task."""

config: Optional[DeleteAgentEngineTaskConfigDict]
""""""


_DeleteAgentEngineTaskRequestParametersOrDict = Union[
_DeleteAgentEngineTaskRequestParameters, _DeleteAgentEngineTaskRequestParametersDict
]


class GetAgentEngineTaskConfig(_common.BaseModel):
"""Config for getting an Agent Engine Task."""

Expand Down
Loading