james' blog about scala and all that jazz

Call response WebSockets in Play Framework

I got a question from a Play user about implementing call/response WebSockets in Play Framework. This is not something that comes up that often, since it means using WebSockets to do basically what AJAX does for you, so what's the point? But here are some use cases that I've thought of:

  • You have some transformation of a stream that can only be done on the server side. For example, perhaps the transformation requires some heavy database work, or is too computationally expensive for a mobile client, or perhaps you want to encrypt the stream with a key that is private to the server.
  • You are already processing a stream of events from the server using WebSockets, and the responses to the calls are just more events in this stream, so you'd like to share the same transport mechanism for these events.
  • Your application is particularly chatty, and you don't want the overhead of the HTTP protocol on each call/response.

There are possibly more use cases - WebSockets is quite a new technology and as an industry we haven't really settled on what its best use cases are.

A simple echo implementation

A Play WebSocket is implemented by providing an iteratee that consumes messages from the client, and an enumerator that produces messages for the client. If we simply wanted to echo every message that the client sent us, then we would want to return an iteratee whose input becomes the output of the enumerator that we return. Play doesn't come with anything out of the box to do this, but we will probably add something that does in a future release. For now, I'm going to write a method called joined that returns a joined iteratee/enumerator pair:

// imports assumed by this snippet (Play 2.1 iteratee API)
import play.api.libs.iteratee._
import play.api.libs.concurrent.Execution.Implicits._
import scala.concurrent.{Future, Promise}

/**
 * Create a joined iteratee enumerator pair.
 *
 * When the enumerator is applied to an iteratee, the iteratee subsequently consumes whatever the iteratee in the pair
 * is applied to.  Consequently the enumerator is "one shot", applying it to subsequent iteratees will throw an
 * exception.
 */
def joined[A]: (Iteratee[A, Unit], Enumerator[A]) = {
  val promisedIteratee = Promise[Iteratee[A, Unit]]()
  val enumerator = new Enumerator[A] {
    def apply[B](i: Iteratee[A, B]) = {
      val doneIteratee = Promise[Iteratee[A, B]]()

      // Equivalent to map, but allows us to handle failures
      def wrap(delegate: Iteratee[A, B]): Iteratee[A, B] = new Iteratee[A, B] {
        def fold[C](folder: (Step[A, B]) => Future[C]) = {
          val toReturn = delegate.fold {
            case done @ Step.Done(a, in) => {
              doneIteratee.success(done.it)
              folder(done)
            }
            case Step.Cont(k) => {
              folder(Step.Cont(k.andThen(wrap)))
            }
            case err => folder(err)
          }
          toReturn.onFailure {
            case e => doneIteratee.failure(e)
          }
          toReturn
        }
      }

      if (promisedIteratee.trySuccess(wrap(i).map(_ => ()))) {
        doneIteratee.future
      } else {
        throw new IllegalStateException("Joined enumerator may only be applied once")
      }
    }
  }
  (Iteratee.flatten(promisedIteratee.future), enumerator)
}

This code might be a little scary if you don't understand iteratees, but as I said we will probably add this to Play itself in future. The rest of the code in this blog post will be simple.

Now that we have our joined iteratee/enumerator, let's implement an echo WebSocket. For the rest of this post we'll be assuming that all our WebSockets are sending/receiving JSON messages.

def echo = WebSocket.using[JsValue] { req =>
  joined[JsValue]
}

So now we have an echo call/response WebSocket. But this is not very useful: we want to do something with the incoming messages, producing new outgoing messages as responses.

Processing messages

So now that we've expressed our call/response in terms of a joined iteratee/enumerator, how can we transform the call messages to be different response messages? The answer is enumeratees. Enumeratees can be used to transform iteratees and enumerators. We return both an enumerator and an iteratee, so which one do we transform? The answer is that it doesn't matter; here I'm going to transform the iteratee. The enumeratee that we're going to use is the map enumeratee:

def process = WebSocket.using[JsValue] { req =>
  val (iter, enum) = joined[JsValue]

  (Enumeratee.map[JsValue] { json =>
    Json.obj(
      "status" -> "received",
      "msg" -> json
    )
  } &> iter, enum)
}

Enumeratees are one of the most powerful features of iteratees for end users. You could use any enumeratee here, but let's look at some examples of other common use cases.

What if we don't want to return a response to every message? There are numerous ways to do this, but the simplest is to use the collect enumeratee, which takes a partial function:

def process = WebSocket.using[JsValue] { req =>
  val (iter, enum) = joined[JsValue]

  (Enumeratee.collect[JsValue] { 
    case json if (json \ "foo").asOpt[JsValue].isDefined =>
      Json.obj(
        "status" -> "received",
        "msg" -> json
      )
  } &> iter, enum)
}

Perhaps we want to produce many responses for a single input. The mapConcat enumeratee can be used in this case, with our map function returning a sequence of JsValue messages to return:

def process = WebSocket.using[JsValue] { req =>
  val (iter, enum) = joined[JsValue]

  (Enumeratee.mapConcat[JsValue] { json =>
    Seq(
      Json.obj(
        "status" -> "received",
        "msg" -> json
      ),
      Json.obj("foo" -> "bar")
    )
  } &> iter, enum)
}

What if we want to do some blocking operations? In Play 2.2, this can be done simply by providing an execution context suitable for blocking calls to whichever enumeratee you decide to use, but Play 2.1 does not yet support this, so we have to dispatch the callback to another execution context ourselves. This can be done using the mapM enumeratee:

val ec: ExecutionContext = ...

def process = WebSocket.using[JsValue] { req =>
  val (iter, enum) = joined[JsValue]

  (Enumeratee.mapM[JsValue] { json =>
    Future {
      // Some expensive computation, eg a database call, that returns JsValue
    }(ec)
  } &> iter, enum)
}
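
Where that execution context comes from is up to you; one approach might be to configure a dedicated thread pool in application.conf (here under the hypothetical name my-blocking-context) and look it up from Play's Akka actor system:

import play.api.Play.current
import play.api.libs.concurrent.Akka

// hypothetical dispatcher name - size its thread pool in application.conf
val ec: ExecutionContext = Akka.system.dispatchers.lookup("my-blocking-context")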

Pushing from an external enumerator

You may want to combine your call/response messages with messages from some other enumerator that spontaneously pushes messages to the client, for example a broadcasting enumerator for all clients. This can be done by interleaving your joined enumerator with the external enumerator:

val globalEvents: Enumerator[JsValue] = ...

def process = WebSocket.using[JsValue] { req =>
  val (iter, enum) = joined[JsValue]

  (Enumeratee.map[JsValue] { json =>
    ...
  } &> iter, Enumerator.interleave(enum, globalEvents))
}
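
Where might such a broadcasting enumerator come from? One option is Play's Concurrent.broadcast, which gives you an enumerator along with a channel that any part of your application can push events into. Roughly:

import play.api.libs.iteratee.Concurrent

// one shared enumerator for all connected clients, plus a channel to push events into
val (globalEvents, globalChannel) = Concurrent.broadcast[JsValue]

// elsewhere in the application, broadcast an event (hypothetical example event)
globalChannel.push(Json.obj("event" -> "serverStarted"))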

Conclusion

Using WebSockets in a call/response style may be something that your application needs. If so, using enumeratees to map the stream of messages coming in to messages going out is the most natural and idiomatic way of doing this in Play. It allows you to call on the large number of composable enumeratees that Play provides out of the box, and makes your code simple and easy to reason about.

Advanced routing in Play Framework

We frequently get questions about how to meet all sorts of different routing needs in Play Framework. While the built-in router is enough for most users, sometimes you may encounter use cases where it's not enough. Or, maybe you want a more convenient way to implement some routing pattern. Whatever it is, Play will allow you to do pretty much anything. This blog post is going to describe some common use cases.

Hooking into Play's routing mechanism

If for some reason you don't like Play's router, or if you want to use a modified router, then Play allows you to do this easily. Global.onRouteRequest is the method that is invoked to do routing. By default, this delegates to the Play router, but you can override it to do whatever you want. For example:

override def onRouteRequest(req: RequestHeader): Option[Handler] = {
  (req.method, req.path) match {
    case ("GET", "/") => Some(controllers.Application.index)
    case ("POST", "/submit") => Some(controllers.Application.submit)
    case _ => None
  }
}

As you can see, I've practically implemented my own little routing DSL here. I could also delegate back to the default router by invoking super.onRouteRequest(req).

An interesting thing that could also be done is to delegate to different routers based on something in the request. A Play router compiles to an instance of Router.Routes, and it will be an object called Routes itself. By default, any file with the .routes extension in the conf directory will be compiled, and will go in the package with the same name as the filename, minus the .routes. So if I had two routers, foo.routes and bar.routes, I could implement a crude form of virtual hosting like so:

override def onRouteRequest(req: RequestHeader): Option[Handler] = {
  if (req.host == "foo.example.com") {
    foo.Routes.routes.lift(req)
  } else if (req.host == "bar.example.com") {
    bar.Routes.routes.lift(req)
  } else {
    super.onRouteRequest(req)
  }
}

So here are some use cases that overriding onRouteRequest may be useful for:

  • Modifying the request in some way before routing is done
  • Plugging in a completely different router (eg, jaxrs)
  • Delegating to different routes files based on some aspect of the request

Implementing a custom router

We saw in the previous example how to use Play's Router.Routes interface; another option is to implement it. Now, there's no real reason to implement it if you're just going to delegate to it directly from onRouteRequest. However, by implementing this interface, you can include it in another routes file using the sub routes include syntax, which, in case you haven't come across it before, typically looks like this:

->    /foo         foo.Routes

Now, something that people often criticise Play for is that it doesn't support Rails-style resource routing, where a convention is used to route commonly needed REST endpoints to particular methods on a controller. Although Play comes with nothing out of the box that does this, it is not hard to implement for your project: Play 2.1 has everything you need to support it, by using the routes include syntax and implementing your own router. And I have some good news too, we will be introducing a feature like this into Play soon. But until then, and also if you have your own custom conventions that you want to implement, you will probably find these instructions very helpful.

So let's start off with an interface that our controllers can implement:

trait ResourceController[T] extends Controller {
  def index: EssentialAction
  def newScreen: EssentialAction
  def create: EssentialAction
  def show(id: T): EssentialAction
  def edit(id: T): EssentialAction
  def update(id: T): EssentialAction
  def destroy(id: T): EssentialAction
}

I could provide default implementations that return not implemented, but then implementing it would require using override keywords. I think it's a matter of preference here.
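
If you do want defaults, a sketch might look something like this, with each action returning 501 Not Implemented:

trait ResourceController[T] extends Controller {
  def index: EssentialAction = Action(NotImplemented)
  def newScreen: EssentialAction = Action(NotImplemented)
  def create: EssentialAction = Action(NotImplemented)
  def show(id: T): EssentialAction = Action(NotImplemented)
  def edit(id: T): EssentialAction = Action(NotImplemented)
  def update(id: T): EssentialAction = Action(NotImplemented)
  def destroy(id: T): EssentialAction = Action(NotImplemented)
}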

Now I'm going to write a router. The router interface looks like this:

trait Routes {
  def routes: PartialFunction[RequestHeader, Handler]
  def documentation: Seq[(String, String, String)]
  def setPrefix(prefix: String)
  def prefix: String
}

The routes method is pretty self-explanatory: it is the function that looks up the handler for a request. documentation is used to document the router; it is not mandatory, but it is used by at least one REST API documentation tool to discover what routes are available and what they look like. For brevity in this post, we won't worry about implementing it. The prefix and setPrefix methods are used by Play to inject the path of the router. In the routes include syntax that I showed above, you could see that we declared the router to be on the path /foo. This path is injected using this mechanism.

So we'll write an abstract class that implements the routes interface and the ResourceController interface:

abstract class ResourceRouter[T](implicit idBindable: PathBindable[T]) 
    extends Router.Routes with ResourceController[T] {
  private var path: String = ""
  def setPrefix(prefix: String) {
    path = prefix
  }
  def prefix = path
  def documentation = Nil
  def routes = ...
}

I've given it a PathBindable; this is so that we have a way to convert the id from a String extracted from the path to the type accepted by the methods. PathBindable is the same interface that's used under the covers by a normal routes file to convert types.
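
To illustrate, here's roughly how the built in PathBindable[Long] behaves when you bind a value:

val bindable = implicitly[PathBindable[Long]]

bindable.bind("id", "123")  // Right(123)
bindable.bind("id", "foo")  // Left(error message) if the value can't be converted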

Now for the implementation of routes. First I'm going to create some regular expressions for matching the different paths:

  private val MaybeSlash = "/?".r
  private val NewScreen = "/new/?".r
  private val Id = "/([^/]+)/?".r
  private val Edit = "/([^/]+)/edit/?".r

I'm also going to create a helper function for the routes that require the id to be bound:

def withId(id: String, action: T => EssentialAction) = 
  idBindable.bind("id", id).fold(badRequest, action)

badRequest is actually a method on Router.Routes that takes the error message and turns it into an action that returns that as a result. Now I'm ready to implement the partial function:

def routes = new AbstractPartialFunction[RequestHeader, Handler] {
  override def applyOrElse[A <: RequestHeader, B >: Handler](rh: A, default: A => B) = {
    if (rh.path.startsWith(path)) {
      (rh.method, rh.path.drop(path.length)) match {
        case ("GET", MaybeSlash()) => index
        case ("GET", NewScreen()) => newScreen
        case ("POST", MaybeSlash()) => create
        case ("GET", Id(id)) => withId(id, show)
        case ("GET", Edit(id)) => withId(id, edit)
        case ("PUT", Id(id)) => withId(id, update)
        case ("DELETE", Id(id)) => withId(id, destroy)
        case _ => default(rh)
      }
    } else {
      default(rh)
    }
  }

  def isDefinedAt(rh: RequestHeader) = ...
}

I've implemented AbstractPartialFunction, and the main method to implement is applyOrElse. The match statement looks a lot like the mini DSL I showed in the first code sample. I'm using regular expressions as extractor objects to extract the ids out of the path. Note that I haven't shown the implementation of isDefinedAt. Play actually won't call this, but it's good to implement it anyway; it's basically the same as applyOrElse, except that instead of invoking the corresponding methods it returns true, and when nothing matches it returns false.
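
For completeness, a sketch of isDefinedAt along those lines might look like this:

def isDefinedAt(rh: RequestHeader) = {
  if (rh.path.startsWith(path)) {
    (rh.method, rh.path.drop(path.length)) match {
      case ("GET", MaybeSlash()) => true
      case ("GET", NewScreen()) => true
      case ("POST", MaybeSlash()) => true
      case ("GET", Id(_)) => true
      case ("GET", Edit(_)) => true
      case ("PUT", Id(_)) => true
      case ("DELETE", Id(_)) => true
      case _ => false
    }
  } else {
    false
  }
}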

And now we're done. So what does using this look like? My controller looks like this:

package controllers

object MyResource extends ResourceRouter[Long] {
  def index = Action {...}
  def create = Action {...}
  ...
  def custom(id: Long) = Action {...}
}

And in my routes file I have this:

->     /myresource              controllers.MyResource
POST   /myresource/:id/custom   controllers.MyResource.custom(id: Long)

You can see I've also shown an example of adding a custom action to the controller; obviously the standard CRUD actions are not always going to be enough, and the nice thing about this is that you can add as many arbitrary routes as you want.

But what if we want to have a managed controller, that is, one whose instantiation is managed by a DI framework? Well, let's create another router that does this:

class ManagedResourceRouter[T, R <: ResourceController[T]]
    (implicit idBindable: PathBindable[T], ct: ClassTag[R]) 
    extends ResourceRouter[T] {

  private def invoke(action: R => EssentialAction) = {
    Play.maybeApplication.map { app =>
      action(app.global.getControllerInstance(ct.runtimeClass.asInstanceOf[Class[R]]))
    } getOrElse {
      Action(Results.InternalServerError("No application"))
    }
  }

  def index = invoke(_.index)
  def newScreen = invoke(_.newScreen)
  def create = invoke(_.create)
  def show(id: T) = invoke(_.show(id))
  def edit(id: T) = invoke(_.edit(id))
  def update(id: T) = invoke(_.update(id))
  def destroy(id: T) = invoke(_.destroy(id))
}

This uses the same Global.getControllerInstance method that managed controllers in the regular router use. Using it is very simple:

package controllers

class MyResource(dbService: DbService) extends ResourceController[Long] {
  def index = Action {...}
  def create = Action {...}
  ...
  def custom(id: Long) = Action {...}
}
object MyResource extends ManagedResourceRouter[Long, MyResource]

And in the routes file:

->     /myresource              controllers.MyResource
POST   /myresource/:id/custom   @controllers.MyResource.custom(id: Long)
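
The getControllerInstance hook is where your DI framework plugs in. As a rough sketch, using Guice with a hypothetical module that binds DbService, overriding it in Global might look something like this:

import com.google.inject.{AbstractModule, Guice}
import play.api.GlobalSettings

object Global extends GlobalSettings {
  private lazy val injector = Guice.createInjector(new AbstractModule {
    def configure() {
      bind(classOf[DbService]).to(classOf[MyDbService]) // hypothetical binding
    }
  })

  override def getControllerInstance[A](controllerClass: Class[A]): A =
    injector.getInstance(controllerClass)
}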

The final thing we need to consider is reverse routing and the Javascript router. Again this is very simple, but I'm not going to go into any details here. Instead, you can check out the final product, which has a few more features, here.

Understanding the Play Filter API

With Play 2.1 hot off the press, there have been a lot of people asking about the new Play filter API. In actual fact, the API is incredibly simple:

trait EssentialFilter {
  def apply(next: EssentialAction): EssentialAction
}

Essentially, a filter is just a function that takes an action and returns another action. The usual thing a filter does is wrap the action, invoking it as a delegate. To then add a filter to your application, you just add it to your Global's doFilter method. We provide a helper class to do that for you:

object Global extends WithFilters(MyFilter) {
  ...
}

Easy right? Wrap the action, register it in global. Well, it is easy, but only if you understand Play's architecture. This is very important, because once you understand Play's architecture, you will be able to do far more with Play. We have some documentation here that explains Play's architecture at a high level. In this blog post, I'm going to explain Play's architecture in the context of filters, with code snippets and use cases along the way.

A short introduction to Play's architecture

I don't need to go in depth here because I've already provided a link to our architecture documentation, but in short Play's architecture matches the flow of an HTTP request very well.

The first thing that arrives when an HTTP request is made is the request header. So an action in Play must be a function that accepts a request header.

What happens next in an HTTP request? The body is received. So, the function that receives the request must return something that consumes the body. This is an iteratee, which is a reactive stream handler, that eventually produces a single result after consuming the stream. You don't necessarily need to understand the details about how iteratees work in order to understand filters, the important thing to understand is that iteratees eventually produce a result that you can map, just like a future, using their map function. For details on writing iteratees, read my blog post.

The next thing that happens in an HTTP request is that the HTTP response must be sent. So what is the result of the iteratee? An HTTP response. And an HTTP response is a set of response headers, followed by a response body. The response body is an enumerator, which is a reactive stream producer.

All of this is captured in Play's EssentialAction trait:

trait EssentialAction extends (RequestHeader => Iteratee[Array[Byte], Result])

This reads that an essential action is a function that takes a request header and returns an iteratee that consumes the byte array body chunks and eventually produces a result.

The simpler way

Before I go on, I'd like to point out that Play provides a helper trait called Filter that makes writing filters easier than when using EssentialFilter. This is similar to the Action trait, in that Action simplifies writing EssentialActions by not needing to worry about iteratees and how the body is parsed; rather, you just provide a function that takes a request with a parsed body and returns a result. The Filter trait simplifies things in a similar way, however I'm going to leave talking about that until the end, because I think it is better to understand how filters work from the bottom up before you start using the helper class.

The noop filter

To demonstrate what a filter looks like, the first thing I will show is a noop filter:

class NoopFilter extends EssentialFilter {
  def apply(next: EssentialAction) = new EssentialAction {
    def apply(request: RequestHeader) = {
      next(request)
    }
  }
}

Each time the filter is applied, we create a new EssentialAction that wraps the action passed in. Since EssentialAction is just a function, we can just invoke it, passing the request we received. So the above is our basic pattern for implementing an EssentialFilter.

Handling the request header

Let's say we want to look at the request header, and conditionally invoke the wrapped action based on what we inspect. An example of a filter that would do that might be a blanket security policy for the /admin area of your website. This might look like this:

class AdminFilter extends EssentialFilter {
  def apply(next: EssentialAction) = new EssentialAction {
    def apply(request: RequestHeader) = {
      if (request.path.startsWith("/admin") && request.session.get("user").isEmpty) {
        Iteratee.ignore[Array[Byte]].map(_ => Results.Forbidden())
      } else {
        next(request)
      }
    }
  }
}

You can see here that since we are intercepting the action before the body has been parsed, we still need to provide a body parser when we block the action. In this case we are returning a body parser that will simply ignore the whole body, and mapping it to have a result of forbidden.

Handling the body

In some cases, you might want to do something with the body in your filter. For example, you might want to parse the body. If this is the case, consider using action composition instead, because that makes it possible to hook into the action processing after the action has parsed the body. If you want to parse the body at the filter level, then you'll have to buffer it, parse it, and then stream it again for the action to parse again.

However, there are some things that can easily be done at the filter level. One example is gzip decompression. Play Framework already provides gzip decompression out of the box, but if it didn't, this is what it might look like (using the gunzip enumeratee from my play extra iteratees project):

class GunzipFilter extends EssentialFilter {
  def apply(next: EssentialAction) = new EssentialAction {
    def apply(request: RequestHeader) = {
      if (request.headers.get("Content-Encoding").exists(_ == "gzip")) {
        Gzip.gunzip() &>> next(request)
      } else {
        next(request)
      }
    }
  }
}

Here using iteratee composition we are wrapping the body parser iteratee in a gunzip enumeratee.

Handling the response headers

When you're filtering you will often want to do something to the response that is being sent. If you just want to add a header, or add something to the session, or do any write operation on the response, without actually reading it, then this is quite simple. For example, let's say you wanted to add a custom header to every response:

class SosFilter extends EssentialFilter {
  def apply(next: EssentialAction) = new EssentialAction {
    def apply(request: RequestHeader) = {
      next(request).map(result => 
        result.withHeaders("X-Sos-Message" -> "I'm trapped inside Play Framework please send help"))
    }
  }
}

Using the map function on the iteratee that handles the body, we are given access to the result produced by the action, which we can then modify as demonstrated.

If however you want to read the result, then you'll need to unwrap it. Play results are either AsyncResult or PlainResult. An AsyncResult is a Result that contains a Future[Result]. It has a transform method that allows the eventual PlainResult to be transformed. A PlainResult has a header and a body.

So let's say you want to add a timestamp to every newly created session to record when it was created. This could be done like this:

class SessionTimestampFilter extends EssentialFilter {
  def apply(next: EssentialAction) = new EssentialAction {
    def apply(request: RequestHeader) = {

      def addTimestamp(result: PlainResult): Result = {
        val session = Session.decodeFromCookie(Cookies(result.header.headers.get(HeaderNames.COOKIE)).get(Session.COOKIE_NAME))
        if (!session.isEmpty) {
          result.withSession(session + ("timestamp" -> System.currentTimeMillis.toString))
        } else {
          result
        }
      }

      next(request).map {
        case plain: PlainResult => addTimestamp(plain)
        case async: AsyncResult => async.transform(addTimestamp)
      }
    }
  }
}

Handling the response body

The final thing you might want to do is transform the response body. PlainResult has two implementations, SimpleResult, which is for bodies with no transfer encoding, and ChunkedResult, for bodies with chunked transfer encoding. SimpleResult contains an enumerator, and ChunkedResult contains a function that accepts an iteratee to write the result out to.

An example of something you might want to do is implement a gzip filter. A very naive implementation (as in, do not use this, instead use my complete implementation from my play extra iteratees project) might look like this:

class GzipFilter extends EssentialFilter {
  def apply(next: EssentialAction) = new EssentialAction {
    def apply(request: RequestHeader) = {

      def gzipResult(result: PlainResult): Result = result match {
        case simple @ SimpleResult(header, content) => SimpleResult(header.copy(
          headers = (header.headers - "Content-Length") + ("Content-Encoding" -> "gzip")
        ), content &> Enumeratee.map(a => simple.writeable.transform(a)) &> Gzip.gzip())
      }

      next(request).map {
        case plain: PlainResult => gzipResult(plain)
        case async: AsyncResult => async.transform(gzipResult)
      }
    }
  }
}

Using the simpler API

Now you've seen how you can achieve everything using the base EssentialFilter API, and hopefully therefore you understand how filters fit into Play's architecture and how you can utilise them to achieve your requirements. Let's now have a look at the simpler API:

trait Filter extends EssentialFilter {
  def apply(f: RequestHeader => Result)(rh: RequestHeader): Result
  def apply(next: EssentialAction): EssentialAction = {
    ...
  }
}

object Filter {
  def apply(filter: (RequestHeader => Result, RequestHeader) => Result): Filter = new Filter {
    def apply(f: RequestHeader => Result)(rh: RequestHeader): Result = filter(f,rh)
  }
}

Simply put, this API allows you to write filters without having to worry about body parsers. It makes it look like actions are just functions of request headers to results. This limits the full power of what you can do with filters, but for many use cases, you simply don't need this power, so using this API provides a simple alternative.

To demonstrate, a noop filter class looks like this:

class NoopFilter extends Filter {
  def apply(f: (RequestHeader) => Result)(rh: RequestHeader) = {
    f(rh)
  }
}

Or, using the Filter companion object:

val noopFilter = Filter { (next, req) =>
  next(req)
}

And a request timing filter might look like this:

val timingFilter = Filter { (next, req) =>
  val start = System.currentTimeMillis

  def logTime(result: PlainResult): Result = {
    Logger.info("Request took " + (System.currentTimeMillis - start))
    result
  }

  next(req) match {
    case plain: PlainResult => logTime(plain)
    case async: AsyncResult => async.transform(logTime)
  }
}
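
And as shown at the start of this post, these filters are registered by passing them to WithFilters in your Global object:

object Global extends WithFilters(noopFilter, timingFilter)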

Iteratees for imperative programmers

When I first heard the word iteratee, I thought it was a joke. Turns out, it wasn't a joke, in fact there are also enumerators (that's ok) and enumeratees (you're killing me). If you're an imperative programmer, or rather a programmer who feels more comfortable writing imperative code than functional code, then you may be a little overwhelmed by all the introductions to iteratees out there, because they all assume that you think from a functional perspective. Well I just learnt iteratees, and although I'm feeling more and more comfortable with functional programming every day, I still think like an imperative programmer at heart. This made learning iteratees very difficult for me. So while I'm still in the imperative mindset, I thought this was a very good opportunity to explain iteratees from an imperative programmer's perspective, taking no functional knowledge for granted. If you're an imperative programmer who wants to learn iteratees, this is the blog post for you. I'm going to specifically be looking at Play's Iteratee API, but the concepts learnt here will apply to all iteratees in general.

So let's start off with explaining what iteratees, and their counterparts, are trying to achieve. An iteratee is a method of reactively handling streams of data that is very easily composable. By reactive, I mean non blocking, ie you react to data being available to read, and react to the opportunity to write data. By composable, I mean you write simple iteratees that do one small thing well, then you use those as the building blocks to write iteratees that do bigger things, and you use those as the building blocks to write iteratees to do even bigger things, and so on. At each stage, everything is simple and easy to reason about.

Reactive stream handling

If you're looking for information about iteratees, then I'm guessing you already know a bit about what reactive stream handling is. Let's contrast it to synchronous IO code:

trait InputStream {
  def read(): Byte
}

So this should be very familiar, if you want to read a byte, you call read. If no byte is currently available to be read, that call will block, and your thread will wait until a byte is available. With reactive streams, obviously it's the other way around, you pass a callback to the stream you want to receive data from, and it will call that when it's ready to give data to you. So typically you might implement a trait that looks like this:

trait InputStreamHandler {
  def onByte(byte: Byte)
}

So before we go on, let's look at how the same thing would be achieved in a pure functional world. At this point I don't want you to ask why we want to do things this way, you will see that later on, but if you know anything about functional programming, you know that everything tends to be immutable, and functions have no side effects. The trait above has to have side effects, because unless you are ignoring the bytes passed to onByte, you must be changing your state (or something else's state) somehow in that function. So, how do we handle data without changing our state? The answer is the same way other immutable data structures work, we return a copy of ourselves, updated with the new state. So if the InputStreamHandler were to be functional, it might look like this:

trait InputStreamHandler {
  def onByte(byte: Byte): InputStreamHandler
}

And an example implementation of one, that reads input into a seq, might look like this:

class Consume(data: Seq[Byte]) extends InputStreamHandler {
  def onByte(byte: Byte) = new Consume(data :+ byte)
}

So we now have imperative and functional traits that react to our input stream, and you might be thinking this is all there is to reactive streams. If that's the case, you're wrong. What if we're not ready to handle data when the onByte method is called? If we're building structures in memory this will never be the case, but if for example we're storing them to a file or to a database as we receive the data, then this very likely will be the case. So reactive streams are two way, it's not just you, the stream consumer that is reacting to input, the stream producer must react to you being ready for input.

Now this is possible to implement in an imperative world, though things do start looking much more functional. We simply start using futures:

trait InputStreamHandler {
  def onByte(byte: Byte): Future[Unit]
}

So, when the stream we are consuming has a byte for us, it calls onByte, and then attaches a callback to the future we return, to pass the next byte, when it's ready. If you have a look at Netty's asynchronous channel APIs, you'll see it uses exactly this pattern. We can also implement something similar for an immutable functional API:

trait InputStreamHandler {
  def onByte(byte: Byte): Future[InputStreamHandler]
}

And so here we have a functional solution for reactive stream handling. But it's not a very good one, for a start, there's no way for the handlers to communicate to the code that uses them that they don't want to receive any more input, or if they've encountered an error (exceptions are frowned upon in functional programming). We could add things to handle this, but very soon our interface would become quite complex, hard to break up into small pieces that can be composed, etc. I'm not going to justify this now, I think you'll see it later when I show you just how easy iteratees are to compose.

So, by this stage I hope you have understood two important points. Firstly, reactive stream handling means twofold reacting, both your code has to react to the stream being ready, and the stream has to react to you being ready. Secondly, when I say that we want a functional solution, I mean a solution where everything is immutable, and that is achieved by our stream handlers producing copies of themselves each time they receive/send data. If you've understood those two important points, then now we can move on to introducing iteratees.

Iteratees

There are a few things that our interface hasn't yet addressed. The first is, how does the stream communicate to us that it is finished, that is, that it has no more data for us? To do this, instead of passing in a byte directly, we're going to abstract our byte to be something of type Input[Byte], and that type can have three possible implementations, EOF, an element, or empty. Let's not worry about why we need empty just yet, but assume for some reason we might want to pass empty. So this is what Input looks like:

sealed trait Input[+E]

object Input {
  case object EOF extends Input[Nothing]
  case object Empty extends Input[Nothing]
  case class El[+E](e: E) extends Input[E]
}

Updating our InputStreamHandler, we now get something that looks like this:

trait InputStreamHandler[E] {
  def onInput(in: Input[E]): Future[InputStreamHandler[E]]
}

Now updating our Consumer from before to handle this, it might look like this:

class Consume(data: IndexedSeq[Byte]) extends InputStreamHandler[Byte] {
  def onInput(in: Input[Byte]) = in match {
    case Input.El(byte) => Future.successful(new Consume(data :+ byte))
    case _ => Future.successful(this)
  }
}

You can see that when we get EOF or Empty, there's nothing for us to do to change our state, so we just return ourselves again. If we were writing to another stream, we might, when we receive EOF, close that stream (or rather, send it an EOF).

The next thing we're going to do is make it easier for our handler to consume input immediately without having to create a future. To do this, rather than passing the byte directly, we'll pass a function, that takes a function as a parameter, and that function will take the byte as a parameter. So, our handler, when it's ready, will create a function to handle the byte, and then invoke the function that was passed to it, with that function. We'll call the first function the cont function, which is short for continue, and means when you're ready to continue receiving input invoke me. Too many functions? Let's look at the code:

trait InputStreamHandler[E] {
  def onByte[B](cont: (Input[E] => InputStreamHandler[E]) => Future[B]): Future[B]
}

Now where did this Future[B] come from? B is just the mechanism that the stream uses to pass state back to itself. As the handler, we don't have to worry about what it is, we just have to make sure that we eventually invoke the cont function, and eventually make sure that the B it returns makes it back to our caller. And what does this look like in our Consume iteratee? Let's have a look:

class Consume(data: IndexedSeq[Byte]) extends InputStreamHandler[Byte] {
  def onByte[B](cont: (Input[Byte] => InputStreamHandler[Byte]) => Future[B]) = cont {
    case Input.El(byte) => new Consume(data :+ byte)
    case _ => this
  }
}

You can see in our simple case of being ready to handle input immediately, we just immediately invoke cont, we no longer need to worry about creating futures. If we want to handle the input asynchronously, it is a little more complex, but we'll take a look at that later.

Now we have one final step in producing our iteratee API. How does the handler communicate back to the stream that it doesn't want to receive any more data? There could be two reasons for this: one is that it's finished receiving data. For example, if our handler is a JSON parser, it might have reached the end of the object it was parsing, and so doesn't want to receive anymore. The other reason is that it's encountered an error; for a JSON parser, this might be a syntax error, or if it's sending data through to another stream, it might be an IO error on that stream.

To allow our iteratee to communicate with the stream, we're going to create a trait that represents its state. We'll call this trait Step, and the three states that the iteratee can be in will be Cont, Done and Error. Our Cont state is going to contain our Input[Byte] => InputStreamHandler function, so that the stream can invoke it. Our Done state will contain our result (in the case of Consume, a Seq[Byte]) and our Error state will contain an error message.

In addition to this, both our Done and Error states need to contain the left over input that they didn't consume. This will be important for when we are composing iteratees together, so that once one iteratee has finished consuming input from a stream, the next can pick up where the first left off. This is one reason why we need Input.Empty, because if we did consume all the input, then we need some way to indicate that.

So, here's our Step trait:

sealed trait Step[E, +A]

object Step {
  case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]
  case class Cont[E, +A](k: Input[E] => InputStreamHandler[E, A]) extends Step[E, A]
  case class Error[E](msg: String, input: Input[E]) extends Step[E, Nothing]
}

The type parameter E is the type of input our iteratee wants to accept, and A is what it's producing. So our handler trait now looks like this:

trait InputStreamHandler[E, A] {
  def onInput[B](step: Step[E, A] => Future[B]): Future[B]
}

And our consumer is implemented like this:

class Consume(data: Seq[Byte]) extends InputStreamHandler[Byte, Seq[Byte]] {
  def onInput[B](step: Step[Byte, Seq[Byte]] => Future[B]) = step(Step.Cont({
    case Input.El(byte) => new Consume(data :+ byte)
    case Input.EOF => new InputStreamHandler[Byte, Seq[Byte]] {
      def onInput[B](cont: Step[Byte, Seq[Byte]] => Future[B]) = cont(Step.Done(data, Input.Empty))
    }
    case Input.Empty => this
  }))
}

One big difference here that you'll notice is that when we receive EOF, we return a handler that passes Done into the step function, to say we are done consuming the input.

And so now we've built our iteratee interface. Our naming isn't quite right though, so we'll rename the trait obviously to Iteratee, and we'll rename onInput to fold, since we are folding our state into one result. And so now we get our interface:

trait Iteratee[E, +A] {
  def fold[B](folder: Step[E, A] => Future[B]): Future[B]
}

Iteratees in practice

So far we've started with the requirements of a traditional imperative input stream, and described what an iteratee is in contrast to that. But looking at the above code, you might think that using them is really difficult. They seem like they are far more complex than they need to be, at least conceptually, to implement reactive streams. Well, it turns out that although so far we've shown the basics of the iteratee interface, there is a lot more that a full iteratee API has to offer, and once we start understanding this, and using it, you will start to see how powerful, simple and useful iteratees are.

So remember how iteratees are immutable? And remember how iteratees can be in one of three states, cont, done and error, and depending on which state it's in, it will pass its corresponding step class to the folder function? Well, if an iteratee is immutable and it can be in one of three states, then it can only ever be in that state that it's in, and therefore it will only ever pass that corresponding step to the folder function. If an iteratee is done, it's done, it doesn't matter how many times you call its fold function, it will never become cont or error, and its done value will never change, it will only ever pass the Done step to the folder function with the same A value and the same left over input. Because of this, there is only one implementation of a done iteratee that we'll ever need, it looks like this:

case class Done[E, A](a: A, e: Input[E] = Input.Empty) extends Iteratee[E, A] {
  def fold[B](folder: Step[E, A] => Future[B]): Future[B] = folder(Step.Done(a, e))
}

This is the only done iteratee you'll ever need to indicate that you're done. In the Consume iteratee above, when we reached EOF, we created a done iteratee using an anonymous inner class, we didn't need to do this, we could have just used the Done iteratee above. The exact same thing holds for error iteratees:

case class Error[E](msg: String, e: Input[E]) extends Iteratee[E, Nothing] {
  def fold[B](folder: Step[E, Nothing] => Future[B]): Future[B] = folder(Step.Error(msg, e))
}

You may be surprised to find out the exact same thing applies to cont iteratees too - a cont iteratee just passes a function to the folder, and that function, because the iteratee is immutable, is never going to change. So consequently, the following iteratee will usually be good enough for your requirements:

case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A] {
  def fold[B](folder: Step[E, A] => Future[B]): Future[B] = folder(Step.Cont(k))
}

So let's rewrite our consume iteratee to use these helper classes:

def consume(data: Array[Byte]): Iteratee[Byte, Array[Byte]] = Cont {
  case Input.El(byte) => consume(data :+ byte)
  case Input.EOF => Done(data)
  case Input.Empty => consume(data)
}

A CSV parser

Now we're looking a lot simpler, our code is focussed on just handling the different types of input we could receive, and returning the correct result. So let's start writing some different iteratees. In fact, let's write an iteratee that parses a CSV file from a stream of characters. Our CSV parser will support optionally quoting fields, and escaping quotes with a double quote.

Our first step will be to write the building blocks of our parser. First up, we want to write something that skips some kinds of white space. So let's write a general purpose drop while iteratee:

def dropWhile(p: Char => Boolean): Iteratee[Char, Unit] = Cont {
  case in @ Input.El(char) if !p(char) => Done(Unit, in)
  case in @ Input.EOF => Done(Unit, in)
  case _ => dropWhile(p)
}

Since we're just dropping input, our result is actually Unit. We return Done if the predicate doesn't match the current char, or if we reach EOF, and otherwise, we return ourselves again. Note that when we are done, we include the input that was passed into us as the remaining data, because this is going to be needed to be consumed by the next iteratee. Using this iteratee we can now write an iteratee that drops white space:

def dropSpaces = dropWhile(c => c == ' ' || c == '\t' || c == '\r')

Next up, we're going to write a take while iteratee, it's going to be a mixture between our earlier consume iteratee, carrying state between each invocation, and the drop while iteratee:

def takeWhile(p: Char => Boolean, data: Seq[Char] = IndexedSeq[Char]()): Iteratee[Char, Seq[Char]] = Cont {
  case in @ Input.El(char) => if (p(char)) {
    takeWhile(p, data :+ char)
  } else {
    Done(data, in)
  }
  case in @ Input.EOF => Done(data, in)
  case _ => takeWhile(p, data)
}

We also want to write a peek iteratee, that looks at what the next input is, without actually consuming it:

def peek: Iteratee[Char, Option[Char]] = Cont {
  case in @ Input.El(char) => Done(Some(char), in)
  case in @ Input.EOF => Done(None, in)
  case Input.Empty => peek
}

Note that our peek iteratee must return an option, since if it encounters EOF, it can't return anything.

And finally, we want a take one iteratee:

def takeOne: Iteratee[Char, Option[Char]] = Cont {
  case in @ Input.El(char) => Done(Some(char))
  case in @ Input.EOF => Done(None, in)
  case Input.Empty => takeOne
}

Using the take one iteratee, we'll build an expect iteratee, that mandates that a certain character must appear next otherwise it throws an error:

def expect(char: Char): Iteratee[Char, Unit] = takeOne.flatMap {
  case Some(c) if c == char => Done(Unit)
  case Some(c) => Error("Expected " + char + " but got " + c, Input.El(c))
  case None => Error("Premature end of input, expected: " + char, Input.EOF)
}

Notice the use of flatMap here. If you haven't come across it before, in the asynchronous world, flatMap basically means "and then". It applies a function to the result of the iteratee, and returns a new iteratee. In our case we're using it to convert the result to either a done iteratee, or an error iteratee, depending on whether the result is what we expected. flatMap is one of the fundamental mechanisms that we'll be using to compose our iteratees together.
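
As a trivial illustration of sequencing two iteratees with flatMap, here's an iteratee that reads one character, and then another, yielding both:

def takeTwo: Iteratee[Char, (Option[Char], Option[Char])] =
  takeOne.flatMap(first => takeOne.map(second => (first, second)))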

Now with our building blocks, we are ready to start building our CSV parser. The first part of it that we'll write is an unquoted value parser. This is very simple, we just want to take all characters that aren't a comma or new line, with one catch. We want the result to be a String, not a Seq[Char] like takeWhile produces. Let's see how we do that:

def unquoted = takeWhile(c => c != ',' && c != '\n').map(v => v.mkString.trim)

As you can see, we've used the map function to transform the end result from a sequence of characters into a String. This is another key method on iteratees that you will find useful.

Our next task is to parse a quoted value. Let's start with an implementation that doesn't take into account escaped quotes. To parse a quoted value, we need to expect a quote, and then we need to take any value that is not a quote, and then we need to expect a quote. Notice that during that sentence I said "and then" 2 times? What method can we use to do an "and then"? That's right, the flatMap method that I talked about before. Let's see what our quoted value parser looks like:

def quoted = expect('"')
  .flatMap(_ => takeWhile(_ != '"'))
  .flatMap(value => expect('"')
    .map(_ => value.mkString))

So now you can probably start to see the usefulness of flatMap. In fact it is so useful, not just for iteratees, but many other things, that Scala has a special syntax for it, called for comprehensions. Let's rewrite the above iteratee using that:

def quoted = for {
  _     <- expect('"')
  value <- takeWhile(_ != '"')
  _     <- expect('"')
} yield value.mkString

Now at this point I hope you are getting excited. What does the above code look like? It looks like ordinary imperative synchronous code. Read this value, then read this value, then read this value. Except it's not synchronous, and it's not imperative. It's functional and asynchronous. We've taken our building blocks, and composed them into a piece of very readable code that makes it completely clear exactly what we are doing.

Now in case you're not 100% sure about the above syntax, the values to the left of the <- signs are the results of the iteratees to the right. These are able to be used anywhere in any subsequent lines, including in the end yield statement. Underscores are used to say we're not interested in the value, we're using this for the expect iteratee since that just returns Unit anyway. The statement after the yield is a map function, which gives us the opportunity to take all the intermediate values and turn them into a single result.

So now that we understand that, let's rewrite our quoted iteratee to support escaped quotes. After reading our quote, we want to peek at the next character. If it's a quote, then we want to append the value we just read, plus a quote to our cumulated value, and recursively invoke the quoted iteratee again. Otherwise, we've reached the end of the value.

def quoted(value: Seq[Char] = IndexedSeq[Char]()): Iteratee[Char, String] = for {
  _          <- expect('"')
  maybeValue <- takeWhile(_ != '"')
  _          <- expect('"')
  nextChar   <- peek
  value      <- nextChar match {
    case Some('"') => quoted(value ++ maybeValue :+ '"')
    case _ => Done[Char, String]((value ++ maybeValue).mkString)
  }
} yield value

Now we need to write an iteratee that can parse either a quoted or unquoted value. We choose which one by peeking at the first character, and then accordingly returning the right iteratee.

def value = for {
  char  <- peek
  value <- char match {
    case Some('"') => quoted()
    case None => Error[Char]("Premature end of input, expected a value", Input.EOF)
    case _ => unquoted
  }
} yield value

Let's now parse an entire line, reading until the end of line character.

def values(state: Seq[String] = IndexedSeq[String]()): Iteratee[Char, Seq[String]] = for {
  _        <- dropSpaces
  value    <- value
  _        <- dropSpaces
  nextChar <- takeOne
  values   <- nextChar match {
    case Some('\n') | None => Done[Char, Seq[String]](state :+ value)
    case Some(',') => values(state :+ value)
    case Some(other) => Error("Expected comma, newline or EOF, but found " + other, Input.El(other))
  }
} yield values

Enumeratees

Now, in a similar way to how we parse the values, we could also parse each line of a CSV file until we reach EOF. But this time we're going to do something a little different. We've seen how we can sequence iteratees using flatMap, but there are further possibilities for composing iteratees. Another concept in iteratees is enumeratees. Enumeratees adapt a stream to be consumed by an iteratee. The simplest enumeratees simply map the input values of the stream to be something else. So, for example, here's an enumeratee that converts a stream of strings to a stream of ints:

def toInt: Enumeratee[String,Int] = Enumeratee.map[String](_.toInt)

One of the methods on Enumeratee is transform. We can use this method to apply an enumeratee to an iteratee:

val someIteratee: Iteratee[Int, X] = ...
val adaptedIteratee: Iteratee[String, X] = toInt.transform(someIteratee)

This method is also aliased to an operator, &>>, and so this code below is equivalent to the code above:

val adaptedIteratee: Iteratee[String, X] = toInt &>> someIteratee

We can also make an enumeratee out of another iteratee, and this is exactly what we're going to do with our values iteratee. The Enumeratee.grouped method takes an iteratee and applies it to the stream over and over, the result of each application being an input to feed into the iteratee that will be transformed. Let's have a look:

def csv = Enumeratee.grouped(values())

Now let's get a little bit more creative with enumeratees. Let's say that our CSV file is very big, so we don't want to load it into memory. Each line is a series of 3 integer columns, and we want to sum each column. So, let's define an enumeratee that converts each set of values to integers:

def toInts = Enumeratee.map[Seq[String]](_.map(_.toInt))

And another enumeratee to convert the sequence to a 3-tuple:

def toThreeTuple = Enumeratee.map[Seq[Int]](s => (s(0), s(1), s(2)))

And finally an iteratee to sum them:

def sumThreeTuple(a: Int = 0, b: Int = 0, c: Int = 0): Iteratee[(Int, Int, Int), (Int, Int, Int)] = Cont {
  case Input.El((x, y, z)) => sumThreeTuple(a + x, b + y, c + z)
  case Input.Empty => sumThreeTuple(a, b, c)
  case in @ Input.EOF => Done((a, b, c), in)
}

Now to put them all together. There is another method on enumeratee called compose, which, you guessed it, lets you compose enumeratees. This has an alias operator, ><>. Let's use it:

val processCsvFile = csv ><> toInts ><> toThreeTuple &>> sumThreeTuple()

Enumerators

Finally, if an iteratee consumes a stream, what produces a stream? The answer is an enumerator. An enumerator can be applied to an iteratee using its apply method, which is also aliased to |>>. This will leave the iteratee in a cont state, ready to receive more input. If however the enumerator contains the entirety of the stream, then the run method can be used instead, which will send the iteratee an EOF once it's finished. This is aliased to |>>>.

The Play enumerator API makes it easy to create an enumerator by passing a sequence of inputs to the Enumerator companion object's apply method. So, we can create an enumerator of characters using the following code:

val csvFile = Enumerator(
  """1,2,3
    |4,5,6""".stripMargin.toCharArray:_*)

And we can feed this into our iteratee like so:

val result = csvFile |>>> processCsvFile

And our result in this case will be a future that is eventually redeemed with (5, 7, 9).
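
To make the apply/run distinction from earlier concrete, here's a small sketch contrasting the two:

// |>> feeds the input but doesn't send EOF, leaving the iteratee ready for more input
val stillConsuming: Future[Iteratee[Char, (Int, Int, Int)]] = csvFile |>> processCsvFile

// |>>> feeds the input, sends EOF, and extracts the final result
val summed: Future[(Int, Int, Int)] = csvFile |>>> processCsvFile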

Conclusion

Well, it's been a long journey, but hopefully if you're an imperative programmer, you not only understand iteratees, you understand the reasoning behind their design, and how easily they compose. I also hope you have a better understanding of both functional and asynchronous programming in general. The functional mindset is quite different to the imperative mindset, and I'm still getting my head around it, but particularly after seeing how nice and simple iteratees can be to work with (once you understand them), I'm becoming convinced that functional programming is the way to go.

If you are interested in downloading the code from this blog post, or if you want to see a more complex JSON parsing iteratee/enumeratee, checkout this GitHub project, which has a few examples, including parsing byte/character streams in array chunks, rather than one at a time.

Passing common state to templates in Play Framework

This question comes up very frequently on the Play mailing list, so I thought I'd document a quick example of how to pass common state to shared templates in Play Framework when using Scala. The use case is typically that you have a common header, but certain parts of it are dynamic, requiring data to be loaded from the database.

First of all, since the state required for rendering a header can logically be grouped into one object, a header object, let's do that. This will mean we can easily add new types of data to the header without changing any of our existing code. So I'm going to define a header that contains a list of menu items, and the current user:

case class Header(menu: Seq[MenuItem], user: Option[String])
case class MenuItem(url: String, name: String)

Now the template that uses this is my main template, so I'm going to add my Header class to it as an implicit parameter. Implicit parameters are parameters that don't need to be explicitly passed when you invoke a method, instead, if an implicit value exists in the scope of the invocation, that will be used:

@(title: String)(content: Html)(implicit header: Header)

<html>
    <head>
        ...
    </head>
    <body>
        <div id="header">
            @header.user.map { user =>
                <div>User: @user</div>
            }
            <ul>
            @for(item <- header.menu) {
                <li><a href="@item.url">@item.name</a></li>
            }
            </ul>
        </div>
        @content
    </body>
</html>

Now to pass this implicit parameter, all I need to do is declare that each of my templates that use the main template also accept an implicit header parameter. So for example, in my index template:

@(message: String)(implicit header: Header)

@main("Welcome to Play 2.0") {
    @play20.welcome(message)
}

Now comes the magic, supplying this parameter. I will write an implicit method that generates it. When a parameterless method (implicit parameters don't count as parameters in this case) is declared as implicit, this allows it to be used to supply a value for an implicit parameter. Here's my method:

trait ProvidesHeader {

  implicit def header[A](implicit request: Request[A]) : Header = {
    val menu = Seq(MenuItem("/home", "Home"),
      MenuItem("/about", "About"),
      MenuItem("/contact", "Contact"))
    val user = request.session.get("user")
    Header(menu, user)
  }
}

For now I've just hard coded the menu items, but you get the point. Since this method is implicit and returns something of type Header, it can be used to supply the implicit Header parameter that our index template needs. You can also see that this method itself accepts an implicit parameter, the request. If your method doesn't need the request, then you can remove that, however make sure that you also remove the parentheses from the method; implicit resolution will only work with parameterless methods, not zero argument methods.
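
For example, a header method that doesn't need the request might look like this, with no parameter list at all:

// note: no parentheses - a parameterless method, not a zero argument one
implicit def header: Header =
  Header(Seq(MenuItem("/home", "Home"), MenuItem("/about", "About")), None)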

So what now do I need to do to my actions? Almost nothing, I just need to make sure that my implicit header method is in scope, and that they declare the request they accept as an implicit parameter, so for example:

object Application extends Controller with ProvidesHeader {

  def index = Action { implicit request =>
    Ok(views.html.index("Your new application is ready."))
  }
}

As you can see, I've simply declared that my controller extends the ProvidesHeader trait. My action code itself is left completely uncluttered; it doesn't need to know whether the templates it renders require a header, that's automatically provided. In fact, more implicit parameters could be added to my templates, and my action code still wouldn't have to be aware of it.

A note for Java Play apps

Unfortunately this doesn't work so nicely for Java Play apps, since although you can still use implicit parameter passing in the templates, this needs to be explicitly handled by your actions. As an alternative to implicit parameter passing, Play offers the args map for storing arbitrary per request data on the Http.Context class. This can be populated through action composition, or however you want, and then accessed in your templates through the Http.Context.current thread local.

About

Hi! My name is James Roper, and I am a software developer with a particular interest in open source development and trying new things. I program in Scala, Java, Go, PHP, Python and Javascript, and I work for Lightbend as the architect of Kalix. I also have a full life outside the world of IT, enjoy playing a variety of musical instruments and sports, and currently I live in Canberra.