Caller: A backend agnostic executor (notebook)¶
This notebook has the Python dependencies: trio, matplotlib
Caller is a thread-local class that coordinates the execution of functions and coroutines in the event loop of an asyncio or trio backend.
Caller instances are created on the following basis:
- CPython: per-thread
- Pyodide: per-context (until threading is supported in Pyodide)
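The per-thread rule above can be pictured with a small standard-library sketch. `PerThreadRegistry` and `instance_from_new_thread` are hypothetical names used only for illustration; this is not how Caller is implemented, only one way to get "one instance per thread" behaviour.

```python
import threading


class PerThreadRegistry:
    """Hypothetical registry: one instance per thread, created on first access."""

    _instances: dict = {}
    _lock = threading.Lock()

    def __init__(self, thread: threading.Thread) -> None:
        self.thread = thread

    @classmethod
    def current(cls) -> "PerThreadRegistry":
        thread = threading.current_thread()
        with cls._lock:
            # Reuse the instance already registered for this thread, if any.
            if thread.ident not in cls._instances:
                cls._instances[thread.ident] = cls(thread)
            return cls._instances[thread.ident]


def instance_from_new_thread() -> PerThreadRegistry:
    """Fetch the registry instance as seen from a freshly started thread."""
    result = []
    worker = threading.Thread(target=lambda: result.append(PerThreadRegistry.current()))
    worker.start()
    worker.join()
    return result[0]
```

Repeated calls to `PerThreadRegistry.current()` from one thread return the same object, while a call made from another thread returns a different one.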
A Caller instance can be started with one GUI event loop as the host and one backend running as a guest.
A second backend can also be started in the same thread, with or without a host GUI event loop. A single thread could therefore have up to:
- One host GUI event loop (tk, qt, ...)
- Two asynchronous backends (asyncio, trio)
Note that GUIs generally assume or require that their mainloop runs in the main thread.
The magic command %callers prints a list of the current Caller instances.
import async_kernel
assert async_kernel.utils.get_kernel() is get_ipython().kernel # pyright: ignore[reportUndefinedVariable]
%callers
Name Running Protected Thread Caller
────────────────────────────────────────────────────────────────────────
Shell ✓ 🔐 MainThread 140487821938320 ← current
Control ✓ 🔐 Control 140487778013536
There are two caller instances associated with the Kernel:
- Shell: The caller for the shell channel (normally the MainThread).
- Control: The caller for the control channel (a child of the shell).
Calling Caller() with no arguments will return the caller for the current thread (or context in Pyodide).
A runtime error will occur if there is no event loop (backend) running in the current thread (unlikely in Pyodide which provides a running event loop).
from async_kernel import Caller
caller = Caller()
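The "no running event loop" condition has a close analogue in the standard library, where `asyncio.get_running_loop()` raises `RuntimeError` outside a loop. The sketch below only shows that asyncio behaviour; `has_running_loop`, `call_in_new_thread` and `probe` are hypothetical helpers, and how Caller performs its own check is not shown in this notebook.

```python
import asyncio
import threading
from typing import Callable


def has_running_loop() -> bool:
    """Return True when called from inside a running asyncio event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # asyncio raises RuntimeError when no loop is running in this thread.
        return False
    return True


def call_in_new_thread(func: Callable[[], bool]) -> bool:
    """Run func in a fresh thread (which has no event loop yet) and return its result."""
    results: list = []
    worker = threading.Thread(target=lambda: results.append(func()))
    worker.start()
    worker.join()
    return results[0]


async def probe() -> bool:
    # Executed inside a loop, so the check succeeds.
    return has_running_loop()
```

Calling `call_in_new_thread(has_running_loop)` reports that a plain thread has no loop, whereas `call_in_new_thread(lambda: asyncio.run(probe()))` reports that one is running.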
Modifiers¶
There are four modifiers that determine which instance is returned when calling Caller.
- CurrentThread (default): Access the caller that belongs to the current thread (or context with Pyodide).
- MainThread: Access the caller for the main thread.
- NewThread: Create a caller in a new thread.
- manual: Manually create a new instance for the current thread (must be started manually).
assert caller is Caller()
assert caller is Caller("CurrentThread")
assert caller is Caller("MainThread")
caller1 = Caller("NewThread", name="My caller1")
assert caller1 is not caller
# We can start using it immediately.
await caller1.call_soon(display, "hello world!", caller1)
# We should stop the caller when we don't need it.
caller1.stop()
del caller1
'hello world!'
Caller creation options¶
The caller can be customised by passing keyword arguments when creating a new instance. The main options are:
- backend: The backend to use ('trio' or 'asyncio').
- name: A name for the caller.
import trio
trio_caller = Caller("NewThread", backend="trio")
await trio_caller.call_soon(trio.sleep, 0)
trio_caller.stop()
del trio_caller
trio_caller = caller.get(name="trio", backend="trio")
await trio_caller.call_soon(trio.sleep, 0)
# Callers returned from 'get' are children
assert trio_caller in caller.children
# The parent is accessible from the child
assert trio_caller.parent is caller
# The same caller can be accessed by using the name
assert caller.get(name="trio") is trio_caller
child1 = trio_caller.get()
child2 = trio_caller.get()
assert trio_caller.children == {child1, child2}
trio_caller.stop()
await trio_caller.stopped
# Stopping the parent will stop the children
assert child1.stopped
assert child2.stopped
Caller methods that return Pending¶
Pending is a thread-safe, asyncio.Future-like object that holds a result that will become available later. It is named Pending to avoid confusion with asyncio.Future and concurrent.futures.Future, whose functionality differs.
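To make the idea of a thread-safe future concrete, here is a minimal standard-library sketch. It is not the Pending implementation: it uses `asyncio.run_coroutine_threadsafe` to schedule work on a loop in another thread and `asyncio.wrap_future` to await the result from the current loop. `start_loop_in_thread` and `main` are hypothetical helper names.

```python
import asyncio
import threading


def start_loop_in_thread():
    """Start an asyncio event loop in a daemon thread and return (loop, thread)."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


async def add(a: int, b: int) -> int:
    return a + b


async def main() -> int:
    loop, thread = start_loop_in_thread()
    try:
        # Schedule the coroutine on the other thread's loop (thread-safe).
        cf_future = asyncio.run_coroutine_threadsafe(add(1, 1), loop)
        # Bridge the concurrent future so it can be awaited in this loop.
        return await asyncio.wrap_future(cf_future)
    finally:
        # Stop the worker loop and release its resources.
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The two-step bridge is what a single thread-safe awaitable would hide from the user.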
The following functions return a Pending object:
- Caller.call_soon: Call a function in a new 'task'.
- Caller.call_later: Call a function in a new 'task', executing the func after a delay.
- Caller.schedule_call: An advanced function to schedule execution, exposing the full functionality (used by call_soon, call_later, ...).
- Caller.call_using_backend: Call in the caller's thread, ensuring a specific backend is used.
# Standard functions
assert await caller.call_soon(lambda: 1 + 1) == 2
assert await caller.call_later(0, lambda: 1 + 1) == 2
assert await caller.schedule_call(lambda: 1 + 1, (), {}) == 2
assert await caller.to_thread(lambda: 1 + 1) == 2
assert await caller.call_using_backend("asyncio", lambda: 1 + 1) == 2
# Coroutine functions
async def add(a, b):
    return a + b
assert await caller.call_soon(add, 1, 1) == 2
assert await caller.call_later(0, add, 1, 1) == 2
assert await caller.schedule_call(add, (1, 1), {}) == 2
assert await caller.to_thread(add, 1, 1) == 2
assert await caller.call_using_backend("asyncio", add, 1, 1) == 2
# It is also possible (though discouraged) to wrap a coroutine with a lambda
assert await caller.call_soon(lambda: add(1, 1)) == 2
Caller.call_using_backend¶
This method calls functions and coroutines using a specific asynchronous backend in the same thread.
When the requested backend does not match the caller's backend, the call is run with the requested backend running as a guest.
import asyncio
import threading
import sniffio
import trio
await Caller().call_using_backend("asyncio", asyncio.sleep, 0)
await Caller().call_using_backend("trio", trio.sleep, 0)
for backend in ["asyncio", "trio"]:
    print("backend:", backend)
    print("thread:", await Caller().call_using_backend(backend, threading.current_thread))  # pyright: ignore[reportArgumentType]
    print("sniffio:", await Caller().call_using_backend(backend, sniffio.current_async_library), "\n")  # pyright: ignore[reportArgumentType]
backend: asyncio
thread: <_MainThread(MainThread, started 140487829827712)>
sniffio: asyncio
backend: trio
thread: <_MainThread(MainThread, started 140487829827712)>
sniffio: trio
Low-level methods¶
Caller.queue_call¶
This method is used extensively by the kernel for message handling. Each time queue_call is called with the same function, the function and its call arguments are appended to a queue. On the first call, a task is started in the backend to process the queue as items are added. The task remains alive until the function is deleted or the queue is closed. Any exceptions that occur are logged.
from aiologic import CountdownEvent
N = 10
done = CountdownEvent(N)
def f(n):
    float(n)
    done.down()
    print(done)
for i in range(N):
    caller.queue_call(f, i)
await done
assert caller.queue_get(f)
caller.queue_close(f)
assert caller.queue_get(f) is None
<aiologic.CountdownEvent(10) at 0x7fc5d82d9180 [value=9, waiting=1]>
<aiologic.CountdownEvent(10) at 0x7fc5d82d9180 [value=8, waiting=1]>
<aiologic.CountdownEvent(10) at 0x7fc5d82d9180 [value=7, waiting=1]>
<aiologic.CountdownEvent(10) at 0x7fc5d82d9180 [value=6, waiting=1]>
<aiologic.CountdownEvent(10) at 0x7fc5d82d9180 [value=5, waiting=1]>
<aiologic.CountdownEvent(10) at 0x7fc5d82d9180 [value=4, waiting=1]>
<aiologic.CountdownEvent(10) at 0x7fc5d82d9180 [value=3, waiting=1]>
<aiologic.CountdownEvent(10) at 0x7fc5d82d9180 [value=2, waiting=1]>
<aiologic.CountdownEvent(10) at 0x7fc5d82d9180 [value=1, waiting=1]>
<aiologic.CountdownEvent(10) at 0x7fc5d82d9180 [value=0]>
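The queueing behaviour described for queue_call (one queue per function, a worker task started on first use, exceptions logged) can be approximated with plain asyncio. The sketch below is an illustration under those assumptions, not the library's code; `CallQueues`, `enqueue`, `join`, `close` and `demo` are hypothetical names.

```python
import asyncio
import logging

log = logging.getLogger(__name__)


class CallQueues:
    """Hypothetical helper: one FIFO queue and one worker task per function."""

    def __init__(self) -> None:
        self._queues: dict = {}
        self._workers: dict = {}

    def enqueue(self, func, *args, **kwargs) -> None:
        if func not in self._queues:
            # First call for this function: create its queue and worker task.
            self._queues[func] = asyncio.Queue()
            self._workers[func] = asyncio.create_task(self._worker(func))
        self._queues[func].put_nowait((args, kwargs))

    async def _worker(self, func) -> None:
        queue = self._queues[func]
        while True:
            args, kwargs = await queue.get()
            try:
                func(*args, **kwargs)
            except Exception:
                # Log the failure and keep processing later items.
                log.exception("queued call failed")
            finally:
                queue.task_done()

    async def join(self, func) -> None:
        """Wait until every queued call for func has been processed."""
        await self._queues[func].join()

    def close(self, func) -> None:
        """Discard the queue for func and cancel its worker task."""
        self._queues.pop(func, None)
        worker = self._workers.pop(func, None)
        if worker is not None:
            worker.cancel()


async def demo() -> list:
    seen: list = []

    def record(n: int) -> None:
        seen.append(n)

    queues = CallQueues()
    for i in range(5):
        queues.enqueue(record, i)
    await queues.join(record)
    queues.close(record)
    return seen
```

Because a single worker drains each queue, calls to the same function run in the order they were queued.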
Caller.call_direct¶
This method calls the function or coroutine function directly in the caller's scheduler. The kernel uses it extensively to execute coroutine functions that are known to run quickly.
Do not use this for long-running coroutines, as it will block the scheduler.
from aiologic import Event
event = Event()
caller.call_direct(event.set)
await event
True
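The warning about blocking the scheduler can be demonstrated with asyncio alone. In this sketch `loop.call_soon` stands in for running a callback directly on the scheduler (an assumption made for illustration, not a description of how call_direct works), and `longest_tick_gap` is a hypothetical helper that measures how long a periodic ticker is starved.

```python
import asyncio
import time


async def longest_tick_gap(block_seconds: float) -> float:
    """Return the longest pause seen by a ticker while a callback blocks the loop."""
    loop = asyncio.get_running_loop()
    gaps: list = []

    async def ticker() -> None:
        last = time.perf_counter()
        for _ in range(20):
            await asyncio.sleep(0.005)
            now = time.perf_counter()
            gaps.append(now - last)
            last = now

    task = asyncio.create_task(ticker())
    await asyncio.sleep(0.01)  # let the ticker get going
    # Run a blocking function directly on the loop: nothing else runs meanwhile.
    loop.call_soon(time.sleep, block_seconds)
    await task
    return max(gaps)


if __name__ == "__main__":
    print(f"longest gap: {asyncio.run(longest_tick_gap(0.1)):.3f} s")
```

The ticker normally wakes every few milliseconds, but its longest gap grows to at least the duration of the blocking call.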
import random
import time
import ipywidgets as ipw
outputs = {}
stop = ipw.RadioButtons(value=None, options=["Stop"])
box = ipw.VBox([stop])
display(box)
def my_func(n):
    caller = Caller()
    if not (out := outputs.get(caller)):
        outputs[caller] = out = ipw.HTML(description=str(caller), style={"description_width": "initial"})
        box.children = [*box.children, out]
    sleep_time = random.random() / 4
    out.value = f"{n=:04d} sleeping {sleep_time * 1000:03.0f} ms"
    time.sleep(sleep_time)
    return n
async def run_forever():
    n = 0
    while not stop.value:
        n += 1
        yield Caller().to_thread(my_func, n)
async for fut in Caller().as_completed(run_forever()):
    result = await fut
    print(f"Finished: {result}", end="\r")
VBox(children=(RadioButtons(options=('Stop',), value=None),))
Caller.wait¶
This method was inspired by asyncio.wait. It accepts a collection of awaitables or Pending and waits until the return_when condition is satisfied. Awaitables are converted to Pending using caller.call_soon.
done, pending = await caller.wait(
    [asyncio.sleep(0), caller.to_thread(asyncio.sleep, 0), caller.call_using_backend("trio", trio.sleep, 0)],
    return_when="ALL_COMPLETED",
)
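For comparison, this is how the standard-library function that inspired the method behaves. The sketch uses only `asyncio.wait` with two of its `return_when` values; `delayed` and `demo` are hypothetical helpers, and whether Caller.wait accepts the same set of conditions is not confirmed here.

```python
import asyncio


async def delayed(value: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return value


async def demo():
    tasks = [
        asyncio.create_task(delayed("fast", 0)),
        asyncio.create_task(delayed("slow", 0.2)),
    ]
    # Return as soon as at least one task has finished.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first = {task.result() for task in done}
    # Wait again, this time until every task has finished.
    done_all, pending_all = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    everything = {task.result() for task in done_all}
    return first, everything, len(pending), len(pending_all)
```

In both calls the result is a pair of sets, finished and unfinished, which matches the `done, pending` unpacking used above.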
Caller.create_pending_group¶
A PendingGroup is an asynchronous context manager for Pending. Every Pending that opts in when created is added to the pending group (Caller methods that return Pending opt in).
The PendingGroup has different modes:
- 0: Ignore cancellation of pending.
- 1: Cancel if any pending is cancelled - raise PendingCancelled on exit.
- 2: Cancel if any pending is cancelled - exit quietly.
import anyio
async with caller.create_pending_group() as pg:
    caller.call_soon(anyio.sleep, 0.01)
    caller.call_later(0, anyio.sleep, 0)
    caller.to_thread(anyio.sleep, 0.01)
    caller.call_using_backend("asyncio", asyncio.sleep, 0.01)
    caller.call_using_backend("trio", trio.sleep, 0.01)
    print("In context pending=", len(pg.pending))
print("Out of context pending=", len(pg.pending))
In context pending= 5
Out of context pending= 0
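The three PendingGroup modes listed above can be mimicked with asyncio tasks to show how they differ. This is a sketch under the assumption that the modes behave exactly as described in that list; it is not the PendingGroup implementation. `TaskGroupSketch`, `GroupCancelled` (a stand-in for PendingCancelled) and `demo` are hypothetical names.

```python
import asyncio


class GroupCancelled(Exception):
    """Hypothetical stand-in for PendingCancelled."""


class TaskGroupSketch:
    """Track asyncio tasks and wait for all of them when the context exits."""

    def __init__(self, mode: int = 1) -> None:
        self.mode = mode
        self.pending: set = set()
        self._member_cancelled = False

    def start(self, coro) -> asyncio.Task:
        task = asyncio.ensure_future(coro)
        self.pending.add(task)
        task.add_done_callback(self._on_done)
        return task

    def _on_done(self, task: asyncio.Task) -> None:
        self.pending.discard(task)
        if task.cancelled() and self.mode in (1, 2):
            # Modes 1 and 2: one cancelled member cancels the rest.
            self._member_cancelled = True
            for other in list(self.pending):
                other.cancel()

    async def __aenter__(self) -> "TaskGroupSketch":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Wait for every remaining member before leaving the context.
        while self.pending:
            await asyncio.wait(list(self.pending))
        if exc_type is None and self._member_cancelled and self.mode == 1:
            raise GroupCancelled  # mode 2 exits quietly instead


async def demo(mode: int) -> str:
    try:
        async with TaskGroupSketch(mode) as group:
            doomed = group.start(asyncio.sleep(1))
            other = group.start(asyncio.sleep(0.01))
            doomed.cancel()
    except GroupCancelled:
        return "raised"
    return "cancelled" if other.cancelled() else "finished"
```

With mode 0 the surviving task runs to completion, with mode 1 the context raises on exit, and with mode 2 the surviving task is cancelled without an exception reaching the caller.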