Previously we’ve built multithreaded server with a connection pooling. We’ve tested it using Apache HTTP Client. Since we’ve learned how pooling works, it’s a good time to build our own HTTP client with the connection pooling!

The requirements

We need to meet the following requirements:

  • Connections are pooled

We will reuse a connection for multiple requests. The time a connection is kept open without any activity is configured by the keep alive timeout.

  • Maximum connections per destination

There should be a limit of opened connections per destination, which is a pair of host and port. To explain this, let’s say we set it to 2. We send a request that creates the first connection. If we send another one before first is completed then the new connection is created. If 2 requests are pending and we send another one, then we have to wait for one of them to finish. We wait for maximum time configured by the connection request timeout.

Now, let’s get back to the situation without any opened connection. We send a request that opens a connection. Then we send another one, but before that the previous request is completed so the connection is released back to the pool. This second request will reuse previous connection, therefore only 1 connection would be opened.

  • Connection timeout

We will wait for a connection to be established for the maximum of time configured by the connection timeout.

  • Socket timeout

After establishing the connection we will wait for new data to arrive for maximum time configured by the socket timeout.

The implementation

This is the interface we will be coding to

interface Client: AutoCloseable {
    fun exchange(url: String, request: Request): Response
    override fun close() {
    }
}

SocketClient

First, let’s look at SocketClient

class SocketClient(
        val keepAliveTimeout: Duration = Duration.ofSeconds(30),
        val checkKeepAliveInterval: Duration = Duration.ofSeconds(1), // (1)
        val maxConnectionsToDestination: Int = 10,
        val connectionTimeout: Duration = Duration.ofMillis(500),
        val socketTimeout: Duration = Duration.ofMillis(200),
        val connectionRequestTimeout: Duration = Duration.ofSeconds(1)
) : Client {

    private val connectionPool = ClientConnectionPool(keepAliveTimeout, checkKeepAliveInterval,
            maxConnectionsToDestination, connectionTimeout, socketTimeout, connectionRequestTimeout)

    companion object {
        const val DEFAULT_HTTP_PORT = 80
        const val SUPPORTED_PROTOCOL = "http"

        private val logger = LoggerFactory.getLogger(SocketClient::class.java)
    }

    init {
        connectionPool.start()
    }

    override fun exchange(url: String, request: Request): Response {
        val parsedUrl = URL(url)
        if (parsedUrl.protocol != SUPPORTED_PROTOCOL) {
            throw SocketClientException("${parsedUrl.protocol} is not supported. " +
                    "Only $SUPPORTED_PROTOCOL is supported")
        }

        val connection = connectionPool.retrieveConnection(ConnectionDestination( // (2)
                host = parsedUrl.host,
                port = if (parsedUrl.port == -1) DEFAULT_HTTP_PORT else parsedUrl.port))
        try {
            return exchange(connection, request)
        } catch (e: Exception) {
            connection.close() // (4)
            throw SocketClientException("Error while exchanging data", e)
        } finally {
            connectionPool.releaseConnection(connection) // (5)
        }
    }

    private fun exchange(connection: Connection, request: Request): Response { // (3)
        val input = connection.getInputStream().bufferedReader()
        val output = PrintWriter(connection.getOutputStream())

        val requestRaw = request.toRaw()
        output.print(requestRaw)
        output.flush()

        return Response.fromRaw(input)
    }

    override fun close() {
        connectionPool.close()
    }

    fun stats() = SocketClientStats(connectionPool.stats())
}

open class SocketClientException(msg: String, throwable: Throwable? = null): RuntimeException(msg, throwable)
data class SocketClientStats(val poolStats: ConnectionClientPoolStats)

We pass a bunch of arguments that were explained above. The checkKeepAliveInterval (1) is how often the checker for cleaning up the old connections runs. We pass it to the connection pool that we start on class initialization.

In the exchange() function, we retrieve (lease or create) a connection from the pool (2). Then we do I/O operations converting data between strings and our structures (3). If we catch an exception, we have to close the connection (4). It probably means that there was a violation of the HTTP protocol, therefore we cannot continue sending more data on this connection. Whether we succeed or not, we should release the connection back to the pool (5).

ConnectionPool

Here comes the difficult part. At first I thought we could reuse the pool from the server, but the client’s pool is much more complicated. It supports connection leasing, limits connection per destination and checks connection inactivity in a different way.

Let’s break it down one by one.

Connection

Just like in a server pool we need a wrapper over the socket that will track the activity on this socket.

internal data class ConnectionDestination(val host: String, val port: Int) {
    override fun toString(): String = "$host:$port"
    fun toInetSocketAddress() = InetSocketAddress(host, port)
}

