Java Concurrency with Akka: Composing Futures

I’ve been intrigued by Akka for a while but finally I was able to take it for a spin. The first thing I wanted to learn was how to compose Futures. Composing Futures provides a way to do two (or more) things at the same time and then wait until they are done. Typically in Java this would be done with a CyclicBarrier ExecutorService. But setting up the code to manage a CyclicBarrier is challenging. (UPDATE: Turns out it’s not very challenging, I just didn’t know how to do it. I’m new to concurrency in Java and didn’t find much on this stuff – probably because I didn’t even know what to search for.) So I put together a quick little demo that shows how to do the same thing with Futures in Akka.

All of the code for this demo is on github:
http://github.com/jamesward/AkkaFun

First I setup a Gradle build that pulls in the Akka dependency and will allow me to easily launch the demo app. Here is the build.gradle file:

apply plugin:"application"
mainClassName = "com.jamesward.akkafun.SimpleFutures"
 
repositories {
    mavenCentral()
}
 
dependencies {
  compile "se.scalablesolutions.akka:akka-actor:1.2-RC6"
}

For this demo I also wanted to increase the Akka timeout to 1 minute (the default is 5 seconds). To do this I created a src/main/resources/akka.conf file containing:

akka {
    actor {
        timeout = 60
    }
}

I then setup a Callable class that does some work and then returns it’s result. For this example the work is just to pause for a random amount of time and the result is the amount of time it paused for. Here is the src/main/java/com/jamesward/akkafun/RandomPause.java file:

package com.jamesward.akkafun;
 
import java.util.concurrent.Callable;
 
public class RandomPause implements Callable<Long>
{
 
    private Long millisPause;
 
    public RandomPause()
    {
        millisPause = Math.round(Math.random() * 8000) + 2000; // 2,000 to 10,000
        System.out.println(this.toString() + " will pause for " + millisPause + " milliseconds");
    }
 
    public Long call() throws Exception
    {
        Thread.sleep(millisPause);
        System.out.println(this.toString() + " was paused for " + millisPause + " milliseconds");
        return millisPause;
    }
}

I used a simple Java app to compose the RandomPause futures. Here is the src/main/java/com/jamesward/akkafun/SimpleFutures.java file:

package com.jamesward.akkafun;
 
import java.util.ArrayList;
import java.util.List;
 
import akka.dispatch.Future;
import static akka.dispatch.Futures.future;
import static akka.dispatch.Futures.sequence;
 
public class SimpleFutures
{
    public static void main(String[] args)
    {
        List<Future<Long>> futures = new ArrayList<Future<Long>>();
 
        System.out.println("Adding futures for two random length pauses");
 
        futures.add(future(new RandomPause()));
        futures.add(future(new RandomPause()));
 
        System.out.println("There are " + futures.size() + " RandomPause's currently running");
 
        // compose a sequence of the futures
        Future<Iterable<Long>> futuresSequence = sequence(futures);
 
        // block until the futures come back
        Iterable<Long> results = futuresSequence.get();
 
        System.out.println("All RandomPause's are complete");
 
        Long totalPause = 0L;
        for (Long result : results)
        {
            System.out.println("One pause was for " + result + " milliseconds");
            totalPause += result;
        }
 
        System.out.println("Total pause was for " + totalPause + " milliseconds");
    }
}

Lets walk through the pieces of this.

First, a place to store the list of Futures is created:

List<Future<Long>> futures = new ArrayList<Future<Long>>();

The Future object is parameterized with the type of result the Future will return – a Long in this case. (I’m using the Akka Future not the regular Java Future.)

The Futures.future static method is used to create a new Future from an instance of a Callable object and that Future is added to the list of Futures:

futures.add(future(new RandomPause()));

In this case a RandomPause instance is created. This is done twice to add two futures to the list.

You may have noticed in RandomPause that Callable is parameratized with a Long:

public class RandomPause implements Callable<Long>

The result of the work (the call method) returns a Long so the Callable and the Future must be parameratized with a Long.

In order to compose the futures together, another Future will be created containing the sequence of the list of futures:

Future<Iterable<Long>> futuresSequence = sequence(futures);

The Future is parameratized with an Iterable which is parameratized with a Long to match the result of the Callable. The Futures.sequence method is used to create the new Future from the list of Futures.

Using the futuresSequence the applicaiton can wait (or block) until the RandomPause objects in futures list have all returned, or the timeout was reached:

Iterable<Long> results = futuresSequence.get();

Each result is now available. That seems too easy! Thanks Akka!

Let me know if you have any questions about this example.

