Join us and get access to hundreds of tutorials and a community of expert Pythonistas.

Unlock This Lesson

This lesson is for members only. Join us and get access to hundreds of tutorials and a community of expert Pythonistas.

Unlock This Lesson

Hint: You can adjust the default video playback speed in your account settings.
Hint: You can set the default subtitles language in your account settings.
Sorry! Looks like there’s an issue with video playback 🙁 This might be due to a temporary outage or because of a configuration issue with your browser. Please see our video player troubleshooting guide to resolve the issue.

The queue Module: Part 1

Give Feedback

In this lesson, you’ll use the queue module and threading events to improve your pipeline. If you download the sample code, you can get your own copy of 13-queue.py:

Download

Sample Code (.zip)

12.9 KB

To learn more, you can also check out the documentation for threading.Event, the queue module, and super.

00:00 In the previous lesson, you created a pipeline using locks and multi-threading to produce, basically, a message queue. This pipeline was limited by a capacity and it was also a one-to-one ratio, so the producer produced one message and the consumer picked that message off of the pipeline. In a more real-world situation, you would have many producers putting in many messages into the pipeline, and the consumers usually take longer than the producers do to produce, so you would have this backup of messages that the consumers are going to feed off of. In the program that you wrote, it would not support that. It would only support one message at a time.

00:49 Now you’re going to learn about another module in Python called the queue module, as well as some other objects from the threading module.

00:58 Let’s go ahead and copy all of our code from the previous lesson into a new file called 13-queue.

01:16 Now you’ll start to make some changes. Start by importing the queue module. This comes with Python 3, so no need to install anything extra.

01:26 Then come on down to your entry point.

01:32 One of the kind of hacky things that you did—because I told you to, it’s not your fault, ha—was creating this FINISH flag. That was a way for us to determine when the producer finished putting messages in the pipeline.

01:50 We’re going to get rid of that and instead use something called an Event. This comes from the threading package. We’ll say event = threading.Event().

02:03 An Event has two values, either True or False. It’s like a Boolean flag that any thread that has access to the Event will have access to this flag. It has two main methods associated with it.

02:19 It has .set() to set the event to True and it has .reset() to set it back to False. When you first create an Event, it will be under the False value, and then you call .set() on the Event to set it to True.

02:46 Let’s give our threads access to this event. We’ll pass in the event to each thread,

02:57 and then before you submit your threads, do a time.sleep() for maybe half of a second, and then call your event.set() method. Now that we’ve added the event as an extra parameter to our functions, let’s start with the producer() function.

03:17 Go up to producer() and let’s now give that an event as well so the producer() has access to that Boolean flag or that Event.

03:28 And now you can also get rid of this concept of a capacity, and instead just use the event to signal when the producer is finished.

03:38 Instead of looping over the capacity, you can now instead say while not event.is_set():. .is_set() is just checking if the event is set. In our entry point, we wait a half a second and then .set() the event. So in other words, the producer thread will not execute anything within this loop until we press that set button, so to speak.

04:07 Go ahead and get rid of your range() loop, and we’ll put the message creation and message setting within this event loop.

04:18 And you can go ahead and get rid of this FINISH message since we no longer need that. Then scroll on up to your Pipeline class.

04:28 We are going to now leverage the queue module. We’re going to have the Pipeline inherit from queue.Queue. The Queue class of the queue module.

04:42 I always hate spelling this word queue. But our Pipeline now inherits from the Queue, which gives us a whole bunch of goodies from the queue module.

04:55 One of the features of the Queue class is that it handles the locking automatically. So you can go down and just delete the .producer_lock and the .consumer_lock and the call to .acquire(), since the Queue object handles that automatically.

05:15 And while we’re at it, let’s delete our FINISH message since we don’t need that anymore. So, instead of capacity, the Queue class offers a parameter called maxsize.

05:27 So we’re going to delete the capacity argument here, and then we’ll use a call to super(), to grab the .__init__() method of the parent class that we’re inheriting from.

05:41 We’re taking the .__init__() method of the Queue class, and that has a parameter called maxsize. And then we’ll make that 10, similar to what our capacity was before.

