Skip to content

shell

Defines the shell base class.

Classes:

  • BaseShell

    The base shell implementation.

BaseShell

Bases: HasInterface[T_interface_co], LoggingConfigurable, Generic[T_interface_co]

The base shell implementation.

This should be the leftmost class in the list of bases when it is combined with other base classes.

Methods:

Attributes:

Source code in src/async_kernel/shell/base.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
class BaseShell(HasInterface[T_interface_co], LoggingConfigurable, Generic[T_interface_co]):
    """
    The base shell implementation.

    This should be the leftmost class in a subclass's list of bases.

    The base class keeps the user namespaces, the execution count and the
    shell's pending manager. The request handlers (`do_execute`,
    `do_complete`, `is_complete`, `do_inspect` and `do_history`) all raise
    `NotImplementedError` here and are left for subclasses to implement.
    """

    # Resolved from the owner's `parent.kernel`.
    kernel: Fixed[Self, Kernel[T_interface_co, Self]] = Fixed(lambda c: c["owner"].parent.kernel)  # pyright: ignore[reportAttributeAccessIssue]
    ""

    # `reset()` cancels everything in `pending_manager.pending`, and
    # `pending_manager.id` is used as the `subshell_id` of a subshell.
    pending_manager = Fixed(ShellPendingManager)
    """
    Provides the `subshell_id` for the shell which add all consenting pending created in
    the context of the shell.
    """

    # Reset to the defaults from `_get_default_ns()` whenever `user_ns` is assigned.
    user_ns_hidden: Fixed[Self, dict] = Fixed(dict)
    ""

    timeout = traitlets.CFloat(0.0).tag(config=True)
    "A timeout in seconds to complete execute requests."

    stop_on_error_time_offset = traitlets.Float(0.0).tag(config=True)
    "An offset to add to the cancellation time to catch late arriving execute requests."

    # Read-only trait: assigned once in `__init__` via `set_trait`.
    protected = traitlets.Bool(read_only=True)
    "Protect from accidental deletion."

    # Read-only trait: set in `__init__` when `is_mainshell=True` is passed.
    is_mainshell = traitlets.Bool(False, read_only=True)
    "Set by the mainshell to indicate it is the main shell."

    # `None` for the main shell, otherwise the id of this shell's pending manager.
    subshell_id: Fixed[Self, str | None] = Fixed(
        lambda c: None if c["owner"].is_mainshell else c["owner"].pending_manager.id
    )
    "Used to identify the subshell."

    # Backing value of the `execution_count` property.
    _execution_count = 0
    # True only while `reset()` is running; makes `reset()` non-reentrant.
    _resetting = False
    # Backing dict of the `user_ns` property.
    _user_ns: Fixed[Self, dict] = Fixed(dict)
    # Emptied by `reset(new_session=True)`.
    _stop_on_error_info: Fixed[Self, dict[Literal["time", "execution_count"], Any]] = Fixed(dict)

    @property
    def user_global_ns(self) -> dict:
        """
        Return the global namespace.

        The main shell returns its own `user_ns`. A subshell returns the main
        shell's `user_global_ns`, or a shallow copy of it while this subshell
        is resetting.
        """
        if self.is_mainshell:
            return self.user_ns
        # NOTE(review): the copy presumably shields the main shell's namespace
        # from changes made while a subshell resets — confirm.
        return (
            self.kernel.main_shell.user_global_ns.copy() if self._resetting else self.kernel.main_shell.user_global_ns
        )

    @property
    def banner(self) -> str:
        """Return a one-line banner with the package version and the parent's name and backend."""
        return (
            f"async-kernel v{async_kernel.__version__} name:{self.parent.name!r} backend:{str(self.parent.backend)!r}"
        )

    @property
    def execution_count(self) -> int:
        """Return the current execution count."""
        return self._execution_count

    @execution_count.setter
    def execution_count(self, value) -> None:
        """Accept and discard `value`; the count itself is kept in `_execution_count`."""
        # NOTE(review): presumably a no-op setter so that code assigning to
        # `execution_count` does not raise — confirm.
        return

    def _get_default_ns(self) -> dict[str, Any]:
        """Return the default namespace entries; the base class has none."""
        return {}

    @property
    def user_ns(self) -> dict[Any, Any]:
        """
        Return the user namespace dict.

        While the `"_ih"` key is missing, the entries from `_get_default_ns()`
        are merged in first, overwriting any equal keys.
        """
        ns = self._user_ns
        # NOTE(review): `"_ih"` is presumably provided by a subclass's
        # defaults; with the base defaults (empty) this update runs on every
        # access — confirm.
        if "_ih" not in self._user_ns:
            ns.update(self._get_default_ns())
        return ns

    @user_ns.setter
    def user_ns(self, ns) -> None:
        """
        Replace the contents of the user namespace in place.

        `user_ns_hidden` is reset to the defaults alone; the user namespace
        becomes the defaults overlaid with `ns`.
        """
        # Copy first: `ns` may be the very dict that is cleared below.
        ns = dict(ns)
        self.user_ns_hidden.clear()
        self._user_ns.clear()
        ns_ = self._get_default_ns()
        self.user_ns_hidden.update(ns_)
        self._user_ns.update(ns_)
        self._user_ns.update(ns)

    @override
    def __repr__(self) -> str:
        """Return a short description naming the class, protection state and shell kind."""
        protected = " 🔐" if self.protected else ""
        if self.is_mainshell:
            return f"<{self.__class__.__name__}{protected}: Main Shell>"
        return f"<{self.__class__.__name__}{protected}: Subshell ({self.subshell_id})>"

    def __init__(self, *, protected: bool = False, **kwargs) -> None:
        """
        Create the shell and register it with the kernel.

        Args:
            protected: Value for the read-only `protected` trait.
            **kwargs: `is_mainshell` is consumed here; the remainder is
                forwarded to the next `__init__` in the MRO.
        """

        if kwargs.pop("is_mainshell", False):
            self.set_trait("is_mainshell", True)
        # `super(LoggingConfigurable, self)` resumes the MRO lookup *after*
        # `LoggingConfigurable`, so that class's own `__init__` is not run.
        # NOTE(review): the reason for bypassing it is not visible here.
        super(LoggingConfigurable, self).__init__(**kwargs)
        self.set_trait("protected", protected)
        self.kernel._shell_created(self)  # pyright: ignore[reportPrivateUsage]
        # Only subshells have a truthy `subshell_id`; the main shell is
        # initialized later, inside `mainshell_running`.
        if self.subshell_id:
            self.initialize()

    @contextlib.asynccontextmanager
    async def mainshell_running(self):
        """
        Async context manager in which the main shell is running.

        On entry the patches are applied and the shell is initialized. On exit
        the shell is force-stopped and the patches are restored.
        """
        # Used in the context of `Kernel._run`
        assert self.is_mainshell
        # Called outside the `try`: if it raises there is nothing to restore.
        restore = self.apply_patches()
        try:
            self.initialize()
            self.log.info("Main shell started")
            yield
        finally:
            self.stop(force=True)
            self.log.info("Main shell stopped")
            restore()

    def apply_patches(self) -> Callable[[], None]:
        """Apply patches and return a callable that undoes them; the base class patches nothing."""
        return lambda: None

    def initialize(self) -> None:
        """Initialize the shell once; later calls on the same instance do nothing."""
        # The instance attribute shadows this method, making repeat calls no-ops.
        self.initialize = lambda: None  # Replace with a dummy patch.
        self.init_builtins()

    def init_builtins(self) -> None:
        """Hook called by `initialize()`; the base class does nothing."""
        pass

    def stop(self, *, force: bool = False) -> None:
        """
        Stop the shell.

        A protected shell is only stopped when `force` is true. A subshell
        reports to the kernel that it has stopped, then the shell is reset
        without starting a new session.

        Args:
            force: Force a protected shell to stop.
        """

        if self.protected and not force:
            return
        if not self.is_mainshell:
            self.kernel._subshell_stopped(self)  # pyright: ignore[reportPrivateUsage]
        self.reset(new_session=False)

    def reset(self, new_session=True, aggressive=False) -> None:
        """
        Reset the shell, cancelling all associated pending.

        A call made while a reset is already in progress returns immediately.

        Args:
            new_session: Also zero the execution count and clear the
                stop-on-error info.
            aggressive: Accepted but not used by this implementation.
        """
        if not self._resetting:
            self._resetting = True
            try:
                for pen in self.pending_manager.pending:
                    pen.cancel()
                if new_session:
                    self._execution_count = 0
                    self._stop_on_error_info.clear()
            finally:
                # Always drop the guard, even if a cancel raised.
                self._resetting = False

    def get_ipython(self) -> BaseShell:
        """
        Return the shell for the current context.

        Delegates to `async_kernel.utils.get_ipython()`.
        """
        return async_kernel.utils.get_ipython()

    @contextlib.contextmanager
    def context(self) -> Generator[None, Any, None]:
        """
        A context manager where the shell is active.

        Wraps the body in the context of this shell's pending manager.
        """
        with self.pending_manager.context():
            yield

    def displayhook(self, result: Any) -> None:
        """
        Publish execution results.

        This is invoked every time the interpreter needs to print, and is
        activated by setting the variable sys.displayhook to it.

        Nothing is sent when `result` is `None` or when
        `utils.show_result_enabled()` is false.
        """
        if result is not None and utils.show_result_enabled():
            content = {}
            content["execution_count"] = self.execution_count
            # NOTE(review): the Jupyter messaging spec describes the
            # `execute_result` "data" field as a MIME-bundle dict; a bare
            # `repr` string is sent here — confirm this is intended.
            content["data"] = repr(result)
            content["metadata"] = {}
            self.parent.iopub_send("execute_result", content=content)

    async def do_execute(
        self,
        code: str = "",
        *,
        silent: bool = False,
        store_history: bool = False,
        user_expressions: dict[str, str] | None = None,
        allow_stdin: bool = False,
        stop_on_error: bool = False,
        cell_id: str | None = None,
        received_time: float = 0,
        tags: Iterable[str] = (),
        **_ignored,
    ) -> Content:
        """
        Execute code in the shell.

        Not implemented in the base class; subclasses must override it.
        Unrecognised keyword arguments are collected in `_ignored`.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    async def do_complete(self, code: str, cursor_pos: int | None = None) -> Content:
        """
        Subclass hook for completing `code` at `cursor_pos`.

        Raises:
            NotImplementedError: Always; the base class does not implement it.
        """
        raise NotImplementedError

    async def is_complete(self, code: str) -> Content:
        """
        Subclass hook for reporting whether `code` is complete.

        Raises:
            NotImplementedError: Always; the base class does not implement it.
        """
        raise NotImplementedError

    async def do_inspect(self, code: str, cursor_pos: int = 0, detail_level: Literal[0, 1] = 0) -> Content:
        """
        Subclass hook for inspecting `code` at `cursor_pos`.

        Raises:
            NotImplementedError: Always; the base class does not implement it.
        """
        raise NotImplementedError

    async def do_history(
        self,
        *,
        output: bool = False,
        raw: bool = True,
        hist_access_type: str,
        session: int = 0,
        start: int = 1,
        stop: int | None = None,
        n: int = 10,
        pattern: str = "*",
        unique: bool = False,
        **_ignored,
    ) -> Content:
        """
        Subclass hook for history requests.

        All arguments are keyword-only and `hist_access_type` is required.
        Unrecognised keyword arguments are collected in `_ignored`.

        Raises:
            NotImplementedError: Always; the base class does not implement it.
        """
        raise NotImplementedError

kernel class-attribute instance-attribute

kernel: Fixed[Self, Kernel[T_interface_co, Self]] = Fixed(
    lambda c: c["owner"].parent.kernel
)

pending_manager class-attribute instance-attribute

pending_manager = Fixed(ShellPendingManager)

The pending manager for the shell: it provides the subshell_id and collects all consenting pending created in the context of the shell.

user_ns_hidden class-attribute instance-attribute

user_ns_hidden: Fixed[Self, dict] = Fixed(dict)

timeout class-attribute instance-attribute

timeout = traitlets.CFloat(0.0).tag(config=True)

A timeout in seconds to complete execute requests.

stop_on_error_time_offset class-attribute instance-attribute

stop_on_error_time_offset = traitlets.Float(0.0).tag(config=True)

An offset to add to the cancellation time to catch late arriving execute requests.

protected class-attribute instance-attribute

protected = traitlets.Bool(read_only=True)

Protect from accidental deletion.

is_mainshell class-attribute instance-attribute

is_mainshell = traitlets.Bool(False, read_only=True)

Set by the mainshell to indicate it is the main shell.

subshell_id class-attribute instance-attribute

subshell_id: Fixed[Self, str | None] = Fixed(
    lambda c: None if c["owner"].is_mainshell else c["owner"].pending_manager.id
)

Used to identify the subshell.

stop

stop(*, force: bool = False) -> None

Stop the shell.

Parameters:

  • force

    (bool, default: False ) –

    Force a protected shell to stop.

Source code in src/async_kernel/shell/base.py
159
160
161
162
163
164
165
166
167
168
169
170
171
def stop(self, *, force: bool = False) -> None:
    """
    Shut the shell down.

    A protected shell is left untouched unless `force` is true. A subshell
    first tells the kernel that it has stopped; the shell is then reset
    without starting a new session.

    Args:
        force: Stop the shell even when it is protected.
    """
    may_stop = not self.protected or force
    if may_stop:
        if not self.is_mainshell:
            self.kernel._subshell_stopped(self)  # pyright: ignore[reportPrivateUsage]
        # `new_session=False` leaves the execution count untouched.
        self.reset(new_session=False)

reset

reset(new_session=True, aggressive=False) -> None

Reset the shell, cancelling all associated pending.

Source code in src/async_kernel/shell/base.py
173
174
175
176
177
178
179
180
181
182
183
184
def reset(self, new_session=True, aggressive=False) -> None:
    """
    Cancel every pending of the shell and optionally start a new session.

    A call made while another reset is in progress returns immediately.

    Args:
        new_session: Also zero the execution count and clear the
            stop-on-error info.
        aggressive: Accepted but not used by this implementation.
    """
    if self._resetting:
        return
    self._resetting = True
    try:
        for pending in self.pending_manager.pending:
            pending.cancel()
        if new_session:
            self._execution_count = 0
            self._stop_on_error_info.clear()
    finally:
        # Drop the guard even when a cancel raised.
        self._resetting = False

get_ipython

get_ipython() -> BaseShell

Return the shell for the current context.

Source code in src/async_kernel/shell/base.py
186
187
188
def get_ipython(self) -> BaseShell:
    """
    Return the shell that is active in the current context.

    The lookup is delegated to `async_kernel.utils.get_ipython()`.
    """
    active_shell = async_kernel.utils.get_ipython()
    return active_shell

context

context() -> Generator[None, Any, None]

A context manager where the shell is active.

Source code in src/async_kernel/shell/base.py
190
191
192
193
194
@contextlib.contextmanager
def context(self) -> Generator[None, Any, None]:
    """
    Context manager inside which the shell is active.

    The body runs within the context of the shell's pending manager.
    """
    manager_context = self.pending_manager.context()
    with manager_context:
        yield

displayhook

displayhook(result: Any) -> None

Publish execution results.

This is invoked every time the interpreter needs to print, and is activated by setting the variable sys.displayhook to it.

Source code in src/async_kernel/shell/base.py
196
197
198
199
200
201
202
203
204
205
206
207
208
def displayhook(self, result: Any) -> None:
    """
    Publish an execution result on iopub.

    This is the hook the interpreter calls whenever it needs to print a
    value; it becomes active once `sys.displayhook` is set to it.

    Nothing is sent when `result` is `None` or when
    `utils.show_result_enabled()` is false.
    """
    if result is None or not utils.show_result_enabled():
        return
    payload = {
        "execution_count": self.execution_count,
        "data": repr(result),
        "metadata": {},
    }
    self.parent.iopub_send("execute_result", content=payload)

do_execute async

do_execute(
    code: str = "",
    *,
    silent: bool = False,
    store_history: bool = False,
    user_expressions: dict[str, str] | None = None,
    allow_stdin: bool = False,
    stop_on_error: bool = False,
    cell_id: str | None = None,
    received_time: float = 0,
    tags: Iterable[str] = (),
    **_ignored,
) -> Content

Execute code in the shell.

Source code in src/async_kernel/shell/base.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
async def do_execute(
    self,
    code: str = "",
    *,
    silent: bool = False,
    store_history: bool = False,
    user_expressions: dict[str, str] | None = None,
    allow_stdin: bool = False,
    stop_on_error: bool = False,
    cell_id: str | None = None,
    received_time: float = 0,
    tags: Iterable[str] = (),
    **_ignored,
) -> Content:
    """
    Execute code in the shell.

    Abstract in the base class: subclasses must override it. Keyword
    arguments that are not recognised are collected in `_ignored`.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()

