all that jazz

james' blog about scala and all that jazz

SBT per version global plugins

I'm an IntelliJ user, so I have the SBT idea plugin permanently in my global SBT plugins. This is fine, until I start working across versions of SBT, and particularly with milestone builds of unreleased SBT versions, because in that case, the version of the idea plugin that I usually use with other versions of SBT may not be available. Hence I need to have per SBT version global plugins. I think this might be easier to do in future, but for now, here's a way to do it:

libraryDependencies <++= (sbtBinaryVersion in update, scalaBinaryVersion in update) { (sbtV, scalaV) =>
  sbtV match {
    case sbt013 if sbt013.startsWith("0.13.") => Seq(
      Defaults.sbtPluginExtra("com.github.mpeltonen" % "sbt-idea" % "1.5.0-SNAPSHOT", sbtV, scalaV)
    )
    case _ => Seq(
      Defaults.sbtPluginExtra("com.github.mpeltonen" % "sbt-idea" % "1.4.0", sbtV, scalaV)
    )
  }
}

Advanced routing in Play Framework

We frequently get questions about how to meet all sorts of different routing needs in Play Framework. While the built in router is enough for most users, sometimes you may encounter use cases where it's not enough. Or, maybe you want a more convenient way to implement some routing pattern. Whatever it is, Play will allow you to do pretty much anything. This blog post is going to describe some common use cases.

Hooking into Plays routing mechanism

If for some reason you don't like Plays router, or if you want to use a modified router, then Play allows you to do this easily. Global.onRouteRequest is the method that is invoked to do routing. By default, this delegates to the Play router, but you can override it to do whatever you want. For example:

override def onRouteRequest(req: RequestHeader): Option[Handler] = {
  (req.method, req.path) match {
    case ("GET", "/") => Some(controllers.Application.index)
    case ("POST", "/submit") => Some(controllers.Application.submit)
    case _ => None
  }
}

As you can see, I've practically implemented my own little routing DSL here. I could also delegate back to the default router by invoking super.onRouteRequest(req).

An interesting thing that could also be done is to delegate to different routers based on something in the request. A play router compiles to an instance of Router.Routes, and it will be an object called Routes itself. By default, any file with the .routes extension in the conf directory will by compiled, and will go in the package with the same name as the filename, minus the .routes. So if I had two routers, foo.routes and bar.routes, I could implemented a crude form of virtual hosting like so:

override def onRouteRequest(req: RequestHeader): Option[Handler] = {
  if (req.host == "foo.example.com") {
    foo.Routes.routes.lift(req)
  } else if (req.host == "bar.example.com") {
    bar.Routes.routes.lift(req)
  } else {
    super.onRouteRequest(req)
  }
}

So here are some use cases that overriding onRouteRequest may be useful for:

  • Modifying the request in some way before routing is done
  • Plugging in a completely different router (eg, jaxrs)
  • Delegating to different routes files based on some aspect of the request

Implementing a custom router

We saw in the previous example how to use Plays Router.Routes interface, another option is to implement it. Now, there's no real reason to implement it if you're just going to delegate to it directly from onRouteRequest. However, by implementing this interface, you can include it in another routes file, using the sub routes include syntax, which in case you haven't come across this before, typically looks like this:

->    /foo         foo.Routes

Now something that people often criticise Play for is that it doesn't support rails style resource routing, where a convention is used to route commonly needed REST endpoints to particular methods on a controller. Although Play comes with nothing out of the box that does this, it is not hard to implement this today for your project, Play 2.1 has everything you need to support it, by using the routes includes syntax, and implementing your own router. And I have some good news too, we will be introducing a feature like this into Play soon. But until then, and also if you have your own custom conventions that you want to implement, you will probably find these instructions very helpful.

So let's start off with an interface that our controllers can implement:

trait ResourceController[T] extends Controller {
  def index: EssentialAction
  def newScreen: EssentialAction
  def create: EssentialAction
  def show(id: T): EssentialAction
  def edit(id: T): EssentialAction
  def update(id: T): EssentialAction
  def destroy(id: T): EssentialAction
}

I could provide default implementations that return not implemented, but then implementing it would require using override keywords. I think it's a matter of preference here.

Now I'm going to write a router. The router interface looks like this:

trait Routes {
  def routes: PartialFunction[RequestHeader, Handler]
  def documentation: Seq[(String, String, String)]
  def setPrefix(prefix: String)
  def prefix: String
}

The routes method is pretty self explanatory, it is the function that looks up the handler for a request. documentation is used to document the router, it is not mandatory, but it used by at least one REST API documenting tool to discover what routes are available and what they look like. For brevity in this post, we won't worry about implementing it. The prefix and setPrefix methods are used by Play to inject the path of the router. In the routes includes syntax that I showed above, you could see that we declared the router to be on the path /foo. This path is injected using this mechanism.

So we'll write an abstract class that implements the routes interface and the ResourceController interface:

abstract class ResourceRouter[T](implicit idBindable: PathBindable[T]) 
    extends Router.Routes with ResourceController[T] {
  private var path: String = ""
  def setPrefix(prefix: String) {
    path = prefix
  }
  def prefix = path
  def documentation = Nil
  def routes = ...
}

I've given it a PathBindable, this is so that we have a way to convert the id from a String extracted from the path to the type accepted by the methods. PathBindable is the same interface that's used under the covers when in a normal routes file to convert types.

Now for the implementation of routes. First I'm going to create some regular expressions for matching the different paths:

  private val MaybeSlash = "/?".r
  private val NewScreen = "/new/?".r
  private val Id = "/([^/]+)/?".r
  private val Edit = "/([^/]+)/edit/?".r

I'm also going to create a helper function for the routes that require the id to be bound:

