Skip to content
Open
20 changes: 16 additions & 4 deletions Doc/library/multiprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -869,10 +869,13 @@ Note that one can also create a shared queue by using a manager object -- see
bother you then you can instead use a queue created with a
:ref:`manager <multiprocessing-managers>`.

(1) After putting an object on an empty queue there may be an
infinitesimal delay before the queue's :meth:`~Queue.empty`
method returns :const:`False` and :meth:`~Queue.get_nowait` can
return without raising :exc:`queue.Empty`.
(1) After putting an object on an empty queue there may be a delay
before :meth:`~Queue.get_nowait` can return without raising
:exc:`queue.Empty`, because the feeder thread flushes objects to
the underlying pipe asynchronously. On platforms where
``sem_getvalue()`` is not implemented (for example macOS), the
queue's :meth:`~Queue.empty` method may also remain :const:`True`
during this delay.

(2) If multiple processes are enqueuing objects, it is possible for
the objects to be received at the other end out-of-order.
Expand Down Expand Up @@ -947,8 +950,17 @@ For an example of the usage of queues for interprocess communication see
Return ``True`` if the queue is empty, ``False`` otherwise. Because of
multithreading/multiprocessing semantics, this is not reliable.

On platforms where ``sem_getvalue()`` is implemented, this method
uses the same approximate size accounting as :meth:`~Queue.qsize`.
Otherwise, it may report ``True`` while items are still buffered and
waiting to be flushed to the underlying pipe.

May raise an :exc:`OSError` on closed queues. (not guaranteed)

.. versionchanged:: 3.15
On platforms where ``sem_getvalue()`` is implemented, this method
now uses semaphore-based queue size accounting.

.. method:: full()

Return ``True`` if the queue is full, ``False`` otherwise. Because of
Expand Down
12 changes: 11 additions & 1 deletion Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,17 @@ def qsize(self):
return self._maxsize - self._sem.get_value()

def empty(self):
return not self._poll()
# Preserve the historical "closed queue may raise OSError" behavior.
# q.close() is a no-op for unused queues, so this only raises once the
# reader end has actually been closed.
if self._closed:
self._poll()

try:
return self._sem.get_value() == self._maxsize
except NotImplementedError:
# Fallback for platforms without sem_getvalue() (for example macOS).
return not self._poll()

def full(self):
return self._sem._semlock._is_zero()
Expand Down
36 changes: 35 additions & 1 deletion Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1235,7 +1235,18 @@ def test_get(self):
break
self.assertEqual(queue_empty(queue), False)

self.assertEqual(queue.get_nowait(), 1)
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
try:
value = queue.get_nowait()
except pyqueue.Empty:
# Queue.empty() may become false before the feeder thread
# flushes objects to the pipe.
continue
else:
break
else:
self.fail("queue.get_nowait() unexpectedly raised Empty")
self.assertEqual(value, 1)
self.assertEqual(queue.get(True, None), 2)
self.assertEqual(queue.get(True), 3)
self.assertEqual(queue.get(timeout=1), 4)
Expand Down Expand Up @@ -1320,6 +1331,29 @@ def test_qsize(self):
self.assertEqual(q.qsize(), 0)
close_queue(q)

def test_empty_uses_semaphore_count(self):
if self.TYPE != 'processes':
self.skipTest(f'test not appropriate for {self.TYPE}')

q = self.Queue()
try:
q._sem.get_value()
except NotImplementedError:
close_queue(q)
self.skipTest('sem_getvalue not implemented on this platform')

q.put('sentinel')
original_poll = q._poll
q._poll = lambda timeout=0.0: False
try:
Copy link
Copy Markdown
Contributor

@YvesDup YvesDup Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if try and finally are very useful here ?

self.assertFalse(q.empty())
finally:
q._poll = original_poll

self.assertEqual(q.get(timeout=support.SHORT_TIMEOUT), 'sentinel')
self.assertTrue(q.empty())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe adding a test when queue is empty which looks like your first one just above ?

close_queue(q)

@classmethod
def _test_task_done(cls, q):
for obj in iter(q.get, None):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Fixed :meth:`multiprocessing.Queue.empty` to use semaphore-based size
accounting on platforms that support ``sem_getvalue()``, making its behavior
consistent with :meth:`multiprocessing.Queue.qsize` and
:meth:`multiprocessing.Queue.full`.
Loading