
common

Classes:

  • KernelInterrupt

    Raised to interrupt the kernel.

  • MethodNotSupported

    This exception is used inside overridden methods to indicate that it should not be used.

  • Fixed

    A property-like descriptor factory that always returns the same object.

  • SingleAsyncQueue

    A single-use asynchronous iterator with a queue.

Functions:

  • import_item

    Import an item from a module, given its dotted name.

KernelInterrupt

Bases: InterruptedError

Raised to interrupt the kernel.
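
Because KernelInterrupt derives from InterruptedError, a handler written for InterruptedError (or its parent OSError) will also catch it. The sketch below illustrates this with a local stand-in class; `run_cell` is a hypothetical helper, not part of async_kernel.

```python
class KernelInterrupt(InterruptedError):
    "Local stand-in mirroring the documented class."


def run_cell(interrupt: bool) -> str:
    # Hypothetical helper: pretend to run some work that may be interrupted.
    try:
        if interrupt:
            raise KernelInterrupt
        return "completed"
    except InterruptedError:
        # KernelInterrupt lands here because it subclasses InterruptedError.
        return "interrupted"
```

Calling `run_cell(True)` returns `"interrupted"`, while `run_cell(False)` returns `"completed"`.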

Source code in src/async_kernel/common.py
class KernelInterrupt(InterruptedError):
    "Raised to interrupt the kernel."

MethodNotSupported

Bases: Exception

This exception is used inside overridden methods to indicate that it should not be used.
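
A minimal sketch of the intended pattern, assuming a subclass wants to disable one inherited method. `Channel` and `ReadOnlyChannel` are hypothetical names used only for illustration, and the exception is redefined locally so the snippet runs on its own.

```python
class MethodNotSupported(Exception):
    """Local stand-in mirroring the documented class."""


class Channel:
    # Hypothetical base class with two methods.
    def send(self, data: str) -> str:
        return f"sent {data}"

    def flush(self) -> str:
        return "flushed"


class ReadOnlyChannel(Channel):
    # Hypothetical subclass: `send` is overridden only to signal
    # that it should not be used on this type.
    def send(self, data: str) -> str:
        raise MethodNotSupported("ReadOnlyChannel.send should not be used")
```

Methods that are not overridden, such as `flush` here, keep working as inherited.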

Source code in src/async_kernel/common.py
class MethodNotSupported(Exception):
    """
    This exception is used inside overridden methods to indicate that it
    should not be used.
    """

Fixed

Bases: Generic[S, T]

A property-like descriptor factory that always returns the same object.

The descriptor is defined with a callable or importable string to a callable.

On first access the object is obtained as the result of the callable, cached and returned. Subsequent access to the property returns the cached result.

Parameters:

  • obj

    (type[T] | Callable[[], T] | Callable[[FixedCreate[S]], T] | str) –

    A class, callable or dotted path.

    The following types are accepted:

    • string: A dotted importable path to class or function to be used as callable.
    • class | callable: Called with zero or one positional argument FixedCreate.
  • created

    (Callable[[FixedCreated[S, T]]] | None, default: None) –

    A per-instance optional callback that gets called on first-access to the property.

  • mode

    (Literal['raise', 'ignore', 'log'], default: 'raise') –

    How to handle invalid data.

    • 'raise': Raise an error.
    • 'log': Log a warning or exception.
    • 'ignore': Ignore (value is not set).

Type Hints
  • S: Type of the owner class.
  • T: Type of the managed class.
Example
from typing import Self

from async_kernel.common import Fixed

class MyClass:
    a: Fixed[Self, dict] = Fixed(dict)
    b: Fixed[Self, int] = Fixed(lambda c: id(c["owner"].a))
    c: Fixed[Self, list[str]] = Fixed(
        list, created=lambda c: c["obj"].append(c["name"])
    )
Tip

You can use import_item inside a callable to lazy import.
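
The create-once, cache-per-instance behaviour described above can be sketched with a much simpler descriptor. This is an illustrative stand-in under stated assumptions, not the library's implementation: `CachedOnce` is a hypothetical name, it stores the value in the instance `__dict__`, and it has no locking, `created` callback, or `mode` handling.

```python
from __future__ import annotations

from typing import Any, Callable


class CachedOnce:
    """Hypothetical, simplified stand-in for the caching behaviour of Fixed."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self.factory = factory
        self.name = ""

    def __set_name__(self, owner: type, name: str) -> None:
        self.name = name

    def __get__(self, obj: Any, objtype: type | None = None) -> Any:
        if obj is None:
            return self  # accessed on the class, not an instance
        cache = obj.__dict__
        if self.name not in cache:
            # First access: build the object and remember it for this instance.
            cache[self.name] = self.factory()
        return cache[self.name]

    def __set__(self, obj: Any, value: Any) -> None:
        # Defining __set__ makes this a data descriptor, so assignment is blocked.
        raise AttributeError(f"Setting {self.name!r} is forbidden")


class MyClass:
    a = CachedOnce(dict)


x, y = MyClass(), MyClass()
same_object = x.a is x.a        # repeated access returns the cached object
per_instance = x.a is not y.a   # each instance gets its own object
```

Both `same_object` and `per_instance` evaluate to `True`, and `x.a = {}` raises `AttributeError`.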

Source code in src/async_kernel/common.py
class Fixed(Generic[S, T]):
    """
    A property-like descriptor factory that always returns the same object.

    The descriptor is defined with a callable or importable string to a callable.

    On first access the object is obtained as the result of the callable, cached
    and returned. Subsequent access to the property returns the cached result.

    Args:
        obj:
            A class, callable or dotted path.

            The following types are accepted:

            - string: A dotted importable path to class or function to be used as callable.
            - class | callable: Called with zero or one positional argument [FixedCreate][].

        created: A per-instance optional callback that gets called on first-access to the property.
        mode: How to handle invalid data.
            - `'raise'`: Raise an error.
            - `'log'`: Log a warning or exception.
            - `'ignore'`: Ignore (value is not set).

    Type Hints:
        - ``S``: Type of the owner class.
        - ``T``: Type of the managed class.

    Example:
        ```python
        class MyClass:
            a: Fixed[Self, dict] = Fixed(dict)
            b: Fixed[Self, int] = Fixed(lambda c: id(c["owner"].a))
            c: Fixed[Self, list[str]] = Fixed(
                list, created=lambda c: c["obj"].append(c["name"])
            )
        ```

    Tip:
        You can use [import_item][] inside a callable to lazy import.
    """

    __slots__ = ["create", "created", "instances", "instances_locks", "mode", "name"]

    def __init__(
        self,
        obj: type[T] | Callable[[], T] | Callable[[FixedCreate[S]], T] | str,
        /,
        *,
        created: Callable[[FixedCreated[S, T]]] | None = None,
        mode: Literal["raise", "ignore", "log"] = "raise",
    ) -> None:
        if callable(obj) or isinstance(obj, str):  # pyright: ignore[reportUnnecessaryIsInstance]
            self.create = obj
            self.created = created
            self.instances = {}
            self.instances_locks = {}
            self.mode: Literal["raise", "ignore", "log"] = mode
        else:
            msg = f"{obj=} is invalid! Use a lambda instead eg: lambda _: {obj}"  # pyright: ignore[reportUnreachable]
            raise TypeError(msg)

    def __set_name__(self, owner_cls: type[S], name: str) -> None:
        self.name = name

    def __get__(self, obj: S, objtype: type[S] | None = None) -> T:
        if obj is None:
            return self  # pyright: ignore[reportReturnType]
        key = id(obj)
        try:
            return self.instances[key]
        except KeyError:
            try:
                lock = self.instances_locks[key]
            except KeyError:
                lock = self.instances_locks.setdefault(key, create_thread_oncelock())
            lock.acquire()
            try:
                return self.instances[key]
            except KeyError:
                if lock._count > 1:
                    msg = f"Self-referencing creation detected for {obj.__class__.__name__}.{self.name}!"
                    raise RuntimeError(msg) from None
                return self._create_instance(obj, key)
            finally:
                lock.release()

    def __delete__(self, obj: S, /) -> None:
        self._handle_invalid(
            obj, f"Deletion of `Fixed` parameter `{self._rep_with_obj(obj)}` is not allowed. ({obj=!r})"
        )

    def __set__(self, obj: S, value: Self) -> Never:  # pyright: ignore[reportReturnType]
        # Note: We use `Self` as the type hint for the `value` above  to give a useful typing error.
        self._handle_invalid(obj, f"Setting `Fixed` parameter `{self._rep_with_obj(obj)}` is forbidden! ({obj=!r})")

    def _rep_with_obj(self, obj: S) -> str:
        return f"{obj.__class__.__name__}.{self.name}"

    def _handle_invalid(self, obj: S, msg: str, *, error: Exception | None = None) -> None:
        match self.mode:
            case "raise":
                raise AttributeError(msg)
            case "log":
                if log := getattr(obj, "log", None):
                    if error:
                        log.exception(msg, exc_info=error)
                    else:
                        log.warning(msg, obj)
            case "ignore":
                pass

    def _create_instance(self, obj: S, key: int) -> T:
        ""
        if isinstance(create := self.create, str):
            self.create = create = import_item(create)
        try:
            instance = create()  # pyright: ignore[reportCallIssue, reportAssignmentType]
        except TypeError:
            if create.__name__ == "<lambda>" or len(inspect.signature(create).parameters) == 1:
                instance: T = create(FixedCreate(name=self.name, owner=obj))  # pyright: ignore[reportAssignmentType, reportCallIssue]
            else:
                raise
        self.instances[key] = instance
        self.instances_locks[key] = THREAD_DUMMY_LOCK
        if not isinstance(self.instances, weakref.WeakValueDictionary):
            weakref.finalize(obj, self.instances.pop, key)
        weakref.finalize(obj, self.instances_locks.pop, key)
        if self.created:
            try:
                self.created({"owner": obj, "obj": instance, "name": self.name})
            except Exception as e:
                self._handle_invalid(
                    obj, f"Callback `created` failed for `{self._rep_with_obj(obj)}` ({obj=!r})", error=e
                )
        return instance
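
The "zero or one positional argument" dispatch in `_create_instance` can be sketched in isolation as follows. `call_with_optional_context` is a hypothetical helper name, the context is a plain dict rather than `FixedCreate`, and the sketch drops the `<lambda>` name check that the source also performs.

```python
import inspect
from typing import Any, Callable


def call_with_optional_context(create: Callable[..., Any], context: dict) -> Any:
    """Hypothetical helper: call create() or, failing that, create(context)."""
    try:
        # First attempt: treat the callable as taking no arguments.
        return create()
    except TypeError:
        # Fall back to passing the context if exactly one parameter is declared.
        if len(inspect.signature(create).parameters) == 1:
            return create(context)
        raise


ctx = {"name": "a", "owner": None}
no_arg_result = call_with_optional_context(dict, ctx)
one_arg_result = call_with_optional_context(lambda c: c["name"], ctx)
```

Here `no_arg_result` is an empty dict and `one_arg_result` is `"a"`. One consequence of this try-then-retry approach is that a `TypeError` raised inside a one-parameter callable's own body is indistinguishable from a signature mismatch at the first call.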

SingleAsyncQueue

Bases: Generic[T]

A single-use asynchronous iterator with a queue.

Notes
  • Append to the queue from anywhere (internally synchronised).
  • The queue will only yield for one async iterator consumer.
  • When SingleAsyncQueue.stop is called:
    • Any items in the queue are immediately rejected.
    • The async iterator is stopped.
  • Items added after stop is called will be rejected immediately.
  • boolean: True when not stopped.
  • len: The current length of the queue.
Usage
q = SingleAsyncQueue(reject=lambda item: print("rejected", item))

# In a task
async for item in q:
    item  # use the item

# Other threads/tasks
q.append(item)
q.extend([item1, item2])

# Stop the iterator
q.stop()
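
To make the stop and reject semantics concrete, here is a deliberately simplified, asyncio-only sketch. `TinyAsyncQueue` is a hypothetical stand-in, not the library class: it is not thread-safe, does not support trio, and does not enforce a single consumer.

```python
from __future__ import annotations

import asyncio
from collections import deque
from typing import Any, AsyncIterator, Callable


class TinyAsyncQueue:
    """Hypothetical asyncio-only sketch of an async-iterable queue."""

    def __init__(self, reject: Callable[[Any], Any] | None = None) -> None:
        self._reject = reject
        self._queue: deque[Any] = deque()
        self._event = asyncio.Event()
        self.stopped = False

    def append(self, item: Any) -> None:
        if self.stopped:
            # Items added after stop are rejected immediately.
            if self._reject:
                self._reject(item)
            return
        self._queue.append(item)
        self._event.set()  # wake the consumer if it is waiting

    def stop(self) -> None:
        self.stopped = True
        while self._queue:
            item = self._queue.popleft()
            if self._reject:
                self._reject(item)
        self._event.set()  # wake the consumer so it can exit

    async def __aiter__(self) -> AsyncIterator[Any]:
        while not self.stopped:
            if self._queue:
                yield self._queue.popleft()
            else:
                self._event.clear()
                await self._event.wait()


async def main() -> tuple[list[Any], list[Any]]:
    received: list[Any] = []
    rejected: list[Any] = []
    q = TinyAsyncQueue(reject=rejected.append)

    async def consume() -> None:
        async for item in q:
            received.append(item)

    task = asyncio.create_task(consume())
    q.append(1)
    q.append(2)
    await asyncio.sleep(0.01)  # give the consumer time to drain the queue
    q.stop()
    q.append(3)  # added after stop, so it goes to the reject callback
    await task
    return received, rejected
```

Running `asyncio.run(main())` returns `([1, 2], [3])`: the first two items reach the consumer, and the item appended after `stop()` is passed to `reject`.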

Methods:

  • stop

    Stop the queue, rejecting any items currently in the queue.

  • append

    Append item to the queue.

  • appendleft

    Append item to the left side of the queue.

  • extend

    Append all items in iterable to the queue.

Attributes:

  • stopped (bool) –

    True once stop has been called, at which point no items remain in the queue.

Source code in src/async_kernel/common.py
class SingleAsyncQueue(Generic[T]):
    """
    A single-use asynchronous iterator with a queue.

    Notes:
        - Append to the queue from anywhere (internally synchronised).
        - The queue will only yield for one async iterator consumer.
        - When [SingleAsyncQueue.stop][] is called:
            - Any items in the queue are immediately rejected.
            - The async iterator is stopped.
        - Items added after stop is called will be rejected immediately.
        - boolean: `True` when not stopped.
        - len: The current length of the queue.

    Usage:
        ```python
        q = SingleAsyncQueue(reject=lambda item: print("rejected", item))

        # In a task
        async for item in q:
            item  # use the item

        # Other threads/tasks
        q.append(item)
        q.extend([item1, item2])

        # Stop the iterator
        q.stop()
        ```
    """

    __slots__ = ["__weakref__", "_queue", "_reject", "_token", "_waiter"]

    def __init__(self, *, reject: Callable[[T], Any] | None = None) -> None:
        self._reject = reject
        self._queue = deque()
        # _token prevents more than one async iterator.
        self._token = True

    def __bool__(self) -> bool:
        return not self.stopped

    def __len__(self) -> int:
        try:
            return len(self._queue)
        except AttributeError:
            return 0

    async def __aiter__(self) -> AsyncGenerator[T]:

        try:
            del self._token
            backend = Backend(current_async_library())
            checkpoint = asyncio_checkpoint if backend is Backend.asyncio else trio_checkpoint
            while True:
                if self._queue:
                    yield self._queue.popleft()
                    await checkpoint()
                else:
                    self._waiter = create_async_waiter()
                    try:
                        if not self._queue:
                            await self._waiter
                        del self._waiter
                    except AttributeError:
                        pass
        except AttributeError:
            pass
        except BaseException:
            self.stop()
            raise

    def _resume(self) -> None:
        try:
            waiter = self._waiter
            del self._waiter
            waiter.wake()
            del waiter
        except AttributeError:
            pass

    def stop(self) -> None:
        """
        Stop the queue, rejecting any items currently in the queue.
        """
        try:
            queue = self._queue
            del self._queue
            self._resume()
            while queue:
                item = queue.popleft()
                if self._reject:
                    self._reject(item)
        except (AttributeError, IndexError):
            pass

    def append(self, item: T, /) -> None:
        """
        Append `item` to the queue.

        If the queue has been stopped `item` will be rejected immediately.
        """
        try:
            self._queue.append(item)
            self._resume()
        except AttributeError:
            if self._reject:
                self._reject(item)

    def appendleft(self, item: T, /) -> None:
        """
        Append `item` to the left side of the queue.

        If the queue has been stopped `item` will be rejected immediately.
        """
        try:
            self._queue.appendleft(item)
            self._resume()
        except AttributeError:
            if self._reject:
                self._reject(item)

    def extend(self, iterable: Iterable[T], /) -> None:
        """
        Append all items in `iterable` to the queue.

        If the queue has been stopped all items in `iterable` will be rejected immediately.
        """
        try:
            self._queue.extend(iterable)
            self._resume()
        except AttributeError:
            if self._reject:
                for item in iterable:
                    self._reject(item)

    @property
    def stopped(self) -> bool:
        """
        `True` once stop has been called, at which point no items remain in the queue.
        """
        return not hasattr(self, "_queue")

stopped property

stopped: bool

True once stop has been called, at which point no items remain in the queue.

stop

stop() -> None

Stop the queue, rejecting any items currently in the queue.

Source code in src/async_kernel/common.py
def stop(self) -> None:
    """
    Stop the queue, rejecting any items currently in the queue.
    """
    try:
        queue = self._queue
        del self._queue
        self._resume()
        while queue:
            item = queue.popleft()
            if self._reject:
                self._reject(item)
    except (AttributeError, IndexError):
        pass

append

append(item: T) -> None

Append item to the queue.

If the queue has been stopped item will be rejected immediately.

Source code in src/async_kernel/common.py
def append(self, item: T, /) -> None:
    """
    Append `item` to the queue.

    If the queue has been stopped `item` will be rejected immediately.
    """
    try:
        self._queue.append(item)
        self._resume()
    except AttributeError:
        if self._reject:
            self._reject(item)

appendleft

appendleft(item: T) -> None

Append item to the left side of the queue.

If the queue has been stopped item will be rejected immediately.

Source code in src/async_kernel/common.py
def appendleft(self, item: T, /) -> None:
    """
    Append `item` to the left side of the queue.

    If the queue has been stopped `item` will be rejected immediately.
    """
    try:
        self._queue.appendleft(item)
        self._resume()
    except AttributeError:
        if self._reject:
            self._reject(item)
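
Since the queue is backed by a `collections.deque` and the iterator takes items with `popleft`, an item added with `appendleft` is the next one yielded. The ordering can be checked with a plain deque; the item values are illustrative only.

```python
from collections import deque

pending = deque(["b", "c"])
pending.append("d")      # goes to the back, so it is taken last
pending.appendleft("a")  # goes to the front, so it is taken next

# Drain from the left, as the async iterator does.
order = [pending.popleft() for _ in range(len(pending))]
```

After this runs, `order` is `["a", "b", "c", "d"]`.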

extend

extend(iterable: Iterable[T]) -> None

Append all items in iterable to the queue.

If the queue has been stopped all items in iterable will be rejected immediately.

Source code in src/async_kernel/common.py
def extend(self, iterable: Iterable[T], /) -> None:
    """
    Append all items in `iterable` to the queue.

    If the queue has been stopped all items in `iterable` will be rejected immediately.
    """
    try:
        self._queue.extend(iterable)
        self._resume()
    except AttributeError:
        if self._reject:
            for item in iterable:
                self._reject(item)

import_item

import_item(dottedname: str) -> Any

Import an item from a module, given its dotted name.

Example
import_item("os.path.join")
Source code in src/async_kernel/common.py
def import_item(dottedname: str, /) -> Any:
    """
    Import an item from a module, given its dotted name.

    Example:
        ```python
        import_item("os.path.join")
        ```
    """
    module, name = dottedname.rsplit(".", maxsplit=1)
    return aiologic.meta.import_from(module, name)
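
For readers without aiologic at hand, the same split-then-lookup idea can be sketched with the standard library. `import_item_sketch` is a hypothetical name, and using `importlib.import_module` plus `getattr` in place of `aiologic.meta.import_from` is an assumption made for illustration; the two may differ in details such as caching or error handling.

```python
import importlib
import os
from typing import Any


def import_item_sketch(dottedname: str) -> Any:
    """Hypothetical stdlib-only equivalent, for illustration."""
    # Split "pkg.module.attr" into "pkg.module" and "attr".
    module_name, name = dottedname.rsplit(".", maxsplit=1)
    module = importlib.import_module(module_name)
    return getattr(module, name)


join = import_item_sketch("os.path.join")
is_same = join is os.path.join
```

Here `is_same` is `True`. As in the documented function, a name without a dot cannot be split into two parts, so the unpacking raises `ValueError`.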