# Python : Core : Async
\[ [Runners](https://docs.python.org/3/library/asyncio-runner.html) | [Coros and Tasks](https://docs.python.org/3/library/asyncio-task.html) | [Streams](https://docs.python.org/3/library/asyncio-stream.html) | [Sync Primitives](https://docs.python.org/3/library/asyncio-sync.html) | [Subprocs](https://docs.python.org/3/library/asyncio-subprocess.html) | [Queues](https://docs.python.org/3/library/asyncio-queue.html) | [Exceptions](https://docs.python.org/3/library/asyncio-exceptions.html) ]
**Native Coroutines**
- A native coroutine function is a function defined with `async def` that doesn't contain a `yield`.
  - Adding a `yield` turns it into an async generator function, which returns an `async_generator` object providing `__anext__()`.
- Calling a native coroutine function just returns a native coroutine object; it doesn't execute the function body.
- The coroutine object must then be run on the event loop using `run()`, `await`, `create_task()`, etc.
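A minimal sketch of the points above: calling the function only builds a coroutine object, and the body runs once the event loop gets hold of it. The function name `add` is made up for illustration.

```python
import asyncio
import inspect

async def add(a, b):
    # The body runs only once the coroutine object is awaited or scheduled.
    await asyncio.sleep(0)
    return a + b

coro = add(1, 2)                     # nothing has executed yet
is_coro = inspect.iscoroutine(coro)  # we only hold a native coroutine object
result = asyncio.run(coro)           # the event loop actually runs the body
print(is_coro, result)
```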
**Async Generator Functions**
- They don't return a value; only the `yield`ed values matter. A non-empty `return` statement is a `SyntaxError`.
- Use with `async for`.
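A small sketch of an async generator function consumed with `async for` (here via an async comprehension). `countdown` and `collect` are hypothetical names, not part of asyncio.

```python
import asyncio

async def countdown(n):
    # Hypothetical async generator: yields n, n-1, ..., 1.
    while n > 0:
        yield n
        await asyncio.sleep(0)  # give other tasks a chance to run between items
        n -= 1

async def collect():
    # An async comprehension drives __anext__() until the generator is exhausted.
    return [i async for i in countdown(3)]

values = asyncio.run(collect())
print(values)
```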
**Terms and Abbreviations**
- *Awaitables* -> Coroutines, Tasks, Futures, any object with `__await__()`
- *aw* -> awaitable
- *aws* -> iterable of awaitables
- *corof* -> native coroutine function
- *coro* -> native coroutine object
> [!NOTE] To Use in REPL
> `$ python -m asyncio`
## Cheatsheet
**Syntax**
```python
async def <function> # define native coroutine or async generator function
async for ... in <async_iterable_or_generator> # __aiter__, __anext__
async with <async_context_manager> # __aenter__, __aexit__
await <awaitable> # Use to run awaitable here and now. Can only be used inside an async function.
(foo async for foo in foos if foo) # async generator expression; returns async_generator
[foo async for foo in foos if foo] # async comprehension
```
**Imports**
```python
from asyncio import (
    # classes
    CancelledError, Runner, Semaphore, Server, StreamReader, StreamWriter, Task, TaskGroup, Timeout,
    # functions
    all_tasks, as_completed, create_task, eager_task_factory, gather, get_event_loop, iscoroutine, new_event_loop, run, sleep, start_server, timeout, timeout_at, to_thread, wait, wait_for,
    # enum values
    ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION,
)
from asyncio.trsock import TransportSocket
```
**Main**
```python
async def main():
    ...

run( main()[, debug=True] )
```
**Usage**
```python
## TaskGroups
async with TaskGroup() as tg:
    t1 = tg.create_task( coro )
    ...
... # runs after all tasks are done - await is implicit in the context exit

## Timeouts
try:
    async with timeout( ... ) as t:
        await long_running_task()
except TimeoutError:
    pass
if t.expired():
    print( "Looks like we haven't finished on time." )

## Throttling
semaphore = Semaphore( qty )
async with semaphore:
    ... # ensure no more than qty tasks are running this block at the same time

## Waiting
for coro in as_completed( aws ):
    next_result = await coro
```
## Reference
```python
all_tasks( loop=None ) -> set # not-yet-finished Tasks run by the loop; loop=None -> get_running_loop()
# Run awaitable objects in the aws iterable concurrently.
# Each coro returned can be awaited to get the earliest next result from the remaining awaitables.
# Raises TimeoutError if the timeout occurs before all Futures are done.
# If initial coro raised exception, returned coro will re-raise it when you await it.
as_completed( aws, *, timeout=None ) -> iter: coro
# Wrap the coro into a Task and schedule its execution. Return the Task object.
# If name is not None, set as name using Task.set_name().
# Tasks can easily and safely be cancelled. When a task is cancelled,
# CancelledError will be raised in the task at the next opportunity.
create_task( coro, *, name=None, context=None ) -> Task
# WARNING: Save a reference to the result of this function, to avoid a task disappearing mid-execution.
# The event loop only keeps weak references to tasks.
# A task that isn’t referenced elsewhere may get garbage collected at any time, even before it’s done.
tasks = set()
task = create_task( coro )
tasks.add( task ) # add task to set - creates a strong reference
task.add_done_callback( tasks.discard ) # make each task remove its own reference after finishing
current_task( loop=None ) -> Task | None # None if no task is running; loop=None -> get_running_loop()
# Run aws concurrently; return results in same order.
await gather( *aws, return_exceptions=False )
# return_exceptions
# False -> The first raised exception is immediately propagated to the task that awaits on gather().
# Other awaitables in the aws sequence won’t be cancelled and will continue to run.
# True -> Exceptions are treated the same as successful results, and aggregated in the result list.
# (Recommended by prominent Python authors.)
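get_event_loop() -> loop # get reference to event loop; behavior without a running loop varies by Python version
# In coroutines and callbacks, prefer get_running_loop() (raises RuntimeError if no loop is running).
await loop.getaddrinfo( domain, None ) # returns list of 5-item tuples (family, type, proto, canonname, sockaddr); raises socket.gaierror if can't resolve domain
# (async version of socket.getaddrinfo(); the blocking call is run in the default executor)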
loop.set_task_factory( eager_task_factory )
# Coroutines begin execution synchronously during Task construction.
# Tasks are only scheduled on the event loop if they block.
# This can be a performance improvement as the overhead of loop scheduling is avoided for coroutines that complete synchronously.
# A common example where this is beneficial is coroutines which employ caching or memoization to avoid actual I/O when possible.
iscoroutine( obj ) -> bool
# Create event loop, run a coro, close the loop, return value from coro.
# Should be used as the main entry point for async programs.
# Cannot be called when another asyncio event loop is running in the same thread.
# It's recommended to use loop_factory to configure the event loop instead of policies.
run( coro, *, debug=None, loop_factory=None ) -> result # loop_factory=None -> new_event_loop()
# debug: True -> event loop will be run in debug mode
# False -> disables debug mode
# None -> respect global Debug Mode settings
# Acts as async context manager (async with); only allows qty coros to be inside the block at the same time.
Semaphore( qty ) -> Semaphore
await sleep( delay ) # use sleep( 0 ) to allow other tasks to run
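await start_server( client_connected_cb, host, port ) -> Server # TCP socket server; cb is called with (reader, writer)
await server.serve_forever() # accepts connections until the coroutine is cancelled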
# Not intended to be created directly.
Task < Future
    add_done_callback( callback, *, context=None ) # cb called with Task object as sole argument
    cancel( msg=None ) # The msg parameter is propagated from cancelled task to its awaiter.
    cancelled() -> bool
    done() -> bool
    exception() -> exception -or- CancelledError if cancelled -or- InvalidStateError if not yet available -or- None if finished without exception
    get_name() -> "Task-1"
    result() -> result -or- re-raises the task's exception -or- CancelledError if cancelled -or- InvalidStateError if not yet available
# Usable as async context manager (async with); awaits all created tasks before exiting.
# The way exceptions are handled is complicated - see the docs.
TaskGroup()
    create_task( ... ) -> Task # same signature as create_task()
async with timeout_at( when ) as t: # when -> absolute deadline on the loop.time() clock (not epoch seconds), or None
async with timeout( seconds_from_now ) as t: # raises TimeoutError after seconds (t -> Timeout)
    t.reschedule( when ) # when -> absolute loop.time() deadline, or None; can start with timeout( None ) and set a value later
    # t.reschedule( loop.time() + 10 )
    t.when() -> float | None
    t.expired() -> bool # can use after exiting context manager
# Asynchronously run function func in a separate thread.
# Any *args and **kwargs supplied for this function are directly passed to func.
# Return a coroutine that can be awaited to get the eventual result of func.
# Also, the current contextvars.Context is propagated, allowing context variables from the event loop thread to be accessed in the separate thread.
to_thread( func, /, *args, **kwargs ) -> coro
# Run Future and Task instances in the aws iterable concurrently and block until the condition specified by return_when.
# Does not raise TimeoutError — Futures or Tasks that aren’t done when the timeout occurs are simply returned in the second set.
await wait( aws, *, timeout=None, return_when=ALL_COMPLETED ) -> done, pending
# timeout -> float|int seconds
# ALL_COMPLETED -> The function will return when all futures finish or are cancelled.
# FIRST_COMPLETED -> The function will return when any future finishes or is cancelled.
# FIRST_EXCEPTION -> The function will return when any future finishes by raising an exception.
# If no future raises an exception then it is equivalent to ALL_COMPLETED.
await wait_for( aw, timeout ) # raises TimeoutError
```
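A hedged sketch contrasting two entries from the reference above: `gather( ..., return_exceptions=True )` collecting an exception as a result, and `wait()` returning unfinished tasks in `pending` instead of raising `TimeoutError`. The coroutines `ok` and `boom` are hypothetical stand-ins.

```python
import asyncio

async def ok(value, delay):
    await asyncio.sleep(delay)
    return value

async def boom():
    await asyncio.sleep(0)
    raise ValueError("boom")

async def main():
    # return_exceptions=True: exceptions come back as results, in input order.
    gathered = await asyncio.gather(ok("a", 0.01), boom(), return_exceptions=True)

    # wait() never raises TimeoutError; unfinished tasks land in `pending`.
    fast = asyncio.create_task(ok("fast", 0))
    slow = asyncio.create_task(ok("slow", 10))
    done, pending = await asyncio.wait({fast, slow}, timeout=0.1)
    for task in pending:
        task.cancel()  # don't leave the slow task running
    return gathered, done == {fast}, pending == {slow}

gathered, fast_done, slow_pending = asyncio.run(main())
print(gathered, fast_done, slow_pending)
```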
## Tips
- Move blocking IO-bound tasks into background threads.
- Move CPU-bound tasks into background processes.
- Never spend more than a few ms before releasing control with `await`.
- Use `TaskGroup`, which is a newer alternative to `create_task()`+`gather()`.
  - `TaskGroup` keeps references to all tasks to prevent them from being garbage collected prematurely. (https://textual.textualize.io/blog/2023/02/11/the-heisenbug-lurking-in-your-async-code/)
  - `TaskGroup` provides stronger safety guarantees than `gather()` for scheduling a nesting of subtasks. Unlike `gather()`, if a task raises an exception, `TaskGroup` will cancel the remaining tasks.
- Use `to_thread()` (which is a newer alternative to `loop.run_in_executor()`) unless you need to start a new process for a CPU-intensive task.
  - WARNING re: `run_in_executor()` -> It isn't affected by Ctrl+C, and is notoriously hard to cancel.
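A minimal sketch of the first tip, assuming Python 3.9+ for `to_thread()`: a blocking call is pushed to a worker thread while the event loop keeps servicing another coroutine. `blocking_io` and `heartbeat` are hypothetical, and `time.sleep()` stands in for any blocking call.

```python
import asyncio
import threading
import time

def blocking_io():
    # Stand-in for a blocking call; it would stall the loop if run inline.
    time.sleep(0.05)
    return threading.current_thread() is threading.main_thread()

async def heartbeat(ticks):
    # Keeps ticking while the blocking call runs in its worker thread.
    for _ in range(3):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)

async def main():
    ticks = []
    ran_on_main, _ = await asyncio.gather(
        asyncio.to_thread(blocking_io),
        heartbeat(ticks),
    )
    return ran_on_main, len(ticks)

ran_on_main_thread, tick_count = asyncio.run(main())
print(ran_on_main_thread, tick_count)
```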
## Cancellation
asyncio handles `signal.SIGINT` (Ctrl+C) as follows:
1. `Runner.run()` installs a custom `signal.SIGINT` handler before any user code is executed and removes it when exiting from the function.
2. The `Runner` wraps the passed coro in the main task and runs it.
3. When `signal.SIGINT` is raised by Ctrl+C, the custom signal handler cancels the main task by calling `Task.cancel()`, which raises `CancelledError` inside the main task. This causes the Python stack to unwind, so `try/except` and `try/finally` blocks can be used for resource cleanup. After the main task is cancelled, `Runner.run()` raises `KeyboardInterrupt`.
4. A user could write a tight loop that cannot be interrupted by `Task.cancel()`. In that case, a second Ctrl+C immediately raises `KeyboardInterrupt` without cancelling the main task.
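The unwinding described in step 3 can be sketched without sending a real signal by calling `Task.cancel()` directly, which is the same mechanism the SIGINT handler uses. This is an illustration only, not the Runner's actual code; `worker` and `log` are hypothetical.

```python
import asyncio

async def worker(log):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("cancelled")
        raise  # re-raise so the task is actually marked as cancelled
    finally:
        log.append("cleanup")  # resource cleanup runs while the stack unwinds

async def main():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # let the worker start and reach its await
    task.cancel()           # what the SIGINT handler does to the main task
    try:
        await task
    except asyncio.CancelledError:
        log.append("awaiter saw CancelledError")
    return log, task.cancelled()

log, was_cancelled = asyncio.run(main())
print(log, was_cancelled)
```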