Skip to content

Commit

Permalink
Merge pull request #177 from threatgrid/entity-ids
Browse files Browse the repository at this point in the history
Files are now cleanly truncated on release, including abandoned connections.
Fixes #176 
Fixes #137
  • Loading branch information
quoll authored Jun 22, 2021
2 parents a24b912 + 77a0292 commit 1de1abb
Show file tree
Hide file tree
Showing 13 changed files with 102 additions and 47 deletions.
15 changes: 12 additions & 3 deletions src/asami/core.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@

(defonce connections (atom {}))

(defn shutdown
  "Releases the resources of every registered connection so the process can exit cleanly.
  Takes no arguments. Reads the current value of the `connections` atom and passes each
  of its values to `storage/release`, one after another. Returns nil."
  []
  (let [open-connections (vals @connections)]
    (doseq [conn open-connections]
      (storage/release conn))))

;; JVM only: register `shutdown` to run on its own thread when the runtime exits.
#?(:clj
   (let [hook (Thread. shutdown)]
     (.addShutdownHook (Runtime/getRuntime) hook)))

(s/defn ^:private parse-uri :- {:type s/Str
:name s/Str}
"Splits up a database URI string into structured parts"
Expand Down Expand Up @@ -81,9 +89,10 @@
(storage/delete-database conn))
;; database not in the connections
;; connect to it to free its resources
(when (db-exists? uri)
(if-let [conn (connection-for uri)]
(storage/delete-database conn)))))
(boolean
(when (db-exists? uri)
(if-let [conn (connection-for uri)]
(storage/delete-database conn))))))

(s/defn get-database-names
"Returns a seq of database names that this instance is aware of."
Expand Down
3 changes: 3 additions & 0 deletions src/asami/durable/block/block_api.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@
(get-block [this id] "Returns the block associated with an ID.")
(get-block-size [this] "Returns the size of blocks allocated by this manager")
(copy-to-tx [this block] "Returns a block that is in the current transaction, possibly returning the current block"))

;; Protocol for objects that can report how many blocks they have allocated.
;; An implementation returns nil when the blocks are not managed by that object.
(defprotocol CountedBlocks
(get-block-count [this] "Returns the number of blocks that this object has allocated, or nil if not managed by this object."))
22 changes: 15 additions & 7 deletions src/asami/durable/block/file/block_file.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
asami.durable.block.file.block-file
(:require [clojure.java.io :as io]
[asami.durable.common :refer [Transaction Closeable Forceable rewind! commit! close]]
[asami.durable.block.block-api :refer [BlockManager copy-over! copy-block! allocate-block! get-id]]
[asami.durable.block.block-api :refer [CountedBlocks BlockManager copy-over! copy-block! allocate-block! get-id get-block-count]]
[asami.durable.block.bufferblock :refer [create-block]]
[asami.durable.block.file.voodoo :as voodoo]
[asami.durable.cache :refer [lookup hit miss lru-cache-factory]])
Expand Down Expand Up @@ -45,12 +45,14 @@
(defn open-block-file
"Opens a file for storing blocks. Returns a structure with the block file
and the RandomAccessFile that the block file uses. The file will need to be
closed when block files based on this initial block file are no longer needed."
[file block-size]
closed when block files based on this initial block file are no longer needed.
When the init-nr-blocks is not nil, then it holds the recorded number of blocks
in the file."
[file block-size init-nr-blocks]
(let [file (io/file file)
raf (RandomAccessFile. file "rw")
^FileChannel fc (.getChannel raf)
nr-blocks (long (/ (.size fc) block-size))
nr-blocks (or init-nr-blocks (long (/ (.size fc) block-size)))
slack (mod region-size block-size)
stride (if (zero? slack) region-size (+ region-size (- block-size slack)))]
(set-nr-blocks! (->BlockFile 0 block-size 0 [] stride file raf fc) nr-blocks)))
Expand Down Expand Up @@ -243,12 +245,16 @@

(get-block-size [this]
  ;; the block size is read from the block file held in this manager's state
  (-> @state :block-file :block-size))

