In this lesson, you’ll use the
queue module and threading events to improve your pipeline. If you download the sample code, you can get your own copy of
The queue Module: Part 1
In this lesson, you’ll use the
00:00 In the previous lesson, you created a pipeline using locks and multi-threading to produce, basically, a message queue. This pipeline was limited by a capacity and it was also a one-to-one ratio, so the producer produced one message and the consumer picked that message off of the pipeline. In a more real-world situation, you would have many producers putting in many messages into the pipeline, and the consumers usually take longer than the producers do to produce, so you would have this backup of messages that the consumers are going to feed off of. In the program that you wrote, it would not support that. It would only support one message at a time.
One of the kind of hacky things that you did—because I told you to, it’s not your fault, ha—was creating this
FINISH flag. That was a way for us to determine when the producer finished putting messages in the pipeline.
.set() to set the event to
True and it has
.reset() to set it back to
False. When you first create an
Event, it will be under the
False value, and then you call
.set() on the
Event to set it to
and then before you submit your threads, do a
time.sleep() for maybe half of a second, and then call your
event.set() method. Now that we’ve added the
event as an extra parameter to our functions, let’s start with the
Instead of looping over the
capacity, you can now instead say
while not event.is_set():.
.is_set() is just checking if the
event is set. In our entry point, we wait a half a second and then
event. So in other words, the
producer thread will not execute anything within this loop until we press that set button, so to speak.
One of the features of the
Queue class is that it handles the locking automatically. So you can go down and just delete the
.producer_lock and the
.consumer_lock and the call to
.acquire(), since the
Queue object handles that automatically.
So instead of doing this
self.message = message, what we can do instead is say
.put() will signal the
producer, which uses the
.set_message() method, to put the message in the queue.
but you actually want to save that message, so we would say
message = self.get() and then you can delete that line there. And you can see the power of leveraging preexisting libraries like the
These two lines are just print statements that we’re using for our own sake of learning, but really it’s a very simple method here. We’re doing a
.put() when we set a message and we’re doing a
.get() to get a message off the queue.
Then you simplified the code in your
.get_message(). You also change the
producer() to take in the
event and got rid of the global
FINISH flag, since we’re using events now. You also need to change the
consumer() function a little bit.
First, start by passing the
event in as well as the
pipeline, which is now a
Queue. And since we no longer have our
FINISH flag and we’re using an
Event now, you can delete these two lines. And instead we’ll do another
Let’s go down to the main thread and just go over the changes we made. First we created an
event. This is a Boolean flag that both of these threads now have access to, since we added them to our
.submit() call. We did a
sleep() for half a second before we set the
event. Once we set the
event, by the time the
event is set, our
consumer threads have already started.
pipeline.empty() to determine when it needs to stop. And it also uses that
event flag as well. You updated the
.get_message() method to use
self.get() to assign the
message and then return it back to the consumer.
One of the sometimes frustrating things about working with threads is that when there’s an error in your code, you don’t often get a good error message, or you don’t even get an error message sometimes. So one thing that we didn’t fix was that we’re still referencing this
FINISH flag, which actually doesn’t exist anymore. So let’s see if we can delete this line
and just move
time.sleep() into that
while loop. We’ll save this and then come back to the terminal to try to execute our program again. And that didn’t solve our deadlock problem, so let’s go ahead and kill the process and see if there’s anything else in our code that needs to be changed.
in this print statement, we have a reference to
self.message, which we deleted from our
.__init__() method. Let’s change this now too. We can actually put this below the
message assignment and release that
self., and then we’ll now have
13:22 Before we go over this output, it’s important to realize when you’re debugging multi-threaded applications and message queues, it can be very tough because deadlock situations and freezing can happen in ways that aren’t necessarily related to locking and unlocking, but just errors.
Become a Member to join the conversation.