05:53 And that means we can delete our .capacity and we can also delete .message.

05:59 While you’re here, you can do more cleanup. Since you no longer have locks, he can delete your calls to any .consumer_lock or .producer_lock.

06:10 Putting messages into a queue and getting messages from a queue are standard operations in a queue or a pipeline. The Queue class offers two methods: .put() and .get().

06:23 So instead of doing this self.message = message, what we can do instead is say self.put(message), and .put() will signal the producer, which uses the .set_message() method, to put the message in the queue.

06:47 That means you can delete that line there. And .get_message() also has the ability to just say self.get()

07:01 but you actually want to save that message, so we would say message = self.get() and then you can delete that line there. And you can see the power of leveraging preexisting libraries like the queue module.

07:17 These two lines are just print statements that we’re using for our own sake of learning, but really it’s a very simple method here. We’re doing a .put() when we set a message and we’re doing a .get() to get a message off the queue.

07:38 You’ve changed the Pipeline class quite a bit. First, you inherited from the Queue class, and then you inherited its .__init__() method, set a maxsize of 10.

07:49 Then you simplified the code in your .set_message() and .get_message(). You also change the producer() to take in the event and got rid of the global FINISH flag, since we’re using events now. You also need to change the consumer() function a little bit.

08:07 First, start by passing the event in as well as the pipeline, which is now a Queue. And since we no longer have our FINISH flag and we’re using an Event now, you can delete these two lines. And instead we’ll do another while loop.

08:25 We’ll say while pipeline

08:29 rather, while not pipeline.empty() or not event.is_set(): pipeline.empty() comes from the queue module—again, we’re inheriting that when we create our Pipeline.

08:47 We now have access to this .empty() method, which just does what it sounds—if the pipeline is empty, meaning there are no messages in it, then this will be True.

08:58 So while the pipeline is not empty or the event is not set, we’ll continue reading messages off of the queue. Another method we can use is .qsize().

09:14 We can print f'queue size is ' and then pipeline, which is now a Queue, .qsize(). This method keeps track of how many messages are in the pipeline.

09:33 Let’s go down to the main thread and just go over the changes we made. First we created an event. This is a Boolean flag that both of these threads now have access to, since we added them to our .submit() call. We did a sleep() for half a second before we set the event. Once we set the event, by the time the event is set, our producer and consumer threads have already started.

10:01 So the producer thread will go up into the producer() function, which now takes the event and it will wait for the event to be set before it starts putting messages on the queue.

10:11 It does that through the .set_message() method that you created and you recently changed to use self.put() to put the message on the queue.

10:21 The consumer thread

10:24 uses pipeline.empty() to determine when it needs to stop. And it also uses that event flag as well. You updated the .get_message() method to use self.get() to assign the message and then return it back to the consumer.

10:46 Let’s go ahead and open up a terminal, clear the screen, and run our 13-queue program.

10:57 Looks like we have an error on line 36, and—oh yeah, we no longer have capacity, so I can delete the argument there to Pipeline(). And Pipeline() now doesn’t take any arguments.

11:12 We do define the maxsize coming from the Queue.__init__() method. Let’s go ahead and come back to the terminal and we’ll try this again.

11:24 Okay, that’s good. We’re seeing some output.

11:29 It looks like we got a bunch of producer messages putting messages on the queue, but it appears that we are in a deadlock situation. Let’s kill this process.

11:42 We’ll clear the screen and come back to the code and think about why that is.

11:48 One of the sometimes frustrating things about working with threads is that when there’s an error in your code, you don’t often get a good error message, or you don’t even get an error message sometimes. So one thing that we didn’t fix was that we’re still referencing this FINISH flag, which actually doesn’t exist anymore. So let’s see if we can delete this line

12:15 and just move time.sleep() into that while loop. We’ll save this and then come back to the terminal to try to execute our program again. And that didn’t solve our deadlock problem, so let’s go ahead and kill the process and see if there’s anything else in our code that needs to be changed.

12:37 If we scroll back up to the Pipeline class,

