Reactive Web Request Batching with Scala and Play Framework

At first glance it seems silly to do batching in the reactive world. When I first started with reactive programming I thought I wouldn’t have to worry about things like resource starvation. After all, the reactive magic bullet was *magical*! But my magic bullet fizzled when it hit downstream resource constraints causing me to need batching.

With a reactive web client library like Play Framework’s, I can easily spin up tens of thousands of web requests, in parallel, using very few resources. But what if that saturates the server I’m making requests to? In an ideal world I could get backpressure, but most web endpoints don’t provide a way to do that. So we just have to be nicer to the server and batch the requests. (Sidenote: How do you know how nice you should be to the service, e.g. batch size?)

So in some scenarios you might just need to batch a bunch of web requests. Let’s say you need to make 100 requests and you’ve figured out that you should only do 10 at a time. Then once all 100 have completed you need to do some aggregation of the results. Of course you could block until each batch is complete and mutate a shared value that accumulates the results. But we can do better than that! You can still use immutable data and non-blocking requests with batching.

In Play I have a /echo?num=1 request handler that just returns the number passed into it:

def echo(num: Int) = Action {
  Logger.info(s"num=$num")
  Ok(num.toString)
}

Now in another async controller I want to send 10,000 requests to the echo controller but batched 256 at a time, then return the aggregated results. First I need a function that will handle the batching:

private def processBatch(results: Future[Seq[String]], batch: Seq[String]): Future[Seq[String]] = {
  // when the results for the previous batches have been completed, start a new batch
  results.flatMap { responses =>
    // create the web requests for this batch, keeping only each response body
    val batchFutures: Seq[Future[String]] = batch.map(url => WS.url(url).get().map(_.body))
    // sequence the futures for this batch into a single future
    val batchFuture: Future[Seq[String]] = Future.sequence(batchFutures)
    // when this batch is complete, append the responses to the existing responses
    batchFuture.map { batchResponses =>
      Logger.info("Finished a batch")
      responses ++ batchResponses
    }
  }
}

This processBatch function takes a Future that holds the previously accumulated results which are a sequence of Strings. It also takes a batch of urls and returns a Future that holds the previously accumulated results and the results from the batch. When the results that were passed in have all been completed, the batch is processed by creating a web request for each URL. Each request is transformed to just the String from the response body. All of the requests in the batch are then transformed into a single Future that completes when all of the requests in the batch have been completed. Once that future completes the results of the batch are combined with the previously completed results.

This could also be written using Scala’s fancy for comprehension:

for {
  responses <- results
  batchFuture = Future.sequence(batch.map(url => WS.url(url).get().map(_.body)))
  batchResponses <- batchFuture
} yield {
  Logger.info("Finished a batch")
  responses ++ batchResponses
}
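To try the batching step outside of Play, here is a minimal sketch that uses only the Scala standard library. The fakeFetch function is a hypothetical stand-in for the web request (it is not part of Play or of the original app), and the Await at the end is there only so the demo can print a result.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ProcessBatchSketch {
  // Hypothetical stand-in for a real web request: answers asynchronously.
  def fakeFetch(url: String): Future[String] = Future(s"body of $url")

  // Same shape as processBatch: wait for the earlier results, then run this batch.
  def processBatch(results: Future[Seq[String]], batch: Seq[String]): Future[Seq[String]] =
    results.flatMap { responses =>
      Future.sequence(batch.map(fakeFetch)).map(batchResponses => responses ++ batchResponses)
    }

  def demo(): Seq[String] = {
    val first  = processBatch(Future.successful(Seq.empty[String]), Seq("/a", "/b"))
    val second = processBatch(first, Seq("/c"))
    // Blocking is only for the demo; a Play action would return the Future itself.
    Await.result(second, 5.seconds)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Because Future.sequence keeps the order of its input, the accumulated responses come back in the same order as the URLs, even though the requests inside a batch run in parallel.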

Here is an async controller that uses the processBatch function:

def index = Action.async { implicit request =>
  // ints 1 to 10000
  val numbers: Seq[Int] = 1 to 10000
  // turn each number into a url (assumes the echo action lives in controllers.Application)
  val urls: Seq[String] = numbers.map(routes.Application.echo(_).absoluteURL())
  // split into groups of 256
  val batches: Iterator[Seq[String]] = urls.grouped(256)
  // futures for all of the response strings
  val futureResponses = batches.foldLeft(Future.successful(Seq.empty[String]))(processBatch)
  // render the list of responses when they are all complete
  futureResponses.map { responses =>
    Ok(responses.toString)
  }
}

A sequence of 10,000 URLs, /echo?num=1 to /echo?num=10000, is assembled. That sequence is then partitioned into groups of 256. Then the reactive batching magic… Take the batches and do a foldLeft starting with an empty sequence of String, calling the processBatch function for each batch, accumulating the results. Once the future returned from the foldLeft completes, the results are turned into a String and returned in the Ok response.
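If you want to convince yourself that the fold really limits the number of requests in flight, here is a small standard-library sketch. Everything in it is an assumption made for illustration: fakeFetch simulates a request with a short sleep, and the counters exist only to record the peak number of outstanding requests.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FoldBatchesSketch {
  // Returns the responses and the highest number of requests outstanding at once.
  def run(total: Int, batchSize: Int): (Seq[String], Int) = {
    val lock = new Object
    var inFlight = 0
    var maxInFlight = 0

    // Hypothetical request: counts itself as in flight until it has completed.
    def fakeFetch(url: String): Future[String] = {
      lock.synchronized { inFlight += 1; maxInFlight = math.max(maxInFlight, inFlight) }
      Future { Thread.sleep(1); s"echo:$url" }
        .andThen { case _ => lock.synchronized { inFlight -= 1 } }
    }

    def processBatch(results: Future[Seq[String]], batch: Seq[String]): Future[Seq[String]] =
      results.flatMap { responses =>
        Future.sequence(batch.map(fakeFetch)).map(responses ++ _)
      }

    val urls: Seq[String] = (1 to total).map(n => s"/echo?num=$n")
    val all = urls.grouped(batchSize).foldLeft(Future.successful(Seq.empty[String]))(processBatch)
    // Blocking is only for the demo.
    val responses = Await.result(all, 30.seconds)
    (responses, lock.synchronized(maxInFlight))
  }

  def main(args: Array[String]): Unit = {
    val (responses, peak) = run(100, 10)
    println(s"${responses.size} responses, at most $peak in flight")
  }
}
```

The foldLeft itself runs right away and only chains the futures together. The requests for a batch are created inside the flatMap callback, so they do not start until every earlier batch has finished, which is what keeps the peak at or below the batch size.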

There you have reactive batching of web requests! Check out the full source.

Reactive Postgres with Play Framework & ScalikeJDBC

Lately I’ve built a few apps that have relational data. Instead of trying to shoehorn that data into a NoSQL model I decided to use the awesome Heroku Postgres service, but I didn’t want to lose out on the Reactiveness that most of the NoSQL data stores support. I discovered ScalikeJDBC-Async which uses postgresql-async, a Reactive (non-blocking), JDBC-ish, Postgres driver. With those libraries I was able to keep my data relational and my app Reactive all the way down. Let’s walk through how to do it in a Play Framework app. (TL;DR: Jump to the full source.)

If you want to start from scratch, create a new Play app from the Play Scala Seed.

The minimum dependencies needed in the build.sbt file are:

libraryDependencies ++= Seq(
  "org.postgresql"       %  "postgresql"                    % "9.3-1102-jdbc41",
  "com.github.tototoshi" %% "play-flyway"                   % "1.2.0",
  "com.github.mauricio"  %% "postgresql-async"              % "0.2.16",
  "org.scalikejdbc"      %% "scalikejdbc-async"             % "0.5.5",
  "org.scalikejdbc"      %% "scalikejdbc-async-play-plugin" % "0.5.5"
)

The play-flyway library handles schema evolutions using Flyway. It is a great alternative to Play’s JDBC module because it just does evolutions and does one-way evolutions (i.e. no downs). But because play-flyway doesn’t use the postgresql-async driver, it needs the standard postgresql JDBC driver as well.

The scalikejdbc-async-play-plugin library manages the lifecycle of the connection pool used by scalikejdbc-async in a Play app.

To use play-flyway and scalikejdbc-async-play-plugin a conf/play.plugins file must tell Play about the plugins:


A first evolution script in conf/db/migration/default/V1__create_tables.sql will create a table named bar that will hold a list of bars for our little sample app:


You will of course need a Postgres database to proceed. You can either install one locally or create a free one on the Heroku Postgres cloud service. Then update the conf/application.conf file to point to the database:


The last line above overrides the database connection url if there is a DATABASE_URL environment variable set (which is the case if your app is running on Heroku).

To run this app locally you can start the Play app by starting the Activator UI or from the command line with:

activator ~run

When you first open your app in the browser, the play-flyway plugin should detect that evolutions need to be applied and ask you to apply them. Once applied you will be ready to create a simple database object and a few reactive request handlers.

Here is a Bar database object named app/models/Bar.scala that uses scalikejdbc-async for reactive creation and querying of Bars:

package models
import play.api.libs.json.Json
import scalikejdbc.WrappedResultSet
import scalikejdbc._
import scalikejdbc.async._
import scalikejdbc.async.FutureImplicits._
import scala.concurrent.Future
case class Bar(id: Long, name: String)
object Bar extends SQLSyntaxSupport[Bar] {
  implicit val jsonFormat = Json.format[Bar]
  override val columnNames = Seq("id", "name")
  lazy val b = Bar.syntax
  // map a result row to a Bar
  def db(b: SyntaxProvider[Bar])(rs: WrappedResultSet): Bar = db(b.resultName)(rs)
  def db(b: ResultName[Bar])(rs: WrappedResultSet): Bar = Bar(
    rs.long(b.id),
    rs.string(b.name)
  )
  // non-blocking insert that yields the new Bar once the generated id comes back
  def create(name: String)(implicit session: AsyncDBSession = AsyncDB.sharedSession): Future[Bar] = {
    val sql = withSQL(insert.into(Bar).namedValues(column.name -> name).returningId)
    sql.updateAndReturnGeneratedKey().map(id => Bar(id, name))
  }
  // non-blocking select of every Bar
  def findAll(implicit session: AsyncDBSession = AsyncDB.sharedSession): Future[List[Bar]] = {
    withSQL(select.from[Bar](Bar as b)).map(Bar.db(b))
  }
}

The db functions perform the mapping from SQL results to the Bar case class.

The create function takes a Bar name and returns a Future[Bar] by doing a non-blocking insert using the ScalikeJDBC Query DSL. When the insert has completed the primary key is returned and a new Bar instance is created and returned.

The findAll method uses the ScalikeJDBC Query DSL to select all of the Bars from the database, returning a Future[List[Bar]].

Now that we have a reactive database object, let’s expose these functions through reactive request handlers. First set up the routes in the conf/routes file:

GET        /bars                   controllers.Application.getBars
POST       /bars                   controllers.Application.createBar

Define the controller functions in the app/controllers/Application.scala file:

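def getBars = Action.async {
  Bar.findAll.map { bars =>
    Ok(Json.toJson(bars))
  }
}

def createBar = Action.async(parse.urlFormEncoded) { request =>
  Bar.create(request.body("name").head).map { bar =>
    // response shape is an assumption: 201 with the new Bar as JSON
    Created(Json.toJson(bar))
  }
}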

Both functions use Action.async, which takes a function from a request to a future response (a Future[Result]). Because the controller function returns a Future[Result], Play can handle the request without blocking a thread while it waits. The getBars controller function calls Bar.findAll and then transforms the Future[List[Bar]] into a Future[Result], the 200 response containing the JSON serialized list of bars. The createBar controller function parses the request, creates the Bar, and then transforms the Future[Bar] into a Future[Result] once the Bar has been created.

From the non-blocking perspective, here is what a request to the getBars controller function looks like:

  1. Web request made to /bars
  2. Thread allocated to web request
  3. Database request made for the SQL select
  4. Thread allocated to the database request
  5. Web request thread is deallocated (but the connection remains open)
  6. Database request thread is deallocated (but the connection remains open)
  7. Database response handler reallocates a thread
  8. SQL result is transformed to List[Bar]
  9. Database response thread is deallocated
  10. Web response handler reallocates a thread
  11. Web response is created from the list of bars
  12. Web response thread is deallocated

So everything is now reactive all the way down because there is a moment where the web request is waiting on the database to respond but no threads are allocated to the request.
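The steps above can be imitated with nothing but the Scala standard library. This is only a sketch under stated assumptions: the findAll here is a hypothetical stand-in for the database call (it is not ScalikeJDBC-Async), and a timer plays the role of the database replying 50 ms later. While the "query" is pending, no thread is parked waiting for it.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object NonBlockingSketch {
  case class Bar(id: Long, name: String)

  def demo(): String = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()

    // Hypothetical database call: the reply arrives later via a timer,
    // so no thread sits blocked while the result is outstanding.
    def findAll(): Future[List[Bar]] = {
      val promise = Promise[List[Bar]]()
      scheduler.schedule(new Runnable {
        def run(): Unit = promise.success(List(Bar(1L, "foo")))
      }, 50, TimeUnit.MILLISECONDS)
      promise.future
    }

    // Like the getBars action: register a transformation and return immediately.
    val response: Future[String] = findAll().map(bars => bars.map(_.name).mkString(","))

    // Blocking is only so the demo can hand back a value.
    try Await.result(response, 5.seconds)
    finally scheduler.shutdown()
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The map callback plays the part of steps 7 to 9: a thread is picked up only when the result arrives, does the transformation, and is released again.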

Try it yourself with curl:

$ curl -X POST -d "name=foo" http://localhost:9000/bars
$ curl http://localhost:9000/bars

Grab the full source and let me know if you have any questions. Thanks!

Building & Deploying Reactive Service Pipelines — Live in Salt Lake City

This Wednesday (Aug 6, 2014) I will be presenting Building & Deploying Reactive Service Pipelines at the Utah Scala Enthusiasts group in Salt Lake City. Here is the abstract:

Composition of micro-services is a modern integration pattern that couples nicely with Reactive and Continuous Delivery. These paradigms enable small teams to move quickly while integrating cross-silo data stores for modern JavaScript UIs and REST services. This session will use Scala, Play Framework, and Heroku to illustrate how to build and deploy Reactive Service Pipelines.

RSVP Now! Hope to see you there.

Going Reactive at OSCON 2014

This year at OSCON I will be leading a hands-on lab and giving a presentation about Reactive, Play Framework, and Scala. Here are the two sessions:

  • Reactive All The Way Down (lab) – 9:00am Monday, July 21

    In this tutorial you will build a Reactive application with Play Framework, Scala, WebSockets, and AngularJS. We will get started with a template app in Typesafe Activator. Then we will add a Reactive RESTful JSON service and a WebSocket in Scala. We will then build the UI with AngularJS.

  • Building Modern Web Apps with Play Framework and Scala – 2:30pm Wednesday, July 23

    Play Framework is the High Velocity Web Framework For Java and Scala. It is lightweight, stateless, RESTful, and developer friendly. This is an introduction to building web applications with Play. You will learn about: routing, Scala controllers & templates, database access, asset compilation for LESS & CoffeeScript, and JSON services.

Hope to see you there!

Scala vs Java 8 at the Scala Summit

Bruce Eckel will be hosting the Scala Summit in Crested Butte again this summer. The Open Spaces conference will be September 15 – 19 which is a perfect time of year up in the Colorado Rockies. The theme of the Scala Summit this year is Scala vs. The New Features in Java 8. So there will definitely be some fascinating discussions. I’m also looking forward to working on some IoT projects during the hackathons. Bruce and I have a few pcDuino devices that will be fun to get Scala working on. Hope to see you there!

Intro to Reactive Composition with the Typesafe Reactive Platform

Reactive apps are all the rage lately. The Reactive Manifesto now has over 2000 signatures and all of my recent presentations about Reactive have been packed. I’ve just recorded a screencast that explains the async and non-blocking aspect of Reactive. This screencast walks through how to do Reactive Requests and Reactive Composition with the Typesafe Reactive Platform. Using Play Framework it is easy to handle async and non-blocking requests (Reactive Requests) and compose them together. This can be done with both Java and Scala but for this screencast I use Scala. Check it out and let me know what you think:

Building Reactive Apps with the Typesafe Platform

It is becoming pretty clear that Reactive is the next big thing in software. But there aren’t very many resources yet about how to actually build a Reactive application. Recently I hosted a webinar about “Building Reactive Apps with the Typesafe Platform” where I tried to explain Reactive in a tangible way using the Reactive Stocks sample app from Typesafe Activator. Here is the recording of that presentation:

Let me know what you think. Thanks!