From 8a1d1f1b033b70cd48ac4c036b1365d50c4b2ba0 Mon Sep 17 00:00:00 2001 From: Christian Weilbach Date: Thu, 26 Sep 2024 00:06:17 -0700 Subject: [PATCH] Use CompletableFuture in promise implementation. (#700) * Use CompletableFuture in promise implementation. * Wrap exception correctly, unify transact and transact!. --- src/datahike/api/impl.cljc | 7 ++- src/datahike/api/specification.cljc | 2 +- src/datahike/tools.cljc | 69 +++++++++++++---------------- 3 files changed, 38 insertions(+), 40 deletions(-) diff --git a/src/datahike/api/impl.cljc b/src/datahike/api/impl.cljc index 88abf48b..c56914a8 100644 --- a/src/datahike/api/impl.cljc +++ b/src/datahike/api/impl.cljc @@ -23,7 +23,7 @@ [datahike.db HistoricalDB AsOfDB SinceDB FilteredDB] [datahike.impl.entity Entity]))) -(defn transact [connection arg-map] +(defn transact! [connection arg-map] (let [arg (cond (map? arg-map) (if (contains? arg-map :tx-data) arg-map @@ -35,7 +35,10 @@ :else (dt/raise "Bad argument to transact, expected map, vector or sequence." {:error :transact/syntax :argument-type (type arg-map)}))] - (deref (dw/transact! connection arg)))) + (dw/transact! connection arg))) + +(defn transact [connection arg-map] + @(transact! connection arg-map)) ;; necessary to support initial-tx shorthand, which really should have been avoided (defn create-database [& args] diff --git a/src/datahike/api/specification.cljc b/src/datahike/api/specification.cljc index c2767b89..b16e0eed 100644 --- a/src/datahike/api/specification.cljc +++ b/src/datahike/api/specification.cljc @@ -136,7 +136,7 @@ Exists for Datomic API compatibility. Prefer using `@conn` directly if possible. :doc "Same as transact, but asynchronously returns a future." :supports-remote? false :referentially-transparent? false - :impl datahike.writer/transact!} + :impl datahike.api.impl/transact!} transact {:args (s/cat :conn spec/SConnection :txs spec/STransactions) diff --git a/src/datahike/tools.cljc b/src/datahike/tools.cljc index dd642fdc..ef178377 100644 --- a/src/datahike/tools.cljc +++ b/src/datahike/tools.cljc @@ -1,9 +1,12 @@ (ns ^:no-doc datahike.tools (:require [superv.async :refer [throw-if-exception-]] + [clojure.core.async.impl.protocols :as async-impl] + [clojure.core.async :as async] #?(:clj [clojure.java.io :as io]) [taoensso.timbre :as log]) #?(:clj (:import [java.util Properties UUID Date] + [java.util.concurrent CompletableFuture] [java.net InetAddress]))) (defn combine-hashes [x y] @@ -68,43 +71,35 @@ `(throw #?(:clj (ex-info (str ~@(map (fn [m#] (if (string? m#) m# (list 'pr-str m#))) msgs)) ~data) :cljs (error (str ~@(map (fn [m#] (if (string? m#) m# (list 'pr-str m#))) msgs)) ~data)))))) -; (throwable-promise) derived from (promise) in clojure/core.clj. -; * Clojure -; * Copyright (c) Rich Hickey. All rights reserved. -; * The use and distribution terms for this software are covered by the -; * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) -; * which can be found in the file epl-v10.html at the root of this distribution. -; * By using this software in any fashion, you are agreeing to be bound by -; * the terms of this license. -; * You must not remove this notice, or any other, from this software. -(defn throwable-promise - "Returns a promise object that can be read with deref/@, and set, - once only, with deliver. Calls to deref/@ prior to delivery will - block, unless the variant of deref with timeout is used. All - subsequent derefs will return the same delivered value without - blocking. Exceptions delivered to the promise will throw on deref." - [] - (let [d (java.util.concurrent.CountDownLatch. 1) - v (atom d)] - (reify - clojure.lang.IDeref - (deref [_] (.await d) (throw-if-exception- @v)) - clojure.lang.IBlockingDeref - (deref - [_ timeout-ms timeout-val] - (if (.await d timeout-ms java.util.concurrent.TimeUnit/MILLISECONDS) - (throw-if-exception- @v) - timeout-val)) - clojure.lang.IPending - (isRealized [this] - (zero? (.getCount d))) - clojure.lang.IFn - (invoke - [this x] - (when (and (pos? (.getCount d)) - (compare-and-set! v d x)) - (.countDown d) - this))))) +;; adapted from https://clojure.atlassian.net/browse/CLJ-2766 +#?(:clj + (defn throwable-promise + "Returns a promise object that can be read with deref/@, and set, once only, with deliver. Calls to deref/@ prior to delivery will block, unless the variant of deref with timeout is used. All subsequent derefs will return the same delivered value without blocking. Exceptions delivered to the promise will throw on deref. + + Also supports core.async take! to optionally consume values without blocking the reader thread." + [] + (let [cf (CompletableFuture.) + p (async/promise-chan)] + (reify + clojure.lang.IDeref + (deref [_] (throw-if-exception- (try (.get cf) (catch Throwable t t)))) + clojure.lang.IBlockingDeref + (deref [_ timeout-ms timeout-val] + (if-let [v (try (.get cf timeout-ms java.util.concurrent.TimeUnit/MILLISECONDS) (catch Throwable t t))] + (throw-if-exception- v) + timeout-val)) + clojure.lang.IPending + (isRealized [_] (.isDone cf)) + clojure.lang.IFn + (invoke [this x] + (if (instance? Throwable x) + (.completeExceptionally cf x) + (.complete cf x)) + (if-not (nil? x) (async/put! p x) (async/close! p)) + this) + async-impl/ReadPort + (take! [_this handler] (async-impl/take! p handler))))) + :cljs (def throwable-promise async/promise-chan)) (defn get-version "Retrieves the current version of a dependency. Thanks to https://stackoverflow.com/a/33070806/10978897"