Basic HTTP proxy in Clojure

Goal

The goal is to implement a reverse proxy similar to Nginx. I’m not sure what I’ll cover and I’ll decide as I go.

Origin server

A reverse proxy needs one or more origin servers to forward requests to. For this I’m using a simple HTML page with two static resources:

Directory structure
.
├── cute_cat.jpg
├── index.html
└── styles.css

index.html
<html>
  <head>
    <title>Origin Server</title>
    <link rel="stylesheet" type="text/css" href="styles.css" />
  </head>
  <body>
    <h1>Origin Server</h1>
    <p>Home of this cute cat.</p>
    <img src="cute_cat.jpg" alt="Cute cat" />
  </body>
</html>

styles.css
html {
  text-align: center;
}
img {
  border-radius: 20px;
}

I’m using npx serve origin-server/ to serve these files. By default it serves them on localhost:3000.

With all my UX brilliance this is the result:

[Image: Response]

When the page is accessed, the server also logs each request it receives:

HTTP 7/20/2024 10:53:08 AM 127.0.0.1 GET /
HTTP 7/20/2024 10:53:08 AM 127.0.0.1 Returned 304 in 51 ms
HTTP 7/20/2024 10:53:08 AM 127.0.0.1 GET /styles.css
HTTP 7/20/2024 10:53:08 AM 127.0.0.1 Returned 304 in 2 ms
HTTP 7/20/2024 10:53:08 AM 127.0.0.1 GET /cute_cat.jpg
HTTP 7/20/2024 10:53:08 AM 127.0.0.1 Returned 304 in 2 ms

Getting started

First I’m going to define a variable for my upstream server:

(ns core)
(def upstream-server {:hostname "localhost" :port 3000})

We can reuse what was done in Basic HTTP server in Clojure:

(ns core
  (:require [clojure.string :as str])
  ;; Socket, ConnectException and the Data*Stream classes are used by the proxy's client-handler.
  (:import (java.net ServerSocket Socket ConnectException)
           (java.io BufferedReader InputStreamReader PrintWriter
                    DataInputStream DataOutputStream)))
(def upstream-server {:hostname "localhost" :port 3000})
(defn- handle-input
  [{:keys [in out]}]
  (loop [request {}
         state :idle]
    (condp = state
      :idle
      (if-let [line (.readLine in)]
        (let [[method path protocol] (str/split line #"\s+")]
          (recur (assoc request
                        :method (-> method str/lower-case keyword)
                        :path path
                        :protocol protocol)
                 :headers))
        (println "Client disconnected"))
      :headers
      (if-let [line (.readLine in)]
        (if (empty? line)
          (recur request :body)
          (let [[key value] (str/split line #": ")]
            (recur (assoc-in request [:headers key] value) :headers)))
        (recur request :body))
      :body
      (let [content-length (get-in request [:headers "Content-Length"])]
        (if-not content-length
          (recur request :done)
          (let [size (parse-long content-length)
                body (char-array size)]
            (doseq [i (range size)]
              (aset body i (char (.read in))))
            (recur (assoc request :body (String. body)) :done))))
      :done
      (do
        (.println out "HTTP/1.1 200 OK\r\n\r\n")
        (.println out (str request))
        (println "Done parsing the request")))))
(defn- client-handler
  [client-socket]
  (future
    (try
      (with-open [in (BufferedReader. (InputStreamReader. (.getInputStream client-socket)))
                  out (PrintWriter. (.getOutputStream client-socket) true)]
        (handle-input {:in in
                       :out out}))
      (catch Exception e
        (println "Client error:" (.getMessage e)))
      (finally
        (.close client-socket)
        (println "Client disconnected")))))
(defn create-tcp-server
  [{:keys [port]}]
  (let [state (atom {:server-socket nil
                     :clients []})
        start-server (fn []
                       (when (nil? (:server-socket @state))
                         (let [server-socket (ServerSocket. port)]
                           (reset! state {:server-socket server-socket
                                          :clients []})
                           (println "Server started on port" port)
                           (future
                             (try
                               (loop []
                                 (let [client-socket (.accept server-socket)]
                                   (swap! state update :clients conj client-socket)
                                   (client-handler client-socket)
                                   ;; client-handler returns right away (the client runs on another
                                   ;; thread), so only drop sockets that have already been closed.
                                   (swap! state update :clients (fn [clients] (filterv #(not (.isClosed %)) clients)))
                                   (recur)))
                               (catch Exception e
                                 (println "Server error:" (.getMessage e)))
                               (finally
                                 ;; Use the local socket: stop-server resets the atom to nil.
                                 (when-not (.isClosed server-socket)
                                   (.close server-socket)
                                   (println "Server stopped by error"))))))))
        stop-server (fn []
                      (when-let [server-socket (:server-socket @state)]
                        (doseq [client-socket (:clients @state)]
                          (.close client-socket)
                          (println "Client disconnected by server"))
                        (.close server-socket)
                        (println "Server stopped by user")
                        (reset! state {:server-socket nil
                                       :clients []})))]
    {:start start-server
     :stop stop-server
     :state state}))

But remove handle-input and client-handler from it. The proxy doesn’t need to parse requests yet, only to forward them.

Then write a new client-handler that:

  • Reads the client socket’s input
  • Writes it to the upstream server’s output stream
  • Reads the upstream server’s response from its input stream
  • Writes that response to the client socket’s output
(defn- client-handler
  [client-socket]
  (future
    (try
      (with-open [in (DataInputStream. (.getInputStream client-socket))
                  out (DataOutputStream. (.getOutputStream client-socket))]
        (let [input-bytes (byte-array 4096)
              res (.read in input-bytes)]
          (case res
            -1 (println "End of stream reached")
            0 (println "No data received")
            (do
              (println (format "-> * %dB" res))
              (let [return-bytes (byte-array 4096)
                    upstream-res (atom nil)
                    upstream-socket (atom nil)]
                (try
                  (reset! upstream-socket (Socket. (:hostname upstream-server) (:port upstream-server)))
                  (let [upstream-in (DataInputStream. (.getInputStream @upstream-socket))
                        upstream-out (DataOutputStream. (.getOutputStream @upstream-socket))]
                    (println (format "Connected to %s:%d" (-> @upstream-socket .getInetAddress .getHostAddress) (.getPort @upstream-socket)))
                    (.write upstream-out input-bytes 0 res)
                    (println (format " * -> %dB" res))
                    (reset! upstream-res (.read upstream-in return-bytes))
                    (case @upstream-res
                      -1 (println "End of upstream stream reached")
                      0 (println "No data from upstream received")
                      (println (format " * <- %dB" @upstream-res))))
                  (catch Exception e
                    (println "Upstream error:" e))
                  (finally
                    ;; The socket is still nil if the connection attempt itself failed.
                    (when-let [s @upstream-socket]
                      (.close s)
                      (println (format "Disconnected from %s:%d" (-> s .getInetAddress .getHostAddress) (.getPort s))))))
                ;; Only forward when upstream actually returned bytes (not nil, -1 or 0).
                (when (and @upstream-res (pos? @upstream-res))
                  (.write out return-bytes 0 @upstream-res)
                  (println (format "<- * %dB" @upstream-res))))))))
      (catch Exception e
        (println "Client error:" (.getMessage e)))
      (finally
        (.close client-socket)
        (println "Client disconnected")))))

DataInputStream and DataOutputStream are Java stream wrappers for reading and writing binary data; this code only uses their basic byte-array read and write methods.
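To make the return values that the case forms check concrete, here is a small standalone sketch (not part of the proxy) that reads an in-memory stream through a DataInputStream with a deliberately tiny 4-byte buffer. Each .read returns how many bytes it put in the buffer, and -1 once the stream is exhausted. read-chunks is a hypothetical helper, and ByteArrayInputStream stands in for the socket.

```clojure
(ns sketch.read-chunks
  (:import (java.io ByteArrayInputStream DataInputStream)))

(defn read-chunks
  "Reads `in` using a buffer of `buf-size` bytes and returns the result of
  every .read call, ending with the -1 that signals end of stream."
  [^DataInputStream in buf-size]
  (let [buf (byte-array buf-size)]
    (loop [results []]
      (let [n (.read in buf)]
        (if (= n -1)
          (conj results n)
          (recur (conj results n)))))))

;; "GET / HTTP/1.1" is 14 bytes, so a 4-byte buffer needs four reads before EOF.
(println (read-chunks (DataInputStream. (ByteArrayInputStream. (.getBytes "GET / HTTP/1.1" "UTF-8"))) 4))
;; prints [4 4 4 2 -1]
```

A socket is less tidy than an in-memory stream: a read can return fewer bytes than the buffer holds even when more data is still on its way.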

Can't the input be more than 4096 bytes long?

Well, it can! But we’re not dealing with that for now.

When running

curl localhost:8080

we get back this response:

<html>
  <head>
    <title>Origin Server</title>
    <link rel="stylesheet" type="text/css" href="styles.css" />
  </head>
  <body>
    <h1>Origin Server</h1>
    <p>Home of this cute cat.</p>
    <img src="cute_cat.jpg" alt="Cute cat" />
  </body>
</html>

and the proxy prints these logs:

New connection from 0:0:0:0:0:0:0:1:8080
-> * 77B
Connected to 127.0.0.1:3000
* -> 77B
* <- 561B
Disconnected from 127.0.0.1:3000
<- * 561B
Client disconnected

But when opening the page in a browser, the image doesn’t show up!

Ha! It turns out you need to handle more than 4096 bytes!

Indeed. But I’m only going to do that for the connection between the proxy and the upstream server.

(defn- client-handler
  [client-socket]
  (future
    (try
      (with-open [in (DataInputStream. (.getInputStream client-socket))
                  out (DataOutputStream. (.getOutputStream client-socket))]
        (let [input-bytes (byte-array 4096)
              res (.read in input-bytes)]
          (case res
            -1 (println "End of stream reached")
            0 (println "No data received")
            (do
              (println (format "-> * %dB" res))
              (with-open [upstream-socket (Socket. (:hostname upstream-server) (:port upstream-server))
                          upstream-in (DataInputStream. (.getInputStream upstream-socket))
                          upstream-out (DataOutputStream. (.getOutputStream upstream-socket))]
                (println (format "Connected to %s:%d" (-> upstream-socket .getInetAddress .getHostAddress) (.getPort upstream-socket)))
                (.write upstream-out input-bytes 0 res)
                (println (format " * -> %dB" res))
                (let [return-bytes (byte-array 4096)]
                  ;; Keep reading while the buffer comes back full. A read shorter than
                  ;; 4096 bytes is taken as the end of the response (a simplification).
                  (loop []
                    (let [upstream-res (.read upstream-in return-bytes)]
                      (case upstream-res
                        -1 (println "End of upstream stream reached")
                        0 (println "No data from upstream received")
                        (do
                          (println (format " * <- %dB" upstream-res))
                          (.write out return-bytes 0 upstream-res)
                          (println (format "<- * %dB" upstream-res))
                          (when (= upstream-res 4096)
                            (recur)))))))))))
      (catch Exception e
        (println "Client error:" (.getMessage e)))
      (finally
        (.close client-socket)
        (println "Client disconnected")))))
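The loop above treats any read shorter than 4096 bytes as the end of the response. That is a heuristic: .read may return a short chunk while more data is still in flight, which is common over a real network. When the upstream closes the connection after responding (for example, when the request carries Connection: close), a sturdier option is to keep copying until .read returns -1. A minimal sketch, assuming that close-after-response behavior, with a hypothetical copy-stream! helper exercised on in-memory streams:

```clojure
(ns sketch.copy-stream
  (:import (java.io ByteArrayInputStream ByteArrayOutputStream InputStream OutputStream)))

(defn copy-stream!
  "Copies every byte from `in` to `out` in chunks of up to `buf-size` bytes,
  stopping only when .read returns -1 (end of stream). Returns the total
  number of bytes copied."
  [^InputStream in ^OutputStream out buf-size]
  (let [buf (byte-array buf-size)]
    (loop [total 0]
      (let [n (.read in buf)]
        (if (neg? n)
          (do (.flush out) total)
          (do (.write out buf 0 n)
              (recur (+ total n))))))))

;; 10,000 bytes is more than two full 4096-byte buffers.
(let [payload (byte-array 10000 (byte 42))
      out (ByteArrayOutputStream.)
      copied (copy-stream! (ByteArrayInputStream. payload) out 4096)]
  (println copied)                                                ; prints 10000
  (println (java.util.Arrays/equals payload (.toByteArray out)))) ; prints true
```

On a kept-alive connection the upstream never sends end-of-stream between responses, so this approach only fits connections that close after each response.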

With the logs showing:

New connection from 127.0.0.1:8080
-> * 541B
Connected to 127.0.0.1:3000
* -> 541B
* <- 561B
<- * 561B
Client disconnected
New connection from 127.0.0.1:8080
-> * 443B
Connected to 127.0.0.1:3000
* -> 443B
* <- 376B
<- * 376B
Client disconnected
New connection from 127.0.0.1:8080
-> * 499B
Connected to 127.0.0.1:3000
* -> 499B
* <- 4096B
<- * 4096B
* <- 4096B
<- * 4096B
* <- 4096B
<- * 4096B
* <- 4096B
<- * 4096B
* <- 4096B
<- * 4096B
* <- 4096B
<- * 4096B
* <- 4096B
<- * 4096B
* <- 4096B
<- * 4096B
* <- 4096B
<- * 4096B
* <- 4096B
<- * 4096B
* <- 4096B
<- * 4096B
* <- 4096B
<- * 4096B
* <- 4096B
<- * 4096B
* <- 4096B
<- * 4096B
* <- 4096B
<- * 4096B
* <- 4096B
<- * 4096B
* <- 4096B
<- * 4096B
* <- 4096B
<- * 4096B
* <- 4096B
<- * 4096B
* <- 3630B
<- * 3630B
Client disconnected
New connection from 127.0.0.1:8080
-> * 495B
Connected to 127.0.0.1:3000
* -> 495B
* <- 240B
<- * 240B
Client disconnected

And on the upstream server:

HTTP 7/21/2024 10:07:57 PM 127.0.0.1 GET /
HTTP 7/21/2024 10:07:57 PM 127.0.0.1 Returned 200 in 69 ms
HTTP 7/21/2024 10:07:57 PM 127.0.0.1 GET /styles.css
HTTP 7/21/2024 10:07:57 PM 127.0.0.1 Returned 200 in 3 ms
HTTP 7/21/2024 10:07:57 PM 127.0.0.1 GET /cute_cat.jpg
HTTP 7/21/2024 10:07:57 PM 127.0.0.1 Returned 200 in 4 ms
HTTP 7/21/2024 10:07:57 PM 127.0.0.1 GET /favicon.ico
HTTP 7/21/2024 10:07:57 PM 127.0.0.1 Returned 404 in 7 ms

Handling a down upstream server

When the proxy can’t reach the upstream server, the correct response is 502 Bad Gateway. The upstream part of client-handler becomes:

(try
  (with-open [upstream-socket (Socket. (:hostname upstream-server) (:port upstream-server))
              upstream-in (DataInputStream. (.getInputStream upstream-socket))
              upstream-out (DataOutputStream. (.getOutputStream upstream-socket))]
    (println (format "Connected to %s:%d" (-> upstream-socket .getInetAddress .getHostAddress) (.getPort upstream-socket)))
    (.write upstream-out input-bytes 0 res)
    (println (format " * -> %dB" res))
    (let [return-bytes (byte-array 4096)]
      (loop []
        (let [upstream-res (.read upstream-in return-bytes)]
          (case upstream-res
            -1 (println "End of upstream stream reached")
            0 (println "No data from upstream received")
            (do
              (println (format " * <- %dB" upstream-res))
              (.write out return-bytes 0 upstream-res)
              (println (format "<- * %dB" upstream-res))
              (when (= upstream-res 4096)
                (recur))))))))
  (catch ConnectException _e
    (let [bytes (.getBytes "HTTP/1.1 502 Bad Gateway\r\n\r\n")]
      (.write out bytes 0 (count bytes)))
    (println "<- * 502 Bad Gateway"))
  (catch Exception e
    (println "Upstream error" e)))
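The 502 written above consists of just a status line. A slightly more complete variant, sketched here with hypothetical bad-gateway-response and write-bad-gateway! helpers, adds Content-Length: 0 so the client knows the (empty) body is over without waiting for the socket to close, and Connection: close to announce that the proxy will hang up afterwards.

```clojure
(ns sketch.bad-gateway
  (:import (java.io ByteArrayOutputStream OutputStream)
           (java.nio.charset StandardCharsets)))

(defn bad-gateway-response
  "Returns the bytes of a minimal HTTP/1.1 502 response with an empty body."
  ^bytes []
  (.getBytes (str "HTTP/1.1 502 Bad Gateway\r\n"
                  "Content-Length: 0\r\n"   ; empty body: no need to wait for EOF
                  "Connection: close\r\n"   ; the proxy closes the socket afterwards
                  "\r\n")                   ; blank line ends the header block
             StandardCharsets/US_ASCII))

(defn write-bad-gateway!
  "Writes the 502 response to `out` and flushes it."
  [^OutputStream out]
  (let [bs (bad-gateway-response)]
    (.write out bs 0 (alength bs))
    (.flush out)))

;; Usage against an in-memory stream standing in for the client socket.
(let [out (ByteArrayOutputStream.)]
  (write-bad-gateway! out)
  (print (.toString out "US-ASCII")))
```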

Java virtual threads

Besides running on Java 21 or higher, we only need to change client-handler to run its body with Thread/startVirtualThread instead of future, wrapping the body in a fn since startVirtualThread takes a Runnable:

(defn- client-handler
  [client-socket]
  (Thread/startVirtualThread
   (fn []
     (try
       (with-open [in (DataInputStream. (.getInputStream client-socket))
                   out (DataOutputStream. (.getOutputStream client-socket))]
         (let [input-bytes (byte-array 4096)
               res (.read in input-bytes)]
           (case res
             -1 (println "End of stream reached")
             0 (println "No data received")
             (do
               (println (format "-> * %dB" res))
               (try
                 (with-open [upstream-socket (Socket. (:hostname upstream-server) (:port upstream-server))
                             upstream-in (DataInputStream. (.getInputStream upstream-socket))
                             upstream-out (DataOutputStream. (.getOutputStream upstream-socket))]
                   (println (format "Connected to %s:%d" (-> upstream-socket .getInetAddress .getHostAddress) (.getPort upstream-socket)))
                   (.write upstream-out input-bytes 0 res)
                   (println (format " * -> %dB" res))
                   (let [return-bytes (byte-array 4096)]
                     (loop []
                       (let [upstream-res (.read upstream-in return-bytes)]
                         (case upstream-res
                           -1 (println "End of upstream stream reached")
                           0 (println "No data from upstream received")
                           (do
                             (println (format " * <- %dB" upstream-res))
                             (.write out return-bytes 0 upstream-res)
                             (println (format "<- * %dB" upstream-res))
                             (when (= upstream-res 4096)
                               (recur))))))))
                 (catch ConnectException _e
                   (let [bytes (.getBytes "HTTP/1.1 502 Bad Gateway\r\n\r\n")]
                     (.write out bytes 0 (count bytes)))
                   (println "<- * 502 Bad Gateway"))
                 (catch Exception e
                   (println "Upstream error" e)))))))
       (catch Exception e
         (println "Client error:" (.getMessage e)))
       (finally
         (.close client-socket)
         (println "Client disconnected"))))))
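Virtual threads are meant to be cheap to create and to block, which is what a thread-per-connection proxy needs. As a quick standalone check (Java 21+), this sketch starts a batch of virtual threads that each sleep briefly, the way a handler blocks on socket I/O, then waits for all of them. run-many-virtual-threads is a hypothetical name, and the thread count and sleep time are arbitrary illustration values.

```clojure
(ns sketch.virtual-threads)

(defn run-many-virtual-threads
  "Starts `n` virtual threads that each sleep `sleep-ms` milliseconds and
  then increment a shared counter. Waits for all of them and returns the
  final count."
  [n sleep-ms]
  (let [counter (atom 0)
        ;; doall starts every thread up front instead of lazily.
        threads (doall
                 (for [_ (range n)]
                   (Thread/startVirtualThread
                    (fn []
                      (Thread/sleep (long sleep-ms))
                      (swap! counter inc)))))]
    (doseq [^Thread t threads]
      (.join t))
    @counter))

(println (run-many-virtual-threads 10000 100))
;; prints 10000
```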

Persistent connections

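When the page is opened in a browser, the browser sends the Connection: keep-alive header, but we close the connection after every request. The right thing to do is to close the connection on HTTP/1.0 unless the Connection header is keep-alive, and to keep it open on HTTP/1.1 unless the header says close. To decide, the proxy now parses the request line and headers, and it opens the upstream connection once per client connection and reuses it for every request on that connection.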

(defn parse-http-request
  "Parses the first n bytes of b into a map representing an HTTP request.
  Header names are lower-cased because HTTP header names are case-insensitive."
  [b n]
  (let [lines (->> (String. b 0 n)
                   str/split-lines)
        [method path version] (str/split (first lines) #" ")
        headers (->> (rest lines)
                     (map #(str/split % #": " 2))
                     (filter #(= 2 (count %)))
                     (map (fn [[k v]] [(str/lower-case k) v]))
                     (into {}))]
    {:method (-> method str/lower-case keyword)
     :path path
     :version version
     :headers headers}))
(defn- client-handler
  [client-socket]
  (Thread/startVirtualThread
   (fn []
     (try
       (with-open [in (DataInputStream. (.getInputStream client-socket))
                   out (DataOutputStream. (.getOutputStream client-socket))]
         ;; The upstream socket is now opened once per client connection, so a failed
         ;; connection surfaces here; catching it here keeps the 502 response working.
         (try
           (with-open [upstream-socket (Socket. (:hostname upstream-server) (:port upstream-server))
                       upstream-in (DataInputStream. (.getInputStream upstream-socket))
                       upstream-out (DataOutputStream. (.getOutputStream upstream-socket))]
             (loop []
               (let [input-bytes (byte-array 4096)
                     res (.read in input-bytes)]
                 (case res
                   -1 (println "End of stream reached")
                   0 (println "No data received")
                   (let [parsed (parse-http-request input-bytes res)
                         connection (some-> (get-in parsed [:headers "connection"]) str/lower-case)]
                     (println (format "-> * %dB" res))
                     (try
                       (println (format "Connected to %s:%d" (-> upstream-socket .getInetAddress .getHostAddress) (.getPort upstream-socket)))
                       (.write upstream-out input-bytes 0 res)
                       (println (format " * -> %dB" res))
                       (let [return-bytes (byte-array 4096)]
                         (loop []
                           (let [upstream-res (.read upstream-in return-bytes)]
                             (case upstream-res
                               -1 (println "End of upstream stream reached")
                               0 (println "No data from upstream received")
                               (do
                                 (println (format " * <- %dB" upstream-res))
                                 (.write out return-bytes 0 upstream-res)
                                 (println (format "<- * %dB" upstream-res))
                                 (when (= upstream-res 4096)
                                   (recur)))))))
                       (catch Exception e
                         (println "Upstream error" e)))
                     ;; HTTP/1.0 closes unless the client asks for keep-alive;
                     ;; HTTP/1.1 stays open unless the client asks to close.
                     (if (or (and (= (:version parsed) "HTTP/1.0")
                                  (not= "keep-alive" connection))
                             (= "close" connection))
                       (println "Closing connection as requested by client")
                       (do
                         (println "Keeping connection alive")
                         (recur))))))))
           (catch ConnectException _e
             (let [bytes (.getBytes "HTTP/1.1 502 Bad Gateway\r\n\r\n")]
               (.write out bytes 0 (count bytes)))
             (println "<- * 502 Bad Gateway"))))
       (catch Exception e
         (println "Client error:" (.getMessage e)))
       (finally
         (.close client-socket)
         (println "Client disconnected"))))))
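The close/keep-alive rule described above can also be pulled out into a small pure function and tested on its own. This sketch uses a hypothetical keep-alive? that takes a request map with :version and :headers keys, like the one parse-http-request returns. Because header names and Connection tokens are case-insensitive, and Connection may hold a comma-separated list, it lower-cases and splits before comparing.

```clojure
(ns sketch.keep-alive
  (:require [clojure.string :as str]))

(defn- connection-tokens
  "Returns the lower-cased, comma-separated tokens of the Connection header,
  matching the header name case-insensitively."
  [headers]
  (->> headers
       (filter (fn [[k _]] (= "connection" (str/lower-case k))))
       (mapcat (fn [[_ v]] (str/split v #",")))
       (map (comp str/lower-case str/trim))
       set))

(defn keep-alive?
  "HTTP/1.0 closes unless the client sends Connection: keep-alive;
  other versions stay open unless the client sends Connection: close."
  [{:keys [version headers]}]
  (let [tokens (connection-tokens headers)]
    (if (= "HTTP/1.0" version)
      (contains? tokens "keep-alive")
      (not (contains? tokens "close")))))

(println (keep-alive? {:version "HTTP/1.1" :headers {"Connection" "keep-alive"}})) ; prints true
(println (keep-alive? {:version "HTTP/1.1" :headers {"connection" "Close"}}))     ; prints false
(println (keep-alive? {:version "HTTP/1.0" :headers {}}))                         ; prints false
```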

With all that in place, the logs look quite different: one client connection now carries several requests.

New connection from 127.0.0.1:8080
-> * 557B
Connected to 127.0.0.1:3000
* -> 557B
* <- 113B
<- * 113B
Keeping connection alive
-> * 459B
Connected to 127.0.0.1:3000
* -> 459B
* <- 113B
<- * 113B
Keeping connection alive
-> * 515B
Connected to 127.0.0.1:3000
* -> 515B
* <- 113B
<- * 113B
Keeping connection alive
End of stream reached
Client disconnected
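One caveat of keeping both connections open: the upstream no longer marks the end of a response by closing the socket, so the “stop after a read shorter than 4096 bytes” rule can cut a response short, or block waiting for bytes that never come. HTTP/1.1 frames a response body with a Content-Length header (or chunked transfer encoding, not handled here). Below is a minimal sketch of reading exactly one response framed by Content-Length, assuming an ASCII header block and no chunked encoding, and treating a missing Content-Length as an empty body (only right for bodiless responses such as 304). read-response and its helpers are hypothetical, tested on an in-memory stream holding two back-to-back responses.

```clojure
(ns sketch.content-length
  (:require [clojure.string :as str])
  (:import (java.io ByteArrayInputStream ByteArrayOutputStream DataInputStream EOFException)))

(defn- read-header-block
  "Reads one byte at a time until the blank line (CR LF CR LF) that ends the
  headers, and returns everything read so far, blank line included."
  ^bytes [^DataInputStream in]
  (let [buf (ByteArrayOutputStream.)]
    (loop [window 0]
      (let [b (.read in)]
        (when (neg? b)
          (throw (EOFException. "stream ended before the end of the headers")))
        (.write buf b)
        ;; Keep the last four bytes in one 32-bit number to spot CR LF CR LF.
        (let [window (bit-and (bit-or (bit-shift-left window 8) b) 0xFFFFFFFF)]
          (if (= window 0x0D0A0D0A)
            (.toByteArray buf)
            (recur window)))))))

(defn- content-length
  "Returns the Content-Length value as a long, or nil when the header is
  absent. The header name is matched case-insensitively."
  [^bytes header-bytes]
  (some (fn [line]
          (let [[k v] (str/split line #":" 2)]
            (when (and v (= "content-length" (str/lower-case (str/trim k))))
              (parse-long (str/trim v)))))
        (str/split-lines (String. header-bytes "US-ASCII"))))

(defn read-response
  "Reads exactly one HTTP response from `in` and returns its bytes: the
  header block followed by a Content-Length-framed body."
  ^bytes [^DataInputStream in]
  (let [headers (read-header-block in)
        body (byte-array (or (content-length headers) 0))
        out (ByteArrayOutputStream.)]
    (.readFully in body) ; blocks until the whole body has arrived
    (.write out headers 0 (alength headers))
    (.write out body 0 (alength body))
    (.toByteArray out)))

;; Two responses on one stream, as on a kept-alive upstream connection.
(let [wire (str "HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello"
                "HTTP/1.1 304 Not Modified\r\n\r\n")
      in (DataInputStream. (ByteArrayInputStream. (.getBytes wire "US-ASCII")))]
  (println (String. (read-response in) "US-ASCII"))
  (println (String. (read-response in) "US-ASCII")))
```

Reading the header block byte by byte is slow but keeps the sketch short; a real proxy would buffer the stream and also support chunked bodies.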

I now appreciate much more that there is time-tested software that does all of the above and handles the spec (and more!) correctly.
