The problem

So far we have managed to build a server that is single-threaded, meaning that all requests are processed by one thread. While a response is being prepared, all other incoming requests are queued. This leads to several problems.

Only one core is utilised

It’s not uncommon to run a server on a machine with 4, 8 or more cores. With a single-threaded model, only one of those cores is utilised.

Slow response

What if we’ve got an app that reads something from a database on a request, but the database is having problems? Now we are waiting for the database to return records while other users send more requests to our app. They have to wait for the first request to be finished.

Slow client

What if everything is fine with our system, but a client that connects to our server is slow? Our server is waiting for the client to send the request and cannot simultaneously read requests from other clients. What’s more, we did not set any timeouts, which means that a single client that connects and never sends a request essentially kills our server.

Multithreading

We can partially solve some of the problems by making our server multithreaded. The good news is that it is really easy to do so.

class Server(
        val handler: Handler,
        val port: Int,
        val maxIncomingConnections: Int = 10,
        val maxWorkerThreads: Int = 50
): AutoCloseable {

    private val logger = LoggerFactory.getLogger(Server::class.java)

    private val serverSocket = ServerSocket(port, maxIncomingConnections)
    private val handlerThreadPool = Executors.newFixedThreadPool(maxWorkerThreads) // 1
    private val acceptingConnectionsThreadPool = Executors.newSingleThreadExecutor()

    init {
        logger.info("Starting server on port $port")
        acceptingConnectionsThreadPool.submit(this::acceptConnections)
    }

    private fun acceptConnections() {
        while (!serverSocket.isClosed) {
            try {
                acceptConnection()
            } catch (e: Exception) {
                when {
                    e is InterruptedException -> throw e
                    e is SocketException && serverSocket.isClosed -> logger.trace("Socket was closed", e)
                    else -> logger.error("Could not accept new connection", e)
                }
            }
        }
    }

    private fun acceptConnection() {
        val socket = serverSocket.accept() // 2
        handlerThreadPool.submit { // 3
            try {
                handleConnection(socket)
            } catch (e: Exception) {
                logger.warn("Could not handle connection", e)
            }
        }
    }

    private fun handleConnection(socket: Socket) {
        socket.use {
            val input = it.getInputStream().bufferedReader()
            val output = PrintWriter(it.getOutputStream())

            val response = try {
                val request = Request.fromRaw(input)
                handler(request)
            } catch (e: RequestParseException) {
                logger.warn("Could not parse a request", e)
                Response(status = Status.BAD_REQUEST)
            }
            output.print(response.toRaw())
            output.flush()
        }
    }

    override fun close() {
        logger.info("Stopping server")
        serverSocket.close()
        handlerThreadPool.shutdown()
        handlerThreadPool.awaitTermination(1, TimeUnit.SECONDS)
        acceptingConnectionsThreadPool.shutdown()
        acceptingConnectionsThreadPool.awaitTermination(1, TimeUnit.SECONDS)
    }
}

We added a thread pool (1) of maxWorkerThreads worker threads (50 by default). Now our server blocks on the acceptingConnectionsThreadPool to listen for a connection (2). When we’ve got the connection, we immediately delegate reading the request, handling it and writing the response to another thread (3) from the handlerThreadPool. Now, when our server blocks on a slow database, client or anything else, it actually blocks a thread from handlerThreadPool, while we can still accept connections in acceptingConnectionsThreadPool.
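Just to see it in action, here is a minimal usage sketch (assuming the Handler is a plain (Request) -> Response function type, as the test below also suggests):

fun main() {
    // The server starts accepting connections in its init block; since it is AutoCloseable,
    // `use` stops it when the block ends.
    Server(port = 8090, handler = { Response(status = Status.OK) }).use {
        println("Server is running, press Enter to stop")
        readLine()
    }
}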

Testing!

Let’s test if that works.

@Test
fun `should process multiple requests at once`() {
    // given
    val server = Server(port = 8090, handler = {
        Thread.sleep(200)
        Response(Status.OK)
    })

    // when
    val completedRequests = CountDownLatch(3)
    val clientPool = Executors.newFixedThreadPool(3)
    repeat(3) {
        clientPool.submit {
            try {
                val response = HttpUrlConnectionClient(connectionTimeout = 100, socketTimeout = 300)
                        .exchange(url = "http://localhost:8090", request = Request(path = "/", method = RequestMethod.GET))
                if (response.status == Status.OK) {
                    completedRequests.countDown()
                }
            } catch (e: Exception) {
                e.printStackTrace()
            }
        }
    }

    // then
    // if server was single threaded then it would take at least 600ms (3 * sleep of 200ms)
    assertTrue(completedRequests.await(500, TimeUnit.MILLISECONDS))
    clientPool.shutdownNow()

    // cleanup
    server.close()
}

We slow down each response by 200ms and make 3 requests at once. Then we check that all of them complete within 500ms (less than the 600ms it would take to process three 200ms requests one by one). We run it and it passes.

We are under heavy load!

Ok, so let’s say our database, microservice or any other external resource is lagging and now takes a lot of time to respond. Say we’ve got a pool of 20 worker threads and we are already responding to 20 clients at once - what happens to the next client that makes a request? The maxIncomingConnections backlog is almost irrelevant now, because we accept new connections immediately, so that queue should be empty most of the time. The new handler task is submitted to the thread pool and queued in its internal queue. Where is this queue?

/**
 * Creates a thread pool that reuses a fixed number of threads
 * operating off a shared unbounded queue.
 * ...
 */
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

We can see that the Executors#newFixedThreadPool factory method creates a thread pool with an unbounded LinkedBlockingQueue. Normally I would strongly suggest limiting the size of the queue, because we can easily hit an OutOfMemoryError by filling the queue with tasks, but in this case we will hit another limit first.
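For illustration, bounding the queue would mean building the pool with the java.util.concurrent.ThreadPoolExecutor constructor instead of the Executors factory. A sketch of what handlerThreadPool could look like then (the queue size of 100 is arbitrary):

private val handlerThreadPool = ThreadPoolExecutor(
        maxWorkerThreads, maxWorkerThreads, // core and max size - a fixed pool, just like newFixedThreadPool
        0L, TimeUnit.MILLISECONDS, // keep-alive is irrelevant for a fixed pool
        ArrayBlockingQueue<Runnable>(100)) // bounded queue: at most 100 tasks can wait,
                                           // a submit beyond that throws RejectedExecutionException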

File descriptors

Imagine that the database is now taking forever to return a query, and our request-handling tasks are piling up in the queue. Keep in mind that before we submit a task to handlerThreadPool, we accept a new connection. Eventually we will hit the file descriptor limit! As you may remember from Part 2, every open connection creates a new file descriptor. To see that, we set up a Vagrant VM and change its open files limit to 500

sudo bash -c "echo '* - nofile 500' >> /etc/security/limits.conf"

Then, from the host machine, we can run wrk - an HTTP benchmarking tool - telling it to open 1000 concurrent connections

wrk -c1000 -d10s -t1000 http://localhost:8091

And there we go, we’ve got an exception

vagrant@vagrant-ubuntu-trusty-64:~$ java -jar /project/naphi-0.1-SNAPSHOT.jar
Exception in thread "main" java.net.SocketException: Too many open files (Accept failed)
	at java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.naphi.Server.start(Server.kt:79)
	at org.naphi.ServerKt.main(Server.kt:119)

The default limit for the Java process on the Ubuntu Vagrant box was 4096. You can easily bump it up, but keep in mind that every resource is finite. Every open socket costs us RAM, although as I mentioned in previous posts, you can raise this limit even to millions of file descriptors with proper tuning and a large enough machine.

Can we be more graceful?

When we hit the file descriptor limit, our server either drops the connection or sends an RST packet. A client will most likely retry the connection, either through custom logic reacting to the RST or through built-in TCP retries on a dropped connection. Instead, we can return a 503 Service Unavailable status code. What is interesting is that you can attach a Retry-After header suggesting when the client should try again. If the client respects it, you avoid unnecessary requests.
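A sketch of how acceptConnection could do that, assuming a bounded handlerThreadPool like the one sketched earlier (the 503 is written as a raw string here, because our Response class has not been shown with header support in this series):

private fun acceptConnection() {
    val socket = serverSocket.accept()
    try {
        handlerThreadPool.submit {
            try {
                handleConnection(socket)
            } catch (e: Exception) {
                logger.warn("Could not handle connection", e)
            }
        }
    } catch (e: RejectedExecutionException) {
        // the pool and its queue are full - tell the client to back off instead of dropping it
        rejectConnection(socket)
    }
}

private fun rejectConnection(socket: Socket) {
    socket.use {
        val output = PrintWriter(it.getOutputStream())
        output.print("HTTP/1.1 503 Service Unavailable\r\nRetry-After: 5\r\nContent-Length: 0\r\n\r\n")
        output.flush()
    }
}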

Number of worker threads

Let’s say that the database is taking 5 seconds to return a query. We’ve got 20 worker threads, and every thread blocks for those 5 seconds. Essentially we can now respond to a maximum of 20 requests per 5 seconds = 4 requests/s. Additionally, let’s assume that we can open more connections to the database without killing it (not a real-world scenario). Theoretically, if we increase our pool to 200 workers, we should get 40 requests/s.
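The back-of-the-envelope formula behind those numbers: with fully blocking workers, throughput is simply the number of workers divided by the latency of a single request.

// each blocked worker completes 1 / latency requests per second, so:
fun maxThroughput(workers: Int, latencySeconds: Double) = workers / latencySeconds

maxThroughput(workers = 20, latencySeconds = 5.0)  // = 4.0 requests/s
maxThroughput(workers = 200, latencySeconds = 5.0) // = 40.0 requests/s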

How far can we go?

After changing maxWorkerThreads to 4000, I eventually got an OutOfMemoryError. Every new thread costs us RAM, so be careful with that. Besides that, there is a limit on the number of processes (and threads) in Linux. It’s 32768 by default on my Vagrant machine.

vagrant@vagrant-ubuntu-trusty-64:~$ cat /proc/sys/kernel/pid_max
32768

But more isn’t always better. When the system is back to normal, we end up with lots of redundant threads eating memory. The pool can scale down by killing idle threads, but more working threads (actually working, not waiting on a socket) means more context switching. Moreover, you have to be prepared to spin up thousands of threads, so you have to reserve a machine with more RAM, even if, for example, only 20% of it is used 90% of the time.

What is the right value?

Measure it. Start with a value that feels reasonable, then measure your thread pool utilisation (active threads / all threads) and the size of the queue, and check whether you need to go lower or higher.
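ThreadPoolExecutor already exposes those numbers. A sketch of a naive periodic log inside our Server class (in practice you would export these values to your metrics system; the pool returned by Executors.newFixedThreadPool can be cast to ThreadPoolExecutor, since that is what the factory creates):

private fun monitorThreadPool(pool: ThreadPoolExecutor) {
    // a simple scheduler logging utilisation once a second
    // (it should also be shut down in close(), omitted here)
    Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate({
        val utilisation = pool.activeCount.toDouble() / pool.maximumPoolSize
        logger.info("Handler pool utilisation: ${"%.2f".format(utilisation)}, queued tasks: ${pool.queue.size}")
    }, 1, 1, TimeUnit.SECONDS)
}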

Name your thread pools

There is one thing we didn’t do. We didn’t name our thread pools! Named threads are really helpful later on when you read logs or a thread dump.

[Screenshots: a thread dump with unnamed threads vs. one with named threads]

To name our thread pools, we have to pass a ThreadFactory instance. If you’ve got Guava on your classpath, use ThreadFactoryBuilder, which provides a nice API for creating one (a sketch is shown at the end of this section). We don’t, so we have to do it ourselves.

private class IncrementingThreadFactory(val prefix: String): ThreadFactory {
    val adder = AtomicInteger()
    override fun newThread(r: Runnable) = Thread(r).also { it.name = "$prefix-${adder.getAndIncrement()}" }
}

And pass it to Executors.newFixedThreadPool like this

private val handlerThreadPool = Executors.newFixedThreadPool(maxWorkerThreads,
        IncrementingThreadFactory("server-handler"))

private val acceptingConnectionsThreadPool = Executors.newSingleThreadExecutor(
        IncrementingThreadFactory("server-connections-acceptor"))

Summary

We managed to change our server into a multithreaded version. We can now utilise all the cores! A slow client finally won’t make others wait for a response (although enough slow clients still will). Next up - we will create a connection pool.

Other articles of the series

This article is a part of the HTTP series