Skip to content

kernel

Classes:

KernelInterruptError

Bases: Exception

Raised to interrupt the kernel.

Source code in src/async_kernel/asyncshell.py
48
49
class KernelInterruptError(Exception):
    """Exception raised to interrupt the kernel."""

Kernel

Bases: HasTraits, AsyncContextManagerMixin

A Jupyter kernel providing an IPython InteractiveShell.

Starting the kernel:

=== "From the shell"

    ``` shell
    async-kernel --kernel-name=async -f .
    ```

=== "Blocking"

    ```python
    import async_kernel.interface

    settings = {}  # Dotted name key/value pairs

    async_kernel.interface.start_kernel_zmq_interface(settings)
    ```

=== "Async"

    ```python
    settings = {}  # Dotted name key/value pairs
    async with Kernel(settings):
        ...
    ```
Warning

Starting the kernel outside the main thread has the following implications: - Execute requests are run in the thread where the kernel is started. - The signal based kernel interrupt is not possible.

Origins

Methods:

Attributes:

Source code in src/async_kernel/kernel.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
class Kernel(traitlets.HasTraits, anyio.AsyncContextManagerMixin):
    """
    A Jupyter kernel providing an [IPython InteractiveShell][async_kernel.asyncshell.AsyncInteractiveShell].

    Starting the kernel:

        === "From the shell"

            ``` shell
            async-kernel --kernel-name=async -f .
            ```

        === "Blocking"

            ```python
            import async_kernel.interface

            settings = {}  # Dotted name key/value pairs

            async_kernel.interface.start_kernel_zmq_interface(settings)
            ```

        === "Async"

            ```python
            settings = {}  # Dotted name key/value pairs
            async with Kernel(settings):
                ...
            ```

    Warning:
        Starting the kernel outside the main thread has the following implications:
            - Execute requests are run in the thread where the kernel is started.
            - The signal based kernel interrupt is not possible.

    Origins:
        - [IPyKernel Kernel][ipykernel.kernelbase.Kernel]
        - [IPyKernel IPKernelApp][ipykernel.kernelapp.IPKernelApp]
        - [IPyKernel IPythonKernel][ipykernel.ipkernel.IPythonKernel]
    """

    # Singleton bookkeeping: `__new__` stores the one instance here and
    # `do_shutdown` resets it to None.
    _instance: Self | None = None
    # Guards the one-off setup in `__init__`.
    _initialised = False
    # Set by `shutdown_request`; passed to `do_shutdown` when the context exits.
    _restart = False

    # Accumulates the values applied through `load_settings`.
    _settings = Fixed(dict)
    # Wrapped handlers keyed by `(subshell_id | None, msg_type, send_reply)`; see `_get_handler`.
    _handler_cache: Fixed[Self, dict[tuple[str | None, MsgType, Callable], HandlerType]] = Fixed(dict)

    interface: traitlets.Instance[BaseKernelInterface] = traitlets.Instance(BaseKernelInterface)
    "The abstraction to interface with the kernel."

    callers: Fixed[Self, dict[Literal[Channel.shell, Channel.control], Caller]] = Fixed(dict)
    "The callers associated with the kernel once it has started."
    ""
    subshell_manager = Fixed(SubshellManager)
    "Dedicated to management of sub shells."

    # Public traits
    # `help_links` is populated by `_default_help_links` and reported in `kernel_info`.
    help_links = traitlets.Tuple()
    ""
    quiet = traitlets.Bool(True)
    "Only send stdout/stderr to output stream."

    print_kernel_messages = traitlets.Bool(True)
    "When enabled the kernel will print startup, shutdown and terminal errors."

    # NOTE(review): the text below mentions "kernel-<pid>.json", but `_default_connection_file`
    # generates "kernel-<uuid4>.json" inside `jupyter_runtime_dir()` - confirm which is intended.
    connection_file: traitlets.TraitType[Path, Path | str] = traitlets.TraitType()
    """
    JSON file in which to store connection info.

    `"kernel-<pid>.json"`

    This file will contain the IP, ports, and authentication key needed to connect
    clients to this kernel. By default, this file will be created in the security dir
    of the current profile, but can be specified by absolute path.
    """

    kernel_name = traitlets.CUnicode()
    "The kernels name - if it contains 'trio' a trio backend will be used instead of an asyncio backend."

    log = traitlets.Instance(logging.LoggerAdapter)
    "The logging adapter."

    # Public fixed
    main_shell = Fixed(lambda _: AsyncInteractiveShell.instance())
    "The interactive shell."

    debugger = Fixed(Debugger)
    "Handles [debug requests](https://jupyter-client.readthedocs.io/en/stable/messaging.html#debug-request)."

    comm_manager = Fixed(CommManager)
    "Creates [async_kernel.comm.Comm][] instances and maintains a mapping to `comm_id` to `Comm` instances."

    # Set in `__asynccontextmanager__` once startup has completed.
    event_started = Fixed(Event)
    "An event that occurs when the kernel is started."

    # Set by `stop`.
    event_stopped = Fixed(Event)
    "An event that occurs when the kernel is stopped."

    def __new__(cls, settings: dict | None = None, /) -> Self:  # noqa: ARG004
        """Return the shared kernel instance, creating it on first use.

        The instance is stored on `Kernel` itself, so subclasses share it too.
        """
        existing = Kernel._instance
        if existing:
            return existing  # pyright: ignore[reportReturnType]
        created = super().__new__(cls)
        Kernel._instance = created
        return created  # pyright: ignore[reportReturnType]

    def __init__(self, settings: dict | None = None, /) -> None:
        """Run the one-off setup on first construction and apply `settings` if given.

        Args:
            settings: Optional dotted-name key/value pairs passed to `load_settings`.
                They are applied on every call, not only the first one.
        """
        first_time = not self._initialised
        if first_time:
            self._initialised = True
            super().__init__()
            # Only provide a matplotlib backend when none (or an empty one) is configured.
            configured_backend = os.environ.get("MPLBACKEND")
            if not configured_backend:
                os.environ["MPLBACKEND"] = "module://matplotlib_inline.backend_inline"
        if settings:
            self.load_settings(settings)

    @override
    def __repr__(self) -> str:
        """Return `ClassName<key=value, ...>` built from the current `settings`."""
        class_name = self.__class__.__name__
        rendered = ", ".join(f"{name}={value}" for name, value in self.settings.items())
        return f"{class_name}<{rendered}>"

    @traitlets.default("log")
    def _default_log(self) -> LoggerAdapter[Logger]:
        """Default for `log`: a `LoggerAdapter` around the logger named after this class."""
        return logging.LoggerAdapter(logging.getLogger(self.__class__.__name__))

    @traitlets.default("kernel_name")
    def _default_kernel_name(self) -> Literal["async-trio", "async"]:
        """Default for `kernel_name`: "async-trio" when the detected async library is trio, else "async"."""
        if current_async_library(failsafe=True) == "trio":
            return "async-trio"
        return "async"

    @traitlets.default("interface")
    def default_interface(self) -> ZMQKernelInterface:
        """Default for `interface`: a new `ZMQKernelInterface`."""
        # Imported here rather than at module level (see the PLC0415 suppression).
        from async_kernel.interface.zmq import ZMQKernelInterface  # noqa: PLC0415

        return ZMQKernelInterface()

    @traitlets.default("connection_file")
    def _default_connection_file(self) -> Path:
        """Default for `connection_file`: `kernel-<uuid4>.json` inside the Jupyter runtime directory."""
        runtime_dir = Path(jupyter_runtime_dir())
        return runtime_dir / f"kernel-{uuid.uuid4()}.json"

    @traitlets.default("help_links")
    def _default_help_links(self) -> tuple[dict[str, str], ...]:
        """Default for `help_links`: a tuple of `{"text": ..., "url": ...}` reference entries."""
        references = (
            ("Async Kernel Reference ", "https://fleming79.github.io/async-kernel/"),
            ("IPython Reference", "https://ipython.readthedocs.io/en/stable/"),
            ("IPython magic Reference", "https://ipython.readthedocs.io/en/stable/interactive/magics.html"),
            ("Matplotlib ipympl Reference", "https://matplotlib.org/ipympl/"),
            ("Matplotlib Reference", "https://matplotlib.org/contents.html"),
        )
        return tuple({"text": text, "url": url} for text, url in references)

    @traitlets.observe("connection_file")
    def _observe_connection_file(self, change) -> None:
        """Load connection info from a newly assigned `connection_file`.

        Nothing is loaded when the interface already has callers or the file does not exist.
        """
        if self.interface.callers:
            return
        path = self.connection_file
        if not path.exists():
            return
        self.log.debug("Loading connection file %s", path)
        info = json.loads(path.read_bytes())
        self.load_connection_info(info)

    @traitlets.validate("connection_file")
    def _validate_connection_file(self, proposal) -> Path:
        """Coerce the proposed `connection_file` value to a `Path` with `~` expanded."""
        return pathlib.Path(proposal.value).expanduser()

    @property
    def settings(self) -> dict[str, Any]:
        """Settings that customise the kernel.

        Always contains `kernel_name` and `connection_file`; values recorded by
        `load_settings` are merged on top.
        """
        always_reported = {
            "kernel_name": self.kernel_name,
            "connection_file": self.connection_file,
        }
        return always_reported | self._settings

    @property
    def shell(self) -> AsyncInteractiveShell | AsyncInteractiveSubshell:
        """
        The shell given the current context, as resolved by the subshell manager.

        Notes:
            - The `subshell_id` of the main shell is `None`.
        """
        return self.subshell_manager.get_shell()

    @property
    def caller(self) -> Caller:
        """The caller for the shell channel.

        Raises:
            KeyError: If `callers` has no shell entry (it is filled when the kernel
                context is entered and cleared by `do_shutdown`).
        """
        return self.callers[Channel.shell]

    @property
    def kernel_info(self) -> dict[str, str | dict[str, str | dict[str, str | int]] | Any | tuple[Any, ...] | bool]:
        """A dict of details sent in reply to a 'kernel_info_request'.

        The debugger is reported as supported unless the process was launched by
        debugpy or is running on emscripten; the same flag drives both the
        `debugger` entry and the "debugger" item in `supported_features`.
        """
        # Evaluate the condition once so the two places that report it cannot drift apart.
        debugger_supported = not utils.LAUNCHED_BY_DEBUGPY and sys.platform != "emscripten"
        supported_features = ["kernel subshells"]
        if debugger_supported:
            supported_features.append("debugger")

        return {
            "protocol_version": async_kernel.kernel_protocol_version,
            "implementation": "async_kernel",
            "implementation_version": async_kernel.__version__,
            "language_info": async_kernel.kernel_protocol_version_info,
            "banner": self.shell.banner,
            "help_links": self.help_links,
            "debugger": debugger_supported,
            "kernel_name": self.kernel_name,
            "supported_features": supported_features,
        }

    def load_settings(self, settings: dict[str, Any]) -> None:
        """
        Apply settings to the kernel and remember what was applied.

        Only permitted before the kernel has started.

        Args:
            settings:
                key: dotted.path.of.attribute.
                value: The value to set.

        Raises:
            RuntimeError: If the kernel has already started.
        """
        if self.event_started:
            msg = "It is too late to load settings!"
            raise RuntimeError(msg)
        # Reuse the recorded settings when there are some, otherwise seed with the kernel name.
        recorded = self._settings or {"kernel_name": self.kernel_name}
        for dotted_name, value in settings.items():
            applied = utils.setattr_nested(self, dotted_name, value)
            recorded |= applied
        self._settings.update(recorded)

    def load_connection_info(self, info: dict[str, Any]) -> None:
        """
        Load connection info from a dict by delegating to the kernel interface.

        Typically this data comes from a connection file; within this class it is
        called when the `connection_file` trait changes to a file that exists.

        Args:
            info: Dictionary containing connection_info. See the connection_file spec for details.
        """
        self.interface.load_connection_info(info)

    @staticmethod
    def stop() -> None:
        """
        A [staticmethod][] to stop the running kernel.

        Does nothing when there is no kernel instance or it has no active cancel scope,
        so repeated calls are harmless.

        Once an instance of a kernel is stopped the instance cannot be restarted.
        Instead a new instance should be started.
        """
        kernel = Kernel._instance
        if not kernel:
            return
        scope = getattr(kernel, "_scope", None)
        if not scope:
            return
        # Drop the attribute first so a second call finds nothing to cancel.
        del kernel._scope
        kernel.log.info("Stopping kernel: %s", kernel)
        kernel.caller.call_direct(scope.cancel, "Stopping kernel")
        kernel.event_stopped.set()

    @asynccontextmanager
    async def __asynccontextmanager__(self) -> AsyncGenerator[Self]:
        """
        Start the kernel on entry and shut it down on exit.

        While the context is active the interface is entered, the kernel's callers are
        copied from the interface, `sys.excepthook` / `sys.unraisablehook` are replaced by
        the kernel's hooks and `event_started` is set. On exit the kernel is stopped, the
        original hooks are restored and `do_shutdown` is awaited in a shielded scope.
        """
        # Accessing `main_shell` here presumably forces the shell to exist before startup.
        assert self.main_shell
        # Remember the hooks so they can be restored in `finally`.
        original_sys_hooks = sys.excepthook, sys.unraisablehook
        try:
            async with self.interface:
                self.callers.update(self.interface.callers)
                with anyio.CancelScope() as scope:
                    # `stop` cancels this scope to end the kernel.
                    self._scope = scope
                    sys.excepthook, sys.unraisablehook = self.excepthook, self.unraisablehook
                    self.comm_manager.patch_comm()
                    self.event_started.set()
                    self.log.info("Kernel started: %s", self)
                    yield self
        finally:
            # NOTE(review): if entering the interface fails, `_scope` was never set, so `stop`
            # is a no-op and `event_stopped` stays unset while `do_shutdown` asserts it - confirm.
            self.stop()
            sys.excepthook, sys.unraisablehook = original_sys_hooks
            # Shielded so that shutdown still runs when the surrounding scope is cancelled.
            with anyio.CancelScope(shield=True):
                await self.do_shutdown(self._restart)

    async def do_shutdown(self, restart: bool) -> None:
        """
        Tear down the kernel's state after it has been stopped.

        Matches signature of [ipykernel.kernelbase.Kernel.do_shutdown][].

        Args:
            restart: Accepted for signature compatibility; not used in this method.
        """
        assert self.event_stopped
        self.log.info("Kernel shutdown: %s", self)
        await anyio.sleep(0.1)
        self.shell.reset(new_session=False)
        self.subshell_manager.stop_all_subshells(force=True)
        self.callers.clear()
        self._handler_cache.clear()
        # Iterate over a snapshot: closing a comm may modify the mapping.
        for comm in tuple(self.comm_manager.comms.values()):
            comm.close(deleting=True)
        self.comm_manager.comms.clear()
        await anyio.sleep(0.1)
        # Reset the singletons so a new kernel can be created afterwards.
        AsyncInteractiveShell.clear_instance()
        CommManager._instance = None  # pyright: ignore[reportPrivateUsage]
        Kernel._instance = None
        gc.collect()
        self.log.info("Kernel shutdown complete: %s", self)

    async def run(self, *, stopped: Callable[[], Any] | None = None) -> None:
        """
        Run the kernel asynchronously until it is stopped.

        Enters the kernel's async context and waits on `event_stopped`.

        Args:
            stopped: An optional callback that is called when the kernel has stopped.
                It is invoked from a `finally` block, so it also runs when startup or
                the wait raises.

        This method requires that a [Caller][async_kernel.caller.Caller] instance does not already exist in the current thread.
        """
        try:
            async with self:
                await self.event_stopped
        finally:
            if stopped:
                stopped()

    def iopub_send(
        self,
        msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
        *,
        content: Content | None = None,
        metadata: dict[str, Any] | None = None,
        parent: Message[dict[str, Any]] | dict[str, Any] | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
        ident: bytes | list[bytes] | None = None,
        buffers: list[bytes] | None = None,
    ) -> None:
        """Send a message on the iopub socket via the interface.

        The message is silently dropped once the kernel has been stopped.
        All arguments are forwarded unchanged to the interface's `iopub_send`.
        """
        if self.event_stopped:
            return
        forwarded = {
            "content": content,
            "metadata": metadata,
            "parent": parent,
            "ident": ident,
            "buffers": buffers,
        }
        self.interface.iopub_send(msg_or_type, **forwarded)

    def topic(self, topic) -> bytes:
        """Return `topic` prefixed with ``kernel.`` and encoded as bytes, for IOPub messages."""
        prefixed = f"kernel.{topic}"
        return prefixed.encode()

    def message_handler(
        self,
        channel: Literal[Channel.shell, Channel.control],
        msg_type: MsgType,
        job: Job,
        send_reply: Callable[[Job, dict], CoroutineType[Any, Any, None]],
        /,
    ) -> None:
        """
        Schedule the job for execution in a dedicated handler by `(subshell_id, msg_type, send_reply)`.

        Shell messages whose type is in `RUN_IN_SHELL` are scheduled on the shell
        channel's caller; every other shell message is scheduled on the control
        channel's caller.

        'execute_request' messages can also specify alternate run modes:
            - task: Run the execute request as a task.
            - thread: Run the execute request in a worker thread.

            The alternate run mode can be specified in a few ways:
            - as a comment on the first line of the code block `# task` or `# thread`.
            - As a tag `thread` or `task`

        Args:
            channel: The channel the message arrived on.
            msg_type: The type of msg.
            job: A dict with the msg and supporting details.
            send_reply: Coroutine function used by the handler to send the reply for the job.
        """
        # Note: There are never any active pending trackers in this context.
        # The subshell id is looked up in the content first, then in the header.
        subshell_id = None
        for section in ("content", "header"):
            try:
                subshell_id = job["msg"][section]["subshell_id"]  # pyright: ignore[reportTypedDictNotRequiredAccess]
            except KeyError:
                continue
            break
        handler = self._get_handler(subshell_id, msg_type, send_reply)
        run_mode = self._get_run_mode(msg_type, job)
        caller = self.callers[channel]
        redirect_to_control = channel is Channel.shell and msg_type not in RUN_IN_SHELL
        if redirect_to_control:
            caller = self.callers[Channel.control]
        # Schedule job
        if run_mode == RunMode.queue:
            caller.queue_call(handler, job)
        elif run_mode == RunMode.task:
            caller.call_soon(handler, job)
        elif run_mode == RunMode.thread:
            caller.to_thread(handler, job)
        self.log.debug("%s %s %s %s %s", channel, msg_type, run_mode, handler, job)

    def _get_handler(
        self,
        subshell_id: str | None,
        msg_type: MsgType,
        send_reply: Callable[[Job, dict], CoroutineType[Any, Any, None]],
    ) -> HandlerType:
        """
        Return the cached wrapper that runs the `msg_type` handler for a job.

        The wrapper sets the job and subshell context variables, publishes
        busy/idle status on iopub around the call, sends the handler's result with
        `send_reply` (when it is not `None`) and replies with the error content if
        the handler raises.

        Wrappers for 'execute_request' are cached per subshell. Wrappers for all
        other message types are shared by every subshell, so for those the subshell
        id is read from each job rather than from the arguments of the call that
        happened to create the wrapper.

        Args:
            subshell_id: The subshell the message is addressed to (`None` for the main shell).
            msg_type: The message type; also the name of the handler method on the kernel.
            send_reply: Coroutine function used to send the reply for a job.
        """
        handler: HandlerType = getattr(self, msg_type)
        per_subshell = msg_type is MsgType.execute_request
        key = (subshell_id if per_subshell else None, msg_type, send_reply)
        try:
            return self._handler_cache[key]
        except KeyError:
            pass

        def resolve_subshell_id(job: Job) -> str | None:
            # A per-subshell wrapper always belongs to the subshell it was created for.
            if per_subshell:
                return subshell_id
            # Shared wrapper: same lookup order as `message_handler` (content, then header).
            for section in ("content", "header"):
                try:
                    return job["msg"][section]["subshell_id"]  # pyright: ignore[reportTypedDictNotRequiredAccess]
                except KeyError:
                    continue
            return None

        @functools.wraps(handler)
        async def run_handler(job: Job) -> None:
            current_subshell_id = resolve_subshell_id(job)
            job_token = utils._job_var.set(job)  # pyright: ignore[reportPrivateUsage]
            subshell_token = ShellPendingManager._id_contextvar.set(current_subshell_id)  # pyright: ignore[reportPrivateUsage]

            try:
                self.iopub_send(
                    msg_or_type="status",
                    parent=job["msg"],
                    content={"execution_state": "busy"},
                    ident=self.topic(topic="status"),
                )
                if (content := await handler(job)) is not None:
                    await send_reply(job, content)
            except Exception as e:
                await send_reply(job, utils.error_to_content(e))
                self.log.exception("Exception in message handler:", exc_info=e)
            finally:
                utils._job_var.reset(job_token)  # pyright: ignore[reportPrivateUsage]
                ShellPendingManager._id_contextvar.reset(subshell_token)  # pyright: ignore[reportPrivateUsage]
                self.iopub_send(
                    msg_or_type="status",
                    parent=job["msg"],
                    content={"execution_state": "idle"},
                    ident=self.topic("status"),
                )
                del job

        self._handler_cache[key] = run_handler
        return run_handler

    def _subshell_stopped(self, subshell_id: str) -> None:
        for key in list(self._handler_cache):
            if key[0] == subshell_id:
                self._handler_cache.pop(key, None)

    def _get_run_mode(self, msg_type: MsgType, job: Job, /) -> RunMode:
        """
        Work out how the job should be scheduled.

        Only 'execute_request' messages can select a mode other than `RunMode.queue`.
        For those, in order of precedence:
            1. A first line of code starting with `# ` or `##` whose remainder is a valid run mode.
            2. `silent` content runs as `RunMode.task`.
            3. A tag on the job that matches a run mode.
        """
        # TODO: Are any of these options worth including?
        # if run_mode := job["msg"]["header"].get("run_mode"):
        #     return RunMode(run_mode)
        if msg_type is not MsgType.execute_request:
            return RunMode.queue
        content = job["msg"].get("content", {})
        if content:
            source = content.get("code")
            if source:
                try:
                    first_line = source.strip().split("\n", maxsplit=1)[0]
                    if first_line.startswith(("# ", "##")):
                        return RunMode(first_line[2:])
                except ValueError:
                    # The comment is not a run mode; fall through to the other checks.
                    pass
            if content.get("silent"):
                return RunMode.task
        tagged_modes = set(utils.get_tags(job)).intersection(RunMode)
        if tagged_modes:
            return RunMode(next(iter(tagged_modes)))
        return RunMode.queue

    async def kernel_info_request(self, job: Job[Content], /) -> Content:
        """
        Handle a [kernel info request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-info).

        Returns:
            The `kernel_info` dict; the job itself is not inspected.
        """
        return self.kernel_info

    async def comm_info_request(self, job: Job[Content], /) -> Content:
        """
        Handle a [comm info request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#comm-info).

        Returns:
            `{"comms": {comm_id: {"target_name": ...}}}` for every registered comm, limited to
            the requested `target_name` when the request provides one.
        """
        requested = job["msg"]["content"].get("target_name", None)
        comms = {}
        # Iterate over a snapshot of the registered comms.
        for comm_id, comm in tuple(self.comm_manager.comms.items()):
            if comm.target_name == requested or requested is None:
                comms[comm_id] = {"target_name": comm.target_name}
        return {"comms": comms}

    async def execute_request(self, job: Job[ExecuteContent], /) -> Content:
        """
        Handle a [execute request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#execute).

        The message content is passed to the current shell as keyword arguments, together
        with the `cellId` from the message metadata (if any) and the job's received time.
        """
        return await self.shell._execute_request(  # pyright: ignore[reportPrivateUsage]
            cell_id=job["msg"]["metadata"].get("cellId"),
            received_time=job["received_time"],
            **job["msg"]["content"],  # pyright: ignore[reportArgumentType]
        )

    async def complete_request(self, job: Job[Content], /) -> Content:
        """
        Handle a [completion request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#completion).

        Missing `code` defaults to an empty string and missing `cursor_pos` to 0.
        """
        shell = self.shell
        content = job["msg"]["content"]
        code = content.get("code", "")
        cursor_pos = content.get("cursor_pos", 0)
        return await shell._do_complete_request(code=code, cursor_pos=cursor_pos)  # pyright: ignore[reportPrivateUsage]

    async def is_complete_request(self, job: Job[Content], /) -> Content:
        """
        Handle a [is_complete request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#code-completeness).

        Missing `code` defaults to an empty string.
        """
        return await self.shell._is_complete_request(job["msg"]["content"].get("code", ""))  # pyright: ignore[reportPrivateUsage]

    async def inspect_request(self, job: Job[Content], /) -> Content:
        """
        Handle a [inspect request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#introspection).

        Missing fields default to `code=""`, `cursor_pos=0` and `detail_level=0`.
        """
        c = job["msg"]["content"]
        return await self.shell._inspect_request(  # pyright: ignore[reportPrivateUsage]
            code=c.get("code", ""),
            cursor_pos=c.get("cursor_pos", 0),
            detail_level=c.get("detail_level", 0),
        )

    async def history_request(self, job: Job[Content], /) -> Content:
        """
        Handle a [history request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#history).

        The whole message content is passed to the shell as keyword arguments.
        """
        return await self.shell._history_request(**job["msg"]["content"])  # pyright: ignore[reportPrivateUsage]

    async def comm_open(self, job: Job[Content], /) -> None:
        """
        Handle a [comm open request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#opening-a-comm).

        Delegates to the comm manager; returns `None`, so no reply is sent.
        """
        self.comm_manager.comm_open(stream=None, ident=None, msg=job["msg"])  # pyright: ignore[reportArgumentType]

    async def comm_msg(self, job: Job[Content], /) -> None:
        """
        Handle a [comm msg request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#comm-messages).

        Delegates to the comm manager; returns `None`, so no reply is sent.
        """
        self.comm_manager.comm_msg(stream=None, ident=None, msg=job["msg"])  # pyright: ignore[reportArgumentType]

    async def comm_close(self, job: Job[Content], /) -> None:
        """
        Handle a [comm close request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#tearing-down-comms).

        Delegates to the comm manager; returns `None`, so no reply is sent.
        """
        self.comm_manager.comm_close(stream=None, ident=None, msg=job["msg"])  # pyright: ignore[reportArgumentType]

    async def interrupt_request(self, job: Job[Content], /) -> Content:
        """
        Handle an [interrupt request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-interrupt) (control only).

        Asks the interface to perform the interrupt and replies with empty content.
        """
        self.interface.interrupt()
        return {}

    async def shutdown_request(self, job: Job[Content], /) -> Content:
        """
        Handle a [shutdown request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-shutdown) (control only).

        Records the requested `restart` flag (default `False`), stops the kernel and
        echoes the flag back in the reply.
        """
        # Stored so the context manager can pass it to `do_shutdown` on exit.
        self._restart = job["msg"]["content"].get("restart", False)
        self.stop()
        return {"restart": self._restart}

    async def debug_request(self, job: Job[Content], /) -> Content:
        """
        Handle a [debug request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#debug-request) (control only).

        The message content is processed by the kernel's debugger and its result is the reply.
        """
        return await self.debugger.process_request(job["msg"]["content"])

    async def create_subshell_request(self: Kernel, job: Job[Content], /) -> Content:
        """
        Handle a [create subshell request](https://jupyter.org/enhancement-proposals/91-kernel-subshells/kernel-subshells.html#create-subshell) (control only).

        Returns:
            `{"subshell_id": ...}` with the id of the newly created, unprotected subshell.
        """
        subshell = self.subshell_manager.create_subshell(protected=False)
        return {"subshell_id": subshell.subshell_id}

    async def delete_subshell_request(self, job: Job[Content], /) -> Content:
        """
        Handle a [delete subshell request](https://jupyter.org/enhancement-proposals/91-kernel-subshells/kernel-subshells.html#delete-subshell) (control only).

        Raises:
            KeyError: If the message content has no `subshell_id`.
        """
        self.subshell_manager.delete_subshell(job["msg"]["content"]["subshell_id"])
        return {}

    async def list_subshell_request(self, job: Job[Content], /) -> Content:
        """
        Handle a [list subshell request](https://jupyter.org/enhancement-proposals/91-kernel-subshells/kernel-subshells.html#list-subshells) (control only).

        Returns:
            `{"subshell_id": [...]}` holding the list produced by the subshell manager.
        """
        return {"subshell_id": list(self.subshell_manager.list_subshells())}

    def excepthook(self, etype, evalue, tb) -> None:
        """Print an uncaught exception to the original stderr when kernel messages are enabled."""
        if not self.print_kernel_messages:
            return
        # write uncaught traceback to 'real' stderr, not zmq-forwarder
        real_stderr = sys.__stderr__
        traceback.print_exception(etype, evalue, tb, file=real_stderr)

    def unraisablehook(self, unraisable: sys.UnraisableHookArgs, /) -> None:
        """Log an unraisable exception (during gc for instance).

        When no exception value is provided, one is built from the exception type
        and the error message.
        """
        value = unraisable.exc_value
        if not value:
            value = unraisable.exc_type(unraisable.err_msg)
        exc_info = (unraisable.exc_type, value, unraisable.exc_traceback)
        extra = {"object": unraisable.object}
        self.log.exception(unraisable.err_msg, exc_info=exc_info, extra=extra)

    def get_connection_info(self) -> dict[str, Any]:
        """Return the connection info as a dict, parsed from the JSON in `connection_file`."""
        return json.loads(self.connection_file.read_bytes())

    def get_parent(self) -> Message[dict[str, Any]] | None:
        """
        A convenience method to access the 'message' in the current context if there is one.

        'parent' is the parameter name used by [Session.send][jupyter_client.session.Session.send] to provide context when sending a reply.

        Returns:
            The result of `utils.get_parent()`; `None` when there is no message in context.

        See also:
            - [Kernel.iopub_send][Kernel.iopub_send]
            - [ipywidgets.Output][ipywidgets.widgets.widget_output.Output]:
                Uses `get_ipython().kernel.get_parent()` to obtain the `msg_id` which
                is used to 'capture' output when its context has been acquired.
        """
        return utils.get_parent()

    async def do_complete(self, code: str, cursor_pos: int | None) -> Content:
        "Matches signature of [ipykernel.kernelbase.Kernel.do_complete][]."
        return await self.shell._do_complete_request(code=code, cursor_pos=cursor_pos)  # pyright: ignore[reportPrivateUsage]

    async def do_inspect(self, code: str, cursor_pos: int | None, detail_level=0, omit_sections=()) -> Content:
        """
        Matches signature of [ipykernel.kernelbase.Kernel.do_inspect][].

        Args:
            code: The code to inspect.
            cursor_pos: The cursor position within `code`.
            detail_level: Forwarded to the shell, as the 'inspect_request' handler does.
            omit_sections: Accepted for signature compatibility only; it is not forwarded.
        """
        return await self.shell._inspect_request(  # pyright: ignore[reportPrivateUsage]
            code=code,
            cursor_pos=cursor_pos,  # pyright: ignore[reportArgumentType]
            detail_level=detail_level,
        )

    async def do_history(
        self,
        hist_access_type,
        output,
        raw,
        session=None,
        start=None,
        stop=None,
        n=None,
        pattern=None,
        unique=False,
    ) -> Content:
        """
        Matches signature of [ipykernel.kernelbase.Kernel.do_history][].

        `n`, `pattern` and `unique` are forwarded to the shell only when the caller
        supplied a value, so calls that omit them behave exactly as before.
        """
        # These fields are part of a history request's content, which the
        # 'history_request' handler passes to the shell as keyword arguments.
        supplied: dict[str, Any] = {}
        if n is not None:
            supplied["n"] = n
        if pattern is not None:
            supplied["pattern"] = pattern
        if unique:
            supplied["unique"] = unique
        return await self.shell._history_request(  # pyright: ignore[reportPrivateUsage]
            output=output,
            raw=raw,
            hist_access_type=hist_access_type,
            session=session,  # pyright: ignore[reportArgumentType]
            start=start,  # pyright: ignore[reportArgumentType]
            stop=stop,
            **supplied,
        )

    async def do_execute(
        self,
        code: str,
        silent: bool,
        store_history: bool = True,
        user_expressions: dict | None = None,
        allow_stdin=False,
        *,
        cell_id: str | None = None,
        **_ignored,
    ) -> Content:
        """
        Matches signature of [ipykernel.kernelbase.Kernel.do_execute][].

        An 'execute_request' message is built for the call and installed as the current
        job while the code runs in the current shell; the previous job is restored
        afterwards.

        Args:
            code: The code to execute.
            silent: Forwarded to the shell.
            store_history: Forwarded to the shell.
            user_expressions: Expressions to evaluate; `None` is treated as an empty dict
                for both the message content and the shell call.
            allow_stdin: Forwarded to the shell.
            cell_id: Optional id of the cell being executed.
            **_ignored: Extra keyword arguments are accepted and discarded.
        """
        # Normalise once so the message content and the shell call agree.
        expressions = user_expressions or {}
        content = ExecuteContent(
            code=code,
            silent=silent,
            store_history=store_history,
            user_expressions=expressions,
            allow_stdin=allow_stdin,
            stop_on_error=False,
        )
        msg = self.interface.msg("execute_request", content=content)  # pyright: ignore[reportArgumentType]
        job = Job(msg=msg, ident=[], received_time=time.monotonic())
        token = utils._job_var.set(job)  # pyright: ignore[reportPrivateUsage]
        try:
            return await self.shell._execute_request(  # pyright: ignore[reportPrivateUsage]
                code=code,
                silent=silent,
                store_history=store_history,
                user_expressions=expressions,
                allow_stdin=allow_stdin,
                cell_id=cell_id,
                received_time=job["received_time"],
            )
        finally:
            utils._job_var.reset(token)  # pyright: ignore[reportPrivateUsage]

    def getpass(self, prompt="", stream=None) -> str:
        """
        Matches signature of [ipykernel.kernelbase.Kernel.getpass][].

        Delegates to the interface; `stream` is accepted for compatibility and not used.
        """
        return self.interface.getpass(prompt)

    def raw_input(self, prompt="") -> str:
        """
        Matches signature of [ipykernel.kernelbase.Kernel.raw_input][].

        Delegates to the interface.
        """
        return self.interface.raw_input(prompt)

