Queue Services¶
When a client sends an email, that email may go through several email servers before arriving at its final destination. Each server is designed to take responsibility for delivering the message to the next one, retrying if necessary. Once a server has taken responsibility for a message, the connecting client (or server) may disconnect.
The queue service is responsible for making sure email messages are stored persistently somewhere (in case of catastrophic failure) and that delivery is tried and retried with due diligence.
Delivery Attempts¶
Delivery attempts for a queue are performed with the Relay object passed in to the Queue constructor. The delivery attempt will ultimately produce a Reply object indicating its success or failure. If delivery was successful, the queue will remove the message from persistent storage.
If delivery failed permanently, with a 5xx code or too many 4xx codes, a Bounce envelope is created from the original message and delivered back to the original sender. The original message is removed from storage and not retried.
If delivery failed transiently, with a 4xx code (which usually includes connectivity issues), the message is left in storage and a new delivery attempt is scheduled in the future. The time between delivery attempts is managed by the backoff function passed in to the Queue constructor. If this backoff function returns None, the message is permanently failed.
Here is an example backoff function that makes 5 delivery attempts with an exponentially increasing backoff time:
def exponential_backoff(envelope, attempts):
    # Delay grows exponentially: 12 * 5**attempts seconds.
    if attempts <= 5:
        return 12.0 * (5.0 ** attempts)
    # Returning None permanently fails the message.
    return None
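To wire this together, the backoff function is passed to the Queue constructor. The following is a minimal sketch, assuming store and relay objects (such as the storage drivers described below and a slimta Relay) have already been created:
from slimta.queue import Queue

# Sketch only: `store` is one of the storage drivers described below,
# and `relay` is the Relay that performs the actual delivery attempts.
queue = Queue(store, relay, backoff=exponential_backoff)
queue.start()  # the queue service runs in the background once started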
Persistent Storage¶
A storage mechanism should store the entirety of an Envelope object, such that it can be recreated on restart. Along with the envelope, queue services must also keep track of when a message's next delivery attempt should be and how many attempts a message has undergone. In essence, a queue's storage mechanism allows slimta to be stopped and restarted without losing state.
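Conceptually, each queued message therefore carries three pieces of state. The dataclass below is purely illustrative (QueuedMessage is not part of slimta); it simply names what any backend must persist:
from dataclasses import dataclass

from slimta.envelope import Envelope

@dataclass
class QueuedMessage:
    # Illustrative only: the state any storage backend must persist.
    envelope: Envelope   # the full message, recreatable on restart
    next_attempt: float  # UNIX timestamp of the next delivery attempt
    attempts: int        # how many delivery attempts have been made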
In-Memory¶
The DictStorage class is a simple storage mechanism that, by itself, does not provide on-disk persistence. By default, it creates two dicts in memory for queue data, but passing in shelve objects will allow basic persistence. Be aware, however, that shelve may not handle system or process failure and could leave corruption.
The DictStorage class is very useful for development and testing, but probably should be avoided for live systems.
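For example, a DictStorage can be given shelve-backed mappings. This is a sketch, assuming the import path slimta.queue.dict and that the constructor's two arguments are the envelope and meta mappings:
import shelve

from slimta.queue.dict import DictStorage

# Two shelve files replace the default in-memory dicts, giving basic
# (but not crash-safe) persistence across restarts.
env_db = shelve.open('/tmp/slimta-env.db')
meta_db = shelve.open('/tmp/slimta-meta.db')
store = DictStorage(env_db, meta_db)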
Local Disk¶
In the fashion of traditional MTAs, diskstorage writes Envelope data and queue metadata directly to configurable directories on disk. This functionality relies on pyaio to asynchronously read and write files on disk, and as such it is only available on Linux.
To ensure file creation and modification is atomic, files are first written to a scratch directory and then os.rename() moves them to their final destination. For this reason, it is important that the scratch directory (the tmp_dir argument in the constructor) reside on the same filesystem as the envelope and meta directories (the env_dir and meta_dir arguments, respectively).
The files created in the envelope directory are identified by a uuid4() hexadecimal string with the suffix .env. The files created in the meta directory are identified by the same uuid string as their corresponding envelope file, but with the suffix .meta. The envelope and meta directories can be the same, but two DiskStorage instances should not share directories.
To use this storage:
$ pip install python-slimta[disk]
And to initialize a new DiskStorage:
from slimta.diskstorage import DiskStorage

# The envelope and meta directories may be the same directory.
queue_dir = '/var/spool/slimta/queue'
queue = DiskStorage(queue_dir, queue_dir)
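If separate envelope, meta, and scratch directories are preferred, all three must reside on the same filesystem. A sketch using the constructor arguments named above:
# All three directories share a filesystem so os.rename() stays atomic.
queue = DiskStorage('/var/spool/slimta/env',
                    '/var/spool/slimta/meta',
                    tmp_dir='/var/spool/slimta/tmp')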
Redis¶
Taking advantage of the advanced data structures and ease of use of the redis database, redisstorage simply creates a hash key for each queued message, containing its delivery metadata and a pickled version of the Envelope.
The keys created in redis will look like the following:
redis 127.0.0.1:6379> KEYS *
1) "slimta:28195d3b0a5847f9853e5b0173c85151"
2) "slimta:5ebb94976cd94b418d6063a2ca4cbf8f"
3) "slimta:d33879cf66244472b983770ba762e07b"
redis 127.0.0.1:6379>
Each key is a hash that will look something like:
redis 127.0.0.1:6379> HGETALL slimta:d33879cf66244472b983770ba762e07b
1) "attempts"
2) "2"
3) "timestamp"
4) "1377121655"
5) "envelope"
6) "..."
redis 127.0.0.1:6379>
On startup, the Queue will scan the keyspace (using the customizable prefix slimta:) and populate the queue with existing messages for delivery.
To use redis you must install:
$ pip install python-slimta[redis]
And to initialize a new RedisStorage:
from slimta.redisstorage import RedisStorage

# Connects to the redis server; keys are created under the 'slimta:' prefix.
store = RedisStorage('redis.example.com')
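The key prefix can be overridden as well. A sketch, assuming the constructor keyword is named prefix to match the customizable prefix described above:
# Hypothetical keyword name `prefix`; keys would then look like 'myapp:<uuid>'.
store = RedisStorage('redis.example.com', prefix='myapp:')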
Cloud Storage¶
The slimta.cloudstorage module makes available connectors to the cloud service provider AWS. Envelope data and queue metadata are written to a cloud object store. Optionally, a reference to their location in the object store is then written to a cloud message queue, which can alert relayers in other processes of the availability of a new message in the object store.
Using AWS requires installation and configuration of the boto library:
$ pip install python-slimta[aws]
Using the boto library, we need to come up with references to a Bucket and optionally a Queue. Then, use them to create SimpleStorageService and SimpleQueueService objects:
from boto.s3.connection import S3Connection

# Connect to S3 and fetch the bucket that will hold envelope data.
s3_conn = S3Connection('1A2B3C4D5E', 'XXXXXXXXXXXX')
s3_bucket = s3_conn.get_bucket('slimta-queue')

import boto.sqs

# Connect to SQS and create (or fetch) the notification queue.
sqs_conn = boto.sqs.connect_to_region('us-west-2',
                                      aws_access_key_id='1A2B3C4D5E',
                                      aws_secret_access_key='XXXXXXXXXXXX')
sqs_queue = sqs_conn.create_queue('slimta-queue')

from slimta.cloudstorage.aws import SimpleStorageService, SimpleQueueService

s3 = SimpleStorageService(s3_bucket)
sqs = SimpleQueueService(sqs_queue)
Once you have these objects created for your cloud service, link them together into a queue storage driver using CloudStorage:
from slimta.cloudstorage import CloudStorage

# The second argument, the message queue, is optional.
storage = CloudStorage(s3, sqs)
This object can then be used anywhere a QueueStorage object is required.