Merge pull request #173 from threatgrid/entity-ids
Merged block managers
quoll authored Jun 17, 2021


2 parents b9578eb + 852b69d commit a24b912
Showing 15 changed files with 505 additions and 114 deletions.
1 change: 1 addition & 0 deletions deps.edn
Original file line number Diff line number Diff line change
@@ -5,4 +5,5 @@
org.clojure/core.cache {:mvn/version "1.0.207"}
org.clojars.quoll/zuko {:mvn/version "0.6.5"}
org.clojure/data.priority-map {:mvn/version "1.0.0"}
cheshire/cheshire {:mvn/version "5.10.0"}
tailrecursion/cljs-priority-map {:mvn/version "1.2.1"}}}
3 changes: 2 additions & 1 deletion src/asami/durable/codec.cljc
Original file line number Diff line number Diff line change
@@ -49,8 +49,9 @@
;; Byte 0
;; 0xxxxxxx String type, length of up to 127.
;; 10xxxxxx URI type, length of up to 63
;; 110xxxxx Keyword type, length of up to 32
;; 1100xxxx Keyword type, length of up to 15
;; For these 3 types, all remaining bytes are the data body.
;; 1101xxxx Long value. xxxx encodes the number of bytes
;; 111ytttt Data is of type described in tttt.
;; Length is run-length encoded as follows:
;; When y=0
51 changes: 34 additions & 17 deletions src/asami/durable/decoder.clj
Original file line number Diff line number Diff line change
@@ -41,9 +41,9 @@
[^bytes data]
(let [b0 (aget data 0)]
(cond ;; test for short format objects
(zero? (bit-and 0x80 b0)) b0
(zero? (bit-and 0x40 b0)) (bit-and 0x3F b0)
(zero? (bit-and 0x20 b0)) (bit-and 0x1F b0)
(zero? (bit-and 0x80 b0)) b0 ;; short string
(zero? (bit-and 0x40 b0)) (bit-and 0x3F b0) ;; short URI
(zero? (bit-and 0x20 b0)) (bit-and 0x0F b0) ;; short keyword OR number
;; First byte contains only the type information. Give a large number = 63
:default 0x3F)))

@@ -61,6 +61,12 @@
[paged-rdr ^long pos ^long len]
(keyword (read-str paged-rdr pos len)))

(defn read-long
  "Raw reading of `len` big-endian bytes, starting at `pos` in `paged-rdr`, into a long.
  The bytes are treated as a two's-complement value: when the high bit of the first
  byte is set the result is sign extended, so that negative numbers stored in fewer
  than Long/BYTES bytes are read back as negatives. An empty byte array yields 0."
  ^long [paged-rdr ^long pos ^long len]
  (let [^bytes b (read-bytes paged-rdr pos len)
        ;; Seed the accumulator with all 1-bits when the value is negative, so the
        ;; bytes that were not stored are filled with the sign. For a full 8-byte
        ;; read the seed is shifted out completely and has no effect.
        seed (if (and (pos? (alength b)) (neg? (aget b 0))) -1 0)]
    (areduce b i ret seed
             ;; mask each byte to 0..255 so its own sign does not smear over the accumulated bits
             (bit-or (bit-shift-left ret Byte/SIZE) (bit-and 0xFF (aget b i))))))

;; decoders operate on the bytes following the initial type byte information
;; if the data type has variable length, then this is decoded first

