Skip to content

Utils

async_kernel.utils

Functions:

do_not_debug_this_thread

do_not_debug_this_thread(name='')

A context to mark the thread for debugpy to not debug.

Source code in src/async_kernel/utils.py
49
50
51
52
53
54
55
56
57
58
@contextlib.contextmanager
def do_not_debug_this_thread(name=""):
    """
    Context manager that marks the current thread so debugpy does not debug it.

    The thread is marked on entry and unmarked on exit. Nothing is marked or
    unmarked when `LAUNCHED_BY_DEBUGPY` is truthy.

    Args:
        name: Optional thread name, forwarded to `mark_thread_pydev_do_not_trace`
            on entry only.
    """
    if LAUNCHED_BY_DEBUGPY:
        # Leave the thread untouched for the whole duration of the context.
        yield
        return
    mark_thread_pydev_do_not_trace(threading.current_thread(), name)
    try:
        yield
    finally:
        # Always unmark, even when the body raised.
        mark_thread_pydev_do_not_trace(threading.current_thread(), remove=True)

get_execute_request_timeout

get_execute_request_timeout(job: Job | None = None) -> float | None

Gets the execute_request_timeout for the current context.

Source code in src/async_kernel/utils.py
111
112
113
114
115
116
117
118
def get_execute_request_timeout(job: Job | None = None, /) -> float | None:
    """
    Return the execute_request_timeout that applies to the current context.

    A truthy `MetadataKeys.timeout` entry in the metadata takes precedence and is
    converted to `float`; otherwise the kernel shell's `execute_request_timeout`
    is returned.

    Args:
        job: Optional job, passed through to `get_metadata`.

    Returns:
        The timeout, or `None` if any exception is raised while determining it.
    """
    try:
        override = get_metadata(job).get(MetadataKeys.timeout)
        if not override:
            # Missing or falsy metadata value: fall back to the kernel setting.
            return get_kernel().shell.execute_request_timeout
        return float(override)
    except Exception:
        return None

get_execution_count

get_execution_count() -> int

Gets the execution count for the current context, defaults to the current kernel count.

Source code in src/async_kernel/utils.py
121
122
123
124
def get_execution_count() -> int:
    """
    Return the execution count for the current context.

    Falls back to the kernel's `_execution_count` when the context variable is
    unset or holds a falsy value.
    """
    count = _execution_count_var.get(None)
    if count:
        return count
    return async_kernel.Kernel()._execution_count  # pyright: ignore[reportPrivateUsage]

get_job

get_job() -> Job[dict] | dict

Get the job for the current context.

Source code in src/async_kernel/utils.py
88
89
90
91
92
93
def get_job() -> Job[dict] | dict:
    """
    Return the job stored for the current context.

    Returns:
        The value held by `_job_var`, or an empty dict if reading it raises.
    """
    try:
        job = _job_var.get()
    except Exception:
        # No job available in this context: hand back an empty mapping.
        job = {}
    return job

get_kernel

get_kernel() -> Kernel

Get the current kernel.

Source code in src/async_kernel/utils.py
83
84
85
def get_kernel() -> Kernel:
    """
    Return the current kernel.

    Returns:
        The object produced by calling `async_kernel.Kernel()`.
    """
    kernel = async_kernel.Kernel()
    return kernel

get_metadata

get_metadata(job: Job | None = None) -> Mapping[str, Any]

Gets metadata for the current context.

Source code in src/async_kernel/utils.py
101
102
103
def get_metadata(job: Job | None = None, /) -> Mapping[str, Any]:
    """
    Return the metadata for the current context.

    Args:
        job: Job to read from. When omitted or falsy, `get_job()` is used.

    Returns:
        The `"metadata"` entry of the job's `"msg"`, or an empty dict when either
        key is missing.
    """
    source = job or get_job()
    message = source.get("msg", {})
    return message.get("metadata", {})

get_parent

get_parent(job: Job | None = None) -> Message[dict[str, Any]] | None

Get the parent message for the current context.

Source code in src/async_kernel/utils.py
96
97
98
def get_parent(job: Job | None = None, /) -> Message[dict[str, Any]] | None:
    """
    Return the parent message for the current context.

    Args:
        job: Job to read from. When omitted or falsy, `get_job()` is used.

    Returns:
        The job's `"msg"` entry, or `None` when the job has no such key.
    """
    source = job if job else get_job()
    return source.get("msg")

get_tags

get_tags(job: Job | None = None) -> list[str]

Gets the tags for the current context.

Source code in src/async_kernel/utils.py
106
107
108
def get_tags(job: Job | None = None, /) -> list[str]:
    """
    Return the tags for the current context.

    Args:
        job: Optional job, passed through to `get_metadata`.

    Returns:
        The `"tags"` entry of the metadata, or an empty list when it is absent.
    """
    metadata = get_metadata(job)
    return metadata.get("tags", [])

