# Python : Dramatiq

\[ [PyPI](https://pypi.org/project/dramatiq/) | [Source](https://github.com/Bogdanp/dramatiq/) | [Docs](https://dramatiq.io/guide.html) | [API](https://dramatiq.io/reference.html) ]

_Uses [Pika](https://pypi.org/project/pika/) to talk to RabbitMQ._

#### Imports

```python
import dramatiq  # provides the @dramatiq.actor decorator
from dramatiq.brokers.rabbitmq import RabbitmqBroker
```

## Setup

```python
# Constructor signature (reference only)
RabbitmqBroker(
    confirm_delivery = False,
    url              = None,   # takes precedence over host+port
    middleware       = None,   # list
    max_priority     = None,
    parameters       = None,
    **kwargs                   # forwarded to Pika client
)

# Connect with a URL...
broker = RabbitmqBroker( url="amqp://guest:guest@127.0.0.1:5672" )
# ...or with host and port
broker = RabbitmqBroker( host="127.0.0.1", port=5672 )
```

## API

#### Defining & Running Actors

```python
@dramatiq.actor
# -or-
@dramatiq.actor( ...actor_options... )
def my_actor( ... ):
    ...

my_actor( ... )         # run locally
my_actor.send( ... )    # send to queue
my_actor.send_with_options(
    args=(),
    kwargs={},
    delay=ms | timedelta,
    ...actor_options...
)
```

###### Actor Options

```python
actor_name  = "..."
max_retries = 20
min_backoff = 15_000         # 15 seconds (in ms)
max_backoff = 604_800_000    # 7 days (in ms)
retry_when  = None           # callable, determines whether to retry (max_retries ignored)
throws      = None           # Exception or tuple of Exceptions to ignore (not retry)
max_age     = None           # in ms
time_limit  = 600_000        # 10 minutes (in ms); then interrupted with a TimeLimitExceeded error (you can catch it inside your task)
```

> [!NOTE]
> Once `max_retries` or `max_age` is exceeded, the message is sent to the dead-letter queue, kept there for seven days, and then dropped.

#### Brokers

```python
broker = dramatiq.get_broker()
dramatiq.set_broker( broker )

broker.add_middleware( middleware )
broker.flush( queue_name )
broker.flush_all()

actor = broker.get_actor( actor_name )
names = broker.get_declared_actors()    # returns set of strings
names = broker.get_declared_queues()    # returns set of strings (ex: {'default'})
```

#### Messages

Such as those received by middleware methods.

```python
Message(
    queue_name = "default",
    actor_name = "count_words",
    args = ( "https://example.com", ),
    kwargs = {},
    options = { "eta": 1498560453548 },
    message_id = "7387dc76-8ebe-426e-aec1-db34c236563c",
    message_timestamp = 1498560443548
)
```

#### CLI

By default, `dramatiq` spins up one worker process per CPU core, with 8 worker threads per process. The `default: 16` in the help output below presumably reflects the core count of the machine it was captured on.

```bash
usage: dramatiq [OPTIONS] broker [module ...]

Run dramatiq workers.

positional arguments:
  broker                the broker to use (eg: 'module' or 'module:a_broker')
  module                additional python modules to import

options:
  -h, --help            show this help message and exit
  --processes PROCESSES, -p PROCESSES
                        the number of worker processes to run (default: 16)
  --threads THREADS, -t THREADS
                        the number of worker threads per process (default: 8)
  --path [PATH ...], -P [PATH ...]
                        the module import path (default: .)
  --queues [QUEUES ...], -Q [QUEUES ...]
                        listen to a subset of queues (default: all queues)
  --pid-file PID_FILE   write the PID of the master process to a file (default: no pid file)
  --log-file LOG_FILE   write all logs to a file (default: sys.stderr)
  --skip-logging        do not call logging.basicConfig()
  --use-spawn           start processes by spawning (default: fork on unix, spawn on windows)
  --fork-function FORKS, -f FORKS
                        fork a subprocess to run the given function
  --worker-shutdown-timeout TIMEOUT
                        timeout for worker shutdown, in milliseconds (default: 10 minutes)
  --version             show program's version number and exit
  --verbose, -v         turn on verbose log output
  --watch PATH          reload on code change (don't use in production)

examples:
  # Run dramatiq workers with actors defined in `./some_module.py`.
  $ dramatiq some_module

  # Run with a broker named "redis_broker" defined in "some_module".
  $ dramatiq some_module:redis_broker

  # Run with a broker named "broker" defined as attribute of "app" in "some_module".
  $ dramatiq some_module:app.broker

  # Run with a callable that sets up a broker.
  $ dramatiq some_module:setup_broker
```

To signal the workers to reload the code, send `HUP` to the main process (`13047` is an example PID):

```bash
$ kill -s HUP 13047    # use `ps` and look for lines containing the string "[dramatiq.MainProcess]"
```
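Instead of searching `ps` output, the reload signal can be driven from the file written by `--pid-file`. The following is a minimal, Unix-only sketch that assumes the workers were started with something like `dramatiq some_module --pid-file /tmp/dramatiq.pid`; the path and the helper names `read_pid` and `reload_workers` are hypothetical and not part of Dramatiq.

```python
import os
import signal
from pathlib import Path


def read_pid(pid_file):
    """Return the integer PID stored in the given file."""
    return int(Path(pid_file).read_text().strip())


def reload_workers(pid_file):
    """Send SIGHUP to the process whose PID is stored in pid_file.

    Equivalent to `kill -s HUP <pid>`; returns the PID that was signalled.
    """
    pid = read_pid(pid_file)
    os.kill(pid, signal.SIGHUP)
    return pid


# Example call (hypothetical path; must match the value given to --pid-file):
# reload_workers("/tmp/dramatiq.pid")
```

If the PID file is stale (the process has exited), `os.kill` raises `ProcessLookupError`, so callers may want to catch that and fall back to the `ps` approach.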