Server WebSockets
This feature adds WebSockets support to Ktor. WebSockets are a mechanism for keeping a bi-directional, real-time, ordered connection between the server and the client. Each message on this channel is called a Frame: a frame can be a text or binary message, or a close or ping/pong message. Frames can be marked as incomplete or final.
In order to use the WebSockets functionality you first have to install it:
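A minimal installation might look like the sketch below. It assumes the Ktor 1.x package names (io.ktor.application, io.ktor.websocket); the module function name `module` is hypothetical.

```kotlin
import io.ktor.application.*
import io.ktor.websocket.*

// Hypothetical application module; the name `module` is an assumption.
fun Application.module() {
    // Installs the WebSockets feature with its default settings.
    install(WebSockets)
}
```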
If required, you can adjust parameters during the installation of the feature:
install(WebSockets) {
    pingPeriod = Duration.ofSeconds(60) // Disabled (null) by default
    timeout = Duration.ofSeconds(15)
    maxFrameSize = Long.MAX_VALUE // Disabled (max value). The connection will be closed if a frame exceeds this length.
    masking = false
    extensions {
        // install(...)
    }
}
Once installed, you can define webSocket routes for the routing feature:
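As a minimal sketch of such a route, the handler below greets the client and then stays open until the client disconnects. The path "/greet", the helper `greeting` and the module name `greetingModule` are hypothetical, and the imports assume Ktor 1.x package names.

```kotlin
import io.ktor.application.*
import io.ktor.http.cio.websocket.*
import io.ktor.routing.*
import io.ktor.websocket.*

// Hypothetical helper that builds the greeting text.
fun greeting(path: String): String = "Connected to $path"

fun Application.greetingModule() {
    install(WebSockets)
    routing {
        webSocket("/greet") {
            // Send one text frame as soon as the connection is established.
            send(Frame.Text(greeting("/greet")))
            // Keep the session alive, ignoring frames, until the client closes it.
            for (frame in incoming) {
                // Intentionally empty.
            }
        }
    }
}
```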
Unlike normal route handlers, which are short-lived, webSocket handlers are meant to be long-lived. All the relevant WebSocket methods are suspending functions, so the handler suspends without blocking a thread while it receives or sends messages.
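To illustrate, the sketch below keeps a handler running for several seconds while only suspending between messages. The path "/ticks", the helper `tickText` and the module name `tickModule` are hypothetical; the imports assume Ktor 1.x package names.

```kotlin
import io.ktor.application.*
import io.ktor.http.cio.websocket.*
import io.ktor.routing.*
import io.ktor.websocket.*
import kotlinx.coroutines.delay

// Hypothetical helper producing the text of each tick.
fun tickText(n: Int): String = "tick $n"

fun Application.tickModule() {
    install(WebSockets)
    routing {
        webSocket("/ticks") {
            repeat(3) { n ->
                // May suspend if the outgoing queue is full.
                send(Frame.Text(tickText(n)))
                // Suspends the coroutine for one second without blocking a thread.
                delay(1000)
            }
            // Returning from the handler ends the session.
        }
    }
}
```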
webSocket methods receive a callback with a WebSocketSession instance as the receiver. That interface defines an incoming (ReceiveChannel) property and an outgoing (SendChannel) property, as well as a close method. Check the full WebSocketSession interface for more information.
Usage as a suspend actor
routing {
    webSocket("/") { // websocketSession
        for (frame in incoming) {
            when (frame) {
                is Frame.Text -> {
                    val text = frame.readText()
                    outgoing.send(Frame.Text("YOU SAID: $text"))
                    if (text.equals("bye", ignoreCase = true)) {
                        close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
                    }
                }
                else -> Unit // Ignore non-text frames
            }
        }
    }
}
Usage as a Channel
Since the incoming property is a ReceiveChannel, you can use it with its stream-like interface:
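One way to do this, as a sketch rather than the only option, is the `consumeEach` extension from kotlinx.coroutines, which runs a block for every element until the channel is closed. The helper `replyTo` and the module name `channelModule` are hypothetical; the imports assume Ktor 1.x package names.

```kotlin
import io.ktor.application.*
import io.ktor.http.cio.websocket.*
import io.ktor.routing.*
import io.ktor.websocket.*
import kotlinx.coroutines.channels.consumeEach

// Hypothetical helper that builds the reply text.
fun replyTo(text: String): String = "YOU SAID: $text"

fun Application.channelModule() {
    install(WebSockets)
    routing {
        webSocket("/") {
            // Suspends until the next frame arrives; returns once the channel is closed.
            incoming.consumeEach { frame ->
                if (frame is Frame.Text) {
                    outgoing.send(Frame.Text(replyTo(frame.readText())))
                }
            }
        }
    }
}
```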
You receive a WebSocketSession as the receiver (this), giving you direct access to these members inside your webSocket handler.
interface WebSocketSession {
    val incoming: ReceiveChannel<Frame> // Incoming frames channel
    val outgoing: SendChannel<Frame> // Outgoing frames channel
    fun close(reason: CloseReason)

    // Convenience method equivalent to `outgoing.send(frame)`.
    // Enqueues the frame; may suspend if the outgoing queue is full.
    // May throw an exception if the outgoing channel is already closed, because no message can be sent then.
    suspend fun send(frame: Frame)

    // The call and the context
    val application: Application

    // List of WebSocket extensions negotiated for the current session
    val extensions: List<WebSocketExtension<*>>

    // Modifiable properties for this request. Their initial values come from the feature configuration.
    var pingInterval: Duration?
    var timeout: Duration
    var masking: Boolean // Enables or disables masking of outgoing messages with a random XOR mask.
    var maxFrameSize: Long // Frame size limit. The connection will be closed if it is exceeded.

    // Advanced
    val closeReason: Deferred<CloseReason?>

    // Flushes all outstanding messages and suspends until all previously sent messages have been written.
    // Can be called at any time, even after close. May return immediately if the connection is already terminated.
    suspend fun flush()

    // Initiates connection termination immediately. Termination may complete asynchronously.
    fun terminate()
}
A frame is a single packet sent or received at the WebSocket protocol level. There are two message types: TEXT and BINARY, and three control packets: CLOSE, PING, and PONG. Each packet has a payload buffer. For Text or Close messages, you can call readText or readReason to interpret that buffer.
enum class FrameType { TEXT, BINARY, CLOSE, PING, PONG }
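The sketch below shows one way to interpret a frame according to its type. The function name `describe` is hypothetical, and the import assumes the Ktor 1.x package io.ktor.http.cio.websocket.

```kotlin
import io.ktor.http.cio.websocket.*

// Hypothetical helper that turns a frame into a short description.
fun describe(frame: Frame): String = when (frame) {
    // Text frames: decode the payload as a string.
    is Frame.Text -> "text: ${frame.readText()}"
    // Binary frames: read the raw payload bytes.
    is Frame.Binary -> "binary: ${frame.readBytes().size} bytes"
    // Close frames: decode the close code and message, if present.
    is Frame.Close -> "close: ${frame.readReason()}"
    // Remaining control frames (PING, PONG).
    else -> "control: ${frame.frameType}"
}
```

For example, `describe(Frame.Text("hi"))` yields "text: hi".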
You can test WebSocket conversations by using the handleWebSocketConversation method inside a withTestApplication block.
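// Imports assume Ktor 1.x package names.
import io.ktor.application.*
import io.ktor.http.cio.websocket.*
import io.ktor.routing.*
import io.ktor.server.testing.*
import io.ktor.websocket.*
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlin.test.*

class MyAppTest {
    @Test
    fun testConversation() {
        withTestApplication {
            application.install(WebSockets)

            val received = arrayListOf<String>()
            application.routing {
                webSocket("/echo") {
                    try {
                        while (true) {
                            val text = (incoming.receive() as Frame.Text).readText()
                            received += text
                            // Echo the message back so the client side of the test can assert on it.
                            outgoing.send(Frame.Text(text))
                        }
                    } catch (e: ClosedReceiveChannelException) {
                        // Do nothing!
                    } catch (e: Throwable) {
                        e.printStackTrace()
                    }
                }
            }

            handleWebSocketConversation("/echo") { incoming, outgoing ->
                val textMessages = listOf("HELLO", "WORLD")
                for (msg in textMessages) {
                    outgoing.send(Frame.Text(msg))
                    assertEquals(msg, (incoming.receive() as Frame.Text).readText())
                }
                assertEquals(textMessages, received)
            }
        }
    }
}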
Standard Events: onConnect, onMessage, onClose and onError
onConnect happens at the start of the block.
onMessage happens after successfully reading a message (for example with incoming.receive()) or when using suspended iteration with for (frame in incoming).
onClose happens when the incoming channel is closed. That completes the suspended iteration, or throws a ClosedReceiveChannelException when trying to receive a message.
onError corresponds to any other exception.
In both onClose and onError, the closeReason property is set.
To illustrate this:
webSocket("/echo") {
    println("onConnect")
    try {
        // receive() throws ClosedReceiveChannelException once the incoming channel is closed.
        while (true) {
            val text = (incoming.receive() as Frame.Text).readText()
            println("onMessage")
            received += text // `received` is the list declared in the test above
            outgoing.send(Frame.Text(text))
        }
    } catch (e: ClosedReceiveChannelException) {
        println("onClose ${closeReason.await()}")
    } catch (e: Throwable) {
        println("onError ${closeReason.await()}")
        e.printStackTrace()
    }
}
In this sample, the infinite loop is exited only when an exception is raised: either a ClosedReceiveChannelException or another exception.