(copy-to-tx [this block]
  ;; A block whose id is at or below the commit point is copied via copy-block!;
  ;; a block above the commit point is returned unchanged.
  (let [block-id (get-id block)
        commit-point (:commit-point @state)]
    (if (> block-id commit-point)
      block
      (copy-block! this block))))

CountedBlocks
(get-block-count [this]
  ;; delegate to the block file currently held in this manager's state
  (-> @state :block-file get-nr-blocks))

Transaction
(rewind! [this]
(vswap! state #(assoc % :next-id (:commit-point %)))
Expand All @@ -274,9 +280,11 @@
(.delete ^File file))))

(defn create-managed-block-file
[filename block-size]
(let [block-file (open-block-file filename block-size)
[filename block-size nr-blocks]
(let [block-file (open-block-file filename block-size nr-blocks)
next-id (dec (:nr-blocks block-file))]
(when (and nr-blocks (= next-id nr-blocks))
(throw (ex-info "Inconsistent reopening of block file" {:set-blocks nr-blocks :file-blocks (:nr-blocks block-file)})))
(->ManagedBlockFile (volatile! {:block-file block-file
:next-id next-id
:commit-point next-id
Expand Down
11 changes: 6 additions & 5 deletions src/asami/durable/common_utils.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@

(defn create-block-manager
"Creates a block manager"
[name manager-name block-size]
[name manager-name block-size nr-blocks]
#?(:clj
(block-file/create-managed-block-file (.getPath (io/file name manager-name)) block-size)))
(block-file/create-managed-block-file (.getPath (io/file name manager-name)) block-size nr-blocks)))

(defn named-storage
"A common function for opening storage with a given name. Must be provided with a storage constructor and the name.
The root id indicates an index root, and may be nil for an empty index."
[storage-constructor name root-id]
The root id indicates an index root, and may be nil for an empty index.
The block count refers to the count of blocks in the storage."
[storage-constructor name root-id block-count]
#?(:clj
(let [d (io/file name)]
(if (.exists d)
(when-not (.isDirectory d)
(throw (ex-info (str "'" name "' already exists as a file") {:path (.getAbsolutePath d)})))
(when-not (.mkdir d)
(throw (ex-info (str "Unable to create directory '" name "'") {:path (.getAbsolutePath d)}))))
(storage-constructor name root-id))))
(storage-constructor name root-id block-count))))
16 changes: 10 additions & 6 deletions src/asami/durable/graph.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
[asami.durable.tuples :as tuples]
[asami.durable.resolver :as resolver :refer [get-from-index get-transitive-from-index]]
#?(:clj [asami.durable.flat-file :as flat-file])
[asami.durable.block.block-api :as block-api]
[zuko.node :as node]
[zuko.logging :as log :include-macros true]))

Expand Down Expand Up @@ -194,7 +195,10 @@
{:r-spot (:root-id spot)
:r-post (:root-id post)
:r-ospt (:root-id ospt)
:r-pool (:root-id pool)})
:r-pool (:root-id pool)
:nr-index-node (block-api/get-block-count tree-block-manager)
:nr-index-block (block-api/get-block-count tuple-block-manager)
:nr-pool-node (block-api/get-block-count pool)})

Closeable
(close [this]
Expand Down Expand Up @@ -230,23 +234,23 @@
post-index (tuples/create-tuple-index name post-name r-post)
ospt-index (tuples/create-tuple-index name ospt-name r-ospt)
tspo-index #?(:clj (flat-file/record-store name tspo-name tuples/tuple-size-bytes) :cljs nil)
data-pool (pool/create-pool name r-pool)]
data-pool (pool/create-pool name r-pool nil)]
(->BlockGraph spot-index post-index ospt-index tspo-index data-pool node-allocator id-checker nil nil)))

