Skip to content

async_kernel.caller

Classes:

async_kernel.caller.FutureCancelledError

Bases: ClosedResourceError

Used to indicate a Future is cancelled.

async_kernel.caller.InvalidStateError

Bases: RuntimeError

An invalid state of a Future.

async_kernel.caller.AsyncEvent

AsyncEvent(thread: Thread | None = None)

An asynchronous thread-safe event compatible with Caller.

Methods:

  • wait

    Wait until the flag has been set.

  • set

    Set the internal flag to True and trigger notification.

  • is_set

    Return True if the flag is set, False if not.

Source code in src/async_kernel/caller.py
66
67
68
69
def __init__(self, thread: threading.Thread | None = None) -> None:
    self._thread = thread or threading.current_thread()
    self._events = set()
    self._flag = False

wait async

wait() -> None

Wait until the flag has been set.

If the flag has already been set when this method is called, it returns immediately.

Warning

This method requires that a Caller for its target thread. ```

Source code in src/async_kernel/caller.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
async def wait(self) -> None:
    """
    Wait until the flag has been set.

    If the flag has already been set when this method is called, it returns immediately.

    !!! warning

        This method requires that a [Caller][async_kernel.caller.Caller] for its target thread.
        ```
    """
    if not self._flag:

        def _get_event(event_type: type[T]) -> T | None:
            for event in self._events:
                if isinstance(event, event_type):
                    return event if not self._flag else None
            event = event_type()
            self._events.add(event)
            return event if not self._flag else None

        if self._thread is threading.current_thread():
            if event := _get_event(anyio.Event):
                await event.wait()
        else:
            if event := _get_event(threading.Event):
                await wait_thread_event(event)
    self.set()

set

set() -> None

Set the internal flag to True and trigger notification.

Source code in src/async_kernel/caller.py
104
105
106
107
108
109
110
111
112
def set(self) -> None:
    "Set the internal flag to `True` and trigger notification."
    self._flag = True
    while self._events:
        event = self._events.pop()
        if isinstance(event, anyio.Event):
            Caller(thread=self._thread).call_direct(event.set)
        else:
            event.set()

is_set

is_set() -> bool

Return True if the flag is set, False if not.

Source code in src/async_kernel/caller.py
114
115
116
def is_set(self) -> bool:
    "Return `True` if the flag is set, `False` if not."
    return self._flag

async_kernel.caller.Future

Future(thread: Thread | None = None, /, **metadata)

Bases: Awaitable[T]

A class representing a future result modelled on asyncio.Future.

This class provides an anyio compatible Future primitive. It is designed to work with Caller to enable thread-safe calling, setting, awaiting and cancelling execution results.

Methods:

  • wait

    Wait for future to be done (thread-safe) returning the result if specified.

  • set_result

    Set the result (thread-safe using Caller).

  • set_exception

    Set the exception (thread-safe using Caller).

  • done

    Returns True if the Future is done.

  • add_done_callback

    Add a callback for when the callback is done (not thread-safe).

  • cancel

    Cancel the Future (thread-safe using Caller).

  • cancelled

    Return True if the Future is cancelled.

  • result

    Return the result of the Future.

  • exception

    Return the exception that was set on the Future.

  • remove_done_callback

    Remove all instances of a callback from the callbacks list.

  • set_canceller

    Set a callback to handle cancellation.

  • get_caller

    The Caller that is running for this futures thread.

Attributes:

Source code in src/async_kernel/caller.py
135
136
137
138
139
def __init__(self, thread: threading.Thread | None = None, /, **metadata) -> None:
    self._done_callbacks = []
    self._metadata = metadata
    self._thread = thread = thread or threading.current_thread()
    self._done = AsyncEvent(thread)

metadata property

metadata: dict[str, Any]

A dict provided to store metadata with the future.

Info

The metadata is used when forming the representation of the future.

Example

fut = Future(name="My future")
fut = Caller().call_soon(anyio.sleep, 0)
fut.metadata.update(name="My future")

Tip

A future returned by methods of async_kernel.caller.Caller stores the function and call arguments in the futures metedata. It adds a on_set_callback that clears the metadata to avoid memory leaks.

thread property

thread: Thread

The thread to which the future is associated.

wait async

wait(
    *,
    timeout: float | None = ...,
    shield: bool = False | ...,
    result: Literal[True] = True,
) -> T
wait(*, timeout: float | None = ..., shield: bool = ..., result: Literal[False]) -> None
wait(
    *, timeout: float | None = None, shield: bool = False, result: bool = True
) -> T | None

Wait for future to be done (thread-safe) returning the result if specified.

Parameters:

  • timeout

    (float | None, default: None ) –

    Timeout in seconds.

  • shield

    (bool, default: False ) –

    Shield the future from cancellation.

  • result

    (bool, default: True ) –

    Whether the result should be returned.

Source code in src/async_kernel/caller.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
async def wait(self, *, timeout: float | None = None, shield: bool = False, result: bool = True) -> T | None:
    """
    Wait for future to be done (thread-safe) returning the result if specified.

    Args:
        timeout: Timeout in seconds.
        shield: Shield the future from cancellation.
        result: Whether the result should be returned.
    """
    try:
        if not self.done():
            with anyio.fail_after(timeout):
                await self._done.wait()
        return self.result() if result else None
    finally:
        if not self.done() and not shield:
            self.cancel("Cancelled with waiter cancellation.")

set_result

set_result(value: T) -> None

Set the result (thread-safe using Caller).

Source code in src/async_kernel/caller.py
253
254
255
def set_result(self, value: T) -> None:
    "Set the result (thread-safe using Caller)."
    self._set_value("result", value)

set_exception

set_exception(exception: BaseException) -> None

Set the exception (thread-safe using Caller).

Source code in src/async_kernel/caller.py
257
258
259
def set_exception(self, exception: BaseException) -> None:
    "Set the exception (thread-safe using Caller)."
    self._set_value("exception", exception)

done

done() -> bool

Returns True if the Future is done.

Done means either that a result / exception is available.

Source code in src/async_kernel/caller.py
261
262
263
264
265
266
def done(self) -> bool:
    """
    Returns True if the Future is done.

    Done means either that a result / exception is available."""
    return self._done.is_set()

add_done_callback

add_done_callback(fn: Callable[[Self], Any]) -> None

Add a callback for when the callback is done (not thread-safe).

If the Future is already done it will be scheduled for calling.

The result of the future and done callbacks are always called for the futures thread. Callbacks are called in the reverse order in which they were added in the owning thread.

Source code in src/async_kernel/caller.py
268
269
270
271
272
273
274
275
276
277
278
279
280
def add_done_callback(self, fn: Callable[[Self], Any]) -> None:
    """
    Add a callback for when the callback is done (not thread-safe).

    If the Future is already done it will be scheduled for calling.

    The result of the future and done callbacks are always called for the futures thread.
    Callbacks are called in the reverse order in which they were added in the owning thread.
    """
    if not self.done():
        self._done_callbacks.append(fn)
    else:
        self.get_caller().call_direct(fn, self)

cancel

cancel(msg: str | None = None) -> bool

Cancel the Future (thread-safe using Caller).

Note

  • Cancellation cannot be undone.
  • The future will not be done until set_result or set_excetion is called in both cases the value is ignore and replaced with a FutureCancelledError and the result is inaccessible.

Parameters:

  • msg

    (str | None, default: None ) –

    The message to use when cancelling.

Returns if it has been cancelled.

Source code in src/async_kernel/caller.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
def cancel(self, msg: str | None = None) -> bool:
    """
    Cancel the Future (thread-safe using Caller).

    !!! note

        - Cancellation cannot be undone.
        - The future will not be done until set_result or set_excetion is called
            in both cases the value is ignore and replaced with a [FutureCancelledError][async_kernel.caller.FutureCancelledError]
            and the result is inaccessible.

    Args:
        msg: The message to use when cancelling.

    Returns if it has been cancelled.
    """
    if not self.done():
        if msg and isinstance(self._cancelled, str):
            msg = f"{self._cancelled}\n{msg}"
        self._cancelled = msg or self._cancelled or True
        if canceller := self._canceller:
            if threading.current_thread() is self._thread:
                canceller(msg)
            else:
                Caller(thread=self._thread).call_direct(self.cancel)
    return self.cancelled()

cancelled

cancelled() -> bool

Return True if the Future is cancelled.

Source code in src/async_kernel/caller.py
309
310
311
def cancelled(self) -> bool:
    """Return True if the Future is cancelled."""
    return bool(self._cancelled)

result

result() -> T

Return the result of the Future.

If the Future has been cancelled, this method raises a FutureCancelledError exception.

If the Future isn't done yet, this method raises an InvalidStateError exception.

Source code in src/async_kernel/caller.py
313
314
315
316
317
318
319
320
321
322
323
324
325
def result(self) -> T:
    """
    Return the result of the Future.

    If the Future has been cancelled, this method raises a [FutureCancelledError][async_kernel.caller.FutureCancelledError] exception.

    If the Future isn't done yet, this method raises an [InvalidStateError][async_kernel.caller.InvalidStateError] exception.
    """
    if not self.cancelled() and not self.done():
        raise InvalidStateError
    if e := self.exception():
        raise e
    return self._result

exception

exception() -> BaseException | None

Return the exception that was set on the Future.

If the Future has been cancelled, this method raises a FutureCancelledError exception.

If the Future isn't done yet, this method raises an InvalidStateError exception.

Source code in src/async_kernel/caller.py
327
328
329
330
331
332
333
334
335
336
337
338
339
def exception(self) -> BaseException | None:
    """
    Return the exception that was set on the Future.

    If the Future has been cancelled, this method raises a [FutureCancelledError][async_kernel.caller.FutureCancelledError] exception.

    If the Future isn't done yet, this method raises an [InvalidStateError][async_kernel.caller.InvalidStateError] exception.
    """
    if self._cancelled:
        raise self._make_cancelled_error()
    if not self.done():
        raise InvalidStateError
    return self._exception

remove_done_callback

remove_done_callback(fn: Callable[[Self], object]) -> int

Remove all instances of a callback from the callbacks list.

Returns the number of callbacks removed.

Source code in src/async_kernel/caller.py
341
342
343
344
345
346
347
348
349
350
351
def remove_done_callback(self, fn: Callable[[Self], object], /) -> int:
    """
    Remove all instances of a callback from the callbacks list.

    Returns the number of callbacks removed.
    """
    n = 0
    while fn in self._done_callbacks:
        n += 1
        self._done_callbacks.remove(fn)
    return n

set_canceller

set_canceller(canceller: Callable[[str | None], Any]) -> None

Set a callback to handle cancellation.

Note

set_result must still be called to mark the future as completed. You can pass any value as it will be replaced with a async_kernel.caller.FutureCancelledError.

Source code in src/async_kernel/caller.py
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def set_canceller(self, canceller: Callable[[str | None], Any]) -> None:
    """
    Set a callback to handle cancellation.

    !!! note

        `set_result` must still be called to mark the future as completed. You can pass any
        value as it will be replaced with a [async_kernel.caller.FutureCancelledError][].
    """
    if self.done() or self._canceller:
        raise InvalidStateError
    self._canceller = canceller
    if self.cancelled():
        self.cancel()

get_caller

get_caller() -> Caller

The Caller that is running for this futures thread.

Source code in src/async_kernel/caller.py
368
369
370
def get_caller(self) -> Caller:
    "The [Caller][async_kernel.caller.Caller] that is running for this *futures* thread."
    return Caller(thread=self._thread)

async_kernel.caller.Caller

Bases: AsyncContextManagerMixin

A class to enable calling functions and coroutines between anyio event loops.

The Caller class provides a mechanism to execute functions and coroutines in a dedicated thread, leveraging AnyIO for asynchronous task management. It supports scheduling calls with delays, executing them immediately, and running them without a context. It also provides a means to manage a pool of threads for general purpose offloading of tasks.

The class maintains a registry of instances, associating each with a specific thread. It uses a task group to manage the execution of scheduled tasks and provides methods to start, stop, and query the status of the caller.

Methods:

  • __new__

    Create or retrieve the Caller instance for the specified thread.

  • get_runner

    The preferred way to run the caller loop.

  • stop

    Stop the caller, cancelling all pending tasks and close the thread.

  • schedule_call

    Schedule func to be called inside a task running in the callers thread (thread-safe).

  • call_later

    Schedule func to be called in caller's event loop copying the current context.

  • call_soon

    Schedule func to be called in caller's event loop copying the current context.

  • call_direct

    Schedule func to be called in caller's event loop directly.

  • queue_get

    Returns Future for func where the queue is running.

  • queue_call

    Queue the execution of func in a queue unique to it and this caller (thread-safe).

  • queue_close

    Close the execution queue associated with func (thread-safe).

  • stop_all

    A classmethod to stop all un-protected callers.

  • get_instance

    A classmethod that gets the caller associated to the thread using the threads name.

  • to_thread

    A classmethod to call func in a separate thread see also to_thread_advanced.

  • to_thread_advanced

    A classmethod to call func in a Caller specified by the options.

  • start_new

    A classmethod that creates a new caller instance with the thread determined according to the provided name.

  • current_future

    A classmethod that returns the current future when called from inside a function scheduled by Caller.

  • all_callers

    A classmethod to get a list of the callers.

  • as_completed

    A classmethod iterator to get Futures as they complete.

  • wait

    A classmethod to wait for the futures given by items to complete.

Attributes:

MAX_IDLE_POOL_INSTANCES class-attribute instance-attribute

MAX_IDLE_POOL_INSTANCES = 10

The number of pool instances to leave idle (See also to_thread).

log instance-attribute

iopub_sockets class-attribute

iopub_sockets: WeakKeyDictionary[Thread, Socket] = WeakKeyDictionary()

iopub_url class-attribute instance-attribute

iopub_url: ClassVar = 'inproc://iopub'

name property

name: str

The name of the thread when the caller was created.

thread property

thread: Thread

The thread in which the caller will run.

backend property

backend: Backend

The anyio backend the caller is running in.

protected property

protected: bool

Returns True if the caller is protected from stopping.

running property

running

Returns True when the caller is available to run requests.

stopped property

stopped: bool

Returns True if the caller is stopped.

__new__

__new__(
    *,
    thread: Thread | None = None,
    log: LoggerAdapter | None = None,
    create: bool = False,
    protected: bool = False,
) -> Self

Create or retrieve the Caller instance for the specified thread.

Parameters:

  • thread

    (Thread | None, default: None ) –

    The thread where the caller is based. There is only one instance per thread.

  • log

    (LoggerAdapter | None, default: None ) –

    Logger to use for logging messages.

  • create

    (bool, default: False ) –

    Whether to create a new instance if one does not exist for the current thread.

  • protected

    (bool, default: False ) –

    Whether the caller is protected from having its event loop closed.

Returns:

  • Caller ( Self ) –

    The Caller instance for the current thread.

Raises:

  • RuntimeError

    If create is False and a Caller instance does not exist.

Source code in src/async_kernel/caller.py
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
def __new__(
    cls,
    *,
    thread: threading.Thread | None = None,
    log: logging.LoggerAdapter | None = None,
    create: bool = False,
    protected: bool = False,
) -> Self:
    """
    Create or retrieve the `Caller` instance for the specified thread.

    Args:
        thread: The thread where the caller is based. There is only one instance per thread.
        log: Logger to use for logging messages.
        create: Whether to create a new instance if one does not exist for the current thread.
        protected: Whether the caller is protected from having its event loop closed.

    Returns:
        Caller: The `Caller` instance for the current thread.

    Raises:
        RuntimeError: If `create` is `False` and a `Caller` instance does not exist.
    """

    thread = thread or threading.current_thread()
    if not (inst := cls._instances.get(thread)):
        if not create:
            msg = f"A caller does not exist for{thread=}. Did you mean use the classmethod `Caller.get_instance()`?"
            raise RuntimeError(msg)
        inst = super().__new__(cls)
        inst._thread = thread
        inst._name = thread.name
        inst.log = log or logging.LoggerAdapter(logging.getLogger())
        inst._jobs = deque()
        inst._job_added = threading.Event()
        inst._protected = protected
        inst._queue_map = {}
        cls._instances[thread] = inst
    return inst

get_runner

get_runner(*, started: Callable[[], None] | None = None)

The preferred way to run the caller loop.

Tip

See async_kernel.caller.Caller.get_instance for a usage example.

Source code in src/async_kernel/caller.py
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
def get_runner(self, *, started: Callable[[], None] | None = None):
    """The preferred way to run the caller loop.

    !!! tip

        See [async_kernel.caller.Caller.get_instance][] for a usage example.
    """
    if self.running or self.stopped:
        raise RuntimeError

    async def run_caller_in_context() -> None:
        with contextlib.suppress(anyio.get_cancelled_exc_class()):
            async with self:
                if started:
                    started()
                await anyio.sleep_forever()

    return run_caller_in_context

stop

stop(*, force=False) -> None

Stop the caller, cancelling all pending tasks and close the thread.

If the instance is protected, this is no-op unless force is used.

Source code in src/async_kernel/caller.py
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
def stop(self, *, force=False) -> None:
    """
    Stop the caller, cancelling all pending tasks and close the thread.

    If the instance is protected, this is no-op unless force is used.
    """
    if self._protected and not force:
        return
    self._stopped = True
    for func in tuple(self._queue_map):
        self.queue_close(func)
    self._job_added.set()
    self._instances.pop(self.thread, None)
    if self in self._to_thread_pool:
        self._to_thread_pool.remove(self)
    if self.thread is not threading.current_thread():
        self._stopped_event.wait()

schedule_call

schedule_call(
    func: Callable[..., CoroutineType[Any, Any, T] | T],
    /,
    args: tuple,
    kwargs: dict,
    context: Context | None = None,
    **metadata: Any,
) -> Future[T]

Schedule func to be called inside a task running in the callers thread (thread-safe).

The methods call_soon and call_later use this method in the background, they should be used in preference to this method since they provide type hinting for the arguments.

Parameters:

  • func

    (Callable[..., CoroutineType[Any, Any, T] | T]) –

    The function to be called. If it returns a coroutine, it will be awaited and its result will be returned.

  • args

    (tuple) –

    Arguments corresponding to in the call to func.

  • kwargs

    (dict) –

    Keyword arguments to use with in the call to func.

  • context

    (Context | None, default: None ) –

    The context to use, if not provided the current context is used.

  • metadata

    (Any, default: {} ) –

    Additional metadata to store in the future.

Note

All arguments are stored in the future's metadata. When the call is done the metadata is cleared to avoid memory leaks.

Source code in src/async_kernel/caller.py
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
def schedule_call(
    self,
    func: Callable[..., CoroutineType[Any, Any, T] | T],
    /,
    args: tuple,
    kwargs: dict,
    context: contextvars.Context | None = None,
    **metadata: Any,
) -> Future[T]:
    """
    Schedule `func` to be called inside a task running in the callers thread (thread-safe).

    The methods [call_soon][async_kernel.caller.Caller.call_soon] and [call_later][async_kernel.caller.Caller.call_later]
    use this method in the background,  they should be used in preference to this method since they provide type hinting for the arguments.

    Args:
        func: The function to be called. If it returns a coroutine, it will be awaited and its result will be returned.
        args: Arguments corresponding to in the call to  `func`.
        kwargs: Keyword arguments to use with in the call to `func`.
        context: The context to use, if not provided the current context is used.
        metadata: Additional metadata to store in the future.

    !!! note

        All arguments are stored in the future's metadata. When the call is done the
        metadata is cleared to avoid memory leaks.
    """
    if self._stopped:
        raise anyio.ClosedResourceError
    fut = Future(self.thread, func=func, args=args, kwargs=kwargs, **metadata)
    fut.add_done_callback(self._on_call_done)
    self._jobs.append((context or contextvars.copy_context(), fut))
    self._job_added.set()
    return fut

call_later

call_later(
    delay: float,
    func: Callable[P, T | CoroutineType[Any, Any, T]],
    /,
    *args: args,
    **kwargs: kwargs,
) -> Future[T]

Schedule func to be called in caller's event loop copying the current context.

Parameters:

  • func

    (Callable[P, T | CoroutineType[Any, Any, T]]) –

    The function.

  • delay

    (float) –

    The minimum delay to add between submission and execution.

  • *args

    (args, default: () ) –

    Arguments to use with func.

  • **kwargs

    (kwargs, default: {} ) –

    Keyword arguments to use with func.

Info

All call arguments are packed into the Futures metadata. The future metadata is cleared when futures result is set.

Source code in src/async_kernel/caller.py
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
def call_later(
    self,
    delay: float,
    func: Callable[P, T | CoroutineType[Any, Any, T]],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> Future[T]:
    """
    Schedule func to be called in caller's event loop copying the current context.

    Args:
        func: The function.
        delay: The minimum delay to add between submission and execution.
        *args: Arguments to use with func.
        **kwargs: Keyword arguments to use with func.

    !!! info

        All call arguments are packed into the Futures metadata. The future metadata
        is cleared when futures result is set.
    """
    return self.schedule_call(func, args, kwargs, delay=delay, start_time=time.monotonic())

call_soon

call_soon(
    func: Callable[P, T | CoroutineType[Any, Any, T]], /, *args: args, **kwargs: kwargs
) -> Future[T]

Schedule func to be called in caller's event loop copying the current context.

Parameters:

  • func

    (Callable[P, T | CoroutineType[Any, Any, T]]) –

    The function.

  • *args

    (args, default: () ) –

    Arguments to use with func.

  • **kwargs

    (kwargs, default: {} ) –

    Keyword arguments to use with func.

Source code in src/async_kernel/caller.py
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
def call_soon(
    self,
    func: Callable[P, T | CoroutineType[Any, Any, T]],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> Future[T]:
    """
    Schedule func to be called in caller's event loop copying the current context.

    Args:
        func: The function.
        *args: Arguments to use with func.
        **kwargs: Keyword arguments to use with func.
    """
    return self.schedule_call(func, args, kwargs)

call_direct

call_direct(
    func: Callable[P, T | CoroutineType[Any, Any, T]], /, *args: args, **kwargs: kwargs
) -> None

Schedule func to be called in caller's event loop directly.

This method is provided to facilitate lightweight thread-safe function calls that need to be performed from within the callers event loop/taskgroup.

Parameters:

  • func

    (Callable[P, T | CoroutineType[Any, Any, T]]) –

    The function.

  • *args

    (args, default: () ) –

    Arguments to use with func.

  • **kwargs

    (kwargs, default: {} ) –

    Keyword arguments to use with func.

Warning

Use this method for lightweight calls only!

Source code in src/async_kernel/caller.py
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
def call_direct(
    self,
    func: Callable[P, T | CoroutineType[Any, Any, T]],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> None:
    """
    Schedule `func` to be called in caller's event loop directly.

    This method is provided to facilitate lightweight *thread-safe* function calls that
    need to be performed from within the callers event loop/taskgroup.

    Args:
        func: The function.
        *args: Arguments to use with func.
        **kwargs: Keyword arguments to use with func.

    ??? warning

        **Use this method for lightweight calls only!**

    """
    self._jobs.append(functools.partial(func, *args, **kwargs))
    self._job_added.set()

queue_get

queue_get(func: Callable) -> Future[Never] | None

Returns Future for func where the queue is running.

Warning

  • This future loops forever until the loop is closed or func no longer exists.
  • queue_close is the preferred means to shutdown the queue.
Source code in src/async_kernel/caller.py
714
715
716
717
718
719
720
721
722
def queue_get(self, func: Callable) -> Future[Never] | None:
    """Returns Future for `func` where the queue is running.

    !!! warning

        - This future loops forever until the  loop is closed or func no longer exists.
        - `queue_close` is the preferred means to shutdown the queue.
    """
    return self._queue_map.get(hash(func))

queue_call

queue_call(
    func: Callable[P, T | CoroutineType[Any, Any, T]], /, *args: args, **kwargs: kwargs
) -> None

Queue the execution of func in a queue unique to it and this caller (thread-safe).

The queue executor loop will stay open until one of the following occurs:

  1. The method async_kernel.caller.Caller.queue_close is called with func.
  2. If func is a method is deleted and garbage collected (using weakref.finalize).

Parameters:

  • func

    (Callable[P, T | CoroutineType[Any, Any, T]]) –

    The function.

  • *args

    (args, default: () ) –

    Arguments to use with func.

  • **kwargs

    (kwargs, default: {} ) –

    Keyword arguments to use with func.

Source code in src/async_kernel/caller.py
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
def queue_call(
    self,
    func: Callable[P, T | CoroutineType[Any, Any, T]],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> None:
    """
    Queue the execution of `func` in a queue unique to it and this caller (thread-safe).

    The queue executor loop will stay open until one of the following occurs:

    1. The method [async_kernel.caller.Caller.queue_close][] is called with `func`.
    2. If `func` is a method is deleted and garbage collected (using [weakref.finalize][]).

    Args:
        func: The function.
        *args: Arguments to use with `func`.
        **kwargs: Keyword arguments to use with `func`.
    """
    key = hash(func)
    if not (fut_ := self._queue_map.get(key)):
        queue = deque()
        event_added = threading.Event()
        with contextlib.suppress(TypeError):
            weakref.finalize(func.__self__ if inspect.ismethod(func) else func, lambda: self.queue_close(key))

        async def queue_loop(key: int, queue: deque, event_added: threading.Event) -> None:
            fut = self.current_future()
            assert fut
            try:
                while True:
                    if not queue:
                        await wait_thread_event(event_added)
                    if queue:
                        context, func_, args, kwargs = queue.popleft()
                        try:
                            result = context.run(func_, *args, **kwargs)
                            if inspect.iscoroutine(object=result):
                                await result
                        except (anyio.get_cancelled_exc_class(), Exception) as e:
                            if fut.cancelled():
                                raise
                            self.log.exception("Execution %f failed", func_, exc_info=e)
                        finally:
                            func_ = None
                    else:
                        event_added.clear()
            finally:
                self._queue_map.pop(key)

        self._queue_map[key] = fut_ = self.call_soon(queue_loop, key=key, queue=queue, event_added=event_added)
    fut_.metadata["kwargs"]["queue"].append((contextvars.copy_context(), func, args, kwargs))
    if len(fut_.metadata["kwargs"]["queue"]) == 1:
        fut_.metadata["kwargs"]["event_added"].set()

queue_close

queue_close(func: Callable | int) -> None

Close the execution queue associated with func (thread-safe).

Parameters:

  • func

    (Callable | int) –

    The queue of the function to close.

Source code in src/async_kernel/caller.py
780
781
782
783
784
785
786
787
788
789
def queue_close(self, func: Callable | int) -> None:
    """
    Close the execution queue associated with `func` (thread-safe).

    Args:
        func: The queue of the function to close.
    """
    key = func if isinstance(func, int) else hash(func)
    if fut := self._queue_map.pop(key, None):
        fut.cancel()

stop_all classmethod

stop_all(*, _stop_protected: bool = False) -> None

A classmethod to stop all un-protected callers.

Parameters:

  • _stop_protected

    (bool, default: False ) –

    A private argument to shutdown protected instances.

Source code in src/async_kernel/caller.py
791
792
793
794
795
796
797
798
799
800
@classmethod
def stop_all(cls, *, _stop_protected: bool = False) -> None:
    """
    A [classmethod][] to stop all un-protected callers.

    Args:
        _stop_protected: A private argument to shutdown protected instances.
    """
    for caller in tuple(reversed(cls._instances.values())):
        caller.stop(force=_stop_protected)

get_instance classmethod

get_instance(
    *, create: bool | NoValue = NoValue, **kwargs: Unpack[CallerStartNewOptions]
) -> Self

A classmethod that gets the caller associated to the thread using the threads name.

When called without a name MainThread will be used as the name.

Parameters:

  • create

    (bool | NoValue, default: NoValue ) –

    Create a new instance if one with the corresponding name does not already exist. When not provided it defaults to True when name is MainThread otherwise False.

kwargs: Options to use to identify or create a new instance if an instance does not already exist.

Source code in src/async_kernel/caller.py
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
@classmethod
def get_instance(cls, *, create: bool | NoValue = NoValue, **kwargs: Unpack[CallerStartNewOptions]) -> Self:  # pyright: ignore[reportInvalidTypeForm]
    """
    A [classmethod][] that gets the caller associated to the thread using the threads name.


    When called without a name `MainThread` will be used as the `name`.

    Args:
        create: Create a new instance if one with the corresponding name does not already exist.
            When not provided it defaults to `True` when `name` is `MainThread` otherwise `False`.
    kwargs:
        Options to use to identify or create a new instance if an instance does not already exist.
    """
    if "name" not in kwargs:
        kwargs["name"] = "MainThread"
    for caller in cls._instances.values():
        if caller.name == kwargs["name"]:
            return caller
    if create is True or (create is NoValue and kwargs["name"] == "MainThread"):
        return cls.start_new(**kwargs)
    msg = f"A Caller was not found for {kwargs['name']=}."
    raise RuntimeError(msg)

to_thread classmethod

to_thread(
    func: Callable[P, T | CoroutineType[Any, Any, T]], /, *args: args, **kwargs: kwargs
) -> Future[T]

A classmethod to call func in a separate thread see also to_thread_advanced.

Source code in src/async_kernel/caller.py
826
827
828
829
830
831
832
833
834
835
@classmethod
def to_thread(
    cls,
    func: Callable[P, T | CoroutineType[Any, Any, T]],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> Future[T]:
    """A [classmethod][] to call func in a separate thread see also [to_thread_advanced][async_kernel.Caller.to_thread_advanced]."""
    return cls.to_thread_advanced({"name": None}, func, *args, **kwargs)

to_thread_advanced classmethod

to_thread_advanced(
    options: CallerStartNewOptions,
    func: Callable[P, T | CoroutineType[Any, Any, T]],
    /,
    *args: args,
    **kwargs: kwargs,
) -> Future[T]

A classmethod to call func in a Caller specified by the options.

A Caller will be created if it isn't found.

Parameters:

  • options

    (CallerStartNewOptions) –

    A dict wht the name of the Caller to use and other Options to pass to async_kernel.caller.Caller.start_new should a a new instance is started 1.


    1. 'MainThread' is special name corresponding to the main thread. A RuntimeError will be raised if a Caller does not exist for the main thread. 

  • func

    (Callable[P, T | CoroutineType[Any, Any, T]]) –

    The function.

  • *args

    (args, default: () ) –

    Arguments to use with func.

  • **kwargs

    (kwargs, default: {} ) –

    Keyword arguments to use with func.

Returns:

  • Future[T]

    A future that can be awaited for the result of func.

Source code in src/async_kernel/caller.py
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
@classmethod
def to_thread_advanced(
    cls,
    options: CallerStartNewOptions,
    func: Callable[P, T | CoroutineType[Any, Any, T]],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> Future[T]:
    """
    A [classmethod][] to call func in a Caller specified by the options.

    A Caller will be created if it isn't found.

    Args:
        options: A dict wht the `name` of the Caller to use and other Options to pass to [async_kernel.caller.Caller.start_new][]
            should a a new instance is started [^notes].

            [^notes]:  'MainThread' is special name corresponding to the main thread.
                A `RuntimeError` will be raised if a Caller does not exist for the main thread.

        func: The function.
        *args: Arguments to use with func.
        **kwargs: Keyword arguments to use with func.

    Returns:
        A future that can be awaited for the  result of func.
    """
    caller = None
    if not options.get("name"):
        with contextlib.suppress(IndexError):
            caller = cls._to_thread_pool.popleft()
    if caller is None:
        caller = cls.get_instance(create=True, **options)
    fut = caller.call_soon(func, *args, **kwargs)
    if not options.get("name"):
        cls._pool_instances.add(caller)
        cls._busy_worker_threads += 1

        def _to_thread_on_done(_) -> None:
            cls._busy_worker_threads -= 1
            if not caller._stopped:
                if len(caller._to_thread_pool) + cls._busy_worker_threads < caller.MAX_IDLE_POOL_INSTANCES:
                    caller._to_thread_pool.append(caller)
                else:
                    caller.stop()

        fut.add_done_callback(_to_thread_on_done)
    return fut

start_new classmethod

start_new(
    *,
    name: str | None = None,
    log: LoggerAdapter | None = None,
    backend: Backend | NoValue = NoValue,
    protected: bool = False,
    backend_options: dict | None | NoValue = NoValue,
) -> Self

A classmethod that creates a new caller instance with the thread determined according to the provided name.

When name equals the current thread's name it will use the current thread providing the backend is 'asyncio' and there is a running event loop available.

When the name does not match the current thread name, a new thread will be started provided that the name provided is not the name does not overlap with any existing threads. When no name is provided, a new thread can always be started.

Parameters:

  • backend

    (Backend | NoValue, default: NoValue ) –

    The backend to use for the anyio event loop (anyio.run). Defaults to the backend from where it is called.

  • log

    (LoggerAdapter | None, default: None ) –

    A logging adapter to use for debug messages.

  • protected

    (bool, default: False ) –

    When True, the caller will not shutdown unless shutdown is called with force=True.

  • backend_options

    (dict | None | NoValue, default: NoValue ) –

    Backend options for anyio.run. Defaults to Kernel.backend_options.

Returns:

  • Caller ( Self ) –

    The newly created caller.

Raises:

  • RuntimeError

    If a caller already exists or when the caller can't be started.

Source code in src/async_kernel/caller.py
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
@classmethod
def start_new(
    cls,
    *,
    name: str | None = None,
    log: logging.LoggerAdapter | None = None,
    backend: Backend | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
    protected: bool = False,
    backend_options: dict | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
) -> Self:
    """
    A [classmethod][] that creates a new caller instance with the thread determined according to the provided `name`.

    When `name` equals the current thread's name it will use the current thread providing the backend is 'asyncio' and
    there is a running event loop available.

    When the name does not match the current thread name, a new thread will be started provided
    that the name provided is not the name does not overlap with any existing threads. When no
    name is provided, a new thread can always be started.

    Args:
        backend: The backend to use for the anyio event loop (anyio.run). Defaults to the backend from where it is called.
        log: A logging adapter to use for debug messages.
        protected: When True, the caller will not shutdown unless shutdown is called with `force=True`.
        backend_options: Backend options for [anyio.run][]. Defaults to `Kernel.backend_options`.

    Returns:
        Caller: The newly created caller.

    Raises:
        RuntimeError: If a caller already exists or when the caller can't be started.

    """
    if name and name in [t.name for t in cls._instances]:
        msg = f"A caller already exists with {name=}!"
        raise RuntimeError(msg)

    # Current thread
    if name is not None and name == threading.current_thread().name:
        if (backend := sniffio.current_async_library()) == Backend.asyncio:
            loop = asyncio.get_running_loop()
            caller = cls(log=log, create=True, protected=protected)
            caller._task = loop.create_task(caller.get_runner()())  # pyright: ignore[reportAttributeAccessIssue]
            return caller
        msg = f"Starting a caller for the MainThread is not supported for {backend=}"
        raise RuntimeError(msg)

    # New thread
    if name and name in [t.name for t in threading.enumerate()]:
        msg = f"A thread with {name=} already exists!"
        raise RuntimeError(msg)

    def async_kernel_caller() -> None:
        anyio.run(caller.get_runner(started=ready_event.set), backend=backend_, backend_options=backend_options)

    backend_ = Backend(backend if backend is not NoValue else sniffio.current_async_library())
    if backend_options is NoValue:
        backend_options = async_kernel.Kernel().anyio_backend_options.get(backend_)
    ready_event = threading.Event()
    thread = threading.Thread(target=async_kernel_caller, name=name or None, daemon=True)
    caller = cls(thread=thread, log=log, create=True, protected=protected)
    thread.start()
    ready_event.wait()
    return caller

current_future classmethod

current_future() -> Future[Any] | None

A classmethod that returns the current future when called from inside a function scheduled by Caller.

Source code in src/async_kernel/caller.py
952
953
954
955
@classmethod
def current_future(cls) -> Future[Any] | None:
    """A [classmethod][] that returns the current future when called from inside a function scheduled by Caller."""
    return cls._future_var.get()

all_callers classmethod

all_callers(running_only: bool = True) -> list[Caller]

A classmethod to get a list of the callers.

Parameters:

  • running_only

    (bool, default: True ) –

    Restrict the list to callers that are active (running in an async context).

Source code in src/async_kernel/caller.py
957
958
959
960
961
962
963
964
965
@classmethod
def all_callers(cls, running_only: bool = True) -> list[Caller]:
    """
    A [classmethod][] to get a list of the callers.

    Args:
        running_only: Restrict the list to callers that are active (running in an async context).
    """
    return [caller for caller in Caller._instances.values() if caller._running or not running_only]

as_completed async classmethod

as_completed(
    items: Iterable[Future[T]] | AsyncGenerator[Future[T]],
    *,
    max_concurrent: NoValue | int = NoValue,
    shield: bool = False,
) -> AsyncGenerator[Future[T], Any]

A classmethod iterator to get Futures as they complete.

Parameters:

  • items

    (Iterable[Future[T]] | AsyncGenerator[Future[T]]) –

    Either a container with existing futures or generator of Futures.

  • max_concurrent

    (NoValue | int, default: NoValue ) –

    The maximum number of concurrent futures to monitor at a time. This is useful when items is a generator utilising async_kernel.caller.Caller.to_thread. By default this will limit to Caller.MAX_IDLE_POOL_INSTANCES.

  • shield

    (bool, default: False ) –

    Shield existing items from cancellation.

Tip

  1. Pass a generator should you wish to limit the number future jobs when calling to_thread/to_task etc.
  2. Pass a set/list/tuple to ensure all get monitored at once.
Source code in src/async_kernel/caller.py
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
@classmethod
async def as_completed(
    cls,
    items: Iterable[Future[T]] | AsyncGenerator[Future[T]],
    *,
    max_concurrent: NoValue | int = NoValue,  # pyright: ignore[reportInvalidTypeForm]
    shield: bool = False,
) -> AsyncGenerator[Future[T], Any]:
    """
    A [classmethod][] iterator to get [Futures][async_kernel.caller.Future] as they complete.

    Args:
        items: Either a container with existing futures or generator of Futures.
        max_concurrent: The maximum number of concurrent futures to monitor at a time.
            This is useful when `items` is a generator utilising [async_kernel.caller.Caller.to_thread][].
            By default this will limit to `Caller.MAX_IDLE_POOL_INSTANCES`.
        shield: Shield existing items from cancellation.

    !!! tip

        1. Pass a generator should you wish to limit the number future jobs when calling to_thread/to_task etc.
        2. Pass a set/list/tuple to ensure all get monitored at once.
    """
    event_future_ready = threading.Event()
    has_result: deque[Future[T]] = deque()
    futures: set[Future[T]] = set()
    done = False
    resume: AsyncEvent | None = cast("AsyncEvent | None", None)
    current_future = cls.current_future()
    if isinstance(items, set | list | tuple):
        max_concurrent_ = 0
    else:
        max_concurrent_ = cls.MAX_IDLE_POOL_INSTANCES if max_concurrent is NoValue else int(max_concurrent)

    def _on_done(fut: Future[T]) -> None:
        has_result.append(fut)
        if not event_future_ready.is_set():
            event_future_ready.set()

    async def iter_items():
        nonlocal done, resume
        gen = items if isinstance(items, AsyncGenerator) else iter(items)
        try:
            while True:
                fut = await anext(gen) if isinstance(gen, AsyncGenerator) else next(gen)
                if fut is not current_future:
                    futures.add(fut)
                    if fut.done():
                        has_result.append(fut)
                        if not event_future_ready.is_set():
                            event_future_ready.set()
                    else:
                        fut.add_done_callback(_on_done)
                    if max_concurrent_ and len(futures) == max_concurrent_:
                        resume = AsyncEvent()
                        await resume.wait()
        except (StopAsyncIteration, StopIteration):
            return
        finally:
            done = True
            if not event_future_ready.is_set():
                event_future_ready.set()

    fut = cls().call_soon(iter_items)
    try:
        while futures or not done:
            if not has_result:
                await wait_thread_event(event_future_ready)
            if has_result:
                fut = has_result.popleft()
                futures.discard(fut)
                yield fut
                if resume:
                    resume.set()
            else:
                event_future_ready.clear()

    finally:
        fut.cancel()
        for fut in futures:
            fut.remove_done_callback(_on_done)
            if not shield:
                fut.cancel("Cancelled by as_completed")

wait async classmethod

wait(
    items: Iterable[Future[T]],
    *,
    timeout: float | None = None,
    return_when: Literal[
        "FIRST_COMPLETED", "FIRST_EXCEPTION", "ALL_COMPLETED"
    ] = "ALL_COMPLETED",
) -> tuple[set[T], set[Future[T]]]

A classmethod to wait for the futures given by items to complete.

Returns two sets of the futures: (done, pending).

Parameters:

  • items

    (Iterable[Future[T]]) –

    An iterable of futures to wait for.

  • timeout

    (float | None, default: None ) –

    The maximum time before returning.

  • return_when

    (Literal['FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED'], default: 'ALL_COMPLETED' ) –

    The same options as available for asyncio.wait.

Example

done, pending = await asyncio.wait(items)

Info

  • This does not raise a TimeoutError!
  • Futures that aren't done when the timeout occurs are returned in the second set.
Source code in src/async_kernel/caller.py
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
@classmethod
async def wait(
    cls,
    items: Iterable[Future[T]],
    *,
    timeout: float | None = None,
    return_when: Literal["FIRST_COMPLETED", "FIRST_EXCEPTION", "ALL_COMPLETED"] = "ALL_COMPLETED",
) -> tuple[set[T], set[Future[T]]]:
    """
    A [classmethod][] to wait for the futures given by items to complete.

    Returns two sets of the futures: (done, pending).

    Args:
        items: An iterable of futures to wait for.
        timeout: The maximum time before returning.
        return_when: The same options as available for [asyncio.wait][].

    !!! example

        ```python
        done, pending = await asyncio.wait(items)
        ```

    !!! info

        - This does not raise a TimeoutError!
        - Futures that aren't done when the timeout occurs are returned in the second set.
    """
    done = set()
    if pending := set(items):
        with anyio.move_on_after(timeout):
            async for fut in cls.as_completed(items, shield=True):
                pending.discard(fut)
                done.add(fut)
                if return_when == "FIRST_COMPLETED":
                    break
                if return_when == "FIRST_EXCEPTION" and (fut.cancelled() or fut.exception()):
                    break
    return done, pending

async_kernel.caller.ReentrantAsyncLock

ReentrantAsyncLock()

A Reentrant asynchronous lock compatible with Caller.

The lock is reentrant in terms of contextvars.Context.

Note

  • The lock context can be exitied in any order.
  • The context can potentially leak.
  • A 'reentrant' lock can release control to another context and then re-enter later for tasks or threads called from a locked thread maintaining the same reentrant context.

Methods:

Attributes:

  • count (int) –

    Returns the number of times the locked context has been entered.

Source code in src/async_kernel/caller.py
1113
1114
1115
def __init__(self):
    self._ctx_var: contextvars.ContextVar[int] = contextvars.ContextVar(f"Lock:{id(self)}", default=0)
    self._queue: deque[tuple[int, Future[bool]]] = deque()

count property

count: int

Returns the number of times the locked context has been entered.

acquire async

acquire() -> Self

Acquire a lock.

The internal counter increments when the lock is entered.

Source code in src/async_kernel/caller.py
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
async def acquire(self) -> Self:
    """
    Acquire a lock.

    The internal counter increments when the lock is entered.
    """
    if not self._reentrant and self.is_in_context():
        msg = "Already locked and not reentrant!"
        raise RuntimeError(msg)
    # Get the context.
    if (self._ctx_count == 0) or not self._reentrant or not (ctx := self._ctx_var.get()):
        self._ctx_count = ctx = self._ctx_count + 1
        self._ctx_var.set(ctx)
    # Check if we can lock or re-enter an active lock.
    if (not self._releasing) and ((not self.count) or (self._reentrant and self.is_in_context())):
        self._count += 1
        self._ctx_current = ctx
        return self
    # Join the queue.
    k: tuple[int, Future[bool]] = ctx, Future()
    self._queue.append(k)
    try:
        result = await k[1]
    finally:
        if k in self._queue:
            self._queue.remove(k)
    if result:
        self._ctx_current = ctx
        if self._reentrant:
            for k in tuple(self._queue):
                if k[0] == ctx:
                    self._queue.remove(k)
                    k[1].set_result(False)
                    self._count += 1
        self._releasing = False
    return self

release async

release() -> None

Decrement the internal counter.

If the current depth==1 the lock will be passed to the next queued or released if there isn't one.

Source code in src/async_kernel/caller.py
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
async def release(self) -> None:
    """
    Decrement the internal counter.

    If the current depth==1 the lock will be passed to the next queued or released if there isn't one.
    """
    if self._count == 1 and self._queue and not self._releasing:
        self._releasing = True
        self._ctx_var.set(0)
        self._queue.popleft()[1].set_result(True)
    else:
        self._count -= 1
    if self._count == 0:
        self._ctx_current = 0

is_in_context

is_in_context() -> bool

Returns True if the current contextvars.Context has the lock.

Source code in src/async_kernel/caller.py
1185
1186
1187
def is_in_context(self) -> bool:
    "Returns `True` if the current [contextvars.Context][] has the lock."
    return bool(self._count and self._ctx_current and (self._ctx_var.get() == self._ctx_current))

base async

base() -> AsyncGenerator[Self, Any]

Acquire the lock as a new contextvars.Context.

Use this to ensure exclusive access from within this contextvars.Context.

Note

Warning

Using this inside its own acquired lock will cause a deadlock.

Source code in src/async_kernel/caller.py
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
@asynccontextmanager
async def base(self) -> AsyncGenerator[Self, Any]:
    """
    Acquire the lock as a new [contextvars.Context][].

    Use this to ensure exclusive access from within this [contextvars.Context][].

    !!! note
        - This method is not useful for the mutex variant ([async_kernel.caller.AsyncLock][]) which does this by default.

    !!! warning
        Using this inside its own acquired lock will cause a deadlock.
    """
    if self._reentrant:
        self._ctx_var.set(0)
    async with self:
        yield self

async_kernel.caller.AsyncLock

AsyncLock()

Bases: ReentrantAsyncLock

A mutex asynchronous lock that is compatible with Caller.

Note

Source code in src/async_kernel/caller.py
1113
1114
1115
def __init__(self):
    self._ctx_var: contextvars.ContextVar[int] = contextvars.ContextVar(f"Lock:{id(self)}", default=0)
    self._queue: deque[tuple[int, Future[bool]]] = deque()