Skip to content

feat(modules): async modules#1920

Open
paul-nechifor wants to merge 1 commit intodevfrom
paul/feat/async-modules
Open

feat(modules): async modules#1920
paul-nechifor wants to merge 1 commit intodevfrom
paul/feat/async-modules

Conversation

@paul-nechifor
Copy link
Copy Markdown
Contributor

@paul-nechifor paul-nechifor commented Apr 26, 2026

Problem

Having so many threads in modules makes code hard to write.

Closes DIM-812

Solution

Breaking Changes

None.

How to Test

Contributor License Agreement

  • I have read and approved the CLA.

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Apr 26, 2026

Greptile Summary

This PR introduces a first-class async programming model for modules: an @arpc decorator for dual-mode dispatch, a spawn/process_observable API for scheduling coroutines on the module's dedicated event loop, an async def main() generator contract for lifecycle setup/teardown, auto-binding of async handle_* methods, and an AsyncSpecProxy that bridges synchronous RPC calls into awaitables for callers that type their refs with async def specs.

  • P0 — asyncio.Task(loop=loop) in _logging_task_factory (module.py:718): The loop kwarg was removed in Python 3.10; every coroutine scheduled on any module's event loop immediately raises TypeError, breaking the entire async task system on supported Python versions.

Confidence Score: 2/5

Not safe to merge: _logging_task_factory is broken on Python ≥ 3.10, making the entire async task system non-functional.

A single P0 finding caps the score at 2. The asyncio.Task(loop=loop) call in _logging_task_factory raises TypeError on any Python ≥ 3.10, which is the actively supported range. This factory is set on every module's event loop via get_loop(), so every coroutine scheduled by spawn(), process_observable(), or _start_main() would fail to create a task.

dimos/core/module.py — specifically _logging_task_factory at line 718

Important Files Changed

Filename Overview
dimos/core/module.py Adds async module infrastructure (get_loop, spawn, process_observable, _start_main/_stop_main, _auto_bind_handlers, _make_async_dispatch, logging helpers); asyncio.Task(loop=loop) in _logging_task_factory is broken on Python ≥ 3.10 (P0)
dimos/core/core.py Adds @ARPC decorator with dual-mode dispatch (same-loop → return coroutine, other-thread → run_coroutine_threadsafe + block); None-loop guard correctly handled with an early RuntimeError
dimos/core/rpc_client.py Adds AsyncSpecProxy that wraps sync RPC calls with run_in_executor to expose them as awaitables; default thread pool saturation is a potential concern under load
dimos/core/coordination/module_coordinator.py Extends _connect_module_refs to detect async spec methods and wrap proxies in AsyncSpecProxy; _async_methods_of_spec correctly inspects the MRO; logic looks sound
dimos/core/test_async_module_rpc.py Integration test for @ARPC cross-module call via AsyncSpecProxy; 100ms timeout may be flaky on slow CI
dimos/core/test_async_module_main.py Comprehensive tests for async main generator lifecycle (setup, teardown, zero yields, two yields, teardown error, setup error); good coverage
dimos/core/test_async_module_dispatch_serialization.py Tests LATEST-mailbox coalescing and serialization contract for process_observable dispatcher; well-structured with timing-based assertions
dimos/core/test_async_module_handles.py Tests auto-binding of async handle_* methods; straightforward and correct
dimos/core/test_async_module_process_observable.py Tests process_observable with an rx.interval source; verifies order and completeness of 26-letter sequence
dimos/core/test_async_module_rpc_sync_to_async.py Tests bidirectional sync-to-async RPC interop across three connected modules; 100ms timeout may be flaky on slow CI
docs/usage/modules.md Documentation updates for async module patterns; no code issues

Sequence Diagram

