Skip to content

Commit

Permalink
Break out schema and cache. Move gc to public API. (#716)
Browse files Browse the repository at this point in the history
* Break out schema and cache. Move gc to public API.

* Clear cache also on db deletion.

* Cleanup.

* Expose gc before_tx argument to C++.

* Write cache initial schema as well.

* Feedback for PR.

* Use set spec.

* Fix format.
  • Loading branch information
whilo authored Nov 2, 2024
1 parent 99290f1 commit 12c68ee
Show file tree
Hide file tree
Showing 12 changed files with 253 additions and 99 deletions.
20 changes: 20 additions & 0 deletions java/src/datahike/java/Datahike.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class Datahike {
private static final IFn schemaFn = Clojure.var("datahike.api", "schema");
private static final IFn reverseSchemaFn = Clojure.var("datahike.api", "reverse-schema");
private static final IFn metricsFn = Clojure.var("datahike.api", "metrics");
private static final IFn gcStorageFn = Clojure.var("datahike.api", "gc-storage");

/**
* Forbids instances creation.
Expand Down Expand Up @@ -365,4 +366,23 @@ public static APersistentMap reverseSchema(Object db) {
public static APersistentMap metrics(Object db) {
return (APersistentMap)metricsFn.invoke(db);
}

/**
 * Runs garbage collection on the storage backing {@code db} by whitelisting
 * the currently known branches. Without a cutoff date no historical snapshots
 * are erased (NOTE(review): the Clojure writer defaults to the beginning of
 * time — confirm against datahike.api/gc-storage).
 *
 * @param db a database
 * @return the result of the underlying gc-storage call; a dereferenceable
 *         value yielding the set of removed storage blobs (see the disabled
 *         gcStorage test in DatahikeTest)
 */
public static Object gcStorage(Object db) {
    return gcStorageFn.invoke(db);
}

/**
 * Runs garbage collection on the storage backing {@code db}, additionally
 * erasing database snapshots older than {@code removeBefore}. Branch heads
 * are always retained.
 *
 * @param db a database
 * @param removeBefore erase snapshots committed before this point in time
 * @return the result of the underlying gc-storage call; a dereferenceable
 *         value yielding the set of removed storage blobs
 */
public static Object gcStorage(Object db, Date removeBefore) {
    return gcStorageFn.invoke(db, removeBefore);
}
}
9 changes: 9 additions & 0 deletions java/src/datahike/java/DatahikeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,15 @@ public void dbWith() {
assertTrue(res.size() == 3);
}

/* TODO: This requires a durable backend */
// @Test
// public void gcStorage() {
// APersistentMap config = config();
// Datahike.createDatabase(config);
// Object conn = Datahike.connect(config);
// Set<UUID> res = (Set<UUID>)deref(Datahike.gcStorage(conn));
// assertTrue(res.size() == 0);
// }

/**
* Called by Datahike's Clojure tests and runs the above Junit tests.
Expand Down
14 changes: 14 additions & 0 deletions libdatahike/src/datahike/impl/LibDatahike.java
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,20 @@ public static void metrics(@CEntryPoint.IsolateThreadContext long isolateId,
}
}

/**
 * Native-image entry point for garbage collection: connects with the
 * serialized db_config, runs gcStorage with the given cutoff, and reports the
 * outcome through output_reader in the requested output_format. Exceptions
 * are serialized back to the caller instead of crossing the C boundary.
 */
@CEntryPoint(name = "gc_storage")
public static void gc_storage(@CEntryPoint.IsolateThreadContext long isolateId,
                              @CConst CCharPointer db_config,
                              long before_tx_unix_time_ms,
                              @CConst CCharPointer output_format,
                              @CConst OutputReader output_reader) {
    try {
        Object conn = Datahike.connect(readConfig(db_config));
        // Snapshots committed before this instant become eligible for erasure.
        Date removeBefore = new Date(before_tx_unix_time_ms);
        Object removed = Datahike.gcStorage(conn, removeBefore);
        output_reader.call(toOutput(output_format, removed));
    } catch (Exception e) {
        output_reader.call(toException(e));
    }
}

// seekdatoms not supported yet because we would always realize the iterator until the end

// release we do not expose connection objects yet, but create them internally on the fly
Expand Down
19 changes: 15 additions & 4 deletions src/datahike/api/specification.cljc
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
(ns datahike.api.specification
"Shared specification for different bindings. This namespace holds all
information such that individual bindings can be automatically derived from
it.")
it."
(:require [clojure.spec.alpha :as s]
[datahike.spec :as spec]))

(defn ->url
"Turns an API endpoint name into a URL."
Expand Down Expand Up @@ -850,7 +852,16 @@ Returns the key under which this listener is registered. See also [[unlisten]]."
:doc "Returns database metrics."
:impl datahike.db/metrics
:supports-remote? true
:referentially-transparent? true}})
:referentially-transparent? true}

