Kotan Code 枯淡コード

In search of simple, elegant code


Tag: typesafe

Consuming a REST (micro) Service with Akka HTTP

In a recent blog post, I talked about how we could quickly and easily pull in all the bootstrapping necessary to fire up an HTTP server and create an Akka HTTP micro service. In this blog post, I’m going to walk you through using the same Akka HTTP library to consume a service.

First, let’s set up a flow that starts with an HttpRequest and finishes with an HttpResponse:

lazy val zombieConnectionFlow: Flow[HttpRequest, HttpResponse, Any] =
    Http().outgoingConnection("localhost", 9001)

This sets up a connection flow from the client (which can be a full app, or in many cases, another micro service) pointing at http://localhost:9001. Note that you can add many more options to the Http() builder syntax to set up things like authentication, SSL, etc.

Once we have a flow, we need a way to convert requests into responses:

def zombieRequest(request:HttpRequest): Future[HttpResponse] = 
    Source.single(request).via(zombieConnectionFlow).runWith(Sink.head)

This function takes an HttpRequest and uses the Akka HTTP Scala DSL to tell Akka HTTP how to complete that request. In our case, we create a source containing a single request, send it via our zombieConnectionFlow, and take the first response that comes back.

Now that we’ve got the plumbing (a flow and a request handler) set up, we can create a simple function that will consume our zombie micro service, fetching a single zombie by ID:

def fetchZombieInfo(id: String) : Future[Either[String, Zombie]] = {
 zombieRequest(RequestBuilding.Get(s"/zombies/$id")).flatMap { response =>
   response.status match {
     case OK => Unmarshal(response.entity).to[Zombie].map(Right(_))
     case BadRequest => Future.successful(Left("bad request"))
     case _ => Unmarshal(response.entity).to[String].flatMap { entity =>
       // Include the response body so the failure is easier to diagnose.
       val error = s"FAIL - ${response.status}: $entity"
       Future.failed(new IOException(error))
     }
   }
 }
}

The most important thing to note here is that when invoking my zombieRequest method, I am using just the path from the REST API spec. There’s no host or port here; that was abstracted earlier as part of the flow.

The potential here is enormous. With Akka HTTP, we no longer have to string together a bunch of repetitive, imperative-looking statements to manifest a client that consumes another service. Instead, we can declare our intended flow, define a bunch of requests that execute over that flow, and then pattern match the results of invoking those requests.

Finally, with the fetchZombieInfo method created, we can expose that in our own micro service route (assuming, for the sake of example, that we had augmented the other micro service and we weren’t just proxying here):

pathPrefix("zombieproxy") {
  (get & path(Segment)) { zombieId =>
     complete {
       fetchZombieInfo(zombieId).map[ToResponseMarshallable] {
         case Right(zombie) => zombie
         case Left(errorMessage) => BadRequest -> errorMessage
       }
     }
   }
 }

While I personally feel that the convention of using the left side of Scala’s Either type for failures is prejudiced against those of us who are left-handed, I can understand where the convention started.
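
To make the Left/Right handling a little more concrete, here is a minimal sketch of how a caller might consume a Future[Either[String, Zombie]]. Everything in it is an assumption for illustration: the Zombie case class is a stand-in for the real type, and fetchZombieInfoStub is a hypothetical stub that performs no HTTP at all.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for the Zombie type returned by the service.
final case class Zombie(name: String, speed: Int)

object ZombieClientSketch {
  // Stub with the same result type as fetchZombieInfo; no HTTP is involved.
  def fetchZombieInfoStub(id: String): Future[Either[String, Zombie]] =
    if (id == "spongebob") Future.successful(Right(Zombie("spongebob", 12)))
    else Future.successful(Left("bad request"))

  // fold takes the Left (error) handler first and the Right (success) handler second.
  def describe(id: String): Future[String] =
    fetchZombieInfoStub(id).map(_.fold(
      error => s"lookup failed: $error",
      zombie => s"${zombie.name} moves at speed ${zombie.speed}"
    ))

  // Blocking helper for demonstration only; real code would stay in the Future.
  def describeBlocking(id: String): String =
    Await.result(describe(id), 2.seconds)
}
```

Using fold forces the caller to deal with both sides, which is the main reason to return an Either rather than failing the future for expected errors.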

So now if we issue the following curl:

curl http://localhost:9001/zombieproxy/spongebob

It will go through our proxy and consume the micro service we wrote the other day, and return the single zombie we’re looking for:

{
 "name": "spongebob",
 "speed": 12
}

I am a huge fan of a number of HTTP client libraries. Lately, my favorite has been OkHttp, for a number of reasons. It provides a very clean, simple, easy-to-use syntax for performing relatively complex HTTP operations. Also, it’s available automatically for Android, which allows me to write service-consuming Java code that is identical on my Android and server platforms.

As much as I love that library, Akka HTTP is my new favorite HTTP client. There are a pile of reasons, but I think the biggest is that Akka HTTP provides the smallest impedance mismatch between what I want to do over HTTP and how I write code to accomplish that.

In every project I have created in the past that had a component that consumed other services, the service client code rapidly became bloated, confusing, and covered in scar tissue. I actually feel like putting a little thought into creating a layer atop Akka HTTP to consume services could prevent that and still allow us to use futures and remain within the confines of the “reactive manifesto”.

Creating a Multiplayer Chat System with the Akka Event Bus

In my previous blog post, I talked about how to use the subchannel classification system with the Akka Event Bus. In that post, I showed a really simplistic way of determining if a channel is a subclass of another: using startsWith(). That’s a really primitive way of doing things and doesn’t really show you the true power of the event bus.

In this blog post, I want to walk you through a slightly more realistic example. Let’s assume that we’re using Akka to build the back-end for a multiplayer game. We know that this game needs to support chatting between players, so we want to create an event bus to handle all of the chat traffic for the game. This should, in theory, give the system some nice flexibility in allowing individual players and player-actor components to subscribe and unsubscribe from different segments of the chat traffic.

We have three different segments that we want to divide traffic up into:

  1. Players – Individual players can receive private chat messages, or tells.
  2. Sectors – The players’ space ships are floating in 3D space in large areas called sectors. It is possible for those players to transmit messages to an entire sector.
  3. Groups – Players can form small groups where from 1 to 8 players band together to share experience as well as their own chat channel.

One of the interesting things to keep in mind is that our chat bus cannot read player state. In order for the chat bus to be really useful, general purpose, and, most importantly, testable, it can’t go reaching into the inside of some player object to figure out which sector object they are in or to which group they belong.

So we’re going to revise our previous sub-classification event bus sample with a new case class called ChatCoordinate that will be used as our classifier (remember in the previous blog post we used simple strings). Here’s the new code for the bus and its messages:

package chatbus

import akka.event.ActorEventBus
import akka.event.LookupClassification
import akka.event.SubchannelClassification
import akka.util.Subclassification
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor

/*
 * Chat Bus Coordinates
 * players/XXX - sends a private tell to a player
 * sectors/XXX - sends a tell to an entire sector
 * groups/XXX - sends a tell to members of a player group
 */
object ChatSegments {
	val Player = "players"
	val Sector = "sectors"
	val Group = "groups"
}

case class ChatCoordinate(segment: String, target: Option[String])

sealed trait Chat
case class ChatMessage(source: String, message: String) extends Chat
case class ChatEvent(coord:ChatCoordinate, msg: ChatMessage) extends Chat

class ChatEventBus extends ActorEventBus with SubchannelClassification {
  type Event = ChatEvent
  type Classifier = ChatCoordinate

  protected def classify(event: Event): Classifier = event.coord

  protected def subclassification = new Subclassification[Classifier] {
    def isEqual(x: Classifier, y: Classifier) = (x.segment == y.segment && x.target == y.target)

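    /* Is X a subclass of Y? The bus delivers an event classified as X to subscribers of Y when this is true,
       so a broadcast coordinate (target None) matches every target in its segment:
       is Player/None a subclass of Player/Bob? YES. Is Player/Bob a subclass of Player/None? NO. */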
    def isSubclass(x: Classifier, y: Classifier) = {
		val res = (x.segment == y.segment && x.target == None)
		//println(s"Subclass check: $x $y: $res")
		res
	}
  }

  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event.msg
  }
}

This new chat bus gives us some great flexibility. I can send a message to all players by sending a message to the Players segment with a target of None. I can send a message to all groups by sending a message to the Groups segment with a target of None, a specific group by sending to Groups/Some(“GroupId”) and a specific player by sending to Players/Some(“Player Id”).
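
To make that delivery rule concrete, here is a small plain-Scala model of it. This is only a sketch of the intended routing, not Akka's internal subscriber index: ChatCoordinate is re-declared so the snippet stands alone, and ChatRoutingSketch, receives, and recipients are hypothetical names invented for the illustration.

```scala
// Re-declared here so the sketch is self-contained.
final case class ChatCoordinate(segment: String, target: Option[String])

object ChatRoutingSketch {
  // Same rule as the bus's isSubclass: a broadcast coordinate (target None)
  // matches every coordinate in the same segment.
  def isSubclass(x: ChatCoordinate, y: ChatCoordinate): Boolean =
    x.segment == y.segment && x.target.isEmpty

  // Modelled rule: a subscription receives an event when the coordinates are
  // equal, or when the event's coordinate is a subclass of the subscription's.
  def receives(subscription: ChatCoordinate, event: ChatCoordinate): Boolean =
    event == subscription || isSubclass(event, subscription)

  // Names of all subscribers that would receive the event.
  def recipients(subs: Seq[(String, ChatCoordinate)], event: ChatCoordinate): Set[String] =
    subs.collect { case (name, coord) if receives(coord, event) => name }.toSet
}
```

With kevin subscribed to players/Some("Kevin") and bob to players/Some("Bob"), an event at players/None reaches both, an event at players/Some("Kevin") reaches only kevin, and an event in the sectors segment reaches neither.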

One of the aspects of gameplay that this chat bus supports is a player moving from one sector to another. If a player leaves a sector, then ideally some actor (maybe a Player actor?) would unsubscribe from the chat bus @ Sectors/Some(“Foo”) and then subscribe to the chat bus @ Sectors/Some(“Bar”) if they moved from sector Foo to sector Bar, while always maintaining a persistent subscription to Players/Some(“Player ID”). Likewise, if a player joins a player group, they would subscribe to Groups/Some(“Group ID”) and unsubscribe from that same coordinate when they leave the group.

The system is inherently extensible, so whenever we want to add a new segment (e.g. Race, Alliance, Guild, etc.) we can just drop that in there and we’re good to go.

Finally, just so you believe me, here are some ScalaTest specs exercising the chat bus with some fake actors:

package tests

import akka.actor.ActorSystem
import akka.actor.Actor
import akka.actor.Props
import akka.testkit.TestKit
import akka.testkit.TestProbe
import org.scalatest.WordSpecLike
import org.scalatest.Matchers
import org.scalatest.BeforeAndAfterAll
import akka.testkit.ImplicitSender
import scala.concurrent.duration._

import chatbus._

object ChatBusSpec {
	
	val toKevin = ChatEvent(ChatCoordinate(ChatSegments.Player, Some("Kevin")), ChatMessage("system", "This goes just to Kevin"))
	val toBob = ChatEvent(ChatCoordinate(ChatSegments.Player, Some("Bob")), ChatMessage("system", "This goes just to Bob"))
	val toAllPlayers = ChatEvent(ChatCoordinate(ChatSegments.Player,None), ChatMessage("kevin", "This goes to all players"))
	
	val toSectorAlpha = ChatEvent(ChatCoordinate(ChatSegments.Sector,Some("Alpha")), ChatMessage("system", "Sector Alpha is about to explode."))
	val toSectorBeta = ChatEvent(ChatCoordinate(ChatSegments.Sector,Some("Beta")), ChatMessage("system", "Sector Beta is about to explode."))
}

class ChatBusSpec (_system: ActorSystem) extends TestKit(_system) with ImplicitSender
  with WordSpecLike with Matchers with BeforeAndAfterAll {
 
  import ChatBusSpec._

  val theProbe = TestProbe()

  def this() = this(ActorSystem("ChatBusSpec"))

  def getEchoSubscriber() = {
	system.actorOf(Props(new Actor {
	    def receive = {
	      case m:ChatMessage => theProbe.ref ! m
	    }
	}))	
  }
 
  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  "A chat event bus" must {
     "send a private message to a player with no snoopers" in {
		val eventBus = new ChatEventBus
		val kevin = getEchoSubscriber()
		val bob = getEchoSubscriber()
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Player, Some("Kevin")))
		eventBus.subscribe(bob, ChatCoordinate(ChatSegments.Player, Some("Bob")))
		eventBus.publish(toKevin)
		theProbe.expectMsg(500 millis, toKevin.msg)
		theProbe.expectNoMsg(500 millis)
     }	

     "send a single message to all players" in {
		val eventBus = new ChatEventBus
		val kevin = getEchoSubscriber()
		val bob = getEchoSubscriber()
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Player, Some("Kevin")))
		eventBus.subscribe(bob, ChatCoordinate(ChatSegments.Player, Some("Bob")))
		eventBus.publish(toAllPlayers)
		// Each player should receive one of these, so the probe should bounce it back twice.
		theProbe.expectMsg(500 millis, toAllPlayers.msg)
		theProbe.expectMsg(500 millis, toAllPlayers.msg)
     }

     "send to all players in a sector should only deliver once per player" in {
		val eventBus = new ChatEventBus
		val kevin = getEchoSubscriber()
		val bob = getEchoSubscriber()
	
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Player, Some("Kevin")))
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Sector, Some("Alpha")))
		eventBus.publish(toSectorAlpha)
		theProbe.expectMsg(500 millis, toSectorAlpha.msg)
		theProbe.expectNoMsg(500 millis)
     }

     "support a player moving from one sector to another" in {
		val eventBus = new ChatEventBus
		val kevin = getEchoSubscriber()
		
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Player, Some("Kevin")))
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Sector, Some("Alpha")))
		eventBus.publish(toKevin)
		theProbe.expectMsg(500 millis, toKevin.msg)
		eventBus.publish(toSectorAlpha)
		theProbe.expectMsg(500 millis, toSectorAlpha.msg)
		eventBus.unsubscribe(kevin, ChatCoordinate(ChatSegments.Sector, Some("Alpha")))
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Sector, Some("Beta")))
		eventBus.publish(toSectorBeta)
		theProbe.expectMsg(500 millis, toSectorBeta.msg)
     }
  }
	
}

Segmenting Traffic on the Akka Event Bus with Subchannel Classification

In my previous post, I provided a simple “hello world” type application that utilizes the Akka Event Bus. In that sample, I showed how you can use a simple LookupClassification to classify the events being published. I glossed over this detail in the previous post to keep things simple, but the lookup classification does what its name implies: it does a lookup on the classifier (in my case, it was a string) and returns the subscribers for exactly that classifier. There is no hierarchy involved. With this situation, if you subscribe to /zombies you will not see events published to /zombies/Foo.

In this blog post, I want to talk about how we can add hierarchical topic support to the event bus by switching from a lookup classification to a subchannel classification. In the subchannel classification system, I can subscribe to /zombies and I will receive events published to /zombies/Foo. Additionally, if I publish to /zombies/A and I am subscribed to /zombies/B I will not receive that event. This type of topic-based subscription should be familiar to those of you who have used service buses, messaging middleware, or the evil creature known as JMS.

So let’s get to it. First, I want to create my own subchannel classification. The beauty of this is that you can provide as little or as much logic as you like. In my case, I’m just going to do a standard string comparison to classify, and I am going to use “starts with” to identify a subchannel. I could get much more involved, even using regular expressions, if I wanted to.

class ZombieSightingSubclassEventBus extends ActorEventBus with SubchannelClassification {
  type Event = ZombieSightingEvent
  type Classifier = String

  protected def classify(event: Event): Classifier = event.topic

  protected def subclassification = new Subclassification[Classifier] {
    def isEqual(x: Classifier, y: Classifier) = x == y
    def isSubclass(x: Classifier, y: Classifier) = x.startsWith(y)
  }

  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event.sighting
  }
}
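
One thing worth knowing about a plain “starts with” check is that it compares characters, not path segments. Here is a minimal sketch that contrasts it with a segment-aware variant. TopicMatchSketch and segmentMatch are hypothetical names made up for this illustration; only prefixMatch mirrors the rule used in the bus above.

```scala
object TopicMatchSketch {
  // The rule used by the bus above: plain prefix comparison.
  def prefixMatch(eventTopic: String, subscribedTopic: String): Boolean =
    eventTopic.startsWith(subscribedTopic)

  // Hypothetical stricter variant: match only on whole path segments, so
  // "/zombiesquad" is not treated as a child of "/zombies".
  def segmentMatch(eventTopic: String, subscribedTopic: String): Boolean =
    eventTopic == subscribedTopic ||
      eventTopic.startsWith(subscribedTopic.stripSuffix("/") + "/")
}
```

Both versions deliver /zombies/WEST to a /zombies subscriber and neither delivers /zombies to a /zombies/WEST subscriber. They differ only on topics such as /zombiesquad, which the plain prefix check treats as a subchannel of /zombies.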

Now if I modify my previous blog post’s application object as follows, I’ll get some very robust behavior:

object ZombieTrackerApp extends App {

  val system = ActorSystem()
  //val eventBus = new ZombieSightingLookupEventBus
  val eventBus = new ZombieSightingSubclassEventBus

  val subscriber = system.actorOf(Props(new Actor {
    def receive = {
      case s:ZombieSighting => println(s"Spotted a zombie! ${s}")
    }
  }))

  val westCoastSightingHandler = system.actorOf(Props(new Actor {
  	def receive = {
		case s:ZombieSighting => println(s"West coast zombie ${s}!!")
	}
  }))

  val eastCoastSightingHandler = system.actorOf(Props(new Actor {
	def receive = {
		case s:ZombieSighting => println(s"East coast zombie ${s}!!")
	}
  }))

  eventBus.subscribe(subscriber, "/zombies")
  eventBus.subscribe(westCoastSightingHandler, "/zombies/WEST")
  eventBus.subscribe(eastCoastSightingHandler, "/zombies/EAST")

  eventBus.publish(ZombieSightingEvent("/zombies/WEST", ZombieSighting("FATZOMBIE1", 37.1234, 45.1234, 100.0)))
  eventBus.publish(ZombieSightingEvent("/zombies/EAST", ZombieSighting("SKINNYBOY", 30.1234, 50.1234, 12.0)))

  // Unlike the lookup version, this one reaches a subscriber: its topic satisfies "startsWith" for the first subscriber ("/zombies")
  eventBus.publish(ZombieSightingEvent("/zombies/foo/bar/baz", ZombieSighting("OTHERONE", 35.0, 42.5, 50.0)))
  system.shutdown
}

Here I’ve decided to create one subscriber that listens for west coast zombies and another one that listens for east coast zombies. In the sample above I’ve used two anonymous actor classes, but I could just as easily have used instances of the same actor class primed with different constructor props (such as the region to which they were bound). The old listener on /zombies receives every event published to /zombies or to any topic beneath it, while the new east and west subscribers receive only the events published to their own regional topics.

Here’s what the program run output looks like:

[info] Running ZombieTrackerApp 
Spotted a zombie! ZombieSighting(FATZOMBIE1,37.1234,45.1234,100.0)
West coast zombie ZombieSighting(FATZOMBIE1,37.1234,45.1234,100.0)!!
East coast zombie ZombieSighting(SKINNYBOY,30.1234,50.1234,12.0)!!
Spotted a zombie! ZombieSighting(SKINNYBOY,30.1234,50.1234,12.0)
Spotted a zombie! ZombieSighting(OTHERONE,35.0,42.5,50.0)
[success] Total time: 6 s, completed Feb 12, 2014 8:39:09 PM

And so this is hopefully enough to whet your appetite on the Akka Event Bus. And by “whet your appetite” I mean that in the most drug-pusher way possible. The first one’s free, now enjoy your new addiction to the event bus 🙂

Using the Akka Event Bus

It’s been so long since I’ve written a blog post about Akka, it feels like coming home after a long business trip – the softness that only your own bed in your own home can provide.

Anyway, let’s talk about event buses. An event bus is basically an intermediary between publishers and subscribers. Publishers publish events to the event bus and subscribers listening for certain types of events will receive those events. That’s the generic definition of an event bus. The Akka Event Bus is, as you might have guessed, an Akka-specific implementation of this pattern.

I find the Akka documentation for Event Bus to be inscrutable and downright off-putting for new developers who aren’t already Akka experts. Go ahead, try and read that stuff and work backwards from that into a hello world sample. You’re going to need at least a beer. So let’s start with the basics.

To set up an event bus you need three things: an event bus, subscribers, and publishers. When you create an event bus (a base class that you extend in Scala) you need to declare two types:

  • Event Type
  • Classifier Type

This is where the documentation gets annoying. The Event Type is just the type of objects that you expect to be put on the bus. You can make this type as specific as you want or as generic (Any works just fine as an event type, but I wouldn’t really recommend that as a pattern). The Classifier Type is the data type of the event bus classifier. This sounds more complicated than it needs to be. A classifier is really just some piece of information used to differentiate one event from another, or, in Akka parlance, to classify an event.

You could use an integer as a classifier, and then when you subscribe your actors to the bus, you subscribe them to individual numbers. So, let’s say you have 3 buckets: you could use the bucket number as the classifier, subscribe actors to bucket 1 or bucket 2, and then publish each event into the bucket it belongs to. If you’re familiar with JMS or other messaging middleware systems then you might be familiar with the “topic” style classifiers, which often look like slash-delimited hierarchies such as /buckets/1 or /buckets/2. There is some additional complexity around classifiers and classification that I will defer until the next blog post so we can get right to the hello world sample and keep things simple.
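
Before getting to the Akka version, here is a toy plain-Scala model of the bucket idea, just to show what “classify, then look up subscribers” means. It is a single-threaded sketch and not how Akka implements its event bus; BucketBusSketch, BucketEvent, and BucketBusDemo are hypothetical names used only here.

```scala
import scala.collection.mutable

// A toy, single-threaded model of lookup classification with an Int classifier.
final class BucketBusSketch[E](classify: E => Int) {
  private val subscribers = mutable.Map.empty[Int, Vector[E => Unit]]

  def subscribe(bucket: Int)(handler: E => Unit): Unit =
    subscribers.update(bucket, subscribers.getOrElse(bucket, Vector.empty) :+ handler)

  // Exact lookup: only handlers registered for the event's own bucket run.
  def publish(event: E): Unit =
    subscribers.getOrElse(classify(event), Vector.empty).foreach(_(event))
}

final case class BucketEvent(bucket: Int, payload: String)

object BucketBusDemo {
  // Publishes into buckets 1, 2 and 3; nobody listens on bucket 3.
  def run(): List[String] = {
    val seen = mutable.ListBuffer.empty[String]
    val bus = new BucketBusSketch[BucketEvent](_.bucket)
    bus.subscribe(1)(e => seen += s"one:${e.payload}")
    bus.subscribe(2)(e => seen += s"two:${e.payload}")
    bus.publish(BucketEvent(1, "a"))
    bus.publish(BucketEvent(2, "b"))
    bus.publish(BucketEvent(3, "c"))
    seen.toList
  }
}
```

The event published into bucket 3 simply goes nowhere, because no handler was registered for that exact classifier.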

First, we need a sample domain. As such, we’ll use the old standby domain of zombies. Let’s say we want to set up a bus to which we can publish zombie sighting messages. We then have subscribers who are interested in those zombie sighting messages for whatever reason. In the past, my actor samples all directly sent messages from one actor to another. But, with the Event Bus, the actors sending the message don’t have to care about or know anything about the actors interested in receiving it. This kind of loose coupling pays off in spades in ease of maintenance and troubleshooting.

And without further ado, here’s the full source that includes the zombie sighting message, a zombie sighting event (with a string classifier), the zombie bus, and the code that sets up the subscriptions and publishes to the bus:

import akka.event.ActorEventBus
import akka.event.LookupClassification
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor

case class ZombieSighting(zombieTag: String, lat: Double, long: Double, alt: Double)
case class ZombieSightingEvent(topic: String, sighting: ZombieSighting)

class ZombieSightingLookupEventBus extends ActorEventBus with LookupClassification {
  type Event = ZombieSightingEvent
  type Classifier = String

  protected def mapSize(): Int = 10

  protected def classify(event: Event): Classifier = {
    event.topic
  }

  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event.sighting
  }
}

object ZombieTrackerApp extends App {

  val system = ActorSystem()
  val eventBus = new ZombieSightingLookupEventBus

  val subscriber = system.actorOf(Props(new Actor {
    def receive = {
      case s:ZombieSighting => println(s"Spotted a zombie! ${s}")
    }
  }))

  val indifferentSubscriber = system.actorOf(Props(new Actor {
    def receive = {
	   case s:ZombieSighting => println(s"I saw a zombie, but I don't give a crap.")
    }
  }))

  eventBus.subscribe(subscriber, "/zombies")
  eventBus.subscribe(indifferentSubscriber, "/zombies")

  eventBus.publish(ZombieSightingEvent("/zombies", ZombieSighting("FATZOMBIE1", 37.1234, 45.1234, 100.0)))
  eventBus.publish(ZombieSightingEvent("/zombies", ZombieSighting("SKINNYBOY", 30.1234, 50.1234, 12.0)))

  // Nobody is subscribed to this exact topic, so this event is never delivered to anyone.
  eventBus.publish(ZombieSightingEvent("/zombies/foo/bar/baz", ZombieSighting("OTHERONE", 35.0, 42.5, 50.0)))
  system.shutdown
}

And when we run this application, what we expect to see for each sighting published to /zombies is one trace output from the subscriber that doesn’t give a crap and one from the subscriber that outputs information about the zombie it received. The third event produces no output at all. Here’s the output:

[info] Running ZombieTrackerApp
 I saw a zombie, but I don't give a crap.
 Spotted a zombie! ZombieSighting(FATZOMBIE1,37.1234,45.1234,100.0)
 I saw a zombie, but I don't give a crap.
 Spotted a zombie! ZombieSighting(SKINNYBOY,30.1234,50.1234,12.0)

In the next blog post, I’ll cover how to upgrade this sample so that we can support the hierarchical classifier style (subchannels).

Building Play Framework Applications behind a Corporate Firewall

When you’re sitting at home using your Mac or your Windows computer and you’re the master of your domain, everything is good. You control your firewall rules, you control which ports are blocked and if you want to access publicly available resources then you can do so with impunity. The world is your oyster.

Until you get into the office.

Then you notice that you have blocked ports, you can’t reach GitHub because it’s classified as “social networking”, and you are required to have a username and a password just to get traffic through the corporate proxy to the outside world. And even then, after you’ve managed to get outside, you notice that your organization has blocked access to the Maven Central repository, the source for pretty much any framework, library, or tool that can be packaged as a JAR. Including your application dependencies.

I had this problem and it was causing me no end of grief. So I thought I’ll just add a reference to my company’s private Nexus repository and all will be just fine. This seemed like it should work, and it seemed to fix the vast majority of the problems I found on Stack Overflow:

resolvers += "Corporate Nexus" at "http://corpserver.corp.com/nexus/content/groups/public"

This actually didn’t work. The problem? Play (or SBT underneath Play, actually) stopped trying to resolve dependencies after it failed to connect to the public repository. So what I really needed to do was to convince Play (and SBT) to never even try to go to the public Nexus. To do that, I added the following code to my Build.scala:

externalResolvers <<= resolvers.map { rs =>
   Resolver.withDefaultResolvers(rs, mavenCentral = false)
}

Now my code works just fine and I can declare dependencies on internal JARs (stuff other teams within my company have deployed, as well as my own internal products) and I can declare dependencies on public stuff and I don’t have to worry about Play/SBT failing the attempt after they get error messages from Maven central, because the resolution process now bypasses Maven central entirely.
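
If you keep your settings in a build.sbt file rather than a Build.scala, the same idea can be sketched as follows. This assumes sbt 0.13-style setting syntax with `:=` and `.value`; the resolver name and URL are the same placeholders used above, and I have not checked this against every sbt release.

```scala
// build.sbt (sbt 0.13.x setting syntax; an assumption, adjust for your sbt version)
resolvers += "Corporate Nexus" at "http://corpserver.corp.com/nexus/content/groups/public"

// Rebuild the external resolver list from `resolvers` alone, leaving Maven Central out.
// Newer sbt releases offer Resolver.combineDefaultResolvers for the same purpose.
externalResolvers := Resolver.withDefaultResolvers(resolvers.value, mavenCentral = false)
```

Either way, the point is the same: the resolver chain is rebuilt without Maven Central, so resolution never tries to leave the corporate network.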

I realize this may be a small piece of information, but it took me hours and hours of searching to try and find this and finally one of the many ridiculously helpful people at Typesafe linked me to some information on sbt that finally held the answer.

Hopefully this little tidbit helps someone else who is coding behind a corporate firewall and can’t figure out why dependency resolution is failing so miserably.

Building an MMO Server in Akka : Using MongoDB as a Backing Store

The story so far: I’ve created an Akka application that is using non-blocking Akka IO over sockets to communicate via Google Protocol Buffers with iOS (or potentially other) clients. In the last blog post, I created some Actors that allow objects that have a physical presence in space to respond to messages, namely radar pings. This allows objects to “appear” to players. Their client will get radar pong messages, which will allow the client GUI to create icons for the items in the radar pong like planets, star ports, ships (owned by players or NPCs), harvestable asteroids, and so on.

That’s all well and good, but in the last blog post, I had to hard code things like the name, description, and location of the objects in the universe. The first thing I needed to do was to refactor the PhysicalObject trait … in the last iteration it had “magic knowledge” of the types of physical objects that might be using that trait. So here’s my new physical object:

package com.kotancode.ulysses.space

import com.kotancode.ulysses.space.Vector3D._
import akka.actor._

case class RadarPing(source: ActorRef, origin: Vector3D, scanDistance: Int)
case class RadarPong(source: ActorRef, origin: Vector3D, objectType: Int, name:String, description: String)

object RadarObjectTypes {
	val Planet = 1
	val Station = 2
	val Port = 3
	val Ship = 4

	val Unknown = 99
}

trait PhysicalObject extends Actor with ActorLogging {
	def replyPong(p:RadarPing): Option[RadarPong]

	def receiveSpatial: Receive = {
		case p:RadarPing => {
			replyPong(p).map { pongReply => p.source ! pongReply }
		}
	}
}

There are a couple of new things happening here. First, this trait no longer has a member variable called location. Second, in the last iteration, this trait constructed the pong reply itself. This would have been really problematic because I would have ended up with one method with a pile of “switch” cases to determine how it should reply in different circumstances… and that would’ve been smelly. Now, it is up to the actor using this trait to define a method that creates pong replies. Also, note that the abstract method that the concrete actors will implement returns an Option, so I can just run a map over that: if the method returns a None then the above code won’t deliver the pong message.
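
Here is a small plain-Scala sketch of that Option-based reply pattern with the actors stripped away. Vec3, Ping, Pong, and PongSketch are hypothetical stand-ins for Vector3D, RadarPing, and RadarPong, and the distance formula is my assumption about what distanceFrom computes (plain Euclidean distance).

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Vector3D, RadarPing and RadarPong.
final case class Vec3(x: Double, y: Double, z: Double) {
  // Assumed to be Euclidean distance.
  def distanceFrom(other: Vec3): Double =
    math.sqrt(math.pow(x - other.x, 2) + math.pow(y - other.y, 2) + math.pow(z - other.z, 2))
}
final case class Ping(origin: Vec3, scanDistance: Int)
final case class Pong(name: String, origin: Vec3)

object PongSketch {
  // Returns Some(reply) only when the object is inside the scan radius.
  def replyPong(name: String, location: Vec3, ping: Ping): Option[Pong] =
    if (location.distanceFrom(ping.origin) < ping.scanDistance) Some(Pong(name, location))
    else None

  // Collects the replies that would be sent; a None simply produces nothing.
  def deliver(objects: Seq[(String, Vec3)], ping: Ping): List[Pong] = {
    val sent = mutable.ListBuffer.empty[Pong]
    objects.foreach { case (name, loc) => replyPong(name, loc, ping).foreach(sent += _) }
    sent.toList
  }
}
```

The sketch uses foreach rather than map on the Option because only the side effect matters here; map works just as well and merely produces a result that is thrown away.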

Now that I’ve cleaned up my physical objects (to make them easier to be instantiated dynamically based on data from a backing store like Mongo), I can create a Persistable trait:

package com.kotancode.ulysses.state

import akka.actor._
import com.mongodb.casbah.Imports._
import org.bson.types.ObjectId

case class LoadState(id:String)
case object PersistState

object BackingStoreKeys {
	val Name = "name"
	val Location = "location"
	val Description = "description"
	val ID = "_id"
}

trait Persistable extends Actor {
	var backingObject: MongoDBObject = null

	def receivePersistable: Receive = {
		case LoadState(id) => {
			loadFromMongo(id)
		}
		case PersistState => {
		  // Nothing here yet...
		}
	}

	def persistenceCollectionName:String

	def loadFromMongo(id:String) = {
		val mongoClient = MongoClient()
		val ulysses = mongoClient("ulysses")
		val collection = ulysses(persistenceCollectionName)
		val objectId = new ObjectId(id)
		backingObject = collection.findOne(MongoDBObject(BackingStoreKeys.ID -> objectId)).getOrElse { throw new IllegalArgumentException(s"id ${id} not found in ${collection}")}
		println(s"backing object ${backingObject}")
	}
}

Here I’m using the Casbah library for Scala to talk to MongoDB. What’s worth mentioning here is that the only way a persistable actor should load its state is after it has received a message to do so. This keeps the data-loading activity in the thread-safe message pattern already enforced by Akka, so I don’t have to worry about race conditions where I have to delay some activity until after my reference data has been loaded (this is a common problem I see regularly in message-based applications).

Let’s see what our new, cleaner Planet actor looks like now:

package com.kotancode.ulysses.space

import com.kotancode.ulysses.space.Vector3D._
import com.kotancode.ulysses.state.Persistable
import com.kotancode.ulysses.state.BackingStoreKeys
import com.mongodb.casbah.Imports._

import akka.actor._

object Planet {
	val BackingCollectionName = "planets"
}

class Planet extends Actor with PhysicalObject with Persistable {

	override def persistenceCollectionName = Planet.BackingCollectionName

	override def replyPong(ping:RadarPing): Option[RadarPong] = {
		val currentLocation = Vector3D.fromBackingObject(backingObject.as[BasicDBObject](BackingStoreKeys.Location))
		if ( (currentLocation distanceFrom ping.origin) < ping.scanDistance) {
			Some(RadarPong(self,
				currentLocation,
				RadarObjectTypes.Planet,
				backingObject.as[String](BackingStoreKeys.Name),
				backingObject.as[String](BackingStoreKeys.Description)
			))
		}
		else {
			None
		}
	}

	def receivePlanet: Receive = {
		case _ => log.debug("Received some message for a planet.")
	}

	def receive = {
		receivePersistable orElse receiveSpatial orElse receivePlanet
	}
}

In my Mongo database, the location field contains a nested object with the properties x, y, and z. The Vector3D class has a utility method on it (I will refactor this into a “pimp” or “augment” pattern later) that converts a Casbah BasicDBObject into a 3D vector by extracting these properties.

Finally, remember that Universe class? The one that hard-coded the instantiation of the “Utopia 1” planet? Here’s what it looks like now:

package com.kotancode.ulysses.space
import com.kotancode.ulysses.state.LoadState
import com.kotancode.ulysses.state.Repository

import akka.actor._

class Universe extends Actor with ActorLogging {

	def loadPlanets = {
		for (id <- Repository.allIds(Planet.BackingCollectionName)) {
			context.actorOf(Props(new Planet()), name=id) ! LoadState(id)
		}
	}
	override def preStart() = {
		loadPlanets
	}

	def receive = {
		case p:RadarPing => context.children foreach (_.forward(p))
		case _ => log.debug("Received a universe message")
	}
}

Here I’m just making use of a simple object I created called Repository that issues a Casbah query to Mongo. It’s an empty query on a collection, so it returns all the objects; it then converts the Object IDs into strings and yields them so they can be iterated over in a for loop, as shown in the above code.

Now, instead of having a hard-coded planet, I can create multiple planets in my database and this code will instantiate an actor for each of them. Each planet is backed by its own unique MongoDB document. Here’s some trace log output that shows a pong reply from each of the 3 planets I created in MongoDB:

[DEBUG] [03/24/2013 08:31:12.998] [UlyssesAgenda-akka.actor.default-dispatcher-1] [akka://UlyssesAgenda/user/ServerCore/Clients/96e1e42b-5cda-4faa-aa89-019c2bde9794] I received a radar pong RadarPong(Actor[akka://UlyssesAgenda/user/ServerCore/Universe/514de823b485acb28e02bb91],(5.0,5.0,5.0),1,Utopia 1,This is a utopian planet, chock full of win.)

[DEBUG] [03/24/2013 08:31:12.999] [UlyssesAgenda-akka.actor.default-dispatcher-1] [akka://UlyssesAgenda/user/ServerCore/Clients/96e1e42b-5cda-4faa-aa89-019c2bde9794] I received a radar pong RadarPong(Actor[akka://UlyssesAgenda/user/ServerCore/Universe/514e194004538147414b4675],(6.0,6.0,6.0),1,Utopia 2,This is a little less utopian than the other one.)

[DEBUG] [03/24/2013 08:31:12.999] [UlyssesAgenda-akka.actor.default-dispatcher-1] [akka://UlyssesAgenda/user/ServerCore/Clients/96e1e42b-5cda-4faa-aa89-019c2bde9794] I received a radar pong RadarPong(Actor[akka://UlyssesAgenda/user/ServerCore/Universe/514e19b304538147414b4676],(7.0,7.0,7.0),1,Doom,Every game needs a planet doom.)

Now that I’ve got the core framework for persistence up and running, I can do some more useful things like giving players persistence so I can keep their names and locations separate. Once I have that, I can move on to creating the most basic of the actual gameplay functions.

Using WebSockets with Play Framework 2.1.0

As odd as this may seem to some of you, I have been creating an enterprise web application using the Play Framework and the full Typesafe stack, including Scala and Akka. It doesn’t seem odd to me, however, because working with this stack has quite possibly been one of the most productive experiences I’ve had with any web dev stack, open source or commercial.

To cut right to the chase: this web application allows multiple users to edit the same entity (which I am calling documents for now) at the same time. I’ve already got the optimistic concurrency thing taken care of (e.g. you can’t commit your changes over top of someone else’s changes). To make things even cooler, I wanted to push a “toast” (a fade-in-fade-out notification dialog) to users who are currently looking at a page that was just modified by someone else.

The solution? WebSockets. I decided I would create a news feed of edits and then each page can decide if it wants to use that socket to be made aware of notifications. The JavaScript on that page decides if the notification is relevant (e.g. the edit news item is for that current document). To do this, I adapted the Play Framework sample websockets-chat to serve as a document edit news feed. Users join the feed when the page decides it needs that feed (for push or pull or both), and they quit when the page goes away (if you navigate away from a page that had a JavaScript-initiated websocket connection, the connection dies and the chatroom-based code considers that a Quit).

So, here’s the code for my WebSocket controller, which will control all of the feeds for my site:

object WebSockets extends Controller {
    def editFeed(user:String) = WebSocket.async[JsValue] { request =>
        ChangeFeed.join(user)
    }
}

And here’s the code for the ChangeFeed actor class and its companion object. This code should look ridiculously familiar, since it’s basically the chatroom example. All I’ve really done is replace the Talk case class with my PostUpdate case class, which contains information about the type of document being updated and the document name. This information is enough for the individual pages to decide whether they need to display these push notifications.

// imports cut for brevity..
object ChangeFeed {
   implicit val timeout = Timeout(1 second)

   lazy val default = {
     val feedActor = Akka.system.actorOf(Props[ChangeFeed])
     // -- this is where the chatroom robot used to get initialized...
     feedActor
   }

   def join(username:String): scala.concurrent.Future[(Iteratee[JsValue,_],Enumerator[JsValue])] = {
   ( default ? Join(username)).map {
     case Connected(enumerator) =>
       val iteratee = Iteratee.foreach[JsValue] { event =>
          default ! PostUpdate(username, (event \ "docType").as[String], (event \ "docName").as[String])
       }.mapDone { _ =>
          default ! Quit(username) // this quits the channel when the web page goes away
       }
       (iteratee, enumerator)
    case CannotConnect(error) => // .. handle error (snipped for brevity)
   }
   }
}

// The chatroom only allows one username per socket, on a website,
// my users could have multiple tabs open to the same site, so I let them
// open new sockets even if the username is the same.
class ChangeFeed extends Actor {
    var members = Set.empty[String] // reassigned on Join and Quit, so it must be a var
    val (feedEnumerator, feedChannel) = Concurrent.broadcast[JsValue]

   def receive = {
     case Join(username) => {
         // yes, I know there's a more idiomatic scala way to do this
         if (!members.contains(username)) {
             members = members + username
         }
         sender ! Connected(feedEnumerator)
         self ! NotifyJoin(username)
      }
     case NotifyJoin(username) => {
        notifyAll("join", username, "", "")
     }
     case PostUpdate(username, docType, docName) => {
        notifyAll("update", username, docType, docName)
     }
     case Quit(username) => {
        members = members - username
        notifyAll("quit", username, "", "")
     }
   }

   def notifyAll(kind: String, user:String, docType: String, docName: String) {
     val msg = JsObject(
         Seq(
            "kind" -> JsString(kind),
            "user" -> JsString(user),
            "docType" -> JsString(docType),
            "docName" -> JsString(docName),
            "members" -> JsArray(members.toList.map(JsString))
         )
     )
     // broadcast the message to every socket attached to the feed
     feedChannel.push(msg)
   }
}

// case classes snipped for brevity

Once I’ve got a controller that can ask an Actor for a socket and dispense it accordingly, then the only thing left for me to do is create some client-side JavaScript that accesses my web socket and there are plenty of examples of doing that on the web. The one piece of this that makes things even nicer in Play is that I can automatically get the websocket URL from a controller route:

var WS = window['MozWebSocket'] ? MozWebSocket : WebSocket
// "user" is assumed to be a parameter of this Scala template
var feedSocket = new WS("@routes.WebSockets.editFeed(user).webSocketURL()")

This automatically gives me a URL that looks like ws://server/websockets/editFeed.

All I can say is that so far, using Play to build an enterprise web application has been an absolute joy, and being able to create this websocket-based “edit feed” infrastructure in just a few hours was absolutely amazing.

You spilled some OOP in my Actor System!

Lately I’ve been doing a lot of experimenting and playing with Scala and Akka. I’ve been building a MUD to give me a goal as I learn the language, the framework, and the intricacies involved.

So far I’ve been focused mostly on syntax and learning what idiomatic Scala looks like, how to create and use Akka actors, and so on. It wasn’t until the other night that I started thinking about design and philosophy. This mini-revelation came about when I spent hours trying to find a way to get the actual type from underneath an ActorRef.

I was using context.actorOf() to create actors, as one should with Akka 2.0. So I might be creating and starting actors for rooms, monsters, or living room tables – anything that I thought might be instructive to mess around with. This is when I noticed that once you create an ActorRef by adding your actor to the actor system (either via system.actorOf or context.actorOf) you can’t get at the actual class underneath. All you can do is interact with it like an actor-shaped black box by sending and receiving messages.

In other words, let’s say I’ve got a class called Zombie that is an actor that I’ve added to the actor system. The Zombie has properties like name, eatingSpeed, and percentLivingTissueRemaining. With regular Scala actors, I would’ve been able to mix and match classic OOP-style programming with actor-style programming:

println(tehZombie.name)
tehZombie.percentLivingTissueRemaining = 12
tehZombie ! FeedUpon(randomVictim)

At first this might look nice and appealing, but there’s something insidious happening here. First, our code has access to the internal state of tehZombie for both reading and writing. Secondly, we can send actor-style messages to tehZombie. Back during my first attempt at a MUD using regular Scala actors, I regularly abused this capacity – reading public fields from instances of actors all the while sending them messages.

One main purpose of an actor system is to take away the complexity of multi-threaded access to shared state and business logic. If we have one actor able to access another actor using standard OOP-style member-access patterns, we’ve basically ruined all of the advantages we’re getting from the actor system to begin with, because access to public state on an object is not inherently thread-safe.

If we change internal state on an Akka actor in response to a message, we are guaranteed that such a state change is thread-safe. Not only is it thread-safe, it’s effectively single-threaded: an actor works through its mailbox one message at a time, so no two threads ever run that actor’s message-handling code at the same moment.
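To make the mailbox idea concrete, here is a minimal sketch that deliberately does not use Akka. A single-threaded executor plays the role of the mailbox, and ToyZombie, Bite, tell, and ask are hypothetical names invented for this illustration. Eight threads send messages concurrently, yet the private counter needs no locking because only the mailbox thread ever touches it.

```scala
import java.util.concurrent.{Callable, Executors}

object MailboxSketch {
  sealed trait Message
  final case class Bite(amount: Int) extends Message

  // NOT Akka: a toy "actor" whose mailbox is a single-threaded executor.
  final class ToyZombie {
    private val mailbox = Executors.newSingleThreadExecutor()
    private var bitesTaken = 0 // only ever touched by the mailbox thread

    // Fire-and-forget, like `!`: enqueue the message and return immediately
    def tell(msg: Message): Unit = mailbox.execute(new Runnable {
      def run(): Unit = msg match {
        case Bite(amount) => bitesTaken += amount
      }
    })

    // Reading state is itself a queued request, not a field access
    def ask(): Int = mailbox.submit(new Callable[Int] {
      def call(): Int = bitesTaken
    }).get()

    def stop(): Unit = mailbox.shutdown()
  }

  // Many threads send concurrently; the total is still exact
  def feedConcurrently(senders: Int, perSender: Int): Int = {
    val zombie = new ToyZombie
    val threads = (1 to senders).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = (1 to perSender).foreach(_ => zombie.tell(Bite(1)))
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    val total = zombie.ask()
    zombie.stop()
    total
  }

  def main(args: Array[String]): Unit =
    println(feedConcurrently(senders = 8, perSender = 1000)) // prints 8000
}
```

If bitesTaken were a public var incremented directly from those eight threads, updates could be lost. Routing every read and write through the queue is what removes the race.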

So what does this mean? It means that, within the context of an actor system, public state and OOP-style member access is BAD.

Unfortunately, what this means for my MUD is that I’ve got some refactoring to do because I noticed that I’ve been cheating and I passed around a reference to this to another actor – which gave that actor “unsafe” access to public members of my class. What I really should be doing is making sure that nothing on my actors is exposed publicly and I should be aiming for a message pure environment where everything works as though actors are black boxes that just receive messages, do what they need to do, and send back out messages to other actors within the system.

Akka 2.0 enforces an awful lot of this by suppressing access to this from other actors (I cheated… bad, bad, Kevin). Within a few days (after the Giants win the Super Bowl) I hope to have some refactoring checked into the GitHub repo, leaving a fully functioning system that is clean, message pure, and exposes no unsafe members.

Wish me luck 🙂

ScalaMUD – Updated to Akka 2.0-M3

Tonight I decided to dig back into my tiny little MUD sample (which doesn’t have much in common with a real MUD at this point… but that will change soon) and upgrade it to work with the current milestone of Akka, 2.0-M3 which came out just last week from what I can tell.

There is actually quite a bit of difference in complexity and power between Akka 1.3 and Akka 2.0. There’s a detailed blog post here that covers some details on the breaking changes between 1.2/1.3 and 2.0.

The biggest thing that concerns me is that Akka 2.0 has this concept of an actor system, which is really a tree-like hierarchy of actors. Some actors act in a supervisory capacity over other child actors and this tree is queryable at runtime. If this sounds like Erlang’s supervisor process system to you, that’s no accident… the philosophy guiding the development of Akka 2.0’s actor system is very Erlang-ish.

So when I create my top-level Game singleton object, I now need to create an instance of an Actor System and fire up a top level actor (I can have multiple top-level actors):

package com.kotancode.scalamud

import akka.actor._
import akka.routing._

/*
 * Root-level Game singleton
 * Creates the Akka Actor System for all subsequent game objects
 */
object Game extends App {
	val system = ActorSystem("ScalaMUD")
	val server = system.actorOf(Props(new GameServer()), "server")
	println("System Starting")
	server ! ServerStart()
}

Note here the use of system.actorOf. If I want to create a subordinate (child) actor within the context of the parent actor doing the creation, then I would use context.actorOf. I can give each of my actors a unique name so that they have descriptive path names when they appear in the actor system tree.

So far I was happy with the upgrade and I didn’t have too much trouble. Then I got to the point where I was creating anonymous actors in order to execute my while loops for accepting sockets and reading input from a socket in the background… Akka 2.0 has removed the spawn { } syntax and replaced it with some severe limitations, namely you can no longer create anonymous actors that do not implement a receive method. This means the old spawn { // do something } construct is impossible, replaced with this snippet from my GameServer class:

case s:ServerStart => {
	// Spawn an anonymous child actor and immediately hand it the start message
	context.actorOf(Props(new Actor {
		def receive = {
			case ss:ServerStart => {
				startSocketListener
			}
		}
	})) ! s
}

If you’re thinking OMGWTFBBQ at this point, you’re not alone. Basically what I’m doing is creating an anonymous actor, defining a receive method that receives a specific message and does something in response to that message and then sending a message to my anonymous actor. This is actually the way the documentation tells you to spawn off asynchronous background tasks without using named actors.

Before you start the flame war here – it is worth taking a minute to think hard about the fact that everything that occurs asynchronously within Akka is supposed to be an asynchronous event-driven, message-passing construct. In short, if I was really anal about this, I would have spawned off an actor that reads from the input socket stream in its own private loop and I would have spawned off another actor that encapsulates the output writer. When the input reader has read a line, it should send a message indicating that a line has been read and when something needs to send text to a player, it should do so through its output writer actor.
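For the curious, here is a rough sketch of the input-reader half of that design. It is not the ScalaMUD code and it does not use Akka: a plain thread stands in for the reader actor and a LinkedBlockingQueue stands in for the recipient's mailbox. The names LineReaderSketch, LineRead, Disconnected, startReader, and drain are all invented for this example.

```scala
import java.io.{BufferedReader, StringReader}
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

object LineReaderSketch {
  sealed trait Event
  final case class LineRead(text: String) extends Event
  case object Disconnected extends Event

  // The reader owns the blocking read loop and reports each line as a message
  def startReader(in: BufferedReader, inbox: LinkedBlockingQueue[Event]): Thread = {
    val reader = new Thread(new Runnable {
      def run(): Unit = {
        Iterator.continually(in.readLine())
          .takeWhile(_ != null) // readLine returns null at end of stream
          .foreach(line => inbox.put(LineRead(line)))
        inbox.put(Disconnected)
      }
    })
    reader.start()
    reader
  }

  // The consumer never touches the stream; it only reacts to messages
  def drain(inbox: LinkedBlockingQueue[Event]): List[String] = {
    val lines = ListBuffer.empty[String]
    var open = true
    while (open) {
      inbox.take() match {
        case LineRead(text) => lines += text
        case Disconnected   => open = false
      }
    }
    lines.toList
  }

  def main(args: Array[String]): Unit = {
    val inbox = new LinkedBlockingQueue[Event]()
    // A StringReader stands in for the player's socket input stream
    startReader(new BufferedReader(new StringReader("look\nnorth\n")), inbox)
    println(drain(inbox))
  }
}
```

The output-writer side would mirror this: one owner for the writer, with everyone else sending it text as messages rather than writing to the socket directly.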

At this point I’m not that anal and I don’t need that level of complexity, so I’m leaving it out. I only mention this alternative to ease the pain of the new way of doing anonymous actors.

In short, I did get the MUD running on Akka 2.0-M3 and, while a little put off by the anonymous actor syntax, I understand its need and I applaud the Typesafe people for forcing actors to receive messages.

If you want to follow the code for the MUD as I continue working on it, check out my GitHub repo.

Having fun with the Play! Framework

Those of you who have read this blog for a while know that I’ve spent a considerable amount of time with ASP.NET. In fact, I’ve been using ASP.NET since version 1.0, written several books that involve ASP.NET (including ASP.NET 4.0 Unleashed with Nate Dudek), and have done a ton of work with ASP.NET MVC. I’ve also written a pile of applications in Ruby on Rails. I’ve even written an application with Groovy on Grails.

To say that I am a fan of MVC-based web application frameworks would be like saying that a Bugatti Veyron is a “kinda fast” car.

So when I saw Play!, a Java-based (and Scala, but we’ll cover that in another post) MVC web application framework, I figured I’d give it a shot. I have to admit that I was a bit skeptical at first. The concept of quick, fast development in a fluid, agile style doesn’t exactly scream “Java”, but I was open to having my mind changed.

The first thing I noticed was that there are no class files. The “play” interactive shell (which I believe can be run as a service for production deployments) takes care of the live compilation for you whenever anything changes. Sweet! That is about as un-java as you can expect… I fully expected to have to run some obtuse Maven build every time I changed the color of a span on a page.

The structure smells very much like an ASP.NET MVC application. There’s an app folder and beneath that you have controllers, models, and views. Each controller class is just a Java class that inherits from a base controller and is responsible for rendering a page.

There’s a routing table that works very much like ASP.NET MVC’s internal routing tables, but it doesn’t require me to write code to generate routes the way MVC does. Instead, the routes live in a text file, like Ruby and Grails:

# Routes
# This file defines all application routes (Higher priority routes first)
# ~~~~

# Home page
GET     /                                       Application.index

# Ignore favicon requests
GET     /favicon.ico                            404

# Map static resources from the /app/public folder to the /public path
GET     /public/                                staticDir:public

# Catch all
*       /{controller}/{action}                  {controller}.{action}

There’s a great deal of flexibility in this routes file that I haven’t covered. If you’re interested, head over to the Play! website and check the documentation and tutorials which are actually pretty good.
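To illustrate how the "higher priority routes first" rule and the catch-all pattern in the file above interact, here is a small sketch of first-match-wins resolution. This is only a model of the idea, not how Play! implements its router, and RouteSketch and resolve are names invented for the example.

```scala
object RouteSketch {
  // Two non-empty path segments: /{controller}/{action}
  private val CatchAll = "/([^/]+)/([^/]+)".r

  // Cases are tried top to bottom, mirroring the order of the routes file
  def resolve(method: String, path: String): String = (method, path) match {
    case ("GET", "/")                           => "Application.index"
    case ("GET", "/favicon.ico")                => "404"
    case ("GET", p) if p.startsWith("/public/") => "staticDir:public"
    case (_, CatchAll(controller, action))      => s"$controller.$action"
    case _                                      => "no route"
  }

  def main(args: Array[String]): Unit = {
    println(resolve("GET", "/"))                     // Application.index
    println(resolve("POST", "/clients/save"))        // clients.save
    println(resolve("GET", "/public/css/main.css"))  // staticDir:public
  }
}
```

Because the static-resource route sits above the catch-all, a request under /public/ never reaches the controller lookup, which is why ordering in the file matters.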

The Play! template language looks very similar to ASP.NET MVC as well, allowing you to blend HTML elements, function calls, variables, and more all in one fairly seamless HTML file. It’s not quite as concise as the ASP.NET MVC Razor syntax, but not as ugly as the old non-Razor syntax:

#{extends 'main.html' /}
#{set title:'Hello World' /}

Greetings, ${userName}<br/>

A view like this is made possible by calling render(userName) inside a controller. Note that unlike a property bag style usage from ASP.NET, I don’t have to give the userName variable a key – the view template knows that variable name implicitly. If I passed in a complex object such as user, then I could do things like ${user.name} in my template view.

The #{extends 'main.html' /} tag works very much like master pages or content place holders in the ASP.NET world. Main.html has some wrapper markup and then it identifies where extensions can take place. The hello.html content will appear wherever that content extension is indicated. You can get fairly advanced and have multiple content placeholders in a template, and you can chain extensions like master page inheritance.

Finally, the other thing I liked about Play! is that it uses a built-in dependency management system for the framework itself but then resorts to Maven for resolving external dependencies. So, if my application depends on some other artifact that is floating around in a public nexus, I can just add a line like this to my dependencies.yml file:

require:
    - play
    - org.ektorp -> org.ektorp 1.2.1

When I run play dependencies from the command line, the dependencies are resolved, downloaded, and stored in my lib directory, along with all transitive dependencies. To be honest, if I had to manage my own dependencies for an MVC framework, I would never have even made it to “hello world”.

Overall I’m fairly impressed with Play! and will be continuing to play around with it and see what it can do and how it might be limited (or superior) to other MVC frameworks I’ve used in the past.