sequenceDiagram
    participant C as Sync Caller
    participant W as arpc Wrapper
    participant EL as Module Event Loop
    participant P as AsyncSpecProxy
    participant TP as ThreadPoolExecutor
    participant RL as Remote Module Loop

    C->>W: handle_x(msg) from RPC dispatcher
    W->>W: get_running_loop raises, running=None
    W->>EL: run_coroutine_threadsafe(coro)
    W->>W: future.result() blocks caller
    EL->>EL: await coro
    EL-->>C: result returned

    EL->>P: await self._ref.remote_method(x)
    P->>TP: run_in_executor(None, RpcCall)
    TP->>RL: run_coroutine_threadsafe(coro)
    RL-->>TP: result
    TP-->>EL: Future resolved

    C->>EL: start - run_coroutine_threadsafe gen anext
    EL->>EL: setup code then yield
    EL-->>C: setup done, start returns
    C->>EL: stop - run_coroutine_threadsafe gen anext
    EL->>EL: teardown then StopAsyncIteration
    EL-->>C: teardown done, stop returns
Loading

Reviews (4): Last reviewed commit: "feat(modules): async modules" | Re-trigger Greptile

Comment thread dimos/asdf.py Outdated
Comment thread dimos/core/core.py Outdated
Comment thread dimos/core/module.py Outdated
Comment on lines +264 to +275
def _log_async_handler_error(self, fut: Any) -> None:
try:
fut.result()
except (asyncio.CancelledError, RuntimeError):
pass # loop stopped or task cancelled during shutdown
except BaseException as e:
# Include exception type+message in the event string so it is
# visible on consoles whose formatters strip exc_info/traceback.
logger.exception(
f"Unhandled error in async task on {type(self).__name__}._loop: "
f"{type(e).__name__}: {e}"
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 RuntimeError catch swallows user exceptions from spawned tasks

The handler silently discards any RuntimeError raised inside user-written coroutines scheduled via spawn() or process_observable(). RuntimeError is extremely common in Python (e.g., StopIteration propagation, generator misuse, and countless library errors). The intent — catching "event loop is closed" — is better served by narrowing the RuntimeError arm:

import concurrent.futures

def _log_async_handler_error(self, fut: Any) -> None:
    try:
        fut.result()
    except (asyncio.CancelledError, concurrent.futures.CancelledError):
        pass  # task cancelled during shutdown
    except RuntimeError as e:
        if "event loop" in str(e).lower() or "loop is closed" in str(e).lower():
            pass  # loop shut down before task completed
        else:
            logger.exception(
                f"Unhandled error in async task on {type(self).__name__}._loop: "
                f"{type(e).__name__}: {e}"
            )
    except BaseException as e:
        logger.exception(
            f"Unhandled error in async task on {type(self).__name__}._loop: "
            f"{type(e).__name__}: {e}"
        )

As written, any RuntimeError in a spawned task is swallowed, defeating the whole purpose of spawn() over bare run_coroutine_threadsafe.

@paul-nechifor paul-nechifor force-pushed the paul/feat/async-modules branch from 02b6e09 to 079c62a Compare April 28, 2026 22:28
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Apr 28, 2026

Want your agent to iterate on Greptile's feedback? Try greploops.

Comment thread dimos/core/module.py
@paul-nechifor paul-nechifor force-pushed the paul/feat/async-modules branch from 079c62a to c87457c Compare April 28, 2026 23:54
@paul-nechifor paul-nechifor force-pushed the paul/feat/async-modules branch from c87457c to c9a4d56 Compare April 29, 2026 01:42
Comment thread dimos/core/module.py
Adds a done callback to log unhandled exceptions from any task created on
the loop.
"""
task = asyncio.Task(coro, loop=loop, **kwargs)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P0 asyncio.Task(loop=...) removed in Python 3.10+

asyncio.Task.__init__ dropped the loop keyword argument in Python 3.10 (deprecated in 3.8). On Python ≥ 3.10, every coroutine scheduled on a module's loop will immediately raise TypeError: __init__() got an unexpected keyword argument 'loop', silently breaking the entire async task system. Inside a task factory the loop is already the running loop, so the fix is simply to drop the kwarg:

Suggested change
task = asyncio.Task(coro, loop=loop, **kwargs)
task = asyncio.Task(coro, **kwargs)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant