Skip to content

Commit

Permalink
Simplify connection updates, add cost center. (#658)
Browse files Browse the repository at this point in the history
* Simplify connection updates, add cost center.

* Fix format.

* Handle migration properly, warn on outside connection changes.

* Fix format.

* Refactor according to PR feedback.
  • Loading branch information
whilo authored Oct 3, 2024
1 parent 79bb943 commit 99290f1
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 98 deletions.
15 changes: 9 additions & 6 deletions src/datahike/experimental/versioning.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
[datahike.core :refer [with]]
[datahike.store :refer [store-identity]]
[datahike.writing :refer [stored->db db->stored stored-db?
update-connection! commit! add-commit-meta!
create-commit-id flush-pending-writes]]
complete-db-update commit! create-commit-id flush-pending-writes]]
[superv.async :refer [<? S go-loop-try]]
[datahike.db.utils :refer [db?]]
[datahike.tools :as dt]))
Expand Down Expand Up @@ -135,12 +134,16 @@
"Create a merge commit to the current branch of this connection for parent
commit uuids. It is the responsibility of the caller to make sure that tx-data
contains the data to be merged into the branch from the parents. This function
ensures that the parent commits are properly tracked."
ensures that the parent commits are properly tracked.
NOTE: Currently merge! requires that you release all connections to conn and reconnect afterwards to reset the writer state. This will be fixed in the future by handling merge! through the writer."
([conn parents tx-data]
(merge! conn parents tx-data nil))
([conn parents tx-data tx-meta]
(parent-check parents)
(let [db (:db-after (update-connection! conn tx-data tx-meta #(with %1 %2 %3)))
parents (conj parents (get-in @conn [:config :branch]))]
(add-commit-meta! conn (commit! db parents))
(let [old @conn
db (:db-after (complete-db-update old (with old tx-data tx-meta)))
parents (conj parents (get-in old [:config :branch]))
commit-db (commit! db parents)]
(reset! conn commit-db)
true)))
9 changes: 7 additions & 2 deletions src/datahike/index/persistent_set.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -199,21 +199,24 @@
(uuid (mapv (comp vec seq) (.keys node))))
(uuid)))

(defrecord CachedStorage [store config cache stats pending-writes]
(defrecord CachedStorage [store config cache stats pending-writes cost-center-fn]
IStorage
(store [_ node]
(@cost-center-fn :store)
(swap! stats update :writes inc)
(let [address (gen-address node (:crypto-hash? config))
_ (trace "writing storage: " address " crypto: " (:crypto-hash? config))]
(swap! pending-writes conj (k/assoc store address node {:sync? false}))
(wrapped/miss cache address node)
address))
(accessed [_ address]
(@cost-center-fn :accessed)
(trace "accessing storage: " address)
(swap! stats update :accessed inc)
(wrapped/hit cache address)
nil)
(restore [_ address]
(@cost-center-fn :restore)
(trace "reading: " address)
(if-let [cached (wrapped/lookup cache address)]
cached
Expand All @@ -234,7 +237,8 @@
(CachedStorage. store config
(atom (cache/lru-cache-factory {} :threshold (:store-cache-size config)))
(atom init-stats)
(atom [])))
(atom [])
(atom (fn [_] nil))))

(def ^:const DEFAULT_BRANCHING_FACTOR 512)

Expand Down Expand Up @@ -268,6 +272,7 @@
(int (or (:branching-factor m) 0))
nil ;; weak ref default
))

