# Python : Database : Tortoise : Source [tortoise/](https://github.com/tortoise/tortoise-orm/tree/main/tortoise/) #### `__init__.py` (todo) ```python Tortoise _inited = False apps = {} # { app: { model: Model } } table_name_generator = None # callable( Model ) -> str _build_initial_querysets() ... _discover_models( models_path, app_label ) -> [ Model ] # models_path -> str | Module # checks for "__models__", else... # grabs module attributes that are non-abstract Model subclasses with absent or matching ._meta.app # (if absent, _meta.app = app_label) ~_drop_databases() ... _get_config_from_config_file( config_file ) -> {} ... _init_apps( apps_config ) # apps_config -> { str: { str: * } } ... _init_relations() _build_initial_querysets() _init_relations() def get_related_model # test if app and model really exist, return model def split_reference # validate and split name into app + model def init_fk_o2o_field _init_routers( ... ) ... _init_timezone( use_tz, timezone ) ... ~_reset_apps() ... ~close_connections() ... describe_model( model, serializable = True ) -> {} describe_models( models = None, serializable = True ) # models -> [ Model ], else returns all models # serializable False -> python objects, True -> JSON-serializable data # -> { "models.User": { ... } } ~generate_schemas( safe = True ) ... get_connection( connection_name ) -> connections.get( connection_name ) ~init( config = None, config_file = None, _create_db = False, db_url = None, modules = None, use_tz = False, timezone = "UTC", routers = None, table_name_generator = None ): connections._init( config[ "connections" ], _create_db ) saves params _init_connections() for every alias: create connection (and optionally create DB) via connections.get( alias ) _init_apps( config[ "apps" ] ) for app_label, info... connections.get( info.default_connection ) # assert conn alias exists init_models( info.models, app_label, _init_relations=False ) _discover_models() # checks for "__models__", else... 
# grabs module attributes that are non-abstract Model subclasses with absent or matching ._meta.app # (if absent, _meta.app = app_label) populate .apps _init_relations() if _init_relations ... for models: set _meta.default_connection = info.default_connection init_models( models_path, app_label, _init_relations=True ) # models_path -> [ str | Module ] _discover_models() populate .apps _init_relations() if _init_relations run_async( coro ) loop = asyncio.get_event_loop() try: loop.run_until_complete( coro ) finally: loop.run_until_complete( connections.close_all( discard=True ) ) ``` #### `backends/__init__.py` #### `backends/asyncpg/__init__.py` ```python client_class = client.AsyncpgDBClient ``` #### `backends/asyncpg/client.py` ```python AsyncpgDBClient < BasePostgresClient connection_class = asyncpg.connection.Connection executor_class = AsyncpgExecutor schema_generator = AsyncpgSchemaGenerator _connection = None _pool ------------------------------------------------------------------ # all methods below match BasePostgresClient ~_close() if _pool: asyncio.wait_for( _pool.close(), 10 ) -or- _pool.terminate() _pool = None ~_expire_connections() _pool.expire_connections() if _pool _in_transaction() -> TransactionContextPooled( TransactionWrapper( self ) ) ~_translate_exceptions( ... ) ... acquire_connection() -> PoolConnectionWrapper( self ) ~create_connection( with_db ) ... 
~create_pool( **kwargs ) -> asyncpg.create_pool( None, **kwargs ) ~db_delete() super(); close() ~execute_insert( query, values ) acquire_connection() as conn log.debug( "{query}: {values}" ) -> conn.fetchrow( query, *values ) ~execute_many( query, values ) acquire_connection() as conn: log.debug( "{query}: {values}" ) trans = connection.transaction(); trans.start(); conn.executemany( query, values ); trans.commit() -or- trans.rollback() ~execute_query( query, values=None ) acquire_connection() as conn: log.debug( "{query}: {values}" ) if UPDATE|DELETE: conn.execute( query, *values ); -> rows_affected, [] else: rows = conn.fetch( query, *values ); -> len( rows ), rows ~execute_query_dict( query, values=None ) acquire_connection() as conn: log.debug( "{query}: {values}" ) -> conn.fetch( query, *values ) TransactionWrapper < AsyncpgDBClient, BaseTransactionWrapper "A transactional connection wrapper for psycopg. asyncpg implements nested transactions (savepoints) natively, so we don't need to." 
__init__( connection ) _connection = connection._connection _finalized = False _lock = asyncio.Lock() _parent = connection connection_name = connection.connection_name log = connection.log transaction = None # overrides for AsyncpgDBClient _in_transaction() -> NestedTransactionContext( TransactionWrapper( self ) ) acquire_connection() -> ConnectionWrapper( _lock, self ) ~execute_many( query, values ) acquire_connection() as conn: log.debug( "{query}: {values}" ) conn.executemany( query, values ) # overrides for BaseTransactionWrapper ~begin() transaction = _connection.transaction() transaction.start() ~commit() asserts transaction and not _finalized transaction.commit(); _finalized = True ~rollback() asserts transaction and not _finalized transaction.rollback(); _finalized = True ~savepoint() -> begin() ~release_savepoint() -> commit() ~savepoint_rollback() -> rollback() ``` ###### Flattened ```python AsyncpgDBClient < BasePostgresClient DSN_TEMPLATE = "postgres://{user}:{password}@{host}:{port}/{database}" _connection = None _pool = None capabilities = Capabilities("postgres", support_update_limit_order_by=False) connection_class = asyncpg.connection.Connection executor_class = AsyncpgExecutor loop = None query_class = PostgreSQLQuery schema_generator = AsyncpgSchemaGenerator ------------------------------------------------------------------ __init__( user=None, password=None, database=None, host=None, port=5432, **kwargs ) _connection = None _pool = None _template = {} connection_name = connection_name fetch_inserted = fetch_inserted | True log = db_client_logger port = int( port ) extra = kwargs.copy(); application_name = extra.pop( "application_name", None ) connection_class = extra.pop( "connection_class", connection_class ) loop = extra.pop( "loop", None) pool_maxsize = int( extra.pop( "maxsize", 5 ) ) pool_minsize = int( extra.pop( "minsize", 1 ) ) schema = extra.pop( "schema", None ) server_settings = {} or copy of extra.pop( "server_settings" ) if exists 
extra.pop( "connection_name", "fetch_inserted" ) ~_close() if _pool: asyncio.wait_for( _pool.close(), 10 ) -or- _pool.terminate() _pool = None ~_expire_connections() _pool.expire_connections() if _pool _in_transaction() -> TransactionContextPooled( TransactionWrapper( self ) ) ~_translate_exceptions( ... ) ... acquire_connection() -> PoolConnectionWrapper( self ) ~close(): _close(); _template.clear() ~create_connection( with_db ) _pool = create_pool( host port user password min_size max_size connection_class loop server_settings **extra database = database if with_db else None (pulls from PGDATABASE instead) ) ~create_pool( **kwargs ) -> asyncpg.create_pool( None, **kwargs ) ~db_create(): create_connection( with_db=False ) execute_script( "CREATE DATABASE {database} OWNER {user}" ); close() ~db_delete() create_connection( with_db=False ) execute_script( "DROP DATABASE {database}" ); close() ~execute_insert( query, values ) acquire_connection() as conn log.debug( "{query}: {values}" ) -> conn.fetchrow( query, *values ) ~execute_many( query, values ) acquire_connection() as conn: log.debug( "{query}: {values}" ) trans = connection.transaction(); trans.start(); conn.executemany( query, values ); trans.commit() -or- trans.rollback() ~execute_query( query, values=None ) acquire_connection() as conn: log.debug( "{query}: {values}" ) if UPDATE|DELETE: conn.execute( query, *values ); -> rows_affected, [] else: rows = conn.fetch( query, *values ); -> len( rows ), rows ~execute_query_dict( query, values=None ) acquire_connection() as conn: log.debug( "{query}: {values}" ) -> conn.fetch( query, *values ) ~execute_script( query ) acquire_connection as conn: log.debug( query ); conn.execute( query ) ``` #### `backends/asyncpg/executor.py` ```python AsyncpgExecutor < BasePostgresExecutor (empty) ``` #### `backends/asyncpg/schema_generator.py` ```python AsyncpgSchemaGenerator < BasePostgresSchemaGenerator (empty) ``` #### `backends/base/__init__.py` #### `backends/base/client.py` 
```python Capabilities __init__( ... ) ... T_conn =~ asyncpg.Connection BaseDBAsyncClient query_class = Query executor_class = BaseExecutor schema_generator = BaseSchemaGenerator capabilities = Capabilities( "" ) -------------------------------------------------- __init__( connection_name, fetch_inserted = True ) log = db_client_logger # all methods below raise NotImplementedError _in_transaction() -> TransactionContext acquire_connection() -> ConnectionWrapper | PoolConnectionWrapper "Acquires a connection from the pool. Will return the current context connection if already in a transaction." ~close() ~create_connection( with_db ) # with_db: False -> "use default connection instead of configured one" # True -> "select the DB to use" ~db_create() ~db_delete() ~execute_insert( query, values ) -> pk ~execute_many( query, values ) -> None # bulk insert ~execute_query( query, values = None ) -> ( int, [ dict ] ) ~execute_query_dict( query, values = None ) -> [ dict ] ~execute_script( query ) -> None ConnectionWrapper "Wraps the connections with a lock to facilitate safe concurrent access when using asyncio.gather, TaskGroup, or similar." __init__( lock, client ) connection = client._connection # T_conn ~__aenter__() -> T_conn lock.acquire() ensure_connection() -> connection ~__aexit__( ... ) lock.release() ~ensure_connection() if not connection: client.create_connection( with_db=True ) connection = client._connection TransactionContext "A context manager interface for transactions. Returned from utils.in_transaction and BaseDBAsyncClient._in_transaction." connection # T_conn -------------- abc~__aenter__() -> T_conn abc~__aexit__( ... ) TransactionContextPooled < TransactionContext "A version of TransactionContext that uses a pool to acquire connections." __init__( connection ) connection_name = connection.connection_name ~__aenter__() -> T_conn "Set the context variable so the current task is always seeing a TransactionWrapper connection." 
ensure_connection() token = connections.set( connection_name, connection ) connection._connection = connection._parent._pool.acquire() connection.begin() -> connection ~__aexit__( ... ) if not connection._finalized: connection.commit() -or- connection.rollback() if connection._parent._pool: connection._parent._pool.release( connection._connection ) connections.reset( token ) ~ensure_connection() if not connection._parent._pool: connection._parent.create_connection( with_db=True ) NestedTransactionContext < TransactionContext ... PoolConnectionWrapper "Class to manage acquiring from and releasing connections to a pool." __init__( client ) pool = client._pool connection = None # T_conn ~__aenter__() -> T_conn ensure_connection() -> connection = pool.acquire() ~__aexit__( ... ) pool.release( connection ) ~ensure_connection() if not pool: client.create_connection( with_db=True ) pool = client._pool BaseTransactionWrapper abc~begin() -> None abc~savepoint() -> None abc~rollback() -> None abc~savepoint_rollback() -> None abc~commit() -> None abc~release_savepoint() -> None ``` #### `backends/base/config_generator.py` ```python DB_LOOKUP = ... expand_db_url( db_url, testing = False ) -> dict # -> { "engine": ..., "credentials": ... } generate_config( db_url, app_modules, connection_label = None, testing = False ) # app_modules = { app_label: modules } # connection_label defaults to "default" # -> { "connections" : { connection_label: expand_db_url( db_url, testing ) } # "apps" : { # app_label: { "models": modules, "default_connection": connection_label } # for app_label, modules in app_modules.items() # } } ``` #### `backends/base/executor.py` (todo) ```python EXECUTOR_CACHE = {} # ( str, str, str ) : ( list, str, list, str, {}, str, {} ) BaseExecutor DB_NATIVE = { bytes, str, int, float, decimal.Decimal, datetime.datetime, datetime.date } EXPLAIN_PREFIX = "EXPLAIN" FILTER_FUNC_OVERRIDE = {} TO_DB_OVERRIDE = {} get_overridden_filter_func( ... 
) ------------------------------------------------------------------ __init__( ... ) ~_do_prefetch( ... ) ~_execute_prefetch_queries( ... ) _make_prefetch_queries() ~_prefetch_direct_relation( ... ) ~_prefetch_m2m_relation( ... ) ~_prefetch_reverse_o2o_relation( ... ) ~_prefetch_reverse_relation( ... ) _prepare_insert_columns( ... ) ~_process_insert_result( ... ) _prepare_insert_statement( ... ) ~execute_bulk_insert( ... ) ~execute_delete( ... ) ~execute_explain( ... ) ~execute_insert( ... ) ~execute_select( ... ) ~execute_update( ... ) ~fetch_for_list( ... ) get_update_sql( ... ) parameter( ... ) ``` #### `backends/base/schema_generator.py` (todo) ```python BaseSchemaGenerator DIALECT = "sql" FIELD_TEMPLATE = '"{name}" {type} {nullable} {unique}{primary}{default}{comment}' FK_TEMPLATE = ' REFERENCES "{table}" ("{field}") ON DELETE {on_delete}{comment}' GENERATED_PK_TEMPLATE = '"{field_name}" {generated_sql}{comment}' INDEX_CREATE_TEMPLATE = 'CREATE INDEX {exists}"{index_name}" ON "{table_name}" ({fields});' M2M_TABLE_TEMPLATE = ( 'CREATE TABLE {exists}"{table_name}" (\n' ' "{backward_key}" {backward_type} NOT NULL{backward_fk},\n' ' "{forward_key}" {forward_type} NOT NULL{forward_fk}\n' "){extra}{comment};" ) TABLE_CREATE_TEMPLATE = 'CREATE TABLE {exists}"{table_name}" ({fields}){extra}{comment};' UNIQUE_CONSTRAINT_CREATE_TEMPLATE = 'CONSTRAINT "{index_name}" UNIQUE ({fields})' UNIQUE_INDEX_CREATE_TEMPLATE = INDEX_CREATE_TEMPLATE.replace(" INDEX", " UNIQUE INDEX") _get_escape_translation_table() _make_hash( ... ) ------------------------------------------------------------------ __init__( client ) _column_comment_generator( ... ) _column_default_generator( ... ) _create_fk_string( ... ) _create_string( ... ) _escape_comment( ... ) _escape_default_value( ... ) _generate_fk_name( ... ) _generate_index_name( ... ) _get_index_sql( ... ) _get_inner_statements() _get_models_to_create( ... ) _get_pk_field_sql_type( ... ) _get_table_sql( ... ) ...200 lines... 
_get_unique_constraint_sql( ... ) _get_unique_index_sql( ... ) _post_table_hook() _table_comment_generator( ... ) _table_generate_extra( ... ) ~generate_from_string( creation_string ) -> client.execute_script( creation_string ) get_create_schema_sql( ... ) quote( val ) -> '"{val}"' ``` #### `backends/base_postgres/__init__.py` #### `backends/base_postgres/client.py` ```python BasePostgresClient < BaseDBAsyncClient, abc.ABC DSN_TEMPLATE = "postgres://{user}:{password}@{host}:{port}/{database}" _connection = None _pool = None capabilities = Capabilities("postgres", support_update_limit_order_by=False) connection_class = None executor_class = BasePostgresExecutor loop = None query_class = PostgreSQLQuery schema_generator = BasePostgresSchemaGenerator ------------------------------------------------------------------ __init__( user=None, password=None, database=None, host=None, port=5432, **kwargs ) super( **kwargs ) _connection = None _pool = None _template = {} port = int( port ) extra = kwargs.copy(); application_name = extra.pop( "application_name", None ) connection_class = extra.pop( "connection_class", connection_class ) loop = extra.pop( "loop", None) pool_maxsize = int( extra.pop( "maxsize", 5 ) ) pool_minsize = int( extra.pop( "minsize", 1 ) ) schema = extra.pop( "schema", None ) server_settings = {} or copy of extra.pop( "server_settings" ) if exists extra.pop( "connection_name", "fetch_inserted" ) # methods below match BaseDBAsyncClient acquire_connection() -> PoolConnectionWrapper( _pool ) ~close(): _close(); _template.clear() ~db_create(): create_connection( with_db=False ) execute_script( "CREATE DATABASE {database} OWNER {user}" ); close() ~db_delete(): create_connection( with_db=False ) execute_script( "DROP DATABASE {database}" ); close() ~execute_script( query ) acquire_connection as conn: log.debug( query ); conn.execute( query ) # methods below match BaseDBAsyncClient and raise NotImplementedError _in_transaction() -> TransactionContext 
~create_connection( with_db ) ~execute_insert( query, values ) -> pk ~execute_many( query, values ) -> None ~execute_query( query, values=None ) -> ( int, [ dict ] ) ~execute_query_dict( query, values=None ) -> [ dict ] # methods below are new and raise NotImplementedError ~_close() ~_expire_connections() ~_translate_exceptions( func, *args, **kwargs ) -> Exception ~create_pool( **kwargs ) ``` #### `backends/base_postgres/executor.py` (todo) ```python postgres_search( field, value ) -> tortoise.contrib.postgres.search.SearchCriterion( field, expr=value ) # All attributes are overrides of base class. BasePostgresExecutor < BaseExecutor DB_NATIVE = BaseExecutor.DB_NATIVE | { bool, uuid.UUID } EXPLAIN_PREFIX = "EXPLAIN (FORMAT JSON, VERBOSE)" FILTER_FUNC_OVERRIDE = { tortoise.filters.search : postgres_search, tortoise.filters.json_contains : tortoise.contrib.postgres.json_functions.postgres_json_contains, tortoise.filters.json_contained_by : tortoise.contrib.postgres.json_functions.postgres_json_contained_by, tortoise.filters.json_filter : tortoise.contrib.postgres.json_functions.postgres_json_filter, tortoise.filters.posix_regex : tortoise.contrib.postgres.regex.postgres_posix_regex, } ------------------------------------------------------------------ _prepare_insert_statement( ... ) ~_process_insert_result( ... ) ``` #### `backends/base_postgres/schema_generator.py` (todo) ```python # All attributes are overrides of base class except `COLUMN_COMMENT_TEMPLATE`. BasePostgresSchemaGenerator < BaseSchemaGenerator COLUMN_COMMENT_TEMPLATE = 'COMMENT ON COLUMN "{table}"."{column}" IS \'{comment}\';' DIALECT = "postgres" GENERATED_PK_TEMPLATE = '"{field_name}" {generated_sql}' TABLE_COMMENT_TEMPLATE = "COMMENT ON TABLE \"{table}\" IS '{comment}';" _get_escape_translation_table() ------------------------------------------------------------------ __init__( client ) comments_array = [] _column_comment_generator( ... ) _column_default_generator( ... ) _escape_default_value( ... 
) _post_table_hook() _table_comment_generator( ... ) ``` #### `connection.py` (todo) ```python # db_config is same object passed to Tortoise.init() ConnectionHandler _conn_storage = contextvars.ContextVar( "_conn_storage", default={} ) # { name: BaseDBAsyncClient } ------------------------------------------------------------------ __init__() _db_config = None, _create_db = False _clear_storage() -> _conn_storage.get().clear() _copy_storage() -> copy.copy( _get_storage() ) _create_connection( conn_alias ) -> AsyncpgDBClient( **credentials, connection_name=conn_alias ) _discover_client_class( engine ) -> import and return engine.client_class _get_db_info( conn_alias ) -> db_config[ conn_alias ] # str | dict _get_storage() -> _conn_storage.get() ~_init( db_config, create_db ) set or update _db_config set _create_db -> _init_connections() ~_init_connections() for alias in db_config conn = get( alias ) conn.db_create() if _create_db _set_storage( new_storage ) -> _conn_storage.set( new_storage ) -> token all() ... ~close_all( ... ) ... db_config -> _db_config discard( ... ) ... get( conn_alias ) if not exists yet: _create_connection( conn_alias ) -> storage[ conn_alias ] -> storage[ conn_alias ] reset( ... ) ... set( ... ) ... 
connections = ConnectionHandler() ``` #### `contrib/__init__.py` (todo) ```python ``` #### `contrib/postgres/__init__.py` (todo) ```python ``` #### `contrib/postgres/fields.py` (todo) ```python ``` #### `contrib/postgres/functions.py` (todo) ```python ``` #### `contrib/postgres/indexes.py` (todo) ```python ``` #### `contrib/postgres/json_functions.py` (todo) ```python ``` #### `contrib/postgres/regex.py` (todo) ```python ``` #### `contrib/postgres/search.py` (todo) ``` ``` #### `contrib/starlette/__init__.py` (todo) ```python ``` #### `contrib/test/__init__.py` (todo) ```python ``` #### `contrib/test/condition.py` (todo) ```python ``` #### `converters.py` (todo) ```python ``` #### `exceptions.py` (todo) ```python ``` #### `expressions.py` (todo) ```python ``` #### `fields/__init__.py` Only imports. #### `fields/base.py` ```python import pypika.Term _FieldMeta __new__( name, bases, attrs ) sets field_type to bases[ 1: ] Field < metaclass = _FieldMeta GENERATED_SQL = None # str = sql to autogenerate (required if allows_generated) SQL_TYPE = None # str = name of type in DB field_type = None # set by _FieldMeta indexable = True has_db_field = True # if false, field is "virtual" skip_to_python_if_native = False allows_generated = False # are values able to be DB-generated? function_cast = None # ( Term ) -> Term ------------------------------------------------------------------------------ # per-DB override specified in inner classes named like "_db_sqlite" __init__( source_field = None # name of db column, else generated from attr name generated = False # field's value is DB-generated primary_key = None # bool -> self.pk (if True, sets db_index and unique) null = False default = None # can be callable unique = False db_index = None # bool -> self.index description = None # str model = None # Model validators = None # [ Validator | func, ... 
] (defaults to []) ) model_field_name = "" docstring = None reference = None # Field _get_dialects() constraints -> {} # defined in the Pydantic/JSONSchema format describe( serializable ) -> str get_db_field_types() get_for_dialect( dialect, key ) required # True if !null & !default & !generated to_db_value( value, instance ) -> value value = field_type( value ) unless None or instanceof field_type validate( value ) to_python_value( value ) -> value value = field_type( value ) unless None or instanceof field_type validate( value ) ▲ValidationError( "{model_field_name}: {ex}" ) ``` #### `fields/data.py` (todo) ```python IntField < Field, int if primary_key, adds generated=True unless already in kwargs ... ``` #### `fields/relational.py` (todo) ```python _NoneAwaitable __await__ -> None __bool__ -> False NoneAwaitable = _NoneAwaitable() ReverseRelation ... ManyToManyRelation < ReverseRelation ... RelationalField < Field ... ForeignKeyFieldInstance < RelationalField ... BackwardFKRelation < RelationalField ... OneToOneFieldInstance < ForeignKeyFieldInstance ... BackwardOneToOneRelation < BackwardFKRelation ... ManyToManyFieldInstance < RelationalField ... OneToOneField( model_name, related_name = None, on_delete = CASCADE, db_constraint = True, null = False, **kwargs ) -> OneToOneFieldInstance( ... ) ForeignKeyField( model_name, related_name = None, on_delete = CASCADE, db_constraint = True, null = False, **kwargs ) -> ForeignKeyFieldInstance( ... ) ManyToManyField( model_name, through = None, forward_key = None, backward_key = "", related_name = "", on_delete = CASCADE, db_constraint = True, create_unique_index = True, **kwargs ) -> ManyToManyFieldInstance( ... 
) ``` #### `filters.py` (todo) ```python ``` #### `functions.py` (todo) ```python ``` #### `indexes.py` ```python Index INDEX_CREATE_TEMPLATE = "CREATE{index_type}INDEX {index_name} ON {table_name} ({fields}){extra};" INDEX_TYPE = "" ------------------------------------------------------------------ # provide fields -or- expressions (both or neither -> error) __init__( *expressions, fields = None, name = None ): fields |= [], extra = "" __eq__( other ) -> compares type and __dict__ __hash__() -> hash( ( tuple( fields ), name, tuple( expressions ) ) ) get_sql( schema_generator, model, save ) -> str ... index_name( schema_generator, model ) -> name -or- schema_generator._generate_index_name( "idx", model, fields ) PartialIndex < Index __init__( *expressions, fields = None, name = None, condition = None ) if condition: extra = " WHERE " items = [ f"{ k } = { ValueWrapper( v ) }" for k, v in condition.items() ] extra += " AND ".join( items ) ``` #### `log.py` ```python db_client_logger ``` #### `managers.py` ```python Manager __init__( model=None ) _model = model __getattr__( item ) -> getattr( self.get_queryset(), item ) get_queryset() -> QuerySet( self._model ) ``` #### `models.py` (todo) ```python Model ._meta .add_field( ... ) .app .db_table .filters .fk_fields .m2m_fields .o2o_fields .pk_attr ._inited .describe( serializable=? 
) ``` #### `query_utils.py` (todo) ```python ``` #### `queryset.py` (todo) ```python ``` #### `router.py` ```python ConnectionRouter __init__() _routers = None _db_route( model, action ) -> connections.get( _router_func( model, action ) ) _router_func( model, action ) -> first( r.ACTION( model ) for r in routers ) db_for_read( model ) -> _db_route( model, "db_for_read" ) db_for_write( model ) -> _db_route( model, "db_for_write" ) init_routers( routers ) _routers = [ r() for r in routers ] router = ConnectionRouter() ``` #### `signals.py` ```python Signals = Enum( "Signals", [ "pre_save", "post_save", "pre_delete", "post_delete" ] ) post_save( *senders ) : decorator( f ) -> f loop senders: sender.register_listener( Signals.post_save, f ) # same with: pre_save, pre_delete, post_delete ``` #### `timezone.py` ```python is_aware( value ) -> value.utcoffset() is not None is_naive( value ) -> value.utcoffset() is None get_use_tz() -> os.environ.get( "USE_TZ" ) == "True" get_timezone() -> os.environ.get( "TIMEZONE" ) or "UTC" get_default_timezone() -> pytz.timezone( get_timezone() ) now() -> datetime.now( tz=pytz.utc ) localtime( value=None, timezone=None ) -> ( value or now() ).astimezone( pytz.timezone( timezone ) or get_default_timezone() ) make_aware( value, timezone=None, is_dst=None ) -> value.replace( tzinfo = pytz.timezone( timezone ) or get_default_timezone() ) make_naive( value, timezone=None ) -> value.astimezone( pytz.timezone( timezone ) or get_default_timezone() ).replace( tzinfo=None ) ``` #### `transactions.py` ```python _get_connection( connection_name ) : BaseDBAsyncClient -> connections.get( connection_name or first connections.db_config key ) in_transaction( connection_name = None ) : TransactionContext -> _get_connection( connection_name )._in_transaction() atomic( connection_name = None ) # function decorator async with in_transaction( connection_name ) return await func( *args, **kwargs ) ``` #### `utils.py` ```python get_schema_sql( client, safe ) 
-> client.schema_generator( client ).get_create_schema_sql( safe ) ~generate_schema_for_client( client, safe ) -> None if get_schema_sql( client, safe ): client.schema_generator( client ).generate_from_string( schema ) chunk( instances, batch_size = None ) -> yield tuples with batch_size instances ``` #### `validators.py` ```python Validator < metaclass = abc.ABCMeta abc __call__( value ) # raises ValidationError RegexValidator < Validator __init__( pattern, flags ): regex = re.compile( pattern, flags ) __call__( value ): regex.match( value ) or "Value '{value}' does not match regex '{self.regex.pattern}'" MaxLengthValidator < Validator __init__( max_length ) __call__( value ): "Value must not be None" -or- "Length of '{value}' {len(value)} > {self.max_length}" MinLengthValidator < Validator __init__( min_length ) __call__( value ): "Value must not be None" -or- "Length of '{value}' {len(value)} < {self.min_length}" NumericValidator < Validator types = ( int, float, Decimal ) _validate_type( value ): isinstance( value, self.types ) or "Value must be a numeric value and is required" MinValueValidator < NumericValidator __init__( min_value ): _validate_type( min_value ) __call__( value ): _validate_type( value ); "Value should be greater or equal to {self.min_value}" MaxValueValidator < NumericValidator __init__( max_value ): _validate_type( max_value ) __call__( value ): _validate_type( value ); "Value should be less or equal to {self.max_value}" CommaSeparatedIntegerListValidator < Validator __init__( allow_negative = False ): pattern = r"^%(neg)s\d+(?:%(sep)s%(neg)s\d+)*\Z" % { "neg": "(-)?" if allow_negative else "", "sep": re.escape(","), } regex = RegexValidator( pattern, re.I ) __call__( value ): regex( value ) validate_ipv4_address( value ): ipaddress.IPv4Address( value ) or "'{value}' is not a valid IPv4 address." validate_ipv6_address( value ): ipaddress.IPv6Address( value ) or "'{value}' is not a valid IPv6 address." 
validate_ipv46_address( value ): validate_ipv4_address( value ) or validate_ipv6_address( value ) or "'{value}' is not a valid IPv4 or IPv6 address." ```