12:44 in this print statement, we have a reference to self.message, which we deleted from our .__init__() method. Let’s change this now too. We can actually put this below the message assignment and release that self., and then we’ll now have message.

13:09 Let’s open up the terminal again and try to execute our program once more. All right, and that seemed to be the problem. But that is a bit of a moment of frustration.

13:22 Before we go over this output, it’s important to realize when you’re debugging multi-threaded applications and message queues, it can be very tough because deadlock situations and freezing can happen in ways that aren’t necessarily related to locking and unlocking, but just errors.

13:45 When a thread has an error, it often can’t report back and so it’s up to the developer to really step through the code slowly and see if there are any bugs in it.

Michal on Nov. 29, 2019

Does it mean you get no NameError or AttributeError message whatsoever, despite referencing undefined names FINISH and self.message? That’s kind of scary.

Lee RP Team on Nov. 30, 2019

Yes :( That is one of the drawbacks of working with threads. This thread on StackOverflow might be interesting to you: stackoverflow.com/questions/2829329/catch-a-threads-exception-in-the-caller-thread-in-python

pymash on April 12, 2020

At about 3:55 of the video, the explanation is that the producer loop doesn’t process any of the code in the while block until the event is set. But isn’t the event initially false, so the while not event.is_set() will execute immediately, filling the queue until the event.set() called. Then the producer thread exits the while loop and the read ends.

Michael C on May 13, 2020

Hi Lee, Pretty much enjoyed the lesson. I have a question regarding the number of messages produced by using queue module. When we were using Lock() and specifying the capacity for the Pipeline object, the number of messages produced would be limited to the number of we specified for capacity. Ex. Pipeline(5), then we would expect 5 messages to be produced excluding the “The End” message. However, when we were using queue.Queue inheritance, the role of capacity has been replaced by maxsize. For some reason, the number of messages produced was more then what was specified by maxsize. Ex. maxsize = 5. The total number of messages produced were more than 5 messages. I think in the lesson you specified maxsize = 10, if I pasued and counted the video screen correclty, you were getting 14 messages instead of 10. Is that correct? Or, is there a way to be sure that the number of messages produced would be the same as what is specified for maxsize? many thanks.

YY on May 20, 2020

I think the maxsize only limited how many messages could be stored in the Queue at any one time, but the process could start consuming and producing more messages before the event was set, and is only limited by how fast the CPU is to produce and consume messages before the 0.5 sec sleep timer is up, at which point the event is set and the producer stops producing because its while condition becomes false, and the consumer just finishes consuming the remaining contents of the Queue.

malbert137 on June 6, 2020

As “pymash” notes, the “event” is initially false, so the way execution typically proceeds is this: 1. The producer starts immediately, putting 10 items in the queue, then blocks, as the queue is full (as per “YY” and “Michael C”). 2. The consumer starts immediately. Note the consumer could actually run before the producer has populated the queue, in which case it enters the loop on “not event.set()” and blocks on get_message(). Otherwise is starts processing the queue(). 3. At some indeterminate time the producer will unblock and add to the queue. Note, the producer will not run indefinitely because eventually event.is_set() is true. 4. The consumer runs until it sees the queue is empty().

I say “normally” because, in principle, the producer and consumer threads could be delayed so long that the event is set before either starts, but that is extremely unlikely.

There are multiple issues(). 1. It’s not clear what was the intended logic of the “event”, given that there seems to have been a logic error, but normally one would try not to “poll”, but to do an “event.wait()”. 2. In actual use, one would either let a queue live “forever”, or use “queue.join()” from the thread which populated the queue(). 3. It’s not clear that one is guaranteed to get everything put in the queue. There could be a race condition where the consumer sees “empty()” but the producer still adds something and one could observe the queue is not empty just before the program terminates.

Bresson Nemesis on July 18, 2020

Like @pymash, I am confused by this lesson, most notably with the explanations around threading.event. It appears to me threading.event is an eventing mechanism that can be used to start, or in the case of the lesson, stop consumer and producer. Multi-threading is handled by the scheduler/threading library.

Become a Member to join the conversation.