common

Classes:

  • KernelInterrupt

    Raised to interrupt the kernel.

  • Fixed

    A property-like descriptor factory that always returns the same object.

  • SingleAsyncQueue

    A single-use asynchronous iterator with a queue.

Functions:

  • import_item

    Import an item from a module, given its dotted name.

KernelInterrupt

Bases: InterruptedError

Raised to interrupt the kernel.

Source code in src/async_kernel/common.py
class KernelInterrupt(InterruptedError):
    "Raised to interrupt the kernel."
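
As a rough illustration of what the base class implies, the sketch below redefines the exception locally and catches it around a unit of work. The `run_cell` helper is hypothetical and is not part of `async_kernel`; how the kernel actually raises and handles the exception is not shown on this page.

```python
class KernelInterrupt(InterruptedError):
    "Raised to interrupt the kernel."


def run_cell(should_interrupt: bool) -> str:
    # Hypothetical helper: stands in for any work that may be interrupted.
    try:
        if should_interrupt:
            raise KernelInterrupt("interrupted by user")
        return "finished"
    except KernelInterrupt as exc:
        return f"interrupted: {exc}"


# InterruptedError derives from OSError, so a broad `except OSError`
# would also catch KernelInterrupt.
is_os_error = issubclass(KernelInterrupt, OSError)
```

Catching `KernelInterrupt` explicitly, rather than `OSError`, keeps an interrupt from being mistaken for an ordinary I/O failure.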

Fixed

Bases: Generic[S, T]

A property-like descriptor factory that always returns the same object.

The descriptor is defined with a callable, or with a dotted import path that resolves to a callable.

On first access from an owner instance, the object is obtained by calling the callable, then cached and returned. Subsequent access from that instance returns the cached result.

Parameters:

  • obj

    (type[T] | Callable[[FixedCreate[S]], T] | str) –

    A class, callable or dotted path.

    The following types are accepted:

    • string: A dotted import path to a class or function to use as the callable.
    • class | callable: Called with no arguments first; if that raises TypeError, called again with a single positional FixedCreate argument.
  • created

    (Callable[[FixedCreated[S, T]]] | None, default: None ) –

    An optional callback, invoked once per owner instance right after the object is created on first access to the property.
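
The calling convention for `obj` can be sketched in isolation as below. This is a simplified illustration based on the `create_instance` source listed further down; `call_create` is a hypothetical helper, and a plain `dict` stands in for `FixedCreate`.

```python
from typing import Any, Callable


def call_create(create: Callable[..., Any], context: dict) -> Any:
    # Try the zero-argument form first (e.g. `dict`, `list`).
    try:
        return create()
    except TypeError:
        # Fall back to passing the context (a stand-in for FixedCreate).
        return create(context)


context = {"name": "a", "owner": None}
empty = call_create(dict, context)
name = call_create(lambda c: c["name"], context)
```

One consequence of this approach, assuming the real implementation behaves the same way: a `TypeError` raised inside a zero-argument callable also triggers the fallback, so the callable may run a second time with the context argument.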

Type Hints
  • S: Type of the owner class.
  • T: Type of the managed class.
Example
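from typing import Self
from async_kernel.common import Fixed

class MyClass:
    a: Fixed[Self, dict] = Fixed(dict)
    # The lambda receives a FixedCreate mapping; id() returns an int.
    b: Fixed[Self, int] = Fixed(lambda c: id(c["owner"].a))
    c: Fixed[Self, list[str]] = Fixed(
        list, created=lambda c: c["obj"].append(c["name"])
    )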
Tip

You can use import_item inside a callable to lazy import.
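
The first-access caching described above can be approximated with a small descriptor. This is a minimal sketch only: `FixedSketch` and `Owner` are hypothetical names, and the sketch leaves out the locking, self-reference detection, `created` callback and dotted-path support that the real `Fixed` has.

```python
import weakref
from typing import Any, Callable, Optional


class FixedSketch:
    """Simplified stand-in for Fixed: create once per owner instance, then cache."""

    def __init__(self, create: Callable[[], Any]) -> None:
        self.create = create
        self.instances = {}  # maps id(owner) -> cached object

    def __set_name__(self, owner_cls: type, name: str) -> None:
        self.name = name

    def __get__(self, obj: Any, objtype: Optional[type] = None) -> Any:
        if obj is None:
            return self  # accessed on the class, not an instance
        key = id(obj)
        try:
            return self.instances[key]
        except KeyError:
            instance = self.instances[key] = self.create()
            # Drop the cached object once the owner is garbage collected.
            weakref.finalize(obj, self.instances.pop, key, None)
            return instance

    def __set__(self, obj: Any, value: Any) -> None:
        msg = f"Setting {obj.__class__.__name__}.{self.name} is forbidden!"
        raise AttributeError(msg)


class Owner:
    data = FixedSketch(dict)


first, second = Owner(), Owner()
first.data["x"] = 1  # mutates the cached dict; the attribute itself is read-only
```

Defining `__set__` makes this a data descriptor, so it takes precedence over the instance `__dict__` and assignment can be refused.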

Source code in src/async_kernel/common.py
class Fixed(Generic[S, T]):
    """
    A property-like descriptor factory that always returns the same object.

    The descriptor is defined with a callable or importable string to a callable.

    On first access the object is obtained as the result of the callable, cached
    and returned. Subsequent access to the property returns the cached result.

    Args:
        obj:
            A class, callable or dotted path.

            The following types are accepted:

            - string: A dotted importable path to class or function to be used as callable.
            - class | callable: Called with zero or one positional argument [FixedCreate][].

        created: A per-instance optional callback that gets called on first-access to the property.

    Type Hints:
        - ``S``: Type of the owner class.
        - ``T``: Type of the managed class.

    Example:
        ```python
        class MyClass:
            a: Fixed[Self, dict] = Fixed(dict)
            b: Fixed[Self, int] = Fixed(lambda c: id(c["owner"].a))
            c: Fixed[Self, list[str]] = Fixed(
                list, created=lambda c: c["obj"].append(c["name"])
            )
        ```

    Tip:
        You can use [import_item][] inside a callable to lazy import.
    """

    __slots__ = ["create", "created", "instances", "instances_locks", "name"]

    def __init__(
        self,
        obj: type[T] | Callable[[FixedCreate[S]], T] | str,
        /,
        *,
        created: Callable[[FixedCreated[S, T]]] | None = None,
    ) -> None:
        if callable(obj) or isinstance(obj, str):  # pyright: ignore[reportUnnecessaryIsInstance]
            self.create = obj
            self.created = created
            self.instances = {}
            self.instances_locks = {}
        else:
            msg = f"{obj=} is invalid! Use a lambda instead eg: lambda _: {obj}"  # pyright: ignore[reportUnreachable]
            raise TypeError(msg)

    def __set_name__(self, owner_cls: type[S], name: str) -> None:
        self.name = name

    def __get__(self, obj: S, objtype: type[S] | None = None) -> T:
        if obj is None:
            return self  # pyright: ignore[reportReturnType]
        key = id(obj)
        try:
            return self.instances[key]
        except KeyError:
            try:
                lock = self.instances_locks[key]
            except KeyError:
                lock = self.instances_locks.setdefault(key, create_thread_oncelock())
            lock.acquire()
            try:
                return self.instances[key]
            except KeyError:
                if lock._count > 1:
                    msg = f"Self-referencing creation detected for {obj.__class__.__name__}.{self.name}!"
                    raise RuntimeError(msg) from None
                return self.create_instance(obj, key)
            finally:
                lock.release()

    def create_instance(self, obj: S, key: int) -> T:
        if isinstance(create := self.create, str):
            self.create = create = import_item(create)
        try:
            instance = create()  # pyright: ignore[reportCallIssue, reportAssignmentType]
        except TypeError:
            instance: T = create(FixedCreate(name=self.name, owner=obj))  # pyright: ignore[reportAssignmentType, reportCallIssue]
        self.instances[key] = instance
        self.instances_locks[key] = THREAD_DUMMY_LOCK
        weakref.finalize(obj, self.instances.pop, key)
        weakref.finalize(obj, self.instances_locks.pop, key)
        if self.created:
            try:
                self.created({"owner": obj, "obj": instance, "name": self.name})
            except Exception:
                if log := getattr(obj, "log", None):
                    msg = f"Callback `created` failed for {obj.__class__}.{self.name}"
                    log.exception(msg, extra={"obj": self.created})
        return instance

    def __set__(self, obj: S, value: Self) -> Never:
        # Note: above we use `Self` for the `value` type hint to give a useful typing error
        msg = f"Setting `Fixed` parameter {obj.__class__.__name__}.{self.name} is forbidden!"
        raise AttributeError(msg)

SingleAsyncQueue

Bases: Generic[T]

A single-use asynchronous iterator with a queue.

Notes
  • Append to the queue from anywhere (internally synchronised).
  • The queue will only yield for one async iterator consumer.
  • When SingleAsyncQueue.stop is called:
    • Any items in the queue are immediately rejected.
    • The async iterator is stopped.
  • Items added after stop is called will be rejected immediately.
Usage
q = SingleAsyncQueue(reject=lambda item: print("rejected", item))

# In a task
async for item in q:
    print(item)

# Other threads/tasks
q.append(item)
q.extend([item1, item2])

# Stop the iterator
q.stop()
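
The lifecycle in the notes above can be traced with a runnable, asyncio-only stand-in. `SingleQueueSketch` is a hypothetical, simplified class: unlike the real `SingleAsyncQueue` it is not thread-safe and does not support trio. It only illustrates the single-consumer and reject-on-stop behaviour.

```python
import asyncio
from collections import deque
from typing import Any, Callable, Optional


class SingleQueueSketch:
    """Asyncio-only stand-in: one consumer, leftovers rejected on stop."""

    def __init__(self, reject: Optional[Callable[[Any], Any]] = None) -> None:
        self._queue = deque()
        self._event = asyncio.Event()
        self._active = None  # None: not started, True: iterating, False: stopped
        self._reject = reject

    async def __aiter__(self):
        if self._active is not None:
            return  # a second consumer, or a stopped queue, yields nothing
        self._active = True
        try:
            while self._active:
                if self._queue:
                    yield self._queue.popleft()
                else:
                    self._event.clear()
                    await self._event.wait()
        finally:
            self.stop()

    def stop(self) -> None:
        self._active = False
        while self._queue:
            item = self._queue.popleft()
            if self._reject:
                self._reject(item)
        self._event.set()  # wake the consumer so it can exit

    def append(self, item: Any) -> None:
        if self._active is False:
            if self._reject:
                self._reject(item)
        else:
            self._queue.append(item)
            self._event.set()


async def main():
    rejected = []
    q = SingleQueueSketch(reject=rejected.append)
    q.append(1)
    q.append(2)
    seen = []
    async for item in q:
        seen.append(item)
        if item == 2:
            q.append(3)  # still queued when stop() runs, so it is rejected
            q.stop()
    q.append(4)  # appended after stop: rejected immediately
    return seen, rejected


seen, rejected = asyncio.run(main())
```

Here items 1 and 2 are consumed, while 3 and 4 are passed to the `reject` callback.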

Methods:

  • stop

    Stop the queue, rejecting any items currently in the queue.

  • append

    Append item to the queue.

  • appendleft

    Append item to the left side of the queue.

  • extend

    Append all items in iterable to the queue.

Attributes:

  • stopped (bool) –

    True once stop has been called, after which no items remain in the queue.

Source code in src/async_kernel/common.py
class SingleAsyncQueue(Generic[T]):
    """
    A single-use asynchronous iterator with a queue.

    Notes:
        - Append to the queue from anywhere (internally synchronised).
        - The queue will only yield for one async iterator consumer.
        - When [SingleAsyncQueue.stop][] is called:
            - Any items in the queue are immediately rejected.
            - The async iterator is stopped.
        - Items added after stop is called will be rejected immediately.

    Usage:
        ```python
        q = SingleAsyncQueue(reject=lambda item: print("rejected", item))

        # In a task
        async for item in q:
            print(item)

        # Other threads/tasks
        q.append(item)
        q.extend([item1, item2])

        # Stop the iterator
        q.stop()
        ```
    """

    __slots__ = ["__weakref__", "_active", "_reject", "_resume"]

    _active: bool | None
    queue: Fixed[Self, deque[T]] = Fixed(deque)

    def __init__(self, *, reject: Callable[[T], Any] | None = None) -> None:
        self._resume = noop
        self._active = None
        self._reject = reject

    async def __aiter__(self) -> AsyncGenerator[T]:
        if self._active is not None:
            return
        backend = Backend(current_async_library())
        checkpoint = asyncio_checkpoint if backend is Backend.asyncio else trio_checkpoint
        self._active = True
        queue = self.queue
        try:
            while self._active:
                if queue:
                    yield queue.popleft()
                    await checkpoint()
                else:
                    event = create_async_event()
                    self._resume = event.set
                    if not queue and self._active:
                        await event
                    self._resume = noop
        except IndexError:
            pass
        finally:
            self._resume = noop
            self.stop()

    def stop(self) -> None:
        """
        Stop the queue rejecting any items currently in the queue.
        """
        self._active = False
        if self._reject:
            while True:
                try:
                    self._reject(self.queue.popleft())
                except IndexError:
                    break
        else:
            self.queue.clear()
        self._resume()

    def append(self, item: T, /) -> None:
        """
        Append `item` to the queue.

        If the queue has been stopped `item` will be rejected immediately.
        """
        if self._active is False:
            if self._reject:
                self._reject(item)
        else:
            self.queue.append(item)
            self._resume()

    def appendleft(self, item: T, /) -> None:
        """
        Append `item` to the left side of the queue.

        If the queue has been stopped `item` will be rejected immediately.
        """
        if self._active is False:
            if self._reject:
                self._reject(item)
        else:
            self.queue.appendleft(item)
            self._resume()

    def extend(self, iterable: Iterable[T], /) -> None:
        """
        Append all items in `iterable` to the queue.

        If the queue has been stopped all items in `iterable` will be rejected immediately.
        """
        if self._active is False:
            if self._reject:
                for item in iterable:
                    self._reject(item)
        else:
            self.queue.extend(iterable)
            self._resume()

    @property
    def stopped(self) -> bool:
        """
        Will return `True` once stop has been called meaning there are no items left in the queue.
        """
        return self._active is False

stopped property

stopped: bool

True once stop has been called, after which no items remain in the queue.

stop

stop() -> None

Stop the queue, rejecting any items currently in the queue.

Source code in src/async_kernel/common.py
def stop(self) -> None:
    """
    Stop the queue rejecting any items currently in the queue.
    """
    self._active = False
    if self._reject:
        while True:
            try:
                self._reject(self.queue.popleft())
            except IndexError:
                break
    else:
        self.queue.clear()
    self._resume()

append

append(item: T) -> None

Append item to the queue.

If the queue has been stopped item will be rejected immediately.

Source code in src/async_kernel/common.py
def append(self, item: T, /) -> None:
    """
    Append `item` to the queue.

    If the queue has been stopped `item` will be rejected immediately.
    """
    if self._active is False:
        if self._reject:
            self._reject(item)
    else:
        self.queue.append(item)
        self._resume()

appendleft

appendleft(item: T) -> None

Append item to the left side of the queue.

If the queue has been stopped item will be rejected immediately.

Source code in src/async_kernel/common.py
def appendleft(self, item: T, /) -> None:
    """
    Append `item` to the left side of the queue.

    If the queue has been stopped `item` will be rejected immediately.
    """
    if self._active is False:
        if self._reject:
            self._reject(item)
    else:
        self.queue.appendleft(item)
        self._resume()

extend

extend(iterable: Iterable[T]) -> None

Append all items in iterable to the queue.

If the queue has been stopped all items in iterable will be rejected immediately.

Source code in src/async_kernel/common.py
def extend(self, iterable: Iterable[T], /) -> None:
    """
    Append all items in `iterable` to the queue.

    If the queue has been stopped all items in `iterable` will be rejected immediately.
    """
    if self._active is False:
        if self._reject:
            for item in iterable:
                self._reject(item)
    else:
        self.queue.extend(iterable)
        self._resume()

import_item

import_item(dottedname: str) -> Any

Import an item from a module, given its dotted name.

Example
import_item("os.path.join")
Source code in src/async_kernel/common.py
def import_item(dottedname: str) -> Any:
    """Import an item from a module, given its dotted name.

    Example:
        ```python
        import_item("os.path.join")
        ```
    """
    module, name0 = dottedname.rsplit(".", maxsplit=1)
    return aiologic.meta.import_from(module, name0)
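
For readers without `aiologic` installed, the same split-then-import idea can be sketched with the standard library. `import_item_sketch` is a hypothetical stand-in that uses `importlib` rather than `aiologic.meta.import_from`, so its behaviour may differ from the real function in edge cases.

```python
import importlib
from typing import Any


def import_item_sketch(dottedname: str) -> Any:
    # Split "package.module.attr" into the module path and the final attribute.
    module_name, attr = dottedname.rsplit(".", maxsplit=1)
    module = importlib.import_module(module_name)
    return getattr(module, attr)


join = import_item_sketch("os.path.join")
```

A name with no dot, such as `"os"`, makes the `rsplit` unpacking raise `ValueError` in this sketch, so callers need to pass a module path plus an attribute name.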