Reactive Web Request Batching with Scala and Play Framework

At first glance it seems silly to do batching in the reactive world. When I first started with reactive programming I thought I wouldn’t have to worry about things like resource starvation. After all, the reactive magic bullet was *magical*! But my magic bullet fizzled when it hit downstream resource constraints, and I found myself needing batching after all.

With a reactive web client library like Play Framework’s, I can easily spin up tens of thousands of web requests, in parallel, using very little resources. But what if that saturates the server I’m making requests to? In an ideal world I could get backpressure but most web endpoints don’t provide a way to do that. So we just have to be nicer to the server and batch the requests. (Sidenote: How do you know how nice you should be to the service, e.g. batch size?)

So in some scenarios you might just need to batch a bunch of web requests. Let’s say you need to make 100 requests and you’ve figured out that you should only do 10 at a time. Then once all 100 have completed you need to do some aggregation of the results. Of course you could block until each batch is complete and mutate a shared value that accumulates the results. But we can do better than that! You can still use immutable data and non-blocking requests with batching.

In Play I have a /echo?num=1 request handler that just returns the number passed into it:

def echo(num: Int) = Action {
  Ok(s"num=$num")
}

Now in another async controller I want to send 10,000 requests to the echo controller but batched 256 at a time, then return the aggregated results. First I need a function that will handle the batching:

private def processBatch(results: Future[Seq[String]], batch: Seq[String]): Future[Seq[String]] = {
  // when the results for the previous batches have been completed, start a new batch
  results.flatMap { responses =>
    // create the web requests for this batch
    val batchFutures: Seq[Future[String]] = batch.map(url => WS.url(url).get().map(_.body))
    // sequence the futures for this batch into a single future
    val batchFuture: Future[Seq[String]] = Future.sequence(batchFutures)
    // when this batch is complete, append the responses to the existing responses
    batchFuture.map { batchResponses =>
      Logger.info("Finished a batch")
      responses ++ batchResponses
    }
  }
}

This processBatch function takes a Future that holds the previously accumulated results, which are a sequence of Strings. It also takes a batch of URLs and returns a Future holding the previously accumulated results combined with the results from the batch. When the results that were passed in have all been completed, the batch is processed by creating a web request for each URL. Each request is transformed to just the String from the response body. All of the requests in the batch are then transformed into a single Future that completes when all of the requests in the batch have completed. Once that future completes, the results of the batch are appended to the previously completed results.

This could also be written using Scala’s fancy for comprehension:

for {
  responses <- results
  batchFuture = Future.sequence(batch.map(url => WS.url(url).get().map(_.body)))
  batchResponses <- batchFuture
} yield {
  Logger.info("Finished a batch")
  responses ++ batchResponses
}
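
The same fold-and-sequence pattern can be tried outside of Play entirely. Here is a minimal, self-contained sketch using plain scala.concurrent Futures, where fetch is a made-up stand-in for the WS call (a real app would do WS.url(url).get()):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// stand-in for the web request (hypothetical; just echoes the "url" back)
def fetch(url: String): Future[String] = Future(s"response for $url")

// same shape as processBatch above: wait for prior batches, then run this one
def processBatch(results: Future[Seq[String]], batch: Seq[String]): Future[Seq[String]] =
  results.flatMap { responses =>
    Future.sequence(batch.map(fetch)).map(responses ++ _)
  }

val urls = (1 to 10).map(n => s"/echo?num=$n")

// batches of 3 run strictly one after another; requests within a batch run in parallel
val all = urls.grouped(3).foldLeft(Future.successful(Seq.empty[String]))(processBatch)

println(Await.result(all, 10.seconds).size) // 10
```

Because each batch is chained off the previous batch's Future, at most one batch's worth of requests is ever in flight, yet nothing blocks and nothing mutates.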

Here is an async controller that uses the processBatch function:

def index = Action.async { implicit request =>
  // ints 1 to 10000
  val numbers: Seq[Int] = 1 to 10000
  // turn each number into a url
  val urls: Seq[String] = numbers.map(num => routes.Application.echo(num).absoluteURL())
  // split into groups of 256
  val batches: Iterator[Seq[String]] = urls.grouped(256)
  // futures for all of the response strings
  val futureResponses = batches.foldLeft(Future.successful(Seq.empty[String]))(processBatch)
  // render the list of responses when they are all complete
  futureResponses.map { responses =>
    Ok(responses.toString)
  }
}

A sequence of 10,000 URLs (/echo?num=1 to /echo?num=10000) is assembled. That sequence is then partitioned into groups of 256. Then the reactive batching magic: take the batches and do a foldLeft starting with an empty sequence of String, calling the processBatch function for each batch, accumulating the results. Once the future returned from the foldLeft completes, the results are turned into a String and returned in the Ok response.

There you have reactive batching of web requests! Check out the full source.

Combining Reactive Streams, Heroku Kafka, and Play Framework

Heroku recently announced early access to the new Heroku Kafka service and while I’ve heard great things about Apache Kafka I hadn’t played with it because I’m too lazy to set that kind of stuff up on my own. Now that I can setup a Kafka cluster just by provisioning a Heroku Addon I figured it was time to give it a try.

If you aren’t familiar with Kafka, it is kind of a next-generation messaging system. It uses pub-sub, scales horizontally, and has built-in message durability and delivery guarantees. Originally Kafka was built at LinkedIn but it is now being used by pretty much every progressive enterprise that needs to move massive amounts of data through transformation pipelines.

While learning Kafka I wanted to build something really simple: an event producer that just sends random numbers to a Kafka topic and an event consumer that receives those random numbers and sends them to a browser via a WebSocket. I decided to use Play Framework and the Akka Streams implementation of Reactive Streams.

In Reactive Streams there is the pretty standard “Source” and “Sink” where an event producer is a Source and a consumer is a Sink. A “Flow” is a pairing between a Source and a Sink with an optional transformation. In my example there are two apps, each with a Flow. A worker process will send random numbers to Kafka so its Source will be periodically generated random numbers and its Sink will be Kafka. In the web process the Source is Kafka and the Sink is a WebSocket that will push the random numbers to the browser.
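
To make that vocabulary concrete, here is a tiny standalone Akka Streams pipeline (a sketch only; it assumes the akka-stream artifact is on the classpath and uses the ActorMaterializer API current at the time):

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

implicit val system = ActorSystem("demo")
implicit val materializer = ActorMaterializer()

val source = Source(1 to 5)              // the producer of events
val flow = Flow[Int].map(_.toString)     // an optional transformation
val sink = Sink.foreach[String](println) // the consumer of events

// pair the Source and Sink via the Flow, then materialize and run the pipeline
source.via(flow).runWith(sink)
```

In the apps below the only things that change are which ends are Kafka, a timer, or a WebSocket.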

Here is the worker app with some necessary config omitted (check out the full source):

object RandomNumbers extends App {
  val tickSource = Source.tick(Duration.Zero, 500.milliseconds, ()).map(_ => Random.nextInt().toString)
  kafka.sink("RandomNumbers").map { kafkaSink =>
    tickSource
      .map(new ProducerRecord[String, String]("RandomNumbers", _))
      .to(kafkaSink)
      .run()
  }
}

The tickSource is a Source that generates a new random Int every 500 milliseconds. That Source is connected to a Kafka Sink with a Flow that transforms an Int into a ProducerRecord (for Kafka). This uses the Reactive Kafka library which is a Reactive Streams API for working with Kafka.

On the web app side, Play Framework has built-in support for using Reactive Streams with WebSockets, so all we need is a controller method that creates a Source from a Kafka topic and hooks that to a WebSocket Flow (full source):

def ws = WebSocket.acceptOrResult[Any, String] { _ =>
  kafka.source(Set("RandomNumbers")) match {
    case Failure(e) =>
      Future.successful(Left(InternalServerError("Could not connect to Kafka")))
    case Success(source) =>
      val flow = Flow.fromSinkAndSource(Sink.ignore, source.map(_.value))
      Future.successful(Right(flow))
  }
}

Notice that the Flow has a Sink.ignore which just says to ignore any incoming messages on the WebSocket (those sent from the browser). Play takes care of all the underlying stuff and then whenever Kafka gets a message on the “RandomNumbers” topic, it will be sent out via the WebSocket.

And it all works!

Check out the full source for instructions on how to get this example setup on your machine and on Heroku. Let me know how it goes!

Reactive Postgres with Play Framework & ScalikeJDBC

Lately I’ve built a few apps that have relational data. Instead of trying to shoehorn that data into a NoSQL model I decided to use the awesome Heroku Postgres service, but I didn’t want to lose out on the Reactiveness that most of the NoSQL data stores support. I discovered ScalikeJDBC-Async which uses postgresql-async, a Reactive (non-blocking), JDBC-ish, Postgres driver. With those libraries I was able to keep my data relational and my app Reactive all the way down. Let’s walk through how to do it in a Play Framework app. (TL;DR: Jump to the full source.)

If you want to start from scratch, create a new Play app from the Play Scala Seed.

The minimum dependencies needed in the build.sbt file are:

libraryDependencies ++= Seq(
  "org.postgresql"       %  "postgresql"                    % "9.3-1102-jdbc41",
  "com.github.tototoshi" %% "play-flyway"                   % "1.2.0",
  "com.github.mauricio"  %% "postgresql-async"              % "0.2.16",
  "org.scalikejdbc"      %% "scalikejdbc-async"             % "0.5.5",
  "org.scalikejdbc"      %% "scalikejdbc-async-play-plugin" % "0.5.5"
)

The play-flyway library handles schema evolutions using Flyway. It is a great alternative to Play’s JDBC module because it focuses on just evolutions, and it does one-way evolutions (i.e. no downs). But because play-flyway doesn’t use the postgresql-async driver, it needs the standard postgresql JDBC driver as well.

The scalikejdbc-async-play-plugin library manages the lifecycle of the connection pool used by scalikejdbc-async in a Play app.

To use play-flyway and scalikejdbc-async-play-plugin a conf/play.plugins file must tell Play about the plugins:

1000:com.github.tototoshi.play2.flyway.Plugin
2000:scalikejdbc.async.PlayPlugin

A first evolution script in conf/db/migration/default/V1__create_tables.sql will create a table named bar that will hold a list of bars for our little sample app:

create table bar (
  id bigserial primary key,
  name varchar(255) not null
);

You will of course need a Postgres database to proceed. You can either install one locally or create a free one on the Heroku Postres cloud service. Then update the conf/application.conf file to point to the database:

db.default.driver=org.postgresql.Driver
db.default.url="jdbc:postgresql://localhost:5432/bars"
db.default.url=${?DATABASE_URL}

The last line above overrides the database connection url if there is a DATABASE_URL environment variable set (which is the case if your app is running on Heroku).

To run this app locally you can start the Play app by starting the Activator UI or from the command line with:

activator ~run

When you first open your app in the browser, the play-flyway plugin should detect that evolutions need to be applied and ask you to apply them. Once applied you will be ready to create a simple database object and a few reactive request handlers.

Here is a Bar database object named app/models/Bar.scala that uses scalikejdbc-async for reactive creation and querying of Bars:

package models

import play.api.libs.json.Json
import scalikejdbc._
import scalikejdbc.async._
import scalikejdbc.async.FutureImplicits._
import scala.concurrent.Future

case class Bar(id: Long, name: String)

object Bar extends SQLSyntaxSupport[Bar] {
  implicit val jsonFormat = Json.format[Bar]

  override val columnNames = Seq("id", "name")

  lazy val b = Bar.syntax

  def db(b: SyntaxProvider[Bar])(rs: WrappedResultSet): Bar = db(b.resultName)(rs)

  def db(b: ResultName[Bar])(rs: WrappedResultSet): Bar = Bar(
    rs.long(b.id),
    rs.string(b.name)
  )

  def create(name: String)(implicit session: AsyncDBSession = AsyncDB.sharedSession): Future[Bar] = {
    val sql = withSQL(insert.into(Bar).namedValues(column.name -> name).returningId)
    sql.updateAndReturnGeneratedKey().map(id => Bar(id, name))
  }

  def findAll(implicit session: AsyncDBSession = AsyncDB.sharedSession): Future[List[Bar]] = {
    withSQL(select.from[Bar](Bar as b)).map(Bar.db(b)).list()
  }
}

The db functions perform the mapping from SQL results to the Bar case class.

The create function takes a Bar name and returns a Future[Bar] by doing a non-blocking insert using the ScalikeJDBC Query DSL. When the insert has completed the primary key is returned and a new Bar instance is created and returned.

The findAll method uses the ScalikeJDBC Query DSL to select all of the Bars from the database, returning a Future[List[Bar]].

Now that we have a reactive database object, let’s expose it through reactive request handlers. First set up the routes in the conf/routes file:

GET        /bars                   controllers.Application.getBars
POST       /bars                   controllers.Application.createBar

Define the controller functions in the app/controllers/Application.scala file:

def getBars = Action.async {
  Bar.findAll.map { bars =>
    Ok(Json.toJson(bars))
  }
}

def createBar = Action.async(parse.urlFormEncoded) { request =>
  Bar.create(request.body("name").head).map { bar =>
    Ok(Json.toJson(bar))
  }
}

Both functions use Action.async which holds a function that takes a request and returns a response (Result) in the future. By returning a Future[Result] Play is able to make requests to the controller function non-blocking. The getBars controller function calls the Bar.findAll and then transforms the Future[List[Bar]] into a Future[Result], the 200 response containing the JSON serialized list of bars. The createBar controller function parses the request, creates the Bar, and then transforms the Future[Bar] into a Future[Result] once the Bar has been created.

From the non-blocking perspective, here is what a request to the getBars controller function looks like:

  1. Web request made to /bars
  2. Thread allocated to web request
  3. Database request made for the SQL select
  4. Thread allocated to the database request
  5. Web request thread is deallocated (but the connection remains open)
  6. Database request thread is deallocated (but the connection remains open)
  7. Database response handler reallocates a thread
  8. SQL result is transformed to List[Bar]
  9. Database response thread is deallocated
  10. Web response handler reallocates a thread
  11. Web response is created from the list of bars
  12. Web response thread is deallocated

So everything is now reactive all the way down because there is a moment where the web request is waiting on the database to respond but no threads are allocated to the request.
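
The hand-off in steps 3 through 10 is just Future composition. A self-contained sketch (dbQuery is a made-up stand-in for the async driver) shows that no thread sits blocked between issuing the query and handling the result:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// stand-in for the async database driver: the Future completes later, on another thread
def dbQuery(): Future[List[String]] = {
  val p = Promise[List[String]]()
  new Thread(() => { Thread.sleep(100); p.success(List("foo", "bar")) }).start()
  p.future
}

// the "controller": issues the query and returns immediately; the map callback
// is only scheduled onto a thread once the database responds
val response: Future[String] = dbQuery().map(bars => s"bars=${bars.mkString(",")}")

println(Await.result(response, 2.seconds)) // the blocking here is only for the demo
```

In the real app the postgresql-async driver plays the role of the stand-in thread, completing the Future from its own event loop.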

Try it yourself with curl:

$ curl -X POST -d "name=foo" http://localhost:9000/bars
$ curl http://localhost:9000/bars

Grab the full source and let me know if you have any questions. Thanks!

Going Reactive at OSCON 2014

This year at OSCON I will be leading a hands-on lab and presenting about Reactive, Play Framework, and Scala. Here are two sessions:

  • Reactive All The Way Down (lab) – 9:00am Monday, July 21

    In this tutorial you will build a Reactive application with Play Framework, Scala, WebSockets, and AngularJS. We will get started with a template app in Typesafe Activator. Then we will add a Reactive RESTful JSON service and a WebSocket in Scala. We will then build the UI with AngularJS.

  • Building Modern Web Apps with Play Framework and Scala – 2:30pm Wednesday, July 23

    Play Framework is the High Velocity Web Framework For Java and Scala. It is lightweight, stateless, RESTful, and developer friendly. This is an introduction to building web applications with Play. You will learn about: routing, Scala controllers & templates, database access, asset compilation for LESS & CoffeeScript, and JSON services.

Hope to see you there!

Optimizing Static Asset Loading with Play Framework

Modern web applications are a mix of back-end services, dynamic web pages, custom static assets, and library-based static assets. To maintain a productive development process it is easiest to have all this stuff in one project. But in production there are a number of optimizations that can dramatically speed up the loading of the application.

HTTP 304 Not Modified responses enable the browser to avoid re-downloading an entire static asset a second time. Far-future expires headers enable the browser to cache static assets for a very long time so that it never requests them a second time. The challenge with far-future expires is that you need a way to invalidate that cache; asset fingerprinting allows you to do that invalidation. GZip encoding compresses the static assets. Putting static assets on a CDN caches them near the consumer of the content.

Let’s look at how to set up these different optimizations with a Play Framework application.

Not Modified Responses

In Play you don’t need to do anything special to enable 304 Not Modified responses for static content. The Assets controller does it for you automatically. Let’s take a look at how this works. Create a file in a Play app named app/assets/javascripts/index.js containing a line of JavaScript like:

alert("hello")

Let’s make a request to Play for this file using curl:

curl -v http://localhost:9000/assets/javascripts/index.js

You should see something like:

> GET /assets/javascripts/index.js HTTP/1.1
> Host: localhost:9000
< HTTP/1.1 200 OK
< Last-Modified: Sat, 26 Apr 2014 15:23:40 GMT
< Etag: "1c5e355bf7bd8c9373a92714fece94de5ca13362"
< Content-Length: 14
< Content-Type: application/javascript; charset=utf-8
< Date: Sat, 26 Apr 2014 15:23:40 GMT

The response has a status code 200 and contains the contents of the index.js file. The HTTP response includes Last-Modified and Etag headers, which provide two different ways of handling 304 responses. As you guessed, Last-Modified tells the client / browser when the content was last modified. (The Date header is included so that the client knows what time the server thinks it is, which is required to effectively use Last-Modified based 304’s.) The Etag specifies a unique identifier for the content which the client can use to determine whether it already has the right version.

To see 304’s in action we need to use either the Last-Modified or Etag for a subsequent request to the server. To use the Last-Modified method an If-Modified-Since request header is sent with the Last-Modified date of the content we already have cached locally. For instance we can test this with something like:

curl -v -H "If-Modified-Since: Sat, 26 Apr 2014 15:23:40 GMT" http://localhost:9000/assets/javascripts/index.js

The server will check that I have the latest content and if so respond with the 304:

> GET /assets/javascripts/index.js HTTP/1.1
> User-Agent: curl/7.35.0
> If-Modified-Since: Sat, 26 Apr 2014 15:23:40 GMT
< HTTP/1.1 304 Not Modified
< Date: Sat, 26 Apr 2014 15:39:29 GMT
< Content-Length: 0

For the Etag method an If-None-Match request header is sent containing the Etag value of the content we already have cached locally. For example:

curl -v -H "If-None-Match: \"1c5e355bf7bd8c9373a92714fece94de5ca13362\"" http://localhost:9000/assets/javascripts/index.js

Again the server will return a 304 if what I have locally matches what the server has.

> GET /assets/javascripts/index.js HTTP/1.1
> Host: localhost:9000
> If-None-Match: "1c5e355bf7bd8c9373a92714fece94de5ca13362"
< HTTP/1.1 304 Not Modified
< Etag: "1c5e355bf7bd8c9373a92714fece94de5ca13362"
< Last-Modified: Sat, 26 Apr 2014 15:23:40 GMT
< Content-Length: 0

The 304 responses can really speed up web apps but things aren’t totally optimized yet since a round trip to the server is still needed to validate the local cache.

Far-Future Expires

Browsers can avoid a network round trip by caching assets based on a Cache-Control HTTP response header. This header indicates how long the browser can rely on its cached version. When running Play in dev mode the default Cache-Control for static assets is no-cache, which tells the browser not to use the cached version. (This doesn’t mean the browser can’t still use the Etag and Last-Modified methods – but those require a round trip.)

Make a request to a static asset named app/assets/javascripts/index.js with:

curl -v http://localhost:9000/assets/javascripts/index.js

Now you should see the following in your output:

< Cache-Control: no-cache

This happens in Play’s dev mode because when you are testing constantly changing static assets you do not want the browser caching them. If you restart your Play app in Prod mode (play start or activator start) and make the same request you should see:

< Cache-Control: max-age=3600

This sets the default cache length to one hour, meaning the browser will use its cached version of this content for an hour. After that another server request will be needed. You can override the default one hour with a new value by setting an assets.defaultCache configuration parameter in your conf/application.conf file. For instance:

assets.defaultCache="max-age=86400"

The only way to invalidate the browser’s cache before the set time expires is to serve the asset from a different URL. If you can do that, or if you know an asset will never change, then you can use Far-Future Expires, which simply uses a very large value in the Cache-Control header, for instance:

assets.defaultCache="max-age=290304000"

This would instruct the browser to not request that asset for another 3,360 days.

Asset Fingerprinting

Asset fingerprinting is a method of putting version information in the URL so that you can invalidate the browser’s cache by pointing to a new URL. Play doesn’t yet have a built-in way to do this (but will in 2.3) so we need to handle fingerprinting manually. There are a number of ways to do this: sbt plugins, S3 content upload scripts, and an Assets controller wrapper. The method I like most (until 2.3 arrives) is the Assets controller wrapper because it is pretty simple and doesn’t require an additional deployment / build step. To set up this method of fingerprinting, create a new controller in app/controllers/StaticAssets.scala containing:

package controllers

import play.api.mvc.{Action, Controller}
import play.api.Play.current

object StaticAssets extends Controller {

  // drop the version and serve the asset
  def at(path: String, version: String, file: String): Action[_] = Assets.at(path, file)

  // returns a path that has a version if the assets.version config is set
  def url(file: String): String = {
    val maybeAssetsVersion = current.configuration.getString("assets.version")
    maybeAssetsVersion.fold(routes.Assets.at(file).url) { version =>
      routes.StaticAssets.at(version, file).url
    }
  }
}

This new controller wraps Play’s Assets controller and adds fingerprinting based on a configured version. The at function just returns a static asset using the regular Assets.at method. The url function looks to see if an assets.version config param exists and if so it returns a String URL containing that version number, otherwise it returns the non-versioned URL. This new controller needs a new route definition in the conf/routes file:

GET     /assets-static/:version/*file    controllers.StaticAssets.at(path="/public", version, file)

Then when doing reverse routing for assets use the new StaticAssets.url method, like:

<script type='text/javascript' src='@StaticAssets.url("javascripts/index.js")'></script>

Setting the assets.version config parameter in conf/application.conf will change the static asset URLs to the fingerprinted URLs. For instance, before setting that param, a call to StaticAssets.url("javascripts/index.js") results in /assets/javascripts/index.js and if you add assets.version=1 to conf/application.conf then the result is /assets-static/1/javascripts/index.js – the fingerprinted URL. Deploying a new version of the app with a new assets.version config param results in new URLs, thus invalidating any Far-Future Expires and 304-based caching.

GZip Encoding

GZip encoding is very widely supported by browsers but it is not turned on by default in Play. However, enabling it is very easy. This will result in significantly smaller HTTP responses for static content. First add the filters library to a project by updating the build.sbt file to something like:

libraryDependencies ++= Seq(
  filters
  // ... your other deps here
)

Then create a new app/Global.scala file containing:

import play.api.mvc.WithFilters
import play.filters.gzip.GzipFilter
object Global extends WithFilters(new GzipFilter())

That is it! This is even useful when working with minified JavaScript and CSS. For instance, without GZip, the jquery.min.js file from jQuery 1.9.0 is 91.1KB. With GZip enabled the HTTP response goes down to 32.5KB!

CDNs

Another optimization you can make with static content in Play is not to serve it from Play. Play is not really tuned for serving static content, and usually a web app’s servers are centrally located. Using a Content Delivery Network (CDN) or caching proxy can really speed up the loading of static content for most users. A caching proxy like Nginx (with additional plugins) can move the bulk of content loading to something that is specifically tuned for serving content. A CDN takes that a step further and serves that content from a location that is geographically near the consumer. Surprisingly, the speed of light is pretty slow when it comes to moving content around the globe. A round trip TCP packet between San Francisco and New York can take almost 100ms in just transit time, and those round trips can really add up. So caching static content geographically close to the consumer is usually an important and trivial web app optimization.

There are now a number of CDNs which make it very easy to “edge-cache” content, like: CloudFront, Fastly, and MaxCDN. The original CDNs like Akamai would require an additional upload step to distribute the content. Luckily modern CDNs now support a proxy mode where no additional steps are required to distribute content on the CDN.

If you setup a proxy CDN then the CDN doesn’t have your content until someone requests it. So if a user requests a URL on the CDN and the CDN doesn’t have the requested content then it will make a request to the configured origin server to get the content. Subsequent requests will just get the content from the CDN. And that content will be served from the closest place possible to the user, thus reducing the transit time. Most modern CDNs also support all of the optimizations covered above. So your HTTP response headers and content not only impact how the browser caches your content, they can also impact how the CDN caches your content.

Your Play app needs to be reachable by the CDN for this all to work so if you want to follow along then you will need to deploy your app on a publicly accessible service like Heroku. Once your app is publicly accessible you can follow the usual steps to create a CDN in front of it. For CloudFront it is pretty easy to create a new Distribution in the AWS Management Console. Just make sure you specify the Origin Domain Name to be the domain where the Play app is deployed.

Once you have an origin-based CDN setup the final step is to get the Play app to use the new CDN URLs instead of the regular relative URLs. In local development mode we want to still use the local Play app while in production we want to use the CDN. Building on the custom controller in the fingerprinting section we need to add in the capability to prepend a CDN prefix in front of the static asset URLs. (Note: The actual serving of assets doesn’t change and this can be used without fingerprinting.) Here is a new app/controllers/StaticAssets.scala controller that has both the fingerprinting and the CDN support:

package controllers

import play.api.mvc.{Action, Controller}
import play.api.Play.current

object StaticAssets extends Controller {

  // drop the version and serve the asset
  def at(path: String, version: String, file: String): Action[_] = Assets.at(path, file)

  // returns a path that has a version if the assets.version config is set
  // prepends a url if the assets.url config is set
  def url(file: String): String = {
    val maybeAssetsVersion = current.configuration.getString("assets.version")
    val maybeVersionedUrl = maybeAssetsVersion.fold(routes.Assets.at(file).url) { version =>
      routes.StaticAssets.at(version, file).url
    }
    val maybeAssetsUrl = current.configuration.getString("assets.url")
    maybeAssetsUrl.fold(maybeVersionedUrl)(_ + maybeVersionedUrl)
  }
}

Once the possibly versioned URL is determined in the url function, if the assets.url config exists then its value will be prepended in front of the maybeVersionedUrl value. Try it by adding the following to the conf/application.conf file:

assets.url="http://your-cdn.example.com"

Now the calls to StaticAssets.url will return a fully qualified (and possibly versioned) URL. If you setup a proxy CDN then you should be able to use its URL.

Wrap Up

Each of these methods of static asset optimization provide a great way to speed up the loading of your Play web application. They can be used together or independently. They can also be used with WebJars.

Check out the working sample app that includes these optimizations on GitHub.

Let me know how it goes!

Presenting in SF: sbt-web & Reactive All the Way Down

This week I will be presenting twice in San Francisco at SF Scala:

  • Thursday April 10: Introducing sbt-web – A Node & WebJar Compatible Asset Pipeline for the Typesafe Platform

    sbt-web is a new web asset pipeline for Play Framework and other sbt-based frameworks. It can pull dependencies from both Node and WebJars. The pipeline covers all of the phases of client-side development, including: linting, compiling (CoffeeScript, LESS, etc), minification, concatenation, fingerprinting, and gzipping. This session will give you an introduction to sbt-web and show you how to get started using it.

  • Friday April 11: Reactive All the Way Down

    The world is going Reactive but not just for the back-end; UIs are also becoming Reactive. In this session we will walk through how to build an end-to-end Reactive application with Scala, Play Framework, Akka, and AngularJS.

Hope to see you there!

Presenting Play Framework and Reactive This Week in Boulder, Dallas, and Vancouver

This week I’ll be circling around North America presenting about Play Framework and Reactive Apps. Here is the lineup:

  • Tuesday Feb 11 in Boulder, Colorado at the Boulder JUG:
    6pm – Intro to Play Framework
    7:30pm – Building Reactive Apps
  • Wednesday Feb 12 in Dallas, Texas at the Java MUG:
    6:30pm – Building Reactive Apps
  • Thursday Feb 13 in Vancouver, BC at the inaugural Vancouver Reactive Programmers meetup:
    7pm – Building Reactive Apps

It is going to be a fun week – I hope to see you at one of these events!