Kotan Code 枯淡コード

In search of simple, elegant code


Tag: playframework

Accessing Neo4j from Play Framework 2.1 with Web Service Futures

As you may have noticed, I’ve recently been doing a little bit of talking about Neo4j and graph databases. A database on its own is a fantastic thing, but it doesn’t do anybody much good if the information contained within stays there, or if you can’t put information into it from some application. In my own proof of concept, I wanted to see how easy it would be to access Neo4j from my favorite web application framework, Play.

Turns out it was a little annoying, because I started out looking for frameworks to talk to Neo4j. First I looked at AnormCypher, a library that provides an “anorm-like” API for Cypher. I couldn’t use it because it failed with multiple runtime errors when I attempted to bring it into my Play application. Next, I tried another Cypher wrapper I found on GitHub, and that one failed miserably, too – the version of the JSON parser it used conflicted with the one my Play application was using.

Then I figured, why even bother? It’s just a REST API. So, I decided to try accessing Neo4j using nothing but Play framework’s own Web services API, which is basically the WS object. This turned out to be stupidly easy and, when I figured out future chaining in combination with that WS API, amazing stuff started to happen. You know, like, productivity.

First, I created a teeny little wrapper around the Neo4j REST API URL; I just called it NeoService:

class NeoService(rootUrl: String) {
    def this() = this("http://default/neo/URL/location/db/data")

    val stdHeaders = Seq( ("Accept", "application/json"), ("Content-Type", "application/json") )

    def executeCypher(query: String, params: JsObject) : Future[Response] = {
        // Note: the paren closes after "/cypher" -- withHeaders is called
        // on the request holder, not on the String
        WS.url(rootUrl + "/cypher").withHeaders(stdHeaders:_*).post(Json.obj(
            "query" -> query,
            "params" -> params
        ))
    }

    // Convert a Future[Response] into a Future[Option[Int]]!
    def findNodeIdByKindAndName(kind:String, name:String) : Future[Option[Int]] = {
        val cypher = """
          |START n=node:node_auto_index(kind={theKind})
          |WHERE n.name = {theName}
          |RETURN id(n) as id
        """.stripMargin
        val params = Json.obj( "theName" -> name, "theKind" -> kind )
        for (r <- executeCypher(cypher, params)) yield {
            val theData = (r.json \ "data").as[JsArray]
            if (theData.value.size == 0)
               None
            else
               // Json: "data" : [ [ ### ] ]
               Some(theData.value(0).as[JsArray].value(0).as[Int])
        }
    }
}

So, in this tiny class I’ve got a function I can use to execute Cypher queries. Because I know I have an auto-indexed node property called kind and another property called name, I can attempt to find a node’s ID based on those two properties with a Cypher query. Instead of fetching the result synchronously, I just convert my Future[Response] into a Future[Option[Int]], where the Option[Int] is the result of looking through the Neo4j database for that data. I’ve transformed one future into another, and being able to do so is pretty freaking awesome.

Not only can I do that, but I can chain these method calls from other code, like this:

val neo = new utils.NeoService()
for {
    nodeId <- neo.findNodeIdByKindAndName("zombie", "bob")
    zombieId <- otherObject.tweakZombie(nodeId)
} yield {
    zombieId.map { id => println("I got a zombie!") }.getOrElse { println("Zombie doesn't exist!") }
}

In the code here, tweakZombie can be written to be aware of the Option[] so that if it’s empty, it doesn’t tweak anything, allowing me to chain call after call after call and not worry about slapping a crapload of if statements in there – a judicious use of map, and for, and options and futures gives me a ridiculous amount of power.
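Here’s a self-contained sketch of that same shape, with hypothetical stand-ins for the Neo4j-backed calls, just to show how the Option rides along inside the Future:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for the Neo4j-backed lookup above.
def findNodeIdByKindAndName(kind: String, name: String): Future[Option[Int]] =
  Future.successful(if (name == "bob") Some(42) else None)

// tweakZombie is Option-aware: an empty Option passes straight
// through without any work being done.
def tweakZombie(nodeId: Option[Int]): Future[Option[Int]] =
  Future.successful(nodeId.map(_ + 1))

val result = for {
  nodeId   <- findNodeIdByKindAndName("zombie", "bob")
  zombieId <- tweakZombie(nodeId)
} yield zombieId

// Blocking here only to show the value; real code stays non-blocking.
println(Await.result(result, 1.second)) // Some(43)
```

Because every step returns a Future[Option[T]], the “not found” case flows through the whole chain with zero if statements.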

All of this is made possible by the fact that the Play Framework Web Services API is based on Futures. I was originally skeptical of futures because the old code I had seen before was more confusing than single-threaded, synchronous programming. With the new Futures syntax and well-coded libraries like the Play WS object, you can do ridiculous things in a very small number of lines of code.

Using JSON Inception in the Play Framework with Scala 2.10

The other day I was lamenting the fact that every time I made a tiny little change to the case classes that I use for reading/writing Ajax requests from the JavaScript client code for my web application, I have to go and manually modify my JSON combinators that convert between the Scala case classes and the Json representations.

There are some undocumented (Play Framework’s own website doesn’t make any mention of this, even on the page for Json combinators!) Scala 2.10 macros that actually allow for auto-generation of this conversion code … I wish I had coined the term myself, but someone else appropriately refers to this activity as JSON inception.

The basic idea behind Play’s JSON combinators is that they let you use a natural, fluid syntax to convert to/from JSON. For example, you might have the following implicit writes that lets you “jsonify” a zombie sighting case class:

// The GpsCoordinate writes must be in scope before the combinator
// below captures it, so it comes first
implicit val gpsCoordinateWrites = (
    ( __ \ "long").write[Double] and
    ( __ \ "lat").write[Double] and
    ( __ \ "altitude").write[Double]
)(unlift(GpsCoordinate.unapply))
implicit val zombieSightingWrites = (
    ( __ \ "name").write[String] and
    ( __ \ "timestamp").write[Int] and
    ( __ \ "location").write[GpsCoordinate]
)(unlift(ZombieSighting.unapply))

It doesn’t look like all that much code to maintain, but let’s say my application deals in about 20 different kinds of case classes that can be sent to or received from Ajax/web service calls. In the middle of development, making changes to all of this gets annoying, and while doing it I couldn’t shake the feeling that this could be cleaner, more elegant, more kotan.

The first thing I did was wrap all my implicit reads and writes up into a single Scala object so I could just do an import JsonReadsWrites._ and have all my Json conversion code in a single place. That felt a little better, but I still thought it could be easier. The above sample is overly simplistic; my real case classes are full of values of type Option[T], and dealing with those manually in the apply/unapply combinators you normally write for Play makes maintenance even more tedious.
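For those Option[T] fields, the combinator API has readNullable and writeNullable, which absorb the missing-key handling you’d otherwise write by hand. A sketch, with a hypothetical WeaponCache class:

```scala
import play.api.libs.json._
import play.api.libs.functional.syntax._

// Hypothetical case class with an optional field.
case class WeaponCache(name: String, location: Option[String])

// readNullable turns a missing or null "location" key into None
// instead of a parse error; writeNullable omits the key for None.
implicit val weaponCacheReads: Reads[WeaponCache] = (
    ( __ \ "name").read[String] and
    ( __ \ "location").readNullable[String]
)(WeaponCache.apply _)

implicit val weaponCacheWrites: Writes[WeaponCache] = (
    ( __ \ "name").write[String] and
    ( __ \ "location").writeNullable[String]
)(unlift(WeaponCache.unapply))
```

That removes the boilerplate per field, but you’re still writing one combinator per field, which is exactly what the macros below get rid of.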

Enter Scala 2.10 macros…

As of Scala 2.10, macros are fully supported. A macro is basically a pre-compile code-generation pass: if you flag a method as a macro method, it is executed at compile time and its return value is an AST (abstract syntax tree). What Play Framework provides are macro methods called writes and reads. These are executed at compile time and replace the writes and reads calls you see in the IDE with a syntax tree that constructs the Json combinators for case class conversion for you automatically.

To be honest, when I first looked at how this is done, it looked like it was some form of black magic, or that there was no way it would be possible without the use of some magic fairy dust. I read and re-read the documentation on Scala macros and after a while, it started to sink in. Reflection is available to the code you write in your macro, so, at compile time, your code can introspect the types of information passed to your macro via generics, and can then use that information to figure out how to construct a Json reader or writer.

So now, the code I wrote above can be re-written as:

implicit val zombieSightingWrites = Json.writes[ZombieSighting]
implicit val gpsCoordinateWrites = Json.writes[GpsCoordinate]

Now I can make changes to the case classes and the macro-generated code automatically compensates for those changes. It even handles arrays and nested case classes (which Salat doesn’t do for case class conversion with MongoDB).
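For classes that travel in both directions, there’s also a Json.format macro that generates the Reads and Writes in one shot. A sketch, with the case classes from the earlier example:

```scala
import play.api.libs.json._

case class GpsCoordinate(long: Double, lat: Double, altitude: Double)
case class ZombieSighting(name: String, timestamp: Int, location: GpsCoordinate)

// One macro call per class; nested classes just work as long as
// their formats are in scope first.
implicit val gpsCoordinateFormat = Json.format[GpsCoordinate]
implicit val zombieSightingFormat = Json.format[ZombieSighting]

val json = Json.toJson(
  ZombieSighting("bob", 1364412000, GpsCoordinate(-122.4, 37.8, 16.0)))
val roundTripped = json.as[ZombieSighting] // back to the case class
```

Two lines of code replacing what used to be a dozen-plus lines of hand-maintained combinators.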

I would be hard pressed to find a better use of Scala macros than this.

The Futures Have Arrived

As you may know, I’ve been working on building a fairly complex enterprise LOB application using Play, Scala, and Akka. When I started, I had a rudimentary understanding of Akka and Play, and I’d done some playing around with Akka before, but my experience was a far cry from the type of experience a grizzled, production-deployed veteran might have.

My website, without giving away any details, allows me to do a keyword search across multiple different types of entities. Let’s say I’ve built a zombie apocalypse preparedness social networking site (who doesn’t need one of those?!?) and I want to be able to enter a keyword and have it look through the repository containing identified zombie classifications as well as possible friends and even the names of weapons.

I built this in a way I thought was pretty decent: I created a Search actor that takes a KeywordSearch case class message. This Search actor then asks the Akka actor system for references to the different repositories, which are also actors. In this case, I might need references to the ZombieIdentificationRepository, WeaponRepository, and UsersRepository actors.

This problem is screaming for parallelism. I want the search actor to send fire-and-sorta-forget messages to the different repositories and, when it gets answers from all three repositories, send a message back to the actor that invoked the search containing all of the results. Further, I want the web request itself to not block on waiting for the results, so I want to use Play’s asynchronous controller pattern.

When I first built the search, I had code that looked kind of like this:

def index(keyword: String) = Action { implicit request =>
 ...
 val resFuture = ActorProvider.Searcher ? SearchActorProtocol.KeywordSearch(keyword)
 val results = Await.result(resFuture, timeout.duration).asInstanceOf[Array[SearchResult]]
 Ok(views.html.search.index(results))
}

So, while under the hood I was using an Actor, I was still performing an explicit block. Worse, the search actor was also blocking explicitly, doing an Await.result for each of the different repository queries. I had heard that Future composition was possible, so I thought I’d give it a shot.

Here’s how I refactored the Search actor’s search method to combine multiple searches performed in parallel:

def performKeywordSearch(seeker: ActorRef, keyword: String) {
    implicit val timeout = Timeout(3 seconds)

    val zombieFuture = ActorProvider.ZombieRepository ? KeywordSearch(keyword)
    val weaponFuture = ActorProvider.WeaponRepository ? KeywordSearch(keyword)
    val userFuture = ActorProvider.UserRepository ? KeywordSearch(keyword)

    val combFuture = for {
        z <- zombieFuture.mapTo[Array[SearchResult]]
        w <- weaponFuture.mapTo[Array[SearchResult]]
        u <- userFuture.mapTo[Array[SearchResult]]
    } yield seeker ! ( z ++ w ++ u).sortBy( r => r.name )
}

Note that there isn’t a single line of blocking code in the preceding sample. Everything happens when it’s done and there’s no explicit “sit and wait” code. I can even then modify the search controller’s method to remove the Await.result:

...
val searchFuture = ActorProvider.Searcher ? SearchActorProtocol.KeywordSearch(keyword)
val timeoutFuture = play.api.libs.concurrent.Promise.timeout("Failed to finish search", 5 seconds)
Async {
    Future.firstCompletedOf(Seq(searchFuture, timeoutFuture)).map {
        case searchResults: Array[SearchResult] =>
            render {
                case Accepts.Html() => Ok(views.html.search.index(searchResults))
                case Accepts.Json() => Ok(Json.toJson(searchResults))
            }
        case t:String => InternalServerError(t)
    }
}

And now I’ve removed a chain of blocking calls and replaced all of it with completely asynchronous, non-blocking code; the only thing that ever sits and waits is Play itself, as it awaits the response from the Actor.
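The race between a real result and a timeout isn’t Play magic, by the way – it’s plain scala.concurrent. Here’s a stripped-down sketch with a hypothetical search function standing in for the actor ask:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical fast search standing in for the actor ask.
def search(keyword: String): Future[Seq[String]] =
  Future { Seq(s"zombie:$keyword", s"weapon:$keyword") }

// A future that completes with an error string after the given delay.
def timeoutAfter(d: FiniteDuration): Future[String] =
  Future { Thread.sleep(d.toMillis); "Failed to finish search" }

// Whichever future finishes first wins the race.
val outcome = Future.firstCompletedOf(
  Seq(search("bob"), timeoutAfter(2.seconds))
).map {
  case results: Seq[_] => s"${results.size} results"
  case t: String       => s"error: $t"
}

println(Await.result(outcome, 3.seconds))
```

Since the search completes almost instantly and the timeout takes two seconds, the results branch wins; swap the delays and you’d get the error string instead.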

Coding with Futures and in a completely asynchronous fashion with Actors is difficult, and it requires a lot of up front effort and takes time to understand what’s really going on, but, as you can see from the code above, the clean and simple non-blocking elegance that you get as a reward is well worth the effort.

Implementing an Asynchronous Repository with Akka Actors

I’ve been fortunate enough to be able to work on an enterprise LOB application that uses the Typesafe stack – Play Framework using Scala and Akka. One of the things I’ve noticed with application development in Scala is that every morning when I take a look at my code, I scan it for opportunities to refactor. I want to know how I can reduce the number of lines of code (which reduces the number of ways it can fail), and how I can make my code cleaner, more elegant, faster, and more scalable.

I was sifting through my code last week and I noticed this little nugget:

ZombieRepository.findByName( zombie.name ).map { ... some stuff ... }

My first thought was that this code looked really old fashioned. Making synchronous method calls like that is just so last year. I immediately saw an opportunity to add a point of scalability and durability by putting my zombie repository (it’s not actually zombies, I’m just using that as an example…) behind a nice abstraction point as well as allow for all kinds of things like load balancing of requests, distributed calls, etc. I decided to implement a repository as an Actor.

The first thing I did, which is part of one of my favorite practices of contract-first design, was build the Actor Protocol.

object ZombieRepositoryActorProtocol {
    case class Insert(zombie: Zombie)
    case class Update(zombie: Zombie)
    case class Delete(zombieId: String)
    case class QueryByName(zombieName: String)
}

At the moment I don’t need to create case classes for the replies (but I could easily do this) because these messages will either be one-way messages or the replies will be things like Vector[Zombie] or Option[Zombie].

In situations where I want to unit test things that rely on this repository, I need to be able to point the repository at an actor other than the default one we typically reply to (sender). So I can modify the protocol as follows, allowing a reference to the Akka test actor to be passed along; if it isn’t, we just reply to sender:

object ZombieRepositoryActorProtocol {
    case class Insert(zombie: Zombie)
    case class Update(zombie: Zombie)
    case class Delete(zombieId: String)
    case class QueryByName(zombieName: String, requester: Option[ActorRef])
}

I could also make the protocol a little more complicated and reply to some of the other operations with status codes or something to allow clients to confirm that an operation completed, but I am keeping it simple for now.

In my own code, the backing store is MongoDB but, another benefit of this type of encapsulation around the repository is that the only thing anybody else knows is how to send messages to the repo which take case classes as arguments. Not only is this a great decoupling away from a persistence medium, but it also allows for all the benefits you get from running Akka actors – you can put the repository behind a round-robin or load-balancing router, you can distribute the repository actors across a grid, and, my personal favorite, you get all the benefit of supervisory control so if your repository fails and crashes, another one can be started up, etc.
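From the caller’s side, the ask pattern turns all of this into the same future-based style as everything else. A sketch, reusing ActorProvider from the earlier posts (the reply type here is an assumption – it’s whatever your repository actually sends back):

```scala
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import ZombieRepositoryActorProtocol._

implicit val timeout = Timeout(3 seconds)

// ask (?) returns a Future[Any]; mapTo recovers the type
// the repository replies with.
val zombieFuture =
  (ActorProvider.ZombieRepository ? QueryByName("bob", None)).mapTo[Option[Zombie]]

zombieFuture.map {
  case Some(zombie) => println("found " + zombie)
  case None         => println("no zombie by that name")
}
```

The caller never knows (or cares) that MongoDB is behind the curtain – it just sends a case class and maps over the future.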

So now I can just handle messages like this:

import ZombieRepositoryActorProtocol._

def receive = {
    case Insert(zombie) => ...
    case Update(zombie) => ...
    case Delete(zombieId) => ...
    case QueryByName(zombieName, requester) =>
        // Reply to the supplied test actor if there is one, otherwise to sender.
        // findByName here stands in for the actual MongoDB lookup.
        requester.getOrElse(sender) ! findByName(zombieName)
}

I honestly don’t know how I got any large-scale, asynchronous work done before without the use of Akka.

Building Play Framework Applications behind a Corporate Firewall

When you’re sitting at home using your Mac or your Windows computer and you’re the master of your domain, everything is good. You control your firewall rules, you control which ports are blocked and if you want to access publicly available resources then you can do so with impunity. The world is your oyster.

Until you get into the office.

Then you notice that you have blocked ports, you can’t reach github because it’s classified as “social networking”, you are required to have a username and a password just to get traffic through the corporate proxy to get to the outside world. And even then, after you’ve managed to get outside, you notice that your organization has blocked access to the central Maven Nexus repository – the source for pretty much any framework, library, or tool that can be packaged as a JAR. Including your application dependencies.
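Getting traffic through the authenticated proxy in the first place is its own battle. One common way to handle it is to hand the standard JVM networking properties to sbt/Play via JAVA_OPTS (the host, port, and credentials below are placeholders – substitute your own):

```shell
# Placeholders: substitute your real proxy host, port, and credentials.
export JAVA_OPTS="$JAVA_OPTS \
  -Dhttp.proxyHost=proxy.corp.com -Dhttp.proxyPort=8080 \
  -Dhttps.proxyHost=proxy.corp.com -Dhttps.proxyPort=8080 \
  -Dhttp.proxyUser=myuser -Dhttp.proxyPassword=secret"

# sbt/Play picks these up the next time you launch it in this shell.
```

Because these are standard Java networking properties, anything running on the JVM in that shell (sbt, and therefore Play) will honor them.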

I had this problem and it was causing me no end of grief. So I thought I’d just add a reference to my company’s private Nexus repository and all would be fine. This seemed like it should work, and it matched the fix for the vast majority of similar problems I found on Stack Overflow:

resolvers += "Corporate Nexus" at "http://corpserver.corp.com/nexus/content/groups/public"

This actually didn’t work. The problem? Play (or, really, SBT underneath Play) stopped trying to resolve dependencies after it failed to connect to the public repository. So what I really needed to do was convince Play (and SBT) to never even try to reach the public Nexus. To do that, I added the following to my Build.scala:

externalResolvers <<= resolvers map { rs =>
    Resolver.withDefaultResolvers(rs, mavenCentral = false)
}
Now my code works just fine and I can declare dependencies on internal JARs (stuff other teams within my company have deployed, as well as my own internal products) and I can declare dependencies on public stuff and I don’t have to worry about Play/SBT failing the attempt after they get error messages from Maven central, because the resolution process now bypasses Maven central entirely.

I realize this may be a small piece of information, but it took me hours and hours of searching to try and find this and finally one of the many ridiculously helpful people at Typesafe linked me to some information on sbt that finally held the answer.

Hopefully this little tidbit helps someone else who is coding behind a corporate firewall and can’t figure out why dependency resolution is failing so miserably.

Using WebSockets with Play Framework 2.1.0

As odd as this may seem to some of you, I have been creating an enterprise web application using the Play Framework and the full typesafe stack, including Scala and Akka. It doesn’t surprise me, however, as the experience I have been having with this stack is quite possibly one of the most productive experiences I’ve had with any web dev stack, open source or commercial.

To cut right to the chase – this web application allows multiple users to edit the same entity (which I am calling a document for now) at the same time. I’ve already got the optimistic concurrency thing taken care of (e.g. you can’t commit your changes over top of someone else’s changes). To make things even cooler, I wanted to push a “toast” (a fade-in-fade-out notification dialog) to any user who is currently looking at a page that was just modified by someone else.

The solution? WebSockets. I decided I would create a news feed of edits and then each page can decide if it wants to use that socket to be made aware of notifications. The JavaScript on that page decides if the notification is relevant (e.g. the edit news item is for that current document). To do this, I adapted the Play Framework sample websockets-chat to serve as a document edit news feed. Users join the feed when the page decides it needs that feed (for push or pull or both), and they quit when the page goes away (if you navigate away from a page that had a JavaScript-initiated websocket connection, the connection dies and the chatroom-based code considers that a Quit).

So, here’s the code for my WebSocket controller, which will control all of the feeds for my site:

object WebSockets extends Controller {
    def editFeed(user:String) = WebSocket.async[JsValue] { request =>
        ChangeFeed.join(user)
    }
}

And here’s the code for the ChangeFeed actor class and its companion object. This code should look ridiculously familiar, since it’s basically the chatroom example. All I’ve really done is replace the Talk case class with my PostUpdate case class, which carries the type of document being updated and the document name. That information is enough for the individual pages to decide whether they need to display a push notification.

// imports cut for brevity..
object ChangeFeed {
   implicit val timeout = Timeout(1 second)

   lazy val default = {
     val feedActor = Akka.system.actorOf(Props[ChangeFeed])
     // -- this is where the chatroom robot used to get initialized...
     feedActor
   }

   def join(username: String): scala.concurrent.Future[(Iteratee[JsValue,_], Enumerator[JsValue])] = {
     (default ? Join(username)).map {
       case Connected(enumerator) =>
         val iteratee = Iteratee.foreach[JsValue] { event =>
           default ! PostUpdate(username, (event \ "docType").as[String], (event \ "docName").as[String])
         }.mapDone { _ =>
           default ! Quit(username) // this quits the channel when the web page goes away
         }
         (iteratee, enumerator)
       case CannotConnect(error) => // .. handle error (snipped for brevity)
     }
   }
}

// The chatroom only allows one username per socket, on a website,
// my users could have multiple tabs open to the same site, so I let them
// open new sockets even if the username is the same.
class ChangeFeed extends Actor {
    var members = Set.empty[String] // must be a var -- we reassign it below
    val (feedEnumerator, feedChannel) = Concurrent.broadcast[JsValue]

   def receive = {
     case Join(username) => {
         // yes, I know there's a more idiomatic scala way to do this
         if (!members.contains(username)) {
             members = members + username
         }
         sender ! Connected(feedEnumerator)
         self ! NotifyJoin(username)
      }
     case NotifyJoin(username) => {
        notifyAll("join", username, "", "")
     }
     case PostUpdate(username, docType, docName) => {
        notifyAll("update", username, docType, docName)
     }
     case Quit(username) => {
        members = members - username
        notifyAll("quit", username, "", "")
     }
   }

   def notifyAll(kind: String, user: String, docType: String, docName: String) {
     val msg = JsObject(
         Seq(
            "kind" -> JsString(kind),
            "user" -> JsString(user),
            "docType" -> JsString(docType),
            "docName" -> JsString(docName),
            "members" -> JsArray(members.toList.map(JsString))
         )
     )
     // Without this push, the message would be built and silently dropped
     feedChannel.push(msg)
   }
}

// case classes snipped for brevity

Once I’ve got a controller that can ask an Actor for a socket and dispense it accordingly, then the only thing left for me to do is create some client-side JavaScript that accesses my web socket and there are plenty of examples of doing that on the web. The one piece of this that makes things even nicer in Play is that I can automatically get the websocket URL from a controller route:

var WS = window['MozWebSocket'] ? MozWebSocket : WebSocket
var feedSocket = new WS("@routes.WebSockets.editFeed(user).webSocketURL()")

This automatically gives me a URL that looks like ws://server/websockets/editFeed.
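The page-side JavaScript then boils down to parsing each frame and deciding whether it’s relevant. Here’s a rough sketch; showToast and the current document name are hypothetical placeholders for whatever your page actually uses:

```javascript
// Decide whether a feed message matters to the page on screen.
function isRelevant(msg, currentDocName) {
    // Only "update" messages for the document being viewed matter.
    return msg.kind === 'update' && msg.docName === currentDocName;
}

// Parse one raw WebSocket frame and toast if it's relevant.
function handleFrame(rawJson, currentDocName, showToast) {
    var msg = JSON.parse(rawJson);
    if (isRelevant(msg, currentDocName)) {
        showToast(msg.user + ' just edited ' + msg.docName);
    }
}

// Wiring it to the socket (browser-only; mirrors the template snippet above):
// feedSocket.onmessage = function (e) { handleFrame(e.data, docName, toast); };
```

Keeping the filtering logic in plain functions like this also makes it trivial to test outside the browser.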

All I can say is that so far, using Play to build an enterprise web application has been an absolute joy and, having been able to create this websocket-based “edit feed” infrastructure in just a few hours was absolutely amazing.