forked from langfuse/langfuse-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrequest.py
More file actions
140 lines (118 loc) · 4.41 KB
/
request.py
File metadata and controls
140 lines (118 loc) · 4.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""@private"""
import json
import logging
from base64 import b64encode
from typing import Any, List, Union
import httpx
from langfuse._utils.serializer import EventSerializer
class LangfuseClient:
    """Minimal HTTP client that posts event payloads to the ingestion API.

    Requests go through the injected ``httpx.Client`` session and carry an
    HTTP Basic ``Authorization`` header built from the public/secret key pair.
    """

    _public_key: str
    _secret_key: str
    _base_url: str
    _version: str
    _timeout: int
    _session: httpx.Client

    def __init__(
        self,
        public_key: str,
        secret_key: str,
        base_url: str,
        version: str,
        timeout: int,
        session: httpx.Client,
    ):
        """Store the credentials, endpoint settings and HTTP session.

        Args:
            public_key: Used as the Basic-auth user name and echoed in the
                ``x_langfuse_public_key`` header.
            secret_key: Used as the Basic-auth password.
            base_url: Server base URL; one trailing slash is tolerated.
            version: Reported in the ``x_langfuse_sdk_version`` header.
            timeout: Passed as ``timeout`` to every ``session.post`` call.
            session: The ``httpx.Client`` used to send requests.
        """
        self._public_key = public_key
        self._secret_key = secret_key
        self._base_url = base_url
        self._version = version
        self._timeout = timeout
        self._session = session

    def generate_headers(self) -> dict:
        """Return the headers sent with every request."""
        credentials = f"{self._public_key}:{self._secret_key}".encode("utf-8")
        return {
            "Authorization": "Basic " + b64encode(credentials).decode("ascii"),
            "Content-Type": "application/json",
            "x_langfuse_sdk_name": "python",
            "x_langfuse_sdk_version": self._version,
            "x_langfuse_public_key": self._public_key,
        }

    def batch_post(self, **kwargs: Any) -> httpx.Response:
        """Post the `kwargs` to the batch API endpoint for events.

        Returns:
            The raw response when the status is 200/201, or 207 without
            reported errors.

        Raises:
            APIError: For any other status, or a 207 body that is not JSON.
            APIErrors: For a 207 response whose body lists errors.
        """
        log = logging.getLogger("langfuse")
        log.debug("uploading data: %s", kwargs)
        res = self.post(**kwargs)
        return self._process_response(
            res, success_message="data uploaded successfully", return_json=False
        )

    def post(self, **kwargs: Any) -> httpx.Response:
        """Serialize `kwargs` to JSON and POST it to the ingestion endpoint.

        The response is returned as-is; status handling is left to the caller.
        """
        log = logging.getLogger("langfuse")
        url = self._remove_trailing_slash(self._base_url) + "/api/public/ingestion"
        data = json.dumps(kwargs, cls=EventSerializer, ensure_ascii=False)
        log.debug("making request: %s to %s", data, url)
        headers = self.generate_headers()
        res = self._session.post(
            url, content=data, headers=headers, timeout=self._timeout
        )

        if res.status_code == 200:
            log.debug("data uploaded successfully")

        return res

    def _remove_trailing_slash(self, url: str) -> str:
        """Return `url` without its final character if that is a slash."""
        if url.endswith("/"):
            return url[:-1]
        return url

    def _process_response(
        self, res: httpx.Response, success_message: str, *, return_json: bool = True
    ) -> Union[httpx.Response, Any]:
        """Turn an HTTP response into a return value or an exception.

        Args:
            res: The response to inspect.
            success_message: Logged at debug level for a 200/201 status.
            return_json: When true, return the decoded JSON body on success;
                otherwise return `res` itself.

        Returns:
            The decoded body or `res` (see `return_json`) for status 200/201,
            and for status 207 when the body reports no errors.

        Raises:
            APIError: If a body that must be JSON cannot be decoded, or for
                any status other than 200, 201 or 207.
            APIErrors: If a 207 body contains a non-empty ``errors`` list.
        """
        log = logging.getLogger("langfuse")
        log.debug("received response: %s", res.text)

        if res.status_code in (200, 201):
            log.debug(success_message)
            if not return_json:
                return res
            try:
                return res.json()
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError,
            # matching what the non-success fallback below already handles.
            except ValueError as err:
                raise APIError(
                    res.status_code, "Invalid JSON response received"
                ) from err

        if res.status_code == 207:
            # Multi-status: the body must be inspected even when the caller
            # only wants the raw response back.
            try:
                payload = res.json()
            except ValueError as err:
                raise APIError(
                    res.status_code, "Invalid JSON response received"
                ) from err

            errors = payload.get("errors", [])
            if errors:
                raise APIErrors(
                    [
                        APIError(
                            error.get("status"),
                            error.get("message", "No message provided"),
                            error.get("error", "No error details provided"),
                        )
                        for error in errors
                    ]
                )
            # Reuse the body parsed above instead of decoding it again.
            return payload if return_json else res

        # Any other status is an error; prefer the decoded body as the
        # message and fall back to the raw text when it is not valid JSON.
        try:
            payload = res.json()
        except ValueError as err:
            raise APIError(res.status_code, res.text) from err
        raise APIError(res.status_code, payload)
class APIError(Exception):
    """A single error reported by, or derived from, an API response.

    Attributes are set from the constructor arguments of the same name:
    ``status``, ``message`` and ``details``.
    """

    def __init__(self, status: Union[int, str], message: str, details: Any = None):
        """Record the error fields.

        Args:
            status: Status associated with the error.
            message: Description of the error.
            details: Optional extra information; defaults to ``None``.
        """
        # Forward every field, in signature order, so ``args`` is populated
        # even for keyword construction; copy/pickle rebuild the exception
        # by calling ``type(self)(*self.args)``.
        super().__init__(status, message, details)
        self.message = message
        self.status = status
        self.details = details

    def __str__(self) -> str:
        """Format the error as ``"<message> (<status>): <details>"``."""
        msg = "{0} ({1}): {2}"
        return msg.format(self.message, self.status, self.details)
class APIErrors(Exception):
    """A group of errors raised together as one exception.

    The grouped errors are available as the ``errors`` attribute.
    """

    def __init__(self, errors: List["APIError"]):
        """Store the grouped errors.

        Args:
            errors: The individual errors to carry.
        """
        # Keep ``args`` populated for keyword construction so the exception
        # can be rebuilt from ``args`` by copy/pickle.
        super().__init__(errors)
        self.errors = errors

    def __str__(self) -> str:
        """Join the string form of each error, prefixed with ``[Langfuse]``."""
        errors = ", ".join(str(error) for error in self.errors)
        return f"[Langfuse] {errors}"