- For devs
How To Use Parallel Programming
Message queuing is an interesting tool that can help us create scalable websites or web services. A message queue allows applications to communicate asynchronously by sending messages to each other.
At a high level, message queuing is pretty simple. A process, called the Producer, publishes Messages to a Queue, where they are stored until a Consumer process is ready to consume them.
Publishing messages to a message broker is a very fast operation, and we can leverage this to speed up our web services. We can delegate some tasks to background processes by making our web service publish a message to a queue instead. We can then have background consumer processes consume the messages and perform the delegated tasks as needed.
In this guide, we’ll be using RabbitMQ as a message broker. We’ll integrate RabbitMQ into a sample Flask application to defer sending mail to a background process. The application, Flaskr, may look familiar if you’ve previously completed the Flask Tutorial.
Let’s take a look at our signup view function:
1@app.route('/signup', methods=['GET', 'POST']) 2def signup(): 3 error = None 4 if request.method == 'POST': 5 email = request.form['email'] 6 password = request.form['password'] 7 if not (email or password): 8 return signup_error('Email Address and Password are required.') 9 db = get_db() 10 c = db.cursor() 11 c.execute('SELECT * FROM users WHERE email=?;', (email,)) 12 if c.fetchone(): 13 return signup_error('Email Addres already has an account.') 14 c.execute('INSERT INTO users (email, password) VALUES (?, ?);', 15 (email, pbkdf2_sha256.hash(password))) 16 db.commit() 17 send_welcome_email(email) 18 flash('Account Created') 19 return redirect(url_for('login')) 20 else: 21 return render_template('signup.html') 2223def send_welcome_email(address): 24 res = requests.post( 25 "https://api.mailgun.net/v3/{}/messages".format(app.config['DOMAIN']), 26 auth=("api", MAILGUN_API_KEY), 27 data={"from": "Flaskr <noreply@{}>".format(app.config['DOMAIN']), 28 "to": [address], 29 "subject": "Welcome to Flaskr!", 30 "text": "Welcome to Flaskr, your account is now active!"} 31 ) 32 if res.status_code != 200: 33 # Something terrible happened <img draggable="false" class="emoji" alt="🙁" src="https://s.w.org/images/core/emoji/12.0.0-1/svg/1f641.svg"> 34 raise MailgunError("{}-{}".format(res.status_code, res.reason)) 3536def signup_error(error): 37 return render_template('signup.html', error=error)
Currently, the view function checks the database to make sure an account with that email address doesn’t already exist and then adds the new user to the database. The view then immediately sends a welcome email message to the user before finishing the request.
There are a couple of issues with this view that we can improve with a message queue. We don’t need to make the user wait for a response while we send the welcome email. It would be better if we can respond to the user as soon as possible. We also want a way to retry sending the welcome email if for some reason we’re unable to do it on the first try.
First, let’s run an instance of RabbitMQ in a Docker container:
1 docker run -d --hostname my-rabbit -p 4369:4369 -p 5672:5672 2 -p 35197:35197 --name rabbitmq rabbitmq:3
Now let’s add some code to initialize a queue when our application starts:
1def connect_queue(): 2 if not hasattr(g, 'rabbitmq'): 3 g.rabbitmq = pika.BlockingConnection( 4 pika.ConnectionParameters(app.config['RABBITMQ_HOST']) 5 ) 6 return g.rabbitmq 78def get_welcome_queue(): 9 if not hasattr(g, 'welcome_queue'): 10 conn = connect_queue() 11 channel = conn.channel() 12 channel.queue_declare(queue='welcome_queue', durable=True) 13 channel.queue_bind(exchange='amq.direct', queue='welcome_queue') 14 g.welcome_queue = channel 15 return g.welcome_queue 1617@app.teardown_appcontext 18def close_queue(error) 19 if hasattr(g, 'rabbitmq'): 20 g.rabbitmq.close()
We’re declaring a queue called “welcome_queue” that we’ll use to send messages to workers with the email address where the welcome email needs to be sent.
Next, we can update our signup view to publish a message to the queue instead of sending the welcome email.
1@app.route('/signup', methods=['GET', 'POST']) 2def signup(): 3 error = None 4 if request.method == 'POST': 5 email = request.form['email'] 6 password = request.form['password'] 7 if not (email or password): 8 return signup_error('Email Address and Password are required.') 9 db = get_db() 10 c = db.cursor() 11 c.execute('SELECT * FROM users WHERE email=?;', (email,)) 12 if c.fetchone(): 13 return signup_error('Email Addres already has an account.') 14 c.execute('INSERT INTO users (email, password) VALUES (?, ?);', 15 (email, pbkdf2_sha256.hash(password))) 16 db.commit() 17 q = get_welcome_queue() 18 q.basic_publish( 19 exchange='amq.direct', 20 routing_key='welcome_queue', 21 body=email, 22 properties=pika.BasicProperties( 23 delivery_mode=_DELIVERY_MODE_PERSISTENT 24 ) 25 ) 26 flash('Account Created') 27 return redirect(url_for('login')) 28 else: 29 return render_template('signup.html')
Now we can write our worker code. The cool thing about using a message queue is that the worker process can run anywhere. It can run on dedicated worker servers, or maybe alongside your web service. This is what a simple worker script would look like:
1import pika 2import requests34# Configuration5DOMAIN = 'example.com' 6MAILGUN_API_KEY = 'YOUR_MAILGUN_API_KEY' 7RABBITMQ_HOST = 'localhost'89connection = pika.BlockingConnection( 10 pika.ConnectionParameters(host=RABBITMQ_HOST)11)12channel = connection.channel() 13channel.queue_declare(queue='welcome_queue', durable=True)1415class Error(Exception): 16 pass1718class MailgunError(Error): 19 def __init__(self, message):20 self.message = message2122def send_welcome_message(ch, method, properties, body): 23 address = body.decode('UTF-8')24 print("Sending welcome email to {}".format(address))25 res = requests.post(26 "https://api.mailgun.net/v3/{}/messages".format(DOMAIN),27 auth=("api", MAILGUN_API_KEY),28 data={"from": "Flaskr <noreply@{}>".format(DOMAIN),29 "to": [address],30 "subject": "Welcome to Flaskr!",31 "text": "Welcome to Flaskr, your account is now active!"}32 )33 ch.basic_ack(delivery_tag=method.delivery_tag)34 if res.status_code != 200:35 # Something terrible happened :-O36 raise MailgunError("{}-{}".format(res.status_code, res.reason))3738channel.basic_consume(send_welcome_message, queue='welcome_queue') 39channel.start_consuming()
We’re almost done! We’ve successfully decoupled sending our welcome email from the web service. Now the only thing missing is for us to retry sending the welcome email in case it fails to send for some reason. One clever way to add retry logic to your application is to use a dead letter queue.
We’ll add a second queue, the “retry_queue”, which we’ll use to temporarily place a message if we ever encounter an email send error. The messages in the retry queue will have an expiration date some time in the future. We’ll configure RabbitMQ such that when a message expires it will be placed back in our welcome queue ready to be picked up by a worker to attempt to send the email again:
1retry_channel = connection.channel() 2retry_channel.queue_declare( 3 queue='retry_queue',4 durable=True,5 arguments={6 'x-message-ttl': RETRY_DELAY_MS,7 'x-dead-letter-exchange': 'amq.direct',8 'x-dead-letter-routing-key': 'welcome_queue'9 }10)1112def send_welcome_message(ch, method, properties, body): 13 address = body.decode('UTF-8')14 print("Sending welcome email to {}".format(address))15 res = requests.post(16 "https://api.mailgun.net/v3/{}/messages".format(DOMAIN),17 auth=("api", MAILGUN_API_KEY),18 data={"from": "Flaskr <noreply@{}>".format(DOMAIN),19 "to": [address],20 "subject": "Welcome to Flaskr!",21 "text": "Welcome to Flaskr, your account is now active!"}22 )23 ch.basic_ack(delivery_tag=method.delivery_tag)24 if res.status_code != 200:25 print("Error sending to {}. {} {}. Retrying...".format(26 address, res.status_code, res.reason27 ))28 retry_channel.basic_publish(29 exchange='',30 routing_key='retry_queue',31 body=address,32 properties=pika.BasicProperties(33 delivery_mode=_DELIVERY_MODE_PERSISTENT34 )35 )
And that’s it! We’ve successfully introduced asynchronous sending into our application using RabbitMQ. To download the full working example source code repository, check out my github repo.
Get more guides like this by subscribing to the blog. You’ll get an update each week with the latest posts from the Mailgun team. And if you’re not yet using Mailgun to send, receive, and track your application’s emails, you should sign up below!
Learn about our Deliverability Services
Looking to send a high volume of emails? Our email experts can supercharge your email performance. See how we've helped companies like Lyft, Shopify, Github increase their email delivery rates to an average of 97%.
Last updated on August 28, 2020
How To Use Parallel Programming
How we built a Lucene-inspired parser in Go
Gubernator: Cloud-native distributed rate limiting for microservices
What Toasters And Distributed Systems Might Have In Common
Pseudonymization And You – Optimizing Data Protection
Same API, New Tricks: Get Event Notifications Just In Time With Webhooks
Sending Email Using The Mailgun PHP API
Avoiding The Blind Spots Of Missing Data With Machine Learning
How And Why We Adopted Service Mesh With Vulcand And Nginx
TLS Connection Control
InboxReady x Salesforce: The Key to a Stronger Email Deliverability
Become an Email Pro With Our Templates API
Google Postmaster Tools: Understanding Sender Reputation
Navigating Your Career as a Woman in Tech
Implementing Dmarc – A Step-by-Step Guide
Email Bounces: What To Do About Them
Announcing InboxReady: The deliverability suite you need to hit the inbox
Black History Month in Tech: 7 Visionaries Who Shaped The Future
How To Create a Successful Triggered Email Program
Designing HTML Email Templates For Transactional Emails
InboxReady x Salesforce: The Key to a Stronger Email Deliverability
Implementing Dmarc – A Step-by-Step Guide
Announcing InboxReady: The deliverability suite you need to hit the inbox
Designing HTML Email Templates For Transactional Emails
Email Security Best Practices: How To Keep Your Email Program Safe
Mailgun’s Active Defense Against Log4j
Email Blasts: The Dos And Many Don’ts Of Mass Email Sending
Email's Best of 2021
5 Ideas For Better Developer-Designer Collaboration
Mailgun Joins Sinch: The Future of Customer Communications Is Here
Always be in the know and grab free email resources!
By sending this form, I agree that Mailgun may contact me and process my data in accordance with its Privacy Policy.