The queue Module: Part 1
In this lesson, you’ll use the queue module and threading events to improve your pipeline. If you download the sample code, you can get your own copy of 13-queue.py.
To learn more, you can also check out the documentation for threading.Event, the queue module, and super.
00:00 In the previous lesson, you created a pipeline using locks and multi-threading to produce what is, basically, a message queue. This pipeline was limited by a capacity, and it was also one-to-one: the producer produced one message and the consumer picked that message off the pipeline. In a more realistic situation, you would have many producers putting many messages into the pipeline, and the consumers usually take longer to process messages than the producers take to produce them, so a backlog of messages builds up for the consumers to work through. The program you wrote doesn’t support that. It only supports one message at a time.
00:49
Now you’re going to learn about another module in Python called queue, as well as some other objects from the threading module.
00:58
Let’s go ahead and copy all of our code from the previous lesson into a new file called 13-queue.py.
01:16
Now you’ll start to make some changes. Start by importing the queue module. It’s part of Python’s standard library, so there’s no need to install anything extra.
01:26 Then come on down to your entry point.
01:32
One of the kind of hacky things that you did (because I told you to, so it’s not your fault, ha) was creating this FINISH flag. That was our way of determining when the producer had finished putting messages in the pipeline.
01:50
We’re going to get rid of that and instead use something called an Event. This comes from the threading module. We’ll say event = threading.Event().
02:03
An Event holds an internal flag that is either True or False. It works like a Boolean flag that every thread with access to the Event can check. It has two main methods associated with it.
02:19
It has .set() to set the flag to True and .clear() to set it back to False. When you first create an Event, its flag is False, and calling .set() on the Event sets it to True.
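A quick way to confirm these semantics is to poke at a standalone threading.Event, separate from the lesson’s pipeline. The sketch also uses .wait(), which the lesson doesn’t cover: it blocks until the flag is set or an optional timeout expires, and returns the flag’s value.

```python
import threading

event = threading.Event()
print(event.is_set())           # False: a new Event starts out unset

event.set()                     # flip the internal flag to True
print(event.is_set())           # True

event.clear()                   # reset the flag back to False
print(event.is_set())           # False

# .wait() blocks until the flag is set; with a timeout it gives up
# and returns the flag's current value (False, since nobody set it)
print(event.wait(timeout=0.1))  # False
```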
02:46
Let’s give our threads access to this event. We’ll pass in the event to each thread,
02:57
and then, after you submit your threads, do a time.sleep() for maybe half of a second, and then call your event.set() method. Now that we’ve added the event as an extra parameter to our functions, let’s start with the producer() function.
03:17
Go up to producer() and give it an event parameter as well, so that producer() has access to that Boolean flag, the Event.
03:28
And now you can also get rid of the concept of a capacity and instead just use the event to signal when the producer is finished.
03:38
Instead of looping over the capacity, you can now say while not event.is_set():. The .is_set() method just checks whether the event is set. In our entry point, we wait half a second and then .set() the event. In other words, the producer thread keeps running this loop, producing messages, until we press that set button, so to speak. Once the event is set, the loop condition becomes False and the producer stops.
04:07
Go ahead and get rid of your range() loop, and put the message creation and message setting inside this event loop.
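Put together, the producer’s loop might look roughly like the sketch below. It is an assumption-based reconstruction, not the lesson’s exact file: a plain queue.Queue stands in for the Pipeline, a bare threading.Thread stands in for the executor, and the random message range and the produced list are illustrative. Note that the loop body runs while the event is not set and stops once it is.

```python
import queue
import random
import threading
import time

def producer(pipeline, event, produced):
    """Put random messages on the pipeline until the event is set."""
    while not event.is_set():
        message = random.randint(1, 101)
        pipeline.put(message)      # would block if a bounded queue were full
        produced.append(message)

pipeline = queue.Queue()           # unbounded in this sketch, so .put() never blocks
produced = []
event = threading.Event()

worker = threading.Thread(target=producer, args=(pipeline, event, produced))
worker.start()
time.sleep(0.05)                   # the producer is busy producing meanwhile
event.set()                        # signal the producer to stop
worker.join()

print(f"produced {len(produced)} messages before the event was set")
```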
04:18
And you can go ahead and get rid of this FINISH message, since we no longer need it. Then scroll up to your Pipeline class.
04:28
We’re now going to leverage the queue module by having the Pipeline inherit from queue.Queue, the Queue class of the queue module.
04:42
I always hate spelling this word, queue. But our Pipeline now inherits from Queue, which gives us a whole bunch of goodies from the queue module.
04:55
One of the features of the Queue class is that it handles locking internally. So you can go down and delete the .producer_lock, the .consumer_lock, and the call to .acquire(), since the Queue object takes care of that for you.
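To see that built-in locking at work, the sketch below lets several threads call .put() on one shared queue.Queue with no explicit Lock anywhere. Queue synchronizes access internally, so no items are lost. The thread and item counts are arbitrary.

```python
import queue
import threading

shared = queue.Queue()

def put_many(n):
    for i in range(n):
        shared.put(i)      # Queue does its own locking; no Lock needed here

threads = [threading.Thread(target=put_many, args=(1000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(shared.qsize())      # 4000: every put from every thread landed
```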
05:15
And while we’re at it, let’s delete our FINISH message, since we don’t need it anymore. Instead of capacity, the Queue class offers a parameter called maxsize.
05:27
So we’re going to delete the capacity argument here and use super() to call the .__init__() method of the parent class that we’re inheriting from.
05:41
We’re calling the .__init__() method of the Queue class, which has a parameter called maxsize, and we’ll set it to 10, similar to what our capacity was before.
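A stripped-down version of the new constructor could look like this. It’s a sketch that keeps only the part described here (the lesson’s class also has the message methods). Queue stores the bound as .maxsize, and once the queue holds that many items, .full() returns True and a non-blocking put raises queue.Full.

```python
import queue

class Pipeline(queue.Queue):
    """A message pipeline that reuses queue.Queue for storage and locking."""

    def __init__(self):
        super().__init__(maxsize=10)   # replaces the old capacity argument

pipeline = Pipeline()                  # no arguments needed anymore
print(pipeline.maxsize)                # 10

for n in range(10):
    pipeline.put(n)
print(pipeline.full())                 # True

try:
    pipeline.put_nowait(99)            # no free slot, and no waiting allowed
except queue.Full:
    print("pipeline is full")
```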
05:53
That means we can delete our .capacity attribute, and we can also delete .message.
05:59
While you’re here, you can do more cleanup. Since you no longer have locks, you can delete any calls to .consumer_lock or .producer_lock.
06:10
Putting messages into a queue and getting messages from it are standard operations on a queue or pipeline. The Queue class offers two methods for this: .put() and .get().
06:23
So instead of self.message = message, we can say self.put(message). The producer calls .set_message(), and .put() is what actually places the message in the queue.
06:47
That means you can delete that line there. And .get_message() can likewise just call self.get(),
07:01
but you actually want to keep that message, so we’ll say message = self.get(), and then you can delete that line there. You can see the power of leveraging preexisting libraries like the queue module.
07:17
These two lines are just print statements that we’re using for our own learning; the method itself is very simple. We do a .put() when we set a message and a .get() to get a message off the queue.
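Here is one way the two simplified methods might read. The method names come from the lesson, but their parameters and the print wording are assumptions standing in for the lesson’s own print statements. Note the consuming print comes after the assignment and uses the local message, not self.message.

```python
import queue

class Pipeline(queue.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    def set_message(self, message):
        """Called by the producer: add a message to the queue."""
        print(f"producing {message}")
        self.put(message)           # blocks while the queue is full

    def get_message(self):
        """Called by the consumer: take the oldest message off the queue."""
        message = self.get()        # blocks while the queue is empty
        print(f"consuming {message}")
        return message

pipeline = Pipeline()
for value in (3, 1, 4):
    pipeline.set_message(value)
received = [pipeline.get_message() for _ in range(3)]
print(received)                     # [3, 1, 4]: first in, first out
```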
07:38
You’ve changed the Pipeline class quite a bit. First, you inherited from the Queue class, and then you called its .__init__() method through super() to set a maxsize of 10.
07:49
Then you simplified the code in your .set_message() and .get_message() methods. You also changed producer() to take in the event and got rid of the global FINISH flag, since we’re using events now. You also need to change the consumer() function a little bit.
08:07
First, pass in the event as well as the pipeline, which is now a Queue. And since we no longer have our FINISH flag and we’re using an Event now, you can delete these two lines. Instead, we’ll write another while loop.
08:25
We’ll say while pipeline,
08:29
or rather, while not pipeline.empty() or not event.is_set():. The .empty() method comes from the queue module. Again, our Pipeline inherits it when we subclass Queue.
08:47
We now have access to this .empty() method, which does just what it sounds like: if the pipeline is empty, meaning there are no messages in it, it returns True.
08:58
So while the pipeline is not empty or the event is not set, we’ll keep reading messages off the queue. Another method we can use is .qsize().
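A consumer built around that loop condition might look like the sketch below. It is an approximation rather than the lesson’s code: a plain queue.Queue stands in for the Pipeline, and get() is given a timeout, a safeguard the lesson doesn’t use, so the loop re-checks its condition instead of blocking forever if it calls get() on an empty queue just before the event is set.

```python
import queue
import threading

def consumer(pipeline, event, consumed):
    """Read messages until the event is set and the queue has been drained."""
    while not pipeline.empty() or not event.is_set():
        try:
            message = pipeline.get(timeout=0.1)
        except queue.Empty:
            continue               # nothing arrived; re-check the loop condition
        consumed.append(message)

pipeline = queue.Queue()
for n in range(5):
    pipeline.put(n)
event = threading.Event()
event.set()                        # pretend the producer has already finished

consumed = []
consumer(pipeline, event, consumed)
print(consumed)                    # [0, 1, 2, 3, 4]
```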
09:14
We can print f'queue size is ' followed by pipeline.qsize(), calling .qsize() on the pipeline, which is now a Queue. This method reports how many messages are currently in the pipeline.
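.qsize() is handy for that kind of progress print. The standard library documents the value as approximate: with other threads running, the size can change between the call and your next line, so treat it as a snapshot rather than a guarantee. In this single-threaded sketch it is exact.

```python
import queue

pipeline = queue.Queue(maxsize=10)
for n in range(3):
    pipeline.put(n)
print(f"queue size is {pipeline.qsize()}")   # queue size is 3

pipeline.get()
print(f"queue size is {pipeline.qsize()}")   # queue size is 2
```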
09:33
Let’s go down to the main thread and go over the changes we made. First, we created an event. This is a Boolean flag that both threads now have access to, since we pass it in our .submit() calls. We then sleep() for half a second before we set the event, so by the time the event is set, our producer and consumer threads have already started.
10:01
So the producer thread runs the producer() function, which now takes the event, and keeps putting messages on the queue until the event is set.
10:11
It does that through the .set_message() method that you created and recently changed to use self.put() to put the message on the queue.
10:24
The consumer uses pipeline.empty() to determine when it needs to stop, together with that event flag. You updated the .get_message() method to use self.get() to assign the message and then return it to the consumer.
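For reference, here is a condensed, self-contained reconstruction of the program as described in this lesson. It is a sketch, not the downloadable 13-queue.py: the run_pipeline() helper, the produced/consumed lists, and the message range are hypothetical, the print statements are reduced to one summary line, and the consumer’s get timeout is an added safeguard that is not part of the lesson.

```python
import concurrent.futures
import queue
import random
import threading
import time


class Pipeline(queue.Queue):
    """Message pipeline; queue.Queue supplies storage, locking, and maxsize."""

    def __init__(self):
        super().__init__(maxsize=10)

    def set_message(self, message):
        self.put(message)  # blocks while 10 messages are waiting

    def get_message(self, timeout=None):
        # timeout is an added safeguard; get() raises queue.Empty when it expires
        return self.get(timeout=timeout)


def producer(pipeline, event, produced):
    # Produce until the main thread sets the event
    while not event.is_set():
        message = random.randint(1, 101)
        pipeline.set_message(message)
        produced.append(message)


def consumer(pipeline, event, consumed):
    # Keep reading until the event is set AND the queue has been drained
    while not pipeline.empty() or not event.is_set():
        try:
            message = pipeline.get_message(timeout=0.1)
        except queue.Empty:
            continue  # re-check the loop condition instead of blocking forever
        consumed.append(message)


def run_pipeline(duration=0.5):
    pipeline = Pipeline()
    event = threading.Event()
    produced, consumed = [], []
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event, produced)
        executor.submit(consumer, pipeline, event, consumed)
        time.sleep(duration)  # both threads are already running here
        event.set()           # ask the producer to stop
    # Leaving the with block waits for both submitted functions to return
    return produced, consumed


if __name__ == "__main__":
    produced, consumed = run_pipeline()
    print(f"produced {len(produced)} messages, consumed {len(consumed)}")
```

Even with the timeout, the sketch keeps one race: if the consumer sees an empty queue right after the event is set while the producer is finishing one last .put(), that message stays in the queue unread.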
10:46
Let’s go ahead and open up a terminal, clear the screen, and run our 13-queue program.
10:57
Looks like we have an error on line 36, and, oh yeah, we no longer have capacity, so I can delete the argument there to Pipeline(). Pipeline() now doesn’t take any arguments.
11:12
The maxsize is now set inside the class, through the call to Queue.__init__(). Let’s come back to the terminal and try this again.
11:24 Okay, that’s good. We’re seeing some output.
11:29 It looks like we got a bunch of producer messages putting messages on the queue, but it appears that we are in a deadlock situation. Let’s kill this process.
11:42 We’ll clear the screen and come back to the code and think about why that is.
11:48
One of the sometimes frustrating things about working with threads is that when there’s an error in your code, you often don’t get a good error message, and sometimes you don’t get one at all. One thing we didn’t fix is that we’re still referencing this FINISH flag, which doesn’t exist anymore. So let’s delete this line
12:15
and just move time.sleep() into that while loop. We’ll save this and come back to the terminal to try to run our program again. That didn’t solve our deadlock problem, so let’s kill the process and see if there’s anything else in our code that needs to be changed.
12:37
If we scroll back up to the Pipeline class,
12:44
in this print statement we have a reference to self.message, which we deleted from our .__init__() method. Let’s change this too. We can move this print statement below the message assignment and remove the self. prefix, so that it now refers to the local message variable.
13:09 Let’s open up the terminal again and try to execute our program once more. All right, and that seemed to be the problem. But that is a bit of a moment of frustration.
13:22 Before we go over this output, it’s important to realize that debugging multi-threaded applications and message queues can be very tough, because deadlocks and freezes can come from ordinary errors, not just from locking and unlocking.
13:45 When a thread has an error, it often can’t report back and so it’s up to the developer to really step through the code slowly and see if there are any bugs in it.
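One likely reason for the silence, offered as an interpretation rather than something stated in the lesson: an exception raised inside a function submitted to a ThreadPoolExecutor is stored on the returned Future, and nothing is printed unless you call .result() or .exception(). If the consumer died that way, the producer would fill the 10-slot queue and block on .put(), and an executor waiting for it to finish (as a with block does on exit) would never return, which matches the freeze seen above. The sketch uses a hypothetical broken_consumer() to show how to get the stored error back.

```python
import concurrent.futures

def broken_consumer():
    # Hypothetical stand-in for the bug: FINISH no longer exists
    return FINISH  # noqa: F821

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(broken_consumer)

# Nothing was printed, but the exception is stored on the Future
error = future.exception()
print(repr(error))

try:
    future.result()           # re-raises the stored exception in this thread
except NameError as exc:
    print(f"consumer failed: {exc}")
```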
Lee RP Team on Nov. 30, 2019
Yes :( That is one of the drawbacks of working with threads. This thread on StackOverflow might be interesting to you: stackoverflow.com/questions/2829329/catch-a-threads-exception-in-the-caller-thread-in-python
pymash on April 12, 2020
At about 3:55 of the video, the explanation is that the producer loop doesn’t process any of the code in the while block until the event is set. But isn’t the event initially false, so the while not event.is_set() loop will execute immediately, filling the queue until event.set() is called? Then the producer thread exits the while loop and the thread ends.
Michael C on May 13, 2020
Hi Lee, Pretty much enjoyed the lesson. I have a question regarding the number of messages produced when using the queue module. When we were using Lock() and specifying the capacity for the Pipeline object, the number of messages produced would be limited to the number we specified for capacity. Ex. Pipeline(5), then we would expect 5 messages to be produced, excluding the “The End” message. However, when we were using queue.Queue inheritance, the role of capacity was replaced by maxsize. For some reason, the number of messages produced was more than what was specified by maxsize. Ex. maxsize = 5. The total number of messages produced was more than 5. I think in the lesson you specified maxsize = 10, and if I paused and counted the video screen correctly, you were getting 14 messages instead of 10. Is that correct? Or is there a way to be sure that the number of messages produced is the same as what is specified for maxsize? Many thanks.
YY on May 20, 2020
I think the maxsize only limited how many messages could be stored in the Queue at any one time, but the process could start consuming and producing more messages before the event was set, and is only limited by how fast the CPU is to produce and consume messages before the 0.5 sec sleep timer is up, at which point the event is set and the producer stops producing because its while condition becomes false, and the consumer just finishes consuming the remaining contents of the Queue.
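YY’s point can be checked directly. In this sketch (the counts are arbitrary), a producer pushes 25 messages through a queue with maxsize=5 while a consumer drains it: every message gets through, but the queue never holds more than 5 at once.

```python
import queue
import threading

pipeline = queue.Queue(maxsize=5)
consumed = []
largest_seen = 0

def producer():
    global largest_seen
    for n in range(25):
        pipeline.put(n)  # blocks whenever 5 messages are already waiting
        largest_seen = max(largest_seen, pipeline.qsize())

def consumer():
    for _ in range(25):
        consumed.append(pipeline.get())

threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(consumed))      # 25: more than maxsize in total
print(largest_seen <= 5)  # True: never more than maxsize at once
```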
malbert137 on June 6, 2020
As “pymash” notes, the “event” is initially false, so the way execution typically proceeds is this:
1. The producer starts immediately, putting 10 items in the queue, then blocks, as the queue is full (as per “YY” and “Michael C”).
2. The consumer starts immediately. Note the consumer could actually run before the producer has populated the queue, in which case it enters the loop on “not event.is_set()” and blocks on get_message(). Otherwise it starts processing the queue.
3. At some indeterminate time the producer will unblock and add to the queue. Note, the producer will not run indefinitely, because eventually event.is_set() is true.
4. The consumer runs until it sees the queue is empty().
I say “normally” because, in principle, the producer and consumer threads could be delayed so long that the event is set before either starts, but that is extremely unlikely.
There are multiple issues:
1. It’s not clear what the intended logic of the “event” was, given that there seems to have been a logic error, but normally one would try not to “poll”, but to do an “event.wait()”.
2. In actual use, one would either let a queue live “forever”, or use “queue.join()” from the thread which populated the queue.
3. It’s not clear that one is guaranteed to get everything put in the queue. There could be a race condition where the consumer sees “empty()” but the producer still adds something, and one could observe the queue is not empty just before the program terminates.
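The queue.join() approach mentioned above pairs each .get() with a .task_done() call; .join() then returns once every item that was put has been marked done. This sketch shows that pattern with a daemon consumer thread; it replaces the lesson’s Event-based shutdown rather than reproducing it, and the item count is arbitrary.

```python
import queue
import threading

pipeline = queue.Queue(maxsize=10)
consumed = []

def consumer():
    while True:
        message = pipeline.get()
        consumed.append(message)
        pipeline.task_done()        # mark this item as fully processed

# daemon=True: the forever-looping consumer won't keep the program alive
threading.Thread(target=consumer, daemon=True).start()

for n in range(30):
    pipeline.put(n)                 # blocks while 10 items are waiting

pipeline.join()                     # returns once all 30 items are task_done()
print(len(consumed))                # 30
```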
Bresson Nemesis on July 18, 2020
Like @pymash, I am confused by this lesson, most notably by the explanations around threading.Event. It appears to me that threading.Event is an eventing mechanism that can be used to start or, in the case of the lesson, stop the consumer and producer. Multi-threading itself is handled by the scheduler/threading library.
Michal on Nov. 29, 2019
Does it mean you get no NameError or AttributeError message whatsoever, despite referencing undefined names FINISH and self.message? That’s kind of scary.