Caller
async_kernel.caller ¶
Classes:
-
AsyncLock
–Implements a mutex asynchronous lock that is compatible with async_kernel.caller.Caller.
-
Caller
–A class to enable calling functions and coroutines between anyio event loops.
-
Future
–A class representing a future result modelled on asyncio.Future.
-
FutureCancelledError
–Used to indicate a
Future
is cancelled. -
InvalidStateError
–An invalid state of a Future.
-
ReentrantAsyncLock
–Implements a Reentrant asynchronous lock compatible with async_kernel.caller.Caller.
AsyncLock ¶
AsyncLock()
Implements a mutex asynchronous lock that is compatible with async_kernel.caller.Caller.
Note
- Attempting to lock a 'mutex' configured lock that is locked will raise a RuntimeError.
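The note above can be illustrated with a standard-library sketch. `SketchMutex` is a hypothetical stand-in built on `asyncio.Lock`, not the async_kernel implementation. It assumes the note refers to re-acquiring from the context that already holds the lock, and it treats "context" as the current asyncio task, which is a simplification.

```python
import asyncio


class SketchMutex:
    """Hypothetical non-reentrant async lock; NOT the async_kernel implementation."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._owner = None  # the task currently holding the lock

    async def acquire(self):
        task = asyncio.current_task()
        if self._owner is task:
            # Waiting on a lock we already hold would deadlock, so fail loudly.
            raise RuntimeError("lock is already held by the current task")
        await self._lock.acquire()
        self._owner = task
        return self

    async def release(self):
        self._owner = None
        self._lock.release()

    def is_in_context(self):
        return self._owner is asyncio.current_task()


async def demo():
    lock = SketchMutex()
    await lock.acquire()
    try:
        await lock.acquire()  # second acquire from the same task
    except RuntimeError:
        raised = True
    else:
        raised = False
    held = lock.is_in_context()
    await lock.release()
    return raised, held, lock.is_in_context()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```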
Methods:
-
acquire
–Acquire a lock.
-
is_in_context
–Returns
True
if the current context has the lock. -
release
–Decrement the internal counter.
Attributes:
Source code in src/async_kernel/caller.py
acquire
async
¶
acquire() -> Self
Acquire a lock.
If the lock is reentrant, the internal counter is incremented to share the lock.
Source code in src/async_kernel/caller.py
is_in_context ¶
is_in_context() -> bool
Returns True
if the current context has the lock.
Source code in src/async_kernel/caller.py
release
async
¶
release() -> None
Decrement the internal counter.
If the current depth is 1, the lock is passed to the next queued waiter, or released if there is none.
Source code in src/async_kernel/caller.py
Caller ¶
A class to enable calling functions and coroutines between anyio event loops.
The Caller
class provides a mechanism to execute functions and coroutines
in a dedicated thread, leveraging AnyIO for asynchronous task management.
It supports scheduling calls with delays, executing them immediately,
and running them without a context. It also provides a means to manage
a pool of threads for general purpose offloading of tasks.
The class maintains a registry of instances, associating each with a specific thread. It uses a task group to manage the execution of scheduled tasks and provides methods to start, stop, and query the status of the caller.
Methods:
-
__new__
–Create the
Caller
instance for the current thread or retrieve an existing instance -
all_callers
–A classmethod to get a list of the callers.
-
as_completed
–A classmethod iterator to get Futures as they complete.
-
call_direct
–Schedule func to be called in caller's event loop directly.
-
call_later
–Schedule func to be called in caller's event loop copying the current context.
-
call_soon
–Schedule func to be called in caller's event loop copying the current context.
-
current_future
–A classmethod that returns the current future when called from inside a function scheduled by Caller.
-
get_instance
–A classmethod that gets an instance by name, possibly starting a new instance.
-
queue_call
–Queue the execution of
func
with the arguments *args
in a queue unique to it (not thread-safe). -
queue_close
–Close the execution queue associated with func (thread-safe).
-
queue_exists
–Returns True if an execution queue exists for
func
. -
start_new
–Start a new thread with a new Caller open in the context of an anyio event loop.
-
stop
–Stop the caller, cancelling all pending tasks and closing the thread.
-
stop_all
–A classmethod to stop all un-protected callers.
-
to_thread
–A classmethod to call func in a separate thread; see also to_thread_by_name.
-
to_thread_by_name
–A classmethod to call func in the thread specified by name.
-
wait
–A classmethod to wait for the futures given by items to complete.
Attributes:
-
MAX_BUFFER_SIZE
–The default maximum_buffer_size used in queue_call.
-
MAX_IDLE_POOL_INSTANCES
–The number of
pool
instances to leave idle (see also to_thread). -
backend
(Backend
) –The
anyio
backend the caller is running in. -
log
(LoggerAdapter[Any]
) – -
protected
(bool
) –Returns
True
if the caller is protected from stopping. -
running
–Returns
True
when the caller is available to run requests. -
stopped
(bool
) –Returns
True
if the caller is stopped. -
thread
(Thread
) –The thread in which the caller will run.
MAX_BUFFER_SIZE
class-attribute
instance-attribute
¶
MAX_BUFFER_SIZE = 1000
The default maximum_buffer_size used in queue_call.
MAX_IDLE_POOL_INSTANCES
class-attribute
instance-attribute
¶
MAX_IDLE_POOL_INSTANCES = 10
The number of pool
instances to leave idle (see also to_thread).
__new__ ¶
__new__(
*,
thread: Thread | None = None,
log: LoggerAdapter | None = None,
create: bool = False,
protected: bool = False,
) -> Self
Create the Caller
instance for the current thread or retrieve an existing instance
by passing the thread.
The caller provides a way to execute synchronous code in a separate thread, and to call asynchronous code from synchronous code.
Parameters:
-
thread
¶Thread | None
, default:None
) – -
log
¶LoggerAdapter | None
, default:None
) –Logger to use for logging messages.
-
create
¶bool
, default:False
) –Whether to create a new instance if one does not exist for the current thread.
-
protected
¶Whether the caller is protected from having its event loop closed.
Returns¶
Caller
The Caller
instance for the current thread.
Raises¶
RuntimeError
If create
is False and a Caller
instance does not exist.
Source code in src/async_kernel/caller.py
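The per-thread lookup described above (one instance per thread, created on request, RuntimeError otherwise) can be sketched with the standard library. `SketchRegistry` and its attributes are hypothetical names for illustration; this is not the async_kernel implementation and it omits the event loop entirely.

```python
import threading


class SketchRegistry:
    """Hypothetical one-instance-per-thread class; NOT async_kernel.Caller."""

    _instances = {}  # Thread -> instance
    _guard = threading.Lock()

    def __new__(cls, *, thread=None, create=False):
        thread = thread or threading.current_thread()
        with cls._guard:
            instance = cls._instances.get(thread)
            if instance is None:
                if not create:
                    raise RuntimeError(f"no instance exists for thread {thread.name!r}")
                instance = super().__new__(cls)
                instance.thread = thread
                cls._instances[thread] = instance
            return instance


def demo():
    first = SketchRegistry(create=True)  # created for the calling thread
    same = SketchRegistry() is first     # a later call retrieves it

    errors = []

    def worker():
        try:
            SketchRegistry()  # no instance for this thread and create=False
        except RuntimeError as exc:
            errors.append(str(exc))

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return same, len(errors)


if __name__ == "__main__":
    print(demo())
```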
all_callers
classmethod
¶
all_callers(running_only: bool = True) -> list[Caller]
A classmethod to get a list of the callers.
Parameters:
-
running_only
¶bool
, default:True
) –Restrict the list to callers that are active (running in an async context).
Source code in src/async_kernel/caller.py
as_completed
async
classmethod
¶
as_completed(
items: Iterable[Future[T]] | AsyncGenerator[Future[T]],
*,
max_concurrent: NoValue | int = NoValue,
shield: bool = False,
) -> AsyncGenerator[Future[T], Any]
A classmethod iterator to get Futures as they complete.
Parameters:
-
items
¶Iterable[Future[T]] | AsyncGenerator[Future[T]]
) –Either a container with existing futures or generator of Futures.
-
max_concurrent
¶NoValue | int
, default:NoValue
) –The maximum number of concurrent futures to monitor at a time. This is useful when
items
is a generator utilising Caller.to_thread. By default this will limit to Caller.MAX_IDLE_POOL_INSTANCES
. -
shield
¶bool
, default:False
) –Shield existing items from cancellation.
Tip
- Pass a generator should you wish to limit the number of future jobs when calling to_thread/to_task etc.
- Pass a set/list/tuple to ensure all get monitored at once.
Source code in src/async_kernel/caller.py
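The tip about passing a generator can be illustrated with plain asyncio. The helper `as_completed_bounded` below is a hypothetical sketch, not the async_kernel implementation: it pulls from the generator lazily, so a new job only starts once a slot is free.

```python
import asyncio


async def as_completed_bounded(awaitables, max_concurrent):
    """Yield finished tasks, keeping at most `max_concurrent` in flight.

    Hypothetical helper for illustration; NOT the async_kernel implementation.
    """
    source = iter(awaitables)
    pending = set()
    exhausted = False
    while pending or not exhausted:
        # Top up lazily: the generator is only advanced when there is room.
        while not exhausted and len(pending) < max_concurrent:
            try:
                pending.add(asyncio.ensure_future(next(source)))
            except StopIteration:
                exhausted = True
        if not pending:
            break
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            yield task


async def demo():
    running = 0
    peak = 0

    async def job(i):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01 * (5 - i))
        running -= 1
        return i

    results = []
    jobs = (job(i) for i in range(5))  # a generator: jobs are created on demand
    async for task in as_completed_bounded(jobs, max_concurrent=2):
        results.append(task.result())
    return peak, sorted(results)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Passing a list instead would create every job up front, which corresponds to the second tip: all items are monitored at once.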
call_direct ¶
Schedule func to be called in caller's event loop directly.
The call is made without copying the context and does not use a future.
Parameters:
-
func
¶Callable[P, Any]
) –The function (awaitables permitted, though discouraged).
-
*args
¶args
, default:()
) –Arguments to use with func.
-
**kwargs
¶kwargs
, default:{}
) –Keyword arguments to use with func.
Warning
Use this method for lightweight calls only.
Source code in src/async_kernel/caller.py
call_later ¶
call_later(
delay: float, func: Callable[P, T | Awaitable[T]], /, *args: args, **kwargs: kwargs
) -> Future[T]
Schedule func to be called in caller's event loop copying the current context.
Parameters:
-
func
¶Callable[P, T | Awaitable[T]]
) –The function (awaitables permitted, though discouraged).
-
delay
¶float
) –The minimum delay to add between submission and execution.
-
*args
¶args
, default:()
) –Arguments to use with func.
-
**kwargs
¶kwargs
, default:{}
) –Keyword arguments to use with func.
Source code in src/async_kernel/caller.py
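What "copying the current context" means for a delayed call can be shown with the standard library alone. This sketch uses `loop.call_later` with an explicit `contextvars` snapshot as an analogue. The variable `request_id` is a made-up name, and nothing here calls async_kernel.

```python
import asyncio
import contextvars

# Hypothetical context variable used only for this illustration.
request_id = contextvars.ContextVar("request_id", default="unset")


async def demo():
    loop = asyncio.get_running_loop()
    seen = loop.create_future()

    request_id.set("A")
    snapshot = contextvars.copy_context()  # captured at submission time
    loop.call_later(
        0.01,
        lambda: seen.set_result(request_id.get()),
        context=snapshot,
    )
    request_id.set("B")  # changed after scheduling; the snapshot still holds "A"

    value_in_callback = await seen
    return value_in_callback, request_id.get()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The callback observes the value that was current when it was scheduled, not the value at execution time.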
call_soon ¶
Schedule func to be called in caller's event loop copying the current context.
Parameters:
-
func
¶Callable[P, T | Awaitable[T]]
) –The function (awaitables permitted, though discouraged).
-
*args
¶args
, default:()
) –Arguments to use with func.
-
**kwargs
¶kwargs
, default:{}
) –Keyword arguments to use with func.
Source code in src/async_kernel/caller.py
current_future
classmethod
¶
A classmethod that returns the current future when called from inside a function scheduled by Caller.
Source code in src/async_kernel/caller.py
get_instance
classmethod
¶
A classmethod that gets an instance by name, possibly starting a new instance.
Parameters:
-
name
¶str | None
, default:'MainThread'
) –The name to identify the caller.
-
create
¶bool
, default:False
) –Create a new instance if one with the corresponding name does not already exist.
Source code in src/async_kernel/caller.py
queue_call ¶
queue_call(
func: Callable[[*PosArgsT], Awaitable[Any]],
/,
*args: *PosArgsT,
max_buffer_size: NoValue | int = NoValue,
wait: bool = False,
) -> CoroutineType[Any, Any, None] | None
Queue the execution of func
with the arguments *args
in a queue unique to it (not thread-safe).
The args are added to a queue associated with the provided func
. If a queue does not already exist for
func, a new queue is created with a specified maximum buffer size. The arguments are then sent to the queue,
and an execute_loop
coroutine is started to consume the queue and execute the function with the received
arguments. Exceptions during execution are caught and logged.
Parameters:
-
func
¶Callable[[*PosArgsT], Awaitable[Any]]
) –The asynchronous function to execute.
-
*args
¶*PosArgsT
, default:()
) –The arguments to pass to the function.
-
max_buffer_size
¶NoValue | int
, default:NoValue
) –The maximum buffer size for the queue. If NoValue, defaults to Caller.MAX_BUFFER_SIZE.
-
wait
¶bool
, default:False
) –Set to True to return a coroutine that completes once the request is sent. Use this to avoid exceptions when the buffer is full.
Info
The queue will stay open until one of the following occurs.
- It is explicitly closed with the method
queue_close
. - All strong references to the function/method are lost.
Source code in src/async_kernel/caller.py
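The queueing behaviour described above (one queue per function, a consumer loop, exceptions logged rather than raised) can be sketched with `asyncio.Queue`. The class `SketchQueues` and its methods `submit` and `drain_and_close` are hypothetical names for illustration. This is not the async_kernel implementation, and it does not model the weak-reference cleanup mentioned in the info box.

```python
import asyncio


class SketchQueues:
    """Hypothetical per-function call queues; NOT the async_kernel implementation."""

    def __init__(self, max_buffer_size=1000):
        self.max_buffer_size = max_buffer_size
        self._queues = {}     # func -> asyncio.Queue of argument tuples
        self._consumers = {}  # func -> task running the consumer loop

    def queue_exists(self, func):
        return func in self._queues

    def submit(self, func, *args):
        if func not in self._queues:
            queue = asyncio.Queue(self.max_buffer_size)
            self._queues[func] = queue
            self._consumers[func] = asyncio.ensure_future(self._execute_loop(func, queue))
        # Raises asyncio.QueueFull when the buffer is full.
        self._queues[func].put_nowait(args)

    async def _execute_loop(self, func, queue):
        while True:
            args = await queue.get()
            try:
                await func(*args)
            except Exception as exc:
                # Report the failure and keep consuming later items.
                print(f"queued call failed: {exc!r}")
            finally:
                queue.task_done()

    async def drain_and_close(self, func):
        queue = self._queues.pop(func, None)
        consumer = self._consumers.pop(func, None)
        if queue is not None:
            await queue.join()  # wait for queued items to be processed
            consumer.cancel()
            await asyncio.gather(consumer, return_exceptions=True)


async def demo():
    seen = []

    async def record(value):
        if value == 2:
            raise ValueError("boom")
        seen.append(value)

    queues = SketchQueues(max_buffer_size=10)
    for value in (1, 2, 3):
        queues.submit(record, value)
    await queues.drain_and_close(record)
    return seen, queues.queue_exists(record)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Calls for the same function run one at a time in submission order, and a failing call does not stop the ones queued behind it.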
queue_close ¶
Close the execution queue associated with func (thread-safe).
Parameters:
Source code in src/async_kernel/caller.py
queue_exists ¶
Returns True if an execution queue exists for func
.
Source code in src/async_kernel/caller.py
start_new
classmethod
¶
start_new(
*,
backend: Backend | NoValue = NoValue,
log: LoggerAdapter | None = None,
name: str | None = None,
protected: bool = False,
backend_options: dict | None | NoValue = NoValue,
) -> Self
Start a new thread with a new Caller open in the context of an anyio event loop.
A new thread and caller are always started, and the caller is ready to accept new jobs as soon as it is returned.
Parameters:
-
backend
¶Backend | NoValue
, default:NoValue
) –The backend to use for the anyio event loop (anyio.run). Defaults to the backend from where it is called.
-
log
¶LoggerAdapter | None
, default:None
) –A logging adapter to use for debug messages.
-
protected
¶bool
, default:False
) –When True, the caller will not shut down unless stop is called with
force=True
. -
backend_options
¶dict | None | NoValue
, default:NoValue
) –Backend options for anyio.run. Defaults to
Kernel.backend_options
.
Source code in src/async_kernel/caller.py
stop ¶
stop(*, force=False) -> None
Stop the caller, cancelling all pending tasks and closing the thread.
If the instance is protected, this is a no-op unless force is used.
Source code in src/async_kernel/caller.py
stop_all
classmethod
¶
stop_all(*, _stop_protected: bool = False) -> None
A classmethod to stop all un-protected callers.
Parameters:
Source code in src/async_kernel/caller.py
to_thread
classmethod
¶
A classmethod to call func in a separate thread; see also to_thread_by_name.
Source code in src/async_kernel/caller.py
to_thread_by_name
classmethod
¶
to_thread_by_name(
name: str | None,
func: Callable[P, T | Awaitable[T]],
/,
*args: args,
**kwargs: kwargs,
) -> Future[T]
A classmethod to call func in the thread specified by name.
Parameters:
-
name
¶str | None
) – -
func
¶Callable[P, T | Awaitable[T]]
) –The function to call. If it returns an awaitable, the awaitable will be awaited. Passing a coroutine as
func
is discouraged, but it will be awaited. -
*args
¶args
, default:()
) –Arguments to use with func.
-
**kwargs
¶kwargs
, default:{}
) –Keyword arguments to use with func.
Returns:
-
Future[T]
–A future that can be awaited for the result of func.
Source code in src/async_kernel/caller.py
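The idea of dispatching a call to a thread identified by name, and awaiting whatever awaitable the function returns, can be sketched with the standard library. The helpers `loop_for` and `call_in_named_thread` are hypothetical. They return a `concurrent.futures.Future` rather than the Future class documented here, and they are not the async_kernel implementation.

```python
import asyncio
import inspect
import threading

_loops = {}  # thread name -> event loop running in that thread
_guard = threading.Lock()


def loop_for(name):
    """Return the loop running in the thread called `name`, starting it if needed."""
    with _guard:
        loop = _loops.get(name)
        if loop is None:
            loop = asyncio.new_event_loop()
            threading.Thread(target=loop.run_forever, name=name, daemon=True).start()
            _loops[name] = loop
        return loop


def call_in_named_thread(name, func, *args, **kwargs):
    """Run func in the named thread; await the result if it is awaitable."""

    async def runner():
        result = func(*args, **kwargs)
        if inspect.isawaitable(result):
            result = await result
        return result

    return asyncio.run_coroutine_threadsafe(runner(), loop_for(name))


def demo():
    def sync_name():
        return threading.current_thread().name

    async def async_name():
        await asyncio.sleep(0)
        return threading.current_thread().name

    first = call_in_named_thread("worker", sync_name)
    second = call_in_named_thread("worker", async_name)  # same thread is reused
    return first.result(timeout=5), second.result(timeout=5)


if __name__ == "__main__":
    print(demo())
```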
wait
async
classmethod
¶
wait(
items: Iterable[Future[T]],
*,
timeout: float | None = None,
return_when: Literal[
"FIRST_COMPLETED", "FIRST_EXCEPTION", "ALL_COMPLETED"
] = "ALL_COMPLETED",
) -> tuple[set[T], set[Future[T]]]
A classmethod to wait for the futures given by items to complete.
Returns two sets of the futures: (done, pending).
Example
done, pending = await Caller.wait(items)
Info
- This does not raise a TimeoutError!
- Futures that aren't done when the timeout occurs are returned in the second set.
Source code in src/async_kernel/caller.py
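The standard library's `asyncio.wait` follows the same (done, pending) contract and likewise does not raise on timeout, so it serves as a stand-in to show the behaviour described in the info box. This sketch uses asyncio tasks rather than the Future class documented here and does not call async_kernel.

```python
import asyncio


async def demo():
    fast = asyncio.ensure_future(asyncio.sleep(0.01, result="fast"))
    slow = asyncio.ensure_future(asyncio.sleep(10, result="slow"))

    # No TimeoutError is raised: unfinished items come back in `pending`.
    done, pending = await asyncio.wait({fast, slow}, timeout=0.5)

    results = [task.result() for task in done]
    for task in pending:
        task.cancel()  # tidy up whatever did not finish in time
    return results, len(pending)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```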
Future ¶
Future(thread: Thread | None = None)
A class representing a future result modelled on asyncio.Future.
This class provides an anyio compatible Future primitive. It is designed
to work with Caller
to enable thread-safe calling, setting and awaiting
execution results.
Methods:
-
add_done_callback
–Add a callback for when the Future is done (not thread-safe).
-
cancel
–Cancel the Future and schedule callbacks (thread-safe using Caller).
-
cancelled
–Return True if the Future is cancelled.
-
done
–Returns True if the Future is done.
-
exception
–Return the exception that was set on the Future.
-
get_caller
–The Caller that the Future's thread corresponds to.
-
remove_done_callback
–Remove all instances of a callback from the callbacks list.
-
result
–Return the result of the Future.
-
set_cancel_scope
–Provide a cancel scope for cancellation.
-
set_exception
–Set the exception (thread-safe using Caller).
-
set_result
–Set the result (thread-safe using Caller).
-
wait
–Wait for future to be done (thread-safe) returning the result if specified.
-
wait_sync
–Synchronously wait for future to be done (thread-safe) returning the result if specified.
Attributes:
Source code in src/async_kernel/caller.py
thread
instance-attribute
¶
thread: Thread = thread or current_thread()
The thread in which the result is targeted to run.
add_done_callback ¶
Add a callback for when the Future is done (not thread-safe).
If the Future is already done it will be scheduled for calling.
The result of the future and its done callbacks are always handled in the future's thread. Callbacks are called in the owning thread, in the reverse of the order in which they were added.
Source code in src/async_kernel/caller.py
cancel ¶
Cancel the Future and schedule callbacks (thread-safe using Caller).
Parameters:
Returns whether it has been cancelled.
Source code in src/async_kernel/caller.py
cancelled ¶
cancelled() -> bool
Return True if the Future is cancelled.
Source code in src/async_kernel/caller.py
done ¶
done() -> bool
Returns True if the Future is done.
Done means that either a result or an exception is available.
Source code in src/async_kernel/caller.py
exception ¶
exception() -> BaseException | None
Return the exception that was set on the Future.
If the Future has been cancelled, this method raises a FutureCancelledError exception.
If the Future isn't done yet, this method raises an InvalidStateError exception.
Source code in src/async_kernel/caller.py
get_caller ¶
get_caller() -> Caller
The Caller that the Future's thread corresponds to.
Source code in src/async_kernel/caller.py
remove_done_callback ¶
Remove all instances of a callback from the callbacks list.
Returns the number of callbacks removed.
Source code in src/async_kernel/caller.py
result ¶
result() -> T
Return the result of the Future.
If the Future has been cancelled, this method raises a FutureCancelledError exception.
If the Future isn't done yet, this method raises an InvalidStateError exception.
Source code in src/async_kernel/caller.py
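Since the class is described as modelled on asyncio.Future, the state rules for `result` and `exception` can be illustrated with `asyncio.Future` itself. One difference to keep in mind: per the text above, this library raises FutureCancelledError on cancellation, whereas the standard-library future used in this sketch raises `asyncio.CancelledError`.

```python
import asyncio


async def demo():
    loop = asyncio.get_running_loop()
    outcomes = []

    pending = loop.create_future()
    try:
        pending.result()  # not done yet
    except asyncio.InvalidStateError:
        outcomes.append("not done")

    pending.set_result(42)
    outcomes.append(pending.result())

    cancelled = loop.create_future()
    cancelled.cancel()
    try:
        cancelled.result()
    except asyncio.CancelledError:
        outcomes.append("cancelled")

    failed = loop.create_future()
    failed.set_exception(ValueError("boom"))
    outcomes.append(type(failed.exception()).__name__)  # returned, not raised
    return outcomes


if __name__ == "__main__":
    print(asyncio.run(demo()))
```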
set_cancel_scope ¶
set_cancel_scope(scope: CancelScope) -> None
Provide a cancel scope for cancellation.
Source code in src/async_kernel/caller.py
set_exception ¶
set_exception(exception: BaseException) -> None
Set the exception (thread-safe using Caller).
Source code in src/async_kernel/caller.py
set_result ¶
set_result(value: T) -> None
Set the result (thread-safe using Caller).
Source code in src/async_kernel/caller.py
wait
async
¶
Wait for future to be done (thread-safe) returning the result if specified.
Parameters:
-
timeout
¶float | None
, default:None
) –Timeout in seconds.
-
shield
¶bool
, default:False
) –Shield the future from cancellation.
-
result
¶bool
, default:True
) –Whether the result should be returned.
Source code in src/async_kernel/caller.py
wait_sync ¶
Synchronously wait for future to be done (thread-safe) returning the result if specified.
Parameters:
-
timeout
¶float | None
, default:None
) –Timeout in seconds.
-
shield
¶bool
, default:False
) –Shield cancellation.
-
result
¶bool
, default:True
) –Whether the result should be returned.
Source code in src/async_kernel/caller.py
FutureCancelledError ¶
Used to indicate a Future
is cancelled.
ReentrantAsyncLock ¶
ReentrantAsyncLock()
Implements a Reentrant asynchronous lock compatible with async_kernel.caller.Caller.
Example
# Inside a coroutine running in a thread where an async_kernel.caller.Caller instance is running.
lock = ReentrantAsyncLock()  # a reentrant lock
async with lock:
    async with lock:
        Caller().to_thread(...)  # The lock is shared with the thread.
Note
- The lock context can be exited in any order.
- A 'reentrant' lock can release control to another context and re-enter later; tasks or threads called from a locked thread maintain the same reentrant context.
Methods:
-
acquire
–Acquire a lock.
-
is_in_context
–Returns
True
if the current context has the lock. -
release
–Decrement the internal counter.
Attributes:
Source code in src/async_kernel/caller.py
acquire
async
¶
acquire() -> Self
Acquire a lock.
If the lock is reentrant, the internal counter is incremented to share the lock.
Source code in src/async_kernel/caller.py
is_in_context ¶
is_in_context() -> bool
Returns True
if the current context has the lock.
Source code in src/async_kernel/caller.py
release
async
¶
release() -> None
Decrement the internal counter.
If the current depth is 1, the lock is passed to the next queued waiter, or released if there is none.
Source code in src/async_kernel/caller.py
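The counter behaviour described for `acquire` and `release` can be sketched on top of `asyncio.Lock`. `SketchReentrantLock` is a hypothetical class that tracks ownership by asyncio task only. It is not the async_kernel implementation, which also shares the lock with threads started from the locked context.

```python
import asyncio


class SketchReentrantLock:
    """Hypothetical task-level reentrant lock; NOT the async_kernel implementation."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._owner = None
        self.depth = 0

    async def acquire(self):
        task = asyncio.current_task()
        if self._owner is task:
            self.depth += 1  # re-entry: just bump the counter
            return self
        await self._lock.acquire()
        self._owner = task
        self.depth = 1
        return self

    async def release(self):
        if self._owner is not asyncio.current_task():
            raise RuntimeError("lock is not held by the current task")
        self.depth -= 1
        if self.depth == 0:
            self._owner = None
            self._lock.release()  # wakes the next queued waiter, if any

    def is_in_context(self):
        return self._owner is asyncio.current_task()


async def demo():
    lock = SketchReentrantLock()
    events = []

    async def other():
        await lock.acquire()
        events.append("other acquired")
        await lock.release()

    await lock.acquire()
    await lock.acquire()  # depth is now 2
    waiter = asyncio.ensure_future(other())
    await asyncio.sleep(0)  # let `other` start and queue behind us
    await lock.release()  # depth 2 -> 1: still held
    await asyncio.sleep(0)
    events.append(f"still held: {lock.is_in_context()}")
    await lock.release()  # depth 1 -> 0: handed to the waiter
    await waiter
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```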