Skip to content

event_loop

Classes:

  • Host

    A class that provides the necessary callbacks to run a gui event loop with a backend started using start_guest_run.

Functions:

  • run

    Run func to completion asynchronously in the current thread using a backend

Host

Bases: Generic[T]

A class that provides the necessary callbacks to run a gui event loop with a backend started using start_guest_run.

Methods:

  • current

    The host running in the corresponding thread or current thread.

  • run

    Run the loop in the current thread with a backend guest.

  • mainloop

    Start the main event loop of the host.

Attributes:

Source code in src/async_kernel/event_loop/run.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class Host(Generic[T]):
    """
    A class that provides the necessary callbacks to run a gui event loop with a `backend` started using `start_guest_run`.
    """

    HOST: Hosts
    MATPLOTLIB_GUIS = ()
    _subclasses: dict[Hosts, type[Self]] = {}
    _instances: dict[threading.Thread, Host] = {}

    _outcome: Outcome[T] | None = None
    start_guest: Callable[[], Any] = staticmethod(lambda: None)
    "A callback to start the guest. This must be called by a subclass."

    def __init_subclass__(cls) -> None:
        if cls.HOST is not Hosts.custom:
            cls._subclasses[cls.HOST] = cls

    @classmethod
    def current(cls, thread: threading.Thread | None = None) -> Host | None:
        "The host running in the corresponding thread or current thread."
        thread = thread or threading.current_thread()
        return cls._instances.get(thread)

    @classmethod
    def run(cls, func: Callable[..., CoroutineType[Any, Any, T]], args: tuple, settings: RunSettings, /) -> T:
        "Run the loop in the current thread with a backend guest."

        if (thread := threading.current_thread()) in cls._instances:
            msg = "A host is already running in this thread"
            raise RuntimeError(msg)

        host = Hosts(settings.get("host"))
        backend = Backend(settings.get("backend", "asyncio"))
        backend_options = settings.get("backend_options") or {}
        host_options = settings.get("host_options") or {}

        if "host_class" in host_options:
            host_options = host_options.copy()
            cls_ = host_options.pop("host_class")
            if isinstance(cls_, str):
                cls_ = import_item(cls_)
            if not issubclass(cls_, cls):
                msg = f"{cls_} is not a subclass of {cls}!"
                raise TypeError(msg)
        else:
            assert host != backend
            if host not in cls._subclasses:
                import_module(f"async_kernel.event_loop.{host}_host")
                assert host in cls._subclasses, f"Host for {host=} is not implemented correctly!"
            cls_ = cls._subclasses[host]
        assert cls_.HOST is host

        host = cls_(**host_options)
        # set the `start_guest` function (runs once).
        backend_options.setdefault("host_uses_signal_set_wakeup_fd", host.host_uses_signal_set_wakeup_fd)
        start_guest_run = get_start_guest_run(backend)
        host.start_guest = lambda: [
            start_guest_run(
                func,
                *args,
                run_sync_soon_threadsafe=host.run_sync_soon_threadsafe,
                run_sync_soon_not_threadsafe=host.run_sync_soon_not_threadsafe,
                done_callback=host.done_callback,
                **backend_options,
            ),
            setattr(host, "start_guest", lambda: None),
        ][1]
        host._instances[thread] = host
        try:
            return host.mainloop()
        finally:
            host._instances.pop(threading.current_thread())

    # Override the methods/attributes below as required.
    host_uses_signal_set_wakeup_fd = False

    def run_sync_soon_threadsafe(self, fn: Callable[[], Any]) -> None: ...
    def run_sync_soon_not_threadsafe(self, fn: Callable[[], Any]) -> None: ...

    def done_callback(self, outcome: Outcome) -> None:
        self._outcome = outcome

    def mainloop(self) -> T:
        "Start the main event loop of the host."
        self.start_guest()  # Call at an appropriate time in the overriding subclass.
        if not self._outcome:
            msg = "The mainloop should only exit once done_callback has been called!"
            raise RuntimeError(msg)
        return self._outcome.unwrap()  # pragma: no cover

start_guest class-attribute instance-attribute

start_guest: Callable[[], Any] = staticmethod(lambda: None)

A callback to start the guest. This must be called by a subclass.

current classmethod

current(thread: Thread | None = None) -> Host | None

The host running in the corresponding thread or current thread.

Source code in src/async_kernel/event_loop/run.py
 97
 98
 99
