Skip to content

interface

Modules:

  • base

    The base class definition to interface with the kernel.

  • callable

    A collection of objects to provide a kernel interface based on callbacks.

  • zmq

    A collection of objects defining the kernel interface using zmq sockets.

Classes:

  • BaseKernelInterface

    The base class required interface with the kernel. Must be overloaded to be useful.

Functions:

BaseKernelInterface

Bases: HasTraits, AsyncContextManagerMixin

The base class required interface with the kernel. Must be overloaded to be useful.

Methods:

Attributes:

Source code in src/async_kernel/interface/base.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
class BaseKernelInterface(HasTraits, anyio.AsyncContextManagerMixin):
    """
    The base class required interface with the kernel. Must be overloaded to be useful.
    """

    log = Instance(logging.LoggerAdapter)
    "The logging adapter."

    callers: Fixed[Self, dict[Literal[Channel.shell, Channel.control], Caller]] = Fixed(dict)
    "The caller associated with the kernel once it has started."
    ""
    kernel: Fixed[Self, Kernel] = Fixed(lambda _: async_kernel.Kernel())
    "The kernel."

    interrupts: Fixed[Self, set[Callable[[], object]]] = Fixed(set)
    "A set for callables can be added to run code when a kernel interrupt is initiated (control thread)."

    last_interrupt_frame = None
    "This frame is set when an interrupt is intercepted and cleared once the interrupt has been handled."

    wait_exit = Fixed(Event)
    "An event that when set will leave the kernel context if the kernel was started by this interface."

    def load_connection_info(self, info: dict[str, Any]) -> None:
        raise NotImplementedError

    @traitlets.default("log")
    def _default_log(self) -> LoggerAdapter[Logger]:
        return logging.LoggerAdapter(logging.getLogger(self.__class__.__name__))

    def __init__(self, kernel_settings: dict[str, Any] | None = None, /) -> None:
        if self.kernel.trait_has_value("interface"):
            msg = "The kernel already has an interface!"
            raise RuntimeError(msg)
        super().__init__()
        self.kernel.interface = self
        if kernel_settings:
            self.kernel.load_settings(kernel_settings)

    @asynccontextmanager
    async def __asynccontextmanager__(self) -> AsyncGenerator[Self]:
        """Create caller, and open socketes."""
        self.anyio_backend = Backend(current_async_library())
        restore_io = None
        caller = Caller("manual", name="Shell", protected=True, log=self.kernel.log)
        self.callers[Channel.shell] = caller
        self.callers[Channel.control] = caller.get(name="Control", log=self.kernel.log, protected=True)
        async with caller:
            try:
                restore_io = self._patch_io()
                yield self
            finally:
                if restore_io:
                    restore_io()

    def _patch_io(self) -> Callable[[], None]:
        original_io = sys.stdout, sys.stderr, sys.displayhook, builtins.input, self.getpass

        def restore():
            sys.stdout, sys.stderr, sys.displayhook, builtins.input, getpass.getpass = original_io

        builtins.input = self.raw_input
        getpass.getpass = self.getpass
        for name in ["stdout", "stderr"]:

            def flusher(string: str, name=name):
                "Publish stdio or stderr when flush is called"
                self.iopub_send(
                    msg_or_type="stream",
                    content={"name": name, "text": string},
                    ident=f"stream.{name}".encode(),
                )
                if not self.kernel.quiet and (echo := (sys.__stdout__ if name == "stdout" else sys.__stderr__)):
                    echo.write(string)
                    echo.flush()

            wrapper = OutStream(flusher=flusher)
            setattr(sys, name, wrapper)

        return restore

    def input_request(self, prompt: str, *, password: bool = False) -> str:
        """
        Forward an input request to the frontend.

        Args:
            prompt: The user prompt.
            password: If the prompt should be considered as a password.

        Raises:
           IPython.core.error.StdinNotImplementedError: if active frontend doesn't support stdi
        """
        raise NotImplementedError

    def raw_input(self, prompt: str = "") -> str:
        """
        Forward a raw_input request to the client.

        Args:
            prompt: The user prompt.

        Raises:
           IPython.core.error.StdinNotImplementedError: if active frontend doesn't support stdin.
        """
        return self.input_request(str(prompt), password=False)

    def getpass(self, prompt: str = "") -> str:
        """
        Forward getpass to the client.

        Args:
            prompt: The user prompt.

        Raises:
           IPython.core.error.StdinNotImplementedError: if active frontend doesn't support stdin.
        """
        return self.input_request(prompt, password=True)

    def interrupt(self):
        """
        Interrupt execution, possible raising a [async_kernel.asyncshell.KernelInterruptError][].
        """
        while self.interrupts:
            try:
                self.interrupts.pop()()
            except Exception:
                pass

    def msg(
        self,
        msg_type: str,
        *,
        content: dict | None = None,
        parent: Message | dict[str, Any] | None = None,
        header: MsgHeader | dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
        channel: Channel = Channel.shell,
    ) -> Message[dict[str, Any]]:
        """Return the nested message dict.

        This format is different from what is sent over the wire. The
        serialize/deserialize methods converts this nested message dict to the wire
        format, which is a list of message parts.
        """
        parent = parent or async_kernel.utils.get_parent()
        if header is None:
            session = ""
            if parent and (header := parent.get("header")):
                session = header.get("session", "")
            header = MsgHeader(
                date=datetime.now(UTC),
                msg_id=str(uuid4()),
                msg_type=msg_type,
                session=session,
                username="",
                version=async_kernel.kernel_protocol_version,
            )
        return Message(  # pyright: ignore[reportCallIssue]
            channel=channel,
            header=header,
            parent_header=extract_header(parent),  # pyright: ignore[reportArgumentType]
            content={} if content is None else content,
            metadata=metadata if metadata is not None else {},
        )

    def iopub_send(
        self,
        msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
        *,
        content: Content | None = None,
        metadata: dict[str, Any] | None = None,
        parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
        ident: bytes | list[bytes] | None = None,
        buffers: list[bytes] | None = None,
    ) -> None:
        """Send an iopub message."""
        raise NotImplementedError

log class-attribute instance-attribute

log = Instance(LoggerAdapter)

The logging adapter.

callers class-attribute instance-attribute

The caller associated with the kernel once it has started.

kernel class-attribute instance-attribute

kernel: Fixed[Self, Kernel] = Fixed(lambda _: Kernel())

The kernel.

interrupts class-attribute instance-attribute

interrupts: Fixed[Self, set[Callable[[], object]]] = Fixed(set)

A set for callables can be added to run code when a kernel interrupt is initiated (control thread).

last_interrupt_frame class-attribute instance-attribute

last_interrupt_frame = None

This frame is set when an interrupt is intercepted and cleared once the interrupt has been handled.

wait_exit class-attribute instance-attribute

wait_exit = Fixed(Event)

An event that when set will leave the kernel context if the kernel was started by this interface.

__asynccontextmanager__ async

__asynccontextmanager__() -> AsyncGenerator[Self]

Create caller, and open socketes.

