interface

Modules:

  • base

    The base class definition to interface with the kernel.

  • callable

    A collection of objects to provide a kernel interface based on callbacks.

  • ip_app

    An IPython application with a zmq interface.

  • zmq

    Defines a base kernel interface using zmq sockets.

BaseInterface

Bases: Application, AsyncContextManagerMixin, Generic[T_shell_co]

The base class for kernel interface (singleton).

The interface creates the kernel and provides external communication. It is also the parent object for all objects that subclass from HasInterface. Configurable objects that subclass from HasInterface inherit their configuration from the interface (Application).

Usage

launch:

Interface.launch_instance()

async context:

async with Interface() as interface:
    interface.kernel
    ...

Methods:

  • initialized

    Has an instance been created?

  • instance

    Get the singleton instance that was created using launch_instance.

  • initialize

    Initialize the interface. DO NOT CALL DIRECTLY.

  • start

    Start the interface, blocking until it stops.

  • run

    Run the kernel.

  • stop

    Stop the kernel and this interface.

  • input_request

    Forward an input request to the frontend.

  • msg

    Create a new message.

  • iopub_send

    Send an iopub message.

Source code in src/async_kernel/interface/base.py
class BaseInterface(Application, anyio.AsyncContextManagerMixin, Generic[T_shell_co]):
    """
    The base class for kernel interface (singleton).

    The interface creates the kernel and provides external communication. It is also
    the parent object for all objects that subclass from `HasInterface`. Configurable
    objects that subclass from `HasInterface` inherit their configuration from the
    interface (Application).

    Usage:
        launch:
            ```python
            Interface.launch_instance()
            ```
        async context:
            ```python
            async with Interface() as interface:
                interface.kernel
                ...
            ```
    """

    classes: ClassesType = final([])
    "The classes registered with the interface."

    aliases: dict[str | tuple[str, ...], str] = {  # pyright: ignore[reportIncompatibleVariableOverride]
        ("name", "n"): "BaseInterface.name",
        "launcher": "BaseInterface.launcher",
        "timeout": "BaseShell.timeout",
        "kernel_class": "BaseInterface.kernel_class",
        "shell_class": "BaseInterface.shell_class",
        "help_links": "Kernel.help_links",
        "supported_features": "Kernel.supported_features",
        "interface_class": "BaseInterface.interface_class",
    } | Application.aliases
    ""
    flags = {
        "quiet": ({"BaseInterface": {"quiet": True}}, "Only send stdout/stderr to output stream."),
        "no-quiet": ({"BaseInterface": {"quiet": False}}, "Only send stdout/stderr to output stream."),
    } | Application.flags
    ""

    name = traitlets.Unicode("async").tag(config=True)
    "The name of the kernel used in the kernelspec."

    host: traitlets.TraitType[Hosts | None, Hosts | None] = traitlets.UseEnum(
        Hosts, default_value=None, allow_none=True
    ).tag(config=True)
    "The name of a (gui) event loop (if one is used)."

    host_options = DictValueLiteralEval(allow_none=True).tag(config=True)
    "Options for starting the loop."

    backend: traitlets.TraitType[Backend, Backend] = traitlets.UseEnum(Backend).tag(config=True)
    "The type of asynchronous backend used. Options are 'asyncio' or 'trio'."

    backend_options = DictValueLiteralEval(allow_none=True).tag(config=True)
    "Options for starting the backend."

    interface_class: traitlets.Type[type[Self], type[Self] | str] = traitlets.Type(
        "async_kernel.interface.base.BaseInterface"
    ).tag(  # pyright: ignore[reportAssignmentType]
        config=True
    )
    "The interface class to use when launching."

    kernel_class: traitlets.Type[type[Kernel[Self, T_shell_co]], type[Kernel[Self, T_shell_co]] | str] = traitlets.Type(
        "async_kernel.Kernel"
    ).tag(  # pyright: ignore[reportAssignmentType]
        config=True
    )
    "The Kernel class to use when creating the kernel."

    shell_class: traitlets.Type[type[T_shell_co], type[T_shell_co] | str] = traitlets.Type(
        "async_kernel.shell.ipshell.IPShell", "async_kernel.shell.BaseShell"
    ).tag(  # pyright: ignore[reportAssignmentType]
        config=True
    )
    "The class to use for shells and subshells."

    quiet = traitlets.Bool(True).tag(config=True)
    "Only send stdout/stderr to output stream."

    launcher = traitlets.Unicode("").tag(config=True)
    "The value used to import the interface using [async_kernel.kernelspec.import_launcher][]."

    kernel: Fixed[Self, Kernel[Self, T_shell_co]] = Fixed(
        lambda c: c["owner"].kernel_class(c["owner"], c["owner"].shell_class)
    )
    "The kernel."

    callers: Fixed[Self, dict[Literal[Channel.shell, Channel.control], Caller]] = Fixed(dict)
    "The caller associated with the kernel once it has started."

    started = Fixed(Pending)
    "A Pending that is set when the interface has started."

    stopping = Fixed(Pending)
    """
    A Pending that is set when stop is called.
    """

    _instance: Self | None = None
    _zmq_context = None
    last_interrupt_frame = None

    @traitlets.default("backend")
    def _default_backend(self) -> Backend:
        try:
            return Backend(current_async_library())
        except AsyncLibraryNotFoundError:
            if (
                not self.host
                and not self.trait_has_value("backend_options")
                and (importlib.util.find_spec("winloop") or importlib.util.find_spec("uvloop"))
            ):
                self.backend_options["use_uvloop"] = True
            return Backend.asyncio

    @traitlets.default("shell_class")
    def _default_shell_class(self):
        # We use a method to delay IPython import until it is needed
        from async_kernel.shell.ipshell import IPShell  # noqa: PLC0415

        return IPShell

    @property
    def summary(self) -> str:
        return f"name={self.name!r} backend={str(self.backend)!r}"

    @classmethod
    @override
    def initialized(cls) -> bool:
        """Has an instance been created?"""
        return cls._instance is not None

    @classmethod
    @override
    def instance(cls) -> Self:
        "Get the singleton instance that was created using `launch_instance`."
        if not cls._instance:
            msg = "An instance does not exist!"
            raise RuntimeError(msg)
        if not isinstance(cls._instance, cls):
            msg = f"An instance exists but it is not an instance of {cls}!"
            raise TypeError(msg)
        return cls._instance

    @classmethod
    @override
    def clear_instance(cls) -> None:
        raise NotImplementedError

    @classmethod
    @override
    def launch_instance(
        cls,
        argv: list[str] | None = None,
        kernel_class: type[Kernel[Self, T_shell_co]] | None = None,
        shell_class: type[T_shell_co] | None = None,
        **kwargs: Any,
    ) -> None:
        app = None
        if BaseInterface._instance:
            msg = "An interface already exists!"
            raise RuntimeError(msg)
        try:
            app = cls(argv, kernel_class=kernel_class, shell_class=shell_class, **kwargs)
            app.start()
            app.exit()
        except BaseException:
            del app
            BaseInterface._instance = None
            gc.collect()
            raise

    def __new__(cls, argv: list | None | NoValue = NoValue, /, **kwargs) -> Self:  # noqa: ARG004  # pyright: ignore[reportInvalidTypeForm]
        if BaseInterface._instance:
            msg = "An interface already exists!"
            raise RuntimeError(msg)
        BaseInterface._instance = super().__new__(cls, **kwargs)
        return BaseInterface._instance

    def __init__(
        self,
        argv: list | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
        /,
        *,
        kernel_class: type[Kernel[Self, T_shell_co]] | str | None = None,
        shell_class: type[T_shell_co] | str | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

        for name, value in [("kernel_class", kernel_class), ("shell_class", shell_class)]:
            if value:
                self.set_trait(name, value)
        self.initialize(argv)

    @override
    def initialize(self, argv: None | list | NoValue = NoValue) -> None:  # pyright: ignore[reportInvalidTypeForm]
        """
        Initialize the interface. **DO NOT CALL DIRECTLY**.
        """
        assert self._instance is self

        def initialized(argv: Any = NoValue) -> None:
            msg = "Already initialized!"
            raise RuntimeError(msg)

        self.initialize = initialized

        # Environment variables
        if not os.environ.get("MPLBACKEND"):
            os.environ["MPLBACKEND"] = "module://matplotlib_inline.backend_inline"
        if not os.environ.get("UV_PROJECT_ENVIRONMENT"):
            os.environ["UV_PROJECT_ENVIRONMENT"] = sys.prefix
        self.parse_command_line([] if argv is NoValue else argv)
        self.interface_class = self.__class__

    @override
    def start(self) -> None:
        """
        Start the interface, blocking until it stops.

        Warning:
            - Running in a thread other than the 'MainThread' is permitted, but discouraged.
            - Blocking calls can only be interrupted in the 'MainThread' because
                [*'threads cannot be destroyed, stopped, suspended, resumed, or interrupted'*](https://docs.python.org/3/library/threading.html#module-threading).
            - Some libraries may assume the call is occurring in the 'MainThread'.
            - If there is an `asyncio` or `trio` event loop already running in the desired thread,
                start asynchronously instead (`async with interface: ...`).
        """
        if BaseInterface._instance is not self:
            msg = "This interface is not the global instance!"
            raise RuntimeError(msg)

        settings = RunSettings(
            backend=self.backend,
            backend_options=self.backend_options,
            host=self.host,
            host_options=self.host_options,
        )
        try:
            async_kernel.event_loop.run(self.run, (), settings)
        finally:
            if BaseInterface._instance is self:
                BaseInterface._instance = None

    @asynccontextmanager
    async def __asynccontextmanager__(self, *, set_started=True) -> AsyncGenerator[Self]:
        def cache_iopub_send(*args, **kwargs) -> None:  # pragma: no cover
            # Cache iopub messages, send when started or discard if stopped early.
            self.started.add_done_callback(lambda _: not self.stopping.done() and send(*args, **kwargs))

        if self.stopping.done() or self.started.done():
            msg = "Stopped early"
            raise RuntimeError(msg)

        send = self.iopub_send
        self.iopub_send = cache_iopub_send
        self.started.add_done_callback(lambda _: delattr(self, "iopub_send"))

        caller = Caller(
            "manual",
            name="Shell",
            protected=True,
            log=self.log,
            zmq_context=self._zmq_context,
            host=self.host,
        )
        self.callers[Channel.shell] = caller
        self.callers[Channel.control] = caller.get(name="Control", log=self.log, protected=True)
        self.backend = Backend(current_async_library())
        try:
            async with caller:
                with anyio.CancelScope() as scope:

                    def stop(_):
                        self.log.info("Stopping kernel")
                        caller.call_later(0.5, scope.cancel, "Stopping kernel")

                    self.stopping.add_done_callback(stop)
                    try:
                        async with self.kernel.running():
                            if set_started:
                                self._started()
                            yield self
                    finally:
                        self.stop()
        finally:
            if BaseInterface._instance is self:
                BaseInterface._instance = None
            self.log.info("Interface stopped")

    def _started(self):
        self.log.info("Interface started: %s", self.summary)
        self.started.set_result(None)

    async def run(self, *, stopped: Callable[[], Any] | None = None) -> None:
        """
        Run the kernel.

        Args:
            stopped: An optional callback that is called when the kernel has stopped.

        This method requires that a [Caller][async_kernel.caller.Caller] instance does not already exist in the current thread.
        """
        try:
            async with self:
                # Wait forever. This will exit when stop is called.
                await async_sleep_forever()
        finally:
            if stopped:
                stopped()

    def stop(self) -> None:
        """
        Stop the kernel and this interface.
        """
        self.stopping.set_result(None)
        if not self.started.done():
            self.started.cancel("Stopped early")
            if BaseInterface._instance is self:
                BaseInterface._instance = None

    def input_request(self, prompt: str, *, password: bool = False) -> str:
        """
        Forward an input request to the frontend.

        Args:
            prompt: The user prompt.
            password: If the prompt should be considered as a password.
        """
        raise NotImplementedError

    def msg(
        self,
        msg_type: str | MsgType,
        *,
        content: T | None = None,
        parent: Message | dict[str, Any] | None = None,
        header: MsgHeader | dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
        channel: Channel = Channel.shell,
    ) -> Message[T]:
        """
        Create a new message.
        """
        parent = parent or utils.get_parent_message()
        if header is None:
            session = ""
            if parent and (header := parent.get("header")):
                session = header.get("session", "")
            header = MsgHeader(
                date=datetime.now(UTC),
                msg_id=str(uuid4()),
                msg_type=msg_type,
                session=session,
                username="",
                version=async_kernel.kernel_protocol_version,
            )
        return Message(  # pyright: ignore[reportCallIssue]
            channel=channel,
            header=header,
            parent_header=extract_header(parent),  # pyright: ignore[reportArgumentType]
            content={} if content is None else content,
            metadata=metadata if metadata is not None else {},
        )

    def iopub_send(
        self,
        msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
        *,
        content: Content | None = None,
        metadata: dict[str, Any] | None = None,
        parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
        ident: bytes | list[bytes] | None = None,
        buffers: list[bytes] | None = None,
    ) -> None:
        """Send an iopub message."""
        raise NotImplementedError

    @override
    def print_help(self, classes: bool = False) -> None:
        from async_kernel.compat.attr_docs import get_attr_docs  # noqa: PLC0415

        # Copy trailing docstrings into trait.help.
        for cls in self.classes:
            try:
                for name, value in get_attr_docs(cls).items():
                    if value and isinstance(trait := getattr(cls, name), traitlets.TraitType) and not trait.help:
                        trait.help = value
            except OSError:
                continue  # Coverage can cause issues with some files.
        super().print_help(classes)

classes class-attribute instance-attribute

classes: ClassesType = final([])

The classes registered with the interface.

aliases class-attribute instance-attribute

aliases: dict[str | tuple[str, ...], str] = {
    ("name", "n"): "BaseInterface.name",
    "launcher": "BaseInterface.launcher",
    "timeout": "BaseShell.timeout",
    "kernel_class": "BaseInterface.kernel_class",
    "shell_class": "BaseInterface.shell_class",
    "help_links": "Kernel.help_links",
    "supported_features": "Kernel.supported_features",
    "interface_class": "BaseInterface.interface_class",
} | Application.aliases

flags class-attribute instance-attribute

flags = {
    "quiet": (
        {"BaseInterface": {"quiet": True}},
        "Only send stdout/stderr to output stream.",
    ),
    "no-quiet": (
        {"BaseInterface": {"quiet": False}},
        "Only send stdout/stderr to output stream.",
    ),
} | Application.flags

name class-attribute instance-attribute

name = traitlets.Unicode('async').tag(config=True)

The name of the kernel used in the kernelspec.

host class-attribute instance-attribute

host: TraitType[Hosts | None, Hosts | None] = traitlets.UseEnum(
    Hosts, default_value=None, allow_none=True
).tag(config=True)

The name of a (gui) event loop (if one is used).

host_options class-attribute instance-attribute

host_options = DictValueLiteralEval(allow_none=True).tag(config=True)

Options for starting the loop.

backend class-attribute instance-attribute

backend: TraitType[Backend, Backend] = traitlets.UseEnum(Backend).tag(config=True)

The type of asynchronous backend used. Options are 'asyncio' or 'trio'.

backend_options class-attribute instance-attribute

backend_options = DictValueLiteralEval(allow_none=True).tag(config=True)

Options for starting the backend.
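
The class source above shows that, when no async library is running, the default backend is asyncio and `use_uvloop` is switched on only if no host is configured, no backend options were given, and `winloop` or `uvloop` can be imported. A minimal stdlib-only sketch of that decision follows. The function name `default_backend_options` and the host value are hypothetical, and "no options were given" is approximated here by an empty dict rather than the trait check used in the source.

```python
from __future__ import annotations

import importlib.util


def default_backend_options(host: str | None = None, options: dict | None = None) -> dict:
    """Hypothetical helper mirroring the uvloop check in `_default_backend`."""
    options = dict(options or {})
    # find_spec returns None when a top-level module is not installed.
    has_fast_loop = any(importlib.util.find_spec(name) for name in ("winloop", "uvloop"))
    # Only opt in when no host loop is configured and the user gave no options.
    if not host and not options and has_fast_loop:
        options["use_uvloop"] = True
    return options


with_host = default_backend_options(host="some-gui-host")  # Hypothetical host name.
with_options = default_backend_options(options={"debug": True})
plain = default_backend_options()
```

The result of the last call depends on whether `uvloop` or `winloop` is installed in the current environment.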

interface_class class-attribute instance-attribute

interface_class: Type[type[Self], type[Self] | str] = traitlets.Type(
    "async_kernel.interface.base.BaseInterface"
).tag(config=True)

The interface class to use when launching.

kernel_class class-attribute instance-attribute

kernel_class: Type[
    type[Kernel[Self, T_shell_co]], type[Kernel[Self, T_shell_co]] | str
] = traitlets.Type("async_kernel.Kernel").tag(config=True)

The Kernel class to use when creating the kernel.

shell_class class-attribute instance-attribute

shell_class: Type[type[T_shell_co], type[T_shell_co] | str] = traitlets.Type(
    "async_kernel.shell.ipshell.IPShell", "async_kernel.shell.BaseShell"
).tag(config=True)

The class to use for shells and subshells.

quiet class-attribute instance-attribute

quiet = traitlets.Bool(True).tag(config=True)

Only send stdout/stderr to output stream.

launcher class-attribute instance-attribute

launcher = traitlets.Unicode('').tag(config=True)

The value used to import the interface using async_kernel.kernelspec.import_launcher.

kernel class-attribute instance-attribute

kernel: Fixed[Self, Kernel[Self, T_shell_co]] = Fixed(
    lambda c: c["owner"].kernel_class(c["owner"], c["owner"].shell_class)
)

The kernel.

callers class-attribute instance-attribute

callers: Fixed[Self, dict[Literal[Channel.shell, Channel.control], Caller]] = Fixed(dict)

The caller associated with the kernel once it has started.

started class-attribute instance-attribute

started = Fixed(Pending)

A Pending that is set when the interface has started.

stopping class-attribute instance-attribute

stopping = Fixed(Pending)

A Pending that is set when stop is called.

initialized classmethod

initialized() -> bool

Has an instance been created?

Source code in src/async_kernel/interface/base.py
@classmethod
@override
def initialized(cls) -> bool:
    """Has an instance been created?"""
    return cls._instance is not None

instance classmethod

instance() -> Self

Get the singleton instance that was created using launch_instance.

Source code in src/async_kernel/interface/base.py
@classmethod
@override
def instance(cls) -> Self:
    "Get the singleton instance that was created using `launch_instance`."
    if not cls._instance:
        msg = "An instance does not exist!"
        raise RuntimeError(msg)
    if not isinstance(cls._instance, cls):
        msg = f"An instance exists but it is not an instance of {cls}!"
        raise TypeError(msg)
    return cls._instance
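
The singleton guard behind `initialized` and `instance` can be sketched with a stdlib-only stand-in. `ToyInterface` and `ToySubInterface` are hypothetical names used for illustration; this is not the async_kernel class, only the same guard pattern under simplified assumptions (no traitlets, no argument parsing).

```python
from __future__ import annotations


class ToyInterface:
    """Hypothetical stand-in that mimics the singleton guard (not async_kernel)."""

    _instance: ToyInterface | None = None

    def __new__(cls) -> ToyInterface:
        # Only one interface may exist at a time, whatever its subclass.
        if ToyInterface._instance is not None:
            raise RuntimeError("An interface already exists!")
        ToyInterface._instance = super().__new__(cls)
        return ToyInterface._instance

    @classmethod
    def initialized(cls) -> bool:
        return cls._instance is not None

    @classmethod
    def instance(cls) -> ToyInterface:
        if cls._instance is None:
            raise RuntimeError("An instance does not exist!")
        if not isinstance(cls._instance, cls):
            raise TypeError(f"An instance exists but it is not an instance of {cls}!")
        return cls._instance


class ToySubInterface(ToyInterface):
    """Hypothetical subclass used to show the type check in `instance`."""


base = ToyInterface()

try:
    ToySubInterface.instance()  # The existing instance is not a ToySubInterface.
except TypeError as exc:
    type_error = str(exc)

try:
    ToyInterface()  # A second instance is refused.
except RuntimeError as exc:
    second_error = str(exc)
```

Calling `instance()` on a subclass therefore fails with a `TypeError` unless the existing singleton was created from that subclass.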

initialize

initialize(argv: None | list | NoValue = NoValue) -> None

Initialize the interface. DO NOT CALL DIRECTLY.

Source code in src/async_kernel/interface/base.py
@override
def initialize(self, argv: None | list | NoValue = NoValue) -> None:  # pyright: ignore[reportInvalidTypeForm]
    """
    Initialize the interface. **DO NOT CALL DIRECTLY**.
    """
    assert self._instance is self

    def initialized(argv: Any = NoValue) -> None:
        msg = "Already initialized!"
        raise RuntimeError(msg)

    self.initialize = initialized

    # Environment variables
    if not os.environ.get("MPLBACKEND"):
        os.environ["MPLBACKEND"] = "module://matplotlib_inline.backend_inline"
    if not os.environ.get("UV_PROJECT_ENVIRONMENT"):
        os.environ["UV_PROJECT_ENVIRONMENT"] = sys.prefix
    self.parse_command_line([] if argv is NoValue else argv)
    self.interface_class = self.__class__
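
The reason `initialize` must not be called directly is visible in the source above: the first call (made from `__init__`) replaces the method on the instance with a function that raises. A minimal sketch of that one-shot pattern, using a hypothetical `OneShot` class and only one of the two environment defaults:

```python
import os
import sys


class OneShot:
    """Hypothetical class showing the self-replacing `initialize` pattern."""

    def __init__(self) -> None:
        self.initialize()

    def initialize(self) -> None:
        def initialized() -> None:
            raise RuntimeError("Already initialized!")

        # Shadow the method on the instance so a second call fails loudly.
        self.initialize = initialized

        # Fill in a default only when the variable is unset or empty,
        # as the source above does for MPLBACKEND and UV_PROJECT_ENVIRONMENT.
        if not os.environ.get("UV_PROJECT_ENVIRONMENT"):
            os.environ["UV_PROJECT_ENVIRONMENT"] = sys.prefix


obj = OneShot()
try:
    obj.initialize()  # The constructor already initialized the object.
except RuntimeError as exc:
    error = str(exc)
```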

start

start() -> None

Start the interface, blocking until it stops.

Warning
  • Running in a thread other than the 'MainThread' is permitted, but discouraged.
  • Blocking calls can only be interrupted in the 'MainThread' because 'threads cannot be destroyed, stopped, suspended, resumed, or interrupted'.
  • Some libraries may assume the call is occurring in the 'MainThread'.
  • If there is an asyncio or trio event loop already running in the desired thread, start asynchronously instead (async with interface: ...).

Source code in src/async_kernel/interface/base.py
@override
def start(self) -> None:
    """
    Start the interface, blocking until it stops.

    Warning:
        - Running in a thread other than the 'MainThread' is permitted, but discouraged.
        - Blocking calls can only be interrupted in the 'MainThread' because
            [*'threads cannot be destroyed, stopped, suspended, resumed, or interrupted'*](https://docs.python.org/3/library/threading.html#module-threading).
        - Some libraries may assume the call is occurring in the 'MainThread'.
        - If there is an `asyncio` or `trio` event loop already running in the desired thread,
            start asynchronously instead (`async with interface: ...`).
    """
    if BaseInterface._instance is not self:
        msg = "This interface is not the global instance!"
        raise RuntimeError(msg)

    settings = RunSettings(
        backend=self.backend,
        backend_options=self.backend_options,
        host=self.host,
        host_options=self.host_options,
    )
    try:
        async_kernel.event_loop.run(self.run, (), settings)
    finally:
        if BaseInterface._instance is self:
            BaseInterface._instance = None
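
The difference between the blocking entry point and the asynchronous one can be illustrated with plain asyncio. This is a sketch under stated assumptions: `ToyService` is hypothetical, and `asyncio.run` stands in for `async_kernel.event_loop.run`, which may behave differently (for example with trio or a GUI host loop).

```python
import asyncio
from contextlib import asynccontextmanager


class ToyService:
    """Hypothetical service with a blocking and an async entry point."""

    def __init__(self) -> None:
        self.events = []

    @asynccontextmanager
    async def running(self):
        self.events.append("started")
        try:
            yield self
        finally:
            self.events.append("stopped")

    async def run(self) -> None:
        async with self.running():
            await asyncio.sleep(0)  # Stand-in for the real work.

    def start(self) -> None:
        # Blocking entry point: owns the event loop for its whole lifetime.
        asyncio.run(self.run())


# No loop is running here, so the blocking call is fine.
blocking = ToyService()
blocking.start()


async def main() -> list:
    # A loop is already running, so asyncio.run (and hence start) cannot be
    # used here; the service is entered asynchronously instead.
    service = ToyService()
    async with service.running():
        service.events.append("working")
    return service.events


nested_events = asyncio.run(main())
```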

run async

run(*, stopped: Callable[[], Any] | None = None) -> None

Run the kernel.

Parameters:

  • stopped

    (Callable[[], Any] | None, default: None ) –

    An optional callback that is called when the kernel has stopped.

This method requires that a Caller instance does not already exist in the current thread.

Source code in src/async_kernel/interface/base.py
async def run(self, *, stopped: Callable[[], Any] | None = None) -> None:
    """
    Run the kernel.

    Args:
        stopped: An optional callback that is called when the kernel has stopped.

    This method requires that a [Caller][async_kernel.caller.Caller] instance does not already exist in the current thread.
    """
    try:
        async with self:
            # Wait forever. This will exit when stop is called.
            await async_sleep_forever()
    finally:
        if stopped:
            stopped()
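
The `stopped` callback is invoked from a `finally` clause, so it fires however the wait ends. A small asyncio sketch of that shape follows; the free function `run` and the `asyncio.Event` used to end the wait are assumptions for illustration, whereas the real method waits inside `async with self` until `stop` is called.

```python
import asyncio
from collections.abc import Callable
from typing import Any, Optional


async def run(stop: asyncio.Event, *, stopped: Optional[Callable[[], Any]] = None) -> None:
    """Hypothetical stand-in for `run`: wait until told to stop, then notify."""
    try:
        await stop.wait()  # Stand-in for "wait forever until stop is called".
    finally:
        # Runs on normal exit, on error and on cancellation alike.
        if stopped:
            stopped()


async def main() -> list:
    log = []
    stop = asyncio.Event()
    task = asyncio.create_task(run(stop, stopped=lambda: log.append("stopped")))
    await asyncio.sleep(0)  # Let the task start waiting.
    log.append("requesting stop")
    stop.set()
    await task
    return log


log = asyncio.run(main())
```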

stop

stop() -> None

Stop the kernel and this interface.

Source code in src/async_kernel/interface/base.py
def stop(self) -> None:
    """
    Stop the kernel and this interface.
    """
    self.stopping.set_result(None)
    if not self.started.done():
        self.started.cancel("Stopped early")
        if BaseInterface._instance is self:
            BaseInterface._instance = None
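
The two outcomes of `stop` (after a completed start, or before it) can be sketched with `concurrent.futures.Future` standing in for `Pending`. That substitution is an assumption made so the example runs with the standard library alone; `ToyLifecycle` is a hypothetical name, and the singleton bookkeeping is left out.

```python
from concurrent.futures import Future


class ToyLifecycle:
    """Hypothetical lifecycle holder; `Future` stands in for `Pending`."""

    def __init__(self) -> None:
        self.started = Future()
        self.stopping = Future()

    def stop(self) -> None:
        self.stopping.set_result(None)
        if not self.started.done():
            # Stopped before startup finished: waiters on `started` see a cancellation.
            self.started.cancel()


early = ToyLifecycle()
early.stop()

normal = ToyLifecycle()
normal.started.set_result(None)  # Startup completed first.
normal.stop()
```

In the early case `started` ends up cancelled, which is how code waiting for startup can tell that the interface never came up.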

input_request

input_request(prompt: str, *, password: bool = False) -> str

Forward an input request to the frontend.

Parameters:

  • prompt

    (str) –

    The user prompt.

  • password

    (bool, default: False ) –

    If the prompt should be considered as a password.

Source code in src/async_kernel/interface/base.py
def input_request(self, prompt: str, *, password: bool = False) -> str:
    """
    Forward an input request to the frontend.

    Args:
        prompt: The user prompt.
        password: If the prompt should be considered as a password.
    """
    raise NotImplementedError

msg

msg(
    msg_type: str | MsgType,
    *,
    content: T | None = None,
    parent: Message | dict[str, Any] | None = None,
    header: MsgHeader | dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    channel: Channel = shell,
) -> Message[T]

Create a new message.

Source code in src/async_kernel/interface/base.py
def msg(
    self,
    msg_type: str | MsgType,
    *,
    content: T | None = None,
    parent: Message | dict[str, Any] | None = None,
    header: MsgHeader | dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    channel: Channel = Channel.shell,
) -> Message[T]:
    """
    Create a new message.
    """
    parent = parent or utils.get_parent_message()
    if header is None:
        session = ""
        if parent and (header := parent.get("header")):
            session = header.get("session", "")
        header = MsgHeader(
            date=datetime.now(UTC),
            msg_id=str(uuid4()),
            msg_type=msg_type,
            session=session,
            username="",
            version=async_kernel.kernel_protocol_version,
        )
    return Message(  # pyright: ignore[reportCallIssue]
        channel=channel,
        header=header,
        parent_header=extract_header(parent),  # pyright: ignore[reportArgumentType]
        content={} if content is None else content,
        metadata=metadata if metadata is not None else {},
    )
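
To make the header logic easier to follow, here is a dict-based sketch of the same construction: a fresh header is generated, the session is inherited from the parent message when there is one, and the parent's header is copied into `parent_header`. The function `make_message` and the `PROTOCOL_VERSION` value are assumptions for illustration; the real method returns a `Message`, takes the parent from `utils.get_parent_message()` when none is passed, and uses `async_kernel.kernel_protocol_version`.

```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any
from uuid import uuid4

PROTOCOL_VERSION = "5.4"  # Assumption: placeholder for async_kernel.kernel_protocol_version.


def make_message(
    msg_type: str,
    *,
    content: dict[str, Any] | None = None,
    parent: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    channel: str = "shell",
) -> dict[str, Any]:
    """Hypothetical dict-based version of `msg` (the source returns a `Message`)."""
    parent_header = (parent or {}).get("header") or {}
    header = {
        "date": datetime.now(timezone.utc),
        "msg_id": str(uuid4()),
        "msg_type": msg_type,
        # Replies stay in the session of the message they answer.
        "session": parent_header.get("session", ""),
        "username": "",
        "version": PROTOCOL_VERSION,
    }
    return {
        "channel": channel,
        "header": header,
        "parent_header": parent_header,
        "content": {} if content is None else content,
        "metadata": {} if metadata is None else metadata,
    }


request = {"header": {"msg_id": "abc", "session": "s-1", "msg_type": "execute_request"}}
reply = make_message("execute_reply", content={"status": "ok"}, parent=request)
orphan = make_message("status")
```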

iopub_send

iopub_send(
    msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
    *,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None

Send an iopub message.

Source code in src/async_kernel/interface/base.py
def iopub_send(
    self,
    msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
    *,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None:
    """Send an iopub message."""
    raise NotImplementedError
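
The base method only raises, but the class source shown earlier also wraps `iopub_send` while the interface is starting: messages sent before `started` is set are cached and delivered afterwards, or discarded if a stop was requested first. A stdlib-only sketch of that deferral follows. `ToySender` is hypothetical, `Future` stands in for `Pending`, and the message is reduced to a string.

```python
from concurrent.futures import Future


class ToySender:
    """Hypothetical sender that defers messages until `started` is set."""

    def __init__(self) -> None:
        self.started = Future()
        self.stopping = Future()
        self.sent = []
        send = self.iopub_send  # Keep the real bound method.

        def cache_iopub_send(text: str) -> None:
            # Deliver once started, unless a stop was requested in the meantime.
            self.started.add_done_callback(lambda _: self.stopping.done() or send(text))

        # Shadow the method on the instance, and restore it once started.
        self.iopub_send = cache_iopub_send
        self.started.add_done_callback(lambda _: delattr(self, "iopub_send"))

    def iopub_send(self, text: str) -> None:
        self.sent.append(text)


sender = ToySender()
sender.iopub_send("early")  # Cached: nothing is sent yet.
sent_before_start = list(sender.sent)
sender.started.set_result(None)  # Flushes the cache and restores the method.
sender.iopub_send("late")  # Sent immediately.
```

Because the callbacks run in the order they were registered, the cached message is delivered before anything sent after startup.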

HasInterface

Bases: Generic[T_interface_co]

A mixin class providing a reference to the global interface.

This class is designed to be compatible with Configurable objects, enabling the sharing of configuration and log objects. The global interface must exist before creating subclass instances using this mixin.

Attributes:

  • parent (T_interface_co) –

    The interface at the time of creation.

  • config (Config) –

    A reference to the parent.config.

Source code in src/async_kernel/interface/base.py
class HasInterface(Generic[T_interface_co]):
    """
    A mixin class providing a reference to the global [interface][async_kernel.interface.base.BaseInterface].

    This class is designed to be compatible with [Configurable][] objects enabling the sharing
    of configuration and log objects. The global _interface_ must exist before creating subclass
    instances using this mixin.
    """

    _interface: weakref.ref

    @property
    def parent(self) -> T_interface_co:
        "The interface at the time of creation."
        return self._interface()  # pyright: ignore[reportReturnType]

    @parent.setter
    def parent(self, value: Any):
        pass

    @property
    def config(self) -> Config:
        """
        A reference to the `parent.config`.

        Setting the config will update `parent.config` instead of replacing it.
        """
        return self.parent.config

    @config.setter
    def config(self, value: Config) -> None:
        pass

    def __init_subclass__(cls, **kwargs) -> None:

        if cls.parent is not HasInterface.parent or cls.config is not HasInterface.config:
            replaced = [k for k in ["parent", "config"] if getattr(cls, k) is not getattr(HasInterface, k)]
            msg = f"Parameter override detected for class `{cls.__name__}`!"
            if len(replaced) == 2:
                msg = f"{msg}\nTip: Make `HasInterface` the first inherited class (left-most)."
            else:
                msg = f"{msg}\nThe parameter named {replaced[0]!r} must not be overloaded."
            raise TypeError(msg)

        super().__init_subclass__(**kwargs)

        # Register class for configuration
        if issubclass(cls, Configurable):
            BaseInterface.classes.insert(0, cls)

    def __new__(cls, *args, **kwargs) -> Self:

        if not (interface := BaseInterface._instance):  # pyright: ignore[reportPrivateUsage]
            msg = "A global BaseInterface has not been created yet!"
            raise RuntimeError(msg)
        inst = new_(cls) if (new_ := super().__new__) is object.__new__ else new_(cls, *args, **kwargs)
        inst._interface = weakref.ref(interface)
        return inst

parent property writable

parent: T_interface_co

The interface at the time of creation.

config property writable

config: Config

A reference to the parent.config.

Setting the config will update parent.config instead of replacing it.
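
The pattern behind `parent` and `config` can be illustrated with a minimal, self-contained sketch. `InterfaceSketch`, `HasInterfaceSketch`, and `Widget` are hypothetical stand-ins and are not part of async_kernel. The sketch only assumes what is visible in the source listing above: a weak reference captured in `__new__`, setters that do nothing, and an override check in `__init_subclass__`.

```python
import weakref


class InterfaceSketch:
    """Hypothetical stand-in for the global interface (not the real BaseInterface)."""

    _instance = None

    def __init__(self) -> None:
        self.config = {"name": "demo"}
        InterfaceSketch._instance = self


class HasInterfaceSketch:
    """Simplified mixin holding a weak reference to the interface alive at creation."""

    def __new__(cls, *args, **kwargs):
        interface = InterfaceSketch._instance
        if interface is None:
            raise RuntimeError("A global interface has not been created yet!")
        inst = super().__new__(cls)
        inst._interface = weakref.ref(interface)
        return inst

    @property
    def parent(self):
        return self._interface()

    @parent.setter
    def parent(self, value) -> None:
        pass  # assignments are silently ignored

    @property
    def config(self):
        return self.parent.config

    @config.setter
    def config(self, value) -> None:
        pass  # assignments are silently ignored

    def __init_subclass__(cls, **kwargs) -> None:
        # Reject subclasses that shadow either property.
        for name in ("parent", "config"):
            if getattr(cls, name) is not getattr(HasInterfaceSketch, name):
                raise TypeError(f"{name!r} must not be overridden in {cls.__name__}")
        super().__init_subclass__(**kwargs)


class Widget(HasInterfaceSketch):
    pass


interface = InterfaceSketch()
widget = Widget()
widget.parent = None  # ignored: the property setter does nothing
```

In this sketch `widget.parent` still returns `interface` after the assignment, and `widget.config` is the same object as `interface.config`.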

start_kernel_callable_interface async

start_kernel_callable_interface(
    *,
    send: Callable[[str, list | None, bool], None | str],
    stopped: Callable[[], None],
    settings: dict | None = None,
) -> Handlers

Start the global interface as an instance of CallableInterface.

Parameters:

  • send

    (Callable[[str, list | None, bool], None | str]) –

    A callback responsible for sending messages from the kernel on all channels.

  • stopped

    (Callable[[], None]) –

    A callback that is called once the kernel has stopped.

  • settings

    (dict | None, default: None ) –

    Additional settings to configure the interface, kernel, shell, etc. using traitlets config conventions. The settings are converted to argv using async_kernel.kernelspec.make_argv. All settings, including aliases and flags, are accepted. flags should be passed as 'flags': [<flag1>, <flag2>, ...].

Tip
  • To list all config options available for a CallableInterface use the command:
    async-kernel --help-all --interface_class=async_kernel.interface.callable.CallableInterface
    
Source code in src/async_kernel/interface/__init__.py
async def start_kernel_callable_interface(
    *, send: Callable[[str, list | None, bool], None | str], stopped: Callable[[], None], settings: dict | None = None
) -> Handlers:
    """
    Start the global interface as an instance of [CallableInterface][async_kernel.interface.callable.CallableInterface].

    Args:
        send: A callback responsible for sending messages from the kernel on all channels.
        stopped: A callback that is called once the kernel has stopped.
        settings: Additional settings to configure the interface, kernel, shell, etc. using traitlets config conventions.
            The settings are converted to argv using [async_kernel.kernelspec.make_argv][]. All settings,
            including aliases and flags, are accepted. _flags_ should be passed as `'flags': [<flag1>, <flag2>, ...]`.

    Tip:
        - To list all config options available for a `CallableInterface` use the command:
            ```shell
            async-kernel --help-all --interface_class=async_kernel.interface.callable.CallableInterface
            ```
    """

    settings = settings or {}
    interface_class = settings.get("interface_class") or "async_kernel.interface.callable.CallableInterface"
    cls: type[CallableInterface] = import_item(interface_class)

    argv = make_argv(command=(), connection_file="", **settings)[1:]
    app = cls(argv)
    assert issubclass(cls, BaseInterface)
    return await app.start_async(send=send, stopped=stopped)

launch_interface

launch_interface(settings: dict) -> None

Launch a kernel interface blocking until it has stopped.

Notes
  • Available in CPython.
  • 'interface_class' can be specified in settings as a subclass of BaseInterface or as an importable string.
  • settings are NOT loaded.
  • sys.argv is used for configuration. Use async-kernel --help-all to see all configuration options.
  • traitlets configuration documentation.
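
The way `interface_class` is resolved (a class, an importable string, or a default) can be sketched without async_kernel. `import_item_sketch` is a simplified stand-in for the `import_item` helper used in the source, and `DEFAULT` is a standard-library placeholder so that the example runs anywhere; the real default is `"async_kernel.interface.ip_app.IPApp"`.

```python
import importlib

# Placeholder so the sketch runs without async_kernel installed.
DEFAULT = "collections.OrderedDict"


def import_item_sketch(name: str):
    """Import `pkg.module.Attr` given as a dotted string (simplified stand-in)."""
    module_name, _, attr = name.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


def resolve_interface_class(settings: dict):
    """Mirror the lookup order shown in `launch_interface`."""
    val = settings.get("interface_class") or settings.get("BaseInterface.interface_class")
    val = val or DEFAULT
    return import_item_sketch(val) if isinstance(val, str) else val


by_default = resolve_interface_class({})
by_class = resolve_interface_class({"interface_class": dict})
by_string = resolve_interface_class({"BaseInterface.interface_class": "fractions.Fraction"})
```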
Source code in src/async_kernel/interface/__init__.py
def launch_interface(settings: dict) -> None:
    """
    Launch a kernel interface blocking until it has stopped.

    Notes:
        - Available in CPython.
        - 'interface_class' can be specified in settings as a subclass of [BaseInterface][async_kernel.interface.base.BaseInterface]
            or as an importable string.
        - `settings` are NOT loaded.
        - `sys.argv` is used for configuration. Use `async-kernel --help-all` to see all configuration options.
        - [traitlets configuration documentation](https://traitlets.readthedocs.io/en/stable/config.html#module-traitlets.config).
    """

    val = settings.get("interface_class") or settings.get("BaseInterface.interface_class")
    val = val or "async_kernel.interface.ip_app.IPApp"
    cls = import_item(val) if isinstance(val, str) else val
    assert issubclass(cls, BaseInterface)
    cls.launch_instance()

The base class definition to interface with the kernel.

Classes:

BaseInterface

Bases: Application, AsyncContextManagerMixin, Generic[T_shell_co]

The base class for kernel interface (singleton).

The interface creates the kernel and provides external communication. It is also the parent object for all objects that subclass from HasInterface. Configurable objects that subclass from HasInterface inherit their configuration from the interface (Application).

Usage

launch:

Interface.launch_instance()
async context:
async with Interface() as interface:
    interface.kernel
    ...

Methods:

  • initialized

    Has an instance been created?

  • instance

    Get the singleton instance that was created using launch_instance.

  • initialize

    Initialize the interface (DO NOT CALL DIRECTLY).

  • start

    Start the interface blocking until it stops.

  • run

    Run the kernel.

  • stop

    Stop the kernel and this interface.

  • input_request

    Forward an input request to the frontend.

  • msg

    Create a new message.

  • iopub_send

    Send an iopub message.

Attributes:

Source code in src/async_kernel/interface/base.py
class BaseInterface(Application, anyio.AsyncContextManagerMixin, Generic[T_shell_co]):
    """
    The base class for kernel interface (singleton).

    The interface creates the kernel and provides external communication. It is also
    the parent object for all objects that subclass from `HasInterface`. Configurable
    objects that subclass from `HasInterface` inherit their configuration from the
    interface (Application).

    Usage:
        launch:
            ```python
            Interface.launch_instance()
            ```
        async context:
            ```python
            async with Interface() as interface:
                interface.kernel
                ...
            ```
    """

    classes: ClassesType = final([])
    "The classes registered with the interface."

    aliases: dict[str | tuple[str, ...], str] = {  # pyright: ignore[reportIncompatibleVariableOverride]
        ("name", "n"): "BaseInterface.name",
        "launcher": "BaseInterface.launcher",
        "timeout": "BaseShell.timeout",
        "kernel_class": "BaseInterface.kernel_class",
        "shell_class": "BaseInterface.shell_class",
        "help_links": "Kernel.help_links",
        "supported_features": "Kernel.supported_features",
        "interface_class": "BaseInterface.interface_class",
    } | Application.aliases
    ""
    flags = {
        "quiet": ({"BaseInterface": {"quiet": True}}, "Only send stdout/stderr to output stream."),
        "no-quiet": ({"BaseInterface": {"quiet": False}}, "Only send stdout/stderr to output stream."),
    } | Application.flags
    ""

    name = traitlets.Unicode("async").tag(config=True)
    "The name of the kernel used in the kernelspec."

    host: traitlets.TraitType[Hosts | None, Hosts | None] = traitlets.UseEnum(
        Hosts, default_value=None, allow_none=True
    ).tag(config=True)
    "The name of a (gui) event loop (if one is used)."

    host_options = DictValueLiteralEval(allow_none=True).tag(config=True)
    "Options for starting the loop."

    backend: traitlets.TraitType[Backend, Backend] = traitlets.UseEnum(Backend).tag(config=True)
    "The type of asynchronous backend used. Options are 'asyncio' or 'trio'."

    backend_options = DictValueLiteralEval(allow_none=True).tag(config=True)
    "Options for starting the backend."

    interface_class: traitlets.Type[type[Self], type[Self] | str] = traitlets.Type(
        "async_kernel.interface.base.BaseInterface"
    ).tag(  # pyright: ignore[reportAssignmentType]
        config=True
    )
    "The interface class to use when launching."

    kernel_class: traitlets.Type[type[Kernel[Self, T_shell_co]], type[Kernel[Self, T_shell_co]] | str] = traitlets.Type(
        "async_kernel.Kernel"
    ).tag(  # pyright: ignore[reportAssignmentType]
        config=True
    )
    "The Kernel class to use when creating the kernel."

    shell_class: traitlets.Type[type[T_shell_co], type[T_shell_co] | str] = traitlets.Type(
        "async_kernel.shell.ipshell.IPShell", "async_kernel.shell.BaseShell"
    ).tag(  # pyright: ignore[reportAssignmentType]
        config=True
    )
    "The class to use for shells and subshells."

    quiet = traitlets.Bool(True).tag(config=True)
    "Only send stdout/stderr to output stream."

    launcher = traitlets.Unicode("").tag(config=True)
    "The value used to import the interface using [async_kernel.kernelspec.import_launcher][]."

    kernel: Fixed[Self, Kernel[Self, T_shell_co]] = Fixed(
        lambda c: c["owner"].kernel_class(c["owner"], c["owner"].shell_class)
    )
    "The kernel."

    callers: Fixed[Self, dict[Literal[Channel.shell, Channel.control], Caller]] = Fixed(dict)
    "The caller associated with the kernel once it has started."

    started = Fixed(Pending)
    "A Pending that is set when the interface has started."

    stopping = Fixed(Pending)
    """
    A Pending that is set when stop is called.
    """

    _instance: Self | None = None
    _zmq_context = None
    last_interrupt_frame = None

    @traitlets.default("backend")
    def _default_backend(self) -> Backend:
        try:
            return Backend(current_async_library())
        except AsyncLibraryNotFoundError:
            if (
                not self.host
                and not self.trait_has_value("backend_options")
                and (importlib.util.find_spec("winloop") or importlib.util.find_spec("uvloop"))
            ):
                self.backend_options["use_uvloop"] = True
            return Backend.asyncio

    @traitlets.default("shell_class")
    def _default_shell_class(self):
        # We use a method to delay IPython import until it is needed
        from async_kernel.shell.ipshell import IPShell  # noqa: PLC0415

        return IPShell

    @property
    def summary(self) -> str:
        return f"name={self.name!r} backend={str(self.backend)!r}"

    @classmethod
    @override
    def initialized(cls) -> bool:
        """Has an instance been created?"""
        return cls._instance is not None

    @classmethod
    @override
    def instance(cls) -> Self:
        "Get the singleton instance that was created using `launch_instance`."
        if not cls._instance:
            msg = "An instance does not exist!"
            raise RuntimeError(msg)
        if not isinstance(cls._instance, cls):
            msg = f"An instance exists but it is not an instance of {cls}!"
            raise TypeError(msg)
        return cls._instance

    @classmethod
    @override
    def clear_instance(cls) -> None:
        raise NotImplementedError

    @classmethod
    @override
    def launch_instance(
        cls,
        argv: list[str] | None = None,
        kernel_class: type[Kernel[Self, T_shell_co]] | None = None,
        shell_class: type[T_shell_co] | None = None,
        **kwargs: Any,
    ) -> None:
        app = None
        if BaseInterface._instance:
            msg = "An interface already exists!"
            raise RuntimeError(msg)
        try:
            app = cls(argv, kernel_class=kernel_class, shell_class=shell_class, **kwargs)
            app.start()
            app.exit()
        except BaseException:
            del app
            BaseInterface._instance = None
            gc.collect()
            raise

    def __new__(cls, argv: list | None | NoValue = NoValue, /, **kwargs) -> Self:  # noqa: ARG004  # pyright: ignore[reportInvalidTypeForm]
        if BaseInterface._instance:
            msg = "An interface already exists!"
            raise RuntimeError(msg)
        BaseInterface._instance = super().__new__(cls, **kwargs)
        return BaseInterface._instance

    def __init__(
        self,
        argv: list | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
        /,
        *,
        kernel_class: type[Kernel[Self, T_shell_co]] | str | None = None,
        shell_class: type[T_shell_co] | str | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

        for name, value in [("kernel_class", kernel_class), ("shell_class", shell_class)]:
            if value:
                self.set_trait(name, value)
        self.initialize(argv)

    @override
    def initialize(self, argv: None | list | NoValue = NoValue) -> None:  # pyright: ignore[reportInvalidTypeForm]
        """
        Initialize the interface **DO NOT CALL DIRECTLY**.
        """
        assert self._instance is self

        def initialized(argv: Any = NoValue) -> None:
            msg = "Already initialized!"
            raise RuntimeError(msg)

        self.initialize = initialized

        # Environment variables
        if not os.environ.get("MPLBACKEND"):
            os.environ["MPLBACKEND"] = "module://matplotlib_inline.backend_inline"
        if not os.environ.get("UV_PROJECT_ENVIRONMENT"):
            os.environ["UV_PROJECT_ENVIRONMENT"] = sys.prefix
        self.parse_command_line([] if argv is NoValue else argv)
        self.interface_class = self.__class__

    @override
    def start(self) -> None:
        """
        Start the interface blocking until it stops.

        Warning:
            - Running in a thread other than the 'MainThread' is permitted, but discouraged.
            - Blocking calls can only be interrupted in the 'MainThread' because
                [*'threads cannot be destroyed, stopped, suspended, resumed, or interrupted'*](https://docs.python.org/3/library/threading.html#module-threading).
            - Some libraries may assume the call is occurring in the 'MainThread'.
            - If there is an `asyncio` or `trio` event loop already running in the desired thread,
                start asynchronously instead (`async with interface: ...`).
        """
        if BaseInterface._instance is not self:
            msg = "This interface is not the global instance!"
            raise RuntimeError(msg)

        settings = RunSettings(
            backend=self.backend,
            backend_options=self.backend_options,
            host=self.host,
            host_options=self.host_options,
        )
        try:
            async_kernel.event_loop.run(self.run, (), settings)
        finally:
            if BaseInterface._instance is self:
                BaseInterface._instance = None

    @asynccontextmanager
    async def __asynccontextmanager__(self, *, set_started=True) -> AsyncGenerator[Self]:
        def cache_iopub_send(*args, **kwargs) -> None:  # pragma: no cover
            # Cache iopub messages, send when started or discard if stopped early.
            self.started.add_done_callback(lambda _: not self.stopping.done() and send(*args, **kwargs))

        if self.stopping.done() or self.started.done():
            msg = "Stopped early"
            raise RuntimeError(msg)

        send = self.iopub_send
        self.iopub_send = cache_iopub_send
        self.started.add_done_callback(lambda _: delattr(self, "iopub_send"))

        caller = Caller(
            "manual",
            name="Shell",
            protected=True,
            log=self.log,
            zmq_context=self._zmq_context,
            host=self.host,
        )
        self.callers[Channel.shell] = caller
        self.callers[Channel.control] = caller.get(name="Control", log=self.log, protected=True)
        self.backend = Backend(current_async_library())
        try:
            async with caller:
                with anyio.CancelScope() as scope:

                    def stop(_):
                        self.log.info("Stopping kernel")
                        caller.call_later(0.5, scope.cancel, "Stopping kernel")

                    self.stopping.add_done_callback(stop)
                    try:
                        async with self.kernel.running():
                            if set_started:
                                self._started()
                            yield self
                    finally:
                        self.stop()
        finally:
            if BaseInterface._instance is self:
                BaseInterface._instance = None
            self.log.info("Interface stopped")

    def _started(self):
        self.log.info("Interface started: %s", self.summary)
        self.started.set_result(None)

    async def run(self, *, stopped: Callable[[], Any] | None = None) -> None:
        """
        Run the kernel.

        Args:
            stopped: An optional callback that is called when the kernel has stopped.

        This method requires that a [Caller][async_kernel.caller.Caller] instance does not already exist in the current thread.
        """
        try:
            async with self:
                # Wait forever. This will exit when stop is called.
                await async_sleep_forever()
        finally:
            if stopped:
                stopped()

    def stop(self) -> None:
        """
        Stop the kernel and this interface.
        """
        self.stopping.set_result(None)
        if not self.started.done():
            self.started.cancel("Stopped early")
            if BaseInterface._instance is self:
                BaseInterface._instance = None

    def input_request(self, prompt: str, *, password: bool = False) -> str:
        """
        Forward an input request to the frontend.

        Args:
            prompt: The user prompt.
            password: If the prompt should be considered as a password.
        """
        raise NotImplementedError

    def msg(
        self,
        msg_type: str | MsgType,
        *,
        content: T | None = None,
        parent: Message | dict[str, Any] | None = None,
        header: MsgHeader | dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
        channel: Channel = Channel.shell,
    ) -> Message[T]:
        """
        Create a new message.
        """
        parent = parent or utils.get_parent_message()
        if header is None:
            session = ""
            if parent and (header := parent.get("header")):
                session = header.get("session", "")
            header = MsgHeader(
                date=datetime.now(UTC),
                msg_id=str(uuid4()),
                msg_type=msg_type,
                session=session,
                username="",
                version=async_kernel.kernel_protocol_version,
            )
        return Message(  # pyright: ignore[reportCallIssue]
            channel=channel,
            header=header,
            parent_header=extract_header(parent),  # pyright: ignore[reportArgumentType]
            content={} if content is None else content,
            metadata=metadata if metadata is not None else {},
        )

    def iopub_send(
        self,
        msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
        *,
        content: Content | None = None,
        metadata: dict[str, Any] | None = None,
        parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
        ident: bytes | list[bytes] | None = None,
        buffers: list[bytes] | None = None,
    ) -> None:
        """Send an iopub message."""
        raise NotImplementedError

    @override
    def print_help(self, classes: bool = False) -> None:
        from async_kernel.compat.attr_docs import get_attr_docs  # noqa: PLC0415

        # Copy trailing docstrings into trait.help.
        for cls in self.classes:
            try:
                for name, value in get_attr_docs(cls).items():
                    if value and isinstance(trait := getattr(cls, name), traitlets.TraitType) and not trait.help:
                        trait.help = value
            except OSError:
                continue  # Coverage can cause issues with some files.
        super().print_help(classes)

classes class-attribute instance-attribute

classes: ClassesType = final([])

The classes registered with the interface.

aliases class-attribute instance-attribute

aliases: dict[str | tuple[str, ...], str] = {
    ("name", "n"): "BaseInterface.name",
    "launcher": "BaseInterface.launcher",
    "timeout": "BaseShell.timeout",
    "kernel_class": "BaseInterface.kernel_class",
    "shell_class": "BaseInterface.shell_class",
    "help_links": "Kernel.help_links",
    "supported_features": "Kernel.supported_features",
    "interface_class": "BaseInterface.interface_class",
} | Application.aliases

flags class-attribute instance-attribute

flags = {
    "quiet": (
        {"BaseInterface": {"quiet": True}},
        "Only send stdout/stderr to output stream.",
    ),
    "no-quiet": (
        {"BaseInterface": {"quiet": False}},
        "Only send stdout/stderr to output stream.",
    ),
} | Application.flags

name class-attribute instance-attribute

name = traitlets.Unicode('async').tag(config=True)

The name of the kernel used in the kernelspec.

host class-attribute instance-attribute

host: TraitType[Hosts | None, Hosts | None] = traitlets.UseEnum(
    Hosts, default_value=None, allow_none=True
).tag(config=True)

The name of a (gui) event loop (if one is used).

host_options class-attribute instance-attribute

host_options = DictValueLiteralEval(allow_none=True).tag(config=True)

Options for starting the loop.

backend class-attribute instance-attribute

backend: TraitType[Backend, Backend] = traitlets.UseEnum(Backend).tag(config=True)

The type of asynchronous backend used. Options are 'asyncio' or 'trio'.

backend_options class-attribute instance-attribute

backend_options = DictValueLiteralEval(allow_none=True).tag(config=True)

Options for starting the backend.

interface_class class-attribute instance-attribute

interface_class: Type[type[Self], type[Self] | str] = traitlets.Type(
    "async_kernel.interface.base.BaseInterface"
).tag(config=True)

The interface class to use when launching.

kernel_class class-attribute instance-attribute

kernel_class: Type[
    type[Kernel[Self, T_shell_co]], type[Kernel[Self, T_shell_co]] | str
] = traitlets.Type("async_kernel.Kernel").tag(config=True)

The Kernel class to use when creating the kernel.

shell_class class-attribute instance-attribute

shell_class: Type[type[T_shell_co], type[T_shell_co] | str] = traitlets.Type(
    "async_kernel.shell.ipshell.IPShell", "async_kernel.shell.BaseShell"
).tag(config=True)

The class to use for shells and subshells.

quiet class-attribute instance-attribute

quiet = traitlets.Bool(True).tag(config=True)

Only send stdout/stderr to output stream.

launcher class-attribute instance-attribute

launcher = traitlets.Unicode('').tag(config=True)

The value used to import the interface using async_kernel.kernelspec.import_launcher.

kernel class-attribute instance-attribute

kernel: Fixed[Self, Kernel[Self, T_shell_co]] = Fixed(
    lambda c: c["owner"].kernel_class(c["owner"], c["owner"].shell_class)
)

The kernel.

callers class-attribute instance-attribute

The caller associated with the kernel once it has started.

started class-attribute instance-attribute

started = Fixed(Pending)

A Pending that is set when the interface has started.

stopping class-attribute instance-attribute

stopping = Fixed(Pending)

A Pending that is set when stop is called.

initialized classmethod

initialized() -> bool

Has an instance been created?

Source code in src/async_kernel/interface/base.py
@classmethod
@override
def initialized(cls) -> bool:
    """Has an instance been created?"""
    return cls._instance is not None

instance classmethod

instance() -> Self

Get the singleton instance that was created using launch_instance.

Source code in src/async_kernel/interface/base.py
@classmethod
@override
def instance(cls) -> Self:
    "Get the singleton instance that was created using `launch_instance`."
    if not cls._instance:
        msg = "An instance does not exist!"
        raise RuntimeError(msg)
    if not isinstance(cls._instance, cls):
        msg = f"An instance exists but it is not an instance of {cls}!"
        raise TypeError(msg)
    return cls._instance
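
The singleton behaviour of `initialized` and `instance` can be sketched in isolation. `SingletonSketch`, `ZmqLike`, and `CallableLike` are hypothetical names; the sketch assumes only what the listings show: the instance is stored on the base class, a second construction raises `RuntimeError`, and `instance()` raises `TypeError` when called on a class that the existing instance does not belong to.

```python
class SingletonSketch:
    """Simplified stand-in for the singleton handling in BaseInterface."""

    _instance = None

    def __new__(cls, *args, **kwargs):
        # The instance is always stored on the base class, so only one
        # object can exist across all subclasses.
        if SingletonSketch._instance is not None:
            raise RuntimeError("An interface already exists!")
        SingletonSketch._instance = super().__new__(cls)
        return SingletonSketch._instance

    @classmethod
    def initialized(cls) -> bool:
        return cls._instance is not None

    @classmethod
    def instance(cls):
        if cls._instance is None:
            raise RuntimeError("An instance does not exist!")
        if not isinstance(cls._instance, cls):
            raise TypeError(f"An instance exists but it is not an instance of {cls}!")
        return cls._instance


class ZmqLike(SingletonSketch):
    pass


class CallableLike(SingletonSketch):
    pass


first = ZmqLike()
```

Here `SingletonSketch.instance()` and `ZmqLike.instance()` both return `first`, while `CallableLike.instance()` raises `TypeError` and a second `ZmqLike()` raises `RuntimeError`.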

initialize

initialize(argv: None | list | NoValue = NoValue) -> None

Initialize the interface (DO NOT CALL DIRECTLY).

Source code in src/async_kernel/interface/base.py
@override
def initialize(self, argv: None | list | NoValue = NoValue) -> None:  # pyright: ignore[reportInvalidTypeForm]
    """
    Initialize the interface **DO NOT CALL DIRECTLY**.
    """
    assert self._instance is self

    def initialized(argv: Any = NoValue) -> None:
        msg = "Already initialized!"
        raise RuntimeError(msg)

    self.initialize = initialized

    # Environment variables
    if not os.environ.get("MPLBACKEND"):
        os.environ["MPLBACKEND"] = "module://matplotlib_inline.backend_inline"
    if not os.environ.get("UV_PROJECT_ENVIRONMENT"):
        os.environ["UV_PROJECT_ENVIRONMENT"] = sys.prefix
    self.parse_command_line([] if argv is NoValue else argv)
    self.interface_class = self.__class__
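
Two techniques in `initialize` are easy to miss: the method replaces itself on the instance so that a second call fails, and environment variables are only given defaults when they are unset or empty. The sketch below reproduces both with a hypothetical `OneShotSketch` class and a plain dict in place of `os.environ`, so running it does not modify the real environment.

```python
class OneShotSketch:
    """Hypothetical class showing the one-shot `initialize` pattern."""

    def initialize(self, env: dict[str, str]) -> None:
        def initialized(*args, **kwargs) -> None:
            raise RuntimeError("Already initialized!")

        # Shadow the method on the instance; later calls hit `initialized`.
        self.initialize = initialized

        # Fill in a default only when the variable is missing or empty.
        if not env.get("MPLBACKEND"):
            env["MPLBACKEND"] = "module://matplotlib_inline.backend_inline"


empty_env = {"MPLBACKEND": ""}
preset_env = {"MPLBACKEND": "agg"}

obj = OneShotSketch()
obj.initialize(empty_env)
OneShotSketch().initialize(preset_env)
```

After this runs, `empty_env` holds the inline backend, `preset_env` keeps `"agg"`, and calling `obj.initialize(...)` again raises `RuntimeError`.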

start

start() -> None

Start the interface blocking until it stops.

Warning
  • Running in a thread other than the 'MainThread' is permitted, but discouraged.
  • Blocking calls can only be interrupted in the 'MainThread' because 'threads cannot be destroyed, stopped, suspended, resumed, or interrupted'.
  • Some libraries may assume the call is occurring in the 'MainThread'.
  • If there is an asyncio or trio event loop already running in the desired thread, start asynchronously instead (async with interface: ...).
Source code in src/async_kernel/interface/base.py
@override
def start(self) -> None:
    """
    Start the interface blocking until it stops.

    Warning:
        - Running in a thread other than the 'MainThread' is permitted, but discouraged.
        - Blocking calls can only be interrupted in the 'MainThread' because
            [*'threads cannot be destroyed, stopped, suspended, resumed, or interrupted'*](https://docs.python.org/3/library/threading.html#module-threading).
        - Some libraries may assume the call is occurring in the 'MainThread'.
        - If there is an `asyncio` or `trio` event loop already running in the desired thread,
            start asynchronously instead (`async with interface: ...`).
    """
    if BaseInterface._instance is not self:
        msg = "This interface is not the global instance!"
        raise RuntimeError(msg)

    settings = RunSettings(
        backend=self.backend,
        backend_options=self.backend_options,
        host=self.host,
        host_options=self.host_options,
    )
    try:
        async_kernel.event_loop.run(self.run, (), settings)
    finally:
        if BaseInterface._instance is self:
            BaseInterface._instance = None

run async

run(*, stopped: Callable[[], Any] | None = None) -> None

Run the kernel.

Parameters:

  • stopped

    (Callable[[], Any] | None, default: None ) –

    An optional callback that is called when the kernel has stopped.

This method requires that a Caller instance does not already exist in the current thread.

Source code in src/async_kernel/interface/base.py
async def run(self, *, stopped: Callable[[], Any] | None = None) -> None:
    """
    Run the kernel.

    Args:
        stopped: An optional callback that is called when the kernel has stopped.

    This method requires that a [Caller][async_kernel.caller.Caller] instance does not already exist in the current thread.
    """
    try:
        async with self:
            # Wait forever. This will exit when stop is called.
            await async_sleep_forever()
    finally:
        if stopped:
            stopped()
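
The control flow of `run` (enter the interface context, wait until stopped, then call the optional `stopped` callback in a `finally` block) can be sketched with `asyncio` alone. `interface_context` and `run_sketch` are hypothetical stand-ins, and an `asyncio.Event` replaces both `async_sleep_forever()` and the cancellation that `stop()` triggers in the real interface.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def interface_context():
    """Stand-in for `async with interface`."""
    events.append("interface started")
    try:
        yield
    finally:
        events.append("interface stopped")


async def run_sketch(stop_event: asyncio.Event, *, stopped=None) -> None:
    try:
        async with interface_context():
            # Stands in for sleeping forever until stop() is called.
            await stop_event.wait()
    finally:
        if stopped:
            stopped()


async def main() -> None:
    stop_event = asyncio.Event()
    task = asyncio.create_task(
        run_sketch(stop_event, stopped=lambda: events.append("stopped callback"))
    )
    await asyncio.sleep(0)  # let the task enter the context
    stop_event.set()
    await task


asyncio.run(main())
```

The `stopped` callback runs after the context has exited, so it is the last entry recorded in `events`.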

stop

stop() -> None

Stop the kernel and this interface.

Source code in src/async_kernel/interface/base.py
def stop(self) -> None:
    """
    Stop the kernel and this interface.
    """
    self.stopping.set_result(None)
    if not self.started.done():
        self.started.cancel("Stopped early")
        if BaseInterface._instance is self:
            BaseInterface._instance = None
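
The interplay between `started` and `stopping` in `stop` can be sketched with `concurrent.futures.Future` standing in for `Pending`. This substitution is an assumption made so the example is runnable; `Pending` may differ in detail, for example in how it handles a second call to `stop`.

```python
from concurrent.futures import Future


class LifecycleSketch:
    """Hypothetical stand-in tracking the two lifecycle signals."""

    def __init__(self) -> None:
        self.started: Future = Future()
        self.stopping: Future = Future()

    def stop(self) -> None:
        self.stopping.set_result(None)
        if not self.started.done():
            # Stopped before start-up completed: cancel the start signal.
            self.started.cancel()


# Case 1: stop() is called before the interface has started.
early = LifecycleSketch()
early.stop()

# Case 2: the interface started normally and is then stopped.
normal = LifecycleSketch()
normal.started.set_result(None)
normal.stop()
```

In the first case `early.started` ends up cancelled; in the second `normal.started` keeps its result and only `normal.stopping` changes.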

input_request

input_request(prompt: str, *, password: bool = False) -> str

Forward an input request to the frontend.

Parameters:

  • prompt

    (str) –

    The user prompt.

  • password

    (bool, default: False ) –

    If the prompt should be considered as a password.

Source code in src/async_kernel/interface/base.py
def input_request(self, prompt: str, *, password: bool = False) -> str:
    """
    Forward an input request to the frontend.

    Args:
        prompt: The user prompt.
        password: If the prompt should be considered as a password.
    """
    raise NotImplementedError

msg

msg(
    msg_type: str | MsgType,
    *,
    content: T | None = None,
    parent: Message | dict[str, Any] | None = None,
    header: MsgHeader | dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    channel: Channel = shell,
) -> Message[T]

Create a new message.

Source code in src/async_kernel/interface/base.py
def msg(
    self,
    msg_type: str | MsgType,
    *,
    content: T | None = None,
    parent: Message | dict[str, Any] | None = None,
    header: MsgHeader | dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    channel: Channel = Channel.shell,
) -> Message[T]:
    """
    Create a new message.
    """
    parent = parent or utils.get_parent_message()
    if header is None:
        session = ""
        if parent and (header := parent.get("header")):
            session = header.get("session", "")
        header = MsgHeader(
            date=datetime.now(UTC),
            msg_id=str(uuid4()),
            msg_type=msg_type,
            session=session,
            username="",
            version=async_kernel.kernel_protocol_version,
        )
    return Message(  # pyright: ignore[reportCallIssue]
        channel=channel,
        header=header,
        parent_header=extract_header(parent),  # pyright: ignore[reportArgumentType]
        content={} if content is None else content,
        metadata=metadata if metadata is not None else {},
    )
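
The header handling above can be illustrated with a standalone sketch. This is not the library's implementation: `make_message` is a hypothetical helper that uses plain dictionaries in place of `Message` and `MsgHeader`, and the `"5.3"` version string is a placeholder for `async_kernel.kernel_protocol_version`.

```python
from datetime import datetime, timezone
from uuid import uuid4


def make_message(msg_type, *, content=None, parent=None, metadata=None, channel="shell"):
    """Hypothetical stand-in for `msg`, built from plain dictionaries."""
    parent_header = (parent or {}).get("header") or {}
    header = {
        "date": datetime.now(timezone.utc),
        "msg_id": str(uuid4()),
        "msg_type": msg_type,
        # The session is inherited from the parent header when one is available.
        "session": parent_header.get("session", ""),
        "username": "",
        "version": "5.3",  # placeholder for async_kernel.kernel_protocol_version
    }
    return {
        "channel": channel,
        "header": header,
        "parent_header": dict(parent_header),
        "content": {} if content is None else content,
        "metadata": {} if metadata is None else metadata,
    }


request = make_message("execute_request", content={"code": "1 + 1"})
request["header"]["session"] = "session-a"
reply = make_message("execute_reply", content={"status": "ok"}, parent=request)
print(reply["header"]["session"])  # session-a
print(reply["parent_header"]["msg_id"] == request["header"]["msg_id"])  # True
```

Each message gets a fresh `msg_id`, while the session and the parent header tie the reply back to the request that triggered it.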

iopub_send

iopub_send(
    msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
    *,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None

Send an iopub message.

Source code in src/async_kernel/interface/base.py
def iopub_send(
    self,
    msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
    *,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None:
    """Send an iopub message."""
    raise NotImplementedError

HasInterface

Bases: Generic[T_interface_co]

A mixin class providing a reference to the global interface.

This class is designed to be compatible with Configurable objects, enabling them to share configuration and log objects. The global interface must exist before instances of subclasses that use this mixin are created.

Attributes:

  • parent (T_interface_co) –

    The interface at the time of creation.

  • config (Config) –

    A reference to the parent.config.

Source code in src/async_kernel/interface/base.py
class HasInterface(Generic[T_interface_co]):
    """
    A mixin class providing a reference to the global [interface][async_kernel.interface.base.BaseInterface].

    This class is designed to be compatible with [Configurable][] objects enabling the sharing
    of configuration and log objects. The global _interface_ must exist before creating subclass
    instances using this mixin.
    """

    _interface: weakref.ref

    @property
    def parent(self) -> T_interface_co:
        "The interface at the time of creation."
        return self._interface()  # pyright: ignore[reportReturnType]

    @parent.setter
    def parent(self, value: Any):
        pass

    @property
    def config(self) -> Config:
        """
        A reference to the `parent.config`.

        Setting the config will update `parent.config` instead of replacing it.
        """
        return self.parent.config

    @config.setter
    def config(self, value: Config) -> None:
        pass

    def __init_subclass__(cls, **kwargs) -> None:

        if cls.parent is not HasInterface.parent or cls.config is not HasInterface.config:
            replaced = [k for k in ["parent", "config"] if getattr(cls, k) is not getattr(HasInterface, k)]
            msg = f"Parameter override detected for class `{cls.__name__}`!"
            if len(replaced) == 2:
                msg = f"{msg}\nTip: Make `HasInterface` the first inherited class (left-most)."
            else:
                msg = f"{msg}\nThe parameter named {replaced[0]!r} must not be overloaded."
            raise TypeError(msg)

        super().__init_subclass__(**kwargs)

        # Register class for configuration
        if issubclass(cls, Configurable):
            BaseInterface.classes.insert(0, cls)

    def __new__(cls, *args, **kwargs) -> Self:

        if not (interface := BaseInterface._instance):  # pyright: ignore[reportPrivateUsage]
            msg = "A global BaseInterface has not been created yet!"
            raise RuntimeError(msg)
        inst = new_(cls) if (new_ := super().__new__) is object.__new__ else new_(cls, *args, **kwargs)
        inst._interface = weakref.ref(interface)
        return inst

parent property writable

parent: T_interface_co

The interface at the time of creation.

config property writable

config: Config

A reference to the parent.config.

Setting the config will update parent.config instead of replacing it.
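
The mixin pattern used by HasInterface can be sketched in isolation. The classes below (`InterfaceSketch`, `HasInterfaceSketch`, `Worker`) are hypothetical stand-ins, not part of async_kernel. They only show the weak reference captured in `__new__` and the read-through `parent` and `config` properties.

```python
import weakref


class InterfaceSketch:
    """Hypothetical stand-in for the singleton interface."""

    _instance = None

    def __init__(self):
        self.config = {"log_level": "INFO"}
        InterfaceSketch._instance = self


class HasInterfaceSketch:
    """Mixin that captures a weak reference to the interface at creation time."""

    def __new__(cls, *args, **kwargs):
        interface = InterfaceSketch._instance
        if interface is None:
            raise RuntimeError("A global interface has not been created yet!")
        inst = super().__new__(cls)
        inst._interface = weakref.ref(interface)
        return inst

    @property
    def parent(self):
        # Dereference the weak reference; returns None if the interface is gone.
        return self._interface()

    @property
    def config(self):
        return self.parent.config


class Worker(HasInterfaceSketch):
    pass


interface = InterfaceSketch()
worker = Worker()
print(worker.parent is interface)  # True
print(worker.config is interface.config)  # True
```

Holding a weak reference means that instances do not keep the interface alive after it has been stopped and released.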

A collection of objects to provide a kernel interface based on callbacks.

Classes:

Handlers

Bases: TypedDict

Handlers returned by async_kernel.interface.callable.CallableInterface when it is started.

Attributes:

Source code in src/async_kernel/interface/callable.py
class Handlers(TypedDict):
    "Handlers returned by [async_kernel.interface.callable.CallableInterface][] when it is started."

    handle_msg: Callable[[str, list[bytes] | list[bytearray] | None]]
    """
    Handle messages from the client.

    The handler requires two positional arguments:

    1. The message serialized as a JSON string. The channel ("shell" or "control")
        should also be included in the Message under the key "channel".
    2. A list of buffers if there are any, or None if there are no buffers.
    """

    stop: Callable[[], None]
    "Stop the kernel."

handle_msg instance-attribute

handle_msg: Callable[[str, list[bytes] | list[bytearray] | None]]

Handle messages from the client.

The handler requires two positional arguments:

  1. The message serialized as a JSON string. The channel ("shell" or "control") should also be included in the Message under the key "channel".
  2. A list of buffers if there are any, or None if there are no buffers.
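
A minimal sketch of preparing the two arguments follows. `encode_for_handler` is a hypothetical helper, and the message header is abbreviated for illustration (a real Jupyter message carries a full header).

```python
import json


def encode_for_handler(msg, channel):
    """Serialize a message dict for `handle_msg`, embedding the channel name."""
    if channel not in ("shell", "control"):
        raise ValueError(f"Unsupported channel: {channel!r}")
    return json.dumps({**msg, "channel": channel})


msg_json = encode_for_handler(
    {"header": {"msg_type": "kernel_info_request"}, "content": {}},
    "shell",
)
buffers = None  # or a list of bytes/bytearray objects
# handlers["handle_msg"](msg_json, buffers)  # `handlers` would come from start_async
print(json.loads(msg_json)["channel"])  # shell
```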

stop instance-attribute

stop: Callable[[], None]

Stop the kernel.

CallableInterface

Bases: BaseInterface[T_shell_co], Generic[T_shell_co]

A callback-based interface for interacting with the kernel using serialized messages.

Usage:

```python
from async_kernel.interface.callable import CallableInterface

# Start the kernel providing the necessary callbacks.
kernel_interface = await CallableInterface(options).start_async(send=..., stopped=...)

# Pass messages to the kernel.
kernel_interface["handle_msg"](msg, buffer)

# Stop the kernel (the handler takes no arguments).
kernel_interface["stop"]()
```

See also: async_kernel.typing.CallableInterfaceReturnArgs

Methods:

Attributes:

Source code in src/async_kernel/interface/callable.py
class CallableInterface(BaseInterface[T_shell_co], Generic[T_shell_co]):
    """
    A callback based interface to interact with the kernel using serialized messages.

    Usage:

        ```python
        from async_kernel.interface.callable import CallableInterface

        # Start the kernel providing the necessary callbacks.
        kernel_interface = await CallableInterface(options).start_async(send=..., stopped=...)

        # Pass messages to the kernel.
        kernel_interface["handle_msg"](msg, buffer)

        # Stop the kernel (the handler takes no arguments).
        kernel_interface["stop"]()
        ```
    See also:
        - [async_kernel.typing.CallableInterfaceReturnArgs]
    """

    host: TraitType[Hosts | None, Hosts | None] = TraitType(None)
    "Not yet supported"

    _send: Callable[[str, list | None, bool], None | str]

    async def start_async(
        self,
        *,
        send: Callable[[str, list | None, bool], None | str],
        stopped: Callable[[], None],
    ) -> Handlers:
        """
        Start the kernel.

        Args:
            send: The function to send kernel messages to the client. It must accept:

                1. A JSON string of the message.
                2. A list of buffers, or None if there are no buffers.
                3. A boolean value that indicates whether a response is required (used for the stdin channel).

            stopped: A callback that is called once the kernel has stopped.

        Returns: The handlers (`handle_msg` and `stop`), once the kernel has started.
        """
        self._send = send
        self._task = asyncio.create_task(coro=self.run(stopped=stopped))
        await self.started
        return Handlers(handle_msg=self._handle_msg, stop=self.stop)

    def _send_to_frontend(
        self,
        msg: Message[dict],
        *,
        channel: Channel = Channel.shell,
        buffers: list[bytearray | bytes] | None = None,
        requires_reply=False,
    ) -> Message | None:
        msg["channel"] = channel
        reply = self._send(pack_json_str(msg), buffers, requires_reply)
        if requires_reply:
            assert reply
            return unpack_json(reply)
        return None

    async def _send_reply(self, job: Job, content: dict, /) -> None:
        if "status" not in content:
            content["status"] = "ok"
        msg_type = job["msg"]["header"]["msg_type"].replace("request", "reply")
        msg = self.msg(msg_type, content=content, parent=job["msg"])
        self._send_to_frontend(msg, channel=job["msg"]["channel"], buffers=content.pop("buffers", None))

    def _handle_msg(self, msg_json: str, buffers: list[bytearray] | list[bytes] | None = None, /):
        "The main message handler that gets returned by the `start_async` method."
        msg: Message[dict[str, Any]] = unpack_json(msg_json)
        # Copy the buffer
        msg["buffers"] = [b[:] for b in buffers] if buffers else []
        msg["channel"] = Channel(msg["channel"])
        job = Job(received_time=time.monotonic(), msg=msg, ident=b"")
        self.kernel.message_handler(job, self._send_reply, self.iopub_send)

    @override
    def iopub_send(
        self,
        msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
        *,
        content: Content | None = None,
        metadata: dict[str, Any] | None = None,
        parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
        ident: bytes | list[bytes] | None = None,
        buffers: list[bytes] | None = None,
    ) -> None:
        if parent is NoValue:
            parent = async_kernel.utils.get_parent_message()
        if not isinstance(msg_or_type, dict):
            msg_or_type = self.msg(msg_type=msg_or_type, content=content, parent=parent, metadata=metadata)  # pyright: ignore[reportArgumentType]
        self._send_to_frontend(msg_or_type, channel="iopub", buffers=buffers)  # pyright: ignore[reportArgumentType]

    @override
    def input_request(self, prompt: str, *, password=False) -> Any:
        job = async_kernel.utils.get_job()
        if not job["msg"].get("content", {}).get("allow_stdin", False):
            msg = "Stdin is not allowed in this context!"
            raise RuntimeError(msg)
        msg = self.msg("input_request", content={"prompt": prompt, "password": password})
        reply = self._send_to_frontend(msg, channel=Channel.stdin, requires_reply=True)
        assert reply
        return reply["content"]["value"]

host class-attribute instance-attribute

host: TraitType[Hosts | None, Hosts | None] = TraitType(None)

Not yet supported

start_async async

start_async(
    *, send: Callable[[str, list | None, bool], None | str], stopped: Callable[[], None]
) -> Handlers

Start the kernel.

Parameters:

  • send

    (Callable[[str, list | None, bool], None | str]) –

    The function to send kernel messages to the client. It must accept:

    1. A JSON string of the message.
    2. A list of buffers, or None if there are no buffers.
    3. A boolean value that indicates whether a response is required (used for the stdin channel).
  • stopped

    (Callable[[], None]) –

    A callback that is called once the kernel has stopped.

Returns: The handlers (handle_msg and stop), once the kernel has started.
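
The three-argument `send` contract can be sketched as follows. The `send` and `stopped` functions and the `outbox` list are hypothetical names for illustration. The canned reply mirrors the `reply["content"]["value"]` lookup performed by `input_request`, where a real frontend would prompt the user instead.

```python
import json
from typing import Optional

outbox = []  # collects (message, buffers) pairs sent by the kernel


def send(msg_json: str, buffers: Optional[list], requires_reply: bool) -> Optional[str]:
    """Hypothetical `send` callback for `start_async`."""
    msg = json.loads(msg_json)
    outbox.append((msg, buffers))
    if requires_reply:
        # A real frontend would prompt the user with msg["content"]["prompt"].
        reply = {"header": {"msg_type": "input_reply"}, "content": {"value": "example input"}}
        return json.dumps(reply)
    return None


def stopped() -> None:
    """Hypothetical `stopped` callback."""
    print("kernel stopped")


# Exercise the callback directly, without a running kernel.
status = send(json.dumps({"header": {"msg_type": "status"}, "channel": "iopub"}), None, False)
answer = send(
    json.dumps({"header": {"msg_type": "input_request"}, "content": {"prompt": "Name: "}, "channel": "stdin"}),
    None,
    True,
)
print(status)  # None
print(json.loads(answer)["content"]["value"])  # example input
```

With a running event loop, these callbacks would be passed as `start_async(send=send, stopped=stopped)`.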

Source code in src/async_kernel/interface/callable.py
async def start_async(
    self,
    *,
    send: Callable[[str, list | None, bool], None | str],
    stopped: Callable[[], None],
) -> Handlers:
    """
    Start the kernel.

    Args:
        send: The function to send kernel messages to the client. It must accept:

            1. A JSON string of the message.
            2. A list of buffers, or None if there are no buffers.
            3. A boolean value that indicates whether a response is required (used for the stdin channel).

        stopped: A callback that is called once the kernel has stopped.

    Returns: The handlers (`handle_msg` and `stop`), once the kernel has started.
    """
    self._send = send
    self._task = asyncio.create_task(coro=self.run(stopped=stopped))
    await self.started
    return Handlers(handle_msg=self._handle_msg, stop=self.stop)

Defines a base kernel interface using zmq sockets.

Classes:

  • ZMQInterface

    The base kernel interface using ZMQ sockets.

ZMQInterface

Bases: BaseInterface[T_shell_co], ConnectionFileMixin, Generic[T_shell_co]

The base kernel interface using ZMQ sockets.

Methods:

  • iopub_send

    Send a message on the zmq iopub socket.

  • receive_msg_loop

    Opens a zmq socket for the channel, receives messages and calls the message handler.

Attributes:

Source code in src/async_kernel/interface/zmq.py
class ZMQInterface(BaseInterface[T_shell_co], ConnectionFileMixin, Generic[T_shell_co]):  # pyright: ignore[reportUnsafeMultipleInheritance]
    "The base kernel interface using ZMQ sockets."

    aliases = BaseInterface.aliases | {
        ("f", "connection_file"): "ZMQInterface.connection_file",
        "host": "ZMQInterface.host",
        "host_options": "ZMQInterface.host_options",
        "backend_options": "ZMQInterface.backend_options",
        "backend": "ZMQInterface.backend",
        "ip": "ZMQInterface.ip",
        "hb": "ZMQInterface.hb_port",
        "shell": "ZMQInterface.shell_port",
        "iopub": "ZMQInterface.iopub_port",
        "stdin": "ZMQInterface.stdin_port",
        "control": "ZMQInterface.control_port",
        "transport": "ZMQInterface.transport",
    }
    ""

    session = traitlets.Instance(AsyncSession, ())
    ""

    transport: traitlets.CaselessStrEnum[str] = traitlets.CaselessStrEnum(
        ["tcp", "ipc"] if sys.platform == "linux" else ["tcp"], default_value="tcp"
    ).tag(config=True)
    "Transport for sockets."

    _initialized = False
    _zmq_context = Fixed(zmq.Context)
    _iopub_url = "inproc://iopub-capture"
    _sockets: Fixed[Self, dict[Channel, zmq.Socket]] = Fixed(dict)

    @traitlets.validate("connection_file")
    def _validate_connection_file(self, proposal: dict) -> str:

        if self._sockets and self.trait_has_value("connection_file") and proposal["value"] != self.connection_file:
            msg = "It is too late to set the connection file!"
            raise RuntimeError(msg)
        return proposal["value"]

    @property
    @override
    def summary(self) -> str:
        return f"{super().summary} connection_file={str(self.connection_file)!r}"

    @override
    def load_connection_info(self, info: KernelConnectionInfo) -> None:
        if self._sockets:
            msg = "It is too late to configure!"
            raise RuntimeError(msg)
        super().load_connection_info(info)

    @override
    def blocking_client(self) -> Never:
        raise MethodNotSupported  # pragma: no cover

    @override
    def connect_control(self, identity: bytes | None = None) -> Never:
        raise MethodNotSupported  # pragma: no cover

    @override
    def connect_hb(self, identity: bytes | None = None) -> Never:
        raise MethodNotSupported  # pragma: no cover

    @override
    def connect_iopub(self, identity: bytes | None = None) -> Never:
        raise MethodNotSupported  # pragma: no cover

    @override
    def connect_shell(self, identity: bytes | None = None) -> Never:
        raise MethodNotSupported  # pragma: no cover

    @override
    def connect_stdin(self, identity: bytes | None = None) -> Never:
        raise MethodNotSupported  # pragma: no cover

    @override
    @asynccontextmanager
    async def __asynccontextmanager__(self, *, set_started=True) -> AsyncGenerator[Self]:

        if os.path.exists(self.connection_file):  # noqa: PTH110
            self.load_connection_file()
        self.write_connection_file()
        try:
            async with super().__asynccontextmanager__(set_started=False):
                self._start_hb_iopub_shell_control_threads()
                with self._bind_socket(Channel.stdin):
                    assert len(self._sockets) == len(Channel)
                    if set_started:
                        self._started()
                    yield self
        finally:
            self._zmq_context.term()

    def _start_hb_iopub_shell_control_threads(self) -> None:
        def heartbeat(ready: Callable[[], None]) -> None:
            # ref: https://jupyter-client.readthedocs.io/en/stable/messaging.html#heartbeat-for-kernels
            utils.mark_thread_pydev_do_not_trace()
            with self._bind_socket(Channel.heartbeat) as socket:
                ready()
                self.started.wait_sync()
                try:
                    zmq.proxy(socket, socket)
                except zmq.ContextTerminated:
                    return

        def pub_proxy(ready: Callable[[], None]) -> None:
            utils.mark_thread_pydev_do_not_trace()

            # We use an internal proxy to collect pub messages for distribution.
            # Each thread needs to open its own socket to publish to the internal proxy.
            # Ref: https://zguide.zeromq.org/docs/chapter2/#Working-with-Messages (fig 14)

            frontend: zmq.Socket = self._zmq_context.socket(zmq.XSUB)
            frontend.bind(Caller.iopub_url)

            # Capture broadcast messages received on both frontend and backend
            capture = self._zmq_context.socket(zmq.PUB)
            capture.bind(self._iopub_url)

            with self._bind_socket(Channel.iopub) as iopub_socket:
                ready()
                try:
                    zmq.proxy(frontend, iopub_socket, capture)
                except (zmq.ContextTerminated, Exception):
                    pass
            frontend.close(linger=50)
            capture.close(linger=50)

        ready = CountdownEvent(5)

        threading.Thread(target=heartbeat, name="heartbeat", args=[ready.down]).start()
        threading.Thread(target=pub_proxy, name="iopub proxy", args=[ready.down]).start()
        threading.Thread(target=self._pub_capture, args=[ready.down]).start()
        # message loops
        for channel in [Channel.shell, Channel.control]:
            name = f"{channel}-receive_msg_loop"
            threading.Thread(target=self.receive_msg_loop, name=name, args=(channel, ready.down)).start()
        ready.wait()

    def _pub_capture(self, ready: Callable[[], None]) -> None:
        """
        Capture connection messages on iopub.

        Will send an 'iopub_welcome' whenever a socket subscribes to the iopub socket [ref](https://jupyter-client.readthedocs.io/en/stable/messaging.html#welcome-message).
        """

        utils.mark_thread_pydev_do_not_trace()

        socket: zmq.Socket = self._zmq_context.socket(zmq.SUB)
        socket.linger = 0
        socket.connect(self._iopub_url)
        # welcome_message:  https://jupyter.org/enhancement-proposals/65-jupyter-xpub/jupyter-xpub.html#replace-pub-socket-with-xpub-socket
        # Only subscribe to the 'pub subscribe' topic byte `1` (byte `0` is 'pub unsubscribe').
        socket.subscribe(b"\x01")
        with socket:
            ready()
            self.started.wait_sync()
            while True:
                try:
                    if frames := socket.recv_multipart():
                        frame = next(iter(frames))
                        if frame[0] == 1:
                            msg = self.msg("iopub_welcome", content={"subscription": frame[1:].decode()})
                            self.iopub_send(msg, parent=None)
                except zmq.ContextTerminated:
                    break
                except Exception:
                    continue

    @contextlib.contextmanager
    def _bind_socket(self, channel: Channel) -> Generator[Any | Socket[Any], Any, None]:
        """
        Bind a zmq.Socket storing a reference to the socket and the port
        details and closing the socket on leaving the context.
        """
        port = int(getattr(self, f"{channel}_port"))
        assert port
        assert channel not in self._sockets

        match channel:
            case Channel.shell | Channel.control | Channel.heartbeat | Channel.stdin:
                socket_type = zmq.ROUTER
            case Channel.iopub:
                socket_type = zmq.XPUB
        socket: zmq.Socket = self._zmq_context.socket(socket_type)
        socket.linger = 1000
        if self.curve_secretkey is not None and self.curve_publickey is not None:
            socket.curve_secretkey = self.curve_secretkey
            socket.curve_publickey = self.curve_publickey
            socket.curve_server = True
        # bind the socket
        addr = f"tcp://{self.ip}:{port}" if self.transport == "tcp" else f"ipc://{self.ip}-{port}"
        socket.bind(addr)
        self.log.debug("%s socket on port: %i", channel, port)
        self._sockets[channel] = socket
        try:
            yield socket
        finally:
            socket.close(linger=50)
            self._sockets.pop(channel)

    @override
    def input_request(self, prompt: str, *, password=False) -> Any:
        job = utils.get_job()
        if not job["msg"].get("content", {}).get("allow_stdin", False):
            msg = "Stdin is not allowed in this context!"
            raise RuntimeError(msg)
        socket = self._sockets[Channel.stdin]
        # Clear messages on the stdin socket
        while socket.get(SocketOption.EVENTS) & PollEvent.POLLIN:  # pyright: ignore[reportOperatorIssue]
            socket.recv_multipart(flags=Flag.DONTWAIT, copy=False)
        # Send the input request.
        assert self is not None
        self.session.send(
            stream=socket,
            msg_or_type="input_request",
            content={"prompt": prompt, "password": password},
            parent=job["msg"],  # pyright: ignore[reportArgumentType]
            ident=job["ident"],
        )
        # Poll for a reply.
        while not (socket.poll(100) & PollEvent.POLLIN):
            if pen := self.kernel._interrupt_requested:  # pyright: ignore[reportPrivateUsage]
                with enable_signal_safety():
                    pen.set_result(None)
                raise KernelInterrupt
        return self.session.recv(socket)[1]["content"]["value"]  # pyright: ignore[reportOptionalSubscript]

    @override
    def iopub_send(
        self,
        msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
        *,
        content: Content | None = None,
        metadata: dict[str, Any] | None = None,
        parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
        ident: bytes | list[bytes] | None = None,
        buffers: list[bytes] | None = None,
    ) -> None:
        """
        Send a message on the zmq iopub socket.
        """
        if socket := Caller.iopub_sockets.get(t_ident := Caller.id_current()):
            msg = self.session.send(
                stream=socket,
                msg_or_type=msg_or_type,  # pyright: ignore[reportArgumentType]
                content=content,
                metadata=metadata,
                parent=parent if parent is not NoValue else utils.get_parent_message(),  # pyright: ignore[reportArgumentType]
                ident=ident,
                buffers=buffers,  # pyright: ignore[reportArgumentType]
            )
            if msg:
                self.log.debug("iopub_send: msg_type:'%s', content: %s", msg["header"]["msg_type"], msg["content"])
        elif (caller := self.callers.get(Channel.control)) and caller.id != t_ident:
            caller.call_direct(
                self.iopub_send,
                msg_or_type=msg_or_type,
                content=content,
                metadata=metadata,
                parent=parent if parent is not NoValue else None,
                ident=ident,
                buffers=buffers,
            )

    def receive_msg_loop(self, channel: Literal[Channel.control, Channel.shell], ready: Callable[[], None]) -> None:
        """
        Opens a zmq socket for the channel, receives messages and calls the message handler.
        """
        if not utils.LAUNCHED_BY_DEBUGPY:
            utils.mark_thread_pydev_do_not_trace()

        session, log, message_handler = self.session, self.log, self.kernel.message_handler
        lock = BinarySemaphore()

        async def send_reply(job: Job, content: dict, /) -> None:
            if "status" not in content:
                content["status"] = "ok"
            async with lock:
                msg = session.send(
                    stream=socket,
                    msg_or_type=job["msg"]["header"]["msg_type"].replace("request", "reply"),
                    content=content,
                    parent=job["msg"],  # pyright: ignore[reportArgumentType]
                    ident=job["ident"],
                    buffers=content.pop("buffers", None),
                )
                if msg:
                    log.debug("***send_reply %s*** %s", channel, msg)

        with self._bind_socket(channel) as socket:
            ready()
            self.started.wait_sync()

            while True:
                try:
                    ident, msg = session.recv(socket, mode=zmq.BLOCKY, copy=False)
                    msg["channel"] = channel  # pyright: ignore[reportOptionalSubscript]
                    job = Job(received_time=time.monotonic(), msg=msg, ident=ident)  # pyright: ignore[reportArgumentType]
                    message_handler(job, send_reply, self.iopub_send)
                except zmq.ContextTerminated:
                    break
                except Exception as e:
                    log.debug("Bad message on %s: %s", channel, e)
                    continue
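
The frame check in `_pub_capture` relies on how XPUB sockets report subscriptions: the first byte of the frame is `1` for subscribe and `0` for unsubscribe, followed by the topic. A minimal sketch of that decoding, using a hypothetical `parse_subscription` helper that needs no ZMQ socket:

```python
from typing import Optional


def parse_subscription(frame: bytes) -> Optional[str]:
    """Return the subscribed topic for a subscribe frame, otherwise None."""
    if frame and frame[0] == 1:
        return frame[1:].decode()
    return None


print(parse_subscription(b"\x01stream"))  # stream
print(parse_subscription(b"\x00stream"))  # None
```

A subscription to all topics arrives as the single byte `b"\x01"`, which decodes to an empty topic string.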

aliases class-attribute instance-attribute

aliases = BaseInterface.aliases | {
    ("f", "connection_file"): "ZMQInterface.connection_file",
    "host": "ZMQInterface.host",
    "host_options": "ZMQInterface.host_options",
    "backend_options": "ZMQInterface.backend_options",
    "backend": "ZMQInterface.backend",
    "ip": "ZMQInterface.ip",
    "hb": "ZMQInterface.hb_port",
    "shell": "ZMQInterface.shell_port",
    "iopub": "ZMQInterface.iopub_port",
    "stdin": "ZMQInterface.stdin_port",
    "control": "ZMQInterface.control_port",
    "transport": "ZMQInterface.transport",
}

session class-attribute instance-attribute

session = traitlets.Instance(AsyncSession, ())

transport class-attribute instance-attribute

transport: CaselessStrEnum[str] = traitlets.CaselessStrEnum(
    ["tcp", "ipc"] if sys.platform == "linux" else ["tcp"], default_value="tcp"
).tag(config=True)

Transport for sockets.
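
The transport determines the address each socket is bound to. The sketch below restates the address expression from `_bind_socket` as a standalone function. `socket_address` is a hypothetical name, and the IP and port values are examples only.

```python
def socket_address(transport: str, ip: str, port: int) -> str:
    """Build the bind address in the same way as `_bind_socket`."""
    return f"tcp://{ip}:{port}" if transport == "tcp" else f"ipc://{ip}-{port}"


print(socket_address("tcp", "127.0.0.1", 5555))  # tcp://127.0.0.1:5555
print(socket_address("ipc", "/tmp/kernel", 1))  # ipc:///tmp/kernel-1
```

For the `ipc` transport, which the trait definition only offers on Linux, the `ip` value acts as a filesystem path prefix rather than a network address.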

iopub_send

iopub_send(
    msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
    *,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None

Send a message on the zmq iopub socket.

Source code in src/async_kernel/interface/zmq.py
@override
def iopub_send(
    self,
    msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
    *,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None:
    """
    Send a message on the zmq iopub socket.
    """
    if socket := Caller.iopub_sockets.get(t_ident := Caller.id_current()):
        msg = self.session.send(
            stream=socket,
            msg_or_type=msg_or_type,  # pyright: ignore[reportArgumentType]
            content=content,
            metadata=metadata,
            parent=parent if parent is not NoValue else utils.get_parent_message(),  # pyright: ignore[reportArgumentType]
            ident=ident,
            buffers=buffers,  # pyright: ignore[reportArgumentType]
        )
        if msg:
            self.log.debug("iopub_send: msg_type:'%s', content: %s", msg["header"]["msg_type"], msg["content"])
    elif (caller := self.callers.get(Channel.control)) and caller.id != t_ident:
        caller.call_direct(
            self.iopub_send,
            msg_or_type=msg_or_type,
            content=content,
            metadata=metadata,
            parent=parent if parent is not NoValue else None,
            ident=ident,
            buffers=buffers,
        )

receive_msg_loop

receive_msg_loop(channel: Literal[control, shell], ready: Callable[[], None]) -> None

Opens a zmq socket for the channel, receives messages and calls the message handler.

Source code in src/async_kernel/interface/zmq.py
def receive_msg_loop(self, channel: Literal[Channel.control, Channel.shell], ready: Callable[[], None]) -> None:
    """
    Opens a zmq socket for the channel, receives messages and calls the message handler.
    """
    if not utils.LAUNCHED_BY_DEBUGPY:
        utils.mark_thread_pydev_do_not_trace()

    session, log, message_handler = self.session, self.log, self.kernel.message_handler
    lock = BinarySemaphore()

    async def send_reply(job: Job, content: dict, /) -> None:
        if "status" not in content:
            content["status"] = "ok"
        async with lock:
            msg = session.send(
                stream=socket,
                msg_or_type=job["msg"]["header"]["msg_type"].replace("request", "reply"),
                content=content,
                parent=job["msg"],  # pyright: ignore[reportArgumentType]
                ident=job["ident"],
                buffers=content.pop("buffers", None),
            )
            if msg:
                log.debug("***send_reply %s*** %s", channel, msg)

    with self._bind_socket(channel) as socket:
        ready()
        self.started.wait_sync()

        while True:
            try:
                ident, msg = session.recv(socket, mode=zmq.BLOCKY, copy=False)
                msg["channel"] = channel  # pyright: ignore[reportOptionalSubscript]
                job = Job(received_time=time.monotonic(), msg=msg, ident=ident)  # pyright: ignore[reportArgumentType]
                message_handler(job, send_reply, self.iopub_send)
            except zmq.ContextTerminated:
                break
            except Exception as e:
                log.debug("Bad message on %s: %s", channel, e)
                continue
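The reply rules inside `send_reply` (default `status`, request-to-reply type mapping, and `buffers` lifted out of the content) can be isolated in a small pure-Python sketch. `build_reply` is a hypothetical helper written for illustration. It returns a plain dict rather than calling `session.send`, and it copies `content` where the code above mutates it in place.

```python
from typing import Any


def build_reply(request: dict[str, Any], content: dict[str, Any]) -> dict[str, Any]:
    """Assemble the fields a reply would carry for the given request message."""
    content = dict(content)  # work on a copy; the original mutates the dict it is given
    content.setdefault("status", "ok")  # replies default to status "ok"
    buffers = content.pop("buffers", None)  # buffers travel outside the content
    return {
        "msg_type": request["header"]["msg_type"].replace("request", "reply"),
        "parent": request,
        "content": content,
        "buffers": buffers,
    }


request = {"header": {"msg_type": "execute_request"}}
reply = build_reply(request, {"execution_count": 1, "buffers": [b"raw"]})
```

Here `reply["msg_type"]` is `"execute_reply"`, and the `buffers` key no longer appears in `reply["content"]`.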

An IPython application with a zmq interface.

Classes:

  • IPApp

    An IPython application with a zmq interface.

IPApp

Bases: ZMQInterface[T_ipshell_co], BaseIPythonApplication, InteractiveShellApp, Generic[T_ipshell_co]

An IPython application with a zmq interface.

Attributes:

  • description

    A description to use for the command line interface.

  • aliases

  • flags

Source code in src/async_kernel/interface/ip_app.py
class IPApp(  # pyright: ignore[reportUnsafeMultipleInheritance, reportIncompatibleVariableOverride]
    ZMQInterface[T_ipshell_co], BaseIPythonApplication, InteractiveShellApp, Generic[T_ipshell_co]
):
    """
    An IPython application with a zmq interface.
    """

    description = traitlets.Unicode(
        "async-kernel: A Jupyter kernel providing an asynchronous IPython shell.",
    ).tag(config=True)
    "A description to use for the command line interface."

    aliases = (
        ZMQInterface.aliases
        | {
            "profile-dir": "ProfileDir.location",
            "profile": "BaseIPythonApplication.profile",
            "ipython-dir": "BaseIPythonApplication.ipython_dir",
            "config": "BaseIPythonApplication.extra_config_file",
        }
        | shell_aliases
    )
    ""

    flags = (
        ZMQInterface.flags
        | {
            "automagic": (
                {"InteractiveShell": {"automagic": True}},
                "Turn on the auto calling of magic commands. Type %%magic at the IPython  prompt  for  more information.",
            ),
            "no-automagic": (
                {"InteractiveShell": {"automagic": False}},
                "Turn off the auto calling of magic commands.",
            ),
        }
        | shell_flags
    )
    ""

    @property
    @override
    def user_ns(self) -> dict[str, Any]:
        return self.shell.user_ns

    @override
    def initialize(self, argv: None | list | NoValue = ...) -> None:  # pyright: ignore[reportInvalidTypeForm]
        super().initialize(argv)
        if self.host is None:
            for k in ["pylab", "gui", "matplotlib"]:
                if host := Hosts.from_gui(getattr(self, k, None)):
                    self.host = host
                    break

    @override
    @asynccontextmanager
    async def __asynccontextmanager__(self, *, set_started=True) -> AsyncGenerator[Self]:
        async with super().__asynccontextmanager__(set_started=False):
            self.shell = self.kernel.main_shell
            self.init_path()
            self.init_gui_pylab()
            self.init_code()
            self.init_extensions()
            if set_started:
                self._started()
            yield self
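The `set_started=False` hand-off in `__asynccontextmanager__` lets the subclass finish its own setup before the started signal is raised. A minimal sketch of that pattern, assuming only `contextlib.asynccontextmanager`, is shown here. `Base`, `Child`, `context` and the `events` list are hypothetical stand-ins, not the library's classes.

```python
import asyncio
from contextlib import asynccontextmanager


class Base:
    def __init__(self) -> None:
        self.events: list[str] = []

    def _started(self) -> None:
        self.events.append("started")

    @asynccontextmanager
    async def context(self, *, set_started: bool = True):
        self.events.append("base setup")
        if set_started:
            self._started()
        try:
            yield self
        finally:
            self.events.append("base teardown")


class Child(Base):
    @asynccontextmanager
    async def context(self, *, set_started: bool = True):
        # Ask the parent not to signal "started" yet; more setup follows.
        async with super().context(set_started=False):
            self.events.append("child setup")
            if set_started:
                self._started()
            yield self


async def main() -> list[str]:
    child = Child()
    async with child.context():
        pass
    return child.events


events = asyncio.run(main())
```

In this sketch the "started" event is recorded only after "child setup", which is the ordering the override is meant to guarantee.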

description class-attribute instance-attribute

description = traitlets.Unicode(
    "async-kernel: A Jupyter kernel providing an asynchronous IPython shell."
).tag(config=True)

A description to use for the command line interface.

aliases class-attribute instance-attribute

aliases = (
    ZMQInterface.aliases
    | {
        "profile-dir": "ProfileDir.location",
        "profile": "BaseIPythonApplication.profile",
        "ipython-dir": "BaseIPythonApplication.ipython_dir",
        "config": "BaseIPythonApplication.extra_config_file",
    }
    | shell_aliases
)
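The `|` operator merges the alias mappings left to right, so a key defined in a later mapping replaces the same key from an earlier one. A small sketch of that precedence follows. The contents of `base_aliases` and `shell_like_aliases` are illustrative assumptions, not the actual values of `ZMQInterface.aliases` or `shell_aliases`.

```python
# Hypothetical stand-in for ZMQInterface.aliases.
base_aliases = {
    "config": "Base.config_file",
    "log-level": "Application.log_level",
}

# Two entries taken from the IPApp definition above.
app_aliases = {
    "profile": "BaseIPythonApplication.profile",
    "config": "BaseIPythonApplication.extra_config_file",
}

# Hypothetical stand-in for shell_aliases.
shell_like_aliases = {"colors": "InteractiveShell.colors"}

# Later operands win on key clashes; first-seen key order is preserved.
merged = base_aliases | app_aliases | shell_like_aliases
```

With these inputs, `merged["config"]` resolves to the `BaseIPythonApplication.extra_config_file` target because `app_aliases` appears to the right of `base_aliases`.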

flags class-attribute instance-attribute

flags = (
    ZMQInterface.flags
    | {
        "automagic": (
            {"InteractiveShell": {"automagic": True}},
            "Turn on the auto calling of magic commands. Type %%magic at the IPython  prompt  for  more information.",
        ),
        "no-automagic": (
            {"InteractiveShell": {"automagic": False}},
            "Turn off the auto calling of magic commands.",
        ),
    }
    | shell_flags
)
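Each flag maps a command line switch to a pair of configuration overrides and help text. The sketch below shows how such a pair could be folded into a nested configuration dict. `apply_flag` is a hypothetical helper for illustration only; in practice traitlets applies flags through its own command line loader.

```python
from typing import Any

# The two IPApp-specific flags from the definition above.
flags: dict[str, tuple[dict[str, dict[str, Any]], str]] = {
    "automagic": (
        {"InteractiveShell": {"automagic": True}},
        "Turn on the auto calling of magic commands.",
    ),
    "no-automagic": (
        {"InteractiveShell": {"automagic": False}},
        "Turn off the auto calling of magic commands.",
    ),
}


def apply_flag(config: dict[str, dict[str, Any]], name: str) -> dict[str, dict[str, Any]]:
    """Merge the overrides of the named flag into config and return it."""
    overrides, _help_text = flags[name]
    for class_name, traits in overrides.items():
        config.setdefault(class_name, {}).update(traits)
    return config


config = apply_flag({"InteractiveShell": {"colors": "neutral"}}, "no-automagic")
```

Existing settings for the same class are kept, and only the traits named by the flag are overwritten.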