diff --git a/java/src/datahike/java/Datahike.java b/java/src/datahike/java/Datahike.java index 7648a342f..a732600ea 100644 --- a/java/src/datahike/java/Datahike.java +++ b/java/src/datahike/java/Datahike.java @@ -50,6 +50,7 @@ public class Datahike { private static final IFn schemaFn = Clojure.var("datahike.api", "schema"); private static final IFn reverseSchemaFn = Clojure.var("datahike.api", "reverse-schema"); private static final IFn metricsFn = Clojure.var("datahike.api", "metrics"); + private static final IFn gcStorageFn = Clojure.var("datahike.api", "gc-storage"); /** * Forbids instances creation. @@ -365,4 +366,23 @@ public static APersistentMap reverseSchema(Object db) { public static APersistentMap metrics(Object db) { return (APersistentMap)metricsFn.invoke(db); } + + /** + * Runs storage garbage collection by invoking {@code datahike.api/gc-storage} with a single argument and returns the result of that call as-is. + * + * @param db forwarded unchanged as the only argument. NOTE(review): the commented-out test in DatahikeTest passes a connection obtained from {@code Datahike.connect}, not a database value; confirm and consider renaming. + */ + public static Object gcStorage(Object db) { + return gcStorageFn.invoke(db); + } + + /** + * Runs storage garbage collection by invoking {@code datahike.api/gc-storage} with two arguments and returns the result of that call as-is. + * + * @param db forwarded unchanged as the first argument. NOTE(review): presumably a connection obtained from {@code Datahike.connect} rather than a database value; confirm. + * @param removeBefore forwarded unchanged as the second argument. NOTE(review): by its name, presumably the date before which stored snapshots become eligible for removal; confirm against the gc-storage API doc. + */ + public static Object gcStorage(Object db, Date removeBefore) { + return gcStorageFn.invoke(db, removeBefore); + } } diff --git a/java/src/datahike/java/DatahikeTest.java b/java/src/datahike/java/DatahikeTest.java index f86105759..7d89987a0 100644 --- a/java/src/datahike/java/DatahikeTest.java +++ b/java/src/datahike/java/DatahikeTest.java @@ -251,6 +251,15 @@ public void dbWith() { assertTrue(res.size() == 3); } + /* TODO: This requires a durable backend */ + // @Test + // public void gcStorage() { + // APersistentMap config = config(); + // Datahike.createDatabase(config); + // Object conn = Datahike.connect(config); + // Set res = (Set)deref(Datahike.gcStorage(conn)); + // assertTrue(res.size() == 0); + // } /** * Called by Datahike's Clojure tests and runs the above Junit tests. 
diff --git a/libdatahike/src/datahike/impl/LibDatahike.java b/libdatahike/src/datahike/impl/LibDatahike.java index bc273c213..e1c4c2461 100644 --- a/libdatahike/src/datahike/impl/LibDatahike.java +++ b/libdatahike/src/datahike/impl/LibDatahike.java @@ -279,6 +279,20 @@ public static void metrics(@CEntryPoint.IsolateThreadContext long isolateId, } } + @CEntryPoint(name = "gc_storage") + public static void gc_storage(@CEntryPoint.IsolateThreadContext long isolateId, + @CConst CCharPointer db_config, + long before_tx_unix_time_ms, + @CConst CCharPointer output_format, + @CConst OutputReader output_reader) { + try { + Object db = Datahike.connect(readConfig(db_config)); + output_reader.call(toOutput(output_format, Datahike.gcStorage(db, new Date(before_tx_unix_time_ms)))); + } catch (Exception e) { + output_reader.call(toException(e)); + } + } + // seekdatoms not supported yet because we would always realize the iterator until the end // release we do not expose connection objects yet, but create them internally on the fly diff --git a/src/datahike/api/specification.cljc b/src/datahike/api/specification.cljc index 932577562..a6186f804 100644 --- a/src/datahike/api/specification.cljc +++ b/src/datahike/api/specification.cljc @@ -1,7 +1,9 @@ (ns datahike.api.specification "Shared specification for different bindings. This namespace holds all information such that individual bindings can be automatically derived from - it.") + it." + (:require [clojure.spec.alpha :as s] + [datahike.spec :as spec])) (defn ->url "Turns an API endpoint name into a URL." @@ -850,7 +852,16 @@ Returns the key under which this listener is registered. See also [[unlisten]]." :doc "Returns database metrics." :impl datahike.db/metrics :supports-remote? true - :referentially-transparent? true}}) + :referentially-transparent? true} -(comment - (map (fn [[k v]] [k (:impl v)]) api-specification)) + gc-storage + {:args (s/alt :with-date (s/cat :conn spec/SConnection :remove-before spec/time-point?) 
+ :no-date (s/cat :conn spec/SConnection)) + :ret set? + :doc "Invokes garbage collection on the store of connection by whitelisting currently known branches. +All db snapshots on these branches before remove-before date will also be +erased (defaults to beginning of time [no erasure]). The branch heads will +always be retained. Return the set of removed blobs from the store." + :impl datahike.writer/gc-storage! + :supports-remote? true + :referentially-transparent? false}}) diff --git a/src/datahike/cli.clj b/src/datahike/cli.clj index f92708421..8e8b82b25 100644 --- a/src/datahike/cli.clj +++ b/src/datahike/cli.clj @@ -257,4 +257,8 @@ :metrics (let [out (d/metrics (load-input (first arguments)))] - (report (:format options) out)))))) + (report (:format options) out)) + + :gc-storage + (let [out (d/gc-storage (load-input (first arguments)) (load-input (second arguments)))] + (report (:format options) out)))))) \ No newline at end of file diff --git a/src/datahike/config.cljc b/src/datahike/config.cljc index b782fab67..ad1c413d0 100644 --- a/src/datahike/config.cljc +++ b/src/datahike/config.cljc @@ -9,6 +9,11 @@ [datahike.index :as di]) (:import [java.net URI])) +;; global +(def ^:dynamic *schema-meta-cache-size* (env :schema-meta-cache-size 1024)) +(def ^:dynamic *schema-write-cache-max-db-count* (env :schema-write-cache-size 1024)) + +;; per database (def ^:dynamic *default-index* :datahike.index/persistent-set) (def ^:dynamic *default-schema-flexibility* :write) (def ^:dynamic *default-keep-history?* true) diff --git a/src/datahike/experimental/versioning.cljc b/src/datahike/experimental/versioning.cljc index 0c1bd156b..3b539123f 100644 --- a/src/datahike/experimental/versioning.cljc +++ b/src/datahike/experimental/versioning.cljc @@ -94,11 +94,12 @@ (parent-check parents) (let [store (:store db) cid (create-commit-id db) - db (db->stored (-> db - (assoc-in [:config :branch] branch) - (assoc-in [:meta :datahike/parents] parents) - (assoc-in [:meta 
:datahike/commit-id] cid)) - true)] + [_schema-meta-op db] (db->stored (-> db + (assoc-in [:config :branch] branch) + (assoc-in [:meta :datahike/parents] parents) + (assoc-in [:meta :datahike/commit-id] cid)) + true + true)] (flush-pending-writes store true) (k/update store :branches #(conj % branch) {:sync? true}) (k/assoc store cid db {:sync? true}) diff --git a/src/datahike/experimental/gc.cljc b/src/datahike/gc.cljc similarity index 83% rename from src/datahike/experimental/gc.cljc rename to src/datahike/gc.cljc index e6331b9f3..2791df98e 100644 --- a/src/datahike/experimental/gc.cljc +++ b/src/datahike/gc.cljc @@ -1,11 +1,12 @@ -(ns datahike.experimental.gc +(ns datahike.gc (:require [clojure.set :as set] [datahike.index.interface :refer [-mark]] [konserve.core :as k] [konserve.gc :refer [sweep!]] [taoensso.timbre :refer [debug trace]] [superv.async :refer [> branches (map #(reachable-in-branch store % remove-before config)) async/merge (<! poll! buffer timeout]]) - (:import [clojure.lang ExceptionInfo])) + (:import [clojure.core.async.impl.channels ManyToManyChannel])) + +(defn chan? [x] + (instance? ManyToManyChannel x)) (defprotocol PWriter (-dispatch! [_ arg-map] "Returns a channel that resolves when the transaction finalizes.") @@ -28,6 +32,7 @@ ;; minimum wait time between commits in ms ;; this reduces write pressure on the storage +;; at the cost of higher latency (def ^:const DEFAULT_COMMIT_WAIT_TIME 0) ;; in ms (defn create-thread @@ -53,10 +58,9 @@ (log/warn "Transaction queue buffer more than 90% full, " (count transaction-queue-buffer) "of" transaction-queue-size " filled." "Reduce transaction frequency.")) - (let [old (if-not (= (:max-tx old) (:max-tx @(:wrapped-atom connection))) - (do - (log/warn "DEPRECATED. 
Connection was changed outside of writer.") - (assoc old :max-tx (:max-tx @(:wrapped-atom connection)))) + (let [;; TODO remove this after import is ported to writer API + old (if-not (= (:max-tx old) (:max-tx @(:wrapped-atom connection))) + (assoc old :max-tx (:max-tx @(:wrapped-atom connection))) old) op-fn (write-fn-map op) @@ -77,16 +81,23 @@ :error e}) e)) :error))] - (if-not (= res :error) - (do - (when (> (count commit-queue-buffer) (/ commit-queue-size 2)) - (log/warn "Commit queue buffer more than 50% full, " - (count commit-queue-buffer) "of" commit-queue-size " filled." - "Throttling transaction processing. Reduce transaction frequency and check your storage throughput.") - (! callback ( (count commit-queue-buffer) (/ commit-queue-size 2)) + (log/warn "Commit queue buffer more than 50% full, " + (count commit-queue-buffer) "of" commit-queue-size " filled." + "Throttling transaction processing. Reduce transaction frequency and check your storage throughput.") + (stored "Maps memory db to storage layout and flushes dirty indices." - [db flush?] + [db flush? sync?] (when-not (dbu/db? db) (dt/raise "Argument is not a database." {:type :argument-is-not-a-db @@ -56,28 +55,36 @@ (let [{:keys [eavt aevt avet temporal-eavt temporal-aevt temporal-avet schema rschema system-entities ident-ref-map ref-ident-map config max-tx max-eid op-count hash meta store]} db + schema-meta {:schema schema + :rschema rschema + :system-entities system-entities + :ident-ref-map ident-ref-map + :ref-ident-map ref-ident-map} + schema-meta-key (uuid schema-meta) backend (di/konserve-backend (:index config) store) not-in-memory? (not= :mem (-> config :store :backend)) - flush! (and flush? not-in-memory?)] - (merge - {:schema schema - :rschema rschema - :system-entities system-entities - :ident-ref-map ident-ref-map - :ref-ident-map ref-ident-map - :config config - :meta meta - :hash hash - :max-tx max-tx - :max-eid max-eid - :op-count op-count - :eavt-key (cond-> eavt flush! 
(di/-flush backend)) - :aevt-key (cond-> aevt flush! (di/-flush backend)) - :avet-key (cond-> avet flush! (di/-flush backend))} - (when (:keep-history? config) - {:temporal-eavt-key (cond-> temporal-eavt flush! (di/-flush backend)) - :temporal-aevt-key (cond-> temporal-aevt flush! (di/-flush backend)) - :temporal-avet-key (cond-> temporal-avet flush! (di/-flush backend))})))) + flush! (and flush? not-in-memory?) + schema-meta-op (when-not (sc/write-cache-has? (:store config) schema-meta-key) + (sc/add-to-write-cache (:store config) schema-meta-key) + (k/assoc store schema-meta-key schema-meta {:sync? sync?}))] + (when-not (sc/cache-has? schema-meta-key) + (sc/cache-miss schema-meta-key schema-meta)) + [schema-meta-op + (merge + {:schema-meta-key schema-meta-key + :config config + :meta meta + :hash hash + :max-tx max-tx + :max-eid max-eid + :op-count op-count + :eavt-key (cond-> eavt flush! (di/-flush backend)) + :aevt-key (cond-> aevt flush! (di/-flush backend)) + :avet-key (cond-> avet flush! (di/-flush backend))} + (when (:keep-history? config) + {:temporal-eavt-key (cond-> temporal-eavt flush! (di/-flush backend)) + :temporal-aevt-key (cond-> temporal-aevt flush! (di/-flush backend)) + :temporal-avet-key (cond-> temporal-avet flush! (di/-flush backend))}))])) (defn stored->db "Constructs in-memory db instance from stored map value." 
@@ -85,28 +92,35 @@ (let [{:keys [eavt-key aevt-key avet-key temporal-eavt-key temporal-aevt-key temporal-avet-key schema rschema system-entities ref-ident-map ident-ref-map - config max-tx max-eid op-count hash meta] + config max-tx max-eid op-count hash meta schema-meta-key] :or {op-count 0}} stored-db - empty (db/empty-db nil config store)] - (assoc empty - :max-tx max-tx - :max-eid max-eid - :config config - :meta meta - :schema schema - :hash hash - :op-count op-count - :eavt eavt-key - :aevt aevt-key - :avet avet-key - :temporal-eavt temporal-eavt-key - :temporal-aevt temporal-aevt-key - :temporal-avet temporal-avet-key - :rschema rschema - :system-entities system-entities - :ident-ref-map ident-ref-map - :ref-ident-map ref-ident-map - :store store))) + schema-meta (or (sc/cache-lookup schema-meta-key) + ;; not in store in case we load an old db where the schema meta data was inline + (when-let [schema-meta (k/get store schema-meta-key nil {:sync? true})] + (sc/cache-miss schema-meta-key schema-meta) + schema-meta)) + empty (db/empty-db nil config store)] + (merge + (assoc empty + :max-tx max-tx + :max-eid max-eid + :config config + :meta meta + :schema schema + :hash hash + :op-count op-count + :eavt eavt-key + :aevt aevt-key + :avet avet-key + :temporal-eavt temporal-eavt-key + :temporal-aevt temporal-aevt-key + :temporal-avet temporal-avet-key + :rschema rschema + :system-entities system-entities + :ident-ref-map ident-ref-map + :ref-ident-map ref-ident-map + :store store) + schema-meta))) (defn branch-heads-as-commits [store parents] (set (doall (for [p parents] @@ -140,10 +154,12 @@ db (-> db (assoc-in [:meta :datahike/commit-id] cid) (assoc-in [:meta :datahike/parents] parents)) - db-to-store (db->stored db true) + [schema-meta-op db-to-store] (db->stored db true sync?) _ (