internal class Connection(
        private val socket: Socket,
        val destination: ConnectionDestination,
        keepAliveTimeout: Duration,
        checkKeepAliveInterval: Duration
) {

    companion object {
        private val logger = LoggerFactory.getLogger(Connection::class.java)
    }

    /**
     * Counter that is incremented on periodic inactivity check. It is reset on new connection activity
     */
    private var checks: Int = 0 // (1)

    /**
     * If the connection is checked for inactivity for inactiveChecksThreshold times and there was no activity
     * it is assumed that connection is no longer active. It is an optimization so we don't have to call time()
     * for every byte on socket.
     */
    // (3)
    private val inactiveChecksThreshold = (keepAliveTimeout.toMillis() / checkKeepAliveInterval.toMillis()).toInt()

    internal fun isInactive(): Boolean = checks++ > inactiveChecksThreshold // (2)

    fun close() {
        try {
            socket.close()
        } catch (e: Exception) {
            logger.warn("Could not close the connection", e)
        }
    }

    fun isClosed(): Boolean = socket.isClosed

    fun getInputStream(): InputStream = socket.getInputStream()
            ?.let(::ActivityTrackingInputStream)
            ?: throw ConnectionException("Could not obtain stream. Is socket closed?")

    fun getOutputStream(): OutputStream = socket.getOutputStream()
            ?.let(::ActivityTrackingOutputStream)
            ?: throw ConnectionException("Could not obtain stream. Is socket closed?")

    private inner class ActivityTrackingInputStream(val inputStream: InputStream): InputStream() {

        override fun read(): Int = inputStream.read().also { checks = 0 }
        override fun read(b: ByteArray?): Int = inputStream.read(b).also { checks = 0 }
        override fun read(b: ByteArray?, off: Int, len: Int): Int = inputStream.read(b, off, len).also { checks = 0 }
        override fun skip(n: Long): Long = inputStream.skip(n)
        override fun available(): Int = inputStream.available()
        override fun reset() = inputStream.reset()
        override fun close() = inputStream.close()
        override fun mark(readlimit: Int) = inputStream.mark(readlimit)
        override fun markSupported(): Boolean = inputStream.markSupported()
    }

    private inner class ActivityTrackingOutputStream(val outputStream: OutputStream): OutputStream() {

        override fun write(b: Int) = outputStream.write(b).also { checks = 0 }
        override fun write(b: ByteArray?) = outputStream.write(b).also { checks = 0 }
        override fun write(b: ByteArray?, off: Int, len: Int) = outputStream.write(b, off, len).also { checks = 0 }
        override fun flush() = outputStream.flush()
        override fun close() = outputStream.close()
    }
}

open class ConnectionException(msg: String, throwable: Throwable? = null): RuntimeException(msg, throwable)

How can we tell whether the connection is active or not? It would be so easy to save the timestamp of the last activity and compare it to keepAliveTimeout, but calling System.nanoTime() on every byte send/received would create an overhead. Instead, we create a counter (1) that is incremented on every inactivity check (2). We know how often this method will be called because of the checkKeepAliveInternal argument, so we can compute (3) after how many checks the connection is no longer active. If there is any activity, then ActivityTrackingInputStream and ActivityTrackingOutputStream decorators will reset the counter.

This solution is not ideal, because we have to trust the client of this class to run the isInactive() method with the checkKeepAliveInterval interval. On the other hand, the Connection class won’t be visible to the end user of our SocketClient so I think we can live with that.

Connections

Now we need a container from which we can lease connections.

private class Connections {

    companion object {
        private val logger = LoggerFactory.getLogger(Connections::class.java)
    }

    private val leased = ConcurrentHashMap<ConnectionDestination, BlockingDeque<Connection>>()
    private val available = ConcurrentHashMap<ConnectionDestination, BlockingDeque<Connection>>()

    fun addLeased(connection: Connection) { // (2)
        leased(connection.destination) += connection
    }

    fun lease(destination: ConnectionDestination): Connection? { // (1)
        // There can be already closed connection, but we want to return an active one
        do {
            val connection = available(destination).poll()
            if (connection == null) {
                return null
            } else if (!connection.isClosed()) {
                addLeased(connection)
                return connection
            }
        } while(true)
    }

    fun release(connection: Connection) { // (3)
        leased(connection.destination).remove(connection)
        available(connection.destination) += connection
    }

    fun closeStale() { // (4)
        closeStale(leased)
        closeStale(available)
    }

    private fun leased(destination: ConnectionDestination) =
            leased.computeIfAbsent(destination) { LinkedBlockingDeque() }

    private fun available(destination: ConnectionDestination) =
            available.computeIfAbsent(destination) { LinkedBlockingDeque() }

    private fun closeStale(
            connectionsForDestination: ConcurrentHashMap<ConnectionDestination, BlockingDeque<Connection>>) {
        connectionsForDestination.forEach { _, connections ->
            val staleConnections = connections.filter(Connection::isInactive)
            connections.removeAll(staleConnections)
            staleConnections.forEach {
                logger.debug("Closing connection to ${it.destination} due to not being active")
                it.close()
            }
        }
    }

}

