Join us and get access to hundreds of tutorials and a community of expert Pythonistas.

Unlock This Lesson

This lesson is for members only. Join us and get access to hundreds of tutorials and a community of expert Pythonistas.

Unlock This Lesson

Hint: You can adjust the default video playback speed in your account settings.
Hint: You can set the default subtitles language in your account settings.
Sorry! Looks like there’s an issue with video playback 🙁 This might be due to a temporary outage or because of a configuration issue with your browser. Please see our video player troubleshooting guide to resolve the issue.

The Producer-Consumer Pipeline: Part 3

Give Feedback

In this lesson, you’ll improve your producer-consumer pipeline. If you download the sample code, you can get your own copy of 12-prodcom.py:

Download

Sample Code (.zip)

12.9 KB

malbert137 on June 2, 2020

This is interesting as a “toy” example, but perhaps it is worth calling out two items.

First, it’s not clear when a pipeline like this would actually be useful. There is no way for the producer and consumer to simultaneously make progress, and this code will have problems if there are multiple producer or consumer threads.

Also, in general it’s probably not the best design to acquire a lock in one function and release it in another. It makes it more difficult to reason about the code. Further, this precludes the use of a context manager, so one needs to think about exceptions.

Of course, for the “real” implementation one wants semaphores, which are discussed later, although for a “toy” example one could doing polling with “sleep()s”.

malbert137 on June 2, 2020

import random
import concurrent.futures
import time
import threading

FINISH = 'THE END'

class ToyQueueWithPollingNotForProduction:
    def __init__(self, capacity, polling_interval=0.05):
        self.capacity = capacity
        self.messages = []
        self.lock = threading.Lock()
        self.polling_interval = polling_interval
    def send_message(self, message):
        print(f'sending message of {message}')
        while True:
            with self.lock:
                if len(self.messages) < self.capacity:
                    self.messages.append(message)
                    return
            time.sleep(self.polling_interval)
    def recv_message(self):
        while True:
            with self.lock:
                if self.messages:
                    msg = self.messages.pop(0)
                    print(f'consuming message of {msg}')
                    return msg
            time.sleep(self.polling_interval)

producer_pipeline = []
consumer_pipeline = []

def producer(pipeline):
    for _ in range(pipeline.capacity):
        message = random.randint(1, 100)
        producer_pipeline.append(message)
        pipeline.send_message(message)
    pipeline.send_message(FINISH)

def consumer(pipeline):
    message = None
    while message is not FINISH:
        message = pipeline.recv_message()
        if message is not FINISH:
            consumer_pipeline.append(message)
            time.sleep(random.random())
        else:
            break


if __name__ == '__main__':
    pipeline = ToyQueueWithPollingNotForProduction(10)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
        ex.submit(producer, pipeline)
        ex.submit(consumer, pipeline)
    print(f'producer: {producer_pipeline}')
    print(f'consumer: {consumer_pipeline}')

Become a Member to join the conversation.