Kotan Code 枯淡コード

In search of simple, elegant code

Menu Close

Tag: actors (page 1 of 3)

Creating a Multiplayer Chat System with the Akka Event Bus

In my previous blog post, I talked about how to use the subchannel classification system with the Akka Event Bus. In that post, I showed a really simplistic way of determining if a channel is a subclass of another – using startswith(). That’s a really primitive way of doing things and doesn’t really show you the true power of the event bus.

In this blog post, I want to walk you through a slightly more realistic example. Let’s assume that we’re using Akka to build the back-end for a multiplayer game. We know that this game needs to support chatting between players, so we want to create an event bus to handle all of the chat traffic for the game. This should, in theory, give the system some nice flexibility in allowing individual players and player-actor components to subscribe and unsubscribe from different segments of the chat traffic.

We have three different segments that we want to divide traffic up into:

  1. Players – Individual players can receive private chat messages, or tells.
  2. Sectors – The players’ space ships are floating in 3D space in large areas called sectors. It is possible for those players to transmit messages to an entire sector.
  3. Groups – Players can form small groups where from 1 to 8 players band together to share experience as well as their own chat channel.

One of the interesting things to keep in mind is that our chat bus cannot read player state. In order for the chat bus to be really useful, general purpose, and, most importantly, testable, it can’t go reaching into the inside of some player object to figure out which sector object they are in or to which group they belong.

So we’re going to revise our previous sub-classification event bus sample with a new case class called ChatCoordinate that will be used as our classifier (remember in the previous blog post we used simple strings). Here’s the new code for the bus and its messages:

package chatbus

import akka.event.ActorEventBus
import akka.event.LookupClassification
import akka.event.SubchannelClassification
import akka.util.Subclassification
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor

/*
 * Chat Bus Coordinates
 * players/XXX - sends a private tell to a player
 * sectors/XXX - sends a tell to an entire sector
 * groups/XXX - sends a tell to members of a player group
 */
object ChatSegments {
	val Player = "players"
	val Sector = "sectors"
	val Group = "groups"
}

case class ChatCoordinate(segment: String, target: Option[String])

sealed trait Chat
case class ChatMessage(source: String, message: String) extends Chat
case class ChatEvent(coord:ChatCoordinate, msg: ChatMessage) extends Chat

class ChatEventBus extends ActorEventBus with SubchannelClassification {
  type Event = ChatEvent
  type Classifier = ChatCoordinate

  protected def classify(event: Event): Classifier = event.coord

  protected def subclassification = new Subclassification[Classifier] {
    def isEqual(x: Classifier, y: Classifier) = (x.segment == y.segment && x.target == y.target)

    /* Is X a subclass of Y? e.g. is Player/Bob a subclass of Player/None? YES. Is Player/None a subclass of Player/Bob? NO. */
    def isSubclass(x: Classifier, y: Classifier) = {
		val res = (x.segment == y.segment && x.target == None)
		//println(s"Subclass check: $x $y: $res")
		res
	}
  }

  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event.msg
  }
}

This new chat bus gives us some great flexibility. I can send a message to all players by sending a message to the Players segment with a target of None. I can send a message to all groups by sending a message to the Groups segment with a target of None, a specific group by sending to Groups/Some(“GroupId”) and a specific player by sending to Players/Some(“Player Id”).

One of the aspects of gameplay that this chat bus supports is a player moving from one sector to another. If a player leaves a sector, then ideally some actor (maybe a Player actor?) would unsubscribe from the chat bus @ Sectors/Some(“Foo”) and then subscribe to the chat bus @ Sectors/Some(“Bar”) if they moved from sector Foo to sector Bar, while always maintaining a persistent subscription to Players/Some(“Player ID”). Likewise, if a player joins a player group, they would subscribe to Groups/Some(“Group ID”) and unsubscribe from that same coordinate when they leave the group.

The system is inherently extensible and so whenever we want to add a new segment (e.g Race, Alliance, Guild, etc) we can just drop that in there and we’re good to go.

Finally, just so you believe me, here’s some Scala Test specs exercising the chat bus with some fake actors:

package tests

import akka.actor.ActorSystem
import akka.actor.Actor
import akka.actor.Props
import akka.testkit.TestKit
import akka.testkit.TestProbe
import org.scalatest.WordSpecLike
import org.scalatest.Matchers
import org.scalatest.BeforeAndAfterAll
import akka.testkit.ImplicitSender
import scala.concurrent.duration._

import chatbus._

object ChatBusSpec {
	
	val toKevin = ChatEvent(ChatCoordinate(ChatSegments.Player, Some("Kevin")), ChatMessage("system", "This goes just to Kevin"))
	val toBob = ChatEvent(ChatCoordinate(ChatSegments.Player, Some("Bob")), ChatMessage("system", "This goes just to Bob"))
	val toAllPlayers = ChatEvent(ChatCoordinate(ChatSegments.Player,None), ChatMessage("kevin", "This goes to all players"))
	
	val toSectorAlpha = ChatEvent(ChatCoordinate(ChatSegments.Sector,Some("Alpha")), ChatMessage("system", "Sector Alpha is about to explode."))
	val toSectorBeta = ChatEvent(ChatCoordinate(ChatSegments.Sector,Some("Beta")), ChatMessage("system", "Sector Beta is about to explode."))
}

class ChatBusSpec (_system: ActorSystem) extends TestKit(_system) with ImplicitSender
  with WordSpecLike with Matchers with BeforeAndAfterAll {
 
  import ChatBusSpec._

  val theProbe = TestProbe()

  def this() = this(ActorSystem("ChatBusSpec"))

  def getEchoSubscriber() = {
	system.actorOf(Props(new Actor {
	    def receive = {
	      case m:ChatMessage => theProbe.ref ! m
	    }
	}))	
  }
 
  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  "A chat event bus" must {
     "send a private message to a player with no snoopers" in {
		val eventBus = new ChatEventBus
		val kevin = getEchoSubscriber()
		val bob = getEchoSubscriber()
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Player, Some("Kevin")))
		eventBus.subscribe(bob, ChatCoordinate(ChatSegments.Player, Some("Bob")))
		eventBus.publish(toKevin)
		theProbe.expectMsg(500 millis, toKevin.msg)
		theProbe.expectNoMsg(500 millis)
     }	

     "send a single message to all players" in {
		val eventBus = new ChatEventBus
		val kevin = getEchoSubscriber()
		val bob = getEchoSubscriber()
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Player, Some("Kevin")))
		eventBus.subscribe(bob, ChatCoordinate(ChatSegments.Player, Some("Bob")))
		eventBus.publish(toAllPlayers)
		// Each player should receive one of these, so the probe should bounce it back twice.
		theProbe.expectMsg(500 millis, toAllPlayers.msg)
		theProbe.expectMsg(500 millis, toAllPlayers.msg)
     }

     "send to all players in a sector should only deliver once per player" in {
		val eventBus = new ChatEventBus
		val kevin = getEchoSubscriber()
		val bob = getEchoSubscriber()
	
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Player, Some("Kevin")))
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Sector, Some("Alpha")))
		eventBus.publish(toSectorAlpha)
		theProbe.expectMsg(500 millis, toSectorAlpha.msg)
		theProbe.expectNoMsg(500 millis)
     }

     "support a player moving from one sector to another" in {
		val eventBus = new ChatEventBus
		val kevin = getEchoSubscriber()
		
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Player, Some("Kevin")))
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Sector, Some("Alpha")))
		eventBus.publish(toKevin)
		theProbe.expectMsg(500 millis, toKevin.msg)
		eventBus.publish(toSectorAlpha)
		theProbe.expectMsg(500 millis, toSectorAlpha.msg)
		eventBus.unsubscribe(kevin, ChatCoordinate(ChatSegments.Sector, Some("Alpha")))
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Sector, Some("Beta")))
		eventBus.publish(toSectorBeta)
		theProbe.expectMsg(500 millis, toSectorBeta.msg)
     }
  }
	
}

