# Python : Dramatiq
\[ [PyPI](https://pypi.org/project/dramatiq/) | [Source](https://github.com/Bogdanp/dramatiq/) | [Docs](https://dramatiq.io/guide.html) | [API](https://dramatiq.io/reference.html) ]
_Uses [Pika](https://pypi.org/project/pika/ "https://pypi.org/project/pika/") to talk to RabbitMQ._
#### Imports
```python
dramatiq.actor
dramatiq.brokers.rabbitmq.RabbitmqBroker
```
## Setup
```python
RabbitmqBroker(
confirm_delivery = False
url = None # takes precedence over host+port
middleware = None # list
max_priority = None
parameters = None
**kwargs # forwarded to Pika client
)
broker = RabbitmqBroker( url="amqp://guest:
[email protected]:5672" )
broker = RabbitmqBroker( host="127.0.0.1", port=5672 )
```
## API
#### Defining & Running Actors
```python
@dramatiq.actor
-or-
@dramatiq.actor( ...actor_options... )
def my_actor( ... ):
...
my_actor( ... ) # run locally
my_actor.send( ... ) # send to queue
my_actor.send_with_options( args=(), kwargs={}, delay=ms | timedelta, ...actor_options... )
```
###### Actor Options
```python
actor_name = "..."
max_retries = 20
min_backoff = 15 seconds (in ms)
max_backoff = 7 days (in ms)
retry_when = None # callable, determines whether to retry (max_retries ignored)
throws = None # Exception or tuple of Exceptions to ignore (not retry)
max_age = None (in ms)
time_limit = 10 minutes (in ms) # then killed with TimeLimitExceeded error (you can catch it inside your task)
```
> [!NOTE]
> After `max_retries` or `max_age`, message sent to dead letter queue for seven days, then dropped.
#### Brokers
```python
broker = dramatiq.get_broker()
dramatiq.set_broker( broker )
broker.add_middleware( middleware )
broker.flush( queue_name )
broker.flush_all()
actor = broker.get_actor( actor_name )
names = broker.get_declared_actors() # returns set of strings
names = broker.get_declared_queues() # returns set of strings (ex: {'default'})
```
#### Messages
Such as those received by middleware methods.
```python
Message(
queue_name = "default"
actor_name = "count_words"
args = ( "https://example.com", ),
kwargs = {},
options = { "eta": 1498560453548 },
message_id = "7387dc76-8ebe-426e-aec1-db34c236563c",
message_timestamp = 1498560443548
)
```
#### CLI
By default, will spin up as many processes as there are CPU cores on your machine with 8 worker threads per process.
```bash
usage: dramatiq [OPTIONS] broker [module ...]
Run dramatiq workers.
positional arguments:
broker the broker to use (eg: 'module' or 'module:a_broker')
module additional python modules to import
options:
-h, --help show this help message and exit
--processes PROCESSES, -p PROCESSES the number of worker processes to run (default: 16)
--threads THREADS, -t THREADS the number of worker threads per process (default: 8)
--path [PATH ...], -P [PATH ...] the module import path (default: .)
--queues [QUEUES ...], -Q [QUEUES ...] listen to a subset of queues (default: all queues)
--pid-file PID_FILE write the PID of the master process to a file (default: no pid file)
--log-file LOG_FILE write all logs to a file (default: sys.stderr)
--skip-logging do not call logging.basicConfig()
--use-spawn start processes by spawning (default: fork on unix, spawn on windows)
--fork-function FORKS, -f FORKS fork a subprocess to run the given function
--worker-shutdown-timeout TIMEOUT timeout for worker shutdown, in milliseconds (default: 10 minutes)
--version show program's version number and exit
--verbose, -v turn on verbose log output
--watch PATH reload on code change (don't use in production)
examples:
# Run dramatiq workers with actors defined in `./some_module.py`.
$ dramatiq some_module
# Run with a broker named "redis_broker" defined in "some_module".
$ dramatiq some_module:redis_broker
# Run with a broker named "broker" defined as attribute of "app" in "some_module".
$ dramatiq some_module:app.broker
# Run with a callable that sets up a broker.
$ dramatiq some_module:setup_broker
```
To signal workers to reload the code:
```bash
$ kill -s HUP 13047 # use `ps` and look for lines containing the string "[dramatiq.MainProcess]"
```