Kotan Code 枯淡コード

In search of simple, elegant code

Creating a Multiplayer Chat System with the Akka Event Bus

In my previous blog post, I talked about how to use the subchannel classification system with the Akka Event Bus. In that post, I showed a simplistic way of determining whether one channel is a subclass of another: calling startsWith() on the channel name. That approach is primitive and doesn’t show you the true power of the event bus.

In this blog post, I want to walk you through a slightly more realistic example. Let’s assume that we’re using Akka to build the back-end for a multiplayer game. We know that this game needs to support chatting between players, so we want to create an event bus to handle all of the chat traffic for the game. This should, in theory, give the system some nice flexibility in allowing individual players and player-actor components to subscribe and unsubscribe from different segments of the chat traffic.

We have three different segments that we want to divide traffic up into:

  1. Players – Individual players can receive private chat messages, or tells.
  2. Sectors – The players’ space ships are floating in 3D space in large areas called sectors. It is possible for those players to transmit messages to an entire sector.
  3. Groups – Players can band together in small groups of 1 to 8 players to share experience as well as their own chat channel.

One of the interesting things to keep in mind is that our chat bus cannot read player state. In order for the chat bus to be really useful, general purpose, and, most importantly, testable, it can’t go reaching into the inside of some player object to figure out which sector object they are in or to which group they belong.

So we’re going to revise our previous sub-classification event bus sample with a new case class called ChatCoordinate that will be used as our classifier (remember in the previous blog post we used simple strings). Here’s the new code for the bus and its messages:

package chatbus

import akka.event.ActorEventBus
import akka.event.LookupClassification
import akka.event.SubchannelClassification
import akka.util.Subclassification
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor

/*
 * Chat Bus Coordinates
 * players/XXX - sends a private tell to a player
 * sectors/XXX - sends a tell to an entire sector
 * groups/XXX - sends a tell to members of a player group
 */
object ChatSegments {
	val Player = "players"
	val Sector = "sectors"
	val Group = "groups"
}

case class ChatCoordinate(segment: String, target: Option[String])

sealed trait Chat
case class ChatMessage(source: String, message: String) extends Chat
case class ChatEvent(coord:ChatCoordinate, msg: ChatMessage) extends Chat

class ChatEventBus extends ActorEventBus with SubchannelClassification {
  type Event = ChatEvent
  type Classifier = ChatCoordinate

  protected def classify(event: Event): Classifier = event.coord

  protected def subclassification = new Subclassification[Classifier] {
    def isEqual(x: Classifier, y: Classifier) = (x.segment == y.segment && x.target == y.target)

    /* Is x a subclass of y? A target of None acts as a broadcast within its
     * segment: Player/None is treated as a subclass of Player/Bob, so an event
     * published to Player/None reaches Bob's subscription.
     * Player/Bob is NOT a subclass of Player/None. */
    def isSubclass(x: Classifier, y: Classifier) = {
      val res = (x.segment == y.segment && x.target == None)
      //println(s"Subclass check: $x $y: $res")
      res
    }
  }

  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event.msg
  }
}

This new chat bus gives us some great flexibility. I can send a message to all players by sending a message to the Players segment with a target of None. I can send a message to all groups by sending a message to the Groups segment with a target of None, a specific group by sending to Groups/Some(“GroupId”) and a specific player by sending to Players/Some(“Player Id”).
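The delivery rule described above can be checked without starting an actor system. What follows is a minimal sketch, not part of the original code: `ChatRouting` and `delivers` are hypothetical names, and the function simply restates `isEqual` and `isSubclass` from the bus. It assumes the bus hands an event to a subscription whenever the event's coordinate equals, or is a subclass of, the subscribed coordinate.

```scala
// Standalone sketch of the matching rule; no Akka dependency.
// ChatCoordinate mirrors the case class defined in the bus code.
case class ChatCoordinate(segment: String, target: Option[String])

object ChatRouting {
  // Hypothetical helper: would an event published at `event` reach a
  // subscriber registered at `subscription`?
  def delivers(event: ChatCoordinate, subscription: ChatCoordinate): Boolean = {
    val isEqual = event == subscription
    // Same rule as isSubclass in the bus: a None target within the same
    // segment acts as a broadcast to that segment.
    val isSubclass = event.segment == subscription.segment && event.target.isEmpty
    isEqual || isSubclass
  }
}
```

Under this rule, an event at `ChatCoordinate("players", None)` matches a subscription at `ChatCoordinate("players", Some("Bob"))`, an event addressed to `Some("Kevin")` does not, and nothing crosses from one segment into another.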

One of the aspects of gameplay that this chat bus supports is a player moving from one sector to another. If a player leaves a sector, then ideally some actor (maybe a Player actor?) would unsubscribe from the chat bus @ Sectors/Some(“Foo”) and then subscribe to the chat bus @ Sectors/Some(“Bar”) if they moved from sector Foo to sector Bar, while always maintaining a persistent subscription to Players/Some(“Player ID”). Likewise, if a player joins a player group, they would subscribe to Groups/Some(“Group ID”) and unsubscribe from that same coordinate when they leave the group.
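One way to keep that bookkeeping out of the bus is to compute, from a snapshot of where the player is, the set of coordinates they should be subscribed to, and then diff the old and new sets. The sketch below is only an illustration under that assumption: `PlayerLocation`, `SubscriptionPlanner`, `coordinatesFor` and `plan` are hypothetical names that do not appear in the original code. The caller (perhaps the Player actor) would then pass each resulting coordinate to the bus's `unsubscribe` or `subscribe`.

```scala
// Standalone sketch; ChatCoordinate mirrors the case class from the bus code.
case class ChatCoordinate(segment: String, target: Option[String])

// Hypothetical snapshot of a player's position in the game world.
case class PlayerLocation(playerId: String, sector: String, group: Option[String])

object SubscriptionPlanner {
  // Every coordinate a player at this location should be subscribed to:
  // their private channel, their current sector, and their group if any.
  def coordinatesFor(loc: PlayerLocation): Set[ChatCoordinate] =
    Set(
      ChatCoordinate("players", Some(loc.playerId)),
      ChatCoordinate("sectors", Some(loc.sector))
    ) ++ loc.group.map(g => ChatCoordinate("groups", Some(g)))

  // Returns (toUnsubscribe, toSubscribe) for a change from `before` to `after`.
  // Coordinates present in both sets, such as the private channel, are untouched.
  def plan(before: PlayerLocation,
           after: PlayerLocation): (Set[ChatCoordinate], Set[ChatCoordinate]) = {
    val old = coordinatesFor(before)
    val now = coordinatesFor(after)
    (old -- now, now -- old)
  }
}
```

For a player moving from sector Foo to sector Bar with no group change, `plan` yields the Foo sector coordinate to unsubscribe from and the Bar sector coordinate to subscribe to, leaving the private player coordinate alone.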

The system is inherently extensible: because a segment is just a string, whenever we want to add a new one (e.g. Race, Alliance, Guild, etc.) we can just drop it in there and we’re good to go.

Finally, just so you believe me, here are some ScalaTest specs exercising the chat bus with some fake actors:

package tests

import akka.actor.ActorSystem
import akka.actor.Actor
import akka.actor.Props
import akka.testkit.TestKit
import akka.testkit.TestProbe
import org.scalatest.WordSpecLike
import org.scalatest.Matchers
import org.scalatest.BeforeAndAfterAll
import akka.testkit.ImplicitSender
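import scala.concurrent.duration._
// Enables the postfix duration syntax used below (e.g. `500 millis`)
import scala.language.postfixOps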

import chatbus._

object ChatBusSpec {
	
	val toKevin = ChatEvent(ChatCoordinate(ChatSegments.Player, Some("Kevin")), ChatMessage("system", "This goes just to Kevin"))
	val toBob = ChatEvent(ChatCoordinate(ChatSegments.Player, Some("Bob")), ChatMessage("system", "This goes just to Bob"))
	val toAllPlayers = ChatEvent(ChatCoordinate(ChatSegments.Player,None), ChatMessage("kevin", "This goes to all players"))
	
	val toSectorAlpha = ChatEvent(ChatCoordinate(ChatSegments.Sector,Some("Alpha")), ChatMessage("system", "Sector Alpha is about to explode."))
	val toSectorBeta = ChatEvent(ChatCoordinate(ChatSegments.Sector,Some("Beta")), ChatMessage("system", "Sector Beta is about to explode."))
}

class ChatBusSpec (_system: ActorSystem) extends TestKit(_system) with ImplicitSender
  with WordSpecLike with Matchers with BeforeAndAfterAll {
 
  import ChatBusSpec._

  val theProbe = TestProbe()

  def this() = this(ActorSystem("ChatBusSpec"))

  def getEchoSubscriber() = {
	system.actorOf(Props(new Actor {
	    def receive = {
	      case m:ChatMessage => theProbe.ref ! m
	    }
	}))	
  }
 
  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "A chat event bus" must {
     "send a private message to a player with no snoopers" in {
		val eventBus = new ChatEventBus
		val kevin = getEchoSubscriber()
		val bob = getEchoSubscriber()
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Player, Some("Kevin")))
		eventBus.subscribe(bob, ChatCoordinate(ChatSegments.Player, Some("Bob")))
		eventBus.publish(toKevin)
		theProbe.expectMsg(500 millis, toKevin.msg)
		theProbe.expectNoMsg(500 millis)
     }	

     "send a single message to all players" in {
		val eventBus = new ChatEventBus
		val kevin = getEchoSubscriber()
		val bob = getEchoSubscriber()
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Player, Some("Kevin")))
		eventBus.subscribe(bob, ChatCoordinate(ChatSegments.Player, Some("Bob")))
		eventBus.publish(toAllPlayers)
		// Each player should receive one of these, so the probe should bounce it back twice.
		theProbe.expectMsg(500 millis, toAllPlayers.msg)
		theProbe.expectMsg(500 millis, toAllPlayers.msg)
     }

     "send to all players in a sector should only deliver once per player" in {
		val eventBus = new ChatEventBus
		val kevin = getEchoSubscriber()
		val bob = getEchoSubscriber()
	
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Player, Some("Kevin")))
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Sector, Some("Alpha")))
		eventBus.publish(toSectorAlpha)
		theProbe.expectMsg(500 millis, toSectorAlpha.msg)
		theProbe.expectNoMsg(500 millis)
     }

     "support a player moving from one sector to another" in {
		val eventBus = new ChatEventBus
		val kevin = getEchoSubscriber()
		
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Player, Some("Kevin")))
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Sector, Some("Alpha")))
		eventBus.publish(toKevin)
		theProbe.expectMsg(500 millis, toKevin.msg)
		eventBus.publish(toSectorAlpha)
		theProbe.expectMsg(500 millis, toSectorAlpha.msg)
		eventBus.unsubscribe(kevin, ChatCoordinate(ChatSegments.Sector, Some("Alpha")))
		eventBus.subscribe(kevin, ChatCoordinate(ChatSegments.Sector, Some("Beta")))
		eventBus.publish(toSectorBeta)
		theProbe.expectMsg(500 millis, toSectorBeta.msg)
     }
  }
	
}