def withId(id: String, action: T => EssentialAction) = 
  idBindable.bind("id", id).fold(badRequest, action)

badRequest is actually a method on Router.Routes that takes the error message and turns it into an action that returns that as a result. Now I'm ready to implement the partial function:

def routes = new AbstractPartialFunction[RequestHeader, Handler] {
  override def applyOrElse[A <: RequestHeader, B >: Handler](rh: A, default: A => B) = {
    if (rh.path.startsWith(path)) {
      (rh.method, rh.path.drop(path.length)) match {
        case ("GET", MaybeSlash()) => index
        case ("GET", NewScreen()) => newScreen
        case ("POST", MaybeSlash()) => create
        case ("GET", Id(id)) => withId(id, show)
        case ("GET", Edit(id)) => withId(id, edit)
        case ("PUT", Id(id)) => withId(id, update)
        case ("DELETE", Id(id)) => withId(id, destroy)
        case _ => default(rh)
      }
    } else {
      default(rh)
    }
  }

  def isDefinedAt(rh: RequestHeader) = ...
}

I've implemented AbstractPartialFunction, and the main method to implement then is applyOrElse. The match statement doesn't look much unlike the mini DSL I showed in the first code sample. I'm using regular expressions as extractor objects to extract the ids out of the path. Note that I haven't shown the implementation of isDefinedAt. Play actually won't call this, but it's good to implement it anyway, it's basically the same implementation as applyOrElse, except instead of invoking the corresponding methods, it returns true, or for when nothing matches, it returns false.

And now we're done. So what does using this look like? My controller looks like this:

package controllers

object MyResource extends ResourceRouter[Long] {
  def index = Action {...}
  def create(id: Long) = Action {...}
  ...
  def custom(id: Long) = Action {...}
}

And in my routes file I have this:

->     /myresource              controllers.MyResource
POST   /myresource/:id/custom   controllers.MyResource.custom(id: Long)

You can see I've also shown an example of adding a custom action to the controller, obviously the standard crud actions are not going to be enough, and the nice thing about this is that you can add as many arbitrary routes as you want.

But what if we want to have a managed controller, that is, one whose instantiation is managed by a DI framework? Well let's created another router that does this:

class ManagedResourceRouter[T, R >: ResourceController[T]]
    (implicit idBindable: PathBindable[T], ct: ClassTag[R]) 
    extends ResourceRouter[T] {

  private def invoke(action: R => EssentialAction) = {
    Play.maybeApplication.map { app =>
      action(app.global.getControllerInstance(ct.runtimeClass.asInstanceOf[Class[R]]))
    } getOrElse {
      Action(Results.InternalServerError("No application"))
    }
  }

  def index = invoke(_.index)
  def newScreen = invoke(_.newScreen)
  def create = invoke(_.create)
  def show(id: T) = invoke(_.show(id))
  def edit(id: T) = invoke(_.edit(id))
  def update(id: T) = invoke(_.update(id))
  def destroy(id: T) = invoke(_.destroy(id))
}

This uses the same Global.getControllerInstance method that managed controllers in the regular router use. Now to use this is very simple:

package controllers

class MyResource(dbService: DbService) extends ResourceController[Long] {
  def index = Action {...}
  def create(id: Long) = Action {...}
  ...
  def custom(id: Long) = Action {...}
}
object MyResource extends ManagedResourceRouter[Long, MyResource]

And in the routes file:

->     /myresource              controllers.MyResource
POST   /myresource/:id/custom   @controllers.MyResource.custom(id: Long)

The final thing we need to consider is reverse routing and the Javascript router. Again this is very simple, but I'm not going to go into any details here. Instead, you can check out the final product, which has a few more features, here.

Java 8 Lambdas - The missing link to moving away from Java

I learnt functional programming, but then I decided I liked imperative programming better so I switched back.
— Nobody, ever

Moving from imperative programming to functional programming is a very common thing to do today. Blog posts on the internet abound with testimonies about it. Everything I've read and every person I've talked to, including myself, has the same story. Once they started functional programming, there was no looking back. They loved it, and in the early days, even the small amount they learnt gave them a thirst to learn more.

To me it seems clear, going from imperative programming to functional programming is a one way street under heavy traffic. It's a diode with a million volts sitting across it. It's a check valve on a mains water pipe. Not only can you not go back, but it comes with an irresistible desire to explore and learn more that pushes you further into functional programming.

Java 8 Lambdas

With Java 8 lambdas on the way, this poses an interesting turning point for one of the largest groups of developers on the planet. Lambdas in themselves don't necessarily equate to functional programming. But they do enable it. And as a developer here starts dabbling in functional programming, a library maintainer there, we'll start to see some new things in Java source code. Methods that previously might have returned null will start returning Optional. Libraries that do IO, for example HTTP client libraries, will start returning CompletableFuture. More and more functional concepts will start creeping into Java interfaces, there will be methods called fold, map, reduce, collect. And so will start the one way street of the Java masses moving from imperative programming to functional programming.

But will Java satisfy their thirst? Looking at the Lambda spec, I suspect not. I see an essence of genius in the Lambda spec, it makes many, many existing libraries instantly usable, with no changes, with Lambdas. The reason for this is that a Lambda is just syntactic sugar for implementing a single-abstract-method (SAM) interface. In Java you'll find SAM's everywhere, from Runnable and Callable in the concurrency package, to ActionListener in Swing, to Function and Supplier in Guava, and the list goes on. All of these libraries are Lambda ready today.

However, this also presents a problem. Functional programming gets interesting when you start composing things. The ability to pass functions around and compose them together gives a lot of power - but the Java 8 Lambdas are not composable. Java 8 does provide a Future SAM, but so does Guava, and many other libraries. To compose these together, you'd need all permutations of composition methods. Two SAM's of the same type aren't even very simple to compose, at least, not in a traditional Java way, since you can't add any methods to the SAM, such as a map or transform method, to do the composing.

