bigbrother

0.1.0-SNAPSHOT


Periodically send metrics

dependencies

org.clojure/clojure
1.6.0
org.clojure/data.json
0.2.5
org.clojure/tools.logging
0.3.1
org.clojure/algo.generic
0.1.2
org.clojure/test.check
0.7.0
riemann-clojure-client
0.3.2
overtone/at-at
1.2.0



(this space intentionally left almost blank)
 

This file provides helpers to measure the time spent in functions.

(ns bigbrother.core
  (:require [clojure.tools.logging :as log]
            [clojure.data.json :as json]
            [overtone.at-at :refer [every mk-pool]]
            [riemann.client :as r]
            [clojure.algo.generic.functor :refer [fmap]]
            [bigbrother.timer :as timer]
            [bigbrother.counter :as counter]
            [bigbrother.metrics :as metrics]
            [bigbrother.max-metrics :as max-metrics]))

Atoms

(def pool
  "Atom holding the at-at pool used to schedule the periodic report.
  It is nil until `init-metrics` stores a pool created with `mk-pool`."
  (atom nil))

pool for async

(def default-map
  "Baseline `{metric-key value}` entries that `send-to-riemann` merges under
  every report. Starts as an empty map (not a set) so that it has the same
  type as the value `init-metrics` later stores in it."
  (atom {}))

(def riemann-conn
  "Riemann client connection; while it is nil no event is sent."
  (atom nil))

(def riemann-service
  "Prefix of the Riemann service name built for every event."
  (atom "supercell"))

(def level-by-key
  "Map `{metric-key (fn [value] state-string)}` used to compute the state of
  each event; keys without an entry are reported with the state `ok`."
  (atom nil))

(def n
  "Number of loops finished since the accumulators were last reset."
  (atom 0))

a number

(def log-time
  "Alias of `bigbrother.timer/log-time`."
  timer/log-time)

(def log-counter
  "Alias of `bigbrother.counter/log-counter`."
  counter/log-counter)

(def log-metric
  "Alias of `bigbrother.metrics/log-metric`."
  metrics/log-metric)

(def log-mmetric
  "Alias of `bigbrother.max-metrics/log-mmetric`."
  max-metrics/log-mmetric)
(defn timer-loop-finished
  "Marks the end of one loop: increments the loop count `n`, then asks the
  timer, max-metrics, metrics and counter namespaces (in that order) to fold
  the values of the current loop into their accumulators.
  Returns the result of `counter/loop-finished`."
  []
  (swap! n inc)
  (doseq [finish! [timer/finish-timer-loop
                   max-metrics/loop-finished
                   metrics/loop-finished]]
    (finish!))
  ;; kept last and outside the doseq so that its value is the return value
  (counter/loop-finished))

Starting the timer

---- aliases

Starting the timer

(defn big-brother-is-watching-you
  "Starts a timed loop by logging the `:start` timestamp."
  []
  (timer/log-time :start))

(def telescreen-on
  "Alias of `big-brother-is-watching-you`."
  big-brother-is-watching-you)

End the timer chrono

End the timer chrono

(def welcome-in-miniluv
  "Alias of `timer-loop-finished`."
  timer-loop-finished)

(def telescreen-off
  "Alias of `timer-loop-finished`."
  timer-loop-finished)


Riemann Warn Level

warn level between two numbers

(defn warn-level
  "Returns a function turning a metric value into a Riemann state string:
  `critical` when the value is negative or at least `crit`, `warning` when
  it is at least `warn`, and `ok` otherwise."
  [warn crit]
  (fn [v]
    (if (or (neg? v) (>= v crit))
      "critical"
      (if (>= v warn)
        "warning"
        "ok"))))
(defn rev-warn-level
  "Reversed variant of `warn-level`, for metrics where low values are bad.
  The returned function yields `critical` when the value is at most `crit`,
  `warning` when it is at most `warn`, and `ok` otherwise."
  [warn crit]
  (fn [v]
    ;; (condp >= v bound ...) tests (>= bound v), i.e. (<= v bound)
    (condp >= v
      crit "critical"
      warn "warning"
      "ok")))
(defn ok
  "Returns a one-argument function that always answers `ok`."
  []
  (fn [_value] "ok"))

(defn warning
  "Returns a one-argument function that always answers `warning`."
  []
  (fn [_value] "warning"))

(defn critical
  "Returns a one-argument function that always answers `critical`."
  []
  (fn [_value] "critical"))
(defn- to-riemann-event
  "Converts one `[key value]` entry of a report into a Riemann event map
  with the keys `:service`, `:state` and `:metric`.
  Returns nil when the value is not a number.
  The state is computed by the function registered for the key in
  `level-by-key`; keys without such a function get the state `ok`."
  [[k v]]
  (when (number? v)
    (let [lvl-fn (get @level-by-key k)
          level  (if lvl-fn (lvl-fn v) "ok")]
      ;; The key name is appended exactly once: \"<service> <key>\".
      ;; (It used to be appended twice, e.g. \"supercell totaltotal\".)
      {:service (str @riemann-service " " (name k))
       :state   level
       :metric  v})))
(defn send-to-riemann
  "Sends the report map `m` to Riemann, one event per numeric entry.
  The entries of `default-map` are used for keys missing from `m`.
  Does nothing and returns nil when no Riemann connection is configured."
  [m]
  ;; the connection atom is read once so the check and the send agree
  (when-let [conn @riemann-conn]
    (let [result-map (into @default-map m)
          ;; to-riemann-event answers nil for non-numeric values; keep drops them
          events     (keep to-riemann-event result-map)]
      (r/send-events conn events))))
(defn reset-accumulators!
  "Empties the accumulators of the counter, metrics, max-metrics and timer
  namespaces, then sets the loop count `n` back to 0 (which is returned)."
  []
  (doseq [reset-fn [counter/reset-acc!
                    metrics/reset-acc!
                    max-metrics/reset-acc!
                    timer/reset-acc!]]
    (reset-fn))
  (reset! n 0))
(defn reset-all-atoms!
  "Clears the values logged during the current loop (timestamps, metrics,
  max metrics and counters) and then the accumulators as well."
  []
  (doseq [[state blank] [[timer/times []]
                         [metrics/metrics {}]
                         [max-metrics/mmetrics {}]
                         [counter/counters {}]]]
    (reset! state blank))
  (reset-accumulators!))
(defn resume-map
  "Builds the report map for an interval of `nb-ms` milliseconds.
  `:nb` is the number of loops per second; the summaries of the metrics,
  max-metrics, timer and counter namespaces are merged on top of it, in
  that order."
  [nb-ms]
  (let [loops-per-sec (float (/ @n (/ nb-ms 1000)))]
    (-> {:nb loops-per-sec}
        (into (metrics/resume @n))
        (into (max-metrics/resume))
        (into (timer/resume))
        (into (counter/resume nb-ms)))))

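Log the aggregated report as JSON, send it to Riemann, then reset the accumulators (`init-metrics` schedules this every `nb-ms-metrics` milliseconds).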

(defn display-time
  "Publishes the report of the last `nb-ms` milliseconds: logs it as a JSON
  string at info level, sends it to Riemann, then resets the accumulators."
  [nb-ms]
  (let [report (resume-map nb-ms)]
    ;; the JSON string is built inside the logging call, as before
    (log/info (json/write-str report))
    (send-to-riemann report)
    (reset-accumulators!)))

init-metrics

init-map :: Map Keyword (v -> ERROR_LEVEL)
(defn init-metrics
  "Configures the library and starts the periodic report.
  - `init-map`: map from metric key to a function turning a value into a
    state string (see `warn-level`).
  - `nb-ms-metrics`: period of the report, in milliseconds.
  - `riemann-host`: Riemann host; when nil no connection is opened.
  - `riemann-service-name`: prefix of the service name of every event.
  Returns the value of the `every` call that schedules `display-time`."
  [init-map nb-ms-metrics riemann-host riemann-service-name]
  ;; every declared key, plus :total, starts with the placeholder value -1
  (let [tracked-keys (conj (keys init-map) :total)]
    (reset! default-map (zipmap tracked-keys (repeat -1))))
  (reset! level-by-key init-map)
  (reset! pool (mk-pool))
  (reset! riemann-service riemann-service-name)
  (when riemann-host
    (reset! riemann-conn (r/tcp-client {:host riemann-host})))
  (every nb-ms-metrics #(display-time nb-ms-metrics) @pool))
 
(ns bigbrother.counter
  (:require
   [clojure.algo.generic.functor :refer [fmap]]))


COUNTERS

Monoid instance of `sumcounter`

A sumcounter is a map {name nb}.

(def empty-sumcounter
  "Identity element of `add-sumcounter`: the empty map."
  {})

(defn add-sumcounter
  "Combines two sumcounters; values stored under the same key are added."
  [st st2]
  (merge-with + st st2))

Counters atoms

(def counters
  "Counters logged during the current loop, as a map {name nb}."
  (atom {}))

(def sumcounters
  "Counters summed over every loop finished since the last `reset-acc!`."
  (atom empty-sumcounter))

Counters (the mean is given by default)

declare a specific counters (not time)

declare a specific counters (not time) and returns the first parameter

(defn- set-counter!
  "Adds `v` to the counter named `k` of the current loop."
  [k v]
  (swap! counters add-sumcounter {k v}))

(defn log-counter
  "Increments the counter `k` by `v`, or by 1 when `v` is not given."
  ([k] (log-counter k 1))
  ([k v] (set-counter! k v)))

(defn log-counter->
  "Same as `(log-counter k v)` but returns `x`, so the call can be placed
  inside a `->` threading form."
  [x k v]
  (log-counter k v)
  x)
(defn loop-finished
  "Aggregates the counters of the current loop into `sumcounters`, then
  empties `counters` for the next loop."
  []
  (let [current @counters]
    (swap! sumcounters add-sumcounter current))
  (reset! counters {}))
(defn reset-acc!
  "Empties the accumulated counters."
  []
  (reset! sumcounters empty-sumcounter))

(defn resume
  "Returns the accumulated counters as a map {name rate}, where the rate is
  the accumulated count divided by the number of seconds in `nb-ms`
  milliseconds, as a float."
  [nb-ms]
  (fmap (fn [total] (float (/ total (/ nb-ms 1000))))
        @sumcounters))
 
(ns bigbrother.max-metrics)


Max Metrics

Monoid instance of `maxmetrics`

A maxmetrics value is a map {name metric-value}.

(def empty-maxmetric
  "Identity element of `add-maxmetrics`: the empty map."
  {})

(defn add-maxmetrics
  "Combines two maxmetrics maps; for a key present in both, the larger
  value is kept."
  [st st2]
  (merge-with max st st2))
(def mmetrics
  "Max metrics logged during the current loop, as a map {name value}."
  (atom {}))

max-metric values logged during the current loop

(def maxmetrics
  "Largest value seen for each max metric over the loops finished since
  the last `reset-acc!`."
  (atom empty-maxmetric))

largest value per metric since the last reset

Max Metrics (metrics to do a max between them instead of a mean)

declare a specific max metrics (not time)

declare a specific max metrics (not time) and returns the first parameter

(defn- set-mmetric!
  "Records `v` for the max metric `k` of the current loop, keeping the
  larger value when `k` was already logged."
  [k v]
  (swap! mmetrics add-maxmetrics {k v}))

(defn log-mmetric
  "Declares the value `v` for the max metric named `k`."
  [k v]
  (set-mmetric! k v))

(defn log-mmetric->
  "Same as `(log-mmetric k v)` but returns `x`, so the call can be placed
  inside a `->` threading form."
  [x k v]
  (log-mmetric k v)
  x)
(defn loop-finished
  "Aggregates the max metrics of the current loop into `maxmetrics`, then
  empties `mmetrics` for the next loop."
  []
  (let [current @mmetrics]
    (swap! maxmetrics add-maxmetrics current))
  (reset! mmetrics {}))
(defn reset-acc!
  "Empties the accumulated max metrics."
  []
  (reset! maxmetrics empty-maxmetric))

(defn resume
  "Returns the accumulated max metrics as a map {name max-value}."
  []
  (deref maxmetrics))
 
(ns bigbrother.metrics
  (:require [clojure.algo.generic.functor :refer [fmap]]))


Sum Metrics

Monoid instance of `summetrics`

A summetrics value is a map {name metric-value}.

(def empty-summetric
  "Identity element of `add-summetrics`: the empty map."
  {})

(defn add-summetrics
  "Combines two summetrics maps; values stored under the same key are
  added."
  [st st2]
  (merge-with + st st2))
(def metrics
  "Metrics logged during the current loop, as a map {name value}."
  (atom {}))

metric values logged during the current loop

(def summetrics
  "Metrics summed over every loop finished since the last `reset-acc!`."
  (atom empty-summetric))

sum of the values per metric since the last reset

Metrics (the mean is given by default)

declare a specific metrics (not time)

declare a specific metrics (not time) and returns the first parameter

(defn- set-metric!
  "Adds `v` to the metric named `k` of the current loop."
  [k v]
  (swap! metrics add-summetrics {k v}))

(defn log-metric
  "Declares the value `v` for the metric named `k`."
  [k v]
  (set-metric! k v))

(defn log-metric->
  "Same as `(log-metric k v)` but returns `x`, so the call can be placed
  inside a `->` threading form."
  [x k v]
  (log-metric k v)
  x)
(defn loop-finished
  "Aggregates the metrics of the current loop into `summetrics`, then
  empties `metrics` for the next loop."
  []
  (let [current @metrics]
    (swap! summetrics add-summetrics current))
  (reset! metrics {}))
(defn reset-acc!
  "Empties the accumulated metrics."
  []
  (reset! summetrics empty-summetric))

(defn resume
  "Returns the accumulated metrics as a map {name mean}, where the mean is
  the accumulated sum divided by `n`, as a float."
  [n]
  (fmap (fn [total] (float (/ total n)))
        @summetrics))
 
(ns bigbrother.timer)


TIMERS

Monoid instance of `sumtimes`

A sumtimes value is a list of pairs [name [timespent nb]].

(defn ts-name
  "Name of a sumtime entry `[name [timespent nb]]`."
  [x]
  (first x))

(defn ts-timespent
  "Time spent stored in a sumtime entry `[name [timespent nb]]`."
  [x]
  (-> x second first))

(defn ts-nb
  "Number of occurrences stored in a sumtime entry `[name [timespent nb]]`."
  [x]
  (-> x second second))

(def empty-sumtime
  "The empty sumtimes value."
  [])
(defn- add-one-sumtime
  "Adds two sumtime entries `[name [timespent nb]]`: the times spent and
  the occurrence counts are summed, and the name of the first entry is kept."
  [[k [spent nb]] [_ [spent2 nb2]]]
  [k [(+ spent spent2) (+ nb nb2)]])

(defn add-sumtimes
  "Combines two sumtimes values. An empty value is the identity element;
  otherwise the entries are added pairwise, by position.
  The combined value is built eagerly as a vector: this function is applied
  once per finished loop on top of its previous result, and stacking a lazy
  `map` on every loop could overflow the stack when the result is finally
  realized."
  [st st2]
  (cond (empty? st)  st2
        (empty? st2) st
        :else        (mapv add-one-sumtime st st2)))
(defn fmap-sumtimes
  "Applies `f` to the time spent of every entry of the sumtimes `st`;
  names and occurrence counts are left untouched. Returns a lazy sequence."
  [f st]
  (for [entry st]
    [(first entry) [(f (ts-timespent entry))
                    (ts-nb entry)]]))

Given a sumtimes, returns a map {key mean-timespent}.

(defn normalized-map-from-sumtimes
  "Turns a sumtimes value into a map {name mean}, the mean being the time
  spent divided by the number of occurrences. When several entries share a
  name, their means are added together."
  [st]
  (reduce (fn [acc entry]
            (merge-with + acc {(ts-name entry)
                               (/ (ts-timespent entry)
                                  (ts-nb entry))}))
          {}
          st))

timers atoms

(def times
  "Timestamps logged during the current loop, as a vector of
  `[name nano-timestamp]` pairs."
  (atom []))

timestamps

(def sumtimes
  "Time spent by step name, accumulated over the loops finished since the
  last `reset-acc!`."
  (atom empty-sumtime))

time spent by type

Timer

declare the action named k finished

declare the action named k finished and returned object x

(defn- set-value!
  "Appends the pair `[k v]` to the timestamps of the current loop."
  [k v]
  (swap! times conj [k v]))

(defn log-time
  "Declares that the action named `k` has just finished, by recording the
  current `System/nanoTime` under that name."
  [k]
  (let [now (System/nanoTime)]
    (set-value! k now)))

(defn log-time->
  "Same as `(log-time k)` but returns `x`, so the call can be placed inside
  a `->` threading form."
  [x k]
  (log-time k)
  x)

get time spent during one step

(defn- show-one-step
  "Given two consecutive timestamps `[[name-a t-a] [name-b t-b]]`, returns
  the sumtime entry `[name-b [(- t-b t-a) 1]]`: the time spent to reach the
  second timestamp, counted once.
  When fewer than two timestamps are given, returns `[:nothing [0 1]]`.
  This fallback has the same `[name [timespent nb]]` shape as every other
  entry (it used to be the map `{:nothing 0}`, which the sumtime accessors
  cannot read) and still normalizes to `{:nothing 0}`."
  [x]
  (if (< (count x) 2)
    [:nothing [0 1]]
    (let [[[_ from] [k to]] x]
      [k [(- to from) 1]])))

from a list of timestamp generate a sumtime

(defn timespent
  "From a list of timestamps, generates a sumtimes value holding one entry
  per pair of consecutive timestamps."
  [times-array]
  (->> times-array
       (partition 2 1)
       (map show-one-step)))

from a list of timestamp generate the total time spent

(defn total-time
  "From a list of `[name timestamp]` pairs, returns the difference between
  the last timestamp and the first one."
  [times-array]
  (let [started (second (first times-array))
        ended   (second (last times-array))]
    (- ended started)))

convert from nanoseconds to milliseconds

(defn to-milliseconds
  "Converts the time spent of every entry of a sumtimes value from
  nanoseconds to milliseconds (as floats)."
  [times-array]
  (let [nanos->millis (fn [nanos] (float (/ nanos 1000000)))]
    (fmap-sumtimes nanos->millis times-array)))
(defn finish-timer-loop
  "Ends the current timed loop: when at least two timestamps were logged,
  converts them to a sumtimes value in milliseconds (one entry per step
  plus a `:total` entry) and adds it to `sumtimes`. The logged timestamps
  are cleared in every case."
  []
  ;; The timestamps are read once, so the size check, the step durations
  ;; and the total are all computed from the same snapshot.
  (let [stamps @times]
    (when (> (count stamps) 1)
      (let [difftime (timespent stamps)
            total    (total-time stamps)
            ;; difftime is a sequence, so conj puts the :total entry first
            res      (to-milliseconds (conj difftime [:total [total 1]]))]
        (swap! sumtimes add-sumtimes res))))
  (reset! times []))
(defn reset-acc!
  "Empties the accumulated times."
  []
  (reset! sumtimes empty-sumtime))

(defn resume
  "Returns the accumulated times as a map {name mean-timespent}."
  []
  (let [accumulated @sumtimes]
    (normalized-map-from-sumtimes accumulated)))