-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy pathauth_utils.py
More file actions
80 lines (59 loc) · 2.81 KB
/
auth_utils.py
File metadata and controls
80 lines (59 loc) · 2.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
"""Utilities for OAuth 2.0 Resource Indicators (RFC 8707) and PKCE (RFC 7636)."""
import time
from urllib.parse import urlparse, urlsplit, urlunsplit
from pydantic import AnyUrl, HttpUrl
def resource_url_from_server_url(url: str | HttpUrl | AnyUrl) -> str:
    """Convert a server URL to its canonical resource URL per RFC 8707.

    RFC 8707 section 2 states that resource URIs "MUST NOT include a fragment
    component", so any fragment is dropped. The scheme and the host (with its
    optional port) are lowercased to give a canonical form. Any userinfo
    ("user:password@") in the authority is preserved verbatim, because unlike
    scheme and host it is case-sensitive.

    Args:
        url: Server URL to convert (plain string or a URL object whose
            ``str()`` yields the URL text).

    Returns:
        Canonical resource URL string: absolute URI without fragment, with
        lowercase scheme and host.
    """
    parts = urlsplit(str(url))

    # netloc has the shape "[userinfo@]host[:port]". Split on the LAST "@"
    # so that only the host/port portion is case-folded; when there is no
    # "@", rpartition returns ("", "", netloc) and the whole netloc is folded.
    userinfo, at_sign, host_port = parts.netloc.rpartition("@")
    canonical_netloc = f"{userinfo}{at_sign}{host_port.lower()}"

    return urlunsplit(
        parts._replace(
            scheme=parts.scheme.lower(),
            netloc=canonical_netloc,
            fragment="",
        )
    )
def check_resource_allowed(requested_resource: str, configured_resource: str) -> bool:
    """Decide whether a requested resource URL falls under a configured one.

    The two URLs must share the same scheme and the same network location
    (compared case-insensitively), and the requested path must sit at or
    below the configured path. This gives hierarchical matching: a token
    issued for a parent resource is accepted for its children.

    Args:
        requested_resource: The resource URL being requested.
        configured_resource: The resource URL that has been configured.

    Returns:
        True if the requested resource is covered by the configured resource,
        False otherwise.
    """
    wanted = urlparse(requested_resource)
    allowed = urlparse(configured_resource)

    # Origin check: scheme and netloc (host plus any port) must agree.
    same_scheme = wanted.scheme.lower() == allowed.scheme.lower()
    same_netloc = wanted.netloc.lower() == allowed.netloc.lower()
    if not (same_scheme and same_netloc):
        return False

    # Give both paths a trailing slash so "/foo" and "/foo/" compare equal,
    # and so the prefix test below works on whole path segments:
    # "/api123/" does not start with "/api/".
    wanted_path = wanted.path if wanted.path.endswith("/") else wanted.path + "/"
    allowed_path = allowed.path if allowed.path.endswith("/") else allowed.path + "/"

    return wanted_path.startswith(allowed_path)
def calculate_token_expiry(expires_in: int | str | None) -> float | None:
"""Calculate token expiry timestamp from expires_in seconds.
Args:
expires_in: Seconds until token expiration (may be string from some servers)
Returns:
Unix timestamp when token expires, or None if no expiry specified
"""
if expires_in is None:
return None # pragma: no cover
# Defensive: handle servers that return expires_in as string
return time.time() + int(expires_in)