So, without the ability to do one of the most basic functional concepts, composing functions, can Java ever become a functional language? Maybe there are some creative ways to solve this that I haven't thought of. And maybe it doesn't need to, I don't think the designers of Java 8 Lambdas had any intention of making Java a functional language, so you can't call this a fault of the Lambda spec. But the problem is the developers, having got a taste for functional programming, as I pointed out earlier, are going to want more, and going to want more fast. Even if Java could become a functional language, I don't think it will keep up with the movement of Java developers to functional programming.

So I'm going to make a prediction. Java 8 Lambdas will be eagerly adopted. So eagerly that Java itself will be left behind, and the majority of Java developers will move on to a language that meets their needs as eager budding new functional programmers.

Which language?

Before I speculate on which language Java developers will move to, let me first just qualify that I am both biased and ignorant. I work for Typesafe, and so am obviously biased towards Scala. And apart from playing with Haskell and ML at university, I have never used any other functional language in anger. So take my words with a grain of salt, and if you disagree, write your own blog post about it.

Scala as a transitionary language

So firstly, I think Scala makes a great transitionary language for imperative programmers to switch to functional programming. Having got a taste for functional programming with Java 8 Lambdas, a Java developer will find themselves very comfortable in Scala. They can still do everything in the same way they used to, they have vars and mutable collections, they have all the standard Java libraries at their finger tips. And of course, they can start deepening their knowledge in functional programming. So Scala provides a smooth transition from imperative programming to functional programming, you can adopt functional programming as quickly or as slowly as you like.

Scala as the destination language

Having transitioned to functional programming, will developers stay at Scala, or will they move on, like they moved on from Java, in search of a more pure language? My opinion here is no. Broadly speaking, I see two camps in the functional programming community. The first camp sees functional programming as a set of laws that must be followed. For this camp, Scala has a lot of things that are not necessary and/or dangerous, and they would probably not see Scala as the end destination.

The second camp sees functional programming as a powerful tool that should be widely exploited, but not a set of laws that must be followed. This is where I stand, and Scala fills the needs of this camp very well. Functional programming has first class support in Scala, but you can always fall back to imperative when it makes sense. I suspect that the majority of the Java community would be inclined to join this camp, otherwise, they would already be shunning Java and writing Haskell.

So I think Java 8 Lambdas are going to be very good for Scala, in that they give Java developers a taste for what Scala will do for them, and thus funnel the masses into Scala development.

Understanding the Play Filter API

With Play 2.1 hot off the press, there have been a lot of people asking about the new Play filter API. In actual fact, the API is incredibly simple:

trait EssentialFilter {
  def apply(next: EssentialAction): EssentialAction
}

Essentially, a filter is just a function that takes an action and returns another action. The usual thing that would be done by the filter is wrap the action, invoking it as a delegate. To then add a filter to your application, you just add it to your Global doFilter method. We provide a helper class to do that for you:

object Global extends WithFilters(MyFilter) {
  ...
}

Easy right? Wrap the action, register it in global. Well, it is easy, but only if you understand Plays architecture. This is very important, because once you understand Play's architecture, you will be able to do far more with Play. We have some documentation here that explains Plays architecture at a high level. In this blog post, I'm going to explain Play's architecture in the context of filters, with code snippets and use cases along the way.

A short introduction to Plays architecture

I don't need to go in depth here because I've already provided a link to our architecture documentation, but in short Play's architecture matches the flow of an HTTP request very well.

The first thing that arrives when an HTTP request is made is the request header. So an action in Play therefore must be a function that accepts a request header.

What happens next in an HTTP request? The body is received. So, the function that receives the request must return something that consumes the body. This is an iteratee, which is a reactive stream handler, that eventually produces a single result after consuming the stream. You don't necessarily need to understand the details about how iteratees work in order to understand filters, the important thing to understand is that iteratees eventually produce a result that you can map, just like a future, using their map function. For details on writing iteratees, read my blog post.

The next thing that happens in an HTTP request is that the http response must be sent. So what is the result that of the iteratee? An HTTP response. And an HTTP response is a set of response headers, followed by a response body. The response body is an enumerator, which is a reactive stream producer.

All of this is captured in Plays EssentialAction trait:

trait EssentialAction extends (RequestHeader => Iteratee[Array[Byte], Result])

This reads that an essential action is a function that takes a request header and returns an iteratee that consumes the byte array body chunks and eventually produces a result.

The simpler way

Before I go on, I'd like to point out that Play provides a helper trait called Filter that makes writing filters easier than when using EssentialFilter. This is similar to the Action trait, in that Action simplifies writing EssentialAction's by not needing to worry about iteratees and how the body is parsed, rather you just provide a function that takes a request with a parsed body, and return a result. The Filter trait simplifies things in a similar way, however I'm going to leave talking about that until the end, because I think it is better to understand how filters work from the bottom up before you start using the helper class.

The noop filter

To demonstrate what a filter looks like, the first thing I will show is a noop filter:

class NoopFilter extends EssentialFilter {
  def apply(next: EssentialAction) = new EssentialAction {
    def apply(request: RequestHeader) = {
      next(request)
    }
  }
}

Each time the filter is executed, we create a new EssentialAction that wraps it. Since EssentialAction is just a function, we can just invoke it, passing the passed in request. So the above is our basic pattern for implementing an EssentialFilter.

Handling the request header

Let's say we want to look at the request header, and conditionally invoke the wrapped action based on what we inspect. An example of a filter that would do that might be a blanket security policy for the /admin area of your website. This might look like this:

class AdminFilter extends EssentialFilter {
  def apply(next: EssentialAction) = new EssentialAction {
    def apply(request: RequestHeader) = {
      if (request.path.startsWith("/admin") && request.session.get("user").isEmpty) {
        Iteratee.ignore[Array[Byte]].map(_ => Results.Forbidden())
      } else {
        next(request)
      }
    }
  }
}

You can see here that since we are intercepting the action before the body has been parsed, we still need to provide a body parser when we block the action. In this case we are returning a body parser that will simply ignore the whole body, and mapping it to have a result of forbidden.

Handling the body

In some cases, you might want to do something with the body in your filter. In some cases, you might want to parse the body. If this is the case, consider using action composition instead, because that makes it possible to hook in to the action processing after the action has parsed the body. If you want to parse the body at the filter level, then you'll have to buffer it, parse it, and then stream it again for the action to parse again.

However there are some things that can be easily be done at the filter level. One example is gzip decompression. Play framework already provides gzip decompression out of the box, but if it didn't this is what it might look like (using the gunzip enumeratee from my play extra iteratees project):

class GunzipFilter extends EssentialFilter {
  def apply(next: EssentialAction) = new EssentialAction {
    def apply(request: RequestHeader) = {
      if (request.headers.get("Content-Encoding").exists(_ == "gzip")) {
        Gzip.gunzip() &>> next(request)
      } else {
        next(request)
      }
    }
  }
}

Here using iteratee composition we are wrapping the body parser iteratee in a gunzip enumeratee.

Handling the response headers

When you're filtering you will often want to do something to the response that is being sent. If you just want to add a header, or add something to the session, or do any write operation on the response, without actually reading it, then this is quite simple. For example, let's say you wanted to add a custom header to every response:

class SosFilter extends EssentialFilter {
  def apply(next: EssentialAction) = new EssentialAction {
    def apply(request: RequestHeader) = {
      next(request).map(result => 
        result.withHeaders("X-Sos-Message" -> "I'm trapped inside Play Framework please send help"))
    }
  }
}

Using the map function on the iteratee that handles the body, we are given access to the result produced by the action, which we can then modify as demonstrated.

If however you want to read the result, then you'll need to unwrap it. Play results are either AsyncResult or PlainResult. An AsyncResult is a Result that contains a Future[Result]. It has a transform method that allows the eventual PlainResult to be transformed. A PlainResult has a header and a body.

So let's say you want to add a timestamp to every newly created session to record when it was created. This could be done like this:

class SessionTimestampFilter extends EssentialFilter {
  def apply(next: EssentialAction) = new EssentialAction {
    def apply(request: RequestHeader) = {

      def addTimestamp(result: PlainResult): Result = {
        val session = Session.decodeFromCookie(Cookies(result.header.headers.get(HeaderNames.COOKIE)).get(Session.COOKIE_NAME))
        if (!session.isEmpty) {
          result.withSession(session + ("timestamp" -> System.currentTimeMillis.toString))
        } else {
          result
        }
      }

      next(request).map {
        case plain: PlainResult => addTimestamp(plain)
        case async: AsyncResult => async.transform(addTimestamp)
      }
    }
  }
}

Handling the response body

The final thing you might want to do is transform the response body. PlainResult has two implementations, SimpleResult, which is for bodies with no transfer encoding, and ChunkedResult, for bodies with chunked transfer encoding. SimpleResult contains an enumerator, and ChunkedResult contains a function that accepts an iteratee to write the result out to.

An example of something you might want to do is implement a gzip filter. A very naive implementation (as in, do not use this, instead use my complete implementation from my play extra iteratees project) might look like this:

class GzipFilter extends EssentialFilter {
  def apply(next: EssentialAction) = new EssentialAction {
    def apply(request: RequestHeader) = {

      def gzipResult(result: PlainResult): Result = result match {
        case simple @ SimpleResult(header, content) => SimpleResult(header.copy(
          headers = (header.headers - "Content-Length") + ("Content-Encoding" -> "gzip")
        ), content &> Enumeratee.map(a => simple.writeable.transform(a)) &> Gzip.gzip())
      }

      next(request).map {
        case plain: PlainResult => gzipResult(plain)
        case async: AsyncResult => async.transform(gzipResult)
      }
    }
  }
}

Using the simpler API

Now you've seen how you can achieve everything using the base EssentialFilter API, and hopefully therefore you understand how filters fit into Play's architecture and how you can utilise them to achieve your requirements. Let's now have a look at the simpler API:

trait Filter extends EssentialFilter {
  def apply(f: RequestHeader => Result)(rh: RequestHeader): Result
  def apply(next: EssentialAction): EssentialAction = {
    ...
  }
}

object Filter {
  def apply(filter: (RequestHeader => Result, RequestHeader) => Result): Filter = new Filter {
    def apply(f: RequestHeader => Result)(rh: RequestHeader): Result = filter(f,rh)
  }
}

Simply put, this API allows you to write filters without having to worry about body parsers. It makes it look like actions are just functions of request headers to results. This limits the full power of what you can do with filters, but for many use cases, you simply don't need this power, so using this API provides a simple alternative.

To demonstrate, a noop filter class looks like this:

class NoopFilter extends Filter {
  def apply(f: (RequestHeader) => Result)(rh: RequestHeader) = {
    f(rh)
  }
}

Or, using the Filter companion object:

val noopFilter = Filter { (next, req) =>
  next(req)
}

And a request timing filter might look like this:

val timingFilter = Filter { (next, req) =>
  val start = System.currentTimeMillis

  def logTime(result: PlainResult): Result = {
    Logger.info("Request took " + (System.currentTimeMillis - start))
    result
  }

  next(req) match {
    case plain: PlainResult => logTime(plain)
    case async: AsyncResult => async.transform(logTime)
  }
}

Iteratees for imperative programmers

