Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions meshtastic/stream_interface.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Stream Interface base class
"""
import contextlib
import io
import logging
import threading
Expand Down Expand Up @@ -61,9 +62,17 @@ def __init__( # pylint: disable=R0917

# Start the reader thread after superclass constructor completes init
if connectNow:
self.connect()
if not noProto:
self.waitForConfig()
try:
self.connect()
if not noProto:
self.waitForConfig()
except Exception:
# If the handshake raises, the caller never receives a reference
# to this instance and cannot call close() themselves. Clean up
# the reader thread + stream here so retries don't leak.
with contextlib.suppress(Exception):
self.close()
raise
Comment on lines +65 to +75
Copy link

Copilot AI Apr 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New failure-path behavior (calling close() from __init__ when connect() / waitForConfig() raises) is important for leak prevention but currently isn’t covered by unit tests. Since there are existing tests for stream_interface.py, consider adding a test that forces connect() or waitForConfig() to raise and asserts cleanup occurs (e.g., close() is invoked and doesn’t raise when the thread wasn’t started).

Copilot generated this review using guidance from repository custom instructions.

def connect(self) -> None:
"""Connect to our radio
Expand Down Expand Up @@ -136,7 +145,13 @@ def close(self) -> None:
# reader thread to close things for us
self._wantExit = True
if self._rxThread != threading.current_thread():
self._rxThread.join() # wait for it to exit
try:
self._rxThread.join() # wait for it to exit
except RuntimeError:
Comment on lines 147 to +150
Copy link

Copilot AI Apr 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

join() without a timeout can block indefinitely if the reader thread is stuck in a blocking _readBytes() (e.g., TCPInterface uses blocking socket.recv() with no timeout). With the new __init__ exception handler calling self.close(), a handshake timeout against a silent TCP peer can hang here and never re-raise the original exception. Consider using a bounded join (and letting transport-specific close() logic interrupt the read), or otherwise ensuring blocking reads are interrupted before joining.

Copilot uses AI. Check for mistakes.
# Thread was never started — happens when close() is invoked
# from a failed __init__ before connect() could spawn it.
# Nothing to join; safe to ignore.
pass
Comment on lines +153 to +154
Copy link

Copilot AI Apr 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If connect() fails before the reader thread is started (e.g., serial write raises before _rxThread.start()), this RuntimeError path will skip joining but also won’t close the underlying stream. For SerialInterface (which opens the port before calling StreamInterface.__init__), this means the serial port can remain open even though close() was invoked. Consider explicitly closing self.stream (and setting it to None) when the thread was never started / is not alive, instead of just pass.

Suggested change
# Nothing to join; safe to ignore.
pass
# In that case the reader thread cannot run _disconnected(),
# so close the stream here to avoid leaking the resource.
if self.stream is not None:
with contextlib.suppress(Exception):
self.stream.close()
self.stream = None

Copilot uses AI. Check for mistakes.

def _handleLogByte(self, b):
"""Handle a byte that is part of a log message from the device."""
Expand Down
95 changes: 95 additions & 0 deletions meshtastic/tests/test_stream_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,101 @@ def test_StreamInterface():
assert pytest_wrapped_e.type == Exception


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_StreamInterface_close_safe_when_thread_never_started():
"""close() must not raise RuntimeError when called before connect() has started the reader.

Hits the cleanup path used by __init__ when the handshake raises before the
reader thread is started.
"""
iface = StreamInterface(noProto=True, connectNow=False)
iface.stream = MagicMock()
# _rxThread was created in __init__ but never .start()'d. close() should
# detect that and skip join() instead of raising RuntimeError.
iface.close()


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_StreamInterface_init_cleans_up_when_connect_raises():
"""If connect() raises during __init__, close() runs and the original exception propagates."""

cleanup_calls = []

class FailingConnectStream(StreamInterface):
"""Subclass whose connect() raises, to exercise the __init__ cleanup path."""

def __init__(self):
self.stream = MagicMock() # bypass StreamInterface abstract check
super().__init__(noProto=False, connectNow=True)

def connect(self):
raise RuntimeError("simulated handshake failure")

def close(self):
cleanup_calls.append("close")
super().close()

with pytest.raises(RuntimeError, match="simulated handshake failure"):
FailingConnectStream()
assert cleanup_calls == ["close"], "close() should be invoked exactly once on handshake failure"


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_StreamInterface_init_cleans_up_when_waitForConfig_raises():
"""If waitForConfig() raises after a successful connect(), close() runs and exception propagates."""

cleanup_calls = []

class FailingWaitStream(StreamInterface):
"""Subclass whose waitForConfig() raises, to exercise the second leg of cleanup."""

def __init__(self):
self.stream = MagicMock()
super().__init__(noProto=False, connectNow=True)

def connect(self):
# No-op connect — we are simulating handshake-stage failure, not connect-stage.
pass

def waitForConfig(self):
raise TimeoutError("simulated config-handshake timeout")

def close(self):
cleanup_calls.append("close")
super().close()

with pytest.raises(TimeoutError, match="simulated config-handshake timeout"):
FailingWaitStream()
assert cleanup_calls == ["close"], "close() should be invoked exactly once on handshake timeout"


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_StreamInterface_init_cleanup_does_not_shadow_original_exception():
"""If close() itself raises during __init__ cleanup, the original exception still propagates.

The cleanup uses contextlib.suppress(Exception) so that a secondary failure
in close() doesn't replace the real reason for the failed handshake.
"""

class CleanupRaisesStream(StreamInterface):
def __init__(self):
self.stream = MagicMock()
super().__init__(noProto=False, connectNow=True)

def connect(self):
raise RuntimeError("original handshake failure")

def close(self):
raise RuntimeError("secondary close failure — should be suppressed")

with pytest.raises(RuntimeError, match="original handshake failure"):
CleanupRaisesStream()


# Note: This takes a bit, so moving from unit to slow
@pytest.mark.unitslow
@pytest.mark.usefixtures("reset_mt_config")
Expand Down