Kotan Code 枯淡コード

In search of simple, elegant code

Menu Close

Using the Akka Event Bus

It’s been so long since I’ve written a blog post about Akka, it feels like coming home after a long business trip – the softness that only your own bed in your own home can provide.

Anyway, let’s talk about event buses. An event bus is basically an intermediary between publishers and subscribers. Publishers publish events to the event bus and subscribers listening for certain types of events will receive those events. That’s the generic definition of an event bus. The Akka Event Bus is, as you might have guessed, an Akka-specific implementation of this pattern.

I find the Akka documentation for Event Bus to be inscrutable and downright off-putting for new developers who aren’t already Akka experts. Go ahead, try and read that stuff and work backwards from that into a hello world sample. You’re going to need at least a beer. So let’s start with the basics.

To set up an event bus you need three things: an event bus, subscribers, and publishers. When you create an event bus (a base class that you extend in Scala) you need to declare two types:

  • Event Type
  • Classifier Type

This is where the documentation gets annoying. The Event Type is just the type of objects that you expect to be put on the bus. You can make this type as specific as you want or as generic (Any works just fine as an event type, but I wouldn’t really recommend that as a pattern). The Classifier Type is the data type of the event bus classifier. This sounds more complicated than it needs to be. A classifier is really just some piece of information used to differentiate one event from another, or, in Akka parlance, to classify an event.

You could use an integer as a classifier and then when you subscribe your actors to the bus, you can subscribe them to individual numbers. So, let’s say you have 3 buckets, you could have a classifier and then publish into the 1 bucket or the 2 bucket (classifier). If you’re familiar with JMS or other messaging middleware systems then you might be familiar with the “topic” style classifiers, which often look like slash-delimited hierarchies such as /buckets/1 or /buckets/2. There is some additional complexity around classifiers and classification that I will defer until the next blog post so we can get right to the hello world sample and keep things simple.

First, we need a sample domain. As such, we’ll use the old standby domain of zombies. Let’s say we want to set up a bus to which we can publish zombie sighting messages. We then have subscribers who are interested in those zombie sighting messages for whatever reason. In the past, my actor samples all directly sent messages from one actor to another. But, with the Event Bus, the actors sending the message don’t have to care about or know anything about the actors interested in receiving it. This kind of loose coupling pays off in spades in ease of maintenance and troubleshooting.

And without further ado, here’s the full source that includes the zombie sighting message, a zombie sighting event (with a string classifier), the zombie bus, and the code that sets up the subscriptions and publishes to the bus:

import akka.event.ActorEventBus
import akka.event.LookupClassification
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor

case class ZombieSighting(zombieTag: String, lat: Double, long: Double, alt: Double)
case class ZombieSightingEvent(val topic: String, val sighting: ZombieSighting)

class ZombieSightingLookupEventBus extends ActorEventBus with LookupClassification {
  type Event = ZombieSightingEvent
  type Classifier = String

  protected def mapSize(): Int = 10

  protected def classify(event: Event): Classifier = {
    event.topic
  }

  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event.sighting
  }
}

object ZombieTrackerApp extends App {

  val system = ActorSystem()
  val eventBus = new ZombieSightingLookupEventBus

  val subscriber = system.actorOf(Props(new Actor {
    def receive = {
      case s:ZombieSighting => println(s"Spotted a zombie! ${s}")
    }
  }))

  val indifferentSubscriber = system.actorOf(Props(new Actor {
    def receive = {
	   case s:ZombieSighting => println(s"I saw a zombie, but I don't give a crap.")
    }
  }))

  eventBus.subscribe(subscriber, "/zombies")
  eventBus.subscribe(indifferentSubscriber, "/zombies")

  eventBus.publish(ZombieSightingEvent("/zombies", ZombieSighting("FATZOMBIE1", 37.1234, 45.1234, 100.0)))
  eventBus.publish(ZombieSightingEvent("/zombies", ZombieSighting("SKINNYBOY", 30.1234, 50.1234, 12.0)))

  // And this one will go off into deadletter - nobody is subscribed to this.
  eventBus.publish(ZombieSightingEvent("/zombies/foo/bar/baz", ZombieSighting("OTHERONE", 35.0, 42.5, 50.0)))
  system.shutdown
}

And when we run this application, what we expect to see are two trace outputs from the subscriber that doesn’t give a crap and the subscriber that is outputting information about the zombie it received. Here’s the output:

[info] Running ZombieTrackerApp
 I saw a zombie, but I don't give a crap.
 Spotted a zombie! ZombieSighting(FATZOMBIE1,37.1234,45.1234,100.0)
 I saw a zombie, but I don't give a crap.
 Spotted a zombie! ZombieSighting(SKINNYBOY,30.1234,50.1234,12.0)

In the next blog post, I’ll cover how to upgrade this sample so that we can support the hierarchical classifier style (subchannels).