-
Notifications
You must be signed in to change notification settings - Fork 25
Expand file tree
/
Copy pathplanner_client.py
More file actions
117 lines (92 loc) · 3.6 KB
/
planner_client.py
File metadata and controls
117 lines (92 loc) · 3.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# Copyright 2026 Canonical Ltd.
# See LICENSE file for licensing details.
"""Client to interact with the planner service."""
import json
import logging
from dataclasses import dataclass
from typing import Iterable
from urllib.parse import urljoin
import requests
import requests.adapters
import urllib3
from pydantic import AnyHttpUrl, BaseModel
logger = logging.getLogger(__name__)
@dataclass
class PressureInfo:
"""Pressure information for a flavor returned by the planner service.
Attributes:
pressure: Desired total runner count for the flavor.
"""
pressure: int
class PlannerConfiguration(BaseModel):
"""Configuration inputs for the PlannerClient.
Attributes:
base_url: Base URL of the planner service.
token: Bearer token used to authenticate against the planner service.
timeout: Default timeout in seconds for HTTP requests.
"""
base_url: AnyHttpUrl
token: str
timeout: int = 5 * 60
class PlannerApiError(Exception):
"""Represents an error while interacting with the planner service."""
class PlannerClient: # pylint: disable=too-few-public-methods
"""An HTTP client for the planner service."""
def __init__(self, config: PlannerConfiguration) -> None:
"""Initialize client with planner configuration.
Args:
config: Planner service configuration containing base URL,
authentication token, and default request timeout.
"""
self._session = self._create_session()
self._config = config
def stream_pressure(self, name: str) -> Iterable[PressureInfo]:
"""Stream pressure updates for the given flavor.
Args:
name: Flavor name.
Yields:
Parsed pressure updates.
Raises:
PlannerApiError: On HTTP or stream errors.
"""
url = urljoin(str(self._config.base_url), f"/api/v1/flavors/{name}/pressure?stream=true")
try:
with self._session.get(
url,
headers={"Authorization": f"Bearer {self._config.token}"},
timeout=self._config.timeout,
stream=True,
) as response:
response.raise_for_status()
for line in response.iter_lines(decode_unicode=True):
if not line:
continue
try:
data = json.loads(line)
if not isinstance(data, dict) or name not in data:
logger.debug("Skipping non-pressure stream line: %s", line)
continue
yield PressureInfo(pressure=int(data[name]))
except json.JSONDecodeError:
logger.warning("Skipping malformed stream line: %s", line)
continue
except requests.RequestException as exc:
logger.exception("Error while streaming pressure for flavor '%s' from planner.", name)
raise PlannerApiError from exc
@staticmethod
def _create_session() -> requests.Session:
"""Create a requests session with retries and no env proxies.
Returns:
A configured `requests.Session` instance.
"""
adapter = requests.adapters.HTTPAdapter(
max_retries=urllib3.Retry(
total=3,
backoff_factor=0.3,
status_forcelist=[500, 502, 503, 504],
)
)
session = requests.Session()
session.mount("http://", adapter)
session.mount("https://", adapter)
return session