Source code in src/async_kernel/interface/base.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
@asynccontextmanager
async def __asynccontextmanager__(self) -> AsyncGenerator[Self]:
    """Create caller, and open socketes."""
    self.anyio_backend = Backend(current_async_library())
    restore_io = None
    caller = Caller("manual", name="Shell", protected=True, log=self.kernel.log)
    self.callers[Channel.shell] = caller
    self.callers[Channel.control] = caller.get(name="Control", log=self.kernel.log, protected=True)
    async with caller:
        try:
            restore_io = self._patch_io()
            yield self
        finally:
            if restore_io:
                restore_io()

input_request

input_request(prompt: str, *, password: bool = False) -> str

Forward an input request to the frontend.

Parameters:

  • prompt

    (str) –

    The user prompt.

  • password

    (bool, default: False ) –

    If the prompt should be considered as a password.

Raises:

Source code in src/async_kernel/interface/base.py
135
136
137
138
139
140
141
142
143
144
145
146
def input_request(self, prompt: str, *, password: bool = False) -> str:
    """
    Forward an input request to the frontend.

    Args:
        prompt: The user prompt.
        password: If the prompt should be considered as a password.

    Raises:
       IPython.core.error.StdinNotImplementedError: if active frontend doesn't support stdi
    """
    raise NotImplementedError

raw_input

raw_input(prompt: str = '') -> str

Forward a raw_input request to the client.

Parameters:

  • prompt

    (str, default: '' ) –

    The user prompt.

Raises:

Source code in src/async_kernel/interface/base.py
148
149
150
151
152
153
154
155
156
157
158
def raw_input(self, prompt: str = "") -> str:
    """
    Forward a raw_input request to the client.

    Args:
        prompt: The user prompt.

    Raises:
       IPython.core.error.StdinNotImplementedError: if active frontend doesn't support stdin.
    """
    return self.input_request(str(prompt), password=False)

getpass

getpass(prompt: str = '') -> str

Forward getpass to the client.

Parameters:

  • prompt

    (str, default: '' ) –

    The user prompt.

Raises:

Source code in src/async_kernel/interface/base.py
160
161
162
163
164
165
166
167
168
169
170
def getpass(self, prompt: str = "") -> str:
    """
    Forward getpass to the client.

    Args:
        prompt: The user prompt.

    Raises:
       IPython.core.error.StdinNotImplementedError: if active frontend doesn't support stdin.
    """
    return self.input_request(prompt, password=True)

interrupt

interrupt()

Interrupt execution, possible raising a async_kernel.asyncshell.KernelInterruptError.

Source code in src/async_kernel/interface/base.py
172
173
174
175
176
177
178
179
180
def interrupt(self):
    """
    Interrupt execution, possible raising a [async_kernel.asyncshell.KernelInterruptError][].
    """
    while self.interrupts:
        try:
            self.interrupts.pop()()
        except Exception:
            pass

msg

msg(
    msg_type: str,
    *,
    content: dict | None = None,
    parent: Message | dict[str, Any] | None = None,
    header: MsgHeader | dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    channel: Channel = shell,
) -> Message[dict[str, Any]]

Return the nested message dict.

This format is different from what is sent over the wire. The serialize/deserialize methods converts this nested message dict to the wire format, which is a list of message parts.

Source code in src/async_kernel/interface/base.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def msg(
    self,
    msg_type: str,
    *,
    content: dict | None = None,
    parent: Message | dict[str, Any] | None = None,
    header: MsgHeader | dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    channel: Channel = Channel.shell,
) -> Message[dict[str, Any]]:
    """Return the nested message dict.

    This format is different from what is sent over the wire. The
    serialize/deserialize methods converts this nested message dict to the wire
    format, which is a list of message parts.
    """
    parent = parent or async_kernel.utils.get_parent()
    if header is None:
        session = ""
        if parent and (header := parent.get("header")):
            session = header.get("session", "")
        header = MsgHeader(
            date=datetime.now(UTC),
            msg_id=str(uuid4()),
            msg_type=msg_type,
            session=session,
            username="",
            version=async_kernel.kernel_protocol_version,
        )
    return Message(  # pyright: ignore[reportCallIssue]
        channel=channel,
        header=header,
        parent_header=extract_header(parent),  # pyright: ignore[reportArgumentType]
        content={} if content is None else content,
        metadata=metadata if metadata is not None else {},
    )

iopub_send