do_complete async

do_complete(code: str, cursor_pos: int | None = None) -> Content
Source code in src/async_kernel/shell/base.py
229
230
231
async def do_complete(self, code: str, cursor_pos: int | None = None) -> Content:
    """
    Subclass hook for completing `code` at `cursor_pos`.

    Raises:
        NotImplementedError: Always; the base class does not implement it.
    """
    raise NotImplementedError()

is_complete async

is_complete(code: str) -> Content
Source code in src/async_kernel/shell/base.py
233
234
235
async def is_complete(self, code: str) -> Content:
    """
    Subclass hook for reporting whether `code` is complete.

    Raises:
        NotImplementedError: Always; the base class does not implement it.
    """
    raise NotImplementedError()

do_inspect async

do_inspect(code: str, cursor_pos: int = 0, detail_level: Literal[0, 1] = 0) -> Content
Source code in src/async_kernel/shell/base.py
237
238
239
async def do_inspect(self, code: str, cursor_pos: int = 0, detail_level: Literal[0, 1] = 0) -> Content:
    """
    Subclass hook for inspecting `code` at `cursor_pos`.

    Raises:
        NotImplementedError: Always; the base class does not implement it.
    """
    raise NotImplementedError()

do_history async

do_history(
    *,
    output: bool = False,
    raw: bool = True,
    hist_access_type: str,
    session: int = 0,
    start: int = 1,
    stop: int | None = None,
    n: int = 10,
    pattern: str = "*",
    unique: bool = False,
    **_ignored,
) -> Content
Source code in src/async_kernel/shell/base.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
async def do_history(
    self,
    *,
    output: bool = False,
    raw: bool = True,
    hist_access_type: str,
    session: int = 0,
    start: int = 1,
    stop: int | None = None,
    n: int = 10,
    pattern: str = "*",
    unique: bool = False,
    **_ignored,
) -> Content:
    """
    Subclass hook for history requests.

    Every argument is keyword-only and `hist_access_type` is required.
    Keyword arguments that are not recognised are collected in `_ignored`.

    Raises:
        NotImplementedError: Always; the base class does not implement it.
    """
    raise NotImplementedError()