Multi-core processing hangs

My code looks like the following. It seems to "hang" during the proc.join() loop. If I create the dataframe df with 10 records, the whole thing completes quickly, but at 10000 records (as shown) the program seems to just hang. Watching the CPU cores in htop, I see all of them spike to 100%, but even long after they drop back to 0%, the program doesn't continue. Any ideas on what I'm doing wrong?

import pandas as pd
import numpy as np
import multiprocessing
from multiprocessing import Process, Queue

def do_something(df, partition, q):
    for index in partition:
        q.put([v for v in df.iloc[index]])

def start_parallel_processing(df, partitions):
    q = Queue()
    procs = []
    results = []

    for partition in partitions:
        proc = Process(target=do_something, args=(df, partition, q))
        proc.start()
        procs.extend([proc])

    for i in range(len(partitions)):
        results.append(q.get(True))

    for proc in procs:
        proc.join()

    return results

num_cpus = multiprocessing.cpu_count()
df = pd.DataFrame([(x, x+1) for x in range(10000)], columns=['x','y'])
partitions = np.array_split(df.index, num_cpus)
results = start_parallel_processing(df, partitions)
len(results)

1 answer

  • answered 2017-06-17 18:57 Mason.Chase

    It appears multiprocessing.Queue doesn't behave the way you want here: a process that has put items on such a queue won't exit until its buffered items have been flushed and consumed, and your loop only get()s len(partitions) of them before joining, so proc.join() blocks forever. Use Manager().Queue() instead, which keeps the items in a separate server process.
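    For reference, here is a minimal sketch of the underlying pitfall (my own illustration, not your code, assuming CPython's documented multiprocessing.Queue behavior): a child that has put items on a plain multiprocessing.Queue does not finish until those items are consumed, so joining it before draining the queue can hang forever.

    import multiprocessing

    def producer(q):
        for i in range(100000):
            q.put(i)  # the queue's feeder thread blocks once the pipe fills

    if __name__ == '__main__':
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=producer, args=(q,))
        p.start()

        # Draining BEFORE joining is what keeps this safe; swapping these
        # two steps reproduces the hang from your question.
        items = [q.get() for _ in range(100000)]
        p.join()
        print(len(items))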

    I have added some prints to make the code flow easier to follow.

    You can still polish this further by using Pool() instead of managing the processes yourself; see the sketch at the end of this answer.

    import pandas as pd
    import numpy as np
    import multiprocessing
    import pprint
    from multiprocessing import Process, Manager
    from queue import Empty

    def do_something(df, partition, q):
        # print("do_something " + str(len(partition)) + " times")
        for index in partition:
            # print(index)
            for v in df.iloc[index]:
                # print("sending v to queue: " + str(len(df.iloc[index])))
                q.put(v, False)

        print("worker done, qsize is " + str(q.qsize()))


    def start_parallel_processing(df, partitions):
        m = Manager()
        q = m.Queue()  # held by the manager's server process, not a pipe
        procs = []
        results = []
        print("START: launching " + str(len(partitions)) + " process(es)")
        index = 0
        for partition in partitions:
            print("launching a process for " + str(len(partition)) + " rows")
            proc = Process(target=do_something, args=(df, partition, q))
            procs.append(proc)
            proc.start()
            index += 1
            print("launched " + str(index) + "/" + str(len(partitions)) + " process(es)")

        # Join first: with a managed queue the children never block on an
        # unread pipe buffer at exit, so this cannot deadlock.
        process_count = 0
        for proc in procs:
            process_count += 1
            print("joining " + str(process_count) + "/" + str(len(procs)) + " process(es)")
            proc.join()

        # All producers have exited, so draining until Empty is race-free.
        while True:
            try:
                results.append(q.get(block=False))
            except Empty:
                print("QUEUE END")
                break

        print(pprint.pformat(results))

        return results


    if __name__ == '__main__':
        num_cpus = multiprocessing.cpu_count()
        df = pd.DataFrame([(x, x + 1) for x in range(10000)], columns=['x', 'y'])
        partitions = np.array_split(df.index, num_cpus)
        results = start_parallel_processing(df, partitions)
        print("len(results) is: " + str(len(results)))