This part of the tutorial details how to implement a Redis task queue to handle text processing.
Updates:
- 02/12/2020: Upgraded to Python version 3.8.1 as well as the latest versions of Redis, Python Redis, and RQ. See below for details. Mention a bug in the latest RQ version and provide a solution. Solved the http before https bug.
- 03/22/2016: Upgraded to Python version 3.5.1 as well as the latest versions of Redis, Python Redis, and RQ. See below for details.
- 02/22/2015: Added Python 3 support.
Free Bonus: Click here to get access to a free Flask + Python video tutorial that shows you how to build Flask web app, step-by-step.
Remember: Here’s what we’re building—A Flask app that calculates word-frequency pairs based on the text from a given URL.
- Part One: Set up a local development environment and then deploy both a staging and a production environment on Heroku.
- Part Two: Set up a PostgreSQL database along with SQLAlchemy and Alembic to handle migrations.
- Part Three: Add in the back-end logic to scrape and then process the word counts from a webpage using the requests, BeautifulSoup, and Natural Language Toolkit (NLTK) libraries.
- Part Four: Implement a Redis task queue to handle the text processing. (current)
- Part Five: Set up Angular on the front-end to continuously poll the back-end to see if the request is done processing.
- Part Six: Push to the staging server on Heroku - setting up Redis and detailing how to run two processes (web and worker) on a single Dyno.
- Part Seven: Update the front-end to make it more user-friendly.
- Part Eight: Create a custom Angular Directive to display a frequency distribution chart using JavaScript and D3.
Need the code? Grab it from the repo.
Install Requirements
Tools used:
Start by downloading and installing Redis from either the official site or via Homebrew (brew install redis
). Once installed, start the Redis server:
$ redis-server
Next install Python Redis and RQ in a new terminal window:
$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt
Set up the Worker
Let’s start by creating a worker process to listen for queued tasks. Create a new file worker.py, and add this code:
import os
import redis
from rq import Worker, Queue, Connection
listen = ['default']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)
if __name__ == '__main__':
with Connection(conn):
worker = Worker(list(map(Queue, listen)))
worker.work()
Here, we listened for a queue called default
and established a connection to the Redis server on localhost:6379
.
Fire this up in another terminal window:
$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...
Now we need to update our app.py to send jobs to the queue…
Update app.py
Add the following imports to app.py:
from rq import Queue
from rq.job import Job
from worker import conn
Then update the configuration section:
app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)
q = Queue(connection=conn)
from models import *
q = Queue(connection=conn)
set up a Redis connection and initialized a queue based on that connection.
Move the text processing functionality out of our index route and into a new function called count_and_save_words()
. This function accepts one argument, a URL, which we will pass to it when we call it from our index route.
def count_and_save_words(url):
errors = []
try:
r = requests.get(url)
except:
errors.append(
"Unable to get URL. Please make sure it's valid and try again."
)
return {"error": errors}
# text processing
raw = BeautifulSoup(r.text).get_text()
nltk.data.path.append('./nltk_data/') # set the path
tokens = nltk.word_tokenize(raw)
text = nltk.Text(tokens)
# remove punctuation, count raw words
nonPunct = re.compile('.*[A-Za-z].*')
raw_words = [w for w in text if nonPunct.match(w)]
raw_word_count = Counter(raw_words)
# stop words
no_stop_words = [w for w in raw_words if w.lower() not in stops]
no_stop_words_count = Counter(no_stop_words)
# save the results
try:
result = Result(
url=url,
result_all=raw_word_count,
result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
except:
errors.append("Unable to add item to database.")
return {"error": errors}
@app.route('/', methods=['GET', 'POST'])
def index():
results = {}
if request.method == "POST":
# this import solves a rq bug which currently exists
from app import count_and_save_words
# get url that the person has entered
url = request.form['url']
if not url[:8].startswith(('https://', 'http://')):
url = 'http://' + url
job = q.enqueue_call(
func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
return render_template('index.html', results=results)
Take note of the following code:
job = q.enqueue_call(
func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
Note: We need to import the
count_and_save_words
function in our functionindex
as the RQ package currently has a bug, where it won’t find functions in the same module.
Here we used the queue that we initialized earlier and called the enqueue_call()
function. This added a new job to the queue and that job ran the count_and_save_words()
function with the URL as the argument. The result_ttl=5000
line argument tells RQ how long to hold on to the result of the job for - 5,000 seconds, in this case. Then we outputted the job id to the terminal. This id is needed to see if the job is done processing.
Let’s setup a new route for that…
Get Results
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn)
if job.is_finished:
return str(job.result), 200
else:
return "Nay!", 202
Let’s test this out.
Fire up the server, navigate to http://localhost:5000/, use the URL https://realpython.com, and grab the job id from the terminal. Then use that id in the ‘/results/’ endpoint - i.e., http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.
As long as less than 5,000 seconds have elapsed before you check the status, then you should see an id number, which is generated when we add the results to the database:
# save the results
try:
from models import Result
result = Result(
url=url,
result_all=raw_word_count,
result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
Now, let’s refactor the route slightly to return the actual results from the database in JSON:
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn)
if job.is_finished:
result = Result.query.filter_by(id=job.result).first()
results = sorted(
result.result_no_stop_words.items(),
key=operator.itemgetter(1),
reverse=True
)[:10]
return jsonify(results)
else:
return "Nay!", 202
Make sure to add the import:
from flask import jsonify
Test this out again. If all went well, you should see something similar to in your browser:
[
[
"Python",
315
],
[
"intermediate",
167
],
[
"python",
161
],
[
"basics",
118
],
[
"web-dev",
108
],
[
"data-science",
51
],
[
"best-practices",
49
],
[
"advanced",
45
],
[
"django",
43
],
[
"flask",
41
]
]
What’s Next?
Free Bonus: Click here to get access to a free Flask + Python video tutorial that shows you how to build Flask web app, step-by-step.
In Part 5 we’ll bring the client and server together by adding Angular into the mix to create a poller, which will send a request every five seconds to the /results/<job_key>
endpoint asking for updates. Once the data is available, we’ll add it to the DOM.
Cheers!
This is a collaboration piece between Cam Linke, co-founder of Startup Edmonton, and the folks at Real Python