|
2 | 2 | from unittest import mock |
3 | 3 | from test import support |
4 | 4 | from test.support import ( |
5 | | - cpython_only, is_apple, os_helper, refleak_helper, socket_helper, threading_helper |
| 5 | + cpython_only, is_apple, os_helper, refleak_helper, script_helper, socket_helper, threading_helper |
6 | 6 | ) |
7 | 7 | from test.support.import_helper import ensure_lazy_imports |
8 | 8 | import _thread as thread |
|
28 | 28 | import struct |
29 | 29 | import sys |
30 | 30 | import tempfile |
| 31 | +import textwrap |
31 | 32 | import threading |
32 | 33 | import time |
33 | 34 | import traceback |
@@ -7498,6 +7499,57 @@ def close_fds(fds): |
7498 | 7499 | self.assertEqual(data, str(index).encode()) |
7499 | 7500 |
|
7500 | 7501 |
|
| 7502 | +@support.requires_subprocess() |
| 7503 | +@unittest.skipUnless(hasattr(sys, "gettotalrefcount"), |
| 7504 | + "requires sys.gettotalrefcount()") |
| 7505 | +class AuditHookLeakTests(unittest.TestCase): |
| 7506 | + # gh-146245: Reference and buffer may leaks in audit hook's failures path. |
| 7507 | + |
| 7508 | + def test_getaddrinfo_audit_hook_leak(self): |
| 7509 | + code = textwrap.dedent(""" |
| 7510 | + import socket |
| 7511 | + import sys |
| 7512 | + import gc |
| 7513 | + sys.addaudithook(lambda *a: (_ for _ in ()).throw(RuntimeError("audit"))) |
| 7514 | + gc.collect() |
| 7515 | + before = sys.gettotalrefcount() |
| 7516 | + for _ in range(100): |
| 7517 | + try: |
| 7518 | + socket.getaddrinfo(None, 80) |
| 7519 | + except RuntimeError: |
| 7520 | + pass |
| 7521 | + gc.collect() |
| 7522 | + after = sys.gettotalrefcount() |
| 7523 | + print(after - before) |
| 7524 | + """) |
| 7525 | + rc, out, err = script_helper.assert_python_ok("-c", code) |
| 7526 | + leaked = int(out.strip()) |
| 7527 | + self.assertLessEqual(leaked, 2, f"Leaked {leaked} references") |
| 7528 | + |
| 7529 | + def test_sendto_audit_hook_leak(self): |
| 7530 | + code = textwrap.dedent(""" |
| 7531 | + import socket |
| 7532 | + import sys |
| 7533 | + import gc |
| 7534 | + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| 7535 | + sys.addaudithook(lambda *a: (_ for _ in ()).throw(RuntimeError("audit"))) |
| 7536 | + gc.collect() |
| 7537 | + before = sys.gettotalrefcount() |
| 7538 | + for _ in range(100): |
| 7539 | + try: |
| 7540 | + s.sendto(bytearray(b"x"), ("127.0.0.1", 80)) |
| 7541 | + except RuntimeError: |
| 7542 | + pass |
| 7543 | + gc.collect() |
| 7544 | + after = sys.gettotalrefcount() |
| 7545 | + s.close() |
| 7546 | + print(after - before) |
| 7547 | + """) |
| 7548 | + rc, out, err = script_helper.assert_python_ok("-c", code) |
| 7549 | + leaked = int(out.strip()) |
| 7550 | + self.assertLessEqual(leaked, 2, f"Leaked {leaked} references") |
| 7551 | + |
| 7552 | + |
7501 | 7553 | class FreeThreadingTests(unittest.TestCase): |
7502 | 7554 |
|
7503 | 7555 | def test_close_detach_race(self): |
|
0 commit comments