Kotan Code 枯淡コード

In search of simple, elegant code



Consuming a REST (micro) Service with Akka HTTP

In a recent blog post, I talked about how we could quickly and easily pull in all the bootstrapping necessary to fire up an HTTP server and create an Akka HTTP micro service. In this blog post, I’m going to walk you through using the same Akka HTTP library to consume a service.

First, let’s set up a flow that starts with an HttpRequest and finishes with an HttpResponse:

lazy val zombieConnectionFlow: Flow[HttpRequest, HttpResponse, Any] =
    Http().outgoingConnection("localhost", 9001)

This sets up a connection flow from the client (which can be a full app, or in many cases, another micro service) pointing at http://localhost:9001. Note that you can add many more options to the Http() builder syntax to set up things like authentication, SSL, etc.

Once we have a flow, we need a way to convert requests into responses:

def zombieRequest(request: HttpRequest): Future[HttpResponse] =
    Source.single(request).via(zombieConnectionFlow).runWith(Sink.head)

This function takes an HttpRequest and uses the Akka HTTP Scala DSL to tell Akka HTTP how to complete that request. In our case, we create a single-element source from the request and run it via our zombieConnectionFlow.

Now that we’ve got the plumbing (a flow and a request handler) set up, we can create a simple function that will consume our zombie micro service, fetching a single zombie by ID:

def fetchZombieInfo(id: String) : Future[Either[String, Zombie]] = {
 zombieRequest(RequestBuilding.Get(s"/zombies/$id")).flatMap { response =>
   response.status match {
     case OK => Unmarshal(response.entity).to[Zombie].map(Right(_))
      case BadRequest => Future.successful(Left("bad request"))
      case _ => Unmarshal(response.entity).to[String].flatMap { entity =>
        val error = s"FAIL - ${response.status}: $entity"
        Future.failed(new IOException(error))
      }
   }
 }
}

There are a couple of really important things to note here. The first is that when invoking my zombieRequest method, I am using just the resource path from the REST API spec – there's no host or URL here – that detail was abstracted away earlier as part of the flow.

The potential here is enormous. With Akka HTTP, we no longer have to string together a bunch of repetitive, imperative-looking statements to manifest a client that consumes another service. Instead, we can declare our intended flow, define a bunch of requests that execute over that flow, and then pattern match the results of invoking those requests.

Finally, with the fetchZombieInfo method created, we can expose that in our own micro service route (assuming, for the sake of example, that we had augmented the other micro service and we weren’t just proxying here):

pathPrefix("zombieproxy") {
  (get & path(Segment)) { zombieId =>
     complete {
       fetchZombieInfo(zombieId).map[ToResponseMarshallable] {
         case Right(zombie) => zombie
         case Left(errorMessage) => BadRequest -> errorMessage
       }
     }
   }
 }

While I personally feel that the convention of using the left side of Scala’s Either type is prejudiced against those of us who are left-handed, I can understand where the convention started.
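The Right-is-success convention needs nothing from Akka; here is a plain-Scala sketch of the same shape (the `parseZombie` helper is hypothetical, just to illustrate):

```scala
// Right carries the success value, Left the error - the same shape as
// fetchZombieInfo's Future[Either[String, Zombie]], minus the Future.
case class Zombie(name: String, speed: Int)

// Hypothetical parser: "name,speed" on success, an error message otherwise.
def parseZombie(raw: String): Either[String, Zombie] =
  raw.split(',') match {
    case Array(name, speed) if speed.trim.matches("""\d+""") =>
      Right(Zombie(name.trim, speed.trim.toInt))
    case _ => Left(s"bad request: could not parse '$raw'")
  }

// Pattern matching the result, just as the proxy route does.
def describe(result: Either[String, Zombie]): String = result match {
  case Right(z)  => s"zombie ${z.name} at speed ${z.speed}"
  case Left(err) => s"error: $err"
}
```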

So now if we issue the following curl:

curl http://localhost:9001/zombieproxy/spongebob

It will go through our proxy and consume the micro service we wrote the other day, and return the single zombie we’re looking for:

{
 "name": "spongebob",
 "speed": 12
}

I am a huge fan of a number of HTTP client libraries. Lately, my favorite has been OkHttp, for a number of reasons. It provides a very clean, simple, easy-to-use syntax for performing relatively complex HTTP operations. Also, it's available on Android, which lets me write service-consuming Java code that is identical on my Android and server platforms.

As much as I love that library, Akka HTTP is my new favorite HTTP client. There are a pile of reasons, but I think the biggest is that Akka HTTP provides the smallest impedance mismatch between what I want to do over HTTP and how I write code to accomplish that.

In every project I have created in the past that had a component that consumed other services, the service client code rapidly became bloated, confusing, and covered in scar tissue. I actually feel like putting a little thought into creating a layer atop Akka HTTP to consume services could prevent that and still allow us to use futures and remain within the confines of the “reactive manifesto”.

Creating a Microservice with Akka HTTP and Scala

Lately there’s been a lot of hype and buzz over micro services. The funny thing is, there’s absolutely nothing new about micro services. In fact, back in the good old days when I was trying to convince people that SOA was the way to go, if you designed a service bus the way you were supposed to, you ended up with micro services.

What is new, however, is the technology for creating, deploying, and managing micro services. Today we've got Docker, Mesosphere, Marathon, AWS and countless other cloud providers, and Typesafe has even given us ConductR.

A microservice is, as its name implies, a very small service. This doesn’t mean that a micro service will have very few lines of code – it means that it should be singular in purpose. Think of a micro service as a service endpoint embodying the Single Responsibility Principle (SRP) that we all love from standard Object-Oriented Design.

To create a micro service you basically need three things:

  • HTTP Server (typically bootstrapped by the application)
  • Route Management (e.g. the REST resource definitions)
  • JSON serialization and de-serialization (although there’s nothing preventing you from using XML if you want)

For the first, we get a truckload of functionality by using Akka HTTP. I haven't delved too deeply into it yet (for many reasons, not the least of which is that the documentation is littered with fragments that say "todo"), but it looks really powerful. The whole concept of flows and flow management of HTTP interactions sitting atop Akka Streams seems like it could dramatically simplify the lives of developers who write services or HTTP clients (or, in many cases, services that are also HTTP clients of other services).

For the second, Akka HTTP also provides built-in functionality that lets you bind routing definitions to an HTTP server. In classic Scala fashion, they’ve decided to use pattern matching and “combinator” syntax to let you define your routes.

Here’s a sample snippet where I’ve defined routes that expose GET resources for querying a single zombie or getting the complete list of all zombies in the zombie micro service:

val routes = {
  logRequestResult("zombies-microservice") {
    pathPrefix("zombies") {
      (get & path(Segment)) { zombieId =>
        complete {
          Seq(Zombie(zombieId, 12))
        }
      } ~
      get {
        complete {
          Seq(Zombie("bob", 1), Zombie("alfred", 2))
        }
      }
    }
  }
}

You can probably infer from the little sample above how to use the pattern matching, parser combinator syntax to build robust, powerful route definitions. Syntax like “get & path(Segment)” is pretty straightforward – it defines a response to a GET on the resource and extracts out the segment as a lambda parameter, which we then use in the “complete { }” section below (complete indicates a completion of an HTTP server response future).

So now that we have our routes, we can start up an HTTP server using those routes:

object ZombieAkkaHttpMicroservice extends App with Service {
 override implicit val system = ActorSystem()
 override implicit val executor = system.dispatcher
 override implicit val materializer = ActorFlowMaterializer()

 override val config = ConfigFactory.load()
 override val logger = Logging(system, getClass)

 Http().bindAndHandle(routes,
     config.getString("http.interface"), config.getInt("http.port"))
}

The third piece of plumbing that makes this all work is the use of JSON serializers and de-serializers. In my sample, which is extrapolated from an Activator template, I use Spray JSON formatters. I find these a little disappointing: the "JSON inception" macros you get with Play Framework work almost as if by magic, whereas the Spray formatters require you to tell Spray the number of properties in the case class for which you're creating a round-trip JSON formatter:

case class Zombie(name: String, speed: Int)

trait Protocols extends DefaultJsonProtocol {
 implicit val zombieFormat = jsonFormat2(Zombie.apply)
}
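Conceptually, the formatter that jsonFormat2 derives is just a pair of functions over the case class. Here is a hand-rolled, plain-Scala sketch of the round trip (naive string handling, illustration only, not the spray-json API; the Zombie class is redeclared so the sketch stands alone):

```scala
case class Zombie(name: String, speed: Int)

// What a derived formatter boils down to: serialize the two fields out...
def toJson(z: Zombie): String =
  s"""{"name":"${z.name}","speed":${z.speed}}"""

// ...and parse them back, failing with None on anything malformed.
def fromJson(json: String): Option[Zombie] = {
  val pattern = """\{"name":"([^"]*)","speed":(\d+)\}""".r
  json match {
    case pattern(name, speed) => Some(Zombie(name, speed.toInt))
    case _                    => None
  }
}
```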

With those three pieces of the “bootstrap” in place (HTTP server, JSON serialization, REST API/routes definition) you have all the building blocks of a micro service that can be run standalone, or can be bundled and deployed as part of a Docker infrastructure, or deployed and managed using Typesafe’s newest product: ConductR.

With my app running (you can see nearly identical samples in the Activator template library), I can hit the local zombie micro service like so:

curl http://localhost:9001/zombies  (returns all zombies)
curl http://localhost:9001/zombies/spongebob (returns the zombie with ID spongebob)

Regardless of where you sit in the debate over micro services, it's good to know that frameworks like Akka and their new (extremely appealing) HTTP libraries are at your disposal, and that you don't have to resort to bloated, bulky container models to get the job done.

Creating a Multiplayer Chat System with the Akka Event Bus

In my previous blog post, I talked about how to use the subchannel classification system with the Akka Event Bus. In that post, I showed a really simplistic way of determining if a channel is a subclass of another – using startsWith(). That's a really primitive way of doing things and doesn't really show you the true power of the event bus.

In this blog post, I want to walk you through a slightly more realistic example. Let’s assume that we’re using Akka to build the back-end for a multiplayer game. We know that this game needs to support chatting between players, so we want to create an event bus to handle all of the chat traffic for the game. This should, in theory, give the system some nice flexibility in allowing individual players and player-actor components to subscribe and unsubscribe from different segments of the chat traffic.

We have three different segments that we want to divide traffic up into:

  1. Players – Individual players can receive private chat messages, or tells.
  2. Sectors – The players’ space ships are floating in 3D space in large areas called sectors. It is possible for those players to transmit messages to an entire sector.
  3. Groups – Players can form small groups in which 1 to 8 players band together to share experience as well as a chat channel of their own.

One of the interesting things to keep in mind is that our chat bus cannot read player state. In order for the chat bus to be really useful, general purpose, and, most importantly, testable, it can’t go reaching into the inside of some player object to figure out which sector object they are in or to which group they belong.

So we’re going to revise our previous sub-classification event bus sample with a new case class called ChatCoordinate that will be used as our classifier (remember in the previous blog post we used simple strings). Here’s the new code for the bus and its messages:

package chatbus

import akka.event.ActorEventBus
import akka.event.LookupClassification
import akka.event.SubchannelClassification
import akka.util.Subclassification
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor

/*
 * Chat Bus Coordinates
 * players/XXX - sends a private tell to a player
 * sectors/XXX - sends a tell to an entire sector
 * groups/XXX - sends a tell to members of a player group
 */
object ChatSegments {
	val Player = "players"
	val Sector = "sectors"
	val Group = "groups"
}

case class ChatCoordinate(segment: String, target: Option[String])

sealed trait Chat
case class ChatMessage(source: String, message: String) extends Chat
case class ChatEvent(coord:ChatCoordinate, msg: ChatMessage) extends Chat

class ChatEventBus extends ActorEventBus with SubchannelClassification {
  type Event = ChatEvent
  type Classifier = ChatCoordinate

  protected def classify(event: Event): Classifier = event.coord

  protected def subclassification = new Subclassification[Classifier] {
    def isEqual(x: Classifier, y: Classifier) = (x.segment == y.segment && x.target == y.target)

    /* Treat a broadcast coordinate (target == None) as matching every
     * subscriber in the same segment: publishing to Player/None reaches
     * Player/Some("Bob"), while a tell to Player/Some("Bob") goes only to Bob. */
    def isSubclass(x: Classifier, y: Classifier) =
      x.segment == y.segment && x.target.isEmpty
  }

  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event.msg
  }
}

This new chat bus gives us some great flexibility. I can send a message to all players by sending a message to the Players segment with a target of None. I can send a message to all groups by sending a message to the Groups segment with a target of None, a specific group by sending to Groups/Some(“GroupId”) and a specific player by sending to Players/Some(“Player Id”).
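That delivery rule can be checked in isolation. Here is the same logic extracted as plain functions (no Akka required; the `delivered` helper name is mine, mirroring what the bus does on publish):

```scala
case class ChatCoordinate(segment: String, target: Option[String])

// The bus's subclass rule: an event coordinate x matches a subscription
// coordinate y when the segments agree and x is a broadcast (target == None).
def isSubclass(x: ChatCoordinate, y: ChatCoordinate): Boolean =
  x.segment == y.segment && x.target.isEmpty

// An event reaches a subscriber on an exact match or via the subclass rule.
def delivered(event: ChatCoordinate, subscription: ChatCoordinate): Boolean =
  event == subscription || isSubclass(event, subscription)
```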

One of the aspects of gameplay that this chat bus supports is a player moving from one sector to another. If a player leaves a sector, then ideally some actor (maybe a Player actor?) would unsubscribe from the chat bus @ Sectors/Some(“Foo”) and then subscribe to the chat bus @ Sectors/Some(“Bar”) if they moved from sector Foo to sector Bar, while always maintaining a persistent subscription to Players/Some(“Player ID”). Likewise, if a player joins a player group, they would subscribe to Groups/Some(“Group ID”) and unsubscribe from that same coordinate when they leave the group.

The system is inherently extensible, so whenever we want to add a new segment (e.g. Race, Alliance, Guild) we can just drop it in and we're good to go.

Finally, just so you believe me, here are some ScalaTest specs exercising the chat bus with some fake actors:

package tests

import akka.actor.ActorSystem
import akka.actor.Actor
import akka.actor.Props
import akka.testkit.TestKit
import akka.testkit.TestProbe
import org.scalatest.WordSpecLike
import org.scalatest.Matchers
import org.scalatest.BeforeAndAfterAll
import akka.testkit.ImplicitSender
import scala.concurrent.duration._

import chatbus._

object ChatBusSpec {
	
	val toKevin = ChatEvent(ChatCoordinate(ChatSegments.Player, Some("Kevin")), ChatMessage("system", "This goes just to Kevin"))
	val toBob = ChatEvent(ChatCoordinate(ChatSegments.Player, Some("Bob")), ChatMessage("system", "This goes just to Bob"))
	val toAllPlayers = ChatEvent(ChatCoordinate(ChatSegments.Player,None), ChatMessage("kevin", "This goes to all players"))
	
	val toSectorAlpha = ChatEvent(ChatCoordinate(ChatSegments.Sector,Some("Alpha")), ChatMessage("system", "Sector Alpha is about to explode."))
	val toSectorBeta = ChatEvent(ChatCoordinate(ChatSegments.Sector,Some("Beta")), ChatMessage("system", "Sector Beta is about to explode."))
}

class ChatBusSpec (_system: ActorSystem) extends TestKit(_system) with ImplicitSender
  with WordSpecLike with Matchers with BeforeAndAfterAll {
 
  import ChatBusSpec._

  val theProbe = TestProbe()

  def this() = this(ActorSystem("ChatBusSpec"))

  def getEchoSubscriber() = {
	system.actorOf(Props(new Actor {
	    def receive = {
	      case m:ChatMessage => theProbe.ref ! m
	    }
	}))	
  }
 
  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  "A chat event bus" must {
     "send a private message to a player with no snoopers" in {
		val eventBus = new ChatEventBus
		val kevin = getEchoSubscriber()
		val bob = getEchoSubscriber()
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Player, Some("Kevin")))
		eventBus.subscribe(bob, ChatCoordinate(ChatSegments.Player, Some("Bob")))
		eventBus.publish(toKevin)
		theProbe.expectMsg(500 millis, toKevin.msg)
		theProbe.expectNoMsg(500 millis)
     }	

     "send a single message to all players" in {
		val eventBus = new ChatEventBus
		val kevin = getEchoSubscriber()
		val bob = getEchoSubscriber()
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Player, Some("Kevin")))
		eventBus.subscribe(bob, ChatCoordinate(ChatSegments.Player, Some("Bob")))
		eventBus.publish(toAllPlayers)
		// Each player should receive one of these, so the probe should bounce it back twice.
		theProbe.expectMsg(500 millis, toAllPlayers.msg)
		theProbe.expectMsg(500 millis, toAllPlayers.msg)
     }

     "send to all players in a sector should only deliver once per player" in {
		val eventBus = new ChatEventBus
		val kevin = getEchoSubscriber()
		val bob = getEchoSubscriber()
	
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Player, Some("Kevin")))
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Sector, Some("Alpha")))
		eventBus.publish(toSectorAlpha)
		theProbe.expectMsg(500 millis, toSectorAlpha.msg)
		theProbe.expectNoMsg(500 millis)
     }

     "support a player moving from one sector to another" in {
		val eventBus = new ChatEventBus
		val kevin = getEchoSubscriber()
		
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Player, Some("Kevin")))
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Sector, Some("Alpha")))
		eventBus.publish(toKevin)
		theProbe.expectMsg(500 millis, toKevin.msg)
		eventBus.publish(toSectorAlpha)
		theProbe.expectMsg(500 millis, toSectorAlpha.msg)
		eventBus.unsubscribe(kevin, ChatCoordinate(ChatSegments.Sector, Some("Alpha")))
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Sector, Some("Beta")))
		eventBus.publish(toSectorBeta)
		theProbe.expectMsg(500 millis, toSectorBeta.msg)
     }
  }
	
}

Segmenting Traffic on the Akka Event Bus with Subchannel Classification

In my previous post, I provided a simple "hello world" type application that utilizes the Akka Event Bus. In that sample, I showed how you can use a simple LookupClassification to classify the events being published. I glossed over this detail in the previous post to keep things simple, but the lookup classification does what its name implies – it performs a lookup on the classifier (in my case, a string) and returns the subscribers registered for that exact classifier. There is no hierarchy involved, so if you subscribe to /zombies/Foo you will not see events published to /zombies.

In this blog post, I want to talk about how we can add hierarchical topic support to the event bus by switching from a lookup classification to a subchannel classification. In the subchannel classification system, I can subscribe to /zombies and I will receive events published to /zombies/Foo. Additionally, if I publish to /zombies/A while I am subscribed to /zombies/B, I will not receive that event. This type of topic-based subscription should be familiar to those of you who have used service buses, messaging middleware, or the evil creature known as JMS.

So let’s get to it. First, I want to create my own subchannel classification. The beauty of this is that you can provide as little or as much logic as you like. In my case, I’m just going to do a standard string comparison to classify, and I am going to use “starts with” to identify a subchannel. I could get much more involved, even using regular expressions, if I wanted to.
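Before wiring it into the bus, the hierarchy rule is easy to check in isolation with plain string operations (the helper name here is mine):

```scala
// A subscriber at topic y sees an event at topic x when x equals y or
// x sits below y in the hierarchy - the same startsWith rule the bus uses.
def matchesSubscription(eventTopic: String, subscribedTopic: String): Boolean =
  eventTopic == subscribedTopic || eventTopic.startsWith(subscribedTopic)
```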

class ZombieSightingSubclassEventBus extends ActorEventBus with SubchannelClassification {
  type Event = ZombieSightingEvent
  type Classifier = String

  protected def classify(event: Event): Classifier = event.topic

  protected def subclassification = new Subclassification[Classifier] {
    def isEqual(x: Classifier, y: Classifier) = x == y
    def isSubclass(x: Classifier, y: Classifier) = x.startsWith(y)
  }

  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event.sighting
  }
}

Now if I modify my previous blog post’s application object as follows, I’ll get some very robust behavior:

object ZombieTrackerApp extends App {

  val system = ActorSystem()
  //val eventBus = new ZombieSightingLookupEventBus
  val eventBus = new ZombieSightingSubclassEventBus

  val subscriber = system.actorOf(Props(new Actor {
    def receive = {
      case s:ZombieSighting => println(s"Spotted a zombie! ${s}")
    }
  }))

  val westCoastSightingHandler = system.actorOf(Props(new Actor {
  	def receive = {
		case s:ZombieSighting => println(s"West coast zombie ${s}!!")
	}
  }))

  val eastCoastSightingHandler = system.actorOf(Props(new Actor {
	def receive = {
		case s:ZombieSighting => println(s"East coast zombie ${s}!!")
	}
  }))

  eventBus.subscribe(subscriber, "/zombies")
  eventBus.subscribe(westCoastSightingHandler, "/zombies/WEST")
  eventBus.subscribe(eastCoastSightingHandler, "/zombies/EAST")

  eventBus.publish(ZombieSightingEvent("/zombies/WEST", ZombieSighting("FATZOMBIE1", 37.1234, 45.1234, 100.0)))
  eventBus.publish(ZombieSightingEvent("/zombies/EAST", ZombieSighting("SKINNYBOY", 30.1234, 50.1234, 12.0)))

  // And this one will NOT go off into deadletter like before ... this satisfies "startsWith" on the first subscriber
  eventBus.publish(ZombieSightingEvent("/zombies/foo/bar/baz", ZombieSighting("OTHERONE", 35.0, 42.5, 50.0)))
  system.shutdown
}

Here I've decided to create one subscriber that listens for west coast zombies and another that listens for east coast zombies. In the sample above I've used two anonymous actor classes, but I could just as easily have used instances of the same actor class primed with different constructor props (such as the region to which they were bound). Because of the startsWith rule, the old listener subscribed to /zombies receives every event, including the regional ones, while the east and west subscribers receive only the events for their own regions.

Here’s what the program run output looks like:

[info] Running ZombieTrackerApp 
Spotted a zombie! ZombieSighting(FATZOMBIE1,37.1234,45.1234,100.0)
West coast zombie ZombieSighting(FATZOMBIE1,37.1234,45.1234,100.0)!!
East coast zombie ZombieSighting(SKINNYBOY,30.1234,50.1234,12.0)!!
Spotted a zombie! ZombieSighting(SKINNYBOY,30.1234,50.1234,12.0)
Spotted a zombie! ZombieSighting(OTHERONE,35.0,42.5,50.0)
[success] Total time: 6 s, completed Feb 12, 2014 8:39:09 PM

And so this is hopefully enough to whet your appetite on the Akka Event Bus. And by “whet your appetite” I mean that in the most drug-pusher way possible. The first one’s free, now enjoy your new addiction to the event bus 🙂

Using the Akka Event Bus

It’s been so long since I’ve written a blog post about Akka, it feels like coming home after a long business trip – the softness that only your own bed in your own home can provide.

Anyway, let’s talk about event buses. An event bus is basically an intermediary between publishers and subscribers. Publishers publish events to the event bus and subscribers listening for certain types of events will receive those events. That’s the generic definition of an event bus. The Akka Event Bus is, as you might have guessed, an Akka-specific implementation of this pattern.

I find the Akka documentation for Event Bus to be inscrutable and downright off-putting for new developers who aren’t already Akka experts. Go ahead, try and read that stuff and work backwards from that into a hello world sample. You’re going to need at least a beer. So let’s start with the basics.

To set up an event bus you need three things: an event bus, subscribers, and publishers. When you create an event bus (a base class that you extend in Scala) you need to declare two types:

  • Event Type
  • Classifier Type

This is where the documentation gets annoying. The Event Type is just the type of objects that you expect to be put on the bus. You can make this type as specific or as generic as you like (Any works just fine as an event type, though I wouldn't recommend that as a pattern). The Classifier Type is the data type of the event bus classifier. This sounds more complicated than it needs to be: a classifier is really just some piece of information used to differentiate one event from another, or, in Akka parlance, to classify an event.

You could use an integer as a classifier and then, when you subscribe your actors to the bus, subscribe them to individual numbers. So, let's say you have three buckets: you could use the bucket number as the classifier and publish into bucket 1 or bucket 2. If you're familiar with JMS or other messaging middleware systems, you might know the "topic" style of classifier, which often looks like a slash-delimited hierarchy such as /buckets/1 or /buckets/2. There is some additional complexity around classifiers and classification that I will defer until the next blog post so we can get right to the hello world sample and keep things simple.
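To make the lookup idea concrete, here is a toy, Akka-free bus (the names are mine, not the Akka API) that does what LookupClassification does at its core: keep an exact-match table from classifier to subscribers.

```scala
// Toy lookup-classification bus: an exact-match map from classifier to
// subscriber callbacks. No hierarchy - "/buckets/1" and "/buckets" are
// completely unrelated classifiers here.
class ToyLookupBus[Event, Classifier](classify: Event => Classifier) {
  private var subscribers = Map.empty[Classifier, List[Event => Unit]]

  def subscribe(c: Classifier)(handler: Event => Unit): Unit =
    subscribers = subscribers.updated(c, handler :: subscribers.getOrElse(c, Nil))

  // Deliver only to subscribers registered under the event's exact classifier.
  def publish(event: Event): Unit =
    subscribers.getOrElse(classify(event), Nil).foreach(_(event))
}
```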

First, we need a sample domain. As such, we’ll use the old standby domain of zombies. Let’s say we want to set up a bus to which we can publish zombie sighting messages. We then have subscribers who are interested in those zombie sighting messages for whatever reason. In the past, my actor samples all directly sent messages from one actor to another. But, with the Event Bus, the actors sending the message don’t have to care about or know anything about the actors interested in receiving it. This kind of loose coupling pays off in spades in ease of maintenance and troubleshooting.

And without further ado, here’s the full source that includes the zombie sighting message, a zombie sighting event (with a string classifier), the zombie bus, and the code that sets up the subscriptions and publishes to the bus:

import akka.event.ActorEventBus
import akka.event.LookupClassification
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor

case class ZombieSighting(zombieTag: String, lat: Double, long: Double, alt: Double)
case class ZombieSightingEvent(topic: String, sighting: ZombieSighting)

class ZombieSightingLookupEventBus extends ActorEventBus with LookupClassification {
  type Event = ZombieSightingEvent
  type Classifier = String

  protected def mapSize(): Int = 10

  protected def classify(event: Event): Classifier = {
    event.topic
  }

  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event.sighting
  }
}

object ZombieTrackerApp extends App {

  val system = ActorSystem()
  val eventBus = new ZombieSightingLookupEventBus

  val subscriber = system.actorOf(Props(new Actor {
    def receive = {
      case s:ZombieSighting => println(s"Spotted a zombie! ${s}")
    }
  }))

  val indifferentSubscriber = system.actorOf(Props(new Actor {
    def receive = {
	   case s:ZombieSighting => println(s"I saw a zombie, but I don't give a crap.")
    }
  }))

  eventBus.subscribe(subscriber, "/zombies")
  eventBus.subscribe(indifferentSubscriber, "/zombies")

  eventBus.publish(ZombieSightingEvent("/zombies", ZombieSighting("FATZOMBIE1", 37.1234, 45.1234, 100.0)))
  eventBus.publish(ZombieSightingEvent("/zombies", ZombieSighting("SKINNYBOY", 30.1234, 50.1234, 12.0)))

  // And this one will go off into deadletter - nobody is subscribed to this.
  eventBus.publish(ZombieSightingEvent("/zombies/foo/bar/baz", ZombieSighting("OTHERONE", 35.0, 42.5, 50.0)))
  system.shutdown
}

And when we run this application, we expect to see, for each published sighting, output from both subscribers: the one that doesn't give a crap and the one that prints information about the zombie it received. Here's the output:

[info] Running ZombieTrackerApp
 I saw a zombie, but I don't give a crap.
 Spotted a zombie! ZombieSighting(FATZOMBIE1,37.1234,45.1234,100.0)
 I saw a zombie, but I don't give a crap.
 Spotted a zombie! ZombieSighting(SKINNYBOY,30.1234,50.1234,12.0)

In the next blog post, I’ll cover how to upgrade this sample so that we can support the hierarchical classifier style (subchannels).

Asynchronous, Non-Blocking NOM NOMs

Most of the time when I encounter fascinating situations in the real world, I am struck by how well that situation might translate into a novel. For example, I constantly see situations and people that inspire character traits or plot points that someday might make their way into one of my books. However, every once in a while, I see a situation in real life that tickles my technical fancy.

There is a "deli" (the quotes are because in New York, "deli" doesn't mean what most people think it means: a NY deli is often a giant cafeteria-style buffet place that serves 20 kinds of food and sells everything from chips to bobble-heads and gummy bears) near my office in Manhattan that is a marvel of modern retail efficiency. On its busiest day, you can make your way through the throng of people, order what you want, and get out in less than 20 minutes. The really remarkable thing is that the checkout line is unbelievably fast: I can be six people back and still get through the line and out the front door in less than 3 minutes.

What makes this an interesting technical blog post is that this place employs a number of techniques that people like me have been using as large-scale application development design patterns for years. My coworkers and I often refer to the lines of people feeding up toward the cash registers as non-blocking, asynchronous processing.

You really need to see these people in action to believe it. There’s one person who is handling nothing but the credit card swiping, another person is handling the cash register, yet another takes your food and puts it in a bag for you, a bag which comes pre-loaded with a fork, a knife, and napkins. There is absolutely no wasted time from the perspective of the customer. All of the things that a customer might be blocked on in a traditional purchase queue have been parallelized. 

There isn’t a per-customer wait for the bag to get the common essentials stuffed into it. When the card-swiper person is swiping your credit card, the cash register person is actually ringing up the order of the person behind you in the queue, and the bag-stuffer is working on the bag of the person behind that.

This is all well and good and getting your food fast is always a good thing, but what does it have to do with building scalable systems? Everything.

Imagine that a web request is a customer waiting in line for food. If, upon this request’s arrival at the web server, the web server has to go and do everything in serialized order, for every single request, then the time it takes to handle a single request may not appear to be all that bad, but the more requests you have, the longer the lines get. The more back-up you have building up in a queue, the worse the perceived performance of your site will be, even if the time it takes to handle a single request is fixed and relatively short. Why is that? Because each person’s request isn’t being handled immediately upon arrival at the site, their requests are piling up in a queue waiting for other requests to be processed.

Typical scaling patterns just increase the number of concurrent threads to deal with incoming requests. That’s fine, and it works for a little while, until you run out of threads. Now, let’s say you have 30 threads. The first 30 people to hit your site have a decent experience and then the 31st and thereafter experience the same delays as before. People see this and the initial knee-jerk reaction is to add more servers. So now let’s say you’ve got 4 servers, each with 30 threads (I’m simplifying to make the numbers easier to picture). The first 120 people to hit your site now have a decent experience and then thereafter, additional customers are subject to delays and waits.

This is a classic fix-the-symptom-not-the-problem approach. If we apply this to retail, then I'm sure you know what the knee-jerk retail reaction to high load and peak volume is – add more cashiers. So now instead of four queues handling people's orders, you have 8. Great, but you run into the same problems as above, plus an even worse one: your store runs out of room to accommodate the people waiting in the queues. In some circles, we also refer to this as the latency versus throughput problem, though it's actually more involved than that. If you optimize for latency, then you train your cashiers to be unbelievably fast at processing a single order. This appears to be good because it (ideally) means your queue drains faster, with customers moving through the line faster. To increase your throughput, you just add more highly trained, super-fast cashiers. Simple, right? NOPE.

What has this one deli done that developers building systems for scale failed to do? It’s remarkably simple, when you think about it. Rather than taking a monolithic process and scaling that one process out (completing the entire checkout process from start to finish), they have deconstructed all of the tasks that need to be done in order to get a customer through the line and have applied enterprise software patterns to dealing with those tasks. First, they have done some tasks before the customers even arrive, such as pre-filling the bags with forks, knives, and napkins. This removes several seconds from the per-customer pipeline which, when you multiply that out by the number of customers in this place (it’s huge, trust me) and the number of registers, makes a significant impact.

The next thing they’ve done is identify tasks that can be done in parallel. The same cashier doesn’t need to ring up your order and swipe your credit card. These can be done by two different people. This is where throughput versus latency becomes really important. The per-customer time to finish remains the same because the customer can’t leave until both tasks are complete (ring-up and swipe), however, you’re dramatically increasing your throughput because while the first customer is having their card swiped, the next customer is having their order rung up, which allows the pipeline to absorb more customers.
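Sketching the ring-up/swipe split with plain Scala futures makes the trade-off concrete. This is a toy, dependency-free sketch; the ringUp and swipeCard names are invented for the analogy:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object DeliCheckout {
  // The two tasks that can happen at the same time for one customer.
  def ringUp(customer: String): Future[String]    = Future { s"$customer rung up" }
  def swipeCard(customer: String): Future[String] = Future { s"$customer charged" }

  // Kick off both futures before composing them, so they run concurrently.
  // The customer only leaves when both are done.
  def checkout(customer: String): Future[(String, String)] = {
    val rung   = ringUp(customer)
    val swiped = swipeCard(customer)
    for { r <- rung; s <- swiped } yield (r, s)
  }

  def main(args: Array[String]): Unit =
    println(Await.result(checkout("alice"), 5.seconds))
}
```

In this sketch the two tasks for one customer overlap, so the customer is done after the slower task rather than the sum of both; the same composition style is what lets separate customers’ tasks overlap across the pipeline.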

So what’s the moral of this long-winded story? Simple: Model your request processing pipelines like a super-efficient Manhattan deli 🙂

Accessing Neo4j from Play Framework 2.1 with Web Service Futures

As you may have noticed, I’ve recently been doing a little bit of talking about Neo4j and graph databases. A database on its own is a fantastic thing, but it doesn’t do anybody much good if the information contained within stays there, or if you can’t put information into it from some application. In my own proof of concept, I wanted to see how easy it would be to access Neo4j from my favorite web application framework, Play.

Turns out it was a little annoying, because I started out looking for frameworks to talk to Neo4j. First I looked at AnormCypher, a library that provides “anorm-like” API access to Cypher. I couldn’t use it because it failed with multiple runtime errors when I attempted to bring it into my Play application. Next, I tried another Cypher wrapper I found on GitHub, and that one failed miserably too – it had a conflict between the version of a JSON parser it used and the one my Play application was using.

Then I figured, why even bother? It’s just a REST API. So, I decided to try accessing Neo4j using nothing but Play framework’s own Web services API, which is basically the WS object. This turned out to be stupidly easy and, when I figured out future chaining in combination with that WS API, amazing stuff started to happen. You know, like, productivity.

First, I created a teeny little wrapper around the Neo4j REST API URL, I just called it NeoService:

import play.api.libs.json._
import play.api.libs.ws.{WS, Response}
import play.api.libs.concurrent.Execution.Implicits._
import scala.concurrent.Future

class NeoService(rootUrl: String) {
    def this() = this("http://default/neo/URL/location/db/data")

    val stdHeaders = Seq( ("Accept", "application/json"), ("Content-Type", "application/json") )

    def executeCypher(query: String, params: JsObject) : Future[Response] = {
        WS.url(rootUrl + "/cypher").withHeaders(stdHeaders:_*).post(Json.obj(
            "query" -> query,
            "params" -> params
        ))
    }

    // Convert a Future[Response] into a Future[Int]!
    def findNodeIdByKindAndName(kind:String, name:String) : Future[Option[Int]] = {
        val cypher = """
          |START n=node:node_auto_index(kind={theKind})
          |WHERE n.name = {theName}
          |RETURN id(n) as id
        """.stripMargin
        val params = Json.obj( "theName" -> name, "theKind" -> kind )
        for (r <- executeCypher(cypher, params)) yield {
            val theData = (r.json \ "data").as[JsArray]
            if (theData.value.size == 0)
               None
            else
               // Json: "data" : [ [ ### ] ]
               Some(theData.value(0).as[JsArray].value(0).as[Int])
        }
    }
}

So, in this tiny class I’ve got a function I can use to execute Cypher queries, and because I have an auto-indexed node property called kind and another property called name, I can attempt to find a node’s ID based on the kind and name properties using a Cypher query. Instead of finding them synchronously, I can just convert my Future[Response] into a Future[Option[Int]], where the Option[Int] is the result of looking through the Neo4j database for that data. I’ve just converted one future into another, and being able to do so is pretty freaking awesome.
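To see the future-to-future conversion in isolation, here is a dependency-free sketch; FakeResponse and fetch are invented stand-ins for Play’s WS Response and the actual HTTP call:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureMapping {
  // Toy stand-in for Play's WS Response, to keep this sketch dependency-free.
  case class FakeResponse(body: String)

  // Pretend HTTP call: returns a body only for "bob".
  def fetch(name: String): Future[FakeResponse] =
    Future { FakeResponse(if (name == "bob") "42" else "") }

  // The same trick as findNodeIdByKindAndName: a for-comprehension (sugar for
  // map) turns a Future[FakeResponse] into a Future[Option[Int]] without blocking.
  def fetchId(name: String): Future[Option[Int]] =
    for (r <- fetch(name)) yield {
      if (r.body.isEmpty) None else Some(r.body.toInt)
    }
}
```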

Not only can I do that, but I can chain these method calls from other code, like this:

val neo = new utils.NeoService()
for {
    nodeId <- neo.findNodeIdByKindAndName("zombie", "bob")
    zombieId <- otherObject.tweakZombie(nodeId)
} yield {
    zombieId.map { id => println("I got a zombie!") }.getOrElse { println("Zombie doesn't exist!") }
}

In the code here, tweakZombie can be written to be aware of the Option[] so that if it’s empty, it doesn’t tweak anything, allowing me to chain call after call after call without slapping a crapload of if statements in there – a judicious use of map, for-comprehensions, options, and futures gives me a ridiculous amount of power.
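Here is one way an Option-aware tweakZombie might look, sketched without Play or the real NeoService (findNodeId and the tweak logic are invented stand-ins):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object OptionChaining {
  // Stand-in for NeoService.findNodeIdByKindAndName.
  def findNodeId(name: String): Future[Option[Int]] =
    Future.successful(if (name == "bob") Some(7) else None)

  // Option-aware tweak: an empty Option flows straight through untouched,
  // so callers can chain without any if statements.
  def tweakZombie(nodeId: Option[Int]): Future[Option[Int]] =
    nodeId match {
      case Some(id) => Future.successful(Some(id + 1000)) // pretend "tweak"
      case None     => Future.successful(None)
    }

  // Chained calls: no blocking, no explicit None checks at the call site.
  def findAndTweak(name: String): Future[Option[Int]] =
    for {
      nodeId   <- findNodeId(name)
      zombieId <- tweakZombie(nodeId)
    } yield zombieId
}
```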

All of this is made possible by the fact that the Play Framework Web Services API is based on Futures. I was originally skeptical of futures because the old code I had seen before was more confusing than single-threaded, synchronous programming. With the new Futures syntax and well-coded libraries like the Play WS object, you can do ridiculous things in a very small number of lines of code.

Using JSON Inception in the Play Framework with Scala 2.10

The other day I was lamenting the fact that every time I make a tiny little change to the case classes I use for reading/writing Ajax requests from the JavaScript client code of my web application, I have to go and manually modify the JSON combinators that convert between the Scala case classes and their Json representations.

There are some undocumented Scala 2.10 macros (Play Framework’s own website makes no mention of them, even on the page for Json combinators!) that allow for auto-generation of this conversion code. I wish I had coined the term myself, but someone else appropriately refers to this activity as JSON inception.

The basic idea behind Play’s JSON combinators is that they let you use a natural, fluid syntax to convert to/from JSON. For example, you might have the following implicit writes that lets you “jsonify” a zombie sighting case class:

implicit val gpsCoordinateWrites = (
    ( __ \ "long").write[Double] and
    ( __ \ "lat").write[Double] and
    ( __ \ "altitude").write[Double]
)(unlift(GpsCoordinate.unapply))
implicit val zombieSightingWrites = (
    ( __ \ "name").write[String] and
    ( __ \ "timestamp").write[Int] and
    ( __ \ "location").write[GpsCoordinate]
)(unlift(ZombieSighting.unapply))

(Note that gpsCoordinateWrites has to come first: zombieSightingWrites needs an implicit Writes[GpsCoordinate] already in scope, and a forward reference to an implicit val may still be null at initialization time.)

It doesn’t look like all that much code to maintain, but let’s say my application deals in about 20 different kinds of individual case classes that can be sent or received from Ajax/web service calls. Certainly in the middle of development, making changes to this is going to be annoying and while doing it, I couldn’t shake the feeling that this could be cleaner, more elegant, more kotan.

The first thing I did was wrap all my implicit reads and writes up into a single Scala object so I could just do an import JsonReadsWrites._ and have all my Json conversion code in a single place. That felt a little better, but I still thought it could be easier. The above sample is overly simplistic; my real case classes are filled with values of type Option[T], and dealing with those manually in the unapply/apply combinators you normally write for Play makes maintenance even more tedious.

Enter Scala 2.10 macros…

As of Scala 2.10, macros are fully supported. A macro is basically a pre-compile code-generation pass: if you flag a method as a macro method, it is executed at compile time, and its return value is an AST (abstract syntax tree). So, what Play Framework has are macro methods called writes and reads. These are executed at compile time and replace the writes and reads calls you see in the IDE with a syntax tree that constructs your Json combinators for case class conversion automatically.

To be honest, when I first looked at how this is done, it looked like it was some form of black magic, or that there was no way it would be possible without the use of some magic fairy dust. I read and re-read the documentation on Scala macros and after a while, it started to sink in. Reflection is available to the code you write in your macro, so, at compile time, your code can introspect the types of information passed to your macro via generics, and can then use that information to figure out how to construct a Json reader or writer.

So now, the code I wrote above can be re-written as:

implicit val gpsCoordinateWrites = Json.writes[GpsCoordinate]
implicit val zombieSightingWrites = Json.writes[ZombieSighting]

(Json.reads does the same thing for the reading direction.)

Now I can make changes to the case classes and the macro-generated code will automatically compensate for those changes, and it handles arrays and nested case classes (something Salat doesn’t even do for case class conversion with MongoDB).

I would be hard pressed to find a better use of Scala macros than this.

The Futures Have Arrived

As you may know, I’ve been working on building a fairly complex enterprise LOB application using Play, Scala, and Akka. When I started, I had a rudimentary understanding of Akka and Play, and I’d done some playing around with Akka before, but my experience was a far cry from the type of experience a grizzled, production-deployed veteran might have.

My website, without giving away any details, allows me to do a keyword search across multiple different types of entities. Let’s say I’ve built a zombie apocalypse preparedness social networking site (who doesn’t need one of those?!?) and I want to be able to enter a keyword and have it look through the repository containing identified zombie classifications as well as possible friends and even the names of weapons.

I built this in a way that I thought was pretty decent: I created a Search actor that takes a KeywordSearch case class message. This Search actor then asks the Akka actor system for references to the different repositories, which are also actors. In this case, I might need a reference to the ZombieIdentificationRepository actor, the WeaponRepository actor, and the UsersRepository actor.

This problem is screaming for parallelism. I want the search actor to send fire-and-sorta-forget messages to the different repositories and, when it gets answers from all three repositories, send a message back to the actor that invoked the search containing all of the results. Further, I want the web request itself to not block on waiting for the results, so I want to use Play’s asynchronous controller pattern.

When I first built the search, I had code that looked kind of like this:

def index(keyword: String) = Action { implicit request =>
 ...
 val resFuture = ActorProvider.Searcher ? SearchActorProtocol.KeywordSearch(keyword)
 val results = Await.result(resFuture, timeout.duration).asInstanceOf[Array[SearchResult]]
 Ok(views.html.search.index(results))
}

So, while under the hood I was using an Actor, I’m still performing an explicit block. Worse, the search actor was also performing an explicit block as it was doing an Await.result for each of the different repository queries. I had heard that Future composition was possible, so I thought I’d give it a shot.

Here’s how I refactored the Search actor’s search method to combine multiple searches performed in parallel:

def performKeywordSearch(seeker: ActorRef, keyword: String) {
    implicit val timeout = Timeout(3 seconds)

    val zombieFuture = ActorProvider.ZombieRepository ? KeywordSearch(keyword)
    val weaponFuture = ActorProvider.WeaponRepository ? KeywordSearch(keyword)
    val userFuture = ActorProvider.UserRepository ? KeywordSearch(keyword)

    val combFuture = for {
        z <- zombieFuture.mapTo[Array[SearchResult]]
        w <- weaponFuture.mapTo[Array[SearchResult]]
        u <- userFuture.mapTo[Array[SearchResult]]
    } yield seeker ! ( z ++ w ++ u).sortBy( r => r.name )
}

Note that there isn’t a single line of blocking code in the preceding sample. Everything happens when it’s done and there’s no explicit “sit and wait” code. I can even then modify the search controller’s method to remove the Await.result:

...
val searchFuture = ActorProvider.Searcher ? SearchActorProtocol.KeywordSearch(keyword)
val timeoutFuture = play.api.libs.concurrent.Promise.timeout("Failed to finish search", 5 seconds)
Async {
    Future.firstCompletedOf(Seq(searchFuture, timeoutFuture)).map {
        case searchResults: Array[SearchResult] =>
            render {
                case Accepts.Html() => Ok(views.html.search.index(searchResults))
                case Accepts.Json() => Ok(Json.toJson(searchResults))
            }
        case t:String => InternalServerError(t)
    }
}

And now I’ve removed a chain of blocking calls and replaced all of it with completely asynchronous, non-blocking code and now the only thing that ever sits and waits is Play itself, as it awaits the response from the Actor.

Coding with Futures in a completely asynchronous fashion with Actors is difficult; it requires a lot of up-front effort and takes time before you understand what’s really going on. But, as you can see from the code above, the clean, simple, non-blocking elegance you get as a reward is well worth the effort.
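The heart of the refactoring, composing futures that were started before the for-comprehension, can be demonstrated with nothing but the standard library. The repository functions below are invented stand-ins for the actor asks:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelSearch {
  // Toy stand-ins for the three repository actors.
  def zombies(kw: String): Future[Array[String]] = Future { Array(s"zombie:$kw") }
  def weapons(kw: String): Future[Array[String]] = Future { Array(s"weapon:$kw") }
  def users(kw: String):   Future[Array[String]] = Future { Array(s"user:$kw") }

  def search(kw: String): Future[Array[String]] = {
    // Crucial detail: create all three futures BEFORE the for-comprehension.
    // The for desugars into nested flatMap/map calls, so futures created
    // inside it would run one after another instead of in parallel.
    val zf = zombies(kw)
    val wf = weapons(kw)
    val uf = users(kw)
    for { z <- zf; w <- wf; u <- uf } yield (z ++ w ++ u).sorted
  }
}
```

If the three calls were made inline inside the for-comprehension, they would run sequentially, because each subsequent future would only be created once the previous one had completed.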

Bundling Actor Messages into a Protocol

If you have done any programming with Actors (in my case Akka actors) then you know that the vast majority of the messages that you typically end up sending to those actors are case classes. Sure, you can send any type of data you like, but as a practice, most people like using case classes to make the pattern matching of the messages much easier to read. Sometimes you’ll see people send tuples or Vectors but only when it’s really clear that the actor doesn’t receive many other types of messages.

In situations where I have a ton of actors, one phenomenon I’ve noticed is that I end up polluting package space with bucketloads of case classes. For example, if I put the Starport, Planet, Starbase, and Ship actors in the com.kotancode.space package, I’ll likely end up with the messages for those actors all floating around within the package namespace. It might not seem like a problem, but let’s say I have an import com.kotancode.space._ line at the top of one of my actor files; now I’ve pulled in all of those messages even though I’m just defining one actor.

I haven’t been able to find sufficient evidence to see who started the trend of actor protocols but I certainly cannot claim any credit, I’m just borrowing something I’ve seen. The idea is to wrap all of the case classes for a particular actor in a protocol object as a nice way of isolating those case classes in their own sub-scope, even if all the actors belong to the same package.

For example, instead of defining all my messages for Planet at the top of the Planet.scala file (like I normally would do), I can instead define them in an actor protocol like this:

object PlanetProtocol {
    case class Land(player:ActorRef)
    case class Depart(player:ActorRef)
    case class HarvestResources(player:ActorRef, ... , ... )
    case class ...
}

Now, I can define a receive method that looks like this, which keeps the case classes from cluttering up the namespace and, in my opinion, is just much cleaner and more elegant than scattering case classes to the winds at random.

import PlanetProtocol._

def receive = {
    case Land(player) => ...
    case HarvestResources(player, ... , ...) => ...
}

(The import sits just above receive rather than inside it; a partial-function literal can’t begin with an import statement, but the messages still stay scoped to this one actor’s file.)
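To see the scoping benefit without pulling in Akka, here is a toy sketch; String stands in for ActorRef, and handle stands in for an actor’s receive:

```scala
object PlanetProtocolDemo {
  // The protocol object bundles this actor's messages in their own sub-scope.
  object PlanetProtocol {
    final case class Land(player: String)   // ActorRef swapped for String to keep this dependency-free
    final case class Depart(player: String)
  }

  // Stand-in for an actor's message handling: the import is scoped to this
  // method, so Land/Depart never leak into the enclosing package namespace.
  def handle(msg: Any): String = {
    import PlanetProtocol._
    msg match {
      case Land(p)   => s"$p landed"
      case Depart(p) => s"$p departed"
      case other     => s"unhandled: $other"
    }
  }
}
```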

So, it might not seem like this is much of an earth-shattering thing, but every chance I can get to refactor my code to gain more elegance, more cleanliness, more expressiveness, and less ceremony – I’ll take it. Hopefully you find this tip as useful as I did.