The Producer-Consumer Pipeline: Part 3

In this lesson, you’ll improve your producer-consumer pipeline. If you download the sample code, you can get your own copy of 12-prodcom.py:

Download: Sample Code (.zip), 12.9 KB

00:00 In the previous video, you created your pipeline and your producer and your consumer. When we ran the program, we saw that the output wasn’t exactly what we would want.

00:09 So what you’re going to do now is use your knowledge of locks to create a more stable and consistent pipeline. Now that you know about locks and how you can use Lock.acquire() to protect an area of your program and Lock.release() to unlock that area of your program, I want you to now create two locks—one for the producer, one for the consumer. So when you initialize your Pipeline, we’ll say self.producer_lock is a threading.Lock object, and same for consumer.

00:48 Then we’ll need to import threading, while we’re here.
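
Based on the transcript, the pipeline's initializer at this point might look roughly like this (a sketch; the initial value of self.message is an assumption carried over from the previous lesson):

import threading


class Pipeline:
    def __init__(self):
        # Single slot shared between the producer and the consumer.
        self.message = 0
        # One lock for the producer, one for the consumer.
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()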

00:52 And I’m going to fix a little typo, .consumer_lock. So now you know that the .set_message() is used by the consumer—or sorry, rather, the producer. The producer sets messages, it puts messages in the pipeline. The consumer gets messages.

01:12 So when the producer sets a message, we would want to have the producer lock that particular part of the code.

01:22 So right above the point where we set self.message to message, call your self.producer_lock.acquire() method. This is going to lock down this particular part of our program to the producer thread.

01:43 Do the same thing for the .consumer_lock in the .get_message().

01:48 Right above where you assign message, say self.consumer_lock.acquire().
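
At this stage, the two methods might look roughly like this (a sketch with simplified signatures): each side acquires its own lock before touching self.message, but nothing releases the locks yet, which is what causes the problem described next.

    def set_message(self, message):
        # Producer side: lock this section before writing into the pipeline.
        self.producer_lock.acquire()
        self.message = message

    def get_message(self):
        # Consumer side: lock this section before reading from the pipeline.
        self.consumer_lock.acquire()
        message = self.message
        return message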

01:59 Okay, so this should fix all our problems, right? Let’s go down into a terminal and we’ll clear the screen and run our program again. And something interesting is happening here—it looks like we are in a deadlock situation.

02:16 You know why this happened: this kind of deadlock occurs when .acquire() is called a second time on a Lock that was never released, so the second call blocks forever, waiting for a release that never comes.

02:26 You probably know that we need to release the locks somewhere in our code. I’m going to kill this process, clear the screen, close the terminal, and come back into our code.

02:39 There’s an interesting relationship between the producer and the consumer in that their corresponding methods are kind of symbiotic, and we can take advantage of that. In the consumer’s method, .get_message(), we can call the .release() method on the .producer_lock. So after the consumer reads the message, we can say self.producer_lock.release(). And the same thing goes for .set_message().

03:14 After we changed self.message to the random number that the producer produces, we can now unlock the .consumer_lock. Let’s save our program, open up the terminal, and now we should have a steady pipeline.
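
With the cross-releases added, the two methods might now look like this (again a sketch with simplified signatures); as you’re about to see, this version still isn’t quite right.

    def set_message(self, message):
        self.producer_lock.acquire()
        self.message = message
        # Hand off to the consumer: the message is ready to be read.
        self.consumer_lock.release()

    def get_message(self):
        self.consumer_lock.acquire()
        message = self.message
        # Hand off to the producer: the slot can be overwritten again.
        self.producer_lock.release()
        return message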

03:34 Let’s run our program again—and we’re in deadlock again! What exactly happened here is very subtle. I’m going to kill this process and go back to our code, and let’s think about what happened here for a moment.

03:53 If we kind of zoom out and think of the bigger picture without getting wrapped up in all this complex code, when you create a pipeline you want the producer to be able to produce a message and you want the consumer to wait until a message is produced.

04:12 A very simple way that we can tell the consumer to wait is by calling .acquire() on its Lock. So if we call self.consumer_lock.acquire() inside .__init__(), then by the time our consumer thread starts through the ThreadPoolExecutor down here, the Pipeline has already been created with the .consumer_lock held, so the code guarded by that lock is unavailable to the consumer. The consumer won’t be able to read a message from the pipeline until the producer releases the .consumer_lock.

04:56 And that happens right here at the bottom of the first call to .set_message(). It’s the producer that frees the .consumer_lock and then vice versa, it’s the consumer that frees the .producer_lock when it’s done reading the .message.
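
Putting the pieces together, a sketch of the working pipeline might look like this; the only change from the previous sketch is that .__init__() acquires the .consumer_lock up front, so the consumer blocks until the producer has set the first message:

import threading


class Pipeline:
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        # Start with the consumer blocked: nothing has been produced yet.
        self.consumer_lock.acquire()

    def set_message(self, message):
        # Wait until the consumer has read the previous message.
        self.producer_lock.acquire()
        self.message = message
        # Let the consumer know it's safe to read.
        self.consumer_lock.release()

    def get_message(self):
        # Wait until the producer has written a new message.
        self.consumer_lock.acquire()
        message = self.message
        # Let the producer know it's safe to write the next message.
        self.producer_lock.release()
        return message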

05:13 Now let’s open up the terminal and we’ll run our program again. And this is great. We’re not in deadlock, so it’s actually pumping out numbers. And look at that—our outcome appears to be in the same order and the same length.

05:30 So, the producer produced 99, the consumer read that, then the producer produced 46, the consumer read that, and so on, and we can see that in our output here. The print statements aren’t exactly interleaved one-for-one, but the order in which messages are consumed matches the order in which they were produced: producing message of 99 and then consuming message of 99. Then producing message of 46, and even though the producer had already moved on and produced 2, the next consuming message was still 46.

05:58 So, things end up being in order. We can execute this as many times as we want, and in theory, we should have that one-to-one ratio of producing and consuming.

06:12 Congratulations! You were able to use locks to fix your pipeline and make it more stable. Now, I consider this the hard way of creating a pipeline, manually doing the acquire and release yourself.

06:26 Next, you’re going to learn about something from the queue module that makes this a lot more pleasant to write.

malbert137 on June 2, 2020

This is interesting as a “toy” example, but perhaps it is worth calling out two items.

First, it’s not clear when a pipeline like this would actually be useful. There is no way for the producer and consumer to simultaneously make progress, and this code will have problems if there are multiple producer or consumer threads.

Also, in general it’s probably not the best design to acquire a lock in one function and release it in another. It makes it more difficult to reason about the code. Further, this precludes the use of a context manager, so one needs to think about exceptions.

Of course, for the “real” implementation one wants semaphores, which are discussed later, although for a “toy” example one could do polling with sleep() calls.

malbert137 on June 2, 2020

import random
import concurrent.futures
import time
import threading

FINISH = 'THE END'

class ToyQueueWithPollingNotForProduction:
    """Bounded buffer guarded by a single lock; waiting is done by polling."""

    def __init__(self, capacity, polling_interval=0.05):
        self.capacity = capacity
        self.messages = []
        self.lock = threading.Lock()
        self.polling_interval = polling_interval

    def send_message(self, message):
        print(f'sending message of {message}')
        while True:
            # Only touch the shared list while holding the lock.
            with self.lock:
                if len(self.messages) < self.capacity:
                    self.messages.append(message)
                    return
            # Buffer is full: release the lock and retry after a short nap.
            time.sleep(self.polling_interval)

    def recv_message(self):
        while True:
            with self.lock:
                if self.messages:
                    msg = self.messages.pop(0)
                    print(f'consuming message of {msg}')
                    return msg
            # Buffer is empty: release the lock and retry after a short nap.
            time.sleep(self.polling_interval)


producer_pipeline = []
consumer_pipeline = []


def producer(pipeline):
    for _ in range(pipeline.capacity):
        message = random.randint(1, 100)
        producer_pipeline.append(message)
        pipeline.send_message(message)
    # Signal the consumer that there's nothing more to read.
    pipeline.send_message(FINISH)


def consumer(pipeline):
    while True:
        message = pipeline.recv_message()
        if message == FINISH:
            # The producer is done; stop consuming.
            break
        consumer_pipeline.append(message)
        time.sleep(random.random())


if __name__ == '__main__':
    pipeline = ToyQueueWithPollingNotForProduction(10)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
        ex.submit(producer, pipeline)
        ex.submit(consumer, pipeline)
    print(f'producer: {producer_pipeline}')
    print(f'consumer: {consumer_pipeline}')

Manish Sharma on Oct. 7, 2023

I couldn’t understand why producer and consumer methods are symbiotically related, like why locking in other and releasing in other.

Bartosz Zaczyński RP Team on Oct. 8, 2023

@Manish Sharma The producer and consumer share a common resource—the pipeline. In order to ensure that only one of them can access the shared resource at a time—so that the producer doesn’t overwrite unconsumed data or the consumer doesn’t try to start reading incomplete data—they must cooperate with each other through locks.

Whenever the producer finishes writing the data, it notifies the consumer that it’s safe to read from the pipeline by releasing the corresponding lock. Conversely, when the consumer clears the pipeline, it notifies the producer by releasing the producer’s lock.

In that sense, the producer and consumer are in a symbiotic relationship because they use each other’s states to coordinate and synchronize their operations.

I hope that clears it up for you.

Tony Ngok on Feb. 11, 2024

In this example, are these two ex.submit() creating a producer and a consumer thread respectively?

Bartosz Zaczyński RP Team on Feb. 12, 2024

@Tony Ngok The idea is to allocate a pool of threads and pay the cost of their creation only once. When you submit a task to the executor, it assigns one of the pool’s threads to your job or puts it on a waitlist. So ex.submit() doesn’t spawn a dedicated thread per task; the pool creates at most max_workers threads and reuses them. Not sure if this is what you were asking about, though.
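
For illustration, here is a small hypothetical snippet (not from the lesson or the thread above) that makes the reuse visible: it submits six tasks to a two-worker pool and prints the name of the thread that ran each one, so only two distinct worker names appear.

import concurrent.futures
import threading
import time


def which_thread(task_id):
    # Simulate a bit of work, then report which pooled worker ran this task.
    time.sleep(0.1)
    return f'task {task_id} ran on {threading.current_thread().name}'


if __name__ == '__main__':
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
        futures = [ex.submit(which_thread, i) for i in range(6)]
        for future in futures:
            print(future.result())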
