The Producer-Consumer Pipeline: Part 3
In this lesson, you’ll improve your producer-consumer pipeline. If you download the sample code, you can get your own copy of 12-prodcom.py.
00:00 In the previous video, you created your pipeline and your producer and your consumer. When we ran the program, we saw that the output wasn’t exactly what we would want.
00:09 So what you’re going to do now is use your knowledge of locks to create a more stable and consistent pipeline. Now that you know how you can use Lock.acquire() to protect an area of your program and Lock.release() to unlock that area, I want you to create two locks: one for the producer and one for the consumer. So when you initialize your Pipeline, we’ll say self.producer_lock is a threading.Lock object, and the same for self.consumer_lock.
00:48 While we’re here, we also need to import threading.
00:52 And I’m going to fix a little typo in .consumer_lock. Now, you know that .set_message() is used by the producer. The producer sets messages; it puts messages in the pipeline. The consumer gets messages.
01:12 So when the producer sets a message, we want the producer to lock that particular part of the code.
01:22 So right above the point where we set self.message to message, call your self.producer_lock.acquire() method. This locks down this particular part of our program to the producer thread.
01:43 Do the same thing for the .consumer_lock in .get_message().
01:48 Right above where you assign message, say self.consumer_lock.acquire().
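Putting those steps together, the class at this stage might look roughly like the following sketch. The attribute names follow the transcript, but the method bodies and the initial self.message = None are assumptions, since the lesson’s file isn’t reproduced here. Note that nothing releases either lock yet.

```python
import threading


class Pipeline:
    """Sketch of the acquire-only stage (names follow the transcript)."""

    def __init__(self):
        self.message = None  # assumed initial value
        # One lock per role: the producer guards writes, the consumer guards reads.
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()

    def set_message(self, message):
        # Called by the producer: lock this section before writing.
        self.producer_lock.acquire()
        self.message = message
        # Nothing releases producer_lock, so a second call would block forever.

    def get_message(self):
        # Called by the consumer: lock this section before reading.
        self.consumer_lock.acquire()
        message = self.message
        # Nothing releases consumer_lock, so a second call would block forever.
        return message


pipeline = Pipeline()
pipeline.set_message(42)
value = pipeline.get_message()
print(value)                            # 42
print(pipeline.producer_lock.locked())  # True: still held after the first call
```

The first call to each method works, but both locks stay held afterward, which sets up the problem in the next run.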
01:59 Okay, so this should fix all our problems, right? Let’s go down into a terminal and we’ll clear the screen and run our program again. And something interesting is happening here—it looks like we are in a deadlock situation.
02:16 You may already know why this happened. Here, each thread calls .acquire() a second time on a Lock that it never released, and that second call blocks forever.
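You can see the same problem in isolation with a single thread. This minimal sketch passes a timeout to acquire() only so that the demonstration finishes instead of hanging; the lesson’s code calls acquire() without one.

```python
import threading

lock = threading.Lock()
lock.acquire()  # the first acquire succeeds

# A plain Lock is not reentrant: a second acquire by the same thread would
# block forever. The timeout lets us observe that without hanging.
acquired_again = lock.acquire(timeout=0.1)
print(acquired_again)  # False

lock.release()  # release the one acquisition that did succeed
```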
02:26 You probably know that we need to release the locks somewhere in our code. I’m going to kill this process, clear the screen, close the terminal, and come back into our code.
02:39 There’s an interesting relationship between the producer and the consumer: their corresponding methods are kind of symbiotic. We can take advantage of that by calling .release() on the .producer_lock inside the consumer’s method, .get_message(). So after the consumer reads the message, we say self.producer_lock.release(). And we do the same thing in .set_message().
03:14 After we change self.message to the random number that the producer produces, we can unlock the .consumer_lock. Let’s save our program, open up the terminal, and now we should have a steady pipeline.
03:34 Let’s run our program again—and we’re in deadlock again! What exactly happened here is very subtle. I’m going to kill this process and go back to our code, and let’s think about what happened here for a moment.
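While thinking about what went wrong, one detail about the cross-release version is worth knowing: nothing in it guarantees that a lock is actually held when the other thread calls .release() on it. A threading.Lock raises RuntimeError if you release it while it’s unlocked, and an exception raised inside a task submitted to a ThreadPoolExecutor is stored in the returned Future instead of being printed, so one thread can stop without any visible error while the other keeps waiting. This sketch only demonstrates those two behaviors; it doesn’t claim to reproduce the exact run above, and task() is a hypothetical helper.

```python
import concurrent.futures
import threading

lock = threading.Lock()

# Releasing a Lock that isn't currently held raises RuntimeError.
try:
    lock.release()
except RuntimeError as exc:
    print(f"RuntimeError: {exc}")


def task():
    # The same mistake, made inside a worker thread.
    lock.release()


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(task)
    # Nothing is printed when task() fails; the exception waits in the Future.
    error = future.exception()

print(type(error).__name__)  # RuntimeError
```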
03:53 If we kind of zoom out and think of the bigger picture without getting wrapped up in all this complex code, when you create a pipeline you want the producer to be able to produce a message and you want the consumer to wait until a message is produced.
04:12 A very simple way to tell the consumer to wait is to call .acquire() on its Lock ahead of time. So if we call self.consumer_lock.acquire() when we initialize the Pipeline, the .consumer_lock is already held before the ThreadPoolExecutor down here even starts the consumer thread. When the consumer then calls .get_message(), its self.consumer_lock.acquire() blocks, so the consumer can’t read a message from the pipeline until the producer releases the .consumer_lock.
04:56 And that happens right here at the bottom of the first call to .set_message(). It’s the producer that frees the .consumer_lock, and, vice versa, it’s the consumer that frees the .producer_lock when it’s done reading the .message.
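Here is a minimal sketch of the complete handshake described above, assuming the attribute and method names used in the transcript. The producer() and consumer() functions, the FINISH marker, and the count of ten messages are illustrative stand-ins, not the lesson’s file.

```python
import concurrent.futures
import random
import threading

FINISH = "THE END"  # hypothetical end-of-stream marker sent by the producer


class Pipeline:
    """Sketch of the two-lock handshake (names follow the transcript)."""

    def __init__(self):
        self.message = None
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        # Held from the start, so the consumer waits until a message exists.
        self.consumer_lock.acquire()

    def set_message(self, message):
        self.producer_lock.acquire()  # wait until the previous message was read
        self.message = message
        self.consumer_lock.release()  # let the consumer read this one

    def get_message(self):
        self.consumer_lock.acquire()  # wait until a new message was written
        message = self.message
        self.producer_lock.release()  # let the producer write the next one
        return message


produced = []
consumed = []


def producer(pipeline):
    # Hypothetical producer: ten random numbers, then the end marker.
    for _ in range(10):
        message = random.randint(1, 100)
        produced.append(message)
        pipeline.set_message(message)
    pipeline.set_message(FINISH)


def consumer(pipeline):
    # Hypothetical consumer: read until the end marker arrives.
    while True:
        message = pipeline.get_message()
        if message == FINISH:
            break
        consumed.append(message)


pipeline = Pipeline()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer, pipeline)
    executor.submit(consumer, pipeline)

print(produced == consumed)  # True
```

Each side only ever releases the lock the other side is holding, so the producer can’t overwrite a message before it has been read, and the consumer can’t read the same message twice. That is why the two lists come out identical.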
05:13 Now let’s open up the terminal and run our program again. And this is great. We’re not in deadlock, so it’s actually pumping out numbers. And look at that: the producer’s and consumer’s outputs appear to be in the same order and have the same length.
05:30 So, the producer produced 99, the consumer read that, then the producer produced 46, the consumer read that, and so on, and we can see that in our output here. The print statements aren’t exactly in order, but the order in which the messages are consumed is: "producing message of 99" and then "consuming message of 99". Then we produced 46, and then we produced 2, but the next consuming message was 46.
05:58 So, things end up being in order. We can execute this as many times as we want, and in theory, we should always get that one-to-one ratio of producing and consuming.
06:12 Congratulations! You were able to use locks to fix your pipeline and make it more stable. Now, I consider this creating a pipeline the hard way, manually doing the acquiring and releasing.
06:26 Next, you’re going to learn about something from the queue module that makes this a lot more pleasant to write.
malbert137 on June 2, 2020
```python
import random
import concurrent.futures
import time
import threading

FINISH = 'THE END'


class ToyQueueWithPollingNotForProduction:
    def __init__(self, capacity, polling_interval=0.05):
        self.capacity = capacity
        self.messages = []
        self.lock = threading.Lock()
        self.polling_interval = polling_interval

    def send_message(self, message):
        print(f'sending message of {message}')
        while True:
            with self.lock:
                # Append only if there is room; otherwise drop the lock and retry.
                if len(self.messages) < self.capacity:
                    self.messages.append(message)
                    return
            time.sleep(self.polling_interval)

    def recv_message(self):
        while True:
            with self.lock:
                # Take the oldest message if there is one; otherwise poll again.
                if self.messages:
                    msg = self.messages.pop(0)
                    print(f'consuming message of {msg}')
                    return msg
            time.sleep(self.polling_interval)


producer_pipeline = []
consumer_pipeline = []


def producer(pipeline):
    for _ in range(pipeline.capacity):
        message = random.randint(1, 100)
        producer_pipeline.append(message)
        pipeline.send_message(message)
    pipeline.send_message(FINISH)


def consumer(pipeline):
    message = None
    while message is not FINISH:
        message = pipeline.recv_message()
        if message is not FINISH:
            consumer_pipeline.append(message)
            time.sleep(random.random())
        else:
            break


if __name__ == '__main__':
    pipeline = ToyQueueWithPollingNotForProduction(10)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
        ex.submit(producer, pipeline)
        ex.submit(consumer, pipeline)
    # The with block waits for both threads before printing the results.
    print(f'producer: {producer_pipeline}')
    print(f'consumer: {consumer_pipeline}')
```
Manish Sharma on Oct. 7, 2023
I couldn’t understand why the producer and consumer methods are symbiotically related, that is, why each lock is acquired in one method and released in the other.
Bartosz Zaczyński RP Team on Oct. 8, 2023
@Manish Sharma The producer and consumer share a common resource: the pipeline. To ensure that only one of them can access the shared resource at a time, so that the producer doesn’t overwrite unconsumed data and the consumer doesn’t start reading incomplete data, they must cooperate with each other through locks.
Whenever the producer finishes writing the data, it notifies the consumer that it’s safe to read from the pipeline by releasing the corresponding lock. Conversely, when the consumer clears the pipeline, it notifies the producer by releasing the producer’s lock.
In that sense, the producer and consumer are in a symbiotic relationship because they use each other’s states to coordinate and synchronize their operations.
I hope that clears it up for you.
Tony Ngok on Feb. 11, 2024
In this example, are these two ex.submit() calls creating a producer and a consumer thread, respectively?
Bartosz Zaczyński RP Team on Feb. 12, 2024
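@Tony Ngok The idea is to keep a pool of worker threads so that you pay the cost of creating each thread only once and then reuse it. When you submit a task to the executor, it hands your job to an available worker or puts it on a waitlist. In CPython, the workers are started lazily, so a call to ex.submit() may start a new thread, but only while the pool has fewer than max_workers threads; after that, ex.submit() never creates a new thread. Not sure if this is what you were asking about, though.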
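One way to see the pool at work is to record which thread runs each task. In this sketch, ten tasks are submitted to a pool limited to two workers; the which_thread() helper is hypothetical, and the thread_name_prefix argument only makes the worker names easier to read.

```python
import concurrent.futures
import threading
import time


def which_thread(_):
    time.sleep(0.01)  # keep each task busy briefly so work can spread out
    return threading.current_thread().name


with concurrent.futures.ThreadPoolExecutor(
    max_workers=2, thread_name_prefix="worker"
) as executor:
    names = list(executor.map(which_thread, range(10)))

# Ten tasks ran, but never on more than max_workers distinct threads.
print(sorted(set(names)))
```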
malbert137 on June 2, 2020
This is interesting as a “toy” example, but perhaps it is worth calling out two items.
First, it’s not clear when a pipeline like this would actually be useful. There is no way for the producer and consumer to simultaneously make progress, and this code will have problems if there are multiple producer or consumer threads.
Also, in general it’s probably not the best design to acquire a lock in one function and release it in another. It makes it more difficult to reason about the code. Further, this precludes the use of a context manager, so one needs to think about exceptions.
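To illustrate the exception-safety point, here is a small sketch (the function names are hypothetical) showing that a with block releases a Lock even when its body raises, alongside the try/finally you would need for the same guarantee with explicit acquire() and release(). When the acquire and the release live in different methods, as in the lesson’s pipeline, neither form applies directly.

```python
import threading

lock = threading.Lock()


def risky_update(values):
    # The with statement releases the lock on exit, even if the body raises.
    with lock:
        values.append(1 / 0)  # raises ZeroDivisionError


def risky_update_manual(values):
    # Explicit acquire/release needs try/finally to be just as safe.
    lock.acquire()
    try:
        values.append(1 / 0)
    finally:
        lock.release()


caught = []
for update in (risky_update, risky_update_manual):
    try:
        update([])
    except ZeroDivisionError:
        caught.append(update.__name__)
    # In both cases the lock is free again afterward.
    print(update.__name__, lock.locked())
```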
Of course, for the “real” implementation one wants semaphores, which are discussed later, although for a “toy” example one could do polling with “sleep()s”.
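For comparison, here is a minimal sketch of the semaphore approach this comment mentions: a classic bounded buffer built from two threading.Semaphore objects and a Lock. The class and method names are hypothetical, and this is one common way to use semaphores here rather than the course’s later implementation. Unlike the two-lock pipeline, it holds up to capacity messages, so the producer can run ahead of the consumer, and the mutex keeps the buffer consistent even with several producers or consumers.

```python
import threading
from collections import deque


class BoundedBuffer:
    """Hypothetical semaphore-based bounded buffer."""

    def __init__(self, capacity):
        self.items = deque()
        self.mutex = threading.Lock()                     # protects self.items
        self.empty_slots = threading.Semaphore(capacity)  # free space left
        self.full_slots = threading.Semaphore(0)          # items ready to read

    def put(self, item):
        self.empty_slots.acquire()  # block while the buffer is full
        with self.mutex:
            self.items.append(item)
        self.full_slots.release()   # one more item is available

    def get(self):
        self.full_slots.acquire()   # block while the buffer is empty
        with self.mutex:
            item = self.items.popleft()
        self.empty_slots.release()  # one more slot is free
        return item


buffer = BoundedBuffer(capacity=3)
received = []


def produce():
    for n in range(10):
        buffer.put(n)


def consume():
    for _ in range(10):
        received.append(buffer.get())


threads = [threading.Thread(target=produce), threading.Thread(target=consume)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(received)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

In everyday code, queue.Queue(maxsize=...) from the standard library gives you this same blocking behavior without writing the semaphores yourself.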