all that jazz

james' blog about scala and all that jazz

Iteratees for imperative programmers

When I first heard the word iteratee, I thought it was a joke. It turns out it wasn't a joke; in fact there are also enumerators (that's ok) and enumeratees (you're killing me). If you're an imperative programmer, or rather a programmer who feels more comfortable writing imperative code than functional code, then you may be a little overwhelmed by all the introductions to iteratees out there, because they all assume that you think from a functional perspective. Well, I just learnt iteratees, and although I'm feeling more and more comfortable with functional programming every day, I still think like an imperative programmer at heart. This made learning iteratees very difficult for me. So while I'm still in the imperative mindset, I thought this a very good opportunity to explain iteratees from an imperative programmer's perspective, taking no functional knowledge for granted. If you're an imperative programmer who wants to learn iteratees, this is the blog post for you. I'm going to specifically be looking at Play's Iteratee API, but the concepts learnt here will apply to iteratees in general.

So let's start off with explaining what iteratees, and their counterparts, are trying to achieve. An iteratee is a method of reactively handling streams of data that is very easily composable. By reactive, I mean non-blocking, i.e. you react to data being available to read, and react to the opportunity to write data. By composable, I mean you write simple iteratees that do one small thing well, then you use those as the building blocks to write iteratees that do bigger things, and you use those as the building blocks to write iteratees to do even bigger things, and so on. At each stage, everything is simple and easy to reason about.

Reactive stream handling

If you're looking for information about iteratees, then I'm guessing you already know a bit about what reactive stream handling is. Let's contrast it to synchronous IO code:

trait InputStream {
  def read(): Byte
}

So this should be very familiar, if you want to read a byte, you call read. If no byte is currently available to be read, that call will block, and your thread will wait until a byte is available. With reactive streams, obviously it's the other way around, you pass a callback to the stream you want to receive data from, and it will call that when it's ready to give data to you. So typically you might implement a trait that looks like this:

trait InputStreamHandler {
  def onByte(byte: Byte): Unit
}

So before we go on, let's look at how the same thing would be achieved in a pure functional world. At this point I don't want you to ask why we want to do things this way, you will see that later on, but if you know anything about functional programming, you know that everything tends to be immutable, and functions have no side effects. The trait above has to have side effects, because unless you are ignoring the bytes passed to onByte, you must be changing your state (or something else's state) somehow in that function. So, how do we handle data without changing our state? The answer is the same way other immutable data structures work: we return a copy of ourselves, updated with the new state. So if the InputStreamHandler were to be functional, it might look like this:

trait InputStreamHandler {
  def onByte(byte: Byte): InputStreamHandler
}

And an example implementation of one, that reads input into a seq, might look like this:

class Consume(data: Seq[Byte]) extends InputStreamHandler {
  def onByte(byte: Byte) = new Consume(data :+ byte)
}
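
To make the "return a copy of ourselves" idea concrete, here is a minimal sketch of how a stream might drive this functional handler. The feed function and the public data field are my own assumptions for illustration, they are not part of any real API:

```scala
object ImmutableHandlerDemo {
  // The functional handler from the text: each byte yields a new handler.
  trait InputStreamHandler {
    def onByte(byte: Byte): InputStreamHandler
  }

  // `data` is made a val here (an assumption) so the result can be read back.
  class Consume(val data: Seq[Byte]) extends InputStreamHandler {
    def onByte(byte: Byte): Consume = new Consume(data :+ byte)
  }

  // Hypothetical driver: threads the handler through the bytes,
  // producing one new copy per byte and never mutating anything.
  def feed(bytes: Seq[Byte], start: Consume): Consume =
    bytes.foldLeft(start)((handler, b) => handler.onByte(b))

  def main(args: Array[String]): Unit = {
    val empty = new Consume(Vector.empty)
    val full = feed(Seq[Byte](1, 2, 3), empty)
    println(full.data)  // the bytes collected by the final copy
    println(empty.data) // the original handler is untouched
  }
}
```

The thing to notice is that the handler we started with is still empty afterwards, all the state lives in the copies.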

So we now have imperative and functional traits that react to our input stream, and you might be thinking this is all there is to reactive streams. If that's the case, you're wrong. What if we're not ready to handle data when the onByte method is called? If we're building structures in memory this will never be the case, but if for example we're storing them to a file or to a database as we receive the data, then this very likely will be the case. So reactive streams are two way, it's not just you, the stream consumer that is reacting to input, the stream producer must react to you being ready for input.

Now this is possible to implement in an imperative world, though things do start looking much more functional. We simply start using futures:

trait InputStreamHandler {
  def onByte(byte: Byte): Future[Unit]
}

So, when the stream we are consuming has a byte for us, it calls onByte, and then attaches a callback to the future we return, to pass the next byte, when it's ready. If you have a look at Netty's asynchronous channel APIs, you'll see it uses exactly this pattern. We can also implement something similar for an immutable functional API:

trait InputStreamHandler {
  def onByte(byte: Byte): Future[InputStreamHandler]
}
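
Here is a rough sketch of the two way reaction using this trait. SlowConsume and feed are hypothetical names of mine; the point is only that the producer doesn't send the next byte until the future returned for the previous one has completed:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BackPressureDemo {
  implicit val ec: ExecutionContext = ExecutionContext.global

  trait InputStreamHandler {
    def onByte(byte: Byte): Future[InputStreamHandler]
  }

  // Pretends that storing a byte takes time by doing the work on another thread.
  class SlowConsume(val data: Vector[Byte]) extends InputStreamHandler {
    def onByte(byte: Byte): Future[SlowConsume] = Future(new SlowConsume(data :+ byte))
  }

  // Hypothetical producer: the next byte is only passed on once the
  // handler's future says it is ready for it.
  def feed(bytes: List[Byte], handler: SlowConsume): Future[SlowConsume] = bytes match {
    case Nil => Future.successful(handler)
    case b :: rest => handler.onByte(b).flatMap(next => feed(rest, next))
  }

  def demo(): Vector[Byte] =
    Await.result(feed(List[Byte](1, 2, 3), new SlowConsume(Vector.empty)), 5.seconds).data

  def main(args: Array[String]): Unit = println(demo())
}
```

The Await is only there so the demo can print something, real reactive code would keep chaining on the future instead of blocking.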

And so here we have a functional solution for reactive stream handling. But it's not a very good one, for a start, there's no way for the handlers to communicate to the code that uses them that they don't want to receive any more input, or if they've encountered an error (exceptions are frowned upon in functional programming). We could add things to handle this, but very soon our interface would become quite complex, hard to break up into small pieces that can be composed, etc. I'm not going to justify this now, I think you'll see it later when I show you just how easy iteratees are to compose.

So, by this stage I hope you have understood two important points. Firstly, reactive stream handling means twofold reacting, both your code has to react to the stream being ready, and the stream has to react to you being ready. Secondly, when I say that we want a functional solution, I mean a solution where everything is immutable, and that is achieved by our stream handlers producing copies of themselves each time they receive/send data. If you've understood those two important points, then now we can move on to introducing iteratees.

Iteratees

There are a few things that our interface hasn't yet addressed. The first is, how does the stream communicate to us that it is finished, that is, that it has no more data for us? To do this, instead of passing in a byte directly, we're going to abstract our byte to be something of type Input[Byte], and that type can have three possible implementations, EOF, an element, or empty. Let's not worry about why we need empty just yet, but assume for some reason we might want to pass empty. So this is what Input looks like:

sealed trait Input[+E]

object Input {
  case object EOF extends Input[Nothing]
  case object Empty extends Input[Nothing]
  case class El[+E](e: E) extends Input[E]
}

Updating our InputStreamHandler, we now get something that looks like this:

trait InputStreamHandler[E] {
  def onInput(in: Input[E]): Future[InputStreamHandler[E]]
}

Now updating our Consume handler from before to handle this, it might look like this:

class Consume(data: IndexedSeq[Byte]) extends InputStreamHandler[Byte] {
  def onInput(in: Input[Byte]): Future[InputStreamHandler[Byte]] = in match {
    case Input.El(byte) => Future.successful(new Consume(data :+ byte))
    case _ => Future.successful(this)
  }
}

You can see that when we get EOF or Empty, there's nothing for us to do to change our state, so we just return ourselves again. If we were writing to another stream, we might, when we receive EOF, close that stream (or rather, send it an EOF).

The next thing we're going to do is make it easier for our handler to consume input immediately without having to create a future. To do this, rather than passing the input directly to the handler, the stream will pass a function, and that function itself takes a function as its parameter: one that accepts the input and returns the next handler. So, our handler, when it's ready, will create a function to handle the input, and then invoke the function that was passed to it, with that function as the argument. We'll call the function that the stream passes in the cont function, which is short for continue, and means when you're ready to continue receiving input invoke me. Too many functions? Let's look at the code:

trait InputStreamHandler[E] {
  def onByte[B](cont: (Input[E] => InputStreamHandler[E]) => Future[B]): Future[B]
}

Now where did this Future[B] come from? B is just the mechanism that the stream uses to pass state back to itself. As the handler, we don't have to worry about what it is, we just have to make sure that we eventually invoke the cont function, and eventually make sure that the B it returns makes it back to our caller. And what does this look like in our Consume iteratee? Let's have a look:

class Consume(data: IndexedSeq[Byte]) extends InputStreamHandler[Byte] {
  def onByte[B](cont: (Input[Byte] => InputStreamHandler[Byte]) => Future[B]): Future[B] = cont {
    case Input.El(byte) => new Consume(data :+ byte)
    case _ => this
  }
}

You can see in our simple case of being ready to handle input immediately, we just immediately invoke cont, we no longer need to worry about creating futures. If we want to handle the input asynchronously, it is a little more complex, but we'll take a look at that later.
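
If it's not obvious what the stream side of this looks like, here is a minimal sketch. feedOne and feedAll are hypothetical helpers of mine, and in this sketch the stream simply picks B to be "the handler after one more input":

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ContDriverDemo {
  implicit val ec: ExecutionContext = ExecutionContext.global

  sealed trait Input[+E]
  object Input {
    case object EOF extends Input[Nothing]
    case object Empty extends Input[Nothing]
    case class El[+E](e: E) extends Input[E]
  }

  trait InputStreamHandler[E] {
    def onByte[B](cont: (Input[E] => InputStreamHandler[E]) => Future[B]): Future[B]
  }

  class Consume(val data: IndexedSeq[Byte]) extends InputStreamHandler[Byte] {
    def onByte[B](cont: (Input[Byte] => InputStreamHandler[Byte]) => Future[B]): Future[B] = cont {
      case Input.El(byte) => new Consume(data :+ byte)
      case _ => this
    }
  }

  // The stream's cont function: given the handler's input function k,
  // apply it to the next input and hand back the resulting handler.
  def feedOne[E](handler: InputStreamHandler[E], in: Input[E]): Future[InputStreamHandler[E]] =
    handler.onByte[InputStreamHandler[E]](k => Future.successful(k(in)))

  // Feed a whole list of inputs, one after the other.
  def feedAll[E](handler: InputStreamHandler[E], inputs: List[Input[E]]): Future[InputStreamHandler[E]] =
    inputs match {
      case Nil => Future.successful(handler)
      case in :: rest => feedOne(handler, in).flatMap(next => feedAll(next, rest))
    }

  def demo(): IndexedSeq[Byte] = {
    val inputs = List[Input[Byte]](Input.El(1.toByte), Input.Empty, Input.El(2.toByte), Input.EOF)
    Await.result(feedAll(new Consume(Vector.empty), inputs), 5.seconds) match {
      case c: Consume => c.data
      case _ => Vector.empty
    }
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

A different stream could choose a completely different B, the handler never needs to know.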

Now we have one final step in producing our iteratee API. How does the handler communicate back to the stream that it doesn't want any more data? There could be two reasons for this. One is that it has everything it needs. For example, if our handler is a JSON parser, it might have reached the end of the object it was parsing, and so doesn't want to receive any more. The other reason is that it's encountered an error. For a JSON parser, this might be a syntax error, or if it's sending data through to another stream, it might be an IO error on that stream.

To allow our iteratee to communicate with the stream, we're going to create a trait that represents its state. We'll call this trait Step, and the three states that the iteratee can be in will be Cont, Done and Error. Our Cont state is going to contain our Input[Byte] => InputStreamHandler function, so that the stream can invoke it. Our Done state will contain our result (in the case of Consume, a Seq[Byte]) and our Error state will contain an error message.

In addition to this, both our Done and Error states need to contain the left over input that they didn't consume. This will be important for when we are composing iteratees together, so that once one iteratee has finished consuming input from a stream, the next can pick up where the first left off. This is one reason why we need Input.Empty, because if we did consume all the input, then we need some way to indicate that.

So, here's our Step trait:

sealed trait Step[E, +A]

object Step {
  case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]
  case class Cont[E, +A](k: Input[E] => InputStreamHandler[E, A]) extends Step[E, A]
  case class Error[E](msg: String, input: Input[E]) extends Step[E, Nothing]
}

The type parameter E is the type of input our iteratee wants to accept, and A is what it's producing. So our handler trait now looks like this:

trait InputStreamHandler[E, +A] {
  def onInput[B](step: Step[E, A] => Future[B]): Future[B]
}

And our consumer is implemented like this:

class Consume(data: Seq[Byte]) extends InputStreamHandler[Byte, Seq[Byte]] {
  def onInput[B](step: Step[Byte, Seq[Byte]] => Future[B]): Future[B] = step(Step.Cont[Byte, Seq[Byte]] {
    case Input.El(byte) => new Consume(data :+ byte)
    case Input.EOF => new InputStreamHandler[Byte, Seq[Byte]] {
      // A handler that is finished: it always reports Done to whoever asks
      def onInput[B](step: Step[Byte, Seq[Byte]] => Future[B]): Future[B] = step(Step.Done(data, Input.Empty))
    }
    case Input.Empty => this
  })
}

One big difference here that you now notice is when we receive EOF, we actually pass Done into the step function, to say we are done consuming the input.

And so now we've built our iteratee interface. Our naming isn't quite right though, so we'll rename the trait obviously to Iteratee, and we'll rename onInput to fold, since we are folding our state into one result. And so now we get our interface:

trait Iteratee[E, +A] {
  def fold[B](folder: Step[E, A] => Future[B]): Future[B]
}

Iteratees in practice

So far we've started with the requirements of a traditional imperative input stream, and described what an iteratee is in contrast to that. But looking at the above code, you might think that using them is really difficult. They seem like they are far more complex than they need to be, at least conceptually, to implement reactive streams. Well, it turns out that although so far we've shown the basics of the iteratee interface, there is a lot more that a full iteratee API has to offer, and once we start understanding this, and using it, you will start to see how powerful, simple and useful iteratees are.

So remember how iteratees are immutable? And remember how iteratees can be in one of three states, cont, done and error, and depending on which state it's in, it will pass its corresponding step class to the folder function? Well, if an iteratee is immutable and it can be in one of three states, then it can only ever be in that state that it's in, and therefore it will only ever pass that corresponding step to the folder function. If an iteratee is done, it's done, it doesn't matter how many times you call its fold function, it will never become cont or error, and its done value will never change, it will only ever pass the Done step to the folder function with the same A value and the same left over input. Because of this, there is only one implementation of a done iteratee that we'll ever need, it looks like this:

case class Done[E, A](a: A, e: Input[E] = Input.Empty) extends Iteratee[E, A] {
  def fold[B](folder: Step[E, A] => Future[B]): Future[B] = folder(Step.Done(a, e))
}

This is the only done iteratee you'll ever need to indicate that you're done. In the Consume iteratee above, when we reached EOF, we created a done iteratee using an anonymous inner class, we didn't need to do this, we could have just used the Done iteratee above. The exact same thing holds for error iteratees:

case class Error[E](msg: String, e: Input[E]) extends Iteratee[E, Nothing] {
  def fold[B](folder: Step[E, Nothing] => Future[B]): Future[B] = folder(Step.Error(msg, e))
}

You may be surprised to find out the exact same thing applies to cont iteratees too - a cont iteratee just passes a function to the folder, and that function, because the iteratee is immutable, is never going to change. So consequently, the following iteratee will usually be good enough for your requirements:

case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A] {
  def fold[B](folder: Step[E, A] => Future[B]): Future[B] = folder(Step.Cont(k))
}

So let's rewrite our consume iteratee to use these helper classes:

def consume(data: Array[Byte]): Iteratee[Byte, Array[Byte]] = Cont {
  case Input.El(byte) => consume(data :+ byte)
  case Input.EOF => Done(data)
  case Input.Empty => consume(data)
}
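
To see all the pieces from this section working together, here is a self-contained sketch that puts Input, Step, Iteratee and the three helper classes in one place and drives consume by hand. The feed and result helpers are my own hypothetical stand-ins for what a stream would do, they are not Play's API, and I've used Vector rather than Array so the result is easy to compare:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MiniIterateeDemo {
  implicit val ec: ExecutionContext = ExecutionContext.global

  sealed trait Input[+E]
  object Input {
    case object EOF extends Input[Nothing]
    case object Empty extends Input[Nothing]
    case class El[+E](e: E) extends Input[E]
  }

  sealed trait Step[E, +A]
  object Step {
    case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]
    case class Cont[E, +A](k: Input[E] => Iteratee[E, A]) extends Step[E, A]
    case class Error[E](msg: String, input: Input[E]) extends Step[E, Nothing]
  }

  trait Iteratee[E, +A] {
    def fold[B](folder: Step[E, A] => Future[B]): Future[B]
  }
  case class Done[E, A](a: A, e: Input[E] = Input.Empty) extends Iteratee[E, A] {
    def fold[B](folder: Step[E, A] => Future[B]): Future[B] = folder(Step.Done(a, e))
  }
  case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A] {
    def fold[B](folder: Step[E, A] => Future[B]): Future[B] = folder(Step.Cont(k))
  }
  case class Error[E](msg: String, e: Input[E]) extends Iteratee[E, Nothing] {
    def fold[B](folder: Step[E, Nothing] => Future[B]): Future[B] = folder(Step.Error(msg, e))
  }

  def consume(data: Vector[Byte] = Vector.empty): Iteratee[Byte, Vector[Byte]] =
    Cont[Byte, Vector[Byte]] {
      case Input.El(byte) => consume(data :+ byte)
      case Input.EOF => Done[Byte, Vector[Byte]](data)
      case Input.Empty => consume(data)
    }

  // Hypothetical driver: keeps feeding input for as long as the iteratee is in the cont state.
  def feed[E, A](it: Iteratee[E, A], inputs: List[Input[E]]): Future[Iteratee[E, A]] = inputs match {
    case Nil => Future.successful(it)
    case in :: rest => it.fold[Iteratee[E, A]] {
      case Step.Cont(k) => feed[E, A](k(in), rest)
      case _ => Future.successful(it) // done or error: stop feeding
    }
  }

  // Hypothetical helper: pulls the result out if the iteratee is done.
  def result[E, A](it: Iteratee[E, A]): Future[Option[A]] = it.fold[Option[A]] {
    case Step.Done(a, _) => Future.successful(Some(a))
    case _ => Future.successful(None)
  }

  def demo(): Option[Vector[Byte]] = {
    val inputs = List[Input[Byte]](Input.El(1.toByte), Input.Empty, Input.El(2.toByte), Input.EOF)
    Await.result(feed(consume(), inputs).flatMap(it => result(it)), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Note how the driver never looks inside the iteratee, everything it learns comes through the Step that gets passed to its folder function.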

A CSV parser

Now we're looking a lot simpler, our code is focussed on just handling the different types of input we could receive, and returning the correct result. So let's start writing some different iteratees. In fact, let's write an iteratee that parses a CSV file from a stream of characters. Our CSV parser will support optionally quoting fields, and escaping quotes with a double quote.

Our first step will be to write the building blocks of our parser. First up, we want to write something that skips some kinds of white space. So let's write a general purpose drop while iteratee:

def dropWhile(p: Char => Boolean): Iteratee[Char, Unit] = Cont {
  case in @ Input.El(char) if !p(char) => Done((), in)
  case in @ Input.EOF => Done((), in)
  case _ => dropWhile(p)
}

Since we're just dropping input, our result is actually Unit. We return Done if the predicate doesn't match the current char, or if we reach EOF, and otherwise, we return ourselves again. Note that when we are done, we include the input that was passed into us as the remaining data, because this is going to be needed to be consumed by the next iteratee. Using this iteratee we can now write an iteratee that drops white space:

def dropSpaces = dropWhile(c => c == ' ' || c == '\t' || c == '\r')
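
The left over input is the subtle part here, so here is a small sketch that shows it. To keep it short I'm assuming a simplified, synchronous model of an iteratee (a plain sealed trait with no fold and no futures), and feed is a hypothetical driver of mine:

```scala
object DropWhileDemo {
  sealed trait Input[+E]
  object Input {
    case object EOF extends Input[Nothing]
    case object Empty extends Input[Nothing]
    case class El[+E](e: E) extends Input[E]
  }

  // Simplified synchronous stand-in for an iteratee (an assumption of this sketch).
  sealed trait Iteratee[E, +A]
  case class Done[E, A](a: A, remaining: Input[E] = Input.Empty) extends Iteratee[E, A]
  case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A]
  case class Error[E](msg: String, input: Input[E]) extends Iteratee[E, Nothing]

  def dropWhile(p: Char => Boolean): Iteratee[Char, Unit] = Cont[Char, Unit] {
    case in @ Input.El(char) if !p(char) => Done[Char, Unit]((), in)
    case Input.EOF => Done[Char, Unit]((), Input.EOF)
    case _ => dropWhile(p)
  }

  def dropSpaces: Iteratee[Char, Unit] = dropWhile(c => c == ' ' || c == '\t' || c == '\r')

  // Hypothetical driver: feeds characters one at a time until the iteratee leaves the cont state.
  def feed[A](it: Iteratee[Char, A], chars: List[Char]): Iteratee[Char, A] = (it, chars) match {
    case (Cont(k), c :: rest) => feed[A](k(Input.El(c)), rest)
    case _ => it
  }

  def demo(): Iteratee[Char, Unit] = feed(dropSpaces, "  ab".toList)

  def main(args: Array[String]): Unit = println(demo())
}
```

After the two spaces are dropped, the iteratee is done and is still holding on to the 'a' that it looked at but didn't want, ready for whatever iteratee comes next.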

Next up, we're going to write a take while iteratee, it's going to be a mixture between our earlier consume iteratee, carrying state between each invocation, and the drop while iteratee:

def takeWhile(p: Char => Boolean, data: Seq[Char] = IndexedSeq[Char]()): Iteratee[Char, Seq[Char]] = Cont {
  case in @ Input.El(char) => if (p(char)) {
    takeWhile(p, data :+ char)
  } else {
    Done(data, in)
  }
  case in @ Input.EOF => Done(data, in)
  case _ => takeWhile(p, data)
}

We also want to write a peek iteratee, that looks at what the next input is, without actually consuming it:

def peek: Iteratee[Char, Option[Char]] = Cont {
  case in @ Input.El(char) => Done(Some(char), in)
  case in @ Input.EOF => Done(None, in)
  case Input.Empty => peek
}

Note that our peek iteratee must return an option, since if it encounters EOF, it can't return anything.

And finally, we want a take one iteratee:

def takeOne: Iteratee[Char, Option[Char]] = Cont {
  case in @ Input.El(char) => Done(Some(char))
  case in @ Input.EOF => Done(None, in)
  case Input.Empty => takeOne
}

Using the take one iteratee, we'll build an expect iteratee, that mandates that a certain character must appear next, otherwise it returns an error:

def expect(char: Char): Iteratee[Char, Unit] = takeOne.flatMap {
  case Some(c) if c == char => Done(())
  case Some(c) => Error("Expected " + char + " but got " + c, Input.El(c))
  case None => Error("Premature end of input, expected: " + char, Input.EOF)
}

Notice the use of flatMap here. If you haven't come across it before, in the asynchronous world, flatMap basically means "and then". It applies a function to the result of the iteratee, and returns a new iteratee. In our case we're using it to convert the result to either a done iteratee, or an error iteratee, depending on whether the result is what we expected. flatMap is one of the fundamental mechanisms that we'll be using to compose our iteratees together.
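
We haven't written flatMap ourselves, so here is a sketch of how it might work. This is not Play's implementation; it assumes a simplified synchronous iteratee with no futures, and feed and twoChars are hypothetical names of mine. The interesting cases are what happens to left over input when the first iteratee is done:

```scala
object FlatMapDemo {
  sealed trait Input[+E]
  object Input {
    case object EOF extends Input[Nothing]
    case object Empty extends Input[Nothing]
    case class El[+E](e: E) extends Input[E]
  }

  // Simplified synchronous stand-in for an iteratee (an assumption of this sketch).
  sealed trait Iteratee[E, +A] {
    def flatMap[B](f: A => Iteratee[E, B]): Iteratee[E, B] = this match {
      // Done with nothing left over: "and then" is just the next iteratee
      case Done(a, Input.Empty) => f(a)
      // Done with left over input: the next iteratee gets first go at it
      case Done(a, leftover) => f(a) match {
        case Done(b, _) => Done[E, B](b, leftover)
        case Cont(k) => k(leftover)
        case err => err
      }
      // Not done yet: keep consuming, and try again after the next input
      case Cont(k) => Cont[E, B](in => k(in).flatMap(f))
      // Errors short circuit, f is never called
      case Error(msg, in) => Error[E](msg, in)
    }

    def map[B](f: A => B): Iteratee[E, B] = flatMap[B](a => Done[E, B](f(a)))
  }
  case class Done[E, A](a: A, remaining: Input[E] = Input.Empty) extends Iteratee[E, A]
  case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A]
  case class Error[E](msg: String, input: Input[E]) extends Iteratee[E, Nothing]

  def takeOne: Iteratee[Char, Option[Char]] = Cont[Char, Option[Char]] {
    case Input.El(c) => Done[Char, Option[Char]](Some(c))
    case Input.EOF => Done[Char, Option[Char]](None, Input.EOF)
    case Input.Empty => takeOne
  }

  // Take one character, and then take another one.
  val twoChars: Iteratee[Char, (Option[Char], Option[Char])] =
    takeOne.flatMap(a => takeOne.map(b => (a, b)))

  // Hypothetical driver: feeds characters until the iteratee leaves the cont state.
  def feed[A](it: Iteratee[Char, A], chars: List[Char]): Iteratee[Char, A] = (it, chars) match {
    case (Cont(k), c :: rest) => feed[A](k(Input.El(c)), rest)
    case _ => it
  }

  def main(args: Array[String]): Unit = println(feed(twoChars, "xyz".toList))
}
```

Feeding "xyz" into twoChars consumes the x and the y and then stops, the z is never asked for.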

Now with our building blocks, we are ready to start building our CSV parser. The first part of it that we'll write is an unquoted value parser. This is very simple, we just want to take all characters that aren't a comma or new line, with one catch. We want the result to be a String, not a Seq[Char] like takeWhile produces. Let's see how we do that:

def unquoted = takeWhile(c => c != ',' && c != '\n').map(v => v.mkString.trim)

As you can see, we've used the map function to transform the end result from a sequence of characters into a String. This is another key method on iteratees that you will find useful.

Our next task is to parse a quoted value. Let's start with an implementation that doesn't take into account escaped quotes. To parse a quoted value, we need to expect a quote, and then we need to take any value that is not a quote, and then we need to expect a quote. Notice that during that sentence I said "and then" 2 times? What method can we use to do an "and then"? That's right, the flatMap method that I talked about before. Let's see what our quoted value parser looks like:

def quoted = expect('"')
  .flatMap(_ => takeWhile(_ != '"'))
  .flatMap(value => expect('"')
    .map(_ => value.mkString))

So now you can probably start to see the usefulness of flatMap. In fact it is so useful, not just for iteratees, but many other things, that Scala has a special syntax for it, called for comprehensions. Let's rewrite the above iteratee using that:

def quoted = for {
  _     <- expect('"')
  value <- takeWhile(_ != '"')
  _     <- expect('"')
} yield value.mkString

Now at this point I hope you are getting excited. What does the above code look like? It looks like ordinary imperative synchronous code. Read this value, then read this value, then read this value. Except it's not synchronous, and it's not imperative. It's functional and asynchronous. We've taken our building blocks, and composed them into a piece of very readable code that makes it completely clear exactly what we are doing.

Now in case you're not 100% sure about the above syntax, the values to the left of the <- signs are the results of the iteratees to the right. These are able to be used anywhere in any subsequent lines, including in the end yield statement. Underscores are used to say we're not interested in the value, we're using this for the expect iteratee since that just returns Unit anyway. The statement after the yield is a map function, which gives us the opportunity to take all the intermediate values and turn them into a single result.
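
If you'd like to see that the for comprehension really is just flatMap and map in disguise, here is a small sketch. I'm using Option instead of iteratees, only because it's built in and has the same two methods; parseInt is a hypothetical helper:

```scala
object ForDesugarDemo {
  // Hypothetical helper: Some(n) if the string is a number, None otherwise.
  def parseInt(s: String): Option[Int] = s.toIntOption

  // The for comprehension form...
  val viaFor: Option[Int] = for {
    a <- parseInt("1")
    _ <- parseInt("0") // result ignored, like our expect iteratee
    b <- parseInt("2")
  } yield a + b

  // ...and roughly what the compiler turns it into: every <- except the
  // last becomes a flatMap, and the last one together with yield becomes a map.
  val viaFlatMap: Option[Int] =
    parseInt("1").flatMap(a =>
      parseInt("0").flatMap(_ =>
        parseInt("2").map(b => a + b)))

  def main(args: Array[String]): Unit = println(viaFor == viaFlatMap)
}
```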

So now that we understand that, let's rewrite our quoted iteratee to support escaped quotes. After reading our closing quote, we want to peek at the next character. If it's a quote, then we want to append the value we just read, plus a quote, to our accumulated value, and recursively invoke the quoted iteratee again. Otherwise, we've reached the end of the value.

def quoted(value: Seq[Char] = IndexedSeq[Char]()): Iteratee[Char, String] = for {
  _          <- expect('"')
  maybeValue <- takeWhile(_ != '"')
  _          <- expect('"')
  nextChar   <- peek
  value      <- nextChar match {
    case Some('"') => quoted(value ++ maybeValue :+ '"')
    case _ => Done[Char, String]((value ++ maybeValue).mkString)
  }
} yield value
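
For comparison, here is roughly what the same rules look like as ordinary blocking, imperative code over a String that is already fully in memory. This is only a reference sketch of the behaviour I'd expect from the iteratee above, and parseQuoted is a hypothetical name:

```scala
object QuotedReference {
  // Returns the unescaped field and the rest of the input, or None if the
  // input doesn't start with a well formed quoted field.
  def parseQuoted(s: String): Option[(String, String)] = {
    if (!s.startsWith("\"")) return None
    val out = new StringBuilder
    var i = 1
    while (i < s.length) {
      if (s.charAt(i) == '"') {
        if (i + 1 < s.length && s.charAt(i + 1) == '"') {
          out.append('"') // doubled quote: an escaped quote inside the field
          i += 2
        } else {
          return Some((out.toString, s.substring(i + 1))) // closing quote
        }
      } else {
        out.append(s.charAt(i))
        i += 1
      }
    }
    None // ran out of input before finding the closing quote
  }

  def main(args: Array[String]): Unit =
    println(parseQuoted("\"a\"\"b\",c"))
}
```

The logic is the same, but this version needs the whole field in memory before it can start, which is exactly what the iteratee version avoids.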

Now we need to write an iteratee that can parse either a quoted or unquoted value. We choose which one by peeking at the first character, and then accordingly returning the right iteratee.

def value = for {
  char  <- peek
  value <- char match {
    case Some('"') => quoted()
    case None => Error[Char]("Premature end of input, expected a value", Input.EOF)
    case _ => unquoted
  }
} yield value

Let's now parse an entire line, reading until the end of line character.

def values(state: Seq[String] = IndexedSeq[String]()): Iteratee[Char, Seq[String]] = for {
  _        <- dropSpaces
  value    <- value
  _        <- dropSpaces
  nextChar <- takeOne
  values   <- nextChar match {
    case Some('\n') | None => Done[Char, Seq[String]](state :+ value)
    case Some(',') => values(state :+ value)
    case Some(other) => Error("Expected comma, newline or EOF, but found " + other, Input.El(other))
  }
} yield values

Enumeratees

Now, in a similar way to how we parse the values, we could also parse each line of a CSV file until we reach EOF. But this time we're going to do something a little different. We've seen how we can sequence iteratees using flatMap, but there are further possibilities for composing iteratees. Another concept in iteratees is enumeratees. Enumeratees adapt a stream to be consumed by an iteratee. The simplest enumeratees simply map the input values of the stream to be something else. So, for example, here's an enumeratee that converts a stream of strings to a stream of ints:

def toInt: Enumeratee[String,Int] = Enumeratee.map[String](_.toInt)

One of the methods on Enumeratee is transform. We can use this method to apply an enumeratee to an iteratee:

val someIteratee: Iteratee[Int, X] = ...
val adaptedIteratee: Iteratee[String, X] = toInt.transform(someIteratee)

This method is also aliased to an operator, &>>, and so this code below is equivalent to the code above:

val adaptedIteratee: Iteratee[String, X] = toInt &>> someIteratee
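
To give a feel for what a mapping enumeratee does underneath, here is a sketch on a simplified synchronous iteratee. mapInput is a hypothetical stand-in for applying Enumeratee.map to an iteratee, it is not how Play implements it, and for brevity it throws away the inner iteratee's left over input:

```scala
object MapInputDemo {
  sealed trait Input[+E]
  object Input {
    case object EOF extends Input[Nothing]
    case object Empty extends Input[Nothing]
    case class El[+E](e: E) extends Input[E]
  }

  // Simplified synchronous stand-in for an iteratee (an assumption of this sketch).
  sealed trait Iteratee[E, +A]
  case class Done[E, A](a: A, remaining: Input[E] = Input.Empty) extends Iteratee[E, A]
  case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A]
  case class Error[E](msg: String, input: Input[E]) extends Iteratee[E, Nothing]

  // Wraps an iteratee that wants To elements so that it accepts From elements instead.
  def mapInput[From, To, A](f: From => To)(inner: Iteratee[To, A]): Iteratee[From, A] = inner match {
    case Cont(k) => Cont[From, A] {
      case Input.El(e) => mapInput[From, To, A](f)(k(Input.El(f(e))))
      case Input.Empty => mapInput[From, To, A](f)(k(Input.Empty))
      case Input.EOF => mapInput[From, To, A](f)(k(Input.EOF))
    }
    case Done(a, _) => Done[From, A](a) // left over input is dropped in this sketch
    case Error(msg, _) => Error[From](msg, Input.Empty)
  }

  def sum(total: Int = 0): Iteratee[Int, Int] = Cont[Int, Int] {
    case Input.El(n) => sum(total + n)
    case Input.Empty => sum(total)
    case Input.EOF => Done[Int, Int](total, Input.EOF)
  }

  // Hypothetical driver: feeds inputs until the iteratee leaves the cont state.
  def feed[E, A](it: Iteratee[E, A], inputs: List[Input[E]]): Iteratee[E, A] = (it, inputs) match {
    case (Cont(k), in :: rest) => feed[E, A](k(in), rest)
    case _ => it
  }

  def demo(): Iteratee[String, Int] = {
    val adapted = mapInput[String, Int, Int](_.toInt)(sum())
    feed(adapted, List[Input[String]](Input.El("1"), Input.El("2"), Input.EOF))
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The sum iteratee only ever sees ints, it has no idea that the stream it is attached to is producing strings.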

We can also make an enumeratee out of another iteratee, and this is exactly what we're going to do with our values iteratee. The Enumeratee.grouped method takes an iteratee and applies it to the stream over and over, the result of each application being an input to feed into the iteratee that will be transformed. Let's have a look:

def csv = Enumeratee.grouped(values())

Now let's get a little bit more creative with enumeratees. Let's say that our CSV file is very big, so we don't want to load it into memory. Each line is a series of 3 integer columns, and we want to sum each column. So, let's define an enumeratee that converts each set of values to integers:

def toInts = Enumeratee.map[Seq[String]](_.map(_.toInt))

And another enumeratee to convert the sequence to a 3-tuple:

def toThreeTuple = Enumeratee.map[Seq[Int]](s => (s(0), s(1), s(2)))

And finally an iteratee to sum them:

def sumThreeTuple(a: Int = 0, b: Int = 0, c: Int = 0): Iteratee[(Int, Int, Int), (Int, Int, Int)] = Cont {
  case Input.El((x, y, z)) => sumThreeTuple(a + x, b + y, c + z)
  case Input.Empty => sumThreeTuple(a, b, c)
  case in @ Input.EOF => Done((a, b, c), in)
}

Now to put them all together. There is another method on enumeratee called compose, which, you guessed it, lets you compose enumeratees. This has an alias operator, ><>. Let's use it:

val processCsvFile = csv ><> toInts ><> toThreeTuple &>> sumThreeTuple()
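
If the operators make it hard to see what is actually being computed, here are the same three stages written as ordinary blocking code over an Iterator of lines. This is only a reference sketch: sumColumns is a hypothetical name, and it assumes unquoted fields and that something else has already split the input into lines:

```scala
object CsvSumReference {
  def sumColumns(lines: Iterator[String]): (Int, Int, Int) =
    lines
      .map(line => line.split(",").toSeq.map(_.trim.toInt)) // like toInts
      .map(s => (s(0), s(1), s(2)))                         // like toThreeTuple
      .foldLeft((0, 0, 0)) {                                // like sumThreeTuple
        case ((a, b, c), (x, y, z)) => (a + x, b + y, c + z)
      }

  def main(args: Array[String]): Unit =
    println(sumColumns(Iterator("1,2,3", "4,5,6")))
}
```

Like the iteratee version, this only holds one line and the running totals in memory at a time. The difference is that it blocks the thread while it waits for each line.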

Enumerators

Finally, if an iteratee consumes a stream, what produces a stream? The answer is an enumerator. An enumerator can be applied to an iteratee using its apply method, which is also aliased to |>>. This will leave the iteratee in a cont state, ready to receive more input. If however the enumerator contains the entirety of the stream, then the run method can be used instead, which will send the iteratee an EOF once it's finished. This is aliased to |>>>.
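
The difference between applying and running is easier to see in code. Here is a sketch with a hypothetical ListEnumerator on a simplified synchronous iteratee. It is not Play's Enumerator, which works with futures, and returning an Either from run is a choice I've made for this sketch:

```scala
object EnumeratorDemo {
  sealed trait Input[+E]
  object Input {
    case object EOF extends Input[Nothing]
    case object Empty extends Input[Nothing]
    case class El[+E](e: E) extends Input[E]
  }

  // Simplified synchronous stand-in for an iteratee (an assumption of this sketch).
  sealed trait Iteratee[E, +A]
  case class Done[E, A](a: A, remaining: Input[E] = Input.Empty) extends Iteratee[E, A]
  case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A]
  case class Error[E](msg: String, input: Input[E]) extends Iteratee[E, Nothing]

  // Hypothetical enumerator that produces the elements of a list.
  case class ListEnumerator[E](elems: List[E]) {
    // Applying: feed the elements, but no EOF, so the iteratee can be given more input later.
    def apply[A](it: Iteratee[E, A]): Iteratee[E, A] = (it, elems) match {
      case (Cont(k), e :: rest) => ListEnumerator(rest).apply[A](k(Input.El(e)))
      case _ => it
    }

    // Running: feed the elements, then EOF, then extract the result.
    def run[A](it: Iteratee[E, A]): Either[String, A] = apply(it) match {
      case Cont(k) => k(Input.EOF) match {
        case Done(a, _) => Right(a)
        case Error(msg, _) => Left(msg)
        case Cont(_) => Left("iteratee still wants input after EOF")
      }
      case Done(a, _) => Right(a)
      case Error(msg, _) => Left(msg)
    }
  }

  def sum(total: Int = 0): Iteratee[Int, Int] = Cont[Int, Int] {
    case Input.El(n) => sum(total + n)
    case Input.Empty => sum(total)
    case Input.EOF => Done[Int, Int](total, Input.EOF)
  }

  def main(args: Array[String]): Unit = {
    val numbers = ListEnumerator(List(1, 2, 3))
    println(numbers(sum()).isInstanceOf[Cont[_, _]]) // applied only: still waiting for more
    println(numbers.run(sum()))                      // run: EOF was sent, so we get a result
  }
}
```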

The Play enumerator API makes it easy to create an enumerator by passing a sequence of inputs to the Enumerator companion object's apply method. So, we can create an enumerator of characters using the following code:

val csvFile = Enumerator(
  """1,2,3
    |4,5,6""".stripMargin.toCharArray:_*)

And we can feed this into our iteratee like so:

val result = csvFile |>>> processCsvFile

And our result in this case will be a future that is eventually redeemed with (5, 7, 9).

Conclusion

Well, it's been a long journey, but hopefully if you're an imperative programmer, you not only understand iteratees, you understand the reasoning behind their design, and how easily they compose. I also hope you have a better understanding of both functional and asynchronous programming in general. The functional mindset is quite different to the imperative mindset, and I'm still getting my head around it, but particularly after seeing how nice and simple iteratees can be to work with (once you understand them), I'm becoming convinced that functional programming is the way to go.

If you are interested in downloading the code from this blog post, or if you want to see a more complex JSON parsing iteratee/enumeratee, check out this GitHub project, which has a few examples, including parsing byte/character streams in array chunks, rather than one at a time.

About

Hi! My name is James Roper, and I am a software developer with a particular interest in open source development and trying new things. I program in Scala, Java, Go, PHP, Python and Javascript, and I work for Lightbend as the architect of Kalix. I also have a full life outside the world of IT, enjoy playing a variety of musical instruments and sports, and currently I live in Canberra.