This part of the tutorial details how to implement a Redis task queue to handle text processing.
- 03/22/2016: Upgraded to Python version 3.5.1 as well as the latest versions of Redis, Python Redis, and RQ. See below for details.
- 02/22/2015: Added Python 3 support.
Remember: Here’s what we’re building – A Flask app that calculates word-frequency pairs based on the text from a given URL.
- Part One: Set up a local development environment and then deploy both a staging and a production environment on Heroku.
- Part Two: Set up a PostgreSQL database along with SQLAlchemy and Alembic to handle migrations.
- Part Three: Add in the back-end logic to scrape and then process the word counts from a webpage using the requests, BeautifulSoup, and Natural Language Toolkit (NLTK) libraries.
- Part Four: Implement a Redis task queue to handle the text processing. (current)
- Part Five: Set up Angular on the front-end to continuously poll the back-end to see if the request is done processing.
- Part Six: Push to the staging server on Heroku – setting up Redis and detailing how to run two processes (web and worker) on a single Dyno.
- Part Seven: Update the front-end to make it more user-friendly.
Need the code? Grab it from the repo.
Start by downloading and installing Redis from either the official site or via Homebrew (brew install redis). Once installed, start the Redis server.
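Assuming the redis-server binary is on your PATH after installation, starting the server is just:

```sh
$ redis-server
```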
Next install Python Redis and RQ in a new terminal window:
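With your virtualenv activated, a typical install looks like the following; pin the versions to match the repo's requirements.txt if you want to follow along exactly:

```sh
$ pip install redis rq
$ pip freeze > requirements.txt
```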
Set up the Worker
Let’s start by creating a worker process to listen for queued tasks. Create a new file worker.py, and add this code:
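A minimal sketch of worker.py, assuming Redis is running locally on the default port. The REDISTOGO_URL environment variable is checked first so the same file can point at a hosted Redis instance later in the series:

```python
import os

import redis
from rq import Worker, Queue, Connection

# The worker watches the "default" queue
listen = ['default']

# Fall back to the local Redis instance if no URL is set in the environment
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(map(Queue, listen))
        worker.work()
```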
Here, we listened for a queue called default and established a connection to the Redis server on localhost:6379.
Fire this up in another terminal window:
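From the project directory, with the virtualenv activated:

```sh
$ python worker.py
```

The worker should report that it has started and is listening on the default queue.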
Now we need to update our app.py to send jobs to the queue…
Add the following imports to app.py:
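Assuming the names used above (conn is defined in worker.py), the new imports look something like this:

```python
from rq import Queue
from rq.job import Job
from worker import conn
```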
Then update the configuration section:
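The key addition is the queue itself, built on the connection from worker.py; the Flask and SQLAlchemy setup carries over from Parts One and Two. A sketch:

```python
app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])  # config classes from Part One
db = SQLAlchemy(app)

# Redis queue for background jobs, using the connection defined in worker.py
q = Queue(connection=conn)

from models import Result  # imported after db is created to avoid a circular import
```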
Here, q = Queue(connection=conn) set up a Redis connection and initialized a queue based on that connection.
Move the text processing functionality out of our index route and into a new function called count_and_save_words(). This function accepts one argument, a URL, which we will pass to it when we call it from our index route.
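Pulling the scraping and counting logic from Part Three into that function, and updating the index route to enqueue it, gives something along these lines. It assumes requests, nltk, BeautifulSoup, re, Counter, and the stops stop-word list are already imported/defined in app.py from Part Three; exact error messages may differ in the repo:

```python
def count_and_save_words(url):
    errors = []

    try:
        r = requests.get(url)
    except Exception:
        errors.append(
            "Unable to get URL. Please make sure it's valid and try again."
        )
        return {"error": errors}

    # text processing (carried over from Part Three)
    raw = BeautifulSoup(r.text, "html.parser").get_text()
    nltk.data.path.append('./nltk_data/')  # use the locally downloaded corpora
    tokens = nltk.word_tokenize(raw)
    text = nltk.Text(tokens)

    # remove punctuation and count the raw words
    non_punct = re.compile('.*[A-Za-z].*')
    raw_words = [w for w in text if non_punct.match(w)]
    raw_word_count = Counter(raw_words)

    # remove stop words ('stops' is the stop-word list from Part Three)
    no_stop_words = [w for w in raw_words if w.lower() not in stops]
    no_stop_words_count = Counter(no_stop_words)

    # save the results to the database and return the new row's id
    try:
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id
    except Exception:
        errors.append("Unable to add item to database.")
        return {"error": errors}


@app.route('/', methods=['GET', 'POST'])
def index():
    results = {}
    if request.method == "POST":
        # get the url that the user submitted
        url = request.form['url']
        if 'http://' not in url[:7]:
            url = 'http://' + url
        # hand the work off to the queue and print the job id
        job = q.enqueue_call(
            func=count_and_save_words, args=(url,), result_ttl=5000
        )
        print(job.get_id())

    return render_template('index.html', results=results)
```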
Take note of the following code:
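That is, the part of the index route that hands the work off to the queue:

```python
job = q.enqueue_call(
    func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
```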
Here we used the queue that we initialized earlier and called the enqueue_call() function. This added a new job to the queue, and that job ran the count_and_save_words() function with the URL as the argument. The result_ttl=5000 keyword argument tells RQ how long to hold on to the result of the job – 5,000 seconds, in this case. Then we output the job id to the terminal. This id is needed to see if the job is done processing.
Let’s set up a new route for that…
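A sketch of that route, which fetches the job by its id and returns the result with a 200 once the job has finished, or a 202 while it's still running (the exact "still processing" response is up to you):

```python
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
    # look the job up in Redis by its id
    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        # job.result is the database id returned by count_and_save_words()
        return str(job.result), 200
    else:
        return "Still processing!", 202
```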
Let’s test this out.
Fire up the server, navigate to http://localhost:5000/, use the URL http://realpython.com, and grab the job id from the terminal. Then use that id in the '/results/' endpoint – e.g., http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.
As long as less than 5,000 seconds have elapsed before you check the status, you should see an id number, which is generated when we add the results to the database:
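That id is what count_and_save_words() returns from its save-to-database step, roughly:

```python
# save the results to the database and hand back the new row's id
result = Result(
    url=url,
    result_all=raw_word_count,
    result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
```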
Now, let’s refactor the route slightly to return the actual results from the database in JSON:
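Roughly like this, assuming operator is still imported from Part Three; the route looks up the saved Result row using the id the job returned and sends back the ten most frequent non-stop words:

```python
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        # fetch the saved Result row using the id the job returned
        result = Result.query.filter_by(id=job.result).first()
        # ten most frequent words with stop words removed
        results = sorted(
            result.result_no_stop_words.items(),
            key=operator.itemgetter(1),
            reverse=True
        )[:10]
        return jsonify(results)
    else:
        return "Still processing!", 202
```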
Make sure to add the import:
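That import is jsonify, which comes from Flask. Add it to the existing Flask imports at the top of app.py:

```python
from flask import Flask, render_template, request, jsonify
```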
Test this out again. If all went well, you should see something similar to this in your browser:
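The exact words and counts will depend on the page you scraped; the response is a JSON array of [word, count] pairs, something like this purely illustrative example:

```json
[
  ["Python", 315],
  ["programming", 167],
  ["web", 138],
  ["Flask", 92]
]
```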
In Part Five we’ll bring the client and server together by adding Angular into the mix to create a poller, which will send a request every five seconds to the /results/<job_key> endpoint asking for updates. Once the data is available, we’ll add it to the DOM.
This is a collaboration piece between Cam Linke, co-founder of Startup Edmonton, and the folks at Real Python.