Skip to content

Kernel

async_kernel.kernel

Classes:

  • Kernel

    An asynchronous kernel with an anyio backend providing an IPython AsyncInteractiveShell with zmq sockets.

  • KernelInterruptError

    Raised to interrupt the kernel.

Kernel

Kernel(settings: dict | None = None)

An asynchronous kernel with an anyio backend providing an IPython AsyncInteractiveShell with zmq sockets.

Only one instance will be created/run at a time. The instance can be obtained with Kernel() or [async_kernel.utils.get_kernel].

To start the kernel:

At the command prompt.

async-kernel -f .

See also:

-

from async_kernel.__main__ import main

main()
Kernel.start()
kernel = Kernel()
async with kernel.start_in_context():
    await anyio.sleep_forever()
Tip

This is a convenient way to start a kernel for debugging.

Origins: IPyKernel Kernel, IPyKernel IPKernelApp & IPyKernel IPythonKernel

Methods:

Attributes:

Source code in src/async_kernel/kernel.py
306
307
308
309
310
311
312
313
314
315
316
def __init__(self, settings: dict | None = None, /) -> None:
    """
    Initialise the kernel the first time it is constructed.

    Repeated calls are no-ops once `self._initialised` is set. The first call
    installs this instance's exception hooks on `sys`, registers the SIGINT
    handler and provides a default matplotlib backend through the
    `MPLBACKEND` environment variable when that variable is unset or empty.

    Args:
        settings: Optional settings mapping kept on the instance as `_settings`.
            An empty dict is stored when it is omitted (or falsy).
    """
    if not self._initialised:
        self._initialised = True
        super().__init__()
        # Route uncaught and unraisable exceptions through the kernel's hooks.
        sys.excepthook = self.excepthook
        sys.unraisablehook = self.unraisablehook
        signal.signal(signal.SIGINT, self._signal_handler)
        backend_var = "MPLBACKEND"
        if not os.environ.get(backend_var):
            os.environ[backend_var] = "module://matplotlib_inline.backend_inline"
        self._settings = settings if settings else {}

anyio_backend class-attribute instance-attribute

anyio_backend = UseEnum(Backend)

anyio_backend_options class-attribute instance-attribute

anyio_backend_options: Dict[Backend, dict[str, Any] | None] = Dict(allow_none=True)

Default options to use with anyio.run. See also: Kernel.handle_message_request

comm_manager class-attribute instance-attribute

comm_manager: Instance[CommManager] = Instance('async_kernel.comm.CommManager')

concurrency_mode class-attribute instance-attribute

concurrency_mode = UseEnum(KernelConcurrencyMode)

The mode to use when getting the run mode for running the handler of a message request.

See also

connection_file class-attribute instance-attribute

connection_file: TraitType[Path, Path | str] = TraitType()

JSON file in which to store connection info [default: kernel-<pid>.json]

This file will contain the IP, ports, and authentication key needed to connect clients to this kernel. By default, this file will be created in the security dir of the current profile, but can be specified by absolute path.

debugger class-attribute instance-attribute

debugger = Instance(Debugger, ())

execution_count property

execution_count: int

The execution count in context of the current coroutine, else the current value if there isn't one in context.

help_links = Tuple()

ip class-attribute instance-attribute

ip = Unicode()

The kernel's IP address [default localhost].

If the IP address is something other than localhost, then Consoles on other machines will be able to connect to the Kernel, so be careful!

kernel_name class-attribute instance-attribute

kernel_name: str | Unicode = Unicode()

The kernel's name - if it contains 'trio', a trio backend will be used instead of an asyncio backend.

log class-attribute instance-attribute

log = Instance(LoggerAdapter)

quiet class-attribute instance-attribute

quiet = Bool(True)

Only send stdout/stderr to output stream

session class-attribute instance-attribute

session = Instance(Session, ())

shell class-attribute instance-attribute

shell = Instance(AsyncInteractiveShell, ())

__aenter__ async

__aenter__() -> Self

Start the kernel.

  • Only one instance can (should) run at a time.
  • An instance can only be started once.
  • A new instance can be started after a previous instance has stopped.

Example

async with Kernel() as kernel:
    await anyio.sleep_forever()
Source code in src/async_kernel/kernel.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
async def __aenter__(self) -> Self:
    """
    Start the kernel.

    - Only one instance can (should) run at a time.
    - An instance can only be started once.
    - A new instance can be started after a previous instance has stopped.

    Returns:
        The kernel instance itself.

    !!! example

        ```python
        async with Kernel() as kernel:
            await anyio.sleep_forever()
        ```
    """
    # The temporary exit stack unwinds the start-up context if entering it fails part way.
    async with contextlib.AsyncExitStack() as stack:
        self._running = True
        await stack.enter_async_context(self._start_in_context())
        # `pop_all` moves the registered context to a new stack, so leaving this
        # `async with` block does not exit it. The new stack is kept on the
        # instance (presumably closed by `__aexit__` - not visible here).
        self.__stack = stack.pop_all()
    return self

