# Python : Database : asyncpg

\[ [pypi](https://pypi.org/project/asyncpg/) | [src](https://github.com/MagicStack/asyncpg/) | [docs](https://magicstack.github.io/asyncpg/current/) | [api](https://magicstack.github.io/asyncpg/current/api/index.html) ]

> [!NOTE] DB-API (PEP-0249)
> "DB-API is a synchronous API, while asyncpg is based around an asynchronous I/O model. Thus, full drop-in compatibility with DB-API is not possible, and we decided to design asyncpg API in a way that is better aligned with PostgreSQL architecture and terminology." (FAQ)

## Cheatsheet

#### Basics

```python
from asyncpg import Record, connect, create_pool

## Single Connection
conn = await connect( host="...", port=5432, user="...", password="...", database="..." )
conn = await connect( "postgres[ql]://USER:PASS@HOST:PORT/NAME" )
    # ex: "postgres://foo:bar@HOST:1234/projdb"
conn = await connect( "postgres[ql]://USER:PASS@/NAME?host=PATH" )
    # ex: "postgres://foo:bar@/projdb?host=/cloudsql/myproj-123456:us-central1:db1"
    # the "/.s.PGSQL.5432" segment is optional; asyncpg will add it for you
await conn.close()

## Connection Pool
async with create_pool( dsn, ... ) as pool:
    await pool.execute( ... )
    async with pool.acquire() as conn:
        await conn.execute( ... )

## Querying
result = await conn.execute( "..." )  # -> command status string
rows   = await conn.fetch( "SELECT * FROM mytable WHERE id = $1", 10 )
row    = await conn.fetchrow( "SELECT * FROM users WHERE name = $1", "Bob" )
value  = await conn.fetchval( "SELECT 2 ^ $1", 7 )

## Prepared Statements
stmt  = await conn.prepare( "SELECT 2 ^ $1" )
value = await stmt.fetchval( 10 )

## Cursors
async for record in conn.cursor( "SELECT ..." ): ...
async for record in stmt.cursor( 10 ): ...

## Records
row -> Record( id=1, name="Bob", dob=datetime.date( 1984, 3, 1 ) )
```

**Prepared Statements**

- Once a pool connection is released, it’s reset to close all open cursors and other resources *except* prepared statements.
- The `.fetch*()` methods cache prepared statements automatically.

#### Transactions

```python
# as context manager
async with conn.transaction():
    await conn.execute( "INSERT..." )

# manually
tr = conn.transaction()
await tr.start()
try:
    await conn.execute( "INSERT..." )
except:
    await tr.rollback()
    raise
else:
    await tr.commit()

# nested transaction (using savepoints)
async with conn.transaction():  # outer transaction
    ...
    try:
        async with conn.transaction():  # inner transaction
            ...
            raise Exception
            # inner transaction will automatically be rolled back
    except:
        pass
```

Outside an explicit transaction block, every change is applied to the database immediately. This is also known as **auto-commit**.

## API Reference

#### connect()

```python
async connect(
    # method 1
    dsn = None  # "postgres://user:password@host:port/database?option=value"

    # method 2 (if dsn not specified)
    host     = None  # ...or env:PGHOST
    port     = None  # int | str or env:PGPORT or 5432
    user     = None  # ...or env:PGUSER
    password = None  # ...or env:PGPASSWORD
    database = None  # ...or env:PGDATABASE

    # timeouts
    timeout         = 60    # float in seconds (for connection)
    command_timeout = None  # float in seconds (for each command)

    # results
    record_class = asyncpg.Record

    # prepared statement LRU cache
    statement_cache_size          = 100    # max num statements (0 -> disable)
    max_cached_statement_lifetime = 300    # max seconds cache TTL (0 -> disable)
    max_cacheable_statement_size  = 15360  # max statement size (0 -> disable)

    server_settings = {
        "application_name": "foo",
        ...
    }  # https://postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS
) -> Connection
```

#### create_pool()

```python
async create_pool(
    # connect args
    dsn = None
    **connect_kwargs

    # pool args
    min_size    = 10     # initial pool size
    max_size    = 10
    max_queries = 50000  # after which connection is replaced
    max_inactive_connection_lifetime = 300.0  # float in seconds; 0 -> disable
) -> Pool
# pool can be used as context manager, or simply awaited to init pool
```

#### Connection

Methods that don't return data return the command's status string instead (ex: `"INSERT 0 2"`).

```python
async .close()

async .copy_from_query( ... )  # copy query results to fileobj
async .copy_from_table( ... )  # copy table contents to fileobj
async .copy_records_to_table(
    table_name
    records             # (a)iter:tuple
    columns     = None  # list; optional
    schema_name = None
    timeout     = None
    where       = None  # SQL expression (str)
)
async .copy_to_table(
    table_name
    source              # path | fileobj
    columns     = None  # list; optional
    schema_name = None
    timeout     = None
)

.cursor( query, *args, prefetch=None, timeout=None, record_class=None ) -> CursorFactory
    prefetch  # num rows (default: 50)

async .execute( query, *args, timeout=None )
    # Can execute multiple commands if no args are provided.
    # Examples:
    await conn.execute(  # with multiple commands
        """
        CREATE TABLE mytab ( a int );
        INSERT INTO mytab ( a ) VALUES ( 100 ), ( 200 ), ( 300 )
        """
    )
    await conn.execute(  # with args
        "INSERT INTO mytab ( a ) VALUES ( $1 ), ( $2 )",
        10, 20
    )

async .executemany( command, args, timeout=None )  # atomic
    args  # iter:sequence
    await conn.executemany(
        "INSERT INTO mytab ( a, b, c ) VALUES ( $1, $2, $3 )",
        [ ( 1, 2, 3 ), ( 4, 5, 6 ) ]
    )

async .fetch( query, *args, timeout=None, record_class=None ) -> [ Record ]
async .fetchrow( query, *args, timeout=None, record_class=None ) -> Record | None  # first row
async .fetchval( query, *args, column=0, timeout=None )
    column  # numeric index of value to return from first row

.get_server_pid()
.get_server_version()
.is_closed()
.is_in_transaction()

async .prepare( query, name=None, timeout=None, record_class=None ) -> PreparedStatement

.transaction(
    isolation  = None   # "serializable" | "repeatable_read" | "read_uncommitted" | "read_committed"
                        # otherwise determined by server and session (usually "read_committed")
    readonly   = False
    deferrable = False
)
```

#### Pool

Repeats many of `Connection`'s methods; each call first acquires a connection from the pool and releases it afterwards.

```python
async .acquire( timeout=None ) -> Connection
.get_idle_size()  # num idle connections
.get_size()

# similar methods as Connection, except will acquire and release a connection each time
.copy_from_query, .copy_from_table, .copy_records_to_table, .copy_to_table,
.execute, .executemany, .fetch, .fetchrow, .fetchval
```

> [!NOTE] `pool.acquire()`
> Is actually a plain (non-`async`) method that returns a `PoolAcquireContext` instance, which is both awaitable (returning a connection) and an async context manager (yielding a connection).
>
> ```
> conn = await pool.acquire()
> # -or-
> async with pool.acquire() as conn:
> ```

#### PreparedStatement

```python
.cursor( *args, prefetch=None, timeout=None ) -> CursorFactory

# parsed from EXPLAIN ( FORMAT JSON, VERBOSE ) output
# analyze wraps in transaction and rolls back after
async .explain( *args, analyze=False ) -> obj

# executes statement for each sequence of args in args
async .executemany( args, timeout=None )
    args -> iter:sequence

async .fetch( *args, timeout=None )
async .fetchrow( *args, timeout=None )
async .fetchval( *args, column=0, timeout=None )

.get_parameters() -> ( Type )
.get_query() -> str
.get_statusmsg() -> str  # from previously executed command
```

###### explain Output

```python
[{
    "Plan": {
        "Node Type"      : "Result",
        "Parallel Aware" : False,
        "Async Capable"  : False,
        "Startup Cost"   : 0.0,
        "Total Cost"     : 0.01,
        "Plan Rows"      : 1,
        "Plan Width"     : 4,
        "Output"         : [ "1" ]
    }
}]
```

#### Cursor

Cursors iterate over the results of a large query without fetching all rows at once. The cursor prefetches records in batches to reduce the number of round trips to the server.

> [!WARNING] Cursors & Transactions
> Cursors created by a call to `Connection.cursor()` or `PreparedStatement.cursor()` cannot be used outside of a transaction.

#### Record

Read-only representation of a PostgreSQL row. Offers much the same interface as `dict`, except that subscripts also accept integers to select a value by its position in the row.

**API**

```python
__len__()              -> number of fields in record
__getitem__( field )   -> value of field (name or index)
__contains__( name )   -> True if record contains field name
__iter__()             -> iterator over values
get( name[, default ] ) -> value for field if exists, else default or None
keys()                 -> iterator over names
values()               -> iterator over values
items()                -> iterator over fields: ( name, value ) pairs
print( record )        -> "<Record oid=16388 rolname='elvis' rolsuper=True>"
```

**Customization**

**Q:** Can I use dot-notation with `Record`?
**A:** We decided against making `Record` a named tuple to keep the method namespace separate from the column namespace. That said, you can provide a custom `Record` class that implements dot-notation via the `record_class` argument to `connect()` or any of the `Record`-returning methods.

```python
# from asyncpg docs
class MyRecord( Record ):
    def __getattr__( self, name ):
        return self[ name ]

# my version
class AttrRecord( asyncpg.Record ):
    def __getattr__( self, name ):
        if name not in self:
            raise AttributeError
        return self[ name ]
```

## Types

| PostgreSQL Types | Python Types |
| ---------------- | ------------ |
| `anyarray` | `list` |
| `anyenum` | `str` |
| `anyrange` | `asyncpg.Range`, `tuple` |
| `anymultirange` | `list[ asyncpg.Range ]`, `list[ tuple ]` |
| `record` | `asyncpg.Record`, `tuple`, `Mapping` |
| `bit`, `varbit` | `asyncpg.BitString` |
| `bool` | `bool` |
| `box` | `asyncpg.Box` |
| `bytea` | `bytes` |
| `char`, `name`, `varchar`, `text`, `xml` | `str` |
| `cidr` | `ipaddress.IPv4Network`, `ipaddress.IPv6Network` |
| `inet` | `ipaddress.IPv4Interface`, `ipaddress.IPv6Interface`, `ipaddress.IPv4Address`, `ipaddress.IPv6Address` |
| `macaddr` | `str` |
| `circle` | `asyncpg.Circle` |
| `date` | `datetime.date` |
| `time` | offset-naive `datetime.time` |
| `time with time zone` | offset-aware `datetime.time` |
| `timestamp` | offset-naive `datetime.datetime` |
| `timestamp with time zone` | offset-aware `datetime.datetime` |
| `interval` | `datetime.timedelta` |
| `float`, `double precision` | `float` |
| `smallint`, `integer`, `bigint` | `int` |
| `numeric` | `decimal.Decimal` |
| `json`, `jsonb` | `str` |
| `line` | `asyncpg.Line` |
| `lseg` | `asyncpg.LineSegment` |
| `money` | `str` |
| `path` | `asyncpg.Path` |
| `point` | `asyncpg.Point` |
| `polygon` | `asyncpg.Polygon` |
| `uuid` | `uuid.UUID` |
| `tid` | `tuple` |

[Custom Type
Conversions](https://magicstack.github.io/asyncpg/current/usage.html#custom-type-conversions) (geometry example uses [shapely](https://pypi.org/project/shapely/) package)

## Exceptions

```python
asyncpg.exceptions
    DuplicateDatabaseError   # CREATE DATABASE but database already exists
    InternalClientError      # "could not resolve query result and/or argument types"
                             # when querying 'pg_attribute.attmissingval' (type: anyarray)
    InvalidCatalogNameError  # CREATE DATABASE but template doesn't exist
                             # DROP DATABASE but database doesn't exist
```

## Cookbook

Checking for membership in a variable list of values:

```python
# you can't do "WHERE id IN $1" or "WHERE id IN ( $1 )", so instead:
results = await connection.fetch(
    "SELECT * FROM post WHERE id = any($1::int[])",
    [ 1, 2, 3 ]
)
```
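
Getting the affected row count out of a command status string:

Since `.execute()` returns only the status string (ex: "INSERT 0 2"), a small helper can pull out the trailing count. This is a minimal sketch, not part of asyncpg: `rows_affected` is a hypothetical name, and it assumes the count is the last space-separated token, as in PostgreSQL command tags such as "INSERT 0 2", "UPDATE 3" or "DELETE 1". Statuses without a count (ex: "CREATE TABLE") yield 0.

```python
def rows_affected( status: str ) -> int:
    """Return the trailing row count of a command status string, or 0 if it has none."""
    # hypothetical helper: takes the last space-separated token of the status
    last = status.rsplit( " ", 1 )[ -1 ]
    return int( last ) if last.isdecimal() else 0


# With a live connection the status would come from something like:
#     status = await conn.execute( "DELETE FROM mytab WHERE a > $1", 100 )
print( rows_affected( "INSERT 0 2" ) )    # -> 2
print( rows_affected( "UPDATE 3" ) )      # -> 3
print( rows_affected( "CREATE TABLE" ) )  # -> 0
```

The helper only parses the string, so it works the same for statuses returned by `Connection.execute()`, `Pool.execute()` or `PreparedStatement.get_statusmsg()`.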