Kotan Code 枯淡コード

In search of simple, elegant code


Sending and Receiving Google Protocol Buffers via Akka IO and Scala

In a previous blog post, I illustrated how you can use the C++ versions of the generated protocol buffer classes to work with protobufs in your iOS and Mac OS X applications via Objective-C (Objective-C++, actually). This is all well and good, but protobufs don’t do anybody any good unless you can send and receive them.

Yesterday I ran an experiment using Akka 2.0’s IO module to create a socket server that listens for Google Protocol Buffer messages. The tricky part of doing something like that is that you also have to create a client to send the protobufs. Then, while debugging, you have to figure out whether the problem is on the server side or the client side or, as in my case, with the developer.

The first issue I ran into was this: when you pull bytes off the wire in streaming fashion, as you do with Akka IO, you can’t tell where one message ends and the next begins. Nothing in the stream marks the boundaries, so unless the sender adds some framing (a length prefix, for example) you can’t split the bytes back into messages. That’s when I found a blog post by Richard Searle with a very nice little sample of using Akka IO to receive length-prefixed strings. It uses take(x) to pull bytes off the wire. One thing that I find very appealing about Akka IO and its use of Iteratees is that when you call take(10) it will take 10 bytes off the wire, but only once 10 bytes are available. So all that code you used to have to write to maintain your “expected received bytes” count and your “current received bytes” count just to grab a block of bytes off the wire is taken care of for you.
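To make that bookkeeping concrete, here is a rough sketch in plain Scala (no Akka involved) of what a hand-rolled framer has to keep track of. The names FrameBuffer and feed are made up for illustration, and the 4-ASCII-digit length prefix is an assumption chosen to match the format used later in this post. It is not how Akka IO works internally, just the kind of code take(n) saves you from writing.

```scala
// Hypothetical hand-rolled framer: buffers incoming chunks and emits
// complete frames of the form <4 ASCII digits of length><payload>.
final class FrameBuffer {
  // Bytes received so far that do not yet form a complete frame
  private var pending: Array[Byte] = Array.empty[Byte]

  /** Append a chunk from the wire and return every frame that is now complete. */
  def feed(chunk: Array[Byte]): List[Array[Byte]] = {
    pending = pending ++ chunk
    val frames = List.newBuilder[Array[Byte]]
    var done = false
    while (!done) {
      if (pending.length < 4) {
        done = true // length prefix not fully received yet
      } else {
        val len = new String(pending, 0, 4, "US-ASCII").toInt
        if (pending.length < 4 + len) {
          done = true // payload not fully received yet
        } else {
          frames += pending.slice(4, 4 + len)
          pending = pending.drop(4 + len)
        }
      }
    }
    frames.result()
  }
}
```

Feeding it "0005he" and then "llo0002hi" yields nothing on the first call and the two payloads "hello" and "hi" on the second, since the first chunk stops partway through a frame.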

The next problem I had was generating Scala case classes from my .proto files. For that, I turned to ScalaBuff. The author of that particular library is an amazingly helpful guy and actually helped me troubleshoot some of my issues with serialization and sockets in StackOverflow chat.

Here’s what it looks like to create a new zombie sighting message using the generated Scala code:

val zombieMessage = ZombieSighting(
    name="Kevin",
    longitude=21.0,
    latitude=23.0,
    description=Option("This is a zombie"),
    zombieType=ZombieType.SLOW
)

So far so good. I have a socket server that can receive length-framed strings, and I’ve got a protobuf library that lets me generate protobuf objects in Scala. Next, I needed to modify the server code to receive length-framed protobufs. Here’s a snippet where I’ve modified the readMessage method to yield ByteStrings rather than the ASCII strings the original blog author’s method produced:

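def readMessage: IO.Iteratee[ByteString] =
  for {
    // The first 4 bytes are the payload length as ASCII digits, e.g. "0042"
    lengthBytes <- take(4)
    len = ascii(lengthBytes).toInt
    // Then read exactly that many payload bytes
    bytes <- take(len)
  } yield {
    bytes
  }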

On the other side of this Iteratee, I’ve got the original author’s printString method, but now I need to modify it to extract a zombie sighting message:

def printMessage: IO.Iteratee[Unit] =
  repeat {
    for {
      bytes <- readMessage
    } yield {
      // Parse the framed bytes into a ZombieSighting and print one field
      val theBytes: Array[Byte] = bytes.toArray
      val theMessage = ZombieSighting.defaultInstance.mergeFrom(theBytes)
      println("name:" + theMessage.name)
    }
  }

That’s pretty much it for the server… I didn’t alter any of the other code I got from Richard Searle’s blog. For the client, it’s all pretty simple – create a socket, connect to the server, and then write some bytes. Here’s the code (the code that caused me no end of frustration all night last night until I got help from Sandro Grzicic) to encode a protobuf object and drop it on a socket:

val zombieMessage = ZombieSighting(
   name="Kevin",
   longitude=21.0,
   latitude=23.0,
   description=Option("This is a zombie"),
   zombieType=ZombieType.SLOW
)
val zombieBytes = zombieMessage.toByteArray()

// and here's the code that writes to the socket handle (in this case it's the parameter indicated by the underscore)
// The payload is appended as raw bytes; routing it through a String could corrupt non-ASCII bytes
_ write (ByteString("%04d".format(zombieMessage.getSerializedSize)) ++
		ByteString(zombieBytes))
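A note on binary payloads: serialized protobuf bytes are not text, so converting them to a String on the way to the socket can silently change them, depending on the charset and on which bytes the message happens to contain. The sketch below is plain Scala with no Akka or ScalaBuff dependency. The frame helper and the sample payload are made up for illustration; the payload is just three bytes, one of which is not valid UTF-8 on its own.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper: prefix a binary payload with its length as 4 ASCII
// digits, keeping the payload itself as raw bytes the whole way through.
def frame(payload: Array[Byte]): Array[Byte] = {
  require(payload.length <= 9999, "payload too large for a 4-digit length prefix")
  "%04d".format(payload.length).getBytes(StandardCharsets.US_ASCII) ++ payload
}

// Sample payload; 0x96 is a stray UTF-8 continuation byte.
val payload = Array[Byte](0x08, 0x96.toByte, 0x01)

// Round-tripping through a String substitutes a replacement character for
// the invalid byte, so the result no longer matches the original payload.
val viaString =
  new String(payload, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8)

println("frame length: " + frame(payload).length)
println("survives String round trip: " + viaString.sameElements(payload))
```

A message made up only of ASCII-range bytes would pass through a String unharmed, which is why this kind of bug can hide until a field value changes.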

So, there are a couple of important take-aways here. First and foremost, people on the internet are freaking awesome, and I never would have figured this out without a combination of other people’s blog posts, open source software, and late-night chat sessions on StackOverflow.

The second most important thing to note here is that Scala is awesome and Akka is awesome-er.