all that jazz

james' blog about scala and all that jazz

Comments complement code

Today I read this quote:

Good code is its own best documentation. As you’re about to add a comment, ask yourself, ‘How can I improve the code so that this comment isn’t needed?’

I just want to say, it's a load of rubbish. Take a look at the following code:

def toCharArray(
     decoder: CharsetDecoder = Charset.forName("UTF-8").newDecoder()
  ): Enumeratee[Array[Byte], Array[Char]] = new Enumeratee[Array[Byte],Array[Char]] {

  def step[A](inner: Iteratee[Array[Char], A], partialChar: Option[Array[Byte]] = None)(in: Input[Array[Byte]]): 
      Iteratee[Array[Byte], Iteratee[Array[Char], A]] = {
    in match {
      case EOF => partialChar.map(_ => Error[Array[Byte]]("EOF encountered mid character", EOF))
        .getOrElse(Done[Array[Byte],Iteratee[Array[Char],A]](inner, EOF))

      case Empty => Cont(step(inner, partialChar))

      case El(data) => {
        val charBuffer = CharBuffer.allocate(data.length + 1)
        val byteBuffer = partialChar.map({ leftOver =>
          val buffer = ByteBuffer.allocate(leftOver.length + data.length)
          buffer.mark()
          buffer.put(leftOver).put(data)
          buffer.reset()
          buffer
        }).getOrElse(ByteBuffer.wrap(data))

        decoder.decode(byteBuffer, charBuffer, false)

        val leftOver = if (byteBuffer.limit() > byteBuffer.position()) {
          Some(byteBuffer.array().drop(byteBuffer.position()))
        } else None

        val decoded = charBuffer.array().take(charBuffer.position())
        val input = if (decoded.length == 0) Empty else El(decoded)

        inner.pureFlatFold {
          case Step.Cont(k) => Cont(step(k(input), leftOver))
          case _ => Done(inner, Input.Empty)
        }
      }
    }
  }

  def applyOn[A](inner: Iteratee[Array[Char], A]) = Cont(step(inner))
}

If you know iteratees and you know Scala, it's pretty obvious what this does. It converts a stream of byte arrays into a stream of char arrays, taking into consideration the possibility that one character may be split across multiple byte arrays. Structurally it is purely functional, but the actual decoding is not: it uses the high performance Java CharBuffer and ByteBuffer classes, which are mutable, and arguably this is necessary since this enumeratee is a place where performance matters. I wrote it, and in my opinion it's not badly written, though if you can see anything that could be improved, please let me know.

So, tell me, on line 14, why do I allocate a char buffer of the incoming byte array length plus one? What is the reason for the plus one? When I first wrote it, I didn't have the plus one there, I didn't think it was needed. You see, when converting an array of bytes to an array of UTF-16 Java characters, at most, 8 bytes will become 8 characters, right? 8 bytes could become 4 characters, if those characters were multi byte characters, the number of chars needed might be less than the number of bytes being decoded, but it can never be more, right? One byte can't become multiple UTF-16 chars, so why would I ever need 9 characters for 8 bytes?

Now maybe you might criticise my code because the +1 is actually a magic number, and if I gave it a name, then that would explain everything. Well, let's give it a name, and a reasonable name (I could give it a two hundred character long name and that might explain everything, but you can hardly call two hundred character long variable names good code. Well, maybe you can in Java, but not in Scala). So I'll create a val PotentialMultiCharOffset = 1. Does that help you at all? Do you know what it's for? Why is it 1? Why is it added, why don't I multiply by 2? If you do know the reason behind it, then hats off to you, you are a genius. But for the rest of us, we don't know. It was only after I wrote comprehensive unit tests for the code that I found the bug (I've heard other people say that unit tests are not necessary for functional code, another fallacy).

Let me show you the comment that is above that line of code:

// The +1 here is very important, it is there for the case when there are
// 3 bytes of a 4 byte character in the partialChar array, and so this data
// should contain the final byte, but that one byte will become 2 Chars.
val charBuffer = CharBuffer.allocate(data.length + 1)

Understand it now? Was there any way that I could have written the code that would have explained that? Was there any variable name that I could have given it that would have explained it better than that comment? No, it just needed a simple comment explaining its purpose. Without the comment, you'd be sitting there wondering why on earth I had added 1, you might have even thought "this is allocating more memory than needed, I'll just optimise this" and you would have injected a bug. In this case, a comment is aptly suited to making the code understandable. The comment complements the code. It is necessary and the best way of describing it.
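If you want to see the edge case for yourself, here is a small standalone sketch that uses only java.nio. The object name PlusOneDemo and the helper decode are made up for this illustration, and U+1F600 is just one convenient example of a character that takes 4 bytes in UTF-8 but 2 chars in UTF-16. It simulates having 3 bytes left over plus 1 new byte of data, and reports whether the decoder overflowed and how many chars it wrote.

```scala
import java.nio.{ByteBuffer, CharBuffer}
import java.nio.charset.StandardCharsets

object PlusOneDemo {
  // U+1F600: 4 bytes in UTF-8, a surrogate pair (2 chars) in UTF-16.
  val fourByteChar: Array[Byte] = "\uD83D\uDE00".getBytes(StandardCharsets.UTF_8)

  // Hypothetical helper: decodes leftOver followed by data into a char buffer
  // of the given capacity. Returns (did the decoder overflow?, chars written).
  def decode(leftOver: Array[Byte], data: Array[Byte], capacity: Int): (Boolean, Int) = {
    val decoder = StandardCharsets.UTF_8.newDecoder()
    val byteBuffer = ByteBuffer.wrap(leftOver ++ data)
    val charBuffer = CharBuffer.allocate(capacity)
    val result = decoder.decode(byteBuffer, charBuffer, false)
    (result.isOverflow, charBuffer.position())
  }

  // 3 bytes arrived in the previous chunk, the final byte arrives now.
  val leftOver: Array[Byte] = fourByteChar.take(3)
  val data: Array[Byte] = fourByteChar.drop(3)

  val withoutPlusOne: (Boolean, Int) = decode(leftOver, data, data.length)
  val withPlusOne: (Boolean, Int) = decode(leftOver, data, data.length + 1)
}
```

With a buffer of data.length chars there is no room for the surrogate pair, so the decoder reports an overflow. With the extra char both halves of the pair fit.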

And the fact is that we come across things every day where some really obscure edge case means we have to do some otherwise obscure behaviour. Maybe in a world of higher order logic this isn't the case, but we work in a world of far less than perfect protocols with edge cases that are impossible to memorise, where optimising an equals comparison to return early when you encounter the first character that isn't equal is a security vulnerability, where bugs in other software that our software has to interface with mean we have to do counter intuitive things to work around their issues, and where some things are just plain hard to get your head around, and sometimes a little plain English (or whatever language you speak) does that little bit to help you or the next developer make sense of it all.
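The equals example is a good one to make concrete. Below is a rough sketch of a comparison that deliberately does not return early, where the comment is the only thing stopping a well meaning developer from "optimising" it. The names ConstantTimeEquals and secureEquals are made up for this illustration. In real code you would probably reach for something like java.security.MessageDigest.isEqual rather than rolling your own.

```scala
object ConstantTimeEquals {
  // Compares two secrets (for example MACs or tokens).
  def secureEquals(a: Array[Byte], b: Array[Byte]): Boolean = {
    if (a.length != b.length) {
      false
    } else {
      // Do NOT "optimise" this loop to return at the first mismatch. If the
      // running time depends on how many leading bytes match, an attacker can
      // measure it and guess the secret one byte at a time.
      var diff = 0
      var i = 0
      while (i < a.length) {
        diff |= a(i) ^ b(i)
        i += 1
      }
      diff == 0
    }
  }
}
```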

Comments complement code. Good code does not negate the need for comments. Good code includes comments where comments are needed.

Iteratees for imperative programmers

When I first heard the word iteratee, I thought it was a joke. Turns out, it wasn't a joke, in fact there are also enumerators (that's ok) and enumeratees (you're killing me). If you're an imperative programmer, or rather a programmer who feels more comfortable writing imperative code than functional code, then you may be a little overwhelmed by all the introductions to iteratees out there, because they all assume that you think from a functional perspective. Well I just learnt iteratees, and although I'm feeling more and more comfortable with functional programming every day, I still think like an imperative programmer at heart. This made learning iteratees very difficult for me. So while I'm still in the imperative mindset, I thought this a very good opportunity to explain iteratees from an imperative programmer's perspective, taking no functional knowledge for granted. If you're an imperative programmer who wants to learn iteratees, this is the blog post for you. I'm going to specifically be looking at Play's Iteratee API, but the concepts learnt here will apply to all iteratees in general.

So let's start off with explaining what iteratees, and their counterparts, are trying to achieve. An iteratee is a method of reactively handling streams of data that is very easily composable. By reactive, I mean non blocking, i.e. you react to data being available to read, and react to the opportunity to write data. By composable, I mean you write simple iteratees that do one small thing well, then you use those as the building blocks to write iteratees that do bigger things, and you use those as the building blocks to write iteratees to do even bigger things, and so on. At each stage, everything is simple and easy to reason about.

Reactive stream handling

If you're looking for information about iteratees, then I'm guessing you already know a bit about what reactive stream handling is. Let's contrast it to synchronous IO code:

trait InputStream {
  def read(): Byte
}

So this should be very familiar, if you want to read a byte, you call read. If no byte is currently available to be read, that call will block, and your thread will wait until a byte is available. With reactive streams, obviously it's the other way around, you pass a callback to the stream you want to receive data from, and it will call that when it's ready to give data to you. So typically you might implement a trait that looks like this:

trait InputStreamHandler {
  def onByte(byte: Byte): Unit
}

So before we go on, let's look at how the same thing would be achieved in a pure functional world. At this point I don't want you to ask why we want to do things this way, you will see that later on, but if you know anything about functional programming, you know that everything tends to be immutable, and functions have no side effects. The trait above has to have side effects, because unless you are ignoring the bytes passed to onByte, you must be changing your state (or something else's state) somehow in that function. So, how do we handle data without changing our state? The answer is the same way other immutable data structures work, we return a copy of ourselves, updated with the new state. So if the InputStreamHandler were to be functional, it might look like this:

trait InputStreamHandler {
  def onByte(byte: Byte): InputStreamHandler
}

And an example implementation of one, that reads input into a seq, might look like this:

class Consume(data: Seq[Byte]) extends InputStreamHandler {
  def onByte(byte: Byte) = new Consume(data :+ byte)
}
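To show how the code on the other side might use such a handler, here is a minimal self contained sketch. The feed function and the ImmutableHandlerSketch wrapper are my own illustrative names, and I've made data a public val so the result can be inspected. Each call to onByte produces a new handler, and the caller simply threads it through.

```scala
object ImmutableHandlerSketch {
  trait InputStreamHandler {
    def onByte(byte: Byte): InputStreamHandler
  }

  class Consume(val data: Seq[Byte]) extends InputStreamHandler {
    def onByte(byte: Byte): Consume = new Consume(data :+ byte)
  }

  // Hypothetical producer: hands each byte to the current handler and keeps
  // whatever handler comes back for the next byte. Nothing is ever mutated.
  def feed(handler: InputStreamHandler, bytes: Seq[Byte]): InputStreamHandler =
    bytes.foldLeft(handler)((current, byte) => current.onByte(byte))

  def demo(): Seq[Byte] = feed(new Consume(Vector.empty), Seq[Byte](1, 2, 3)) match {
    case consumed: Consume => consumed.data
    case _ => Seq.empty
  }
}
```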

So we now have imperative and functional traits that react to our input stream, and you might be thinking this is all there is to reactive streams. If that's the case, you're wrong. What if we're not ready to handle data when the onByte method is called? If we're building structures in memory this will never be the case, but if for example we're storing them to a file or to a database as we receive the data, then this very likely will be the case. So reactive streams are two way, it's not just you, the stream consumer that is reacting to input, the stream producer must react to you being ready for input.

Now this is possible to implement in an imperative world, though things do start looking much more functional. We simply start using futures:

trait InputStreamHandler {
  def onByte(byte: Byte): Future[Unit]
}

So, when the stream we are consuming has a byte for us, it calls onByte, and then attaches a callback to the future we return, to pass the next byte, when it's ready. If you have a look at Netty's asynchronous channel APIs, you'll see it uses exactly this pattern. We can also implement something similar for an immutable functional API:

trait InputStreamHandler {
  def onByte(byte: Byte): Future[InputStreamHandler]
}
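As a rough sketch of what the stream side could look like with this trait (the feed function and the wrapper object are hypothetical names, not part of any library), the producer only passes the next byte once the future for the previous one has been redeemed. That is what gives the handler a chance to say "not yet".

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FutureHandlerSketch {
  trait InputStreamHandler {
    def onByte(byte: Byte): Future[InputStreamHandler]
  }

  class Consume(val data: Vector[Byte]) extends InputStreamHandler {
    // This handler is always ready, so it returns an already completed future.
    def onByte(byte: Byte): Future[InputStreamHandler] =
      Future.successful(new Consume(data :+ byte))
  }

  // Hypothetical producer: waits for the handler's future before sending more.
  def feed(handler: InputStreamHandler, bytes: List[Byte])
          (implicit ec: ExecutionContext): Future[InputStreamHandler] =
    bytes match {
      case Nil => Future.successful(handler)
      case byte :: rest => handler.onByte(byte).flatMap(next => feed(next, rest))
    }

  // Blocking here is only for the sake of the demonstration.
  def demo(): Vector[Byte] = {
    import ExecutionContext.Implicits.global
    val finished = Await.result(feed(new Consume(Vector.empty), List[Byte](1, 2, 3)), 1.second)
    finished match {
      case consumed: Consume => consumed.data
      case _ => Vector.empty
    }
  }
}
```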

And so here we have a functional solution for reactive stream handling. But it's not a very good one. For a start, there's no way for the handlers to communicate to the code that uses them that they don't want to receive any more input, or that they've encountered an error (exceptions are frowned upon in functional programming). We could add things to handle this, but very soon our interface would become quite complex, hard to break up into small pieces that can be composed, etc. I'm not going to justify this now, I think you'll see it later when I show you just how easy iteratees are to compose.

So, by this stage I hope you have understood two important points. Firstly, reactive stream handling means twofold reacting, both your code has to react to the stream being ready, and the stream has to react to you being ready. Secondly, when I say that we want a functional solution, I mean a solution where everything is immutable, and that is achieved by our stream handlers producing copies of themselves each time they receive/send data. If you've understood those two important points, then now we can move on to introducing iteratees.

Iteratees

There are a few things that our interface hasn't yet addressed. The first is, how does the stream communicate to us that it is finished, that is, that it has no more data for us? To do this, instead of passing in a byte directly, we're going to abstract our byte to be something of type Input[Byte], and that type can have three possible implementations, EOF, an element, or empty. Let's not worry about why we need empty just yet, but assume for some reason we might want to pass empty. So this is what Input looks like:

sealed trait Input[+E]

object Input {
  case object EOF extends Input[Nothing]
  case object Empty extends Input[Nothing]
  case class El[+E](e: E) extends Input[E]
}

Updating our InputStreamHandler, we now get something that looks like this:

trait InputStreamHandler[E] {
  def onInput(in: Input[E]): Future[InputStreamHandler[E]]
}

Now updating our Consume handler from before to handle this, it might look like this:

class Consume(data: IndexedSeq[Byte]) extends InputStreamHandler[Byte] {
  def onInput(in: Input[Byte]) = in match {
    case Input.El(byte) => Future.successful(new Consume(data :+ byte))
    case _ => Future.successful(this)
  }
}

You can see that when we get EOF or Empty, there's nothing for us to do to change our state, so we just return ourselves again. If we were writing to another stream, we might, when we receive EOF, close that stream (or rather, send it an EOF).

The next thing we're going to do is make it easier for our handler to consume input immediately without having to create a future. To do this, rather than passing the byte directly, we'll pass a function, that takes a function as a parameter, and that function will take the byte as a parameter. So, our handler, when it's ready, will create a function to handle the byte, and then invoke the function that was passed to it, with that function. We'll call the first function the cont function, which is short for continue, and means when you're ready to continue receiving input invoke me. Too many functions? Let's look at the code:

trait InputStreamHandler[E] {
  def onByte[B](cont: (Input[E] => InputStreamHandler[E]) => Future[B]): Future[B]
}

Now where did this Future[B] come from? B is just the mechanism that the stream uses to pass state back to itself. As the handler, we don't have to worry about what it is, we just have to make sure that we eventually invoke the cont function, and eventually make sure that the B it returns makes it back to our caller. And what does this look like in our Consume iteratee? Let's have a look:

class Consume(data: IndexedSeq[Byte]) extends InputStreamHandler[Byte] {
  def onByte[B](cont: (Input[Byte] => InputStreamHandler[Byte]) => Future[B]) = cont {
    case Input.El(byte) => new Consume(data :+ byte)
    case _ => this
  }
}

You can see in our simple case of being ready to handle input immediately, we just immediately invoke cont, we no longer need to worry about creating futures. If we want to handle the input asynchronously, it is a little more complex, but we'll take a look at that later.
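If the role of B is still a bit abstract, this sketch shows one way the stream side might drive such a handler. The feed function is a hypothetical producer I've made up for illustration. Here the state the producer passes back to itself (the B) is simply the next handler, and the cont function it supplies is "take the function you gave me, apply it to the next input, and carry on".

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ContStyleSketch {
  sealed trait Input[+E]
  object Input {
    case object EOF extends Input[Nothing]
    case object Empty extends Input[Nothing]
    case class El[+E](e: E) extends Input[E]
  }

  trait InputStreamHandler[E] {
    def onByte[B](cont: (Input[E] => InputStreamHandler[E]) => Future[B]): Future[B]
  }

  class Consume(val data: Vector[Byte]) extends InputStreamHandler[Byte] {
    def onByte[B](cont: (Input[Byte] => InputStreamHandler[Byte]) => Future[B]): Future[B] = cont {
      case Input.El(byte) => new Consume(data :+ byte)
      case _ => this
    }
  }

  // Hypothetical producer. B is instantiated to InputStreamHandler[E]: when the
  // handler invokes cont with its input function k, we apply k to the next
  // input and recurse with the handler that comes back.
  def feed[E](handler: InputStreamHandler[E], inputs: List[Input[E]]): Future[InputStreamHandler[E]] =
    inputs match {
      case Nil => Future.successful(handler)
      case in :: rest => handler.onByte[InputStreamHandler[E]](k => feed(k(in), rest))
    }

  // Blocking here is only for the sake of the demonstration.
  def demo(): Vector[Byte] = {
    val inputs = List[Input[Byte]](Input.El(1.toByte), Input.Empty, Input.El(2.toByte), Input.EOF)
    Await.result(feed(new Consume(Vector.empty), inputs), 1.second) match {
      case consumed: Consume => consumed.data
      case _ => Vector.empty
    }
  }
}
```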

Now we have one final step in producing our iteratee API. How does the handler communicate back to the stream that it is finished receiving data? There could be two reasons for this. One is that it has received all the data it needs. For example, if our handler is a JSON parser, it might have reached the end of the object it was parsing, and so doesn't want to receive any more. The other reason is that it's encountered an error. For a JSON parser, this might be a syntax error, or if it's sending data through to another stream, it might be an IO error on that stream.

To allow our iteratee to communicate with the stream, we're going to create a trait that represents its state. We'll call this trait Step, and the three states that the iteratee can be in will be Cont, Done and Error. Our Cont state is going to contain our Input[Byte] => InputStreamHandler function, so that the stream can invoke it. Our Done state will contain our result (in the case of Consume, a Seq[Byte]) and our Error state will contain an error message.

In addition to this, both our Done and Error states need to contain the left over input that they didn't consume. This will be important for when we are composing iteratees together, so that once one iteratee has finished consuming input from a stream, the next can pick up where the first left off. This is one reason why we need Input.Empty, because if we did consume all the input, then we need some way to indicate that.

So, here's our Step trait:

sealed trait Step[E, +A]

object Step {
  case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]
  case class Cont[E, +A](k: Input[E] => InputStreamHandler[E, A]) extends Step[E, A]
  case class Error[E](msg: String, input: Input[E]) extends Step[E, Nothing]
}

The type parameter E is the type of input our iteratee wants to accept, and A is what it's producing. So our handler trait now looks like this:

trait InputStreamHandler[E, +A] {
  def onInput[B](step: Step[E, A] => Future[B]): Future[B]
}

And our consumer is implemented like this:

class Consume(data: Seq[Byte]) extends InputStreamHandler[Byte, Seq[Byte]] {
  def onInput[B](step: Step[Byte, Seq[Byte]] => Future[B]) = step(Step.Cont({
    case Input.El(byte) => new Consume(data :+ byte)
    case Input.EOF => new InputStreamHandler[Byte, Seq[Byte]] {
      def onInput[B](step: Step[Byte, Seq[Byte]] => Future[B]) = step(Step.Done(data, Input.Empty))
    }       
    case Input.Empty => this
  }))
}

One big difference here that you now notice is when we receive EOF, we actually pass Done into the step function, to say we are done consuming the input.

And so now we've built our iteratee interface. Our naming isn't quite right though, so we'll rename the trait obviously to Iteratee, and we'll rename onInput to fold, since we are folding our state into one result. And so now we get our interface:

trait Iteratee[E, +A] {
  def fold[B](folder: Step[E, A] => Future[B]): Future[B]
}

Iteratees in practice

So far we've started with the requirements of a traditional imperative input stream, and described what an iteratee is in contrast to that. But looking at the above code, you might think that using them is really difficult. They seem like they are far more complex than they need to be, at least conceptually, to implement reactive streams. Well, it turns out that although so far we've shown the basics of the iteratee interface, there is a lot more that a full iteratee API has to offer, and once we start understanding this, and using it, you will start to see how powerful, simple and useful iteratees are.

So remember how iteratees are immutable? And remember how iteratees can be in one of three states, cont, done and error, and depending on which state it's in, it will pass its corresponding step class to the folder function? Well, if an iteratee is immutable and it can be in one of three states, then it can only ever be in that state that it's in, and therefore it will only ever pass that corresponding step to the folder function. If an iteratee is done, it's done, it doesn't matter how many times you call its fold function, it will never become cont or error, and its done value will never change, it will only ever pass the Done step to the folder function with the same A value and the same left over input. Because of this, there is only one implementation of a done iteratee that we'll ever need, it looks like this:

case class Done[E, A](a: A, e: Input[E] = Input.Empty) extends Iteratee[E, A] {
  def fold[B](folder: Step[E, A] => Future[B]): Future[B] = folder(Step.Done(a, e))
}

This is the only done iteratee you'll ever need to indicate that you're done. In the Consume iteratee above, when we reached EOF, we created a done iteratee using an anonymous inner class, we didn't need to do this, we could have just used the Done iteratee above. The exact same thing holds for error iteratees:

case class Error[E](msg: String, e: Input[E]) extends Iteratee[E, Nothing] {
  def fold[B](folder: Step[E, Nothing] => Future[B]): Future[B] = folder(Step.Error(msg, e))
}

You may be surprised to find out the exact same thing applies to cont iteratees too - a cont iteratee just passes a function to the folder, and that function, because the iteratee is immutable, is never going to change. So consequently, the following iteratee will usually be good enough for your requirements:

case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A] {
  def fold[B](folder: Step[E, A] => Future[B]): Future[B] = folder(Step.Cont(k))
}

So let's rewrite our consume iteratee to use these helper classes:

def consume(data: Array[Byte]): Iteratee[Byte, Array[Byte]] = Cont {
  case Input.El(byte) => consume(data :+ byte)
  case Input.EOF => Done(data)
  case Input.Empty => consume(data)
}
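To tie the pieces so far together, here is a self contained sketch of the whole thing that you can paste into a file and run. It is a cut down model of the interface built up above, not Play's actual implementation. The feed and collect functions are hypothetical helpers of mine that play the role of the stream, I've used Vector rather than Array so the result compares by value, and I've added explicit type parameters to keep type inference out of the way.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MiniIteratee {
  sealed trait Input[+E]
  object Input {
    case object EOF extends Input[Nothing]
    case object Empty extends Input[Nothing]
    case class El[+E](e: E) extends Input[E]
  }

  sealed trait Step[E, +A]
  object Step {
    case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]
    case class Cont[E, +A](k: Input[E] => Iteratee[E, A]) extends Step[E, A]
    case class Error[E](msg: String, input: Input[E]) extends Step[E, Nothing]
  }

  trait Iteratee[E, +A] {
    def fold[B](folder: Step[E, A] => Future[B]): Future[B]
  }
  case class Done[E, A](a: A, e: Input[E] = Input.Empty) extends Iteratee[E, A] {
    def fold[B](folder: Step[E, A] => Future[B]): Future[B] = folder(Step.Done(a, e))
  }
  case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A] {
    def fold[B](folder: Step[E, A] => Future[B]): Future[B] = folder(Step.Cont(k))
  }

  def consume(data: Vector[Byte] = Vector.empty): Iteratee[Byte, Vector[Byte]] =
    Cont[Byte, Vector[Byte]] {
      case Input.El(byte) => consume(data :+ byte)
      case Input.EOF => Done[Byte, Vector[Byte]](data)
      case Input.Empty => consume(data)
    }

  // Hypothetical stream: keeps feeding input while the iteratee is in the cont
  // state, and stops as soon as it is done or in error.
  def feed[E, A](it: Iteratee[E, A], inputs: List[Input[E]]): Future[Iteratee[E, A]] =
    inputs match {
      case Nil => Future.successful(it)
      case in :: rest => it.fold[Iteratee[E, A]] {
        case Step.Cont(k) => feed(k(in), rest)
        case _ => Future.successful(it)
      }
    }

  // Feeds the given bytes followed by EOF, then extracts the result if done.
  // Blocking here is only for the sake of the demonstration.
  def collect(bytes: Byte*): Option[Vector[Byte]] = {
    val inputs: List[Input[Byte]] = bytes.toList.map(b => Input.El(b)) :+ Input.EOF
    val finished = Await.result(feed(consume(), inputs), 1.second)
    Await.result(finished.fold[Option[Vector[Byte]]] {
      case Step.Done(data, _) => Future.successful(Some(data))
      case _ => Future.successful(None)
    }, 1.second)
  }
}
```

Calling MiniIteratee.collect(1, 2, 3) feeds three elements and an EOF through consume, and gives back the accumulated bytes wrapped in Some.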

A CSV parser

Now we're looking a lot simpler, our code is focussed on just handling the different types of input we could receive, and returning the correct result. So let's start writing some different iteratees. In fact, let's write an iteratee that parses a CSV file from a stream of characters. Our CSV parser will support optionally quoting fields, and escaping quotes with a double quote.

Our first step will be to write the building blocks of our parser. First up, we want to write something that skips some kinds of white space. So let's write a general purpose drop while iteratee:

def dropWhile(p: Char => Boolean): Iteratee[Char, Unit] = Cont {
  case in @ Input.El(char) if !p(char) => Done((), in)
  case in @ Input.EOF => Done((), in)
  case _ => dropWhile(p)
}

Since we're just dropping input, our result is actually Unit. We return Done if the predicate doesn't match the current char, or if we reach EOF, and otherwise, we return ourselves again. Note that when we are done, we include the input that was passed into us as the remaining data, because this is going to be needed to be consumed by the next iteratee. Using this iteratee we can now write an iteratee that drops white space:

def dropSpaces = dropWhile(c => c == ' ' || c == '\t' || c == '\r')

Next up, we're going to write a take while iteratee, it's going to be a mixture between our earlier consume iteratee, carrying state between each invocation, and the drop while iteratee:

def takeWhile(p: Char => Boolean, data: Seq[Char] = IndexedSeq[Char]()): Iteratee[Char, Seq[Char]] = Cont {
  case in @ Input.El(char) => if (p(char)) {
    takeWhile(p, data :+ char)
  } else {
    Done(data, in)
  }
  case in @ Input.EOF => Done(data, in)
  case _ => takeWhile(p, data)
}

We also want to write a peek iteratee, that looks at what the next input is, without actually consuming it:

def peek: Iteratee[Char, Option[Char]] = Cont {
  case in @ Input.El(char) => Done(Some(char), in)
  case in @ Input.EOF => Done(None, in)
  case Input.Empty => peek
}

Note that our peek iteratee must return an option, since if it encounters EOF, it can't return anything.

And finally, we want a take one iteratee:

def takeOne: Iteratee[Char, Option[Char]] = Cont {
  case Input.El(char) => Done(Some(char))
  case in @ Input.EOF => Done(None, in)
  case Input.Empty => takeOne
}

Using the take one iteratee, we'll build an expect iteratee, that mandates that a certain character must appear next, otherwise it returns an error:

def expect(char: Char): Iteratee[Char, Unit] = takeOne.flatMap {
  case Some(c) if c == char => Done(())
  case Some(c) => Error("Expected " + char + " but got " + c, Input.El(c))
  case None => Error("Premature end of input, expected: " + char, Input.EOF)
}

Notice the use of flatMap here. If you haven't come across it before, in the asynchronous world, flatMap basically means "and then". It applies a function to the result of the iteratee, and returns a new iteratee. In our case we're using it to convert the result to either a done iteratee, or an error iteratee, depending on whether the result is what we expected. flatMap is one of the fundamental mechanisms that we'll be using to compose our iteratees together.

Now with our building blocks, we are ready to start building our CSV parser. The first part of it that we'll write is an unquoted value parser. This is very simple, we just want to take all characters that aren't a comma or new line, with one catch. We want the result to be a String, not a Seq[Char] like takeWhile produces. Let's see how we do that:

def unquoted = takeWhile(c => c != ',' && c != '\n').map(v => v.mkString.trim)

As you can see, we've used the map function to transform the end result from a sequence of characters into a String. This is another key method on iteratees that you will find useful.

Our next task is to parse a quoted value. Let's start with an implementation that doesn't take into account escaped quotes. To parse a quoted value, we need to expect a quote, and then we need to take any value that is not a quote, and then we need to expect a quote. Notice that during that sentence I said "and then" 2 times? What method can we use to do an "and then"? That's right, the flatMap method that I talked about before. Let's see what our quoted value parser looks like:

def quoted = expect('"')
  .flatMap(_ => takeWhile(_ != '"'))
  .flatMap(value => expect('"')
    .map(_ => value.mkString))

So now you can probably start to see the usefulness of flatMap. In fact it is so useful, not just for iteratees, but many other things, that Scala has a special syntax for it, called for comprehensions. Let's rewrite the above iteratee using that:

def quoted = for {
  _     <- expect('"')
  value <- takeWhile(_ != '"')
  _     <- expect('"')
} yield value.mkString
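I've been using flatMap and map without showing what they might look like on the inside. Here is one possible self contained sketch, built on the cut down iteratee interface from earlier. It is a simplified illustration, not Play's actual implementation, and the feed and parse helpers are hypothetical names of mine that stand in for the stream. The interesting part is the Done case in flatMap: any input the first iteratee left over is handed to the second.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ComposeSketch {
  sealed trait Input[+E]
  object Input {
    case object EOF extends Input[Nothing]
    case object Empty extends Input[Nothing]
    case class El[+E](e: E) extends Input[E]
  }
  sealed trait Step[E, +A]
  object Step {
    case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]
    case class Cont[E, +A](k: Input[E] => Iteratee[E, A]) extends Step[E, A]
    case class Error[E](msg: String, input: Input[E]) extends Step[E, Nothing]
  }

  trait Iteratee[E, +A] { self =>
    def fold[B](folder: Step[E, A] => Future[B]): Future[B]

    // "And then": once this iteratee is done, carry on with the one f produces.
    def flatMap[B](f: A => Iteratee[E, B]): Iteratee[E, B] = new Iteratee[E, B] {
      def fold[C](folder: Step[E, B] => Future[C]): Future[C] = self.fold[C] {
        case Step.Done(a, Input.Empty) => f(a).fold(folder)
        // Input left over by the first iteratee is fed to the second one.
        case Step.Done(a, leftOver) => f(a).fold[C] {
          case Step.Cont(k) => k(leftOver).fold(folder)
          case Step.Done(b, _) => folder(Step.Done(b, leftOver))
          case error => folder(error)
        }
        case Step.Cont(k) => folder(Step.Cont[E, B](in => k(in).flatMap(f)))
        case Step.Error(msg, in) => folder(Step.Error(msg, in))
      }
    }
    def map[B](f: A => B): Iteratee[E, B] = flatMap[B](a => Done[E, B](f(a)))
  }
  case class Done[E, A](a: A, e: Input[E] = Input.Empty) extends Iteratee[E, A] {
    def fold[B](folder: Step[E, A] => Future[B]): Future[B] = folder(Step.Done(a, e))
  }
  case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A] {
    def fold[B](folder: Step[E, A] => Future[B]): Future[B] = folder(Step.Cont(k))
  }
  case class Error[E](msg: String, e: Input[E]) extends Iteratee[E, Nothing] {
    def fold[B](folder: Step[E, Nothing] => Future[B]): Future[B] = folder(Step.Error(msg, e))
  }

  def takeOne: Iteratee[Char, Option[Char]] = Cont[Char, Option[Char]] {
    case Input.El(char) => Done[Char, Option[Char]](Some(char))
    case Input.EOF => Done[Char, Option[Char]](None, Input.EOF)
    case Input.Empty => takeOne
  }
  def expect(char: Char): Iteratee[Char, Unit] = takeOne.flatMap[Unit] {
    case Some(c) if c == char => Done[Char, Unit](())
    case Some(c) => Error[Char]("Expected " + char + " but got " + c, Input.El(c))
    case None => Error[Char]("Premature end of input, expected: " + char, Input.EOF)
  }
  def takeWhile(p: Char => Boolean, data: Vector[Char] = Vector.empty): Iteratee[Char, Vector[Char]] =
    Cont[Char, Vector[Char]] {
      case in @ Input.El(char) =>
        if (p(char)) takeWhile(p, data :+ char) else Done[Char, Vector[Char]](data, in)
      case Input.EOF => Done[Char, Vector[Char]](data, Input.EOF)
      case Input.Empty => takeWhile(p, data)
    }
  def quoted: Iteratee[Char, String] = for {
    _     <- expect('"')
    value <- takeWhile(_ != '"')
    _     <- expect('"')
  } yield value.mkString

  // Hypothetical stream: feeds input while the iteratee is in the cont state.
  def feed[E, A](it: Iteratee[E, A], inputs: List[Input[E]]): Future[Iteratee[E, A]] =
    inputs match {
      case Nil => Future.successful(it)
      case in :: rest => it.fold[Iteratee[E, A]] {
        case Step.Cont(k) => feed(k(in), rest)
        case _ => Future.successful(it)
      }
    }
  // Feeds every char of text followed by EOF. Blocking is for demonstration only.
  def parse[A](it: Iteratee[Char, A], text: String): Either[String, A] = {
    val inputs: List[Input[Char]] = text.toList.map(c => Input.El(c)) :+ Input.EOF
    val finished = Await.result(feed(it, inputs), 1.second)
    Await.result(finished.fold[Either[String, A]] {
      case Step.Done(a, _) => Future.successful(Right(a))
      case Step.Error(msg, _) => Future.successful(Left(msg))
      case Step.Cont(_) => Future.successful(Left("Iteratee wants more input"))
    }, 1.second)
  }
}
```

With these definitions, ComposeSketch.parse(ComposeSketch.quoted, "\"abc\"") gives Right("abc"), while input that doesn't start with a quote gives a Left carrying the error message from expect.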

Now at this point I hope you are getting excited. What does the above code look like? It looks like ordinary imperative synchronous code. Read this value, then read this value, then read this value. Except it's not synchronous, and it's not imperative. It's functional and asynchronous. We've taken our building blocks, and composed them into a piece of very readable code that makes it completely clear exactly what we are doing.

Now in case you're not 100% sure about the above syntax, the values to the left of the <- signs are the results of the iteratees to the right. These are able to be used anywhere in any subsequent lines, including in the end yield statement. Underscores are used to say we're not interested in the value, we're using this for the expect iteratee since that just returns Unit anyway. The statement after the yield is a map function, which gives us the opportunity to take all the intermediate values and turn them into a single result.

So now that we understand that, let's rewrite our quoted iteratee to support escaped quotes. After reading our quote, we want to peek at the next character. If it's a quote, then we want to append the value we just read, plus a quote, to our accumulated value, and recursively invoke the quoted iteratee again. Otherwise, we've reached the end of the value.

def quoted(value: Seq[Char] = IndexedSeq[Char]()): Iteratee[Char, String] = for {
  _          <- expect('"')
  maybeValue <- takeWhile(_ != '"')
  _          <- expect('"')
  nextChar   <- peek
  value      <- nextChar match {
    case Some('"') => quoted(value ++ maybeValue :+ '"')
    case _ => Done[Char, String]((value ++ maybeValue).mkString)
  }
} yield value

Now we need to write an iteratee that can parse either a quoted or unquoted value. We choose which one by peeking at the first character, and then accordingly returning the right iteratee.

def value = for {
  char  <- peek
  value <- char match {
    case Some('"') => quoted()
    case None => Error[Char]("Premature end of input, expected a value", Input.EOF)
    case _ => unquoted
  }
} yield value

Let's now parse an entire line, reading until the end of line character.

def values(state: Seq[String] = IndexedSeq[String]()): Iteratee[Char, Seq[String]] = for {
  _        <- dropSpaces
  value    <- value
  _        <- dropSpaces
  nextChar <- takeOne
  values   <- nextChar match {
    case Some('\n') | None => Done[Char, Seq[String]](state :+ value)
    case Some(',') => values(state :+ value)
    case Some(other) => Error("Expected comma, newline or EOF, but found " + other, Input.El(other))
  }
} yield values

Enumeratees

Now, in a similar way to how we parse the values, we could also parse each line of a CSV file until we reach EOF. But this time we're going to do something a little different. We've seen how we can sequence iteratees using flatMap, but there are further possibilities for composing iteratees. Another concept in iteratees is enumeratees. Enumeratees adapt a stream to be consumed by an iteratee. The simplest enumeratees simply map the input values of the stream to be something else. So, for example, here's an enumeratee that converts a stream of strings to a stream of ints:

def toInt: Enumeratee[String,Int] = Enumeratee.map[String](_.toInt)

One of the methods on Enumeratee is transform. We can use this method to apply an enumeratee to an iteratee:

val someIteratee: Iteratee[Int, X] = ...
val adaptedIteratee: Iteratee[String, X] = toInt.transform(someIteratee)

This method is also aliased to an operator, &>>, and so this code below is equivalent to the code above:

val adaptedIteratee: Iteratee[String, X] = toInt &>> someIteratee
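If you're wondering what a mapping enumeratee does under the hood, the idea can be sketched in a few lines. To keep it short I've used a deliberately simplified, future free model of iteratees here, where the iteratee is just one of Done, Cont or Error. This is not how Play implements Enumeratee.map, and mapInput, sum and run are names I've made up for the illustration. The adapter wraps the inner iteratee in a new one that accepts the outer input type, converts each element, and passes it on.

```scala
object EnumerateeSketch {
  sealed trait Input[+E]
  object Input {
    case object EOF extends Input[Nothing]
    case object Empty extends Input[Nothing]
    case class El[+E](e: E) extends Input[E]
  }

  // Simplified model: no futures, the iteratee is simply its own state.
  sealed trait Iteratee[E, +A]
  case class Done[E, +A](a: A, remaining: Input[E] = Input.Empty) extends Iteratee[E, A]
  case class Cont[E, +A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A]
  case class Error[E](msg: String, input: Input[E]) extends Iteratee[E, Nothing]

  // Hypothetical stand-in for a mapping enumeratee: adapts an iteratee that
  // wants To elements so that it can consume a stream of From elements.
  def mapInput[From, To, A](f: From => To)(inner: Iteratee[To, A]): Iteratee[From, A] =
    inner match {
      case Cont(k) => Cont[From, A] {
        case Input.El(e) => mapInput[From, To, A](f)(k(Input.El(f(e))))
        case Input.Empty => mapInput[From, To, A](f)(k(Input.Empty))
        case Input.EOF => mapInput[From, To, A](f)(k(Input.EOF))
      }
      // Left over input of the inner type can't be handed back to the outer
      // stream in this simple sketch, so it is dropped.
      case Done(a, _) => Done[From, A](a)
      case Error(msg, _) => Error[From](msg, Input.Empty)
    }

  def sum(total: Int = 0): Iteratee[Int, Int] = Cont[Int, Int] {
    case Input.El(n) => sum(total + n)
    case Input.Empty => sum(total)
    case Input.EOF => Done[Int, Int](total, Input.EOF)
  }

  // Hypothetical stream: feeds the elements followed by EOF.
  def run[E, A](it: Iteratee[E, A], elems: List[E]): Iteratee[E, A] = {
    val inputs: List[Input[E]] = elems.map(e => Input.El(e)) :+ Input.EOF
    inputs.foldLeft(it) {
      case (Cont(k), in) => k(in)
      case (finished, _) => finished
    }
  }

  val adapted: Iteratee[String, Int] = mapInput[String, Int, Int](_.toInt)(sum())
  val result: Iteratee[String, Int] = run(adapted, List("1", "2", "3"))
}
```

Here result ends up as a Done holding 6, even though sum itself knows nothing about strings.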

We can also make an enumeratee out of another iteratee, and this is exactly what we're going to do with our values iteratee. The Enumeratee.grouped method takes an iteratee and applies it to the stream over and over, the result of each application being an input to feed into the iteratee that will be transformed. Let's have a look:

def csv = Enumeratee.grouped(values())

Now let's get a little bit more creative with enumeratees. Let's say that our CSV file is very big, so we don't want to load it into memory. Each line is a series of 3 integer columns, and we want to sum each column. So, let's define an enumeratee that converts each set of values to integers:

def toInts = Enumeratee.map[Seq[String]](_.map(_.toInt))

And another enumeratee to convert the sequence to a 3-tuple:

def toThreeTuple = Enumeratee.map[Seq[Int]](s => (s(0), s(1), s(2)))

And finally an iteratee to sum them:

def sumThreeTuple(a: Int = 0, b: Int = 0, c: Int = 0): Iteratee[(Int, Int, Int), (Int, Int, Int)] = Cont {
  case Input.El((x, y, z)) => sumThreeTuple(a + x, b + y, c + z)
  case Input.Empty => sumThreeTuple(a, b, c)
  case in @ Input.EOF => Done((a, b, c), in)
}

Now to put them all together. There is another method on enumeratee called compose, which, you guessed it, lets you compose enumeratees. This has an alias operator, ><>. Let's use it:

val processCsvFile = csv ><> toInts ><> toThreeTuple &>> sumThreeTuple()

Enumerators

Finally, if an iteratee consumes a stream, what produces a stream? The answer is an enumerator. An enumerator can be applied to an iteratee using its apply method, which is also aliased to |>>. This will leave the iteratee in a cont state, ready to receive more input. If however the enumerator contains the entirety of the stream, then the run method can be used instead, which will send the iteratee an EOF once it's finished. This is aliased to |>>>.

The Play enumerator API makes it easy to create an enumerator by passing a sequence of inputs to the Enumerator companion object's apply method. So, we can create an enumerator of characters using the following code:

val csvFile = Enumerator(
  """1,2,3
    |4,5,6""".stripMargin.toCharArray:_*)

And we can feed this into our iteratee like so:

val result = csvFile |>>> processCsvFile

And our result in this case will be a future that is eventually redeemed with (5, 7, 9).

Conclusion

Well, it's been a long journey, but hopefully if you're an imperative programmer, you not only understand iteratees, you understand the reasoning behind their design, and how easily they compose. I also hope you have a better understanding of both functional and asynchronous programming in general. The functional mindset is quite different to the imperative mindset, and I'm still getting my head around it, but particularly after seeing how nice and simple iteratees can be to work with (once you understand them), I'm becoming convinced that functional programming is the way to go.

If you are interested in downloading the code from this blog post, or if you want to see a more complex JSON parsing iteratee/enumeratee, check out this GitHub project, which has a few examples, including parsing byte/character streams in array chunks, rather than one at a time.

Scaling Scala vs Java

In my previous post I showed how it makes no sense to benchmark Scala against Java, and concluded by saying that when it comes to performance, the question you should be asking is "How will Scala help me when my servers are falling over from unanticipated load?" In this post I will seek to answer that, and show that indeed Scala is a far better language for building scalable systems than Java.

However, don't expect our journey to get there to be easy. For a start, while it's very easy to do micro benchmarks, showing how real world apps do or don't handle the loads that are put on them is very hard. It's hard to create an app that's small enough to demo and explain in a single blog post, yet big enough to actually show how real world apps behave under load, and it's also very hard to simulate real world loads. So I am going to take one small aspect of something that might go wrong in the real world, and show just one way in which Scala will help you, where Java won't. Then I will explain that this is just the tip of the iceberg: there are far more situations, and far more features of Scala, that will help you in the real world.

An online store

For this exercise I have implemented an online store. The architecture of this store is in the diagram below:

As you can see, there is a payment service and a search service that the store talks to, and the store handles three types of requests: one for the index page that doesn't require going to any other services, one for making payments that uses the payment service, and another for searching the store's product list which uses the search service. The online store is the part of the system that I am going to be benchmarking. I will implement one version in Java and another in Scala, and compare them. The search and payment services won't change. Their actual implementations will be simple JSON APIs that return hard coded values, but they will each simulate a processing time of 20ms.

For the Java implementation of the store, I am going to keep it as simple as possible, using straight servlets to handle requests, Apache Commons HTTP client for making requests, and Jackson for JSON parsing and formatting. I will deploy the application to Tomcat, and configure Tomcat with the NIO connector, using the default connection limit of 10000 and thread pool size of 200.

For the Scala implementation I will use Play Framework 2.1, using the Play WS API which is backed by the Ning HTTP client to make requests, and the Play JSON API which is backed by Jackson to handle JSON parsing and formatting. Play Framework is built using Netty which has no connection limit, and uses Akka for thread pooling, and I have it configured to use the default thread pool size, which is one thread per CPU, and my machine has 4.

The benchmark I will be performing will be using JMeter. For each request type (index, payments and search) I will have 300 threads spinning in a loop making requests with a random 500-1500ms pause in between each request. This gives an average maximum throughput of 300 requests per second per request type, or 900 requests per second all up.

So, let's have a look at the result of the Java benchmark:

On this graph I have plotted 3 metrics per request type. The median is the median request time. For the index page this is next to nothing; for the search and payments requests it is about 77ms. I have also plotted the 90% line, which is a common metric in web applications. It shows the time that 90% of the requests came in under, and so gives a good idea of what the slow requests are like. This shows again almost nothing for the index page, and 116ms for the search and payments requests. The final metric is the throughput, which shows the number of requests per second that were handled. We are not too far off the theoretical maximum, with the index showing 290 requests per second, and the search and payments requests coming through at about 270 requests per second. These results are good: our Java service handles the load we are throwing at it without breaking a sweat.
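For what it's worth, the theoretical maximum is just arithmetic, and it can be sketched in a few lines. This is a back-of-the-envelope model, not something JMeter reports: I'm assuming each thread does request, wait for response, pause, repeat, and I'm using the 77ms median as a stand-in for the mean response time. The LoadModel name and its fields are made up for the illustration.

```scala
object LoadModel {
  // Closed-loop load: each thread issues a request, waits for the response, then pauses.
  // One cycle takes (pause + response) ms, so each thread manages 1000 / cycle requests per second.
  def maxThroughput(threads: Int, meanPauseMs: Double, responseMs: Double): Double =
    threads * 1000.0 / (meanPauseMs + responseMs)

  // A random 500-1500ms pause averages out at 1000ms.
  val meanPauseMs: Double = (500 + 1500) / 2.0

  // Ceiling with instantaneous responses: 300 per request type, 900 across all three.
  val ceilingPerType: Double = maxThroughput(300, meanPauseMs, responseMs = 0.0)
  val ceilingAllTypes: Double = 3 * ceilingPerType

  // Assumption: treat the 77ms median as the typical response time for search and payments.
  val withMedianLatency: Double = maxThroughput(300, meanPauseMs, responseMs = 77.0)

  def main(args: Array[String]): Unit =
    println(f"ceiling=$ceilingPerType%.1f total=$ceilingAllTypes%.1f with77ms=$withMedianLatency%.1f")
}
```

Under those assumptions the search and payments ceiling drops a little below 300 once the response time is counted, which is in the same ballpark as the roughly 270 requests per second on the graph.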

Now let's take a look at the Scala benchmark:

As you can see, it's identical to the Java results. This is not surprising, since both the Java and the Scala implementations of the online store are doing absolutely minimal work code wise; most of the processing time is going into making requests on the remote services.

Something goes wrong

So, we've seen two happy implementations of the same thing in Scala and Java, shrugging off the load I give them. But what happens when things aren't so fine and dandy? What happens if one of the services that they are talking to goes down? Let's say the search service starts taking 30 seconds to respond, after which point it returns an error. This is not an unusual failure situation, particularly if you're load balancing through a proxy: the proxy tries to connect to the service, and fails after 30 seconds, giving you a gateway error. Let's see how our applications handle the load I throw at them now. We would expect the search request to take at least 30 seconds to respond, but what about the others? Here are the Java results:

Well, we no longer have a happy app at all. The search requests are naturally taking a long time, but the payments requests are now taking an average of 9 seconds to respond, and the 90% line is at 20 seconds. Not only that, but the index page is similarly impacted - users who have browsed to your site are not going to wait that long for the home page to show up. And the throughput of each has gone down to 30 requests per second. This is not good: because your search service went down, your whole site is now practically unusable, and you will soon start losing customers and money.

So how does our Scala app fare? Let's find out:

Now before I say anything else, let me point out that I've bounded the response time to 160ms - the search requests are actually taking about 30 seconds to respond, but on the graph, with 30 seconds next to the other values, they hardly register a line a pixel high. So what we can see here is that while search is unusable, our payments and index request response times and throughput are unchanged. Obviously, customers aren't going to be happy with not being able to do searches, but at least they can still use other parts of your site, see your home page with specials, and even still make payments for items. And hey, Google isn't down, they can always use Google to search your site. So you might lose some business, but the impact is limited.

So, in this benchmark, we can see that Scala wins hands down. When things start to go wrong, a Scala application will take it in its stride, giving you the best it can, while a Java application will likely just fall over.
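To give a feel for the kind of mechanism that produces graphs like these, here is a rough sketch using nothing but the standard library. It is not the benchmark code. I'm assuming a tiny pool of 2 threads instead of Tomcat's 200, a 400ms "slow search" instead of 30 seconds, and a timer thread standing in for non-blocking IO; StarvationSketch and indexLatencyMs are names I've made up for the illustration.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object StarvationSketch {
  val slowCallMs = 400L

  // Measures how long a trivial "index page" task takes to complete
  // when two slow "search" calls were started just before it.
  def indexLatencyMs(blocking: Boolean): Long = {
    val pool = Executors.newFixedThreadPool(2) // stand-in for the request thread pool
    val timer = Executors.newSingleThreadScheduledExecutor() // stand-in for non-blocking IO
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      def slowSearch(): Future[String] =
        if (blocking) {
          // Synchronous style: a pool thread sits idle until the remote call returns.
          Future { Thread.sleep(slowCallMs); "gateway error" }
        } else {
          // Asynchronous style: no pool thread is held while we wait.
          val p = Promise[String]()
          timer.schedule(new Runnable {
            def run(): Unit = p.success("gateway error")
          }, slowCallMs, TimeUnit.MILLISECONDS)
          p.future
        }

      val searches = (1 to 2).map(_ => slowSearch())
      val start = System.nanoTime()
      Await.result(Future { "index page" }, 5.seconds)
      val elapsedMs = (System.nanoTime() - start) / 1000000
      Await.result(Future.sequence(searches), 5.seconds) // let the searches finish
      elapsedMs
    } finally {
      pool.shutdown()
      timer.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"index latency with blocking searches:     ${indexLatencyMs(blocking = true)} ms")
    println(s"index latency with non-blocking searches: ${indexLatencyMs(blocking = false)} ms")
  }
}
```

In the blocking case the index task has to queue behind the sleeping threads, so it takes about as long as the slow call. In the non-blocking case the pool is free and it returns almost immediately.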

But I can do that in Java

Now starts the bit where I counter the many anticipated criticisms that people will make of this benchmark. And the first, and most obvious one, is that in my Scala solution I used asynchronous IO, whereas in my Java solution I didn't, so they can't be compared. It is true, I could have implemented an asynchronous solution in Java, and in that case the Java results would have been identical to the Scala results. However, while I could have done that, Java developers don't do that. It's not that they can't, it's that they don't. I have written a lot of webapps in Java that make calls to other systems, and very rarely, and only in very special circumstances, have I ever used asynchronous IO. And let me show you why.

Let's say you have to do a series of calls on a series of remote services, each one depending on data returned from the previous. Here's a good old fashioned synchronous solution in Java:

User user = getUserById(id);
List<Order> orders = getOrdersForUser(user.email);
List<Product> products = getProductsForOrders(orders);
List<Stock> stock = getStockForProducts(products);

The above code is simple, easy to read, and feels completely natural for a Java developer to write. For completeness, let's have a look at the same thing in Scala:

val user = getUserById(id)
val orders = getOrdersForUser(user.email)
val products = getProductsForOrders(orders)
val stock = getStockForProducts(products)

Now, let's have a look at the same code, but this time assuming we are making asynchronous calls and returning the results in promises. What does it look like in Java?

Promise<User> user = getUserById(id);
Promise<List<Order>> orders = user.flatMap(new Function<User, Promise<List<Order>>>() {
  public Promise<List<Order>> apply(User user) {
    return getOrdersForUser(user.email);
  }
});
Promise<List<Product>> products = orders.flatMap(new Function<List<Order>, Promise<List<Product>>>() {
  public Promise<List<Product>> apply(List<Order> orders) {
    return getProductsForOrders(orders);
  }
});
Promise<List<Stock>> stock = products.flatMap(new Function<List<Product>, Promise<List<Stock>>>() {
  public Promise<List<Stock>> apply(List<Product> products) {
    return getStockForProducts(products);
  }
});

So firstly, the above code is not readable. It's much harder to follow than the synchronous version, the ratio of noise to code that actually does stuff is massively high, and hence it's very easy to make mistakes and miss things. Secondly, it's tedious to write. No developer wants to write code that looks like that; I hate doing it. Any developer that wants to write their whole app like that is insane. And finally, it just doesn't feel natural. It's not the way you do things in Java, it's not idiomatic, and it doesn't play well with the rest of the Java ecosystem, since third party libraries don't integrate well with this style. As I said before, Java developers can write code that does this, but they don't, and as you can see, they don't for good reason.

So let's take a look at the asynchronous solution in Scala:

for {
  user <- getUserById(id)
  orders <- getOrdersForUser(user.email)
  products <- getProductsForOrders(orders)
  stock <- getStockForProducts(products)
} yield stock

In contrast to the Java asynchronous solution, this solution is completely readable, just as readable as the Scala and Java synchronous solutions. And this isn't just some weird Scala feature that most Scala developers never touch, this is how a typical Scala developer writes code every day. Scala libraries are designed to work using these idioms, it feels natural, the language is working with you. It's fun to write code like this in Scala!
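If you want to run something like this yourself, here is a self-contained sketch. I'm assuming scala.concurrent.Future as the promise type, and the domain classes and the four service methods are hypothetical stubs standing in for real remote calls. It also shows roughly what the compiler turns the for comprehension into, which is the same chain of flatMap calls as the Java version, minus the noise.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AsyncChainSketch {
  // Hypothetical domain types, just enough to make the example compile.
  case class User(id: Long, email: String)
  case class Order(id: Long, productId: Long)
  case class Product(id: Long)
  case class Stock(productId: Long, quantity: Int)

  // Hypothetical stub services returning hard coded values asynchronously.
  def getUserById(id: Long): Future[User] =
    Future(User(id, s"user$id@example.com"))
  def getOrdersForUser(email: String): Future[List[Order]] =
    Future(List(Order(1, 10), Order(2, 20)))
  def getProductsForOrders(orders: List[Order]): Future[List[Product]] =
    Future(orders.map(o => Product(o.productId)))
  def getStockForProducts(products: List[Product]): Future[List[Stock]] =
    Future(products.map(p => Stock(p.id, 5)))

  // The for comprehension from the post.
  def viaFor(id: Long): Future[List[Stock]] =
    for {
      user <- getUserById(id)
      orders <- getOrdersForUser(user.email)
      products <- getProductsForOrders(orders)
      stock <- getStockForProducts(products)
    } yield stock

  // Roughly what the compiler rewrites it to: nested flatMap calls ending in a map.
  def viaFlatMap(id: Long): Future[List[Stock]] =
    getUserById(id).flatMap { user =>
      getOrdersForUser(user.email).flatMap { orders =>
        getProductsForOrders(orders).flatMap { products =>
          getStockForProducts(products).map(stock => stock)
        }
      }
    }

  def main(args: Array[String]): Unit =
    println(Await.result(viaFor(1), 5.seconds))
}
```

Neither version blocks a thread while waiting on a service; the Await in main is only there so the demo can print something before the JVM exits.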

This post is not about how with one language you can write a highly tuned app for performance that's faster than the same app written in another language highly tuned for performance. This post is about how Scala helps you write applications that are scalable by default, using natural, readable and idiomatic code. Just like a ball in lawn bowls has a bias, Scala has a bias towards helping you write scalable applications, where Java makes you swim upstream.

But scaling means so much more than that

The example I've provided of Scala scaling well where Java doesn't is a very specific example, but then what situation where your app is failing under high load isn't? Let me give a few other examples of where Scala's much nicer asynchronous IO support helps you to write scalable code:

  • Using Akka, you can easily define actors for different types of requests, and allocate them different resource limits. So if certain parts of your single application start struggling or receiving unanticipated load, those parts may stop responding, but the rest of your app can stay healthy.
  • Scala, Play and Akka make handling single requests using multiple threads running in parallel doing different operations incredibly simple, allowing you to have requests that do a lot in very little time. Klout wrote an excellent article about how they did just that in their API.
  • Because asynchronous IO is so simple, offloading processing onto other machines can be safely done without tying up threads on the first machine.
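As a small illustration of the second point, here is a sketch of one request doing two independent lookups in parallel, using plain scala.concurrent.Future rather than Play or Akka. The fetchProfile and fetchScore methods are hypothetical stand-ins for remote calls, with a sleep simulating their latency.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelSketch {
  // Hypothetical remote calls; the sleep stands in for network latency.
  def fetchProfile(userId: Long): Future[String] =
    Future { Thread.sleep(100); s"profile-$userId" }
  def fetchScore(userId: Long): Future[Int] =
    Future { Thread.sleep(100); 42 }

  def userSummary(userId: Long): Future[(String, Int)] = {
    // Start both calls before combining them, so neither waits for the other.
    val profile = fetchProfile(userId)
    val score = fetchScore(userId)
    for {
      p <- profile
      s <- score
    } yield (p, s)
  }

  def main(args: Array[String]): Unit =
    println(Await.result(userSummary(7), 5.seconds))
}
```

The only trick is creating the futures outside the for comprehension. If the calls were made inside it, the second would not start until the first had finished.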

Java 8 will make asynchronous IO simple in Java

Java 8 is probably going to include support for closures of some sort, which is great news for the Java world, especially if you want to do asynchronous IO. However, the syntax still won't be anywhere near as readable as the Scala code I showed above. And when will Java 8 be released? Java 7 was released last year, and it took 5 years to release that. Java 8 is scheduled for summer 2013, but even if it arrives on schedule, how long will it take for the ecosystem to catch up? And how long will it take for Java developers to switch from a synchronous to an asynchronous mindset? In my opinion, Java 8 is too little too late.

So this is all about asynchronous IO?

So far all I've talked about and shown is how easy Scala makes asynchronous IO, and how that helps you scale. But it doesn't stop there. Let me pick another feature of Scala, immutability.

When you start using multiple threads to process single requests, you start sharing state between those threads. And this is where things get very messy, because the world of shared state in a computer system is a crazy world where impossible things happen. It's a world of deadlocks, a world of updating memory in one thread, but another thread not seeing that change, a world of race conditions, and a world of performance bottlenecks because you overeagerly marked some methods as synchronized.

However, it's not that bad, because there is a very simple solution: make all your state immutable. If all your state is immutable, then none of the above problems can happen. And this is again where Scala helps you big time, because in Scala, things are immutable by default. The collection APIs are immutable by default; you have to explicitly ask for a mutable collection in order to get one.
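Here's a tiny sketch of what immutable by default means for collections, using only the standard library. The value names are made up for the example.

```scala
import scala.collection.mutable

object ImmutableByDefault {
  // List and Map with no imports are the immutable versions.
  val prices = List(10, 20)
  val morePrices = 30 :: prices // builds a new list, prices is untouched

  val stock = Map("widget" -> 3)
  val restocked = stock.updated("widget", 5) // builds a new map, stock is untouched

  // Mutability has to be asked for explicitly.
  val buffer = mutable.ListBuffer(10, 20)
  buffer += 30 // changes buffer in place

  def main(args: Array[String]): Unit = {
    println(prices)     // still the original two elements
    println(morePrices)
    println(stock)      // still the original quantity
    println(restocked)
    println(buffer)
  }
}
```

Any thread holding a reference to prices or stock can read it without locks, because nothing can ever change underneath it.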

Now in Java, you can make things immutable. There are some libraries that help you (albeit clumsily) to work with immutable collections. But it's so easy to accidentally forget to make something immutable. The Java API and language itself don't make working with immutable structures easy, and if you're using a third party library, it's highly likely that it's not using immutable structures, and often requires you to use mutable structures, for example, JPA requires this.

Let's have a look at some code. Here is an immutable class in Scala:

case class User(id: Long, name: String, email: String)

That structure is immutable. Moreover, it automatically generates accessors for the properties. Let's look at the corresponding Java:

public class User {
  private final long id;
  private final String name;
  private final String email;

  public User(long id, String name, String email) {
    this.id = id;
    this.name = name;
    this.email = email;
  }

  public long getId() {
    return id;
  }

  public String getName() {
    return name;
  }

  public String getEmail() {
    return email;
  }
}

That's an enormous amount of code! And what if I add a new property? I have to add a new parameter to my constructor which will break existing code, or I have to define a second constructor. In Scala I can just do this:

case class User(id: Long, name: String, email: String, company: Option[Company] = None)

All my existing code that calls that constructor will still work. And what about when this object grows to have 10 items in the constructor, constructing it becomes a nightmare! A solution to this in Java is to use the builder pattern, which more than doubles the amount of code you have to write for the object. In Scala, you can name the parameters, so it's easy to see which parameter is which, and they don't have to be in the right order. But maybe I might want to just modify one property. This can be done in Scala like this:

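case class User(id: Long, name: String, email: String, company: Option[Company] = None) {
  // The compiler generates a copy method like this for every case class.
  // It is spelled out here only to show what it looks like.
  def copy(id: Long = id, name: String = name, email: String = email, company: Option[Company] = company) = User(id, name, email, company)
}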

val james = User(1, "James", "james@jazzy.id.au")
val jamesWithCompany = james.copy(company = Some(Company("Typesafe")))

The above code is natural, it's simple, it's readable, it's how Scala developers write code every day, and it's immutable. It is aptly suited to concurrent code, and allows you to safely write systems that scale. The same can be done in Java, but it's tedious, and not at all a joy to write. I am a big advocate of immutable code in Java, and I have written many immutable classes in Java, and it hurts, but it's the lesser of two hurts. In Scala, it takes more code to use mutable objects than to use immutable. Again, Scala is biased towards helping you scale.
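To make the points about default and named parameters concrete, here is a self-contained sketch you can paste into a file and run. Company is a hypothetical one-field class that I've added so the example compiles, and this version relies on the copy method that the compiler generates for case classes rather than writing it out by hand.

```scala
object CaseClassSketch {
  case class Company(name: String) // hypothetical, just enough to compile
  case class User(id: Long, name: String, email: String, company: Option[Company] = None)

  // Call sites written before the company property existed still compile.
  val james = User(1, "James", "james@jazzy.id.au")

  // Named parameters can be given in any order.
  val jamesNamed = User(email = "james@jazzy.id.au", name = "James", id = 1)

  // copy returns a new instance with only the named property changed.
  val jamesWithCompany = james.copy(company = Some(Company("Typesafe")))

  def main(args: Array[String]): Unit = {
    println(james)
    println(jamesWithCompany)
  }
}
```

Note that james itself never changes; jamesWithCompany is a separate value, so both can be shared freely between threads.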

Conclusion

I cannot possibly go into all the ways in which Scala helps you scale where Java doesn't. But I hope I have given you a taste of why Scala is on your side when it comes to writing scalable systems. I've shown some concrete metrics, I've compared Java and Scala solutions for writing scalable code, and I've shown, not that Scala systems will always scale better than Java systems, but rather that Scala is the language that is on your side when writing scalable systems. It is biased towards scaling, and it encourages practices that help you scale. Java, in contrast, makes it difficult for you to implement these practices; it works against you.

If you're interested in my code for the online store, you can find it in this GitHub repository. The numbers from my performance test can be found in this spreadsheet.

About

Hi! My name is James Roper, and I am a software developer with a particular interest in open source development and trying new things. I program in Scala, Java, PHP, Python and Javascript, and I work for Lightbend as a developer on Lagom. I also have a full life outside the world of IT, am a passionate Christian, enjoy playing a variety of musical instruments and sports, and currently I live in Canberra.

I also have another blog called Roped In about when my wife and I lived in Berlin for a year to help a church reconnect with its city.