This part of the tutorial details how to implement a Redis task queue to handle text processing.


flask by example part 4

Updates:

  • 03/22/2016: Upgraded to Python version 3.5.1 as well as the latest versions of Redis, Python Redis, and RQ. See below for details.
  • 02/22/2015: Added Python 3 support.


Remember: Here’s what we’re building – A Flask app that calculates word-frequency pairs based on the text from a given URL.


  1. Part One: Set up a local development environment and then deploy both a staging and a production environment on Heroku.
  2. Part Two: Set up a PostgreSQL database along with SQLAlchemy and Alembic to handle migrations.
  3. Part Three: Add in the back-end logic to scrape and then process the word counts from a webpage using the requests, BeautifulSoup, and Natural Language Toolkit (NLTK) libraries.
  4. Part Four: Implement a Redis task queue to handle the text processing. (current)
  5. Part Five: Set up Angular on the front-end to continuously poll the back-end to see if the request is done processing.
  6. Part Six: Push to the staging server on Heroku – setting up Redis and detailing how to run two processes (web and worker) on a single Dyno.
  7. Part Seven: Update the front-end to make it more user-friendly.
  8. Part Eight: Create a custom Angular Directive to display a frequency distribution chart using JavaScript and D3.

Need the code? Grab it from the repo.

Install Requirements

Tools used:

  • Redis (3.0.7)
  • Python Redis (2.10.5)
  • RQ (0.5.6) – a simple library for creating a task queue

Start by downloading and installing Redis from either the official site or via Homebrew (brew install redis). Once installed, start the Redis server:

1
$ redis-server

Next install Python Redis and RQ in a new terminal window:

1
2
3
$ cd flask-by-example
$ pip install redis==2.10.5 rq==0.5.6
$ pip freeze > requirements.txt

Set up the Worker

Let’s start by creating a worker process to listen for queued tasks. Create a new file worker.py, and add this code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import os

import redis
from rq import Worker, Queue, Connection

listen = ['default']

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(list(map(Queue, listen)))
        worker.work()

Here, we listened for a queue called default and established a connection to the Redis server on localhost:6379.

Fire this up in another terminal window:

1
2
3
4
5
$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...

Now we need to update our app.py to send jobs to the queue…

Update app.py

Add the following imports to app.py:

1
2
3
from rq import Queue
from rq.job import Job
from worker import conn

Then update the configuration section:

1
2
3
4
5
6
7
8
app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)

q = Queue(connection=conn)

from models import *

q = Queue(connection=conn) set up a Redis connection and initialized a queue based on that connection.

Move the text processing functionality out of our index route and into a new function called count_and_save_words(). This function accepts one argument, a URL, which we will pass to it when we call it from our index route.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def count_and_save_words(url):

    errors = []

    try:
        r = requests.get(url)
    except:
        errors.append(
            "Unable to get URL. Please make sure it's valid and try again."
        )
        return {"error": errors}

    # text processing
    raw = BeautifulSoup(r.text).get_text()
    nltk.data.path.append('./nltk_data/')  # set the path
    tokens = nltk.word_tokenize(raw)
    text = nltk.Text(tokens)

    # remove punctuation, count raw words
    nonPunct = re.compile('.*[A-Za-z].*')
    raw_words = [w for w in text if nonPunct.match(w)]
    raw_word_count = Counter(raw_words)

    # stop words
    no_stop_words = [w for w in raw_words if w.lower() not in stops]
    no_stop_words_count = Counter(no_stop_words)

    # save the results
    try:
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id
    except:
        errors.append("Unable to add item to database.")
        return {"error": errors}


@app.route('/', methods=['GET', 'POST'])
def index():
    results = {}
    if request.method == "POST":
        # get url that the person has entered
        url = request.form['url']
        if 'http://' not in url[:7]:
            url = 'http://' + url
        job = q.enqueue_call(
            func=count_and_save_words, args=(url,), result_ttl=5000
        )
        print(job.get_id())

    return render_template('index.html', results=results)

Take note of the following code:

1
2
3
4
job = q.enqueue_call(
    func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())

Here we used the queue that we initialized earlier and called the enqueue_call() function. This added a new job to the queue and that job ran the count_and_save_words() function with the URL as the argument. The result_ttl=5000 line argument tells RQ how long to hold on to the result of the job for – 5,000 seconds, in this case. Then we outputted the job id to the terminal. This id is needed to see if the job is done processing.

Let’s setup a new route for that…

Get Results

1
2
3
4
5
6
7
8
9
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        return str(job.result), 200
    else:
        return "Nay!", 202

Let’s test this out.

Fire up the server, navigate to http://localhost:5000/, use the URL http://realpython.com, and grab the job id from the terminal. Then use that id in the ‘/results/’ endpoint – i.e., http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.

As long as less than 5,000 seconds have elapsed before you check the status, then you should see an id number, which is generated when we add the results to the database:

1
2
3
4
5
6
7
8
9
10
11
# save the results
try:
    from models import Result
    result = Result(
        url=url,
        result_all=raw_word_count,
        result_no_stop_words=no_stop_words_count
    )
    db.session.add(result)
    db.session.commit()
    return result.id

Now, let’s refactor the route slightly to return the actual results from the database in JSON:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        result = Result.query.filter_by(id=job.result).first()
        results = sorted(
            result.result_no_stop_words.items(),
            key=operator.itemgetter(1),
            reverse=True
        )[:10]
        return jsonify(results)
    else:
        return "Nay!", 202

Make sure to add the import:

1
from flask import jsonify

Test this out again. If all went well, you should see something similar to in your browser:

1
2
3
4
5
6
7
8
9
10
11
12
{
  Course: 5,
  Python: 19,
  Real: 11,
  course: 4,
  courses: 7,
  development: 7,
  product: 4,
  sample: 4,
  videos: 5,
  web: 12
}

What’s Next?

In Part 5 we’ll bring the client and server together by adding Angular into the mix to create a poller, which will send a request every five seconds to the /results/<job_key> endpoint asking for updates. Once the data is available, we’ll add it to the DOM.

Cheers!


This is a collaboration piece between Cam Linke, co-founder of Startup Edmonton, and the folks at Real Python

Comments