From 9adfaa02c856e9bbc294f0a6e47dc0234a6b26f5 Mon Sep 17 00:00:00 2001 From: Paula Gearon Date: Wed, 2 Sep 2020 17:56:42 -0400 Subject: [PATCH] Added graph-transact with transient graphs --- src/asami/common_index.cljc | 19 +++++++++++++- src/asami/graph.cljc | 1 + src/asami/index.cljc | 37 ++++++++++++++++++++++++++-- src/asami/memory.cljc | 14 +---------- src/asami/multi_graph.cljc | 4 +++ test/asami/test_core_query.cljc | 6 +++-- test/asami/test_query_internals.cljc | 6 +++-- 7 files changed, 67 insertions(+), 20 deletions(-) diff --git a/src/asami/common_index.cljc b/src/asami/common_index.cljc index 60ff7d2..a7a5145 100644 --- a/src/asami/common_index.cljc +++ b/src/asami/common_index.cljc @@ -6,7 +6,24 @@ and multigraph implementations." [zuko.schema :as st] [clojure.set :as set] #?(:clj [schema.core :as s] - :cljs [schema.core :as s :include-macros true]))) + :cljs [schema.core :as s :include-macros true])) + #?(:clj (:import [clojure.lang ITransientCollection]))) + +(defn tr + "Converts collections to transients" + [x] + (if (or (map? x) (set? x)) (transient x) x)) + +(defn transient? + "Tests if a value is a transient collection" + [x] + #?(:clj (instance? ITransientCollection x) + :cljs (implements? ITransientCollection x))) + +(defn pt! + "Converts transient collections back to persistent collections" + [x] + (if (transient? x) (persistent! x) x)) (defprotocol NestedIndex (lowest-level-fn [this] "Returns a function for handling the lowest index level retrieval") diff --git a/src/asami/graph.cljc b/src/asami/graph.cljc index 0644cef..e461129 100644 --- a/src/asami/graph.cljc +++ b/src/asami/graph.cljc @@ -10,6 +10,7 @@ (new-graph [this] "Creates an empty graph of the same type") (graph-add [this subj pred obj] "Adds triples to the graph") (graph-delete [this subj pred obj] "Removes triples from the graph") + (graph-transact [this tx-id assertions retractions] "Bulk operation to add and remove multiple statements in a single operation") (graph-diff [this other] "Returns all subjects that have changed in this graph, compared to other") (resolve-triple [this subj pred obj] "Resolves patterns from the graph, and returns unbound columns only") (count-triple [this subj pred obj] "Resolves patterns from the graph, and returns the size of the resolution")) diff --git a/src/asami/index.cljc b/src/asami/index.cljc index c6ab30d..ab71e2e 100644 --- a/src/asami/index.cljc +++ b/src/asami/index.cljc @@ -2,9 +2,10 @@ :author "Paula Gearon"} asami.index (:require [asami.graph :as gr :refer [Graph graph-add graph-delete graph-diff resolve-triple count-triple]] - [asami.common-index :as common :refer [? NestedIndex]] + [asami.common-index :as common :refer [? NestedIndex tr pt!]] [asami.analytics :as analytics] [zuko.node :refer [NodeAPI]] + [clojure.walk :refer [prewalk postwalk]] #?(:clj [schema.core :as s] :cljs [schema.core :as s :include-macros true]))) @@ -16,6 +17,16 @@ c :- s/Any] (update-in idx [a b] (fn [v] (if (seq v) (conj v c) #{c})))) +(defn index-add! + "Add elements to a 3-level transient index. Returns a transient object, or nil when nothing was inserted" + [idx a b c] + (if-let [idx2 (get idx a)] + (if-let [idx3 (get idx2 b)] + (if-not (get idx3 c) + (assoc! idx a (assoc! idx2 b (conj! idx3 c)))) + (assoc! idx a (assoc! idx2 b (transient #{c})))) + (assoc! idx a (transient {b (transient #{c})})))) + (s/defn index-delete :- (s/maybe {s/Any {s/Any #{s/Any}}}) "Remove elements from a 3-level index. Returns the new index, or nil if there is no change." [idx :- {s/Any {s/Any #{s/Any}}} @@ -60,6 +71,13 @@ (* (count (:spo graph)) (count (:osp graph))) (count (common/get-transitive-from-index graph tag s p o)))) +(defn graph-add! [{:keys [spo pos osp] :as graph} subj pred obj] + (when-let [new-spo (index-add! spo subj pred obj)] + (assoc graph + :spo new-spo + :pos (index-add! pos pred obj subj) + :osp (index-add! osp obj subj pred)))) + (declare empty-graph) (defrecord GraphIndexed [spo pos osp] @@ -80,8 +98,23 @@ :osp (index-add osp obj subj pred))))) (graph-delete [this subj pred obj] (if-let [idx (index-delete spo subj pred obj)] - (assoc this :spo idx :pos (index-delete pos pred obj subj) :osp (index-delete osp obj subj pred)) + (assoc this + :spo idx + :pos (index-delete pos pred obj subj) + :osp (index-delete osp obj subj pred)) this)) + (graph-transact [this tx-id assertions retractions] + (as-> this graph + (reduce (fn [acc [s p o]] (graph-delete acc s p o)) graph retractions) + (assoc graph + :spo (postwalk tr (:spo graph)) + :pos (postwalk tr (:pos graph)) + :osp (postwalk tr (:osp graph))) + (reduce (fn [acc [s p o]] (graph-add! acc s p o)) graph assertions) + (assoc graph + :spo (prewalk pt! (:spo graph)) + :pos (prewalk pt! (:pos graph)) + :osp (prewalk pt! (:osp graph))))) (graph-diff [this other] (let [s-po (remove (fn [[s po]] (= po (get (:spo other) s))) spo)] diff --git a/src/asami/memory.cljc b/src/asami/memory.cljc index f35821f..219907f 100644 --- a/src/asami/memory.cljc +++ b/src/asami/memory.cljc @@ -78,16 +78,6 @@ (def empty-multi-graph multi/empty-multi-graph) -(defn- add-to-graph - "Adds triples to the graph" - [graph data] - (reduce (fn [acc d] (apply gr/graph-add acc d)) graph data)) - -(defn- delete-from-graph - "Removes triples from the graph" - [graph data] - (reduce (fn [acc d] (apply gr/graph-delete acc d)) graph data)) - (defn new-connection "Creates a memory Connection object" [name gr] @@ -159,9 +149,7 @@ Updates the connection to the new graph." [conn asserts retracts] (let [{:keys [graph history] :as db-before} (db* conn) - next-graph (-> graph - (delete-from-graph retracts) - (add-to-graph asserts)) + next-graph (gr/graph-transact graph (count history) asserts retracts) db-after (->MemoryDatabase next-graph (conj history db-before) (now))] (reset! (:state conn) {:db db-after :history (conj (:history db-after) db-after)}) [db-before db-after])) diff --git a/src/asami/multi_graph.cljc b/src/asami/multi_graph.cljc index bef0491..185784b 100644 --- a/src/asami/multi_graph.cljc +++ b/src/asami/multi_graph.cljc @@ -117,6 +117,10 @@ allow rules to successfully use this graph type." (if-let [idx (multi-delete spo subj pred obj)] (assoc this :spo idx :pos (multi-delete pos pred obj subj) :osp (multi-delete osp obj subj pred)) this)) + (graph-transact [this tx-id assertions retractions] + (as-> this graph + (reduce (fn [acc [s p o]] (graph-delete acc s p o)) graph retractions) + (reduce (fn [acc [s p o]] (graph-add acc s p o)) graph assertions))) (graph-diff [this other] (let [s-po (remove (fn [[s po]] (= po (get (:spo other) s))) spo)] diff --git a/test/asami/test_core_query.cljc b/test/asami/test_core_query.cljc index dd0be01..7ca7832 100644 --- a/test/asami/test_core_query.cljc +++ b/test/asami/test_core_query.cljc @@ -1,7 +1,7 @@ (ns asami.test-core-query "Tests the public query functionality" (:require [asami.core :refer [q]] - [asami.graph :refer [new-node]] + [asami.graph :refer [new-node graph-transact]] [asami.query :refer [*env*]] [asami.index :refer [empty-graph]] [schema.core :as s] @@ -11,7 +11,9 @@ :cljs [schema.test :as st :refer-macros [deftest]])) #?(:clj (:import [clojure.lang ExceptionInfo]))) -(def assert-data "Access to private function" #'asami.memory/add-to-graph) +(defn assert-data + [graph data] + (graph-transact graph 0 data nil)) (def nn new-node) diff --git a/test/asami/test_query_internals.cljc b/test/asami/test_query_internals.cljc index 8e3d83d..9537646 100644 --- a/test/asami/test_query_internals.cljc +++ b/test/asami/test_query_internals.cljc @@ -5,7 +5,7 @@ [asami.query :as q :refer [pattern-left-join outer-product create-binding create-bindings minus left-join disjunction result-label aggregate-over aggregate-query]] - [asami.graph :refer [Graph resolve-triple]] + [asami.graph :refer [Graph resolve-triple graph-transact]] [asami.index :refer [empty-graph]] [asami.internal :as internal] [zuko.util :as u] @@ -20,7 +20,9 @@ (use-fixtures :once st/validate-schemas) -(def assert-data "Access to private function" #'asami.memory/add-to-graph) +(defn assert-data + [graph data] + (graph-transact graph 0 data nil)) (deftest var-mapping (let [m1 (matching-vars `[?a :rel ?c] `[?a ?b ?c] )