interface class-attribute instance-attribute

interface: Instance[BaseKernelInterface] = Instance(BaseKernelInterface)

The abstraction to interface with the kernel.

callers class-attribute instance-attribute

The callers associated with the kernel once it has started.

subshell_manager class-attribute instance-attribute

subshell_manager = Fixed(SubshellManager)

Dedicated to management of sub shells.

help_links = Tuple()

quiet class-attribute instance-attribute

quiet = Bool(True)

Only send stdout/stderr to output stream.

print_kernel_messages class-attribute instance-attribute

print_kernel_messages = Bool(True)

When enabled the kernel will print startup, shutdown and terminal errors.

connection_file class-attribute instance-attribute

connection_file: TraitType[Path, Path | str] = TraitType()

JSON file in which to store connection info.

"kernel-<pid>.json"

This file will contain the IP, ports, and authentication key needed to connect clients to this kernel. By default, this file will be created in the security dir of the current profile, but can be specified by absolute path.

kernel_name class-attribute instance-attribute

kernel_name = CUnicode()

The kernel's name. If it contains 'trio', a trio backend will be used instead of an asyncio backend.

log class-attribute instance-attribute

log = Instance(LoggerAdapter)

The logging adapter.

main_shell class-attribute instance-attribute

main_shell = Fixed(lambda _: instance())