all_concurrency_run_modes

all_concurrency_run_modes(
    socket_ids: Iterable[Literal[shell, control]] = (shell, control),
    msg_types: Iterable[MsgType] = MsgType,
) -> dict[
    Literal["SocketID", "KernelConcurrencyMode", "MsgType", "RunMode"],
    tuple[SocketID, KernelConcurrencyMode, MsgType, RunMode | None],
]

Generates a dictionary containing all combinations of SocketID, KernelConcurrencyMode, and MsgType, along with their corresponding RunMode (if available).

Source code in src/async_kernel/kernel.py
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
def all_concurrency_run_modes(
    self,
    socket_ids: Iterable[Literal[SocketID.shell, SocketID.control]] = (SocketID.shell, SocketID.control),
    msg_types: Iterable[MsgType] = MsgType,
) -> dict[
    Literal["SocketID", "KernelConcurrencyMode", "MsgType", "RunMode"],
    tuple[SocketID, KernelConcurrencyMode, MsgType, RunMode | None],
]:
    """
    Tabulate the run mode for every combination of socket, concurrency mode and message type.

    The result is column-oriented: each key maps to a tuple holding that column's
    value for every combination, so index `i` across the four tuples describes one
    combination.

    Args:
        socket_ids: The sockets to include.
        msg_types: The message types to include. Any iterable is accepted, including
            one-shot iterators.

    Returns:
        A dict with the keys `"SocketID"`, `"KernelConcurrencyMode"`, `"MsgType"` and
        `"RunMode"`. The `"RunMode"` entry is `None` for combinations that
        `get_run_mode` rejects with `ValueError`. When there are no combinations
        every column is an empty tuple.
    """
    columns = ("SocketID", "KernelConcurrencyMode", "MsgType", "RunMode")
    # `msg_types` is iterated once per (socket, concurrency mode) pair, so it is
    # materialised first; a generator would otherwise be exhausted after the first pass.
    msg_types = tuple(msg_types)
    rows: list[Any] = []
    for socket_id in socket_ids:
        for concurrency_mode in KernelConcurrencyMode:
            for msg_type in msg_types:
                try:
                    mode = self.get_run_mode(msg_type, socket_id=socket_id, concurrency_mode=concurrency_mode)
                except ValueError:
                    # The combination is not permitted; record it without a run mode.
                    mode = None
                rows.append((socket_id, concurrency_mode, msg_type, mode))
    # Transpose rows into columns. With no rows `zip(*rows)` yields nothing, which
    # would make the strict zip below fail, so supply empty columns explicitly.
    transposed: Any = zip(*rows, strict=True) if rows else ((),) * len(columns)
    return dict(zip(columns, transposed, strict=True))

comm_close async

comm_close(job: Job[Content]) -> None

Handle a comm close request.

Source code in src/async_kernel/kernel.py
1072
1073
1074
async def comm_close(self, job: Job[Content], /) -> None:
    """
    Handle a [comm close request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#tearing-down-comms).

    The job's socket, ident and message are forwarded to the comm manager.
    """
    forward = self.comm_manager.comm_close
    forward(stream=job["socket"], ident=job["ident"], msg=job["msg"])  # pyright: ignore[reportArgumentType]

comm_info_request async

comm_info_request(job: Job[Content]) -> Content

Handle a comm info request.

Source code in src/async_kernel/kernel.py
915
916
917
918
919
920
921
922
923
924
async def comm_info_request(self, job: Job[Content], /) -> Content:
    """
    Handle a [comm info request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#comm-info).

    Returns:
        `{"comms": {...}}` mapping each comm id to its `target_name`. When the
        request names a `target_name`, only comms with that target are included;
        otherwise all comms are listed.
    """
    wanted = job["msg"]["content"].get("target_name", None)
    # Iterate over a snapshot (tuple) of the registry rather than the live mapping.
    registered = tuple(self.comm_manager.comms.items())
    matching = {}
    for comm_id, comm in registered:
        if comm.target_name == wanted or wanted is None:
            matching[comm_id] = {"target_name": comm.target_name}
    return {"comms": matching}

comm_msg async

comm_msg(job: Job[Content]) -> None

Handle a comm msg request.

Source code in src/async_kernel/kernel.py
1068
1069
1070
async def comm_msg(self, job: Job[Content], /) -> None:
    """
    Handle a [comm msg request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#comm-messages).

    The job's socket, ident and message are forwarded to the comm manager.
    """
    forward = self.comm_manager.comm_msg
    forward(stream=job["socket"], ident=job["ident"], msg=job["msg"])  # pyright: ignore[reportArgumentType]

comm_open async

comm_open(job: Job[Content]) -> None