(defmethod di/add-konserve-handlers :datahike.index/persistent-set [config store]
;; deal with circular reference between storage and store
(let [settings (map->settings {:branching-factor DEFAULT_BRANCHING_FACTOR})
Expand Down
135 changes: 72 additions & 63 deletions src/datahike/writer.cljc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(ns ^:no-doc datahike.writer
(:require [superv.async :refer [S thread-try <?-]]
(:require [superv.async :refer [S thread-try <?- go-try]]
[taoensso.timbre :as log]
[datahike.core]
[datahike.writing :as w]
Expand Down Expand Up @@ -40,77 +40,86 @@
[transaction-queue commit-queue
(thread-try
S
(let [store (:store @(:wrapped-atom connection))]
(do
;; processing loop
(go-loop []
(if-let [{:keys [op args callback] :as invocation} (<?- transaction-queue)]
(do
(when (> (count transaction-queue-buffer) (* 0.9 transaction-queue-size))
(log/warn "Transaction queue buffer more than 90% full, "
(count transaction-queue-buffer) "of" transaction-queue-size " filled."
"Reduce transaction frequency."))
(let [op-fn (write-fn-map op)
res (try
(apply op-fn connection args)
(go-try S
;; delay processing until the writer we are part of in connection is set
(while (not (:writer @(:wrapped-atom connection)))
(<! (timeout 10)))
(loop [old @(:wrapped-atom connection)]
(if-let [{:keys [op args callback] :as invocation} (<?- transaction-queue)]
(do
(when (> (count transaction-queue-buffer) (* 0.9 transaction-queue-size))
(log/warn "Transaction queue buffer more than 90% full, "
(count transaction-queue-buffer) "of" transaction-queue-size " filled."
"Reduce transaction frequency."))
(let [old (if-not (= (:max-tx old) (:max-tx @(:wrapped-atom connection)))
(do
(log/warn "DEPRECATED. Connection was changed outside of writer.")
(assoc old :max-tx (:max-tx @(:wrapped-atom connection))))
old)

op-fn (write-fn-map op)
res (try
(apply op-fn old args)
;; Only catch ExceptionInfo here (intentionally rejected transactions).
;; Any other exceptions should crash the writer and signal the supervisor.
(catch Exception e
(log/error "Error during invocation" invocation e args)
(catch Exception e
(log/error "Error during invocation" invocation e args)
;; take a guess that a NPE was triggered by an invalid connection
;; short circuit on errors
(put! callback
(if (= (type e) NullPointerException)
(ex-info "Null pointer encountered in invocation. Connection may have been invalidated, e.g. through db deletion, and needs to be released everywhere."
{:type :writer-error-during-invocation
:invocation invocation
:connection connection
:error e})
e))
:error))]
(when-not (= res :error)
(when (> (count commit-queue-buffer) (/ commit-queue-size 2))
(log/warn "Commit queue buffer more than 50% full, "
(count commit-queue-buffer) "of" commit-queue-size " filled."
"Throttling transaction processing. Reduce transaction frequency and check your storage throughput.")
(<! (timeout 50)))
(put! commit-queue [res callback])))
(recur))
(do
(close! commit-queue)
(log/debug "Writer thread gracefully closed"))))
(put! callback
(if (= (type e) NullPointerException)
(ex-info "Null pointer encountered in invocation. Connection may have been invalidated, e.g. through db deletion, and needs to be released everywhere."
{:type :writer-error-during-invocation
:invocation invocation
:connection connection
:error e})
e))
:error))]
(if-not (= res :error)
(do
(when (> (count commit-queue-buffer) (/ commit-queue-size 2))
(log/warn "Commit queue buffer more than 50% full, "
(count commit-queue-buffer) "of" commit-queue-size " filled."
"Throttling transaction processing. Reduce transaction frequency and check your storage throughput.")
(<! (timeout 50)))
(put! commit-queue [res callback])
(recur (:db-after res)))
(recur old))))
(do
(close! commit-queue)
(log/debug "Writer thread gracefully closed")))))
;; commit loop
(go-loop [tx (<?- commit-queue)]
(when tx
(let [txs (atom [tx])]
(go-try S
(loop [tx (<?- commit-queue)]
(when tx
(let [txs (into [tx] (take-while some?) (repeatedly #(poll! commit-queue)))]
;; empty channel of pending transactions
(loop [tx (poll! commit-queue)]
(when tx
(swap! txs conj tx)
(recur (poll! commit-queue))))
(log/trace "Batched transaction count: " (count @txs))
(log/trace "Batched transaction count: " (count txs))
;; commit latest tx to disk
(let [db (:db-after (first (peek @txs)))]
(try
(let [start-ts (get-time-ms)
{{:keys [datahike/commit-id datahike/parents]} :meta
:as commit-db} (<?- (w/commit! db nil false))
commit-time (- (get-time-ms) start-ts)]
(log/trace "Commit time (ms): " commit-time)
(w/add-commit-meta! connection commit-db)
(let [db (:db-after (first (peek txs)))]
(try
(let [start-ts (get-time-ms)
{{:keys [datahike/commit-id]} :meta
:as commit-db} (<?- (w/commit! db nil false))
commit-time (- (get-time-ms) start-ts)]
(log/trace "Commit time (ms): " commit-time)
(reset! connection commit-db)
;; notify all processes that transaction is complete
(doseq [[res callback] @txs]
(let [res (-> res
(assoc-in [:tx-meta :db/commitId] commit-id)
(assoc :db-after commit-db))]
(put! callback res))))
(catch Exception e
(doseq [[_ callback] @txs]
(put! callback e))
(log/error "Writer thread shutting down because of commit error " e)
(close! commit-queue)
(close! transaction-queue)))
(<! (timeout commit-wait-time))
(recur (<?- commit-queue))))))))]))
(doseq [[tx-report callback] txs]
(let [tx-report (-> tx-report
(assoc-in [:tx-meta :db/commitId] commit-id)
(assoc :db-after commit-db))]
(put! callback tx-report))))
(catch Exception e
(doseq [[_ callback] txs]
(put! callback e))
(log/error "Writer thread shutting down because of commit error." e)
(close! commit-queue)
(close! transaction-queue)))
(<! (timeout commit-wait-time))
(recur (<?- commit-queue)))))))))]))

;; public API

Expand Down
40 changes: 14 additions & 26 deletions src/datahike/writing.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
[hasch.core :refer [uuid]]
[superv.async :refer [go-try- <?-]]
[clojure.core.async :refer [poll!]]
[konserve.utils :refer [async+sync *default-sync-translation*]]))
[konserve.utils :refer [async+sync *default-sync-translation*]]
[taoensso.timbre :as t]))