(defn new-merged-block-graph
"Creates a new BlockGraph object, under a given name. If the resources for that name exist, then they are opened.
If the resources do not exist, then they are created.
name: the label of the location for the graph resources.
tx: The transaction record for this graph."
[name {:keys [r-spot r-post r-ospt r-pool]} node-allocator id-checker]
[name {:keys [r-spot r-post r-ospt r-pool nr-index-node nr-index-block nr-pool-node]} node-allocator id-checker]
;; NOTE: Tree node blocks must hold both the tuples payload and the tree node header
(let [tree-block-manager (common-utils/create-block-manager name index-name tuples/tree-block-size)
tuple-block-manager (common-utils/create-block-manager name block-name tuples/block-bytes)
(let [tree-block-manager (common-utils/create-block-manager name index-name tuples/tree-block-size nr-index-node)
tuple-block-manager (common-utils/create-block-manager name block-name tuples/block-bytes nr-index-block)
spot-index (tuples/create-tuple-index-for-managers "SPO" tree-block-manager tuple-block-manager r-spot)
post-index (tuples/create-tuple-index-for-managers "POS" tree-block-manager tuple-block-manager r-post)
ospt-index (tuples/create-tuple-index-for-managers "OSP" tree-block-manager tuple-block-manager r-ospt)
tspo-index #?(:clj (flat-file/record-store name tspo-name tuples/tuple-size-bytes) :cljs nil)
data-pool (pool/create-pool name r-pool)]
data-pool (pool/create-pool name r-pool nr-pool-node)]
(->BlockGraph spot-index post-index ospt-index tspo-index data-pool node-allocator id-checker
tree-block-manager tuple-block-manager)))

16 changes: 11 additions & 5 deletions src/asami/durable/pool.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
[asami.durable.tree :as tree]
[asami.durable.encoder :as encoder :refer [to-bytes]]
[asami.durable.decoder :as decoder :refer [type-info long-bytes-compare]]
[asami.durable.block.block-api :refer [get-long get-byte get-bytes put-byte! put-bytes! put-long! get-id]]
[asami.durable.block.block-api :refer [get-long get-byte get-bytes put-byte! put-bytes! put-long! get-id
CountedBlocks get-block-count]]
[asami.durable.cache :refer [lookup hit miss lru-cache-factory]]
#?(:clj [asami.durable.flat-file :as flat-file])))

Expand Down Expand Up @@ -124,6 +125,11 @@
(at [this new-root-id]
  ;; Returns a ReadOnlyPool that shares this pool's data and cache,
  ;; with the index repositioned to new-root-id via tree/at.
  (let [index-at-root (tree/at index new-root-id)]
    (->ReadOnlyPool data index-at-root new-root-id cache)))

CountedBlocks
(get-block-count [_]
  ;; the pool reports whatever block count its index reports
  (-> index get-block-count))

Transaction
(commit! [this]
(force! data)
Expand Down Expand Up @@ -151,17 +157,17 @@

(defn open-pool
"Opens all the resources required for a pool, and returns the pool structure"
[name root-id]
[name root-id block-count]
(let [data-store (data-constructor name data-name)
data-compare (pool-comparator-fn data-store)
index (tree/new-block-tree (fn
([] true)
([lname size] (common-utils/create-block-manager name lname size)))
([lname size] (common-utils/create-block-manager name lname size block-count)))
index-name tree-node-size data-compare root-id)
encap-cache (atom (lru-cache-factory {} :threshold encap-cache-size))]
(->DataPool data-store index root-id encap-cache)))

(defn create-pool
"Creates a datapool object"
([name] (create-pool name nil))
([name root-id] (common-utils/named-storage open-pool name root-id)))
([name] (create-pool name nil nil))
([name root-id block-count] (common-utils/named-storage open-pool name root-id block-count)))
24 changes: 19 additions & 5 deletions src/asami/durable/store.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@

;; transactions contain tree roots for the 3 tree indices,
;; the tree root for the data pool,
;; the 3 block counts for the tuples index tree, the tuples blocks, and the data pool index tree
;; the internal node counter
(def tx-record-size (* 5 common/long-size))
(def tx-record-size (* 8 common/long-size))

