How To Set Up Message Queues For Asynchronous Sending

Mailgun Team

Message queuing is a tool that can help us build scalable websites and web services. A message queue lets applications communicate asynchronously by sending messages to each other.

At a high level, message queuing is pretty simple. A process, called the Producer, publishes Messages to a Queue, where they are stored until a Consumer process is ready to consume them.
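
To make the Producer, Queue, and Consumer roles concrete, here is a minimal in-process sketch. It uses Python's standard `queue` and `threading` modules in place of a real message broker, and the `run_demo` function and its message text are illustrative names, not part of the Flaskr application.

```python
import queue
import threading

def run_demo(addresses):
    """Publish addresses to an in-memory queue and consume them on a worker thread."""
    messages = queue.Queue()
    handled = []

    def consumer():
        while True:
            address = messages.get()   # blocks until a message is available
            if address is None:        # sentinel value: no more work is coming
                break
            handled.append("welcome sent to " + address)

    worker = threading.Thread(target=consumer)
    worker.start()

    # Producer side: enqueue each message and move on without waiting for the work.
    for address in addresses:
        messages.put(address)
    messages.put(None)
    worker.join()
    return handled

if __name__ == "__main__":
    print(run_demo(["a@example.com", "b@example.com"]))
```

A broker such as RabbitMQ plays the role of `messages` here, with the difference that producers and consumers can live in separate processes or on separate machines.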

Publishing messages to a message broker is a very fast operation, and we can leverage this to speed up our web services. We can delegate some tasks to background processes by making our web service publish a message to a queue instead. We can then have background consumer processes consume the messages and perform the delegated tasks as needed.

In this guide, we’ll be using RabbitMQ as a message broker. We’ll integrate RabbitMQ into a sample Flask application to defer sending mail to a background process. The application, Flaskr, may look familiar if you’ve previously completed the Flask Tutorial.

Let’s take a look at our signup view function:

    @app.route('/signup', methods=['GET', 'POST'])
    def signup():
        error = None
        if request.method == 'POST':
            email = request.form['email']
            password = request.form['password']
            # Both fields are required, so reject the form if either is empty.
            if not (email and password):
                return signup_error('Email Address and Password are required.')
            db = get_db()
            c = db.cursor()
            c.execute('SELECT * FROM users WHERE email=?;', (email,))
            if c.fetchone():
                return signup_error('Email Address already has an account.')
            c.execute('INSERT INTO users (email, password) VALUES (?, ?);',
                      (email, pbkdf2_sha256.hash(password)))
            db.commit()
            # This call blocks the request until the Mailgun API responds.
            send_welcome_email(email)
            flash('Account Created')
            return redirect(url_for('login'))
        else:
            return render_template('signup.html')

    def send_welcome_email(address):
        res = requests.post(
            "https://api.mailgun.net/v3/{}/messages".format(app.config['DOMAIN']),
            auth=("api", MAILGUN_API_KEY),
            data={"from": "Flaskr <noreply@{}>".format(app.config['DOMAIN']),
                  "to": [address],
                  "subject": "Welcome to Flaskr!",
                  "text": "Welcome to Flaskr, your account is now active!"}
        )
        if res.status_code != 200:
            # Something terrible happened
            raise MailgunError("{}-{}".format(res.status_code, res.reason))

    def signup_error(error):
        return render_template('signup.html', error=error)

Currently, the view function checks the database to make sure an account with that email address doesn’t already exist and then adds the new user to the database. The view then immediately sends a welcome email message to the user before finishing the request.

There are a couple of issues with this view that we can improve with a message queue. We don’t need to make the user wait for a response while we send the welcome email; it would be better to respond to the user as soon as possible. We also want a way to retry sending the welcome email if, for some reason, we’re unable to send it on the first try.

First, let’s run an instance of RabbitMQ in a Docker container:

    docker run -d --hostname my-rabbit -p 4369:4369 -p 5672:5672 \
        -p 35197:35197 --name rabbitmq rabbitmq:3
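
The broker can take a few seconds to start accepting connections after the container launches. As a rough check, the sketch below tries to open a TCP connection to the AMQP port published above (5672). The `broker_reachable` helper is a hypothetical name, and a successful TCP connection only shows that something is listening on the port, not that RabbitMQ is fully ready.

```python
import socket

def broker_reachable(host="localhost", port=5672, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False

if __name__ == "__main__":
    print("RabbitMQ port open:", broker_reachable())
```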

Now let’s add some code that connects to RabbitMQ and declares a queue the first time a request needs it:

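    import pika
    from flask import g

    def connect_queue():
        # Open one RabbitMQ connection per application context and cache it on g.
        if not hasattr(g, 'rabbitmq'):
            g.rabbitmq = pika.BlockingConnection(
                pika.ConnectionParameters(app.config['RABBITMQ_HOST'])
            )
        return g.rabbitmq

    def get_welcome_queue():
        if not hasattr(g, 'welcome_queue'):
            conn = connect_queue()
            channel = conn.channel()
            # durable=True lets the queue definition survive a broker restart.
            channel.queue_declare(queue='welcome_queue', durable=True)
            # Bind explicitly so 'welcome_queue' routes from amq.direct to the queue.
            channel.queue_bind(exchange='amq.direct', queue='welcome_queue',
                               routing_key='welcome_queue')
            g.welcome_queue = channel
        return g.welcome_queue

    @app.teardown_appcontext
    def close_queue(error):
        # Close the connection, if one was opened, when the app context ends.
        if hasattr(g, 'rabbitmq'):
            g.rabbitmq.close()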

We’re declaring a queue called “welcome_queue”. Each message we publish to it carries the email address that a worker should send the welcome email to.

Next, we can update our signup view to publish a message to the queue instead of sending the welcome email.

    # AMQP delivery mode 2 marks a message as persistent.
    _DELIVERY_MODE_PERSISTENT = 2

    @app.route('/signup', methods=['GET', 'POST'])
    def signup():
        error = None
        if request.method == 'POST':
            email = request.form['email']
            password = request.form['password']
            # Both fields are required, so reject the form if either is empty.
            if not (email and password):
                return signup_error('Email Address and Password are required.')
            db = get_db()
            c = db.cursor()
            c.execute('SELECT * FROM users WHERE email=?;', (email,))
            if c.fetchone():
                return signup_error('Email Address already has an account.')
            c.execute('INSERT INTO users (email, password) VALUES (?, ?);',
                      (email, pbkdf2_sha256.hash(password)))
            db.commit()
            # Publish the address and return; a worker sends the email later.
            q = get_welcome_queue()
            q.basic_publish(
                exchange='amq.direct',
                routing_key='welcome_queue',
                body=email,
                properties=pika.BasicProperties(
                    delivery_mode=_DELIVERY_MODE_PERSISTENT
                )
            )
            flash('Account Created')
            return redirect(url_for('login'))
        else:
            return render_template('signup.html')
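
The message body here is just the email address. If a worker later needs more context, one option is to publish a small JSON document instead. The sketch below is one possible shape, not what the sample application does; the helper names and the `email` field are assumptions made for illustration.

```python
import json

def encode_welcome_message(email):
    """Serialize the payload to bytes suitable for a message body."""
    # The "email" field name is a hypothetical choice for this sketch.
    return json.dumps({"email": email}).encode("utf-8")

def decode_welcome_message(body):
    """Parse a message body produced by encode_welcome_message."""
    return json.loads(body.decode("utf-8"))["email"]
```

The producer would pass the encoded bytes as `body`, and the worker would call `decode_welcome_message(body)` in place of `body.decode('UTF-8')`.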

Now we can write our worker code. The cool thing about using a message queue is that the worker process can run anywhere. It can run on dedicated worker servers, or maybe alongside your web service. This is what a simple worker script would look like:

    import pika
    import requests

    # Configuration
    DOMAIN = 'example.com'
    MAILGUN_API_KEY = 'YOUR_MAILGUN_API_KEY'
    RABBITMQ_HOST = 'localhost'

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=RABBITMQ_HOST)
    )
    channel = connection.channel()
    # Declaring and binding are idempotent, so the worker can start before the web app.
    channel.queue_declare(queue='welcome_queue', durable=True)
    channel.queue_bind(exchange='amq.direct', queue='welcome_queue',
                       routing_key='welcome_queue')

    class Error(Exception):
        pass

    class MailgunError(Error):
        def __init__(self, message):
            super().__init__(message)
            self.message = message

    def send_welcome_message(ch, method, properties, body):
        # The message body is the recipient's email address as UTF-8 bytes.
        address = body.decode('UTF-8')
        print("Sending welcome email to {}".format(address))
        res = requests.post(
            "https://api.mailgun.net/v3/{}/messages".format(DOMAIN),
            auth=("api", MAILGUN_API_KEY),
            data={"from": "Flaskr <noreply@{}>".format(DOMAIN),
                  "to": [address],
                  "subject": "Welcome to Flaskr!",
                  "text": "Welcome to Flaskr, your account is now active!"}
        )
        # The message is acknowledged whether or not the send succeeded,
        # so a failed send is not retried yet.
        ch.basic_ack(delivery_tag=method.delivery_tag)
        if res.status_code != 200:
            # Something terrible happened :-O
            raise MailgunError("{}-{}".format(res.status_code, res.reason))

    # pika 1.x signature; pika 0.x took the callback as the first argument.
    channel.basic_consume(queue='welcome_queue',
                          on_message_callback=send_welcome_message)
    channel.start_consuming()
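
Because the worker can run on a different machine from the web service, it can be convenient to read its settings from the environment instead of hard-coding them. The following is a minimal sketch of that idea. The `load_worker_config` helper is hypothetical, and the environment variable names simply mirror the constants in the script above.

```python
import os

def load_worker_config(env=None):
    """Build the worker settings from environment variables, with fallbacks."""
    env = os.environ if env is None else env
    return {
        # Defaults match the hard-coded values in the worker script.
        "DOMAIN": env.get("DOMAIN", "example.com"),
        "MAILGUN_API_KEY": env.get("MAILGUN_API_KEY", "YOUR_MAILGUN_API_KEY"),
        "RABBITMQ_HOST": env.get("RABBITMQ_HOST", "localhost"),
    }
```

Passing a plain dictionary as `env` makes the function easy to exercise without touching the real environment.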

We’re almost done! We’ve successfully decoupled sending our welcome email from the web service. Now the only thing missing is for us to retry sending the welcome email in case it fails to send for some reason. One clever way to add retry logic to your application is to use a dead letter queue.

We’ll add a second queue, the “retry_queue”, where we’ll temporarily place a message whenever we encounter an email send error. Messages in the retry queue have a time-to-live (TTL), so they expire after a fixed delay. We’ll configure RabbitMQ so that when a message expires, it is dead-lettered back to our welcome queue, ready to be picked up by a worker that will attempt to send the email again:

    # Example delay before a failed message is retried; tune for your application.
    RETRY_DELAY_MS = 30000
    # AMQP delivery mode 2 marks a message as persistent.
    _DELIVERY_MODE_PERSISTENT = 2

    retry_channel = connection.channel()
    retry_channel.queue_declare(
        queue='retry_queue',
        durable=True,
        arguments={
            # Messages expire after this many milliseconds...
            'x-message-ttl': RETRY_DELAY_MS,
            # ...and are then republished to amq.direct with this routing key,
            # which delivers them back to welcome_queue.
            'x-dead-letter-exchange': 'amq.direct',
            'x-dead-letter-routing-key': 'welcome_queue'
        }
    )

    def send_welcome_message(ch, method, properties, body):
        address = body.decode('UTF-8')
        print("Sending welcome email to {}".format(address))
        res = requests.post(
            "https://api.mailgun.net/v3/{}/messages".format(DOMAIN),
            auth=("api", MAILGUN_API_KEY),
            data={"from": "Flaskr <noreply@{}>".format(DOMAIN),
                  "to": [address],
                  "subject": "Welcome to Flaskr!",
                  "text": "Welcome to Flaskr, your account is now active!"}
        )
        ch.basic_ack(delivery_tag=method.delivery_tag)
        if res.status_code != 200:
            print("Error sending to {}. {} {}. Retrying...".format(
                address, res.status_code, res.reason
            ))
            # The default exchange ('') routes directly to the queue named
            # by the routing key.
            retry_channel.basic_publish(
                exchange='',
                routing_key='retry_queue',
                body=address,
                properties=pika.BasicProperties(
                    delivery_mode=_DELIVERY_MODE_PERSISTENT
                )
            )
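
As written, a message that keeps failing will cycle between the two queues indefinitely. One way to bound this is to carry an attempt counter in the message headers and stop once it reaches a limit. The sketch below shows only the counting logic; the `x-retries` header name, the `MAX_RETRIES` value, and the `next_retry_headers` helper are all assumptions made for illustration, not part of the original example.

```python
MAX_RETRIES = 5             # hypothetical limit; choose one that fits your application
RETRY_HEADER = "x-retries"  # hypothetical custom header name

def next_retry_headers(headers, max_retries=MAX_RETRIES):
    """Return headers for the next retry, or None once the limit is reached."""
    current = dict(headers or {})   # headers may be None on a first delivery
    attempts = int(current.get(RETRY_HEADER, 0))
    if attempts >= max_retries:
        return None
    current[RETRY_HEADER] = attempts + 1
    return current
```

In the worker, this could be called with `properties.headers`. When the result is not `None`, pass it as `headers=` to `pika.BasicProperties` when publishing to the retry queue; otherwise, log the failure and give up on that address.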

And that’s it! We’ve successfully introduced asynchronous sending into our application using RabbitMQ. For the full working example source code, check out my GitHub repo.

Last updated on August 28, 2020