@@ -151,26 +157,35 @@
(defn seq-decoder
"This is a decoder for sequences of data. Use a vector as the sequence."
[ext paged-rdr ^long pos]
;; read the length of the header and the length of the seq data
(let [[i len] (decode-length ext paged-rdr pos)
start (+ i pos)
end (+ start len)
;; get the 0 byte. This contain info about the types in the seq
b0 (read-byte paged-rdr start)
decoder (if (zero? b0)
;; heterogeneous
;; heterogeneous types. Full header on every element. Read objects with size.
;; homogeneous
(if-let [tdecoder (typecode->decoder (bit-and 0x0F b0))]
#(tdecoder true %1 %2)
(throw (ex-info "Illegal datatype in array" {:type-code (bit-and 0x0F b0)}))))]
;; homogeneous types. The header is only written once
(if (= 0xD0 (bit-and 0xF0 b0)) ;; homogenous numbers
(let [num-len (bit-and 0x0F b0)] ;; get the byte length of all the numbers
;; return a function that deserializes the number and pairs it with the length
#(vector (read-long %1 %2 num-len) num-len))
(if-let [tdecoder (typecode->decoder (bit-and 0x0F b0))] ;; reader for type
;; the standard decoder already returns a deserialized value/length pair
#(tdecoder true %1 %2)
(throw (ex-info "Illegal datatype in array" {:type-code (bit-and 0x0F b0)})))))]
;; iterate over the buffer deserializing until the end is reached
(loop [s [] offset (inc start)]
(if (>= offset end)
[s (+ i len)]
(let [[o obj-len] (decoder paged-rdr offset)]
[s (+ i len)] ;; end of the buffer, return the seq and the number of bytes read
(let [[o obj-len] (decoder paged-rdr offset)] ;; deserialize, then step forward
(recur (conj s o) (+ offset obj-len)))))))

(defn map-decoder
"A decoder for maps"
"A decoder for maps. Returns the map and the bytes read."
[ext paged-rdr ^long pos]
;; read the map as one long seq, then split into pairs
(let [[s len] (seq-decoder ext paged-rdr pos)
m (into {} (map vec (partition 2 s)))]
[m len]))
@@ -313,7 +328,7 @@
(or (first (drop-while zero? (map compare left-body (drop 1 right-bytes)))) 0)))

(defn read-object-size
"Reads an object from a paged-reader, at id=pos"
"Reads an object from a paged-reader, at id=pos. Returns both the object and it's length."
[paged-rdr ^long pos]
(let [b0 (read-byte paged-rdr pos)
ipos (inc pos)]
@@ -322,12 +337,14 @@
(zero? (bit-and 0x80 b0)) [(read-str paged-rdr ipos b0) (inc b0)]
(zero? (bit-and 0x40 b0)) (let [len (bit-and 0x3F b0)]
[(read-uri paged-rdr ipos len) (inc len)])
(zero? (bit-and 0x20 b0)) (let [len (bit-and 0x1F b0)]
[(read-keyword paged-rdr ipos len) (inc len)])
;; First byte contains only the type information. Increment the returned length to include b0
:default (update ((typecode->decoder (bit-and 0x0F b0) default-decoder)
(zero? (bit-and 0x10 b0)) paged-rdr ipos)
1 inc))))
(= 0xE0 (bit-and 0xE0 b0)) (update ((typecode->decoder (bit-and 0x0F b0) default-decoder)
(zero? (bit-and 0x10 b0)) paged-rdr ipos)
1 inc)
;; high nybble is 1100 for keywords or 1101 for long number
:default (let [read-fn (if (zero? (bit-and 0x30 b0)) read-keyword read-long)
len (bit-and 0x0F b0)]
[(read-fn paged-rdr ipos len) (inc len)]))))

(defn read-object
"Reads an object from a paged-reader, at id=pos"
115 changes: 92 additions & 23 deletions src/asami/durable/encoder.clj
Original file line number Diff line number Diff line change
@@ -20,6 +20,11 @@

;; (set! *warn-on-reflection* true)

;; Bound by to-bytes to a volatile holding a map of entity id -> offset.
;; Entries are added while maps carrying an id attribute are encoded.
(def ^:dynamic *entity-offsets* nil)
;; Bound by to-bytes to a volatile holding the running offset that the
;; sequence encoder advances as it emits each element.
(def ^:dynamic *current-offset* nil)

;; A zero-length byte array.
(def empty-bytes (byte-array 0))

(def type->code
{Long (byte 0)
Double (byte 1)
@@ -80,12 +85,12 @@
(bit-and 0xFF (bit-shift-right len 8))
(bit-and 0xFF len)])))

;; to-bytes is required by the recursive concattenation operation
(declare to-bytes)
;; to-counted-bytes is required by the recursive concatenation operation
(declare to-counted-bytes)