;; Schema for an unpacked transaction record.
;; :r-spot, :r-post, :r-ospt and :r-pool are the root ids of the three tuple indices and the data pool.
;; :nr-index-node, :nr-index-block and :nr-pool-node are the numbers of index nodes, index blocks
;; and pool index nodes allocated.
;; The root ids and block counts are declared with s/maybe; :nodes (the node id counter)
;; and :timestamp are plain s/Int.
(def TxRecord {(s/required-key :r-spot) (s/maybe s/Int)
(s/required-key :r-post) (s/maybe s/Int)
(s/required-key :r-ospt) (s/maybe s/Int)
(s/required-key :r-pool) (s/maybe s/Int)
(s/required-key :nr-index-node) (s/maybe s/Int)
(s/required-key :nr-index-block) (s/maybe s/Int)
(s/required-key :nr-pool-node) (s/maybe s/Int)
(s/required-key :nodes) s/Int
(s/required-key :timestamp) s/Int})

Expand All @@ -40,27 +44,37 @@
(s/one s/Int "post root id")
(s/one s/Int "ospt root id")
(s/one s/Int "pool root id")
(s/one s/Int "number of index nodes allocated")
(s/one s/Int "number of index blocks allocated")
(s/one s/Int "number of pool index nodes allocated")
(s/one s/Int "node id counter")]})

(s/defn pack-tx :- TxRecordPacked
"Packs a transaction into a vector for serialization"
[{:keys [r-spot r-post r-ospt r-pool nodes timestamp]} :- TxRecord]
{:timestamp timestamp :tx-data [(or r-spot 0) (or r-post 0) (or r-ospt 0) (or r-pool 0) nodes]})
[{:keys [r-spot r-post r-ospt r-pool nr-index-node nr-index-block nr-pool-node nodes timestamp]} :- TxRecord]
{:timestamp timestamp :tx-data [(or r-spot 0) (or r-post 0) (or r-ospt 0) (or r-pool 0)
(or nr-index-node 0) (or nr-index-block 0) (or nr-pool-node 0)
nodes]})

(s/defn unpack-tx :- TxRecord
"Unpacks a transaction vector into a structure when deserializing"
[{[r-spot r-post r-ospt r-pool nodes] :tx-data timestamp :timestamp} :- TxRecordPacked]
[{[r-spot r-post r-ospt r-pool
nr-index-node nr-index-block nr-pool-node nodes] :tx-data
timestamp :timestamp} :- TxRecordPacked]
(letfn [(non-zero [v] (and v (when-not (zero? v) v)))]
{:r-spot (non-zero r-spot)
:r-post (non-zero r-post)
:r-ospt (non-zero r-ospt)
:r-pool (non-zero r-pool)
:nr-index-node (non-zero nr-index-node)
:nr-index-block (non-zero nr-index-block)
:nr-pool-node (non-zero nr-pool-node)
:nodes nodes
:timestamp timestamp}))

(s/defn new-db :- TxRecordPacked
[]
{:timestamp (long-time (now)) :tx-data [0 0 0 0 0]})
{:timestamp (long-time (now)) :tx-data [0 0 0 0 0 0 0 0]})

(declare ->DurableDatabase)

Expand Down
16 changes: 13 additions & 3 deletions src/asami/durable/tree.cljc
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
(ns ^{:doc "This namespace provides the basic mechanisms for AVL trees"
:author "Paula Gearon"}
asami.durable.tree
(:require [asami.durable.block.block-api :refer [Block
(:require [asami.durable.block.block-api :refer [Block CountedBlocks
get-id get-byte get-int get-long
get-bytes get-ints get-longs
put-byte! put-int! put-long!
put-bytes! put-ints! put-longs!
put-block! copy-over!
allocate-block! get-block get-block-size
write-block copy-to-tx]]
write-block copy-to-tx get-block-count]]
[asami.durable.common :refer [Transaction Closeable Forceable close delete! rewind! commit! force! long-size]]
[asami.durable.cache :refer [lookup hit miss lru-cache-factory]]))

Expand All @@ -29,6 +29,11 @@

(def ^:const node-cache-size 1024)

(defn to-hex
  "Formats the number n as a hexadecimal string.
  On the JVM this calls Long/toHexString; in ClojureScript it calls the number's
  toString with radix 16.
  NOTE(review): the two platforms may render negative values differently - confirm
  before relying on the output for negative n."
  [n]
  #?(:cljs (.toString n 16)
     :clj  (Long/toHexString n)))