100
101
@classmethod
def current(cls, thread: threading.Thread | None = None) -> Host | None:
    "The host running in the corresponding thread or current thread."
    thread = thread or threading.current_thread()
    return cls._instances.get(thread)

run classmethod

run(
    func: Callable[..., CoroutineType[Any, Any, T]], args: tuple, settings: RunSettings
) -> T

Run the loop in the current thread with a backend guest.

Source code in src/async_kernel/event_loop/run.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
@classmethod
def run(cls, func: Callable[..., CoroutineType[Any, Any, T]], args: tuple, settings: RunSettings, /) -> T:
    "Run the loop in the current thread with a backend guest."

    if (thread := threading.current_thread()) in cls._instances:
        msg = "A host is already running in this thread"
        raise RuntimeError(msg)

    host = Hosts(settings.get("host"))
    backend = Backend(settings.get("backend", "asyncio"))
    backend_options = settings.get("backend_options") or {}
    host_options = settings.get("host_options") or {}

    if "host_class" in host_options:
        host_options = host_options.copy()
        cls_ = host_options.pop("host_class")
        if isinstance(cls_, str):
            cls_ = import_item(cls_)
        if not issubclass(cls_, cls):
            msg = f"{cls_} is not a subclass of {cls}!"
            raise TypeError(msg)
    else:
        assert host != backend
        if host not in cls._subclasses:
            import_module(f"async_kernel.event_loop.{host}_host")
            assert host in cls._subclasses, f"Host for {host=} is not implemented correctly!"
        cls_ = cls._subclasses[host]
    assert cls_.HOST is host

    host = cls_(**host_options)
    # set the `start_guest` function (runs once).
    backend_options.setdefault("host_uses_signal_set_wakeup_fd", host.host_uses_signal_set_wakeup_fd)
    start_guest_run = get_start_guest_run(backend)
    host.start_guest = lambda: [
        start_guest_run(
            func,
            *args,
            run_sync_soon_threadsafe=host.run_sync_soon_threadsafe,
            run_sync_soon_not_threadsafe=host.run_sync_soon_not_threadsafe,
            done_callback=host.done_callback,
            **backend_options,
        ),
        setattr(host, "start_guest", lambda: None),
    ][1]
    host._instances[thread] = host
    try:
        return host.mainloop()
    finally:
        host._instances.pop(threading.current_thread())

mainloop

mainloop() -> T

Start the main event loop of the host.

Source code in src/async_kernel/event_loop/run.py
162
163
164
165
166
167
168
def mainloop(self) -> T:
    "Start the main event loop of the host."
    self.start_guest()  # Call at an appropriate time in the overriding subclass.
    if not self._outcome:
        msg = "The mainloop should only exit once done_callback has been called!"
        raise RuntimeError(msg)
    return self._outcome.unwrap()  # pragma: no cover

run

Run func to completion asynchronously in the current thread using a backend with an optional host (gui event loop).

The default backend is 'asyncio'.

If host is specified in settings. A host (gui) mainloop will be started with the backend running as a guest (in the same thread). The backend will execute func asynchronously to completion. Once completed the backend and host are stopped and finally the result is returned.

Parameters:

Custom host

A custom host can be started by subclassing Host and passed as the 'host_class' as the class or a dotted path if it is importable.

Source code in src/async_kernel/event_loop/run.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def run(func: Callable[..., CoroutineType[Any, Any, T]], args: tuple, settings: RunSettings, /) -> T:
    """
    Run `func` to completion asynchronously in the current thread using a [backend][async_kernel.typing.Backend]
    with an optional host (gui event loop).

    The default backend is ['asyncio'][async_kernel.typing.Backend.asyncio].

    If [host][async_kernel.typing.Hosts] is specified in `settings`. A _host_ (gui) mainloop
    will be started with the `backend` running as a guest (in the same thread). The `backend`
    will execute `func` asynchronously to completion. Once completed the `backend` and `host`
    are stopped and finally the result is returned.

    Args:
        func: A coroutine function.
        args: Args to use when calling func.
        settings: Settings to use when running func.

    Custom host:
        A custom host can be started by subclassing [Host][] and passed as the 'host_class' as the
        class or a dotted path if it is importable.
    """
    if settings.get("host"):
        # A gui with the backend running as a guest.
        return Host.run(func, args, settings)
    # backend only.
    return anyio.run(
        func,
        *args,
        backend=Backend(settings.get("backend", "asyncio")),
        backend_options=settings.get("backend_options"),
    )