iopub_send(
    msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
    *,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None

Send an iopub message.

Source code in src/async_kernel/interface/base.py
219
220
221
222
223
224
225
226
227
228
229
230
def iopub_send(
    self,
    msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
    *,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None:
    """Send an iopub message."""
    raise NotImplementedError

start_kernel_callable_interface async

start_kernel_callable_interface(
    *,
    send: Callable[[str, list | None, bool], None | str],
    stopped: Callable[[], None],
    settings: dict | None = None,
) -> Handlers

Start the kernel with the callback based kernel interface CallableKernelInterface.

Source code in src/async_kernel/interface/__init__.py
15
16
17
18
19
20
21
22
23
24
25
26
async def start_kernel_callable_interface(
    *,
    send: Callable[[str, list | None, bool], None | str],
    stopped: Callable[[], None],
    settings: dict | None = None,
) -> Handlers:
    """
    Start the kernel with the callback based kernel interface [CallableKernelInterface][async_kernel.interface.callable.CallableKernelInterface].
    """
    from async_kernel.interface.callable import CallableKernelInterface  # noqa: PLC0415

    return await CallableKernelInterface(settings).start(send=send, stopped=stopped)

start_kernel_zmq_interface

start_kernel_zmq_interface(settings: dict | None = None) -> None

Start the kernel with the zmq socket based kernel interface ZMQKernelInterface.

Available in CPython.

Source code in src/async_kernel/interface/__init__.py
29
30
31
32
33
34
35
36
37
def start_kernel_zmq_interface(settings: dict | None = None) -> None:
    """
    Start the kernel with the zmq socket based kernel interface [ZMQKernelInterface][async_kernel.interface.zmq.ZMQKernelInterface].

    Available in CPython.
    """
    from async_kernel.interface.zmq import ZMQKernelInterface  # noqa: PLC0415

    ZMQKernelInterface(settings).start()

The base class definition to interface with the kernel.

Classes:

  • BaseKernelInterface

    The base class required interface with the kernel. Must be overloaded to be useful.

BaseKernelInterface

Bases: HasTraits, AsyncContextManagerMixin

The base class required interface with the kernel. Must be overloaded to be useful.

Methods:

Attributes:

Source code in src/async_kernel/interface/base.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
class BaseKernelInterface(HasTraits, anyio.AsyncContextManagerMixin):
    """
    The base class required interface with the kernel. Must be overloaded to be useful.
    """

    log = Instance(logging.LoggerAdapter)
    "The logging adapter."

    callers: Fixed[Self, dict[Literal[Channel.shell, Channel.control], Caller]] = Fixed(dict)
    "The caller associated with the kernel once it has started."
    ""
    kernel: Fixed[Self, Kernel] = Fixed(lambda _: async_kernel.Kernel())
    "The kernel."

    interrupts: Fixed[Self, set[Callable[[], object]]] = Fixed(set)
    "A set for callables can be added to run code when a kernel interrupt is initiated (control thread)."

    last_interrupt_frame = None
    "This frame is set when an interrupt is intercepted and cleared once the interrupt has been handled."

    wait_exit = Fixed(Event)
    "An event that when set will leave the kernel context if the kernel was started by this interface."

    def load_connection_info(self, info: dict[str, Any]) -> None:
        raise NotImplementedError

    @traitlets.default("log")
    def _default_log(self) -> LoggerAdapter[Logger]:
        return logging.LoggerAdapter(logging.getLogger(self.__class__.__name__))

    def __init__(self, kernel_settings: dict[str, Any] | None = None, /) -> None:
        if self.kernel.trait_has_value("interface"):
            msg = "The kernel already has an interface!"
            raise RuntimeError(msg)
        super().__init__()
        self.kernel.interface = self
        if kernel_settings:
            self.kernel.load_settings(kernel_settings)

    @asynccontextmanager
    async def __asynccontextmanager__(self) -> AsyncGenerator[Self]:
        """Create caller, and open socketes."""
        self.anyio_backend = Backend(current_async_library())
        restore_io = None
        caller = Caller("manual", name="Shell", protected=True, log=self.kernel.log)
        self.callers[Channel.shell] = caller
        self.callers[Channel.control] = caller.get(name="Control", log=self.kernel.log, protected=True)
        async with caller:
            try:
                restore_io = self._patch_io()
                yield self
            finally:
                if restore_io:
                    restore_io()

    def _patch_io(self) -> Callable[[], None]:
        original_io = sys.stdout, sys.stderr, sys.displayhook, builtins.input, self.getpass

        def restore():
            sys.stdout, sys.stderr, sys.displayhook, builtins.input, getpass.getpass = original_io

        builtins.input = self.raw_input
        getpass.getpass = self.getpass
        for name in ["stdout", "stderr"]:

            def flusher(string: str, name=name):
                "Publish stdio or stderr when flush is called"
                self.iopub_send(
                    msg_or_type="stream",
                    content={"name": name, "text": string},
                    ident=f"stream.{name}".encode(),
                )
                if not self.kernel.quiet and (echo := (sys.__stdout__ if name == "stdout" else sys.__stderr__)):
                    echo.write(string)
                    echo.flush()

            wrapper = OutStream(flusher=flusher)
            setattr(sys, name, wrapper)

        return restore

    def input_request(self, prompt: str, *, password: bool = False) -> str:
        """
        Forward an input request to the frontend.

        Args:
            prompt: The user prompt.
            password: If the prompt should be considered as a password.

        Raises:
           IPython.core.error.StdinNotImplementedError: if active frontend doesn't support stdi
        """
        raise NotImplementedError

    def raw_input(self, prompt: str = "") -> str:
        """
        Forward a raw_input request to the client.

        Args:
            prompt: The user prompt.

        Raises:
           IPython.core.error.StdinNotImplementedError: if active frontend doesn't support stdin.
        """
        return self.input_request(str(prompt), password=False)

    def getpass(self, prompt: str = "") -> str:
        """
        Forward getpass to the client.

        Args:
            prompt: The user prompt.

        Raises:
           IPython.core.error.StdinNotImplementedError: if active frontend doesn't support stdin.
        """
        return self.input_request(prompt, password=True)

    def interrupt(self):
        """
        Interrupt execution, possible raising a [async_kernel.asyncshell.KernelInterruptError][].
        """
        while self.interrupts:
            try:
                self.interrupts.pop()()
            except Exception:
                pass

    def msg(
        self,
        msg_type: str,
        *,
        content: dict | None = None,
        parent: Message | dict[str, Any] | None = None,
        header: MsgHeader | dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
        channel: Channel = Channel.shell,
    ) -> Message[dict[str, Any]]:
        """Return the nested message dict.

        This format is different from what is sent over the wire. The
        serialize/deserialize methods converts this nested message dict to the wire
        format, which is a list of message parts.
        """
        parent = parent or async_kernel.utils.get_parent()
        if header is None:
            session = ""
            if parent and (header := parent.get("header")):
                session = header.get("session", "")
            header = MsgHeader(
                date=datetime.now(UTC),
                msg_id=str(uuid4()),
                msg_type=msg_type,
                session=session,
                username="",
                version=async_kernel.kernel_protocol_version,
            )
        return Message(  # pyright: ignore[reportCallIssue]
            channel=channel,
            header=header,
            parent_header=extract_header(parent),  # pyright: ignore[reportArgumentType]
            content={} if content is None else content,
            metadata=metadata if metadata is not None else {},
        )

    def iopub_send(
        self,
        msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
        *,
        content: Content | None = None,
        metadata: dict[str, Any] | None = None,
        parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
        ident: bytes | list[bytes] | None = None,
        buffers: list[bytes] | None = None,
    ) -> None:
        """Send an iopub message."""
        raise NotImplementedError

log class-attribute instance-attribute

log = Instance(LoggerAdapter)

The logging adapter.

callers class-attribute instance-attribute

The caller associated with the kernel once it has started.

kernel class-attribute instance-attribute

kernel: Fixed[Self, Kernel] = Fixed(lambda _: Kernel())

The kernel.

interrupts class-attribute instance-attribute

interrupts: Fixed[Self, set[Callable[[], object]]] = Fixed(set)

A set for callables can be added to run code when a kernel interrupt is initiated (control thread).

last_interrupt_frame class-attribute instance-attribute

last_interrupt_frame = None

This frame is set when an interrupt is intercepted and cleared once the interrupt has been handled.

wait_exit class-attribute instance-attribute

wait_exit = Fixed(Event)

An event that when set will leave the kernel context if the kernel was started by this interface.

__asynccontextmanager__ async

__asynccontextmanager__() -> AsyncGenerator[Self]

Create caller, and open socketes.

Source code in src/async_kernel/interface/base.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
@asynccontextmanager
async def __asynccontextmanager__(self) -> AsyncGenerator[Self]:
    """Create caller, and open socketes."""
    self.anyio_backend = Backend(current_async_library())
    restore_io = None
    caller = Caller("manual", name="Shell", protected=True, log=self.kernel.log)
    self.callers[Channel.shell] = caller
    self.callers[Channel.control] = caller.get(name="Control", log=self.kernel.log, protected=True)
    async with caller:
        try:
            restore_io = self._patch_io()
            yield self
        finally:
            if restore_io:
                restore_io()

input_request

input_request(prompt: str, *, password: bool = False) -> str

Forward an input request to the frontend.

Parameters:

  • prompt

    (str) –

    The user prompt.

  • password

    (bool, default: False ) –

    If the prompt should be considered as a password.

Raises:

Source code in src/async_kernel/interface/base.py
135
136
137
138
139
140
141
142
143
144
145
146
def input_request(self, prompt: str, *, password: bool = False) -> str:
    """
    Forward an input request to the frontend.

    Args:
        prompt: The user prompt.
        password: If the prompt should be considered as a password.

    Raises:
       IPython.core.error.StdinNotImplementedError: if active frontend doesn't support stdi
    """
    raise NotImplementedError

raw_input

raw_input(prompt: str = '') -> str

Forward a raw_input request to the client.

Parameters:

  • prompt

    (str, default: '' ) –

    The user prompt.

Raises:

Source code in src/async_kernel/interface/base.py
148
149
150
151
152
153
154
155
156
157
158
def raw_input(self, prompt: str = "") -> str:
    """
    Forward a raw_input request to the client.

    Args:
        prompt: The user prompt.

    Raises:
       IPython.core.error.StdinNotImplementedError: if active frontend doesn't support stdin.
    """
    return self.input_request(str(prompt), password=False)

getpass

getpass(prompt: str = '') -> str

Forward getpass to the client.

Parameters:

  • prompt

    (str, default: '' ) –

    The user prompt.

Raises:

Source code in src/async_kernel/interface/base.py
160
161
162
163
164
165
166
167
168
169
170
def getpass(self, prompt: str = "") -> str:
    """
    Forward getpass to the client.

    Args:
        prompt: The user prompt.

    Raises:
       IPython.core.error.StdinNotImplementedError: if active frontend doesn't support stdin.
    """
    return self.input_request(prompt, password=True)

interrupt

interrupt()

Interrupt execution, possible raising a async_kernel.asyncshell.KernelInterruptError.

Source code in src/async_kernel/interface/base.py
172
173
174
175
176
177
178
179
180
def interrupt(self):
    """
    Interrupt execution, possible raising a [async_kernel.asyncshell.KernelInterruptError][].
    """
    while self.interrupts:
        try:
            self.interrupts.pop()()
        except Exception:
            pass

msg

msg(
    msg_type: str,
    *,
    content: dict | None = None,
    parent: Message | dict[str, Any] | None = None,
    header: MsgHeader | dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    channel: Channel = shell,
) -> Message[dict[str, Any]]

Return the nested message dict.

This format is different from what is sent over the wire. The serialize/deserialize methods converts this nested message dict to the wire format, which is a list of message parts.

Source code in src/async_kernel/interface/base.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def msg(
    self,
    msg_type: str,
    *,
    content: dict | None = None,
    parent: Message | dict[str, Any] | None = None,
    header: MsgHeader | dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    channel: Channel = Channel.shell,
) -> Message[dict[str, Any]]:
    """Return the nested message dict.

    This format is different from what is sent over the wire. The
    serialize/deserialize methods converts this nested message dict to the wire
    format, which is a list of message parts.
    """
    parent = parent or async_kernel.utils.get_parent()
    if header is None:
        session = ""
        if parent and (header := parent.get("header")):
            session = header.get("session", "")
        header = MsgHeader(
            date=datetime.now(UTC),
            msg_id=str(uuid4()),
            msg_type=msg_type,
            session=session,
            username="",
            version=async_kernel.kernel_protocol_version,
        )
    return Message(  # pyright: ignore[reportCallIssue]
        channel=channel,
        header=header,
        parent_header=extract_header(parent),  # pyright: ignore[reportArgumentType]
        content={} if content is None else content,
        metadata=metadata if metadata is not None else {},
    )

iopub_send

iopub_send(
    msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
    *,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None

Send an iopub message.

Source code in src/async_kernel/interface/base.py
219
220
221
222
223
224
225
226
227
228
229
230
def iopub_send(
    self,
    msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
    *,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None:
    """Send an iopub message."""
    raise NotImplementedError

A collection of objects to provide a kernel interface based on callbacks.

Classes:

Handlers

Bases: TypedDict

Handlers returned by async_kernel.interface.callable.CallableKernelInterface when it is started.

Attributes:

Source code in src/async_kernel/interface/callable.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class Handlers(TypedDict):
    "Handlers returned by [async_kernel.interface.callable.CallableKernelInterface][] when it is started."

    handle_msg: Callable[[str, list[bytes] | list[bytearray] | None]]
    """
    Handle messages from the client.

    The handler requires two positional arguments

    1. The message serialized as a JSON string. The channel ("shell" or "control" ) 
        should also be included in the Message under the key "channel". 
    2. A list of buffers if there are any, or None if there are no buffers.
    """

    stop: Callable[[], None]
    "Stop the kernel."

handle_msg instance-attribute

handle_msg: Callable[[str, list[bytes] | list[bytearray] | None]]

Handle messages from the client.

The handler requires two positional arguments

  1. The message serialized as a JSON string. The channel ("shell" or "control" ) should also be included in the Message under the key "channel".
  2. A list of buffers if there are any, or None if there are no buffers.

stop instance-attribute

stop: Callable[[], None]

Stop the kernel.

CallableKernelInterface

Bases: BaseKernelInterface

A callback based interface to interact with the kernel using serialized messages.

Usage:

```python
from async_kernel.interface.callable import CallableKernelInterface

# Start the kernel providing the necessary callbacks.
kernel_interface = await CallableKernelInterface(options).start(send=..., stopped=...)

# Pass messages to the kernel.
kernel_interface["handle_msg"](msg, buffer)

# Stop the kernel.
kernel_interface["stop"](msg, buffer)
```

See also: - [async_kernel.typing.CallableKernelInterfaceReturnArgs]

Methods:

  • pack

    Pack a message to a string.

  • unpack

    Unpack a message from a json string.

  • start

    Start the kernel.

Source code in src/async_kernel/interface/callable.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
class CallableKernelInterface(BaseKernelInterface):
    """
    A callback based interface to interact with the kernel using serialized messages.

    Usage:

        ```python
        from async_kernel.interface.callable import CallableKernelInterface

        # Start the kernel providing the necessary callbacks.
        kernel_interface = await CallableKernelInterface(options).start(send=..., stopped=...)

        # Pass messages to the kernel.
        kernel_interface["handle_msg"](msg, buffer)

        # Stop the kernel.
        kernel_interface["stop"](msg, buffer)
        ```
    See also:
        - [async_kernel.typing.CallableKernelInterfaceReturnArgs]
    """

    ORJSON_OPTION = orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_NAIVE_UTC | orjson.OPT_UTC_Z
    _send: Callable[[str, list | None, bool], None | str]

    def pack(self, msg: Message, /) -> str:
        """
        Pack a message to a string.
        """
        return orjson.dumps(msg, default=repr, option=self.ORJSON_OPTION).decode()

    def unpack(self, msg_string, /) -> Message[dict[str, Any]]:
        """
        Unpack a message from a json string.
        """
        try:
            return orjson.loads(msg_string)
        except Exception:
            return json.loads(msg_string)

    async def start(
        self,
        *,
        send: Callable[[str, list | None, bool], None | str],
        stopped: Callable[[], None],
    ) -> Handlers:
        """
        Start the kernel.

        Args:
            send: The function to send kernel messages to the client. It must accept

                1. A json string of the message.
                2. A list of buffers, or None if there are no buffers.
                3. A boolean value that indicates a response is required for the stdio channel.

            stopped: A callback that is called once the kernel has stopped.

        Returns: A pending that when resolved returns the message handler callback.
        """
        self._send = send
        ready: Pending[Handlers] = async_kernel.Pending()
        sig = signal.signal(signal.SIGINT, self._signal_handler)

        async def run_kernel():
            try:
                async with self.kernel:
                    ready.set_result(Handlers(handle_msg=self._handle_msg, stop=self.kernel.stop))
                    await anyio.sleep_forever()
            except Exception as e:
                del self._send
                if not ready.done():
                    ready.set_exception(e)
            finally:
                signal.signal(signal.SIGINT, sig)
                stopped()

        self._task = asyncio.create_task(run_kernel())
        return await ready

    @enable_signal_safety
    def _signal_handler(self, signum, frame: FrameType | None) -> None:
        self.last_interrupt_frame = frame
        self.interrupt()
        self.last_interrupt_frame = None
        raise KernelInterruptError

    def _send_to_frontend(
        self,
        msg: Message[dict],
        *,
        channel: Channel = Channel.shell,
        buffers: list[bytearray | bytes] | None = None,
        requires_reply=False,
    ) -> Message | None:
        msg["channel"] = channel
        reply = self._send(self.pack(msg), buffers, requires_reply)
        if requires_reply:
            assert reply
            return self.unpack(reply)
        return None

    async def _send_reply(self, job: Job, content: dict, /) -> None:
        if "status" not in content:
            content["status"] = "ok"
        msg_type = job["msg"]["header"]["msg_type"].replace("request", "reply")
        msg = self.msg(msg_type, content=content, parent=job["msg"])
        self._send_to_frontend(msg, channel=job["msg"]["channel"], buffers=content.pop("buffers", None))

    def _handle_msg(self, msg_json: str, buffers: list[bytearray] | list[bytes] | None = None, /):
        "The main message handler that gets returned by the `start` method."
        msg: Message[dict[str, Any]] = self.unpack(msg_json)
        # Copy the buffer
        msg["buffers"] = [b[:] for b in buffers] if buffers else []
        msg["channel"] = Channel(msg["channel"])
        job = Job(received_time=time.monotonic(), msg=msg, ident=b"")
        self.kernel.msg_handler(msg["channel"], MsgType(job["msg"]["header"]["msg_type"]), job, self._send_reply)  # pyright: ignore[reportArgumentType]

    @override
    def iopub_send(
        self,
        msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
        *,
        content: Content | None = None,
        metadata: dict[str, Any] | None = None,
        parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
        ident: bytes | list[bytes] | None = None,
        buffers: list[bytes] | None = None,
    ) -> None:
        if parent is NoValue:
            parent = async_kernel.utils.get_parent()
        if not isinstance(msg_or_type, dict):
            msg_or_type = self.msg(msg_type=msg_or_type, content=content, parent=parent, metadata=metadata)  # pyright: ignore[reportArgumentType]
        self._send_to_frontend(msg_or_type, channel="iopub", buffers=buffers)  # pyright: ignore[reportArgumentType]

    @override
    def input_request(self, prompt: str, *, password=False) -> Any:
        job = async_kernel.utils.get_job()
        if not job["msg"].get("content", {}).get("allow_stdin", False):
            msg = "Stdin is not allowed in this context!"
            raise StdinNotImplementedError(msg)
        msg = self.msg("input_request", content={"prompt": prompt, "password": password})
        reply = self._send_to_frontend(msg, channel=Channel.stdin, requires_reply=True)
        assert reply
        return reply["content"]["value"]

pack

pack(msg: Message) -> str

Pack a message to a string.

Source code in src/async_kernel/interface/callable.py
75
76
77
78
79
def pack(self, msg: Message, /) -> str:
    """
    Pack a message to a string.
    """
    return orjson.dumps(msg, default=repr, option=self.ORJSON_OPTION).decode()

unpack

unpack(msg_string) -> Message[dict[str, Any]]

Unpack a message from a json string.

Source code in src/async_kernel/interface/callable.py
81
82
83
84
85
86
87
88
def unpack(self, msg_string, /) -> Message[dict[str, Any]]:
    """
    Unpack a message from a json string.
    """
    try:
        return orjson.loads(msg_string)
    except Exception:
        return json.loads(msg_string)

start async

start(
    *, send: Callable[[str, list | None, bool], None | str], stopped: Callable[[], None]
) -> Handlers

Start the kernel.

Parameters:

  • send

    (Callable[[str, list | None, bool], None | str]) –

    The function to send kernel messages to the client. It must accept

    1. A json string of the message.
    2. A list of buffers, or None if there are no buffers.
    3. A boolean value that indicates a response is required for the stdio channel.
  • stopped

    (Callable[[], None]) –

    A callback that is called once the kernel has stopped.

Returns: A pending that when resolved returns the message handler callback.

Source code in src/async_kernel/interface/callable.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
async def start(
    self,
    *,
    send: Callable[[str, list | None, bool], None | str],
    stopped: Callable[[], None],
) -> Handlers:
    """
    Start the kernel.

    Args:
        send: The function to send kernel messages to the client. It must accept

            1. A json string of the message.
            2. A list of buffers, or None if there are no buffers.
            3. A boolean value that indicates a response is required for the stdio channel.

        stopped: A callback that is called once the kernel has stopped.

    Returns: A pending that when resolved returns the message handler callback.
    """
    self._send = send
    ready: Pending[Handlers] = async_kernel.Pending()
    sig = signal.signal(signal.SIGINT, self._signal_handler)

    async def run_kernel():
        try:
            async with self.kernel:
                ready.set_result(Handlers(handle_msg=self._handle_msg, stop=self.kernel.stop))
                await anyio.sleep_forever()
        except Exception as e:
            del self._send
            if not ready.done():
                ready.set_exception(e)
        finally:
            signal.signal(signal.SIGINT, sig)
            stopped()

    self._task = asyncio.create_task(run_kernel())
    return await ready

A collection of objects defining the kernel interface using zmq sockets.

Classes:

ZMQKernelInterface

Bases: BaseKernelInterface

An interface for the kernel that uses zmq sockets.

Methods:

  • start

    Start the kernel blocking until the kernel stops.

  • load_connection_info

    Load connection info from a dict containing connection info.

  • __asynccontextmanager__

    Create caller, and open socketes.

  • iopub_send

    Send a message on the zmq iopub socket.

  • receive_msg_loop

    Opens a zmq socket for the channel, receives messages and calls the message handler.

Attributes:

Source code in src/async_kernel/interface/zmq.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
class ZMQKernelInterface(BaseKernelInterface):
    "An interface for the kernel that uses zmq sockets."

    _zmq_context = Fixed(zmq.Context)
    _interrupt_requested: bool | Literal["FORCE"] = False
    _iopub_url = "inproc://iopub-capture"

    sockets: Fixed[Self, dict[Channel, zmq.Socket]] = Fixed(dict)
    ""
    ports: Fixed[Self, dict[Channel, int]] = Fixed(dict)
    ""
    ip = Unicode()
    """
    The kernel's IP address [default localhost].

    If the IP address is something other than localhost, then Consoles on other machines 
    will be able to connect to the Kernel, so be careful!
    """
    session = Fixed(Session)
    "Handles serialization and sending of messages."

    transport: CaselessStrEnum[str] = CaselessStrEnum(
        ["tcp", "ipc"] if sys.platform == "linux" else ["tcp"], default_value="tcp"
    )
    "Transport for sockets."

    backend: traitlets.Container[Backend] = UseEnum(Backend)  # pyright: ignore[reportAssignmentType]
    "The the backend used to provide the shell event loop."

    backend_options = Dict(allow_none=True)
    "The `backend_options` to use with [anyio.run][]."

    @default("backend_options")
    def _default_backend_options(self):
        if importlib.util.find_spec("winloop") or importlib.util.find_spec("uvloop"):
            return {"use_uvloop": True}
        return None  # pragma: no cover

    def start(self):
        """
        Start the kernel blocking until the kernel stops.

        Warning:
            - Running the kernel in a thread other than the 'MainThread' is permitted, but discouraged.
            - Blocking calls can only be interrupted in the 'MainThread' because [*'threads cannot be destroyed, stopped, suspended, resumed, or interrupted'*](https://docs.python.org/3/library/threading.html#module-threading).
            - Some libraries may assume the call is occurring in the 'MainThread'.
            - If there is an `asyncio` or `trio` event loop already running in the 'MainThread. Use `async with kernel` instead.
        """

        async def run_kernel() -> None:
            async with self.kernel:
                await self.wait_exit

        anyio.run(run_kernel, backend=self.backend, backend_options=self.backend_options)

    @override
    def load_connection_info(self, info: dict[str, Any]) -> None:
        """
        Load connection info from a dict containing connection info.

        Typically this data comes from a connection file
        and is called by load_connection_file.

        Args:
            info: Dictionary containing connection_info. See the connection_file spec for details.
        """
        if self.ports:
            msg = "Connection info is already loaded!"
            raise RuntimeError(msg)
        self.transport = info.get("transport", self.transport)
        self.ip = info.get("ip") or self.ip
        for channel in Channel:
            name = f"{channel}_port"
            if channel not in self.ports and name in info:
                self.ports[channel] = info[name]
        if "key" in info:
            key = info["key"]
            if isinstance(key, str):
                key = key.encode()
            assert isinstance(key, bytes)

            self.session.key = key
        if "signature_scheme" in info:
            self.session.signature_scheme = info["signature_scheme"]

    @traitlets.validate("ip")
    def _validate_ip(self, proposal) -> str:
        return "0.0.0.0" if (val := proposal["value"]) == "*" else val

    @traitlets.default("ip")
    def _default_ip(self) -> str:
        return str(self.kernel.connection_file) + "-ipc" if self.transport == "ipc" else localhost()

    @override
    @asynccontextmanager
    async def __asynccontextmanager__(self) -> AsyncGenerator[Self]:
        """Create caller, and open socketes."""
        sig = restore_io = None
        caller = Caller("manual", name="Shell", protected=True, log=self.kernel.log, zmq_context=self._zmq_context)
        self.callers[Channel.shell] = caller
        self.callers[Channel.control] = caller.get(name="Control", log=self.kernel.log, protected=True)
        start = Event()
        self.anyio_backend = Backend(current_async_library())
        try:
            async with caller:
                self._start_hb_iopub_shell_control_threads(start)
                with self._bind_socket(Channel.stdin):
                    assert len(self.sockets) == len(Channel)
                    self._write_connection_file()
                    restore_io = self._patch_io()
                    with contextlib.suppress(ValueError):
                        sig = signal.signal(signal.SIGINT, self._signal_handler)
                    start.set()
                    yield self
        finally:
            start.set()
            if sig:
                signal.signal(signal.SIGINT, sig)
            if restore_io:
                restore_io()
            self._zmq_context.term()

    def _start_hb_iopub_shell_control_threads(self, start: Event) -> None:
        def heartbeat(ready: Event) -> None:
            # ref: https://jupyter-client.readthedocs.io/en/stable/messaging.html#heartbeat-for-kernels
            async_kernel.utils.mark_thread_pydev_do_not_trace()
            with self._bind_socket(Channel.heartbeat) as socket:
                ready.set()
                try:
                    zmq.proxy(socket, socket)
                except zmq.ContextTerminated:
                    return

        def pub_proxy(ready: Event) -> None:
            utils.mark_thread_pydev_do_not_trace()

            # We use an internal proxy to collect pub messages for distribution.
            # Each thread needs to open its own socket to publish to the internal proxy.
            # Ref: https://zguide.zeromq.org/docs/chapter2/#Working-with-Messages (fig 14)

            frontend: zmq.Socket = self._zmq_context.socket(zmq.XSUB)
            frontend.bind(Caller.iopub_url)

            # Capture broadcasts messages received on both frontend and backend
            capture = self._zmq_context.socket(zmq.PUB)
            capture.bind(self._iopub_url)
            threading.Thread(target=self._pub_capture).start()

            with self._bind_socket(Channel.iopub) as iopub_socket:
                ready.set()
                try:
                    zmq.proxy(frontend, iopub_socket, capture)
                except (zmq.ContextTerminated, Exception):
                    pass
            frontend.close(linger=50)
            capture.close(linger=50)

        hb_ready, iopub_ready = (Event(), Event())
        threading.Thread(target=heartbeat, name="heartbeat", args=[hb_ready]).start()
        hb_ready.wait()
        threading.Thread(target=pub_proxy, name="iopub proxy", args=[iopub_ready]).start()
        iopub_ready.wait()
        # message loops
        for channel in [Channel.shell, Channel.control]:
            ready = Event()
            name = f"{channel}-receive_msg_loop"
            threading.Thread(target=self.receive_msg_loop, name=name, args=(channel, ready, start)).start()
            ready.wait()

    def _pub_capture(self):
        """
        Capture connection messages on iopub.

        Will send an 'iopub_welcome' whenever a socket subscribes to the iopub socket [ref](https://jupyter-client.readthedocs.io/en/stable/messaging.html#welcome-message).
        """

        utils.mark_thread_pydev_do_not_trace()

        socket: zmq.Socket = self._zmq_context.socket(zmq.SUB)
        socket.linger = 0
        socket.connect(self._iopub_url)
        # welcome_message:  https://jupyter.org/enhancement-proposals/65-jupyter-xpub/jupyter-xpub.html#replace-pub-socket-with-xpub-socket
        # Only subscribe to the 'pub subscribe' topic byte `1` (byte `0` is 'pub unsubscribe').
        socket.subscribe(b"\x01")
        with socket:
            while True:
                try:
                    if frames := socket.recv_multipart():
                        frame = next(iter(frames))
                        if frame[0] == 1:
                            msg = self.msg("iopub_welcome", content={"subscription": frame[1:].decode()})
                            self.iopub_send(msg, parent=None)
                except zmq.ContextTerminated:
                    break
                except Exception:
                    continue

    @contextlib.contextmanager
    def _bind_socket(self, channel: Channel) -> Generator[Any | Socket[Any], Any, None]:
        """
        Bind a zmq.Socket storing a reference to the socket and the port
        details and closing the socket on leaving the context.
        """
        match channel:
            case Channel.shell | Channel.control | Channel.heartbeat | Channel.stdin:
                socket_type = zmq.ROUTER
            case Channel.iopub:
                socket_type = zmq.XPUB
        socket: zmq.Socket = self._zmq_context.socket(socket_type)
        socket.linger = 50
        port = bind_socket(socket=socket, transport=self.transport, ip=self.ip, port=self.ports.get(channel, 0))  # pyright: ignore[reportArgumentType]
        self.ports[channel] = port
        self.log.debug("%s socket on port: %i", channel, port)
        self.sockets[channel] = socket
        try:
            yield socket
        finally:
            socket.close(linger=50)
            self.sockets.pop(channel)

    def _write_connection_file(
        self,
    ) -> None:
        """Write connection info to JSON dict in kernel.connection_file."""
        if not (path := self.kernel.connection_file).exists():
            path.parent.mkdir(parents=True, exist_ok=True)
            write_connection_file(
                str(path),
                transport=self.transport,
                ip=self.ip,
                key=self.session.key,
                signature_scheme=self.session.signature_scheme,
                kernel_name=self.kernel.kernel_name,
                **{f"{channel}_port": self.ports[channel] for channel in Channel},
            )
            ip_files: list[pathlib.Path] = []
            if self.transport == "ipc":
                for s in self.sockets.values():
                    f = pathlib.Path(s.get_string(zmq.LAST_ENDPOINT).removeprefix("ipc://"))
                    assert f.exists()
                    ip_files.append(f)

            def cleanup_file_files() -> None:
                path.unlink(missing_ok=True)
                for f in ip_files:
                    f.unlink(missing_ok=True)

            atexit.register(cleanup_file_files)

    @override
    def input_request(self, prompt: str, *, password=False) -> Any:
        job = async_kernel.utils.get_job()
        if not job["msg"].get("content", {}).get("allow_stdin", False):
            msg = "Stdin is not allowed in this context!"
            raise StdinNotImplementedError(msg)
        socket = self.sockets[Channel.stdin]
        # Clear messages on the stdin socket
        while socket.get(SocketOption.EVENTS) & PollEvent.POLLIN:  # pyright: ignore[reportOperatorIssue]
            socket.recv_multipart(flags=Flag.DONTWAIT, copy=False)
        # Send the input request.
        assert self is not None
        self.session.send(
            stream=socket,
            msg_or_type="input_request",
            content={"prompt": prompt, "password": password},
            parent=job["msg"],  # pyright: ignore[reportArgumentType]
            ident=job["ident"],
        )
        # Poll for a reply.
        while not (socket.poll(100) & PollEvent.POLLIN):
            if self.last_interrupt_frame:
                raise KernelInterruptError
        return self.session.recv(socket)[1]["content"]["value"]  # pyright: ignore[reportOptionalSubscript]

    @override
    def iopub_send(
        self,
        msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
        *,
        content: Content | None = None,
        metadata: dict[str, Any] | None = None,
        parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
        ident: bytes | list[bytes] | None = None,
        buffers: list[bytes] | None = None,
    ) -> None:
        """Send a message on the zmq iopub socket."""
        if socket := Caller.iopub_sockets.get(t_ident := Caller.current_ident()):
            msg = self.session.send(
                stream=socket,
                msg_or_type=msg_or_type,  # pyright: ignore[reportArgumentType]
                content=content,
                metadata=metadata,
                parent=parent if parent is not NoValue else utils.get_parent(),  # pyright: ignore[reportArgumentType]
                ident=ident,
                buffers=buffers,  # pyright: ignore[reportArgumentType]
            )
            if msg:
                self.log.debug("iopub_send: msg_type:'%s', content: %s", msg["header"]["msg_type"], msg["content"])
        elif (caller := self.callers.get(Channel.control)) and caller.ident != t_ident:
            caller.call_direct(
                self.iopub_send,
                msg_or_type=msg_or_type,
                content=content,
                metadata=metadata,
                parent=parent if parent is not NoValue else None,
                ident=ident,
                buffers=buffers,
            )

    def receive_msg_loop(self, channel: Literal[Channel.control, Channel.shell], ready: Event, start: Event) -> None:
        "Opens a zmq socket for the channel, receives messages and calls the message handler."

        if not utils.LAUNCHED_BY_DEBUGPY:
            utils.mark_thread_pydev_do_not_trace()

        session, log, message_handler = self.session, self.log, self.kernel.msg_handler
        with self._bind_socket(channel) as socket:
            lock = BinarySemaphore()

            async def send_reply(job: Job, content: dict, /) -> None:
                if "status" not in content:
                    content["status"] = "ok"
                async with lock:
                    msg = session.send(
                        stream=socket,
                        msg_or_type=job["msg"]["header"]["msg_type"].replace("request", "reply"),
                        content=content,
                        parent=job["msg"],  # pyright: ignore[reportArgumentType]
                        ident=job["ident"],
                        buffers=content.pop("buffers", None),
                    )
                    if msg:
                        log.debug("*** send_reply %s*** %s", channel, msg)

            ready.set()
            start.wait()
            while True:
                try:
                    ident, msg = session.recv(socket, mode=zmq.BLOCKY, copy=False)
                    msg["channel"] = channel  # pyright: ignore[reportOptionalSubscript]
                    job = Job(received_time=time.monotonic(), msg=msg, ident=ident)  # pyright: ignore[reportArgumentType]
                    message_handler(channel, MsgType(job["msg"]["header"]["msg_type"]), job, send_reply)
                except zmq.ContextTerminated:
                    break
                except Exception as e:
                    log.debug("Bad message on %s: %s", channel, e)
                    continue

    @enable_signal_safety
    def _signal_handler(self, signum, frame: FrameType | None) -> None:
        "Handle interrupt signals."

        match self._interrupt_requested:
            case "FORCE":
                self._interrupt_requested = False
                raise KernelInterruptError
            case True:
                if frame and frame.f_locals is self.kernel.shell.user_ns:
                    self._interrupt_requested = False
                    raise KernelInterruptError
                self.last_interrupt_frame = frame

                def clearlast_interrupt_frame():
                    if self.last_interrupt_frame is frame:
                        self.last_interrupt_frame = None

                def re_raise():
                    if self.last_interrupt_frame is frame:
                        self._interrupt_now(force=True)

                # Race to check if the main thread should be interrupted.
                self.callers[Channel.shell].call_direct(clearlast_interrupt_frame)
                self.callers[Channel.control].call_later(1, re_raise)
            case False:
                signal.default_int_handler(signum, frame)

    def _interrupt_now(self, *, force=False):
        """
        Request an interrupt of the currently running shell thread.

        If called from the main thread, sets the interrupt request flag and sends a SIGINT signal
        to the current process. On Windows, uses `signal.raise_signal`; on other platforms, uses `os.kill`.
        If `force` is True, sets the interrupt request flag to "FORCE".

        Args:
            force: If True, requests a forced interrupt. Defaults to False.
        """
        # Restricted this to when the shell is running in the main thread.
        if self.callers[Channel.shell].ident == Caller.MAIN_THREAD_IDENT:
            self._interrupt_requested = "FORCE" if force else True
            if sys.platform == "win32":
                signal.raise_signal(signal.SIGINT)
                time.sleep(0)
            else:
                os.kill(os.getpid(), signal.SIGINT)

    @override
    def interrupt(self):
        self._interrupt_now()
        super().interrupt()

sockets class-attribute instance-attribute

sockets: Fixed[Self, dict[Channel, Socket]] = Fixed(dict)

ports class-attribute instance-attribute

ip class-attribute instance-attribute

ip = Unicode()

The kernel's IP address [default localhost].

If the IP address is something other than localhost, then Consoles on other machines will be able to connect to the Kernel, so be careful!

session class-attribute instance-attribute

session = Fixed(Session)

Handles serialization and sending of messages.

transport class-attribute instance-attribute

transport: CaselessStrEnum[str] = CaselessStrEnum(
    ["tcp", "ipc"] if platform == "linux" else ["tcp"], default_value="tcp"
)

Transport for sockets.

backend class-attribute instance-attribute

backend: Container[Backend] = UseEnum(Backend)

The the backend used to provide the shell event loop.

backend_options class-attribute instance-attribute

backend_options = Dict(allow_none=True)

The backend_options to use with anyio.run.

start

start()

Start the kernel blocking until the kernel stops.

Warning
  • Running the kernel in a thread other than the 'MainThread' is permitted, but discouraged.
  • Blocking calls can only be interrupted in the 'MainThread' because 'threads cannot be destroyed, stopped, suspended, resumed, or interrupted'.
  • Some libraries may assume the call is occurring in the 'MainThread'.
  • If there is an asyncio or trio event loop already running in the 'MainThread. Use async with kernel instead.
Source code in src/async_kernel/interface/zmq.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def start(self):
    """
    Start the kernel blocking until the kernel stops.

    Warning:
        - Running the kernel in a thread other than the 'MainThread' is permitted, but discouraged.
        - Blocking calls can only be interrupted in the 'MainThread' because [*'threads cannot be destroyed, stopped, suspended, resumed, or interrupted'*](https://docs.python.org/3/library/threading.html#module-threading).
        - Some libraries may assume the call is occurring in the 'MainThread'.
        - If there is an `asyncio` or `trio` event loop already running in the 'MainThread. Use `async with kernel` instead.
    """

    async def run_kernel() -> None:
        async with self.kernel:
            await self.wait_exit

    anyio.run(run_kernel, backend=self.backend, backend_options=self.backend_options)

load_connection_info

load_connection_info(info: dict[str, Any]) -> None

Load connection info from a dict containing connection info.

Typically this data comes from a connection file and is called by load_connection_file.

Parameters:

  • info

    (dict[str, Any]) –

    Dictionary containing connection_info. See the connection_file spec for details.

Source code in src/async_kernel/interface/zmq.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@override
def load_connection_info(self, info: dict[str, Any]) -> None:
    """
    Load connection info from a dict containing connection info.

    Typically this data comes from a connection file
    and is called by load_connection_file.

    Args:
        info: Dictionary containing connection_info. See the connection_file spec for details.
    """
    if self.ports:
        msg = "Connection info is already loaded!"
        raise RuntimeError(msg)
    self.transport = info.get("transport", self.transport)
    self.ip = info.get("ip") or self.ip
    for channel in Channel:
        name = f"{channel}_port"
        if channel not in self.ports and name in info:
            self.ports[channel] = info[name]
    if "key" in info:
        key = info["key"]
        if isinstance(key, str):
            key = key.encode()
        assert isinstance(key, bytes)

        self.session.key = key
    if "signature_scheme" in info:
        self.session.signature_scheme = info["signature_scheme"]

__asynccontextmanager__ async

__asynccontextmanager__() -> AsyncGenerator[Self]

Create caller, and open socketes.

Source code in src/async_kernel/interface/zmq.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
@override
@asynccontextmanager
async def __asynccontextmanager__(self) -> AsyncGenerator[Self]:
    """Create caller, and open socketes."""
    sig = restore_io = None
    caller = Caller("manual", name="Shell", protected=True, log=self.kernel.log, zmq_context=self._zmq_context)
    self.callers[Channel.shell] = caller
    self.callers[Channel.control] = caller.get(name="Control", log=self.kernel.log, protected=True)
    start = Event()
    self.anyio_backend = Backend(current_async_library())
    try:
        async with caller:
            self._start_hb_iopub_shell_control_threads(start)
            with self._bind_socket(Channel.stdin):
                assert len(self.sockets) == len(Channel)
                self._write_connection_file()
                restore_io = self._patch_io()
                with contextlib.suppress(ValueError):
                    sig = signal.signal(signal.SIGINT, self._signal_handler)
                start.set()
                yield self
    finally:
        start.set()
        if sig:
            signal.signal(signal.SIGINT, sig)
        if restore_io:
            restore_io()
        self._zmq_context.term()

iopub_send

iopub_send(
    msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
    *,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None

Send a message on the zmq iopub socket.

Source code in src/async_kernel/interface/zmq.py
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
@override
def iopub_send(
    self,
    msg_or_type: Message[dict[str, Any]] | dict[str, Any] | str,
    *,
    content: Content | None = None,
    metadata: dict[str, Any] | None = None,
    parent: dict[str, Any] | MsgHeader | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
    ident: bytes | list[bytes] | None = None,
    buffers: list[bytes] | None = None,
) -> None:
    """Send a message on the zmq iopub socket."""
    if socket := Caller.iopub_sockets.get(t_ident := Caller.current_ident()):
        msg = self.session.send(
            stream=socket,
            msg_or_type=msg_or_type,  # pyright: ignore[reportArgumentType]
            content=content,
            metadata=metadata,
            parent=parent if parent is not NoValue else utils.get_parent(),  # pyright: ignore[reportArgumentType]
            ident=ident,
            buffers=buffers,  # pyright: ignore[reportArgumentType]
        )
        if msg:
            self.log.debug("iopub_send: msg_type:'%s', content: %s", msg["header"]["msg_type"], msg["content"])
    elif (caller := self.callers.get(Channel.control)) and caller.ident != t_ident:
        caller.call_direct(
            self.iopub_send,
            msg_or_type=msg_or_type,
            content=content,
            metadata=metadata,
            parent=parent if parent is not NoValue else None,
            ident=ident,
            buffers=buffers,
        )

receive_msg_loop

receive_msg_loop(channel: Literal[control, shell], ready: Event, start: Event) -> None

Opens a zmq socket for the channel, receives messages and calls the message handler.

Source code in src/async_kernel/interface/zmq.py
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
def receive_msg_loop(self, channel: Literal[Channel.control, Channel.shell], ready: Event, start: Event) -> None:
    "Opens a zmq socket for the channel, receives messages and calls the message handler."

    if not utils.LAUNCHED_BY_DEBUGPY:
        utils.mark_thread_pydev_do_not_trace()

    session, log, message_handler = self.session, self.log, self.kernel.msg_handler
    with self._bind_socket(channel) as socket:
        lock = BinarySemaphore()

        async def send_reply(job: Job, content: dict, /) -> None:
            if "status" not in content:
                content["status"] = "ok"
            async with lock:
                msg = session.send(
                    stream=socket,
                    msg_or_type=job["msg"]["header"]["msg_type"].replace("request", "reply"),
                    content=content,
                    parent=job["msg"],  # pyright: ignore[reportArgumentType]
                    ident=job["ident"],
                    buffers=content.pop("buffers", None),
                )
                if msg:
                    log.debug("*** send_reply %s*** %s", channel, msg)

        ready.set()
        start.wait()
        while True:
            try:
                ident, msg = session.recv(socket, mode=zmq.BLOCKY, copy=False)
                msg["channel"] = channel  # pyright: ignore[reportOptionalSubscript]
                job = Job(received_time=time.monotonic(), msg=msg, ident=ident)  # pyright: ignore[reportArgumentType]
                message_handler(channel, MsgType(job["msg"]["header"]["msg_type"]), job, send_reply)
            except zmq.ContextTerminated:
                break
            except Exception as e:
                log.debug("Bad message on %s: %s", channel, e)
                continue