(defn concat-bytes
"Takes multiple byte arrays and returns an array with all of the bytes concattenated"
^bytes [bas]
(let [len (apply + (map alength bas))
output (byte-array len)]
(reduce (fn [offset arr]
@@ -133,6 +138,32 @@
(when (and (< l max-short-long) (> l min-short-long))
(bit-and data-mask l)))

;; Bound by to-bytes to a scratch byte array of Long/BYTES bytes; n-byte-number copies out of it.
(def ^:dynamic *number-bytes* nil)
;; Bound by to-bytes to a ByteBuffer wrapping *number-bytes*; n-byte-number writes longs through it.
(def ^:dynamic *number-buffer* nil)

(defn n-byte-number
  "Returns a new array of n bytes representing the number x: the n low-order bytes
  of x, most significant byte first.
  Must be initialized for the current thread: *number-buffer* has to be bound to a
  ByteBuffer that wraps the Long/BYTES-sized array bound to *number-bytes*."
  [^long n ^long x]
  ;; write all 8 bytes of x into the scratch array through its wrapping buffer
  (.putLong ^ByteBuffer *number-buffer* 0 x)
  (let [ret (byte-array n)]
    ;; keep only the trailing n bytes, which are the low-order end of the big-endian value
    (System/arraycopy ^bytes *number-bytes* (int (- Long/BYTES n)) ret 0 (int n))
    ;; arraycopy returns nothing, so the filled array must be returned explicitly
    ret))

(defn num-bytes
  "Determines the number of bytes (1 to 8) that can hold the two's-complement value n.
  The nested comparisons need from 2 to 4 tests, and this preferences small numbers,
  which are resolved with the fewest tests."
  [^long n]
  ;; Map a negative n onto the non-negative value with the same byte requirement: -n-1.
  ;; bit-not computes exactly that without negating, so Long/MIN_VALUE does not
  ;; trigger the integer-overflow exception that (- n) throws.
  (let [nn (if (neg? n) (bit-not n) n)]
    (if (<= nn 0x7FFF)
      (if (<= nn 0x7F) 1 2)
      (if (<= nn 0x7FFFFFFF)
        (if (<= nn 0x7FFFFF) 3 4)
        (if (<= nn 0x7FFFFFFFFFFF)
          (if (<= nn 0x7FFFFFFFFF) 5 6)
          (if (<= nn 0x7FFFFFFFFFFFFF) 7 8))))))

;; Membership is tested against the type of the first element of a sequence when
;; deciding whether the sequence can be written as a homogeneous array.
;; Long is a member, but the sequence encoder gives Long sequences their own
;; header and per-element width rather than the general typed-array form.
(def constant-length?
"The set of types that can be encoded in a constant number of bytes. Used for homogenous sequences."
#{Long Double Date Instant UUID})
@@ -167,25 +198,21 @@

(header [this len]
(if (< len 0x20)
(if (< len 0x10)
(byte-array [(bit-or 0xC0 len)])
(general-header (type->code Keyword) len)))
(body [this]
(let [nms (namespace this)
n (name this)]
(.getBytes (subs (str this) 1) ^Charset utf8)))
(.getBytes (subs (str this) 1) ^Charset utf8))
(encapsulate-id [this]
(encapsulate-sstr (subs (str this) 1) skey-type-mask))

(header [this len]
(assert (= len Long/BYTES))
(byte-array [(bit-or 0xE0 (type->code Long))]))
(assert (<= len Long/BYTES))
(byte-array [(bit-or 0xD0 len)]))
(body [^long this]
(let [b (byte-array Long/BYTES)
bb (ByteBuffer/wrap b)]
(.putLong bb 0 this)
(let [n (num-bytes this)]
(n-byte-number n this)))
(encapsulate-id [this]
(when-let [v (encapsulate-long this)]
(bit-or long-type-mask v)))
@@ -220,7 +247,7 @@
(assert (= len Long/BYTES))
(byte-array [(bit-or 0xE0 (type->code Date))]))
(body [^Date this]
(body (.getTime this)))
(n-byte-number Long/BYTES (.getTime this)))
(encapsulate-id [this]
(when-let [v (encapsulate-long (.getTime ^Date this))]
(bit-or date-type-mask v)))
@@ -257,18 +284,47 @@
(general-header (type->code ISeq) len))
(body [this]
(if-not (seq this)
(byte-array 0)
(let [fst (first this)
t (type fst)
homogeneous (and (constant-length? t) (every? #(instance? t %) this))
[elt-fn prefix] (if homogeneous
(let [hdr (byte-array [(bit-or 0xE0 (type->code t))])]
[#(vector (body %)) hdr])
[to-bytes zero-array])]
(->> this
(mapcat elt-fn)
(cons prefix)
(if (= t Long)
(let [elt-len (apply max (map num-bytes this))
arr-hdr (byte-array [(bit-or 0xD0 elt-len)])] ;; 0xDllll is the header byte for longs
;; integer homogenous arrays store the number in the header, with nil bodies
[#(vector (n-byte-number elt-len %)) arr-hdr])
(let [arr-hdr (byte-array [(bit-or 0xE0 (type->code t))])] ;; 0xEtttt is the header byte for typed things
;; simple homogenous arrays store everything in the object header, with nil bodies
[#(vector (body %)) arr-hdr]))
[to-counted-bytes zero-array])
;; start counting the bytes that are going into the buffer
starting-offset @*current-offset*
_ (vswap! *current-offset* + 3) ;; 2 bytes for a short header + 1 byte for the prefix array
result (->> this
;; like a mapv but records the lengths of the data as it iterates through the seq
(reduce (fn [arrays x]
(let [offset @*current-offset* ;; save the start, as the embedded objects will update this
[head body] (elt-fn x)]
;; regardless of what embedded objects have update the *current-offset* to, change it to the
;; start of the current object, plus its total size
(vreset! *current-offset* (+ offset (alength head) (if body (alength body) 0)))
;; add the bytes of this object to the overall result of byte arrays
(cond-> (conj! arrays head)
body (conj! body)))) ;; only add the body if there is one
(transient [prefix]))
update-lengths (fn [m u]
(into {} (map (fn [[k v :as kv]]
(if (> v starting-offset) [k (+ v u)] kv))
rlen (alength result)]
;; correct offsets for longer headers
(> rlen 0x7FFF) (vswap! *entity-offsets* update-lengths 3) ;; total 5 after the 2 already added
(> rlen 0xFF) (vswap! *entity-offsets* update-lengths 1)) ;; total 3 after the 2 already added

(header [this len] (header (or (seq this) '()) len))
@@ -278,6 +334,10 @@
;; Header for a map: the general header form, built from the type code
;; registered for IPersistentMap and the supplied body length.
(header [this len]
(general-header (type->code IPersistentMap) len))
(body [this]
  ;; An identified object has its location recorded: for each identifying attribute
  ;; that is present, map the identifier to the offset current at this point.
  (doseq [id-attr [:db/id :db/ident :id]
          :let [id (get this id-attr)]
          :when id]
    (vswap! *entity-offsets* assoc id @*current-offset*))
  ;; The map itself is encoded as one flat sequence of alternating keys and values.
  (body (apply concat (seq this))))

@@ -300,8 +360,17 @@
(encapsulate-id [^asami.graph.InternalNode this]
(bit-or node-type-mask (bit-and data-mask (.id this)))))

(defn to-bytes
(defn to-counted-bytes
  "Returns a tuple of byte arrays, representing the header and the body of o.
  The body is encoded first, because the header is built from the body's length."
  [o]
  (let [^bytes b (body o)]
    [(header o (alength b)) b]))

(defn to-bytes
  "Returns a tuple of three elements: the header byte array, the body byte array,
  and a map of entity identifiers to the offsets recorded for them while o was encoded."
  [o]
  ;; Fresh per-call state: the offset bookkeeping and the scratch array for number encoding.
  (binding [*entity-offsets* (volatile! {})
            *current-offset* (volatile! 0)
            *number-bytes* (byte-array Long/BYTES)]
    ;; Bound in a second step because the buffer wraps the array bound just above.
    (binding [*number-buffer* (ByteBuffer/wrap *number-bytes*)]
      (conj (to-counted-bytes o) @*entity-offsets*))))

