Skip to content

async_kernel.utils

Functions:

async_kernel.utils.mark_thread_pydev_do_not_trace

mark_thread_pydev_do_not_trace(thread: Thread, *, remove=False)

Modifies the given thread's attributes to hide or unhide it from the debugger (e.g., debugpy).

Source code in src/async_kernel/utils.py
43
44
45
def mark_thread_pydev_do_not_trace(thread: threading.Thread, *, remove=False):
    """Modifies the given thread's attributes to hide or unhide it from the debugger (e.g., debugpy)."""
    thread.pydev_do_not_trace = not remove  # pyright: ignore[reportAttributeAccessIssue]

async_kernel.utils.do_not_debug_this_thread

do_not_debug_this_thread()

A context to mark the thread for debugpy to not debug.

Source code in src/async_kernel/utils.py
48
49
50
51
52
53
54
55
56
57
@contextlib.contextmanager
def do_not_debug_this_thread():
    "A context to mark the thread for debugpy to not debug."
    if not LAUNCHED_BY_DEBUGPY:
        mark_thread_pydev_do_not_trace(threading.current_thread())
    try:
        yield
    finally:
        if not LAUNCHED_BY_DEBUGPY:
            mark_thread_pydev_do_not_trace(threading.current_thread(), remove=True)

async_kernel.utils.wait_thread_event async

wait_thread_event(thread_event: Event)

Wait for thread_event to be set.

Info

  • On external cancellation the event is set here to prevent the thread from waiting forever.
Source code in src/async_kernel/utils.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
async def wait_thread_event(thread_event: threading.Event, /):
    """
    Wait for `thread_event` to be set.

    !!! info

        - On external cancellation the `event` is set here to prevent the thread from waiting forever.
    """
    await anyio.sleep(0)
    if thread_event.is_set():
        return

    def _wait_thread_event(thread_event: threading.Event, event: anyio.Event, token):
        thread_event.wait()
        try:
            from_thread.run_sync(event.set, token=token)
        except anyio.RunFinishedError:
            pass

    try:
        event = anyio.Event()
        thread = threading.Thread(target=_wait_thread_event, args=[thread_event, event, current_token()], daemon=True)
        thread.pydev_do_not_trace = not LAUNCHED_BY_DEBUGPY  # pyright: ignore[reportAttributeAccessIssue]
        if not thread_event.is_set():
            thread.start()
            await event.wait()
    finally:
        thread_event.set()

async_kernel.utils.get_kernel

get_kernel() -> Kernel

Get the current kernel.

Source code in src/async_kernel/utils.py
90
91
92
def get_kernel() -> Kernel:
    "Get the current kernel."
    return async_kernel.Kernel()

async_kernel.utils.get_job

get_job() -> Job[dict] | dict

Get the job for the current context.

Source code in src/async_kernel/utils.py
 95
 96
 97
 98
 99
100
def get_job() -> Job[dict] | dict:
    "Get the job for the current context."
    try:
        return _job_var.get()
    except Exception:
        return {}

async_kernel.utils.get_parent

get_parent(job: Job | None = None) -> Message[dict[str, Any]] | None

Get the parent message for the current context.

Source code in src/async_kernel/utils.py
103
104
105
def get_parent(job: Job | None = None, /) -> Message[dict[str, Any]] | None:
    "Get the [parent message]() for the current context."
    return (job or get_job()).get("msg")

async_kernel.utils.get_metadata

get_metadata(job: Job | None = None) -> Mapping[str, Any]

Gets metadata for the current context.

Source code in src/async_kernel/utils.py
108
109
110
def get_metadata(job: Job | None = None, /) -> Mapping[str, Any]:
    "Gets [metadata]() for the current context."
    return (job or get_job()).get("msg", {}).get("metadata", {})

async_kernel.utils.get_tags

get_tags(job: Job | None = None) -> list[str]

Gets the tags for the current context.

Source code in src/async_kernel/utils.py
113
114
115
def get_tags(job: Job | None = None, /) -> list[str]:
    "Gets the [tags]() for the current context."
    return get_metadata(job).get("tags", [])

async_kernel.utils.get_execute_request_timeout

get_execute_request_timeout(job: Job | None = None) -> float | None

Gets the execute_request_timeout for the current context.

Source code in src/async_kernel/utils.py
118
119
120
121
122
123
124
125
def get_execute_request_timeout(job: Job | None = None, /) -> float | None:
    "Gets the execute_request_timeout for the current context."
    try:
        if timeout := get_metadata(job).get(MetadataKeys.timeout):
            return float(timeout)
        return get_kernel().shell.execute_request_timeout
    except Exception:
        return None

async_kernel.utils.get_execution_count

get_execution_count() -> int

Gets the execution count for the current context, defaults to the current kernel count.

Source code in src/async_kernel/utils.py
128
129
130
131
def get_execution_count() -> int:
    "Gets the execution count for the current context, defaults to the current kernel count."

    return _execution_count_var.get(None) or async_kernel.Kernel()._execution_count  # pyright: ignore[reportPrivateUsage]

async_kernel.utils.setattr_nested

setattr_nested(obj: object, name: str, value: str | Any) -> dict[str, Any]

Set a nested attribute of an object.

If the attribute name contains dots, it is interpreted as a nested attribute. For example, if name is "a.b.c", then the code will attempt to set obj.a.b.c to value.

This is primarily intended for use with async_kernel.command.command_line to set the nesteded attributes on on kernels.

Parameters:

  • obj

    (object) –

    The object to set the attribute on.

  • name

    (str) –

    The name of the attribute to set.

  • value

    (str | Any) –

    The value to set the attribute to.

Returns:

  • dict[str, Any]

    The mapping of the name to the set value if the value has been set.

  • dict[str, Any]

    An empty dict indicates the value was not set.

Source code in src/async_kernel/utils.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def setattr_nested(obj: object, name: str, value: str | Any) -> dict[str, Any]:
    """
    Set a nested attribute of an object.

    If the attribute name contains dots, it is interpreted as a nested attribute.
    For example, if name is "a.b.c", then the code will attempt to set obj.a.b.c to value.

    This is primarily intended for use with [async_kernel.command.command_line][]
    to set the nesteded attributes on on kernels.

    Args:
        obj: The object to set the attribute on.
        name: The name of the attribute to set.
        value: The value to set the attribute to.

    Returns:
        The mapping of the name to the set value if the value has been set.
        An empty dict indicates the value was not set.

    """
    if len(bits := name.split(".")) > 1:
        try:
            obj = getattr(obj, bits[0])
        except Exception:
            return {}
        setattr_nested(obj, ".".join(bits[1:]), value)
    if (isinstance(obj, traitlets.HasTraits) and obj.has_trait(name)) or hasattr(obj, name):
        try:
            setattr(obj, name, value)
        except Exception:
            setattr(obj, name, eval(value))
        return {name: getattr(obj, name)}
    return {}