Reactive Web Request Batching with Scala and Play Framework

At first glance it seems silly to do batching in the reactive world. When I first started with reactive programming I thought I wouldn’t have to worry about things like resource starvation. After all, the reactive magic bullet was *magical*! But my magic bullet fizzled when it hit downstream resource constraints, forcing me to batch my requests.

With a reactive web client library like Play Framework’s, I can easily spin up tens of thousands of web requests, in parallel, using very few resources. But what if that saturates the server I’m making requests to? In an ideal world I could get backpressure, but most web endpoints don’t provide a way to do that. So we just have to be nicer to the server and batch the requests. (Side note: how do you figure out how nice you should be to the service, i.e. what batch size to use?)

So in some scenarios you might just need to batch a bunch of web requests. Let’s say you need to make 100 requests and you’ve figured out that you should only do 10 at a time. Then once all 100 have completed you need to do some aggregation of the results. Of course you could block until each batch is complete and mutate a shared value that accumulates the results. But we can do better than that! You can still use immutable data and non-blocking requests with batching.
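For contrast, here is a minimal sketch of what that blocking, mutating version might look like. This is hypothetical code, not from the app in this post; fetch stands in for a real web request:

import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingBatches extends App {

  // hypothetical stand-in for a real web request
  def fetch(url: String): Future[String] = Future(s"response for $url")

  val urls: Seq[String] = (1 to 100).map(n => s"/echo?num=$n")

  // block until each batch of 10 completes, mutating a shared buffer
  val accumulated = ListBuffer.empty[String]
  urls.grouped(10).foreach { batch =>
    val batchFuture = Future.sequence(batch.map(fetch))
    accumulated ++= Await.result(batchFuture, 30.seconds) // parks a thread per batch
  }

  println(accumulated.size) // 100
}

Every Await.result parks a thread until the batch finishes, which is exactly the kind of resource waste we are trying to avoid. The rest of this post builds the non-blocking alternative.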

In Play I have a /echo?num=1 request handler that just returns the number passed into it:

def echo(num: Int) = Action {
  Logger.info(s"num=$num")
  Ok(num.toString)
}

Now in another async controller I want to send 10,000 requests to the echo controller but batched 256 at a time, then return the aggregated results. First I need a function that will handle the batching:

private def processBatch(results: Future[Seq[String]], batch: Seq[String]): Future[Seq[String]] = {
  // when the results for the previous batches have been completed, start a new batch
  results.flatMap { responses =>
    // create the web requests for this batch
    val batchFutures: Seq[Future[String]] = batch.map(ws.url(_).get().map(_.body))
 
    // sequence the futures for this batch into a single future
    val batchFuture: Future[Seq[String]] = Future.sequence(batchFutures)
 
    // when this batch is complete, append the responses to the existing responses
    batchFuture.map { batchResponses =>
      Logger.info("Finished a batch")
      responses ++ batchResponses
    }
  }
}

This processBatch function takes a Future that holds the previously accumulated results (a sequence of Strings) and a batch of URLs, and it returns a Future that holds the previously accumulated results plus the results from the batch. When the results that were passed in have all been completed, the batch is processed by creating a web request for each URL. Each request is transformed into just the String from the response body. All of the requests in the batch are then sequenced into a single Future that completes when every request in the batch has completed. Once that Future completes, the results of the batch are appended to the previously completed results.

This could also be written using Scala’s fancy for comprehension:

for {
  responses <- results
  batchFuture = Future.sequence(batch.map(ws.url(_).get().map(_.body)))
  batchResponses <- batchFuture
} yield {
  Logger.info("Finished a batch")
  responses ++ batchResponses
}

Here is an async controller that uses the processBatch function:

def index = Action.async { implicit request =>
 
  // ints 1 to 10000
  val numbers: Seq[Int] = 1 to 10000
 
  // turn each number into a url
  val urls: Seq[String] = numbers.map(routes.Application.echo(_).absoluteURL())
 
  // split into groups of 256
  val batches: Iterator[Seq[String]] = urls.grouped(256)
 
  // futures for all of the response strings
  val futureResponses = batches.foldLeft(Future.successful(Seq.empty[String]))(processBatch)
 
  // render the list of responses when they are all complete
  futureResponses.map { responses =>
    Ok(responses.toString)
  }
 
}

A sequence of 10,000 URLs, /echo?num=1 to /echo?num=10000, is assembled. That sequence is then partitioned into groups of 256. Then the reactive batching magic… Take the batches and do a foldLeft starting with an empty sequence of String, calling the processBatch function for each batch and accumulating the results. Once the future returned from the foldLeft completes, the results are turned into a String and returned in the Ok response.

There you have reactive batching of web requests! Check out the full source.

Machine Learning on Heroku with PredictionIO

Last week at the TrailheaDX Salesforce Dev Conference we launched the DreamHouse sample application to showcase the Salesforce App Cloud and numerous possible integrations. I built an integration with the open source PredictionIO Machine Learning framework. The use case for ML in DreamHouse is a real estate recommendation engine that learns based on users with similar favorites. Check out a demo and get the source.

For the DreamHouse PredictionIO integration to work I needed to get the PredictionIO service running on Heroku. Since it is a Scala app everything worked great! Here are the steps to get PredictionIO up and running on Heroku.

First you will need a PredictionIO event server and app defined in the event server:

  1. Deploy the event server to Heroku (deployment details are in the source linked below).
  2. Create an app:

    heroku run -a <APP NAME> console app new <A PIO APP NAME>
  3. List apps:

    heroku run -a <APP NAME> console app list

Check out the source and local dev instructions for the event server.

Now that you have an event server and app, load some event data:

export ACCESS_KEY=<YOUR ACCESS KEY>
export URL=http://<YOUR HEROKU APP NAME>.herokuapp.com

# create 5 users
for i in {1..5}; do curl -i -X POST $URL/events.json?accessKey=$ACCESS_KEY -H "Content-Type: application/json" -d "{ \"event\" : \"\$set\", \"entityType\" : \"user\", \"entityId\" : \"u$i\" }"; done

# create 50 items, each in categories c1 and c2
for i in {1..50}; do curl -i -X POST $URL/events.json?accessKey=$ACCESS_KEY -H "Content-Type: application/json" -d "{ \"event\" : \"\$set\", \"entityType\" : \"item\", \"entityId\" : \"i$i\", \"properties\" : { \"categories\" : [\"c1\", \"c2\"] } }"; done

# record a view event from each user on a random item
for i in {1..5}; do curl -i -X POST $URL/events.json?accessKey=$ACCESS_KEY -H "Content-Type: application/json" -d "{ \"event\" : \"view\", \"entityType\" : \"user\", \"entityId\" : \"u$i\",  \"targetEntityType\" : \"item\", \"targetEntityId\" : \"i$(( ( RANDOM % 50 )  + 1 ))\" }"; done
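If you’d rather send events from code than from curl, here is a minimal Scala sketch of the same $set call using only the JDK. This is just a sketch, not part of the DreamHouse integration, and it assumes the same ACCESS_KEY and URL values exported above:

import java.net.{HttpURLConnection, URL}

object CreateUser extends App {

  val accessKey = sys.env("ACCESS_KEY")
  val baseUrl = sys.env("URL") // e.g. http://<YOUR HEROKU APP NAME>.herokuapp.com

  // POST a $set event for user u1 to the event server
  val conn = new URL(s"$baseUrl/events.json?accessKey=$accessKey")
    .openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("POST")
  conn.setRequestProperty("Content-Type", "application/json")
  conn.setDoOutput(true)

  val json = """{ "event" : "$set", "entityType" : "user", "entityId" : "u1" }"""
  conn.getOutputStream.write(json.getBytes("UTF-8"))

  // a 2xx response means the event server accepted the event
  println(s"HTTP ${conn.getResponseCode}")
}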

Check out the demo data:

http://<YOUR HEROKU APP NAME>.herokuapp.com/events.json?accessKey=<YOUR APP ACCESS KEY>&limit=-1

Now you need an engine that will learn from a set of training data and then be able to make predictions. With PredictionIO you can use any algorithm you want, but SparkML is often a great choice. For this simple example I’m just using single-node Spark and Postgres, but the underlying data source and ML engine can be anything.

This example is based on PredictionIO’s Recommendation Template so it uses SparkML’s Alternating Least Squares (ALS) algorithm. To deploy it on Heroku follow these steps:

  1. Deploy the engine to Heroku (deployment details are in the source linked below).
  2. Attach your PredictionIO Event Server’s Postgres:

    heroku addons:attach <YOUR-ADDON-ID> -a <YOUR HEROKU APP NAME>

    Note: You can find out <YOUR-ADDON-ID> by running:

    heroku addons -a <YOUR EVENT SERVER HEROKU APP NAME>

  3. Train the app:

    heroku run -a <YOUR HEROKU APP NAME> train
  4. Restart the app to load the new training data:

    heroku restart -a <YOUR HEROKU APP NAME>
  5. Check the status of your engine:

    http://<YOUR HEROKU APP NAME>.herokuapp.com

Now you can check out the recommendations for an item (must be an item that has events):

curl -H "Content-Type: application/json" -d '{ "items": ["i11"], "num": 4 }' -k http://<YOUR HEROKU APP NAME>.herokuapp.com/queries.json

Check out the source and local dev instructions for this example engine.

Let me know if you have any questions or problems. Happy ML’ing!

Combining Reactive Streams, Heroku Kafka, and Play Framework

Heroku recently announced early access to the new Heroku Kafka service and while I’ve heard great things about Apache Kafka I hadn’t played with it because I’m too lazy to set that kind of stuff up on my own. Now that I can setup a Kafka cluster just by provisioning a Heroku Addon I figured it was time to give it a try.

If you aren’t familiar with Kafka, it is kind of a next-generation messaging system. It uses pub-sub, scales horizontally, and has built-in message durability and delivery guarantees. Kafka was originally built at LinkedIn but is now being used by pretty much every progressive enterprise that needs to move massive amounts of data through transformation pipelines.

While learning Kafka I wanted to build something really simple: an event producer that just sends random numbers to a Kafka topic, and an event consumer that receives those random numbers and sends them to a browser via a WebSocket. I decided to use Play Framework and the Akka Streams implementation of Reactive Streams.

In Reactive Streams there are the pretty standard “Source” and “Sink”, where an event producer is a Source and a consumer is a Sink. A “Flow” is a pairing between a Source and a Sink, with an optional transformation. In my example there are two apps, each with a Flow. A worker process will send random numbers to Kafka, so its Source is periodically generated random numbers and its Sink is Kafka. In the web process the Source is Kafka and the Sink is a WebSocket that pushes the random numbers to the browser.
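If Source, Sink, and Flow are new to you, here is a tiny self-contained Akka Streams sketch (no Kafka, and not from the example apps) showing how the pieces snap together:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

object StreamBasics extends App {

  implicit val system = ActorSystem("stream-basics")
  implicit val materializer = ActorMaterializer()

  // Source: emits the Ints 1 through 10
  val source = Source(1 to 10)

  // Flow: transforms each Int into a String
  val flow = Flow[Int].map(n => s"number $n")

  // Sink: prints each element it receives
  val sink = Sink.foreach[String](println)

  // wire them together and run the stream
  source.via(flow).to(sink).run()
}

In the two apps below, Kafka takes the place of one end of this pipeline.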

Here is the worker app with some necessary config omitted (check out the full source):

object RandomNumbers extends App {
 
  val tickSource = Source.tick(Duration.Zero, 500.milliseconds, Unit).map(_ => Random.nextInt().toString)
 
  kafka.sink("RandomNumbers").map { kafkaSink =>
    tickSource
      .map(new ProducerRecord[String, String]("RandomNumbers", _))
      .to(kafkaSink)
      .run()(app.materializer)
  }
 
}

The tickSource is a Source that generates a new random Int every 500 milliseconds. That Source is connected to a Kafka Sink with a Flow that transforms an Int into a ProducerRecord (for Kafka). This uses the Reactive Kafka library, which is a Reactive Streams API for working with Kafka.

On the web app side, Play Framework has built-in support for using Reactive Streams with WebSockets, so all we need is a controller method that creates a Source from a Kafka topic and hooks it to a WebSocket Flow (full source):

def ws = WebSocket.acceptOrResult[Any, String] { _ =>
  kafka.source(Set("RandomNumbers")) match {
    case Failure(e) =>
      Future.successful(Left(InternalServerError("Could not connect to Kafka")))
    case Success(source) =>
      val flow = Flow.fromSinkAndSource(Sink.ignore, source.map(_.value))
      Future.successful(Right(flow))
  }
}

Notice that the Flow has a Sink.ignore which just says to ignore any incoming messages on the WebSocket (those sent from the browser). Play takes care of all the underlying stuff and then whenever Kafka gets a message on the “RandomNumbers” topic, it will be sent out via the WebSocket.

And it all works!

Check out the full source for instructions on how to get this example setup on your machine and on Heroku. Let me know how it goes!

Building a Mock HVAC for Smart Thermostat Demos

Recently I needed to create a mock HVAC system so that I could have a portable smart thermostat for various demos. I searched around but couldn’t find any such thing. So with some sleuthing and the help of my friend Bruce Eckel I was able to build a simple system that powers a smart thermostat and simulates a heating system. This post will document how to do this in case anyone else ever needs such a thing.

Modern HVAC systems typically provide 24 volt AC power to thermostats, so the first thing you will need is a 120 VAC (wall power) to 24 VAC transformer. I used this one:

[photo: 120 VAC to 24 VAC transformer]

Then you’ll need something that can simulate a single-stage heating system. A smart thermostat uses 24 volt AC to flip a relay (an electronic component that uses one current to switch another, usually stronger, one) on and off. I could have used an actual relay for this but decided to go with something easier: a red LED light that runs on 24 volt AC:

[photo: 24 volt AC red LED]

For my smart thermostat I used an ecobee3 (because of its REST APIs), but a Nest should work as well:

[photo: ecobee3 smart thermostat]

Now to wire it together: pick a terminal on the transformer to be the “common” and run two wires, one to the LED and one to the “C” terminal on the smart thermostat. Then run a wire from the other side of the LED to the “W1” terminal on the smart thermostat. Finally, connect the other terminal on the transformer to the “Rh” terminal on the smart thermostat. Like this:

[wiring diagram: Ecobee3_Demo_Wiring]

Here is what it looks like wired together:
[photo: the system wired together]

Once the smart thermostat is powered on and setup, the LED should turn on when the “heater” is on:
[photo: LED on]

And turn off when the heater is off:
[photo: LED off]

I hope that helps someone out. Let me know if you have any questions!

The 6 Minute Cloud/Local Dev Roundtrip with Spring Boot

Great developer experiences allow you to go from nothing to something amazing in under ten minutes. So I’m always trying to see how much I can minimize getting-started experiences. My latest attempt is to deploy a Spring Boot app on Heroku, download the source to a developer’s machine, set up & run the app locally, make & test changes, and then redeploy those changes, all in under ten minutes (assuming a fast internet connection). Here is that experience in about six minutes:

[video: the six-minute cloud/local roundtrip]

To try it yourself, start at the hello-springboot GitHub repo. Let me know how it goes!

Pulling Go Code Colorado Data into Salesforce

This weekend I’m at the Go Code Colorado Challenge Weekend event in Durango. The purpose of Go Code Colorado 2016 is for teams to build something useful for businesses using one or more of the Colorado Public Datasets. Some teams are using Salesforce for the back-office / business process side of the app they are building. So I decided to see if I could pull a Colorado Public Dataset into Salesforce. Turns out it’s super easy! Just follow these steps:

  1. Sign up for a Salesforce Developer Edition
  2. Create a new External Data Source with the following field values:

    External Data Source = Colorado Public Data
    Name = Colorado_Public_Data
    Type = Lightning Connect: OData 2.0
    URL = https://data.colorado.gov/OData.svc
    Special Compatibility = Socrata

    [screenshot: new_external_data_source]

  3. Save the new External Data Source and then hit “Validate and Sync” to fetch the metadata for the services.
  4. Select one or more tables from the list. A good table to test with is the “Occupational Employment Statistics” dataset.
    [screenshot: validate_and_sync]
    Sync the table and you should see a new “External Object” in the list of External Objects.
    [screenshot: external_objects]
  5. The data is now available in Salesforce. An easy way to see the dataset is to create a tab in the Salesforce UI. On the Custom Tabs Setup page create a new Custom Object Tab for the “Occupational Employment Statistics” object and select a Tab Style:
    [screenshot: new_custom_object_tab]
    Complete the creation of the tab (select Next, Next, Save).
  6. Select the “Occupational Employment Statistics” tab (which might be in a drop-down menu depending on the width of your browser):
    [screenshot: custom_tabs]
    Next to the “View: All” selector, hit “Go!” to fetch the data from the Colorado Public Data source. You’ll now see the records:
    [screenshot: records]
    Note: The columns displayed in this view can be customized in the External Object’s Search Layout.
    Selecting a record’s ID will display the record details:
    [screenshot: record_details]

That’s it! Now you can build all sorts of business processes and other employee-facing interactions around the public data.
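The OData endpoint isn’t Salesforce-specific either. Here is a minimal Scala sketch (not needed for any of the steps above) that fetches the same OData service document Salesforce reads during “Validate and Sync”:

object ODataPeek extends App {

  // the same endpoint used for the External Data Source above
  val serviceUrl = "https://data.colorado.gov/OData.svc"

  // fetch and print the service document, which lists the available tables
  val serviceDoc = scala.io.Source.fromURL(serviceUrl).mkString
  println(serviceDoc)
}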

Good luck to all of the Go Code Colorado teams!

Quick Force Java – Getting Started with Salesforce REST in Java

Recently I blogged about a toolchain that quickly gets you going with the Salesforce REST APIs. I believe developers should be able to get started with new technologies without having to install tons of stuff or struggle for days. That post used Quick Force Node for those who want to use JavaScript / Node.js. I’ve had a number of requests for a Java version of this toolchain, so I created Quick Force Java.

Check out a screencast that shows how to start with nothing, deploy a Salesforce REST app on Heroku, set up OAuth, set up a local dev environment, make & test changes to the app, and then deploy those changes back to the cloud (all in under 12 minutes):

[video: Quick Force Java screencast]

Try out Quick Force Java and let me know how it goes!

Salesforce REST APIs – From Zero to Cloud to Local Dev in Minutes

When getting acquainted with new technologies I believe that users shouldn’t have to spend more than 15 minutes getting something simple up and running. I wanted to apply this idea to building an app on the Salesforce REST APIs, so I built Quick Force (Node). In about 12 minutes you can deploy a Node.js app on Heroku that uses the Salesforce REST APIs, set up OAuth, then pull the app down to your local machine, make and test changes, and then redeploy those changes. Check out a video walkthrough:

[video: Quick Force (Node) walkthrough]

Ok, now give it a try yourself by following the instructions in Quick Force (Node)!

I hope this will be the quickest and easiest way you’ve gotten started with the Salesforce REST APIs. Let me know how it goes!

FYI: This *should* work on Windows but I haven’t tested it there yet. So if you have any problems please let me know.

Winter Tech Forum 2016 – My Favorite Developer Conference!

I’ve been to a TON of developer conferences and by a landslide my favorite is the Winter Tech Forum (which used to be the Java Posse Roundup). Here is why… Learning for me is experiential.

Typical eyes-forward conferences are like being a passenger on a sailboat. I can watch what is happening but I could definitely not become the captain based on my experience as a passenger. This is what makes WTF different: every attendee is a captain (or maybe a skipper if you are new). The whole conference is the experiences that the attendees want to have. Sometimes that means we write code together, explore new technologies, or discuss ideas. Those experiences have made a significant impact on my technical skills. We also eat together and play together, which has helped me build some amazing relationships.

This might sound a little crazy until you actually experience it. Which I highly encourage you to do! This year’s WTF is Feb 29 – March 4 in Crested Butte, CO and will be followed by a new Developer Retreat event that also looks to be awesome. I hope to see you there!