The interactive shell.

debugger class-attribute instance-attribute

debugger = Fixed(Debugger)

Handles debug requests.

comm_manager class-attribute instance-attribute

comm_manager = Fixed(CommManager)

Creates async_kernel.comm.Comm instances and maintains a mapping of comm_id to Comm instances.

event_started class-attribute instance-attribute

event_started = Fixed(Event)

An event that occurs when the kernel is started.

event_stopped class-attribute instance-attribute

event_stopped = Fixed(Event)

An event that occurs when the kernel is stopped.

settings property

settings: dict[str, Any]

Settings that have been set to customise the behaviour of the kernel.

shell property

The shell given the current context.

Notes
  • The subshell_id of the main shell is None.

caller property

caller: Caller

The caller for the shell channel.

kernel_info property

kernel_info: dict[
    str, str | dict[str, str | dict[str, str | int]] | Any | tuple[Any, ...] | bool
]

A dict of details sent in reply to a 'kernel_info_request'.

load_settings

load_settings(settings: dict[str, Any]) -> None

Load settings into the kernel.

Permitted until the kernel async context has been entered.

Parameters:

  • settings

    (dict[str, Any]) –

    key: dotted.path.of.attribute. value: The value to set.

Source code in src/async_kernel/kernel.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def load_settings(self, settings: dict[str, Any]) -> None:
    """
    Apply settings to the kernel and record them.

    Only permitted while `event_started` is unset, i.e. until the kernel
    async context has been entered.

    Args:
        settings:
            key: dotted.path.of.attribute.
            value: The value to set.

    Raises:
        RuntimeError: If the kernel has already started.
    """
    if self.event_started:
        msg = "It is too late to load settings!"
        raise RuntimeError(msg)
    # Start from the settings recorded so far, or seed with the kernel name.
    applied = self._settings or {"kernel_name": self.kernel_name}
    for dotted_name, value in settings.items():
        applied.update(utils.setattr_nested(self, dotted_name, value))
    self._settings.update(applied)