mark_thread_pydev_do_not_trace

mark_thread_pydev_do_not_trace(thread: Thread, name='', *, remove=False)

Modifies the given thread's attributes to hide or unhide it from the debugger (e.g., debugpy).

Source code in src/async_kernel/utils.py
42
43
44
45
46
def mark_thread_pydev_do_not_trace(thread: threading.Thread, name="", *, remove=False):
    """
    Hide the given thread from the debugger (e.g., debugpy), or unhide it.

    Args:
        thread: The thread whose `pydev_do_not_trace` attribute is written.
        name: If non-empty, the thread is also renamed to this value.
        remove: When true the attribute is set to `False` (unhide); otherwise it
            is set to `True` (hide).
    """
    hidden = not remove
    setattr(thread, "pydev_do_not_trace", hidden)
    if not name:
        return
    thread.name = name

setattr_nested

setattr_nested(obj: object, name: str, value: str | Any) -> dict[str, Any]

Set a nested attribute of an object.

If the attribute name contains dots, it is interpreted as a nested attribute. For example, if name is "a.b.c", then the code will attempt to set obj.a.b.c to value.

This is primarily intended for use with async_kernel.command.command_line to set nested attributes on kernels.

Parameters:

  • obj

    (object) –

    The object to set the attribute on.

  • name

    (str) –

    The name of the attribute to set.

  • value

    (str | Any) –

    The value to set the attribute to.

Returns:

  • dict[str, Any]

    The mapping of the name to the set value if the value has been set.

  • dict[str, Any]

    An empty dict indicates the value was not set.

Source code in src/async_kernel/utils.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def setattr_nested(obj: object, name: str, value: str | Any) -> dict[str, Any]:
    """
    Set a nested attribute of an object.

    If the attribute name contains dots, it is interpreted as a nested attribute.
    For example, if name is "a.b.c", then the code will attempt to set obj.a.b.c to value.

    This is primarily intended for use with [async_kernel.command.command_line][]
    to set nested attributes on kernels.

    If assigning `value` directly raises, the assignment is retried once with
    `eval(value)`; an exception raised by that retry propagates to the caller.

    Args:
        obj: The object to set the attribute on.
        name: The name of the attribute to set.
        value: The value to set the attribute to.

    Returns:
        The mapping of the name (the full dotted name as passed in) to the set value
        if the value has been set.
        An empty dict indicates the value was not set.

    """
    if "." in name:
        head, _, tail = name.partition(".")
        try:
            child = getattr(obj, head)
        except Exception:
            return {}
        # Propagate the outcome of the nested assignment, re-keyed under the full
        # dotted name. Previously the recursive result was discarded and the
        # dotted name was looked up on the child, so `{}` was always returned.
        return {name: val for val in setattr_nested(child, tail, value).values()}
    if (isinstance(obj, traitlets.HasTraits) and obj.has_trait(name)) or hasattr(obj, name):
        try:
            setattr(obj, name, value)
        except Exception:
            # SECURITY NOTE(review): eval() executes arbitrary code. This is only
            # acceptable while `value` comes from a trusted source such as the
            # operator's own command line - confirm before exposing it further.
            setattr(obj, name, eval(value))
        return {name: getattr(obj, name)}
    return {}

wait_thread_event async

wait_thread_event(event: Event)

Wait for the threading event using an anyio worker thread.

  • If the event is already set, anyio.sleep(0) is used instead.
  • On external cancellation the event will be set to enable the thread to exit.
Source code in src/async_kernel/utils.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
async def wait_thread_event(event: threading.Event):
    """
    Wait for the threading event using an anyio worker thread.

    - If the event is already set, anyio.sleep(0) is used instead.
    - On external cancellation the event will be set to enable the thread to exit.

    Args:
        event: The event to wait for. Note that it is always set by the time this
            coroutine finishes, whether it completed normally or was cancelled.
    """
    if event.is_set():
        # Required  - allow other tasks to run.
        await anyio.sleep(0)
        return

    def _in_thread_call():
        # Runs in the worker thread; hidden from the debugger while blocked.
        with do_not_debug_this_thread():
            event.wait()

    try:
        # `abandon_on_cancel=True` lets a cancellation interrupt this await. By
        # default anyio ignores cancellation until the worker returns, so the
        # `finally` below could never release a worker blocked in `event.wait()`.
        # NOTE(review): the keyword requires anyio >= 4.1 - confirm the pinned version.
        await anyio.to_thread.run_sync(_in_thread_call, abandon_on_cancel=True)
    finally:
        # Wake the worker thread (if still waiting) so it can exit.
        event.set()