Handle a comm open request.

Source code in src/async_kernel/kernel.py
1064
1065
1066
async def comm_open(self, job: Job[Content], /) -> None:
    """
    Handle a [comm open request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#opening-a-comm).

    The job's socket, ident and message are forwarded to the comm manager.
    """
    forward = self.comm_manager.comm_open
    forward(stream=job["socket"], ident=job["ident"], msg=job["msg"])  # pyright: ignore[reportArgumentType]

complete_request async

complete_request(job: Job[Content]) -> Content

Handle a completion request.

Source code in src/async_kernel/kernel.py
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
async def complete_request(self, job: Job[Content], /) -> Content:
    """
    Handle a [completion request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#completion).

    Completions are obtained from the shell's completer and rectified so that
    they all share the same start/end range.

    Returns:
        A dict with `matches` (completion texts), `cursor_start` / `cursor_end`
        (the range to replace; both equal the cursor position when there are no
        completions) and `metadata` carrying the detailed completions under
        `_jupyter_types_experimental`.
    """
    c = job["msg"]["content"]
    code: str = c["code"]
    cursor_pos = c.get("cursor_pos")
    if cursor_pos is None:
        # Only a missing position defaults to the end of the code; 0 is a valid position.
        cursor_pos = len(code)
    with IPython.core.completer.provisionalcompleter():
        completions = self.shell.Completer.completions(code, cursor_pos)
        completions = list(IPython.core.completer.rectify_completions(code, completions))
    comps = [
        {
            "start": comp.start,
            "end": comp.end,
            "text": comp.text,
            "type": comp.type,
            "signature": comp.signature,
        }
        for comp in completions
    ]
    if completions:
        s, e = completions[0].start, completions[0].end
    else:
        # Nothing to complete: report an empty range at the cursor.
        s = e = cursor_pos
    matches = [comp.text for comp in completions]
    return {
        "matches": matches,
        "cursor_end": e,
        "cursor_start": s,
        "metadata": {"_jupyter_types_experimental": comps},
    }

debug_request async

debug_request(job: Job[Content]) -> Content

Handle a debug request (control only).

Source code in src/async_kernel/kernel.py
1094
1095
1096
async def debug_request(self, job: Job[Content], /) -> Content:
    """
    Handle a [debug request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#debug-request) (control only).

    Returns:
        Whatever the debugger's `process_request` returns for the message content.
    """
    process = self.debugger.process_request
    reply = await process(job["msg"]["content"])
    return reply

excepthook

excepthook(etype, evalue, tb) -> None

Handle an exception.

Source code in src/async_kernel/kernel.py
1098
1099
1100
1101
def excepthook(self, etype, evalue, tb) -> None:
    """
    Handle an exception by printing its traceback.

    Args:
        etype: The exception type.
        evalue: The exception instance.
        tb: The traceback object.
    """
    # write uncaught traceback to 'real' stderr, not zmq-forwarder
    traceback.print_exception(etype, evalue, tb, file=sys.__stderr__)

execute_request async

execute_request(job: Job[ExecuteContent]) -> Content

Handle an execute request.

