Queues and Parallelism
00:00
In the previous lesson, I talked about using list and deque objects as FIFO queues in Python. In this lesson, I'm going to talk about queues as they relate to threads and multiprocessing.
00:12
In the lesson on stacks and the LIFO queue, I talked about the problem of thread-safety and race conditions. Python supports several different ways of doing concurrency. Inside of the same process, you can use threads or asyncio. Between separate processes, there is the multiprocessing library.
00:31 Thread-safety problems come from concurrent code sharing the same memory inside a single process. Separate processes don't share memory, so they avoid that class of problem, but for the same reason they can't directly see each other's data. Resources outside the interpreter, like the terminal, can still be contended.
00:45 You have to write specific code to handle the interprocess communication.
00:50
For multithreaded queue code, you want the Queue object in the queue library. It is similar to the LifoQueue object discussed in a previous lesson in the section on stacks. Queue uses .put(), .get(), and .get_nowait() just like LifoQueue does.
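Here's a minimal sketch of that shared interface, comparing the ordering of queue.Queue and queue.LifoQueue. The helper fill_and_drain() is a hypothetical name used only for illustration; put(), get_nowait(), and the queue.Empty exception are the standard library's.

```python
import queue


def fill_and_drain(q, items):
    """Put every item on q, then take them back off with get_nowait()."""
    for item in items:
        q.put(item)
    drained = []
    while True:
        try:
            drained.append(q.get_nowait())
        except queue.Empty:  # get_nowait() raises Empty instead of blocking
            return drained


print(fill_and_drain(queue.Queue(), "abc"))      # ['a', 'b', 'c'] (FIFO)
print(fill_and_drain(queue.LifoQueue(), "abc"))  # ['c', 'b', 'a'] (LIFO)
```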
01:08
The order of it, though, is FIFO-based instead of LIFO. I'll leave a demonstration to your imagination. You can go back and mentally edit everywhere it said LifoQueue and replace it with the word Queue. Threads and multiple processes are both techniques to achieve concurrency, but with subtle differences. Threads are good for I/O-bound work.
01:30 A lot of computing time is spent waiting for I/O. Having one thread wait while another continues to compute is a great way of achieving speedup. CPython, the standard Python interpreter, has a master lock called the GIL, or Global Interpreter Lock.
01:44 This lock protects the interpreter's internals from race conditions, but the cost is that only one thread can execute Python bytecode at a time, which severely limits CPU-bound parallelism. Hence, threads are only really useful for I/O-bound processing.
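To make the I/O-bound case concrete, here's a sketch under stated assumptions: time.sleep() stands in for real I/O (a network or disk call), and the fake_io(), worker(), and run_threaded() names are hypothetical. Worker threads pull jobs off a thread-safe queue.Queue and stop when they receive a None sentinel. Because a sleeping thread releases the GIL, the five 0.2-second waits overlap.

```python
import queue
import threading
import time


def fake_io(n):
    """Stand-in for an I/O-bound call: mostly waiting, very little computing."""
    time.sleep(0.2)
    return n * n


def worker(tasks, results):
    while True:
        n = tasks.get()  # blocks until a job (or the sentinel) arrives
        if n is None:    # sentinel: no more work for this thread
            break
        results.put(fake_io(n))


def run_threaded(numbers, num_threads=5):
    numbers = list(numbers)
    tasks, results = queue.Queue(), queue.Queue()
    threads = [
        threading.Thread(target=worker, args=(tasks, results))
        for _ in range(num_threads)
    ]
    for t in threads:
        t.start()
    for n in numbers:
        tasks.put(n)
    for _ in threads:
        tasks.put(None)  # one sentinel per worker, queued after all real jobs
    for t in threads:
        t.join()
    return sorted(results.get() for _ in numbers)


start = time.perf_counter()
print(run_threaded(range(5)))  # [0, 1, 4, 9, 16]
print(f"took {time.perf_counter() - start:.2f}s")
```

On a typical machine the threaded run takes roughly 0.2 seconds rather than the 1 second it would take to make the calls one after another; the exact timing will vary.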
01:58 If you’re doing CPU-intensive work and have multiple processors on your machine—most desktops do now—you can get around the restrictions of the GIL by spawning multiple processes.
02:09 Each process gets its own instance of the Python interpreter, which is a bit expensive. It has a startup cost and uses more memory. You’re essentially running two copies of Python. To add to the complications, these two copies of Python need special treatment to talk to each other.
02:26 That being said, there are all sorts of computationally heavy things that can benefit greatly by using multiple processors in your computer. Why leave all that hardware idle?
02:36 There’s just a bit more work for you to do to take advantage of it all.
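For CPU-bound work, one way to put several processors to use is a process pool. This sketch swaps in concurrent.futures.ProcessPoolExecutor rather than the hand-built Process objects used later in this lesson, and count_primes() is a hypothetical, deliberately slow trial-division function. The if __name__ == "__main__": guard matters here, because on platforms that spawn child processes, each child re-imports the script.

```python
from concurrent.futures import ProcessPoolExecutor


def count_primes(limit):
    """CPU-bound work: count primes below limit by naive trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def main():
    limits = [20_000, 30_000, 40_000, 50_000]
    # Each call runs in a worker process with its own interpreter (and its own GIL)
    with ProcessPoolExecutor() as pool:
        return list(pool.map(count_primes, limits))


if __name__ == "__main__":
    print(main())
```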
02:41
One of the ways of communicating between two processes in Python is to use a Queue from the multiprocessing library. Conceptually, this is similar to the thread-safe queues but is geared for interprocess communication, or IPC to the cool kids.
02:55
The only restriction is that anything put into the queue has to be serializable. The multiprocessing.Queue uses Python's pickle module to do serialization.
03:04
Anything you put into a Queue has to be pickleable.
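A quick way to check whether something can go into a multiprocessing.Queue is to try pickling it yourself. The is_pickleable() helper below is hypothetical. Catching PicklingError, TypeError, and AttributeError covers the usual failure modes, such as lambdas, locks, and locally defined functions. In CPython, the queue does its pickling in a background feeder thread, so an unpicklable object may not raise an exception at the put() call at all, which is one reason an upfront check can help.

```python
import pickle
import threading


def is_pickleable(obj):
    """Return True if pickle can serialize obj, as multiprocessing.Queue requires."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


print(is_pickleable({"msg": "hello", "n": [1, 2, 3]}))  # True
print(is_pickleable(lambda x: x))                        # False
print(is_pickleable(threading.Lock()))                   # False
```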
03:10
In order to demonstrate a multiprocessing.Queue, I'm going to need to show you some multiprocessing code. This script starts up two processes and has one shout messages at the other.
03:21
A good practice in Python, or any language for that matter, is to assume that any library code is not thread-safe unless the documentation explicitly tells you that it is. Even the lowly print() function can be dangerous, so I've implemented lprint(), a version of print() that is atomic.
03:39 It takes a lock and makes sure that everything printed to the screen happens at once, before releasing the lock. Without this function, you could potentially see both processes printing to the terminal at the same time, and your output could overlap. With that in hand, the first thing you'll need is a function that can shout messages.
03:57
This one takes a lock and a multiprocessing queue as arguments. To add some variety to the pacing of this contrived example, I have the process randomly decide whether to take a nap after sending each message. The use of random.choice() here results in a one-in-four chance of taking a nap after sending a message.
04:20 Once that's figured out, I atomically print the message, then put the message onto the shared queue. Finally, if it is nap time, the process sleeps for 100 milliseconds.
04:31
That's one heck of a power nap. This whole sequence is wrapped up in a for loop and is done 10 times. Before exiting the function, shout() queues the message "done" to tell the other process that it is finished. Let me just scroll down here.
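Here's a minimal sketch of lprint() and shout() consistent with the description so far. The lesson's actual source isn't reproduced here, so the details are assumptions: the "Shout" prefix, the four-item list passed to random.choice() to get the one-in-four nap, and the count parameter are illustrative choices. The lprint() wrapper holds the lock for the whole print() call, which is what makes it atomic.

```python
import random
import time
from multiprocessing import Lock, Queue


def lprint(lock, *args):
    """Atomic print: hold the lock for the whole call so output can't interleave."""
    with lock:
        print(*args, flush=True)


def shout(lock, queue, count=10):
    for i in range(1, count + 1):
        message = str(i)
        # Assumed encoding: one True among four choices gives a 1-in-4 nap chance
        nap = random.choice([False, False, False, True])
        lprint(lock, "Shout", message)
        queue.put(message)
        if nap:
            time.sleep(0.1)  # the 100 ms "power nap"
    queue.put("done")  # tell the listener nothing more is coming


if __name__ == "__main__":
    lock, queue = Lock(), Queue()
    shout(lock, queue)  # runs in this one process, just to see the messages
    print([queue.get() for _ in range(11)])
```

Running shout() in a single process like this isn't the point of the script, but it shows the ten messages, in order, followed by "done".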
04:50
The companion function to shout() is hear(). Like shout(), it takes a lock and a queue as arguments. The first thing that the hear() function does is receive a message from the queue. This is a blocking call. If there is nothing on the queue, then this process waits until there is. When the first message is received, you enter the while loop, and the atomic print is used to display the message.
05:15
Then the loop blocks, waiting for the next message. Once it's received, you go back to the top of the while loop. If the message is "done", you're finished, and you exit the hear() function. If it's not, you repeat the process.
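A matching sketch of hear(), again an assumption-based reconstruction rather than the lesson's exact code (the "Hear" prefix is illustrative). The loop relies on queue.get() blocking until a message arrives and treats "done" as the sign-off. To run it alone in one process, the queue is pre-loaded before hear() is called.

```python
from multiprocessing import Lock, Queue


def lprint(lock, *args):
    """Atomic print: hold the lock for the whole call so output can't interleave."""
    with lock:
        print(*args, flush=True)


def hear(lock, queue):
    message = queue.get()        # blocks until something arrives
    while message != "done":     # "done" is the shouter's sign-off
        lprint(lock, "Hear", message)
        message = queue.get()    # block again for the next message


if __name__ == "__main__":
    lock, queue = Lock(), Queue()
    for message in ("1", "2", "done"):
        queue.put(message)       # pre-load the queue so hear() can run alone
    hear(lock, queue)            # prints "Hear 1" then "Hear 2"
```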
05:29
shout() and hear() are functions that each run in a separate process. Let me show you how to tie all this together. The code inside of the if __name__ == "__main__" block will only get run if the script is run directly, not on import.
05:44
The execution section starts by creating a shared Lock and a shared Queue. Then, a Process object is created. The target argument specifies the function to run inside of the process, and lock and queue are passed in as its arguments. For this first Process object, the target is the shout function.
06:05
A second Process object is then created, this time with the target set to hear.
06:11
Although the Process objects have been created, the processes themselves have not been spawned. Calling the .start() method on each Process object will actually kick them off.
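The difference between creating a Process object and starting it can be observed directly. In this sketch, work() and lifecycle() are hypothetical placeholders. Before .start(), the object reports that it isn't alive and has no process ID; after .join(), its exitcode is 0, meaning the child returned normally.

```python
from multiprocessing import Process


def work():
    """Placeholder target; the lesson's script uses shout and hear instead."""
    pass


def lifecycle():
    p = Process(target=work)
    before = (p.is_alive(), p.pid)  # object exists, but no OS process yet
    p.start()                       # now the child process is spawned
    p.join()                        # wait for it to finish
    return before, p.exitcode


if __name__ == "__main__":
    print(lifecycle())  # ((False, None), 0)
```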
06:22
You don't want the script to finish before the processes are done. Calling .join() on the Process objects causes the script to wait here until the processes are complete, in this case when the shout() and hear() functions finish inside of their respective processes.
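Putting the pieces together, here's a complete, runnable sketch of the script described above. It's a reconstruction under the same assumptions as before (message prefixes, nap encoding), not the lesson's own file, and the main() function that returns both exit codes is an added convenience. The lock and queue are created once and handed to both processes through the args parameter.

```python
import random
import time
from multiprocessing import Lock, Process, Queue


def lprint(lock, *args):
    """Atomic print shared by both processes."""
    with lock:
        print(*args, flush=True)


def shout(lock, queue):
    for i in range(1, 11):
        message = str(i)
        nap = random.choice([False, False, False, True])  # assumed 1-in-4 nap
        lprint(lock, "Shout", message)
        queue.put(message)
        if nap:
            time.sleep(0.1)
    queue.put("done")


def hear(lock, queue):
    message = queue.get()
    while message != "done":
        lprint(lock, "Hear", message)
        message = queue.get()


def main():
    lock = Lock()
    queue = Queue()
    shouter = Process(target=shout, args=(lock, queue))
    hearer = Process(target=hear, args=(lock, queue))
    shouter.start()  # the processes are only spawned here
    hearer.start()
    shouter.join()   # wait for both to finish before the script exits
    hearer.join()
    return shouter.exitcode, hearer.exitcode


if __name__ == "__main__":
    main()
```

The interleaving of Shout and Hear lines will differ from run to run, because it depends on the random naps and on which process wins the print lock.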
06:38 Let’s see this code in action.
06:43
Here it goes. That was pretty quick. Let me see if I can break it down. Both processes start at the same time, but the hear() function doesn't do anything at first. It blocks, waiting for a message.
06:55
The shout() process then shouts the message "1", then "2", then "3". Those messages are all in the queue, so the hear() function wakes up and starts pulling them out.
07:07
The reason you don't see Shout and then Hear in strict alternation is that the two processes are fighting over the lock inside of lprint().
07:16
Just after the shout() process has queued its third message, the hear() process wins the lprint() lock and manages to print out what it heard. Both hear() and shout() are still running, competing for that lock.
07:30
shout(), for whatever reason, has won the lock again for printing, so it keeps spitting out messages. It does this for "4", "5", "6", "7", and then finally the randomness works out and it decides to take a nap.
07:45
Within those 100 milliseconds, the hear() function is free to keep using the lprint() lock, so it consumes the messages "2", "3", "4", "5", "6", "7", and then "8", at which point it blocks.
08:01
Somewhere during all that, shout() has woken up. It sends "9" and "10", then it sends "done", and it finishes. At this point, there's no longer any contention for the print lock, so the hear() function takes in messages "9" and "10", prints them out, gets the "done" message, which I don't print, and then it finishes.
08:25 Just as a quick demonstration of how the contention changes the behavior of the program, let me run it again.
08:36
Two things have affected the difference in the output here: one, the randomness of when to nap, and two, the lock contention. This time around, the randomness of the napping has meant that the hear() function seems to be getting more chances to get at its queue.
08:53 One more time for giggles. And once again, the pattern has changed a little bit. This variety is what makes concurrent programming a challenge to debug.
09:06 Hopefully, you’ve got a clear understanding of how queues in multiprocessing work now, though.
09:13 Next up, I’ll close out the section on queues by talking about how to choose between the different libraries when writing your code.
Christopher Trudeau RP Team on June 30, 2022
Glad you’re finding value, a lot of people work very hard to bring it to you. Nice to know it is appreciated!
Jordan Bell on Nov. 26, 2024
The flush keyword argument is something I've used before with threads/concurrency. In your sample code, adding the flush kwarg to print doesn't change the output. In what situations does using flush change the output? Tinkering with the code, if we remove the use of the lock from lprint, then including the flush keyword does have an effect on the output.
Bartosz Zaczyński RP Team on Nov. 26, 2024
@Jordan Bell The print() function in Python isn't thread-safe, meaning that another thread could interrupt its execution while it's printing, potentially mixing fragments of output from different threads. To avoid such issues, it's generally recommended to use the logging module, which is thread-safe. Alternatively, you can create a custom thread-safe wrapper for print(), as demonstrated in the video, which uses locks to prevent interruptions during a print() call.
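As a sketch of the logging approach, the function below writes from several threads through one StreamHandler. Handlers acquire their own lock around each emit, so each record arrives as a complete line. The log_from_threads() name and the "thread_demo" logger name are hypothetical. Note that this protects threads within one process; it doesn't by itself coordinate several processes writing to the same file.

```python
import io
import logging
import threading


def log_from_threads(stream, num_threads=4, per_thread=50):
    """Log from several threads to one stream through a single handler."""
    logger = logging.getLogger("thread_demo")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the demo's records out of the root logger
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(threadName)s: %(message)s"))
    logger.addHandler(handler)

    def work():
        for i in range(per_thread):
            logger.info("message %d", i)

    threads = [
        threading.Thread(target=work, name=f"worker-{n}") for n in range(num_threads)
    ]
    try:
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    finally:
        logger.removeHandler(handler)


buffer = io.StringIO()
log_from_threads(buffer)
lines = buffer.getvalue().splitlines()
print(len(lines))  # 200: every record arrives as one whole line
```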
The flush=True parameter ensures that whatever print() has written to the output stream's buffer is pushed out immediately. When the standard output stream (stdout) is connected to a terminal, it's line-buffered; when it's redirected to a file or pipe, it's block-buffered, so output may not appear right away if the buffer isn't full. This behavior increases the likelihood of incomplete or interleaved output in a multithreaded environment.
Alternatively, instead of relying on flush=True, you could also use the standard error stream (stderr), which Python line-buffers by default, even when it's redirected (as of Python 3.9):
import sys
print("No buffering!", file=sys.stderr)
To directly address your question, using flush=True guarantees that the output is displayed immediately in buffered streams. This can be especially useful when you need real-time feedback or during debugging.
Martin Breuss RP Team on Nov. 27, 2024
Just additionally leaving a link to our tutorial on How to Flush the Output of the Python Print Function because it discusses some related points in detail.
marcinszydlowski1984 on June 29, 2022
Best site about Python I've seen. The problems shown here aren't Python-specific, but the quality and simplicity of the presented media is worth the price.