load_connection_info

load_connection_info(info: dict[str, Any]) -> None

Load connection info from a dict containing connection info.

Typically this data comes from a connection file and is called by load_connection_file.

Parameters:

  • info

    (dict[str, Any]) –

    Dictionary containing connection_info. See the connection_file spec for details.

Source code in src/async_kernel/kernel.py
288
289
290
291
292
293
294
295
296
297
298
def load_connection_info(self, info: dict[str, Any]) -> None:
    """
    Hand a connection info dict to the kernel interface.

    Typically this data comes from a connection file
    and is called by load_connection_file.

    Args:
        info: Dictionary containing connection_info. See the connection_file spec for details.
    """
    interface = self.interface
    interface.load_connection_info(info)

stop staticmethod

stop() -> None

A staticmethod to stop the running kernel.

Once an instance of a kernel is stopped the instance cannot be restarted. Instead a new instance should be started.

Source code in src/async_kernel/kernel.py
300
301
302
303
304
305
306
307
308
309
310
311
312
@staticmethod
def stop() -> None:
    """
    A [staticmethod][] to stop the running kernel.

    Does nothing when there is no kernel instance or the instance has no
    `_scope` attribute.

    Once an instance of a kernel is stopped the instance cannot be restarted.
    Instead a new instance should be started.
    """
    kernel = Kernel._instance
    if not kernel:
        return
    scope = getattr(kernel, "_scope", None)
    if not scope:
        return
    # The scope attribute is removed before cancelling is requested.
    del kernel._scope
    kernel.log.info("Stopping kernel: %s", kernel)
    kernel.caller.call_direct(scope.cancel, "Stopping kernel")
    kernel.event_stopped.set()

