14 Jul 2012
This article takes a first look at concurrent.futures and presents a
trivial demonstration of it. In case you missed it, here's part 1 of
the series.
Here's the complete program that this article will be discussing. It
launches 10 futures: handles on calls that run in a pool of threads
(or processes) managed by the concurrent.futures module. Each future
sleeps for a random time between 0 and 10 seconds, then wakes up to
report which future it is and how long it has been sleeping. All the
while, the main thread of execution continues, counting to 10.
01 from concurrent.futures import ThreadPoolExecutor
02 import random
03 import time
04
05 def sleep_until_the_future(**kwargs):
06     time.sleep(kwargs['sleeptime'])
07     return kwargs
08
09 def say_hello_from_the_future(future):
10     result = future.result()
11     now = time.time()
12     actualsleep = now - result['launchtime']
13     print("This is future {}.".format(result['futurenum']),
14           "I was created at {} and".format(result['launchtime']),
15           "told to sleep for {} seconds.".format(result['sleeptime']),
16           "I woke up at {},".format(now),
17           "so I actually slept for {} seconds.".format(actualsleep))
18
19 def main():
20     executor = ThreadPoolExecutor(max_workers=10)
21     for i in range(10):
22         future = executor.submit(sleep_until_the_future,
23                                  futurenum=i,
24                                  sleeptime=(random.random() * 10),
25                                  launchtime=time.time())
26         future.add_done_callback(say_hello_from_the_future)
27     for i in range(10):
28         time.sleep(1)
29         print("MAIN THREAD. Seconds elapsed: {}".format(i + 1))
30
31 if __name__ == "__main__":
32     main()
In the first article of this series, I went to some pains to
differentiate between parallelism and concurrency. I did this because
the docs for concurrent.futures only contain examples of
parallelizing problems in the style of map/reduce.
Here's the guts of one of the examples (both are very similar):
with concurrent.futures.ProcessPoolExecutor() as executor:
    for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
        print('%d is prime: %s' % (number, prime))
The process pool is created inside a with statement, which is
followed by a for loop that contains the fan-out of the futures. This
is a poor model for what I want to do, not only because the loop over
executor.map() halts the world (it yields results in input order,
blocking until each one is ready), but because the with statement
creates a context containing the executor object.
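To make the "halts the world" point concrete, here is a small sketch, assuming Python 3 and a hypothetical slow_square() task. It uses a thread pool instead of the docs' process pool so it runs anywhere without a __main__ guard. The tasks finish in reverse order, but map() still hands the results back in input order, so the loop cannot produce anything until the slowest early task is done.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time

finished = []                     # completion order, filled in by the workers
finished_lock = threading.Lock()

def slow_square(x):
    # Hypothetical task: smaller inputs sleep longer, so they finish last.
    time.sleep(0.1 * (3 - x))
    with finished_lock:
        finished.append(x)
    return x * x

with ThreadPoolExecutor(max_workers=3) as executor:
    # map() yields results in *input* order, so producing the first value
    # means waiting for the slowest task (x=0).
    results = list(executor.map(slow_square, [0, 1, 2]))

print(results)   # [0, 1, 4]
print(finished)  # typically [2, 1, 0]
```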
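This is bad because executor is the management handle for the pool
of futures. When the with block ends, the executor is shut down: its
__exit__() calls executor.shutdown(wait=True), which blocks until every
pending future has finished, and after that the executor refuses new
work. From then on the program is no longer concurrent or parallel. I
want to maintain the ability to do concurrent operations at any time.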
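Here is a minimal sketch of that shutdown behaviour, assuming Python 3.6+ (for f-strings). The nap() function is a hypothetical stand-in for real work. submit() returns at once, but leaving the with block waits for the task, and any later submit() raises RuntimeError.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def nap(seconds):
    # Hypothetical task: just sleep, then report how long it slept.
    time.sleep(seconds)
    return seconds

start = time.monotonic()
with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(nap, 0.5)
    submitted_at = time.monotonic() - start  # submit() itself returns at once
# Leaving the block called executor.shutdown(wait=True), which waited for nap().
exited_at = time.monotonic() - start
print(f"submit returned after {submitted_at:.2f}s, block exited after {exited_at:.2f}s")

# Once shut down, the executor refuses new work.
try:
    executor.submit(nap, 0)
    rejected = False
except RuntimeError as exc:
    rejected = True
    print("RuntimeError:", exc)
```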
It was frustrating to come to these realizations, and for a bit I was worried that I wouldn't be able to do what I had envisioned with the module. But it didn't take too much work to figure out what needed to be done, so now let's talk about the code.
The very simple -- and very obvious in retrospect -- solution to the
problems described above was to make sure the ThreadPoolExecutor
object hangs around for as long as I want. So the first thing I do in
the main() routine is create it.
19 def main():
20     executor = ThreadPoolExecutor(max_workers=10)
Now I have a TPE object (again named executor), which doesn't go out
of scope until main() returns (here, effectively the end of the
program), and which manages a pool of up to 10 worker threads for my
futures to run on.
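Here is a minimal sketch of the "keep the executor around" idea, assuming Python 3. The built-in pow() stands in for real work. The original program never calls shutdown(). This sketch does, as one tidy way to release the threads once no more work is coming.

```python
from concurrent.futures import ThreadPoolExecutor
import time

# Created outside any with statement, so it stays usable until shutdown().
executor = ThreadPoolExecutor(max_workers=10)

first = executor.submit(pow, 2, 10)
time.sleep(0.2)                      # the main thread does other things...
second = executor.submit(pow, 3, 4)  # ...and can still hand out work later

print(first.result(), second.result())  # 1024 81
executor.shutdown(wait=True)  # explicit cleanup once there is no more work
```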
There seems to be no way around the pool-based model, in which the number of workers is capped when the executor is created -- but I discovered that I don't have to assign the entire pool to the same task, nor do I have to assign all futures a task at the same time.
21     for i in range(10):
22         future = executor.submit(sleep_until_the_future,
23                                  futurenum=i,
24                                  sleeptime=(random.random() * 10),
25                                  launchtime=time.time())
26         future.add_done_callback(say_hello_from_the_future)
Instead of calling executor.map() as the examples did, I call
executor.submit(). This lets me submit jobs one at a time to my pool,
and it also lets me have each future do a different job.
executor.submit() has only one required argument: the callable where
the future will begin execution. After that, any number of positional
and keyword arguments may be specified in the usual way, and they will
be passed on to that callable.
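As a minimal sketch of that argument forwarding, assuming Python 3.6+, here is a hypothetical describe() function. submit() passes both positional and keyword arguments straight through to it.

```python
from concurrent.futures import ThreadPoolExecutor

def describe(name, count, *, unit="items"):
    # Hypothetical task, just to show how submit() forwards its arguments.
    return f"{name}: {count} {unit}"

executor = ThreadPoolExecutor(max_workers=2)
# Everything after the callable is passed through to it unchanged.
f1 = executor.submit(describe, "apples", 3)
f2 = executor.submit(describe, "rope", 12, unit="metres")
print(f1.result())  # apples: 3 items
print(f2.result())  # rope: 12 metres
executor.shutdown()
```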
On lines 22-25 I tell each future to run the function
sleep_until_the_future and supply three keyword arguments:
- futurenum, which tells the future which one of the pool it is. This
  is simply for reporting purposes later on; it has nothing to do with
  the use of concurrent.futures.
- sleeptime, a random floating-point number between 0 and 10.
- launchtime, the current time when the future was given its task.
Line 26 is something else the examples don't show: how to give your
futures a callback. This is critical functionality, since I'm not
waiting for all futures to finish and then aggregating the work
done. Instead, I want to do more work, as quickly as possible, as each
future completes its initial task. This is what add_done_callback()
allows. Its sole argument is a callable, which will be called with the
future as its only argument as soon as that future completes. (If the
future has already finished when the callback is added, the callback
is called immediately.) In this program I specify the same callback
function for all futures: say_hello_from_the_future.
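Here is a small sketch of both callback cases, assuming Python 3. slow_sum() and report() are hypothetical helpers. The first callback is registered while the task is still running. The second is registered after the future has finished, so it runs right away in the main thread. The comment about worker threads describes CPython's implementation and is not a documented guarantee.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time

calls = []
calls_lock = threading.Lock()

def slow_sum(values):
    # Hypothetical task that takes a moment, so the first callback is
    # (almost certainly) registered before it finishes.
    time.sleep(0.2)
    return sum(values)

def report(future):
    # The callback's only argument is the finished future.
    with calls_lock:
        calls.append((future.result(), threading.current_thread().name))

executor = ThreadPoolExecutor(max_workers=2)
future = executor.submit(slow_sum, [1, 2, 3])
future.add_done_callback(report)  # in CPython, normally run by the worker thread

future.result()  # block until the task is done
# The future is already finished, so this callback runs immediately,
# in the thread that calls add_done_callback() (here, the main thread).
future.add_done_callback(report)

executor.shutdown(wait=True)  # also ensures the worker's callback has run
print(calls)
```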
27     for i in range(10):
28         time.sleep(1)
29         print("MAIN THREAD. Seconds elapsed: {}".format(i + 1))
These are the lines which will prove that the futures are working
independently and concurrently while the main thread continues
executing. It's exactly what it looks like: a loop that counts as ten
seconds go by, sleeping one second each time. While this is happening,
the futures will be off doing their thing. And now that I've shown you
the main() routine, we'll move on to what the futures are doing.
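The same idea can be shown in miniature. This sketch assumes Python 3 and uses time.sleep itself as the submitted job. submit() returns almost instantly, and the main thread keeps ticking while the worker sleeps.

```python
from concurrent.futures import ThreadPoolExecutor
import time

executor = ThreadPoolExecutor(max_workers=1)
start = time.monotonic()
future = executor.submit(time.sleep, 0.5)  # returns immediately
submit_cost = time.monotonic() - start
was_done = future.done()                   # the worker is still sleeping

# The main thread keeps working while the worker sleeps.
ticks = 0
while not future.done():
    time.sleep(0.1)
    ticks += 1
executor.shutdown()
print(f"submit took {submit_cost:.3f}s; done right away? {was_done}; "
      f"main thread ticked {ticks} times")
```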
As explained above, the first argument to executor.submit() is the
routine where the futures will begin executing. (I'll call this the
initial function from now on.) In this program, they were all
assigned to start work in sleep_until_the_future(), so that's what
I'll talk about next.
05 def sleep_until_the_future(**kwargs):
06     time.sleep(kwargs['sleeptime'])
07     return kwargs
This is the whole routine, and it only does two things.
The first thing is that the future is put to sleep for
kwargs['sleeptime'] seconds. Recall that sleeptime was a randomly
generated number between 0 and 10, which was given as an argument to
the executor.submit() call. So each future immediately goes to sleep
for a period of time between 0 and 10 seconds.
The second thing is that, upon waking up, the function returns its keyword arguments. This is done because the return value of the initial function becomes the result value of the future object itself. This is so important that I'll say it again and then immediately show how it is used.
The return value of the initial function becomes the result value of the future object itself.
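A stripped-down check of that rule, assuming Python 3, reuses the shape of sleep_until_the_future() without the sleep. Whatever the initial function returns is exactly what future.result() gives back.

```python
from concurrent.futures import ThreadPoolExecutor

def sleep_until_the_future(**kwargs):
    # Same shape as the article's initial function, minus the sleep.
    return kwargs

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(sleep_until_the_future, futurenum=7, sleeptime=0.0)

# Whatever the initial function returned is exactly what result() hands back.
print(future.result())  # {'futurenum': 7, 'sleeptime': 0.0}
executor.shutdown()
```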
09 def say_hello_from_the_future(future):
10     result = future.result()
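Here, in the first line of the future's callback function, we retrieve
its result value. Because the future has already finished by the time
its callback runs, future.result() returns immediately instead of
waiting (if the initial function had raised an exception, result()
would re-raise it here). The variable result is now holding the
dictionary of keyword arguments that were passed to executor.submit()
and forwarded to sleep_until_the_future().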
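A callback that must cope with failures can check future.exception() first, since that returns the raised exception (or None) without re-raising it. This is a sketch assuming Python 3.7+ (for the exception repr shown). fails() and say_hello_or_report_error() are hypothetical names, not part of the original program.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

outcomes = []
outcomes_lock = threading.Lock()

def fails(**kwargs):
    # Hypothetical initial function that raises instead of returning.
    raise ValueError("no sleeptime given")

def say_hello_or_report_error(future):
    # exception() returns the raised exception (or None) without re-raising.
    error = future.exception()
    if error is not None:
        message = f"future failed: {error!r}"
    else:
        message = f"future returned: {future.result()!r}"
    with outcomes_lock:
        outcomes.append(message)

executor = ThreadPoolExecutor(max_workers=2)
executor.submit(fails).add_done_callback(say_hello_or_report_error)
executor.submit(dict, futurenum=1).add_done_callback(say_hello_or_report_error)
executor.shutdown(wait=True)
print(outcomes)  # order depends on which future finishes first
```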
Note that the only argument to the callback is the future object
itself. This is why, in this program, anything the callback needs must
be in the return value of the future's initial function: that return
value becomes the value of future.result(), and future.result() (along
with future.exception(), if the initial function raised) is the only
mechanism for communicating from the future object back to the rest
of the program.
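The original program does not use it, but another way to give a callback extra data is to bind it in advance with functools.partial. The executor still calls the resulting callable with just the future. This is a sketch assuming Python 3.6+. say_hello() and its label parameter are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
import functools
import threading

messages = []
messages_lock = threading.Lock()

def say_hello(label, future):
    # 'label' is bound ahead of time with functools.partial; the executor
    # still calls the callback with a single argument, the future.
    with messages_lock:
        messages.append(f"{label} got {future.result()}")

executor = ThreadPoolExecutor(max_workers=2)
for label, n in [("first", 2), ("second", 3)]:
    future = executor.submit(pow, n, 2)
    future.add_done_callback(functools.partial(say_hello, label))
executor.shutdown(wait=True)
print(sorted(messages))  # ['first got 4', 'second got 9']
```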
The remainder of the callback is rudimentary. It checks the current time, calculates how long the future was really asleep, and prints a message containing all the information we have about the future's state.
11     now = time.time()
12     actualsleep = now - result['launchtime']
13     print("This is future {}.".format(result['futurenum']),
14           "I was created at {} and".format(result['launchtime']),
15           "told to sleep for {} seconds.".format(result['sleeptime']),
16           "I woke up at {},".format(now),
17           "so I actually slept for {} seconds.".format(actualsleep))
This will happen while the main thread is sleeping, and the futures will report back out-of-order, as determined by their randomized sleep times. These behaviors show that the futures are executing both concurrently with, and independently from, each other and the main thread.
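To see the out-of-order reporting without waiting up to 10 seconds, here is a scaled-down sketch assuming Python 3. It swaps the random sleep times for fixed, short ones, so the wake-up order is predictable: shortest sleep first. record_wakeup() is a hypothetical callback that records the order instead of printing.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time

wake_order = []
wake_lock = threading.Lock()

def sleep_until_the_future(**kwargs):
    time.sleep(kwargs['sleeptime'])
    return kwargs

def record_wakeup(future):
    # Hypothetical callback: remember which future woke up, in order.
    with wake_lock:
        wake_order.append(future.result()['futurenum'])

executor = ThreadPoolExecutor(max_workers=3)
# Fixed sleep times instead of random ones, so the order is predictable.
for num, sleeptime in enumerate([0.3, 0.1, 0.2]):
    future = executor.submit(sleep_until_the_future,
                             futurenum=num, sleeptime=sleeptime)
    future.add_done_callback(record_wakeup)
executor.shutdown(wait=True)
print(wake_order)  # typically [1, 2, 0]
```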
Here is the source of the script, and here is the output of a sample run.
Continue to part 3 of this series.