You have a program that performs a lot of CPU-intensive work, and you want to make it run faster by having it take advantage of multiple CPUs.

The concurrent.futures library provides a ProcessPoolExecutor class that can be used to execute computationally intensive functions in a separately running instance of the Python interpreter. However, in order to use it, you first need to have some computationally intensive work. Let's illustrate with a simple yet practical example.

Suppose you have an entire directory of gzip-compressed Apache web server logs:

logs/
    20120701.log.gz
    20120702.log.gz
    20120703.log.gz
    20120704.log.gz
    20120705.log.gz
    20120706.log.gz
    ...
Further suppose each log file contains lines like this:
124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
...

Here is a simple script that takes this data and identifies all hosts that have accessed the robots.txt file:
import gzip
import io
import glob

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file
    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f, encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across an entire sequence of files
    '''
    files = glob.glob(logdir+'/*.log.gz')
    all_robots = set()
    for robots in map(find_robots, files):
        all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)
The preceding program is written in the commonly used map-reduce style. The function find_robots() is mapped across a collection of filenames and the results are combined into a single result (the all_robots set in the find_all_robots() function).

Now, suppose you want to modify this program to use multiple CPUs. It turns out to be easy: simply replace the map() operation with a similar operation carried out on a process pool from the concurrent.futures library. Here is a slightly modified version of the code:
import gzip
import io
import glob
from concurrent import futures

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file
    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f, encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across an entire sequence of files
    '''
    files = glob.glob(logdir+'/*.log.gz')
    all_robots = set()
    # Each file is processed by find_robots() in a worker process
    with futures.ProcessPoolExecutor() as pool:
        for robots in pool.map(find_robots, files):
            all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)
With this modification, the script produces the same result but runs about 3.5 times faster on our quad-core machine. The actual performance will vary according to the number of CPUs available on your machine.
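If no real logs are at hand, the serial logic can be tried on generated data first. The following is only a sketch: the SAMPLE_LOGS contents and the write_sample_logs() helper are hypothetical and merely modeled on the log lines shown earlier, and gzip.open() is used in text mode ('rt') in place of io.TextIOWrapper. A length check on fields is also added so that short lines are skipped.

```python
import glob
import gzip
import os
import tempfile

# Hypothetical sample data, modeled on the log format shown earlier
SAMPLE_LOGS = {
    '20120701.log.gz': [
        '124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71',
        '210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875',
    ],
    '20120702.log.gz': [
        '61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /robots.txt ..." 200 71',
        '124.115.6.12 - - [10/Jul/2012:00:21:10 -0500] "GET /robots.txt ..." 200 71',
    ],
}

def write_sample_logs(logdir):
    '''Write the sample logs above as gzip files in logdir'''
    for name, lines in SAMPLE_LOGS.items():
        path = os.path.join(logdir, name)
        with gzip.open(path, 'wt', encoding='ascii') as f:
            f.write('\n'.join(lines) + '\n')

def find_robots(filename):
    '''Find all of the hosts that access robots.txt in a single log file'''
    robots = set()
    with gzip.open(filename, 'rt', encoding='ascii') as f:
        for line in f:
            fields = line.split()
            # Skip lines too short to contain a request path
            if len(fields) > 6 and fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''Find all hosts across an entire sequence of files (serial version)'''
    all_robots = set()
    files = glob.glob(os.path.join(logdir, '*.log.gz'))
    for robots in map(find_robots, files):
        all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    with tempfile.TemporaryDirectory() as logdir:
        write_sample_logs(logdir)
        for ipaddr in sorted(find_all_robots(logdir)):
            print(ipaddr)
```

Once the serial version gives the expected hosts, swapping map() for pool.map() as described above should leave the result unchanged.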
Typical usage of a ProcessPoolExecutor is as follows:

from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as pool:
    ...
    # do work in parallel using pool
    ...
Under the covers, a ProcessPoolExecutor creates N independent running Python interpreters, where N is the number of available CPUs detected on the system. You can change the number of processes created by supplying an optional argument, as in ProcessPoolExecutor(N). The pool runs until the last statement in the with block is executed, at which point the process pool is shut down. However, the program will wait until all submitted work has been processed.

Work to be submitted to a pool must be defined in a function. There are two methods for submission. If you are trying to parallelize a list comprehension or a map() operation, you use pool.map():
def work(x):
    ...
    return result

with ProcessPoolExecutor() as pool:
    results = pool.map(work, data)
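As a self-contained sketch of the pool.map() pattern, the following uses a deliberately slow prime counter as the workload. The names count_primes(), count_primes_parallel(), and nworkers are illustrative assumptions, not part of the library; only ProcessPoolExecutor and its map() method come from concurrent.futures.

```python
from concurrent.futures import ProcessPoolExecutor

def count_primes(limit):
    '''Count the primes below limit by trial division (deliberately CPU-heavy)'''
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count

def count_primes_parallel(limits, nworkers=None):
    '''Apply count_primes() to each limit using a pool of nworkers processes'''
    # nworkers=None lets the pool pick a size based on the detected CPUs
    with ProcessPoolExecutor(nworkers) as pool:
        # pool.map() yields results in the same order as the inputs
        return list(pool.map(count_primes, limits))

if __name__ == '__main__':
    limits = [50000, 60000, 70000, 80000]
    serial = list(map(count_primes, limits))
    parallel = count_primes_parallel(limits, nworkers=2)
    print(parallel == serial)
```

Because pool.map() preserves input order, the parallel result can be compared directly against the serial one, which is a convenient sanity check when converting existing code.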
Alternatively, you can manually submit single tasks using the pool.submit() method:
def work(x):
    ...
    return result

with ProcessPoolExecutor() as pool:
    ...
    # Example of submitting work to the pool
    future_result = pool.submit(work, arg)
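A runnable sketch of manual submission might look like the following. The fib() workload and the fib_many() helper are hypothetical stand-ins for real work. Each call to submit() returns immediately with a Future, and the results are collected afterward by calling result() on each one.

```python
from concurrent.futures import ProcessPoolExecutor

def fib(n):
    '''Naive recursive Fibonacci (deliberately slow)'''
    return n if n < 2 else fib(n - 1) + fib(n - 2)

def fib_many(args):
    '''Compute fib(n) for each n in args using a process pool'''
    with ProcessPoolExecutor() as pool:
        # submit() does not wait; it hands back one Future per task
        pending = {n: pool.submit(fib, n) for n in args}
        # result() blocks until the corresponding task has finished
        return {n: fut.result() for n, fut in pending.items()}

if __name__ == '__main__':
    print(fib_many([20, 22, 24]))
```

Submitting everything before asking for any result lets the tasks overlap; calling result() right after each submit() would serialize them again.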
If you manually submit a job, the result is an instance of Future. To obtain the actual result, you call its result() method. This blocks until the result is computed and returned by the pool.

Instead of blocking, you can arrange to have a callback function triggered upon completion. For example:
def when_done(r):
    print('Got:', r.result())

with ProcessPoolExecutor() as pool:
    future_result = pool.submit(work, arg)
    future_result.add_done_callback(when_done)
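The callback pattern can be sketched end to end as follows. The square() workload and the make_collector() helper are illustrative assumptions. Note that done callbacks are invoked in the process that added them, not in a worker, so the callback itself may be a closure even though the submitted function may not.

```python
from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

def make_collector():
    '''Return a results list and a callback that appends to it'''
    results = []
    def when_done(fut):
        # fut is the completed Future, so result() returns without waiting
        results.append(fut.result())
    return results, when_done

if __name__ == '__main__':
    results, when_done = make_collector()
    with ProcessPoolExecutor() as pool:
        for x in range(5):
            pool.submit(square, x).add_done_callback(when_done)
    # Completion order is not guaranteed, so sort before printing
    print(sorted(results))
```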
The user-supplied callback function receives an instance of Future that must be used to obtain the actual result (i.e., by calling its result() method).

Although process pools can be easy to use, there are a number of important considerations in designing larger programs. In no particular order:
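• This technique for parallelization only works well for problems that can be trivially decomposed into independent parts.
• Work must be submitted in the form of simple functions. Parallel execution of instance methods, closures, or other kinds of constructs is not supported.
• Function arguments and return values must be compatible with pickle. Work is carried out in a separate interpreter using interprocess communication. Thus, data exchanged between interpreters has to be serialized.
• Functions submitted for work should not maintain persistent state or have side effects. With the exception of simple things such as logging, you don't really have any control over the behavior of child processes once started. Thus, to preserve your sanity, it is probably best to keep things simple and carry out work in pure functions that don't alter their environment.
• Process pools are created by calling the fork() system call on Unix. This creates a clone of the Python interpreter, including all of the program state at the time of the fork. On Windows, an independent copy of the interpreter that does not clone state is launched. The actual forking process does not occur until the first pool.map() or pool.submit() method is called.
• Great care should be taken when combining process pools and programs that use threads. In particular, you should probably create and launch process pools prior to the creation of any threads (e.g., create the pool in the main thread at program startup).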
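The pickle requirement in the list above can be checked ahead of time, before anything is handed to a pool. The sketch below is an assumption-laden illustration rather than a library feature: is_picklable() and make_adder() are hypothetical helpers, and the set of exceptions caught reflects the errors pickle is known to raise for unsupported objects, which differ between Python versions.

```python
import math
import pickle

def is_picklable(obj):
    '''Return True if pickle.dumps() accepts obj, else False'''
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True

def make_adder(n):
    '''Return a closure; it has no importable name, so pickle rejects it'''
    def add(x):
        return x + n
    return add

if __name__ == '__main__':
    candidates = [
        ('builtin function', math.sqrt),
        ('plain data', {'host': '124.115.6.12', 'hits': 3}),
        ('lambda', lambda x: x * 2),
        ('closure', make_adder(1)),
    ]
    for label, obj in candidates:
        print(label, is_picklable(obj))
```

Functions are pickled by reference (module name plus function name), which is why a function defined at the top level of a module can be sent to a worker while a lambda or nested function cannot.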