Caller

async_kernel.caller

Classes:

AsyncLock

AsyncLock()

Implements a mutex asynchronous lock that is compatible with async_kernel.caller.Caller.

Note

  • Attempting to acquire a 'mutex' configured lock from a context that already holds it raises a RuntimeError.

Methods:

  • acquire

    Acquire a lock.

  • is_in_context

    Returns True if the current context has the lock.

  • release

    Decrement the internal counter.

Attributes:

  • count (int) –

    Returns the number of times the locked context has been entered.

Source code in src/async_kernel/caller.py
def __init__(self):
    self._ctx_var: contextvars.ContextVar[int] = contextvars.ContextVar(f"Lock:{id(self)}", default=0)
    self._queue: deque[tuple[int, Future[Future | None]]] = deque()

count property

count: int

Returns the number of times the locked context has been entered.

acquire async

acquire() -> Self

Acquire a lock.

If the lock is reentrant the internal counter increments to share the lock.
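
The ownership check described above can be illustrated with the standard library alone. The following is a minimal sketch, not the AsyncLock implementation: `TinyLock` and its attributes are hypothetical names, it delegates waiting to `asyncio.Lock`, and its `release` is synchronous for brevity. It only shows how a `ContextVar` can tell a re-entering context apart from a new one.

```python
import asyncio
import contextvars


class TinyLock:
    """Hypothetical, simplified lock; not the AsyncLock implementation."""

    def __init__(self, *, reentrant: bool = False) -> None:
        self._reentrant = reentrant
        # Each first acquire stores a fresh id in the acquiring context.
        self._ctx_var = contextvars.ContextVar(f"TinyLock:{id(self)}", default=0)
        self._lock = asyncio.Lock()
        self._last_id = 0
        self._owner_id = 0
        self.count = 0

    def is_in_context(self) -> bool:
        # True only while the lock is held and this context carries the owner's id.
        return bool(self.count and self._ctx_var.get() == self._owner_id)

    async def acquire(self) -> "TinyLock":
        if self.is_in_context():
            if not self._reentrant:
                raise RuntimeError("Already locked and not reentrant!")
            self.count += 1  # Re-entering: only deepen the counter.
            return self
        await self._lock.acquire()  # Other contexts wait here.
        self._last_id += 1
        self._owner_id = self._last_id
        self._ctx_var.set(self._owner_id)
        self.count = 1
        return self

    def release(self) -> None:
        if not self.is_in_context():
            raise RuntimeError("The lock is not held by this context")
        self.count -= 1
        if self.count == 0:
            self._owner_id = 0
            self._lock.release()


async def demo() -> dict:
    results = {}
    shared = TinyLock(reentrant=True)
    await shared.acquire()
    await shared.acquire()  # Same context: the counter increments.
    results["depth"] = shared.count
    shared.release()
    shared.release()
    results["depth_after"] = shared.count

    mutex = TinyLock()
    await mutex.acquire()
    try:
        await mutex.acquire()  # Same context, not reentrant: refused.
    except RuntimeError as exc:
        results["error"] = str(exc)
    mutex.release()
    return results


results = asyncio.run(demo())
```

Tracking ownership by context rather than by task means that code spawned from inside the locked region inherits the owner's id through the copied context.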

Source code in src/async_kernel/caller.py
async def acquire(self) -> Self:
    """
    Acquire a lock.

    If the lock is reentrant the internal counter increments to share the lock.
    """
    if not self._reentrant and self.is_in_context():
        msg = "Already locked and not reentrant!"
        raise RuntimeError(msg)
    # Get the context.
    if not self._reentrant or not (ctx := self._ctx_var.get()):
        self._ctx_count = ctx = self._ctx_count + 1
        self._ctx_var.set(ctx)
    # Check if we can lock or re-enter an active lock.
    if (not self._releasing) and ((not self.count) or (self._reentrant and self.is_in_context())):
        self._count += 1
        self._ctx_current = ctx
        return self
    # Join the queue.
    k: tuple[int, Future[None | Future[Future[None] | None]]] = ctx, Future()
    self._queue.append(k)
    try:
        fut = await k[1]
    finally:
        if k in self._queue:
            self._queue.remove(k)
    if fut:
        self._ctx_current = ctx
        fut.set_result(None)
        if self._reentrant:
            for k in tuple(self._queue):
                if k[0] == ctx:
                    self._queue.remove(k)
                    k[1].set_result(None)
                    self._count += 1
        self._releasing = False
    return self

is_in_context

is_in_context() -> bool

Returns True if the current context has the lock.

Source code in src/async_kernel/caller.py
def is_in_context(self) -> bool:
    "Returns `True` if the current context has the lock."
    return bool(self._count and self._ctx_current and (self._ctx_var.get() == self._ctx_current))

release async

release() -> None

Decrement the internal counter.

If the current depth==1 the lock will be passed to the next queued or released if there isn't one.

Source code in src/async_kernel/caller.py
async def release(self) -> None:
    """
    Decrement the internal counter.

    If the current depth==1 the lock will be passed to the next queued or released if there isn't one.
    """
    if not self.is_in_context():
        raise InvalidStateError
    if self._count == 1 and self._queue and not self._releasing:
        self._releasing = True
        self._ctx_var.set(0)
        try:
            fut = Future()
            k = self._queue.popleft()
            k[1].set_result(fut)
            await k[1]
        except Exception:
            self._releasing = False
    else:
        self._count -= 1
    if self._count == 0:
        self._ctx_current = 0

Caller

A class to enable calling functions and coroutines between anyio event loops.

The Caller class provides a mechanism to execute functions and coroutines in a dedicated thread, leveraging AnyIO for asynchronous task management. It supports scheduling calls with delays, executing them immediately, and running them without a context. It also provides a means to manage a pool of threads for general purpose offloading of tasks.

The class maintains a registry of instances, associating each with a specific thread. It uses a task group to manage the execution of scheduled tasks and provides methods to start, stop, and query the status of the caller.

Methods:

  • __new__

    Create the Caller instance for the current thread or retrieve an existing instance

  • all_callers

    A classmethod to get a list of the callers.

  • as_completed

    A classmethod iterator to get Futures as they complete.

  • call_direct

    Schedule func to be called in caller's event loop directly.

  • call_later

    Schedule func to be called in caller's event loop copying the current context.

  • call_soon

    Schedule func to be called in caller's event loop copying the current context.

  • current_future

    A classmethod that returns the current future when called from inside a function scheduled by Caller.

  • get_instance

    A classmethod that gets an instance by name, possibly starting a new instance.

  • queue_call

    Queue the execution of func with the arguments *args in a queue unique to it (not thread-safe).

  • queue_close

    Close the execution queue associated with func (thread-safe).

  • queue_exists

    Returns True if an execution queue exists for func.

  • start_new

    Start a new thread with a new Caller open in the context of an anyio event loop.

  • stop

    Stop the caller, cancelling all pending tasks and closing the thread.

  • stop_all

    A classmethod to stop all un-protected callers.

  • to_thread

    A classmethod to call func in a separate thread. See also to_thread_by_name.

  • to_thread_by_name

    A classmethod to call func in the thread specified by name.

  • wait

    A classmethod to wait for the futures given by items to complete.

Attributes:

MAX_BUFFER_SIZE class-attribute instance-attribute

MAX_BUFFER_SIZE = 1000

The default maximum_buffer_size used in queue_call.

MAX_IDLE_POOL_INSTANCES class-attribute instance-attribute

MAX_IDLE_POOL_INSTANCES = 10

The number of pool instances to leave idle (see also to_thread).

backend instance-attribute

backend: Backend

The anyio backend the caller is running in.

log instance-attribute

protected property

protected: bool

Returns True if the caller is protected from stopping.

running property

running

Returns True when the caller is available to run requests.

stopped property

stopped: bool

Returns True if the caller is stopped.

thread instance-attribute

thread: Thread

The thread in which the caller will run.

__new__

__new__(
    *,
    thread: Thread | None = None,
    log: LoggerAdapter | None = None,
    create: bool = False,
    protected: bool = False,
) -> Self

Create the Caller instance for the current thread or retrieve an existing instance by passing the thread.

The caller provides a way to execute synchronous code in a separate thread, and to call asynchronous code from synchronous code.

Parameters:

  • thread
    (Thread | None, default: None ) –

    The thread whose Caller instance is retrieved or created. Defaults to the current thread.

  • log
    (LoggerAdapter | None, default: None ) –

    Logger to use for logging messages.

  • create
    (bool, default: False ) –

    Whether to create a new instance if one does not exist for the current thread.

  • protected
    (bool, default: False ) –

    Whether the caller is protected from having its event loop closed.

Returns:

  • Self

    The Caller instance for the current thread.

Raises:

  • RuntimeError

    If create is False and a Caller instance does not exist.
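
The one-instance-per-thread behaviour can be sketched in isolation. This is an illustrative reduction, assuming a hypothetical class `PerThread`; it mirrors only the lookup-or-create logic keyed on the thread and omits everything else the Caller sets up.

```python
import threading
from typing import Optional


class PerThread:
    """Hypothetical class that keeps one instance per thread."""

    _instances: dict = {}

    def __new__(cls, *, thread: Optional[threading.Thread] = None, create: bool = False):
        thread = thread or threading.current_thread()
        inst = cls._instances.get(thread)
        if inst is None:
            if not create:
                raise RuntimeError(f"No instance exists for thread {thread.name!r}")
            inst = super().__new__(cls)
            inst.thread = thread
            cls._instances[thread] = inst
        return inst


# The first call creates the instance; later calls in the same thread return it.
first = PerThread(create=True)
second = PerThread()
same_instance = first is second

# A different thread has no instance yet, so a plain lookup fails.
errors = []


def worker() -> None:
    try:
        PerThread()
    except RuntimeError as exc:
        errors.append(type(exc).__name__)


t = threading.Thread(target=worker, name="worker-1")
t.start()
t.join()
```

Passing `thread=` explicitly retrieves the instance that belongs to another thread without creating one for the calling thread.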

Source code in src/async_kernel/caller.py
def __new__(
    cls,
    *,
    thread: threading.Thread | None = None,
    log: logging.LoggerAdapter | None = None,
    create: bool = False,
    protected: bool = False,
) -> Self:
    """
    Create the `Caller` instance for the current thread or retrieve an existing instance
    by passing the thread.

    The caller provides a way to execute synchronous code in a separate
    thread, and to call asynchronous code from synchronous code.

    Args:
        thread:
        log: Logger to use for logging messages.
        create: Whether to create a new instance if one does not exist for the current thread.
        protected : Whether the caller is protected from having its event loop closed.

    Returns
    -------
    Caller
        The `Caller` instance for the current thread.

    Raises
    ------
    RuntimeError
        If `create` is False and a `Caller` instance does not exist.
    """

    thread = thread or threading.current_thread()
    if not (inst := cls._instances.get(thread)):
        if not create:
            msg = f"A caller is not provided for {thread=}"
            raise RuntimeError(msg)
        inst = super().__new__(cls)
        inst.backend = Backend(sniffio.current_async_library())
        inst.thread = thread
        inst.log = log or logging.LoggerAdapter(logging.getLogger())
        inst._callers = deque()
        inst._callers_added = threading.Event()
        inst._protected = protected
        inst._queue_map = weakref.WeakKeyDictionary()
        cls._instances[thread] = inst
    return inst

all_callers classmethod

all_callers(running_only: bool = True) -> list[Caller]

A classmethod to get a list of the callers.

Parameters:

  • running_only
    (bool, default: True ) –

    Restrict the list to callers that are active (running in an async context).

Source code in src/async_kernel/caller.py
@classmethod
def all_callers(cls, running_only: bool = True) -> list[Caller]:
    """
    A classmethod to get a list of the callers.

    Args:
        running_only: Restrict the list to callers that are active (running in an async context).
    """
    return [caller for caller in Caller._instances.values() if caller._running or not running_only]

as_completed async classmethod

as_completed(
    items: Iterable[Future[T]] | AsyncGenerator[Future[T]],
    *,
    max_concurrent: NoValue | int = NoValue,
    shield: bool = False,
) -> AsyncGenerator[Future[T], Any]

A classmethod iterator to get Futures as they complete.

Parameters:

  • items
    (Iterable[Future[T]] | AsyncGenerator[Future[T]]) –

    Either a container with existing futures or a generator of Futures.

  • max_concurrent
    (NoValue | int, default: NoValue ) –

    The maximum number of concurrent futures to monitor at a time. This is useful when items is a generator utilising Caller.to_thread. By default this will limit to Caller.MAX_IDLE_POOL_INSTANCES.

  • shield
    (bool, default: False ) –

    Shield existing items from cancellation.

Tip

  1. Pass a generator should you wish to limit the number of future jobs when calling to_thread/to_task etc.
  2. Pass a set/list/tuple to ensure all get monitored at once.
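
The difference between passing a generator and passing a container can be illustrated with plain asyncio. The sketch below is an analogy under stated assumptions, not Caller.as_completed: `bounded_as_completed` is a hypothetical helper that pulls the next job from a generator only when a slot is free, so at most `max_concurrent` jobs exist at once.

```python
import asyncio
import functools
from collections.abc import AsyncIterator, Awaitable, Callable, Iterable


async def bounded_as_completed(
    jobs: Iterable[Callable[[], Awaitable[str]]], max_concurrent: int
) -> AsyncIterator[str]:
    """Hypothetical helper: yield results as jobs finish, starting jobs lazily."""
    pending = set()
    iterator = iter(jobs)
    exhausted = False
    while pending or not exhausted:
        # Top up to the limit; the generator is only advanced when there is room.
        while not exhausted and len(pending) < max_concurrent:
            try:
                make_job = next(iterator)
            except StopIteration:
                exhausted = True
            else:
                pending.add(asyncio.ensure_future(make_job()))
        if not pending:
            break
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            yield task.result()


async def demo():
    running = 0
    peak = 0

    async def job(name: str, delay: float) -> str:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(delay)
        running -= 1
        return name

    specs = [("a", 0.05), ("b", 0.01), ("c", 0.02), ("d", 0.01)]
    # A generator expression: nothing is started until the helper asks for it.
    jobs = (functools.partial(job, name, delay) for name, delay in specs)
    order = [name async for name in bounded_as_completed(jobs, max_concurrent=2)]
    return order, peak


order, peak = asyncio.run(demo())
```

With a list of already-started futures there is nothing left to throttle, which is consistent with the tip that a set, list, or tuple is monitored all at once.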

Source code in src/async_kernel/caller.py
@classmethod
async def as_completed(
    cls,
    items: Iterable[Future[T]] | AsyncGenerator[Future[T]],
    *,
    max_concurrent: NoValue | int = NoValue,  # pyright: ignore[reportInvalidTypeForm]
    shield: bool = False,
) -> AsyncGenerator[Future[T], Any]:
    """
    A classmethod iterator to get [Futures][async_kernel.caller.Future] as they complete.

    Args:
        items: Either a container with existing futures or generator of Futures.
        max_concurrent: The maximum number of concurrent futures to monitor at a time.
            This is useful when `items` is a generator utilising Caller.to_thread.
            By default this will limit to `Caller.MAX_IDLE_POOL_INSTANCES`.
        shield: Shield existing items from cancellation.

    !!! tip

        1. Pass a generator should you wish to limit the number of future jobs when calling to_thread/to_task etc.
        2. Pass a set/list/tuple to ensure all get monitored at once.
    """
    event_future_ready = threading.Event()
    has_result: deque[Future[T]] = deque()
    futures: set[Future[T]] = set()
    done = False
    resume: Event | None = cast("anyio.Event | None", None)
    current_future = cls.current_future()
    if isinstance(items, set | list | tuple):
        max_concurrent_ = 0
    else:
        max_concurrent_ = cls.MAX_IDLE_POOL_INSTANCES if max_concurrent is NoValue else int(max_concurrent)

    def _on_done(fut: Future[T]) -> None:
        has_result.append(fut)
        event_future_ready.set()

    async def iter_items():
        nonlocal done, resume
        gen = items if isinstance(items, AsyncGenerator) else iter(items)
        try:
            while True:
                fut = await anext(gen) if isinstance(gen, AsyncGenerator) else next(gen)
                if fut is not current_future:
                    futures.add(fut)
                    if fut.done():
                        has_result.append(fut)
                        event_future_ready.set()
                    else:
                        fut.add_done_callback(_on_done)
                    if max_concurrent_ and len(futures) == max_concurrent_:
                        resume = anyio.Event()
                        await resume.wait()
        except (StopAsyncIteration, StopIteration):
            return
        finally:
            done = True
            event_future_ready.set()

    fut = cls().call_soon(iter_items)
    try:
        while futures or not done:
            if has_result:
                event_future_ready.clear()
                fut = has_result.popleft()
                futures.discard(fut)
                yield fut
                if resume:
                    resume.set()
            else:
                await wait_thread_event(event_future_ready)
    finally:
        fut.cancel()
        for fut in futures:
            fut.remove_done_callback(_on_done)
            if not shield:
                fut.cancel("Cancelled by as_completed")

call_direct

call_direct(func: Callable[P, Any], /, *args: args, **kwargs: kwargs) -> None

Schedule func to be called in caller's event loop directly.

The call is made without copying the context and does not use a future.

Parameters:

  • func
    (Callable[P, Any]) –

    The function (awaitables permitted, though discouraged).

  • *args
    (args, default: () ) –

    Arguments to use with func.

  • **kwargs
    (kwargs, default: {} ) –

    Keyword arguments to use with func.

Warning

Use this method for lightweight calls only.
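
The hand-off this method relies on (append a bound call to a deque, then set an event to wake the consumer) can be sketched with the standard library. The names `pending`, `wake`, `finished`, and `consumer` are hypothetical, and a plain thread stands in for the caller's event loop.

```python
import functools
import threading
from collections import deque

pending = deque()
wake = threading.Event()
finished = threading.Event()
log = []


def call_direct(func, /, *args, **kwargs) -> None:
    # Bind the arguments now; the consumer simply calls the partial later.
    pending.append(functools.partial(func, *args, **kwargs))
    wake.set()


def consumer() -> None:
    while True:
        wake.wait()
        wake.clear()
        # Drain everything queued so far, in submission order.
        while pending:
            pending.popleft()()
        if finished.is_set():
            return


worker = threading.Thread(target=consumer, name="consumer")
worker.start()
call_direct(log.append, "first")
call_direct(log.append, "second")
call_direct(finished.set)  # Shutdown is itself just another queued call.
worker.join(timeout=5)
```

Because each queued call runs inline in the consumer, a slow call delays everything behind it, which is one reason to keep such calls lightweight.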

Source code in src/async_kernel/caller.py
def call_direct(self, func: Callable[P, Any], /, *args: P.args, **kwargs: P.kwargs) -> None:
    """
    Schedule func to be called in caller's event loop directly.

    The call is made without copying the context and does not use a future.

    Args:
        func: The function (awaitables permitted, though discouraged).
        *args: Arguments to use with func.
        **kwargs: Keyword arguments to use with func.

    ??? warning

        **Use this method for lightweight calls only.**
    """
    self._callers.append(functools.partial(func, *args, **kwargs))
    self._callers_added.set()

call_later

call_later(
    delay: float, func: Callable[P, T | Awaitable[T]], /, *args: args, **kwargs: kwargs
) -> Future[T]

Schedule func to be called in caller's event loop copying the current context.

Parameters:

  • func
    (Callable[P, T | Awaitable[T]]) –

    The function (awaitables permitted, though discouraged).

  • delay
    (float) –

    The minimum delay to add between submission and execution.

  • *args
    (args, default: () ) –

    Arguments to use with func.

  • **kwargs
    (kwargs, default: {} ) –

    Keyword arguments to use with func.
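
"Copying the current context" refers to `contextvars.copy_context()`, which snapshots the context variables at submission time. The sketch below uses only the standard library; `request_id` and `read_request_id` are hypothetical names, and a plain thread stands in for the caller's thread.

```python
import contextvars
import threading

request_id = contextvars.ContextVar("request_id", default="unset")


def read_request_id() -> str:
    return request_id.get()


# Snapshot the context at "submission" time.
request_id.set("job-42")
captured = contextvars.copy_context()

# Later changes in the submitting code do not affect the snapshot.
request_id.set("changed-later")

results = {}


def worker() -> None:
    # Running inside the snapshot sees the values from submission time.
    results["with_copy"] = captured.run(read_request_id)


t = threading.Thread(target=worker)
t.start()
t.join()
results["submitter_now"] = request_id.get()
```

This is why a function scheduled with call_later or call_soon observes the context variables of the code that scheduled it rather than those of the thread it eventually runs in.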

Source code in src/async_kernel/caller.py
def call_later(
    self, delay: float, func: Callable[P, T | Awaitable[T]], /, *args: P.args, **kwargs: P.kwargs
) -> Future[T]:
    """
    Schedule func to be called in caller's event loop copying the current context.

    Args:
        func: The function (awaitables permitted, though discouraged).
        delay: The minimum delay to add between submission and execution.
        *args: Arguments to use with func.
        **kwargs: Keyword arguments to use with func.
    """
    if self._stopped:
        raise anyio.ClosedResourceError
    fut: Future[T] = Future(thread=self.thread)
    if threading.current_thread() is self.thread and (tg := self._taskgroup):
        tg.start_soon(self._wrap_call, fut, time.monotonic(), delay, func, args, kwargs)
    else:
        self._callers.append((contextvars.copy_context(), (fut, time.monotonic(), delay, func, args, kwargs)))
        self._callers_added.set()
    self._outstanding += 1
    return fut

call_soon

call_soon(
    func: Callable[P, T | Awaitable[T]], /, *args: args, **kwargs: kwargs
) -> Future[T]

Schedule func to be called in caller's event loop copying the current context.

Parameters:

  • func
    (Callable[P, T | Awaitable[T]]) –

    The function (awaitables permitted, though discouraged).

  • *args
    (args, default: () ) –

    Arguments to use with func.

  • **kwargs
    (kwargs, default: {} ) –

    Keyword arguments to use with func.

Source code in src/async_kernel/caller.py
def call_soon(self, func: Callable[P, T | Awaitable[T]], /, *args: P.args, **kwargs: P.kwargs) -> Future[T]:
    """
    Schedule func to be called in caller's event loop copying the current context.

    Args:
        func: The function (awaitables permitted, though discouraged).
        *args: Arguments to use with func.
        **kwargs: Keyword arguments to use with func.
    """
    return self.call_later(0, func, *args, **kwargs)

current_future classmethod

current_future() -> Future[Any] | None

A classmethod that returns the current future when called from inside a function scheduled by Caller.
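
A "current future" lookup of this kind is typically backed by a context variable that is set just before the scheduled function runs. The following is a minimal sketch under that assumption; `_current`, `run_with_future`, and `job` are hypothetical names, and `concurrent.futures.Future` stands in for the library's Future.

```python
import contextvars
from concurrent.futures import Future

_current = contextvars.ContextVar("current_future", default=None)


def current_future():
    return _current.get()


def run_with_future(func) -> Future:
    fut = Future()

    def runner() -> None:
        # Visible only inside this copied context.
        _current.set(fut)
        fut.set_result(func())

    contextvars.copy_context().run(runner)
    return fut


def job():
    # Inside a scheduled call, the lookup returns the future tracking this call.
    return current_future()


fut = run_with_future(job)
seen_inside = fut.result()
seen_outside = current_future()
```

Outside any scheduled call the variable keeps its default, which matches the `Future[Any] | None` return type.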

Source code in src/async_kernel/caller.py
@classmethod
def current_future(cls) -> Future[Any] | None:
    """A classmethod that returns the current future when called from inside a function scheduled by Caller."""
    return cls._future_var.get()

get_instance classmethod

get_instance(name: str | None = 'MainThread', *, create: bool = False) -> Self

A classmethod that gets an instance by name, possibly starting a new instance.

Parameters:

  • name
    (str | None, default: 'MainThread' ) –

    The name to identify the caller.

  • create
    (bool, default: False ) –

    Create a new instance if one with the corresponding name does not already exist.

Source code in src/async_kernel/caller.py
@classmethod
def get_instance(cls, name: str | None = "MainThread", *, create: bool = False) -> Self:
    """
    A classmethod that gets an instance by name, possibly starting a new instance.

    Args:
        name: The name to identify the caller.
        create: Create a new instance if one with the corresponding name does not already exist.
    """
    for thread in cls._instances:
        if thread.name == name:
            return cls._instances[thread]
    if create:
        return cls.start_new(name=name)
    msg = f"A Caller was not found for {name=}."
    raise RuntimeError(msg)

queue_call

queue_call(
    func: Callable[[*PosArgsT], Awaitable[Any]],
    /,
    *args: *PosArgsT,
    max_buffer_size: NoValue | int = NoValue,
    wait: Literal[True],
) -> CoroutineType[Any, Any, None]
queue_call(
    func: Callable[[*PosArgsT], Awaitable[Any]],
    /,
    *args: *PosArgsT,
    max_buffer_size: NoValue | int = NoValue,
    wait: Literal[False] | Any = False,
) -> None
queue_call(
    func: Callable[[*PosArgsT], Awaitable[Any]],
    /,
    *args: *PosArgsT,
    max_buffer_size: NoValue | int = NoValue,
    wait: bool = False,
) -> CoroutineType[Any, Any, None] | None

Queue the execution of func with the arguments *args in a queue unique to it (not thread-safe).

The args are added to a queue associated with the provided func. If a queue does not already exist for func, a new queue is created with the specified maximum buffer size. The arguments are then sent to the queue, and an execute_loop coroutine is started to consume the queue and execute the function with the received arguments. Exceptions during execution are caught and logged.

Parameters:

  • func
    (Callable[[*PosArgsT], Awaitable[Any]]) –

    The asynchronous function to execute.

  • *args
    (*PosArgsT, default: () ) –

    The arguments to pass to the function.

  • max_buffer_size
    (NoValue | int, default: NoValue ) –

    The maximum buffer size for the queue. If NoValue, defaults to Caller.MAX_BUFFER_SIZE.

  • wait
    (bool, default: False ) –

    Set as True to return a coroutine that will return once the request is sent. Use this to prevent experiencing exceptions if the buffer is full.

Info

The queue will stay open until one of the following occurs.

  1. It is explicitly closed with the method queue_close.
  2. All strong references to the function/method are lost.
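
The per-function queue described above can be approximated with `asyncio.Queue`. This is a simplified sketch, not the library's implementation: the source uses an anyio memory object stream and a WeakKeyDictionary, whereas this version uses a plain dict, and `consume`, `handler`, `handled`, and `errors` are hypothetical names.

```python
import asyncio


async def demo():
    queues = {}
    consumers = []
    handled = []
    errors = []

    async def consume(func, queue: asyncio.Queue) -> None:
        # One consumer per function, so its calls run strictly in order.
        while True:
            args = await queue.get()
            try:
                await func(*args)
            except Exception as exc:
                errors.append(repr(exc))  # Record the failure and keep consuming.
            finally:
                queue.task_done()

    def queue_call(func, *args, max_buffer_size: int = 1000) -> None:
        queue = queues.get(func)
        if queue is None:
            queue = queues[func] = asyncio.Queue(maxsize=max_buffer_size)
            consumers.append(asyncio.create_task(consume(func, queue)))
        # Raises asyncio.QueueFull when the buffer is full.
        queue.put_nowait(args)

    async def handler(n: int) -> None:
        if n == 2:
            raise ValueError("bad item")
        await asyncio.sleep(0)
        handled.append(n)

    for n in (1, 2, 3):
        queue_call(handler, n)
    await queues[handler].join()  # Wait until every queued call has been processed.
    for task in consumers:
        task.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    return handled, errors


handled, errors = asyncio.run(demo())
```

A failing call does not stop the loop, so later items for the same function are still executed.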

Source code in src/async_kernel/caller.py
def queue_call(
    self,
    func: Callable[[*PosArgsT], Awaitable[Any]],
    /,
    *args: *PosArgsT,
    max_buffer_size: NoValue | int = NoValue,  # pyright: ignore[reportInvalidTypeForm]
    wait: bool = False,
) -> CoroutineType[Any, Any, None] | None:
    """
    Queue the execution of `func` with the arguments `*args` in a queue unique to it (not thread-safe).

    The args are added to a queue associated with the provided `func`. If queue does not already exist for
    func, a new queue is created with a specified maximum buffer size. The arguments are then sent to the queue,
    and an `execute_loop` coroutine is started to consume the queue and execute the function with the received
    arguments.  Exceptions during execution are caught and logged.

    Args:
        func: The asynchronous function to execute.
        *args: The arguments to pass to the function.
        max_buffer_size: The maximum buffer size for the queue. If NoValue, defaults to [async_kernel.Caller.MAX_BUFFER_SIZE].
        wait: Set as True to return a coroutine that will return once the request is sent.
            Use this to prevent experiencing exceptions if the buffer is full.

    !!! info

        The queue will stay open until one of the following occurs.

        1. It is explicitly closed with the method `queue_close`.
        1. All strong references to the function/method are lost.

    """
    self._check_in_thread()
    if not (sender := self._queue_map.get(func)):
        max_buffer_size = self.MAX_BUFFER_SIZE if max_buffer_size is NoValue else max_buffer_size
        sender, queue = anyio.create_memory_object_stream[tuple[*PosArgsT]](max_buffer_size=max_buffer_size)

        async def execute_loop():
            try:
                with contextlib.suppress(anyio.get_cancelled_exc_class()):
                    async with queue as receive_stream:
                        async for args in receive_stream:
                            if func not in self._queue_map:
                                break
                            try:
                                await func(*args)
                            except Exception as e:
                                self.log.exception("Execution %s failed", func, exc_info=e)
            finally:
                self._queue_map.pop(func, None)

        self._queue_map[func] = sender
        self.call_soon(execute_loop)
    return sender.send(args) if wait else sender.send_nowait(args)

queue_close

queue_close(func: Callable) -> None

Close the execution queue associated with func (thread-safe).

Parameters:

  • func
    (Callable) –

    The queue of the function to close.

Source code in src/async_kernel/caller.py
def queue_close(self, func: Callable) -> None:
    """
    Close the execution queue associated with func (thread-safe).

    Args:
        func: The queue of the function to close.
    """
    if sender := self._queue_map.pop(func, None):
        self.call_direct(sender.close)

queue_exists

queue_exists(func: Callable) -> bool

Returns True if an execution queue exists for func.

Source code in src/async_kernel/caller.py
def queue_exists(self, func: Callable) -> bool:
    "Returns True if an execution queue exists for `func`."
    return func in self._queue_map

start_new classmethod

start_new(
    *,
    backend: Backend | NoValue = NoValue,
    log: LoggerAdapter | None = None,
    name: str | None = None,
    protected: bool = False,
    backend_options: dict | None | NoValue = NoValue,
) -> Self

Start a new thread with a new Caller open in the context of an anyio event loop.

A new thread and caller are always started, and the caller is ready to accept new jobs as soon as it is returned.

Parameters:

  • backend
    (Backend | NoValue, default: NoValue ) –

    The backend to use for the anyio event loop (anyio.run). Defaults to the backend from where it is called.

  • log
    (LoggerAdapter | None, default: None ) –

    A logging adapter to use for debug messages.

  • protected
    (bool, default: False ) –

    When True, the caller will not shut down unless stop is called with force=True.

  • backend_options
    (dict | None | NoValue, default: NoValue ) –

    Backend options for anyio.run. Defaults to Kernel.backend_options.
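
The start-up handshake (start a thread, run an event loop in it, block until it signals readiness) can be sketched with asyncio standing in for anyio. `start_loop_thread` and `add` are hypothetical names, and the real method returns a Caller rather than a loop.

```python
import asyncio
import threading


def start_loop_thread(name: str):
    """Hypothetical helper: run an asyncio loop in a new daemon thread."""
    ready = threading.Event()
    holder = {}

    def run() -> None:
        async def main() -> None:
            holder["loop"] = asyncio.get_running_loop()
            holder["stop"] = asyncio.Event()
            ready.set()  # Signal the starter that the loop is usable.
            await holder["stop"].wait()

        asyncio.run(main())

    thread = threading.Thread(target=run, name=name, daemon=True)
    thread.start()
    ready.wait()  # Do not return until the loop is running.
    return thread, holder["loop"], holder["stop"]


async def add(a: int, b: int) -> int:
    return a + b


thread, loop, stop_event = start_loop_thread("worker-loop")
# The loop accepts work immediately after the helper returns.
total = asyncio.run_coroutine_threadsafe(add(2, 3), loop).result(timeout=5)
loop.call_soon_threadsafe(stop_event.set)
thread.join(timeout=5)
```

Blocking on the ready event is what allows the returned object to be used right away without a race against the new thread's start-up.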

Source code in src/async_kernel/caller.py
@classmethod
def start_new(
    cls,
    *,
    backend: Backend | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
    log: logging.LoggerAdapter | None = None,
    name: str | None = None,
    protected: bool = False,
    backend_options: dict | None | NoValue = NoValue,  # pyright: ignore[reportInvalidTypeForm]
) -> Self:
    """
    Start a new thread with a new Caller open in the context of anyio event loop.

    A new thread and caller is always started and ready to start new jobs as soon as it is returned.

    Args:
        backend: The backend to use for the anyio event loop (anyio.run). Defaults to the backend from where it is called.
        log: A logging adapter to use for debug messages.
        protected: When True, the caller will not shutdown unless shutdown is called with `force=True`.
        backend_options: Backend options for [anyio.run][]. Defaults to `Kernel.backend_options`.
    """

    def anyio_run_caller() -> None:
        async def caller_context() -> None:
            nonlocal caller
            async with cls(log=log, create=True, protected=protected) as caller:
                ready_event.set()
                with contextlib.suppress(anyio.get_cancelled_exc_class()):
                    await anyio.sleep_forever()

        anyio.run(caller_context, backend=backend_, backend_options=backend_options)

    assert name not in [t.name for t in cls._instances], f"{name=} already exists!"
    backend_ = Backend(backend if backend is not NoValue else sniffio.current_async_library())
    if backend_options is NoValue:
        backend_options = async_kernel.Kernel().anyio_backend_options.get(backend_)
    caller = cast("Self", object)
    ready_event = threading.Event()
    thread = threading.Thread(target=anyio_run_caller, name=name, daemon=True)
    thread.start()
    ready_event.wait()
    assert isinstance(caller, cls)
    return caller

stop

stop(*, force=False) -> None

Stop the caller, cancelling all pending tasks and closing the thread.

If the instance is protected, this is a no-op unless force=True is passed.

Source code in src/async_kernel/caller.py
def stop(self, *, force=False) -> None:
    """
    Stop the caller, cancelling all pending tasks and close the thread.

    If the instance is protected, this is no-op unless force is used.
    """
    if self._protected and not force:
        return
    self._stopped = True
    for sender in self._queue_map.values():
        sender.close()
    self._queue_map.clear()
    self._callers_added.set()
    self._instances.pop(self.thread, None)
    if self in self._to_thread_pool:
        self._to_thread_pool.remove(self)
    if self.thread is not threading.current_thread():
        self._stopped_event.wait()

stop_all classmethod

stop_all(*, _stop_protected: bool = False) -> None

A classmethod to stop all un-protected callers.

Parameters:

  • _stop_protected
    (bool, default: False ) –

    A private argument to shutdown protected instances.

Source code in src/async_kernel/caller.py
@classmethod
def stop_all(cls, *, _stop_protected: bool = False) -> None:
    """
    A classmethod to stop all un-protected callers.

    Args:
        _stop_protected: A private argument to shutdown protected instances.
    """
    for caller in tuple(reversed(cls._instances.values())):
        caller.stop(force=_stop_protected)

to_thread classmethod

to_thread(
    func: Callable[P, T | Awaitable[T]], /, *args: args, **kwargs: kwargs
) -> Future[T]

A classmethod to call func in a separate thread. See also to_thread_by_name.

Source code in src/async_kernel/caller.py
@classmethod
def to_thread(cls, func: Callable[P, T | Awaitable[T]], /, *args: P.args, **kwargs: P.kwargs) -> Future[T]:
    """A classmethod to call func in a separate thread see also [to_thread_by_name][async_kernel.Caller.to_thread_by_name]."""
    return cls.to_thread_by_name(None, func, *args, **kwargs)

to_thread_by_name classmethod

to_thread_by_name(
    name: str | None,
    func: Callable[P, T | Awaitable[T]],
    /,
    *args: args,
    **kwargs: kwargs,
) -> Future[T]

A classmethod to call func in the thread specified by name.

Parameters:

  • name
    (str | None) –

    The name of the Caller. A new Caller is created if an instance corresponding to name does not exist (see note 1).

    1. 'MainThread' is a special name corresponding to the main thread. A RuntimeError will be raised if a Caller does not exist for the main thread.

  • func
    (Callable[P, T | Awaitable[T]]) –

    The function to call. If it returns an awaitable, the awaitable will be awaited. Passing a coroutine as func is discouraged, but it will be awaited.

  • *args
    (args, default: () ) –

    Arguments to use with func.

  • **kwargs
    (kwargs, default: {} ) –

    Keyword arguments to use with func.

Returns:

  • Future[T]

    A future that can be awaited for the result of func.
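
For readers more familiar with the standard library, `asyncio.to_thread` offers a rough analogy: a blocking function is run in another thread and its result is awaited. The sketch below is only that analogy; it does not use Caller, does not select a thread by name, and `blocking_job` is a hypothetical function.

```python
import asyncio
import threading


def blocking_job(label: str) -> tuple:
    # Report which thread actually executed the function.
    return label, threading.current_thread().name


async def demo():
    caller_thread = threading.current_thread().name
    label, worker_thread = await asyncio.to_thread(blocking_job, "job")
    return label, caller_thread, worker_thread


label, caller_thread, worker_thread = asyncio.run(demo())
```

Unlike this analogy, the classmethods documented here return a Future immediately, so the caller decides when (or whether) to await the result.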

Source code in src/async_kernel/caller.py
@classmethod
def to_thread_by_name(
    cls, name: str | None, func: Callable[P, T | Awaitable[T]], /, *args: P.args, **kwargs: P.kwargs
) -> Future[T]:
    """
    A classmethod to call func in the thread specified by name.

    Args:
        name: The name of the `Caller`. A new `Caller` is created if an instance corresponding to name does not exist [^notes].

            [^notes]:  'MainThread' is special name corresponding to the main thread.
                A `RuntimeError` will be raised if a Caller does not exist for the main thread.

        func: The function to call. If it returns an awaitable, the awaitable will be awaited.
            Passing a coroutine as `func` discourage, but will be awaited.

        *args: Arguments to use with func.
        **kwargs: Keyword arguments to use with func.

    Returns:
        A future that can be awaited for the result of func.
    """
    caller = (
        cls._to_thread_pool.popleft()
        if not name and cls._to_thread_pool
        else cls.get_instance(name=name, create=True)
    )
    fut = caller.call_soon(func, *args, **kwargs)
    if not name:
        cls._pool_instances.add(caller)
        fut.add_done_callback(caller._to_thread_on_done)
    return fut
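
The dispatch logic above (reuse an idle pooled worker when no name is given, otherwise look up or create a named worker) can be sketched with the standard library alone. This is a simplified analogy and not async_kernel's implementation: `WorkerRegistry` and its attributes are hypothetical names, and each worker here is a single-thread `ThreadPoolExecutor` rather than a `Caller` running an event loop.

```python
import threading
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor


class WorkerRegistry:
    """Hypothetical stand-in for Caller's named-instance and pool bookkeeping."""

    def __init__(self) -> None:
        self._named: dict[str, ThreadPoolExecutor] = {}
        self._idle: deque[ThreadPoolExecutor] = deque()
        self._lock = threading.Lock()

    def to_thread_by_name(self, name, func, /, *args, **kwargs) -> Future:
        with self._lock:
            if name:
                # Named worker: reuse the existing one or create it on first use.
                worker = self._named.get(name)
                if worker is None:
                    worker = self._named[name] = ThreadPoolExecutor(1, thread_name_prefix=name)
            elif self._idle:
                # Anonymous call: take an idle worker from the pool.
                worker = self._idle.popleft()
            else:
                worker = ThreadPoolExecutor(1, thread_name_prefix="pool")
        fut = worker.submit(func, *args, **kwargs)
        if not name:
            # Return the anonymous worker to the pool once the call finishes.
            fut.add_done_callback(lambda _: self._release(worker))
        return fut

    def _release(self, worker: ThreadPoolExecutor) -> None:
        with self._lock:
            self._idle.append(worker)

    def to_thread(self, func, /, *args, **kwargs) -> Future:
        return self.to_thread_by_name(None, func, *args, **kwargs)


registry = WorkerRegistry()
named = registry.to_thread_by_name("io", threading.current_thread).result()
anonymous = registry.to_thread(lambda: 6 * 7).result()
```

As in the listing above, only anonymous calls register a done callback, so named workers stay dedicated to their name while pooled workers are recycled.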

wait async classmethod

wait(
    items: Iterable[Future[T]],
    *,
    timeout: float | None = None,
    return_when: Literal[
        "FIRST_COMPLETED", "FIRST_EXCEPTION", "ALL_COMPLETED"
    ] = "ALL_COMPLETED",
) -> tuple[set[Future[T]], set[Future[T]]]

A classmethod to wait for the futures given by items to complete.

Returns two sets of the futures: (done, pending).

Example

done, pending = await Caller.wait(items)

Info

  • This does not raise a TimeoutError!
  • Futures that aren't done when the timeout occurs are returned in the second set.

Source code in src/async_kernel/caller.py
@classmethod
async def wait(
    cls,
    items: Iterable[Future[T]],
    *,
    timeout: float | None = None,
    return_when: Literal["FIRST_COMPLETED", "FIRST_EXCEPTION", "ALL_COMPLETED"] = "ALL_COMPLETED",
) -> tuple[set[Future[T]], set[Future[T]]]:
    """
    A classmethod to wait for the futures given by items to complete.

    Returns two sets of the futures: (done, pending).

    !!! example

        ```python
        done, pending = await Caller.wait(items)
        ```

    !!! info

        - This does not raise a TimeoutError!
        - Futures that aren't done when the timeout occurs are returned in the second set.
    """
    done = set()
    if pending := set(items):
        with anyio.move_on_after(timeout):
            async for fut in cls.as_completed(items, shield=True):
                pending.discard(fut)
                done.add(fut)
                if return_when == "FIRST_COMPLETED":
                    break
                if return_when == "FIRST_EXCEPTION" and (fut.cancelled() or fut.exception()):
                    break
    return done, pending
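
The `(done, pending)` contract described above has the same shape as the standard library's `asyncio.wait`, so the timeout behaviour can be illustrated without async_kernel. The sketch below uses `asyncio.wait` purely as a stand-in; the helper `sleeper` and the delays are illustrative assumptions.

```python
import asyncio


async def sleeper(delay: float) -> float:
    await asyncio.sleep(delay)
    return delay


async def main() -> tuple[int, int]:
    fast = asyncio.create_task(sleeper(0.01))
    slow = asyncio.create_task(sleeper(10))
    # No TimeoutError is raised: unfinished items come back in `pending`.
    done, pending = await asyncio.wait({fast, slow}, timeout=0.5)
    assert fast in done and slow in pending
    slow.cancel()  # Clean up the task that is still running.
    return len(done), len(pending)


counts = asyncio.run(main())
```

Callers that need a hard failure on timeout have to check `pending` themselves, since the timeout only stops the wait.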

Future

Future(thread: Thread | None = None)

A class representing a future result modelled on asyncio.Future.

This class provides an anyio compatible Future primitive. It is designed to work with Caller to enable thread-safe calling, setting and awaiting execution results.

Methods:

  • add_done_callback

    Add a callback for when the Future is done (not thread-safe).

  • cancel

    Cancel the Future and schedule callbacks (thread-safe using Caller).

  • cancelled

    Return True if the Future is cancelled.

  • done

    Returns True if the Future is done.

  • exception

    Return the exception that was set on the Future.

  • get_caller

    The Caller that corresponds to the Future's thread.

  • remove_done_callback

    Remove all instances of a callback from the callbacks list.

  • result

    Return the result of the Future.

  • set_cancel_scope

    Provide a cancel scope for cancellation.

  • set_exception

    Set the exception (thread-safe using Caller).

  • set_result

    Set the result (thread-safe using Caller).

  • wait

    Wait for future to be done (thread-safe) returning the result if specified.

  • wait_sync

    Synchronously wait for future to be done (thread-safe) returning the result if specified.

Attributes:

  • thread (Thread) –

    The thread in which the result is targeted to run.

Source code in src/async_kernel/caller.py
def __init__(self, thread: threading.Thread | None = None) -> None:
    self._event_done = threading.Event()
    self._exception = None
    self._anyio_event_done = None
    self.thread = thread or threading.current_thread()
    self._done_callbacks = []
    self._cancelled = False
    self._cancel_scope: anyio.CancelScope | None = None
    self._setting_value = False

thread instance-attribute

thread: Thread = thread or current_thread()

The thread in which the result is targeted to run.

add_done_callback

add_done_callback(fn: Callable[[Self], object]) -> None

Add a callback for when the Future is done (not thread-safe).

If the Future is already done it will be scheduled for calling.

The result of the Future is always set, and done callbacks are always called, in the Future's thread. Callbacks are called in the reverse order in which they were added in the owning thread.

Source code in src/async_kernel/caller.py
def add_done_callback(self, fn: Callable[[Self], object]) -> None:
    """
    Add a callback for when the Future is done (not thread-safe).

    If the Future is already done it will be scheduled for calling.

    The result of the Future is always set, and done callbacks are always called, in the Future's thread.
    Callbacks are called in the reverse order in which they were added in the owning thread.
    """
    if not self.done():
        self._done_callbacks.append(fn)
    else:
        self.get_caller().call_direct(fn, self)
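
The two branches above (store the callback while the Future is pending, otherwise call it straight away) follow a pattern that the standard library's `concurrent.futures.Future` also uses. The sketch below uses that class as a stand-in, so it says nothing about async_kernel's threading or callback ordering; the `calls` list is an illustrative assumption.

```python
from concurrent.futures import Future

calls = []
fut = Future()

# Registered while pending: stored and invoked when the result is set.
fut.add_done_callback(lambda f: calls.append(f"before:{f.result()}"))
fut.set_result(1)

# Registered after completion: invoked without waiting for anything else.
fut.add_done_callback(lambda f: calls.append(f"after:{f.result()}"))
```

Either way the callback receives the Future itself, so it can inspect the result, the exception, or the cancelled state.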

cancel

cancel(msg: str | None = None) -> bool

Cancel the Future and schedule callbacks (thread-safe using Caller).

Parameters:

  • msg
    (str | None, default: None ) –

    The message to use when raising a FutureCancelledError.

Returns True if the Future has been cancelled.

Source code in src/async_kernel/caller.py
def cancel(self, msg: str | None = None) -> bool:
    """
    Cancel the Future and schedule callbacks (thread-safe using Caller).

    Args:
        msg: The message to use when raising a FutureCancelledError.

    Returns True if the Future has been cancelled.
    """
    if not self.done():
        if msg and isinstance(self._cancelled, str):
            msg = f"{self._cancelled}\n{msg}"
        self._cancelled = msg or self._cancelled or True
        if scope := self._cancel_scope:
            if threading.current_thread() is self.thread:
                scope.cancel()
            else:
                Caller(thread=self.thread).call_direct(self.cancel)
    return self.cancelled()
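
The message handling in `cancel` means that repeated calls with a message accumulate, while a call without a message keeps whatever was recorded earlier. The standalone function below isolates just that expression for illustration; `merge_cancel_message` is a hypothetical helper and not part of async_kernel.

```python
from __future__ import annotations


def merge_cancel_message(current: bool | str, msg: str | None) -> bool | str:
    """Mirror how `cancel` combines a new message with the stored state."""
    if msg and isinstance(current, str):
        # A message is already stored: append the new one on its own line.
        msg = f"{current}\n{msg}"
    return msg or current or True


state: bool | str = False
state = merge_cancel_message(state, None)  # No message: stored as True.
first = state
state = merge_cancel_message(state, "timeout")  # First message replaces True.
state = merge_cancel_message(state, "shutdown")  # Later messages are appended.
```

Because the stored value is always truthy after the first call, `cancelled()` can simply return `bool(self._cancelled)`.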

cancelled

cancelled() -> bool

Return True if the Future is cancelled.

Source code in src/async_kernel/caller.py
def cancelled(self) -> bool:
    """Return True if the Future is cancelled."""
    return bool(self._cancelled)

done

done() -> bool

Returns True if the Future is done.

Done means that either a result or an exception is available.

Source code in src/async_kernel/caller.py
def done(self) -> bool:
    """
    Returns True if the Future is done.

    Done means that either a result or an exception is available."""
    return self._event_done.is_set()

exception

exception() -> BaseException | None

Return the exception that was set on the Future.

If the Future has been cancelled, this method raises a FutureCancelledError exception.

If the Future isn't done yet, this method raises an InvalidStateError exception.

Source code in src/async_kernel/caller.py
def exception(self) -> BaseException | None:
    """
    Return the exception that was set on the Future.

    If the Future has been cancelled, this method raises a [FutureCancelledError][async_kernel.caller.FutureCancelledError] exception.

    If the Future isn't done yet, this method raises an [InvalidStateError][async_kernel.caller.InvalidStateError] exception.
    """
    if self._cancelled:
        raise self._make_cancelled_error()
    if not self.done():
        raise InvalidStateError
    return self._exception

get_caller

get_caller() -> Caller

The Caller that corresponds to the Future's thread.

Source code in src/async_kernel/caller.py
def get_caller(self) -> Caller:
    "The Caller that corresponds to the Future's thread."
    return Caller(thread=self.thread)

remove_done_callback

remove_done_callback(fn: Callable[[Self], object]) -> int

Remove all instances of a callback from the callbacks list.

Returns the number of callbacks removed.

Source code in src/async_kernel/caller.py
def remove_done_callback(self, fn: Callable[[Self], object], /) -> int:
    """
    Remove all instances of a callback from the callbacks list.

    Returns the number of callbacks removed.
    """
    n = 0
    while fn in self._done_callbacks:
        n += 1
        self._done_callbacks.remove(fn)
    return n

result

result() -> T

Return the result of the Future.

If the Future has been cancelled, this method raises a FutureCancelledError exception.

If the Future isn't done yet, this method raises an InvalidStateError exception.

Source code in src/async_kernel/caller.py
def result(self) -> T:
    """
    Return the result of the Future.

    If the Future has been cancelled, this method raises a [FutureCancelledError][async_kernel.caller.FutureCancelledError] exception.

    If the Future isn't done yet, this method raises an [InvalidStateError][async_kernel.caller.InvalidStateError] exception.
    """
    if not self.cancelled() and not self.done():
        raise InvalidStateError
    if e := self.exception():
        raise e
    return self._result
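
Since this class is modelled on `asyncio.Future`, the three outcomes of `result()` can be tried out against the standard library class. This is an analogy only: `asyncio.Future` raises `asyncio.CancelledError` and `asyncio.InvalidStateError` where this class raises `FutureCancelledError` and its own `InvalidStateError`, and the `seen` list is an illustrative assumption.

```python
import asyncio


async def main() -> list[str]:
    loop = asyncio.get_running_loop()
    seen: list[str] = []

    pending = loop.create_future()
    try:
        pending.result()
    except asyncio.InvalidStateError:
        seen.append("not done: InvalidStateError")

    failed = loop.create_future()
    failed.set_exception(ValueError("boom"))
    try:
        failed.result()
    except ValueError:
        seen.append("exception set: re-raised by result()")

    cancelled = loop.create_future()
    cancelled.cancel()
    try:
        cancelled.result()
    except asyncio.CancelledError:
        seen.append("cancelled: CancelledError")

    pending.cancel()  # Tidy up the future that was never completed.
    return seen


seen = asyncio.run(main())
```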

set_cancel_scope

set_cancel_scope(scope: CancelScope) -> None

Provide a cancel scope for cancellation.

Source code in src/async_kernel/caller.py
def set_cancel_scope(self, scope: anyio.CancelScope) -> None:
    "Provide a cancel scope for cancellation."
    if self._cancelled or self._cancel_scope:
        raise InvalidStateError
    self._cancel_scope = scope

set_exception

set_exception(exception: BaseException) -> None

Set the exception (thread-safe using Caller).

Source code in src/async_kernel/caller.py
def set_exception(self, exception: BaseException) -> None:
    "Set the exception (thread-safe using Caller)."
    self._set_value("exception", exception)

set_result

set_result(value: T) -> None

Set the result (thread-safe using Caller).

Source code in src/async_kernel/caller.py
def set_result(self, value: T) -> None:
    "Set the result (thread-safe using Caller)."
    self._set_value("result", value)

wait async

wait(
    *,
    timeout: float | None = ...,
    shield: bool = False | ...,
    result: Literal[True] = True,
) -> T
wait(*, timeout: float | None = ..., shield: bool = ..., result: Literal[False]) -> None
wait(
    *, timeout: float | None = None, shield: bool = False, result: bool = True
) -> T | None

Wait for future to be done (thread-safe) returning the result if specified.

Parameters:

  • timeout
    (float | None, default: None ) –

    Timeout in seconds.

  • shield
    (bool, default: False ) –

    Shield the future from cancellation.

  • result
    (bool, default: True ) –

    Whether the result should be returned.

Source code in src/async_kernel/caller.py
async def wait(self, *, timeout: float | None = None, shield: bool = False, result: bool = True) -> T | None:
    """
    Wait for future to be done (thread-safe) returning the result if specified.

    Args:
        timeout: Timeout in seconds.
        shield: Shield the future from cancellation.
        result: Whether the result should be returned.
    """
    try:
        if not self.done():
            with anyio.fail_after(timeout):
                if threading.current_thread() is self.thread:
                    if not self._anyio_event_done:
                        self._anyio_event_done = anyio.Event()
                    await self._anyio_event_done.wait()
                else:
                    await wait_thread_event(self._event_done)
        return self.result() if result else None
    finally:
        if not self.done() and not shield:
            self.cancel("Cancelled with waiter cancellation.")

wait_sync

wait_sync(
    *,
    timeout: float | None = ...,
    shield: bool = False | ...,
    result: Literal[True] = True,
) -> T
wait_sync(
    *, timeout: float | None = ..., shield: bool = ..., result: Literal[False]
) -> None
wait_sync(
    *, timeout: float | None = None, shield: bool = False, result: bool = True
) -> T | None

Synchronously wait for future to be done (thread-safe) returning the result if specified.

Parameters:

  • timeout
    (float | None, default: None ) –

    Timeout in seconds.

  • shield
    (bool, default: False ) –

    Shield cancellation.

  • result
    (bool, default: True ) –

    Whether the result should be returned.

Source code in src/async_kernel/caller.py
def wait_sync(self, *, timeout: float | None = None, shield: bool = False, result: bool = True) -> T | None:
    """
    Synchronously wait for future to be done (thread-safe) returning the result if specified.

    Args:
        timeout: Timeout in seconds.
        shield: Shield cancellation.
        result: Whether the result should be returned.
    """
    if self.thread in {threading.current_thread(), threading.main_thread()}:
        raise RuntimeError
    self._event_done.wait(timeout)
    if not self.done():
        if not shield:
            self.cancel("timeout from wait_sync")
        raise TimeoutError
    return self.result() if result else None
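
The blocking wait above rests on `threading.Event.wait`, which returns `False` when the timeout elapses before the event is set. A minimal standard-library sketch of that pattern follows; `wait_for` is a hypothetical helper, and the timer stands in for a worker thread that completes the Future. The guard at the top of `wait_sync` presumably exists because blocking the Future's own thread would prevent the result from ever being set there.

```python
from __future__ import annotations

import threading


def wait_for(event: threading.Event, timeout: float | None = None) -> None:
    """Block the calling thread until `event` is set, or raise TimeoutError."""
    # Event.wait returns False when the timeout elapses first.
    if not event.wait(timeout):
        raise TimeoutError


done = threading.Event()
timer = threading.Timer(0.05, done.set)  # Simulates a worker finishing later.
timer.start()
wait_for(done, timeout=5)  # Returns once the timer has fired.

never = threading.Event()
try:
    wait_for(never, timeout=0.01)
    timed_out = False
except TimeoutError:
    timed_out = True
```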

FutureCancelledError

Used to indicate a Future is cancelled.

InvalidStateError

An invalid state of a Future.

ReentrantAsyncLock

ReentrantAsyncLock()

Implements a reentrant asynchronous lock compatible with async_kernel.caller.Caller.

Example

# Inside a coroutine running in a thread where an async_kernel.caller.Caller instance is running.

lock = ReentrantAsyncLock()  # a reentrant lock
async with lock:
    async with lock:
        Caller().to_thread(...)  # The lock is shared with the thread.

Note

  • The lock context can be exited in any order.
  • A 'reentrant' lock can release control to another context and re-enter later. Tasks or threads called from a locked thread keep the same reentrant context.
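
The idea of a reentrant context that is inherited by child tasks can be sketched with `contextvars` and `asyncio.Lock` from the standard library. This is a simplified illustration under stated assumptions, not the class documented here: `SketchReentrantLock` is a hypothetical name, it has no hand-off queue, and it does not share the lock across threads.

```python
import asyncio
import contextvars


class SketchReentrantLock:
    """Hypothetical reentrant async lock keyed on a context variable."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._depth: contextvars.ContextVar[int] = contextvars.ContextVar("depth", default=0)

    async def __aenter__(self):
        depth = self._depth.get()
        if depth == 0:
            # Outermost entry in this context: take the real lock.
            await self._lock.acquire()
        self._depth.set(depth + 1)
        return self

    async def __aexit__(self, *exc_info) -> None:
        depth = self._depth.get() - 1
        self._depth.set(depth)
        if depth == 0:
            # Last exit in this context: let other contexts in.
            self._lock.release()


async def main() -> list[str]:
    lock = SketchReentrantLock()
    log: list[str] = []

    async def child() -> None:
        # The task copied its parent's context, so it re-enters without blocking.
        async with lock:
            log.append("child")

    async with lock:
        async with lock:  # Nested entry by the same context.
            log.append("nested")
            await asyncio.create_task(child())
    return log


log = asyncio.run(main())
```

A task created outside the locked block starts with a depth of zero and therefore has to wait for the underlying lock like any other contender.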

Methods:

  • acquire

    Acquire a lock.

  • is_in_context

    Returns True if the current context has the lock.

  • release

    Decrement the internal counter.

Attributes:

  • count (int) –

    Returns the number of times the locked context has been entered.

Source code in src/async_kernel/caller.py
def __init__(self):
    self._ctx_var: contextvars.ContextVar[int] = contextvars.ContextVar(f"Lock:{id(self)}", default=0)
    self._queue: deque[tuple[int, Future[Future | None]]] = deque()

count property

count: int

Returns the number of times the locked context has been entered.

acquire async

acquire() -> Self

Acquire a lock.

If the lock is reentrant the internal counter increments to share the lock.

Source code in src/async_kernel/caller.py
async def acquire(self) -> Self:
    """
    Acquire a lock.

    If the lock is reentrant the internal counter increments to share the lock.
    """
    if not self._reentrant and self.is_in_context():
        msg = "Already locked and not reentrant!"
        raise RuntimeError(msg)
    # Get the context.
    if not self._reentrant or not (ctx := self._ctx_var.get()):
        self._ctx_count = ctx = self._ctx_count + 1
        self._ctx_var.set(ctx)
    # Check if we can lock or re-enter an active lock.
    if (not self._releasing) and ((not self.count) or (self._reentrant and self.is_in_context())):
        self._count += 1
        self._ctx_current = ctx
        return self
    # Join the queue.
    k: tuple[int, Future[None | Future[Future[None] | None]]] = ctx, Future()
    self._queue.append(k)
    try:
        fut = await k[1]
    finally:
        if k in self._queue:
            self._queue.remove(k)
    if fut:
        self._ctx_current = ctx
        fut.set_result(None)
        if self._reentrant:
            for k in tuple(self._queue):
                if k[0] == ctx:
                    self._queue.remove(k)
                    k[1].set_result(None)
                    self._count += 1
        self._releasing = False
    return self

is_in_context

is_in_context() -> bool

Returns True if the current context has the lock.

Source code in src/async_kernel/caller.py
def is_in_context(self) -> bool:
    "Returns `True` if the current context has the lock."
    return bool(self._count and self._ctx_current and (self._ctx_var.get() == self._ctx_current))

release async

release() -> None

Decrement the internal counter.

If the current depth is 1, the lock is passed to the next queued waiter, or released if the queue is empty.

Source code in src/async_kernel/caller.py
async def release(self) -> None:
    """
    Decrement the internal counter.

    If the current depth is 1, the lock is passed to the next queued waiter, or released if the queue is empty.
    """
    if not self.is_in_context():
        raise InvalidStateError
    if self._count == 1 and self._queue and not self._releasing:
        self._releasing = True
        self._ctx_var.set(0)
        try:
            fut = Future()
            k = self._queue.popleft()
            k[1].set_result(fut)
            await k[1]
        except Exception:
            self._releasing = False
    else:
        self._count -= 1
    if self._count == 0:
        self._ctx_current = 0