FIREPEAR //////////

INFORMATICS //////

Futures: Concurrency in Python 3, Part 2

14 Jul 2012

This article takes a first look at concurrent.futures and presents a trivial demonstration of it. In case you missed it, here's part 1 of the series.

Here's the complete program that this article will be discussing. It launches 10 futures, which are handles on either processes or threads under the control of the concurrent.futures module. Each future sleeps for a random time between 0 and 10 seconds, then wakes up to report which future it is and how long it slept. All the while, the main thread of execution continues, counting to 10.

01  from concurrent.futures import ThreadPoolExecutor
02  import random
03  import time
04  
05  def sleep_until_the_future(**kwargs):
06      time.sleep(kwargs['sleeptime'])
07      return kwargs
08  
09  def say_hello_from_the_future(future):
10      result = future.result()
11      now = time.time()
12      actualsleep = now - result['launchtime']
13      print("This is future {}.".format(result['futurenum']),
14            "I was created at {} and".format(result['launchtime']),
15            "told to sleep for {} seconds.".format(result['sleeptime']),
16            "I woke up at {},".format(now),
17            "so I actually slept for {} seconds.".format(actualsleep))
18  
19  def main():
20      executor = ThreadPoolExecutor(max_workers=10)
21      for i in range(10):
22          future = executor.submit(sleep_until_the_future,
23                                   futurenum=i,
24                                   sleeptime=(random.random() * 10),
25                                   launchtime = time.time())
26          future.add_done_callback(say_hello_from_the_future)
27      for i in range(10):
28          time.sleep(1)
29          print("MAIN THREAD. Seconds elapsed: {}".format(i + 1))
30  
31  if __name__ == "__main__":
32      main()

Concurrency (or not) in the 'future' docs

In the first article of this series, I went to some pains to differentiate between parallelism and concurrency. I did this because the docs for concurrent.futures contain only examples of parallelizing problems in the style of a map/reduce.

Here's the guts of one of the examples (both are very similar):

with concurrent.futures.ProcessPoolExecutor() as executor:
    for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
        print('%d is prime: %s' % (number, prime))

The process pool is created inside a with statement, which is followed by a for that contains the fan-out of the futures. This is a poor model for what I want to do, not only because looping over executor.map() blocks the main thread while it waits for each result, but because the with statement ties the lifetime of the executor object to a single block of code.

This is bad because executor is the management handle for the pool of futures. Leaving the with block shuts the executor down: it waits for any outstanding futures to finish and accepts no new ones, so from that point on the program is no longer concurrent or parallel. I want to maintain the ability to do concurrent operations at any time.
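To make the effect of the with statement concrete, here is a minimal sketch. The nap() helper and the timing variables are made up for illustration and are not part of the program discussed in this article. It shows that submit() returns right away, that leaving the with block waits for the work to finish, and that the executor refuses new work afterwards.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def nap(seconds):
    # Hypothetical stand-in task: sleep, then return the requested time.
    time.sleep(seconds)
    return seconds

start = time.time()
with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(nap, 0.2)
    submitted_after = time.time() - start  # submit() does not wait for nap()
# Leaving the with block calls executor.shutdown(wait=True), so execution
# only reaches this point once nap() has finished.
left_block_after = time.time() - start

print("submit() returned after {:.2f}s".format(submitted_after))
print("with block exited after {:.2f}s".format(left_block_after))

# The executor has been shut down, so it rejects further submissions.
try:
    executor.submit(nap, 0.1)
    rejected = False
except RuntimeError:
    rejected = True
print("new submission rejected: {}".format(rejected))
```

The name executor still exists after the block, but the object behind it can no longer be used to launch futures, which is why the main program keeps its executor outside of any with statement.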

It was frustrating to come to these realizations, and for a bit I was worried that I wouldn't be able to do what I had envisioned with the module. But it didn't take too much work to figure out what needed to be done, so now let's talk about the code.

Keeping a handle on things

The very simple -- and very obvious in retrospect -- solution to the problems described above was to make sure the ThreadPoolExecutor object hangs around for as long as I want. So the first thing I do in the main() routine is create it.

19  def main():
20      executor = ThreadPoolExecutor(max_workers=10)

Now I have a ThreadPoolExecutor object (again named executor), which doesn't go out of scope until main() returns at the end of the program, and which is controlling a pool of 10 thread-based futures.

There seems to be no way around the pool-based model of instantiating all futures at once -- but I discovered that I don't have to assign the entire pool to the same task, nor do I have to assign all futures a task at the same time.

21      for i in range(10):
22          future = executor.submit(sleep_until_the_future,
23                                   futurenum=i,
24                                   sleeptime=(random.random() * 10),
25                                   launchtime = time.time())
26          future.add_done_callback(say_hello_from_the_future)

Instead of calling executor.map() as the examples did, I call executor.submit(). This lets me submit jobs one at a time to my pool of futures, and it also lets me have each future do a different job. executor.submit() has only one required argument: the function where the future will begin execution. After that, any number of positional and keyword arguments may be specified in the usual way, and they will be passed on to the specified function.
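As a small illustration of how submit() forwards its arguments, the sketch below sends two different jobs to the same pool, one with positional arguments and one with a keyword argument. The add() and greet() functions are hypothetical and exist only for this example.

```python
from concurrent.futures import ThreadPoolExecutor

def add(a, b):
    # Hypothetical job that takes positional arguments.
    return a + b

def greet(name, punctuation="!"):
    # Hypothetical job that takes a positional and a keyword argument.
    return "Hello, " + name + punctuation

executor = ThreadPoolExecutor(max_workers=2)

# The first argument is the function to run; everything after it is
# passed through to that function unchanged.
sum_future = executor.submit(add, 2, 3)
greeting_future = executor.submit(greet, "future", punctuation="?")

print(sum_future.result())
print(greeting_future.result())

executor.shutdown()
```

Each call to submit() returns its own future object, so the two jobs can be tracked separately even though they share one executor.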

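On lines 22-25 I tell each of my pool of futures to run the function sleep_until_the_future and supply three keyword arguments: futurenum (the loop index, which identifies the future), sleeptime (a random number of seconds between 0 and 10), and launchtime (the time at which the future was submitted).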

Line 26 is something else the examples don't show: how to give your futures a callback. This is critical functionality, since I'm not waiting on all futures to finish and then aggregating work done. Instead, I want to do more work, as quickly as possible, as each future completes its initial task. This is what add_done_callback() allows. Its sole argument is the function which should be run by each future, as soon as that future completes its work. In this program I specify the same callback function for all futures: say_hello_from_the_future.
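The following sketch isolates the callback mechanism. The names slow_square(), record_completion(), and finished are invented for the example. The main thread attaches a callback to each future and carries on; each callback receives the finished future as its only argument.

```python
from concurrent.futures import ThreadPoolExecutor
import time

finished = []

def slow_square(n):
    # Hypothetical job: pretend to work for a moment, then return a value.
    time.sleep(0.1)
    return n * n

def record_completion(future):
    # Called with the completed future as the sole argument.
    finished.append(future.result())

executor = ThreadPoolExecutor(max_workers=3)
for n in range(3):
    future = executor.submit(slow_square, n)
    future.add_done_callback(record_completion)

# The main thread gets here without waiting for any job to finish.
print("main thread carries on; finished so far: {}".format(finished))

# Wait for the workers, only so the example can show the final state.
executor.shutdown(wait=True)
print("after shutdown: {}".format(sorted(finished)))
```

One detail worth knowing: if a future has already completed by the time add_done_callback() is called, the callback is run immediately rather than being lost.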

27      for i in range(10):
28          time.sleep(1)
29          print("MAIN THREAD. Seconds elapsed: {}".format(i + 1))

These are the lines which will prove that the futures are working independently and concurrently while the main thread continues executing. It's exactly what it looks like: a loop that counts as ten seconds go by, sleeping one second each time. While this is happening, the futures will be off doing their thing. And now that I've shown you the main() routine, we'll move on to what the futures are doing.

The future starts here

As explained above, the first argument to executor.submit() is the routine where the futures will begin executing. (I'll call this the initial function from now on.) In this program, they were all assigned to start work in sleep_until_the_future(), so that's what I'll talk about next.

05  def sleep_until_the_future(**kwargs):
06      time.sleep(kwargs['sleeptime'])
07      return kwargs

This is the whole routine, and it only does two things.

The first thing is that the future is put to sleep for kwargs['sleeptime'] seconds. Recall that sleeptime was a randomly generated number between 0 and 10, which was given as an argument to the executor.submit() call. So each future immediately goes to sleep for a period of time between 0 and 10 seconds.

The second thing is that, upon waking up, the function returns its keyword arguments. This is done because the return value of the initial function becomes the result value of the future object itself. This is so important that I'll say it again and then immediately show how it is used.

The return value of the initial function becomes the result value of the future object itself.
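Stripped of the sleeping, the same idea can be sketched in a few lines. The echo_kwargs() function is a hypothetical stand-in for sleep_until_the_future(): whatever it returns is what future.result() hands back.

```python
from concurrent.futures import ThreadPoolExecutor

def echo_kwargs(**kwargs):
    # Hypothetical initial function: return the keyword arguments untouched.
    return kwargs

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(echo_kwargs, futurenum=0, sleeptime=0.5)

# result() blocks until echo_kwargs() has returned, then yields its
# return value.
result = future.result()
print(result)

executor.shutdown()
```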

The callback

09  def say_hello_from_the_future(future):
10      result = future.result()

Here, in the first line of the future's callback function, we retrieve its result value. The variable result is now holding the dictionary of keyword arguments received by sleep_until_the_future() when it was called by executor.submit().

Note that the only argument to the callback is the future object itself. This is why anything that is needed by the callback must be in the return value of the future's initial function: because that return value becomes the value of future.result(), and future.result() is the only mechanism for communicating from the future object back to the rest of the program.

The remainder of the callback is rudimentary. It checks the current time, calculates how long the future was actually asleep, and then prints a message containing all the information we have about the future's state.

11      now = time.time()
12      actualsleep = now - result['launchtime']
13      print("This is future {}.".format(result['futurenum']),
14            "I was created at {} and".format(result['launchtime']),
15            "told to sleep for {} seconds.".format(result['sleeptime']),
16            "I woke up at {},".format(now),
17            "so I actually slept for {} seconds.".format(actualsleep))

This will happen while the main thread is sleeping, and the futures will report back out-of-order, as determined by their randomized sleep times. These behaviors show that the futures are executing both concurrently with, and independently from, each other and the main thread.
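The out-of-order reporting can be reproduced without randomness. The sketch below is an assumption-laden variation on the program, not the original script: it uses fixed sleep times and hypothetical helper names, and records the order in which the callbacks fire.

```python
from concurrent.futures import ThreadPoolExecutor
import time

completion_order = []

def sleep_then_return(futurenum, sleeptime):
    # Hypothetical initial function: sleep, then identify the future.
    time.sleep(sleeptime)
    return futurenum

def note_completion(future):
    # Record which future finished, in the order the callbacks run.
    completion_order.append(future.result())

executor = ThreadPoolExecutor(max_workers=3)

# Future 0 sleeps longest, future 1 shortest, future 2 in between.
for futurenum, sleeptime in enumerate([0.45, 0.15, 0.3]):
    future = executor.submit(sleep_then_return, futurenum, sleeptime)
    future.add_done_callback(note_completion)

executor.shutdown(wait=True)
print("submitted as 0, 1, 2; completed as {}".format(completion_order))
```

With sleep times this far apart, the completion order follows the sleep times rather than the submission order, which is the same behavior the randomized program shows.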

Here is the source of the script, and here is the output of a sample run.

Continue to part 3 of this series.