Segmenting Traffic on the Akka Event Bus with Subchannel Classification

In my previous post, I provided a simple “hello world” type application that utilizes the Akka Event Bus. In this sample, I showed how you can use a simple LookupClassification to classify the events being published. I glossed over this detail in the previous post to keep things simple, but the lookup classification does what its name implies – it does a lookup on the classifier (in my case, it was a string) and returns the subscribers for that classifier. There is no hierarchy involved. With this situation, if you subscribe to /zombies/Foo you will not see events published to /zombies.

In this blog post, I want to talk about how we can add hierarchical topic support to the event bus by switching from a lookup classification to a subchannel classification. In the subchannel classification system, I can subscribe to /zombies/Foo and I will receive events published to /zombies. Additionally, if I publish to /zombies/A and I am subscribed to /zombies/B I will not receive that event. This type of topic-based subscription should be familiar to those of you who have used service buses, messaging middleware, or the evil creature known as JMS.

So let’s get to it. First, I want to create my own subchannel classification. The beauty of this is that you can provide as little or as much logic as you like. In my case, I’m just going to do a standard string comparison to classify, and I am going to use “starts with” to identify a subchannel. I could get much more involved, even using regular expressions, if I wanted to.

class ZombieSightingSubclassEventBus extends ActorEventBus with SubchannelClassification {
  type Event = ZombieSightingEvent
  type Classifier = String

  protected def classify(event: Event): Classifier = event.topic

  protected def subclassification = new Subclassification[Classifier] {
    def isEqual(x: Classifier, y: Classifier) = x == y
    def isSubclass(x: Classifier, y: Classifier) = x.startsWith(y)
  }

  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event.sighting
  }
}

Now if I modify my previous blog post’s application object as follows, I’ll get some very robust behavior:

object ZombieTrackerApp extends App {

  val system = ActorSystem()
  //val eventBus = new ZombieSightingLookupEventBus
  val eventBus = new ZombieSightingSubclassEventBus

  val subscriber = system.actorOf(Props(new Actor {
    def receive = {
      case s:ZombieSighting => println(s"Spotted a zombie! ${s}")
    }
  }))

  val westCoastSightingHandler = system.actorOf(Props(new Actor {
  	def receive = {
		case s:ZombieSighting => println(s"West coast zombie ${s}!!")
	}
  }))

  val eastCoastSightingHandler = system.actorOf(Props(new Actor {
	def receive = {
		case s:ZombieSighting => println(s"East coast zombie ${s}!!")
	}
  }))

  eventBus.subscribe(subscriber, "/zombies")
  eventBus.subscribe(westCoastSightingHandler, "/zombies/WEST")
  eventBus.subscribe(eastCoastSightingHandler, "/zombies/EAST")

  eventBus.publish(ZombieSightingEvent("/zombies/WEST", ZombieSighting("FATZOMBIE1", 37.1234, 45.1234, 100.0)))
  eventBus.publish(ZombieSightingEvent("/zombies/EAST", ZombieSighting("SKINNYBOY", 30.1234, 50.1234, 12.0)))

  // And this one will NOT go off into deadletter like before ... this satisfies "startsWith" on the first subscriber
  eventBus.publish(ZombieSightingEvent("/zombies/foo/bar/baz", ZombieSighting("OTHERONE", 35.0, 42.5, 50.0)))
  system.shutdown
}

Here I’ve decided to create one subscriber that listens for west coast zombies and another one that listens for east coast zombies. In the sample above I’ve used two anonymous actor classes but I could very easily have used instances of the same actor class that were primed with different constructor props (such as the region to which they were bound). As I mentioned above, the old listener should receive all events on /zombies, but also my new east and west subscribers should also receive /zombies messages in addition to their own regional localized events.

Here’s what the program run output looks like:

[info] Running ZombieTrackerApp 
Spotted a zombie! ZombieSighting(FATZOMBIE1,37.1234,45.1234,100.0)
West coast zombie ZombieSighting(FATZOMBIE1,37.1234,45.1234,100.0)!!
East coast zombie ZombieSighting(SKINNYBOY,30.1234,50.1234,12.0)!!
Spotted a zombie! ZombieSighting(SKINNYBOY,30.1234,50.1234,12.0)
Spotted a zombie! ZombieSighting(OTHERONE,35.0,42.5,50.0)
[success] Total time: 6 s, completed Feb 12, 2014 8:39:09 PM

And so this is hopefully enough to whet your appetite on the Akka Event Bus. And by “whet your appetite” I mean that in the most drug-pusher way possible. The first one’s free, now enjoy your new addiction to the event bus 🙂

Using the Akka Event Bus

It’s been so long since I’ve written a blog post about Akka, it feels like coming home after a long business trip – the softness that only your own bed in your own home can provide.

Anyway, let’s talk about event buses. An event bus is basically an intermediary between publishers and subscribers. Publishers publish events to the event bus and subscribers listening for certain types of events will receive those events. That’s the generic definition of an event bus. The Akka Event Bus is, as you might have guessed, an Akka-specific implementation of this pattern.

I find the Akka documentation for Event Bus to be inscrutable and downright off-putting for new developers who aren’t already Akka experts. Go ahead, try and read that stuff and work backwards from that into a hello world sample. You’re going to need at least a beer. So let’s start with the basics.

To set up an event bus you need three things: an event bus, subscribers, and publishers. When you create an event bus (a base class that you extend in Scala) you need to declare two types:

  • Event Type
  • Classifier Type

