diff --git a/deps.edn b/deps.edn index 7bcd8c4..aac5054 100644 --- a/deps.edn +++ b/deps.edn @@ -5,4 +5,5 @@ org.clojure/core.cache {:mvn/version "1.0.207"} org.clojars.quoll/zuko {:mvn/version "0.6.5"} org.clojure/data.priority-map {:mvn/version "1.0.0"} + cheshire/cheshire {:mvn/version "5.10.0"} tailrecursion/cljs-priority-map {:mvn/version "1.2.1"}}} diff --git a/src/asami/durable/codec.cljc b/src/asami/durable/codec.cljc index 8421046..92293db 100644 --- a/src/asami/durable/codec.cljc +++ b/src/asami/durable/codec.cljc @@ -49,8 +49,9 @@ ;; Byte 0 ;; 0xxxxxxx String type, length of up to 127. ;; 10xxxxxx URI type, length of up to 64 -;; 110xxxxx Keyword type, length of up to 32 +;; 1100xxxx Keyword type, length of up to 16 ;; For these 3 types, all remaining bytes are the data body. +;; 1101xxxx Long value. xxxx encodes the number of bytes ;; 111ytttt Data is of type described in tttt. ;; Length is run-length encoded as follows: ;; When y=0 diff --git a/src/asami/durable/decoder.clj b/src/asami/durable/decoder.clj index a68d648..c2b098e 100644 --- a/src/asami/durable/decoder.clj +++ b/src/asami/durable/decoder.clj @@ -41,9 +41,9 @@ [^bytes data] (let [b0 (aget data 0)] (cond ;; test for short format objects - (zero? (bit-and 0x80 b0)) b0 - (zero? (bit-and 0x40 b0)) (bit-and 0x3F b0) - (zero? (bit-and 0x20 b0)) (bit-and 0x1F b0) + (zero? (bit-and 0x80 b0)) b0 ;; short string + (zero? (bit-and 0x40 b0)) (bit-and 0x3F b0) ;; short URI + (zero? (bit-and 0x20 b0)) (bit-and 0x0F b0) ;; short keyword OR number ;; First byte contains only the type information. 
Give a large number = 63 :default 0x3F))) @@ -61,6 +61,12 @@ [paged-rdr ^long pos ^long len] (keyword (read-str paged-rdr pos len))) +(defn read-long + "Raw reading of big-endian bytes into a long" + ^long [paged-rdr ^long pos ^long len] + (let [^bytes b (read-bytes paged-rdr pos len)] + (areduce b i ret 0 (bit-or (bit-shift-left ret Byte/SIZE) (bit-and 0xFF (aget b i)))))) + ;; decoders operate on the bytes following the initial type byte information ;; if the data type has variable length, then this is decoded first @@ -151,26 +157,35 @@ (defn seq-decoder "This is a decoder for sequences of data. Use a vector as the sequence." [ext paged-rdr ^long pos] + ;; read the length of the header and the length of the seq data (let [[i len] (decode-length ext paged-rdr pos) start (+ i pos) end (+ start len) + ;; get the 0 byte. This contain info about the types in the seq b0 (read-byte paged-rdr start) decoder (if (zero? b0) - ;; heterogeneous + ;; heterogeneous types. Full header on every element. Read objects with size. read-object-size - ;; homogeneous - (if-let [tdecoder (typecode->decoder (bit-and 0x0F b0))] - #(tdecoder true %1 %2) - (throw (ex-info "Illegal datatype in array" {:type-code (bit-and 0x0F b0)}))))] + ;; homogeneous types. 
The header is only written once + (if (= 0xD0 (bit-and 0xF0 b0)) ;; homogenous numbers + (let [num-len (bit-and 0x0F b0)] ;; get the byte length of all the numbers + ;; return a function that deserializes the number and pairs it with the length + #(vector (read-long %1 %2 num-len) num-len)) + (if-let [tdecoder (typecode->decoder (bit-and 0x0F b0))] ;; reader for type + ;; the standard decoder already returns a deserialized value/length pair + #(tdecoder true %1 %2) + (throw (ex-info "Illegal datatype in array" {:type-code (bit-and 0x0F b0)})))))] + ;; iterate over the buffer deserializing until the end is reached (loop [s [] offset (inc start)] (if (>= offset end) - [s (+ i len)] - (let [[o obj-len] (decoder paged-rdr offset)] + [s (+ i len)] ;; end of the buffer, return the seq and the number of bytes read + (let [[o obj-len] (decoder paged-rdr offset)] ;; deserialize, then step forward (recur (conj s o) (+ offset obj-len))))))) (defn map-decoder - "A decoder for maps" + "A decoder for maps. Returns the map and the bytes read." [ext paged-rdr ^long pos] + ;; read the map as one long seq, then split into pairs (let [[s len] (seq-decoder ext paged-rdr pos) m (into {} (map vec (partition 2 s)))] [m len])) @@ -313,7 +328,7 @@ (or (first (drop-while zero? (map compare left-body (drop 1 right-bytes)))) 0))) (defn read-object-size - "Reads an object from a paged-reader, at id=pos" + "Reads an object from a paged-reader, at id=pos. Returns both the object and it's length." [paged-rdr ^long pos] (let [b0 (read-byte paged-rdr pos) ipos (inc pos)] @@ -322,12 +337,14 @@ (zero? (bit-and 0x80 b0)) [(read-str paged-rdr ipos b0) (inc b0)] (zero? (bit-and 0x40 b0)) (let [len (bit-and 0x3F b0)] [(read-uri paged-rdr ipos len) (inc len)]) - (zero? (bit-and 0x20 b0)) (let [len (bit-and 0x1F b0)] - [(read-keyword paged-rdr ipos len) (inc len)]) ;; First byte contains only the type information. 
Increment the returned length to include b0 - :default (update ((typecode->decoder (bit-and 0x0F b0) default-decoder) - (zero? (bit-and 0x10 b0)) paged-rdr ipos) - 1 inc)))) + (= 0xE0 (bit-and 0xE0 b0)) (update ((typecode->decoder (bit-and 0x0F b0) default-decoder) + (zero? (bit-and 0x10 b0)) paged-rdr ipos) + 1 inc) + ;; high nybble is 1100 for keywords or 1101 for long number + :default (let [read-fn (if (zero? (bit-and 0x30 b0)) read-keyword read-long) + len (bit-and 0x0F b0)] + [(read-fn paged-rdr ipos len) (inc len)])))) (defn read-object "Reads an object from a paged-reader, at id=pos" diff --git a/src/asami/durable/encoder.clj b/src/asami/durable/encoder.clj index 9179a2c..26eb80f 100644 --- a/src/asami/durable/encoder.clj +++ b/src/asami/durable/encoder.clj @@ -20,6 +20,11 @@ ;; (set! *warn-on-reflection* true) +(def ^:dynamic *entity-offsets* nil) +(def ^:dynamic *current-offset* nil) + +(def empty-bytes (byte-array 0)) + (def type->code {Long (byte 0) Double (byte 1) @@ -80,12 +85,12 @@ (bit-and 0xFF (bit-shift-right len 8)) (bit-and 0xFF len)]))) -;; to-bytes is required by the recursive concattenation operation -(declare to-bytes) +;; to-counted-bytes is required by the recursive concattenation operation +(declare to-counted-bytes) (defn concat-bytes "Takes multiple byte arrays and returns an array with all of the bytes concattenated" - [bas] + ^bytes [bas] (let [len (apply + (map alength bas)) output (byte-array len)] (reduce (fn [offset arr] @@ -133,6 +138,32 @@ (when (and (< l max-short-long) (> l min-short-long)) (bit-and data-mask l))) +(def ^:dynamic *number-bytes* nil) +(def ^:dynamic *number-buffer* nil) + +(defn n-byte-number + "Returns an array of n bytes representing the number x. + Must be initialized for the current thread." 
+ [^long n ^long x] + (.putLong *number-buffer* 0 x) + (let [ret (byte-array n)] + (System/arraycopy ^bytes *number-bytes* (int (- Long/BYTES n)) ret 0 (int n)) + ret)) + +(defn num-bytes + "Determines the number of bytes that can hold a value. + From 2-4 tests, this preferences small numbers." + [^long n] + (let [f (neg? n) + nn (if f (dec (- n)) n)] + (if (<= nn 0x7FFF) + (if (<= nn 0x7F) 1 2) + (if (<= nn 0x7FFFFFFF) + (if (<= nn 0x7FFFFF) 3 4) + (if (<= nn 0x7FFFFFFFFFFF) + (if (<= nn 0x7FFFFFFFFF) 5 6) + (if (<= nn 0x7FFFFFFFFFFFFF) 7 8)))))) + (def constant-length? "The set of types that can be encoded in a constant number of bytes. Used for homogenous sequences." #{Long Double Date Instant UUID}) @@ -167,25 +198,21 @@ Keyword (header [this len] - (if (< len 0x20) + (if (< len 0x10) (byte-array [(bit-or 0xC0 len)]) (general-header (type->code Keyword) len))) (body [this] - (let [nms (namespace this) - n (name this)] - (.getBytes (subs (str this) 1) ^Charset utf8))) + (.getBytes (subs (str this) 1) ^Charset utf8)) (encapsulate-id [this] (encapsulate-sstr (subs (str this) 1) skey-type-mask)) Long (header [this len] - (assert (= len Long/BYTES)) - (byte-array [(bit-or 0xE0 (type->code Long))])) + (assert (<= len Long/BYTES)) + (byte-array [(bit-or 0xD0 len)])) (body [^long this] - (let [b (byte-array Long/BYTES) - bb (ByteBuffer/wrap b)] - (.putLong bb 0 this) - b)) + (let [n (num-bytes this)] + (n-byte-number n this))) (encapsulate-id [this] (when-let [v (encapsulate-long this)] (bit-or long-type-mask v))) @@ -220,7 +247,7 @@ (assert (= len Long/BYTES)) (byte-array [(bit-or 0xE0 (type->code Date))])) (body [^Date this] - (body (.getTime this))) + (n-byte-number Long/BYTES (.getTime this))) (encapsulate-id [this] (when-let [v (encapsulate-long (.getTime ^Date this))] (bit-or date-type-mask v))) @@ -257,18 +284,47 @@ (general-header (type->code ISeq) len)) (body [this] (if-not (seq this) - (byte-array 0) + empty-bytes (let [fst (first this) t (type fst) 
homogeneous (and (constant-length? t) (every? #(instance? t %) this)) [elt-fn prefix] (if homogeneous - (let [hdr (byte-array [(bit-or 0xE0 (type->code t))])] - [#(vector (body %)) hdr]) - [to-bytes zero-array])] - (->> this - (mapcat elt-fn) - (cons prefix) - concat-bytes)))) + (if (= t Long) + (let [elt-len (apply max (map num-bytes this)) + arr-hdr (byte-array [(bit-or 0xD0 elt-len)])] ;; 0xDllll is the header byte for longs + ;; integer homogenous arrays store the number in the header, with nil bodies + [#(vector (n-byte-number elt-len %)) arr-hdr]) + (let [arr-hdr (byte-array [(bit-or 0xE0 (type->code t))])] ;; 0xEtttt is the header byte for typed things + ;; simple homogenous arrays store everything in the object header, with nil bodies + [#(vector (body %)) arr-hdr])) + [to-counted-bytes zero-array]) + ;; start counting the bytes that are going into the buffer + starting-offset @*current-offset* + _ (vswap! *current-offset* + 3) ;; 2 bytes for a short header + 1 byte for the prefix array + result (->> this + ;; like a mapv but records the lengths of the data as it iterates through the seq + (reduce (fn [arrays x] + (let [offset @*current-offset* ;; save the start, as the embedded objects will update this + [head body] (elt-fn x)] + ;; regardless of what embedded objects have update the *current-offset* to, change it to the + ;; start of the current object, plus its total size + (vreset! *current-offset* (+ offset (alength head) (if body (alength body) 0))) + ;; add the bytes of this object to the overall result of byte arrays + (cond-> (conj! arrays head) + body (conj! body)))) ;; only add the body if there is one + (transient [prefix])) + persistent! + concat-bytes) + update-lengths (fn [m u] + (into {} (map (fn [[k v :as kv]] + (if (> v starting-offset) [k (+ v u)] kv)) + m))) + rlen (alength result)] + ;; correct offsets for longer headers + (cond + (> rlen 0x7FFF) (vswap! 
*entity-offsets* update-lengths 3) ;; total 5 after the 2 already added + (> rlen 0xFF) (vswap! *entity-offsets* update-lengths 1)) ;; total 3 after the 2 already added + result))) IPersistentVector (header [this len] (header (or (seq this) '()) len)) @@ -278,6 +334,10 @@ (header [this len] (general-header (type->code IPersistentMap) len)) (body [this] + ;; If this is an identified object, then save it's location + (doseq [id-attr [:db/id :db/ident :id]] + (when-let [id (id-attr this)] + (vswap! *entity-offsets* assoc id @*current-offset*))) (body (apply concat (seq this)))) Object @@ -300,8 +360,17 @@ (encapsulate-id [^asami.graph.InternalNode this] (bit-or node-type-mask (bit-and data-mask (.id this))))) -(defn to-bytes +(defn to-counted-bytes "Returns a tuple of byte arrays, representing the header and the body" [o] (let [^bytes b (body o)] [(header o (alength b)) b])) + +(defn to-bytes + "Returns a tuple of byte arrays, representing the header and the body" + [o] + (binding [*entity-offsets* (volatile! {}) + *current-offset* (volatile! 0) + *number-bytes* (byte-array Long/BYTES)] + (binding [*number-buffer* (ByteBuffer/wrap *number-bytes*)] + (conj (to-counted-bytes o) @*entity-offsets*)))) diff --git a/src/asami/durable/graph.cljc b/src/asami/durable/graph.cljc index f46873b..eaba9d4 100644 --- a/src/asami/durable/graph.cljc +++ b/src/asami/durable/graph.cljc @@ -1,34 +1,40 @@ (ns ^{:doc "The implements the Graph over durable storage" :author "Paula Gearon"} asami.durable.graph - (:require [asami.graph :as graph] - [asami.internal :refer [now instant? long-time]] - [asami.common-index :as common-index :refer [?]] - [asami.durable.common :as common :refer [TxData Transaction Closeable - find-tuples tuples-at write-new-tx-tuple! - write-tuple! delete-tuple! - find-object find-id write! at latest rewind! commit! - close delete! append! 
next-id max-long]] - [asami.durable.pool :as pool] - [asami.durable.tuples :as tuples] - [asami.durable.resolver :as resolver :refer [get-from-index get-transitive-from-index]] - #?(:clj [asami.durable.flat-file :as flat-file]) - [zuko.node :as node] - [zuko.logging :as log :include-macros true])) + (:require [asami.graph :as graph] + [asami.internal :refer [now instant? long-time]] + [asami.common-index :as common-index :refer [?]] + [asami.durable.common :as common :refer [TxData Transaction Closeable + find-tuples tuples-at write-new-tx-tuple! + write-tuple! delete-tuple! + find-object find-id write! at latest rewind! commit! + close delete! append! next-id max-long]] + [asami.durable.common-utils :as common-utils] + [asami.durable.pool :as pool] + [asami.durable.tuples :as tuples] + [asami.durable.resolver :as resolver :refer [get-from-index get-transitive-from-index]] + #?(:clj [asami.durable.flat-file :as flat-file]) + [zuko.node :as node] + [zuko.logging :as log :include-macros true])) ;; (set! *warn-on-reflection* true) +;; names to use when index files are all separate (def spot-name "eavt") (def post-name "avet") (def ospt-name "veat") (def tspo-name "teav") ;; a flat file transaction index +;; names to use when index files are shared +(def index-name "stmtidx.bin") +(def block-name "stmt.bin") + (declare ->BlockGraph) (defn square [x] (* x x)) (defn cube [x] (* x x x)) -(defrecord BlockGraph [spot post ospt tspo pool node-allocator id-checker] +(defrecord BlockGraph [spot post ospt tspo pool node-allocator id-checker tree-block-manager tuple-block-manager] graph/Graph (new-graph [this] @@ -155,6 +161,8 @@ Transaction (rewind! [this] + (when tree-block-manager (rewind! tree-block-manager)) + (when tuple-block-manager (rewind! tuple-block-manager)) (let [spot* (rewind! spot) post* (rewind! post) ospt* (rewind! ospt) @@ -167,6 +175,8 @@ :pool pool*))) (commit! [this] + (when tree-block-manager (commit! 
tree-block-manager)) + (when tuple-block-manager (commit! tuple-block-manager)) (let [spot* (commit! spot) post* (commit! post) ospt* (commit! ospt) @@ -189,11 +199,15 @@ Closeable (close [this] (doseq [resource [spot post ospt tspo pool]] - (close resource))) + (close resource)) + (when tree-block-manager (close tree-block-manager)) + (when tuple-block-manager (close tuple-block-manager))) (delete! [this] (doseq [resource [spot post ospt tspo pool]] - (delete! resource)))) + (delete! resource)) + (when tree-block-manager (delete! tree-block-manager)) + (when tuple-block-manager (delete! tuple-block-manager)))) (defn graph-at "Returns a graph based on another graph, but with different set of index roots. This returns a historical graph. @@ -217,5 +231,22 @@ ospt-index (tuples/create-tuple-index name ospt-name r-ospt) tspo-index #?(:clj (flat-file/record-store name tspo-name tuples/tuple-size-bytes) :cljs nil) data-pool (pool/create-pool name r-pool)] - (->BlockGraph spot-index post-index ospt-index tspo-index data-pool node-allocator id-checker))) + (->BlockGraph spot-index post-index ospt-index tspo-index data-pool node-allocator id-checker nil nil))) + +(defn new-merged-block-graph + "Creates a new BlockGraph object, under a given name. If the resources for that name exist, then they are opened. + If the resources do not exist, then they are created. + name: the label of the location for the graph resources. + tx: The transaction record for this graph." 
+ [name {:keys [r-spot r-post r-ospt r-pool]} node-allocator id-checker] + ;; NOTE: Tree nodes blocks must hold the tuples payload and the tree node header + (let [tree-block-manager (common-utils/create-block-manager name index-name tuples/tree-block-size) + tuple-block-manager (common-utils/create-block-manager name block-name tuples/block-bytes) + spot-index (tuples/create-tuple-index-for-managers "SPO" tree-block-manager tuple-block-manager r-spot) + post-index (tuples/create-tuple-index-for-managers "POS" tree-block-manager tuple-block-manager r-post) + ospt-index (tuples/create-tuple-index-for-managers "OSP" tree-block-manager tuple-block-manager r-ospt) + tspo-index #?(:clj (flat-file/record-store name tspo-name tuples/tuple-size-bytes) :cljs nil) + data-pool (pool/create-pool name r-pool)] + (->BlockGraph spot-index post-index ospt-index tspo-index data-pool node-allocator id-checker + tree-block-manager tuple-block-manager))) diff --git a/src/asami/durable/pool.cljc b/src/asami/durable/pool.cljc index 87f16df..a55876f 100644 --- a/src/asami/durable/pool.cljc +++ b/src/asami/durable/pool.cljc @@ -154,7 +154,9 @@ [name root-id] (let [data-store (data-constructor name data-name) data-compare (pool-comparator-fn data-store) - index (tree/new-block-tree (partial common-utils/create-block-manager name) + index (tree/new-block-tree (fn + ([] true) + ([lname size] (common-utils/create-block-manager name lname size))) index-name tree-node-size data-compare root-id) encap-cache (atom (lru-cache-factory {} :threshold encap-cache-size))] (->DataPool data-store index root-id encap-cache))) diff --git a/src/asami/durable/store.cljc b/src/asami/durable/store.cljc index b898c2f..907d2a5 100644 --- a/src/asami/durable/store.cljc +++ b/src/asami/durable/store.cljc @@ -258,6 +258,6 @@ node-allocator (fn [] (graph/new-node (swap! node-counter inc))) ;; the following function is called under locked conditions id-checker (fn [id] (when (> id @node-counter) (reset! 
node-counter id))) - block-graph (dgraph/new-block-graph name unpacked-tx node-allocator id-checker)] + block-graph (dgraph/new-merged-block-graph name unpacked-tx node-allocator id-checker)] (->DurableConnection name tx-manager (atom block-graph) node-counter (create-lock)))) diff --git a/src/asami/durable/tree.cljc b/src/asami/durable/tree.cljc index 0c3730a..37d2ef7 100644 --- a/src/asami/durable/tree.cljc +++ b/src/asami/durable/tree.cljc @@ -94,6 +94,12 @@ (assoc (->Node new-block (get-parent this)) :side (:side this)) this))) + Object + (toString [this] + (let [payload (mapv #(Long/toHexString (get-long this %)) (range (/ (- (:size block) header-size) 8)))] + (str "Node[" (get-id this) "] L:" (get-child-id this left) " R:" (get-child-id this right) " payload:" + payload))) + Block (get-id [this] (get-id block)) @@ -163,7 +169,7 @@ (new-node tree nil nil nil)) ([tree data writer] (new-node tree data nil writer)) - ([{:keys [block-manager node-cache]} data parent writer] + ([{:keys [block-manager node-cache] :as tree} data parent writer] (let [block (allocate-block! block-manager)] (let [node (->Node block parent)] (when data @@ -326,7 +332,7 @@ (modify-node! [this node] (throw (ex-info "Read-only trees cannot be modified" {:type :modify})))) -(defrecord TxTree [root rewind-root node-comparator block-manager node-cache] +(defrecord TxTree [label root rewind-root node-comparator block-manager node-cache own-manager] Tree (find-node [this key] (find-node* this key)) @@ -385,35 +391,48 @@ Transaction (rewind! [this] - (rewind! block-manager) + (when own-manager + (rewind! block-manager)) (assoc this :root rewind-root)) (commit! [this] - (commit! block-manager) + (when own-manager + (commit! block-manager)) (assoc this :rewind-root root)) Forceable (force! [this] - (force! block-manager)) + (when own-manager + (force! block-manager))) Closeable (close [this] - (close block-manager)) + (when own-manager + (close block-manager))) (delete! [this] - (delete! 
block-manager))) + (when own-manager + (delete! block-manager)))) +(defn calc-block-size + "Determines the size of a block used by a tree, based on the payload in the node" + [payload-size] + (+ header-size payload-size)) (defn new-block-tree - "Creates an empty block tree" + "Creates an empty block tree. + The factory may provide a block manager that this objects owns, or which is provided. + This can be determined by calling the block-manager-factory with no arguments." ([block-manager-factory store-name data-size node-comparator] (new-block-tree block-manager-factory store-name data-size node-comparator nil)) ([block-manager-factory store-name data-size node-comparator root-id] - (let [block-manager (block-manager-factory store-name (+ header-size data-size))] + (let [block-manager (block-manager-factory store-name (calc-block-size data-size)) + own-manager? (block-manager-factory)] (if-not (get-block block-manager null) (throw (ex-info "Unable to initialize tree with null block" {:block-manager block-manager}))) (let [data-size (- (get-block-size block-manager) header-size) root (if (and root-id (not= null root-id)) (->Node (get-block block-manager root-id) nil))] - (->TxTree root root node-comparator block-manager - (atom (lru-cache-factory {} :threshold node-cache-size))))))) + (->TxTree store-name root root node-comparator block-manager + (atom (lru-cache-factory {} :threshold node-cache-size)) + own-manager?))))) diff --git a/src/asami/durable/tuples.cljc b/src/asami/durable/tuples.cljc index 72a91fb..b685dd6 100644 --- a/src/asami/durable/tuples.cljc +++ b/src/asami/durable/tuples.cljc @@ -32,6 +32,9 @@ (def ^:const tree-node-size "Number of bytes used in the index nodes" (* (inc block-reference-offset) long-size)) +(def tree-block-size "All the node data, plus overhead normally managed by the tree" + (tree/calc-block-size tree-node-size)) + ;; range is 0-511 (def ^:const block-max "Maximum number of tuples in a block" 512) @@ -419,7 +422,10 @@ 
(set-block-ref! node block-id)) (defn insert-tuple! - "Inserts a tuple into an index" + "Inserts a tuple into an index. + tuple: the data to be inserted. + short-tuple: a possibly shortened tuple to search on. This allows searching for tuples + that don't share every element (such as a statement ID)" [{:keys [index blocks root-id] :as this} tuple short-tuple] (let [insert-coord (find-coord index blocks short-tuple)] (cond @@ -514,17 +520,21 @@ (count-index-tuples index blocks tuple))) -(defrecord TupleIndex [index blocks root-id] +(defrecord TupleIndex [label index blocks root-id own-manager] TupleStorage (tuples-at [this root] (->ReadOnlyTupleIndex (tree/at index root) blocks root)) + ;; this operation drops the statement ID from a tuple, so as not to overwrite an existing statement (write-new-tx-tuple! [this tuple] (let [part-tuple (if (vector? tuple) (subvec tuple 0 dec-tuple-size) (vec (butlast tuple)))] (insert-tuple! this tuple part-tuple))) + ;; this operation assumes that the provided statement is not already present. + ;; write-new-tx-tuple! should have been used to check this first + ;; can be used on a non-primary index if the primary index has already checked. (write-tuple! [this tuple] (insert-tuple! this tuple tuple)) @@ -545,30 +555,46 @@ Transaction (rewind! [this] - (rewind! blocks) + (when own-manager + (rewind! blocks)) (let [rindex (rewind! index)] (assoc this :index rindex :root-id (:root rindex)))) (commit! [this] - (commit! blocks) + (when own-manager + (commit! blocks)) (assoc this :index (commit! index))) Closeable (close [this] (close index) - (close blocks)) + (when own-manager + (close blocks))) (delete! [this] (delete! index) - (delete! blocks))) + (when own-manager + (delete! blocks)))) + + +(defn create-tuple-index-for-managers + "Creates a tuple index for a provided pair of block managers."
+ ([label index-manager tuple-block-manager] + (create-tuple-index-for-managers label index-manager tuple-block-manager nil)) + ([label index-manager tuple-block-manager root-id] + ;; create a factory fn that returns false on 0-arity to indicate that the index manager is not owned by the tree + (let [block-manager-factory (fn ([] false) ([_ _] index-manager)) + index (tree/new-block-tree block-manager-factory label tree-node-size tuple-node-compare root-id)] + (->TupleIndex label index tuple-block-manager root-id false)))) (defn open-tuples [order-name name root-id] - (let [index (tree/new-block-tree (partial common-utils/create-block-manager name) + ;; create a factory fn that returns true on 0-arity to indicate that the index manager is owned by the calling tree + (let [index (tree/new-block-tree (fn ([] true) ([lname size] (common-utils/create-block-manager name lname size))) (str order-name index-name) tree-node-size tuple-node-compare root-id) - blocks (common-utils/create-block-manager name (str order-name block-name) (* block-max tuple-size long-size))] - (->TupleIndex index blocks root-id))) + blocks (common-utils/create-block-manager name (str order-name block-name) block-bytes)] + (->TupleIndex name index blocks root-id true))) (defn create-tuple-index "Creates a tuple index for a name" diff --git a/test/asami/durable/block_tree_test.cljc b/test/asami/durable/block_tree_test.cljc index a839500..c001fa1 100644 --- a/test/asami/durable/block_tree_test.cljc +++ b/test/asami/durable/block_tree_test.cljc @@ -15,23 +15,25 @@ (defn create-block-manager "Creates a block manager. If the reuse? flag is on, then an existing non-empty block manager is returned. Otherwise the block manager has only fresh blocks." - [name block-size & [reuse?]] - #?(:clj - ;; get-filename will ensure that a file is EMPTY. - ;; to reuse a temporary file, will need to find it directly with temp-file - (let [f (if reuse?
- (util/temp-file name) - (get-filename name))] - (create-managed-block-file f block-size)) - - :cljs - ;; TODO: create ClojureScript block manager - nil - )) + ([] true) + ([name block-size & [reuse?]] + #?(:clj + ;; get-filename will ensure that a file is EMPTY. + ;; to reuse a temporary file, will need to find it directly with temp-file + (let [f (if reuse? + (util/temp-file name) + (get-filename name))] + (create-managed-block-file f block-size)) + + :cljs + ;; TODO: create ClojureScript block manager + nil + ))) (defn reopen-block-manager - [name block-size] - (create-block-manager name block-size true)) + ([] true) + ([name block-size] + (create-block-manager name block-size true))) (defn long-compare [a node] diff --git a/test/asami/durable/codec_test.cljc b/test/asami/durable/codec_test.cljc index b1a5385..570d06c 100644 --- a/test/asami/durable/codec_test.cljc +++ b/test/asami/durable/codec_test.cljc @@ -168,6 +168,25 @@ (defn now [] #?(:clj (Date.) :cljs (js/Date.))) + +(deftest test-long-codec + (let [bb (byte-buffer (byte-array 2048)) + rdr (->TestReader bb) + write! (fn [o] + (let [[header body] (to-bytes o) + header-size (byte-length header) + body-size (byte-length body)] + (.position bb 0) + (.put bb header 0 header-size) + (.put bb body 0 body-size))) + rt (fn [o] + (write! o) + (let [r (read-object rdr 0)] + (if (bytes? 
o) + (is (array-equals o r)) + (is (= o r)))))] + (rt 2020))) + (deftest test-codecs (let [bb (byte-buffer (byte-array 2048)) rdr (->TestReader bb) diff --git a/test/asami/durable/graph_test.cljc b/test/asami/durable/graph_test.cljc index 9c29845..9d2db5d 100644 --- a/test/asami/durable/graph_test.cljc +++ b/test/asami/durable/graph_test.cljc @@ -2,7 +2,7 @@ :author "Paula Gearon"} asami.durable.graph-test (:require [asami.graph :refer [resolve-triple graph-transact new-node]] - [asami.durable.graph :refer [graph-at new-block-graph]] + [asami.durable.graph :refer [graph-at new-merged-block-graph]] [asami.durable.store :refer [unpack-tx tx-record-size]] [asami.durable.common :refer [latest close get-tx-data commit!]] #?(:clj [asami.durable.flat-file :as flat-file]) @@ -36,7 +36,7 @@ counter (atom 0) node-allocator (fn [] (new-node (swap! counter inc))) id-checker (fn [id] (when (> id @counter) (reset! counter id)))] - (assoc (new-block-graph nm (and tx (unpack-tx tx)) node-allocator id-checker) + (assoc (new-merged-block-graph nm (and tx (unpack-tx tx)) node-allocator id-checker) :to-close tx-manager))) (deftest test-new-graph diff --git a/test/asami/durable/object_codec_test.clj b/test/asami/durable/object_codec_test.clj new file mode 100644 index 0000000..27b4239 --- /dev/null +++ b/test/asami/durable/object_codec_test.clj @@ -0,0 +1,207 @@ +(ns ^{:doc "Tests the encoding/decoding operations on objects" + :author "Paula Gearon"} + asami.durable.object-codec-test + (:require [asami.durable.codec :as codec] + [asami.durable.encoder :as encoder :refer [to-bytes encapsulate-id]] + [asami.durable.decoder :as decoder :refer [read-object unencapsulate-id decode-length-node]] + [asami.durable.common :refer [Paged refresh! read-byte read-short read-bytes read-bytes-into + FlatStore write-object! 
get-object force!]] + [asami.durable.flat-file :refer [paged-file]] + [clojure.string :as s] + [clojure.test :refer [deftest is]] + [clojure.pprint :refer [pprint]]) + (:import [java.io RandomAccessFile File] + [java.nio ByteBuffer] + [java.time Instant ZoneOffset] + [java.util Date Arrays GregorianCalendar TimeZone] + [java.net URI URL])) + +(defn byte-length [bytes] + (alength bytes)) + +(defn byte-buffer [byte-array] + (ByteBuffer/wrap byte-array)) + +(defrecord TestReader [b] + Paged + (refresh! [this]) + + (read-byte [this offset] + (.get b offset)) + + (read-short [this offset] + (.getShort b offset)) + + (read-bytes [this offset len] + (read-bytes-into this offset (byte-array len))) + + (read-bytes-into [this offset bytes] + (.get (.position (.asReadOnlyBuffer b) offset) bytes 0 (byte-length bytes)) + bytes)) + +(defn array-equals + [a b] + (Arrays/equals a b)) + +(defn now [] (Date.)) + +(def data {"layers" + {:db/ident "layers" + "frame" + {:id "frame-data" + "frame.protocols" "eth:ethertype:ip:tcp", + "frame.cap_len" "78", + "frame.marked" "0", + "frame.offset_shift" "0.000000000", + "frame.time_delta_displayed" "0.003008000", + "frame.time_relative" "7.959051000", + "frame.time_delta" "0.003008000", + "frame.time_epoch" "1612222673.161785000", + "frame.time" "Feb 1, 2021 18:37:53.161785000 EST", + "frame.encap_type" "1", + "frame.len" "78", + "frame.number" "771", + "frame.ignored" "0"}, + "eth" + {:id "eth-data" + "eth.dst" "22:4e:7f:74:55:8d", + "eth.dst_tree" + {"eth.dst.oui" "2248319", + "eth.addr" "22:4e:7f:74:55:8d", + "eth.dst_resolved" "22:4e:7f:74:55:8d", + "eth.dst.ig" "0", + "eth.ig" "0", + "eth.lg" "1", + "eth.addr_resolved" "22:4e:7f:74:55:8d", + "eth.dst.lg" "1", + "eth.addr.oui" "2248319"}, + "eth.src" "46:eb:d7:d5:2b:c8", + "eth.src_tree" + {"eth.addr" "46:eb:d7:d5:2b:c8", + "eth.ig" "0", + "eth.lg" "1", + "eth.src.oui" "4647895", + "eth.addr_resolved" "46:eb:d7:d5:2b:c8", + "eth.src.lg" "1", + "eth.src.ig" "0", + "eth.addr.oui" 
"4647895", + "eth.src_resolved" "46:eb:d7:d5:2b:c8"}, + "eth.type" "0x00000800"}, + "ip" + {:id "ip-data" + "ip.checksum.status" "2", + "ip.dst_host" "199.232.37.87", + "ip.host" "199.232.37.87", + "ip.dsfield" "0x00000000", + "ip.version" "4", + "ip.len" "64", + "ip.src" "192.168.1.122", + "ip.addr" "199.232.37.87", + "ip.frag_offset" "0", + "ip.dsfield_tree" {"ip.dsfield.dscp" "0", "ip.dsfield.ecn" "0"}, + "ip.ttl" "64", + "ip.checksum" "0x00008b56", + "ip.id" "0x00000000", + "ip.proto" "6", + "ip.flags_tree" + {"ip.flags.rb" "0", "ip.flags.df" "1", "ip.flags.mf" "0"}, + "ip.hdr_len" "20", + "ip.dst" "199.232.37.87", + "ip.src_host" "192.168.1.122", + "ip.flags" "0x00000040"}, + "tcp" + {:id "tcp-data" + "tcp.srcport" "57934", + "tcp.seq" "518", + "tcp.options_tree" + {"tcp.options.nop" "01", + "tcp.options.nop_tree" {"tcp.option_kind" "1"}, + "tcp.options.timestamp" "08:0a:2a:c2:dd:68:4b:8d:11:f5", + "tcp.options.timestamp_tree" + {"tcp.option_kind" "8", + "tcp.option_len" "10", + "tcp.options.timestamp.tsval" "717413736", + "tcp.options.timestamp.tsecr" "1267536373"}, + "tcp.options.sack" "05:0a:1d:4e:a3:7c:1d:4e:ae:4c", + "tcp.options.sack_tree" + {"tcp.option_kind" "5", + "tcp.option_len" "10", + "tcp.options.sack_le" "1385", + "tcp.options.sack_re" "4153", + "tcp.options.sack.count" "1", + "tcp.options.sack.dsack_le" "1385", + "tcp.options.sack.dsack_re" "4153", + "D-SACK Sequence" + {"_ws.expert" + {:db/ident :dsack + "tcp.options.sack.dsack" "", + "_ws.expert.message" "D-SACK Sequence", + "_ws.expert.severity" "6291456", + "_ws.expert.group" "33554432"}}}}, + "tcp.window_size" "131456", + "tcp.dstport" "443", + "tcp.urgent_pointer" "0", + "tcp.nxtseq" "518", + "tcp.ack_raw" "491691540", + "tcp.options" + "01:01:08:0a:2a:c2:dd:68:4b:8d:11:f5:01:01:05:0a:1d:4e:a3:7c:1d:4e:ae:4c", + "tcp.stream" "35", + "tcp.hdr_len" "44", + "tcp.seq_raw" "3253069772", + "tcp.checksum" "0x00004606", + "tcp.port" "443", + "tcp.ack" "1", + "Timestamps" + {"tcp.time_relative" 
"0.075895000", "tcp.time_delta" "0.003835000"}, + "tcp.window_size_scalefactor" "32", + "tcp.checksum.status" "2", + "tcp.flags" "0x00000010", + "tcp.window_size_value" "4108", + "tcp.len" "0", + "tcp.flags_tree" + {"tcp.flags.ecn" "0", + "tcp.flags.res" "0", + "tcp.flags.cwr" "0", + "tcp.flags.syn" "0", + "tcp.flags.urg" "0", + "tcp.flags.fin" "0", + "tcp.flags.push" "0", + "tcp.flags.str" "·······A····", + "tcp.flags.reset" "0", + "tcp.flags.ns" "0", + "tcp.flags.ack" "1"}, + "tcp.analysis" + {"tcp.analysis.initial_rtt" "0.042200000", + "tcp.analysis.flags" {"tcp.analysis.duplicate_ack" ""}, + "tcp.analysis.duplicate_ack_num" "2", + "tcp.analysis.duplicate_ack_frame" "754", + "tcp.analysis.duplicate_ack_frame_tree" + {"_ws.expert" + {"tcp.analysis.duplicate_ack" "", + "_ws.expert.message" "Duplicate ACK (#2)", + "_ws.expert.severity" "4194304", + "_ws.expert.group" "33554432"}}}}}}) + +(deftest test-map-codecs + (let [buffer-size 10240 + ba (byte-array buffer-size) + bb (byte-buffer ba) + rdr (->TestReader bb) + [header body offsets] (to-bytes data) + header-size (byte-length header) + body-size (byte-length body)] + (if (>= (+ header-size body-size) buffer-size) + (throw (ex-info "Buffer too small" {:header (alength header) :body (alength body)}))) + (.position bb 0) + (.put bb header 0 header-size) + (.put bb body 0 body-size) + (is (= (get data "layers") (read-object rdr (offsets "layers")))) + (is (= (get-in data ["layers" "frame"]) (read-object rdr (offsets "frame-data")))) + (is (= (get-in data ["layers" "eth"]) (read-object rdr (offsets "eth-data")))) + (is (= (get-in data ["layers" "ip"]) (read-object rdr (offsets "ip-data")))) + (is (= (get-in data ["layers" "tcp"]) (read-object rdr (offsets "tcp-data")))) + (is (= (get-in data ["layers" "tcp" "tcp.options_tree" "tcp.options.sack_tree" + "D-SACK Sequence" "_ws.expert"]) + (read-object rdr (offsets :dsack)))))) + + diff --git a/test/asami/durable/pages_test.clj b/test/asami/durable/pages_test.clj index 
2c0e6f2..31fd0e9 100644 --- a/test/asami/durable/pages_test.clj +++ b/test/asami/durable/pages_test.clj @@ -21,17 +21,17 @@ (doto of (write-object "1234567890") ;; 1 + 10 (write-object :keyword) ;; 1 + 7 - (write-object 1023) ;; 1 + 8 + (write-object 1023) ;; 1 + 2 (.seek 0)) - (is (= 28 (.length of))) + (is (= 22 (.length of))) (is (= 0xa (.read of))) (is (= 10 (.read of buffer))) (is (= "1234567890" (String. buffer "UTF-8"))) (is (= 0xc7 (.read of))) (is (= 7 (.read of buffer 0 7))) (is (= "keyword" (String. buffer 0 7 "UTF-8"))) - (is (= 0xe0 (.read of))) - (is (= 1023 (.readLong of))))) + (is (= 0xd2 (.read of))) + (is (= 1023 (.readShort of))))) (.delete f))) (defn b-as-long [b] (bit-and 0xff b)) @@ -43,16 +43,13 @@ (doto of (write-object "1234567890") ;; 1 + 10 (write-object :keyword) ;; 1 + 7 - (write-object 1023)) ;; 1 + 8 + (write-object 1023)) ;; 1 + 2 (let [r (paged-file of) bytes (byte-array 10)] - (is (= 0xe0 (b-as-long (read-byte r 19)))) + (is (= 0xd2 (b-as-long (read-byte r 19)))) (is (= 0xc7 (b-as-long (read-byte r 11)))) (is (= 0x0a (b-as-long (read-byte r 0)))) - (is (= 0 (read-short r 20))) - (is (= 0 (read-short r 22))) - (is (= 0 (read-short r 24))) - (is (= 0x03ff (read-short r 26))) + (is (= 0x03ff (read-short r 20))) (is (= "keyword" (String. (read-bytes r 12 7) "UTF-8"))) (is (= "1234567890" (String. (read-bytes r 1 10) "UTF-8"))))) (.delete f))) @@ -74,7 +71,7 @@ (let [offset (* n 20)] (is (= 0xa (read-byte r offset))) (is (= (format "123456789%d" n) (String. (read-bytes r (inc offset) buffer) "UTF-8"))) - (is (= 0xe0 (b-as-long (read-byte r (+ offset 11))))) + (is (= 0xd8 (b-as-long (read-byte r (+ offset 11))))) (is (= 0x1234 (read-short r (+ offset 12)))) (is (= 0x5678 (read-short r (+ offset 14)))) (is (= 0x9abc (s-as-long (read-short r (+ offset 16))))) @@ -96,7 +93,7 @@ (let [offset (* n 20)] (is (= 0xa (read-byte r offset))) (is (= (format "123456789%x" n) (String. 
(read-bytes r (inc offset) buffer) "UTF-8"))) - (is (= 0xe0 (b-as-long (read-byte r (+ offset 11))))) + (is (= 0xd8 (b-as-long (read-byte r (+ offset 11))))) (is (= 0x1234 (read-short r (+ offset 12)))) (is (= 0x5678 (read-short r (+ offset 14)))) (is (= 0x9abc (s-as-long (read-short r (+ offset 16))))) @@ -112,7 +109,7 @@ (let [offset (* n 20)] (is (= 0xa (read-byte r offset)) (str "Failed for n=" n)) (is (= (format "123456789%x" n) (String. (read-bytes r (inc offset) buffer) "UTF-8")) (str "Failed for n=" n)) - (is (= 0xe0 (b-as-long (read-byte r (+ offset 11))))) + (is (= 0xd8 (b-as-long (read-byte r (+ offset 11))))) (is (= 0x1234 (read-short r (+ offset 12)))) (is (= 0x5678 (read-short r (+ offset 14)))) (is (= 0x9abc (s-as-long (read-short r (+ offset 16))))) diff --git a/test/asami/durable/tuples_test.cljc b/test/asami/durable/tuples_test.cljc index b71caa5..f39fbb8 100644 --- a/test/asami/durable/tuples_test.cljc +++ b/test/asami/durable/tuples_test.cljc @@ -126,7 +126,7 @@ tree-node-size tuple-node-compare (get-id root)) - tuples (->TupleIndex index blocks (get-id root))] + tuples (->TupleIndex "test" index blocks (get-id root) true)] (set-low-tuple! lchild [2 4 6 1]) (set-high-tuple! lchild [2 8 8 1]) (set-count! lchild 2) @@ -257,7 +257,7 @@ (let [{:keys [root lchild rchild rlchild tuples] [block1 block2 block3 block4] :tuple-blocks {:keys [index blocks]} :tuples} (create-test-tree) - tuples (->TupleIndex index blocks (get-id root)) + tuples (->TupleIndex "test" index blocks (get-id root) true) [tuples t] (delete-tuple! tuples [2 8 14 1]) _ (is (= [[2 4 6 1] [2 8 8 1] [2 8 10 1] [2 8 18 1] [2 8 20 1] [2 10 4 1] [2 20 8 1] [3 1 1 1] [3 1 3 1] [3 2 1 1]] @@ -329,7 +329,7 @@ (constantly node-blocks) "" tree-node-size tuple-node-compare) tuple-blocks (faux-manager tuples-block-size)] - (->TupleIndex index tuple-blocks nil))) + (->TupleIndex "test" index tuple-blocks nil true))) (deftest test-small-insert (let [tuples (create-test-tuples)