(comment
(map (fn [[k v]] [k (:impl v)]) api-specification))
gc-storage
{:args (s/alt :with-date (s/cat :conn spec/SConnection :remove-before spec/time-point?)
:no-date (s/cat :conn spec/SConnection))
:ret set?
:doc "Invokes garbage collection on the store of connection by whitelisting currently known branches.
All db snapshots on these branches before remove-before date will also be
erased (defaults to beginning of time [no erasure]). The branch heads will
always be retained. Return the set of removed blobs from the store."
:impl datahike.writer/gc-storage!
:supports-remote? true
:referentially-transparent? false}})
6 changes: 5 additions & 1 deletion src/datahike/cli.clj
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,8 @@

:metrics
(let [out (d/metrics (load-input (first arguments)))]
(report (:format options) out))))))
(report (:format options) out))

:gc-storage
(let [out (d/gc-storage (load-input (first arguments)) (load-input (second arguments)))]
(report (:format options) out))))))
5 changes: 5 additions & 0 deletions src/datahike/config.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
[datahike.index :as di])
(:import [java.net URI]))

;; global
(def ^:dynamic *schema-meta-cache-size* (env :schema-meta-cache-size 1024))
(def ^:dynamic *schema-write-cache-max-db-count* (env :schema-write-cache-size 1024))

;; per database
(def ^:dynamic *default-index* :datahike.index/persistent-set)
(def ^:dynamic *default-schema-flexibility* :write)
(def ^:dynamic *default-keep-history?* true)
Expand Down
11 changes: 6 additions & 5 deletions src/datahike/experimental/versioning.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,12 @@
(parent-check parents)
(let [store (:store db)
cid (create-commit-id db)
db (db->stored (-> db
(assoc-in [:config :branch] branch)
(assoc-in [:meta :datahike/parents] parents)
(assoc-in [:meta :datahike/commit-id] cid))
true)]
[_schema-meta-op db] (db->stored (-> db
(assoc-in [:config :branch] branch)
(assoc-in [:meta :datahike/parents] parents)
(assoc-in [:meta :datahike/commit-id] cid))
true
true)]
(flush-pending-writes store true)
(k/update store :branches #(conj % branch) {:sync? true})
(k/assoc store cid db {:sync? true})
Expand Down
18 changes: 11 additions & 7 deletions src/datahike/experimental/gc.cljc → src/datahike/gc.cljc
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
(ns datahike.experimental.gc
(ns datahike.gc
(:require [clojure.set :as set]
[datahike.index.interface :refer [-mark]]
[konserve.core :as k]
[konserve.gc :refer [sweep!]]
[taoensso.timbre :refer [debug trace]]
[superv.async :refer [<? S go-try <<?]]
[clojure.core.async :as async])
[clojure.core.async :as async]
[datahike.schema-cache :as sc])
(:import [java.util Date]))

;; meta-data does not get passed in macros
Expand All @@ -22,7 +23,8 @@
(if (visited to-check) ;; skip
(recur r visited reachable)
(let [{:keys [eavt-key avet-key aevt-key
temporal-eavt-key temporal-avet-key temporal-aevt-key]
temporal-eavt-key temporal-avet-key temporal-aevt-key
schema-meta-key]
{:keys [datahike/parents
datahike/created-at
datahike/updated-at]} :meta}
Expand All @@ -32,6 +34,7 @@
(recur (concat r (when in-range? parents))
(conj visited to-check)
(set/union reachable #{to-check}
(when schema-meta-key #{schema-meta-key})
(-mark eavt-key)
(-mark aevt-key)
(-mark avet-key)
Expand All @@ -42,24 +45,25 @@
(-mark temporal-avet-key)))))))
reachable)))))