This is where the documentation gets annoying. The Event Type is just the type of objects that you expect to be put on the bus. You can make this type as specific as you want or as generic (Any works just fine as an event type, but I wouldn’t really recommend that as a pattern). The Classifier Type is the data type of the event bus classifier. This sounds more complicated than it needs to be. A classifier is really just some piece of information used to differentiate one event from another, or, in Akka parlance, to classify an event.

You could use an integer as a classifier and then when you subscribe your actors to the bus, you can subscribe them to individual numbers. So, let’s say you have 3 buckets, you could have a classifier and then publish into the 1 bucket or the 2 bucket (classifier). If you’re familiar with JMS or other messaging middleware systems then you might be familiar with the “topic” style classifiers, which often look like slash-delimited hierarchies such as /buckets/1 or /buckets/2. There is some additional complexity around classifiers and classification that I will defer until the next blog post so we can get right to the hello world sample and keep things simple.

First, we need a sample domain. As such, we’ll use the old standby domain of zombies. Let’s say we want to set up a bus to which we can publish zombie sighting messages. We then have subscribers who are interested in those zombie sighting messages for whatever reason. In the past, my actor samples all directly sent messages from one actor to another. But, with the Event Bus, the actors sending the message don’t have to care about or know anything about the actors interested in receiving it. This kind of loose coupling pays off in spades in ease of maintenance and troubleshooting.

And without further ado, here’s the full source that includes the zombie sighting message, a zombie sighting event (with a string classifier), the zombie bus, and the code that sets up the subscriptions and publishes to the bus:

import akka.event.ActorEventBus
import akka.event.LookupClassification
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor

case class ZombieSighting(zombieTag: String, lat: Double, long: Double, alt: Double)
case class ZombieSightingEvent(val topic: String, val sighting: ZombieSighting)

class ZombieSightingLookupEventBus extends ActorEventBus with LookupClassification {
  type Event = ZombieSightingEvent
  type Classifier = String

  protected def mapSize(): Int = 10

  protected def classify(event: Event): Classifier = {
    event.topic
  }

  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event.sighting
  }
}

object ZombieTrackerApp extends App {

  val system = ActorSystem()
  val eventBus = new ZombieSightingLookupEventBus

  val subscriber = system.actorOf(Props(new Actor {
    def receive = {
      case s:ZombieSighting => println(s"Spotted a zombie! ${s}")
    }
  }))

  val indifferentSubscriber = system.actorOf(Props(new Actor {
    def receive = {
	   case s:ZombieSighting => println(s"I saw a zombie, but I don't give a crap.")
    }
  }))

  eventBus.subscribe(subscriber, "/zombies")
  eventBus.subscribe(indifferentSubscriber, "/zombies")

  eventBus.publish(ZombieSightingEvent("/zombies", ZombieSighting("FATZOMBIE1", 37.1234, 45.1234, 100.0)))
  eventBus.publish(ZombieSightingEvent("/zombies", ZombieSighting("SKINNYBOY", 30.1234, 50.1234, 12.0)))

  // And this one will go off into deadletter - nobody is subscribed to this.
  eventBus.publish(ZombieSightingEvent("/zombies/foo/bar/baz", ZombieSighting("OTHERONE", 35.0, 42.5, 50.0)))
  system.shutdown
}

And when we run this application, what we expect to see are two trace outputs from the subscriber that doesn’t give a crap and the subscriber that is outputting information about the zombie it received. Here’s the output:

[info] Running ZombieTrackerApp
 I saw a zombie, but I don't give a crap.
 Spotted a zombie! ZombieSighting(FATZOMBIE1,37.1234,45.1234,100.0)
 I saw a zombie, but I don't give a crap.
 Spotted a zombie! ZombieSighting(SKINNYBOY,30.1234,50.1234,12.0)

In the next blog post, I’ll cover how to upgrade this sample so that we can support the hierarchical classifier style (subchannels).

Asynchronous, Non-Blocking NOM NOMs

Most of the time when I encounter fascinating situations in the real world, I am struck by how well that situation might translate into a novel. For example, I constantly see situations and people that inspire character traits or plot points that someday might make their way into one of my books. However, every once in a while, I see a situation in real life that tickles my technical fancy.

There is a “deli” (the quotes are because in New York, “deli” doesn’t mean what most people think it means.. a NY deli is often a giant cafeteria-style buffet place that serves 20 kinds of food, and sells everything from chips to bobble-heads and gummy bears) near my office in Manhattan that is a marvel of modern retail efficiency. On its busiest day, you can make your way through the throng of people, order what you want, and get out in less than 20 minutes. The really remarkable thing is that the checkout line is unbelievably fast and I can be six people back and still get through the line and out the front door in less than 3 minutes.

What makes this an interesting technical blog post is that this place employs a number of techniques that people like me have been using as large-scale application development design patterns for years. My coworkers and I often refer to the lines of people feeding up toward the cash registers as non-blocking, asynchronous processing.

You really need to see these people in action to believe it. There’s one person who is handling nothing but the credit card swiping, another person is handling the cash register, yet another takes your food and puts it in a bag for you, a bag which comes pre-loaded with a fork, a knife, and napkins. There is absolutely no wasted time from the perspective of the customer. All of the things that a customer might be blocked on in a traditional purchase queue have been parallelized. 

There isn’t a per-customer wait for the bag to get the common essentials stuffed into it. When the card-swiper person is swiping your credit card, the cash register person is actually ringing up the order of the person behind you in the queue, and the bag-stuffer is working on the bag of the person behind that.

This is all well and good and getting your food fast is always a good thing, but what does it have to do with building scalable systems? Everything.

Imagine that a web request is a customer waiting in line for food. If, upon this request’s arrival at the web server, the web server has to go and do everything in serialized order, for every single request, then the time it takes to handle a single request may not appear to be all that bad, but the more requests you have, the longer the lines get. The more back-up you have building up in a queue, the worse the perceived performance of your site will be, even if the time it takes to handle a single request is fixed and relatively short. Why is that? Because each person’s request isn’t being handled immediately upon arrival at the site, their requests are piling up in a queue waiting for other requests to be processed.

Typical scaling patterns just increase the number of concurrent threads to deal with incoming requests. That’s fine, and it works for a little while, until you run out of threads. Now, let’s say you have 30 threads. The first 30 people to hit your site have a decent experience and then the 31st and thereafter experience the same delays as before. People see this and the initial knee-jerk reaction is to add more servers. So now let’s say you’ve got 4 servers, each with 30 threads (I’m simplifying to make the numbers easier to picture). The first 120 people to hit your site now have a decent experience and then thereafter, additional customers are subject to delays and waits.

