Python run two loops at the same time where one is rate limited and depends on data from the other

I have a problem in Python where I want to run two loops at the same time. I feel like I need to do this because the second loop needs to be rate limited, but the first loop really shouldn't be. Also, the second loop takes an input from the first.

I'm looking for something that works like this:

list = []
for line in file:
    # do some stuff
    list.append("an_item")

Rate limited:
for x in list:
    # do some stuff, simultaneously

2 answers

  • answered 2018-02-13 02:20 pstatix

    You need to do 2 things:

1. Put the function that requires data from the other in its own process
    2. Implement a way to communicate between the two processes (e.g. Queue)

    All of this must be done because of the GIL: CPython threads can't execute Python bytecode in parallel, so separate processes are needed for true parallelism.
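
    For instance, a minimal sketch of that structure might look like this (the producer/consumer names, the items, and the one-second sleep are made-up placeholders):

    import time
    from multiprocessing import Process, Queue

    def producer(q):                      # the un-limited loop
        for item in ("a", "b", "c"):      # stand-in for the real work
            q.put(item)
        q.put(None)                       # sentinel: no more work

    def consumer(q):                      # the rate-limited loop
        while True:
            item = q.get()
            if item is None:              # sentinel seen - stop
                break
            time.sleep(1)                 # crude one-per-second rate limit
            print("processed", item)

    if __name__ == '__main__':
        q = Queue()
        p = Process(target=producer, args=(q,))
        c = Process(target=consumer, args=(q,))
        p.start()
        c.start()
        p.join()
        c.join()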

  • answered 2018-02-13 02:20 Nathan Vērzemnieks

    There are two basic approaches with different tradeoffs: synchronously switching between tasks, and running in threads or subprocesses. First, some common setup:

    from queue import Queue # "from Queue import Queue" on Python 2
    work = Queue()
    
    def fast_task():
        """ Do the fast thing """
        if done:            # 'done' and 'result' stand in for your own logic
            return None     # returning None signals there is no more work
        else:
            return result
    
    def slow_task(arg):
        """ Do the slow thing """
    
    RATE_LIMIT = 30 # seconds
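
    For example, in the question's terms the stubs might be filled in like this (the file name and the per-item work are invented for illustration):

    lines = open('input.txt')           # hypothetical input file

    def fast_task():
        """ Do the fast thing """
        line = next(lines, None)
        if line is None:                # end of file - no more work
            return None
        return line.strip()             # the "an_item" from the question

    def slow_task(arg):
        """ Do the slow thing """
        print("rate-limited work on", arg)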
    

    Now, the synchronous approach. It has the advantage of being much simpler and easier to debug, at the cost of being a bit slower; how much slower depends on the details of your tasks. It works by running a tight loop that calls the fast task on every pass, and the slow task only if enough time has passed. If the fast task is no longer producing work and the queue is empty, we quit.

    import time
    last_call = 0
    
    while True:
        next_job = fast_task()
        if next_job:
            work.put(next_job)
        elif work.empty():
            # nothing left to do
            break
        else:
            # fast task has done all its work - short sleep to slow the spin
            time.sleep(.1)
    
        now = time.time()
        if now - last_call > RATE_LIMIT:
            last_call = now
            slow_task(work.get())
    

    If you feel like this doesn't work fast enough, you can try the multiprocessing approach. You can use the same structure for working with threads or processes, depending on whether you import from multiprocessing.dummy or multiprocessing itself. We use a multiprocessing.Queue for communication instead of queue.Queue.

    def do_the_fast_loop(work_queue):
        while True:
            next_job = fast_task()
            if next_job:
                work_queue.put(next_job)
            else:
                work_queue.put(None) # sentinel - tells slow process to quit
                break
    
    def do_the_slow_loop(work_queue):
        next_call = time.time()
        while True:
            job = work_queue.get()
            if job is None: # sentinel seen - no more work to do
                break
            time.sleep(max(0, next_call - time.time()))
            next_call = time.time() + RATE_LIMIT
            slow_task(job)
    
    if __name__ == '__main__':
        # from multiprocessing.dummy import Queue, Process # for threads
        from multiprocessing import Queue, Process # for processes
        work = Queue()
        fast = Process(target=do_the_fast_loop, args=(work,))
        slow = Process(target=do_the_slow_loop, args=(work,))
        fast.start()
        slow.start()
        fast.join()
        slow.join()
    

    As you can see, there's quite a lot more machinery for you to implement, but it will be somewhat faster. Again, how much faster depends a lot on your tasks. I'd try all three approaches - synchronous, threaded, and multiprocess - and see which you like best.
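
    For reference, the threaded variant of the same structure just swaps the import, as the comment above suggests; everything else is unchanged (a sketch, reusing the loop functions defined earlier):

    if __name__ == '__main__':
        # multiprocessing.dummy provides thread-backed drop-in replacements
        from multiprocessing.dummy import Queue, Process
        work = Queue()
        fast = Process(target=do_the_fast_loop, args=(work,))
        slow = Process(target=do_the_slow_loop, args=(work,))
        fast.start()
        slow.start()
        fast.join()
        slow.join()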