This entry was posted in Akka, Java. Bookmark the permalink. Post a comment or leave a trackback: Trackback URL.
  • Thai Dang Vu

    “But setting up the code to manage a CyclicBarrier is challenging.”: could you elaborate a little bit more about it? Personally, in this example, I don’t see many advantanges of using Akka compared to CyclicBarrier or CountDownLatch except that with Akka, we don’t need to pass a CountDownLatch instance to each Callable and we don’t need to know beforehand how many Callable will be created (this is the biggest advantage that I see).

    • http://www.jamesward.com James Ward

      Maybe I did it wrong in the past, but my CyclcBarrier code was always much more verbose and harder to follow than this code. If you get a chance I’d love to see how you would write this example with a CyclicBarrier.

      • Thai Dang Vu
        package home;
         
        import java.util.ArrayList;
        import java.util.List;
        import java.util.concurrent.*;  // I'm using Idea 11 108.1333 and don't see the option to explicitly import Java classes!
         
        public class Main {
            public static void main(String[] args) throws Exception {
                List&lt;Future&gt; futures = new ArrayList&lt;Future&gt;();
         
                ExecutorService executorService = Executors.newFixedThreadPool(4);
         
                // 3 means we submit 2 tasks, that's the disadvantage of using CyclicBarrier
                CyclicBarrier barrier = new CyclicBarrier(3);
                futures.add(executorService.submit(new RandomPause(barrier)));
                futures.add(executorService.submit(new RandomPause(barrier)));
         
                barrier.await();
                System.out.println("All RandomPause's are complete");
         
                long totalPause = 0L;
                for (Future future : futures) {
                    System.out.println("One pause was for " + future.get() + " milliseconds");
                    totalPause += future.get();
                }
                System.out.println("Total pause was for " + totalPause + " milliseconds");
            }
        }
         
        class RandomPause implements Callable
        {
            private Long millisPause;
            private CyclicBarrier barrier;
         
            public RandomPause(CyclicBarrier barrier) {
                this.barrier = barrier;
                millisPause = Math.round(Math.random() * 8000) + 2000; // 2,000 to 10,000
                System.out.println(this.toString() + " will pause for " + millisPause + " milliseconds");
            }
         
            public Long call() throws Exception {
                Thread.sleep(millisPause);
                System.out.println(this.toString() + " was paused for " + millisPause + " milliseconds");
                barrier.await();
                return millisPause;
            }
        }
  • Danny

    I’m actually not even sure why you’d need a latch or a CyclicBarrier; this code looks almost identical to just using an ExecutorService and calling .get() on the returned futures, only your code uses Akka’s futures instead of java’s native futures. Is there something else I’m missing here? Conceptually, they seem near enough to identical, and programatically it’s the same amount of code (except in this case, you specify the timeout externally).

    I, too, am very interested in Akka, but I don’t feel this post does anything to highlight its benefits.

    • Thai Dang Vu

      “not even sure why you’d need a latch or a CyclicBarrier … almost identical to just using an ExecutorService and calling .get() on the returned futures”: agreed.
      “don’t feel this post does anything to highlight its benefits”: agreed again.

    • http://www.jamesward.com James Ward

      I really appreciate the feedback. I’m new to concurrency in Java and really didn’t know that I could just use an ExecutorService for this. The code that Thai posted is much simpler than the way I did this with the standard Java concurrency libraries. I’m definitely learning more about Java & Akka concurrency through this exercise. My goal with this exercise was not to “highlight the benefits” but rather just show how it would be done. Anyways, I really appreciate both of you jumping in here and helping me learn more about all of this. Thanks!

  • Dmitry Sobolev

    Some problem during running… Please, help

    Starting Build
    Settings evaluated using empty settings file.
    Projects loaded. Root project using build file '/home/dmitry/github/AkkaFun/build.gradle'.
    Included projects: [root project 'AkkaFun']
    Evaluating root project 'AkkaFun' using build file '/home/dmitry/github/AkkaFun/build.gradle'.
    All projects evaluated.
    Starting build for primary task 'run'.
    Tasks to be executed: [task ':compileJava', task ':processResources', task ':classes', task ':run']
    :compileJava
    :: loading settings :: url = jar:file:/opt/gradle/gradle-1.0-milestone-3/lib/ivy-2.2.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
    :: resolving dependencies :: #AkkaFun;unspecified
    	confs: [compile]
    :: resolution report :: resolve 678ms :: artifacts dl 0ms
    	---------------------------------------------------------------------
    	|                  |            modules            ||   artifacts   |
    	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    	---------------------------------------------------------------------
    	|      compile     |   1   |   0   |   0   |   0   ||   0   |   0   |
    	---------------------------------------------------------------------
     
    :: problems summary ::
    :::: WARNINGS
    		module not found: se.scalablesolutions.akka#akka-actor;1.2-RC6
     
    	==== clientModule: tried
     
    	==== internal-repository: tried
     
    	==== MavenRepo: tried
     
    	  http://repo1.maven.org/maven2/se/scalablesolutions/akka/akka-actor/1.2-RC6/akka-actor-1.2-RC6.pom
     
    	  -- artifact se.scalablesolutions.akka#akka-actor;1.2-RC6!akka-actor.jar:
     
    	  http://repo1.maven.org/maven2/se/scalablesolutions/akka/akka-actor/1.2-RC6/akka-actor-1.2-RC6.jar
     
    		::::::::::::::::::::::::::::::::::::::::::::::
     
    		::          UNRESOLVED DEPENDENCIES         ::
     
    		::::::::::::::::::::::::::::::::::::::::::::::
     
    		:: se.scalablesolutions.akka#akka-actor;1.2-RC6: not found
     
    		::::::::::::::::::::::::::::::::::::::::::::::
     
     
     
    :: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
     
    BUILD FAILED
     
    Total time: 4.765 secs
  • Pingback: Java future | Fabpro()

  • Pingback: Akka: Links, News And Resources (2) | Angel "Java" Lopez on Blog()



  • View James Ward's profile on LinkedIn