Skip to content

event_loop

Classes:

  • Host

    A class that provides the necessary callbacks for start_guest_run.

Functions:

  • run

    Run func to completion asynchronously in the current thread using a backend

Host

Bases: Generic[T]

A class that provides the necessary callbacks for start_guest_run.

Methods:

  • current

    The host running in the corresponding thread or current thread.

  • run

    Run the loop in the current thread with a backend guest.

  • mainloop

    Start the main event loop of the host.

Attributes:

Source code in src/async_kernel/event_loop/run.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class Host(Generic[T]):
    """
    A class that provides the necessary callbacks for `start_guest_run`.
    """

    LOOP: Loop
    MATPLOTLIB_GUIS = ()
    _subclasses: dict[Loop, type[Self]] = {}
    _instances: dict[threading.Thread, Host] = {}

    _outcome: Outcome[T] | None = None
    start_guest: Callable[[], Any] = staticmethod(lambda: None)
    "A callback to start the guest. This must be called by a subclass."

    def __init_subclass__(cls) -> None:
        if cls.LOOP is not Loop.custom:
            cls._subclasses[cls.LOOP] = cls

    @classmethod
    def current(cls, thread: threading.Thread | None = None) -> Host | None:
        "The host running in the corresponding thread or current thread."
        thread = thread or threading.current_thread()
        return cls._instances.get(thread)

    @classmethod
    def run(cls, func: Callable[..., CoroutineType[Any, Any, T]], args: tuple, settings: RunSettings, /) -> T:
        "Run the loop in the current thread with a backend guest."

        if (thread := threading.current_thread()) in cls._instances:
            msg = "A host is already running in this thread"
            raise RuntimeError(msg)

        loop = Loop(settings.get("loop"))
        backend = Backend(settings.get("backend", "asyncio"))
        backend_options = settings.get("backend_options") or {}
        loop_options = settings.get("loop_options") or {}

        if "host_class" in loop_options:
            loop_options = loop_options.copy()
            cls_ = loop_options.pop("host_class")
            if isinstance(cls_, str):
                cls_ = import_item(cls_)
            if not issubclass(cls_, cls):
                msg = f"{cls_} is not a subclass of {cls}!"
                raise TypeError(msg)
        else:
            assert loop != backend
            if loop not in cls._subclasses:
                import_module(f"async_kernel.event_loop.{loop}_host")
                assert loop in cls._subclasses, f"Host for {loop=} is not implemented correctly!"
            cls_ = cls._subclasses[loop]
        assert cls_.LOOP is loop

        host = cls_(**loop_options)
        # set the `start_guest` function (runs once).
        backend_options.setdefault("host_uses_signal_set_wakeup_fd", host.host_uses_signal_set_wakeup_fd)
        start_guest_run = get_start_guest_run(backend)
        host.start_guest = lambda: [
            start_guest_run(
                func,
                *args,
                run_sync_soon_threadsafe=host.run_sync_soon_threadsafe,
                run_sync_soon_not_threadsafe=host.run_sync_soon_not_threadsafe,
                done_callback=host.done_callback,
                **backend_options,
            ),
            setattr(host, "start_guest", lambda: None),
        ][1]
        host._instances[thread] = host
        try:
            return host.mainloop()
        finally:
            host._instances.pop(threading.current_thread())

    # Override the methods/attributes below as required.
    host_uses_signal_set_wakeup_fd = False

    def run_sync_soon_threadsafe(self, fn: Callable[[], Any]) -> None: ...
    def run_sync_soon_not_threadsafe(self, fn: Callable[[], Any]) -> None: ...

    def done_callback(self, outcome: Outcome) -> None:
        self._outcome = outcome

    def mainloop(self) -> T:
        "Start the main event loop of the host."
        self.start_guest()  # Call at an appropriate time in the overriding subclass.
        if not self._outcome:
            msg = "The mainloop should only exit once done_callback has been called!"
            raise RuntimeError(msg)
        return self._outcome.unwrap()  # pragma: no cover

start_guest class-attribute instance-attribute

start_guest: Callable[[], Any] = staticmethod(lambda: None)

A callback to start the guest. This must be called by a subclass.

current classmethod

current(thread: Thread | None = None) -> Host | None

The host running in the corresponding thread or current thread.

Source code in src/async_kernel/event_loop/run.py
 98
 99
100
101
102
@classmethod
def current(cls, thread: threading.Thread | None = None) -> Host | None:
    "The host running in the corresponding thread or current thread."
    thread = thread or threading.current_thread()
    return cls._instances.get(thread)