Source code in src/async_kernel/kernel.py
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
async def execute_request(self, job: Job[ExecuteContent], /) -> Content:
    """
    Handle an [execute request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#execute).

    The cell is run with the shell's `run_cell_async`, either in a thread or on the
    current caller depending on the job's run mode.

    Args:
        job: The job for the request. Reads `msg`, `run_mode` and `received_time`.

    Returns:
        The reply content with `status`, `execution_count` and `user_expressions`,
        plus the error fields from `error_to_content` when execution failed.
    """
    c = job["msg"]["content"]
    # Abort queued, non-silent requests that were received before the recorded
    # stop-on-error time (set at the end of this method when an earlier request failed).
    if (
        job["run_mode"] is RunMode.queue
        and (job["received_time"] < self._stop_on_error_time)
        and not c.get("silent", False)
    ):
        self.log.info("Aborting execute_request: %s", job)
        return error_to_content(RuntimeError("Aborting due to prior exception")) | {
            "execution_count": self.execution_count
        }
    metadata = job["msg"].get("metadata") or {}
    # NOTE(review): "silent" is read with c["silent"] here but with c.get("silent", False)
    # above - a request without the key raises KeyError at this point; confirm intended.
    if not (silent := c["silent"]):
        # Non-silent requests bump the execution count and broadcast the input on iopub.
        self._execution_count += 1
        utils._execution_count_var.set(self._execution_count)  # pyright: ignore[reportPrivateUsage]
        self.iopub_send(
            msg_or_type="execute_input",
            content={"code": c["code"], "execution_count": self.execution_count},
            parent=job["msg"],
            ident=self.topic("execute_input"),
        )
    # Schedule the cell: thread run mode uses Caller.to_thread, every other mode call_soon.
    fut = (Caller.to_thread if job["run_mode"] is RunMode.thread else Caller().call_soon)(
        self.shell.run_cell_async,
        raw_cell=c["code"],
        store_history=c.get("store_history", False),
        silent=silent,
        transformed_cell=self.shell.transform_cell(c["code"]),
        shell_futures=True,
        cell_id=metadata.get("cellId"),
    )
    if not silent:
        # Register the future's cancel method so `interrupt_request` can call it;
        # it is removed again once the future is done.
        self._interrupts.add(fut.cancel)
        fut.add_done_callback(lambda fut: self._interrupts.discard(fut.cancel))
    try:
        result: ExecutionResult = await fut
        # A falsy result is reported as a KernelInterruptError.
        err = result.error_before_exec or result.error_in_exec if result else KernelInterruptError()
    except Exception as e:
        # A safeguard to catch exceptions not caught by the shell.
        err = e
    if (err) and (
        (Tags.suppress_error in metadata.get("tags", ()))  # 1.
        or (isinstance(err, self.CancelledError) and (utils.get_execute_request_timeout() is not None))  # 2.
    ):
        # Suppress the error due to either:
        # 1. tag
        # 2. timeout
        err = None
    content = {
        "status": "error" if err else "ok",
        "execution_count": self.execution_count,
        "user_expressions": self.shell.user_expressions(c.get("user_expressions", {})),
    }
    if err:
        content |= error_to_content(err)
        if (not silent) and c.get("stop_on_error"):
            try:
                # While the time is infinite every queued non-silent request fails the
                # check at the top of this method; sleep(0) yields so such requests can run.
                self._stop_on_error_time = float("inf")
                self.log.info("An error occurred in a non-silent execution request")
                await anyio.sleep(0)
            finally:
                # From here on only requests received before this moment are aborted
                # (presumably `received_time` is a `time.monotonic()` value - confirm).
                self._stop_on_error_time = time.monotonic()
    return content

get_connection_info

get_connection_info() -> dict[str, Any]

Return the connection info as a dict.

Source code in src/async_kernel/kernel.py
1126
1127
1128
1129
def get_connection_info(self) -> dict[str, Any]:
    """
    Read the connection file and return its contents.

    Returns:
        The JSON content of `self.connection_file`.
    """
    with self.connection_file.open("r") as stream:
        info = json.load(stream)
    return info

get_run_mode

get_run_mode(
    msg_type: MsgType,
    *,
    socket_id: Literal[shell, control] = shell,
    concurrency_mode: KernelConcurrencyMode | NoValue = NoValue,
    job: Job | None = None,
) -> RunMode

Determine the run mode for a given channel, message type and concurrency mode.

The run mode determines how the kernel will execute the message.

Parameters:

Returns:

  • RunMode

    The run mode for the message.

Raises:

  • ValueError

    If a shutdown or debug request is received on the shell socket.

Source code in src/async_kernel/kernel.py
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
def get_run_mode(
    self,
    msg_type: MsgType,
    *,
    socket_id: Literal[SocketID.shell, SocketID.control] = SocketID.shell,
    concurrency_mode: KernelConcurrencyMode | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
    job: Job | None = None,
) -> RunMode:
    """
    Determine the run mode for a given channel, message type and concurrency mode.

    The run mode determines how the kernel will execute the message.

    The cases are evaluated in order and the first match wins. In particular the
    blocking concurrency mode is checked first, so it returns `RunMode.blocking`
    for every message type and socket.

    Args:
        msg_type: The type of the message.
        socket_id: The socket ID the message was received on.
        concurrency_mode: The concurrency mode of the kernel. Defaults to [kernel.concurrency_mode][async_kernel.Kernel.concurrency_mode]
        job: The job associated with the message, if any. Only consulted for execute requests.

    Returns:
        The run mode for the message.

    Raises:
        ValueError: If a shutdown or debug request is received on the shell socket
            (and the concurrency mode is not blocking).
    """

    concurrency_mode = self.concurrency_mode if concurrency_mode is NoValue else concurrency_mode
    # TODO: Are any of these options worth including?
    # if mode_from_metadata := job["msg"]["metadata"].get("run_mode"):
    #     return RunMode( mode_from_metadata)
    # if mode_from_header := job["msg"]["header"].get("run_mode"):
    #     return RunMode( mode_from_header)
    match (concurrency_mode, socket_id, msg_type):
        case KernelConcurrencyMode.blocking, _, _:
            return RunMode.blocking
        case _, SocketID.control, MsgType.execute_request:
            # Execute requests arriving on the control socket always run as a task.
            return RunMode.task
        case _, _, MsgType.execute_request:
            # Remaining execute requests: the job can override the default (queue).
            if job:
                if content := job["msg"].get("content", {}):
                    # 1. A run mode derived from the code itself takes precedence.
                    if (code := content.get("code")) and (mode_ := RunMode.get_mode(code)):
                        return mode_
                    # 2. Silent requests run as a task.
                    if content.get("silent"):
                        return RunMode.task
                # 3. A tag on the job that is also a RunMode member selects that mode.
                if mode_ := set(utils.get_tags(job)).intersection(RunMode):
                    return RunMode(next(iter(mode_)))
            return RunMode.queue
        case _, SocketID.shell, MsgType.shutdown_request | MsgType.debug_request:
            msg = f"{msg_type=} not allowed on shell!"
            raise ValueError(msg)
        case _, _, MsgType.inspect_request | MsgType.complete_request | MsgType.is_complete_request:
            return RunMode.thread
        case _, _, MsgType.history_request:
            return RunMode.thread
        case _, _, MsgType.kernel_info_request | MsgType.comm_info_request | MsgType.comm_open | MsgType.comm_close:
            return RunMode.blocking
        case _:
            # Every other message type (including comm_msg, which is not listed above).
            return RunMode.task