(defprotocol TreeNode
(get-parent [this] "Returns the parent node. Not the Node ID, but the node object.")
(get-child-id [this side] "Gets the Node ID of the child on the given side")
Expand Down Expand Up @@ -96,7 +101,7 @@

Object
(toString [this]
(let [payload (mapv #(Long/toHexString (get-long this %)) (range (/ (- (:size block) header-size) 8)))]
(let [payload (mapv #(to-hex (get-long this %)) (range (/ (- (:size block) header-size) 8)))]
(str "Node[" (get-id this) "] L:" (get-child-id this left) " R:" (get-child-id this right) " payload:"
payload)))

Expand Down Expand Up @@ -389,6 +394,11 @@
;; if modified-node is not yet set, then new-node was the root
[(assoc this :root nd) (or modified-node new-node)]))))))

CountedBlocks
(get-block-count [_]
  ;; a count is reported only when own-manager is truthy; otherwise the result is nil
  (if own-manager
    (get-block-count block-manager)
    nil))

Transaction
(rewind! [this]
(when own-manager
Expand Down
6 changes: 3 additions & 3 deletions src/asami/durable/tuples.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -591,14 +591,14 @@
(defn open-tuples
[order-name name root-id]
;; create a factory fn that returns true on 0-arity to indicate that the index manager is owned by the calling tree
(let [index (tree/new-block-tree (fn ([] true) ([lname size] (common-utils/create-block-manager name lname size)))
(let [index (tree/new-block-tree (fn ([] true) ([lname size] (common-utils/create-block-manager name lname size nil)))
(str order-name index-name) tree-node-size tuple-node-compare root-id)
blocks (common-utils/create-block-manager name (str order-name block-name) block-bytes)]
blocks (common-utils/create-block-manager name (str order-name block-name) block-bytes nil)]
(->TupleIndex name index blocks root-id true)))

(defn create-tuple-index
"Creates a tuple index for a name"
([name order-name]
(create-tuple-index name order-name nil))
([name order-name root-id]
(common-utils/named-storage (partial open-tuples order-name) name root-id)))
(common-utils/named-storage (partial open-tuples order-name) name root-id nil)))
10 changes: 5 additions & 5 deletions test/asami/durable/block/blockfile_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
(defn exec-bf
[filename f]
(let [filename (util/temp-file filename)
{:keys [block-file file]} (open-block-file filename test-block-size)]
{:keys [block-file file]} (open-block-file filename test-block-size nil)]
(try
(f file block-file)
(finally
Expand All @@ -34,7 +34,7 @@

(deftest test-allocate
(let [filename (util/temp-file "ualloc")
block-file (open-block-file filename test-block-size)
block-file (open-block-file filename test-block-size nil)
block-file (set-nr-blocks! block-file 1)]
(try
(let [blk (block-for block-file 0)]
Expand All @@ -48,7 +48,7 @@
(deftest test-write
(let [file-str "bftest"
filename (util/temp-file file-str)
block-file (open-block-file filename test-block-size)
block-file (open-block-file filename test-block-size nil)
bf (set-nr-blocks! block-file 4)
b (block-for bf 0)
_ (put-string! b str0)
Expand All @@ -68,7 +68,7 @@
(unmap bf)
(cleanup)

(let [block-file (open-block-file filename test-block-size)]
(let [block-file (open-block-file filename test-block-size nil)]

;; did it persist

Expand All @@ -87,7 +87,7 @@
(deftest test-performance
(let [file-str "perftest"
filename (util/temp-file file-str)
block-file (open-block-file filename test-block-size)
block-file (open-block-file filename test-block-size nil)
block-file (clear! block-file)
nr-blocks 100000
bf (set-nr-blocks! block-file nr-blocks)]
Expand Down
Loading

0 comments on commit 1de1abb

Please sign in to comment.