run classmethod

run(
    func: Callable[..., CoroutineType[Any, Any, T]], args: tuple, settings: RunSettings
) -> T

Run the loop in the current thread with a backend guest.

Source code in src/async_kernel/event_loop/run.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
@classmethod
def run(cls, func: Callable[..., CoroutineType[Any, Any, T]], args: tuple, settings: RunSettings, /) -> T:
    "Run the loop in the current thread with a backend guest."

    if (thread := threading.current_thread()) in cls._instances:
        msg = "A host is already running in this thread"
        raise RuntimeError(msg)

    loop = Loop(settings.get("loop"))
    backend = Backend(settings.get("backend", "asyncio"))
    backend_options = settings.get("backend_options") or {}
    loop_options = settings.get("loop_options") or {}

    if "host_class" in loop_options:
        loop_options = loop_options.copy()
        cls_ = loop_options.pop("host_class")
        if isinstance(cls_, str):
            cls_ = import_item(cls_)
        if not issubclass(cls_, cls):
            msg = f"{cls_} is not a subclass of {cls}!"
            raise TypeError(msg)
    else:
        assert loop != backend
        if loop not in cls._subclasses:
            import_module(f"async_kernel.event_loop.{loop}_host")
            assert loop in cls._subclasses, f"Host for {loop=} is not implemented correctly!"
        cls_ = cls._subclasses[loop]
    assert cls_.LOOP is loop

    host = cls_(**loop_options)
    # set the `start_guest` function (runs once).
    backend_options.setdefault("host_uses_signal_set_wakeup_fd", host.host_uses_signal_set_wakeup_fd)
    start_guest_run = get_start_guest_run(backend)
    host.start_guest = lambda: [
        start_guest_run(
            func,
            *args,
            run_sync_soon_threadsafe=host.run_sync_soon_threadsafe,
            run_sync_soon_not_threadsafe=host.run_sync_soon_not_threadsafe,
            done_callback=host.done_callback,
            **backend_options,
        ),
        setattr(host, "start_guest", lambda: None),
    ][1]
    host._instances[thread] = host
    try:
        return host.mainloop()
    finally:
        host._instances.pop(threading.current_thread())

mainloop

mainloop() -> T

Start the main event loop of the host.

Source code in src/async_kernel/event_loop/run.py
163
164
165
166
167
168
169
def mainloop(self) -> T:
    "Start the main event loop of the host."
    self.start_guest()  # Call at an appropriate time in the overriding subclass.
    if not self._outcome:
        msg = "The mainloop should only exit once done_callback has been called!"
        raise RuntimeError(msg)
    return self._outcome.unwrap()  # pragma: no cover

run

Run func to completion asynchronously in the current thread using a backend with an optional gui event loop (host).

The default backend is 'asyncio'.

If loop is specified in settings. A host (gui) mainloop will be started with the backend running as a guest (in the same thread). The backend will execute func asynchronously to completion. Once completed the backend and host are stopped and finally the result is returned.

Parameters:

Custom loop

A custom event loop can be used by subclassing Host. The host can be specified in the settings as the option 'host_class'. The value can be the class or a dotted path if it is importable.

Source code in src/async_kernel/event_loop/run.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def run(func: Callable[..., CoroutineType[Any, Any, T]], args: tuple, settings: RunSettings, /) -> T:
    """
    Run `func` to completion asynchronously in the current thread using a [backend][async_kernel.typing.Backend]
    with an optional gui event loop (_host_).

    The default backend is ['asyncio'][async_kernel.typing.Backend.asyncio].

    If [loop][async_kernel.typing.Loop] is specified in `settings`. A _host_ (gui) mainloop
    will be started with the `backend` running as a guest (in the same thread). The `backend`
    will execute `func` asynchronously to completion. Once completed the backend and host
    are stopped and finally the result is returned.

    Args:
        func: A coroutine function.
        args: Args to use when calling func.
        settings: Settings to use when running func.

    Custom loop:
        A custom event loop can be used by subclassing [Host][].
        The host can be specified in the settings as the option 'host_class'. The value
        can be the class or a dotted path if it is importable.
    """
    if settings.get("loop"):
        # A loop with the backend running as a guest.
        return Host.run(func, args, settings)
    # backend only.
    return anyio.run(
        func,
        *args,
        backend=Backend(settings.get("backend", "asyncio")),
        backend_options=settings.get("backend_options"),
    )