getpass

getpass(prompt='') -> Any

Forward getpass to frontends.

Source code in src/async_kernel/kernel.py
1122
1123
1124
def getpass(self, prompt="") -> Any:
    """
    Forward getpass to frontends.

    Args:
        prompt: The prompt passed on with the input request.

    Returns:
        The result of the input request, which is issued with `password=True`.
    """
    reply = self._input_request(prompt, password=True)
    return reply

handle_message_request async

handle_message_request(job: Job) -> None

The main handler for all shell and control messages.

Parameters:

Source code in src/async_kernel/kernel.py
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
async def handle_message_request(self, job: Job, /) -> None:
    """
    The main handler for all shell and control messages.

    Resolves the message type and its handler, determines the run mode with
    `get_run_mode`, stores it on the job as `job["run_mode"]` and dispatches the
    wrapped handler accordingly.

    Args:
        job: The packed [message][async_kernel.typing.Message] for handling.
    """
    try:
        msg_type = MsgType(job["msg"]["header"]["msg_type"])
        socket_id = job["socket_id"]
        handler = self.get_handler(msg_type)
    except (ValueError, TypeError):
        # Unknown or malformed message: log at debug level and drop it.
        self.log.debug("Invalid job %s", job)
        return
    # NOTE(review): get_run_mode raises ValueError for shutdown/debug requests on the
    # shell socket; it is called outside the try block, so that error propagates.
    run_mode = self.get_run_mode(msg_type, socket_id=socket_id, job=job)
    self.log.debug("%s  %s run mode %s handler: %s", socket_id, msg_type, run_mode, handler)
    job["run_mode"] = run_mode
    runner = _wrap_handler(self.run_handler, handler)
    # Only the queue and blocking modes are awaited here; thread and task modes
    # hand the runner off and return without awaiting it.
    match run_mode:
        case RunMode.queue:
            await Caller().queue_call(runner, job, wait=True)
        case RunMode.thread:
            Caller.to_thread(runner, job)
        case RunMode.task:
            Caller().call_soon(runner, job)
        case RunMode.blocking:
            await runner(job)

history_request async

history_request(job: Job[Content]) -> Content

Handle a history request.

Source code in src/async_kernel/kernel.py
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
async def history_request(self, job: Job[Content], /) -> Content:
    """Handle a [history request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#history)."""
    c = job["msg"]["content"]
    history_manager = self.shell.history_manager
    assert history_manager
    if c.get("hist_access_type") == "tail":
        hist = history_manager.get_tail(c["n"], raw=c.get("raw"), output=c.get("output"), include_latest=True)
    elif c.get("hist_access_type") == "range":
        hist = history_manager.get_range(
            c.get("session", 0),
            c.get("start", 1),
            c.get("stop", None),
            raw=c.get("raw", True),
            output=c.get("output", False),
        )
    elif c.get("hist_access_type") == "search":
        hist = history_manager.search(
            c.get("pattern"), raw=c.get("raw"), output=c.get("output"), n=c.get("n"), unique=c.get("unique")
        )
    else:
        hist = []
    return {"history": list(hist)}

inspect_request async

inspect_request(job: Job[Content]) -> Content

Handle an inspect request.

Source code in src/async_kernel/kernel.py
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
async def inspect_request(self, job: Job[Content], /) -> Content:
    """
    Handle an [inspect request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#introspection).

    The token at the cursor position is looked up with the shell's
    `object_inspect_mime`.

    Returns:
        A dict with `data` (the mime bundle, without `text/html` when the shell's
        html pager is disabled), `metadata` and `found`. `found` is `False` and
        `data` is empty when the lookup raises `KeyError`.
    """
    c = job["msg"]["content"]
    detail_level = int(c.get("detail_level", 0))
    omit_sections = set(c.get("omit_sections", []))
    name = token_at_cursor(c["code"], c["cursor_pos"])
    content = {"data": {}, "metadata": {}, "found": True}
    try:
        bundle = self.shell.object_inspect_mime(name, detail_level=detail_level, omit_sections=omit_sections)
    except KeyError:
        # Presumably how the shell signals that the name is unknown - confirm against IPython.
        content["found"] = False
    else:
        content["data"] = bundle
        if not self.shell.enable_html_pager:
            # Use a default so a bundle without an html entry is not mistaken for a
            # failed lookup (a bare pop would raise KeyError).
            bundle.pop("text/html", None)
    return content