When I first heard the word iteratee, I thought it was a joke. Turns out, it wasn't a joke, in fact there are also enumerators (that's ok) and enumeratees (you're killing me). If you're an imperative programmer, or rather a programmer who feels more comfortable writing imperative code than functional code, then you may be a little overwhelmed by all the introductions to iteratees out there, because they all assume that you think from a functional perspective. Well I just learnt iteratees, and although I'm feeling more and more comfortable with functional programming every day, I still think like an imperative programmer at heart. This made learning iteratees very difficult for me. So while I'm still in the imperative mindset, I thought this a very good opportunity to explain iteratees from an imperative programmers perspective, taking no functional knowledge for granted. If you're an imperative programmer who wants to learn iteratees, this is the blog post for you. I'm going to specifically be looking at Play's Iteratee API, but the concepts learnt here will apply to all Iteratees in general.

So let's start off with explaining what iteratees, and their counterparts, are trying to achieve. An iteratee is a method of reactively handling streams of data that is very easily composable. By reactive, I mean non blocking, ie you react to data being available to read, and react to the opportunity to write data. By composable, I mean you write simple iteratees that do one small thing well, then you use those as the building blocks to write iteratees that do bigger things, and you use those as the building blocks to write iteratees to do even bigger things, and so on. At each stage, everything is simple and easy to reason about.

Reactive stream handling

If you're looking for information about iteratees, then I'm guessing you already know a bit about what reactive stream handling is. Let's contrast it to synchronous IO code:

trait InputStream {
  def read(): Byte
}

So this should be very familiar, if you want to read a byte, you call read. If no byte is currently available to be read, that call will block, and your thread will wait until a byte is available. With reactive streams, obviously it's the other way around, you pass a callback to the stream you want to receive data from, and it will call that when it's ready to give data to you. So typically you might implement a trait that looks like this:

trait InputStreamHandler {
  def onByte(byte: Byte)
}

So before we go on, let's look at how the same thing would be achieved in a pure functional world. At this point I don't want you to ask why we want to do things this way, you will see that later on, but if you know anything about functional programming, you know that everything tends to be immutable, and functions have no side effects. The trait above has to have side effects, because unless you are ignoring the bytes passed to onByte, you must be changing your state (or something elses state) somehow in that function. So, how do we handle data without changing our state? The answer is the same way other immutable data structures work, we return a copy of ourselves, updated with the new state. So if the InputStreamHandler were to be functional, it might look like this:

trait InputStreamHandler {
  def onByte(byte: Byte): InputStreamHandler
}

And an example implementation of one, that reads input into a seq, might look like this:

class Consume(data: Seq[Byte]) extends InputStreamHandler {
  def onByte(byte: Byte) = new Consume(data :+ byte)
}

So we now have imperative and functional traits that react to our input stream, and you might be thinking this is all there is to reactive streams. If that's the case, you're wrong. What if we're not ready to handle data when the onByte method is called? If we're building structures in memory this will never be the case, but if for example we're storing them to a file or to a database as we receive the data, then this very likely will be the case. So reactive streams are two way, it's not just you, the stream consumer that is reacting to input, the stream producer must react to you being ready for input.

Now this is possible to implement in an imperative world, though things do start looking much more functional. We simply start using futures:

trait InputStreamHandler {
  def onByte(byte: Byte): Future[Unit]
}

So, when the stream we are consuming has a byte for us, it calls onByte, and then attaches a callback to the future we return, to pass the next byte, when it's ready. If you have a look at Netty's asynchronous channel APIs, you'll see it uses exactly this pattern. We can also implement something similar for an immutable functional API:

trait InputStreamHandler {
  def onByte(byte: Byte): Future[InputStreamHandler]
}

And so here we have a functional solution for reactive stream handling. But it's not a very good one, for a start, there's no way for the handlers to communicate to the code that uses them that they don't want to receive any more input, or if they've encountered an error (exceptions are frowned upon in functional programming). We could add things to handle this, but very soon our interface would become quite complex, hard to break up into small pieces that can be composed, etc. I'm not going to justify this now, I think you'll see it later when I show you just how easy iteratees are to compose.

So, by this stage I hope you have understood two important points. Firstly, reactive stream handling means twofold reacting, both your code has to react to the stream being ready, and the stream has to react to you being ready. Secondly, when I say that we want a functional solution, I mean a solution where everything is immutable, and that is achieved by our stream handlers producing copies of themselves each time they receive/send data. If you've understood those two important points, then now we can move on to introducing iteratees.

Iteratees

There are a few things that our interface hasn't yet addressed. The first is, how does the stream communicate to us that it is finished, that is, that it has no more data for us? To do this, instead of passing in a byte directly, we're going to abstract our byte to be something of type Input[Byte], and that type can have three possible implementations, EOF, an element, or empty. Let's not worry about why we need empty just yet, but assume for some reason we might want to pass empty. So this is what Input looks like:

sealed trait Input[+E]

object Input {
  case object EOF extends Input[Nothing]
  case object Empty extends Input[Nothing]
  case class El[+E](e: E) extends Input[E]
}

Updating our InputStreamHandler, we now get something that looks like this:

trait InputStreamHandler[E] {
  def onInput(in: Input[E]): Future[InputStreamHandler[E]]
}

Now updating our Consumer from before to handle this, it might look like this:

class Consume(data: IndexedSeq[Byte]) extends InputStreamHandler[Byte] {
  def onInput(in: Input[Byte]) = in match {
    case El(byte) => Future.successful(new Consume(data :+ byte))
    case _ => Future.successful(this)
  }
}

You can see that when we get EOF or Empty, there's nothing for us to do to change our state, so we just return ourselves again. If we were writing to another stream, we might, when we receive EOF, close that stream (or rather, send it an EOF).

The next thing we're going to do is make it easier for our handler to consume input immediately without having to create a future. To do this, rather than passing the byte directly, we'll pass a function, that takes a function as a parameter, and that function will take the byte as a parameter. So, our handler, when it's ready, will create a function to handle the byte, and then invoke the function that was passed to it, with that function. We'll call the first function the cont function, which is short for continue, and means when you're ready to continue receiving input invoke me. Too many functions? Let's look at the code:

trait InputStreamHandler[E] {
  def onByte[B](cont: (Input[E] => InputStreamHandler[E]) => Future[B]): Future[B]
}

Now where did this Future[B] come from? B is just the mechanism that the stream uses to pass state back to itself. As the handler, we don't have to worry about what it is, we just have to make sure that we eventually invoke the cont function, and eventually make sure that the B it returns makes it back to our caller. And what does this look like in our Consume iteratee? Let's have a look:

class Consume(data: IndexedSeq[Byte]) extends InputStreamHandler {
  def onByte(cont: (Input[Byte] => InputStreamHandler) => Future[B]) = cont {
    case Input.El(byte) => new Consume(data :+ byte)
    case _ => this
  }
}

You can see in our simple case of being ready to handle input immediately, we just immediately invoke cont, we no longer need to worry about creating futures. If we want to handle the input asynchronously, it is a little more complex, but we'll take a look at that later.

Now we have one final step in producing our iteratee API. How does the handler communicate back to the stream that it is finished receiving data? There could be two reasons for this, one is that it's finished receiving data. For example, if our handler is a JSON parser, it might have reached the end of the object it was parsing, and so doesn't want to receive anymore. The other reason is that it's encountered an error, for a JSON parser, this might be a syntax error, or if it's sending data through to another stream, it might be an IO error on that stream.

To allow our iteratee to communicate with the stream, we're going to create a trait that represents its state. We'll call this trait Step, and the three states that the iteratee can be in will be Cont, Done and Error. Our Cont state is going to contain our Input[Byte] => InputStreamHandler function, so that the stream can invoke it. Our Done state will contain our result (in the case of Consume, a Seq[Byte]) and our Error state will contain an error message.

In addition to this, both our Done and Error states need to contain the left over input that they didn't consume. This will be important for when we are composing iteratees together, so that once one iteratee has finished consuming input from a stream, the next can pick up where the first left off. This is one reason why we need Input.Empty, because if we did consume all the input, then we need some way to indicate that.

So, here's our Step trait:

sealed trait Step[E, +A]

object Step {
  case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]
  case class Cont[E, +A](k: Input[E] => InputStreamHandler[E, A]) extends Step[E, A]
  case class Error[E](msg: String, input: Input[E]) extends Step[E, Nothing]
}