__asynccontextmanager__ async

__asynccontextmanager__() -> AsyncGenerator[Self]

Start the kernel.

Source code in src/async_kernel/kernel.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
@asynccontextmanager
async def __asynccontextmanager__(self) -> AsyncGenerator[Self]:
    """
    Start the kernel.

    On entry the interface context is entered, the interface's callers are
    copied into `self.callers`, the kernel's `excepthook`/`unraisablehook` are
    installed on `sys`, the comm manager is patched and `event_started` is set
    before the kernel is yielded.

    On exit (normally or through an exception) `stop` is called, the `sys`
    hooks that were in place on entry are restored and `do_shutdown` is
    awaited inside a shielded cancel scope.

    Yields:
        The kernel instance.
    """
    # NOTE(review): presumably touches `main_shell` so the shell exists before startup — confirm.
    assert self.main_shell
    # Remember the current hooks so they can be restored in `finally`.
    original_sys_hooks = sys.excepthook, sys.unraisablehook
    try:
        async with self.interface:
            self.callers.update(self.interface.callers)
            with anyio.CancelScope() as scope:
                # `stop` looks for `_scope` and requests its cancellation.
                self._scope = scope
                sys.excepthook, sys.unraisablehook = self.excepthook, self.unraisablehook
                self.comm_manager.patch_comm()
                self.event_started.set()
                self.log.info("Kernel started: %s", self)
                yield self
    finally:
        self.stop()
        sys.excepthook, sys.unraisablehook = original_sys_hooks
        # Shielded so the shutdown steps are not interrupted by an outer cancellation.
        with anyio.CancelScope(shield=True):
            await self.do_shutdown(self._restart)