We keep separate queues of connections for leased and available connections. We can lease a connection (1), but we have to be careful not to return a closed connection. We can add already leased connection (2). This will be useful when we create a connection and return it immediately to the client. We can release connection (3) so it is placed back into the leased queue. There is also a method for closing stale connections (4), it seeks for inactive connections, closes them and removes from the queues.

ConnectionPool

Now we can put it all together.

internal class ClientConnectionPool(
        private val keepAliveTimeout: Duration,
        private val checkKeepAliveInterval: Duration,
        private val maxConnectionsPerDestination: Int,
        private val connectionTimeout: Duration,
        private val socketTimeout: Duration,
        private val connectionRequestTimeout: Duration
): Closeable {

    companion object {
        private val logger = LoggerFactory.getLogger(ClientConnectionPool::class.java)
    }

    private val connections = Connections()
    private val checkerThreadPool = Executors.newSingleThreadScheduledExecutor(
            IncrementingThreadFactory("connection-pool-checker"))
    private val retrievingSemaphores = ConcurrentHashMap<ConnectionDestination, Semaphore>()
    private val connectionsEstablished = LongAdder()

    fun start() {
        scheduleClosingStaleConnections()
    }

    /**
     * You have to manually call `releaseConnection()` when you are done with using the connection
     */
    fun retrieveConnection(destination: ConnectionDestination): Connection {
        acquirePermit(destination) // (1)
        return leaseConnection(destination) ?: createConnection(destination) // (3)
    }

    private fun acquirePermit(destination: ConnectionDestination) {
        logger.trace("Waiting for acquire permit to retrieve connection to $destination")
        // (2)
        if (!semaphore(destination).tryAcquire(connectionRequestTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
            throw ClientConnectionPoolException(
                    "Timeout on waiting to retrieve connection. Limit of open connections exceeded.")
        }
    }

    private fun semaphore(destination: ConnectionDestination) =
            retrievingSemaphores.computeIfAbsent(destination) { Semaphore(maxConnectionsPerDestination) }

    fun stats(): ConnectionClientPoolStats = ConnectionClientPoolStats(connectionsEstablished.sum())

    fun releaseConnection(connection: Connection) {
        logger.debug("Releasing connection to ${connection.destination}")
        connections.release(connection)
        semaphore(connection.destination).release()
    }

    private fun createConnection(destination: ConnectionDestination): Connection {
        val socket = try {
            Socket().also { // (4)
                it.soTimeout = socketTimeout.toMillis().toInt()
                it.connect(destination.toInetSocketAddress(), connectionTimeout.toMillis().toInt())
            }
        } catch (e: SocketTimeoutException) {
            throw ConnectionTimeoutException(destination)
        } catch (e: Exception) {
            throw ConnectionException("Could not connect to $destination", e)
        }
        val connection = Connection(socket, destination, keepAliveTimeout, checkKeepAliveInterval)
        logger.debug("Created connection to $destination")
        connections.addLeased(connection)
        connectionsEstablished.increment() // (5)
        return connection
    }

    private fun leaseConnection(destination: ConnectionDestination): Connection? {
        logger.trace("Leasing a connection to $destination")
        val connection = connections.lease(destination)
        when (connection) {
            null -> logger.trace("There was no connection to lease")
            else -> logger.debug("Connection to $destination leased")
        }
        return connection
    }


    private fun scheduleClosingStaleConnections() { // 6
        checkerThreadPool.scheduleAtFixedRate({
            try {
                connections.closeStale()
            } catch (e: Exception) {
                logger.error("Error while closing stale connections", e)
            }
        }, checkKeepAliveInterval.toMillis(), checkKeepAliveInterval.toMillis(), TimeUnit.MILLISECONDS)
    }

    override fun close() {
        checkerThreadPool.shutdown()
        checkerThreadPool.awaitTermination(1, TimeUnit.SECONDS)
    }

}

To retrieve a connection we have to acquire a permit (1). A permit is a semaphore that guards the pool against creating more than the limit of connections per destination. If we acquired all the permits, then the next request for a connection will wait for the max of connectionRequestTimeout duration (2). If we successfully acquired the permit it means that we can either lease a connection or create one (3). Firstly, we try to lease a connection because we’d rather reuse a connection than create a new one. If we succeeded, we return it to the user. Otherwise, we need to create a new one. While connecting, we have to set socket and connection timeouts (4). A new connection is added to the connections as leased and immediately returned to the client. We track the number of established connections for testing purposes (5).

When we start the pool, we also schedule the closing of stale connections at checkKeepAliveInterval rate (6).

Tests

Tests are similar to the suite we wrote for the multithreaded server. The main happy path to check whether client and server reuse connections.

@Test
fun `client should reuse opened connections`() {
    // given
    server = Server(port = 8090, handler = {
        Response(status = Status.OK, headers = HttpHeaders("content-length" to "0"))
    })

    // when
    repeat(times = 3) {
        client.exchange(
                url = "http://localhost:8090",
                request = Request(
                        path = "/",
                        method = RequestMethod.POST,
                        headers = HttpHeaders("content-length" to "0")))
    }

    // then
    assertThat(server.connectionsEstablished()).isEqualTo(1)
    assertThat(client.stats().poolStats.connectionsEstablished).isEqualTo(1)
}

You can see the whole test suit here.

Testing the timeouts

Testing timeouts was an interesting case. Socket timeout is easy to test, we can just slow down response from a server and check if the client fails, but what about the connection timeout? How can we slow down the process of establishing a connection? We could use Toxiproxy, but we need that in the test.

What we can use is an IP address that is non-routable. There are a couple of address blocks like 10.0.0.0/8, or 192.0.0.0/24 that are meant to be used for a private network. A packet with such destination IP will be dropped by a router, so we will never hear back the answer for our SYN TCP segment. The problem is that an IP that is non-routable on my private network can be used by someone who runs the test in another network. Digging deeper, there is an address block that should be used only for documentation and should not be used in practice - it’s 192.0.2.0/24.

Let’s try that

@Test
fun `should throw exception when connection timed out`() {
    // given
    val nonRoutableIp = "192.0.2.0"

    // when & then
    assertThatThrownBy { client.exchange(
            url = "http://$nonRoutableIp:80",
            request = Request(
                    path = "/",
                    method = RequestMethod.POST,
                    headers = HttpHeaders("content-length" to "0"))) }
            .isInstanceOf(ConnectionTimeoutException::class.java)
}

It passes! Let’s check that in the Wiremock non_routable_connection Indeed, we send the SYN segment and never hear back so retransmission kicks in and eventually we time out the operation.

Testing the concurrency

I’m 100% sure there are concurrency bugs in our client. What we can test at least is the limit of maximum connections per destination. To do that, we can spin up set of servers, create a client and flood servers with requests from multiple threads.

class SocketClientMultithreadingTest {

    val requestParallelism = 100
    val requests = 1_000_000
    val sampleRate = 10
    val nServers = 10
    val servers = mutableListOf<Server>()
    val client = SocketClient(maxConnectionsToDestination = 5,
            socketTimeout = Duration.ofSeconds(10),
            connectionTimeout = Duration.ofSeconds(1),
            connectionRequestTimeout = Duration.ofSeconds(10))
    val random = Random()

    @Before
    fun setupServers() {
        repeat(nServers) {
            servers += Server(port = 8090 + it, handler = {
                Response(status = Status.OK, headers = HttpHeaders("content-length" to "0"))
            })
        }
    }

    @After
    fun cleanup() {
        val cleaning = Executors.newFixedThreadPool(nServers)
        servers.map { cleaning.submit { it.close() } }
                .map { it.get() }
        cleaning.awaitTermination(0, TimeUnit.MILLISECONDS)
        client.close()
    }

    @Test
    @Ignore("This is HEAVY test")
    fun `should allow to open max of 5 connections to a destination`() {
        val threadPool = Executors.newFixedThreadPool(requestParallelism)
        val requestsDone = CountDownLatch(requests)
        val errors = mutableListOf<String>()

        repeat(requests) {
            threadPool.submit {
                val chosenServer = random.nextInt(nServers)
                val response = try {
                    client.exchange(
                            url = "http://localhost:${8090 + chosenServer}",
                            request = Request(
                                    path = "/",
                                    method = RequestMethod.POST,
                                    headers = HttpHeaders("content-length" to "0")))
                } catch (e: Exception) {
                    e.printStackTrace()
                    throw e
                }
                if (response.status != Status.OK) {
                    errors += "Response was $response"
                }
                requestsDone.countDown()
            }
        }
        requestsDone.await(1, TimeUnit.MINUTES)
        assertThat(errors).isEmpty()
        assertThat(requestsDone.count).isEqualTo(0)
        assertThat(client.stats().poolStats.connectionsEstablished.toInt())
                .isLessThanOrEqualTo(nServers * client.maxConnectionsToDestination)
    }
}

Sending and receiving a milion requests takes about 20s on my laptop, but most importantly we did not exceeded maxConnectionsToDestination limit. This is also a smoke test wheter we even can use SocketClient from the multiple threads. We will benchmark and profile our server and client in the future.

Summary

We built an HTTP client meeting all the requirements!

The next step is to make our server and client more user-friendly. We will build a simple application that will use the server and the brand new client. We will look at things like routing, filters and exception handling.

Code is available here.

Other articles of the series

This article is a part of the HTTP series