;; mapping to storage

Expand Down Expand Up @@ -147,27 +148,14 @@
(<?- branch-op)
db)))))

(defn add-commit-meta! [connection commit-db]
(let [{{:keys [datahike/commit-id datahike/parents]} :meta} commit-db]
(swap! connection #(-> %
(assoc-in [:meta :datahike/parents] parents)
(assoc-in [:meta :datahike/commit-id] commit-id)))))

(defn update-connection! [connection tx-data tx-meta update-fn]
(let [ret-atom (atom nil)]
(swap! connection
(fn [old]
(let [{:keys [writer]} old
{:keys [db-after]
{:keys [db/txInstant]}
:tx-meta
:as tx-report} (update-fn old tx-data tx-meta)
new-meta (assoc (:meta db-after) :datahike/updated-at txInstant)
db (assoc db-after :meta new-meta :writer writer)
tx-report (assoc tx-report :db-after db)]
(reset! ret-atom tx-report)
db)))
@ret-atom))
(defn complete-db-update [old tx-report]
(let [{:keys [writer]} old
{:keys [db-after]
{:keys [db/txInstant]} :tx-meta} tx-report
new-meta (assoc (:meta db-after) :datahike/updated-at txInstant)
db (assoc db-after :meta new-meta :writer writer)
tx-report (assoc tx-report :db-after db)]
tx-report))

(defprotocol PDatabaseManager
(-create-database [config opts])
Expand Down Expand Up @@ -274,11 +262,11 @@
;; TODO log deprecation notice with #54
(-database-exists? config)))

(defn transact! [connection {:keys [tx-data tx-meta]}]
(defn transact! [old {:keys [tx-data tx-meta]}]
(log/debug "Transacting" (count tx-data) " objects with meta: " tx-meta)
(log/trace "Transaction data" tx-data)
(update-connection! connection tx-data tx-meta #(core/with %1 %2 %3)))
(complete-db-update old (core/with old tx-data tx-meta)))

(defn load-entities [connection entities]
(defn load-entities [old entities]
(log/debug "Loading" (count entities) " entities.")
(update-connection! connection entities nil #(core/load-entities-with %1 %2 %3)))
(complete-db-update old (core/load-entities-with old entities nil)))
2 changes: 1 addition & 1 deletion test/datahike/test/api_test.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@
(def date (java.util.Date.))

(Thread/sleep 100)

(d/transact conn {:tx-data [{:db/id [:name "Alice"] :age 35}]})

(is (= #{["Alice" 25] ["Bob" 30]}
Expand Down

0 comments on commit 99290f1

Please sign in to comment.