do_shutdown async

do_shutdown(restart: bool) -> None

Matches signature of ipykernel.kernelbase.Kernel.do_shutdown.

Source code in src/async_kernel/kernel.py
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
async def do_shutdown(self, restart: bool) -> None:
    """
    Matches signature of [ipykernel.kernelbase.Kernel.do_shutdown][].

    Resets the current shell, stops all subshells, clears the callers and the
    handler cache, closes and forgets every comm, and finally drops the
    singleton references to the shell, the comm manager and the kernel.

    Args:
        restart: Not read by this method.
    """
    # Shutdown is only expected after the stopped event has been set.
    assert self.event_stopped
    self.log.info("Kernel shutdown: %s", self)
    # NOTE(review): short pause, presumably to let pending work settle — confirm.
    await anyio.sleep(0.1)
    self.shell.reset(new_session=False)
    self.subshell_manager.stop_all_subshells(force=True)
    self.callers.clear()
    self._handler_cache.clear()
    # Iterate over a snapshot of the comms while each one is closed.
    for comm in tuple(self.comm_manager.comms.values()):
        comm.close(deleting=True)
    self.comm_manager.comms.clear()
    await anyio.sleep(0.1)
    # Drop the singleton references held at class level.
    AsyncInteractiveShell.clear_instance()
    CommManager._instance = None  # pyright: ignore[reportPrivateUsage]
    Kernel._instance = None
    gc.collect()
    self.log.info("Kernel shutdown complete: %s", self)

run async

run(*, stopped: Callable[[], Any] | None = None) -> None

Run the kernel asynchronously.

Parameters:

  • stopped

    (Callable[[], Any] | None, default: None ) –

    An optional callback that is called when the kernel has stopped.

This method requires that a Caller instance does not already exist in the current thread.

Source code in src/async_kernel/kernel.py
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
async def run(self, *, stopped: Callable[[], Any] | None = None) -> None:
    """
    Run the kernel asynchronously.

    Args:
        stopped: An optional callback that is called when the kernel has stopped.

    This method requires that a [Caller][async_kernel.caller.Caller] instance does not already exist in the current thread.
    """
    try:
        async with self:
            await self.event_stopped
    finally:
        if stopped:
            stopped()

iopub_send

