
async_kernel.kernel

Classes:

  • KernelInterruptError

    Raised to interrupt the kernel.

  • Kernel

    An asynchronous kernel with an anyio backend providing an IPython AsyncInteractiveShell with zmq sockets.

async_kernel.kernel.KernelInterruptError

Bases: InterruptedError

Raised to interrupt the kernel.

async_kernel.kernel.Kernel

Kernel(settings: dict | None = None)

Bases: HasTraits

An asynchronous kernel with an anyio backend providing an IPython AsyncInteractiveShell with zmq sockets.

Only one instance will be created/run at a time. The instance can be obtained with Kernel() or async_kernel.utils.get_kernel.
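
The one-instance behaviour can be pictured with a small sketch. It is an illustration only: `SingletonSketch` and its attributes are hypothetical, and the way Kernel actually reuses its instance is not shown on this page, so the `__new__` logic here is an assumption.

```python
class SingletonSketch:
    """Hypothetical stand-in for a class that allows one live instance."""

    _instance = None  # the current live instance, if any
    _initialised = False

    def __new__(cls):
        # Reuse the live instance instead of creating a second one.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        if self._initialised:
            return  # only initialise once
        self._initialised = True
        self.settings = {}

    @classmethod
    def stop(cls):
        # Forget the instance so a later call creates a fresh one.
        cls._instance = None


a = SingletonSketch()
b = SingletonSketch()  # same object as `a`
SingletonSketch.stop()
c = SingletonSketch()  # a new object, because the old one was released
```

Calling the class twice returns the same object until `stop()` releases it, which matches the rule that a new instance can only be started after the previous one has stopped.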

To start the kernel:

At the command prompt.

async-kernel -f .

See also:

-

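From Python, via the command-line entry point:

from async_kernel.__main__ import main

main()

With Kernel.start:

Kernel.start()

Asynchronously, as a context manager:

kernel = Kernel()
async with kernel:
    await anyio.sleep_forever()

Tip

This is a convenient way to start a kernel for debugging.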

Origins: IPyKernel Kernel, IPyKernel IPKernelApp & IPyKernel IPythonKernel

Source code in src/async_kernel/kernel.py
def __init__(self, settings: dict | None = None, /) -> None:
    if self._initialised:
        return  # Only initialize once
    assert threading.current_thread() is threading.main_thread(), "The kernel must start in the main thread."
    self._initialised = True
    super().__init__()
    sys.excepthook = self.excepthook
    sys.unraisablehook = self.unraisablehook
    signal.signal(signal.SIGINT, self._signal_handler)
    if not os.environ.get("MPLBACKEND"):
        os.environ["MPLBACKEND"] = "module://matplotlib_inline.backend_inline"
    # settings get loaded in `_validate_settings`
    self._settings = settings or {}

anyio_backend class-attribute instance-attribute

anyio_backend = UseEnum(Backend)

anyio_backend_options class-attribute instance-attribute

anyio_backend_options: Dict[Backend, dict[str, Any] | None] = Dict(allow_none=True)

Default options to use with anyio.run. See also: Kernel.handle_message_request

concurrency_mode class-attribute instance-attribute

concurrency_mode = UseEnum(KernelConcurrencyMode)

The mode to use when getting the run mode for running the handler of a message request.

help_links class-attribute instance-attribute

help_links = Tuple()

quiet class-attribute instance-attribute

quiet = Bool(True)

Only send stdout/stderr to output stream

connection_file class-attribute instance-attribute

connection_file: TraitType[Path, Path | str] = TraitType()

JSON file in which to store connection info [default: kernel-.json]

This file will contain the IP, ports, and authentication key needed to connect clients to this kernel. By default, this file will be created in the security dir of the current profile, but can be specified by absolute path.

kernel_name class-attribute instance-attribute

kernel_name: str | Unicode = Unicode()

The kernel's name. If it contains 'trio', a trio backend is used instead of an asyncio backend.

ip class-attribute instance-attribute

ip = Unicode()

The kernel's IP address [default localhost].

If the IP address is something other than localhost, then Consoles on other machines will be able to connect to the Kernel, so be careful!

log class-attribute instance-attribute

log = Instance(LoggerAdapter)

shell class-attribute instance-attribute

shell = Instance(AsyncInteractiveShell, ())

session class-attribute instance-attribute

session = Instance(Session, ())

debugger class-attribute instance-attribute

debugger = Instance(Debugger, ())

comm_manager class-attribute instance-attribute

comm_manager: Instance[CommManager] = Instance('async_kernel.comm.CommManager')

event_started class-attribute instance-attribute

event_started = Instance(AsyncEvent, (), read_only=True)

An event that occurs when the kernel is started.

event_stopped class-attribute instance-attribute

event_stopped = Instance(AsyncEvent, (), read_only=True)

An event that occurs when the kernel is stopped.

execution_count property

execution_count: int

The execution count in the context of the current coroutine, or the kernel's current value if none is set in context.

load_connection_info

load_connection_info(info: dict[str, Any]) -> None

Load connection info from a dict containing connection info.

Typically this data comes from a connection file and is called by load_connection_file.

Parameters:

  • info

    (dict[str, Any]) –

    Dictionary containing connection_info. See the connection_file spec for details.

Source code in src/async_kernel/kernel.py
def load_connection_info(self, info: dict[str, Any]) -> None:
    """
    Load connection info from a dict containing connection info.

    Typically this data comes from a connection file
    and is called by load_connection_file.

    Args:
        info: Dictionary containing connection_info. See the connection_file spec for details.
    """
    if self._ports:
        msg = "Connection info is already loaded!"
        raise RuntimeError(msg)
    self.transport = info.get("transport", self.transport)
    self.ip = info.get("ip") or self.ip
    for socket in SocketID:
        name = f"{socket}_port"
        if socket not in self._ports and name in info:
            self._ports[socket] = info[name]
    if "key" in info:
        key = info["key"]
        if isinstance(key, str):
            key = key.encode()
        assert isinstance(key, bytes)

        self.session.key = key
    if "signature_scheme" in info:
        self.session.signature_scheme = info["signature_scheme"]
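
As a rough illustration of the kind of dictionary this method consumes, the sketch below parses a small connection document and extracts the ports and key by hand. The field names follow the Jupyter connection-file convention; the values are made up, and the helper logic is a simplified assumption rather than the method's own code.

```python
import json

# Made-up connection info in the usual connection-file layout.
connection_json = """
{
  "transport": "tcp",
  "ip": "127.0.0.1",
  "shell_port": 50001,
  "control_port": 50002,
  "key": "secret",
  "signature_scheme": "hmac-sha256"
}
"""

info = json.loads(connection_json)

# Collect every "<name>_port" entry, keyed by socket name.
ports = {
    name.removesuffix("_port"): value
    for name, value in info.items()
    if name.endswith("_port")
}

# JSON has no bytes type, so the key arrives as str and is encoded to bytes.
key = info["key"]
if isinstance(key, str):
    key = key.encode()
```

Ports that are absent from the dictionary are simply not recorded, which leaves room for them to be assigned later.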

__aenter__ async

__aenter__() -> Self

Start the kernel.

  • Only one instance can (should) run at a time.
  • An instance can only be started once.
  • A new instance can be started after a previous instance has stopped.

Example

async with Kernel() as kernel:
    await anyio.sleep_forever()
Source code in src/async_kernel/kernel.py
async def __aenter__(self) -> Self:
    """
    Start the kernel.

    - Only one instance can (should) run at a time.
    - An instance can only be started once.
    - A new instance can be started after a previous instance has stopped.

    !!! example

        ```python
        async with Kernel() as kernel:
            await anyio.sleep_forever()
        ```
    """
    async with contextlib.AsyncExitStack() as stack:
        self._running = True
        await stack.enter_async_context(self._start_in_context())
        self.__stack = stack.pop_all()
    return self
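
The `pop_all()` idiom used in the listing above may be unfamiliar, so here is a minimal self-contained sketch of it. It uses asyncio instead of anyio to stay dependency-free, and `resource`, `Service`, and `events` are hypothetical names chosen for the illustration.

```python
import asyncio
import contextlib

events: list[str] = []


@contextlib.asynccontextmanager
async def resource(name: str):
    # Hypothetical resource standing in for the kernel's internal context.
    events.append(f"enter {name}")
    try:
        yield name
    finally:
        events.append(f"exit {name}")


class Service:
    async def __aenter__(self):
        async with contextlib.AsyncExitStack() as stack:
            await stack.enter_async_context(resource("sockets"))
            # If anything above raised, the stack would unwind here.
            # pop_all() transfers the callbacks to a new stack, so leaving
            # this block does not close the resource.
            self._stack = stack.pop_all()
        return self

    async def __aexit__(self, *exc_info):
        # Close everything that __aenter__ opened.
        await self._stack.aclose()


async def demo() -> None:
    async with Service():
        events.append("running")


asyncio.run(demo())
```

The benefit of this shape is that a failure part-way through start-up cleans up whatever was already entered, while a successful start-up keeps everything open until exit.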

stop staticmethod

stop() -> None

Stop the running kernel.

Once a kernel is stopped, that instance of the kernel cannot be restarted. Instead, a new kernel must be started.

Source code in src/async_kernel/kernel.py
@staticmethod
def stop() -> None:
    """
    Stop the running kernel.

    Once a kernel is stopped, that instance of the kernel cannot be restarted.
    Instead, a new kernel must be started.
    """
    if instance := Kernel._instance:
        Kernel._instance = None
        instance.event_stopped.set()

handle_message_request async

handle_message_request(job: Job) -> None

The main handler for all shell and control messages.

Parameters:

  • job

    (Job) –

    The packed message for handling.

Source code in src/async_kernel/kernel.py
async def handle_message_request(self, job: Job, /) -> None:
    """
    The main handler for all shell and control messages.

    Args:
        job: The packed [message][async_kernel.typing.Message] for handling.
    """
    try:
        msg_type = MsgType(job["msg"]["header"]["msg_type"])
        socket_id = job["socket_id"]
        handler = self.get_handler(msg_type)
    except (ValueError, TypeError):
        self.log.debug("Invalid job %s", job)
        return
    run_mode = self.get_run_mode(msg_type, socket_id=socket_id, job=job)
    self.log.debug("%s  %s run mode %s handler: %s", socket_id, msg_type, run_mode, handler)
    job["run_mode"] = run_mode
    runner = _wrap_handler(self.run_handler, handler)
    match run_mode:
        case RunMode.queue:
            Caller().queue_call(runner, job)
        case RunMode.thread:
            Caller.to_thread(runner, job)
        case RunMode.task:
            Caller().call_soon(runner, job)
        case RunMode.blocking:
            await runner(job)

get_run_mode

get_run_mode(
    msg_type: MsgType,
    *,
    socket_id: Literal[shell, control] = shell,
    concurrency_mode: KernelConcurrencyMode | NoValue = NoValue,
    job: Job | None = None,
) -> RunMode

Determine the run mode for a given channel, message type and concurrency mode.

The run mode determines how the kernel will execute the message.

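Parameters:

  • msg_type

    (MsgType) –

    The type of the message.

  • socket_id

    (Literal[shell, control], default: shell) –

    The socket ID the message was received on.

  • concurrency_mode

    (KernelConcurrencyMode | NoValue, default: NoValue) –

    The concurrency mode of the kernel. Defaults to kernel.concurrency_mode.

  • job

    (Job | None, default: None) –

    The job associated with the message, if any.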

Returns:

  • RunMode

    The run mode for the message.

Raises:

  • ValueError

    If a shutdown or debug request is received on the shell socket.

Source code in src/async_kernel/kernel.py
def get_run_mode(
    self,
    msg_type: MsgType,
    *,
    socket_id: Literal[SocketID.shell, SocketID.control] = SocketID.shell,
    concurrency_mode: KernelConcurrencyMode | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
    job: Job | None = None,
) -> RunMode:
    """
    Determine the run mode for a given channel, message type and concurrency mode.

    The run mode determines how the kernel will execute the message.

    Args:
        socket_id: The socket ID the message was received on.
        msg_type: The type of the message.
        concurrency_mode: The concurrency mode of the kernel. Defaults to [kernel.concurrency_mode][async_kernel.Kernel.concurrency_mode]
        job: The job associated with the message, if any.

    Returns:
        The run mode for the message.

    Raises:
        ValueError: If a shutdown or debug request is received on the shell socket.
    """

    concurrency_mode = self.concurrency_mode if concurrency_mode is NoValue else concurrency_mode
    # TODO: Are any of these options worth including?
    # if mode_from_metadata := job["msg"]["metadata"].get("run_mode"):
    #     return RunMode( mode_from_metadata)
    # if mode_from_header := job["msg"]["header"].get("run_mode"):
    #     return RunMode( mode_from_header)
    match (concurrency_mode, socket_id, msg_type):
        case _, SocketID.shell, MsgType.shutdown_request | MsgType.debug_request:
            msg = f"{msg_type=} not allowed on shell!"
            raise ValueError(msg)
        case KernelConcurrencyMode.blocking, _, _:
            return RunMode.blocking
        case _, SocketID.control, MsgType.execute_request:
            return RunMode.task
        case _, _, MsgType.execute_request:
            if job:
                if content := job["msg"].get("content", {}):
                    if (code := content.get("code")) and (mode_ := RunMode.get_mode(code)):
                        return mode_
                    if content.get("silent"):
                        return RunMode.task
                if mode_ := set(utils.get_tags(job)).intersection(RunMode):
                    return RunMode(next(iter(mode_)))
            return RunMode.queue
        case _, _, MsgType.inspect_request | MsgType.complete_request | MsgType.is_complete_request:
            return RunMode.thread
        case _, _, MsgType.history_request:
            return RunMode.thread
        case _, _, MsgType.comm_msg:
            return RunMode.queue
        case _, _, MsgType.kernel_info_request | MsgType.comm_info_request | MsgType.comm_open | MsgType.comm_close:
            return RunMode.blocking
        case _, _, MsgType.debug_request:
            return RunMode.blocking
        case _:
            return RunMode.blocking

all_concurrency_run_modes

all_concurrency_run_modes(
    socket_ids: Iterable[Literal[shell, control]] = (shell, control),
    msg_types: Iterable[MsgType] = MsgType,
) -> dict[
    Literal["SocketID", "KernelConcurrencyMode", "MsgType", "RunMode"],
    tuple[SocketID, KernelConcurrencyMode, MsgType, RunMode | None],
]

Generates a dictionary containing all combinations of SocketID, KernelConcurrencyMode, and MsgType, along with their corresponding RunMode (if available).

Source code in src/async_kernel/kernel.py
def all_concurrency_run_modes(
    self,
    socket_ids: Iterable[Literal[SocketID.shell, SocketID.control]] = (SocketID.shell, SocketID.control),
    msg_types: Iterable[MsgType] = MsgType,
) -> dict[
    Literal["SocketID", "KernelConcurrencyMode", "MsgType", "RunMode"],
    tuple[SocketID, KernelConcurrencyMode, MsgType, RunMode | None],
]:
    """
    Generates a dictionary containing all combinations of SocketID, KernelConcurrencyMode, and MsgType,
    along with their corresponding RunMode (if available)."""
    data: list[Any] = []
    for socket_id in socket_ids:
        for concurrency_mode in KernelConcurrencyMode:
            for msg_type in msg_types:
                try:
                    mode = self.get_run_mode(msg_type, socket_id=socket_id, concurrency_mode=concurrency_mode)
                except ValueError:
                    mode = None
                data.append((socket_id, concurrency_mode, msg_type, mode))
    data_ = zip(*data, strict=True)
    return dict(zip(["SocketID", "KernelConcurrencyMode", "MsgType", "RunMode"], data_, strict=True))

run_handler async

run_handler(handler: HandlerType, job: Job[dict]) -> None

Runs the handler in the context of the job/message sending the reply content if it is provided.

This method gets called for every valid request with the relevant handler.

Source code in src/async_kernel/kernel.py
async def run_handler(self, handler: HandlerType, job: Job[dict]) -> None:
    """
    Runs the handler in the context of the job/message sending the reply content if it is provided.

    This method gets called for every valid request with the relevant handler.
    """

    def send_reply(job: Job[dict], content: dict, /) -> None:
        if "status" not in content:
            content["status"] = "ok"
        msg = self.session.send(
            stream=job["socket"],
            msg_or_type=job["msg"]["header"]["msg_type"].replace("request", "reply"),
            content=content,
            parent=job["msg"]["header"],  # pyright: ignore[reportArgumentType]
            ident=job["ident"],
        )
        if msg:
            self.log.debug("*** _send_reply %s*** %s", job["socket_id"], msg)

    token = utils._job_var.set(job)  # pyright: ignore[reportPrivateUsage]
    try:
        self.iopub_send(msg_or_type="status", content={"execution_state": "busy"}, ident=self.topic("status"))
        if (content := await handler(job)) is not None:
            send_reply(job, content)
    except Exception as e:
        send_reply(job, error_to_content(e))
        self.log.exception("Exception in message handler:", exc_info=e)
    finally:
        utils._job_var.reset(token)  # pyright: ignore[reportPrivateUsage]
        self.iopub_send(
            msg_or_type="status", parent=job["msg"], content={"execution_state": "idle"}, ident=self.topic("status")
        )

iopub_send

iopub_send(
    msg_or_type: dict[str, Any] | str,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: dict[str, Any] | None | NoValue = NoValue,
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None

Send a message on the zmq iopub socket.

Source code in src/async_kernel/kernel.py
def iopub_send(
    self,
    msg_or_type: dict[str, Any] | str,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: dict[str, Any] | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None:
    """Send a message on the zmq iopub socket."""
    if socket := Caller.iopub_sockets.get(thread := threading.current_thread()):
        msg = self.session.send(
            stream=socket,
            msg_or_type=msg_or_type,
            content=content,
            metadata=metadata,
            parent=parent if parent is not NoValue else utils.get_parent(),  # pyright: ignore[reportArgumentType]
            ident=ident,
            buffers=buffers,
        )
        if msg:
            self.log.debug(
                "iopub_send: (thread=%s) msg_type:'%s', content: %s", thread.name, msg["msg_type"], msg["content"]
            )
    else:
        self.control_thread_caller.call_direct(
            self.iopub_send,
            msg_or_type=msg_or_type,
            content=content,
            metadata=metadata,
            parent=parent if parent is not NoValue else None,
            ident=ident,
            buffers=buffers,
        )

topic

topic(topic) -> bytes

Return the prefixed topic for IOPub messages.

Source code in src/async_kernel/kernel.py
def topic(self, topic) -> bytes:
    """Return the prefixed topic for IOPub messages."""
    return (f"kernel.{topic}").encode()

kernel_info_request async

kernel_info_request(job: Job[Content]) -> Content

Handle a kernel info request.

Source code in src/async_kernel/kernel.py
async def kernel_info_request(self, job: Job[Content], /) -> Content:
    """Handle a [kernel info request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-info)."""
    return self.kernel_info

comm_info_request async

comm_info_request(job: Job[Content]) -> Content

Handle a comm info request.

Source code in src/async_kernel/kernel.py
async def comm_info_request(self, job: Job[Content], /) -> Content:
    """Handle a [comm info request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#comm-info)."""
    c = job["msg"]["content"]
    target_name = c.get("target_name", None)
    comms = {
        k: {"target_name": v.target_name}
        for (k, v) in tuple(self.comm_manager.comms.items())
        if v.target_name == target_name or target_name is None
    }
    return {"comms": comms}

execute_request async

execute_request(job: Job[ExecuteContent]) -> Content

Handle an execute request.

Source code in src/async_kernel/kernel.py
async def execute_request(self, job: Job[ExecuteContent], /) -> Content:
    """Handle an [execute request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#execute)."""
    c = job["msg"]["content"]
    if (
        job["run_mode"] is RunMode.queue
        and (job["received_time"] < self._stop_on_error_time)
        and not c.get("silent", False)
    ):
        self.log.info("Aborting execute_request: %s", job)
        return error_to_content(RuntimeError("Aborting due to prior exception")) | {
            "execution_count": self.execution_count
        }
    metadata = job["msg"].get("metadata") or {}
    if not (silent := c["silent"]):
        self._execution_count += 1
        utils._execution_count_var.set(self._execution_count)  # pyright: ignore[reportPrivateUsage]
        self.iopub_send(
            msg_or_type="execute_input",
            content={"code": c["code"], "execution_count": self.execution_count},
            parent=job["msg"],
            ident=self.topic("execute_input"),
        )
    fut = (Caller.to_thread if job["run_mode"] is RunMode.thread else Caller().call_soon)(
        self.shell.run_cell_async,
        raw_cell=c["code"],
        store_history=c.get("store_history", False),
        silent=silent,
        transformed_cell=self.shell.transform_cell(c["code"]),
        shell_futures=True,
        cell_id=metadata.get("cellId"),
    )
    if not silent:
        self._interrupts.add(fut.cancel)
        fut.add_done_callback(lambda fut: self._interrupts.discard(fut.cancel))
    try:
        result: ExecutionResult = await fut
        err = result.error_before_exec or result.error_in_exec if result else KernelInterruptError()
    except Exception as e:
        # A safeguard to catch exceptions not caught by the shell.
        err = e
    if (err) and (
        (Tags.suppress_error in metadata.get("tags", ()))
        or (isinstance(err, anyio.get_cancelled_exc_class()) and (utils.get_execute_request_timeout() is not None))
    ):
        # Suppress the error due to either:
        # 1. tag
        # 2. timeout
        err = None
    content = {
        "status": "error" if err else "ok",
        "execution_count": self.execution_count,
        "user_expressions": self.shell.user_expressions(c.get("user_expressions", {})),
    }
    if err:
        content |= error_to_content(err)
        if (not silent) and c.get("stop_on_error"):
            try:
                self._stop_on_error_time = math.inf
                self.log.info("An error occurred in a non-silent execution request")
                await anyio.sleep(0)
            finally:
                self._stop_on_error_time = time.monotonic()
    return content
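
The abort check at the top of the listing compares when a job was received with when the last error was recorded. A simplified sketch of that comparison follows; `AbortGate` is a hypothetical helper, and it assumes both timestamps come from the same clock.

```python
class AbortGate:
    """Hypothetical helper: rejects queued jobs received before the last error."""

    def __init__(self) -> None:
        # No error has been recorded yet, so nothing is aborted.
        self.stop_on_error_time = -1.0

    def record_error(self, now: float) -> None:
        # Jobs stamped before this instant were queued behind the failing one.
        self.stop_on_error_time = now

    def should_abort(self, received_time: float, silent: bool = False) -> bool:
        # Silent requests are never aborted.
        return received_time < self.stop_on_error_time and not silent


gate = AbortGate()
gate.record_error(now=10.0)
```

With the error recorded at 10.0, a job received at 9.0 is aborted, while one received at 11.0 runs normally.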

complete_request async

complete_request(job: Job[Content]) -> Content

Handle a completion request.

Source code in src/async_kernel/kernel.py
async def complete_request(self, job: Job[Content], /) -> Content:
    """Handle a [completion request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#completion)."""
    c = job["msg"]["content"]
    code: str = c["code"]
    cursor_pos = c.get("cursor_pos") or len(code)
    with IPython.core.completer.provisionalcompleter():
        completions = self.shell.Completer.completions(code, cursor_pos)
        completions = list(IPython.core.completer.rectify_completions(code, completions))
    comps = [
        {
            "start": comp.start,
            "end": comp.end,
            "text": comp.text,
            "type": comp.type,
            "signature": comp.signature,
        }
        for comp in completions
    ]
    s, e = (completions[0].start, completions[0].end) if completions else (cursor_pos, cursor_pos)
    matches = [c.text for c in completions]
    return {
        "matches": matches,
        "cursor_end": e,
        "cursor_start": s,
        "metadata": {"_jupyter_types_experimental": comps},
    }
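
One detail worth illustrating is the cursor range when there are no completions. In a conditional expression combined with a tuple, the tuple needs parentheses for the fallback to apply to both values. The sketch below uses a hypothetical `Completion` tuple instead of IPython's completion objects.

```python
from typing import NamedTuple


class Completion(NamedTuple):
    # Hypothetical stand-in for an IPython completion object.
    start: int
    end: int
    text: str


def completion_reply(completions: list[Completion], cursor_pos: int) -> dict:
    # Parenthesise the tuple so the fallback applies to both values;
    # without the parentheses only the second element would be conditional.
    start, end = (
        (completions[0].start, completions[0].end)
        if completions
        else (cursor_pos, cursor_pos)
    )
    return {
        "matches": [c.text for c in completions],
        "cursor_start": start,
        "cursor_end": end,
    }


empty = completion_reply([], cursor_pos=4)
found = completion_reply([Completion(0, 2, "print")], cursor_pos=2)
```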

is_complete_request async

is_complete_request(job: Job[Content]) -> Content

Handle an is_complete request.

Source code in src/async_kernel/kernel.py
async def is_complete_request(self, job: Job[Content], /) -> Content:
    """Handle an [is_complete request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#code-completeness)."""
    status, indent_spaces = self.shell.input_transformer_manager.check_complete(job["msg"]["content"]["code"])
    content = {"status": status}
    if status == "incomplete":
        content["indent"] = " " * indent_spaces
    return content

inspect_request async

inspect_request(job: Job[Content]) -> Content

Handle an inspect request.

Source code in src/async_kernel/kernel.py
async def inspect_request(self, job: Job[Content], /) -> Content:
    """Handle an [inspect request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#introspection)."""
    c = job["msg"]["content"]
    detail_level = int(c.get("detail_level", 0))
    omit_sections = set(c.get("omit_sections", []))
    name = token_at_cursor(c["code"], c["cursor_pos"])
    content = {"data": {}, "metadata": {}, "found": True}
    try:
        bundle = self.shell.object_inspect_mime(name, detail_level=detail_level, omit_sections=omit_sections)
        content["data"] = bundle
        if not self.shell.enable_html_pager:
            content["data"].pop("text/html")
    except KeyError:
        content["found"] = False
    return content

history_request async

history_request(job: Job[Content]) -> Content

Handle a history request.

Source code in src/async_kernel/kernel.py
async def history_request(self, job: Job[Content], /) -> Content:
    """Handle a [history request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#history)."""
    c = job["msg"]["content"]
    history_manager = self.shell.history_manager
    assert history_manager
    if c.get("hist_access_type") == "tail":
        hist = history_manager.get_tail(c["n"], raw=c.get("raw"), output=c.get("output"), include_latest=True)
    elif c.get("hist_access_type") == "range":
        hist = history_manager.get_range(
            c.get("session", 0),
            c.get("start", 1),
            c.get("stop", None),
            raw=c.get("raw", True),
            output=c.get("output", False),
        )
    elif c.get("hist_access_type") == "search":
        hist = history_manager.search(
            c.get("pattern"), raw=c.get("raw"), output=c.get("output"), n=c.get("n"), unique=c.get("unique")
        )
    else:
        hist = []
    return {"history": list(hist)}

comm_open async

comm_open(job: Job[Content]) -> None

Handle a comm open request.

Source code in src/async_kernel/kernel.py
async def comm_open(self, job: Job[Content], /) -> None:
    """Handle a [comm open request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#opening-a-comm)."""
    self.comm_manager.comm_open(stream=job["socket"], ident=job["ident"], msg=job["msg"])  # pyright: ignore[reportArgumentType]

comm_msg async

comm_msg(job: Job[Content]) -> None

Handle a comm msg request.

Source code in src/async_kernel/kernel.py
async def comm_msg(self, job: Job[Content], /) -> None:
    """Handle a [comm msg request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#comm-messages)."""
    self.comm_manager.comm_msg(stream=job["socket"], ident=job["ident"], msg=job["msg"])  # pyright: ignore[reportArgumentType]

comm_close async

comm_close(job: Job[Content]) -> None

Handle a comm close request.

Source code in src/async_kernel/kernel.py
async def comm_close(self, job: Job[Content], /) -> None:
    """Handle a [comm close request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#tearing-down-comms)."""
    self.comm_manager.comm_close(stream=job["socket"], ident=job["ident"], msg=job["msg"])  # pyright: ignore[reportArgumentType]

interrupt_request async

interrupt_request(job: Job[Content]) -> Content

Handle an interrupt request (control only).

Source code in src/async_kernel/kernel.py
async def interrupt_request(self, job: Job[Content], /) -> Content:
    """Handle an [interrupt request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-interrupt) (control only)."""
    self._interrupt_requested = True
    if sys.platform == "win32":
        signal.raise_signal(signal.SIGINT)
        time.sleep(0)
    else:
        os.kill(os.getpid(), signal.SIGINT)
    for interrupter in tuple(self._interrupts):
        interrupter()
    return {}

shutdown_request async

shutdown_request(job: Job[Content]) -> Content

Handle a shutdown request (control only).

Source code in src/async_kernel/kernel.py
async def shutdown_request(self, job: Job[Content], /) -> Content:
    """Handle a [shutdown request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-shutdown) (control only)."""
    self.stop()
    return {"restart": job["msg"]["content"].get("restart", False)}

debug_request async

debug_request(job: Job[Content]) -> Content

Handle a debug request (control only).

Source code in src/async_kernel/kernel.py
async def debug_request(self, job: Job[Content], /) -> Content:
    """Handle a [debug request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#debug-request) (control only)."""
    return await self.debugger.process_request(job["msg"]["content"])

excepthook

excepthook(etype, evalue, tb) -> None

Handle an exception.

Source code in src/async_kernel/kernel.py
def excepthook(self, etype, evalue, tb) -> None:
    """Handle an exception."""
    # write uncaught traceback to 'real' stderr, not zmq-forwarder
    traceback.print_exception(etype, evalue, tb, file=sys.__stderr__)

unraisablehook

unraisablehook(unraisable: UnraisableHookArgs) -> None

Handle unraisable exceptions (during gc for instance).

Source code in src/async_kernel/kernel.py
def unraisablehook(self, unraisable: sys.UnraisableHookArgs, /) -> None:
    "Handle unraisable exceptions (during gc for instance)."
    exc_info = (
        unraisable.exc_type,
        unraisable.exc_value or unraisable.exc_type(unraisable.err_msg),
        unraisable.exc_traceback,
    )
    self.log.exception(unraisable.err_msg, exc_info=exc_info, extra={"object": unraisable.object})

raw_input

raw_input(prompt='') -> Any

Forward raw_input to frontends.

Raises:

  • StdinNotImplementedError

    If the active frontend doesn't support stdin.

Source code in src/async_kernel/kernel.py
def raw_input(self, prompt="") -> Any:
    """
    Forward raw_input to frontends.

    Raises
    ------
    StdinNotImplementedError if active frontend doesn't support stdin.
    """
    return self._input_request(str(prompt), password=False)

getpass

getpass(prompt='') -> Any

Forward getpass to frontends.

Source code in src/async_kernel/kernel.py
def getpass(self, prompt="") -> Any:
    """Forward getpass to frontends."""
    return self._input_request(prompt, password=True)

get_connection_info

get_connection_info() -> dict[str, Any]

Return the connection info as a dict.

Source code in src/async_kernel/kernel.py
def get_connection_info(self) -> dict[str, Any]:
    """Return the connection info as a dict."""
    with self.connection_file.open("r") as f:
        return json.load(f)

get_parent

get_parent() -> Message[dict[str, Any]] | None

A convenience method to access the 'message' in the current context if there is one.

'parent' is the parameter name used in Session.send.

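See also

  • Kernel.iopub_send
  • ipywidgets.Output: Uses get_ipython().kernel.get_parent() to obtain the msg_id, which is used to 'capture' output when its context has been acquired.
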
Source code in src/async_kernel/kernel.py
def get_parent(self) -> Message[dict[str, Any]] | None:
    """A convenience method to access the 'message' in the current context if there is one.

    'parent' is the parameter name used in [Session.send][jupyter_client.session.Session.send].

    See also:
        - [Kernel.iopub_send][async_kernel.Kernel.iopub_send]
        - [ipywidgets.Output][ipywidgets.widgets.widget_output.Output]:
            Uses `get_ipython().kernel.get_parent()` to obtain the `msg_id` which
            is used to 'capture' output when its context has been acquired.
    """
    return utils.get_parent()

async_kernel.kernel.error_to_content

error_to_content(error: BaseException) -> Content

Convert the error to a dict.

ref: https://jupyter-client.readthedocs.io/en/stable/messaging.html#request-reply

Source code in src/async_kernel/kernel.py
def error_to_content(error: BaseException, /) -> Content:
    """
    Convert the error to a dict.

    ref: https://jupyter-client.readthedocs.io/en/stable/messaging.html#request-reply
    """
    return {
        "status": "error",
        "ename": type(error).__name__,
        "evalue": str(error),
        "traceback": traceback.format_exception(error),
    }

async_kernel.kernel.bind_socket

bind_socket(
    socket: Socket[SocketType],
    transport: Literal["tcp", "ipc"],
    ip: str,
    port: int = 0,
    max_attempts: int | NoValue = NoValue,
) -> int

Bind the socket to a port using the settings.

max_attempts: The maximum number of attempts to bind the socket. If unspecified, defaults to 100 attempts when no port is given, otherwise 2.

Source code in src/async_kernel/kernel.py
def bind_socket(
    socket: Socket[SocketType],
    transport: Literal["tcp", "ipc"],
    ip: str,
    port: int = 0,
    max_attempts: int | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
) -> int:
    """
    Bind the socket to a port using the settings.

    max_attempts: The maximum number of attempts to bind the socket. If unspecified,
    defaults to 100 attempts when no port is given, otherwise 2.
    """
    if socket.TYPE == SocketType.ROUTER:
        # ref: https://github.com/ipython/ipykernel/issues/270
        socket.router_handover = 1
    if transport == "ipc":
        ip = Path(ip).as_posix()
    if max_attempts is NoValue:
        max_attempts = 2 if port else 100
    for attempt in range(max_attempts):
        try:
            if transport == "tcp":
                if not port:
                    port = socket.bind_to_random_port(f"tcp://{ip}")
                else:
                    socket.bind(f"tcp://{ip}:{port}")
            elif transport == "ipc":
                if not port:
                    port = 1
                    while Path(f"{ip}-{port}").exists():
                        port += 1
                socket.bind(f"ipc://{ip}-{port}")
            else:
                msg = f"Invalid transport: {transport}"  # pyright: ignore[reportUnreachable]
                raise ValueError(msg)
        except ZMQError as e:
            if e.errno not in {errno.EADDRINUSE, 98, 10048, 135}:
                raise
            if port and attempt < max_attempts - 1:
                time.sleep(0.1)
        else:
            return port
    msg = f"Failed to bind {socket} for {transport=} after {max_attempts} attempts."
    raise RuntimeError(msg)
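
The retry structure above is a `for` loop whose `try` block uses an `else` clause to return on success. The sketch below isolates that control flow without any sockets: `bind_with_retry` and `flaky_bind` are hypothetical, and `OSError` is used as a stand-in for the ZMQ error type.

```python
import errno
import time
from collections.abc import Callable


def bind_with_retry(bind: Callable[[], int], max_attempts: int = 3, delay: float = 0.0) -> int:
    """Call `bind` until it succeeds or `max_attempts` is exhausted."""
    for attempt in range(max_attempts):
        try:
            port = bind()
        except OSError as e:
            if e.errno != errno.EADDRINUSE:
                raise  # unrelated failure: do not retry
            if attempt < max_attempts - 1:
                time.sleep(delay)  # brief pause before the next attempt
        else:
            return port
    msg = f"Failed to bind after {max_attempts} attempts."
    raise RuntimeError(msg)


calls = {"n": 0}


def flaky_bind() -> int:
    # Hypothetical binder: the address is "in use" for the first two calls.
    calls["n"] += 1
    if calls["n"] < 3:
        raise OSError(errno.EADDRINUSE, "Address already in use")
    return 5555


port = bind_with_retry(flaky_bind)
```

Only "address in use" errors are retried; anything else propagates immediately, and exhausting the attempts raises a `RuntimeError`.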