Java Concurrency with Akka: Composing Futures

I’ve been intrigued by Akka for a while, and I finally got to take it for a spin. The first thing I wanted to learn was how to compose Futures. Composing Futures provides a way to do two (or more) things at the same time and then wait until they are all done. Typically in Java this would be done with an ExecutorService (I originally wrote CyclicBarrier here), but setting up the code to manage a CyclicBarrier seemed challenging. (UPDATE: It turns out it’s not very challenging; I just didn’t know how to do it. I’m new to concurrency in Java and didn’t find much on this stuff, probably because I didn’t even know what to search for.) So I put together a quick little demo that shows how to do the same thing with Futures in Akka.

All of the code for this demo is on GitHub:

http://github.com/jamesward/AkkaFun

First I set up a Gradle build that pulls in the Akka dependency and lets me easily launch the demo app. Here is the build.gradle file:

apply plugin:"application"
mainClassName = "com.jamesward.akkafun.SimpleFutures"

repositories {
    mavenCentral()
}

dependencies {
  compile "se.scalablesolutions.akka:akka-actor:1.2-RC6"
}

For this demo I also wanted to increase the Akka timeout to 1 minute (the default is 5 seconds). To do this I created a src/main/resources/akka.conf file containing:

akka {
    actor {
        timeout = 60
    }
}

I then set up a Callable class that does some work and then returns its result. For this example the work is just to pause for a random amount of time, and the result is the amount of time it paused for. Here is the src/main/java/com/jamesward/akkafun/RandomPause.java file:

package com.jamesward.akkafun;

import java.util.concurrent.Callable;

public class RandomPause implements Callable<Long>
{

    private Long millisPause;

    public RandomPause()
    {
        millisPause = Math.round(Math.random() * 8000) + 2000; // 2,000 to 10,000
        System.out.println(this.toString() + " will pause for " + millisPause + " milliseconds");
    }

    public Long call() throws Exception
    {
        Thread.sleep(millisPause);
        System.out.println(this.toString() + " was paused for " + millisPause + " milliseconds");
        return millisPause;
    }
}

I used a simple Java app to compose the RandomPause futures. Here is the src/main/java/com/jamesward/akkafun/SimpleFutures.java file:

package com.jamesward.akkafun;

import java.util.ArrayList;
import java.util.List;

import akka.dispatch.Future;
import static akka.dispatch.Futures.future;
import static akka.dispatch.Futures.sequence;

public class SimpleFutures
{
    public static void main(String[] args)
    {
        List<Future<Long>> futures = new ArrayList<Future<Long>>();

        System.out.println("Adding futures for two random length pauses");

        futures.add(future(new RandomPause()));
        futures.add(future(new RandomPause()));

        System.out.println("There are " + futures.size() + " RandomPause's currently running");

        // compose a sequence of the futures
        Future<Iterable<Long>> futuresSequence = sequence(futures);

        // block until the futures come back
        Iterable<Long> results = futuresSequence.get();

        System.out.println("All RandomPause's are complete");

        Long totalPause = 0L;
        for (Long result : results)
        {
            System.out.println("One pause was for " + result + " milliseconds");
            totalPause += result;
        }

        System.out.println("Total pause was for " + totalPause + " milliseconds");
    }
}

Let’s walk through the pieces of this.

First, a place to store the list of Futures is created:

List<Future<Long>> futures = new ArrayList<Future<Long>>();

The Future object is parameterized with the type of result the Future will return - a Long in this case. (I’m using the Akka Future, akka.dispatch.Future, not the regular Java Future from java.util.concurrent.)

The Futures.future static method is used to create a new Future from an instance of a Callable object and that Future is added to the list of Futures:

futures.add(future(new RandomPause()));

In this case a RandomPause instance is created. This is done twice to add two futures to the list.

You may have noticed in RandomPause that Callable is parameterized with a Long:

public class RandomPause implements Callable<Long>

The work (the call method) returns a Long, so both the Callable and the Future must be parameterized with a Long.
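
The same rule about matching type parameters applies to the regular Java Future. Here is a minimal sketch using only the JDK (not Akka), where a Callable<T> is wrapped in a FutureTask<T> and the result comes back as a T. The TypedResultSketch class, the runAndGet helper and the fixed 50 millisecond pause are made up for illustration and are not part of the demo project.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class TypedResultSketch {

    // Hypothetical helper: runs the work on a new thread and returns its typed result.
    static <T> T runAndGet(Callable<T> work) {
        // FutureTask<T> implements Future<T>, so the Callable's type carries through
        FutureTask<T> task = new FutureTask<>(work);
        new Thread(task).start();
        try {
            return task.get(); // blocks until call() has returned
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Work failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for RandomPause with a fixed 50 ms pause
        Callable<Long> pause = () -> {
            Thread.sleep(50);
            return 50L;
        };
        Long result = runAndGet(pause);
        System.out.println("Paused for " + result + " milliseconds");
    }
}
```

If the Callable were declared as Callable<String>, the compiler would reject assigning the result to a Long, which is the point of keeping the two type parameters in sync.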

In order to compose the futures together, another Future will be created containing the sequence of the list of futures:

Future<Iterable<Long>> futuresSequence = sequence(futures);

The Future is parameterized with an Iterable, which is in turn parameterized with a Long to match the result of the Callable. The Futures.sequence method is used to create the new Future from the list of Futures.
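
To make the idea behind sequence more concrete, here is a rough sketch of a similar operation written against the JDK's CompletableFuture (Java 8 and later) instead of Akka. It is not how Akka implements Futures.sequence; the SequenceSketch class and the pause helper are names I made up, and pause is just a fixed-length stand-in for RandomPause.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class SequenceSketch {

    // Turns a list of futures into one future holding all results, in input order.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        return allDone.thenApply(ignored -> {
            List<T> results = new ArrayList<>(futures.size());
            for (CompletableFuture<T> future : futures) {
                results.add(future.join()); // already complete, so this does not wait
            }
            return results;
        });
    }

    // Hypothetical stand-in for RandomPause: sleeps for a fixed time and returns it.
    static CompletableFuture<Long> pause(long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while pausing", e);
            }
            return millis;
        });
    }

    public static void main(String[] args) {
        List<CompletableFuture<Long>> futures = new ArrayList<>();
        futures.add(pause(300));
        futures.add(pause(100));

        // block until both pauses are done
        List<Long> results = sequence(futures).join();
        System.out.println("Results: " + results);
    }
}
```

The results come back in the order the futures were added to the list, not in the order they finished.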

Using futuresSequence, the application can wait (or block) until the RandomPause objects in the futures list have all returned or the timeout is reached:

Iterable<Long> results = futuresSequence.get();
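
In the Akka demo the timeout comes from the akka.conf file shown earlier. For comparison, the regular Java Future takes the timeout as an argument to get. The sketch below uses only the JDK; the TimeoutSketch class and the getWithin helper are hypothetical names, and returning an empty Optional on timeout is just one possible choice.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {

    // Returns the result if it arrives within timeoutMillis, otherwise an empty Optional.
    static Optional<Long> getWithin(Callable<Long> work, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Long> future = pool.submit(work);
            return Optional.of(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty(); // the wait ran out before the work finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Work failed", e.getCause());
        } finally {
            pool.shutdownNow(); // interrupts the work if it is still running
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for RandomPause: always pauses for 500 ms
        Callable<Long> pause = () -> {
            Thread.sleep(500);
            return 500L;
        };
        System.out.println("Generous timeout: " + getWithin(pause, 2000));
        System.out.println("Tight timeout: " + getWithin(pause, 100));
    }
}
```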

Each result is now available. That seems too easy! Thanks Akka!
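
For comparison, here is roughly what the plain ExecutorService approach mentioned at the start could look like. This is only a sketch using the JDK's invokeAll, which blocks until every task has completed; the InvokeAllSketch class and the runAll helper are hypothetical, and the fixed pause lengths stand in for RandomPause.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {

    // Runs all tasks at the same time and returns their results in task order.
    static List<Long> runAll(List<Callable<Long>> tasks) {
        // one thread per task so they all run concurrently
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            List<Long> results = new ArrayList<>();
            // invokeAll returns only after every task has completed
            for (Future<Long> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Callable<Long>> tasks = new ArrayList<>();
        tasks.add(() -> { Thread.sleep(300); return 300L; });
        tasks.add(() -> { Thread.sleep(100); return 100L; });

        long totalPause = 0;
        for (Long result : runAll(tasks)) {
            System.out.println("One pause was for " + result + " milliseconds");
            totalPause += result;
        }
        System.out.println("Total pause was for " + totalPause + " milliseconds");
    }
}
```

The main difference from the Akka version is that the thread pool is created and shut down by hand here.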

Let me know if you have any questions about this example.