Bases: Host[T]

Methods:

Source code in src/async_kernel/event_loop/tk_host.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
class TkHost(Host[T]):
    HOST = Hosts.tk
    MATPLOTLIB_GUIS = ("tk",)

    def __init__(self) -> None:
        root = tk.Tk()
        root.withdraw()
        self.root = root
        self._tk_func_name = root.register(self._tk_func)
        self._q = collections.deque()

    def _tk_func(self) -> None:
        self._q.popleft()()

    @override
    def run_sync_soon_threadsafe(self, fn: Callable[[], Any]) -> None:
        """
        Use Tcl "after" command to schedule a function call.

        Based on [tkinter source comments](https://github.com/python/cpython/blob/a5d6aba318ead9cc756ba750a70da41f5def3f8f/Modules/_tkinter.c#L1472-L1555)
        the issuance of the tcl call to after itself is thread-safe since it is sent
        to the [appropriate thread](https://github.com/python/cpython/blob/a5d6aba318ead9cc756ba750a70da41f5def3f8f/Modules/_tkinter.c#L814-L824>) on line 1522.
        `Tkapp_ThreadSend` effectively uses "after 0" while putting the command in the
        event queue so the ["after idle after 0"](https://wiki.tcl-lang.org/page/after#096aeab6629eae8b244ae2eb2000869fbe377fa988d192db5cf63defd3d8c061)` incantation
        is unnecessary here.

        Compare to [tkthread](https://github.com/serwy/tkthread/blob/1f612e1dd46e770bd0d0bb64d7ecb6a0f04875a3/tkthread/__init__.py#L163)
        where definitely thread unsafe [eval](https://github.com/python/cpython/blob/a5d6aba318ead9cc756ba750a70da41f5def3f8f/Modules/_tkinter.c#L1567-L1585)
        is used to send thread safe signals between tcl interpreters.
        """
        # self.root.after_idle(lambda:self.root.after(0, func)) # does a fairly intensive wrapping to each func
        self._q.append(fn)
        self.root.call("after", "idle", self._tk_func_name)

    @override
    def run_sync_soon_not_threadsafe(self, fn) -> None:
        """
        Use Tcl "after" command to schedule a function call from the main thread

        If .call is called from the Tcl thread, the locking and sending are optimized away
        so it should be fast enough.

        The incantation ["after idle after 0"](https://wiki.tcl-lang.org/page/after#096aeab6629eae8b244ae2eb2000869fbe377fa988d192db5cf63defd3d8c061)
        avoids blocking the normal event queue when faced with an unending stream of tasks, for
        example `while True: await trio.sleep(0)`.
        """
        self._q.append(fn)
        self.root.call("after", "idle", "after", 0, self._tk_func_name)
        # Not sure if this is actually an optimization because Tcl parses this eval string fresh each time.
        # However it's definitely thread unsafe because the string is fed directly into the Tcl interpreter
        # from the current Python thread
        # self.root.eval(f'after idle after 0 {self._tk_func_name}')

    @override
    def done_callback(self, outcome) -> None:
        """
        End the Tk app.
        """
        super().done_callback(outcome)
        self.root.destroy()

    @override
    def mainloop(self) -> T:
        self.start_guest()
        self.root.mainloop()
        return super().mainloop()

run_sync_soon_threadsafe

run_sync_soon_threadsafe(fn: Callable[[], Any]) -> None

Use Tcl "after" command to schedule a function call.

