Hello Disruptor

by Kevin Hoffman on January 6th, 2012

Before you read this post, I highly recommend you check out some of the other material available on the web regarding the Disruptor. I would start with the overview and then check out Martin Fowler’s review. If you’re already familiar with the Disruptor or you feel like continuing anyway, then keep reading.

Historically, multi-threaded (concurrent) applications have been extremely difficult to write and even harder to troubleshoot and maintain. They are tough to test in isolation, and developers end up spending more time reasoning about semaphores, locks, critical sections, and re-entrancy than about the actual business problem their application is supposed to tackle.

The Disruptor aims to change that with a lightweight, easy-to-use component that lets developers pump events through a pipeline without ever taking locks and without incurring the penalty of thread context switches. Some of the other optimizations the LMAX folks have made under the hood are about “mechanical sympathy”: maximizing your application’s use of the CPU caches to dramatically speed up processing.

With this in mind, I set out to create a simple “Hello world” application using the Disruptor. Really, the simplest way to do this is to create an application with one event publisher and one event handler, pump hello-world messages into the Disruptor, and see what happens.

This is exactly what I did.

To start, I created an event that I wanted to stream through my application, presumably to eventually be handled by some downstream business logic. Here’s the ValueEvent class, of which 99% is stolen from the LMAX documentation:

package com.kotancode.hellodisruptor;

import com.lmax.disruptor.*;

public final class ValueEvent {
    private long _value;

    public long getValue() {
        return _value;
    }

    public void setValue(long value) {
        _value = value;
    }

    public final static EventFactory<ValueEvent> EVENT_FACTORY =
        new EventFactory<ValueEvent>() {
            public ValueEvent newInstance() {
                return new ValueEvent();
            }
        };
}

Don’t worry too much about the event factory: it’s there to allow the ring buffer (you’ll see that shortly) to create new events of that type. Once we’ve got an event, we should be able to create an application that configures a disruptor pipeline, pumps messages into it, and handles them. In the sample code below, I count the number of messages handled and then report the throughput in operations per second (in my case the operation is a simple increment done when I handle the event).

package com.kotancode.hellodisruptor;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.*;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class App {

    private final static int RING_SIZE = 1024 * 8;

    // Written only by the single handler thread and read by main;
    // volatile gives the main thread visibility of the handler's progress.
    private static volatile long handleCount = 0;

    private final static long ITERATIONS = 1000L * 1000L * 300L;
    private final static int NUM_EVENT_PROCESSORS = 8;

    private final static EventHandler<ValueEvent> handler =
        new EventHandler<ValueEvent>() {
            public void onEvent(final ValueEvent event,
                                final long sequence,
                                final boolean endOfBatch) throws Exception {
                handleCount++;
            }
        };

    public static void main(String[] args) {
        System.out.println("Starting disruptor app.");

        ExecutorService executor = Executors.newFixedThreadPool(NUM_EVENT_PROCESSORS);

        Disruptor<ValueEvent> disruptor =
            new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, executor,
                new SingleThreadedClaimStrategy(RING_SIZE),
                new SleepingWaitStrategy());
        disruptor.handleEventsWith(handler);
        RingBuffer<ValueEvent> ringBuffer = disruptor.start();

        long start = System.currentTimeMillis();

        long sequence;
        ValueEvent event;
        for (long x = 0; x < ITERATIONS; x++) {
            sequence = ringBuffer.next();      // claim the next slot
            event = ringBuffer.get(sequence);  // re-use the pre-allocated event
            event.setValue(x);
            ringBuffer.publish(sequence);      // make it visible to the handler
        }

        // The cursor is the last published sequence (ITERATIONS - 1), so the
        // handler is finished once handleCount has passed it.
        final long expectedSequence = ringBuffer.getCursor();
        while (handleCount <= expectedSequence) { } // busy-wait for the handler to catch up

        long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
        System.out.printf("op/s: %d, handled: %d%n", opsPerSecond, handleCount);

        // Stop the event processors and release the pool threads so the JVM can exit.
        disruptor.halt();
        executor.shutdown();
    }
}

I’m not going to go into a huge amount of detail here because a lot of this information is described on LMAX’s site and made a little clearer once you’ve read Martin Fowler’s review of the Disruptor technology.

There are a couple of fairly important things to note here. The first is that when I want to publish an event, I don’t create a new instance of it; instead, I ask the ring buffer for one. This is because the ring buffer is highly optimized for performance: if I have a ring buffer with a “ring size” of 2048 (the size must be a power of two), then at most 2048 instances of ValueEvent will ever be allocated. Each time I grab one of these off the ring, modify it, and publish it, I am re-using an existing instance. There will never be more than RING_SIZE instances of the event object.
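
If you’re wondering why the size has to be a power of two: it lets the ring buffer turn an ever-increasing sequence number into a slot index with a single bitwise AND instead of a modulo, so sequences wrap around onto the same pre-allocated slots forever. Here is a minimal sketch of that idea (illustrative only, not the Disruptor’s actual implementation):

    // Illustrative sketch of sequence-to-slot mapping with a power-of-two ring.
    public final class RingIndexSketch {
        private static final int RING_SIZE = 8; // must be a power of two

        // (sequence & (RING_SIZE - 1)) is equivalent to (sequence % RING_SIZE)
        // when RING_SIZE is a power of two, but much cheaper.
        static int indexFor(long sequence) {
            return (int) (sequence & (RING_SIZE - 1));
        }

        public static void main(String[] args) {
            System.out.println(indexFor(0));             // 0
            System.out.println(indexFor(RING_SIZE));     // 0 again: the same slot is re-used
            System.out.println(indexFor(RING_SIZE + 3)); // 3
        }
    }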

The ExecutorService business is part of the standard Java concurrency libraries and gives the Disruptor a strategy for managing its workload. In this case I’ve created a fixed thread pool of 8 threads (just because I felt like it, not because the number has any particular significance), and that pool is responsible for running the event handlers you register with the handleEventsWith() method.
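
To make that concrete, here is a hedged sketch, assuming the disruptor and ValueEvent from the listing above; journaller and replicator are made-up handler names. Each handler passed to handleEventsWith() gets its own thread from the executor and sees every event independently:

    // Hypothetical additional handlers: each one registered with
    // handleEventsWith() runs on its own thread from the executor pool.
    EventHandler<ValueEvent> journaller = new EventHandler<ValueEvent>() {
        public void onEvent(final ValueEvent event, final long sequence,
                            final boolean endOfBatch) throws Exception {
            // e.g. append event.getValue() to a journal
        }
    };

    EventHandler<ValueEvent> replicator = new EventHandler<ValueEvent>() {
        public void onEvent(final ValueEvent event, final long sequence,
                            final boolean endOfBatch) throws Exception {
            // e.g. forward event.getValue() to another node
        }
    };

    // Both handlers consume every event, in parallel with each other;
    // this must be configured before disruptor.start() is called.
    disruptor.handleEventsWith(journaller, replicator);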

This is just the tip of the iceberg. As you’ll see in a subsequent blog post, you can set up elaborate processing pipelines that give you real, concrete visibility into how events are being processed asynchronously in your application, without incurring the typical multi-threading penalties.
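
As a small teaser, here is a sketch of the kind of wiring the DSL allows, using the same 2.x API as the listing above; decoder and businessLogic (like the journaller and replicator in the previous sketch) are hypothetical handlers, and these are two alternative wirings rather than one configuration:

    // A two-stage pipeline: businessLogic only sees an event after decoder
    // has finished with it. Configure this before calling disruptor.start().
    disruptor.handleEventsWith(decoder).then(businessLogic);

    // Alternatively, stages can fan out and re-join: journal and replicate
    // in parallel, then apply business logic once both stages are done.
    disruptor.handleEventsWith(journaller, replicator).then(businessLogic);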

If you’re curious, I typically get between 8 million and 21 million operations (handle + increment) per second using this sample code. I’m guessing the fluctuation is related to what my PC is doing at the time but I haven’t been able to reliably get it to stay around the 20+ million mark.

So, if you’re building an application that might even remotely be considered multi-threaded, or, better yet, an event or message processor, then I highly recommend you go out and play with the Disruptor, and I promise I’ll have an even more fun example of using Disruptors in the next blog post.

  • Pingback: Using the Disruptor to Power an MMORPG | Kotan Code 枯淡コード

  • Prasanth R

    Nice article!!

    Could you kindly tell me about consuming?

    Thanks

  • subodhomjoshi82

    Is this code runnable? It has a compile-time error; please post working code. Thanks

    • Amir

      Just replace:

          Disruptor disruptor =
              new Disruptor(ValueEvent.EVENT_FACTORY, executor,
                  new SingleThreadedClaimStrategy(RING_SIZE),
                  new SleepingWaitStrategy());

      with:

          EventFactory EVENT_FACTORY = new ValueEventFactory();
          Disruptor disruptor =
              new Disruptor(EVENT_FACTORY,
                  RING_SIZE,
                  executor,
                  ProducerType.SINGLE,
                  new SleepingWaitStrategy());

      and it will compile and run fine.

      • subodhomjoshi82

        thanks

  • Ram Pantangi

    Do you have any comparison between the Disruptor and using a simple ArrayLinkedBlockingQueue to do the same thing you are doing above? To my surprise, I am getting better throughput with ArrayLinkedBlockingQueue. Not sure what I am doing wrong. I posted it here: http://stackoverflow.com/questions/21359445/i-dont-see-the-performance-improvement-of-disruptor

    Any suggestions? Thanks.