interrupt_request async

interrupt_request(job: Job[Content]) -> Content

Handle an interrupt request (control only).

Source code in src/async_kernel/kernel.py
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
async def interrupt_request(self, job: Job[Content], /) -> Content:
    """
    Handle an [interrupt request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-interrupt) (control only).

    Sends SIGINT to this process and then calls every callback registered in
    `self._interrupts`.

    Returns:
        An empty dict.
    """
    self._interrupt_requested = True
    if sys.platform == "win32":
        signal.raise_signal(signal.SIGINT)
        # NOTE(review): blocking sleep inside a coroutine; presumably gives the
        # signal handler a chance to run - confirm.
        time.sleep(0)
    else:
        os.kill(os.getpid(), signal.SIGINT)
    # Iterate over a snapshot: callbacks may be removed from the set while it is processed.
    for interrupter in tuple(self._interrupts):
        interrupter()
    return {}

iopub_send

iopub_send(
    msg_or_type: dict[str, Any] | str,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: dict[str, Any] | None | NoValue = NoValue,
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None

Send a message on the zmq iopub socket.

Source code in src/async_kernel/kernel.py
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
def iopub_send(
    self,
    msg_or_type: dict[str, Any] | str,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: dict[str, Any] | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None:
    """
    Send a message on the zmq iopub socket.

    If an iopub socket is registered for the current thread in
    `Caller.iopub_sockets` the message is sent on it directly. Otherwise the
    call is repeated through `self.control_thread_caller`.

    Args:
        msg_or_type: The message, or the message type, passed to the session.
        content: The message content.
        metadata: The message metadata.
        parent: The parent message. When not given, `utils.get_parent()` is used
            on the direct path.
        ident: The identity/topic to send with the message.
        buffers: Binary buffers to send with the message.
    """
    if socket := Caller.iopub_sockets.get(thread := threading.current_thread()):
        msg = self.session.send(
            stream=socket,
            msg_or_type=msg_or_type,
            content=content,
            metadata=metadata,
            parent=parent if parent is not NoValue else utils.get_parent(),  # pyright: ignore[reportArgumentType]
            ident=ident,
            buffers=buffers,
        )
        if msg:
            self.log.debug(
                "iopub_send: (thread=%s) msg_type:'%s', content: %s", thread.name, msg["msg_type"], msg["content"]
            )
    else:
        # No socket for this thread: re-issue the call via the control thread caller.
        # NOTE(review): an unspecified parent becomes None here, whereas the direct
        # path falls back to utils.get_parent() - confirm this difference is intended.
        self.control_thread_caller.call_direct(
            self.iopub_send,
            msg_or_type=msg_or_type,
            content=content,
            metadata=metadata,
            parent=parent if parent is not NoValue else None,
            ident=ident,
            buffers=buffers,
        )

is_complete_request async

is_complete_request(job: Job[Content]) -> Content

Handle an is_complete request.

Source code in src/async_kernel/kernel.py
1017
1018
1019
1020
1021
1022
1023
async def is_complete_request(self, job: Job[Content], /) -> Content:
    """
    Handle an [is_complete request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#code-completeness).

    Returns:
        `{"status": status}` as reported by the shell's input transformer manager,
        with an additional `indent` string of spaces when the status is
        `"incomplete"`.
    """
    check = self.shell.input_transformer_manager.check_complete
    status, indent_width = check(job["msg"]["content"]["code"])
    if status == "incomplete":
        return {"status": status, "indent": " " * indent_width}
    return {"status": status}

kernel_info_request async

kernel_info_request(job: Job[Content]) -> Content

Handle a kernel info request.

Source code in src/async_kernel/kernel.py
911
912
913
async def kernel_info_request(self, job: Job[Content], /) -> Content:
    """
    Handle a [kernel info request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-info).

    Returns:
        The kernel's `kernel_info`.
    """
    info = self.kernel_info
    return info

load_connection_info

load_connection_info(info: dict[str, Any]) -> None

Load connection info from a dict containing connection info.

Typically this data comes from a connection file and is called by load_connection_file.

Parameters:

  • info
    (dict[str, Any]) –

    Dictionary containing connection_info. See the connection_file spec for details.

Source code in src/async_kernel/kernel.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
def load_connection_info(self, info: dict[str, Any]) -> None:
    """
    Apply connection settings from a connection-info dict.

    Typically this data comes from a connection file
    and is called by load_connection_file.

    Args:
        info: Dictionary containing connection_info. See the connection_file spec for details.

    Raises:
        RuntimeError: If ports have already been loaded.
    """
    if self._ports:
        msg = "Connection info is already loaded!"
        raise RuntimeError(msg)
    self.transport = info.get("transport", self.transport)
    self.ip = info.get("ip") or self.ip
    # Pick up "<socket>_port" entries for every known socket.
    for socket_id in SocketID:
        port_key = f"{socket_id}_port"
        if socket_id not in self._ports and port_key in info:
            self._ports[socket_id] = info[port_key]
    if "key" in info:
        raw_key = info["key"]
        # The session key must be bytes; text keys are encoded first.
        key_bytes = raw_key.encode() if isinstance(raw_key, str) else raw_key
        assert isinstance(key_bytes, bytes)

        self.session.key = key_bytes
    if "signature_scheme" in info:
        self.session.signature_scheme = info["signature_scheme"]

raw_input

raw_input(prompt='') -> Any

Forward raw_input to frontends.

Raises

StdinNotImplementedError if active frontend doesn't support stdin.

Source code in src/async_kernel/kernel.py
1112
1113
1114
1115
1116
1117
1118
1119
1120
def raw_input(self, prompt="") -> Any:
    """
    Forward raw_input to frontends.

    Args:
        prompt: The prompt; it is converted to `str` before being sent.

    Returns:
        The result of the input request, which is issued with `password=False`.

    Raises:
        StdinNotImplementedError: If the active frontend doesn't support stdin.
    """
    request = self._input_request
    return request(str(prompt), password=False)

run_handler async

run_handler(handler: HandlerType, job: Job[dict]) -> None

Runs the handler in the context of the job/message sending the reply content if it is provided.

This method gets called for every valid request with the relevant handler.

Source code in src/async_kernel/kernel.py
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
async def run_handler(self, handler: HandlerType, job: Job[dict]) -> None:
    """
    Runs the handler in the context of the job/message sending the reply content if it is provided.

    This method gets called for every valid request with the relevant handler.

    A `busy` status is published on iopub before the handler runs and an `idle`
    status afterwards, whether or not the handler succeeds. An exception raised by
    the handler is logged and sent back as an error reply instead of propagating.

    Args:
        handler: The coroutine function that handles the job.
        job: The job to handle.
    """

    def _send_reply(content: dict, /) -> None:
        """Send a reply to the job with the specified content."""
        # Replies without an explicit status are reported as successful.
        if "status" not in content:
            content["status"] = "ok"
        msg = self.session.send(
            stream=job["socket"],
            # e.g. "execute_request" becomes "execute_reply".
            msg_or_type=job["msg"]["header"]["msg_type"].replace("request", "reply"),
            content=content,
            parent=job["msg"]["header"],  # pyright: ignore[reportArgumentType]
            ident=job["ident"],
        )
        if msg:
            self.log.debug("*** _send_reply %s*** %s", job["socket_id"], msg)

    # Make the job available through the context variable while the handler runs.
    utils._job_var.set(job)  # pyright: ignore[reportPrivateUsage]
    try:
        self.iopub_send(msg_or_type="status", content={"execution_state": "busy"}, ident=self.topic("status"))
        # A handler returning None sends no reply.
        if (content := await handler(job)) is not None:
            _send_reply(content)
    except Exception as e:
        _send_reply(error_to_content(e))
        self.log.exception("Exception in message handler:", exc_info=e)
    finally:
        self.iopub_send(msg_or_type="status", content={"execution_state": "idle"}, ident=self.topic("status"))

shutdown_request async

shutdown_request(job: Job[Content]) -> Content

Handle a shutdown request (control only).

Source code in src/async_kernel/kernel.py
1088
1089
1090
1091
1092
async def shutdown_request(self, job: Job[Content], /) -> Content:
    """Handle a [shutdown request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-shutdown) (control only)."""
    await self.debugger.disconnect()
    Caller().call_direct(self.stop)
    return {"restart": job["msg"]["content"].get("restart", False)}

stop staticmethod

stop() -> None

Stop the running kernel.

Once a kernel is stopped, that instance of the kernel cannot be restarted. Instead, a new kernel must be started.

Source code in src/async_kernel/kernel.py
452
453
454
455
456
457
458
459
460
461
462
@staticmethod
def stop() -> None:
    """
    Stop the running kernel.

    Clears the class-level instance reference and sets that instance's stop
    event. Does nothing when there is no current instance.

    A stopped kernel instance cannot be restarted; start a new kernel instead.
    """
    instance = Kernel._instance
    if not instance:
        return
    Kernel._instance = None
    instance._stop_event.set()

topic

topic(topic) -> bytes

prefixed topic for IOPub messages.

Source code in src/async_kernel/kernel.py
907
908
909
def topic(self, topic) -> bytes:
    """
    Return the prefixed topic for IOPub messages.

    Args:
        topic: The topic name; it is formatted into the string `kernel.<topic>`.

    Returns:
        The prefixed topic encoded as bytes.
    """
    prefixed = f"kernel.{topic}"
    return prefixed.encode()

unraisablehook

unraisablehook(unraisable: UnraisableHookArgs) -> None

Handle unraisable exceptions (during gc for instance).

Source code in src/async_kernel/kernel.py
1103
1104
1105
1106
1107
1108
1109
1110
def unraisablehook(self, unraisable: sys.UnraisableHookArgs, /) -> None:
    """
    Handle unraisable exceptions (during gc for instance).

    The exception is logged through `self.log.exception` with the offending
    object attached under the `object` key of `extra`.

    Args:
        unraisable: The hook arguments describing the exception.
    """
    message = unraisable.err_msg
    exc_type = unraisable.exc_type
    exc_value = unraisable.exc_value
    if not exc_value:
        # No exception instance was supplied: build one from the type and the message.
        exc_value = exc_type(message)
    details = (exc_type, exc_value, unraisable.exc_traceback)
    self.log.exception(message, exc_info=details, extra={"object": unraisable.object})

KernelInterruptError

Raised to interrupt the kernel.

bind_socket

bind_socket(
    socket: Socket[SocketType],
    transport: Literal["tcp", "ipc"],
    ip: str,
    port: int = 0,
    max_attempts: int | NoValue = NoValue,
) -> int

Bind the socket to a port using the settings.

max_attempts: The maximum number of attempts to bind the socket. If un-specified, defaults to 100 if port missing, else 2 attempts.

Source code in src/async_kernel/kernel.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def bind_socket(
    socket: Socket[SocketType],
    transport: Literal["tcp", "ipc"],
    ip: str,
    port: int = 0,
    max_attempts: int | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
) -> int:
    """
    Bind the socket to a port using the settings.

    Args:
        socket: The socket to bind.
        transport: The transport, either `"tcp"` or `"ipc"`.
        ip: The IP address for tcp, or the path prefix for ipc.
        port: The port to bind to. When 0 (the default) a port is chosen: a random
            port for tcp, or the first number for which `<ip>-<port>` does not
            exist as a path for ipc.
        max_attempts: The maximum number of attempts to bind the socket. If un-specified,
            defaults to 100 if port missing, else 2 attempts.

    Returns:
        The port the socket was bound to.

    Raises:
        ValueError: If the transport is neither `"tcp"` nor `"ipc"`.
        ZMQError: If binding fails with an errno other than the address-in-use codes.
        RuntimeError: If the socket could not be bound within `max_attempts` attempts.
    """
    if socket.TYPE == SocketType.ROUTER:
        # ref: https://github.com/ipython/ipykernel/issues/270
        socket.router_handover = 1
    if transport == "ipc":
        ip = Path(ip).as_posix()
    if max_attempts is NoValue:
        max_attempts = 2 if port else 100
    for attempt in range(max_attempts):
        try:
            if transport == "tcp":
                if not port:
                    port = socket.bind_to_random_port(f"tcp://{ip}")
                else:
                    socket.bind(f"tcp://{ip}:{port}")
            elif transport == "ipc":
                if not port:
                    # Use the first suffix whose path does not exist yet.
                    port = 1
                    while Path(f"{ip}-{port}").exists():
                        port += 1
                socket.bind(f"ipc://{ip}-{port}")
            else:
                msg = f"Invalid transport: {transport}"  # pyright: ignore[reportUnreachable]
                raise ValueError(msg)
        except ZMQError as e:
            # Only these errnos are retried (presumably the platform-specific
            # "address in use" codes - confirm); anything else is re-raised.
            if e.errno not in {errno.EADDRINUSE, 98, 10048, 135}:
                raise
            # Pause before retrying the same port, except after the final attempt.
            if port and attempt < max_attempts - 1:
                time.sleep(0.1)
        else:
            return port
    msg = f"Failed to bind {socket} for {transport=} after {max_attempts} attempts."
    raise RuntimeError(msg)

error_to_content

error_to_content(error: BaseException) -> Content

Convert the error to a dict.

ref: https://jupyter-client.readthedocs.io/en/stable/messaging.html#request-reply

Source code in src/async_kernel/kernel.py
74
75
76
77
78
79
80
81
82
83
84
85
def error_to_content(error: BaseException, /) -> Content:
    """
    Convert the error to a dict.

    ref: https://jupyter-client.readthedocs.io/en/stable/messaging.html#request-reply
    """
    return {
        "status": "error",
        "ename": type(error).__name__,
        "evalue": str(error),
        "traceback": traceback.format_exception(error),
    }