# Python : Database : asyncpg
\[ [pypi](https://pypi.org/project/asyncpg/) | [src](https://github.com/MagicStack/asyncpg/) | [docs](https://magicstack.github.io/asyncpg/current/) | [api](https://magicstack.github.io/asyncpg/current/api/index.html) ]
> [!NOTE] DB-API (PEP-0249)
> "DB-API is a synchronous API, while asyncpg is based around an asynchronous I/O model. Thus, full drop-in compatibility with DB-API is not possible, and we decided to design asyncpg API in a way that is better aligned with PostgreSQL architecture and terminology." — FAQ
## Cheatsheet
#### Basics
```python
from asyncpg import Record, connect, create_pool
## Single Connection
conn = await connect( host="...", port=5432, user="...", password="...", database="..." )
conn = await connect( "postgres[ql]://USER:PASS@HOST:PORT/NAME" ) # ex: "postgres://foo:bar@db.example.com:1234/projdb"
conn = await connect( "postgres[ql]://USER:PASS@/NAME?host=PATH" ) # ex: "postgres://foo:bar@/projdb?host=/cloudsql/myproj-123456:us-central1:db1"
# the "/.s.PGSQL.5432" segment is optional; asyncpg will add it for you
await conn.close()
## Connection Pool
async with create_pool( dsn, ... ) as pool:
await pool.execute( ... )
async with pool.acquire() as conn:
await conn.execute( ... )
## Querying
result = await conn.execute( "..." ) # -> command status string
rows = await conn.fetch( "SELECT * FROM mytable WHERE id = $1", 10 )
row = await conn.fetchrow( "SELECT * FROM users WHERE name = $1", "Bob" )
value = await conn.fetchval( "SELECT 2 ^ $1", 7 )
## Prepared Statements
stmt = await conn.prepare( "SELECT 2 ^ $1" )
value = await stmt.fetchval( 10 )
## Cursors
async for record in conn.cursor( "SELECT ..." ):
...
async for record in stmt.cursor( 10 ):
...
## Records
row -> Record( id=1, name="Bob", dob=datetime.date( 1984, 3, 1 ) )
```
**Prepared Statements**
- Once a pool connection is released, it’s reset to close all open cursors and other resources *except* prepared statements.
- The `.fetch*()` methods automatically do prepared statement caching.
#### Transactions
```python
# as context manager
async with conn.transaction() as tr:
await conn.execute( "INSERT..." )
# manually
tr = conn.transaction()
await tr.start()
try:
await conn.execute( "INSERT..." )
except:
await tr.rollback()
raise
else:
await tr.commit()
# nested transaction (using savepoints)
async with conn.transaction(): # outer transaction
...
try:
async with conn.transaction(): # inner transaction
...
raise Exception # inner transaction will automatically be rolled back
except:
pass
```
When not in an explicit transaction block, any changes to the database will be applied immediately. This is also known as **auto-commit**.
## API Reference
#### connect()
```python
async connect(
# method 1
dsn = None # "postgres://user:password@host:port/database?option=value"
# method 2 (if dsn not specified)
host = None # ...or env:PGHOST
port = None # int | str or env:PGPORT or 5432
user = None # ...or env:PGUSER
password = None # ...or env:PGPASSWORD
database = None # ...or env:PGDATABASE
# timeouts
timeout = 60 # float in seconds (for connection)
command_timeout = None # float in seconds (for each command)
# results
record_class = asyncpg.Record
# prepared statement LRU cache
statement_cache_size = 100 # max num statements (0 -> disable)
max_cached_statement_lifetime = 300 # max seconds cache TTL (0 -> disable)
max_cacheable_statement_size = 15360 # max statement size (0 -> disable)
server_settings = { "application_name": "foo", ... } # https://postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS
) -> Connection
```
#### create_pool()
```python
async create_pool(
# connect args
dsn = None
**connect_kwargs
# pool args
min_size = 10 # initial pool size
max_size = 10
max_queries = 50000 # after which connection is replaced
max_inactive_connection_lifetime = 300.0 # float in seconds; 0 -> disable
) -> Pool # pool can be used as context manager, or simply awaited to init pool
```
#### Connection
Methods that don't return data instead return the command's status string (ex: "INSERT 0 2").
```python
async .close()
async .copy_from_query( ... ) # copy query results to fileobj
async .copy_from_table( ... ) # copy table contents to fileobj
async .copy_records_to_table(
table_name
records # (a)iter:tuple
columns = None # list; optional
schema_name = None
timeout = None
where = None # SQL expression (str)
)
async .copy_to_table(
table_name
source # path | fileobj
columns = None # list; optional
schema_name = None
timeout = None
)
cursor( query, *args, prefetch=None, timeout=None, record_class=None ) -> CursorFactory
prefetch # num rows (default: 50)
async .execute( query, *args, timeout=None )
# Can execute multiple commands if no args are provided.
# Examples:
await conn.execute( # with multiple commands
"""
CREATE TABLE mytab ( a int );
INSERT INTO mytab ( a ) VALUES ( 100 ), ( 200 ), ( 300 )
"""
)
await conn.execute( # with args
"INSERT INTO mytab ( a ) VALUES ( $1 ), ( $2 )",
10, 20
)
async .executemany( command, args, timeout=None ) # atomic
args # iter:sequence
await conn.executemany(
"INSERT INTO mytab ( a ) VALUES ( $1, $2, $3 )",
[ ( 1, 2, 3 ),
( 4, 5, 6 ) ]
)
async .fetch( query, *args, timeout=None, record_class=None ) -> [ Record ]
async .fetchrow( query, *args, timeout=None, record_class=None ) -> Record | None # first row
async .fetchval( query, *args, column=0, timeout=None )
column # numeric index of value to return from first row
.get_server_pid()
.get_server_version()
.is_closed()
.is_in_transaction()
async .prepare( query, name=None, timeout=None, record_class=None ) -> PreparedStatement
.transaction( # not a coroutine; returns a Transaction object (see "tr = conn.transaction()" above)
isolation = None # "serializable" | "repeatable_read" | "read_uncommitted" | "read_committed"
# otherwise determined by server and session (usually "read_committed")
readonly = False
deferrable = False
)
```
#### Pool
Repeats many of the same methods as Connection, but when called, will first acquire a connection from the pool and then release it afterwards.
```python
async .acquire( timeout=None ) -> Connection
.get_idle_size() # num idle connections
.get_size()
# similar methods as Connection, except will acquire and release a connection each time
.copy_from_query, .copy_from_table, .copy_records_to_table, .copy_to_table,
.execute, .executemany, .fetch, .fetchrow, .fetchval
```
> [!NOTE] `pool.acquire()`
> Is actually a regular (non-coroutine) method that returns a `PoolAcquireContext` instance that is both awaitable (returning a connection) and an async context manager (yielding a connection).
>
> ```
> conn = await pool.acquire()
> -or-
> async with pool.acquire() as conn:
> ```
#### PreparedStatement
```python
.cursor( *args, prefetch=None, timeout=None ) -> CursorFactory
# parsed from EXPLAIN ( FORMAT JSON, VERBOSE ) output
# analyze wraps the statement in a transaction and rolls it back afterwards
async .explain( *args, analyze=False ) -> obj
# executes statement for each sequence of args in args
async .executemany( args, timeout=None ) # args -> iter:sequence
async .fetch( *args, timeout=None )
async .fetchrow( *args, timeout=None )
async .fetchval( *args, column=0, timeout=None )
.get_parameters() -> ( Type )
.get_query() -> str
.get_statusmsg() -> str # from previously executed command
```
###### explain Output
```python
[{
"Plan": {
"Node Type" : "Result",
"Parallel Aware" : False,
"Async Capable" : False,
"Startup Cost" : 0.0,
"Total Cost" : 0.01,
"Plan Rows" : 1,
"Plan Width" : 4,
"Output" : [ "1" ]
}
}]
```
#### Cursor
Cursors iterate over the results of a large query without fetching all rows at once.
The cursor will prefetch records to reduce the number of queries sent.
> [!WARNING] Cursors & Transactions
> Cursors created by a call to `Connection.cursor()` or `PreparedStatement.cursor()` cannot be used outside of a transaction.
#### Record
Read-only representation of a PostgreSQL row.
Basically the same interface as `dict`, except subscript also allows numbers to select value based on row position.
**API**
```python
__len__() -> number of fields in record
__getitem__( field ) -> value of field (name or index)
__contains__( name ) -> True if record contains field name
__iter__() -> iterator over values
get( name[, default ] ) -> value for field if exists, else default or None
keys() -> iterator over names
values() -> iterator over values
items() -> iterator over fields — ( name, value ) pairs
print( record ) -> "<Record oid=16388 rolname='elvis' rolsuper=True>"
```
**Customization**
**Q:** Can I use dot-notation with `Record`?
A: We decided against making `Record` a named tuple to keep the method namespace separate from the column namespace. That said, you can provide a custom `Record` class that implements dot-notation via the `record_class` argument to `connect()` or any of the `Record`-returning methods.
```python
# from asyncpg docs
class MyRecord( Record ):
    """Record subclass that exposes columns via attribute access (dot-notation)."""
    def __getattr__( self, attr ):
        # __getattr__ only fires when normal attribute lookup fails,
        # so delegate those names to the column subscript lookup.
        return self.__getitem__( attr )
# my version
class AttrRecord( asyncpg.Record ):
    """Record subclass allowing dot-notation access to columns.

    Unlike the minimal docs example, unknown names raise AttributeError
    (carrying the attribute name) instead of KeyError, so hasattr() and
    getattr() with a default behave correctly.
    """
    def __getattr__( self, name ):
        # __getattr__ is only invoked after normal attribute lookup fails.
        if name not in self:
            # Include the name so the error message is actionable.
            raise AttributeError( name )
        return self[ name ]
```
## Types
| PostgreSQL Types | Python Types |
| ---------------------------------------- | ------------------------------------------------------------------------------------------------------ |
| `anyarray` | `list` |
| `anyenum` | `str` |
| `anyrange` | `asyncpg.Range, tuple` |
| `anymultirange` | `list[ asyncpg.Range ], list[ tuple ]` |
| `record` | `asyncpg.Record, tuple, Mapping` |
| `bit`, `varbit` | `asyncpg.BitString` |
| `bool` | `bool` |
| `box` | `asyncpg.Box` |
| `bytea` | `bytes` |
| `char`, `name`, `varchar`, `text`, `xml` | `str` |
| `cidr` | `ipaddress.IPv4Network`, `ipaddress.IPv6Network` |
| `inet` | `ipaddress.IPv4Interface`, `ipaddress.IPv6Interface`, `ipaddress.IPv4Address`, `ipaddress.IPv6Address` |
| `macaddr` | `str` |
| `circle` | `asyncpg.Circle` |
| `date` | `datetime.date` |
| `time` | offset-naive `datetime.time` |
| `time with time zone` | offset-aware `datetime.time` |
| `timestamp` | offset-naive `datetime.datetime` |
| `timestamp with time zone` | offset-aware `datetime.datetime` |
| `interval` | `datetime.timedelta` |
| `float`, `double precision` | `float` |
| `smallint`, `integer`, `bigint` | `int` |
| `numeric` | `decimal.Decimal` |
| `json`, `jsonb` | `str` |
| `line` | `asyncpg.Line` |
| `lseg` | `asyncpg.LineSegment` |
| `money` | `str` |
| `path` | `asyncpg.Path` |
| `point` | `asyncpg.Point` |
| `polygon` | `asyncpg.Polygon` |
| `uuid` | `uuid.UUID` |
| `tid` | `tuple` |
[Custom Type Conversions](https://magicstack.github.io/asyncpg/current/usage.html#custom-type-conversions) (geometry example uses [shapely](https://pypi.org/project/shapely/) package)
## Exceptions
```python
asyncpg.exceptions
DuplicateDatabaseError # CREATE DATABASE but database already exists
InternalClientError # "could not resolve query result and/or argument types"
# when querying 'pg_attribute.attmissingval' (type: anyarray)
InvalidCatalogNameError # CREATE DATABASE but template doesn't exist
# DROP DATABASE but database doesn't exist
```
## Cookbook
Checking for membership in a variable list of values:
```python
# you can't do "WHERE id IN $1" or "WHERE id IN ( $1 )", so instead:
results = await connection.fetch( "SELECT * FROM post WHERE id = any($1::int[])", [ 1, 2, 3 ] )
```