Sunday, June 29, 2014

Python Multiprocess programming - A Pool of Workers


Python's Global Interpreter Lock (GIL) prevents multiple threads from executing Python bytecode in parallel. As a result, multithreaded CPU-bound code gains no speedup and is sometimes even slower than the single-threaded version. To obtain true parallelism for CPU-bound work, we need to use multiple processes.

The code below starts a fixed number of worker processes and passes tasks to them through a shared queue. The parent calls the blocking done_queue.get() once per submitted task, so it stays active until the child processes have finished every task. Once all results have been collected, the parent puts one "STOP" sentinel per worker on the task queue. This is necessary because the workers themselves block on input.get() while waiting for their next task, and the sentinel is what tells each of them to exit.


import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    """Serve tasks from the *input* queue until the ``'STOP'`` sentinel arrives.

    Each task is a ``(func, args)`` pair. The string produced by
    ``calculate(func, args)`` is put on the *output* queue. No exceptions are
    caught here: if a task raises, this worker ends and nothing is put on
    *output* for that task.
    """
    while True:
        job = input.get()
        if job == 'STOP':
            return
        func, args = job
        output.put(calculate(func, args))

#
# Function used to calculate result
#

def calculate(func, args):
    """Call ``func(*args)`` and describe the call and its result as a string.

    The description starts with the name of the process that did the work,
    e.g. ``'<process name> says that mul(3, 7) = 21'``.
    """
    outcome = func(*args)
    fields = (current_process().name, func.__name__, args, outcome)
    return '%s says that %s%s = %s' % fields

#
# Functions referenced by tasks
#

def mul(a, b):
    """Return ``a * b`` after sleeping for a random time in [0, 2) seconds.

    The random pause presumably simulates uneven work, so that results reach
    the parent out of submission order.
    """
    pause = random.random() * 2
    time.sleep(pause)
    return a * b

def plus(a, b):
    """Return ``a + b`` after sleeping for a random time in [0, 2) seconds.

    The random pause presumably simulates uneven work, so that results reach
    the parent out of submission order.
    """
    pause = random.random() * 2
    time.sleep(pause)
    return a + b

#
# Demo driver
#

def test():
    """Run two batches of demo tasks through a pool of worker processes.

    Starts NUMBER_OF_PROCESSES workers that share one task queue and one
    result queue. It first feeds them 20 ``mul`` tasks and then 10 ``plus``
    tasks. Each result is printed as it arrives, i.e. in completion order
    rather than submission order. Finally, the workers are told to stop and
    are joined.
    """
    NUMBER_OF_PROCESSES = 8
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes, keeping the handles so they can be joined.
    workers = [Process(target=worker, args=(task_queue, done_queue))
               for _ in range(NUMBER_OF_PROCESSES)]
    for proc in workers:
        proc.start()

    # Get and print results; get() blocks until some worker delivers one.
    # Single-argument print(...) gives the same output on Python 2 and 3.
    print('Unordered results:')
    for _ in range(len(TASKS1)):
        print('\t' + done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for _ in range(len(TASKS2)):
        print('\t' + done_queue.get())

    # Tell child processes to stop: one sentinel per worker, because each
    # worker leaves its loop after consuming a single 'STOP'.
    for _ in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')

    # Wait for every worker to exit. This is safe because done_queue has been
    # fully drained above, so no worker is left waiting to flush results.
    for proc in workers:
        proc.join()


if __name__ == '__main__':
    # Per the multiprocessing docs, freeze_support() supports programs frozen
    # into Windows executables and should be called straight after the
    # __main__ check, before any other multiprocessing work.
    freeze_support()
    # The __main__ guard presumably also keeps child processes that re-import
    # this module (e.g. under the spawn start method) from re-running test().
    test()

References: 
1. https://docs.python.org/dev/library/multiprocessing.html