This is a classic fix the symptom not the problem approach. If we apply this to retail then I’m sure you know what the knee-jerk retail reaction to the high load and peak volume problems are – add more cashiers. So now instead of four queues handling people’s orders, you have 8. Great, but you run into the same problems as above, plus an even worse one – your store runs out of room to accommodate the people waiting in the queues. In some circles, we also refer to this problem as the latency versus throughput problem. It’s actually more involved than that. If you optimize for latency, then you train your cashiers to be unbelievably fast at processing a single order. This appears to be good because this (ideally) means your queue drains faster, with customers moving through the line faster. To increase your throughput, you just add more highly trained, super-fast cashiers. Simple, right? NOPE.

What has this one deli done that developers building systems for scale failed to do? It’s remarkably simple, when you think about it. Rather than taking a monolithic process and scaling that one process out (completing the entire checkout process from start to finish), they have deconstructed all of the tasks that need to be done in order to get a customer through the line and have applied enterprise software patterns to dealing with those tasks. First, they have done some tasks before the customers even arrive, such as pre-filling the bags with forks, knives, and napkins. This removes several seconds from the per-customer pipeline which, when you multiply that out by the number of customers in this place (it’s huge, trust me) and the number of registers, makes a significant impact.

The next thing they’ve done is identify tasks that can be done in parallel. The same cashier doesn’t need to ring up your order and swipe your credit card. These can be done by two different people. This is where throughput versus latency becomes really important. The per-customer time to finish remains the same because the customer can’t leave until both tasks are complete (ring-up and swipe), however, you’re dramatically increasing your throughput because while the first customer is having their card swiped, the next customer is having their order rung up, which allows the pipeline to absorb more customers.

So what’s the moral of this long-winded story? Simple: Model your request processing pipelines like a super-efficient Manhattan deli 🙂

The Futures Have Arrived

As you may know, I’ve been working on building a fairly complex enterprise LOB application using Play, Scala, and Akka. When I started, I had a rudimentary understanding of Akka and Play, and I’d done some playing around with Akka before, but my experience was a far cry from the type of experience a grizzled, production-deployed veteran might have.

My website, without giving away any details, allows me to do a keyword search across multiple different types of entities. Let’s say I’ve built a zombie apocalypse preparedness social networking site (who doesn’t need one of those?!?) and I want to be able to enter a keyword and have it look through the repository containing identified zombie classifications as well as possible friends and even the names of weapons.

I built this in a way that I thought was pretty decent, I created a Search actor that takes a KeywordSearch case class message. This Search actor then asks the Akka actor system for references to the different repositories, which are also actors. In this case, I might need a reference to the ZombieIdentificationRepository actor, the WeaponRepository actor, and the UsersRepository actor.

This problem is screaming for parallelism. I want the search actor to send fire-and-sorta-forget messages to the different repositories and, when it gets answers from all three repositories, send a message back to the actor that invoked the search containing all of the results. Further, I want the web request itself to not block on waiting for the results, so I want to use Play’s asynchronous controller pattern.

When I first built the search, I had code that looked kind of like this:

