Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 48 additions & 8 deletions Lib/test/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import sysconfig
import select
import shutil
import socket
import threading
import gc
import textwrap
Expand Down Expand Up @@ -1044,19 +1045,49 @@ def test_communicate_timeout_large_input(self):
# On Windows, stdin writing must also honor the timeout rather than
# blocking indefinitely when the pipe buffer fills.

# Input larger than typical pipe buffer (4-64KB on Windows)
input_data = b"x" * (128 * 1024)
input_data = b"x" * (128 * 1024) # > typical pipe buffer

# Cross-platform wake mechanism: the slow reader connects to a
# loopback TCP socket and blocks in select() on it (capped at 9s
# as a safety net we don't expect to hit). After phase 1 raises
# TimeoutExpired, the parent sends a byte to release the child so
# it drains stdin. A socket (rather than a raw pipe) is required
# because Windows select() only supports sockets, not arbitrary
# file descriptors.
server = socket.create_server(('127.0.0.1', 0), backlog=1)
server.settimeout(10) # bound the accept() if the child fails to start
port = server.getsockname()[1]
# The child sends one byte (low byte of its PID) first so the parent
# can detect the rare case of an unrelated process on the same host
# connecting to our ephemeral port before our child does. A single
# byte gives 1/256 collision odds, which is plenty for flake-prevention.
slow_reader = (
"import os, socket, sys, select; "
f"s = socket.create_connection(('127.0.0.1', {port}), timeout=9); "
"s.sendall(bytes([os.getpid() & 0xff])); "
"select.select([s], [], [], 9); "
"sys.stdout.buffer.write(sys.stdin.buffer.read())"
)

p = subprocess.Popen(
[sys.executable, "-c",
"import sys, time; "
"time.sleep(30); " # Don't read stdin for a long time
"sys.stdout.buffer.write(sys.stdin.buffer.read())"],
[sys.executable, "-c", slow_reader],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)

conn = None
try:
conn, _ = server.accept()
server.close()
server = None

conn.settimeout(5)
peer_byte = conn.recv(1)
conn.settimeout(None)
self.assertEqual(peer_byte, bytes([p.pid & 0xff]),
f"loopback handshake byte {peer_byte!r} != "
f"low byte of child PID {p.pid} ({p.pid & 0xff:#x})")

timeout = 0.2
start = time.monotonic()
try:
Expand All @@ -1065,19 +1096,24 @@ def test_communicate_timeout_large_input(self):
elapsed = time.monotonic() - start
self.fail(
f"TimeoutExpired not raised. communicate() completed in "
f"{elapsed:.2f}s, but subprocess sleeps for 30s. "
f"{elapsed:.2f}s, but slow reader stalls for up to 9s. "
"Stdin writing blocked without enforcing timeout.")
except subprocess.TimeoutExpired:
elapsed = time.monotonic() - start

# Timeout should occur close to the specified timeout value,
# not after waiting for the subprocess to finish sleeping.
# Allow generous margin for slow CI, but must be well under
# the subprocess sleep time.
# the slow-reader's stall cap.
self.assertLess(elapsed, 5.0,
f"TimeoutExpired raised after {elapsed:.2f}s; expected ~{timeout}s. "
"Stdin writing blocked without checking timeout.")

# Release the slow reader so it stops blocking and drains stdin.
conn.sendall(b'go')
conn.close()
conn = None

# After timeout, continue communication. The remaining input
# should be sent and we should receive all data back.
stdout, stderr = p.communicate()
Expand All @@ -1087,6 +1123,10 @@ def test_communicate_timeout_large_input(self):
f"Expected {len(input_data)} bytes output but got {len(stdout)}")
self.assertEqual(stdout, input_data)
finally:
if conn is not None:
conn.close()
if server is not None:
server.close()
p.kill()
p.wait()

Expand Down
Loading