When you hear ‘Monad’, think ‘Chainable’

There comes a point in every Functional Programmer’s life where they feel the curse of the Monad has lifted and they must now explain Monads to their friends who just don’t get it. What follows is probably wrong and confusing, because there is no escaping the curse. But here goes…

Suppose you have a system property that contains the name of another system property, like:

KEYNAME=FOO
FOO=bar

And you want the value of FOO, like:

bar

So there is a chain of operations: we first need to look up the KEYNAME value and then use that to look up another value.

In a non-functional world you may do something like:

String prop = null;
String keyname = System.getProperty("KEYNAME");
if (keyname != null) {
  prop = System.getProperty(keyname);
}
In the functional world we can instead use a type that represents the nullable value, usually called an Option, like:

val maybeKeyname: Option[String] = sys.props.get("KEYNAME")

Now we can’t use maybeKeyname to look up the second value because it might be None and props.get doesn’t take an Option:

sys.props.get(maybeKeyname) // this won't work

So we need to chain together two options. We can do this with Monads via a flatMap function:

val maybeProp: Option[String] = maybeKeyname.flatMap(keyname => sys.props.get(keyname))

Since flatMap takes a function we can also just do:

val maybeProp: Option[String] = maybeKeyname.flatMap(sys.props.get)

But there is some syntactic sugar for Monads in Scala that we can use on anything that has the shape of a Monad (i.e. Monadic):

val maybeProp: Option[String] = for {
  keyname <- sys.props.get("KEYNAME")
  prop <- sys.props.get(keyname)
} yield prop

The for comprehension makes Monad chaining look like a chain. So when you hear the word Monad just think chainable instead.
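The chainable idea isn’t Scala-specific either. As a sketch of the same pattern in Java (assuming a hypothetical props map standing in for the system properties), Optional.flatMap chains the two nullable lookups the same way:

```java
import java.util.Map;
import java.util.Optional;

public class OptionChain {
    public static void main(String[] args) {
        // hypothetical stand-in for the system properties
        Map<String, String> props = Map.of("KEYNAME", "FOO", "FOO", "bar");

        // chain the two nullable lookups with flatMap, just like Option in Scala
        Optional<String> maybeProp = Optional.ofNullable(props.get("KEYNAME"))
                .flatMap(keyname -> Optional.ofNullable(props.get(keyname)));

        System.out.println(maybeProp.orElse("not found")); // prints "bar"
    }
}
```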

There are many different chainable types. Another is a Try for things that can fail.

For example, if we want to ask a user for two numbers and add them together, but handle number parsing failures we can do this:

import scala.io.StdIn
import scala.util.Try

for {
  num1 <- Try(StdIn.readLine("Number 1: ").toInt)
  num2 <- Try(StdIn.readLine("Number 2: ").toInt)
} yield num1 + num2

If you enter two numbers you get a Success but if either number is not an integer then the result will be a Failure.
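Java has no built-in Try, but the same short-circuiting can be sketched with an Optional-returning parse helper (a hypothetical stand-in, not a real library API). If either parse fails, the whole chain comes up empty:

```java
import java.util.Optional;

public class SafeParse {
    // a tiny Try-like helper: a failed parse becomes an empty Optional
    static Optional<Integer> parse(String s) {
        try {
            return Optional.of(Integer.parseInt(s));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // both parses succeed, so the chain yields the sum
        Optional<Integer> ok = parse("1").flatMap(a -> parse("2").map(b -> a + b));
        // the second parse fails, so the whole chain is empty
        Optional<Integer> bad = parse("1").flatMap(a -> parse("two").map(b -> a + b));

        System.out.println(ok);  // prints "Optional[3]"
        System.out.println(bad); // prints "Optional.empty"
    }
}
```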

Another Monadic type is Future which may hold a value later (i.e. async). You can chain futures together like Option and Try, for example:

import scala.concurrent.{Promise, Future}
import scala.concurrent.ExecutionContext.Implicits.global

// a Promise provides a place to write a value to later
val p1 = Promise[String]()
val p2 = Promise[String]()

val nameFuture: Future[String] = for {
  first <- p1.future
  last <- p2.future
} yield first + " " + last

// nameFuture does not yet have a value so let's write a value to the first Promise
p1.success("James")

// still no value for nameFuture because p1 and p2 are chained together and p2 doesn't have a value yet
p2.success("Ward")

// ok, now nameFuture has a value - which you can print
nameFuture.foreach(println)
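For comparison (a sketch with example values, not from the original), Java's CompletableFuture can play both the Promise and the Future roles, and the chaining behaves the same way:

```java
import java.util.concurrent.CompletableFuture;

public class PromiseChain {
    public static void main(String[] args) {
        // like Scala Promises: places we can write a value to later
        CompletableFuture<String> p1 = new CompletableFuture<>();
        CompletableFuture<String> p2 = new CompletableFuture<>();

        // chain the two futures; the result exists only once both have values
        CompletableFuture<String> nameFuture =
                p1.thenCompose(first -> p2.thenApply(last -> first + " " + last));

        p1.complete("James");                  // nameFuture still has no value
        p2.complete("Ward");                   // now the chain can complete
        System.out.println(nameFuture.join()); // prints "James Ward"
    }
}
```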

Monads just help us chain together operations on items in some form of container. In these examples the chains have been short (two operations) but could be much longer – making the value more apparent.

Thank You for 12 Years in Developer Evangelism

Even though it was 12 years ago, I vividly remember sitting in a musty conference room with big-name analysts from Gartner. My palms were sweaty and I was almost too terrified to speak. Macromedia had brought me into this meeting to be the customer voice for their new programming platform, Flex. I was an early adopter building a customer portal using this new technology. But I was a coder—not someone who talks publicly. Somehow the Macromedia folks thought I did a good job, and so my life took an interesting turn as I moved from engineering to evangelism.

Back then I would have never dreamed that I’d be on stage with two Fortune 500 CEOs, in front of almost ten thousand people, demoing nerdy stuff. Really? Traveling to conferences around the world? That made no sense because I don’t like to be in front of people—I even have a fear of public speaking! Twelve years ago this all seemed crazy. But somehow it happened—this nerdy coder became an evangelist.

Over the past twelve years I’ve been stretched, drained, and fulfilled in ways I never imagined. I’ve been on some amazing adventures while working for some of the greatest tech companies in the world (Macromedia/Adobe, Salesforce/Heroku, and Typesafe/Lightbend). With hundreds of presentations behind me, I still get nervous before I present. Yet I’ve realized I can do anything, as long as I’ve almost done it before. That has given me comfort on trips where I’ve presented in a different city every day of the week, surviving on only a few hours of sleep. I’ve flown around the world in five days with stops in Tokyo and Bulgaria. I’ve had engineers tell me that I helped change their careers (for the better). And along the way I’ve also been able to build some stuff that I’m pretty proud of.

For example, while at a networking conference in Vegas I stayed up all night to build the Census benchmark app which ultimately helped convince at least a few people to use Flex. Tour de Flex was probably one of the most widely used projects I’ve helped build; we had a real-time dashboard to visualize the location of developers using it, and if you left it open for only a few minutes, the globe was covered. Mixing Loom, which no one used, is still one of the most fun projects I ever worked on. At Heroku, I created the first prototypes for what has become Heroku Button and the GitHub auto-deployment. At Typesafe, I helped create Activator which has been used by tons of developers to get started with Scala, Play Framework, and Akka. One year, at Devoxx France, I created WebJars which now gets over a million downloads per month. Here at Salesforce, I’ve built some really fun stuff like the Salesforce IFTTT channel, but probably my favorite is Koober, a data pipeline sample app.

Ok, enough reminiscing… What’s next??

Today I’m thrilled to be joining the Technology & Products organization at Salesforce—they are the engineers that build the technology which powers the Salesforce platform. I will help our engineers write blogs about what they do, speak at conferences, and open source their projects. I’m incredibly excited to be heading back to my roots in engineering while taking my experience in evangelism with me. Our engineers build some amazing things and I want to help them share those accomplishments with the world. And luckily, I’ll still be able to write code because we need a bunch of tooling to help us do Open Source better. The first of these projects is our Contributor License Agreement GitHub bot, which we just open sourced! I won’t be doing much blogging here since I now have the Salesforce Engineering blog at my disposal.

I don’t want this blog to sound like a eulogy, but it is kinda the end of an era for me. So I’m going to take a minute to thank some people. I absolutely couldn’t have gone from the nervous coder to the evangelist I am today without the help of many people. (I’ll keep this list short so it doesn’t sound like an Oscar speech.)

Christophe Coenraets (still the best evangelist I know) saw something in me that I didn’t. He gave me a shot and has mentored me for 12 years. Mike Slinn has always given me exactly the advice I needed to hear. Greg Wilson encouraged and enabled me to build many of the things I’m most proud of. Dave Carroll has now twice given me a great home at Salesforce. Bruce Eckel continues to help me see things from a different angle and fight passive voice in my writing. All of you (plus many more) have not just been co-workers, you have also become friends who I’m incredibly fortunate to have in my life. So here is a huge thank you to everyone who has been part of my journey as an evangelist! I’m sure we will still cross paths on Twitter, GitHub, at conferences, and on the Salesforce Engineering blog—I just won’t be trying to market anything to you any longer. :)

Connecting to the Salesforce REST APIs with Spring Boot and Java

Broadly speaking there are two types of integrations with Salesforce: system-to-system integrations and user interface integrations. One of the primary ways to do these integrations is by using the Salesforce REST API. When using the Salesforce REST API you need to obtain an access token that identifies who is making the requests. OAuth 2 provides an HTTP interface to obtain a Salesforce access token.

When using the Salesforce OAuth 2 API there are three options for obtaining an access token:

  1. Use the Web Server Flow where a Salesforce user in a traditional web app is asked to authorize a third party application which then allows the web server to obtain an access token.
  2. Use the User Agent Flow where a Salesforce user in a mobile or JavaScript web app is asked to authorize a third party application which then allows the client side application (native mobile, JavaScript, etc) to obtain an access token.
  3. Use the Username and Password Flow where stored credentials for a system-to-system integration user are exchanged for an access token.

The OAuth Web Server Flow can seem tricky to implement by hand because there are a number of necessary steps but luckily this flow is pretty standard so many libraries have done the hard work for us. In the case of a Java Spring application the hard work has all been done by the Spring Security OAuth library. And Spring Boot makes it super easy to setup a new application that has everything needed for the OAuth Web Server Flow.

Let’s walk through the different pieces of a simple web application which has the OAuth stuff, fetches Accounts from the Salesforce REST API, and renders them in a web page via JavaScript. (Note: You can skip straight to the completed project if you just want to deploy the app on Heroku or get it all running locally.)

First we need a build system to manage dependencies, run the app, and assemble the app for deployment. I’ve chosen Gradle so here is my build.gradle:

buildscript {
    repositories {
        maven {
            url 'https://plugins.gradle.org/m2/'
        }
    }
    dependencies {
        classpath 'org.springframework.boot:spring-boot-gradle-plugin:1.4.2.RELEASE'
    }
}

apply plugin: 'java'
apply plugin: 'org.springframework.boot'

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.springframework.boot:spring-boot-starter-web:1.4.2.RELEASE'
    compile 'org.springframework.boot:spring-boot-devtools:1.4.2.RELEASE'
    compile 'org.springframework.boot:spring-boot-starter-security:1.4.2.RELEASE'
    compile 'org.springframework.security.oauth:spring-security-oauth2:2.0.12.RELEASE'
    compile 'org.webjars:salesforce-lightning-design-system:2.1.4'
    compile 'org.webjars:jquery:3.1.1'
}

There you can see dependencies on the Spring Boot stuff, the Spring Security OAuth library and a few WebJars for the UI.

Now we need to set some properties for the Spring Security stuff. There are a few different ways to set these and you should make sure that your OAuth client id and secret don’t end up in SCM. At a minimum you will need these settings:

security.oauth2.client.client-authentication-scheme = form
security.oauth2.client.authentication-scheme = header
security.oauth2.client.grant-type = authorization_code
security.oauth2.client.access-token-uri = https://login.salesforce.com/services/oauth2/token
security.oauth2.client.user-authorization-uri = https://login.salesforce.com/services/oauth2/authorize
security.oauth2.resource.user-info-uri = https://login.salesforce.com/services/oauth2/userinfo
security.oauth2.client.client-id = YOUR_SALESFORCE_CONNECTED_APP_CLIENT_ID
security.oauth2.client.client-secret = YOUR_SALESFORCE_CONNECTED_APP_CLIENT_SECRET

I put all of the settings except the client id and secret in a src/main/resources/application.properties file. For the client id and secret you can use environment variables or config settings on Heroku. Spring Boot will automatically look for the client id and secret in environment variables named SECURITY_OAUTH2_CLIENT_CLIENT_ID and SECURITY_OAUTH2_CLIENT_CLIENT_SECRET.

Next we need a Spring component that will handle the REST communication with Salesforce including the query for Accounts and deserialization from JSON. Here is my src/main/java/com/example/Force.java code:

package com.example;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.security.oauth2.client.OAuth2ClientContext;
import org.springframework.security.oauth2.client.OAuth2RestTemplate;
import org.springframework.security.oauth2.client.resource.OAuth2ProtectedResourceDetails;
import org.springframework.security.oauth2.provider.OAuth2Authentication;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Component
public class Force {

    private static final String REST_VERSION = "35.0";

    @Bean
    private OAuth2RestTemplate oAuth2RestTemplate(OAuth2ProtectedResourceDetails resource, OAuth2ClientContext context) {
        return new OAuth2RestTemplate(resource, context);
    }

    @Autowired
    private OAuth2RestTemplate restTemplate;

    private static String restUrl(OAuth2Authentication principal) {
        HashMap<String, Object> details = (HashMap<String, Object>) principal.getUserAuthentication().getDetails();
        HashMap<String, String> urls = (HashMap<String, String>) details.get("urls");
        return urls.get("rest").replace("{version}", REST_VERSION);
    }

    public List<Account> accounts(OAuth2Authentication principal) {
        String url = restUrl(principal) + "query/?q={q}";
        Map<String, String> params = new HashMap<>();
        params.put("q", "SELECT Id, Name, Type, Industry, Rating FROM Account");
        return restTemplate.getForObject(url, QueryResultAccount.class, params).records;
    }

    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Account {
        public String Id;
        public String Name;
        public String Industry;
        public String Rating;
    }

    @JsonIgnoreProperties(ignoreUnknown = true)
    private static class QueryResult<T> {
        public List<T> records;
    }

    private static class QueryResultAccount extends QueryResult<Account> {}

There is some plumbing to setup the Spring REST Template which makes it easy to make the REST requests. The accounts method takes an OAuth2Authentication so that the URL can be determined. Finally there are some classes to represent the results from the query and the Account, which are created from the JSON data returned from the REST API.

For Spring Boot we need an application which will be run on the web server (or local machine for development). In this example the application will also contain our REST API implementation which will be used by a JavaScript UI. Here is the code for the src/main/java/com/example/SpringSalesforceApplication.java file:

package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.security.oauth2.client.EnableOAuth2Sso;
import org.springframework.security.oauth2.provider.OAuth2Authentication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@SpringBootApplication
@EnableOAuth2Sso
@RestController
public class SpringSalesforceApplication {

    @Autowired
    private Force force;

    @RequestMapping("/accounts")
    public List<Force.Account> accounts(OAuth2Authentication principal) {
        return force.accounts(principal);
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringSalesforceApplication.class, args);
    }

}

This does all the magic to create a Spring Boot web application which uses the Spring Security OAuth stuff, the Force component, and a REST controller. There is a single REST controller method in this application that handles requests to /accounts, does the query to Salesforce using the Force component, deserializes the results, then reserializes them as JSON. In this little example we aren’t seeing why we’d want to proxy the Salesforce REST API in this way since we are not doing any aggregation or transformation of data. But that would be straightforward to do if needed.
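As a sketch of what such a transformation might look like (hypothetical data and plain Java streams, not part of the sample app), the controller could aggregate the accounts by industry before returning them:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class AccountAggregation {
    // mirrors the fields used by the Account class in Force.java
    static class Account {
        String name;
        String industry;
        Account(String name, String industry) {
            this.name = name;
            this.industry = industry;
        }
    }

    public static void main(String[] args) {
        // hypothetical query results
        List<Account> accounts = List.of(
                new Account("Acme", "Manufacturing"),
                new Account("Globex", "Energy"),
                new Account("Initech", "Energy"));

        // count accounts per industry before returning anything to the browser
        Map<String, Long> byIndustry = accounts.stream()
                .collect(Collectors.groupingBy(a -> a.industry, Collectors.counting()));

        System.out.println(byIndustry.get("Energy")); // prints "2"
    }
}
```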

The final piece of this application is a web UI that uses the Spring Boot app’s /accounts REST method to get and then display the accounts. In this case I’m using jQuery to do that. Here is the code for the src/main/resources/static/index.html file:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="utf-8" />
    <meta http-equiv="x-ua-compatible" content="ie=edge" />
    <meta name="viewport" content="width=device-width, initial-scale=1" />
    <title>Hello Spring Salesforce</title>
    <link rel="stylesheet" type="text/css" href="webjars/salesforce-lightning-design-system/2.1.4/assets/styles/salesforce-lightning-design-system.min.css"/>
    <script type="text/javascript" src="webjars/jquery/3.1.1/jquery.min.js"></script>
    <script type="text/javascript">
        $(function() {
          $.get("/accounts", function(data) {
            $.each(data, function(i, account) {
              var tr = $("<tr>");
              tr.append($("<td>").text(account.Id));
              tr.append($("<td>").text(account.Name));
              tr.append($("<td>").text(account.Industry));
              tr.append($("<td>").text(account.Rating));
              $("tbody").append(tr);
            });
          });
        });
    </script>
</head>
<body>
    <header class="slds-global-header_container">
        <div class="slds-global-header slds-grid slds-grid--align-spread">
            <div class="slds-global-header__item">
                <div class="slds-text-heading--large">Hello Spring Salesforce</div>
            </div>
        </div>
    </header>
    <div class="slds-container--center slds-container--medium" style="margin-top: 60px;">
        <div class="slds-text-heading--medium">Salesforce Accounts</div>
        <table class="slds-table slds-table--bordered slds-table--cell-buffer">
            <thead>
                <tr class="slds-text-title--caps">
                    <th scope="col">
                        <div class="slds-truncate" title="Id">Id</div>
                    </th>
                    <th scope="col">
                        <div class="slds-truncate" title="Name">Name</div>
                    </th>
                    <th scope="col">
                        <div class="slds-truncate" title="Industry">Industry</div>
                    </th>
                    <th scope="col">
                        <div class="slds-truncate" title="Rating">Rating</div>
                    </th>
                </tr>
            </thead>
            <tbody></tbody>
        </table>
    </div>
</body>
</html>

With this application running locally when I visit http://localhost:8080 in my browser, Spring Security notices that I don’t yet have an access token so it walks through the OAuth Web Server Flow with Salesforce and once complete renders the index.html page. The JavaScript on that page makes the REST request to /accounts which does the query to the Salesforce REST APIs and then returns the JSON back to the browser. Finally, the data is rendered in the table on the web page. Here is what it looks like:

In total there were only five pretty small files needed to have an end-to-end Salesforce REST API integration with a web application. Hopefully that gets you started! For full instructions on Heroku deployment of this application or to get the app running locally, check out the complete project on GitHub.

Let me know how it goes!

Quick & Easy ETL from Salesforce to MySQL with Workflow & Heroku

While sometimes unfortunate, it is often necessary to have data silos that share data. The Extract, Transform, and Load (ETL) pattern has been around for a long time to address this need and there are tons of solutions out there. If you just need a quick and easy way to copy new & updated records in Salesforce to an external data source, a simple Heroku app and Salesforce Workflow might be the quickest and easiest solution. I’ve put together a sample Node.js application for this: https://github.com/jamesward/salesforce-etl-mysql

Check out a demo:

This sample app uses a Salesforce Workflow that sends created and updated Contact records to an app on Heroku, which inserts or updates those records in a MySQL database. A simple transform JavaScript function makes it easy to customize this app for your own use case. To setup this app for your own uses, check out the instructions in the GitHub repo: https://github.com/jamesward/salesforce-etl-mysql

Let me know how it goes!

Scalable Continuous Delivery Pipelines

Back when I first started building web apps we’d just “do it in production” by vi’ing Perl & PHP files on the server. This was fine because the risks and expectations were low. No big deal if I broke the app for a few hours. Good thing I made an app.php-bak copy!

As software became more critical to businesses, the risks of making changes to production systems increased. To cope with these risks we slowed down delivery through processes. Today many enterprises are so bogged down by risk aversion that they may only deploy to production once a year or less. The rate of change in businesses and software continues to increase and the expectations are even higher. Downtime is not an option but that change also needs to go out now!

We need a way to reduce risk and increase the rate of delivery. The two seem to be opposing but Continuous Delivery provides a way to deliver more often AND reduce risk.

As the name implies, the idea of Continuous Delivery is to continuously deliver changes. There are many ways to do that but the process should be scalable depending on the acceptable amount of risk. For some apps a little potential downtime is worth the tradeoff of essentially being able to “do it in production” for some changes.

Continuous Delivery can be thought of as a pipeline… There is an input / source that needs to be moved and transformed in a variety of ways until it reaches its output, the production system. By adding or removing the steps in between we can adjust the risk profile.

In order to have a Continuous Delivery Pipeline there are a few things necessary to facilitate any size process:

  • A Source Control System (SCM) that enables developers to collaborate but also enables a direct correlation between a point-in-time in the source and a deployment of that source.
  • Single directional flow from source to deployment. No more “do it in production” because that breaks the source to deployment correlation.
  • A repeatable and pure method of transforming source into something that can run. Repeatability is usually accomplished by using a build tool. However, often builds specify dependencies with version ranges, sacrificing the pureness. Don’t do that.

With that infrastructure the simplest form of Continuous Delivery can be achieved. Systems like Heroku provide automated tooling for this kind of pipeline. On Heroku you can kick off deployment by pushing a git repo directly to Heroku, which then runs the build, stores the generated artifacts, and deploys them. This is the infamous git push heroku master. A newer method (which I prefer) is instead to push the changes to GitHub and have Heroku then auto-deploy those changes. Here is a demo of that:

For apps that can tolerate more risk this simple pipeline is great! But many apps need a process that better supports collaboration and risk mitigation. The next layer that may be added to the pipeline is a Continuous Integration system that runs the build and tests. If there is a failure then the changes should not be deployed. Here is a demo of doing CI-verified auto-deployment on Heroku:

Reducing risk further we could add a staging system that auto-deploys the changes (with or without CI validation depending on your needs). After manual validation the changes could then be manually promoted to production.

Taking things a step further we can hook into GitHub’s Pull Request process to deploy and validate changes before they are merged into a production branch (e.g. master). In Heroku this is called “Review Apps” in which Heroku automatically creates a fresh environment with the changes for every Pull Request. This enables actual app testing as part of the review cycle.

This full pipeline with Pull Request apps / Review Apps, CI validation, staging auto-deployment, and manual production promotion significantly reduces the risk of doing frequent deployments. Many organizations that use this kind of process are able to do hundreds of deployments every day! This also helps disperse risk over many deployments instead of accumulating risk for those big once-a-year deploys. Check out a demo of this entire flow on Heroku:

Sometimes a feature may not be ready for end user testing or launch but that shouldn’t prevent you from actually deploying the feature! For this you can use “Feature Flags” a.k.a. Feature Toggles.
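A feature flag can be as simple as a config check. Here is a minimal sketch, assuming an environment-variable flag (real systems often use a database or a dedicated flag service, and the flag name here is hypothetical):

```java
public class FeatureFlags {
    // read a flag from the environment; defaults to off
    static boolean isEnabled(String flag) {
        return Boolean.parseBoolean(System.getenv().getOrDefault(flag, "false"));
    }

    public static void main(String[] args) {
        // the new code path ships dark until the flag is flipped
        if (isEnabled("FEATURE_NEW_CHECKOUT")) {
            System.out.println("new checkout flow");
        } else {
            System.out.println("old checkout flow");
        }
    }
}
```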

Another technique that can be useful to reduce risk with Continuous Delivery is “Canary Deploys” where only a portion of users run on a new version of the app. Once a period of time validates the new version is “safe” for everyone, it can be rolled out to the rest of the users.
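One common way (an assumption here, not the only approach) to pick the canary group is to hash a stable user id, so each user consistently lands in or out of the canary and gets a consistent experience:

```java
public class Canary {
    // deterministically place a stable percentage of users in the canary
    static boolean inCanary(String userId, int percent) {
        return Math.floorMod(userId.hashCode(), 100) < percent;
    }

    public static void main(String[] args) {
        // the same user always gets the same answer for a given rollout percentage
        System.out.println(inCanary("user-42", 0));   // prints "false": 0% canary
        System.out.println(inCanary("user-42", 100)); // prints "true": full rollout
    }
}
```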

Of course Continuous Delivery isn’t a silver bullet as there are always tradeoffs. One of the challenges with Continuous Delivery is with database schemas. For instance, what if you were to do a schema migration as part of a Canary Deploy? With two versions of the app running simultaneously you may break one version with the schema changes from another. NoSQL / Schema-less databases are one way to address the issue. Another option is to decouple code deployments from schema migrations, utilizing testing / staging environments to validate the schema changes.

Implementing Continuous Delivery with large and complex systems can be pretty tough. But this is one of those things you have to figure out, because if you don’t, your business will likely fade as it is overtaken by startups that deliver software as the business needs it. If you need some more practical advice on how to get there check out my Comparing Application Deployment: 2005 vs. 2015 blog post. Let me know how it goes.

Reactive Web Request Batching with Scala and Play Framework

At first glance it seems silly to do batching in the reactive world. When I first started with reactive programming I thought I wouldn’t have to worry about things like resource starvation. After all, the reactive magic bullet was *magical*! But my magic bullet fizzled when it hit downstream resource constraints causing me to need batching.

With a reactive web client library like Play Framework’s, I can easily spin up tens of thousands of web requests, in parallel, using very little resources. But what if that saturates the server I’m making requests to? In an ideal world I could get backpressure but most web endpoints don’t provide a way to do that. So we just have to be nicer to the server and batch the requests. (Sidenote: How do you know how nice you should be to the service, e.g. batch size?)

So in some scenarios you might just need to batch a bunch of web requests. Let’s say you need to make 100 requests and you’ve figured out that you should only do 10 at a time. Then once all 100 have completed you need to do some aggregation of the results. Of course you could block until each batch is complete and mutate a shared value that accumulates the results. But we can do better than that! You can still use immutable data and non-blocking requests with batching.

In Play I have a /echo?num=1 request handler that just returns the number passed into it:

def echo(num: Int) = Action {
  Ok(num.toString)
}

Now in another async controller I want to send 10,000 requests to the echo controller but batched 256 at a time, then return the aggregated results. First I need a function that will handle the batching:

private def processBatch(results: Future[Seq[String]], batch: Seq[String]): Future[Seq[String]] = {
  // when the results for the previous batches have been completed, start a new batch
  results.flatMap { responses =>
    // create the web requests for this batch
    val batchFutures: Seq[Future[String]] = batch.map(ws.url(_).get().map(_.body))
    // sequence the futures for this batch into a single future
    val batchFuture: Future[Seq[String]] = Future.sequence(batchFutures)
    // when this batch is complete, append the responses to the existing responses
    batchFuture.map { batchResponses =>
      Logger.info("Finished a batch")
      responses ++ batchResponses
    }
  }
}

This processBatch function takes a Future that holds the previously accumulated results which are a sequence of Strings. It also takes a batch of urls and returns a Future that holds the previously accumulated results and the results from the batch. When the results that were passed in have all been completed, the batch is processed by creating a web request for each URL. Each request is transformed to just the String from the response body. All of the requests in the batch are then transformed into a single Future that completes when all of the requests in the batch have been completed. Once that future completes the results of the batch are combined with the previously completed results.

This could also be written using Scala’s fancy for comprehension:

for {
  responses <- results
  batchFuture = Future.sequence(batch.map(ws.url(_).get().map(_.body)))
  batchResponses <- batchFuture
} yield {
  Logger.info("Finished a batch")
  responses ++ batchResponses
}

Here is an async controller that uses the processBatch function:

def index = Action.async { implicit request =>
  // ints 1 to 10000
  val numbers: Seq[Int] = 1 to 10000
  // turn each number into a url
  val urls: Seq[String] = numbers.map(routes.Application.echo(_).absoluteURL())
  // split into groups of 256
  val batches: Iterator[Seq[String]] = urls.grouped(256)
  // futures for all of the response strings
  val futureResponses = batches.foldLeft(Future.successful(Seq.empty[String]))(processBatch)
  // render the list of responses when they are all complete
  futureResponses.map { responses =>
    Ok(responses.toString)
  }
}

A sequence of 10,000 URLs /echo?num=1 to /echo?num=10000 are assembled. That sequence is then partitioned into groups of 256. Then the reactive batching magic… Take the batches and do a foldLeft starting with an empty sequence of String, calling the processBatch function for each batch, accumulating the results. Once the future returned from the foldLeft completes the results are turned into a String and returned in the Ok response.

There you have reactive batching of web requests! Check out the full source.
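The same foldLeft-style batching can be sketched outside Play too. Here is a rough Java analog using CompletableFuture, with a fake fetch standing in for the real web request (an assumption for the sake of a self-contained example):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class BatchedRequests {
    // stand-in for a real async web request
    static CompletableFuture<String> fetch(String url) {
        return CompletableFuture.supplyAsync(() -> "response for " + url);
    }

    // chain one batch after the accumulated results, like the Scala processBatch
    static CompletableFuture<List<String>> processBatch(
            CompletableFuture<List<String>> results, List<String> batch) {
        return results.thenCompose(responses -> {
            // start every request in the batch
            List<CompletableFuture<String>> futures =
                    batch.stream().map(BatchedRequests::fetch).collect(Collectors.toList());
            // when the whole batch is done, append its responses to the accumulator
            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .thenApply(ignored -> {
                        List<String> combined = new ArrayList<>(responses);
                        futures.forEach(f -> combined.add(f.join()));
                        return combined;
                    });
        });
    }

    public static void main(String[] args) {
        List<List<String>> batches = List.of(List.of("/echo?num=1", "/echo?num=2"),
                                             List.of("/echo?num=3", "/echo?num=4"));

        // fold the batches over an initially-empty accumulator
        CompletableFuture<List<String>> all = CompletableFuture.completedFuture(List.of());
        for (List<String> batch : batches) {
            all = processBatch(all, batch);
        }

        System.out.println(all.join().size()); // prints "4"
    }
}
```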

Machine Learning on Heroku with PredictionIO

Last week at the TrailheaDX Salesforce Dev Conference we launched the DreamHouse sample application to showcase the Salesforce App Cloud and numerous possible integrations. I built an integration with the open source PredictionIO Machine Learning framework. The use case for ML in DreamHouse is a real estate recommendation engine that learns based on users with similar favorites. Check out a demo and get the source.

For the DreamHouse PredictionIO integration to work I needed to get the PredictionIO service running on Heroku. Since it is a Scala app everything worked great! Here are the steps to get PredictionIO up and running on Heroku.

First you will need a PredictionIO event server and app defined in the event server:

  1. Deploy:
  2. Create an app:

    heroku run -a <APP NAME> console app new <A PIO APP NAME>
  3. List apps:

    heroku run -a <APP NAME> console app list

Check out the source and local dev instructions for the event server.

Now that you have an event server and app, load some event data:

export URL=http://<YOUR HEROKU APP NAME>.herokuapp.com
export ACCESS_KEY=<YOUR APP ACCESS KEY>
for i in {1..5}; do curl -i -X POST $URL/events.json?accessKey=$ACCESS_KEY -H "Content-Type: application/json" -d "{ \"event\" : \"\$set\", \"entityType\" : \"user\", \"entityId\" : \"u$i\" }"; done
for i in {1..50}; do curl -i -X POST $URL/events.json?accessKey=$ACCESS_KEY -H "Content-Type: application/json" -d "{ \"event\" : \"\$set\", \"entityType\" : \"item\", \"entityId\" : \"i$i\", \"properties\" : { \"categories\" : [\"c1\", \"c2\"] } }"; done
for j in {1..20}; do for i in {1..5}; do curl -i -X POST $URL/events.json?accessKey=$ACCESS_KEY -H "Content-Type: application/json" -d "{ \"event\" : \"view\", \"entityType\" : \"user\", \"entityId\" : \"u$i\",  \"targetEntityType\" : \"item\", \"targetEntityId\" : \"i$(( ( RANDOM % 50 )  + 1 ))\" }"; done; done

Check out the demo data:

http://<YOUR HEROKU APP NAME>.herokuapp.com/events.json?accessKey=<YOUR APP ACCESS KEY>&limit=-1

Now you need an engine that will learn from a set of training data and then be able to make predictions. With PredictionIO you can use any algorithm you want but often SparkML is a great choice. For this simple example I’m just using single-node Spark and Postgres but the underlying data source and ML engine can be anything.

This example is based on PredictionIO’s Recommendation Template so it uses SparkML’s Alternating Least Squares (ALS) algorithm. To deploy it on Heroku follow these steps:

  1. Deploy:
  2. Remove the auto-added Heroku Postgres addon:

    heroku addons:destroy heroku-postgresql
  3. Attach your PredictionIO Event Server’s Postgres:

    heroku addons:attach <YOUR-ADDON-ID> -a <YOUR HEROKU APP NAME>

    Note: You can find out <YOUR-ADDON-ID> by running:

    heroku addons -a <YOUR EVENT SERVER HEROKU APP NAME>

  4. Train the app:

    heroku run -a <YOUR HEROKU APP NAME> train
  5. Restart the app to load the new training data:

    heroku restart -a <YOUR HEROKU APP NAME>
  6. Check the status of your engine:

    http://<YOUR HEROKU APP NAME>.herokuapp.com

Now you can check out the recommendations for an item (must be an item that has events):

curl -H "Content-Type: application/json" -d '{ "items": ["i11"], "num": 4 }' -k http://<YOUR HEROKU APP NAME>.herokuapp.com/queries.json

Check out the source and local dev instructions for this example engine.

Let me know if you have any questions or problems. Happy ML’ing!

Combining Reactive Streams, Heroku Kafka, and Play Framework

Heroku recently announced early access to the new Heroku Kafka service, and while I’ve heard great things about Apache Kafka I hadn’t played with it because I’m too lazy to set that kind of stuff up on my own. Now that I can set up a Kafka cluster just by provisioning a Heroku Addon, I figured it was time to give it a try.

If you aren’t familiar with Kafka, it is kind of a next-generation messaging system. It uses pub-sub, scales horizontally, and has built-in message durability and delivery guarantees. Kafka was originally built at LinkedIn but is now used by pretty much every progressive enterprise that needs to move massive amounts of data through transformation pipelines.

While learning Kafka I wanted to build something really simple: an event producer that just sends random numbers to a Kafka topic, and an event consumer that receives those random numbers and sends them to a browser via a WebSocket. I decided to use Play Framework and the Akka Streams implementation of Reactive Streams.

In Reactive Streams there is the pretty standard “Source” and “Sink” where an event producer is a Source and a consumer is a Sink. A “Flow” is a pairing between a Source and a Sink with an optional transformation. In my example there are two apps, each with a Flow. A worker process will send random numbers to Kafka so its Source will be periodically generated random numbers and its Sink will be Kafka. In the web process the Source is Kafka and the Sink is a WebSocket that will push the random numbers to the browser.
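To make the Source/Flow/Sink vocabulary concrete, here is a minimal, standalone Akka Streams example (the names and values here are illustrative, not from the apps above; it assumes Akka Streams is on the classpath):

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("demo")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// A Source produces elements, a Flow transforms them, a Sink consumes them.
val doubled = Source(1 to 5)
  .via(Flow[Int].map(_ * 2))
  .runWith(Sink.seq)

println(Await.result(doubled, 3.seconds)) // Vector(2, 4, 6, 8, 10)

system.terminate()
```

Nothing actually runs until the pieces are connected end-to-end and materialized with a run method, which is what makes it easy to swap a Kafka topic or a WebSocket in for either end.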

Here is the worker app with some necessary config omitted (check out the full source):

object RandomNumbers extends App {
  val tickSource = Source.tick(Duration.Zero, 500.milliseconds, Unit).map(_ => Random.nextInt().toString)
  kafka.sink("RandomNumbers").map { kafkaSink =>
    tickSource.map(new ProducerRecord[String, String]("RandomNumbers", _)).to(kafkaSink).run()
  }
}

The tickSource is a Source that generates a new random number (as a String) every 500 milliseconds. That Source is connected to a Kafka Sink with a Flow that transforms each value into a ProducerRecord (for Kafka). This uses the Reactive Kafka library, which is a Reactive Streams API for working with Kafka.

On the web app side, Play Framework has built-in support for using Reactive Streams with WebSockets, so all we need is a controller method that creates a Source from a Kafka topic and hooks it to a WebSocket Flow (full source):

def ws = WebSocket.acceptOrResult[Any, String] { _ =>
  kafka.source(Set("RandomNumbers")) match {
    case Failure(e) =>
      Future.successful(Left(InternalServerError("Could not connect to Kafka")))
    case Success(source) =>
      val flow = Flow.fromSinkAndSource(Sink.ignore, source.map(_.value))
      Future.successful(Right(flow))
  }
}

Notice that the Flow has a Sink.ignore which just says to ignore any incoming messages on the WebSocket (those sent from the browser). Play takes care of all the underlying stuff and then whenever Kafka gets a message on the “RandomNumbers” topic, it will be sent out via the WebSocket.
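Flow.fromSinkAndSource can feel odd at first: it glues together an independent Sink (for inbound elements) and Source (for outbound elements) into one Flow, with no connection between the two sides. A small standalone sketch of that behavior, with illustrative values (assuming Akka Streams on the classpath):

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("demo")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Inbound elements go to Sink.ignore and are dropped; outbound elements
// come from the independent Source on the other side.
val flow = Flow.fromSinkAndSource(Sink.ignore, Source(1 to 3))

// Whatever we feed in is ignored; the output is always 1, 2, 3.
val out = Source.single("hello").via(flow).runWith(Sink.seq)
println(Await.result(out, 3.seconds)) // Vector(1, 2, 3)

system.terminate()
```

In the WebSocket case above, the inbound side is what the browser sends (ignored) and the outbound side is the Kafka topic.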

And it all works!

Check out the full source for instructions on how to get this example setup on your machine and on Heroku. Let me know how it goes!