The Producer-Consumer Pipeline: Part 2
In this lesson, you’ll begin creating a multi-threaded producer-consumer pipeline. If you download the sample code, you can get your own copy of 12-prodcom.py. To learn more, you can also check out the documentation.
00:00
In the previous video, you learned what a producer-consumer pipeline is, and now you’re going to start to implement one. Start by creating a class called Pipeline and give it an .__init__() method with a parameter of capacity.
00:16
This will be the number of messages that you can put into your pipeline. Set self.capacity to that value, and then define self.message.
00:30
This is going to represent the shared piece of data that the producer puts into the pipeline and the consumer reads from it. When a Pipeline is first initialized, set .message to None.
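A minimal sketch of the class at this stage, assuming only what the narration has described so far: a capacity and a single shared message slot that starts out as None.

```python
class Pipeline:
    """Holds a single shared message slot for a producer and a consumer."""

    def __init__(self, capacity):
        # Number of messages the producer will put into the pipeline
        self.capacity = capacity
        # The one shared piece of data; nothing has been produced yet
        self.message = None


pipeline = Pipeline(10)
print(pipeline.capacity, pipeline.message)  # 10 None
```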
00:45
Now, create your producer and your consumer. Let’s start with the producer. Define the producer() function that is going to take a pipeline as well as a message. Recall that the producer’s job is to put messages into the pipeline.
01:02
We can only put capacity number of messages in the Pipeline, so let’s restrict that by creating messages within a loop: for _ in range(pipeline.capacity):. That way we only create capacity number of messages. Instead of doing something really complex, let’s just make each message a random number.
01:30
So we’ll say random.randint(1, 100), which returns an integer from 1 to 100, inclusive. We’ll need to import the random module.
01:43
So our producer() is going to produce a message that’s a random number, and then we need to get that message into the pipeline.
01:50
So we need to write a method in our Pipeline class that allows us to set a .message. For now, in our producer() we can just call pipeline.set_message() and pass it the message that we created.
02:08
When we’re done looping capacity times and putting capacity messages into the pipeline, we need to signal to our consumer that we are done: the producer will not produce any more messages.
02:24
So when we’re out of the loop, make another call to pipeline.set_message() and give it a special message. Let’s call that special message FINISH.
02:38
This acts like a global variable: a flag that the consumer is going to read. So define FINISH at module level, outside the class, so that all parts of the program can read it. It’s just a string; let’s make it 'THE END'.
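The consumer will later test message is not FINISH, which compares identity rather than equality. This sketch shows why that works here: the producer hands over the very same string object bound to FINISH. As an aside that goes beyond the lesson, an equal string built at runtime is not guaranteed to be the same object, which is one reason != or a dedicated object() sentinel is a common alternative. The helper is_finished() is hypothetical.

```python
FINISH = 'THE END'


def is_finished(message):
    # Identity check, as used in the lesson's consumer loop
    return message is FINISH


# The producer passes the FINISH object itself, so identity holds
print(is_finished(FINISH))  # True

# A string built at runtime compares equal but need not be the same object
built = ' '.join(['THE', 'END'])
print(built == FINISH)  # True
```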
02:59
The producer() is going to create capacity number of messages, put them in the pipeline, and then, when it’s done, put a message of 'THE END' to signal that it’s finished producing.
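Here is a sketch of the producer as described, using the signature the lesson settles on later in the video, where producer() takes only a pipeline. To run it on its own, it uses a hypothetical RecordingPipeline stand-in that just records what it receives; the real Pipeline.set_message() comes next.

```python
import random

FINISH = 'THE END'


class RecordingPipeline:
    """Hypothetical stand-in that just records what the producer sends."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.sent = []

    def set_message(self, message):
        self.sent.append(message)


def producer(pipeline):
    # Create exactly `capacity` messages, each a random integer from 1 to 100
    for _ in range(pipeline.capacity):
        message = random.randint(1, 100)
        pipeline.set_message(message)
    # Signal the consumer that no more messages are coming
    pipeline.set_message(FINISH)


pipeline = RecordingPipeline(10)
producer(pipeline)
print(pipeline.sent)  # ten random numbers followed by 'THE END'
```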
03:11
Now you need to define your .set_message() method in the Pipeline. Underneath .__init__(), create a .set_message() method that takes in a message.
03:23
Add a print statement here: f'producing message of {message}'. Then .set_message() sets self.message to the message that was passed in, which is the one the producer() generated.
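A minimal sketch of the class with .set_message() added. Calling it twice in a row already hints at a property of this design: the single slot holds only the latest message.

```python
class Pipeline:
    def __init__(self, capacity):
        self.capacity = capacity
        self.message = None

    def set_message(self, message):
        # Announce the new value, then overwrite the single shared slot
        print(f'producing message of {message}')
        self.message = message


pipeline = Pipeline(10)
pipeline.set_message(23)
pipeline.set_message(36)
print(pipeline.message)  # 36: only the latest message is kept
```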
03:45
So now, you’ve created your producer() function along with the .set_message() method that puts the message in the pipeline.
03:52
Now you need to create your consumer() function.
03:57
Drop down below your producer() and define consumer(), which also takes a pipeline. The consumer’s job is to read messages off the pipeline and then do something with them. And since we used the FINISH flag, we can write a loop within the consumer() function.
04:16
while message is not FINISH:, then we’ll read in the message, so we’ll say message = get_message(). Once we get the message, we’ll check whether it is 'THE END', which we defined as FINISH.
04:36
We’ll say if message is not FINISH:, then we take that message and maybe send it to another server or do some computation. For now we’ll just simulate that work with time.sleep(), sleeping for a random amount of time.
04:58
random.random() produces a float from 0.0 up to, but not including, 1.0, so we’ll sleep for some amount of time less than 1 second.
05:09
When the message that’s read is 'THE END', that is, FINISH, we skip the sleep, the while condition is no longer true, and the loop ends. The function then returns, and that thread’s work is done.
05:19
You may have noticed that we’re referencing message in the while condition without defining it in the scope of this function. So define message before entering the loop; you can just set it to None. As long as it’s defined, the first check of the condition will work.
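A sketch of the consumer loop as described, calling pipeline.get_message(), the form the lesson arrives at shortly. To run it in isolation, it uses a hypothetical ScriptedPipeline that hands out a fixed list of messages instead of the real shared slot.

```python
import random
import time

FINISH = 'THE END'


class ScriptedPipeline:
    """Hypothetical stand-in that hands out a fixed list of messages."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.consumed = []

    def get_message(self):
        message = self._messages.pop(0)
        self.consumed.append(message)
        return message


def consumer(pipeline):
    message = None  # must exist before the loop condition is first checked
    while message is not FINISH:
        message = pipeline.get_message()
        if message is not FINISH:
            # Stand-in for real work: sleep for less than one second
            time.sleep(random.random())


pipeline = ScriptedPipeline([23, 36, FINISH])
consumer(pipeline)
print(pipeline.consumed)  # [23, 36, 'THE END']
```

Without the message = None line, the first evaluation of the while condition would raise UnboundLocalError, because message is assigned later in the function and is therefore local.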
05:36
Now you’ve created your consumer() function, and it reads messages off the pipeline until it hits the FINISH message, at which point the loop ends and the function returns.
05:45
But you still need to define get_message(), and that is actually a method of the Pipeline class, so the call should be pipeline.get_message().
05:56
Then come up to your Pipeline class and define the .get_message() method. It doesn’t take any arguments besides self.
06:08
Add a print statement under here: f'consuming message of {self.message}'. .get_message() will read the message off of the Pipeline, so we’ll say message = self.message.
06:30
Then we return that message back to the consumer(). If we go back down to the consumer(), we’ll see that message is set to the return value of pipeline.get_message().
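A minimal sketch of .get_message() on its own. Here the slot is filled by direct assignment, standing in for what .set_message() would store, so the method can be run in isolation.

```python
class Pipeline:
    def __init__(self, capacity):
        self.capacity = capacity
        self.message = None

    def get_message(self):
        print(f'consuming message of {self.message}')
        # Read the shared slot into a local name, then return it
        message = self.message
        return message


pipeline = Pipeline(10)
pipeline.message = 23  # stands in for what set_message() would store
received = pipeline.get_message()
print(received)  # 23
```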
06:43
Then we check whether that’s 'THE END'. If it’s not, we keep reading from the pipeline.
06:52
Now that you’ve created your consumer() function, along with the corresponding .get_message() method in your Pipeline, you have all the components you need to actually run this producer-consumer pipeline.
07:07 Go ahead and open up a terminal and let’s execute this program.
07:14
And I totally forgot to make an entry point, so let’s exit out of the terminal, drop down underneath your consumer() function, and create your entry point.
07:23
if __name__ == '__main__':, then we’ll create a Pipeline. So we’ll say pipeline = Pipeline(10), passing in a capacity of 10 for now.
07:38
Then we’ll use our ThreadPoolExecutor to create the producer and consumer threads: with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:. max_workers is 2, one for each thread.
07:59
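So we have ex, and we’ll use .submit() again. It schedules a function to run on one of the worker threads, and leaving the with block waits for both threads to finish, so there’s no separate join. The target function is producer, so we start with the producer and pass in the pipeline.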
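A small sketch of the executor behavior the entry point relies on, using a hypothetical work() function in place of producer() and consumer(). .submit() returns a Future right away, and exiting the with block calls ex.shutdown(wait=True), so both tasks have finished by the time the next line runs.

```python
import concurrent.futures

results = []


def work(name):
    # Hypothetical task standing in for producer() or consumer()
    results.append(name)
    return name


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
    # submit() schedules the call on a worker thread and returns a Future
    first = ex.submit(work, 'producer')
    second = ex.submit(work, 'consumer')

# Leaving the with block waited for both tasks
print(first.done(), second.done())  # True True
print(sorted(results))  # ['consumer', 'producer']
```

One caveat worth knowing here: if a submitted function raises, for example a TypeError because producer() still expects a message argument, the exception is stored in the Future and nothing is printed. Calling .result() on the Future re-raises it.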
08:15
The producer() function takes in a pipeline and a message. So our message is... actually, this is incorrect. We don’t need to pass in a message here.
08:30
The message is actually generated in the loop, so all we need to pass to producer() is the pipeline. Remove the message parameter from the producer() definition as well, so that it takes only pipeline.
08:38
Then we do the same thing for our consumer.
08:44
I think it’s nice to be able to see what went through the Pipeline at the end of the program, so go down underneath the ThreadPoolExecutor block and add a print statement, f'producer: {producer_pipeline}', and do the same for the consumer with f'consumer: {consumer_pipeline}'. We haven’t defined these yet, but we’re going to right now. Right above the entry point, set producer_pipeline to an empty list, and do the same for consumer_pipeline.
09:27
That means we want to append the messages as they are produced and consumed. So in the .set_message() and .get_message() methods you created, add a call that appends to those lists. .set_message() is used by the producer, so add producer_pipeline.append(message).
09:52
Then copy that into .get_message() and change producer to consumer: consumer_pipeline.append(message). This is just for learning purposes, so you can see the end result of what the producer and the consumer did.
10:09 Now, open up a terminal and let’s try to execute our program.
10:19
I forgot to import concurrent.futures, so I’m going to go up here and add import concurrent.futures. I also need to import time.
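For reference, here is one way the pieces described so far fit together in a single file. It is a sketch reconstructed from the narration, not the downloadable 12-prodcom.py itself, and it deliberately keeps the lesson’s unsynchronized single-slot design, so running it should reproduce the problem discussed below.

```python
import concurrent.futures
import random
import time

FINISH = 'THE END'


class Pipeline:
    def __init__(self, capacity):
        self.capacity = capacity
        self.message = None

    def set_message(self, message):
        print(f'producing message of {message}')
        producer_pipeline.append(message)  # record for inspection only
        self.message = message

    def get_message(self):
        print(f'consuming message of {self.message}')
        message = self.message
        consumer_pipeline.append(message)  # record for inspection only
        return message


def producer(pipeline):
    for _ in range(pipeline.capacity):
        message = random.randint(1, 100)
        pipeline.set_message(message)
    pipeline.set_message(FINISH)


def consumer(pipeline):
    message = None
    while message is not FINISH:
        message = pipeline.get_message()
        if message is not FINISH:
            time.sleep(random.random())


# Module-level lists that the methods above append to at call time
producer_pipeline = []
consumer_pipeline = []

if __name__ == '__main__':
    pipeline = Pipeline(10)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
        ex.submit(producer, pipeline)
        ex.submit(consumer, pipeline)
    print(f'producer: {producer_pipeline}')
    print(f'consumer: {consumer_pipeline}')
```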
10:32
Save that, clear the screen, and let’s try this again. Okay. That was very fast. Let’s scroll up to the top of the output, where we see producing message of 1.
10:45
We see a bunch of producing messages, and only one consuming message, of THE END. So this isn’t exactly what we want.
10:56
We produced 10 messages, but we read only the last one. Let’s execute this again and look at the output. What we want is for the producer to produce 23, then the consumer to consume 23, then 36 and 36, and so on.
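The symptom follows from the single shared slot: nothing stops the producer from calling set_message() again before the consumer has read the previous value. Replaying one such interleaving by hand, without threads, makes the lost messages visible. This is a simplified illustration of a possible schedule, not a trace of the actual run.

```python
class Pipeline:
    def __init__(self):
        self.message = None

    def set_message(self, message):
        self.message = message

    def get_message(self):
        return self.message


pipeline = Pipeline()
consumed = []

# One possible interleaving: the producer runs three times before the consumer reads
for value in (23, 36, 'THE END'):
    pipeline.set_message(value)
consumed.append(pipeline.get_message())

print(consumed)  # ['THE END']: 23 and 36 were overwritten before anyone read them
```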
11:18 So, we’re getting there—we’ve actually created a pipeline as well as a consumer and a producer with their corresponding methods, but something isn’t right here.
11:28 In the next video, let’s see if we can fix this problem.
mo on Jan. 2, 2022
Thanks a lot for this lesson. I have a question. The method get_message is written as follows in this lesson:
...
def get_message(self):
    print(f'consuming message of {self.message}')
    message = self.message
    consumer_pipeline.append(message)
    return message
...
My question is: why is self.message copied into message first? Why isn’t self.message used and returned directly? In other words, why not like this:
...
def get_message(self):
    print(f'consuming message of {self.message}')
    consumer_pipeline.append(self.message)
    return self.message
...
I tried it, and it seems to work the same. But if there is a reason to use an intermediate message, I would like to understand.
Manish Sharma on Oct. 7, 2023
Hey, anyone reading these comments to answer the questions?
Bartosz Zaczyński RP Team on Oct. 8, 2023
@Manish Sharma Our team reads questions like this every day, trying to answer them as best as we can. That said, there are several questions submitted every day, so it’s not always possible for us to answer each and every one. We encourage other users to also share their knowledge and help answer questions in the comments section or take advantage of the collective knowledge of our Slack community.
Though I’m not the creator of this course, the use of message = self.message looks redundant in this case, as someone has correctly pointed out before. This is probably a matter of coding style or a way to be explicit for teaching purposes.
eyrei RP Team on Aug. 4, 2024
Bartosz you are correct regarding the teaching usage.
I’ve seen this used in teaching examples before to demonstrate missed and dirty updates. It becomes important because each thread calling the method gets its own local message variable, whereas there is only one shared self.message, which another thread can overwrite between reads.
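The difference described in this reply can be replayed deterministically, without real threads. In this sketch, interleave is a hypothetical hook standing in for the moment another thread gets scheduled, and the method names get_direct() and get_via_local() are made up for the comparison. Reading self.message more than once lets the logged and returned values disagree; reading it once into a local keeps them consistent within a single call.

```python
class Pipeline:
    """Minimal stand-in with one shared slot and a log of consumed values."""

    def __init__(self, message):
        self.message = message
        self.log = []

    def get_direct(self, interleave):
        # Reads self.message twice, as in the commenter's alternative
        self.log.append(self.message)
        interleave(self)  # hypothetical hook: another thread runs here
        return self.message

    def get_via_local(self, interleave):
        # Reads the shared attribute once, as in the lesson's version
        message = self.message
        self.log.append(message)
        interleave(self)  # hypothetical hook: another thread runs here
        return message


def producer_overwrites(pipeline):
    # Simulates the producer storing a new value mid-call
    pipeline.message = 36


direct = Pipeline(23)
returned_direct = direct.get_direct(producer_overwrites)
print(direct.log, returned_direct)  # [23] 36

via_local = Pipeline(23)
returned_local = via_local.get_via_local(producer_overwrites)
print(via_local.log, returned_local)  # [23] 23
```

The local variable only keeps a single call self-consistent; it does not stop the producer from overwriting messages the consumer never sees.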
fofanovis on April 4, 2020
I would add that you face a deadlock not because of an inherently wrong locking/releasing but because you just do not catch exceptions thrown out of release method. RuntimeException happens when you try to release a released lock. BTW, it is not enough to just check the locked flag because of possible race conditions, so there is a need to wrap release methods by try-except and everything will work fine.
I see that initial locking helps but anyway such code seems too fragile to me. So, I would add try-excepts anyway.
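The comment refers to a lock-based version of the pipeline, so here is a small sketch of the behavior it mentions, using threading.Lock. Releasing an unlocked Lock raises RuntimeError, and the suggested guard is a try/except around the release. The helper safe_release() is hypothetical. Swallowing the error can also hide a logic bug, and where acquire and release happen in the same function, with lock: pairs them automatically.

```python
import threading

lock = threading.Lock()
error = None

# Releasing a lock that is not currently held raises RuntimeError
try:
    lock.release()
except RuntimeError as exc:
    error = exc
    print(f'caught: {exc!r}')


def safe_release(lk):
    """Release lk, ignoring the error if it is already unlocked (hypothetical helper)."""
    try:
        lk.release()
    except RuntimeError:
        pass


lock.acquire()
safe_release(lock)  # releases normally
safe_release(lock)  # already unlocked: the RuntimeError is swallowed
print(lock.locked())  # False
```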