The type parameter E is the type of input our iteratee wants to accept, and A is what it's producing. So our handler trait now looks like this:

trait InputStreamHandler[E, A] {
  def onInput[B](step: Step[E, A] => Future[B]): Future[B]
}

And our consumer is implemented like this:

class Consume(data: Seq[Byte]) extends InputStreamHandler[Byte, Seq[Byte]] {
  def onInput(step: Step[Byte, Seq[Byte]] => Future[B]) = step(Step.Cont({
    case Input.El(byte) => new Consume(data :+ byte)
    case Input.EOF => new InputStreamHandler[Byte, Seq[Byte]] {
      def onInput(cont: Step[Byte, Seq[Byte]] => Future[B]) = step(Step.Done(data, Input.Empty))
    }       
    case Input.Empty => this
  }))
}

One big difference here that you now notice is when we receive EOF, we actually pass Done into the step function, to say we are done consuming the input.

And so now we've built our iteratee interface. Our naming isn't quite right though, so we'll rename the trait obviously to Iteratee, and we'll rename onInput to fold, since we are folding our state into one result. And so now we get our interface:

trait Iteratee[E, +A] {
  def fold[B](folder: Step[E, A] => Future[B]): Future[B]
}

Iteratees in practice

So far we've started with the requirements of a traditional imperative input stream, and described what an iteratee is in constrast to that. But looking at the above code, you might think that using them is really difficult. They seem like they are far more complex than they need to be, at least conceptually, to implement reactive streams. Well, it turns out that although so far we've shown the basics of the iteratee interface, there is a lot more that a full iteratee API has to offer, and once we start understanding this, and using it, you will start to see how powerful, simple and useful iteratees are.

So remember how iteratees are immutable? And remember how iteratees can be in one of three states, cont, done and error, and depending on which state it's in, it will pass its corresponding step class to the folder function? Well, if an iteratee is immutable and it can be in one of three states, then it can only ever be in that state that it's in, and therefore it will only ever pass that corresponding step to the folder function. If an iteratee is done, it's done, it doesn't matter how many times you call its fold function, it will never become cont or error, and its done value will never change, it will only ever pass the Done step to the folder function with the same A value and the same left over input. Because of this, there is only one implementation of a done iteratee that we'll ever need, it looks like this:

case class Done[E, A](a: A, e: Input[E] = Input.Empty) extends Iteratee[E, A] {
  def fold[B](folder: Step[E, A] => Future[B]): Future[B] = folder(Step.Done(a, e))
}

This is the only done iteratee you'll ever need to indicate that you're done. In the Consume iteratee above, when we reached EOF, we created a done iteratee using an anonymous inner class, we didn't need to do this, we could have just used the Done iteratee above. The exact same thing holds for error iteratees:

case class Error[E](msg: String, e: Input[E]) extends Iteratee[E, Nothing] {
  def fold[B](folder: Step[E, Nothing] => Future[B]): Future[B] = folder(Step.Error(msg, e))
}