iopub_send(
    msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
    *,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: Message[dict[str, Any]] | dict[str, Any] | None | NoValue = NoValue,
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None

Send a message on the iopub socket.

Source code in src/async_kernel/kernel.py
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
def iopub_send(
    self,
    msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
    *,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: Message[dict[str, Any]] | dict[str, Any] | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None:
    """
    Send a message on the iopub socket.

    The call is forwarded unchanged to the interface's `iopub_send`. Nothing
    is sent once `event_stopped` is set.
    """
    if self.event_stopped:
        return
    send = self.interface.iopub_send
    send(
        msg_or_type,
        content=content,
        metadata=metadata,
        parent=parent,
        ident=ident,
        buffers=buffers,
    )

topic

topic(topic) -> bytes

prefixed topic for IOPub messages.

Source code in src/async_kernel/kernel.py
391
392
393
def topic(self, topic) -> bytes:
    """Return `topic` prefixed with 'kernel.' and encoded as bytes, for IOPub messages."""
    prefixed = f"kernel.{topic}"
    return prefixed.encode()

message_handler

message_handler(
    channel: Literal[shell, control],
    msg_type: MsgType,
    job: Job,
    send_reply: Callable[[Job, dict], CoroutineType[Any, Any, None]],
) -> None

Schedule the job for execution in a dedicated handler by (subshell_id, msg_type, send_reply).

'execute_request' and 'comm_msg' are handled by the shell's caller (typically the MainThread). All other shell messages are handled in the control thread.

'execute_request' messages can also specify alternate run modes: - task: Run the execute request as a task. - thread: Run the execute request in a worker thread.

The alternate run mode can be specified in a few ways:
- as a comment on the first line of the code block `# task` or `# thread`.
- As a tag `thread` or `task`

Parameters:

  • channel

    (Literal[shell, control]) –

    The channel the message arrived on.

  • msg_type

    (MsgType) –

    The type of msg.

  • job

    (Job) –

    A dict with the msg and supporting details.

Source code in src/async_kernel/kernel.py
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
def message_handler(
    self,
    channel: Literal[Channel.shell, Channel.control],
    msg_type: MsgType,
    job: Job,
    send_reply: Callable[[Job, dict], CoroutineType[Any, Any, None]],
    /,
) -> None:
    """
    Schedule the job for execution in a dedicated handler by `(subshell_id, msg_type, send_reply)`.

    'execute_request' and 'comm_msg' are handled by the shell's caller (typically the MainThread).
    All other shell messages are handled in the control thread.

    'execute_request' messages can also specify alternate run modes:
        - task: Run the execute request as a task.
        - thread: Run the execute request in a worker thread.

        The alternate run mode can be specified in a few ways:
        - As a comment on the first line of the code block `# task` or `# thread`.
        - As a tag `thread` or `task`.

    Args:
        channel: The channel the message arrived on.
        msg_type: The type of msg.
        job: A dict with the msg and supporting details.
        send_reply: The coroutine function used to send the reply; it is part of the handler lookup key.
    """
    # Note: There are never any active pending trackers in this context.
    # The subshell id is looked up in the message content first, then in the
    # header; when neither has one it is None.
    try:
        subshell_id = job["msg"]["content"]["subshell_id"]
    except KeyError:
        try:
            subshell_id = job["msg"]["header"]["subshell_id"]  # pyright: ignore[reportTypedDictNotRequiredAccess]
        except KeyError:
            subshell_id = None
    handler = self._get_handler(subshell_id, msg_type, send_reply)
    run_mode = self._get_run_mode(msg_type, job)
    caller = self.callers[channel]
    # Shell messages whose type is not in RUN_IN_SHELL are rerouted to the control caller.
    if channel is Channel.shell and msg_type not in RUN_IN_SHELL:
        caller = self.callers[Channel.control]
    # Schedule job
    match run_mode:
        case RunMode.queue:
            caller.queue_call(handler, job)
        case RunMode.task:
            caller.call_soon(handler, job)
        case RunMode.thread:
            caller.to_thread(handler, job)
    self.log.debug("%s %s %s %s %s", channel, msg_type, run_mode, handler, job)

kernel_info_request async

kernel_info_request(job: Job[Content]) -> Content

Handle a kernel info request.

Source code in src/async_kernel/kernel.py
515
516
517
async def kernel_info_request(self, job: Job[Content], /) -> Content:
    """
    Handle a [kernel info request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-info).

    The job is not inspected; the reply is the kernel's `kernel_info`.
    """
    info = self.kernel_info
    return info

comm_info_request async

comm_info_request(job: Job[Content]) -> Content

Handle a comm info request.

Source code in src/async_kernel/kernel.py
519
520
521
522
523
524
525
526
527
528
async def comm_info_request(self, job: Job[Content], /) -> Content:
    """
    Handle a [comm info request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#comm-info).

    Returns:
        `{"comms": {...}}` mapping each comm id to its target name. When the
        request content has a 'target_name' only comms with that target are
        listed; otherwise all comms are listed.
    """
    wanted = job["msg"]["content"].get("target_name")
    listing = {}
    # Iterate over a snapshot of the registered comms.
    for comm_id, comm in tuple(self.comm_manager.comms.items()):
        if comm.target_name == wanted or wanted is None:
            listing[comm_id] = {"target_name": comm.target_name}
    return {"comms": listing}

execute_request async

execute_request(job: Job[ExecuteContent]) -> Content

Handle an execute request.

Source code in src/async_kernel/kernel.py
530
531
532
533
534
535
536
async def execute_request(self, job: Job[ExecuteContent], /) -> Content:
    """
    Handle an [execute request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#execute).

    The message content is expanded into keyword arguments for the shell,
    together with the 'cellId' from the message metadata (if any) and the
    time the job was received.
    """
    run_cell = self.shell._execute_request  # pyright: ignore[reportPrivateUsage]
    message = job["msg"]
    return await run_cell(
        cell_id=message["metadata"].get("cellId"),
        received_time=job["received_time"],
        **message["content"],  # pyright: ignore[reportArgumentType]
    )

complete_request async

complete_request(job: Job[Content]) -> Content

Handle a completion request.

Source code in src/async_kernel/kernel.py
538
539
540
541
542
async def complete_request(self, job: Job[Content], /) -> Content:
    """
    Handle a [completion request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#completion).

    'code' defaults to an empty string and 'cursor_pos' to 0 when absent from the message content.
    """
    complete = self.shell._do_complete_request  # pyright: ignore[reportPrivateUsage]
    source = job["msg"]["content"].get("code", "")
    position = job["msg"]["content"].get("cursor_pos", 0)
    return await complete(code=source, cursor_pos=position)

is_complete_request async

is_complete_request(job: Job[Content]) -> Content

Handle a is_complete request.

Source code in src/async_kernel/kernel.py
544
545
546
async def is_complete_request(self, job: Job[Content], /) -> Content:
    """
    Handle an [is_complete request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#code-completeness).

    'code' defaults to an empty string when absent from the message content.
    """
    check = self.shell._is_complete_request  # pyright: ignore[reportPrivateUsage]
    source = job["msg"]["content"].get("code", "")
    return await check(source)

inspect_request async

inspect_request(job: Job[Content]) -> Content

Handle a inspect request.

Source code in src/async_kernel/kernel.py
548
549
550
551
552
553
554
555
async def inspect_request(self, job: Job[Content], /) -> Content:
    """
    Handle an [inspect request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#introspection).

    Missing content fields fall back to `code=""`, `cursor_pos=0` and `detail_level=0`.
    """
    inspect_ = self.shell._inspect_request  # pyright: ignore[reportPrivateUsage]
    content = job["msg"]["content"]
    arguments = {
        "code": content.get("code", ""),
        "cursor_pos": content.get("cursor_pos", 0),
        "detail_level": content.get("detail_level", 0),
    }
    return await inspect_(**arguments)

history_request async

history_request(job: Job[Content]) -> Content

Handle a history request.

Source code in src/async_kernel/kernel.py
557
558
559
async def history_request(self, job: Job[Content], /) -> Content:
    """
    Handle a [history request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#history).

    The message content is expanded into keyword arguments for the shell.
    """
    fetch_history = self.shell._history_request  # pyright: ignore[reportPrivateUsage]
    return await fetch_history(**job["msg"]["content"])

comm_open async

comm_open(job: Job[Content]) -> None

Handle a comm open request.

Source code in src/async_kernel/kernel.py
561
562
563
async def comm_open(self, job: Job[Content], /) -> None:
    """
    Handle a [comm open request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#opening-a-comm).

    The job's message is passed to the comm manager; `stream` and `ident` are always None.
    """
    open_comm = self.comm_manager.comm_open
    open_comm(stream=None, ident=None, msg=job["msg"])  # pyright: ignore[reportArgumentType]

comm_msg async

comm_msg(job: Job[Content]) -> None

Handle a comm msg request.

Source code in src/async_kernel/kernel.py
565
566
567
async def comm_msg(self, job: Job[Content], /) -> None:
    """
    Handle a [comm msg request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#comm-messages).

    The job's message is passed to the comm manager; `stream` and `ident` are always None.
    """
    deliver = self.comm_manager.comm_msg
    deliver(stream=None, ident=None, msg=job["msg"])  # pyright: ignore[reportArgumentType]

comm_close async

comm_close(job: Job[Content]) -> None

Handle a comm close request.

Source code in src/async_kernel/kernel.py
569
570
571
async def comm_close(self, job: Job[Content], /) -> None:
    """
    Handle a [comm close request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#tearing-down-comms).

    The job's message is passed to the comm manager; `stream` and `ident` are always None.
    """
    close_comm = self.comm_manager.comm_close
    close_comm(stream=None, ident=None, msg=job["msg"])  # pyright: ignore[reportArgumentType]

interrupt_request async

interrupt_request(job: Job[Content]) -> Content

Handle an interrupt request (control only).

Source code in src/async_kernel/kernel.py
573
574
575
576
async def interrupt_request(self, job: Job[Content], /) -> Content:
    """
    Handle an [interrupt request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-interrupt) (control only).

    Asks the interface to interrupt and replies with empty content.
    """
    interrupt = self.interface.interrupt
    interrupt()
    return {}

shutdown_request async

shutdown_request(job: Job[Content]) -> Content

Handle a shutdown request (control only).

Source code in src/async_kernel/kernel.py
578
579
580
581
582
async def shutdown_request(self, job: Job[Content], /) -> Content:
    """
    Handle a [shutdown request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-shutdown) (control only).

    Stores the requested 'restart' flag (False when absent) on the kernel,
    stops the kernel and echoes the flag in the reply.
    """
    content = job["msg"]["content"]
    self._restart = content.get("restart", False)
    self.stop()
    reply = {"restart": self._restart}
    return reply

debug_request async

debug_request(job: Job[Content]) -> Content

Handle a debug request (control only).

Source code in src/async_kernel/kernel.py
584
585
586
async def debug_request(self, job: Job[Content], /) -> Content:
    """
    Handle a [debug request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#debug-request) (control only).

    The message content is processed by the kernel's debugger and its result is the reply.
    """
    process = self.debugger.process_request
    return await process(job["msg"]["content"])

create_subshell_request async

create_subshell_request(job: Job[Content]) -> Content

Handle a create subshell request (control only).

Source code in src/async_kernel/kernel.py
588
589
590
591
async def create_subshell_request(self: Kernel, job: Job[Content], /) -> Content:
    """
    Handle a [create subshell request](https://jupyter.org/enhancement-proposals/91-kernel-subshells/kernel-subshells.html#create-subshell) (control only).

    Creates a subshell with `protected=False` and replies with its id.
    """
    subshell = self.subshell_manager.create_subshell(protected=False)
    return {"subshell_id": subshell.subshell_id}

delete_subshell_request async

delete_subshell_request(job: Job[Content]) -> Content

Handle a delete subshell request (control only).

Source code in src/async_kernel/kernel.py
593
594
595
596
async def delete_subshell_request(self, job: Job[Content], /) -> Content:
    """
    Handle a [delete subshell request](https://jupyter.org/enhancement-proposals/91-kernel-subshells/kernel-subshells.html#delete-subshell) (control only).

    Deletes the subshell named by 'subshell_id' in the message content and replies with empty content.
    """
    target_id = job["msg"]["content"]["subshell_id"]
    self.subshell_manager.delete_subshell(target_id)
    return {}

list_subshell_request async

list_subshell_request(job: Job[Content]) -> Content

Handle a list subshell request (control only).

Source code in src/async_kernel/kernel.py
598
599
600
async def list_subshell_request(self, job: Job[Content], /) -> Content:
    """
    Handle a [list subshell request](https://jupyter.org/enhancement-proposals/91-kernel-subshells/kernel-subshells.html#list-subshells) (control only).

    Replies with the subshell manager's subshells as a list under 'subshell_id'.
    """
    subshell_ids = [*self.subshell_manager.list_subshells()]
    return {"subshell_id": subshell_ids}

excepthook

excepthook(etype, evalue, tb) -> None

Handle an exception.

Source code in src/async_kernel/kernel.py
602
603
604
605
606
def excepthook(self, etype, evalue, tb) -> None:
    """
    Handle an exception.

    The traceback is printed only when `print_kernel_messages` is enabled.
    """
    if not self.print_kernel_messages:
        return
    # write uncaught traceback to 'real' stderr, not zmq-forwarder
    traceback.print_exception(etype, evalue, tb, file=sys.__stderr__)

unraisablehook

unraisablehook(unraisable: UnraisableHookArgs) -> None

Handle unraisable exceptions (during gc for instance).

Source code in src/async_kernel/kernel.py
608
609
610
611
612
613
614
615
def unraisablehook(self, unraisable: sys.UnraisableHookArgs, /) -> None:
    """
    Handle unraisable exceptions (during gc for instance).

    The exception is logged through `self.log.exception`. When the hook
    arguments carry no exception value, one is built from the exception type
    and the error message.
    """
    exc_type = unraisable.exc_type
    exc_value = unraisable.exc_value
    if not exc_value:
        exc_value = unraisable.exc_type(unraisable.err_msg)
    exc_info = (exc_type, exc_value, unraisable.exc_traceback)
    self.log.exception(unraisable.err_msg, exc_info=exc_info, extra={"object": unraisable.object})

get_connection_info

get_connection_info() -> dict[str, Any]

Return the connection info as a dict.

Source code in src/async_kernel/kernel.py
617
618
619
def get_connection_info(self) -> dict[str, Any]:
    """Read the connection file and return its JSON content as a dict."""
    raw = self.connection_file.read_bytes()
    return json.loads(raw)

get_parent

get_parent() -> Message[dict[str, Any]] | None

A convenience method to access the 'message' in the current context if there is one.

'parent' is the parameter name used by Session.send to provide context when sending a reply.

See also
Source code in src/async_kernel/kernel.py
621
622
623
624
625
626
627
628
629
630
631
632
633
def get_parent(self) -> Message[dict[str, Any]] | None:
    """
    Return the 'message' of the current context, if there is one.

    This is a thin wrapper around `utils.get_parent`.

    'parent' is the parameter name used by [Session.send][jupyter_client.session.Session.send] to provide context when sending a reply.

    See also:
        - [Kernel.iopub_send][Kernel.iopub_send]
        - [ipywidgets.Output][ipywidgets.widgets.widget_output.Output]:
            Uses `get_ipython().kernel.get_parent()` to obtain the `msg_id` which
            is used to 'capture' output when its context has been acquired.
    """
    parent = utils.get_parent()
    return parent

do_complete async

do_complete(code: str, cursor_pos: int | None) -> Content

Matches signature of ipykernel.kernelbase.Kernel.do_complete.

Source code in src/async_kernel/kernel.py
635
636
637
async def do_complete(self, code: str, cursor_pos: int | None) -> Content:
    """
    Matches signature of [ipykernel.kernelbase.Kernel.do_complete][].

    Args:
        code: The source to complete.
        cursor_pos: The cursor position, forwarded to the shell unchanged.

    Returns:
        The reply content produced by the shell's completion request.
    """
    return await self.shell._do_complete_request(code=code, cursor_pos=cursor_pos)  # pyright: ignore[reportPrivateUsage]

do_inspect async

do_inspect(
    code: str, cursor_pos: int | None, detail_level=0, omit_sections=()
) -> Content

Matches signature of ipykernel.kernelbase.Kernel.do_inspect.

Source code in src/async_kernel/kernel.py
639
640
641
async def do_inspect(self, code: str, cursor_pos: int | None, detail_level=0, omit_sections=()) -> Content:
    """
    Matches signature of [ipykernel.kernelbase.Kernel.do_inspect][].

    Args:
        code: The source to inspect.
        cursor_pos: The cursor position, forwarded to the shell unchanged.
        detail_level: Forwarded to the shell, as the 'inspect_request' handler does.
        omit_sections: Accepted for signature compatibility; not used here.

    Returns:
        The reply content produced by the shell's inspect request.
    """
    return await self.shell._inspect_request(  # pyright: ignore[reportPrivateUsage]
        code=code,
        cursor_pos=cursor_pos,  # pyright: ignore[reportArgumentType]
        detail_level=detail_level,
    )

do_history async

do_history(
    hist_access_type,
    output,
    raw,
    session=None,
    start=None,
    stop=None,
    n=None,
    pattern=None,
    unique=False,
) -> Content

Matches signature of ipykernel.kernelbase.Kernel.do_history.

Source code in src/async_kernel/kernel.py
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
async def do_history(
    self,
    hist_access_type,
    output,
    raw,
    session=None,
    start=None,
    stop=None,
    n=None,
    pattern=None,
    unique=False,
) -> Content:
    """
    Matches signature of [ipykernel.kernelbase.Kernel.do_history][].

    Only `output`, `raw`, `hist_access_type`, `session`, `start` and `stop`
    are forwarded to the shell.

    NOTE(review): `n`, `pattern` and `unique` are accepted but not forwarded — confirm this is intended.
    """
    fetch_history = self.shell._history_request  # pyright: ignore[reportPrivateUsage]
    query = {
        "output": output,
        "raw": raw,
        "hist_access_type": hist_access_type,
        "session": session,
        "start": start,
        "stop": stop,
    }
    return await fetch_history(**query)  # pyright: ignore[reportArgumentType]

do_execute async

do_execute(
    code: str,
    silent: bool,
    store_history: bool = True,
    user_expressions: dict | None = None,
    allow_stdin=False,
    *,
    cell_id: str | None = None,
    **_ignored,
) -> Content

Matches signature of ipykernel.kernelbase.Kernel.do_execute.

Source code in src/async_kernel/kernel.py
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
async def do_execute(
    self,
    code: str,
    silent: bool,
    store_history: bool = True,
    user_expressions: dict | None = None,
    allow_stdin=False,
    *,
    cell_id: str | None = None,
    **_ignored,
) -> Content:
    """
    Matches signature of [ipykernel.kernelbase.Kernel.do_execute][].

    The arguments are wrapped in a synthetic 'execute_request' message and
    job. That job is set on `utils._job_var` while the shell executes the
    code, and the variable is reset once execution finishes or fails.

    Args:
        code: The source to execute.
        silent: Forwarded to the shell's execute request.
        store_history: Forwarded to the shell's execute request.
        user_expressions: Forwarded to the shell as given; the synthetic message uses `{}` when falsy.
        allow_stdin: Forwarded to the shell's execute request.
        cell_id: Forwarded to the shell's execute request.
        **_ignored: Extra keyword arguments are accepted and discarded.

    Returns:
        The reply content produced by the shell.
    """
    request_content = ExecuteContent(
        code=code,
        silent=silent,
        store_history=store_history,
        user_expressions=user_expressions or {},
        allow_stdin=allow_stdin,
        stop_on_error=False,
    )
    request = self.interface.msg("execute_request", content=request_content)  # pyright: ignore[reportArgumentType]
    job = Job(msg=request, ident=[], received_time=time.monotonic())
    # The job is set before `self.shell` is looked up and is always reset afterwards.
    token = utils._job_var.set(job)  # pyright: ignore[reportPrivateUsage]
    try:
        reply = await self.shell._execute_request(  # pyright: ignore[reportPrivateUsage]
            code=code,
            silent=silent,
            store_history=store_history,
            user_expressions=user_expressions,
            allow_stdin=allow_stdin,
            cell_id=cell_id,
            received_time=job["received_time"],
        )
    finally:
        utils._job_var.reset(token)  # pyright: ignore[reportPrivateUsage]
    return reply

getpass

getpass(prompt='', stream=None) -> str

Matches signature of ipykernel.kernelbase.Kernel.getpass.

Source code in src/async_kernel/kernel.py
701
702
703
def getpass(self, prompt="", stream=None) -> str:
    """
    Matches signature of [ipykernel.kernelbase.Kernel.getpass][].

    Args:
        prompt: Passed through to the interface's `getpass`.
        stream: Accepted for signature compatibility; not used here.

    Returns:
        The value returned by the interface's `getpass`.
    """
    secret = self.interface.getpass(prompt)
    return secret

raw_input

raw_input(prompt='') -> str

Matches signature of ipykernel.kernelbase.Kernel.raw_input.

Source code in src/async_kernel/kernel.py
705
706
707
def raw_input(self, prompt="") -> str:
    """
    Matches signature of [ipykernel.kernelbase.Kernel.raw_input][].

    Args:
        prompt: Passed through to the interface's `raw_input`.

    Returns:
        The value returned by the interface's `raw_input`.
    """
    answer = self.interface.raw_input(prompt)
    return answer