gh-47798: Add a subprocess.run_pipeline() API #142080
Draft
gpshead wants to merge 40 commits into python:main
Conversation
Add a new run_pipeline() function to the subprocess module that enables
running multiple commands connected via pipes, similar to shell pipelines.
New API:
- run_pipeline(*commands, ...) - Run a pipeline of commands
- PipelineResult - Return type with commands, returncodes, stdout, stderr
- PipelineError - Raised when check=True and any command fails
Features:
- Supports an arbitrary number of commands (minimum 2)
- capture_output, input, timeout, and check parameters like run()
- stdin= connects to first process, stdout= connects to last process
- Text mode support via text=True, encoding, errors
- All processes share a single stderr pipe for simplicity
- "pipefail" semantics: check=True fails if any command fails
Unlike run(), this function does not accept universal_newlines.
Use text=True instead.
Example:

    result = subprocess.run_pipeline(
        ['cat', 'file.txt'],
        ['grep', 'pattern'],
        ['wc', '-l'],
        capture_output=True, text=True,
    )
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Document the new run_pipeline() function, PipelineResult class, and
PipelineError exception in the subprocess module documentation.

Includes:
- Function signature with stdin, stdout, stderr, capture_output, etc.
- Note about the shared stderr pipe and the text mode caveat for
  interleaved multi-byte character sequences
- Note that universal_newlines is not supported (use text=True)
- Explanation that stdin connects to the first process, stdout to the last
- Usage examples showing basic pipelines, multi-command pipelines, input
  handling, and error handling with check=True
- PipelineResult attributes: commands, returncodes, returncode, stdout,
  stderr, and the check_returncodes() method
- PipelineError attributes: commands, returncodes, stdout, stderr, and
  the failed list

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
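To make the documented attributes concrete, a hedged sketch of the
check=True error-handling pattern, assuming the proposed API and the
attribute names listed above (the commands are illustrative):

    import subprocess

    try:
        subprocess.run_pipeline(
            ['cat', 'missing.txt'],
            ['wc', '-l'],
            capture_output=True, text=True, check=True,
        )
    except subprocess.PipelineError as exc:
        # commands and returncodes line up by index; .failed lists only
        # the stages that exited nonzero.
        for cmd, code in zip(exc.commands, exc.returncodes):
            print(cmd, '->', code)
        print('failed stages:', exc.failed)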
Add a _communicate_streams() helper function that properly multiplexes
read/write operations to prevent pipe buffer deadlocks. The helper uses
selectors on POSIX and threads on Windows, similar to Popen.communicate().

This fixes potential deadlocks when large amounts of data flow through
the pipeline and significantly improves performance.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
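For readers unfamiliar with the technique, here is a minimal standalone
sketch of selector-based pipe multiplexing -- not the PR's helper, just
the underlying idea. The function name and buffer sizes are illustrative,
and a production version must also handle BrokenPipeError:

    import os
    import selectors

    def pump(write_fd, read_fd, data):
        # Interleave writes and reads so neither pipe's kernel buffer
        # can fill while we block on the other end (the deadlock).
        sel = selectors.DefaultSelector()
        sel.register(write_fd, selectors.EVENT_WRITE)
        sel.register(read_fd, selectors.EVENT_READ)
        view, offset, output = memoryview(data), 0, bytearray()
        while sel.get_map():
            for key, _ in sel.select():
                if key.fd == write_fd:
                    offset += os.write(write_fd, view[offset:offset + 4096])
                    if offset >= len(view):
                        sel.unregister(write_fd)
                        os.close(write_fd)   # signals EOF to the reader
                else:
                    chunk = os.read(read_fd, 32768)
                    if chunk:
                        output += chunk
                    else:                    # writer closed: EOF
                        sel.unregister(read_fd)
                        os.close(read_fd)
        return bytes(output)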
Add three tests that verify the multiplexed I/O implementation properly
handles large data volumes that would otherwise cause pipe buffer
deadlocks:
- test_pipeline_large_data_no_deadlock: 256KB through a 2-stage pipeline
- test_pipeline_large_data_three_stages: 128KB through a 3-stage pipeline
- test_pipeline_large_data_with_stderr: 64KB with concurrent stderr

These tests would time out or deadlock without proper multiplexing.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Remove support for raw file descriptors in _communicate_streams(),
requiring all streams to be file objects. This simplifies both the
Windows and POSIX implementations by removing isinstance() checks and
fd-wrapping logic. The run_pipeline() function now wraps the stderr
pipe's read end with os.fdopen() immediately after creation.

This change makes _communicate_streams() more compatible with
Popen.communicate(), which already uses file objects, enabling potential
future refactoring to share the multiplexed I/O logic.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Update the test to write 64KB to stderr from each process (128KB total) instead of just small status messages. This better tests that the multiplexed I/O handles concurrent large data on both stdout and stderr without deadlocking. Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
The comment suggested rewriting Popen._communicate() to use non-blocking I/O on file objects now that Python 3's io module is used instead of C stdio. This is unnecessary - the current approach using select() to detect ready fds followed by os.read()/os.write() is correct and efficient. The selector already solves "when is data ready?" so non-blocking mode would add complexity with no benefit. Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Extract the core selector-based I/O loop into a new
_communicate_io_posix() function that is shared by both
_communicate_streams_posix() (used by run_pipeline) and
Popen._communicate() (used by Popen.communicate).

The new function:
- Takes a pre-configured selector and output buffers
- Supports resume via an input_offset parameter (for Popen timeout retry)
- Returns (new_offset, completed) instead of raising TimeoutExpired
- Does not close streams (the caller decides based on the use case)

This reduces code duplication and ensures both APIs use the same
well-tested I/O multiplexing logic.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Move stdin writing to a background thread in _communicate_streams_windows
to avoid blocking indefinitely when writing large input to a pipeline
whose first subprocess doesn't consume stdin quickly. This mirrors the
fix made to Popen._communicate() for Windows in commit 5b1862b
(pythongh-87512).

Add test_pipeline_timeout_large_input to verify that TimeoutExpired is
raised promptly when run_pipeline() is called with large input and a
timeout, even when the first process is slow to consume stdin.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Apply the same fixes from Popen._communicate() to
_communicate_streams_posix for run_pipeline():

1. Handle non-byte memoryview input by casting to a byte view
   (pythongh-134453): non-byte memoryviews (e.g., int32 arrays) had
   incorrect length tracking because len() returns the element count,
   not the byte count. Now cast to a "b" view for correct progress
   tracking.
2. Handle ValueError on stdin.flush() when stdin is closed
   (pythongh-74389): ignore ValueError from flush() if stdin is already
   closed, matching the BrokenPipeError handling.

Add tests for memoryview input to run_pipeline:
- test_pipeline_memoryview_input: basic byte memoryview
- test_pipeline_memoryview_input_nonbyte: int32 array memoryview

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
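The non-byte memoryview pitfall is easy to reproduce in isolation: len()
counts elements, not bytes, so offset arithmetic in a write loop goes
wrong unless the view is cast first:

    import array

    a = array.array('i', [1, 2, 3, 4])   # four ints, typically 4 bytes each
    mv = memoryview(a)
    print(len(mv))            # 4  -- element count; wrong for byte offsets
    print(len(mv.cast('b')))  # 16 -- byte count; what the write loop needs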
Extract common stdin preparation logic into shared helper functions used
by both _communicate_streams_posix() and Popen._communicate():

- _flush_stdin(stdin): flush stdin, ignoring BrokenPipeError and
  ValueError (for closed files)
- _make_input_view(input_data): convert input data to a byte memoryview,
  handling non-byte memoryviews by casting to a "b" view

This ensures consistent behavior and keeps the fixes for pythongh-134453
(memoryview) and pythongh-74389 (closed stdin) in one shared place.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
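A sketch of what those two helpers could look like, inferred from the
behavior described above (the PR's actual code may differ):

    def _make_input_view(input_data):
        # Normalize any bytes-like input to a flat byte view so len()
        # and slice offsets count bytes, not elements (pythongh-134453).
        view = memoryview(input_data)
        if view.format != 'b':
            view = view.cast('b')
        return view

    def _flush_stdin(stdin):
        try:
            stdin.flush()
        except BrokenPipeError:
            pass   # the reader died; communicate() must carry on
        except ValueError:
            pass   # stdin already closed (pythongh-74389)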
- Factor out _translate_newlines() as a module-level function; have
  Popen's method delegate to it for code sharing
- Remove the rejection of the universal_newlines kwarg in run_pipeline();
  treat it the same as text=True (consistent with Popen behavior)
- Use _translate_newlines() for text mode decoding in run_pipeline() to
  properly handle \r\n and \r newline sequences
- Update the documentation to remove mention of the universal_newlines
  rejection
- Update the test to verify universal_newlines=True works like text=True

Co-authored-by: Claude <noreply@anthropic.com>
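The translation rule itself is small; a sketch of such a module-level
function (the PR's actual signature may differ):

    def _translate_newlines(data, encoding, errors):
        # Decode, then collapse \r\n and bare \r to \n, matching what
        # TextIOWrapper does in universal-newlines mode.
        text = data.decode(encoding, errors)
        return text.replace('\r\n', '\n').replace('\r', '\n')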
merwok reviewed Nov 29, 2025
…e-chaining-01R27VPueru4RfRXYDsV5TmW

# Conflicts:
#	Lib/subprocess.py
text=/universal_newlines=/encoding=/errors= were forwarded to each
per-command Popen, which wrapped parent-side pipes in TextIOWrapper. The
threaded Windows _communicate_streams_* backend does fh.write(bytes) and
fh.read() -> bytes and so failed with TypeError/AttributeError. POSIX
uses fd-level os.read/os.write and silently tolerated the mismatch.

Pop those kwargs in run_pipeline and handle encoding at the pipeline
boundary as already intended. Every parent-side pipe now stays binary,
matching the documented _communicate_streams contract.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…_pipeline

When run_pipeline() captured stderr but not stdout (e.g. stdout=DEVNULL
with stderr=PIPE), a timeout would surface TimeoutExpired with stderr
bytes in the .output field, because _communicate_streams used
read_streams[0] regardless of which stream it actually was. Pass stdout
and stderr explicitly to the helper and populate both
TimeoutExpired.output and .stderr.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
stdin=PIPE without input= leaves a writable pipe owned by the parent that nobody writes or closes, so the first child blocks reading stdin forever. There is no useful semantic for this combination in run_pipeline (callers wanting to feed input use input=, callers wanting a file/fd pass it directly). Reject it explicitly with ValueError. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
If close_fds=False is forwarded to each Popen, every child inherits copies of all the other children's pipe ends. Closing a write end in the parent then no longer signals EOF to the reader because other children still hold a copy open, leading to deadlocks. Reject explicit close_fds=False with ValueError; the default close_fds=True behavior is what works. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- errors=None default matches the Popen convention (was 'strict', a
  divergence that bypassed the TextIOWrapper/bytes.decode default).
- PipelineError now calls super().__init__(commands, returncodes) so
  e.args is populated, fixing repr() and pickle.
- Drop the dead `if self.returncodes else None` fallback in
  PipelineResult.returncode; returncodes is always populated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The cleanup `finally` block was killing and waiting on each child in turn, so a single hung wait() would leave later children un-killed. Match the kill-all-then-wait-all pattern already used by the timeout cleanup paths. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
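The pattern in miniature (illustrative, not the PR's exact cleanup code;
the function name is hypothetical):

    def _kill_all_then_wait_all(procs):
        # Signal every child first, so one hung wait() cannot leave
        # later children running...
        for proc in procs:
            proc.kill()
        # ...and only then reap them all.
        for proc in procs:
            proc.wait()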
- Rename _remaining_time_helper to _deadline_remaining.
- Note that Popen._translate_newlines remains a method for subclass
  back-compat (the logic moved to a module-level function).
- Cap PipelineError.__str__ at three failures with "and N more" so a
  long failed pipeline doesn't produce an unwieldy message.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
stderr=STDOUT redirects each child's stderr to its own stdout fd, so non-final processes route stderr into the next process's stdin - surprising for callers expecting shell-like 2>&1 to the pipeline's final stdout. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add tests for the check=True happy path, stderr=STDOUT routing of the final process's stderr, the intermediate-stdout-closed-in-parent contract, and pickle/repr round-tripping of PipelineError. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
For consistency with CompletedProcess, returned by subprocess.run(). The error type PipelineError stays, paralleling CalledProcessError / CompletedProcess in the existing module. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- NEWS entry: drop `!` from :class: / :exc: markup so cross-references
  render.
- run_pipeline: pop text=/universal_newlines= unconditionally so the
  short-circuited `or` chain no longer leaks universal_newlines into
  per-Popen kwargs when text= is truthy.
- Use `is PIPE` and parenthesize the capture_stderr expression.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Note that capturing stderr means every child shares a copy of the
  write fd, so a grandchild holding stderr open will prevent EOF and
  hang run_pipeline (matches shell 2>&1 | other behavior).
- Note that pass_fds= is forwarded to every command in the pipeline.
- run_pipeline docstring: clarify that stderr is also managed by the
  pipeline when capture is requested.
- CompletedPipeline.returncode docs: clarify it matches
  shell-without-pipefail semantics -- a zero returncode does not mean
  all commands succeeded; use check_returncodes() / check=True.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ed helper

The refactor of Popen._communicate's POSIX inner loop into
_communicate_io_posix dropped the inline close on EOF that the original
loop did, deferring closes until after the loop completed. For
Popen.communicate(), restore the original semantics by adding a
close_on_eof= keyword to _communicate_io_posix and passing True from
Popen._communicate. The new _communicate_streams_* used by run_pipeline
keep close_on_eof=False (the default) and continue to close streams in
their own post-loop cleanup.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…eout

The refactor of Popen._communicate's POSIX inner loop into
_communicate_io_posix moved input_offset tracking from the loop body to
a return value. Add a regression test that writes more data than fits in
the pipe buffer, triggers a TimeoutExpired mid-write, then calls
communicate() again to drain the rest -- exercising the resume path
through the new factoring.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add tests confirming that kwargs forwarded to each Popen reach every command in the pipeline: shell=True with simple shell pipelines, env= propagation, cwd= propagation, and pass_fds= sharing a single inheritable fd across all children. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ce, broken-pipe

Three test gaps from review:
- stderr=PIPE without capture_output= on the normal-completion path
  (existing tests only exercised the timeout path).
- errors='replace' as the doc-recommended approach for tolerating
  multi-byte stderr that can split across reads from a shared pipe.
- A middle process exiting early without consuming all input -- ensures
  the producer's BrokenPipeError doesn't hang the pipeline.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Both children share a kernel-level file description for the inherited fd, so the prior approach of `os.read` + `os.lseek` raced. Use `os.pread(fd, length, 0)` which doesn't touch the offset and is atomic with respect to concurrent reads. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
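The distinction at the syscall level, in a self-contained sketch
(POSIX-only, since os.pread is unavailable on Windows; the file name is
illustrative):

    import os

    with open('data.bin', 'wb') as f:       # small scratch file
        f.write(bytes(range(64)))

    fd = os.open('data.bin', os.O_RDONLY)
    try:
        # Racy when the open file description is shared (e.g. inherited
        # by children): another reader can move the offset between calls.
        os.lseek(fd, 0, os.SEEK_SET)
        racy = os.read(fd, 16)

        # Atomic: reads at an absolute position, never touching the
        # shared offset.
        safe = os.pread(fd, 16, 0)
        assert racy == safe
    finally:
        os.close(fd)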
…timeout

Strengthen test_pipeline_timeout to assert that TimeoutExpired.output
and TimeoutExpired.stderr are either None or bytes when a pipeline times
out mid-flight. Both backends (POSIX selector and Windows threaded)
populate these attributes from any partial reads, so the assertion is
meaningful on every platform CI runs on.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Drop section-label and restate-the-code comments added with run_pipeline and its helpers, and reframe the remaining ones around the invariant they document (pipe-EOF on parent close, drain-writer-before-readers, multiplexing prevents buffer-fill deadlocks, _input_offset persists for resume) so future readers get the why, not a narration of the code. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ion point Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ams on Windows

_communicate_io_posix referenced the POSIX-only _PIPE_BUF from
unconditional module scope; move it into the non-Windows branch alongside
_communicate_streams_posix. _communicate_streams_windows now closes each
read stream once its reader thread has joined, matching the POSIX
implementation and the helper's documented contract.
…ntry cap

Negative returncodes on POSIX mean the child was killed by a signal;
report them as "died with <Signals.SIGFOO>" rather than as a bare
negative integer, matching CalledProcessError. Also drop the 3-entry
truncation: real pipelines are short enough that more than three failing
stages is rare, and when it happens the traceback is exactly where a
user wants to see every failure rather than "and N more". The full list
remains on .failed regardless.
…skip timeout

Hoist the array and pickle imports to module level rather than importing
inside test methods. Correct the intermediate-stdout-close docstring
(the producer hits a broken pipe, not EOF). Drop the Windows skip on
test_pipeline_timeout: the body is platform-neutral and exercises the
threaded backend's TimeoutExpired path too.
…awn failure

test_pipeline_error_str_signal covers the negative-returncode rendering
in PipelineError.__str__. test_pipeline_spawn_failure_cleans_up
exercises the run_pipeline finally-block cleanup when a later stage
fails to exec: stage 0 is already running and sleeping, stage 1's
executable does not exist, and the call must return promptly with the
OSError rather than hang on stage 0.
…ipe-chaining-01R27VPueru4RfRXYDsV5TmW
The pipeline replaces the shell; per-stage shell would re-introduce the quoting and injection surface this API exists to avoid. A future Stage() wrapper is the place for the rare stage that needs it.
…w entry

The "Replacing shell pipeline" recipe now recommends run_pipeline()
first and demotes the manual Popen chain to the streaming case. Note
that PipelineError is a sibling of CalledProcessError, not a subclass.
This was a feature request from 2008. I'm undecided if we should add this feature, but it proved a good exercise and had me review a lot of existing subprocess.py code and notice a variety of other lingering subprocess issues while doing so (thus my recent spate of smaller subprocess PRs merged around when I opened this).
Summary
Read the docs in the PR for details, but it basically mirrors the run()
API, just with multiple commands.

Design notes
A future Stage(cmd, **overrides) wrapper is marked in-code as a natural
extension point if per-stage stderr=STDOUT, pass_fds=, env=, etc. are
wanted later.

CompletedPipeline and PipelineError are siblings of CompletedProcess and
CalledProcessError under SubprocessError, not subclasses. A pipeline
carries N commands and N returncodes, so any single value picked for
returncode/cmd to satisfy the CalledProcessError parent contract would
silently mislead existing except CalledProcessError as e: handlers --
particularly for the check=True saving-throw case, where multiple stages
may have failed. Users who want a single catch-all for any subprocess
failure can use except SubprocessError: (sketched below), the same
umbrella TimeoutExpired already lives under.
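For instance, a minimal sketch of that umbrella catch, assuming the
proposed API (the commands and timeout are illustrative):

    import subprocess

    try:
        subprocess.run_pipeline(
            ['du', '-s', '/var/log'],
            ['sort', '-n'],
            check=True, timeout=30,
        )
    except subprocess.SubprocessError as exc:
        # One except clause covers PipelineError, CalledProcessError,
        # and TimeoutExpired alike; no single-returncode contract implied.
        print('pipeline failed:', exc)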
Open questions

This matches the signaling behavior of run() upon timeout, namely
SIGKILL -- but sent to each process. Q: Is that rude? Should we start
them in a particular order so that their own SIGPIPEs propagate, and
wait a bit before we signal each? "What does a shell do upon Ctrl-C?" is
probably a good way to answer that. We've never allowed people to
specify the signal used upon timeout in run(); we could, but it would
always need to be in a "use this first, wait a bit, then resort to
SIGKILL" sense to be reliable, so it would probably turn into two
parameters and I doubt it would see much use. I consider changing the
signal a separate follow-on feature request.

Alternative ideas
I was pondering the use of the | pipe operator itself between objects,
but that is unnatural and undesirable for Popen instances themselves, as
those start upon creation. Even though I could imagine allowing it to
rewire their file descriptors, it would get gross and raise questions
around unclear management of the mess. You want processes started
sequentially with the actual stdout->stdin chain of connections made
from the start, so a run-like API makes sense to me.

This lets people avoid using a shell.
It does not offer the same flexibility a raw Popen instance does,
though, for people who need to do their own IO multiplexing. But given
that you can provide whatever file object you want for input and output,
that could still be done with this API by having your own threads feed
or consume those instead of relying on capture_output.
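That escape hatch, sketched under the assumption that run_pipeline()
accepts file objects and descriptors for stdin=/stdout= the way run()
does (the file names and commands are illustrative):

    import os
    import subprocess
    import threading

    r, w = os.pipe()

    def consume():
        # Drain the pipeline's stdout incrementally instead of
        # buffering the whole thing via capture_output.
        with os.fdopen(r, 'rb') as reader:
            for chunk in iter(lambda: reader.read(8192), b''):
                ...  # handle each chunk as it arrives

    t = threading.Thread(target=consume)
    t.start()
    with open('access.log', 'rb') as src:
        subprocess.run_pipeline(
            ['grep', 'ERROR'],
            ['sort'],
            stdin=src, stdout=w,
        )
    os.close(w)   # parent's copy closed -> consumer thread sees EOF
    t.join()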
What PyPI subprocess pipe options exist?
I found two PyPI packages offering something resembling assembling pipelines of subprocesses:
Written entirely by alternating between reading the subprocess sources
and driving a remote Claude Code for the web session, telling it what to
do next. With the aid of gpshead/cpython-skills.
Co-authored-by: Claude Opus 4.5
Later picked up and updated locally using Opus 4.7.
📚 Documentation preview 📚: https://cpython-previews--142080.org.readthedocs.build/