Parallel Processing With multiprocessing: Conclusion
In this section, you learned how to do parallel programming in Python using functional programming principles and the multiprocessing module. You used the example data set based on an immutable data structure that you previously transformed using the built-in map() function. But this time, you processed the data in parallel, across multiple CPU cores, using the Python multiprocessing module available in the standard library.
You saw, step by step, how to parallelize an existing piece of Python code so that it can execute much faster and leverage all of your available CPU cores. You learned how to use the multiprocessing.Pool
class and its parallel map
implementation, which makes parallelizing most Python code that’s written in a functional style a breeze.
You built a little testbed program that you used to measure execution time with the time.time()
function, so that you could compare the sequential and the parallel, multiprocessing-based implementations of the same algorithm. Stay tuned for the next section in the course, where you’ll learn how to make your Python programs multithreaded using the concurrent.futures
module as an alternative way to implement concurrency.
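As a quick recap, here’s a minimal sketch of that pattern. The transform() function and the data below are simplified stand-ins for the course’s scientists example, not the exact course code:

import multiprocessing
import time

def transform(name):
    # Stand-in for the per-record work done in the course example.
    return {"name": name, "processed": True}

if __name__ == "__main__":
    names = ("Lovelace", "Noether", "Curie", "Franklin")
    start = time.time()
    with multiprocessing.Pool() as pool:
        result = pool.map(transform, names)
    end = time.time()
    print(result)
    print(f"Time to complete: {end - start:.2f}s")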
00:00
I just love this way of parallel programming, because it is very easy to do if you write your code in a functional programming style. And if you’re using a map()
function, then it’s very easy to parallelize this code, as you’ve seen here, right?
00:16
I didn’t change anything here, really. I mean, we made some cosmetic changes just to be able to trace what’s going on with this transform() function, but, really, all I did was change these two lines of code.
00:29 And now all of a sudden, these calculations are running in parallel across multiple CPU cores. I think that’s a really powerful concept. Now, there’s more to this, of course. This is more, you know—this is really just scratching the surface, but I hope it is enough for you to see the value that this programming model brings with it.
00:48 I really want to encourage you with this video to go out and do your own experimentation, right? Maybe turn this into a little web scraper. Maybe you can do some I/O or some more expensive computations here, and then use the same technique to speed this up and actually get your result a lot faster. If you imagine you had to fetch 10,000 websites—well, if you did that sequentially, this could take a really long time, but if you can parallelize them and fetch these websites in batches of 100 items at a time, then you’re going to get a huge speedup.
01:20
It will be relatively easy to do this with the multiprocessing.Pool class that’s built into Python. So yeah—give this a shot, play with it, get familiar with it. All right. Talk to you soon.
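If you want to try the web-scraper idea, a rough sketch might look like the following. The fetch() helper, the URL list, and the pool size are placeholders for your own experiment, not part of the course code:

import multiprocessing
import urllib.request

def fetch(url):
    # Download one page and return its size. The work here is mostly I/O,
    # so running many fetches at the same time gives a large speedup.
    with urllib.request.urlopen(url, timeout=10) as response:
        return url, len(response.read())

if __name__ == "__main__":
    urls = ["https://example.com"] * 20  # replace with your own list of sites
    with multiprocessing.Pool(processes=10) as pool:
        for url, size in pool.map(fetch, urls):
            print(f"{url}: {size} bytes")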
darth88vader88 on April 4, 2020
Any ideas, please, why this would happen:
>>> def transform(x):
...     result = reduce(reducer, x, collections.defaultdict(list))
...     return result
...
>>> pool = multiprocessing.Pool()
>>> result = pool.map(transform, scientists)
Traceback (most recent call last):
  File "<input>", line 1, in <module>
    result = pool.map(transform, scientists)
  File "/Users/dv/anaconda3/lib/python3.6/multiprocessing/pool.py", line 266, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/dv/anaconda3/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
AttributeError: 'str' object has no attribute 'field'
zorion on April 8, 2020
@darth88vader88
I think that your transform
function would try to reduce a sequence, but since you are using a map
you are passing one item (a scientist) each time.
So I guess transform(scientists)
would work for you, but transform(scientists[0])
would not.
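A small illustration of the difference, with the namedtuple fields assumed for the sake of the example:

import collections
import multiprocessing

Scientist = collections.namedtuple("Scientist", ["name", "field"])

def transform(scientist):
    # Pool.map() calls this once per item, so the argument is a single
    # Scientist and attribute access like .name or .field works.
    return {"name": scientist.name, "field": scientist.field}

if __name__ == "__main__":
    scientists = (
        Scientist(name="Ada Lovelace", field="math"),
        Scientist(name="Emmy Noether", field="math"),
    )
    with multiprocessing.Pool() as pool:
        print(pool.map(transform, scientists))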
darth88vader88 on April 13, 2020
Thanks, sir. Will certainly check out your suggestion.
norcal618 on May 28, 2020
I have a question about parallel processing. My computer is also a dual core with multi-threading, so I get how 4 processes run in parallel. What I don’t get is when I give the Pool object the parameter processes=len(scientists), it is able to go beyond four processes at a time. Is it using parallelism along with concurrency in the case of going beyond the number of physical cores?
Dan Bader RP Team on May 28, 2020
@norcal618: When you tell the pool to spawn N worker processes with the processes parameter, it follows that blindly and creates that number of processes. It doesn’t check how many physical or logical cores your machine has. It just creates the number of processes you tell it to, and then the operating system decides how those are run across the available cores. Does that clear it up a bit?
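Here’s a small sketch of that behavior; it assumes nothing about your machine beyond what multiprocessing.cpu_count() reports:

import multiprocessing
import os
import time

def work(_):
    time.sleep(0.5)        # keep each worker busy for a moment
    return os.getpid()     # report which worker process handled the task

if __name__ == "__main__":
    print(f"Logical cores: {multiprocessing.cpu_count()}")
    # The pool starts exactly ten workers, even on a two- or four-core machine.
    # The operating system then schedules them across the cores that exist.
    with multiprocessing.Pool(processes=10) as pool:
        pids = pool.map(work, range(10))
    print(f"Tasks were handled by {len(set(pids))} worker processes")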
philipwong37 on Dec. 1, 2020
Parallel processing is great, but what if I need to keep track of the order of the tasks and need them to come back in the order of the for loop I set up? How do I accomplish this without sacrificing performance?
In addition, you have one function and one iterable up there. How do I handle two iterables? (I know we can use partial, but that only handles one iterable and one static variable. I have two parameters to the function, and each parameter is a list of variables.)
Bartosz Zaczyński RP Team on Dec. 2, 2020
@philipwong37 There are many ways to keep track of the tasks’ order. One thing that comes to mind is using a queue, but you can also take advantage of the executor class from the concurrent.futures
module.
If you want to process the results as soon as possible, then use the .map()
method:
from concurrent.futures import ProcessPoolExecutor
import time
import random

def download(url):
    print(f"Starting the download of {url}")
    time.sleep(random.randint(500, 3000) / 1000)
    print(f"Downloaded {url}")
    return url

files = ["file1", "file2", "file3"]

with ProcessPoolExecutor() as executor:
    for result in executor.map(download, files):
        print(f"Main process got: {result}")
You’ll still process the files in the right order in the parent process even when they’re downloaded out of order by the individual subprocesses:
$ python example.py
Starting the download of file1
Starting the download of file2
Starting the download of file3
Downloaded file2
Downloaded file1
Main process got: file1
Main process got: file2
Downloaded file3
Main process got: file3
On the other hand, you can wait until all files are downloaded before processing them, too:
from concurrent.futures import wait

# ...

with ProcessPoolExecutor() as executor:
    futures = [executor.submit(download, file) for file in files]
    wait(futures)
    for future in futures:
        print(f"Main process got: {future.result()}")
This won’t resume the main process until all subtasks are done:
$ python example.py
Starting the download of file1
Starting the download of file2
Starting the download of file3
Downloaded file2
Downloaded file3
Downloaded file1
Main process got: file1
Main process got: file2
Main process got: file3
To answer your second question, try a Cartesian product of your input parameters:
from itertools import product

# ...

# Note that download() needs to accept two parameters here (server and file),
# since executor.map() is now given two iterables.
servers = ["localhost", "remote"]
files = ["file1", "file2", "file3"]

with ProcessPoolExecutor() as executor:
    for result in executor.map(download, *zip(*product(servers, files))):
        print(f"Main process got: {result}")
Output:
Starting the download of localhost/file1
Starting the download of localhost/file2
Starting the download of localhost/file3
Starting the download of remote/file1
Starting the download of remote/file2
Starting the download of remote/file3
Main process got: localhost/file1
Main process got: localhost/file2
Main process got: localhost/file3
Main process got: remote/file1
Main process got: remote/file2
Main process got: remote/file3
Jishnu Banerjee on Feb. 27, 2020
Thanks, Dan, for exposing this functional programming approach. In this particular context, I guess it’s worth mentioning that if a lambda function is used with the built-in map(), then we can’t reuse the same code with Pool.map(), since the lambda will fail to pickle. So it’s always good to use a regular function and pass it to map() as we exercise this functional programming approach.
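For anyone who wants to see that behavior, here’s a small demonstration; the exact exception message varies between Python versions:

import multiprocessing

def double(x):
    return x * 2

if __name__ == "__main__":
    # The built-in map() happily accepts a lambda because everything runs
    # in the current process.
    print(list(map(lambda x: x * 2, [1, 2, 3])))

    with multiprocessing.Pool() as pool:
        # A module-level function can be pickled and sent to the workers.
        print(pool.map(double, [1, 2, 3]))

        # A lambda can't be pickled, so this raises an error.
        try:
            pool.map(lambda x: x * 2, [1, 2, 3])
        except Exception as error:
            print(f"Lambda failed: {error}")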