james' blog about scala and all that jazz

Call/response WebSockets in Play Framework

I got a question from a Play user about implementing call/response WebSockets in Play Framework. This is not something that comes up that often, since it means using WebSockets to do basically what AJAX does for you, so what's the point? But here are some use cases that I've thought of:

  • You have some transformation of a stream that can only be done on the server side. For example, perhaps the transformation requires some heavy database work, or is too computationally expensive for a mobile client, or perhaps you want to encrypt the stream with a key that is private to the server.
  • You are already processing a stream of events from the server using WebSockets, and the responses to the calls are just more events in this stream, so you'd like to share the same transport mechanism for these events.
  • Your application is particularly chatty, and you don't want the overhead of the HTTP protocol on each call/response.

There are possibly more use cases - WebSockets is quite a new technology, and as an industry we haven't really settled on what its best use cases are.

A simple echo implementation

A Play WebSocket is implemented by providing an iteratee that consumes messages from the client, and an enumerator that produces messages for the client. If we simply wanted to echo every message that the client sent us, then we would want to return an iteratee whose input becomes the output of the enumerator that we return. Play doesn't come with anything out of the box to do this, though we will probably add something in a future release. For now, I'm going to write a method called joined that returns a joined iteratee/enumerator pair:

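import play.api.libs.iteratee.{Enumerator, Iteratee, Step}
import scala.concurrent.{Future, Promise}
// Execution context for the onFailure callback below
import play.api.libs.concurrent.Execution.Implicits.defaultContext

/**
 * Create a joined iteratee enumerator pair.
 *
 * When the enumerator is applied to an iteratee, that iteratee subsequently consumes whatever is fed to the
 * iteratee in the pair.  Consequently the enumerator is "one shot": applying it to a second iteratee will throw
 * an exception.
 */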
def joined[A]: (Iteratee[A, Unit], Enumerator[A]) = {
  val promisedIteratee = Promise[Iteratee[A, Unit]]()
  val enumerator = new Enumerator[A] {
    def apply[B](i: Iteratee[A, B]) = {
      val doneIteratee = Promise[Iteratee[A, B]]()

      // Equivalent to map, but allows us to handle failures
      def wrap(delegate: Iteratee[A, B]): Iteratee[A, B] = new Iteratee[A, B] {
        def fold[C](folder: (Step[A, B]) => Future[C]) = {
          val toReturn = delegate.fold {
            case done @ Step.Done(a, in) => {
              doneIteratee.success(done.it)
              folder(done)
            }
            case Step.Cont(k) => {
              folder(Step.Cont(k.andThen(wrap)))
            }
            case err => folder(err)
          }
          toReturn.onFailure {
            case e => doneIteratee.failure(e)
          }
          toReturn
        }
      }

      if (promisedIteratee.trySuccess(wrap(i).map(_ => ()))) {
        doneIteratee.future
      } else {
        throw new IllegalStateException("Joined enumerator may only be applied once")
      }
    }
  }
  (Iteratee.flatten(promisedIteratee.future), enumerator)
}

This code might be a little scary if you don't understand iteratees, but as I said we will probably add this to Play itself in future. The rest of the code in this blog post will be simple.
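
For readers new to iteratees, the two roles described above can be seen in isolation, without a WebSocket. What follows is a minimal sketch, assuming the play.api.libs.iteratee API is on the classpath; the object name BasicsDemo and the sample strings are made up for illustration. An enumerator produces three chunks, and an iteratee folds them into a single string:

```scala
import play.api.libs.iteratee.{Enumerator, Iteratee}
import scala.concurrent.Await
import scala.concurrent.duration._
// Only used on Play versions whose iteratee methods take an implicit ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical demo object, not part of Play
object BasicsDemo {
  def run(): String = {
    // Producer: pushes three chunks of input
    val source: Enumerator[String] = Enumerator("a", "b", "c")
    // Consumer: concatenates every chunk it is fed
    val sink: Iteratee[String, String] = Iteratee.fold[String, String]("")(_ + _)
    // |>>> applies the enumerator to the iteratee, signals end of input, and yields the result
    Await.result(source |>>> sink, 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In a WebSocket the source and sink are at opposite ends of the connection, which is why joined has to wire the pair together by hand.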

Now that we have our joined iteratee/enumerator, let's implement an echo WebSocket. For the rest of this post we'll be assuming that all our WebSockets are sending/receiving JSON messages.

def echo = WebSocket.using[JsValue] { req =>
  joined[JsValue]
}

So now we have an echo call/response WebSocket. But this is not very useful: we want to do something with the incoming messages and produce new outgoing messages as responses.

Processing messages

So now that we've expressed our call/response in terms of a joined iteratee/enumerator, how can we transform the call messages to be different response messages? The answer is enumeratees. Enumeratees can be used to transform iteratees and enumerators. We return both an enumerator and an iteratee, so which one do we transform? The answer is that it doesn't matter; I'm going to transform the iteratee. The enumeratee that we're going to use is the map enumeratee:

def process = WebSocket.using[JsValue] { req =>
  val (iter, enum) = joined[JsValue]

  (Enumeratee.map[JsValue] { json =>
    Json.obj(
      "status" -> "received",
      "msg" -> json
    )
  } &> iter, enum)
}
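
If you want to try the map enumeratee without starting a Play application, the same transformation can be exercised on plain values. This is only a sketch: MapDemo is a made-up name, and integers stand in for the JsValue messages. The enumeratee is attached to the iteratee with &>, just as in the WebSocket above:

```scala
import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}
import scala.concurrent.Await
import scala.concurrent.duration._
// Only used on Play versions whose iteratee methods take an implicit ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical demo object, not part of Play
object MapDemo {
  def run(): List[Int] = {
    // Transform the consuming side: every input is doubled before the iteratee sees it
    val doubled: Iteratee[Int, List[Int]] =
      Enumeratee.map[Int](_ * 2) &> Iteratee.getChunks[Int]
    // Feed three inputs, then end of input, and collect what the iteratee received
    Await.result(Enumerator(1, 2, 3) |>>> doubled, 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```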

Enumeratees are one of the most powerful features of iteratees for end users. You could use any enumeratee here, but let's look at some examples of other common use cases.

What if we don't want to return a response to every message? There are numerous ways to do this, but the simplest is to use the collect enumeratee, which takes a partial function:

def process = WebSocket.using[JsValue] { req =>
  val (iter, enum) = joined[JsValue]

  (Enumeratee.collect[JsValue] { 
    case json if (json \ "foo").asOpt[JsValue].isDefined =>
      Json.obj(
        "status" -> "received",
        "msg" -> json
      )
  } &> iter, enum)
}
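
The filtering behaviour of collect is easier to see on a small, self-contained input. In this sketch the "call:" prefix, the response text, and the name CollectDemo are all invented for illustration; messages that don't match the partial function are dropped and produce no response:

```scala
import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}
import scala.concurrent.Await
import scala.concurrent.duration._
// Only used on Play versions whose iteratee methods take an implicit ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical demo object, not part of Play
object CollectDemo {
  def run(): List[String] = {
    // Respond only to messages carrying the (made up) "call:" prefix
    val onlyCalls = Enumeratee.collect[String] {
      case msg if msg.startsWith("call:") => "received " + msg.stripPrefix("call:")
    }
    val sink = onlyCalls &> Iteratee.getChunks[String]
    // "noise" does not match the partial function, so it is silently skipped
    Await.result(Enumerator("call:a", "noise", "call:b") |>>> sink, 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```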

Perhaps we want to produce many responses for a single input. The mapConcat enumeratee can be used in this case, with our function returning a sequence of JsValue messages to send:

def process = WebSocket.using[JsValue] { req =>
  val (iter, enum) = joined[JsValue]

  (Enumeratee.mapConcat[JsValue] { json =>
    Seq(
      Json.obj(
        "status" -> "received",
        "msg" -> json
      ),
      Json.obj("foo" -> "bar")
    )
  } &> iter, enum)
}
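
As a quick check of how mapConcat flattens its results, here is a sketch on plain integers (MapConcatDemo and the sample numbers are assumptions for illustration). Each input produces two outputs, and the outputs for one input are emitted together, in order, before the next input is processed:

```scala
import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}
import scala.concurrent.Await
import scala.concurrent.duration._
// Only used on Play versions whose iteratee methods take an implicit ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical demo object, not part of Play
object MapConcatDemo {
  def run(): List[Int] = {
    // One input becomes two outputs: the value itself and the value times ten
    val fanOut = Enumeratee.mapConcat[Int] { n => Seq(n, n * 10) }
    val sink = fanOut &> Iteratee.getChunks[Int]
    Await.result(Enumerator(1, 2) |>>> sink, 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```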

What if we want to do some blocking operations? In Play 2.2, you will be able to do this simply by providing an execution context suitable for blocking calls to whichever enumeratee you decide to use. Play 2.1 does not yet support this, so we have to dispatch the callback to another execution context ourselves. This can be done using the mapM enumeratee:

val ec: ExecutionContext = ...

def process = WebSocket.using[JsValue] { req =>
  val (iter, enum) = joined[JsValue]

  (Enumeratee.mapM[JsValue] { json =>
    Future {
      // Some expensive computation, eg a database call, that returns JsValue
    }(ec)
  } &> iter, enum)
}
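
To make the dispatching concrete, here is one way the pattern could look outside a WebSocket. This is a sketch under assumptions: the fixed thread pool of four threads, the name MapMDemo, and the squaring function standing in for a slow blocking call are all illustrative choices, not recommendations. mapM waits for each future before handling the next input, so the responses stay in order:

```scala
import java.util.concurrent.Executors
import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
// Only used on Play versions whose iteratee methods take an implicit ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical demo object, not part of Play
object MapMDemo {
  def run(): List[Int] = {
    // A dedicated pool for blocking work; the size 4 is an arbitrary choice for the demo
    val pool = Executors.newFixedThreadPool(4)
    val blockingEc: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // The callback body runs on blockingEc rather than on the thread feeding the iteratee
      val slowSquare = Enumeratee.mapM[Int] { n =>
        Future { n * n }(blockingEc) // stand-in for an expensive or blocking call
      }
      val sink = slowSquare &> Iteratee.getChunks[Int]
      Await.result(Enumerator(1, 2, 3) |>>> sink, 5.seconds)
    } finally {
      pool.shutdown() // let the JVM exit once the demo is finished
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```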

Pushing from an external enumerator

You may want to combine your call/response messages with messages from some other enumerator that spontaneously pushes messages to the client, for example a broadcasting enumerator for all clients. This can be done by interleaving your joined enumerator with the external enumerator:

val globalEvents: Enumerator[JsValue] = ...

def process = WebSocket.using[JsValue] { req =>
  val (iter, enum) = joined[JsValue]

  (Enumeratee.map[JsValue] { json =>
    ...
  } &> iter, Enumerator.interleave(enum, globalEvents))
}
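
The effect of interleaving can also be observed with two finite enumerators. In this sketch the names InterleaveDemo, responses and globalEvents, and the sample strings, are made up for illustration. Interleaving makes no promise about the relative order of elements from the two sources, so the result is sorted before it is returned:

```scala
import play.api.libs.iteratee.{Enumerator, Iteratee}
import scala.concurrent.Await
import scala.concurrent.duration._
// Only used on Play versions whose iteratee methods take an implicit ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical demo object, not part of Play
object InterleaveDemo {
  def run(): List[String] = {
    // Stand-ins for the joined enumerator and the broadcast enumerator
    val responses: Enumerator[String] = Enumerator("response-1", "response-2")
    val globalEvents: Enumerator[String] = Enumerator("event-1")
    // The combined enumerator emits elements from both sources as they arrive
    val combined = Enumerator.interleave(responses, globalEvents)
    // Sort because the arrival order across the two sources is not deterministic
    Await.result(combined |>>> Iteratee.getChunks[String], 5.seconds).sorted
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In a real application the global enumerator would typically never finish, so the combined stream stays open for as long as the WebSocket does.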

Conclusion

Using WebSockets in a call/response style may be something that your application needs. If so, using enumeratees to map the stream of messages coming in to messages going out is the most natural and idiomatic way of doing this in Play. It allows you to call on the large number of composable enumeratees that Play provides out of the box, and makes your code simple and easy to reason about.

About

Hi! My name is James Roper, and I am a software developer with a particular interest in open source development and trying new things. I program in Scala, Java, Go, PHP, Python and Javascript, and I work for Lightbend as the architect of Kalix. I also have a full life outside the world of IT, enjoy playing a variety of musical instruments and sports, and currently I live in Canberra.