Skip to content

Commit 2c56a12

Browse files
committed
Update prose tests
1 parent 581b63a commit 2c56a12

10 files changed

Lines changed: 188 additions & 136 deletions

File tree

pymongo/asynchronous/helpers.py

Lines changed: 3 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
OperationFailure,
3636
)
3737
from pymongo.helpers_shared import _REAUTHENTICATION_REQUIRED_CODE
38-
from pymongo.lock import _async_create_lock
3938

4039
_IS_SYNC = False
4140

@@ -79,8 +78,6 @@ async def inner(*args: Any, **kwargs: Any) -> Any:
7978

8079
_BACKOFF_INITIAL = 0.1
8180
_BACKOFF_MAX = 10
82-
DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0
83-
DEFAULT_RETRY_TOKEN_RETURN = 0.1
8481

8582

8683
def _backoff(
@@ -90,78 +87,32 @@ def _backoff(
9087
return jitter * min(initial_delay * (2**attempt), max_delay)
9188

9289

93-
class _TokenBucket:
94-
"""A token bucket implementation for rate limiting."""
95-
96-
def __init__(
97-
self,
98-
capacity: float = DEFAULT_RETRY_TOKEN_CAPACITY,
99-
return_rate: float = DEFAULT_RETRY_TOKEN_RETURN,
100-
):
101-
self.lock = _async_create_lock()
102-
self.capacity = capacity
103-
self.tokens = capacity
104-
self.return_rate = return_rate
105-
106-
async def consume(self) -> bool:
107-
"""Consume a token from the bucket if available."""
108-
async with self.lock:
109-
if self.tokens >= 1:
110-
self.tokens -= 1
111-
return True
112-
return False
113-
114-
async def deposit(self, retry: bool = False) -> None:
115-
"""Deposit a token back into the bucket."""
116-
retry_token = 1 if retry else 0
117-
async with self.lock:
118-
self.tokens = min(self.capacity, self.tokens + retry_token + self.return_rate)
119-
120-
12190
class _RetryPolicy:
122-
"""A retry limiter that performs exponential backoff with jitter.
123-
124-
When adaptive retries are enabled, retry attempts are limited by a token bucket to prevent overwhelming the server during
125-
a prolonged outage or high load.
126-
"""
91+
"""A retry limiter that performs exponential backoff with jitter."""
12792

12893
def __init__(
12994
self,
130-
token_bucket: _TokenBucket,
13195
attempts: int = MAX_ADAPTIVE_RETRIES,
13296
backoff_initial: float = _BACKOFF_INITIAL,
13397
backoff_max: float = _BACKOFF_MAX,
13498
):
135-
self.token_bucket = token_bucket
13699
self.attempts = attempts
137100
self.backoff_initial = backoff_initial
138101
self.backoff_max = backoff_max
139-
self.adaptive_retry = False
140-
141-
async def record_success(self, retry: bool) -> None:
142-
"""Record a successful operation."""
143-
if self.adaptive_retry:
144-
await self.token_bucket.deposit(retry)
145102

146103
def backoff(self, attempt: int) -> float:
147-
"""Return the backoff duration for the given ."""
104+
"""Return the backoff duration for the given attempt."""
148105
return _backoff(max(0, attempt - 1), self.backoff_initial, self.backoff_max)
149106

150107
async def should_retry(self, attempt: int, delay: float) -> bool:
151-
"""Return if we have budget to retry and how long to backoff."""
108+
"""Return if we have retry attempts remaining and the next backoff would not exceed a timeout."""
152109
if attempt > self.attempts:
153110
return False
154111

155-
# If the delay would exceed the deadline, bail early before consuming a token.
156112
if _csot.get_timeout():
157113
if time.monotonic() + delay > _csot.get_deadline():
158114
return False
159115

160-
# Check token bucket last since we only want to consume a token if we actually retry.
161-
if self.adaptive_retry and not await self.token_bucket.consume():
162-
# DRIVERS-3246 Improve diagnostics when this case happens.
163-
# We could add info to the exception and log.
164-
return False
165116
return True
166117

167118

pymongo/asynchronous/mongo_client.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@
7070
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
7171
from pymongo.asynchronous.helpers import (
7272
_RetryPolicy,
73-
_TokenBucket,
7473
)
7574
from pymongo.asynchronous.settings import TopologySettings
7675
from pymongo.asynchronous.topology import Topology, _ErrorContext
@@ -894,7 +893,7 @@ def __init__(
894893
self._options.read_concern,
895894
)
896895

897-
self._retry_policy = _RetryPolicy(_TokenBucket())
896+
self._retry_policy = _RetryPolicy(attempts=self._options.max_adaptive_retries)
898897

899898
self._init_based_on_options(self._seeds, srv_max_hosts, srv_service_name)
900899

@@ -2820,7 +2819,6 @@ async def run(self) -> T:
28202819
self._check_last_error(check_csot=True)
28212820
try:
28222821
res = await self._read() if self._is_read else await self._write()
2823-
await self._retry_policy.record_success(self._attempt_number > 0)
28242822
# Track whether the transaction has completed a command.
28252823
# If we need to apply backpressure to the first command,
28262824
# we will need to revert back to starting state.
@@ -2929,9 +2927,12 @@ async def run(self) -> T:
29292927
transaction.attempt = 0
29302928

29312929
if (
2932-
self._server is not None
2933-
and self._client.topology_description.topology_type_name == "Sharded"
2934-
or exc.has_error_label("SystemOverloadedError")
2930+
self._client.options.enable_overload_retargeting
2931+
and self._server is not None
2932+
and (
2933+
self._client.topology_description.topology_type_name == "Sharded"
2934+
or exc.has_error_label("SystemOverloadedError")
2935+
)
29352936
):
29362937
self._deprioritized_servers.append(self._server)
29372938

pymongo/client_options.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -235,11 +235,6 @@ def __init__(
235235
self.__server_monitoring_mode = options.get(
236236
"servermonitoringmode", common.SERVER_MONITORING_MODE
237237
)
238-
self.__adaptive_retries = (
239-
options.get("adaptive_retries", common.ADAPTIVE_RETRIES)
240-
if "adaptive_retries" in options
241-
else options.get("adaptiveretries", common.ADAPTIVE_RETRIES)
242-
)
243238
self.__max_adaptive_retries = (
244239
options.get("max_adaptive_retries", common.MAX_ADAPTIVE_RETRIES)
245240
if "max_adaptive_retries" in options

pymongo/common.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,6 @@
140140
# Default value for serverMonitoringMode
141141
SERVER_MONITORING_MODE = "auto" # poll/stream/auto
142142

143-
# Default value for adaptiveRetries
144-
ADAPTIVE_RETRIES = False
145-
146143
# Default value for max adaptive retries
147144
MAX_ADAPTIVE_RETRIES = 2
148145

@@ -747,7 +744,6 @@ def validate_server_monitoring_mode(option: str, value: str) -> str:
747744
"srvmaxhosts": validate_non_negative_integer,
748745
"timeoutms": validate_timeoutms,
749746
"servermonitoringmode": validate_server_monitoring_mode,
750-
"adaptiveretries": validate_boolean_or_string,
751747
"maxadaptiveretries": validate_non_negative_integer,
752748
"enableoverloadretargeting": validate_boolean_or_string,
753749
}
@@ -783,7 +779,6 @@ def validate_server_monitoring_mode(option: str, value: str) -> str:
783779
"server_selector": validate_is_callable_or_none,
784780
"auto_encryption_opts": validate_auto_encryption_opts_or_none,
785781
"authoidcallowedhosts": validate_list,
786-
"adaptive_retries": validate_boolean_or_string,
787782
"max_adaptive_retries": validate_non_negative_integer,
788783
"enable_overload_retargeting": validate_boolean_or_string,
789784
}

pymongo/synchronous/helpers.py

Lines changed: 3 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
OperationFailure,
3636
)
3737
from pymongo.helpers_shared import _REAUTHENTICATION_REQUIRED_CODE
38-
from pymongo.lock import _create_lock
3938

4039
_IS_SYNC = True
4140

@@ -79,8 +78,6 @@ def inner(*args: Any, **kwargs: Any) -> Any:
7978

8079
_BACKOFF_INITIAL = 0.1
8180
_BACKOFF_MAX = 10
82-
DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0
83-
DEFAULT_RETRY_TOKEN_RETURN = 0.1
8481

8582

8683
def _backoff(
@@ -90,78 +87,32 @@ def _backoff(
9087
return jitter * min(initial_delay * (2**attempt), max_delay)
9188

9289

93-
class _TokenBucket:
94-
"""A token bucket implementation for rate limiting."""
95-
96-
def __init__(
97-
self,
98-
capacity: float = DEFAULT_RETRY_TOKEN_CAPACITY,
99-
return_rate: float = DEFAULT_RETRY_TOKEN_RETURN,
100-
):
101-
self.lock = _create_lock()
102-
self.capacity = capacity
103-
self.tokens = capacity
104-
self.return_rate = return_rate
105-
106-
def consume(self) -> bool:
107-
"""Consume a token from the bucket if available."""
108-
with self.lock:
109-
if self.tokens >= 1:
110-
self.tokens -= 1
111-
return True
112-
return False
113-
114-
def deposit(self, retry: bool = False) -> None:
115-
"""Deposit a token back into the bucket."""
116-
retry_token = 1 if retry else 0
117-
with self.lock:
118-
self.tokens = min(self.capacity, self.tokens + retry_token + self.return_rate)
119-
120-
12190
class _RetryPolicy:
122-
"""A retry limiter that performs exponential backoff with jitter.
123-
124-
When adaptive retries are enabled, retry attempts are limited by a token bucket to prevent overwhelming the server during
125-
a prolonged outage or high load.
126-
"""
91+
"""A retry limiter that performs exponential backoff with jitter."""
12792

12893
def __init__(
12994
self,
130-
token_bucket: _TokenBucket,
13195
attempts: int = MAX_ADAPTIVE_RETRIES,
13296
backoff_initial: float = _BACKOFF_INITIAL,
13397
backoff_max: float = _BACKOFF_MAX,
13498
):
135-
self.token_bucket = token_bucket
13699
self.attempts = attempts
137100
self.backoff_initial = backoff_initial
138101
self.backoff_max = backoff_max
139-
self.adaptive_retry = False
140-
141-
def record_success(self, retry: bool) -> None:
142-
"""Record a successful operation."""
143-
if self.adaptive_retry:
144-
self.token_bucket.deposit(retry)
145102

146103
def backoff(self, attempt: int) -> float:
147-
"""Return the backoff duration for the given ."""
104+
"""Return the backoff duration for the given attempt."""
148105
return _backoff(max(0, attempt - 1), self.backoff_initial, self.backoff_max)
149106

150107
def should_retry(self, attempt: int, delay: float) -> bool:
151-
"""Return if we have budget to retry and how long to backoff."""
108+
"""Return if we have retry attempts remaining and the next backoff would not exceed a timeout."""
152109
if attempt > self.attempts:
153110
return False
154111

155-
# If the delay would exceed the deadline, bail early before consuming a token.
156112
if _csot.get_timeout():
157113
if time.monotonic() + delay > _csot.get_deadline():
158114
return False
159115

160-
# Check token bucket last since we only want to consume a token if we actually retry.
161-
if self.adaptive_retry and not self.token_bucket.consume():
162-
# DRIVERS-3246 Improve diagnostics when this case happens.
163-
# We could add info to the exception and log.
164-
return False
165116
return True
166117

167118

pymongo/synchronous/mongo_client.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@
113113
from pymongo.synchronous.command_cursor import CommandCursor
114114
from pymongo.synchronous.helpers import (
115115
_RetryPolicy,
116-
_TokenBucket,
117116
)
118117
from pymongo.synchronous.settings import TopologySettings
119118
from pymongo.synchronous.topology import Topology, _ErrorContext
@@ -894,7 +893,7 @@ def __init__(
894893
self._options.read_concern,
895894
)
896895

897-
self._retry_policy = _RetryPolicy(_TokenBucket())
896+
self._retry_policy = _RetryPolicy(attempts=self._options.max_adaptive_retries)
898897

899898
self._init_based_on_options(self._seeds, srv_max_hosts, srv_service_name)
900899

@@ -2810,7 +2809,6 @@ def run(self) -> T:
28102809
self._check_last_error(check_csot=True)
28112810
try:
28122811
res = self._read() if self._is_read else self._write()
2813-
self._retry_policy.record_success(self._attempt_number > 0)
28142812
# Track whether the transaction has completed a command.
28152813
# If we need to apply backpressure to the first command,
28162814
# we will need to revert back to starting state.
@@ -2919,9 +2917,12 @@ def run(self) -> T:
29192917
transaction.attempt = 0
29202918

29212919
if (
2922-
self._server is not None
2923-
and self._client.topology_description.topology_type_name == "Sharded"
2924-
or exc.has_error_label("SystemOverloadedError")
2920+
self._client.options.enable_overload_retargeting
2921+
and self._server is not None
2922+
and (
2923+
self._client.topology_description.topology_type_name == "Sharded"
2924+
or exc.has_error_label("SystemOverloadedError")
2925+
)
29252926
):
29262927
self._deprioritized_servers.append(self._server)
29272928

test/asynchronous/test_client_backpressure.py

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from time import perf_counter
2222
from unittest.mock import patch
2323

24+
from pymongo import ReadPreference
2425
from pymongo.common import MAX_ADAPTIVE_RETRIES
2526

2627
sys.path[0:0] = [""]
@@ -36,7 +37,7 @@
3637

3738
import pymongo
3839
from pymongo.asynchronous import helpers
39-
from pymongo.asynchronous.helpers import _RetryPolicy, _TokenBucket
40+
from pymongo.asynchronous.helpers import _RetryPolicy
4041
from pymongo.errors import OperationFailure, PyMongoError
4142

4243
_IS_SYNC = False
@@ -235,7 +236,7 @@ async def test_01_operation_retry_uses_exponential_backoff(self, random_func):
235236

236237
@async_client_context.require_failCommand_appName
237238
async def test_03_overload_retries_limited(self):
238-
# Drivers should test that without adaptive retries enabled, overload errors are retried a maximum of three times.
239+
# Drivers should test that overload errors are retried a maximum of two times.
239240

240241
# 1. Let `client` be a `MongoClient`.
241242
client = self.client
@@ -265,6 +266,41 @@ async def test_03_overload_retries_limited(self):
265266
# 6. Assert that the total number of started commands is MAX_ADAPTIVE_RETRIES + 1.
266267
self.assertEqual(len(self.listener.started_events), MAX_ADAPTIVE_RETRIES + 1)
267268

269+
@async_client_context.require_failCommand_appName
270+
async def test_04_overload_retries_limited_configured(self):
271+
# Drivers should test that overload errors are retried a maximum of maxAdaptiveRetries times.
272+
max_retries = 1
273+
274+
# 1. Let `client` be a `MongoClient` with `maxAdaptiveRetries=1` and command event monitoring enabled.
275+
client = await self.async_single_client(
276+
maxAdaptiveRetries=max_retries, event_listeners=[self.listener]
277+
)
278+
# 2. Let `coll` be a collection.
279+
coll = client.pymongo_test.coll
280+
281+
# 3. Configure the following failpoint:
282+
failpoint = {
283+
"configureFailPoint": "failCommand",
284+
"mode": "alwaysOn",
285+
"data": {
286+
"failCommands": ["find"],
287+
"errorCode": 462, # IngressRequestRateLimitExceeded
288+
"errorLabels": ["RetryableError", "SystemOverloadedError"],
289+
},
290+
}
291+
292+
# 4. Perform a find operation with `coll` that fails.
293+
async with self.fail_point(failpoint):
294+
with self.assertRaises(PyMongoError) as error:
295+
await coll.find_one({})
296+
297+
# 5. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
298+
self.assertIn("RetryableError", str(error.exception))
299+
self.assertIn("SystemOverloadedError", str(error.exception))
300+
301+
# 6. Assert that the total number of started commands is max_retries + 1.
302+
self.assertEqual(len(self.listener.started_events), max_retries + 1)
303+
268304

269305
# Location of JSON test specifications.
270306
if _IS_SYNC:

0 commit comments

Comments
 (0)