clj-salt-api is a data-oriented Clojure library for interacting with Saltstack through salt-api

The aim of this document is to provide the motivation, decisions, code samples and caveats behind clj-salt-api. This is my first Clojure project, so I have created this document for myself as a developer to understand (and remember) the decisions with their benefits and drawbacks, but it can serve as documentation for everyone interested in this library.

Clojure and Saltstack

Saltstack is a configuration management and orchestration tool with a solid remote execution framework. Salt is written in Python and leverages the ZeroMQ messaging library, which allows users to run tasks simultaneously with satisfactory performance.

The Saltstack architecture is very flexible; it supports master-client (master-minion), masterless or salt-ssh setups. There are basically three options to interact with a Salt master:

  • CLI on master node

  • Python API on master node

  • HTTP API to run commands remotely - used by clj-salt-api

Saltstack provides a plethora of modules running on almost all operating systems.


Clojure is a well-known programming language. Although the main reason why I chose Clojure is Clojure itself ;), it has several interesting properties that suit this project well.

With Clojure's dynamic nature and very good data structures, clj-salt-api supports almost all Salt modules. clj-salt-api sends requests to Salt as data (Clojure maps) and returns data back. This approach is heavily inspired by aws-api. It makes the API very simple and there is no mismatch with the Saltstack API, so users of clj-salt-api can rely on the official Saltstack documentation, and the library has minimal compatibility issues across Saltstack versions.
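
For example, a test.ping call against all minions is just a map (a minimal sketch; the shape mirrors the request format shown in the examples later in this document):

;; The request mirrors the salt-api lowstate format: client, target, function.
{:form-params {:client "local_async"
               :tgt "*"
               :fun "test.ping"}}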

Saltstack provides both a synchronous and an asynchronous API. clj-salt-api aims to support both approaches, and Clojure's asynchronous and multi-threading facilities help keep the design clean.

Motivation

There are basically two reasons behind this project:

  • There is no Saltstack client library in Clojure

  • Proper handling of the salt-api asynchronous workflow requires a fair amount of work - clj-salt-api hides this workflow and provides a simple API for its users

Saltstack asynchronous workflow

The implementation of salt-api asynchronous requests relies on the event system and the event-driven infrastructure. Salt's internal components communicate with each other by sending and listening to events. Each command has a dedicated job with a unique job-id. "Job sent" and "job return" events are published on the Saltstack event-bus.

The salt-api /events endpoint exposes the Salt event-bus as an HTTP stream formatted per the Server Sent Events (SSE) spec. Each event is formatted as JSON.

Simplified asynchronous request flow (as described here):

  1. Connect to the /events endpoint and buffer all events.

  2. Send a POST request with an async client (local_async, runner_async or wheel_async) and receive a job-id and target minions (if appropriate) in the response. Runner and wheel clients run on the master, so there is no need to parse the target minions.

  3. Filter the buffered events for "job return" events with the expected job-id and minion-id and deliver the responses.

  4. Listen for the remaining (expected) "job return" events with the expected job-id and minion-id and deliver the responses (a sketch of this matching follows the list).
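
A minimal sketch of the matching in steps 3 and 4, assuming each parsed event is a map with a :tag of the form salt/job/<jid>/ret/<minion> (the tag format follows the Saltstack event documentation; the helper names and the buffered-events collection are illustrative only):

(require '[clojure.string :as str])

(defn job-return-event?
  "True when the event is a \"job return\" for the given job-id and one of
  the expected minions."
  [jid expected-minions {:keys [tag]}]
  ;; tag looks like "salt/job/<jid>/ret/<minion-id>"
  (let [[_ _ event-jid _ minion] (str/split (str tag) #"/")]
    (and (= jid event-jid) (contains? expected-minions minion))))

;; Step 3: filter the already buffered events, then (step 4) keep listening
;; until every expected minion has returned.
(filter (partial job-return-event? "20200511183953191117" #{"minion1" "minion2"})
        buffered-events)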

Things to consider:

  • One or more target minions have not sent a "job return" event within the specified timeout

    • Execute the saltutil.find_job function on the timed-out minions to check the job result.

    • If saltutil.find_job returns an error, deliver the error response for all timed-out minions.

    • If saltutil.find_job returns successfully but the response contains minion failures, deliver the error response for the failed minions and continue processing "job return" events until all expected minions return.

  • The /events endpoint has been closed during an asynchronous request

    • If it is not possible to reconnect within the specified timeout, deliver the error and close the request.

    • If the connection to the /events endpoint has been successfully reopened, fetch the "job returns" with the jobs.print_job function. Deliver the responses for already finished minions and continue processing "job return" events until all expected minions return.

Saltstack does not send a Last-Event-ID header when reconnecting to /events, so the client has to fetch the job results with the runner jobs module.
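
As a rough illustration, such a fetch is just another request map through the runner client (a hypothetical example in the same form-params shape used elsewhere in this document; the exact way of passing the jid argument may differ between Saltstack versions):

;; Ask the master for the results of a possibly already finished job.
;; The jid value is only an example.
{:form-params {:client "runner"
               :fun "jobs.print_job"
               :arg ["20200511183953191117"]}}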

Other features

Authentication

Saltstack requires users to authenticate. clj-salt-api authenticates the user and handles token expiration and automatic re-authentication under the hood.

Synchronous requests

Besides supporting the asynchronous workflow, clj-salt-api also enables synchronous (but non-blocking) requests.

Thread-safety when executing parallel requests

clj-salt-api allows executing multiple requests (async or sync) in parallel with only one connection to the /events endpoint. This connection can be set up in two modes:

  • Keep the HTTP connection open even when no asynchronous request is running.

  • clj-salt-api ensures there is only one connection open, but it will be closed if it is not needed any more (no async request running) and reopened when it is needed (not connected and an async request started).

Stream of all events

clj-salt-api provides an API for listening to all events on Saltstack event-bus.

Graceful shutdown

It is common for Salt commands to take several minutes (e.g. package management commands), so there is a need to shut down the client and correctly cancel running requests.

HTTP support

clj-salt-api relies on HTTP when communicating with Saltstack. It needs to support various HTTP features (authentication, proxy, custom headers, …).

Retries with backoff policies

clj-salt-api executes a number of remote calls and, as we all know, networks can be unreliable.

Basic architecture

The architecture is based on two types of go blocks and several channels (sse-chan, subs-chan, sse-resp-chan, recv-chan and async-resp-chan). A simplified sketch of the asynchronous flow design:

[Diagram: clj-salt-api asynchronous flow]
  1. Sse (a go block) is started during initialization. It listens for subscriptions on the subs-chan and sends sse events or subscription responses to the sse-resp-chan, which has a mult attached.

  2. When an async request is executed by a caller, clj-salt-api creates a recv-chan and an async-resp-chan and starts a new go block which delivers responses to the async-resp-chan.

  3. The async request (go block) subscribes to sse events by sending a subscription with a unique correlation-id to the subs-chan.

  4. After sse receives the subscription with the correlation-id through the subs-chan, it taps the sse-resp-chan mult onto the recv-chan and makes sure it is connected to the /events endpoint. The state of the /events connection depends on the keep-alive setting.

  5. Only after sse is connected to /events does it send the subscription response with the corresponding correlation-id back to the sse-resp-chan.

  6. When the async request receives the subscription response, it submits its job (module function call) to Saltstack and receives a job-id and target minions back (if appropriate).

  7. The async request waits for the responses of all target minions, delivers them to the async-resp-chan as they arrive and finally closes the async-resp-chan.

clj-salt-api implements a similar flow for the all-events stream.
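
A minimal core.async sketch of the mult/tap wiring described above (channel names follow the text; buffer sizes and the surrounding logic are simplified and do not reflect the actual implementation):

(require '[clojure.core.async :as a])

;; sse owns the sse-resp-chan and exposes it through a mult.
(def sse-resp-chan (a/chan))
(def sse-resp-mult (a/mult sse-resp-chan))

;; Every async request taps its own recv-chan from the mult, so each
;; subscriber sees every event put onto the sse-resp-chan.
(defn subscribe-recv-chan
  [buffer-size]
  (a/tap sse-resp-mult (a/chan buffer-size)))

;; When the request finishes, it untaps and closes its recv-chan.
(defn unsubscribe-recv-chan
  [recv-chan]
  (a/untap sse-resp-mult recv-chan)
  (a/close! recv-chan))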

Error handling

There are basically three types of errors:

  • minion error

    • async request just delivers minion error to the caller

  • /events error

    • sse tries to reconnect to the /events endpoint

    • if the reconnection is successful,

      • sse sends a reconnected message to the sse-resp-chan

      • all running async requests have to fetch the job result with jobs.print_job, because the job may already be finished and its "job return" event missed (Saltstack does not support the Last-Event-ID concept)

      • all running async requests deliver already finished jobs and continue consuming recv-chans until all expected minion responses are delivered

    • if the reconnection is not successful, all running async requests deliver an error and close their respective async-resp-chans

  • minion has timed out ("job return" event has not been delivered within the specified time)

    • async request executes saltutil.find_job to check the job status

    • if saltutil.find_job fails, async request delivers the error and closes the async-resp-chan

    • if saltutil.find_job succeeds, the async request delivers errors only for the failed minions and waits for events from the remaining minions on the recv-chan (a sketch of this split follows the list)
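
A sketch of the split in the last bullet (a pure helper, not the library's actual code), assuming saltutil.find_job returns a map of minion-id to job info where an empty map means the job is no longer running on that minion:

(require '[clojure.set :as set])

(defn split-timed-out-minions
  "Returns {:still-running #{...} :failed #{...}} for the timed-out minions,
  based on the saltutil.find_job result."
  [find-job-result timed-out-minions]
  (let [still-running (->> timed-out-minions
                           (filter #(seq (get find-job-result %)))
                           (set))]
    {:still-running still-running
     :failed (set/difference (set timed-out-minions) still-running)}))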

State handling

clj-salt-api has to manage two types of state.

The first one is simple. It is client-related information like the salt master settings, the salt token, the subs-chan, etc. clj-salt-api uses an atom (referred to as client-atom in the code) to hold this information.

The second one is trickier. clj-salt-api has state associated with its go blocks (sse, async request, all-events stream and sync request). All go blocks implement a solution similar to the one mentioned in clojuredesign.club episode 026. I highly recommend the clojuredesign.club podcast and want to thank Chris and Nate for sharing a lot of their knowledge.

Each go block is a loop with state (a map named op) and two steps. The state (op) consists of a command and various data (correlation-id, list of target minions, minion timeout, number of retries, …) specific to the go block. The steps:

  1. execute-command - calls an async function (reading from/writing to a channel) and returns a channel message as the response

  2. handle-response - a pure function that accepts the current state (op) and a command response as parameters and returns the new state (op). The go block loop stops when handle-response returns no new state.

[Diagram: go block loop alternating execute-command and handle-response]

Sample code:

(defn handle-response
  [{:keys [:command] :as op} response]
  (try
    (case command
      :subscribe (handle-with-command op :connect)
      :connect (handle-connect op response)
      :request (handle-request op response)
      :receive (handle-receive op response)
      :send (handle-send op)
      :reconnect (handle-reconnect op response)
      :find-job (handle-find-job op response)
      :error (unsubscribe op)
      :unsubscribe (handle-unsubscribe op response)
      :exit nil)
    (catch Throwable e
      (assoc op :command :error :body e))))

(defn request-async
  [client-atom req async-resp-chan recv-buffer-size]
  (let [correlation-id (java.util.UUID/randomUUID)
        client @client-atom
        subs-chan (:sse-subs-chan client)
        recv-chan (if (< 1 recv-buffer-size) (a/chan recv-buffer-size) (a/chan))]
    (a/go
      (loop [{:keys [:command :body :last-receive-time :minion-timeout] :as op}
             (initial-op req correlation-id recv-chan)]
        (when command
          (->>
           (case command
             :subscribe (a/>! subs-chan body)
             :connect (a/<! recv-chan)
             :request (a/<! (req/request client-atom body (a/chan)))
             :receive (let [timeout-chan (a/timeout
                                          (find-job-timeout minion-timeout
                                                            last-receive-time))]
                        (a/alt!
                          timeout-chan [:timeout]
                          recv-chan ([result] [:receive result])))
             :reconnect (a/<! (req/request client-atom body (a/chan)))
             :find-job (a/<! (req/request client-atom body (a/chan)))
             :send (doseq [b (to-vec body)]
                     (a/>! async-resp-chan b))
             :error (a/>! async-resp-chan body)
             :unsubscribe (a/alt!
                            [[subs-chan body]] [:unsubscribe]
                            recv-chan ([msg] [:receive msg])
                            :priority true)
             :exit (graceful-shutdown recv-chan async-resp-chan))
           (handle-response op)
           (recur)))))
    async-resp-chan))
This solution is just a simplified state machine. I have been thinking about splitting it into multiple state machines, but I still find the code readable, so I have left it as it is.

Caveats

Implementing async flows with core.async has been a relatively easy task; nevertheless, I have run into two serious problems I'd like to share. I recommend using unbuffered channels (buffer length 0) for all core.async channels to detect similar problems.

It would be very helpful to have an option to monitor channels (e.g. list all core.async channels and their buffer capacity at runtime), but I have not found one.

Deadlock n. 1

This type of deadlock is described in this article by Roman Elizarov.

The deadlock happens when sse tries to deliver an event via the sse-resp-chan while an async request tries to unsubscribe. Sse is blocked on putting the event to the sse-resp-chan and therefore does not read the subs-chan; the async request is blocked on putting the unsubscribe message to the subs-chan and therefore does not read the recv-chan (a tap of the sse-resp-chan).

To overcome this problem, clj-salt-api uses alt! when reading from/writing to a pair of pub/sub channels.
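
In isolation, the pattern is the :unsubscribe branch of the sample code above; inside a go block it looks roughly like this (a sketch, assuming subs-chan, recv-chan and body are in scope):

;; Offer the unsubscribe message, but stay ready to drain recv-chan so that
;; sse is never blocked on delivering an event to this go block.
(a/alt!
  [[subs-chan body]] [:unsubscribe]
  recv-chan ([msg] [:receive msg])
  :priority true)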

Deadlock n. 2

Deadlock n. 2 is not as obvious as n. 1 and it took me some time to find the root cause. It is better explained through an example. For the purpose of the example, let's assume the user of the clj-salt-api library executes an async request pkg.installed, but sse is reconnected during this request. As described earlier, after the sse reconnection the async request (pkg.installed) has to fetch the job status with jobs.print_job. What happens in this scenario?

  • We have two running async requests (pkg.installed and jobs.print_job) with two different recv-chans tapped from the sse-resp-chan mult.

  • pkg.installed has created a new async-resp-chan for jobs.print_job and is parked in a take from this chan (it is waiting for the response from jobs.print_job).

  • jobs.print_job has been successfully subscribed to sse and has submitted a job to Saltstack.

  • A "job sent" event is sent back from Saltstack to the /events endpoint and is delivered from the HTTP client to the sse go block.

  • sse puts the "job sent" event to the sse-resp-chan.

  • The event is distributed to all taps in parallel and synchronously, which means it is put to the recv-chan created for pkg.installed and to the recv-chan created for jobs.print_job.

  • jobs.print_job takes the "job sent" event and continues waiting for the "job return" event (parked in a take from its recv-chan).

  • pkg.installed is parked in taking the response from jobs.print_job and does not take the "job sent" event.

  • sse is deadlocked putting the "job sent" event to the sse-resp-chan mult, because pkg.installed has not taken any value and its buffer length is 0. clj-salt-api uses a non-zero-length buffer on the recv-chan in production code, but to detect deadlocks it uses channels without buffers.

I have not found a solution that removes the deadlock when the code executes the jobs.print_job request asynchronously. The only solution that works is calling jobs.print_job synchronously (but non-blocking) using the local client.

Setting buffer size

One interesting topic I'd like to mention is how to set correct buffer sizes for core.async channels. The general recommendation mentioned in the core.async documentation is to use a buffer for slow consumers. Let's examine our channels:

  • sse-chan - the consumer (the sse go block) could be parked while sending data to an async request, which could itself be parked taking from find_job or print_job - buffer needed

  • subs-chan - the consumer (the sse go block) could be parked, but an async request has to wait for the subscription response anyway, so it may simply block on putting the message to subs-chan - no buffer needed

  • sse-resp-chan - a mult exists on top of this channel - no buffer needed

  • recv-chan - the single consumer (an async request) could be parked taking from find_job or print_job - buffer needed

  • async-resp-chan - the consumer is outside of clj-salt-api, so it is better to pass the channel in as a parameter - not applicable
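
Put together, the channel setup could look roughly like this (a sketch; the buffer sizes are illustrative, not the library's defaults):

(require '[clojure.core.async :as a])

(def sse-chan (a/chan 32))       ;; buffered: the sse consumer may be parked
(def subs-chan (a/chan))         ;; unbuffered: the subscriber waits for a response anyway
(def sse-resp-chan (a/chan))     ;; unbuffered: a mult sits on top of it
(def sse-resp-mult (a/mult sse-resp-chan))
(def recv-chan (a/tap sse-resp-mult (a/chan 32)))  ;; buffered, one per async request
;; async-resp-chan is supplied by the caller, so its buffering is the caller's choice.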

Code conventions

I struggled with code readability at first. All the functions looked similar, and I had problems understanding a function body after some time had passed.

To overcome this problem, I have found three helpful conventions: destructuring function arguments, small functions and comments. Another technique I have found useful is passing data down (maybe there is a better name in the Clojure community that I am not aware of). All four are described below.

Destructuring function arguments

Whenever possible, clj-salt-api uses destructuring. This technique helps me understand the data passed to a function just by looking at the function name and its parameters.

Example:

(defn- handle-receive-event
  [{:keys [:subscription-count] :as op}
   {:keys [:type :data :retry] :as event}]
  (if (instance? Throwable event)       ; SSE sent an error
    (assoc op :command :send :body event)
    (if (nil? type)                     ; SSE has been closed
      (assoc op :command :close)
      (if (= 0 subscription-count)
        (assoc op :command :receive)    ; ignore event if there is no subscriber
        (case type
          :data (assoc op :command :send :body data)
          :retry (assoc op :command :receive :retry-timeout retry)
          :close (assoc op :command :receive))))))

Small functions

Clojure is a very consistent and concise language. Often a function is one block (threading macro, go block, if, case, …) with several calls to map/filter/assoc/update/…. I understand the pros, but having functions built up entirely from core library calls made my code less readable. So every time I wrote a function body that I did not understand at first glance, I tried to extract functions with names that are easy to read (even if the extracted functions are one-liners and used only once).

Example:

(defn- not-comment-line?
  [field]
  (not= \: (first field)))

(defn- retry-field?
  [field]
  (= :retry (first field)))

(defn sse-buffer->events
  "Converts sse buffer to collection of events and next buffer value"
  [buf prev-buf]
  (let [[chunks next-buf] (split-buffer (str prev-buf buf))]
    [(->> chunks
          (mapcat (fn [chunk]
                    (->> chunk
                         (str/split-lines)
                         (filter not-comment-line?)
                         (map line->field)
                         (filter sse-supported-field?)
                         (group-by retry-field?)
                         (map (fn [group]
                                (let [[retry? fields] group]
                                  (if retry?
                                    (reduce-retry-fields fields)
                                    (reduce-sse-fields fields)))))))))
     next-buf]))

Comments

The third technique that helps with code readability is, surprisingly, comments. Yeah, I know that "good code is its own documentation". Maybe this is not that good code ;). Anyway, I have found the following useful:

  • Docstrings for public API

    • Obvious

  • Docstrings for private functions

    • explaining the reasons why the function exists - this helps to understand the context

    • explaining non-obvious implementation details (e.g. why a function calls the sync instead of the async interface)

    • containing examples of passed parameters - a Saltstack response can be a deep data structure, and an example of the data in the documentation helps to understand the function body when destructuring is not possible

Example:

(defn- parse-print-job-minions-returns
  "Example of minion print_job return:
  {:20200511183953191117
   {:Function test.ping,
    :Result {:minion1 {:return true, :retcode 0, :success true},
             :minion2 {:return true, :retcode 0, :success true}},
    :Target *,
    :Target-type glob,
    :Arguments [],
    :StartTime 2020, May 11 18:39:53.191117,
    :Minions [minion1 minion2],
    :User saltapi}}"
  [return jid minions]
  (->> (:Result (get return (keyword (str jid))))
       (map #(vector (name (first %)) (second %)))
       (filter #(contains? minions (first %)))
       (map #(assoc {}
                    :minion (first %)
                    :return (:return (second %))
                    :success (:success (second %))))))

Passing data down

A well-known technique from the Clojure world is to pass one data structure down a pipeline of functions, with every function picking up only the values it actually needs; it does not have to understand the whole data structure. clj-salt-api uses HTTP to communicate with salt-api and uses the Aleph library for that purpose. Aleph follows the Ring spec, and clj-salt-api request functions also accept the same Ring request map as a parameter. Passing data down has two advantages in this case:

  • All HTTP-related configuration is supported out of the box (timeouts, proxy configuration, client certificates). Users can refer to the official Aleph documentation.

  • All Saltstack modules are supported out of the box. Users can refer to the official Saltstack documentation, and there is no need to map from an official Saltstack function to a clj-salt-api function.

Example:

(defn local-async-locate
  "Locate files and limit results to 10 with local async client and locate module."
  []
  (let [cl (example-client)]
    (print-and-close cl (salt/request-async
                         cl
                         {:form-params {:client "local_async"
                                        :tgt "*"
                                        :fun "locate.locate"
                                        :arg ["ld.*", "", 10]
                                        :kwarg {:count false
                                                :regex true}}}))))

Testing

Besides the usual unit tests for pure functions (data in/data out), I had to figure out how to test async calls. Separating all async calls into one function and implementing almost all code as simple functions (mentioned here) let me simplify testing.

It is important to mention that I have not applied TDD practices in clj-salt-api, because I developed all the code in the REPL and added tests afterwards. I think the best granularity for testing an asynchronous Saltstack request is to test the flow. I have created the macro test-flow-> for that purpose (to simplify the syntax).

Let’s take an example:

(testing "async minion job timeout"
  (u/test-flow->
   (initial-op (java.util.UUID/randomUUID)) r
   (is (= :connect (:command r))) (send-resp)
   (is (= :request (:command r))) (connected-resp :all)
   (is (= :receive (:command r))) (minion-request-resp "job-1" ["minion1" "minion2"])
   (is (= :find-job (:command r))) (timeout-resp)
   (is (send-minion-failure? r "minion1")) (find-job-resp ["minion1"] ["minion2"])
   (is (= :receive (:command r))) (send-resp)
   (is (= :send (:command r))) (minion-event-resp "job-1" "minion2")
   (is (= :unsubscribe (:command r))) (send-resp)
   (is (= :exit (:command r))) (unsubscribe-resp)))
  1. Set up the initial state (op)

  2. Create send response as a response to the initial command (async request sent a subscribe message to sse)

  3. Assert that the next command is :connect (async request is waiting for the connected message from sse)

  4. Create connected response with correlation-id :all as a response to :connect command

  5. Assert that the next command is :request (submitting job to saltstack)

  6. Create response with job id job-1 and expected minions minion1 and minion2 as a response to :request command

  7. Assert that the next command is :receive (async request is waiting for minion responses)

  8. Create timeout response as a response to :receive (minions responses have not been delivered within specified timeout)

  9. Assert that the next command :find-job

  10. …

All handler functions could be tested separately, but to create the current state (op) as a parameter to handle-response, the tests would need to understand its internal structure. By testing the flow, the test does not need the internal representation of the state, because the state is created with every test step; it verifies only the sequence of steps.

Future work

  • Support for salt-ssh client

  • Support for scripting with babashka. It will be possible to create scripts similar to Spire, but with support for all Saltstack modules

  • Integration tests with Saltstack in a Docker container