# Python : Database : Tortoise : Source
[tortoise/](https://github.com/tortoise/tortoise-orm/tree/main/tortoise/)
#### `__init__.py` (todo)
```python
Tortoise
_inited = False
apps = {} # { app: { model: Model } }
table_name_generator = None # callable( Model ) -> str
_build_initial_querysets()
...
_discover_models( models_path, app_label ) -> [ Model ] # models_path -> str | Module
# checks for "__models__", else...
# grabs module attributes that are non-abstract Model subclasses with absent or matching ._meta.app
# (if absent, _meta.app = app_label)
~_drop_databases()
...
_get_config_from_config_file( config_file ) -> {}
...
_init_apps( apps_config ) # apps_config -> { str: { str: * } }
...
_init_relations()
_build_initial_querysets()
_init_relations()
def get_related_model # test if app and model really exist, return model
def split_reference # validate and split name into app + model
def init_fk_o2o_field
_init_routers( ... )
...
_init_timezone( use_tz, timezone )
...
~_reset_apps()
...
~close_connections()
...
describe_model( model, serializable = True ) -> {}
describe_models( models = None, serializable = True )
# models -> [ Model ], else returns all models
# serializable False -> python objects, True -> JSON-serializable data
# -> { "models.User": { ... } }
~generate_schemas( safe = True )
...
get_connection( connection_name ) -> connections.get( connection_name )
~init( config = None, config_file = None, _create_db = False, db_url = None, modules = None,
use_tz = False, timezone = "UTC", routers = None, table_name_generator = None ):
connections._init( config[ "connections" ], _create_db )
saves params
_init_connections()
for every alias: create connection (and optionally create DB) via connections.get( alias )
_init_apps( config[ "apps" ] )
for app_label, info...
connections.get( info.default_connection ) # assert conn alias exists
init_models( info.models, app_label, _init_relations=False )
_discover_models()
# checks for "__models__", else...
# grabs module attributes that are non-abstract Model subclasses with absent or matching ._meta.app
# (if absent, _meta.app = app_label)
populate .apps
_init_relations() if _init_relations
...
for models: set _meta.default_connection = info.default_connection
init_models( models_path, app_label, _init_relations=True ) # models_path -> [ str | Module ]
_discover_models()
populate .apps
_init_relations() if _init_relations
run_async( coro )
loop = asyncio.get_event_loop()
try:
loop.run_until_complete( coro )
finally:
loop.run_until_complete( connections.close_all( discard=True ) )
```
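For orientation, a minimal end-to-end use of the surface above; the `sqlite://` URL and `app.models` module path are illustrative placeholders:
```python
from tortoise import Tortoise, run_async

async def main():
    # init() stores the config, opens connections, and discovers models
    await Tortoise.init(
        db_url="sqlite://:memory:",
        modules={"models": ["app.models"]},  # placeholder module path
    )
    await Tortoise.generate_schemas(safe=True)  # CREATE TABLE IF NOT EXISTS ...

run_async(main())  # per run_async above: closes all connections in its finally
```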
#### `backends/__init__.py`
#### `backends/asyncpg/__init__.py`
```python
client_class = client.AsyncpgDBClient
```
#### `backends/asyncpg/client.py`
```python
AsyncpgDBClient < BasePostgresClient
connection_class = asyncpg.connection.Connection
executor_class = AsyncpgExecutor
schema_generator = AsyncpgSchemaGenerator
_connection = None
_pool
------------------------------------------------------------------
# all methods below match BasePostgresClient
~_close()
if _pool:
asyncio.wait_for( _pool.close(), 10 ) -or- _pool.terminate()
_pool = None
~_expire_connections()
_pool.expire_connections() if _pool
_in_transaction()
-> TransactionContextPooled( TransactionWrapper( self ) )
~_translate_exceptions( ... )
...
acquire_connection()
-> PoolConnectionWrapper( self )
~create_connection( with_db )
...
~create_pool( **kwargs )
-> asyncpg.create_pool( None, **kwargs )
~db_delete()
super(); close()
~execute_insert( query, values )
acquire_connection() as conn
log.debug( "{query}: {values}" )
-> conn.fetchrow( query, *values )
~execute_many( query, values )
acquire_connection() as conn:
log.debug( "{query}: {values}" )
trans = conn.transaction(); trans.start();
conn.executemany( query, values );
trans.commit() -or- trans.rollback()
~execute_query( query, values=None )
acquire_connection() as conn:
log.debug( "{query}: {values}" )
if UPDATE|DELETE: conn.execute( query, *values ); -> rows_affected, []
else: rows = conn.fetch( query, *values ); -> len( rows ), rows
~execute_query_dict( query, values=None )
acquire_connection() as conn:
log.debug( "{query}: {values}" )
-> conn.fetch( query, *values )
TransactionWrapper < AsyncpgDBClient, BaseTransactionWrapper
"A transactional connection wrapper for psycopg.
asyncpg implements nested transactions (savepoints) natively, so we don't need to."
__init__( connection )
_connection = connection._connection
_finalized = False
_lock = asyncio.Lock()
_parent = connection
connection_name = connection.connection_name
log = connection.log
transaction = None
# overrides for AsyncpgDBClient
_in_transaction()
-> NestedTransactionContext( TransactionWrapper( self ) )
acquire_connection()
-> ConnectionWrapper( _lock, self )
~execute_many( query, values )
acquire_connection() as conn:
log.debug( "{query}: {values}" )
conn.executemany( query, values )
# overrides for BaseTransactionWrapper
~begin()
transaction = _connection.transaction()
transaction.start()
~commit()
asserts transaction and not _finalized
transaction.commit(); _finalized = True
~rollback()
asserts transaction and not _finalized
transaction.rollback(); _finalized = True
~savepoint()
-> begin()
~release_savepoint()
-> commit()
~savepoint_rollback()
-> rollback()
```
###### Flattened
```python
AsyncpgDBClient < BasePostgresClient
DSN_TEMPLATE = "postgres://{user}:{password}@{host}:{port}/{database}"
_connection = None
_pool = None
capabilities = Capabilities("postgres", support_update_limit_order_by=False)
connection_class = asyncpg.connection.Connection
executor_class = AsyncpgExecutor
loop = None
query_class = PostgreSQLQuery
schema_generator = AsyncpgSchemaGenerator
------------------------------------------------------------------
__init__( user=None, password=None, database=None, host=None, port=5432, **kwargs )
_connection = None
_pool = None
_template = {}
connection_name = connection_name
fetch_inserted = fetch_inserted | True
log = db_client_logger
port = int( port )
extra = kwargs.copy();
application_name = extra.pop( "application_name", None )
connection_class = extra.pop( "connection_class", connection_class )
loop = extra.pop( "loop", None)
pool_maxsize = int( extra.pop( "maxsize", 5 ) )
pool_minsize = int( extra.pop( "minsize", 1 ) )
schema = extra.pop( "schema", None )
server_settings = {} or copy of extra.pop( "server_settings" ) if exists
extra.pop( "connection_name", "fetch_inserted" )
~_close()
if _pool:
asyncio.wait_for( _pool.close(), 10 ) -or- _pool.terminate()
_pool = None
~_expire_connections()
_pool.expire_connections() if _pool
_in_transaction()
-> TransactionContextPooled( TransactionWrapper( self ) )
~_translate_exceptions( ... )
...
acquire_connection()
-> PoolConnectionWrapper( self )
~close(): _close(); _template.clear()
~create_connection( with_db )
_pool = create_pool(
host port user password min_size max_size
connection_class loop server_settings **extra
database = database if with_db else None (pulls from PGDATABASE instead)
)
~create_pool( **kwargs )
-> asyncpg.create_pool( None, **kwargs )
~db_create():
create_connection( with_db=False )
execute_script( "CREATE DATABASE {database} OWNER {user}" ); close()
~db_delete()
create_connection( with_db=False )
execute_script( "DROP DATABASE {database}" ); close()
~execute_insert( query, values )
acquire_connection() as conn
log.debug( "{query}: {values}" )
-> conn.fetchrow( query, *values )
~execute_many( query, values )
acquire_connection() as conn:
log.debug( "{query}: {values}" )
trans = conn.transaction(); trans.start();
conn.executemany( query, values );
trans.commit() -or- trans.rollback()
~execute_query( query, values=None )
acquire_connection() as conn:
log.debug( "{query}: {values}" )
if UPDATE|DELETE: conn.execute( query, *values ); -> rows_affected, []
else: rows = conn.fetch( query, *values ); -> len( rows ), rows
~execute_query_dict( query, values=None )
acquire_connection() as conn:
log.debug( "{query}: {values}" )
-> conn.fetch( query, *values )
~execute_script( query )
acquire_connection() as conn: log.debug( query ); conn.execute( query )
```
#### `backends/asyncpg/executor.py`
```python
AsyncpgExecutor < BasePostgresExecutor
(empty)
```
#### `backends/asyncpg/schema_generator.py`
```python
AsyncpgSchemaGenerator < BasePostgresSchemaGenerator
(empty)
```
#### `backends/base/__init__.py`
#### `backends/base/client.py`
```python
Capabilities
__init__( ... )
...
T_conn =~ asyncpg.Connection
BaseDBAsyncClient
query_class = Query
executor_class = BaseExecutor
schema_generator = BaseSchemaGenerator
capabilities = Capabilities( "" )
--------------------------------------------------
__init__( connection_name, fetch_inserted = True )
log = db_client_logger
# all methods below raise NotImplementedError
_in_transaction() -> TransactionContext
acquire_connection() -> ConnectionWrapper | PoolConnectionWrapper
"Acquires a connection from the pool.
Will return the current context connection if already in a transaction."
~close()
~create_connection( with_db ) # with_db: False -> "use default connection instead of configured one"
# True -> "select the DB to use"
~db_create()
~db_delete()
~execute_insert( query, values ) -> pk
~execute_many( query, values ) -> None # bulk insert
~execute_query( query, values = None ) -> ( int, [ dict ] )
~execute_query_dict( query, values = None ) -> [ dict ]
~execute_script( query ) -> None
ConnectionWrapper
"Wraps the connections with a lock to facilitate safe concurrent access
when using asyncio.gather, TaskGroup, or similar."
__init__( lock, client )
connection = client._connection # T_conn
~__aenter__() -> T_conn
lock.acquire()
ensure_connection()
-> connection
~__aexit__( ... )
lock.release()
~ensure_connection()
if not connection:
client.create_connection( with_db=True )
connection = client._connection
TransactionContext
"A context manager interface for transactions.
Returned from utils.in_transaction and BaseDBAsyncClient._in_transaction."
connection # T_conn
--------------
abc~__aenter__() -> T_conn
abc~__aexit__( ... )
TransactionContextPooled < TransactionContext
"A version of TransactionContext that uses a pool to acquire connections."
__init__( connection )
connection_name = connection.connection_name
~__aenter__() -> T_conn
"Set the context variable so the current task is always seeing a TransactionWrapper conneciton."
ensure_connection()
token = connections.set( connection_name, connection )
connection._connection = connection._parent._pool.acquire()
connection.begin()
-> connection
~__aexit__( ... )
if not connection._finalized: connection.commit() -or- connection.rollback()
if connection._parent._pool: connection._parent._pool.release( connection._connection )
connections.reset( token )
~ensure_connection()
if not connection._parent._pool:
connection._parent.create_connection( with_db=True )
NestedTransactionContext < TransactionContext
...
PoolConnectionWrapper
"Class to manage acquiring from and releasing connections to a pool."
__init__( client )
pool = client._pool
connection = None # T_conn
~__aenter__() -> T_conn
ensure_connection()
-> connection = pool.acquire()
~__aexit__( ... )
pool.release( connection )
~ensure_connection()
if not pool:
client.create_connection( with_db=True )
pool = client._pool
BaseTransactionWrapper
abc~begin() -> None
abc~savepoint() -> None
abc~rollback() -> None
abc~savepoint_rollback() -> None
abc~commit() -> None
abc~release_savepoint() -> None
```
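The wrapper classes above are just the async context-manager protocol around a lock or pool; a minimal sketch of the `ConnectionWrapper` pattern (not the real class):
```python
import asyncio

class LockedResource:
    """Sketch: serialize access to a shared resource, as ConnectionWrapper
    does for a connection under asyncio.gather / TaskGroup."""

    def __init__(self, resource):
        self._resource = resource
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        await self._lock.acquire()  # one task at a time
        return self._resource

    async def __aexit__(self, exc_type, exc, tb):
        self._lock.release()
```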
#### `backends/base/config_generator.py`
```python
DB_LOOKUP = ...
expand_db_url( db_url, testing = False ) -> dict
# -> { "engine": ..., "credentials": ... }
generate_config( db_url, app_modules, connection_label = None, testing = False )
# app_modules = { app_label, modules }
# connection_label defaults to "default"
# -> { "connections" : { connection_label: expand_db_url( db_url, testing ) }
# "apps" : {
# app_label: { "models": modules, "default_connection": connection_label }
# for app_label, modules in app_modules.items()
# } }
```
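Roughly what the two helpers return (exact credential keys vary by backend; `app.models` is a placeholder):
```python
from tortoise.backends.base.config_generator import expand_db_url, generate_config

print(expand_db_url("postgres://user:pass@localhost:5432/mydb"))
# -> {"engine": "tortoise.backends.asyncpg", "credentials": {...}}

config = generate_config(
    "sqlite://:memory:",
    app_modules={"models": ["app.models"]},
)
# -> {"connections": {"default": {...}},
#     "apps": {"models": {"models": [...], "default_connection": "default"}}}
```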
#### `backends/base/executor.py` (todo)
```python
EXECUTOR_CACHE = {} # ( str, str, str ) : ( list, str, list, str, {}, str, {} )
BaseExecutor
DB_NATIVE = { bytes, str, int, float, decimal.Decimal, datetime.datetime, datetime.date }
EXPLAIN_PREFIX = "EXPLAIN"
FILTER_FUNC_OVERRIDE = {}
TO_DB_OVERRIDE = {}
get_overridden_filter_func( ... )
------------------------------------------------------------------
__init__( ... )
~_do_prefetch( ... )
~_execute_prefetch_queries( ... )
_make_prefetch_queries()
~_prefetch_direct_relation( ... )
~_prefetch_m2m_relation( ... )
~_prefetch_reverse_o2o_relation( ... )
~_prefetch_reverse_relation( ... )
_prepare_insert_columns( ... )
~_process_insert_result( ... )
_prepare_insert_statement( ... )
~execute_bulk_insert( ... )
~execute_delete( ... )
~execute_explain( ... )
~execute_insert( ... )
~execute_select( ... )
~execute_update( ... )
~fetch_for_list( ... )
get_update_sql( ... )
parameter( ... )
```
#### `backends/base/schema_generator.py` (todo)
```python
BaseSchemaGenerator
DIALECT = "sql"
FIELD_TEMPLATE = '"{name}" {type} {nullable} {unique}{primary}{default}{comment}'
FK_TEMPLATE = ' REFERENCES "{table}" ("{field}") ON DELETE {on_delete}{comment}'
GENERATED_PK_TEMPLATE = '"{field_name}" {generated_sql}{comment}'
INDEX_CREATE_TEMPLATE = 'CREATE INDEX {exists}"{index_name}" ON "{table_name}" ({fields});'
M2M_TABLE_TEMPLATE = (
'CREATE TABLE {exists}"{table_name}" (\n'
' "{backward_key}" {backward_type} NOT NULL{backward_fk},\n'
' "{forward_key}" {forward_type} NOT NULL{forward_fk}\n'
"){extra}{comment};"
)
TABLE_CREATE_TEMPLATE = 'CREATE TABLE {exists}"{table_name}" ({fields}){extra}{comment};'
UNIQUE_CONSTRAINT_CREATE_TEMPLATE = 'CONSTRAINT "{index_name}" UNIQUE ({fields})'
UNIQUE_INDEX_CREATE_TEMPLATE = INDEX_CREATE_TEMPLATE.replace(" INDEX", " UNIQUE INDEX")
_get_escape_translation_table()
_make_hash( ... )
------------------------------------------------------------------
__init__( client )
_column_comment_generator( ... )
_column_default_generator( ... )
_create_fk_string( ... )
_create_string( ... )
_escape_comment( ... )
_escape_default_value( ... )
_generate_fk_name( ... )
_generate_index_name( ... )
_get_index_sql( ... )
_get_inner_statements()
_get_models_to_create( ... )
_get_pk_field_sql_type( ... )
_get_table_sql( ... )
...200 lines...
_get_unique_constraint_sql( ... )
_get_unique_index_sql( ... )
_post_table_hook()
_table_comment_generator( ... )
_table_generate_extra( ... )
~generate_from_string( creation_string ) -> client.execute_script( creation_string )
get_create_schema_sql( ... )
quote( val ) -> '"{val}"'
```
#### `backends/base_postgres/__init__.py`
#### `backends/base_postgres/client.py`
```python
BasePostgresClient < BaseDBAsyncClient, abc.ABC
DSN_TEMPLATE = "postgres://{user}:{password}@{host}:{port}/{database}"
_connection = None
_pool = None
capabilities = Capabilities("postgres", support_update_limit_order_by=False)
connection_class = None
executor_class = BasePostgresExecutor
loop = None
query_class = PostgreSQLQuery
schema_generator = BasePostgresSchemaGenerator
------------------------------------------------------------------
__init__( user=None, password=None, database=None, host=None, port=5432, **kwargs )
super( **kwargs )
_connection = None
_pool = None
_template = {}
port = int( port )
extra = kwargs.copy();
application_name = extra.pop( "application_name", None )
connection_class = extra.pop( "connection_class", connection_class )
loop = extra.pop( "loop", None)
pool_maxsize = int( extra.pop( "maxsize", 5 ) )
pool_minsize = int( extra.pop( "minsize", 1 ) )
schema = extra.pop( "schema", None )
server_settings = {} or copy of extra.pop( "server_settings" ) if exists
extra.pop( "connection_name", None ); extra.pop( "fetch_inserted", None )
# methods below match BaseDBAsyncClient
acquire_connection()
-> PoolConnectionWrapper( self )
~close(): _close(); _template.clear()
~db_create():
create_connection( with_db=False )
execute_script( "CREATE DATABASE {database} OWNER {user}" ); close()
~db_delete():
create_connection( with_db=False )
execute_script( "DROP DATABASE {database}" ); close()
~execute_script( query )
acquire_connection() as conn: log.debug( query ); conn.execute( query )
# methods below match BaseDBAsyncClient and raise NotImplementedError
_in_transaction() -> TransactionContext
~create_connection( with_db )
~execute_insert( query, values ) -> pk
~execute_many( query, values ) -> None
~execute_query( query, values=None ) -> ( int, [ dict ] )
~execute_query_dict( query, values=None ) -> [ dict ]
# methods below are new and raise NotImplementedError
~_close()
~_expire_connections()
~_translate_exceptions( func, *args, **kwargs ) -> Exception
~create_pool( **kwargs )
```
#### `backends/base_postgres/executor.py` (todo)
```python
postgres_search( field, value ) -> tortoise.contrib.postgres.search.SearchCriterion( field, expr=value )
# All attributes are overrides of base class.
BasePostgresExecutor < BaseExecutor
DB_NATIVE = BaseExecutor.DB_NATIVE | { bool, uuid.UUID }
EXPLAIN_PREFIX = "EXPLAIN (FORMAT JSON, VERBOSE)"
FILTER_FUNC_OVERRIDE = {
tortoise.filters.search : postgres_search,
tortoise.filters.json_contains : tortoise.contrib.postgres.json_functions.postgres_json_contains,
tortoise.filters.json_contained_by : tortoise.contrib.postgres.json_functions.postgres_json_contained_by,
tortoise.filters.json_filter : tortoise.contrib.postgres.json_functions.postgres_json_filter,
tortoise.filters.posix_regex : tortoise.contrib.postgres.regex.postgres_posix_regex,
}
------------------------------------------------------------------
_prepare_insert_statement( ... )
~_process_insert_result( ... )
```
#### `backends/base_postgres/schema_generator.py` (todo)
```python
# All attributes are overrides of base class except `COLUMN_COMMENT_TEMPLATE`.
BasePostgresSchemaGenerator < BaseSchemaGenerator
COLUMN_COMMENT_TEMPLATE = 'COMMENT ON COLUMN "{table}"."{column}" IS \'{comment}\';'
DIALECT = "postgres"
GENERATED_PK_TEMPLATE = '"{field_name}" {generated_sql}'
TABLE_COMMENT_TEMPLATE = "COMMENT ON TABLE \"{table}\" IS '{comment}';"
_get_escape_translation_table()
------------------------------------------------------------------
__init__( client ) comments_array = []
_column_comment_generator( ... )
_column_default_generator( ... )
_escape_default_value( ... )
_post_table_hook()
_table_comment_generator( ... )
```
#### `connection.py` (todo)
```python
# db_config is same object passed to Tortoise.init()
ConnectionHandler
_conn_storage = contextvars.ContextVar( "_conn_storage", default={} ) # { name: BaseDBAsyncClient }
------------------------------------------------------------------
__init__() _db_config = None, _create_db = False
_clear_storage() -> _conn_storage.get().clear()
_copy_storage() -> copy.copy( _get_storage() )
_create_connection( conn_alias )
-> client_class( **credentials, connection_name=conn_alias ) # client_class from _discover_client_class( engine ), e.g. AsyncpgDBClient
_discover_client_class( engine ) -> import and return engine.client_class
_get_db_info( conn_alias ) -> db_config[ conn_alias ] # str | dict
_get_storage() -> _conn_storage.get()
~_init( db_config, create_db )
set or update _db_config
set _create_db
-> _init_connections()
~_init_connections()
for alias in db_config
conn = get( alias )
conn.db_create() if _create_db
_set_storage( new_storage ) -> _conn_storage.set( new_storage ) -> token
all()
...
~close_all( ... )
...
db_config -> _db_config
discard( ... )
...
get( conn_alias )
if not exists yet: _create_connection( conn_alias ) -> storage[ conn_alias ]
-> storage[ conn_alias ]
reset( ... )
...
set( ... )
...
connections = ConnectionHandler()
```
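Typical application-side use of the handler; the raw query is illustrative:
```python
from tortoise import Tortoise, connections, run_async

async def main():
    await Tortoise.init(db_url="sqlite://:memory:", modules={"models": []})
    conn = connections.get("default")  # lazily created on first get()
    print(await conn.execute_query_dict("SELECT 1 AS one"))  # [{"one": 1}]

run_async(main())
```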
#### `contrib/__init__.py` (todo)
```python
```
#### `contrib/postgres/__init__.py` (todo)
```python
```
#### `contrib/postgres/fields.py` (todo)
```python
```
#### `contrib/postgres/functions.py` (todo)
```python
```
#### `contrib/postgres/indexes.py` (todo)
```python
```
#### `contrib/postgres/json_functions.py` (todo)
```python
```
#### `contrib/postgres/regex.py` (todo)
```python
```
#### `contrib/postgres/search.py` (todo)
```python
```
#### `contrib/starlette/__init__.py` (todo)
```python
```
#### `contrib/test/__init__.py` (todo)
```python
```
#### `contrib/test/condition.py` (todo)
```python
```
#### `converters.py` (todo)
```python
```
#### `exceptions.py` (todo)
```python
```
#### `expressions.py` (todo)
```python
```
#### `fields/__init__.py`
Only imports.
#### `fields/base.py`
```python
import pypika.Term
_FieldMeta
__new__( name, bases, attrs )
sets field_type to bases[ 1 ] -or- bases[ 1: ] if several meta bases
Field < metaclass = _FieldMeta
GENERATED_SQL = None # str = sql to autogenerate (required if allows_generated)
SQL_TYPE = None # str = name of type in DB
field_type = None # set by _FieldMeta
indexable = True
has_db_field = True # if false, field is "virtual"
skip_to_python_if_native = False
allows_generated = False # are values able to be DB-generated?
function_cast = None # ( Term ) -> Term
------------------------------------------------------------------------------
# per-DB override specified in inner classes named like "_db_sqlite"
__init__(
source_field = None # name of db column, else generated from attr name
generated = False # field's value is DB-generated
primary_key = None # bool -> self.pk (if True, sets db_index and unique)
null = False
default = None # can be callable
unique = False
db_index = None # bool -> self.index
description = None # str
model = None # Model
validators = None # [ Validator | func, ... ] (defaults to [])
)
model_field_name = ""
docstring = None
reference = None # Field
_get_dialects()
constraints -> {} # defined in the Pydantic/JSONSchema format
describe( serializable ) -> str
get_db_field_types()
get_for_dialect( dialect, key )
required # True if !null & !default & !generated
to_db_value( value, instance ) -> value
value = field_type( value ) unless None or instanceof field_type
validate( value )
to_python_value( value ) -> value
value = field_type( value ) unless None or instanceof field_type
validate( value ) ▲ValidationError( "{model_field_name}: {ex}" )
```
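Those hooks (`SQL_TYPE`, `to_db_value`, `to_python_value`) plus the `_FieldMeta` base-class trick are all a custom field needs; an illustrative sketch storing a set of strings as CSV text (not a real Tortoise field):
```python
from tortoise.fields.base import Field

class CommaSetField(Field, set):  # second base becomes field_type via _FieldMeta
    SQL_TYPE = "TEXT"

    def to_db_value(self, value, instance):
        return None if value is None else ",".join(sorted(value))

    def to_python_value(self, value):
        if value is None or isinstance(value, set):
            return value
        return set(value.split(","))
```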
#### `fields/data.py` (todo)
```python
IntField < Field, int
if primary_key, adds generated=True unless already in kwargs
...
```
#### `fields/relational.py` (todo)
```python
_NoneAwaitable
__await__ -> None
__bool__ -> False
NoneAwaitable = _NoneAwaitable()
ReverseRelation
...
ManyToManyRelation < ReverseRelation
...
RelationalField < Field
...
ForeignKeyFieldInstance < RelationalField
...
BackwardFKRelation < RelationalField
...
OneToOneFieldInstance < ForeignKeyFieldInstance
...
BackwardOneToOneRelation < BackwardFKRelation
...
ManyToManyFieldInstance < RelationalField
...
OneToOneField(
model_name, related_name = None,
on_delete = CASCADE, db_constraint = True, null = False, **kwargs
) -> OneToOneFieldInstance( ... )
ForeignKeyField(
model_name, related_name = None,
on_delete = CASCADE, db_constraint = True, null = False, **kwargs
) -> ForeignKeyFieldInstance( ... )
ManyToManyField(
model_name, through = None, forward_key = None, backward_key = "", related_name = "",
on_delete = CASCADE, db_constraint = True, create_unique_index = True, **kwargs
) -> ManyToManyFieldInstance( ... )
```
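The three factories in context; model names and the `"models"` app label are placeholders:
```python
from tortoise import fields
from tortoise.models import Model

class Team(Model):
    id = fields.IntField(primary_key=True)
    name = fields.CharField(max_length=50)

class Profile(Model):
    id = fields.IntField(primary_key=True)
    bio = fields.TextField()

class Player(Model):
    id = fields.IntField(primary_key=True)
    team = fields.ForeignKeyField("models.Team", related_name="players")
    profile = fields.OneToOneField("models.Profile", related_name="player", null=True)
    rivals = fields.ManyToManyField("models.Player", related_name="rivaled_by")
```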
#### `filters.py` (todo)
```python
```
#### `functions.py` (todo)
```python
```
#### `indexes.py`
```python
Index
INDEX_CREATE_TEMPLATE = "CREATE{index_type}INDEX {index_name} ON {table_name} ({fields}){extra};"
INDEX_TYPE = ""
------------------------------------------------------------------
# provide fields -or- expressions (both or neither -> error)
__init__( *expressions, fields = None, name = None ): fields |= [], extra = ""
__eq__( other ) -> compares type and __dict__
__hash__() -> hash( ( tuple( fields ), name, tuple( expressions ) ) )
get_sql( schema_generator, model, save ) -> str
...
index_name( schema_generator, model ) -> name -or- schema_generator._generate_index_name( "idx", model, fields )
PartialIndex < Index
__init__( *expressions, fields = None, name = None, condition = None )
if condition:
extra = " WHERE "
items = [ f"{ k } = { ValueWrapper( v ) }" for k, v in condition.items() ]
extra += " AND ".join( items )
```
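Indexes attach through a model's `Meta`; field names here are placeholders:
```python
from tortoise import fields
from tortoise.indexes import Index
from tortoise.models import Model

class Event(Model):
    id = fields.IntField(primary_key=True)
    kind = fields.CharField(max_length=20)
    created_at = fields.DatetimeField(auto_now_add=True)

    class Meta:
        # name is optional: falls back to _generate_index_name("idx", ...)
        indexes = [Index(fields=["kind", "created_at"], name="idx_event_kind_ts")]
```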
#### `log.py`
```python
db_client_logger
```
#### `manager.py`
```python
Manager
__init__( model=None ) _model = model
__getattr__( item ) -> getattr( self.get_queryset(), item )
get_queryset() -> QuerySet( self._model )
```
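Since `__getattr__` proxies to `get_queryset()`, a custom manager is just an override of that one method; a sketch of the documented manager-override pattern (model and filter are illustrative):
```python
from tortoise import fields
from tortoise.manager import Manager
from tortoise.models import Model

class ActiveManager(Manager):
    def get_queryset(self):
        # every proxied call (filter, all, get, ...) starts from this queryset
        return super().get_queryset().filter(active=True)

class Account(Model):
    id = fields.IntField(primary_key=True)
    active = fields.BooleanField(default=True)
    all_objects = Manager()  # unfiltered escape hatch

    class Meta:
        manager = ActiveManager()  # replaces the default manager
```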
#### `models.py` (todo)
```python
Model
._meta
.add_field( ... )
.app
.db_table
.filters
.fk_fields
.m2m_fields
.o2o_fields
.pk_attr
._inited
.describe( serializable=? )
```
#### `query_utils.py` (todo)
```python
```
#### `queryset.py` (todo)
```python
```
#### `router.py`
```python
ConnectionRouter
__init__() _routers = None
_db_route( model, action ) -> connections.get( _router_func( model, action ) )
_router_func( model, action ) -> first( r.ACTION( model ) for r in routers )
db_for_read( model ) -> _db_route( model, "db_for_read" )
db_for_write( model ) -> _db_route( model, "db_for_write" )
init_routers( routers ) _routers = [ r() for r in routers ]
router = ConnectionRouter()
```
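A router is any object with `db_for_read` / `db_for_write` methods returning a connection alias (or `None` to let the next router decide); sketch assuming a config with `"default"` and `"replica"` connections:
```python
class ReadReplicaRouter:
    def db_for_read(self, model):
        return "replica"   # assumed alias in the connections config

    def db_for_write(self, model):
        return "default"

# registered via Tortoise.init(..., routers=[ReadReplicaRouter])
```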
#### `signals.py`
```python
Signals = Enum( "Signals", [ "pre_save", "post_save", "pre_delete", "post_delete" ] )
post_save( *senders ) : decorator( f ) -> f
loop senders: sender.register_listener( Signals.post_save, f )
# same with: pre_save, pre_delete, post_delete
```
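Listener signature for the save signals (the model is a placeholder):
```python
from tortoise import fields
from tortoise.models import Model
from tortoise.signals import post_save

class Article(Model):
    id = fields.IntField(primary_key=True)
    title = fields.CharField(max_length=100)

@post_save(Article)
async def on_article_saved(sender, instance, created, using_db, update_fields):
    # fires after every Article save; created distinguishes insert vs update
    print(f"saved {instance.title!r} (created={created})")
```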
#### `timezone.py`
```python
is_aware( value ) -> value.utcoffset() is not None
is_naive( value ) -> value.utcoffset() is None
get_use_tz() -> os.environ.get( "USE_TZ" ) == "True"
get_timezone() -> os.environ.get( "TIMEZONE" ) or "UTC"
get_default_timezone() -> pytz.timezone( get_timezone() )
now() -> datetime.now( tz=pytz.utc )
localtime( value=None, timezone=None )
-> ( value or now() ).astimezone( pytz.timezone( timezone ) or get_default_timezone() )
make_aware( value, timezone=None, is_dst=None )
-> ( pytz.timezone( timezone ) if timezone else get_default_timezone() ).localize( value, is_dst )
make_naive( value, timezone=None )
-> value.astimezone( pytz.timezone( timezone ) or get_default_timezone() ).replace( tzinfo=None )
```
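Quick tour of the helpers above:
```python
from tortoise import timezone

aware = timezone.now()                             # UTC-aware datetime
assert timezone.is_aware(aware)
paris = timezone.localtime(aware, "Europe/Paris")  # convert to a named zone
naive = timezone.make_naive(aware, "UTC")          # convert, then strip tzinfo
assert timezone.is_naive(naive)
```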
#### `transactions.py`
```python
_get_connection( connection_name ) : BaseDBAsyncClient
-> connections.get( connection_name or first connections.db_config key )
in_transaction( connection_name = None ) : TransactionContext
-> _get_connection( connection_name )._in_transaction()
atomic( connection_name = None ) # function decorator
async with in_transaction( connection_name )
return await func( *args, **kwargs )
```
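Both entry points in use; the table and SQL are illustrative (placeholders in postgres style):
```python
from tortoise.transactions import atomic, in_transaction

async def transfer(src: int, dst: int, amount: int):
    # commits on clean exit, rolls back if the block raises
    async with in_transaction() as conn:
        await conn.execute_query(
            "UPDATE account SET balance = balance - $1 WHERE id = $2", [amount, src]
        )
        await conn.execute_query(
            "UPDATE account SET balance = balance + $1 WHERE id = $2", [amount, dst]
        )

@atomic()  # decorator form: wraps the whole coroutine in one transaction
async def transfer_atomic(src: int, dst: int, amount: int):
    ...
```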
#### `utils.py`
```python
get_schema_sql( client, safe ) -> client.schema_generator( client ).get_create_schema_sql( safe )
~generate_schema_for_client( client, safe ) -> None
schema = get_schema_sql( client, safe )
if schema: client.schema_generator( client ).generate_from_string( schema )
chunk( instances, batch_size = None ) -> yield tuples with batch_size instances
```
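Dumping the schema without executing it; `app.models` is a placeholder:
```python
from tortoise import Tortoise, run_async
from tortoise.utils import get_schema_sql

async def dump_schema():
    await Tortoise.init(
        db_url="sqlite://:memory:", modules={"models": ["app.models"]}
    )
    client = Tortoise.get_connection("default")
    print(get_schema_sql(client, safe=True))  # CREATE TABLE script as a string

run_async(dump_schema())
```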
#### `validators.py`
```python
Validator < metaclass = abc.ABCMeta
abc __call__( value ) # raises ValidationError
RegexValidator < Validator
__init__( pattern, flags ): regex = re.compile( pattern, flags )
__call__( value ): regex.match( value ) or "Value '{value}' does not match regex '{self.regex.pattern}'"
MaxLengthValidator < Validator
__init__( max_length )
__call__( value ): "Value must not be None" -or- "Length of '{value}' {len(value)} > {self.max_length}"
MinLengthValidator < Validator
__init__( min_length )
__call__( value ): "Value must not be None" -or- "Length of '{value}' {len(value)} < {self.min_length}"
NumericValidator < Validator
types = ( int, float, Decimal )
_validate_type( value ): isinstance( value, self.types ) or "Value must be a numeric value and is required"
MinValueValidator < NumericValidator
__init__( min_value ): _validate_type( min_value )
__call__( value ): _validate_type( value ); "Value should be greater or equal to {self.min_value}"
MaxValueValidator < NumericValidator
__init__( max_value ): _validate_type( max_value )
__call__( value ): _validate_type( value ); "Value should be less or equal to {self.max_value}"
CommaSeparatedIntegerListValidator < Validator
__init__( allow_negative = False ):
pattern = r"^%(neg)s\d+(?:%(sep)s%(neg)s\d+)*\Z" % {
"neg": "(-)?" if allow_negative else "",
"sep": re.escape(","),
}
regex = RegexValidator( pattern, re.I )
__call__( value ): regex( value )
validate_ipv4_address( value ): ipaddress.IPv4Address( value ) or "'{value}' is not a valid IPv4 address."
validate_ipv6_address( value ): ipaddress.IPv6Address( value ) or "'{value}' is not a valid IPv6 address."
validate_ipv46_address( value ): validate_ipv4_address( value ) or validate_ipv6_address( value ) or
"'{value}' is not a valid IPv4 or IPv6 address."
```
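Validators are passed per field and run inside `Field.validate()` during `to_db_value` (see `fields/base.py` above); the model is illustrative:
```python
import re

from tortoise import fields
from tortoise.models import Model
from tortoise.validators import (
    MaxValueValidator,
    MinValueValidator,
    RegexValidator,
)

class Ticket(Model):
    id = fields.IntField(primary_key=True)
    code = fields.CharField(
        max_length=10,
        validators=[RegexValidator(r"[A-Z]{3}-\d+", re.I)],
    )
    priority = fields.IntField(
        validators=[MinValueValidator(1), MaxValueValidator(5)]
    )
# a failing validator raises ValidationError("<field>: <message>")
```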