The HTTP part 5 - Persistent connections
Persistent connections
Our current flow for receiving a response from the server looks like this: open a connection, read a request, prepare the response, write the response, close the connection. This is suboptimal! If a client wants to send 50 requests, it has to open and close a connection for each one. Establishing a connection is a costly operation, so the obvious optimisation is to reuse a connection across multiple requests.
(Figure: non-persistent vs. persistent connections; source: Wikipedia)
The left schema shows the current flow, the right one is what we want to achieve.
Let’s go to reddit.com and see how many requests are sent.
The browser sent 142 requests. What's more, when you enter a website you usually start browsing subpages, clicking here and there. A single website visit can generate hundreds or even thousands of requests. Imagine opening and closing a connection for each one of them.
Multiple persistent connections
A browser could open one connection and download every file one by one, but that wouldn't utilise all the available resources. Instead, the browser opens a few connections to a server to load multiple files at once. HTTP/2 solves this problem by enabling the client to send multiple concurrent requests over a single connection.
Connection: keep-alive
There is no mention of persistent connections or the Connection header in RFC 1945 (HTTP/1.0); they were added to the protocol later, in RFC 2068 (HTTP/1.1).

If you send a request with a Connection: keep-alive header, the server will keep the connection open for some time and send the same header back in the response. The client will then also keep the connection open for a while, so the two can keep exchanging data over a single connection.

This optimisation turned out to be so important that in HTTP/1.1 all connections are persistent by default unless specified otherwise. You don't have to send a Connection: keep-alive header, but if you want the connection to be closed you should send Connection: close.
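To illustrate, a keep-alive exchange looks roughly like this on the wire (a simplified, hand-written example, not captured from a real server):

GET /index.html HTTP/1.1
Host: example.com
Connection: keep-alive

HTTP/1.1 200 OK
Content-Length: 13
Connection: keep-alive

Hello, world!

After the response is written, both sides keep the TCP connection open, so the next request can be sent on the same connection without a new handshake.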
How long should we keep a connection open?
At some point we have to close a connection. How long should we wait after the last request arrived? If we wait too long, we may end up with a bunch of useless open connections. If we don't wait long enough, clients may not get a chance to send multiple requests over one connection.
Let’s see what the defaults are in popular servers.
Spring Boot default
server.connection-timeout=# Time that connectors wait for another HTTP request before closing the connection. When not set, the connector’s container-specific default is used. Use a value of -1 to indicate no (that is, an infinite) timeout.
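This could, for example, be set explicitly in application.properties (the value below is purely illustrative):

# application.properties
server.connection-timeout=20000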
Spring Boot uses embedded Tomcat by default, so we should go look there.
Tomcat default
keepAliveTimeout
The number of milliseconds this Connector will wait for another HTTP request before closing the connection. The default value is to use the value that has been set for the connectionTimeout attribute. Use a value of -1 to indicate no (i.e. infinite) timeout.
connectionTimeout
The number of milliseconds this Connector will wait, after accepting a connection, for the request URI line to be presented. Use a value of -1 to indicate no (i.e. infinite) timeout. The default value is 60000 (i.e. 60 seconds) but note that the standard server.xml that ships with Tomcat sets this to 20000 (i.e. 20 seconds). Unless disableUploadTimeout is set to false, this timeout will also be used when reading the request body (if any).
Since embedded Tomcat doesn’t use the standard server.xml, it seems that it’s 60 seconds.
Undertow default
IDLE_TIMEOUT
The amount of time a connection can be idle for before it is timed out. An idle connection is a connection that has had no data transfer in the idle timeout period. Note that this is a fairly coarse grained approach, and small values will cause problems for requests with a long processing time.
There is no information in the documentation about the default value, nor could I find one in the code. This ticket suggests that it’s 10 minutes.
Apache
Description: Amount of time the server will wait for subsequent requests on a persistent connection
Syntax: KeepAliveTimeout num[ms]
Default: KeepAliveTimeout 5
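A typical httpd.conf excerpt would therefore look something like this (values are illustrative):

# httpd.conf
KeepAlive On
KeepAliveTimeout 5
MaxKeepAliveRequests 100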
The right value
There isn’t one. As always, pick a value you feel is right, measure it, tune it if needed.
Implementation
Firstly, we need our connections to be managed by the server in one place. We need a Connection class that abstracts away the socket and tracks the last time a request was received.
import java.io.InputStream
import java.io.OutputStream
import java.net.Socket

class Connection(private val socket: Socket) {

    // timestamp of the last request seen on this connection
    var lastActivityTime: Long = System.currentTimeMillis()
        private set

    fun markAsAlive() {
        lastActivityTime = System.currentTimeMillis()
    }

    ...

    fun getInputStream(): InputStream = socket.getInputStream()
        ?: throw ConnectionException("Could not obtain stream. Is socket closed?")

    fun getOutputStream(): OutputStream = socket.getOutputStream()
        ?: throw ConnectionException("Could not obtain stream. Is socket closed?")
}
We will rely on the server class to call markAsAlive() on every request. It's not ideal. We could create a decorator over InputStream and mark the connection as alive on every byte read, but I feel that this would add performance overhead. I'd also rather measure the time between requests, not between bytes; this protects us a bit from a client that sends a request really slowly, one line at a time.
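For completeness, such a decorator could look roughly like this (just a sketch, not something we actually use):

import java.io.InputStream

// Sketch only: an InputStream decorator that refreshes the connection's
// activity timestamp on every byte read. We decided against this approach.
class ActivityTrackingInputStream(
    private val delegate: InputStream,
    private val connection: Connection
) : InputStream() {

    override fun read(): Int {
        val byte = delegate.read()
        if (byte != -1) connection.markAsAlive()
        return byte
    }
}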
Now we can create a connection pool
import org.slf4j.LoggerFactory
import java.io.Closeable
import java.time.Duration
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.LongAdder

class ConnectionPool(
    val keepAliveTimeout: Duration,
    val checkKeepAliveInterval: Duration
) : Closeable {

    private val logger = LoggerFactory.getLogger(ConnectionPool::class.java)
    private val connections = ConcurrentLinkedQueue<Connection>() // 1
    private val checkerThreadPool = Executors.newSingleThreadScheduledExecutor(
        IncrementingThreadFactory("connection-pool-checker"))
    private val connectionsEstablished = LongAdder()

    init {
        scheduleClosingStaleConnections()
    }

    private fun scheduleClosingStaleConnections() { // 2
        checkerThreadPool.scheduleAtFixedRate({
            try {
                closeStaleConnections()
            } catch (e: Exception) {
                logger.error("Error while closing stale connections", e)
            }
        }, checkKeepAliveInterval.toMillis(), checkKeepAliveInterval.toMillis(), TimeUnit.MILLISECONDS)
    }

    private fun closeStaleConnections() {
        connections.filter(this::isConnectionInactive)
            .forEach {
                logger.debug("Closing connection to ${it.destination()} due to not being active")
                it.close()
            }
        connections.removeIf(Connection::isClosed)
    }

    private fun isConnectionInactive(connection: Connection): Boolean =
        Duration.ofMillis(System.currentTimeMillis() - connection.lastActivityTime) > keepAliveTimeout

    fun addConnection(connection: Connection) {
        connectionsEstablished.increment()
        connections += connection
    }

    override fun close() {
        checkerThreadPool.shutdown()
        checkerThreadPool.awaitTermination(1, TimeUnit.SECONDS)
    }

    fun connectionsEstablished() = connectionsEstablished.sum()
}
We keep our connections in a concurrent queue (1) and schedule a background task that checks whether a connection is inactive and closes it if needed (2). keepAliveTimeout is the time after the last request after which the server treats a connection as stale; checkKeepAliveInterval is how often the check task runs.
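For instance, a pool that closes connections idle for more than a minute and runs the check every ten seconds could be created like this (the values here are illustrative):

val connectionPool = ConnectionPool(
    keepAliveTimeout = Duration.ofSeconds(60),      // close connections idle longer than this
    checkKeepAliveInterval = Duration.ofSeconds(10) // how often the background check runs
)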
We can now use it in our Server by changing the acceptConnection() and handleConnection() methods.
private fun acceptConnection() {
    val connection = Connection(serverSocket.accept()) // 3
    logger.debug("Accepted new connection ${connection.destination()}")
    handlerThreadPool.submit {
        connectionPool.addConnection(connection) // 4
        try {
            handleConnection(connection) // 5
        } catch (e: Exception) {
            when {
                e is SocketException && connection.isClosed() -> logger.trace("Connection was closed", e)
                else -> logger.warn("Could not handle connection", e)
            }
        }
    }
}

private fun handleConnection(connection: Connection) {
    val input = connection.getInputStream().bufferedReader()
    val output = PrintWriter(connection.getOutputStream())
    while (!connection.isClosed()) {
        var request: Request? = null
        val response = try {
            request = Request.fromRaw(input) // 6
            connection.markAsAlive()
            handler(request)
        } catch (e: EmptyRequestException) { // 7, TODO: find a better way to detect closed connections
            logger.trace("Connection was closed by client")
            connection.close()
            break
        } catch (e: RequestParseException) {
            logger.warn("Could not parse a request", e)
            Response(status = Status.BAD_REQUEST)
        }
        output.print(response.toRaw())
        output.flush()
        if (request?.headers?.connection == "close") { // 8
            connection.close()
        }
    }
}
We create a new connection from the accepted socket (3), add it to the connection pool (4) and then handle the connection (5). The last two steps run on the handlerThreadPool so our server can immediately accept other connections.
The main changes are in the handleConnection method. Previously, we used a socket.use { } block in which we read a request and sent a response; at the end of that block, the connection was closed. We can't do that now because we want to keep the connection open. Instead, we keep reading requests and sending responses for as long as the connection is open.
After the first request on a connection, our server goes into the second iteration of the loop and blocks on Request.fromRaw(input) (6), which blocks on input.readLine() of the BufferedReader, which internally blocks on the socket's read(). This loop ends when either the server or the client closes the connection (7), or when the client sends a Connection: close header (8).
Detecting connections closed by the client
Detecting connections closed by our own server is easy: we can check socket.isClosed. The tricky part for me was detecting a connection closed by the client. When the client closes a connection, it sends a FIN,ACK segment and we send an ACK back. At first I thought we could just check socket.isClosed for this too, but I was wrong: that flag only tells us whether we closed the socket.

It seems there is no information about the client's FIN,ACK in the Socket API. The only way to notice it is to try a read() or readLine() on the BufferedReader.
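In other words, the only signal we get is end-of-stream on a read (a simplified illustration, not the server's actual code):

val reader = connection.getInputStream().bufferedReader()
val line = reader.readLine() // blocks until data arrives or the client closes the connection
if (line == null) {
    // readLine() returns null on end-of-stream, i.e. the client has sent FIN
    connection.close()
}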
With that in mind, it is hard to implement the following logic: block until new data arrives, then parse the request; if there is no data, close the connection. We cannot call readLine() before Request.fromRaw(input), because it would consume one line of our BufferedReader. We could change the signature of fromRaw to accept the first line as a String and the rest of the stream as a BufferedReader, but that's just ugly.
That’s why for now we can keep things as they are. The beginning of our parsing logic protects us from empty requests anyway.
val requestLine = input.readLine() ?: throw EmptyRequestException()
If readLine() returns null, we throw an EmptyRequestException, catch it (7) and close the connection. It might not be the prettiest solution, and it may smell like using exceptions for control flow, but it works.
We will try to tackle this problem in the future by using some sort of “one line/character caching BufferedReader”.
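Such a reader could, for example, be built around java.io.PushbackReader, which lets us peek at a single character and push it back (a rough sketch under that assumption, not the actual implementation):

import java.io.PushbackReader
import java.io.Reader

// Sketch: peek at the stream to detect a client-side close without consuming request data.
class PeekingReader(reader: Reader) {
    private val pushback = PushbackReader(reader, 1)

    // Blocks until a character arrives or the stream ends.
    // Returns false if the client closed the connection (end-of-stream).
    fun waitForData(): Boolean {
        val next = pushback.read()
        if (next == -1) return false
        pushback.unread(next) // put the character back so request parsing still sees it
        return true
    }
}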
Tests
The main happy path to test is to check whether we can make several requests on one connection.
private val client = HttpUrlConnectionClient(connectionTimeout = 50, socketTimeout = 100)

@Test
fun `should reuse connection to make requests`() {
    // given
    server = Server(handler = {
        Response(status = Status.OK, headers = HttpHeaders("content-length" to "0"))
    })
    server.start(8090)

    // when
    repeat(times = 3) {
        client.exchange(url = "http://localhost:8090", request = Request(path = "/", method = RequestMethod.GET))
    }

    // then
    assertThat(server.connectionsEstablished()).isEqualTo(1)
}
To test that, we have to expose a connectionsEstablished() method that returns the number of connections established to the server. This will most likely be turned into some kind of ServerStats object at some point, because we will want more metrics, but let's keep it simple for now.
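A minimal sketch of what such an object could look like (hypothetical, not in the current code):

import java.util.concurrent.atomic.LongAdder

// Hypothetical ServerStats: a single place to collect server metrics.
class ServerStats {
    private val establishedConnections = LongAdder()

    fun connectionEstablished() = establishedConnections.increment()

    fun connectionsEstablished(): Long = establishedConnections.sum()
}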
Importance of content-length header
Now that we have switched to the persistent connection model, it is really important to send a content-length header even if we don't send a body! Previously, we would send a response and close the connection. Even if the client didn't know where the body ended, it wouldn't block waiting for more content for long, because we immediately closed the connection on the server side. Now we have to say explicitly how long the body is, because we don't close the connection right away.

The server simply checks the content-length header and skips reading the body if there isn't any. This is not a full HTTP implementation, though: there is also chunked transfer encoding, in which the body length is encoded in the body itself. We will implement it in the future.
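For example, a response with no body now needs to state that explicitly, so the client knows not to wait for more data (a hand-written illustration):

HTTP/1.1 200 OK
content-length: 0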
Back to the test! We send 3 sequential requests and the test eventually passes, but…
Persistent connections of HttpURLConnection
I had so many problems with the persistent connections of HttpURLConnection! The docs say it supports persistent connections out of the box, but it works in a very non-obvious way! For example, connection.disconnect() does not actually close the connection. I couldn't find a way to close persistent connections between tests, which messed them up. Eventually, I got tired of fighting with HttpURLConnection and fell back to the well-known, excellent Apache HTTP Client.
I created an implementation of my Client interface based on it. I won't go into details here; you can check it out here.
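Just to give a flavour, an Apache HttpClient (4.x) based client reuses connections roughly like this (an illustrative sketch, not the article's actual Client implementation):

import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClients

fun main() {
    // The default client keeps connections alive in its internal pool and reuses them.
    val httpClient = HttpClients.createDefault()
    repeat(3) {
        httpClient.execute(HttpGet("http://localhost:8090/")).use { response ->
            println(response.statusLine.statusCode)
        }
    }
    httpClient.close()
}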
I also tested whether old connections are closed by the connection pool checker.
@Test
fun `should create new connection when old is closed by keep alive checker`() {
    // given
    server = Server(
        port = 8090,
        keepAliveTimeout = Duration.ofMillis(500),
        checkKeepAliveInterval = Duration.ofMillis(100),
        handler = {
            Response(status = Status.OK, headers = HttpHeaders("content-length" to "0"))
        })
    server.start(8090)

    // when
    client.exchange(url = "http://localhost:8090", request = Request(path = "/", method = RequestMethod.GET))

    // then
    assertThat(server.connectionsEstablished()).isEqualTo(1)

    // when
    Thread.sleep(server.keepAliveTimeout.toMillis() + server.checkKeepAliveInterval.toMillis())
    client.exchange(url = "http://localhost:8090", request = Request(path = "/", method = RequestMethod.GET))

    // then
    assertThat(server.connectionsEstablished()).isEqualTo(2)
}
And that a connection is dropped when a Connection: close header is sent.
@Test
fun `should not reuse connections when Connection close is sent`() {
    // given
    server = Server(port = 8090, handler = {
        Response(status = Status.OK, headers = HttpHeaders("content-length" to "0"))
    })
    server.start(8090)

    // when
    repeat(times = 3) {
        client.exchange(
            url = "http://localhost:8090",
            request = Request(
                path = "/",
                method = RequestMethod.GET,
                headers = HttpHeaders("connection" to "close")))
    }

    // then
    assertThat(server.connectionsEstablished()).isEqualTo(3)
}
Summary
We managed to add persistent connections to our server! We keep a connection open, so sending multiple requests sequentially is much cheaper. Next up, we will build our HTTP client, which will use a connection pool as well.
Code is available here.
Other articles of the series
This article is a part of the HTTP series
- The HTTP part 6 - HTTP Client
- The HTTP part 5 - Persistent connections
- The HTTP part 4 - Multithreading
- The HTTP part 3 - Parsing request & response
- The HTTP part 2 - Connections Queue
- The HTTP part 1 - Let's start small