Persistent connections

Our current flow for serving a response looks like this: open a connection, read a request, prepare the response, write the response, close the connection. This is suboptimal! If a client wants to send 50 requests, it has to open and close a connection for each of them. Establishing a connection is a costly operation, so the obvious optimisation is to reuse a connection for multiple requests.

Figure: persistent connection (source: Wikipedia)

The diagram on the left shows the current flow; the one on the right is what we want to achieve.

Let’s go to reddit.com and see how many requests are sent.

Figure: reddit connections

The browser sent 142 requests. What’s more, when you enter a website you probably start going to subpages, clicking here and there. During one website visit you can send hundreds or thousands of requests. Imagine opening and closing a connection for each one of them.

Multiple persistent connections

A browser could open one connection and download every file one by one, but that wouldn’t utilise all the available resources. Instead, the browser opens a few connections to a server to load multiple files at once. HTTP/2 addresses this by letting the client send multiple concurrent requests over a single connection.

Connection: keep-alive

RFC 1945 (HTTP/1.0) says nothing about persistent connections or the Connection header. They were added to the protocol later, in RFC 2068 (HTTP/1.1).

When you send a request with a Connection: keep-alive header, the server keeps that connection open for some time and sends the same header in the response. The client then also keeps the connection open for some time, so the two can exchange data over a single connection.

It was such an important optimisation that in HTTP/1.1 all connections are persistent by default unless specified otherwise. You don’t have to send a Connection: keep-alive header, but if you want the connection to be closed, you should send Connection: close.
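The rule above can be sketched as a small pure function. This is only an illustration, not part of the server built in this article: the name shouldKeepAlive and its parameters are made up, and it treats the Connection header as a comma-separated list of case-insensitive tokens, which is more lenient than the exact "close" comparison used later on.

```kotlin
/**
 * Decides whether a connection should stay open after the current exchange.
 *
 * httpVersion is the version token from the request line, e.g. "HTTP/1.1".
 * connectionHeader is the raw value of the Connection header, or null when absent.
 * (Hypothetical helper, not taken from the article's code base.)
 */
fun shouldKeepAlive(httpVersion: String, connectionHeader: String?): Boolean {
    // The header value is split into trimmed, lower-cased tokens.
    val tokens = connectionHeader
        ?.split(",")
        ?.map { it.trim().lowercase() }
        ?: emptyList()

    return when {
        // An explicit "close" always wins, whatever the protocol version.
        "close" in tokens -> false
        // HTTP/1.1: persistent unless told otherwise.
        httpVersion == "HTTP/1.1" -> true
        // Older clients have to opt in explicitly.
        else -> "keep-alive" in tokens
    }
}
```

With this sketch, shouldKeepAlive("HTTP/1.1", null) keeps the connection open, while shouldKeepAlive("HTTP/1.0", null) does not.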

How long should we keep a connection open?

At some point we have to close a connection. How long should we wait after the last request arrived? If we wait too long, we may end up with a bunch of useless connections. If we don’t wait long enough, clients may never get the chance to send multiple requests on one connection.

Let’s see what the defaults are in popular servers.

Spring Boot default

server.connection-timeout=# Time that connectors wait for another HTTP request before closing the connection. When not set, the connector’s container-specific default is used. Use a value of -1 to indicate no (that is, an infinite) timeout.

Spring Boot uses embedded Tomcat by default, so that’s where we should look.

Tomcat default

keepAliveTimeout

The number of milliseconds this Connector will wait for another HTTP request before closing the connection. The default value is to use the value that has been set for the connectionTimeout attribute. Use a value of -1 to indicate no (i.e. infinite) timeout.

connectionTimeout

The number of milliseconds this Connector will wait, after accepting a connection, for the request URI line to be presented. Use a value of -1 to indicate no (i.e. infinite) timeout. The default value is 60000 (i.e. 60 seconds) but note that the standard server.xml that ships with Tomcat sets this to 20000 (i.e. 20 seconds). Unless disableUploadTimeout is set to false, this timeout will also be used when reading the request body (if any).

So the default appears to be 60 seconds (or 20 seconds with the standard server.xml that ships with Tomcat).

Undertow default

IDLE_TIMEOUT

The amount of time a connection can be idle for before it is timed out. An idle connection is a connection that has had no data transfer in the idle timeout period. Note that this is a fairly coarse grained approach, and small values will cause problems for requests with a long processing time.

The documentation doesn’t mention a default, and I couldn’t find one in the code either. This ticket suggests that it’s 10 minutes.

Apache

Description: Amount of time the server will wait for subsequent requests on a persistent connection

Syntax: KeepAliveTimeout num[ms]

Default: KeepAliveTimeout 5

Only 5 seconds in Apache.

The right value

There isn’t one. As always, pick a value you feel is right, measure it, tune it if needed.

Implementation

First, we need the server to manage our connections in one place. We need a Connection class that abstracts away the socket and tracks the last time a request arrived.

class Connection(private val socket: Socket) {
    // Written by handler threads and read by the pool's checker thread, hence @Volatile.
    @Volatile
    var lastActivityTime: Long = System.currentTimeMillis()
        private set

    fun markAsAlive() {
        lastActivityTime = System.currentTimeMillis()
    }

    ...

    fun getInputStream(): InputStream = socket.getInputStream()
            ?: throw ConnectionException("Could not obtain stream. Is socket closed?")

    fun getOutputStream(): OutputStream = socket.getOutputStream()
            ?: throw ConnectionException("Could not obtain stream. Is socket closed?")
}

We will rely on the server class to call markAsAlive() on every request. It’s not ideal. We could create a decorator over InputStream and mark the connection as alive on every byte, but I feel that this would add performance overhead. I’d also rather measure the time between requests, not between bytes. This protects us a bit from clients that send their requests very slowly, one line at a time.
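For comparison, here is a rough sketch of what the rejected decorator approach could look like. The class name ActivityTrackingInputStream and the onActivity callback are hypothetical; nothing like this exists in the article's code.

```kotlin
import java.io.FilterInputStream
import java.io.InputStream

/**
 * Hypothetical decorator: invokes onActivity whenever a read call
 * returns at least one byte from the wrapped stream.
 */
class ActivityTrackingInputStream(
    source: InputStream,
    private val onActivity: () -> Unit
) : FilterInputStream(source) {

    override fun read(): Int {
        val byte = super.read()
        if (byte != -1) onActivity() // -1 means end of stream, so no activity
        return byte
    }

    // read(ByteArray) in FilterInputStream delegates here, so it is covered too.
    override fun read(buffer: ByteArray, offset: Int, length: Int): Int {
        val count = super.read(buffer, offset, length)
        if (count > 0) onActivity()
        return count
    }
}
```

It could be wired up as ActivityTrackingInputStream(socket.getInputStream(), connection::markAsAlive). Note that the callback fires once per read call rather than once per byte, so behind a BufferedReader the overhead would probably be smaller than feared. The second objection still stands, though: a client trickling in one line at a time would keep such a connection alive indefinitely.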

Now we can create a connection pool:

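import java.io.Closeable
import java.time.Duration
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.LongAdder
import org.slf4j.LoggerFactory // assuming SLF4J is the logging facade in use

class ConnectionPool(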
        val keepAliveTimeout: Duration,
        val checkKeepAliveInterval: Duration
): Closeable {

    private val logger = LoggerFactory.getLogger(ConnectionPool::class.java)

    private val connections = ConcurrentLinkedQueue<Connection>() // 1
    private val checkerThreadPool = Executors.newSingleThreadScheduledExecutor(
            IncrementingThreadFactory("connection-pool-checker"))

    private val connectionsEstablished = LongAdder()

    init {
        scheduleClosingStaleConnections()
    }

    private fun scheduleClosingStaleConnections() { // 2
        checkerThreadPool.scheduleAtFixedRate({
            try {
                closeStaleConnections()
            } catch (e: Exception) {
                logger.error("Error while closing stale connections", e)
            }
        }, checkKeepAliveInterval.toMillis(), checkKeepAliveInterval.toMillis(), TimeUnit.MILLISECONDS)
    }

    private fun closeStaleConnections() {
        connections.filter(this::isConnectionInactive)
                .forEach {
                    logger.debug("Closing connection to ${it.destination()} due to not being active")
                    it.close()
                }
        connections.removeIf(Connection::isClosed)
    }

    private fun isConnectionInactive(connection: Connection): Boolean =
        Duration.ofMillis(System.currentTimeMillis() - connection.lastActivityTime) > keepAliveTimeout

    fun addConnection(connection: Connection) {
        connectionsEstablished.increment()
        connections += connection
    }

    override fun close() {
        checkerThreadPool.shutdown()
        checkerThreadPool.awaitTermination(1, TimeUnit.SECONDS)
    }

    fun connectionsEstablished() = connectionsEstablished.sum()
}

We keep our connections in a concurrent queue (1). We schedule a background task that checks whether each connection is inactive and closes it if needed (2). keepAliveTimeout is the time after which the server treats a connection as stale, counted from the last request. checkKeepAliveInterval is how often the check runs.
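To make the staleness rule easier to reason about, it can be pulled out into a helper with an injectable clock. This is only a sketch: StalenessPolicy is a made-up name, and the pool above calls System.currentTimeMillis() directly instead.

```kotlin
import java.time.Clock
import java.time.Duration

/** Hypothetical helper: the pool's staleness rule with an injectable clock. */
class StalenessPolicy(
    private val keepAliveTimeout: Duration,
    private val clock: Clock = Clock.systemUTC()
) {
    /**
     * lastActivityTime is in epoch milliseconds, like Connection.lastActivityTime.
     * A connection is stale once it has been idle for strictly longer than the timeout.
     */
    fun isStale(lastActivityTime: Long): Boolean {
        val idleFor = Duration.ofMillis(clock.millis() - lastActivityTime)
        return idleFor > keepAliveTimeout
    }
}
```

Because the check only runs every checkKeepAliveInterval, an idle connection is closed somewhere between keepAliveTimeout and keepAliveTimeout + checkKeepAliveInterval after its last request.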

We can now use it in our Server by changing the acceptConnection() and handleConnection() methods:

private fun acceptConnection() {
    val connection = Connection(serverSocket.accept()) // 3
    logger.debug("Accepted new connection ${connection.destination()}")
    handlerThreadPool.submit {
        connectionPool.addConnection(connection) // 4
        try {
            handleConnection(connection) // 5
        } catch (e: Exception) {
            when {
                e is SocketException && connection.isClosed() -> logger.trace("Connection was closed", e)
                else -> logger.warn("Could not handle connection", e)
            }
        }
    }
}

private fun handleConnection(connection: Connection) {
    val input = connection.getInputStream().bufferedReader()
    val output = PrintWriter(connection.getOutputStream())
    while (!connection.isClosed()) {
        var request: Request? = null
        val response = try {
            request = Request.fromRaw(input) // 6
            connection.markAsAlive()
            handler(request)
        } catch (e: EmptyRequestException) { // 7, TODO: find a better way to detect closed connections
            logger.trace("Connection was closed by client")
            connection.close()
            break
        } catch (e: RequestParseException) {
            logger.warn("Could not parse a request", e)
            Response(status = Status.BAD_REQUEST)
        }
        output.print(response.toRaw())
        output.flush()
        if (request?.headers?.connection == "close") { // 8
            connection.close()
        }
    }
}

We create a new connection from the accepted socket (3), add it to the connection pool (4) and then handle the connection (5). The last two steps run in handlerThreadPool so our server can immediately accept other connections.

The main changes are in the handleConnection method. Previously, we used a socket.use { } block in which we read a request and sent a response. At the end of this block, the connection was closed. We can’t do that now because we want to keep the connection open. Instead, we should keep reading requests and sending responses while the connection is open.

After the first request on a connection, our server goes through a second iteration of the loop and blocks on Request.fromRaw(input) (6), which blocks on input.readLine() of BufferedReader, which in turn blocks on a socket read(). The loop ends when either the server or the client closes the connection (7), or when the client sends a Connection: close header (8).

Detecting connections closed by client

Detecting connections closed by our server is easy: we can check socket.isClosed. The tricky part for me was checking whether a connection was closed by the client. When the client closes a connection, it sends a FIN, ACK segment and we send an ACK back. At first I thought we could just check socket.isClosed too, but I was wrong: this flag only tells us whether we closed the socket ourselves. It seems that the Socket API exposes no information about the client’s FIN, ACK. The only way to find out is to call read() or readLine() on the BufferedReader.
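This behaviour can be reproduced in isolation with a pair of sockets on the loopback interface. The sketch below is a stand-alone experiment rather than server code; observePeerClose and PeerCloseObservation are names invented for it.

```kotlin
import java.net.InetAddress
import java.net.ServerSocket
import java.net.Socket

/** What the server side observes after the peer has closed its end. */
data class PeerCloseObservation(val isClosedFlag: Boolean, val lineRead: String?)

fun observePeerClose(): PeerCloseObservation {
    val loopback = InetAddress.getLoopbackAddress()
    // Port 0 lets the OS pick a free port.
    return ServerSocket(0, 1, loopback).use { serverSocket ->
        val client = Socket(loopback, serverSocket.localPort)
        serverSocket.accept().use { accepted ->
            client.close() // the client hangs up without sending anything

            // Blocks until the close is seen, then yields null (end of stream).
            val line = accepted.getInputStream().bufferedReader().readLine()

            // isClosed is sampled while our own side is still open.
            PeerCloseObservation(accepted.isClosed, line)
        }
    }
}
```

Calling observePeerClose() should give isClosedFlag = false together with lineRead = null: the flag says nothing about the peer, while the read reports the end of the stream.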

With that in mind, it is hard to implement the following logic: block until new data arrives, then parse the request; if there is no data, close the connection. We cannot call readLine() before Request.fromRaw(input) because it would consume one line of our BufferedReader. We could change the signature of fromRaw to accept the first line as a String and the rest of the stream as a BufferedReader, but that’s just ugly. That’s why, for now, we can keep things as they are. The beginning of our parsing logic protects us from empty requests anyway.

val requestLine = input.readLine() ?: throw EmptyRequestException()

If readLine() returns null, we throw an EmptyRequestException, catch it (7) and close the connection. It might not be the best solution, and it smells like using exceptions for control flow, but it works. We will try to tackle this problem in the future by using some sort of “one line/character caching BufferedReader”.
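One possible direction, sketched here under the assumption that the parser keeps reading from the same BufferedReader, is to peek at the stream with mark() and reset(). The extension function isAtEndOfStream is hypothetical and not necessarily what this series will end up using.

```kotlin
import java.io.BufferedReader

/**
 * Hypothetical helper: blocks until at least one character is available or the
 * stream ends, without consuming anything. Returns true at end of stream.
 */
fun BufferedReader.isAtEndOfStream(): Boolean {
    // A read-ahead limit of 2 rather than 1: after a line that ended in '\r',
    // the reader may silently skip a pending '\n' before returning a character.
    mark(2)
    if (read() == -1) return true // nothing left: the peer closed the connection
    reset()                       // rewind so the real parser sees the full request
    return false
}
```

The loop in handleConnection could then check input.isAtEndOfStream() before calling Request.fromRaw(input) and close the connection without relying on an exception.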

Tests

The main happy path to test is whether we can make several requests on one connection.

private val client = HttpUrlConnectionClient(connectionTimeout = 50, socketTimeout = 100)

@Test
fun `should reuse connection to make requests`() {
    // given
    server = Server(handler = {
        Response(status = Status.OK, headers = HttpHeaders("content-length" to "0"))
    })
    server.start(8090)

    // when
    repeat(times = 3) {
        client.exchange(url = "http://localhost:8090", request = Request(path = "/", method = RequestMethod.GET))
    }

    // then
    assertThat(server.connectionsEstablished()).isEqualTo(1)
}

To test that, we have to expose a connectionsEstablished() method that returns the number of connections established to the server. This will most likely turn into some kind of ServerStats object, because we will want metrics, but let’s keep it simple for now.

Importance of content-length header

Now that we have switched to the persistent connection model, it is really important to send a content-length header even if we don’t send a body! Previously we would send a response and close the connection. Even if a client reading the body didn’t know where it ended, it didn’t block for long waiting for more content, because we immediately closed the connection on the server side. Now we have to say explicitly how long the body is, because we no longer close the connection right away.

The server just checks the content-length header and skips reading the body if there isn’t one. However, this is not a full HTTP implementation. There is also chunked encoding, in which the body length is sent in the body itself. We will implement it in the future.
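To show why the length matters on a connection that stays open, here is a minimal sketch of reading a body by its declared length. readBody is a hypothetical function. It works on raw bytes, because content-length counts bytes rather than characters, and it assumes nothing else has read ahead on the same stream, which is not true of the BufferedReader-based parser above.

```kotlin
import java.io.EOFException
import java.io.InputStream

/**
 * Reads exactly contentLength bytes of body and leaves the stream positioned
 * at the first byte of whatever follows (for example, the next message).
 */
fun readBody(input: InputStream, contentLength: Int): ByteArray {
    val body = ByteArray(contentLength)
    var offset = 0
    while (offset < contentLength) {
        // read() may return fewer bytes than requested, so loop until done.
        val count = input.read(body, offset, contentLength - offset)
        if (count == -1) {
            throw EOFException("Stream ended after $offset of $contentLength bytes")
        }
        offset += count
    }
    return body
}
```

With a length of 0 the function returns immediately without touching the stream, which is exactly what lets a reader move on to the next message instead of blocking until the other side closes the connection.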

Back to the test! We create 3 sequential requests and the test eventually passes, but…

Persistent connections of HttpURLConnection

I had so many problems with persistent connections in HttpURLConnection! The docs say that it supports persistent connections out of the box, but it works in such a non-obvious way! For example, connection.disconnect() does not close the connection. I couldn’t find a way to close persistent connections between tests, which messed up my tests. Eventually, I got tired of fighting with HttpURLConnection and fell back to the well-known, excellent Apache HTTP Client. I created an implementation of my Client interface. I won’t go into details; you can check it out here.

I also tested whether old connections are closed by the connection pool checker:

@Test
fun `should create new connection when old is closed by keep alive checker`() {
    // given
    server = Server(
            port = 8090,
            keepAliveTimeout = Duration.ofMillis(500),
            checkKeepAliveInterval = Duration.ofMillis(100),
            handler = {
                Response(status = Status.OK, headers = HttpHeaders("content-length" to "0"))
            })
    server.start(8090)

    // when
    client.exchange(url = "http://localhost:8090", request = Request(path = "/", method = RequestMethod.GET))

    // then
    assertThat(server.connectionsEstablished()).isEqualTo(1)

    // when
    Thread.sleep(server.keepAliveTimeout.toMillis() + server.checkKeepAliveInterval.toMillis())
    client.exchange(url = "http://localhost:8090", request = Request(path = "/", method = RequestMethod.GET))

    // then
    assertThat(server.connectionsEstablished()).isEqualTo(2)
}

And whether a connection is dropped when a Connection: close header is sent:

@Test
fun `should not reuse connections when Connection close is send`() {
    // given
    server = Server(port = 8090, handler = {
        Response(status = Status.OK, headers = HttpHeaders("content-length" to "0"))
    })
    server.start(8090) // started explicitly, as in the other tests

    // when
    repeat(times = 3) {
        client.exchange(
                url = "http://localhost:8090",
                request = Request(
                        path = "/",
                        method = RequestMethod.GET,
                        headers = HttpHeaders("connection" to "close")))
    }

    // then
    assertThat(server.connectionsEstablished()).isEqualTo(3)
}

Summary

We managed to add persistent connections to our server! We keep a connection open, so sending multiple requests in sequence is more efficient. Next up, we will build our HTTP client, which will use a connection pool as well.

Code is available here.

Other articles of the series

This article is a part of the HTTP series