def index(keyWord:String) = Action { implicit request =>
 ...
 val resFuture = ActorProvider.Searcher ? SearchActorProtocol.KeywordSearch(keyword)
 val results = Await.result(resFuture, timeout.duration).asInstanceOf[Array[SearchResult]]
 Ok(views.html.search.index(results)
}

So, while under the hood I was using an Actor, I’m still performing an explicit block. Worse, the search actor was also performing an explicit block as it was doing an Await.result for each of the different repository queries. I had heard that Future composition was possible, so I thought I’d give it a shot.

Here’s how I refactored the Search actor’s search method to combine multiple searches performed in parallel:

def performKeywordSearch(seeker: ActorRef, keyword: String) {
    implicit val timeout = Timeout(3 seconds)

    val zombieFuture = ActorProvider.ZombieRepository ? KeywordSearch(keyword)
    val weaponFuture = ActorProvider.WeaponRepository ? KeywordSearch(keyword)
    val userFuture = ActorProvider.UserRepository ? KeywordSearch(keyword)

    val combFuture = for {
        z <- zombieFuture.mapTo[Array[SearchResult]]
        w <- weaponFuture.mapTo[Array[SearchResult]]
        u <- userFuture.mapTo[Array[SearchResult]]
    } yield seeker ! ( z ++ w ++ u).sortBy( r => r.name )
}

Note that there isn’t a single line of blocking code in the preceding sample. Everything happens when it’s done and there’s no explicit “sit and wait” code. I can even then modify the search controller’s method to remove the Await.result:

...
val searchFuture = ActorProvider.Searcher ? SearchActorProtocol.KeywordSearch(keyword)
val timeoutFuture = play.api.libs.concurrent.Promise.timeout("Failed to finish search", 5 seconds)
Async {
    Future.firstCompletedOf(Seq(searchFuture, timeoutFuture)).map {
        case searchResults: Array[SearchResult] =>
            render {
                case Accepts.Html() => Ok(views.html.search.index(searchResults))
                case Accepts.Json() => Ok(Json.toJson(searchResults))
            }
        case t:String => InternalServerError(t)
    }
}

And now I’ve removed a chain of blocking calls and replaced all of it with completely asynchronous, non-blocking code and now the only thing that ever sits and waits is Play itself, as it awaits the response from the Actor.

Coding with Futures and in a completely asynchronous fashion with Actors is difficult, and it requires a lot of up front effort and takes time to understand what’s really going on, but, as you can see from the code above, the clean and simple non-blocking elegance that you get as a reward is well worth the effort.

Implementing an Asynchronous Repository with Akka Actors

I’ve been fortunate enough to be able to work on an enterprise LOB application that uses the Typesafe stack – Play Framework using Scala and Akka. One of the things that I’ve noticed with application development in Scala is that every morning when I take a look at my code I scan it for opportunities to refactor. I want to know how I can reduce the number of lines of code, which reduces the number of ways it can fail, I want to know how I can make my code cleaner, more elegant, faster, and more scalable.

I was sifting through my code last week and I noticed this little nugget:

ZombieRepository.findByName( zombie.name ).map { ... some stuff ... }

My first thought was that this code looked really old fashioned. Making synchronous method calls like that is just so last year. I immediately saw an opportunity to add a point of scalability and durability by putting my zombie repository (it’s not actually zombies, I’m just using that as an example…) behind a nice abstraction point as well as allow for all kinds of things like load balancing of requests, distributed calls, etc. I decided to implement a repository as an Actor.

The first thing I did, which is part of one of my favorite practices of contract-first design, was build the Actor Protocol.

object ZombieRepositoryActorProtocol {
    case class Insert(zombie: Zombie)
    case class Update(zombie: Zombie)
    case class Delete(zombieId: String)
    case class QueryByName(zombieName: String)
}

At the moment I don’t need to create case classes for the replies (but I could easily do this) because these messages will either be one-way messages or the replies will be things like Vector[Zombie] or Option[Zombie].

In situations where I want to unit test things that rely upon this repository, I need to be able to give the repository a reference to a different actor than the default one we typically reply to (sender), so I can modify the protocol as follows to allow references to the Akka test actor to be passed along and, if it isn’t passed along, then we just reply to sender:

object ZombieRepositoryActorProtocol {
    case class Insert(zombie: Zombie)
    case class Update(zombie: Zombie)
    case class Delete(zombieId: String)
    case class QueryByName(zombieName: String, requester: Option[ActorRef])
}

I could also make the protocol a little more complicated and reply to some of the other operations with status codes or something to allow clients to confirm that an operation completed, but I am keeping it simple for now.

In my own code, the backing store is MongoDB but, another benefit of this type of encapsulation around the repository is that the only thing anybody else knows is how to send messages to the repo which take case classes as arguments. Not only is this a great decoupling away from a persistence medium, but it also allows for all the benefits you get from running Akka actors – you can put the repository behind a round-robin or load-balancing router, you can distribute the repository actors across a grid, and, my personal favorite, you get all the benefit of supervisory control so if your repository fails and crashes, another one can be started up, etc.

So now I can just handle messages like this:

import ZombieRepositoryActorProtocol._

def receive = {
    case Insert(zombie) => ...
    case Update(zombie) => ...
    case Delete(zombieName) => ...
    case QueryByName(zombie, requester) => requester.map { _ }.getOrElse { sender } ! FetchByName(zombie)  
}

I honestly don’t know how I got any large-scale, asynchronous work done before without the use of Akka.

Bundling Actor Messages into a Protocol

If you have done any programming with Actors (in my case Akka actors) then you know that the vast majority of the messages that you typically end up sending to those actors are case classes. Sure, you can send any type of data you like, but as a practice, most people like using case classes to make the pattern matching of the messages much easier to read. Sometimes you’ll see people send tuples or Vectors but only when it’s really clear that the actor doesn’t receive many other types of messages.

In situations where I have a ton of actors, one phenomenon that I’ve noticed is that I end up polluting package space with bucketloads of case classes. For example, if I put the StarportPlanetStarbase, and Ship actors in the com.kotancode.space package, I’ll likely end up with the messages for those actors all floating around within the package namespace. It might not seem like a problem, but let’s say I have an import com.kotancode.space._ line at the top of one of my actor files, now I’ve included all of those messages even though I’m just defining one actor.

I haven’t been able to find sufficient evidence to see who started the trend of actor protocols but I certainly cannot claim any credit, I’m just borrowing something I’ve seen. The idea is to wrap all of the case classes for a particular actor in a protocol object as a nice way of isolating those case classes in their own sub-scope, even if all the actors belong to the same package.

For example, instead of defining all my messages for Planet at the top of the Planet.scala file (like I normally would do), I can instead define them in an actor protocol like this:

object PlanetProtocol {
    case class Land(player:ActorRef)
    case class Depart(player:ActorRef)
    case class HarvestResources(player:ActorRef, ... , ... )
    case class ...
}

Now, I can define a receive method that looks like this, which keeps the case classes from cluttering up the namespace and, in my opinion, is just much cleaner and more elegant than scattering case classes to the winds at random.

def receive = {
    import PlanetProtocol._

    case Land(player) => ...
    case HarvestResources(player, ... , ...) =>
}

So, it might not seem like this is much of an earth-shattering thing, but every chance I can get to refactor my code to gain more elegance, more cleanliness, more expressiveness, and less ceremony – I’ll take it. Hopefully you find this tip as useful as I did.

Building an MMO Server in Akka : Using MongoDB as a Backing Store

The story so far: I’ve created an Akka application that is using non-blocking Akka IO over sockets to communicate via Google Protocol Buffers with iOS (or potentially others…) clients. In the last blog post, I created some Actors that allow objects that have a physical presence in space to respond to messages, namely radar pings. This allows objects to “appear” to players. Their client will get radar pong messages, which will allow the client GUI to create icons for the items in the radar pong like planets, star ports, ships (owned players or NPCs), harvestable asteroids, and so on.

That’s all well and good, but in the last blog post, I had to hard code things like the name, description, and location of the objects in the universe. The first thing I needed to do was to refactor the PhysicalObject trait … in the last iteration it had “magic knowledge” of the types of physical objects that might be using that trait. So here’s my new physical object:

package com.kotancode.ulysses.space

import com.kotancode.ulysses.space.Vector3D._
import akka.actor._

case class RadarPing(source: ActorRef, origin: Vector3D, scanDistance: Int)
case class RadarPong(source: ActorRef, origin: Vector3D, objectType: Int, name:String, description: String)

object RadarObjectTypes {
	val Planet = 1
	val Station = 2
	val Port = 3
	val Ship = 4

	val Unknown = 99
}

trait PhysicalObject extends Actor with ActorLogging {
	def replyPong(p:RadarPing): Option[RadarPong]

	def receiveSpatial: Receive = {
		case p:RadarPing => {
			replyPong(p).map { pongReply => p.source ! pongReply }
		}
	}
}

There’s a couple of new things happening here. First, this trait no longer has a member variable called location. Second, in the last iteration, this trait constructed the pong reply itself. This would have been really problematic because I would have ended up with one method with a pile of “switch” cases to determine how it should reply in different circumstances… and that would’ve been smelly. Now, it is up to the actor using this trait to define a method that creates pong replies. Also, note that the abstract method that the concrete actors will implement returns an Option so I can just run a map over that – if the method returns a None then the above code won’t deliver the pong message.

Now that I’ve cleaned up my physical objects (to make them easier to be instantiated dynamically based on data from a backing store like Mongo), I can create a Persistable trait:

package com.kotancode.ulysses.state

import akka.actor._
import com.mongodb.casbah.Imports._
import org.bson.types.ObjectId

case class LoadState(id:String)
case object PersistState

object BackingStoreKeys {
	val Name = "name"
	val Location = "location"
	val Description = "description"
	val ID = "_id"
}

trait Persistable extends Actor {
	var backingObject: MongoDBObject = null

	def receivePersistable: Receive = {
		case LoadState(id) => {
			loadFromMongo(id)
		}
		case PersistState => {
		  // Nothing here yet...
		}
	}

	def persistenceCollectionName:String

	def loadFromMongo(id:String) = {
		val mongoClient = MongoClient()
		val ulysses = mongoClient("ulysses")
		val collection = ulysses(persistenceCollectionName)
		val objectId = new ObjectId(id)
		backingObject = collection.findOne(MongoDBObject(BackingStoreKeys.ID -> objectId)).getOrElse { throw new IllegalArgumentException(s"id ${id} not found in ${collection}")}
		println(s"backing object ${backingObject}")
	}
}

Here I’m using the Casbah library for Scala to talk to MongoDB. What’s worth mentioning here is that the only way a persistable actor should load its state is after it has received a message to do so. This keeps the data-loading activity in the thread-safe message pattern already enforced by Akka, so I don’t have to worry about race conditions where I have to delay some activity until after my reference data has been loaded (this is a common problem I see regularly in message-based applications).

Let’s see what our new, cleaner Planet actor looks like now:

package com.kotancode.ulysses.space

import com.kotancode.ulysses.space.Vector3D._
import com.kotancode.ulysses.state.Persistable
import com.kotancode.ulysses.state.BackingStoreKeys
import com.mongodb.casbah.Imports._

import akka.actor._

object Planet {
	val BackingCollectionName = "planets"
}

class Planet extends Actor with PhysicalObject with Persistable {

	override def persistenceCollectionName = Planet.BackingCollectionName

	override def replyPong(ping:RadarPing): Option[RadarPong] = {
		val currentLocation = Vector3D.fromBackingObject(backingObject.as[BasicDBObject](BackingStoreKeys.Location))
		if ( (currentLocation distanceFrom ping.origin) < ping.scanDistance) {
			Some(RadarPong(self,
				currentLocation,
				RadarObjectTypes.Planet,
				backingObject.as[String](BackingStoreKeys.Name),
				backingObject.as[String](BackingStoreKeys.Description)
			))
		}
		else {
			None
		}
	}

	def receivePlanet: Receive = {
		case _ => log.debug("Received some message for a planet.")
	}

	def receive = {
		receivePersistable orElse receiveSpatial orElse receivePlanet
	}
}

In my Mongo database, the location field contains a nested object with the properties x, y, and z. The Vector3D class has a utility method on it (I will refactor this into a “pimp” or “augment” pattern later) that converts a Casbah BasicDBObject into a 3D vector by extracting these properties.

Finally, remember that Universe class? The one that hard-coded the instantiation of the “Utopia 1” planet? Here’s what it looks like now:

package com.kotancode.ulysses.space
import com.kotancode.ulysses.state.LoadState
import com.kotancode.ulysses.state.Repository

import akka.actor._

class Universe extends Actor with ActorLogging {

	def loadPlanets = {
		for (id <- Repository.allIds(Planet.BackingCollectionName)) {
			context.actorOf(Props(new Planet()), name=id) ! LoadState(id)
		}
	}
	override def preStart() = {
		loadPlanets
	}

	def receive = {
		case p:RadarPing => context.children foreach (_.forward(p))
		case _ => log.debug("Received a universe message")
	}
}

Here I’m just making use of a simple object I created called Repository that issues a Casbah query to Mongo – it’s an empty query on a collection so it returns all the objects and then converts the Object IDs into strings and yields them so they can be used in an enumerator, as shown in the above code.

Now, instead of having a hard coded planet, I can create multiple planets in my database and this code will instantiate an actor for each of those planets, each planet will be backed by its own unique Mongo DB document. Here’s some trace log output that shows a pong reply from each of the 3 planets I created in my MongoDB:

[DEBUG] [03/24/2013 08:31:12.998] [UlyssesAgenda-akka.actor.default-dispatcher-1] [akka://UlyssesAgenda/user/ServerCore/Clients/96e1e42b-5cda-4faa-aa89-019c2bde9794] I received a radar pong RadarPong(Actor[akka://UlyssesAgenda/user/ServerCore/Universe/514de823b485acb28e02bb91],(5.0,5.0,5.0),1,Utopia 1,This is a utopian planet, chock full of win.)

[DEBUG] [03/24/2013 08:31:12.999] [UlyssesAgenda-akka.actor.default-dispatcher-1] [akka://UlyssesAgenda/user/ServerCore/Clients/96e1e42b-5cda-4faa-aa89-019c2bde9794] I received a radar pong RadarPong(Actor[akka://UlyssesAgenda/user/ServerCore/Universe/514e194004538147414b4675],(6.0,6.0,6.0),1,Utopia 2,This is a little less utopian than the other one.)

[DEBUG] [03/24/2013 08:31:12.999] [UlyssesAgenda-akka.actor.default-dispatcher-1] [akka://UlyssesAgenda/user/ServerCore/Clients/96e1e42b-5cda-4faa-aa89-019c2bde9794] I received a radar pong RadarPong(Actor[akka://UlyssesAgenda/user/ServerCore/Universe/514e19b304538147414b4676],(7.0,7.0,7.0),1,Doom,Every game needs a planet doom.)

Now that I’ve got the core framework for persistence up and running, I can do some more useful things like giving players persistence so I can keep their names and locations separate. Once I have that, I can move on to creating the most basic of the actual gameplay functions.

Building an MMO Server in Akka: Objects in Space

I originally wanted to call this post Piiiiiiiggssss innnn Spaaaaace in homage to the Muppet Show, but I figured it would make it harder for people to actually find this content, so, we’re stuck with the boring title for now. In my previous blog post, I talked about enabling multiplayer and rigging up the essential infrastructure to support multiple players within an Akka-backed MMO universe.

So, we’ve got an iPad client (haven’t shown you this yet, mostly because it has no graphics) that connects to our Akka server. This iPad client sends a SignIn protobuf message and then, to keep the test going, sends itself a DirectMessage protobuf message. That’s all well and good if we’re building an MMO chat game, but this is a universe and, sooner or later, we’re going to have to put stuff in our universe.

So let’s take a look at some of the things that we think our stuff in this universe is going to need to do. Firstly, it’s going to need to exist at some point in space, e.g. a coordinate. All of the stuff that exists in space is only discovered by the player by virtue of appearing on their radar scan. I haven’t designed all this out yet, but I know that players who have invested more time and money in their ship’s navigational and scanning equipment will be able to see more stuff, see further out into space, and glean more information from what happens to be out there. Whether that stuff is visible may also have something to do with the thing itself. For example, I might have a cloaking device (though I’d probably have to call it something else, for fear of copyright infringement) which might make it so that cloaked stuff doesn’t appear on radar.

In other words, this is a very quantum style universe – you don’t actually know what’s in it until you inspect it, and the act of inspecting it can alter the results. I might, in the future, decide to code a sneaky NPC that waits to be radar scanned by nearby players and then automatically targets the source of that scan.

So what I’ve decided to do is create a sub-tree of my actor system hierarchy for physical things that exist in space, which includes planets, space ports, ships, star ports, and other stuff that might be harvestable for resources like asteroids and moons. For starters, these things can all be sent a RadarPing message which ultimately originates from a player. These things can then respond to that ping with a RadarPong. The iPad client can then collect radar pongs and use that to dynamically populate a star map of the “nearby universe”. Cool, huh?

So let’s start with the universe:

package com.kotancode.ulysses.space

import akka.actor._

class Universe extends Actor with ActorLogging {
	// create some child objects
	val testPlanet = context.actorOf(Props(new Planet()), name="Utopia1")

	def receive = {
		case p:RadarPing => context.children foreach (_.forward(p))
		case _ => log.debug("Received a universe message")
	}
}

So what we’re doing here is if I send the universeRadarPing message, that message then gets sent to all physical objects within that universe. If the object is out of range of the ping or is cloaked (or something else), then it won’t respond. Otherwise, it will comply and send back a RadarPong message. I know off the bat that I’m going to have many different types of actors that act as physical things in my universe, so this seems like the right time to bust out a trait that encapsulates this behavior:

package com.kotancode.ulysses.space

import com.kotancode.ulysses.space.Vector3D._
import akka.actor._

case class RadarPing(source: ActorRef, origin: Vector3D, scanDistance: Int)
case class RadarPong(source: ActorRef, origin: Vector3D, objectType: Int, name:String, description: String)

object RadarObjectTypes {
	val Planet = 1
	val Station = 2
	val Port = 3
	val Ship = 4

	val Unknown = 99
}

trait PhysicalObject extends Actor with ActorLogging {
	private var location = (6.5, 5.0, 7.2)
	var name = "Game Object"
	var description = "This is a game object"
	var objectType = RadarObjectTypes.Unknown

	def receiveSpatial: Receive = {
		case p:RadarPing => {
			log.debug(s"I received radar ping ${p}, it originated ${p.origin distanceFrom location} units from my location at ${location}.")
			if ( (p.origin distanceFrom location) < p.scanDistance) {
				p.source ! RadarPong(self, location, objectType, name, description)
			}
		}
	}
}

Since my game doesn’t currently have persistence, I’m making up hardcoded values for the planet but, in the future, the planet would ideally reach into some database, identify itself, and then pluck out a map of all of its data, such as its spatial coordinates. The Vector3D class I created back when I was writing a MUD in an earlier version of Akka, and it basically allows me to take a 3-tuple (e.g. (5.0, 1.5, 2.5)) and implicitly turn that into something that supports basic cartesian math. You can see this in the line of code that contains p.origin distanceFrom location, which utilizes the distanceFrom method that I added to my Vector3D class.

Now how about we create a planet that includes the PhysicalObject trait?

package com.kotancode.ulysses.space

import com.kotancode.ulysses.space.Vector3D._
import akka.actor._

class Planet extends Actor with PhysicalObject {

	objectType = RadarObjectTypes.Planet
	name = "Utopia 1"
	description = "This is a utopian planet, chock full of epic win."

	def receivePlanet: Receive = {
		case _ => log.debug("Received some message for a planet.")
	}

	def receive = {
		receiveSpatial orElse receivePlanet
	}
}

This is a pretty simple class and you can see that I’m doing some more hard-coding of values to serve as placeholders until I deal with the persistence deal (remember the YAGNI portion of the Agile Manifesto?). What I really like here is the receiveSpatial orElse receivePlanet partial function chaining. This lets me grab the receive function from the PhysicalObject trait and chain on the receive function from the Planet actor. I don’t have any planet-specific messages but you can imagine I might have messages like Land and TakeOff for when players bring their ships into orbit, etc.

Almost done. We’ve got a planet, which includes the physical object trait (and with it the associated message handling and state … I freaking love traits), and we’ve got a universe that contains the planet. Now we just need to send out radar pings. Rather than make the iPad client manually send radar ping packets, we can use an Akka scheduler on the server side to force the player’s proxy (GameClientProxy actor) to issue the RadarPing to the Universe object periodically.

Here’s the new code for my GameClientProxy class’ preStart() actor method:

lazy val universe = context.actorFor("../../Universe")

override def preStart() = {
	log.debug(s"GameClientProxy pre-start, socket uuid: ${socketHandle.uuid}")
	implicit val timeout = Timeout(1 second)
	import context.dispatcher
	SocketServer.system.scheduler.schedule(
		10 seconds,
		10 seconds,
		universe,
		RadarPing(self,  (5.0, 5.0, 5.0), 10)
	)
}

That’s pretty much it. We know from the code above that we’re going to be sending pongs directly back to the player proxy actor, so we need to respond to that. In the near future, this will involve creating a protobuf object and writing it back out to the client over the socket connection, but for now we’ll just log the fact that we got the pong back:

def receive = {
	case m:DirectMessage => {
		log.debug("I received a direct message.")
		writeRawMessage(m, MessageTypes.DirectMessage)
	}
	case pong:RadarPong => {
		log.debug(s"I received a radar pong ${pong}")
		// writeRawMessage(new raw pong protobuf ...)
	}
}

So that’s it… In just a few quick steps with a few small, lightweight actors, I’ve given my universe the ability to expose an infinitely expandable list of objects to players by virtue of modeling a message-based radar system. I’m nowhere near the point where I can add combat yet, but I’m adding building blocks slowly, which is always fun to do. I think next I’ll work on the “universe map” display for the iPad and possibly figure out a persistence method so I can load planets from my database.

Building an MMO Server in Akka : First Steps Toward Multiplayer

As you can imagine, building a Massively Multiplayer Online Game (MMOG, or MMORPG for “roleplaying” types, also you may have seen MMORTS for online real-time strategy games) is no small task. In fact, it’s pretty daunting. However, the technology we have now, including hardware, software, and virtualization, makes it possible to scale these games to incredible sizes.

To put it in perspective, EverQuest, what I consider the first truly huge, commercially successful MMO (yes, Ultima Online was first, but EQ quickly rose to eclipse UO’s volume), during one of the periods where I was playing, used to have a per-server capacity of around 1700-2000 players. Once our buddies started noticing that the number of players online in a particular server went up close to 2000, many of us started running like hell for the nearest safe spot, city, or useful campground.

Today, while still sharding out its content across a ridiculous number of servers, games like World of Warcraft not only have vastly more content and interaction than EQ did, but each shard can hold many more players. Being able to create a server that can handle that kind of scale and respond to messages from player clients in real-time is a daunting task. The sheer number of minutiae that you need to get absolutely perfect for things to work properly is mind-boggling.

The EverQuest servers, as well as similar servers for games that came shortly thereafter like Anarchy Online, were written in C++. To handle the volume of information from that large a number of simultaneous players, they had to manage their own threads, they had to do absurd low-level optimizations, and they had to make sure that every single line of code on that server was as fast and stable as possible. An entire shard (server process, or, later in EQ’s life, a cluster of server processes) simply could not crash. People paying $10-15/month for the privilege of playing the game (a concept which was brand new and controversial at the time) would not tolerate a server crash.

I am just one person, and giant game companies have entire armies of developers. How can I possibly hope to make an MMO server that players will actually use? Simple. Standing on the shoulders of giants. I don’t need to write all the low-level stuff, I don’t need to write the distributed grid-style asynchronous computing fabric that should underpin my game logic. It’s been done for me, and it’s called Akka.

You may have seen some of my previous blog posts where I talk about some of the low-level details like google protocol buffers and creating a non-blocking socket server using Akka IO. The next step toward being able to build an MMO is the core functionality required for a multiplayer game: asynchronous message routing and transmission.

The data flow for an MMO works something like this: A message comes into your server’s gateway (in my case, a non-blocking socket server). The gateway then does whatever it needs to do with the message and dispatches it accordingly. The message, or a version of it, then floats its way through the “business logic” (your game core) and as a result of it entering and executing your game logic, 0..n messages will be emitted as a result. Some of those messages will come back to the player (“your spell missed you blind moron”), some of those messages will arrive at another player’s client (“moronA failed to hit you with a magic missile”), and some messages will never leave the server environment (“persist this bit of information for me please, mr. database” or “hey NPC, player just missed you with a magic missile, your AI should respond.”). Whether you consider your NPC AI message-based or just part of your game logic is a minor semantic detail.

The point is this: we need to keep track of which players are connected, we need a way to identify those players so that messages can be targeted to them, and we need a way to be able to receive messages, funnel them into our game logic, and emit messages as a result. No matter what kind of game you’re building, no matter what your game logic may look like, this is the essential core nugget of functionality on which all of the rest of your MMO code must be built.

If you remember my dispatch map from this blog post, I can now add a new potential target for dispatch, the GameClientProxy, which is an actor that serves as a gateway between my game logic and the player. If I send the GCP a native scala case-class message, I can expect that as a result, the GCP will produce a protobuf message and put it on the wire, headed for the target player. In some cases I can send the GCP an already built protobuf message, which is the case when a player sends a DirectMessage, a protobuf message to send a “chat” from one player to another.

Here’s a look at my current dispatch map:

val dispatchMap: Map[Int, (Mergeable, String)] = Map(
   1 -> (ZombieSighting.defaultInstance.asInstanceOf[Mergeable], "akka://game/user/ServerCore/DispatchTarget"),
   MessageTypes.SignIn -> (SignIn.defaultInstance.asInstanceOf[Mergeable], "akka://game/user/ServerCore/Clients"),
   MessageTypes.DirectMessage -> (DirectMessage.defaultInstance.asInstanceOf[Mergeable], "akka://game/user/ServerCore/Clients")
  )

I’ve already got the code in place that dispatches these messages and does the protobuf de-serialization, so now I just need a “clients” actor, which is my GCP:

class ClientManager extends Actor with ActorLogging {

	var clientPlayerMap : scala.collection.mutable.Map[String, String] = scala.collection.mutable.Map.empty

	override def preStart() = {
		log.debug("Client manager pre-starting.")
	}

	def playerForUsername(username:String) : ActorRef = {
		context.actorFor(clientPlayerMap(username))
	}

	def removeUUIDFromPlayerMap(uuid:String) : Unit = {
		clientPlayerMap.foreach {  m => if (m._2 == uuid) clientPlayerMap.remove(m._1) }
	}

	def receive = {
		case ClientDisconnected(socketHandle) => {
			removeUUIDFromPlayerMap(socketHandle.uuid.toString())
			// Stop this actor
			context.stop(context.actorFor(socketHandle.uuid.toString()))
			log.debug(s"Player at socket ${socketHandle.uuid} disconnected.")
		}
		case ClientConnected(socketHandle) => {
			val newClient = context.actorOf(Props(new GameClientProxy(socketHandle)), name = socketHandle.uuid.toString())
		}
		case (uuid:String, signIn:SignIn) => {
			clientPlayerMap(signIn.username) = uuid
			log.debug(s"Player ${uuid} signed in ${signIn}")
		}
		case dm:DirectMessage => {
			log.debug(s"Player ${dm.sourcePlayer} sent dm to ${dm.targetPlayer}")
			playerForUsername(dm.targetPlayer) ! dm
		}

	}
}

This class maintains a mapping between socket UUIDs (what I use to identify the game client proxy/GCP actors) and player names. This mapping is the essential bit of state that allows messages to be targeted at players rather than UUIDs. Game clients will be unaware of the UUID of other players (because that could change if they disconnect and reconnect while messages are flowing toward them). Also note that the s”foo ${bar}” syntax is Scala 2.10 string interpolation syntax.

class GameClientProxy(socketHandle: IO.Handle) extends Actor with ActorLogging {

	override def preStart() = {
		log.debug(s"GameClientProxy pre-start, socket uuid: ${socketHandle.uuid}")
	}

	override def postStop() = {
		log.debug("GameClientProxy stopped.")
	}

	def writeRawMessage(msg:com.google.protobuf.GeneratedMessageLite, messageType: Int): Unit = {
		val bb: java.nio.ByteBuffer = ByteBuffer.allocate(4)
		bb.putInt(msg.getSerializedSize)
		bb.flip
		socketHandle.asSocket write ByteString(bb)

		val bb2 = ByteBuffer.allocate(4)
		bb2.putInt(messageType)
		bb2.flip
		socketHandle.asSocket write ByteString(bb2)
		socketHandle.asSocket write ByteString(msg.toByteArray)
	}

	def receive = {
		case m:DirectMessage => {
			log.debug("I received a direct message.")
			writeRawMessage(m, MessageTypes.DirectMessage)
		}
	}
}

This is also my attempt to delay side-effects as long as possiblewhich is a core functional programming mantra. A message being written on the wire to a particular socket is a side effect of the exchange of messages between Actors in my Actor System (the game grid). Note that I didn’t have a “player” class floating around anywhere that I used to represent a single player, instead that’s a GCP. In this blog post, I talked about the importance of modeling an actor system in a way that gets work done which may not be the same as creating an actor for every potential interactive thing visible to a player.

So, with this bit of infrastructure in place, I am now in a position where I can start building small multiplayer interaction. From there, you scale out, you test, and you build more interaction. The key point in a situation like mine where I’m just one person and not an army is that I make small bits work, I test as I go, and more importantly, the act of creating the game should be just as fun as playing it.

So far, with Akka, it’s been a blast.