#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
An example of using the multiprocessing Pool to process a large text file
with parallel processes.

@author: bgregor
"""

# The problem: count lowercase digraphs (2-letter combinations) in a file.
#
# Example: Abacadabra
# Output: {'ab': 2, 'ba': 1, 'ac': 1, 'ca': 1, 'ad': 1, 'da': 1, 'br': 1, 'ra': 1}

import os
import re
import time

# The multiprocessing library
import multiprocessing as mp

# A handy variation of the dictionary for adding up multiple dictionaries
# https://docs.python.org/3/library/collections.html#collections.Counter
from collections import Counter


# A function to do the calculation. Uses a regular expression to find
# letter pairs.
# Regular expression docs: https://docs.python.org/3/library/re.html
def get_counts(line):
    ''' Return a dictionary of digraphs and their counts from a line. '''
    tmp = line.lower()
    counts = {}
    # The lookahead (?=[a-z]) matches a letter followed by another letter
    # without consuming the second one, so overlapping digraphs are counted.
    for m in re.finditer(r'[a-z](?=[a-z])', tmp):
        graph = tmp[m.start():m.end() + 1]
        counts[graph] = counts.get(graph, 0) + 1
    return counts


# Serial processing using the Python map() function
def map_count(filename):
    ''' Basically the same as serial_count, but reads the entire file into
        a list and runs the built-in map() function. '''
    with open(filename, encoding='utf-8') as f:
        lines = f.readlines()
    multi_counts = map(get_counts, lines)
    ser_counts = Counter()
    for count in multi_counts:
        ser_counts.update(count)
    return ser_counts


# Now let's try processing this with a Pool.
# Steps:
#  1. Start up a Pool with a number of processes.
#  2. Read the lines of the file into a list.
#  3. Use the Pool to run get_counts() over the lines of the file.
#  4. Combine the returned counts from the Pool into a single dictionary.

# Method 1 to start the pool:
#   pool = mp.Pool(processes=2)
#   ...python code...
#   # Shut it down:
#   pool.close()
#   pool.join()

# Or use 'with' to start it up and shut it down:
def par_count(filename, nprocs=2):
    with mp.Pool(processes=nprocs) as pool:
        # The pool is now running. Read the file. This is still
        # on the main process.
        with open(filename, encoding='utf-8') as f:
            lines = f.readlines()
        # The Python map function applies a function to every element of
        # a list and returns a new list. pool.map() works the same way, but
        # distributes the function & data to the worker processes.
        multi_counts = pool.map(get_counts, lines)
        # ******
        # The order of multi_counts IS THE SAME as it would be
        # in a serial computation. This is a property of pool.map().
        # ******
    # The pool was shut down by leaving the 'with' code block.
    # Combine the counts.
    par_counts = Counter()
    for count in multi_counts:
        par_counts.update(count)
    return par_counts


# Note: on Windows, when doing multiprocessing you MUST use this
# convention to start the script:
if __name__ == '__main__':
    # shakespeare.txt - The complete works of William Shakespeare.
    # 169k lines, 5.7M characters.
    in_filename = '../data/shakespeare.txt'

    # Run in serial.
    st = time.perf_counter()
    mc = map_count(in_filename)
    et = time.perf_counter()
    print('Serial: %s sec' % (et - st))

    # And in parallel. The number of processes can
    # be specified here. If submitted as an SCC job,
    # pull the number of cores to use from the environment.
    n_cores = 2
    if 'NSLOTS' in os.environ:
        n_cores = int(os.environ['NSLOTS'])
    # Put a practical limit on this.
    n_cores = 8 if n_cores > 8 else n_cores

    st = time.perf_counter()
    # Run in parallel.
    par = par_count(in_filename, nprocs=n_cores)
    et = time.perf_counter()
    print('Parallel: %s sec' % (et - st))
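

# ---------------------------------------------------------------------------
# Optional variant (a sketch, not part of the timing runs above).
# pool.map() builds the full result list in input order. Because
# Counter.update() does not care about order, pool.imap_unordered() can be
# used instead: it yields each partial dictionary as soon as a worker finishes
# it, and a chunksize batches lines to reduce inter-process communication
# overhead. The name par_count_unordered and the chunksize of 500 are
# illustrative choices here, not part of the original example.
def par_count_unordered(filename, nprocs=2, chunksize=500):
    ''' Like par_count, but combines results as they arrive, in any order. '''
    with mp.Pool(processes=nprocs) as pool:
        with open(filename, encoding='utf-8') as f:
            lines = f.readlines()
        par_counts = Counter()
        # imap_unordered returns an iterator; consume it before the pool
        # is shut down at the end of the 'with' block.
        for count in pool.imap_unordered(get_counts, lines, chunksize=chunksize):
            par_counts.update(count)
    return par_counts

# Usage would mirror par_count(), e.g.
#   par_count_unordered(in_filename, nprocs=n_cores)
# and should produce the same combined counts.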