kernel

Classes:

KernelInterruptError

Bases: Exception

Raised to interrupt the kernel.

Source code in src/async_kernel/asyncshell.py
class KernelInterruptError(Exception):
    "Raised to interrupt the kernel."

Kernel

Bases: HasTraits, AsyncContextManagerMixin

A Jupyter kernel that supports concurrent execution. It provides an IPython InteractiveShell with support for kernel subshells.

Info

Only one instance of a kernel exists at a time per subprocess. The instance can be obtained with Kernel() or get_kernel.
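
The one-instance behaviour described above can be illustrated with a small sketch. `SingletonKernel` is a hypothetical stand-in written for this example, not the library class; it only mirrors the `__new__` / `__init__` pattern visible in the source listing further down.

```python
from __future__ import annotations


class SingletonKernel:
    """Hypothetical stand-in that mimics the one-instance-at-a-time pattern."""

    _instance: SingletonKernel | None = None
    _initialised = False

    def __new__(cls, settings: dict | None = None) -> SingletonKernel:
        # Reuse the existing instance if there is one (subclasses included).
        if SingletonKernel._instance is None:
            SingletonKernel._instance = super().__new__(cls)
        return SingletonKernel._instance

    def __init__(self, settings: dict | None = None) -> None:
        # __init__ runs on every call, so one-time setup is guarded by a flag.
        if not self._initialised:
            self._initialised = True
            self.settings: dict = {}
        if settings:
            self.settings.update(settings)


first = SingletonKernel({"kernel_name": "async"})
second = SingletonKernel()
print(first is second)  # True
print(second.settings)  # {'kernel_name': 'async'}
```

Calling the class a second time returns the same object, which is why settings passed on the first call are still visible through `second`.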

Starting the kernel

The kernel should appear in the list of available kernels alongside any other installed kernels. Variants of the kernel with custom configuration can be added at the command line.

From the shell:

async-kernel -f .

Blocking:

import async_kernel.interface

async_kernel.interface.start_kernel_zmq_interface()

Inside a coroutine:

async with Kernel():
    await anyio.sleep_forever()

Warning

Starting the kernel outside the main thread has the following implications:

  • Execute requests won't be run in the main thread.
  • Interrupts via signals won't work, so thread-blocking calls in the shell cannot be interrupted.
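
The signal limitation comes from CPython itself: signal handlers can only be installed from the main thread. The sketch below demonstrates that restriction using only the standard library; `try_install_handler` and `worker` are names made up for this example and are not part of async-kernel.

```python
import signal
import threading


def try_install_handler() -> str:
    """Attempt to install a SIGINT handler from the calling thread."""
    try:
        previous = signal.signal(signal.SIGINT, signal.default_int_handler)
    except ValueError:
        # CPython refuses to install signal handlers outside the main thread.
        return "refused"
    if previous is not None:
        signal.signal(signal.SIGINT, previous)  # Restore the earlier handler.
    return "installed"


results: dict[str, object] = {}


def worker() -> None:
    results["is_main"] = threading.current_thread() is threading.main_thread()
    results["handler"] = try_install_handler()


thread = threading.Thread(target=worker)
thread.start()
thread.join()
print(results)  # {'is_main': False, 'handler': 'refused'}
```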

Origins

  • IPyKernel Kernel
  • IPyKernel IPKernelApp
  • IPyKernel IPythonKernel

Source code in src/async_kernel/kernel.py
class Kernel(HasTraits, anyio.AsyncContextManagerMixin):
    """
    A Jupyter kernel that supports concurrent execution. It provides an
    [IPython InteractiveShell][async_kernel.asyncshell.AsyncInteractiveShell] with support for kernel subshells.

    Info:
        Only one instance of a kernel is created at a time per subprocess. The instance can be obtained
        with `Kernel()` or [get_kernel].

    Starting the kernel:
        The kernel should appear in the list of available kernels alongside any other installed kernels.
        Variants of the kernel with custom configuration can be added at the [command line][command.command_line].

        === "From the shell"

            ``` shell
            async-kernel -f .
            ```

        === "Blocking"

            ```python
            import async_kernel.interface

            async_kernel.interface.start_kernel_zmq_interface()
            ```

        === "Inside a coroutine"

            ```python
            async with Kernel():
                await anyio.sleep_forever()
            ```

    Warning:
        Starting the kernel outside the main thread has the following implications:
            - Execute requests won't be run in the main thread.
            - Interrupts via signals won't work, so thread-blocking calls in the shell cannot be interrupted.

    Origins:
        - [IPyKernel Kernel][ipykernel.kernelbase.Kernel]
        - [IPyKernel IPKernelApp][ipykernel.kernelapp.IPKernelApp]
        - [IPyKernel IPythonKernel][ipykernel.ipkernel.IPythonKernel]
    """

    _instance: Self | None = None
    _initialised = False

    _settings = Fixed(dict)

    interface = traitlets.Instance(BaseKernelInterface)
    "The abstraction to communicate with the kernel."

    callers: Fixed[Self, dict[Literal[Channel.shell, Channel.control], Caller]] = Fixed(dict)
    "The callers associated with the kernel once it has started."

    subshell_manager = Fixed(SubshellManager)
    "Dedicated to management of subshells."

    # Public traits
    help_links = Tuple()
    "Links to reference documentation, reported in the `kernel_info` reply."

    quiet = traitlets.Bool(True)
    "Only send stdout/stderr to output stream."

    print_kernel_messages = traitlets.Bool(True)
    "When enabled the kernel will print startup, shutdown and terminal errors."

    connection_file: traitlets.TraitType[Path, Path | str] = traitlets.TraitType()
    """
    JSON file in which to store connection info.

    Defaults to `"kernel-<uuid>.json"` in the Jupyter runtime directory.

    This file will contain the IP, ports, and authentication key needed to connect
    clients to this kernel. A different location can be specified by absolute path.
    """

    kernel_name = CUnicode()
    "The kernel's name. If it contains 'trio', a trio backend will be used instead of an asyncio backend."

    log = Instance(logging.LoggerAdapter)
    "The logging adapter."

    # Public fixed
    main_shell = Fixed(lambda _: AsyncInteractiveShell.instance())
    "The interactive shell."

    debugger = Fixed(Debugger)
    "Handles [debug requests](https://jupyter-client.readthedocs.io/en/stable/messaging.html#debug-request)."

    comm_manager = Fixed(CommManager)
    "Creates [async_kernel.comm.Comm][] instances and maintains a mapping of `comm_id` to `Comm` instances."

    event_started = Fixed(Event)
    "An event that occurs when the kernel is started."

    event_stopped = Fixed(Event)
    "An event that occurs when the kernel is stopped."

    def __new__(cls, settings: dict | None = None, /) -> Self:  # noqa: ARG004
        #  There is only one instance (including subclasses).
        if not (instance := Kernel._instance):
            Kernel._instance = instance = super().__new__(cls)
        return instance  # pyright: ignore[reportReturnType]

    def __init__(self, settings: dict | None = None, /) -> None:
        if not self._initialised:
            self._initialised = True
            super().__init__()
            if not os.environ.get("MPLBACKEND"):
                os.environ["MPLBACKEND"] = "module://matplotlib_inline.backend_inline"
        if settings:
            self.load_settings(settings)

    @override
    def __repr__(self) -> str:
        info = [f"{k}={v}" for k, v in self.settings.items()]
        return f"{self.__class__.__name__}<{', '.join(info)}>"

    @traitlets.default("log")
    def _default_log(self) -> LoggerAdapter[Logger]:
        return logging.LoggerAdapter(logging.getLogger(self.__class__.__name__))

    @traitlets.default("kernel_name")
    def _default_kernel_name(self):
        return "async-trio" if current_async_library(failsafe=True) == "trio" else "async"

    @traitlets.default("interface")
    def default_interface(self):
        from async_kernel.interface.zmq import ZMQKernelInterface  # noqa: PLC0415

        return ZMQKernelInterface()

    @traitlets.default("connection_file")
    def _default_connection_file(self) -> Path:
        return Path(jupyter_runtime_dir()).joinpath(f"kernel-{uuid.uuid4()}.json")

    @traitlets.default("help_links")
    def _default_help_links(self) -> tuple[dict[str, str], ...]:
        return (
            {
                "text": "Async Kernel Reference ",
                "url": "https://fleming79.github.io/async-kernel/",
            },
            {
                "text": "IPython Reference",
                "url": "https://ipython.readthedocs.io/en/stable/",
            },
            {
                "text": "IPython magic Reference",
                "url": "https://ipython.readthedocs.io/en/stable/interactive/magics.html",
            },
            {
                "text": "Matplotlib ipympl Reference",
                "url": "https://matplotlib.org/ipympl/",
            },
            {
                "text": "Matplotlib Reference",
                "url": "https://matplotlib.org/contents.html",
            },
        )

    @traitlets.observe("connection_file")
    def _observe_connection_file(self, change) -> None:
        if not self.interface.callers and (path := self.connection_file).exists():
            self.log.debug("Loading connection file %s", path)
            with path.open("r") as f:
                self.load_connection_info(json.load(f))

    @traitlets.validate("connection_file")
    def _validate_connection_file(self, proposal) -> Path:
        return pathlib.Path(proposal.value)

    @property
    def settings(self) -> dict[str, Any]:
        "Settings that have been set to customise the behaviour of the kernel."
        return {k: getattr(self, k) for k in ("kernel_name", "connection_file")} | self._settings

    @property
    def shell(self) -> AsyncInteractiveShell | AsyncInteractiveSubshell:
        """
        The shell given the current context.

        Notes:
            - The `subshell_id` of the main shell is `None`.
        """
        return self.subshell_manager.get_shell()

    @property
    def caller(self) -> Caller:
        "The caller for the shell channel."
        return self.callers[Channel.shell]

    @property
    def kernel_info(self) -> dict[str, str | dict[str, str | dict[str, str | int]] | Any | tuple[Any, ...] | bool]:
        "A dict of details sent in reply to a 'kernel_info_request'."
        supported_features = ["kernel subshells"]
        if not utils.LAUNCHED_BY_DEBUGPY and sys.platform != "emscripten":
            supported_features.append("debugger")

        return {
            "protocol_version": async_kernel.kernel_protocol_version,
            "implementation": "async_kernel",
            "implementation_version": async_kernel.__version__,
            "language_info": async_kernel.kernel_protocol_version_info,
            "banner": self.shell.banner,
            "help_links": self.help_links,
            "debugger": (not utils.LAUNCHED_BY_DEBUGPY) & (sys.platform != "emscripten"),
            "kernel_name": self.kernel_name,
            "supported_features": supported_features,
        }

    def load_settings(self, settings: dict[str, Any]) -> None:
        """
        Load settings into the kernel.

        Permitted until the kernel async context has been entered.

        Args:
            settings:
                key: dotted.path.of.attribute.
                value: The value to set.
        """
        if self.event_started:
            msg = "It is too late to load settings!"
            raise RuntimeError(msg)
        settings_ = self._settings or {"kernel_name": self.kernel_name}
        for k, v in settings.items():
            settings_ |= utils.setattr_nested(self, k, v)
        self._settings.update(settings_)

    def load_connection_info(self, info: dict[str, Any]) -> None:
        """
        Load connection info from a dict containing connection info.

        Typically this data comes from a connection file
        and is called by load_connection_file.

        Args:
            info: Dictionary containing connection_info. See the connection_file spec for details.
        """
        self.interface.load_connection_info(info)

    @staticmethod
    def stop() -> None:
        """
        A [staticmethod][] to stop the running kernel.

        Once an instance of a kernel is stopped the instance cannot be restarted.
        Instead a new instance should be started.
        """
        if (instance := Kernel._instance) and (stop := getattr(instance, "_stop", None)):
            stop()

    @asynccontextmanager
    async def __asynccontextmanager__(self) -> AsyncGenerator[Self]:
        """Start the kernel in an already running anyio event loop."""
        assert self.main_shell
        try:
            async with self.interface:
                self.callers.update(self.interface.callers)
                with anyio.CancelScope() as scope:
                    self._stop = lambda: self.caller.call_direct(scope.cancel, "Stopping kernel")
                    sys.excepthook = self.excepthook
                    sys.unraisablehook = self.unraisablehook

                    self.comm_manager.patch_comm()
                    try:
                        self.comm_manager.kernel = self
                        self.event_started.set()
                        self.log.info("Kernel started: %s", self)
                        yield self
                    except BaseException:
                        if not scope.cancel_called:
                            raise
                    finally:
                        self.comm_manager.kernel = None
                        self.event_stopped.set()
        finally:
            self.shell.reset(new_session=False)
            self.subshell_manager.stop_all_subshells(force=True)
            self.callers.clear()
            Kernel._instance = None
            AsyncInteractiveShell.clear_instance()
            with anyio.CancelScope(shield=True):
                await anyio.sleep(0.1)
            self.log.info("Kernel stopped: %s", self)
            gc.collect()

    def iopub_send(
        self,
        msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
        *,
        content: Content | None = None,
        metadata: dict[str, Any] | None = None,
        parent: Message[dict[str, Any]] | dict[str, Any] | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
        ident: bytes | list[bytes] | None = None,
        buffers: list[bytes] | None = None,
    ) -> None:
        """Send a message on the iopub socket."""
        self.interface.iopub_send(
            msg_or_type,
            content=content,
            metadata=metadata,
            parent=parent,
            ident=ident,
            buffers=buffers,
        )

    def topic(self, topic) -> bytes:
        """Return the prefixed topic for IOPub messages."""
        return (f"kernel.{topic}").encode()

    def msg_handler(
        self,
        channel: Literal[Channel.shell, Channel.control],
        msg_type: MsgType,
        job: Job,
        send_reply: Callable[[Job, dict], CoroutineType[Any, Any, None]],
        /,
    ):
        """Schedule a message to be executed."""
        # Note: There are never any active pending trackers in this context.
        try:
            subshell_id = job["msg"]["content"]["subshell_id"]
        except KeyError:
            try:
                subshell_id = job["msg"]["header"]["subshell_id"]  # pyright: ignore[reportTypedDictNotRequiredAccess]
            except KeyError:
                subshell_id = None
        handler = cache_wrap_handler(subshell_id, send_reply, self.run_handler, self.get_handler(msg_type))
        run_mode = self.get_run_mode(msg_type, channel=channel, job=job)
        match run_mode:
            case RunMode.direct:
                self.callers[channel].call_direct(handler, job)
            case RunMode.queue:
                self.callers[channel].queue_call(handler, job).trackers = ()  # A slight optimisation
            case RunMode.task:
                self.callers[channel].call_soon(handler, job)
            case RunMode.thread:
                self.callers[channel].to_thread(handler, job)
        self.log.debug("%s %s %s %s", msg_type, handler, run_mode, job)

    def get_handler(self, msg_type: MsgType) -> HandlerType:
        if not callable(f := getattr(self, msg_type, None)):
            msg = f"A handler was not found for {msg_type=}"
            raise TypeError(msg)
        return f  # pyright: ignore[reportReturnType]

    async def run_handler(
        self,
        subshell_id: str | None,
        send_reply: Callable[[Job, dict], CoroutineType[Any, Any, None]],
        handler: HandlerType,
        job: Job[dict],
    ) -> None:
        """
        Asynchronously run a message handler for a given job, managing reply sending and execution state.

        Args:
            subshell_id: The id of the subshell to set the context of the handler.
            send_reply: A coroutine function responsible for sending the reply.
            handler: A coroutine function to handle the job / message.

                - It is a method on the kernel whose name corresponds to the [message type that it handles][async_kernel.typing.MsgType].
                - The handler should return a dict to use as 'content' in a reply.
                - If status is not included in the dict it gets added automatically as `{'status': 'ok'}`.
                - If a reply is not expected the handler should return `None`.

            job: The job dictionary containing message, socket, and identification information.

        Workflow:
            - Sets the current job and subshell_id context variables.
            - Sends a "busy" status message on the IOPub channel.
            - Awaits the handler; if the handler returns a content dict, a reply is sent using send_reply.
            - On exception, sends an error reply and logs the exception.
            - Resets the job and subshell_id context variables.
            - Sends an "idle" status message on the IOPub channel.

        Notes:
            - Replies are sent even if exceptions occur in the handler.
            - The reply message type is derived from the original request type.
        """
        job_token = utils._job_var.set(job)  # pyright: ignore[reportPrivateUsage]
        subshell_token = SubshellPendingManager._contextvar.set(subshell_id)  # pyright: ignore[reportPrivateUsage]

        try:
            self.iopub_send(
                msg_or_type="status",
                parent=job["msg"],
                content={"execution_state": "busy"},
                ident=self.topic(topic="status"),
            )
            if (content := await handler(job)) is not None:
                await send_reply(job, content)
        except Exception as e:
            await send_reply(job, utils.error_to_content(e))
            self.log.exception("Exception in message handler:", exc_info=e)
        finally:
            utils._job_var.reset(job_token)  # pyright: ignore[reportPrivateUsage]
            SubshellPendingManager._contextvar.reset(subshell_token)  # pyright: ignore[reportPrivateUsage]
            self.iopub_send(
                msg_or_type="status",
                parent=job["msg"],
                content={"execution_state": "idle"},
                ident=self.topic("status"),
            )
            del job

    def get_run_mode(
        self,
        msg_type: MsgType,
        *,
        channel: Literal[Channel.shell, Channel.control] = Channel.shell,
        job: Job | None = None,
    ) -> RunMode:
        """
        Determine the run mode for a given channel, message type and job.

        Args:
            channel: The channel the message was received on.
            msg_type: The type of the message.
            job: The job associated with the message, if any.

        Returns:
            The run mode for the message.
        """
        # receive_msg_loop - DEBUG WARNING

        # TODO: Are any of these options worth including?
        # if mode_from_metadata := job["msg"]["metadata"].get("run_mode"):
        #     return RunMode( mode_from_metadata)
        # if mode_from_header := job["msg"]["header"].get("run_mode"):
        #     return RunMode( mode_from_header)
        match (channel, msg_type):
            case _, MsgType.comm_msg:
                return RunMode.queue
            case Channel.control, MsgType.execute_request:
                return RunMode.queue
            case _, MsgType.execute_request:
                if job:
                    if content := job["msg"].get("content", {}):
                        if code := content.get("code"):
                            try:
                                if (code := code.strip().split("\n", maxsplit=1)[0]).startswith(("# ", "##")):
                                    return RunMode(code[2:])
                                if code.startswith("RunMode."):
                                    return RunMode(code.removeprefix("RunMode."))
                            except ValueError:
                                pass
                        if content.get("silent"):
                            return RunMode.task
                    if mode_ := set(utils.get_tags(job)).intersection(RunMode):
                        return RunMode(next(iter(mode_)))
                return RunMode.queue
            case (
                Channel.shell,
                MsgType.shutdown_request
                | MsgType.debug_request
                | MsgType.create_subshell_request
                | MsgType.delete_subshell_request
                | MsgType.list_subshell_request,
            ):
                msg = f"{msg_type=} not allowed on shell!"
                raise ValueError(msg)
            case _, MsgType.debug_request:
                return RunMode.queue
            case (
                _,
                MsgType.complete_request
                | MsgType.inspect_request
                | MsgType.history_request
                | MsgType.create_subshell_request
                | MsgType.delete_subshell_request
                | MsgType.is_complete_request,
            ):
                return RunMode.thread
            case _:
                pass
        return RunMode.direct

    def all_concurrency_run_modes(
        self,
        channels: Iterable[Literal[Channel.shell, Channel.control]] = (Channel.shell, Channel.control),
        msg_types: Iterable[MsgType] = MsgType,
    ) -> dict[
        Literal["SocketID", "MsgType", "RunMode"],
        tuple[Channel, MsgType, RunMode | None],
    ]:
        """
        Generate a dictionary of every combination of channel (under the key `SocketID`) and `MsgType`,
        along with the corresponding `RunMode` (`None` where the combination is not allowed).
        """
        data: list[Any] = []
        for channel in channels:
            for msg_type in msg_types:
                try:
                    mode = self.get_run_mode(msg_type, channel=channel)
                except ValueError:
                    mode = None
                data.append((channel, msg_type, mode))
        data_ = zip(*data, strict=True)
        return dict(zip(["SocketID", "MsgType", "RunMode"], data_, strict=True))

    async def kernel_info_request(self, job: Job[Content], /) -> Content:
        """Handle a [kernel info request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-info)."""
        return self.kernel_info

    async def comm_info_request(self, job: Job[Content], /) -> Content:
        """Handle a [comm info request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#comm-info)."""
        c = job["msg"]["content"]
        target_name = c.get("target_name", None)
        comms = {
            k: {"target_name": v.target_name}
            for (k, v) in tuple(self.comm_manager.comms.items())
            if v.target_name == target_name or target_name is None
        }
        return {"comms": comms}

    async def execute_request(self, job: Job[ExecuteContent], /) -> Content:
        """Handle an [execute request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#execute)."""
        return await self.shell.execute_request(**job["msg"]["content"])  # pyright: ignore[reportArgumentType]

    async def complete_request(self, job: Job[Content], /) -> Content:
        """Handle a [completion request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#completion)."""
        return await self.shell.do_complete_request(
            code=job["msg"]["content"].get("code", ""), cursor_pos=job["msg"]["content"].get("cursor_pos", 0)
        )

    async def is_complete_request(self, job: Job[Content], /) -> Content:
        """Handle an [is_complete request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#code-completeness)."""
        return await self.shell.is_complete_request(job["msg"]["content"].get("code", ""))

    async def inspect_request(self, job: Job[Content], /) -> Content:
        """Handle an [inspect request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#introspection)."""
        c = job["msg"]["content"]
        return await self.shell.inspect_request(
            code=c.get("code", ""),
            cursor_pos=c.get("cursor_pos", 0),
            detail_level=c.get("detail_level", 0),
        )

    async def history_request(self, job: Job[Content], /) -> Content:
        """Handle a [history request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#history)."""
        return await self.shell.history_request(**job["msg"]["content"])

    async def comm_open(self, job: Job[Content], /) -> None:
        """Handle a [comm open request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#opening-a-comm)."""
        self.comm_manager.comm_open(stream=None, ident=None, msg=job["msg"])  # pyright: ignore[reportArgumentType]

    async def comm_msg(self, job: Job[Content], /) -> None:
        """Handle a [comm msg request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#comm-messages)."""
        self.comm_manager.comm_msg(stream=None, ident=None, msg=job["msg"])  # pyright: ignore[reportArgumentType]

    async def comm_close(self, job: Job[Content], /) -> None:
        """Handle a [comm close request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#tearing-down-comms)."""
        self.comm_manager.comm_close(stream=None, ident=None, msg=job["msg"])  # pyright: ignore[reportArgumentType]

    async def interrupt_request(self, job: Job[Content], /) -> Content:
        """Handle an [interrupt request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-interrupt) (control only)."""
        self.interface.interrupt()
        return {}

    async def shutdown_request(self, job: Job[Content], /) -> Content:
        """Handle a [shutdown request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-shutdown) (control only)."""
        self.stop()
        return {"restart": job["msg"]["content"].get("restart", False)}

    async def debug_request(self, job: Job[Content], /) -> Content:
        """Handle a [debug request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#debug-request) (control only)."""
        return await self.debugger.process_request(job["msg"]["content"])

    async def create_subshell_request(self: Kernel, job: Job[Content], /) -> Content:
        """Handle a [create subshell request](https://jupyter.org/enhancement-proposals/91-kernel-subshells/kernel-subshells.html#create-subshell) (control only)."""

        return {"subshell_id": self.subshell_manager.create_subshell(protected=False).subshell_id}

    async def delete_subshell_request(self, job: Job[Content], /) -> Content:
        """Handle a [delete subshell request](https://jupyter.org/enhancement-proposals/91-kernel-subshells/kernel-subshells.html#delete-subshell) (control only)."""
        self.subshell_manager.delete_subshell(job["msg"]["content"]["subshell_id"])
        return {}

    async def list_subshell_request(self, job: Job[Content], /) -> Content:
        """Handle a [list subshell request](https://jupyter.org/enhancement-proposals/91-kernel-subshells/kernel-subshells.html#list-subshells) (control only)."""
        return {"subshell_id": list(self.subshell_manager.list_subshells())}

    def excepthook(self, etype, evalue, tb) -> None:
        """Handle an exception."""
        # write uncaught traceback to 'real' stderr, not zmq-forwarder
        if self.print_kernel_messages:
            traceback.print_exception(etype, evalue, tb, file=sys.__stderr__)

    def unraisablehook(self, unraisable: sys.UnraisableHookArgs, /) -> None:
        "Handle unraisable exceptions (during gc for instance)."
        exc_info = (
            unraisable.exc_type,
            unraisable.exc_value or unraisable.exc_type(unraisable.err_msg),
            unraisable.exc_traceback,
        )
        self.log.exception(unraisable.err_msg, exc_info=exc_info, extra={"object": unraisable.object})

    def get_connection_info(self) -> dict[str, Any]:
        """Return the connection info as a dict."""
        with self.connection_file.open("r") as f:
            return json.load(f)

    def get_parent(self) -> Message[dict[str, Any]] | None:
        """
        A convenience method to access the 'message' in the current context if there is one.

        'parent' is the parameter name used by [Session.send][jupyter_client.session.Session.send] to provide context when sending a reply.

        See also:
            - [Kernel.iopub_send][Kernel.iopub_send]
            - [ipywidgets.Output][ipywidgets.widgets.widget_output.Output]:
                Uses `get_ipython().kernel.get_parent()` to obtain the `msg_id` which
                is used to 'capture' output when its context has been acquired.
        """
        return utils.get_parent()
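
The first-line handling in `get_run_mode` above can be tried out in isolation. This is a simplified sketch, not the library code: the `RunMode` member names are taken from the listing, but their string values are assumed to match the names, `run_mode_from_code` is a hypothetical helper, and the tag lookup via `utils.get_tags` is left out.

```python
from enum import Enum


class RunMode(str, Enum):
    # Member names come from the listing above; the string values are assumed.
    direct = "direct"
    queue = "queue"
    task = "task"
    thread = "thread"


def run_mode_from_code(code: str, *, silent: bool = False) -> RunMode:
    """Pick a run mode from the first line of the code, falling back to queue."""
    first_line = code.strip().split("\n", maxsplit=1)[0]
    try:
        if first_line.startswith(("# ", "##")):
            return RunMode(first_line[2:])
        if first_line.startswith("RunMode."):
            return RunMode(first_line.removeprefix("RunMode."))
    except ValueError:
        pass  # Not a recognised mode: treat the line as ordinary code.
    return RunMode.task if silent else RunMode.queue


print(run_mode_from_code("# thread\nimport time"))
print(run_mode_from_code("# just a comment"))
```

Under these assumptions, a cell whose first line is `# thread` or `RunMode.thread` is dispatched in a thread, while an ordinary comment falls through to the default queue mode.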

interface class-attribute instance-attribute

interface = Instance(BaseKernelInterface)

The abstraction to communicate with the kernel.

callers class-attribute instance-attribute

callers: Fixed[Self, dict[Literal[shell, control], Caller]] = Fixed(dict)

The callers associated with the kernel once it has started.

subshell_manager class-attribute instance-attribute

subshell_manager = Fixed(SubshellManager)

Dedicated to management of subshells.

help_links class-attribute instance-attribute

help_links = Tuple()

quiet class-attribute instance-attribute

quiet = Bool(True)

Only send stdout/stderr to output stream.

print_kernel_messages class-attribute instance-attribute

print_kernel_messages = Bool(True)

When enabled the kernel will print startup, shutdown and terminal errors.

connection_file class-attribute instance-attribute

connection_file: TraitType[Path, Path | str] = TraitType()

JSON file in which to store connection info.

Defaults to "kernel-<uuid>.json" in the Jupyter runtime directory.

This file will contain the IP, ports, and authentication key needed to connect clients to this kernel. A different location can be specified by absolute path.
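
For orientation, the sketch below writes and reads back a file of this shape. The field names follow the Jupyter connection file format; every value (ports, key, file name) is invented for the example, and the read step mirrors what `get_connection_info` does in the source listing.

```python
import json
import tempfile
from pathlib import Path

# Field names follow the Jupyter connection file format; the values are made up.
connection_info = {
    "transport": "tcp",
    "ip": "127.0.0.1",
    "shell_port": 50001,
    "iopub_port": 50002,
    "stdin_port": 50003,
    "control_port": 50004,
    "hb_port": 50005,
    "key": "not-a-real-key",
    "signature_scheme": "hmac-sha256",
}

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "kernel-example.json"
    path.write_text(json.dumps(connection_info, indent=2))
    # Reading the file back is a plain json.load on the opened path.
    with path.open("r") as f:
        loaded = json.load(f)

print(loaded["ip"], loaded["shell_port"])  # 127.0.0.1 50001
```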

kernel_name class-attribute instance-attribute

kernel_name = CUnicode()

The kernel's name. If it contains 'trio', a trio backend will be used instead of an asyncio backend.
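
As a rough illustration of that naming rule, the helper below maps a kernel name to a backend name. `backend_for` is hypothetical and exists only for this example; how the library actually performs the check (for instance, whether it is case-sensitive) is not shown on this page. The two names used are the defaults from `_default_kernel_name` in the source listing.

```python
def backend_for(kernel_name: str) -> str:
    """Return the backend implied by a kernel name (hypothetical helper)."""
    # Assumption: a plain substring test, as the description suggests.
    return "trio" if "trio" in kernel_name else "asyncio"


for name in ("async", "async-trio"):
    print(name, "->", backend_for(name))
```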

log class-attribute instance-attribute

log = Instance(LoggerAdapter)

The logging adapter.

main_shell class-attribute instance-attribute

main_shell = Fixed(lambda _: instance())

The interactive shell.

debugger class-attribute instance-attribute

debugger = Fixed(Debugger)

Handles debug requests.

comm_manager class-attribute instance-attribute

comm_manager = Fixed(CommManager)

Creates async_kernel.comm.Comm instances and maintains a mapping of comm_id to Comm instances.

event_started class-attribute instance-attribute

event_started = Fixed(Event)

An event that occurs when the kernel is started.

event_stopped class-attribute instance-attribute

event_stopped = Fixed(Event)

An event that occurs when the kernel is stopped.

settings property

settings: dict[str, Any]

Settings that have been set to customise the behaviour of the kernel.

shell property

The shell given the current context.

Notes
  • The subshell_id of the main shell is None.

caller property

caller: Caller

The caller for the shell channel.

kernel_info property

kernel_info: dict[
    str, str | dict[str, str | dict[str, str | int]] | Any | tuple[Any, ...] | bool
]

A dict of details sent in reply to a 'kernel_info_request'.

load_settings

load_settings(settings: dict[str, Any]) -> None

Load settings into the kernel.

Permitted until the kernel async context has been entered.

Parameters:

  • settings

    (dict[str, Any]) –

    key: dotted.path.of.attribute. value: The value to set.

Source code in src/async_kernel/kernel.py
def load_settings(self, settings: dict[str, Any]) -> None:
    """
    Load settings into the kernel.

    Permitted until the kernel async context has been entered.

    Args:
        settings:
            key: dotted.path.of.attribute.
            value: The value to set.
    """
    if self.event_started:
        msg = "It is too late to load settings!"
        raise RuntimeError(msg)
    settings_ = self._settings or {"kernel_name": self.kernel_name}
    for k, v in settings.items():
        settings_ |= utils.setattr_nested(self, k, v)
    self._settings.update(settings_)
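
The dotted-path keys accepted by load_settings can be illustrated with a small stand-alone sketch. The helper `set_dotted` below is hypothetical and only approximates what `utils.setattr_nested` might do (its real behaviour is not shown on this page); the stand-in object and the `shell.timeout` attribute are likewise invented for illustration.

```python
from types import SimpleNamespace
from typing import Any


def set_dotted(obj: Any, path: str, value: Any) -> dict:
    """Hypothetical helper: set `value` on the attribute reached via a dotted path."""
    *parents, name = path.split(".")
    target = obj
    for part in parents:
        # Walk down to the object that owns the final attribute.
        target = getattr(target, part)
    setattr(target, name, value)
    # Report what was applied so the caller can accumulate it.
    return {path: value}


# Stand-in for a kernel; `shell.timeout` is an invented attribute.
kernel = SimpleNamespace(quiet=True, shell=SimpleNamespace(timeout=0.0))

applied: dict = {}
for key, val in {"quiet": False, "shell.timeout": 5.0}.items():
    applied |= set_dotted(kernel, key, val)
```

As in the method above, the applied settings are accumulated into one dict, so they can later be reported through a property such as `settings`.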

load_connection_info

load_connection_info(info: dict[str, Any]) -> None

Load connection info from a dict containing connection info.

Typically this data comes from a connection file and is called by load_connection_file.

Parameters:

  • info

    (dict[str, Any]) –

    Dictionary containing connection_info. See the connection_file spec for details.

Source code in src/async_kernel/kernel.py
def load_connection_info(self, info: dict[str, Any]) -> None:
    """
    Load connection info from a dict containing connection info.

    Typically this data comes from a connection file
    and is called by load_connection_file.

    Args:
        info: Dictionary containing connection_info. See the connection_file spec for details.
    """
    self.interface.load_connection_info(info)

stop staticmethod

stop() -> None

A staticmethod to stop the running kernel.

Once an instance of a kernel is stopped the instance cannot be restarted. Instead a new instance should be started.

Source code in src/async_kernel/kernel.py
@staticmethod
def stop() -> None:
    """
    A [staticmethod][] to stop the running kernel.

    Once an instance of a kernel is stopped the instance cannot be restarted.
    Instead a new instance should be started.
    """
    if (instance := Kernel._instance) and (stop := getattr(instance, "_stop", None)):
        stop()

__asynccontextmanager__ async

__asynccontextmanager__() -> AsyncGenerator[Self]

Start the kernel in an already running anyio event loop.

Source code in src/async_kernel/kernel.py
@asynccontextmanager
async def __asynccontextmanager__(self) -> AsyncGenerator[Self]:
    """Start the kernel in an already running anyio event loop."""
    assert self.main_shell
    try:
        async with self.interface:
            self.callers.update(self.interface.callers)
            with anyio.CancelScope() as scope:
                self._stop = lambda: self.caller.call_direct(scope.cancel, "Stopping kernel")
                sys.excepthook = self.excepthook
                sys.unraisablehook = self.unraisablehook

                self.comm_manager.patch_comm()
                try:
                    self.comm_manager.kernel = self
                    self.event_started.set()
                    self.log.info("Kernel started: %s", self)
                    yield self
                except BaseException:
                    if not scope.cancel_called:
                        raise
                finally:
                    self.comm_manager.kernel = None
                    self.event_stopped.set()
    finally:
        self.shell.reset(new_session=False)
        self.subshell_manager.stop_all_subshells(force=True)
        self.callers.clear()
        Kernel._instance = None
        AsyncInteractiveShell.clear_instance()
        with anyio.CancelScope(shield=True):
            await anyio.sleep(0.1)
        self.log.info("Kernel stopped: %s", self)
        gc.collect()

iopub_send

iopub_send(
    msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
    *,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: Message[dict[str, Any]] | dict[str, Any] | None | NoValue = NoValue,
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None

Send a message on the iopub socket.

Source code in src/async_kernel/kernel.py
def iopub_send(
    self,
    msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
    *,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: Message[dict[str, Any]] | dict[str, Any] | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None:
    """Send a message on the iopub socket."""
    self.interface.iopub_send(
        msg_or_type,
        content=content,
        metadata=metadata,
        parent=parent,
        ident=ident,
        buffers=buffers,
    )

topic

topic(topic) -> bytes

Return the prefixed topic for IOPub messages.

Source code in src/async_kernel/kernel.py
def topic(self, topic) -> bytes:
    """Return the prefixed topic for IOPub messages."""
    return (f"kernel.{topic}").encode()
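
As a quick illustration, the same expression can be evaluated outside the class. The function below is a stand-alone copy for demonstration only, not part of the library's API.

```python
def topic(name: str) -> bytes:
    """Stand-alone copy of the prefixing logic shown above."""
    return f"kernel.{name}".encode()


# The "status" topic is the one used for busy/idle messages.
status_topic = topic("status")
```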

msg_handler

msg_handler(
    channel: Literal[shell, control],
    msg_type: MsgType,
    job: Job,
    send_reply: Callable[[Job, dict], CoroutineType[Any, Any, None]],
)

Schedule a message to be executed.

Source code in src/async_kernel/kernel.py
def msg_handler(
    self,
    channel: Literal[Channel.shell, Channel.control],
    msg_type: MsgType,
    job: Job,
    send_reply: Callable[[Job, dict], CoroutineType[Any, Any, None]],
    /,
):
    """Schedule a message to be executed."""
    # Note: There are never any active pending trackers in this context.
    try:
        subshell_id = job["msg"]["content"]["subshell_id"]
    except KeyError:
        try:
            subshell_id = job["msg"]["header"]["subshell_id"]  # pyright: ignore[reportTypedDictNotRequiredAccess]
        except KeyError:
            subshell_id = None
    handler = cache_wrap_handler(subshell_id, send_reply, self.run_handler, self.get_handler(msg_type))
    run_mode = self.get_run_mode(msg_type, channel=channel, job=job)
    match run_mode:
        case RunMode.direct:
            self.callers[channel].call_direct(handler, job)
        case RunMode.queue:
            self.callers[channel].queue_call(handler, job).trackers = ()  # A slight optimisation
        case RunMode.task:
            self.callers[channel].call_soon(handler, job)
        case RunMode.thread:
            self.callers[channel].to_thread(handler, job)
    self.log.debug("%s %s %s %s", msg_type, handler, run_mode, job)
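
The subshell lookup at the start of msg_handler tries the message content first, then the header, and falls back to None (the main shell). A minimal sketch of that fallback, using plain dicts in place of real messages; `find_subshell_id` is a hypothetical name used only here:

```python
from typing import Any, Optional


def find_subshell_id(msg: dict) -> Optional[Any]:
    """Look for 'subshell_id' in the content, then the header; None means the main shell."""
    for section in ("content", "header"):
        try:
            return msg[section]["subshell_id"]
        except KeyError:
            # Either the section or the key is missing; try the next location.
            continue
    return None


from_content = find_subshell_id({"content": {"subshell_id": "a"}, "header": {}})
from_header = find_subshell_id({"content": {}, "header": {"subshell_id": "b"}})
main_shell = find_subshell_id({"content": {}, "header": {}})
```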

run_handler async

run_handler(
    subshell_id: str | None,
    send_reply: Callable[[Job, dict], CoroutineType[Any, Any, None]],
    handler: HandlerType,
    job: Job[dict],
) -> None

Asynchronously run a message handler for a given job, managing reply sending and execution state.

Parameters:

  • subshell_id

    (str | None) –

    The id of the subshell to set the context of the handler.

  • send_reply

    (Callable[[Job, dict], CoroutineType[Any, Any, None]]) –

    A coroutine function responsible for sending the reply.

  • handler

    (HandlerType) –

    A coroutine function to handle the job / message.

    • It is a method on the kernel whose name corresponds to the message type that it handles.
    • The handler should return a dict to use as 'content' in a reply.
    • If status is not included in the dict it gets added automatically as {'status': 'ok'}.
    • If a reply is not expected the handler should return None.
  • job

    (Job[dict]) –

    The job dictionary containing message, socket, and identification information.

Workflow
  • Sets the current job and subshell_id context variables.
  • Sends a "busy" status message on the IOPub channel.
  • Awaits the handler; if the handler returns a content dict, a reply is sent using send_reply.
  • On exception, sends an error reply and logs the exception.
  • Resets the job and subshell_id context variables.
  • Sends an "idle" status message on the IOPub channel.
Notes
  • Replies are sent even if exceptions occur in the handler.
  • The reply message type is derived from the original request type.
Source code in src/async_kernel/kernel.py
async def run_handler(
    self,
    subshell_id: str | None,
    send_reply: Callable[[Job, dict], CoroutineType[Any, Any, None]],
    handler: HandlerType,
    job: Job[dict],
) -> None:
    """
    Asynchronously run a message handler for a given job, managing reply sending and execution state.

    Args:
        subshell_id: The id of the subshell to set the context of the handler.
        send_reply: A coroutine function responsible for sending the reply.
        handler: A coroutine function to handle the job / message.

            - It is a method on the kernel whose name corresponds to the [message type that it handles][async_kernel.typing.MsgType].
            - The handler should return a dict to use as 'content' in a reply.
            - If status is not included in the dict it gets added automatically as `{'status': 'ok'}`.
            - If a reply is not expected the handler should return `None`.

        job: The job dictionary containing message, socket, and identification information.

    Workflow:
        - Sets the current job and subshell_id context variables.
        - Sends a "busy" status message on the IOPub channel.
        - Awaits the handler; if the handler returns a content dict, a reply is sent using send_reply.
        - On exception, sends an error reply and logs the exception.
        - Resets the job and subshell_id context variables.
        - Sends an "idle" status message on the IOPub channel.

    Notes:
        - Replies are sent even if exceptions occur in the handler.
        - The reply message type is derived from the original request type.
    """
    job_token = utils._job_var.set(job)  # pyright: ignore[reportPrivateUsage]
    subshell_token = SubshellPendingManager._contextvar.set(subshell_id)  # pyright: ignore[reportPrivateUsage]

    try:
        self.iopub_send(
            msg_or_type="status",
            parent=job["msg"],
            content={"execution_state": "busy"},
            ident=self.topic(topic="status"),
        )
        if (content := await handler(job)) is not None:
            await send_reply(job, content)
    except Exception as e:
        await send_reply(job, utils.error_to_content(e))
        self.log.exception("Exception in message handler:", exc_info=e)
    finally:
        utils._job_var.reset(job_token)  # pyright: ignore[reportPrivateUsage]
        SubshellPendingManager._contextvar.reset(subshell_token)  # pyright: ignore[reportPrivateUsage]
        self.iopub_send(
            msg_or_type="status",
            parent=job["msg"],
            content={"execution_state": "idle"},
            ident=self.topic("status"),
        )
        del job
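
The workflow above can be mimicked with the standard library alone. The sketch below is an approximation under stated assumptions: it records events in a list instead of sending IOPub messages, and the error content (`ename`, `evalue`) only approximates what `utils.error_to_content` might produce. All names in it are hypothetical.

```python
import asyncio
import contextvars

# Stand-in for the kernel's job context variable.
current_job = contextvars.ContextVar("current_job", default=None)
# Records what would be sent as status messages and replies.
events = []


async def run_handler_sketch(handler, job) -> None:
    token = current_job.set(job)
    try:
        events.append(("status", "busy"))
        content = await handler(job)
        if content is not None:
            # 'status' defaults to 'ok' unless the handler provided it.
            events.append(("reply", {"status": "ok", **content}))
    except Exception as e:
        # A reply is still sent when the handler fails.
        events.append(("reply", {"status": "error", "ename": type(e).__name__, "evalue": str(e)}))
    finally:
        current_job.reset(token)
        events.append(("status", "idle"))


async def ok_handler(job):
    return {"value": 1}


async def failing_handler(job):
    raise ValueError("boom")


asyncio.run(run_handler_sketch(ok_handler, {"msg": {}}))
asyncio.run(run_handler_sketch(failing_handler, {"msg": {}}))
```

Each run is bracketed by a "busy" and an "idle" status, and the context variable is restored in the `finally` block regardless of the outcome.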

get_run_mode

get_run_mode(
    msg_type: MsgType, *, channel: Literal[shell, control] = shell, job: Job | None = None
) -> RunMode

Determine the run mode for a given channel, message type and job.

Parameters:

  • channel

    (Literal[shell, control], default: shell ) –

    The channel the message was received on.

  • msg_type

    (MsgType) –

    The type of the message.

  • job

    (Job | None, default: None ) –

    The job associated with the message, if any.

Returns:

  • RunMode

    The run mode for the message.

Source code in src/async_kernel/kernel.py
def get_run_mode(
    self,
    msg_type: MsgType,
    *,
    channel: Literal[Channel.shell, Channel.control] = Channel.shell,
    job: Job | None = None,
) -> RunMode:
    """
    Determine the run mode for a given channel, message type and job.

    Args:
        channel: The channel the message was received on.
        msg_type: The type of the message.
        job: The job associated with the message, if any.

    Returns:
        The run mode for the message.
    """
    # receive_msg_loop - DEBUG WARNING

    # TODO: Are any of these options worth including?
    # if mode_from_metadata := job["msg"]["metadata"].get("run_mode"):
    #     return RunMode( mode_from_metadata)
    # if mode_from_header := job["msg"]["header"].get("run_mode"):
    #     return RunMode( mode_from_header)
    match (channel, msg_type):
        case _, MsgType.comm_msg:
            return RunMode.queue
        case Channel.control, MsgType.execute_request:
            return RunMode.queue
        case _, MsgType.execute_request:
            if job:
                if content := job["msg"].get("content", {}):
                    if code := content.get("code"):
                        try:
                            if (code := code.strip().split("\n", maxsplit=1)[0]).startswith(("# ", "##")):
                                return RunMode(code[2:])
                            if code.startswith("RunMode."):
                                return RunMode(code.removeprefix("RunMode."))
                        except ValueError:
                            pass
                    if content.get("silent"):
                        return RunMode.task
                if mode_ := set(utils.get_tags(job)).intersection(RunMode):
                    return RunMode(next(iter(mode_)))
            return RunMode.queue
        case (
            Channel.shell,
            MsgType.shutdown_request
            | MsgType.debug_request
            | MsgType.create_subshell_request
            | MsgType.delete_subshell_request
            | MsgType.list_subshell_request,
        ):
            msg = f"{msg_type=} not allowed on shell!"
            raise ValueError(msg)
        case _, MsgType.debug_request:
            return RunMode.queue
        case (
            _,
            MsgType.complete_request
            | MsgType.inspect_request
            | MsgType.history_request
            | MsgType.create_subshell_request
            | MsgType.delete_subshell_request
            | MsgType.is_complete_request,
        ):
            return RunMode.thread
        case _:
            pass
    return RunMode.direct
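
For execute requests, the first line of the submitted code can select the run mode, either as a comment (`# task`, `##thread`) or as `RunMode.<name>`. The sketch below isolates that parsing step. The `RunMode` enum here is a stand-in whose members and string values are assumed from the names used in the match statement above; the real enum may differ.

```python
from enum import Enum
from typing import Optional


class RunMode(str, Enum):
    # Assumed members and values; not copied from the library.
    direct = "direct"
    queue = "queue"
    task = "task"
    thread = "thread"


def mode_from_code(code: str) -> Optional[RunMode]:
    """Return the run mode named on the first line of `code`, if any."""
    first_line = code.strip().split("\n", maxsplit=1)[0]
    try:
        if first_line.startswith(("# ", "##")):
            return RunMode(first_line[2:])
        if first_line.startswith("RunMode."):
            return RunMode(first_line.removeprefix("RunMode."))
    except ValueError:
        # An ordinary comment such as "# setup" is not a run mode.
        pass
    return None


as_comment = mode_from_code("# task\nawait something()")
as_double_hash = mode_from_code("##thread")
as_prefix = mode_from_code("RunMode.queue\n1 + 1")
plain_comment = mode_from_code("# just a comment")
```

When no mode is found this way, the method above goes on to check the `silent` flag and the job's tags before defaulting to `RunMode.queue`.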

all_concurrency_run_modes

all_concurrency_run_modes(
    channels: Iterable[Literal[shell, control]] = (shell, control),
    msg_types: Iterable[MsgType] = MsgType,
) -> dict[
    Literal["SocketID", "MsgType", "RunMode"], tuple[Channel, MsgType, RunMode | None]
]

Generates a dictionary containing all combinations of SocketID and MsgType, along with their corresponding RunMode (if available).

Source code in src/async_kernel/kernel.py
def all_concurrency_run_modes(
    self,
    channels: Iterable[Literal[Channel.shell, Channel.control]] = (Channel.shell, Channel.control),
    msg_types: Iterable[MsgType] = MsgType,
) -> dict[
    Literal["SocketID", "MsgType", "RunMode"],
    tuple[Channel, MsgType, RunMode | None],
]:
    """
    Generates a dictionary containing all combinations of SocketID and MsgType, along with their
    corresponding RunMode (if available).
    """
    data: list[Any] = []
    for channel in channels:
        for msg_type in msg_types:
            try:
                mode = self.get_run_mode(msg_type, channel=channel)
            except ValueError:
                mode = None
            data.append((channel, msg_type, mode))
    data_ = zip(*data, strict=True)
    return dict(zip(["SocketID", "MsgType", "RunMode"], data_, strict=True))

kernel_info_request async

kernel_info_request(job: Job[Content]) -> Content

Handle a kernel info request.

Source code in src/async_kernel/kernel.py
async def kernel_info_request(self, job: Job[Content], /) -> Content:
    """Handle a [kernel info request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-info)."""
    return self.kernel_info

comm_info_request async

comm_info_request(job: Job[Content]) -> Content

Handle a comm info request.

Source code in src/async_kernel/kernel.py
async def comm_info_request(self, job: Job[Content], /) -> Content:
    """Handle a [comm info request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#comm-info)."""
    c = job["msg"]["content"]
    target_name = c.get("target_name", None)
    comms = {
        k: {"target_name": v.target_name}
        for (k, v) in tuple(self.comm_manager.comms.items())
        if v.target_name == target_name or target_name is None
    }
    return {"comms": comms}
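
The filtering rule is: when `target_name` is absent from the request, every comm is listed; otherwise only comms with a matching target are returned. A sketch with stand-in comm objects (the comm ids and the "custom" target are invented for illustration):

```python
from types import SimpleNamespace

# Stand-ins for registered comms, keyed by comm_id.
comms = {
    "id-1": SimpleNamespace(target_name="jupyter.widget"),
    "id-2": SimpleNamespace(target_name="custom"),
}


def comm_info(target_name=None) -> dict:
    """Build the reply content for a comm info request."""
    return {
        "comms": {
            k: {"target_name": v.target_name}
            for k, v in tuple(comms.items())
            if target_name is None or v.target_name == target_name
        }
    }


all_comms = comm_info()
widgets_only = comm_info("jupyter.widget")
```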

execute_request async

execute_request(job: Job[ExecuteContent]) -> Content

Handle an execute request.

Source code in src/async_kernel/kernel.py
async def execute_request(self, job: Job[ExecuteContent], /) -> Content:
    """Handle an [execute request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#execute)."""
    return await self.shell.execute_request(**job["msg"]["content"])  # pyright: ignore[reportArgumentType]

complete_request async

complete_request(job: Job[Content]) -> Content

Handle a completion request.

Source code in src/async_kernel/kernel.py
async def complete_request(self, job: Job[Content], /) -> Content:
    """Handle a [completion request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#completion)."""
    return await self.shell.do_complete_request(
        code=job["msg"]["content"].get("code", ""), cursor_pos=job["msg"]["content"].get("cursor_pos", 0)
    )

is_complete_request async

is_complete_request(job: Job[Content]) -> Content

Handle an is_complete request.

Source code in src/async_kernel/kernel.py
async def is_complete_request(self, job: Job[Content], /) -> Content:
    """Handle an [is_complete request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#code-completeness)."""
    return await self.shell.is_complete_request(job["msg"]["content"].get("code", ""))

inspect_request async

inspect_request(job: Job[Content]) -> Content

Handle an inspect request.

Source code in src/async_kernel/kernel.py
async def inspect_request(self, job: Job[Content], /) -> Content:
    """Handle an [inspect request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#introspection)."""
    c = job["msg"]["content"]
    return await self.shell.inspect_request(
        code=c.get("code", ""),
        cursor_pos=c.get("cursor_pos", 0),
        detail_level=c.get("detail_level", 0),
    )

history_request async

history_request(job: Job[Content]) -> Content

Handle a history request.

Source code in src/async_kernel/kernel.py
async def history_request(self, job: Job[Content], /) -> Content:
    """Handle a [history request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#history)."""
    return await self.shell.history_request(**job["msg"]["content"])

comm_open async

comm_open(job: Job[Content]) -> None

Handle a comm open request.

Source code in src/async_kernel/kernel.py
async def comm_open(self, job: Job[Content], /) -> None:
    """Handle a [comm open request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#opening-a-comm)."""
    self.comm_manager.comm_open(stream=None, ident=None, msg=job["msg"])  # pyright: ignore[reportArgumentType]

comm_msg async

comm_msg(job: Job[Content]) -> None

Handle a comm msg request.

Source code in src/async_kernel/kernel.py
async def comm_msg(self, job: Job[Content], /) -> None:
    """Handle a [comm msg request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#comm-messages)."""
    self.comm_manager.comm_msg(stream=None, ident=None, msg=job["msg"])  # pyright: ignore[reportArgumentType]

comm_close async

comm_close(job: Job[Content]) -> None

Handle a comm close request.

Source code in src/async_kernel/kernel.py
async def comm_close(self, job: Job[Content], /) -> None:
    """Handle a [comm close request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#tearing-down-comms)."""
    self.comm_manager.comm_close(stream=None, ident=None, msg=job["msg"])  # pyright: ignore[reportArgumentType]

interrupt_request async

interrupt_request(job: Job[Content]) -> Content

Handle an interrupt request (control only).

Source code in src/async_kernel/kernel.py
async def interrupt_request(self, job: Job[Content], /) -> Content:
    """Handle an [interrupt request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-interrupt) (control only)."""
    self.interface.interrupt()
    return {}

shutdown_request async

shutdown_request(job: Job[Content]) -> Content

Handle a shutdown request (control only).

Source code in src/async_kernel/kernel.py
async def shutdown_request(self, job: Job[Content], /) -> Content:
    """Handle a [shutdown request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-shutdown) (control only)."""
    self.stop()
    return {"restart": job["msg"]["content"].get("restart", False)}

debug_request async

debug_request(job: Job[Content]) -> Content

Handle a debug request (control only).

Source code in src/async_kernel/kernel.py
async def debug_request(self, job: Job[Content], /) -> Content:
    """Handle a [debug request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#debug-request) (control only)."""
    return await self.debugger.process_request(job["msg"]["content"])

create_subshell_request async

create_subshell_request(job: Job[Content]) -> Content

Handle a create subshell request (control only).

Source code in src/async_kernel/kernel.py
async def create_subshell_request(self: Kernel, job: Job[Content], /) -> Content:
    """Handle a [create subshell request](https://jupyter.org/enhancement-proposals/91-kernel-subshells/kernel-subshells.html#create-subshell) (control only)."""

    return {"subshell_id": self.subshell_manager.create_subshell(protected=False).subshell_id}

delete_subshell_request async

delete_subshell_request(job: Job[Content]) -> Content

Handle a delete subshell request (control only).

Source code in src/async_kernel/kernel.py
async def delete_subshell_request(self, job: Job[Content], /) -> Content:
    """Handle a [delete subshell request](https://jupyter.org/enhancement-proposals/91-kernel-subshells/kernel-subshells.html#delete-subshell) (control only)."""
    self.subshell_manager.delete_subshell(job["msg"]["content"]["subshell_id"])
    return {}

list_subshell_request async

list_subshell_request(job: Job[Content]) -> Content

Handle a list subshell request (control only).

Source code in src/async_kernel/kernel.py
async def list_subshell_request(self, job: Job[Content], /) -> Content:
    """Handle a [list subshell request](https://jupyter.org/enhancement-proposals/91-kernel-subshells/kernel-subshells.html#list-subshells) (control only)."""
    return {"subshell_id": list(self.subshell_manager.list_subshells())}

excepthook

excepthook(etype, evalue, tb) -> None

Handle an exception.

Source code in src/async_kernel/kernel.py
def excepthook(self, etype, evalue, tb) -> None:
    """Handle an exception."""
    # write uncaught traceback to 'real' stderr, not zmq-forwarder
    if self.print_kernel_messages:
        traceback.print_exception(etype, evalue, tb, file=sys.__stderr__)

unraisablehook

unraisablehook(unraisable: UnraisableHookArgs) -> None

Handle unraisable exceptions (during gc for instance).

Source code in src/async_kernel/kernel.py
def unraisablehook(self, unraisable: sys.UnraisableHookArgs, /) -> None:
    "Handle unraisable exceptions (during gc for instance)."
    exc_info = (
        unraisable.exc_type,
        unraisable.exc_value or unraisable.exc_type(unraisable.err_msg),
        unraisable.exc_traceback,
    )
    self.log.exception(unraisable.err_msg, exc_info=exc_info, extra={"object": unraisable.object})

get_connection_info

get_connection_info() -> dict[str, Any]

Return the connection info as a dict.

Source code in src/async_kernel/kernel.py
def get_connection_info(self) -> dict[str, Any]:
    """Return the connection info as a dict."""
    with self.connection_file.open("r") as f:
        return json.load(f)
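
The method simply parses the connection file as JSON. The sketch below writes a small example file to a temporary directory and reads it back in the same way. The keys shown follow the usual Jupyter connection-file layout, while the port numbers, key and file name are made-up values.

```python
import json
import tempfile
from pathlib import Path

# Example connection info; all values are illustrative.
info = {
    "transport": "tcp",
    "ip": "127.0.0.1",
    "shell_port": 50001,
    "control_port": 50002,
    "key": "example-key",
    "signature_scheme": "hmac-sha256",
}

with tempfile.TemporaryDirectory() as tmp:
    connection_file = Path(tmp) / "kernel-1234.json"
    connection_file.write_text(json.dumps(info))
    # Same read pattern as get_connection_info above.
    with connection_file.open("r") as f:
        loaded = json.load(f)
```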

get_parent

get_parent() -> Message[dict[str, Any]] | None

A convenience method to access the 'message' in the current context if there is one.

'parent' is the parameter name used by Session.send to provide context when sending a reply.

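See also
  • Kernel.iopub_send
  • ipywidgets.Output: Uses get_ipython().kernel.get_parent() to obtain the msg_id which is used to 'capture' output when its context has been acquired.
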
Source code in src/async_kernel/kernel.py
def get_parent(self) -> Message[dict[str, Any]] | None:
    """
    A convenience method to access the 'message' in the current context if there is one.

    'parent' is the parameter name used by [Session.send][jupyter_client.session.Session.send] to provide context when sending a reply.

    See also:
        - [Kernel.iopub_send][Kernel.iopub_send]
        - [ipywidgets.Output][ipywidgets.widgets.widget_output.Output]:
            Uses `get_ipython().kernel.get_parent()` to obtain the `msg_id` which
            is used to 'capture' output when its context has been acquired.
    """
    return utils.get_parent()