Reactive Web Request Batching with Scala and Play Framework

At first glance it seems silly to do batching in the reactive world. When I first started with reactive programming I thought I wouldn’t have to worry about things like resource starvation. After all, the reactive magic bullet was *magical*! But my magic bullet fizzled when it hit downstream resource constraints causing me to need batching.

With a reactive web client library like Play Framework’s, I can easily spin up tens of thousands of web requests, in parallel, using very few resources. But what if that saturates the server I’m making requests to? In an ideal world I could get backpressure, but most web endpoints don’t provide a way to do that. So we just have to be nicer to the server and batch the requests. (Sidenote: How do you know how nice you should be to the service, e.g. batch size?)

So in some scenarios you might just need to batch a bunch of web requests. Let’s say you need to make 100 requests and you’ve figured out that you should only do 10 at a time. Then once all 100 have completed you need to do some aggregation of the results. Of course you could block until each batch is complete and mutate a shared value that accumulates the results. But we can do better than that! You can still use immutable data and non-blocking requests with batching.

In Play I have a /echo?num=1 request handler that just returns the number passed into it:

def echo(num: Int) = Action {
  Logger.info(s"num=$num")
  Ok(num.toString)
}

Now in another async controller I want to send 10,000 requests to the echo controller but batched 256 at a time, then return the aggregated results. First I need a function that will handle the batching:

private def processBatch(results: Future[Seq[String]], batch: Seq[String]): Future[Seq[String]] = {
  // when the results for the previous batches have been completed, start a new batch
  results.flatMap { responses =>
    // create the web requests for this batch
    val batchFutures: Seq[Future[String]] = batch.map(ws.url(_).get().map(_.body))
 
    // sequence the futures for this batch into a single future
    val batchFuture: Future[Seq[String]] = Future.sequence(batchFutures)
 
    // when this batch is complete, append the responses to the existing responses
    batchFuture.map { batchResponses =>
      Logger.info("Finished a batch")
      responses ++ batchResponses
    }
  }
}

This processBatch function takes a Future that holds the previously accumulated results, which are a sequence of Strings. It also takes a batch of URLs and returns a Future that holds the previously accumulated results plus the results from the batch. When the results that were passed in have all been completed, the batch is processed by creating a web request for each URL. Each request is transformed to just the String from the response body. All of the requests in the batch are then transformed into a single Future that completes when all of the requests in the batch have been completed. Once that future completes, the results of the batch are combined with the previously completed results.

This could also be written using Scala’s fancy for comprehension:

for {
  responses <- results
  batchFuture = Future.sequence(batch.map(ws.url(_).get().map(_.body)))
  batchResponses <- batchFuture
} yield {
  Logger.info("Finished a batch")
  responses ++ batchResponses
}

Here is an async controller that uses the processBatch function:

def index = Action.async { implicit request =>
 
  // ints 1 to 10000
  val numbers: Seq[Int] = 1 to 10000
 
  // turn each number into a url
  val urls: Seq[String] = numbers.map(routes.Application.echo(_).absoluteURL())
 
  // split into groups of 256
  val batches: Iterator[Seq[String]] = urls.grouped(256)
 
  // futures for all of the response strings
  val futureResponses = batches.foldLeft(Future.successful(Seq.empty[String]))(processBatch)
 
  // render the list of responses when they are all complete
  futureResponses.map { responses =>
    Ok(responses.toString)
  }
 
}

A sequence of 10,000 URLs, /echo?num=1 to /echo?num=10000, is assembled. That sequence is then partitioned into groups of 256. Then the reactive batching magic: take the batches and do a foldLeft starting with an already completed Future of an empty sequence of String, calling the processBatch function for each batch and accumulating the results. Once the future returned from the foldLeft completes, the results are turned into a String and returned in the Ok response.
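To see the same fold in isolation, here is a minimal sketch that needs only the Scala standard library. The fakeRequest function is a hypothetical stand-in for the ws.url(_).get() call, and the counters are instrumentation added only to observe how many requests are running at once; none of these names come from the original app.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BatchingSketch {

  // instrumentation only: tracks how many fake requests are running at once
  private val lock = new Object
  private var inFlight = 0
  private var maxInFlight = 0

  def maxObserved: Int = lock.synchronized(maxInFlight)

  // hypothetical stand-in for ws.url(url).get().map(_.body)
  def fakeRequest(num: Int): Future[String] = Future {
    lock.synchronized { inFlight += 1; maxInFlight = math.max(maxInFlight, inFlight) }
    blocking(Thread.sleep(5)) // pretend the server takes a moment
    lock.synchronized { inFlight -= 1 }
    num.toString
  }

  // same shape as processBatch: wait for the earlier batches, then start this one
  def processBatch(results: Future[Seq[String]], batch: Seq[Int]): Future[Seq[String]] =
    results.flatMap { responses =>
      Future.sequence(batch.map(fakeRequest)).map(responses ++ _)
    }

  def run(total: Int, batchSize: Int): Seq[String] = {
    val batches = (1 to total).grouped(batchSize)
    val all = batches.foldLeft(Future.successful(Seq.empty[String]))(processBatch)
    Await.result(all, 30.seconds) // blocking only so the sketch can return its result
  }
}
```

Calling BatchingSketch.run(100, 10) returns the strings "1" through "100" in order, and maxObserved never exceeds the batch size because each batch is only started inside the flatMap of the previous one.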

There you have reactive batching of web requests! Check out the full source.

Combining Reactive Streams, Heroku Kafka, and Play Framework

Heroku recently announced early access to the new Heroku Kafka service and while I’ve heard great things about Apache Kafka I hadn’t played with it because I’m too lazy to set that kind of stuff up on my own. Now that I can set up a Kafka cluster just by provisioning a Heroku Addon I figured it was time to give it a try.

If you aren’t familiar with Kafka it is kinda a next generation messaging system. It uses pub-sub, scales horizontally, and has built-in message durability and delivery guarantees. Originally Kafka was built at LinkedIn but is now being used by pretty much every progressive enterprise that needs to move massive amounts of data through transformation pipelines.

While learning Kafka I wanted to build something really simple: an event producer that just sends random numbers to a Kafka topic and an event consumer that receives those random numbers and sends them to a browser via a WebSocket. I decided to use Play Framework and the Akka Streams implementation of Reactive Streams.

In Reactive Streams there is the pretty standard “Source” and “Sink” where an event producer is a Source and a consumer is a Sink. A “Flow” is a pairing between a Source and a Sink with an optional transformation. In my example there are two apps, each with a Flow. A worker process will send random numbers to Kafka so its Source will be periodically generated random numbers and its Sink will be Kafka. In the web process the Source is Kafka and the Sink is a WebSocket that will push the random numbers to the browser.

Here is the worker app with some necessary config omitted (check out the full source):

object RandomNumbers extends App {
 
  val tickSource = Source.tick(Duration.Zero, 500.milliseconds, ()).map(_ => Random.nextInt().toString)
 
  kafka.sink("RandomNumbers").map { kafkaSink =>
    tickSource
      .map(new ProducerRecord[String, String]("RandomNumbers", _))
      .to(kafkaSink)
      .run()(app.materializer)
  }
 
}

The tickSource is a Source that generates a new random Int, converted to a String, every 500 milliseconds. That Source is connected to a Kafka Sink with a Flow that transforms each String into a ProducerRecord (for Kafka). This uses the Reactive Kafka library which is a Reactive Streams API for working with Kafka.

On the web app side, Play Framework has built-in support for using Reactive Streams with WebSockets so all we need is a controller method that creates a Source from a Kafka topic and hooks that to a WebSocket Flow (full source):

def ws = WebSocket.acceptOrResult[Any, String] { _ =>
  kafka.source(Set("RandomNumbers")) match {
    case Failure(e) =>
      Future.successful(Left(InternalServerError("Could not connect to Kafka")))
    case Success(source) =>
      val flow = Flow.fromSinkAndSource(Sink.ignore, source.map(_.value))
      Future.successful(Right(flow))
  }
}

Notice that the Flow has a Sink.ignore which just says to ignore any incoming messages on the WebSocket (those sent from the browser). Play takes care of all the underlying stuff and then whenever Kafka gets a message on the “RandomNumbers” topic, it will be sent out via the WebSocket.

And it all works!

Check out the full source for instructions on how to get this example set up on your machine and on Heroku. Let me know how it goes!

Reactive Postgres with Play Framework & ScalikeJDBC

Lately I’ve built a few apps that have relational data. Instead of trying to shoehorn that data into a NoSQL model I decided to use the awesome Heroku Postgres service but I didn’t want to lose out on the Reactiveness that most of the NoSQL data stores support. I discovered ScalikeJDBC-Async which uses postgresql-async, a Reactive (non-blocking), JDBC-ish, Postgres driver. With those libraries I was able to keep my data relational and my app Reactive all the way down. Let’s walk through how to do it in a Play Framework app. (TL;DR: Jump to the full source.)

If you want to start from scratch, create a new Play app from the Play Scala Seed.

The minimum dependencies needed in the build.sbt file are:

libraryDependencies ++= Seq(
  "org.postgresql"       %  "postgresql"                    % "9.3-1102-jdbc41",
  "com.github.tototoshi" %% "play-flyway"                   % "1.2.0",
 
  "com.github.mauricio"  %% "postgresql-async"              % "0.2.16",
  "org.scalikejdbc"      %% "scalikejdbc-async"             % "0.5.5",
  "org.scalikejdbc"      %% "scalikejdbc-async-play-plugin" % "0.5.5"
)

The play-flyway library handles schema evolutions using Flyway. It is a great alternative to Play’s JDBC module because it only does evolutions, and only one-way evolutions (i.e. no downs). But because play-flyway doesn’t use the postgresql-async driver, it needs the standard postgresql JDBC driver as well.

The scalikejdbc-async-play-plugin library manages the lifecycle of the connection pool used by scalikejdbc-async in a Play app.

To use play-flyway and scalikejdbc-async-play-plugin a conf/play.plugins file must tell Play about the plugins:

776:com.github.tototoshi.play2.flyway.Plugin
777:scalikejdbc.async.PlayPlugin

A first evolution script in conf/db/migration/default/V1__create_tables.sql will create a table named bar that will hold a list of bars for our little sample app:

DROP TABLE IF EXISTS bar;
CREATE TABLE bar (
  id SERIAL PRIMARY KEY,
  name VARCHAR NOT NULL
);

You will of course need a Postgres database to proceed. You can either install one locally or create a free one on the Heroku Postgres cloud service. Then update the conf/application.conf file to point to the database:

db.default.driver="org.postgresql.Driver"
db.default.url="postgres://admin:admin@localhost:5432/test"
db.default.url=${?DATABASE_URL}

The last line above overrides the database connection url if there is a DATABASE_URL environment variable set (which is the case if your app is running on Heroku).
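The fallback behavior of that optional substitution can be mimicked in plain Scala. This is only an illustration of the "use the environment variable if it is set, otherwise keep the earlier value" rule, not how Play or its config library implements it; resolveDbUrl and defaultUrl are hypothetical names.

```scala
object DbUrlSketch {

  // the local default from conf/application.conf
  val defaultUrl = "postgres://admin:admin@localhost:5432/test"

  // use DATABASE_URL when it is present, otherwise keep the default
  def resolveDbUrl(env: Map[String, String] = sys.env): String =
    env.getOrElse("DATABASE_URL", defaultUrl)
}
```

Passing the environment in as a Map keeps the function easy to try out: resolveDbUrl(Map.empty) yields the local default, while a Map containing DATABASE_URL yields that value instead.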

To run this app locally you can start the Play app by starting the Activator UI or from the command line with:

activator ~run

When you first open your app in the browser, the play-flyway plugin should detect that evolutions need to be applied and ask you to apply them. Once applied you will be ready to create a simple database object and a few reactive request handlers.

Here is a Bar database object named app/models/Bar.scala that uses scalikejdbc-async for reactive creation and querying of Bars:

package models
 
import play.api.libs.json.Json
import scalikejdbc.WrappedResultSet
import scalikejdbc._
import scalikejdbc.async._
import scalikejdbc.async.FutureImplicits._
 
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
 
case class Bar(id: Long, name: String)
 
object Bar extends SQLSyntaxSupport[Bar] {
 
  implicit val jsonFormat = Json.format[Bar]
 
  override val columnNames = Seq("id", "name")
 
  lazy val b = Bar.syntax
 
  def db(b: SyntaxProvider[Bar])(rs: WrappedResultSet): Bar = db(b.resultName)(rs)
 
  def db(b: ResultName[Bar])(rs: WrappedResultSet): Bar = Bar(
    rs.long(b.id),
    rs.string(b.name)
  )
 
  def create(name: String)(implicit session: AsyncDBSession = AsyncDB.sharedSession): Future[Bar] = {
    val sql = withSQL(insert.into(Bar).namedValues(column.name -> name).returningId)
    sql.updateAndReturnGeneratedKey().map(id => Bar(id, name))
  }
 
  def findAll(implicit session: AsyncDBSession = AsyncDB.sharedSession): Future[List[Bar]] = {
    withSQL(select.from[Bar](Bar as b)).map(Bar.db(b))
  }
 
}

The db functions perform the mapping from SQL results to the Bar case class.

The create function takes a Bar name and returns a Future[Bar] by doing a non-blocking insert using the ScalikeJDBC Query DSL. When the insert has completed the primary key is returned and a new Bar instance is created and returned.

The findAll method uses the ScalikeJDBC Query DSL to select all of the Bars from the database, returning a Future[List[Bar]].

Now that we have a reactive database object, let’s expose its functions through reactive request handlers. First set up the routes in the conf/routes file:

GET        /bars                   controllers.Application.getBars
POST       /bars                   controllers.Application.createBar

Define the controller functions in the app/controllers/Application.scala file:

def getBars = Action.async {
  Bar.findAll.map { bars =>
    Ok(Json.toJson(bars))
  }
}
 
def createBar = Action.async(parse.urlFormEncoded) { request =>
  Bar.create(request.body("name").head).map { bar =>
    Redirect(routes.Application.index())
  }
}

Both functions use Action.async which holds a function that takes a request and returns a response (Result) in the future. By returning a Future[Result] Play is able to make requests to the controller function non-blocking. The getBars controller function calls the Bar.findAll and then transforms the Future[List[Bar]] into a Future[Result], the 200 response containing the JSON serialized list of bars. The createBar controller function parses the request, creates the Bar, and then transforms the Future[Bar] into a Future[Result] once the Bar has been created.

From the non-blocking perspective, here is what a request to the getBars controller function looks like:

  1. Web request made to /bars
  2. Thread allocated to web request
  3. Database request made for the SQL select
  4. Thread allocated to the database request
  5. Web request thread is deallocated (but the connection remains open)
  6. Database request thread is deallocated (but the connection remains open)
  7. Database response handler reallocates a thread
  8. SQL result is transformed to List[Bar]
  9. Database response thread is deallocated
  10. Web response handler reallocates a thread
  11. Web response is created from the list of bars
  12. Web response thread is deallocated

So everything is now reactive all the way down because there is a moment where the web request is waiting on the database to respond but no threads are allocated to the request.
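The sequence above can be imitated with nothing but the standard library. In this rough sketch, fakeQuery is a hypothetical stand-in for the async database driver: it hands back a Future immediately and completes it later from a timer, so no thread sits waiting in between. The names and the 50 ms delay are assumptions for illustration only.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object NonBlockingSketch {

  // hypothetical async "database": returns at once, completes the Future later
  def fakeQuery(timer: ScheduledExecutorService): Future[List[String]] = {
    val promise = Promise[List[String]]()
    timer.schedule(new Runnable {
      def run(): Unit = promise.success(List("foo", "bar"))
    }, 50, TimeUnit.MILLISECONDS)
    promise.future
  }

  // hypothetical "controller": a thread is only needed once the rows arrive
  def handle(timer: ScheduledExecutorService): Future[String] =
    fakeQuery(timer).map(rows => rows.mkString("[", ",", "]"))

  def demo(): String = {
    val timer = Executors.newSingleThreadScheduledExecutor()
    // blocking here only so the sketch can return its result
    try Await.result(handle(timer), 5.seconds)
    finally timer.shutdown()
  }
}
```

Between the call to fakeQuery and the timer firing, the only thing kept around is the Promise and its registered callback, which is the same idea as the gap between steps 6 and 7 in the list.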

Try it yourself with curl:

$ curl -X POST -d "name=foo" http://localhost:9000/bars
$ curl http://localhost:9000/bars
[{"id":1,"name":"foo"}]

Grab the full source and let me know if you have any questions. Thanks!

Building & Deploying Reactive Service Pipelines — Live in Salt Lake City

This Wednesday (Aug 6, 2014) I will be presenting Building & Deploying Reactive Service Pipelines at the Utah Scala Enthusiasts group in Salt Lake City. Here is the abstract:

Composition of micro-services is a modern integration pattern that couples nicely with Reactive and Continuous Delivery. These paradigms enable small teams to move quickly while integrating cross-silo data stores for modern JavaScript UIs and REST services. This session will use Scala, Play Framework, and Heroku to illustrate how to build and deploy Reactive Service Pipelines.

RSVP Now! Hope to see you there.

Going Reactive at OSCON 2014

This year at OSCON I will be leading a hands-on lab and presenting about Reactive, Play Framework, and Scala. Here are the two sessions:

  • Reactive All The Way Down (lab) – 9:00am Monday, July 21

    In this tutorial you will build a Reactive application with Play Framework, Scala, WebSockets, and AngularJS. We will get started with a template app in Typesafe Activator. Then we will add a Reactive RESTful JSON service and a WebSocket in Scala. We will then build the UI with AngularJS.

  • Building Modern Web Apps with Play Framework and Scala – 2:30pm Wednesday, July 23

    Play Framework is the High Velocity Web Framework For Java and Scala. It is lightweight, stateless, RESTful, and developer friendly. This is an introduction to building web applications with Play. You will learn about: routing, Scala controllers & templates, database access, asset compilation for LESS & CoffeeScript, and JSON services.

Hope to see you there!

Presenting Going Reactive with Java 8 Next Week in Boulder & Denver

Next week I will be presenting Going Reactive with Java 8 at the Boulder and Denver Java User Groups. Here is the session description:

Java 8’s lambdas make building Reactive applications a whole lot easier and cleaner. Through copious code examples this session will show you how to build event-driven, scalable, resilient, and responsive applications with Java 8, Play Framework and Akka. On the web side you will learn about using lambdas for async & non-blocking requests & WebSockets. You will also learn how the actor model in Akka pairs well with lambdas to create an event-driven foundation that provides concurrency, clustering and fault-tolerance.

Sign up for Tuesday, May 13, 2014 in Boulder or Wednesday, May 14, 2014 in Denver. Hope to see you there!

Going Reactive with Java 8 – Tonight at Triangle JUG

Tonight I will be presenting Going Reactive with Java 8 at the Triangle Java Users Group. Here is the session description:

Java 8’s lambdas make building Reactive applications a whole lot easier and cleaner. Through copious code examples this session will show you how to build event-driven, scalable, resilient, and responsive applications with Java 8, Play Framework and Akka. On the web side you will learn about using lambdas for async & non-blocking requests & WebSockets. You will also learn how the actor model in Akka pairs well with lambdas to create an event-driven foundation that provides concurrency, clustering and fault-tolerance.

Hope to see you there!

Presenting in SF: sbt-web & Reactive All the Way Down

This week I will be presenting twice in San Francisco at SF Scala:

  • Thursday April 10: Introducing sbt-web – A Node & WebJar Compatible Asset Pipeline for the Typesafe Platform

    sbt-web is a new web asset pipeline for Play Framework and other sbt-based frameworks. It can pull dependencies from both Node and WebJars. The pipeline covers all of the phases of client-side development, including: linting, compiling (CoffeeScript, LESS, etc), minification, concatenation, fingerprinting, and gzipping. This session will give you an introduction to sbt-web and show you how to get started using it.

  • Friday April 11: Reactive All the Way Down

    The world is going Reactive but not just for the back-end; UIs are also becoming Reactive. In this session we will walk through how to build an end-to-end Reactive application with Scala, Play Framework, Akka, and AngularJS.

Hope to see you there!

Presenting Building Reactive Apps in Denver

This Thursday (March 20, 2014) I will be presenting Building Reactive Apps at the Reactive Programming Enthusiasts Denver.

Here is the session description:

Non-blocking, asynchronous, and reactive programming models are all the rage today. This session will explore in-depth why these patterns are important in modern apps. We will drill down and see how to apply them to event-driven web, mobile, and RESTful apps. To illustrate the concepts, we will use Java, Scala, Akka, and the Play Framework as examples.

Hope to see you there!