Based on tkinter source comments the issuance of the tcl call to after itself is thread-safe since it is sent to the appropriate thread on line 1522. Tkapp_ThreadSend effectively uses "after 0" while putting the command in the event queue so the "after idle after 0"` incantation is unnecessary here.

Compare to tkthread where definitely thread unsafe eval is used to send thread safe signals between tcl interpreters.

Source code in src/async_kernel/event_loop/tk_host.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@override
def run_sync_soon_threadsafe(self, fn: Callable[[], Any]) -> None:
    """
    Use Tcl "after" command to schedule a function call.

    Based on [tkinter source comments](https://github.com/python/cpython/blob/a5d6aba318ead9cc756ba750a70da41f5def3f8f/Modules/_tkinter.c#L1472-L1555)
    the issuance of the tcl call to after itself is thread-safe since it is sent
    to the [appropriate thread](https://github.com/python/cpython/blob/a5d6aba318ead9cc756ba750a70da41f5def3f8f/Modules/_tkinter.c#L814-L824>) on line 1522.
    `Tkapp_ThreadSend` effectively uses "after 0" while putting the command in the
    event queue so the ["after idle after 0"](https://wiki.tcl-lang.org/page/after#096aeab6629eae8b244ae2eb2000869fbe377fa988d192db5cf63defd3d8c061)` incantation
    is unnecessary here.

    Compare to [tkthread](https://github.com/serwy/tkthread/blob/1f612e1dd46e770bd0d0bb64d7ecb6a0f04875a3/tkthread/__init__.py#L163)
    where definitely thread unsafe [eval](https://github.com/python/cpython/blob/a5d6aba318ead9cc756ba750a70da41f5def3f8f/Modules/_tkinter.c#L1567-L1585)
    is used to send thread safe signals between tcl interpreters.
    """
    # self.root.after_idle(lambda:self.root.after(0, func)) # does a fairly intensive wrapping to each func
    self._q.append(fn)
    self.root.call("after", "idle", self._tk_func_name)

run_sync_soon_not_threadsafe

run_sync_soon_not_threadsafe(fn) -> None

Use Tcl "after" command to schedule a function call from the main thread

If .call is called from the Tcl thread, the locking and sending are optimized away so it should be fast enough.

The incantation "after idle after 0" avoids blocking the normal event queue when faced with an unending stream of tasks, for example while True: await trio.sleep(0).

Source code in src/async_kernel/event_loop/tk_host.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@override
def run_sync_soon_not_threadsafe(self, fn) -> None:
    """
    Use Tcl "after" command to schedule a function call from the main thread

    If .call is called from the Tcl thread, the locking and sending are optimized away
    so it should be fast enough.

    The incantation ["after idle after 0"](https://wiki.tcl-lang.org/page/after#096aeab6629eae8b244ae2eb2000869fbe377fa988d192db5cf63defd3d8c061)
    avoids blocking the normal event queue when faced with an unending stream of tasks, for
    example `while True: await trio.sleep(0)`.
    """
    self._q.append(fn)
    self.root.call("after", "idle", "after", 0, self._tk_func_name)

done_callback

done_callback(outcome) -> None

End the Tk app.

Source code in src/async_kernel/event_loop/tk_host.py
89
90
91
92
93
94
95
@override
def done_callback(self, outcome) -> None:
    """
    End the Tk app.
    """
    super().done_callback(outcome)
    self.root.destroy()

options: show_root_heading: true

Bases: Host[T]

Source code in src/async_kernel/event_loop/qt_host.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class QtHost(Host[T]):
    HOST = Hosts.qt
    MATPLOTLIB_GUIS = ("qt",)

    def __init__(self, module: Literal["PySide6", "PySide2", "PyQt5", "PyQt6"] = "PySide6") -> None:
        if threading.current_thread() is not threading.main_thread():
            msg = "QT can only be run in main thread!"
            raise RuntimeError(msg)

        globals()["QtCore"] = import_from(module, "QtCore")
        globals()["QtWidgets"] = import_from(module, "QtWidgets")

        REENTER_EVENT_TYPE = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())

        class ReenterEvent(QtCore.QEvent):
            fn: Callable[[], Any]

        class Reenter(QtCore.QObject):
            @override
            def event(self, event: ReenterEvent) -> Literal[False]:  # pyright: ignore[reportIncompatibleMethodOverride]
                event.fn()
                return False

        reenter = Reenter()

        if (app := QtWidgets.QApplication.instance()) is None:
            app = QtWidgets.QApplication([])
            app.setQuitOnLastWindowClosed(False)  # prevent app sudden death

        def run_soon_threadsafe(fn):
            event = ReenterEvent(REENTER_EVENT_TYPE)
            event.fn = fn
            app.postEvent(reenter, event)

        self.run_sync_soon_threadsafe = run_soon_threadsafe
        self.run_sync_soon_not_threadsafe = run_soon_threadsafe
        self.app = app

    @override
    def done_callback(self, outcome) -> None:
        super().done_callback(outcome)
        self.app.quit()

    @override
    def mainloop(self) -> T:
        self.start_guest()
        self.app.exec()
        return super().mainloop()

options: show_root_heading: true