You may be surprised to find out the exact same thing applies to cont iteratees too - a cont iteratee just passes a function the folder, and that function, because the iteratee is immutable, is never going to change. So consequently, the following iteratee will usually be good enough for your requirements:

case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A] {
  def fold[B](folder: Step[E, A] => Future[B]): Future[B] = folder(Step.Cont(k))
}

So let's rewrite our consume iteratee to use these helper classes:

def consume(data: Array[Byte]): Iteratee[Byte, Array[Byte]] = Cont {
  case Input.El(byte) => consume(data :+ byte)
  case Input.EOF => Done(data)
  case Input.Empty => consume(data)
}

A CSV parser

Now we're looking a lot simpler, our code is focussed on just handling the different types of input we could receive, and returning the correct result. So let's start writing some different iteratees. In fact, let's write an iteratee that parses a CSV file from a stream of characters. Our CSV parser will support optionally quoting fields, and escaping quotes with a double quote.

Our first step will be to write the building blocks of our parser. First up, we want to write something that skips some kinds of white space. So let's write a general purpose drop while iteratee:

def dropWhile(p: Char => Boolean): Iteratee[Char, Unit] = Cont {
  case in @ Input.El(char) if !p(char) => Done(Unit, in)
  case in @ Input.EOF => Done(Unit, in)
  case _ => dropWhile(p)
}

Since we're just dropping input, our result is actually Unit. We return Done if the predicate doesn't match the current char, or if we reach EOF, and otherwise, we return ourselves again. Note that when we are done, we include the input that was passed into us as the remaining data, because this is going to be needed to be consumed by the next iteratee. Using this iteratee we can now write an iteratee that drops white space:

def dropSpaces = dropWhile(c => c == ' ' || c == '\t' || c == '\r')

Next up, we're going to write a take while iteratee, it's going to be a mixture between our earlier consume iteratee, carrying state between each invocation, and the drop while iteratee:

def takeWhile(p: Char => Boolean, data: Seq[Char] = IndexedSeq[Char]()): Iteratee[Char, Seq[Char]] = Cont {
  case in @ Input.El(char) => if (p(char)) {
    takeWhile(p, data :+ char)
  } else {
    Done(data, in)
  }
  case in @ Input.EOF => Done(data, in)
  case _ => takeWhile(p, data)
}

We also want to write a peek iteratee, that looks at what the next input is, without actually consuming it:

def peek: Iteratee[Char, Option[Char]] = Cont {
  case in @ Input.El(char) => Done(Some(char), in)
  case in @ Input.EOF => Done(None, in)
  case Input.Empty => peek
}

Note that our peek iteratee must return an option, since if it encounters EOF, it can't return anything.

And finally, we want a take one iteratee:

def takeOne: Iteratee[Char, Option[Char]] = Cont {
  case in @ Input.El(char) => Done(Some(char))
  case in @ Input.EOF => Done(None, in)
  case Input.Empty => takeOne
}

Using the take one iteratee, we'll build an expect iteratee, that mandates that a certain character must appear next otherwise it throws an error:

def expect(char: Char): Iteratee[Char, Unit] = takeOne.flatMap {
  case Some(c) if c == char => Done(Unit)
  case Some(c) => Error("Expected " + char + " but got " + c, Input.El(c))
  case None => Error("Premature end of input, expected: " + char, Input.EOF)
}

Notice the use of flatMap here. If you haven't come across it before, in the asynchronous world, flatMap basically means "and then". It applies a function to the result of the iteratee, and returns a new iteratee. In our case we're using it to convert the result to either a done iteratee, or an error iteratee, depending on whether the result is what we expected. flatMap is one of the fundamental mechanisms that we'll be using to compose our iteratees together.

Now with our building blocks, we are ready to start building our CSV parser. The first part of it that we'll write is an unquoted value parser. This is very simple, we just want to take all characters that aren't a comma or new line, with one catch. We want the result to be a String, not a Seq[Char] like takeWhile produces. Let's see how we do that:

def unquoted = takeWhile(c => c != ',' && c != '\n').map(v => v.mkString.trim)

As you can see, we've used the map function to transform the end result from a sequence of characters into a String. This is another key method on iteratees that you will find useful.

Our next task is to parse a quoted value. Let's start with an implementation that doesn't take into account escaped quotes. To parse a quoted value, we need to expect a quote, and then we need to take any value that is not a quote, and then we need to expect a quote. Notice that during that sentence I said "and then" 2 times? What method can we use to do an "and then"? That's right, the flatMap method that I talked about before. Let's see what our quoted value parser looks like:

def quoted = expect('"')
  .flatMap(_ => takeWhile(_ != '"'))
  .flatMap(value => expect('"')
    .map(_ => value.mkString))

So now you can probably start to see the usefulness of flatMap. In fact it is so useful, not just for iteratees, but many other things, that Scala has a special syntax for it, called for comprehensions. Let's rewrite the above iteratee using that:

def quoted = for {
  _     <- expect('"')
  value <- takeWhile(_ != '"')
  _     <- expect('"')
} yield value.mkString

Now at this point I hope you are getting excited. What does the above code look like? It looks like ordinary imperative synchronous code. Read this value, then read this value, then read this value. Except it's not synchronous, and it's not imperative. It's functional and asynchronous. We've taken our building blocks, and composed them into a piece of very readable code that makes it completely clear exactly what we are doing.

Now in case you're not 100% sure about the above syntax, the values to the left of the <- signs are the results of the iteratees to the right. These are able to be used anywhere in any subsequent lines, including in the end yield statement. Underscores are used to say we're not interested in the value, we're using this for the expect iteratee since that just returns Unit anyway. The statement after the yield is a map function, which gives us the opportunity to take all the intermediate values and turn them into a single result.

