Skip to content

Commit

Permalink
Merge pull request #221 from threatgrid/ingest-limit
Browse files Browse the repository at this point in the history
Ingest limit
quoll authored Oct 19, 2021

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents d3b6752 + 10a21c6 commit aa02827
Showing 8 changed files with 133 additions and 48 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Change Log

## [2.2.2] - 2021-10-19
### Added
- Added a new `:input-limit` option to transact. When included, the transaction will attempt to keep at or below this number of triples.

## [2.2.1] - 2021-10-16
### Fixed
- Accepting java.time.Instant objects on the `since` and `as-of` database functions.
@@ -285,7 +289,8 @@
### Added
- Introduced Update Annotations

[Unreleased]: https://github.com/threatgrid/asami/compare/2.2.1...HEAD
[Unreleased]: https://github.com/threatgrid/asami/compare/2.2.2...HEAD
[2.2.2]: https://github.com/threatgrid/asami/compare/2.2.1...2.2.2
[2.2.1]: https://github.com/threatgrid/asami/compare/2.2.0...2.2.1
[2.2.0]: https://github.com/threatgrid/asami/compare/2.1.3...2.2.0
[2.1.3]: https://github.com/threatgrid/asami/compare/2.1.2...2.1.3
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@ Asami can be made available to clojure by adding the following to a `deps.edn` f
```clojure
{
:deps {
org.clojars.quoll/asami {:mvn/version "2.2.1"}
org.clojars.quoll/asami {:mvn/version "2.2.2"}
}
}
```
@@ -41,7 +41,7 @@ This makes Asami available to a repl that is launched with the `clj` or `clojure

Alternatively, Asami can be added for the Leiningen build tool by adding this to the `:dependencies` section of the `project.clj` file:
```clojure
[org.clojars.quoll/asami "2.2.1"]
[org.clojars.quoll/asami "2.2.2"]
```

### Important Note for databases before 2.1.0
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject org.clojars.quoll/asami "2.2.1"
(defproject org.clojars.quoll/asami "2.2.2"
:description "An in memory graph store for Clojure and ClojureScript"
:url "http://github.com/threatgrid/asami"
:license {:name "Eclipse Public License"
10 changes: 7 additions & 3 deletions src/asami/core.cljc
Original file line number Diff line number Diff line change
@@ -158,7 +158,8 @@
(s/one s/Any "attribute")
(s/one s/Any "value")]]
(s/optional-key :executor) s/Any
(s/optional-key :update-fn) (s/pred fn?)}
(s/optional-key :update-fn) (s/pred fn?)
(s/optional-key :input-limit) s/Num}
[s/Any]))

(s/defn transact-async
@@ -178,6 +179,7 @@
Alternatively, a map may have a :tx-triples key. If so, then this is a seq of 3 element vectors.
Each vector in a :tx-triples seq will contain the raw values for [entity attribute value]
:executor An optional value in the tx-info containing an executor to be used to run the CompletableFuture
:input-limit contains an optional maximum number of statements to insert (approx)
Entities and assertions may have attributes that are keywords with a trailing ' character.
When these appear an existing attribute without that character will be replaced. This only occurs for the top level
entity, and is not applied to attributes appearing in nested structures.
@@ -192,7 +194,7 @@
:tx-data sequence of datoms produced by the transaction
:tempids mapping of the temporary IDs in entities to the allocated nodes"
[{:keys [name state] :as connection} :- ConnectionType
{:keys [tx-data tx-triples executor update-fn] :as tx-info} :- TransactData]
{:keys [tx-data tx-triples executor update-fn input-limit] :as tx-info} :- TransactData]

;; Detached databases need to be reattached when transacted into
(check-attachment connection)
@@ -222,7 +224,9 @@
(fn [graph]
;; building triples returns a tuple of assertions, retractions, tempids
(let [[_ _ tempids :as result]
(entities/build-triples graph (seq-wrapper (or tx-data tx-info)))]
(entities/build-triples graph
(seq-wrapper (or tx-data tx-info))
input-limit)]
(vreset! vtempids tempids)
result))))
;; pull out the info captured during the transaction
85 changes: 56 additions & 29 deletions src/asami/entities.cljc
Original file line number Diff line number Diff line change
@@ -41,6 +41,11 @@
(some append-attribute? obj-keys)
(some #(and (map? %) (contains-updates? %)) (vals obj)))))

(s/defn ^:private minus :- (s/maybe s/Num)
[limit :- (s/maybe s/Num)
n :- s/Num]
(when limit (- limit n)))

(s/defn ^:private entity-triples :- [(s/one [Triple] "New triples")
(s/one [Triple] "Retractions")
(s/one {s/Any s/Any} "New list of ID mappings")
@@ -55,7 +60,8 @@
[graph :- GraphType
{id :db/id ident :db/ident ident2 :id :as obj} :- EntityMap
existing-ids :- {s/Any s/Any}
top-ids :- #{s/Any}]
top-ids :- #{s/Any}
limit :- (s/maybe s/Num)]
(let [[new-obj removals additions]
(if (contains-updates? obj)
(do
@@ -106,10 +112,16 @@
(let [v (obj (append->annotate attr))
new-node (node/new-node graph)]
[[(find-tail head) :tg/rest new-node] [new-node :tg/first v] [head :tg/contains v]])) attr-heads)]
[new-obj removals append-triples]))
(if (and limit (> (count append-triples) limit))
(throw (ex-info "Limit reached" {:overflow true}))
[new-obj removals append-triples])))
[obj nil nil])

[triples ids new-top-ids] (writer/ident-map->triples graph new-obj existing-ids top-ids)
[triples ids new-top-ids] (writer/ident-map->triples graph
new-obj
existing-ids
top-ids
(minus limit (count additions)))

;; if updates occurred new entity statements are redundant
triples (if (or (seq removals) (seq additions) (not (identical? obj new-obj)))
@@ -138,29 +150,44 @@
(s/one {s/Any s/Any} "ID map of created objects")]
"Converts a set of transaction data into triples.
Returns a tuple containing [triples removal-triples tempids]"
[graph :- gr/GraphType
data :- [s/Any]]
(let [[retract-stmts new-data] (util/divide' #(= :db/retract (first %)) data)
ref->id (partial resolve-lookup-refs graph)
retractions (mapv (comp (partial mapv ref->id) rest) retract-stmts)
add-triples (fn [[acc racc ids top-ids] obj]
(if (map? obj)
(let [[triples rtriples new-ids new-top-ids] (entity-triples graph obj ids top-ids)]
[(into acc triples) (into racc rtriples) new-ids new-top-ids])
(if (and (seqable? obj)
(= 4 (count obj))
(= :db/add (first obj)))
(or
(when (= (nth obj 2) :db/id)
(let [id (nth obj 3)]
(when (temp-id? id)
(let [new-id (or (ids id) (node/new-node graph))]
[(conj acc (assoc (vec-rest obj) 2 new-id))
racc
(assoc ids (or id new-id) new-id)
top-ids]))))
[(conj acc (mapv #(or (ids %) (ref->id %)) (rest obj))) racc ids top-ids])
(throw (ex-info (str "Bad data in transaction: " obj) {:data obj})))))
[triples rtriples id-map top-level-ids] (reduce add-triples [[] retractions {} #{}] new-data)
triples (writer/backtrack-unlink-top-entities top-level-ids triples)]
[triples rtriples id-map]))
([graph :- gr/GraphType
data :- [s/Any]]
(build-triples graph data nil))
([graph :- gr/GraphType
data :- [s/Any]
limit :- (s/maybe s/Num)]
(let [[retract-stmts new-data] (util/divide' #(= :db/retract (first %)) data)
ref->id (partial resolve-lookup-refs graph)
retractions (mapv (comp (partial mapv ref->id) rest) retract-stmts)
add-triples (fn [[acc racc ids top-ids :as last-result] obj]
(if (and limit (> (count acc) limit))
(reduced last-result)
(if (map? obj)
(try
(let [[triples rtriples new-ids new-top-ids] (entity-triples graph
obj
ids
top-ids
(minus limit (count acc)))]
[(into acc triples) (into racc rtriples) new-ids new-top-ids])
(catch #?(:clj Exception :cljs :default) e
(if-let [overflow (:overflow (ex-data e))]
(reduced last-result)
(throw e))))
(if (and (seqable? obj)
(= 4 (count obj))
(= :db/add (first obj)))
(or
(when (= (nth obj 2) :db/id)
(let [id (nth obj 3)]
(when (temp-id? id)
(let [new-id (or (ids id) (node/new-node graph))]
[(conj acc (assoc (vec-rest obj) 2 new-id))
racc
(assoc ids (or id new-id) new-id)
top-ids]))))
[(conj acc (mapv #(or (ids %) (ref->id %)) (rest obj))) racc ids top-ids])
(throw (ex-info (str "Bad data in transaction: " obj) {:data obj}))))))
[triples rtriples id-map top-level-ids] (reduce add-triples [[] retractions {} #{}] new-data)
triples (writer/backtrack-unlink-top-entities top-level-ids triples)]
[triples rtriples id-map])))
31 changes: 21 additions & 10 deletions src/asami/entities/writer.cljc
Original file line number Diff line number Diff line change
@@ -33,6 +33,8 @@

(def ^:dynamic *triples* nil)

(def ^:dynamic *limit* nil)

(def ^:dynamic *current-entity* nil)

(def ^:dynamic *top-level-entities* nil)
@@ -43,6 +45,13 @@

(declare value-triples map->triples)

(defn add-triples!
[op data]
(vswap! *triples* op data)
(when (and *limit*
(> (count @*triples*) *limit*))
(throw (ex-info "overflow" {:overflow true}))))

(defn list-triples
"Creates the triples for a list. Returns a node and list of nodes representing contents of the list."
[vlist]
@@ -52,17 +61,17 @@
[list-ref value-nodes]
(let [node-ref (node/new-node *current-graph*)
_ (when last-ref
(vswap! *triples* conj [last-ref :tg/rest node-ref]))
(add-triples! conj [last-ref :tg/rest node-ref]))
value-ref (value-triples v)]
(vswap! *triples* conj [node-ref (node/data-attribute *current-graph* value-ref) value-ref])
(add-triples! conj [node-ref (node/data-attribute *current-graph* value-ref) value-ref])
(recur (or list-ref node-ref) node-ref (conj value-nodes value-ref) vs))))))

(s/defn value-triples-list
[vlist :- [s/Any]]
(if (seq vlist)
(let [[node value-nodes] (list-triples vlist)]
(doseq [vn value-nodes]
(vswap! *triples* conj [node (node/container-attribute *current-graph* vn) vn]))
(add-triples! conj [node (node/container-attribute *current-graph* vn) vn]))
node)
:tg/empty-list))

@@ -86,7 +95,7 @@
(when-not (or (= node *current-entity*)
(@*top-level-entities* node)
(= node :tg/empty-list))
(vswap! *triples* conj [*current-entity* :tg/owns node]))
(add-triples! conj [*current-entity* :tg/owns node]))
node)

(defn value-triples
@@ -110,9 +119,9 @@
(if (set? value)
(doseq [v value]
(let [vr (value-triples v)]
(vswap! *triples* conj [entity-ref property vr])))
(add-triples! conj [entity-ref property vr])))
(let [v (value-triples value)]
(vswap! *triples* conj [entity-ref property v]))))
(add-triples! conj [entity-ref property v]))))

(defn new-node
[id]
@@ -185,22 +194,24 @@
"Converts a single map to triples for an ID'ed map"
([graph :- GraphType
j :- EntityMap]
(ident-map->triples graph j {} #{}))
(ident-map->triples graph j {} #{} nil))
([graph :- GraphType
j :- EntityMap
id-map :- {s/Any s/Any}
top-level-ids :- #{s/Any}]
top-level-ids :- #{s/Any}
limit :- (s/maybe s/Num)]
(binding [*current-graph* graph
*id-map* (volatile! id-map)
*triples* (volatile! [])
*limit* limit
*top-level-entities* (volatile! top-level-ids)]
(let [derefed-id-map (ident-map->triples j)]
[@*triples* derefed-id-map @*top-level-entities*])))
([j :- EntityMap]
(let [node-ref (map->triples j)]
(if (:db/ident j)
(vswap! *triples* conj [node-ref :tg/entity true])
(vswap! *triples* into [[node-ref :db/ident (name-for node-ref)] [node-ref :tg/entity true]]))
(add-triples! conj [node-ref :tg/entity true])
(add-triples! into [[node-ref :db/ident (name-for node-ref)] [node-ref :tg/entity true]]))
@*id-map*)))

(defn backtrack-unlink-top-entities
20 changes: 20 additions & 0 deletions test/asami/api_test.cljc
Original file line number Diff line number Diff line change
@@ -42,6 +42,26 @@
(is (instance? asami.multi_graph.MultiGraph (:graph (:db @(:state cm)))))
(is (= "banana" (:name cm)))))

(deftest test-input-limit
(let [c (connect "asami:mem://limit1")
maksim {:db/id -1
:name "Maksim"
:age 45
:wife {:db/id -2}
:aka ["Maks Otto von Stirlitz", "Jack Ryan"]}
anna {:db/id -2
:name "Anna"
:age 31
:husband {:db/id -1}
:aka ["Anitzka"]}
{:keys [tx-data]} @(transact c {:tx-data [maksim anna]})
c2 (connect "asami:mem://limit2")
{tx-data2 :tx-data} @(transact c2 {:tx-data [maksim anna] :input-limit 15})]
(is (= 21 (count tx-data)))
(is (= 21 (q '[:find (count *) . :where [?s ?p ?o]] c)))
(is (= 13 (count tx-data2)))
(is (= 13 (q '[:find (count *) . :where [?s ?p ?o]] c2)))))

(deftest load-data
(let [c (connect "asami:mem://test1")
r (transact c {:tx-data [{:db/ident "bobid"
22 changes: 20 additions & 2 deletions test/asami/entities/test_entity.cljc
Original file line number Diff line number Diff line change
@@ -14,7 +14,8 @@
:cljs [schema.test :as st :refer-macros [deftest]])
#?(:clj [clojure.test :as t :refer [is]]
:cljs [clojure.test :as t :refer-macros [is]]))
#?(:clj (:import [java.time ZonedDateTime])
#?(:clj (:import [java.time ZonedDateTime]
[clojure.lang ExceptionInfo])
:cljs (:import [goog.date DateTime])))

(defn parseDateTime [s]
@@ -287,6 +288,23 @@
(is (= data obj1))
(is (= (assoc data :sub (dissoc d0 :db/ident)) obj2))))

(deftest test-entity-limits
(let [m1 {:prop "val"}
m2 {:prop "val", :p2 2}
m3 {:prop "val", :p2 22, :p3 [42 54]}
m4 {:prop "val"}
m5 {:prop "val2"}
m6 {:prop "val" :arr [{:a 1} {:a 2} ["nested"]]}
m7 {:prop "val", :p2 22, :p3 []}]
(is (= 3 (count (first (ident-map->triples empty-graph m1 {} #{} 18)))))
(is (= 4 (count (first (ident-map->triples empty-graph m2 {} #{} 18)))))
(is (= 11 (count (first (ident-map->triples empty-graph m3 {} #{} 18)))))
(is (= 3 (count (first (ident-map->triples empty-graph m4 {} #{} 18)))))
(is (= 3 (count (first (ident-map->triples empty-graph m5 {} #{} 18)))))
(is (thrown-with-msg? ExceptionInfo #"overflow"
(ident-map->triples empty-graph m6 {} #{} 18)))
(is (= 5 (count (first (ident-map->triples empty-graph m7 {} #{} 18)))))))

(deftest test-looped-ref->entity
(let [d1 {:db/ident :t1, :task/name "Task 1", :task/requires [#:db{:ident :t3}]}
d2 {:db/ident :t2, :task/name "Task 2", :task/requires [#:db{:ident :t1}]}
@@ -306,7 +324,7 @@
(defn ident-map->graph
([m] (ident-map->graph m {}))
([m mp]
(let [[triples result-map] (ident-map->triples empty-graph m mp #{})]
(let [[triples result-map] (ident-map->triples empty-graph m mp #{} nil)]
[(set triples) result-map])))

(deftest test-ident-map->triples

0 comments on commit aa02827

Please sign in to comment.