October 31, 2018

An Overview of an Early Clojure Codebase

Or: Documenting my own stupidity so you can be smarter

Lately, I’ve been thinking about the things that have brought me to my current level of competency in programming. Being self-taught, I never had much rhyme or reason in developing my curriculum. I just followed whatever my interests were at the time. Those interests would point me to books, blog posts, open source projects to contribute to, and even attempts at more substantial, nontrivial projects of my own.

While I could not have done it without the foundation laid through reading and smaller-scale projects, attempts at non-trivial projects have taught me substantially more about software development than anything else. More specifically, the biggest lessons have come from taking a retrospective look at my architectural, logical, technological, and design decisions. It’s very humbling to take a magnifying glass to your mistakes, ask why you made them, and figure out how to avoid them in the future.

This article goes over my current hobby project, Moo Player. Starting from a high-level overview of the architecture, it proceeds to a low-level look at the flow of some of the functionality. There will be code throughout. Some of the code will be clean. A lot of it will be rather crufty. There’s certainly a level of embarrassment in showing off code one isn’t proud of. At the end of the day, though, it’s just code. A bit of temporary embarrassment is a small price to pay to contribute to a community that has helped me grow so much as a developer.

The Project

That thing you’re going to be reading about

Excuse the programmer art...

Moo Player is a collaborative media player. What that means is that users can create media players that they and their friends can control from their individual devices, as if they’re in a physical space together sharing one device. Everything from the order of the tracks in the playlist to the position of the playing media is synchronized across all connected clients. Each player is isolated to a room with a chat alongside it.

If you’re wondering about the name, it’s a pun on “Mumu Player”. Mumu Player is a now-defunct webapp that shared the same core functionality, albeit with fewer features.

Don't ask me to explain the squids.

The Stack

The stuff I used to build it

The Architecture

A bird's eye view

At the highest level of abstraction, the project can be separated into two sections: the server and the client. Unlike how web applications are traditionally written, this project aims to keep the client and server entirely separate. In the current iteration, a disconnected client can do everything a connected client can do except adding new media and sending chat messages. These cases will be handled in the future.

This article will focus on the server. There are certainly things to be said about the client, but I want to keep a focused scope. For a vague idea of how the client architecture looks, check out the re-frame README. It’s an excellent read.

Going down a few levels of abstraction, the server is roughly separated into three sections: Input, Events, and Actions.

Input refers to the section of the codebase that handles incoming data from clients. At the moment, this is just an HTTP/websockets frontend. The data received from clients is used alongside server-provided data to create events to feed the event handler. The architectural goal to strive for is to be able to support various input mechanisms and data formats. For now, though, it follows the YAGNI principle.

Events are where all the logic goes. These are completely pure functions that generate actions from event data. They return a sequential structure of actions to be performed. The significance of this design decision is that it allows for simple, easy, and performant testing.

Actions are the things that change the world. Those things include database modifications, network/file IO, and cache modifications. Actions are, by nature, impure functions. They should be a sequence of steps with no actual logic. No conditionals, no loops. Those all go inside events. This significantly lessens the need for tests. Testing IO directly is difficult, tedious, and fragile. Anything that can be done to lessen it is a big win for maintainability. Having the logic outside of actions also means that changing implementation details can be done without having to work around the logic.

Basically:

  1. Input takes data from clients and creates Events.
  2. Events process said data and create Actions for the server to perform.
  3. Actions make the server actually do things.

This provides a simple, one-way flow: Input → Events → Actions. All of the logic is isolated in the middle, and all of the stuff that is inherently impure stays at the edges.
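As a rough illustration of that flow, here is a minimal, self-contained sketch. The event and action names (:chat/say, :store-message, :log), the `world` atom, and the function names are all invented for this example and are not taken from Moo Player. Only the shape mirrors the description above: a pure multimethod that returns action vectors, and an impure multimethod that executes them in order.

```clojure
;; Hypothetical sketch: every name below is illustrative, not Moo Player's.

;; EVENTS: pure. Takes event data, returns a vector of [action-name args].
(defmulti handle-event :id)

(defmethod handle-event :chat/say [{:keys [room-id user text]}]
  (if (empty? text)
    []                                  ; conditionals live in events
    [[:store-message [room-id {:user user :text text}]]
     [:log [(str user " spoke in " room-id)]]]))

;; ACTIONS: impure, no branching. `world` stands in for a database and a log.
(def world (atom {:messages {} :log []}))

(defmulti execute-action! first)

(defmethod execute-action! :store-message [[_ [room-id message]]]
  (swap! world update-in [:messages room-id] (fnil conj []) message))

(defmethod execute-action! :log [[_ [line]]]
  (swap! world update :log conj line))

(defn execute-actions!
  "Runs each action in the order the event handler returned it."
  [actions]
  (doseq [action actions]
    (execute-action! action)))

;; INPUT: builds the event map from client data, then hands it off.
(defn on-client-message! [room-id user text]
  (execute-actions!
   (handle-event {:id :chat/say :room-id room-id :user user :text text})))
```

Because `handle-event` only returns data, its behavior can be checked by comparing vectors, without touching `world` at all.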

To be clear: while new code is written with these principles in mind, these were not in place from the project's conception. There’s still some logic in some actions and some impure event handlers. Some of those spots will come up later.

Walkthrough

Wading through a swamp of parentheses

Yeah, Clojure is a Lisp. Credit to xkcd.

Now it's time to dive into some code. Stuff like HTTP routing will be ignored so the focus can stay on the core architecture. Only a few components will be covered to demonstrate the architecture in action.

Websocket Input Entry Point

The first thing to look at is the entry point for the websocket input. After the initial HTML/JavaScript is served, a websocket connection is established via Sente. Sente proceeds to feed all websocket events into a function named event-msg-handler.

(defn event-msg-handler [{{room-id :room-id
                           cookies :cookies
                           :as ring-req} :ring-req
                          uid :uid
                          :as event-msg}]
  (future
    (log/info event-msg)
    (locking room-id
      (try
        (let [session-id (:value (get cookies "JSESSIONID"))
              user (if-let [user (cache/fetch :users session-id)]
                     user
                     {:id (utils/uuid) :session-id session-id})
              clients-in-room (cache/fetch :room-clients room-id)
              client (get clients-in-room uid)
              actions (handlers/-event-msg-handler (assoc event-msg
                                                          :room-id room-id
                                                          :user user
                                                          :session-id session-id
                                                          :client client
                                                          :clients-in-room clients-in-room
                                                          :downloads-dir download/downloads-dir
                                                          :uploads-dir upload/uploads-dir
                                                          :unique-id-fn utils/uuid
                                                          :database-fns {:get-room-state-fn db/get-room-by-name})))]
          (log/info actions)
          (actions/execute-server-actions! actions))
        (catch Exception e
          (report-error e {:event-msg event-msg}))))))

This function does the following:

  1. Spins up a new thread
  2. Acquires a lock
  3. Fetches the user from a cache, or creates one if none exists
  4. Gets all the clients in the room from a cache
  5. Associates a ton of data with the event-msg map passed into the function
  6. Passes all of that into handlers/-event-msg-handler which dispatches based on the :id field
  7. Feeds the returned data from handlers/-event-msg-handler into actions/execute-server-actions!
  8. Releases the lock

There are a few things that stand out as code smells.

The first thing is the name of the function and the event handler function. Why is the function called event-msg-handler? Why is handlers/-event-msg-handler prefixed by a hyphen? The answer is simply that that's what they were named in the Sente example. In that implementation, event-msg-handler is the event handling function. Here, it is just the websocket entry point. The naming should change to reflect that.

There's also the if-let expression that decides whether a user needs to be created. As mentioned earlier, all logic should be in events. This is a remnant of the time before the architecture was fleshed out.
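One possible way to pull that decision out of the entry point is a small pure helper that an event handler could call. This is only a sketch: `ensure-user` is a hypothetical name, and it assumes the entry point passes along whatever the cache lookup returned (possibly nil) together with the id-generating function.

```clojure
;; Hypothetical helper, not part of Moo Player.
(defn ensure-user
  "Pure: returns the cached user when there is one, otherwise a fresh
  user map whose id comes from the supplied unique-id-fn."
  [cached-user session-id unique-id-fn]
  (or cached-user
      {:id (unique-id-fn) :session-id session-id}))

;; With a stubbed id function the result is fully predictable:
(ensure-user nil "session-1" (constantly "user-1"))
;; => {:id "user-1", :session-id "session-1"}
```

The entry point would then be left with only the cache fetch, which has no branching of its own.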

Another smell is the (locking room-id ...) expression. Does it really need to lock down the entire room for every websocket event? No, but for the time being, it is significantly simpler to do that than it is to lock down specific resources. There's a less apparent problem with the locking strategy as well. The locking is local to the JVM process. The caches, however, are powered by Infinispan. Infinispan was chosen for its ability to act as a distributed cache, enabling the application to scale horizontally. By making the locking local to the JVM process, that functionality is thrown out the window. It will be replaced with a distributed lock in the future. Probably Apache ZooKeeper.
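A related detail: `locking` synchronizes on the monitor of the object it is handed, so mutual exclusion depends on every handler for a room holding the identical object, not merely an equal value. The sketch below shows one way to make that explicit with a per-room lock registry. It is still local to a single JVM, so it does nothing for the distributed case, and the names `room-locks`, `room-lock`, and `with-room-lock` are hypothetical.

```clojure
;; Hypothetical sketch; JVM-local only. The registry never evicts entries,
;; so a real version would need to clean up locks for closed rooms.
(defonce room-locks (atom {}))

(defn room-lock
  "Returns the single lock object for room-id, creating it on first use.
  Lookup is by value, so equal room ids always share one lock."
  [room-id]
  (-> (swap! room-locks update room-id #(or % (Object.)))
      (get room-id)))

(defmacro with-room-lock
  "Evaluates body while holding the lock for room-id."
  [room-id & body]
  `(locking (room-lock ~room-id)
     ~@body))

;; Usage: the body runs with the room's lock held.
(with-room-lock "lobby"
  (+ 1 2))
```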

The final smell is the fact that two functions, :unique-id-fn and :get-room-state-fn, are being passed into the event handler. While this does keep the event handlers pure, a more declarative approach would be optimal. Hopefully this can be achieved once relevant patterns emerge in the codebase. For now, it's certainly preferable to impure event handlers.

So far, so good. Now onto some events.

Initial Client Connection

The first websocket event is sent right after the initial handshake.

(defmethod handler/-event-msg-handler :client/connect [{room-id :room-id
                                                        clients-in-room :clients-in-room
                                                        uid :uid
                                                        session-id :session-id
                                                        user :user
                                                        {get-room-state :get-room-state-fn} :database-fns}]
  [[:cache/put [:room-clients room-id (assoc clients-in-room uid {:id uid :synced? true})]]
   [:cache/put [:users
                (:session-id user)
                (if (:clients user)
                  (update-in user [:clients] conj uid)
                  (assoc user :clients #{uid}))]]
   [:send-to-client [uid [:room/state-update (get-room-state room-id)]]]])

This function returns a vector containing three vectors. Those three vectors each represent an action to perform. The order of the vector determines the order of execution.

An action is structured like so: [:name-of-action vector-of-arguments]

In this case, the actions are as follows:

  1. Add the client to the clients-in-room structure and put that onto the :room-clients cache using the room-id as a key
  2. Add the uid from the event-msg to the user structure and put that onto the :users cache using the session-id as a key
  3. Get the room state from the database and send it to the client
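Since the handler only returns data, it can be exercised without a cache, a websocket, or a database. The sketch below restates the handler as a plain function named `client-connect-actions` (a hypothetical name, used so the block stands alone) and swaps the if/assoc/update-in branch for `fnil`, which behaves the same when :clients is missing. The room state function is a stub.

```clojure
;; Illustrative restatement of the :client/connect handler as a plain function.
(defn client-connect-actions
  [{:keys [room-id clients-in-room uid user]
    {get-room-state :get-room-state-fn} :database-fns}]
  [[:cache/put [:room-clients
                room-id
                (assoc clients-in-room uid {:id uid :synced? true})]]
   [:cache/put [:users
                (:session-id user)
                ;; (fnil conj #{}) starts from an empty set when :clients is nil
                (update user :clients (fnil conj #{}) uid)]]
   [:send-to-client [uid [:room/state-update (get-room-state room-id)]]]])

;; Calling it with a stubbed database function:
(client-connect-actions
 {:room-id "lobby"
  :clients-in-room {}
  :uid "client-1"
  :user {:id "u1" :session-id "s1"}
  :database-fns {:get-room-state-fn (fn [room-id] {:name room-id :tracks []})}})
;; => [[:cache/put [:room-clients "lobby" {"client-1" {:id "client-1", :synced? true}}]]
;;     [:cache/put [:users "s1" {:id "u1", :session-id "s1", :clients #{"client-1"}}]]
;;     [:send-to-client ["client-1" [:room/state-update {:name "lobby", :tracks []}]]]]
```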

That's simple enough. It could be argued that uid is a bit poorly named. However, given the context, it is easy enough to deduce that it's a unique identifier for the client. In this case, it is generated by Sente and has semantic meaning to Sente. It's a bit of a leaky abstraction, but for now it isn't worth modifying.

The actions themselves are very simple:

(defmethod -execute-server-action! :send-to-client [[_ [uid data]]]
  (ws/send! uid data))

(defmethod actions/-execute-server-action! :cache/put [[_ [id k v opts]]]
  (cache/put-or-create! id k v opts))

ws/send! and cache/put-or-create! are just very thin wrapper functions around Sente and Infinispan. Nothing particularly interesting.

Now for something more substantial.

New External Track

The following event is sent when a new external track is added. An "external track" is a media file from an external server. Basically something like https://foo.com/bar.mp3. Moo Player actually downloads external media and serves them itself in an attempt to provide a more consistent experience.

Here's the event:

(defmethod -event-msg-handler :track/new-external [{:keys [room-id user unique-id-fn ?data]}]
  (let [track-id (unique-id-fn)
        track {:user-id (:id user) :id track-id :track-type :download :url ?data}]
    [[:add-track [room-id track]]
     [:room/broadcast [room-id [:track/new track]]]
     [:download-track {:url ?data
                       :track-id track-id
                       :room-id room-id}]]))

This seems pretty simple and concise. It does the following:

  1. Generates a new track ID and creates a track structure
  2. Adds the track to the room
  3. Broadcasts the new track to every client in the room
  4. Downloads the track
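Passing in unique-id-fn is what makes this handler testable: with a stub that always returns the same id, the output is fully deterministic. The sketch below restates the handler body as a plain function, `new-external-track-actions` (a hypothetical name so the block is self-contained), and checks it with clojure.test. The room, user, and track ids are made up.

```clojure
(require '[clojure.test :refer [deftest is]])

;; Same body as the :track/new-external handler, as a plain function.
(defn new-external-track-actions
  [{:keys [room-id user unique-id-fn ?data]}]
  (let [track-id (unique-id-fn)
        track {:user-id (:id user) :id track-id :track-type :download :url ?data}]
    [[:add-track [room-id track]]
     [:room/broadcast [room-id [:track/new track]]]
     [:download-track {:url ?data
                       :track-id track-id
                       :room-id room-id}]]))

(deftest new-external-track-test
  (let [url "https://foo.com/bar.mp3"
        track {:user-id "u1" :id "track-1" :track-type :download :url url}]
    (is (= [[:add-track ["lobby" track]]
            [:room/broadcast ["lobby" [:track/new track]]]
            [:download-track {:url url :track-id "track-1" :room-id "lobby"}]]
           (new-external-track-actions
            {:room-id "lobby"
             :user {:id "u1"}
             ;; stubbed id generator: no randomness, no IO
             :unique-id-fn (constantly "track-1")
             :?data url})))))
```

No download happens and nothing is written to the database; the test only compares the returned action data.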

Now, for the actions:

(defmethod -execute-server-action! :add-track [[_ [room-id track]]]
  (db/add-track-to-room room-id track))

(defmethod actions/-execute-server-action! :room/broadcast [[_ [room-id data]]]
  (ws/broadcast! (keys (cache/fetch :room-clients room-id)) data))

A quick look at the implementation of db/add-track-to-room:

(defn add-track-to-room [room-name track]
  (mc/update db "rooms" {:name room-name}
             {$push {:tracks track}}))

The only thing that stands out here is that :name and room-name are inconsistent with the rest of the codebase, which uses IDs. This is another remnant of an even earlier copy of the codebase.

Simple enough. Finally, the :download-track action:

(defn last-modified-rfc7232 [file]
  ; We have to do this hacky string/replace shit because of a limitation with JodaTime
  ; http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html
  ; We should switch to Java 8 Date-Time api
  (clojure.string/replace
   (time-format/unparse (time-format/formatters :rfc822)
                        (time/from-time-zone (time-coerce/from-long (.lastModified file))
                                             (time/time-zone-for-id "GMT")))
   #"\+0000"
   "GMT"))

(defn start-or-resume-download-request! [url destination-file]
  (http/get url {:as :stream
                 :headers {"Range" (str "bytes=" (.length destination-file) "-")}}))

(defn start-or-resume-download-async!
  "Starts or resumes a download."
  [url destination-file update-fn on-complete-fn]
  (future
    (let [response (start-or-resume-download-request! url destination-file)
          buffer-size (* 1024 10) ; 10 kb
          content-length (Integer. (get-in response [:headers "content-length"] 0))]
      (when (= 200 (:status response))
        (io/delete-file destination-file))
      (with-open [input (:body response)
                  output (java.io.FileOutputStream. destination-file true)]
        (let [buffer (make-array Byte/TYPE buffer-size)]
          (loop [size (.read input buffer)
                 bytes-read 0
                 last-time (System/currentTimeMillis)]
            (when (and (not (Thread/interrupted))
                       (pos? size))
              (.write output buffer 0 size)
              (recur (.read input buffer)
                     (+ bytes-read size)
                     (update-fn bytes-read content-length last-time)))))))
    (when-not (Thread/interrupted)
      (on-complete-fn destination-file))))

(defmethod -execute-server-action! :download-track [[_ {:keys [url room-id track-id]}]]
  (let [absolute-file-path (str download/downloads-dir track-id "." (last (string/split url #"\.")))
        file (io/file absolute-file-path)
        update! (fn [bytes-read content-length last-time]
                  (let [current-time (System/currentTimeMillis)]
                    (if (< 333 (- current-time last-time))
                      (do
                        (-execute-server-action! [:room/broadcast [room-id [:track/update {:id track-id
                                                                                           :track-type :download
                                                                                           :bytes-read bytes-read
                                                                                           :content-length content-length}]]])
                        current-time)
                      last-time)))
        complete! (fn [file]
                    (future
                      (locking room-id
                        (let [track {:id track-id
                                     :track-type :file
                                     :metadata (extract/parse file)
                                     :src (str "/static/" (last (string/split absolute-file-path #"/")))}]
                          (execute-server-actions! [[:update-track [room-id track]]
                                                    [:room/broadcast [room-id [:track/update track]]]])))))]

    (swap! downloads assoc track-id {:thread (start-or-resume-download-async! url
                                                                              file
                                                                              update!
                                                                              complete!)
                                     :file file})))

Yikes! This is a lot of code. Not only that, the code is quite complex. Threads are being created, a lock is being obtained, Thread/interrupted is called for some reason, swap! is being called, there's a giant comment in a function, Java interop is happening... It is far too difficult to reason about this code at a glance. So much so that it isn't worth getting too deep into specifics.

There are a few things that can be done to simplify this:

  1. Instead of making the HTTP request inside start-or-resume-download-async!, make it so that a stream and content length are passed in. This will get rid of one level of nesting.
  2. Instead of spinning up a thread inside start-or-resume-download-async!, just make it synchronous and let the caller spin up the thread.
  3. Move the update! and complete! functions outside of the :download-track action and into their own namespace.
  4. Like the comment says, drop clj-time in favor of the Java 8 Date-Time API.
  5. Make the actual download code an "input" in the codebase. Similar to the websocket entry point. Perhaps these can be isolated somewhere in the codebase.
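Items 1 and 2 could look roughly like the sketch below: a synchronous function that is handed already-open streams and a progress callback, leaving the HTTP request and the thread to the caller. `copy-stream!` and its argument names are hypothetical, and the interruption check uses `.isInterrupted` (which does not clear the flag) rather than the original `Thread/interrupted`.

```clojure
(import '(java.io InputStream OutputStream
                  ByteArrayInputStream ByteArrayOutputStream))

;; Hypothetical sketch, not Moo Player's implementation.
(defn copy-stream!
  "Synchronously copies input to output in 10 KB chunks, calling
  (on-progress total-bytes-so-far) after each chunk. Stops at end of
  stream or when the current thread is interrupted. Returns the total
  number of bytes copied."
  [^InputStream input ^OutputStream output on-progress]
  (let [buffer (byte-array (* 10 1024))]
    (loop [total 0]
      (let [size (.read input buffer)]
        (if (or (neg? size)
                (.isInterrupted (Thread/currentThread)))
          total
          (do
            (.write output buffer 0 size)
            (let [total (+ total size)]
              (on-progress total)
              (recur total))))))))

;; Usage with in-memory streams; a caller wanting this off the main
;; thread would wrap the call in (future ...) itself.
(let [in  (ByteArrayInputStream. (.getBytes "hello"))
      out (ByteArrayOutputStream.)]
  (copy-stream! in out (fn [_total] nil))
  (str out))
;; => "hello"
```

Because the function knows nothing about HTTP, the same loop could plausibly serve an upload path as well.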

1 and 4 would be particularly helpful. They'd make it much easier to share the code with the upload logic.
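For item 4, a java.time version of the date formatting might look like the following sketch. The name `last-modified-http-date` is hypothetical, and it takes epoch milliseconds (for example the result of `.lastModified` on a file) instead of a file so it can be checked in isolation. It uses an explicit pattern with a literal GMT suffix, which avoids the string replacement step.

```clojure
(import '(java.time Instant ZoneOffset)
        '(java.time.format DateTimeFormatter)
        '(java.util Locale))

;; Fixed-format HTTP date: English day and month names, two-digit day,
;; always rendered in UTC with a literal "GMT" suffix.
(def http-date-formatter
  (-> (DateTimeFormatter/ofPattern "EEE, dd MMM yyyy HH:mm:ss 'GMT'" Locale/US)
      (.withZone ZoneOffset/UTC)))

(defn last-modified-http-date
  "Formats epoch milliseconds as an HTTP date string."
  [epoch-millis]
  (.format http-date-formatter (Instant/ofEpochMilli epoch-millis)))

(last-modified-http-date 0)
;; => "Thu, 01 Jan 1970 00:00:00 GMT"
```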

Closing Remarks

Nothing Left to Say

So far, I'm very happy with this architecture. It's been making development more fun, debugging less difficult, and the program more stable. At ~1500 LOC, it's still a rather small codebase. Granted, that speaks more to Clojure's succinctness than anything else. I'm looking forward to continuing development and seeing what I learn.

Tags: architecture Clojure