So now that we understand that, let's rewrite our quoted iteratee to support escaped quotes. After reading our quote, we want to peek at the next character. If it's a quote, then we want to append the value we just read, plus a quote to our cumulated value, and recursively invoke the quoted iteratee again. Otherwise, we've reached the end of the value.

def quoted(value: Seq[Char] = IndexedSeq[Char]()): Iteratee[Char, String] = for {
  _          <- expect('"')
  maybeValue <- takeWhile(_ != '"')
  _          <- expect('"')
  nextChar   <- peek
  value      <- nextChar match {
    case Some('"') => quoted(value ++ maybeValue :+ '"')
    case _ => Done[Char, String]((value ++ maybeValue).mkString)
  }
} yield value

Now we need to write an iteratee that can parse either a quoted or unquoted value. We choose which one by peeking at the first character, and then accordingly returning the right iteratee.

def value = for {
  char  <- peek
  value <- char match {
    case Some('"') => quoted()
    case None => Error[Char]("Premature end of input, expected a value", Input.EOF)
    case _ => unquoted
  }
} yield value

Let's now parse an entire line, reading until the end of line character.

def values(state: Seq[String] = IndexedSeq[String]()): Iteratee[Char, Seq[String]] = for {
  _        <- dropSpaces
  value    <- value
  _        <- dropSpaces
  nextChar <- takeOne
  values   <- nextChar match {
    case Some('\n') | None => Done[Char, Seq[String]](state :+ value)
    case Some(',') => values(state :+ value)
    case Some(other) => Error("Expected comma, newline or EOF, but found " + other, Input.El(other))
  }
} yield values

Enumeratees

Now, in a similar way to how we parse the values, we could also parse each line of a CSV file until we reach EOF. But this time we're going to do something a little different. We've seen how we can sequence iteratees using flatMap, but there are further possibilities for composing iteratees. Another concept in iteratees is enumeratees. Enumeratees adapt a stream to be consumed by an iteratee. The simplest enumeratees simply map the input values of the stream to be something else. So, for example, here's an enumeratee that converts a stream of strings to a stream of ints:

def toInt: Enumeratee[String,Int] = Enumeratee.map[String](_.toInt)

One of the methods on Enumeratee is transform. We can use this method to apply an enumeratee to an iteratee:

val someIteratee: Iteratee[Int, X] = ...
val adaptedIteratee: Iteratee[String, X] = toInt.transform(someIteratee)

This method is also aliased to an operator, &>>, and so this code below is equivalent to the code above:

val adaptedIteratee: Iteratee[String, X] = toInt &>> someIteratee

We can also make an enumeratee out of another iteratee, and this is exactly what we're going to do with our values iteratee. The Enumeratee.grouped method takes an iteratee and applies it to the stream over and over, the result of each application being an input to feed into the the iteratee that will be transformed. Let's have a look:

def csv = Enumeratee.grouped(values())

Now let's get a little bit more creative with enumeratees. Let's say that our CSV file is very big, so we don't want to load it into memory. Each line is a series of 3 integer columns, and we want to sum each column. So, let's define an enumeratee that converts each set of values to integers:

def toInts = Enumeratee.map[Seq[String]](_.map(_.toInt))

And another enumeratee to convert the sequence to a 3-tuple:

def toThreeTuple = Enumeratee.map[Seq[Int]](s => (s(0), s(1), s(2)))

And finally an iteratee to sum the them:

def sumThreeTuple(a: Int = 0, b: Int = 0, c: Int = 0): Iteratee[(Int, Int, Int), (Int, Int, Int)] = Cont {
  case Input.El((x, y, z)) => sumThreeTuple(a + x, b + y, c + z)
  case Input.Empty => sumThreeTuple(a, b, c)
  case in @ Input.EOF => Done((a, b, c), in)
}

Now to put them all together. There is another method on enumeratee called compose, which, you guessed it, let's you compose enumeratees. This has an alias operator, ><>. Let's use it:

val processCsvFile = csv ><> toInts ><> toThreeTuple &>> sumThreeTuple()

Enumerators

Finally, if an iteratee consumes a stream, what produces a stream? The answer is an enumerator. An enumerator can be applied to an iteratee using its apply method, which is also aliased to >>>. This will leave the iteratee in a cont state, ready to receive more input. If however the enumerator contains the entirety of the stream, then the run method can be used instead which will send the iteratee an EOF once it's finished. This is aliased to |>>>.

The Play enumerator API makes it easy to create an enumerator by passing a sequence of inputs to the Enumerator companion objects apply method. So, we can create an enumerator of characters using the following code:

val csvFile = Enumerator(
  """1,2,3
    |4,5,6""".stripMargin.toCharArray:_*)

And we can feed this into our iteratee like so:

val result = csvFile |>>> processCsvFile

And our result in this case will be a future that is eventually redeemed with (5, 7, 9).

Conclusion

Well, it's been a long journey, but hopefully if you're an imperative programmer, you not only understand iteratees, you understand the reasoning behind their design, and how easily they compose. I also hope you have a better understanding of both functional and asynchronous programming in general. The functional mindset is quite different to the imperative mindset, and I'm still getting my head around it, but particularly after seeing how nice and simple iteratees can be to work with (once you understand them), I'm becoming convinced that functional programming is the way to go.

If you are interested in downloading the code from this blog post, or if you want to see a more complex JSON parsing iteratee/enumeratee, checkout this GitHub project, which has a few examples, including parsing byte/character streams in array chunks, rather than one at a time.

About

Hi! My name is James Roper, and I am a software developer with a particular interest in open source development and trying new things. I program in Scala, Java, Go, PHP, Python and Javascript, and I work for Lightbend as the architect of Kalix. I also have a full life outside the world of IT, enjoy playing a variety of musical instruments and sports, and currently I live in Canberra.