(defn gc!
(defn gc-storage!
"Invokes garbage collection on the database by whitelisting currently known branches.
All db snapshots on these branches before remove-before date will also be
erased (defaults to beginning of time [no erasure]). The branch heads will
always be retained."
([db] (gc! db (Date.)))
([db] (gc-storage! db (Date. 0)))
([db remove-before]
(go-try S
(let [now (Date.)
_ (debug "starting gc" now)
{:keys [config store]} db
_ (sc/clear-write-cache (:store config)) ; Clear the schema write cache for this store
branches (<? S (k/get store :branches))
_ (trace "retaining branches" branches)
_ (trace "retaining branches" branches)
reachable (->> branches
(map #(reachable-in-branch store % remove-before config))
async/merge
(<<? S)
(apply set/union))
reachable (conj reachable :branches)]
(trace "gc reached: " reachable)
(trace "gc reached: " reachable)
(<? S (sweep! store reachable now))))))
41 changes: 41 additions & 0 deletions src/datahike/schema_cache.cljc
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
(ns datahike.schema-cache
(:require [clojure.core.cache.wrapped :as cw]
[datahike.config :as dc]
[datahike.store :as ds]))

;; Shared schema read cache across all stores; LRU-bounded by
;; *schema-meta-cache-size* (env :schema-meta-cache-size, default 1024).
(def schema-meta-cache (cw/lru-cache-factory {} :threshold dc/*schema-meta-cache-size*))

;; LRU cache of per-store LRU caches for write operations, one entry per store
;; identity, so that rarely used stores eventually drop their write cache.
;; Bounded by *schema-write-cache-max-db-count*.
(def schema-write-caches
  (cw/lru-cache-factory {} :threshold dc/*schema-write-cache-max-db-count*))

(defn- get-or-create-write-cache
  "Returns the per-store write cache for store-config, creating it on first
   use. Uses lookup-or-miss so the check and insert happen as one atomic
   operation: the previous has?/lookup/miss sequence could race with LRU
   eviction (lookup returning nil after has? said true) or create duplicate
   caches under concurrent callers."
  [store-config]
  (cw/lookup-or-miss schema-write-caches
                     (ds/store-identity store-config)
                     (fn [_store-id]
                       ;; NOTE(review): threshold reuses *schema-meta-cache-size*;
                       ;; confirm a dedicated per-store write-cache size isn't intended.
                       (cw/lru-cache-factory {} :threshold dc/*schema-meta-cache-size*))))

(defn cache-has?
  "True if schema metadata for schema-meta-key is present in the shared read cache."
  [schema-meta-key]
  (cw/has? schema-meta-cache schema-meta-key))

(defn cache-lookup
  "Returns the cached schema metadata for schema-meta-key, or nil when absent."
  [schema-meta-key]
  (cw/lookup schema-meta-cache schema-meta-key))

(defn cache-miss
  "Inserts schema-meta under schema-meta-key into the shared read cache and
   returns the updated cache value."
  [schema-meta-key schema-meta]
  (cw/miss schema-meta-cache schema-meta-key schema-meta))

(defn write-cache-has?
  "True if schema-meta-key has already been recorded as written for the store
   identified by store-config (presumably letting callers skip re-writing the
   schema blob — confirm against the writing code path)."
  [store-config schema-meta-key]
  (let [write-cache (get-or-create-write-cache store-config)]
    (cw/has? write-cache schema-meta-key)))

(defn add-to-write-cache
  "Records that schema-meta-key has been written for the store identified by
   store-config."
  [store-config schema-meta-key]
  (let [write-cache (get-or-create-write-cache store-config)]
    (cw/miss write-cache schema-meta-key true)))

(defn clear-write-cache
  "Drops the entire per-store write cache for store-config (used on db deletion
   and before gc in datahike.gc) so later writes are not skipped based on stale
   knowledge of what the store contains."
  [store-config]
  (let [store-id (ds/store-identity store-config)]
    (cw/evict schema-write-caches store-id)))

58 changes: 40 additions & 18 deletions src/datahike/writer.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
[taoensso.timbre :as log]
[datahike.core]
[datahike.writing :as w]
[datahike.gc :as gc]
[datahike.tools :as dt :refer [throwable-promise get-time-ms]]
[clojure.core.async :refer [chan close! promise-chan put! go go-loop <! >! poll! buffer timeout]])
(:import [clojure.lang ExceptionInfo]))
(:import [clojure.core.async.impl.channels ManyToManyChannel]))

(defn chan?
  "True if x is a core.async channel (tests the concrete ManyToManyChannel
   implementation class). Used by the writer loop to detect async ops whose
   result is delivered on a channel."
  [x]
  (instance? ManyToManyChannel x))

(defprotocol PWriter
(-dispatch! [_ arg-map] "Returns a channel that resolves when the transaction finalizes.")
Expand All @@ -28,6 +32,7 @@

;; minimum wait time between commits in ms
;; this reduces write pressure on the storage
;; at the cost of higher latency
(def ^:const DEFAULT_COMMIT_WAIT_TIME 0) ;; in ms

(defn create-thread
Expand All @@ -53,10 +58,9 @@
(log/warn "Transaction queue buffer more than 90% full, "
(count transaction-queue-buffer) "of" transaction-queue-size " filled."
"Reduce transaction frequency."))
(let [old (if-not (= (:max-tx old) (:max-tx @(:wrapped-atom connection)))
(do
(log/warn "DEPRECATED. Connection was changed outside of writer.")
(assoc old :max-tx (:max-tx @(:wrapped-atom connection))))
(let [;; TODO remove this after import is ported to writer API
old (if-not (= (:max-tx old) (:max-tx @(:wrapped-atom connection)))
(assoc old :max-tx (:max-tx @(:wrapped-atom connection)))
old)

op-fn (write-fn-map op)
Expand All @@ -77,16 +81,23 @@
:error e})
e))
:error))]
(if-not (= res :error)
(do
(when (> (count commit-queue-buffer) (/ commit-queue-size 2))
(log/warn "Commit queue buffer more than 50% full, "
(count commit-queue-buffer) "of" commit-queue-size " filled."
"Throttling transaction processing. Reduce transaction frequency and check your storage throughput.")
(<! (timeout 50)))
(put! commit-queue [res callback])
(recur (:db-after res)))
(recur old))))
(cond (chan? res)
;; async op, run in parallel in background, no sequential commit handling needed
(do
(go (>! callback (<! res)))
(recur old))

(not= res :error)
(do
(when (> (count commit-queue-buffer) (/ commit-queue-size 2))
(log/warn "Commit queue buffer more than 50% full, "
(count commit-queue-buffer) "of" commit-queue-size " filled."
"Throttling transaction processing. Reduce transaction frequency and check your storage throughput.")
(<! (timeout 50)))
(put! commit-queue [res callback])
(recur (:db-after res)))
:else
(recur old))))
(do
(close! commit-queue)
(log/debug "Writer thread gracefully closed")))))
Expand Down Expand Up @@ -121,10 +132,11 @@
(<! (timeout commit-wait-time))
(recur (<?- commit-queue)))))))))]))

;; public API

;; public API to internal mapping
(def default-write-fn-map {'transact! w/transact!
'load-entities w/load-entities})
'load-entities w/load-entities
;; async operations that run in background
'gc-storage! gc/gc-storage!})

(defmulti create-writer
(fn [writer-config _]
Expand Down Expand Up @@ -199,3 +211,13 @@
:args [entities]}))]
(deliver p tx-report)))
p))

(defn gc-storage!
  "Runs garbage collection on the store behind conn by dispatching the
   'gc-storage! op to the connection's writer. args may carry an optional
   remove-before date (see the gc-storage entry in the API specification).
   Returns a promise delivered with the writer's result — the set of removed
   blobs, or an error value (throwable-promise presumably rethrows on deref;
   confirm dt/throwable-promise semantics)."
  [conn & args]
  (let [p (throwable-promise)
        writer (:writer @(:wrapped-atom conn))]
    (go
      (let [result (<! (dispatch! writer
                                  {:op 'gc-storage!
                                   :args (vec args)}))]
        (deliver p result)))
    p))
Loading

0 comments on commit 12c68ee

Please sign in to comment.