diff --git a/src/clj/com/climate/claypoole.clj b/src/clj/com/climate/claypoole.clj index 19d6e75..697be19 100644 --- a/src/clj/com/climate/claypoole.clj +++ b/src/clj/com/climate/claypoole.clj @@ -42,6 +42,16 @@ handy for testing." true) +(def ^:dynamic *default-pmap-buffer* + "This is an advanced configuration option. You probably don't need to set + this! + + When doing a pmap, Claypoole pushes input tasks into the threadpool. It + normally tries to keep the threadpool full, plus it adds a buffer of size + nthreads. If it can't find out the number of threads in the threadpool, it + just tries to keep *default-pmap-buffer* tasks in the pool." + 200) + (defn ncpus "Get the number of available CPUs." [] @@ -268,89 +278,111 @@ `(future-call ~pool (^{:once true} fn future-body [] ~@body))) (defn- make-canceller - "Creates a function to cancel a bunch of futures." - [future-reader] + "Creates a function to cancel a pmap." + [driver] (let [first-already-cancelled (atom Long/MAX_VALUE)] - (fn [i future-seq] + (fn [i later-tasks] (let [cancel-end @first-already-cancelled] ;; Don't re-kill futures we've already zapped to prevent an O(n^2) ;; explosion. (when (< i cancel-end) (swap! first-already-cancelled min i) - ;; Kill the future reader. - (future-cancel future-reader) + ;; Kill the pmap driver thread. + (future-cancel driver) ;; Stop the tasks above i before cancel-end. - (doseq [f (->> future-seq (take (- cancel-end i)))] + (doseq [f (->> later-tasks rest (take (- cancel-end i)))] (future-cancel f))))))) +(defn- buffer-blocking-seq + "Make a lazy sequence that blocks when the map's (imaginary) buffer is full." + [pool unordered-results] + (let [buffer-size (if-let [pool-size (impl/get-pool-size pool)] + (* 2 pool-size) + *default-pmap-buffer*)] + (concat (repeat buffer-size nil) + unordered-results))) + (defn- pmap-core "Given functions to customize for pmap or upmap, create a function that does the hard work of pmap." - [send-result read-result] - (fn [pool f arg-seqs] - (let [[shutdown? pool] (impl/->threadpool pool) - ;; Use map to handle the argument sequences. - args (apply map vector (map impl/unchunk arg-seqs)) - ;; Pre-declare the canceller because it needs the tasks but the tasks - ;; need it too. - canceller (promise) - start-task (fn [i a later-tasks] - ;; We can't directly make a future add itself to a - ;; queue. Instead, we use a promise for indirection. - (let [p (promise)] - (deliver p (future-call - pool - (with-meta - ;; Try to run the task, but - ;; definitely add the future to - ;; the queue. - #(try - (let [result (apply f a)] - (send-result @p) - result) - ;; Even if we had an error - ;; running the task, make sure the - ;; future shows up in the queue. - (catch Throwable t - ;; We've still got to send that - ;; result, even if it was an - ;; exception, and we have to do it - ;; before we start the canceller. - (send-result @p) - ;; If we've had an exception, kill - ;; future and ongoing processes. - (@canceller i later-tasks) - ;; Re-throw that throwable! - (throw t))) - ;; Add the args to the function's - ;; metadata for prioritization. - {:args a}))) - @p)) - futures (impl/map-indexed-with-rest start-task args) - ;; Start all the tasks in a real future, so we don't block. - read-future (core/future - (try - ;; Force all those futures to start. - (dorun futures) - ;; If we created a temporary pool, shut it down. - (finally (when shutdown? (shutdown pool)))))] - (deliver canceller (make-canceller read-future)) - ;; Read results as available. - (concat (map read-result futures) - ;; Deref the read-future to get its exceptions, if it has any. - (lazy-seq (try @read-future - ;; But if it was cancelled, the user doesn't care. - (catch CancellationException e))))))) + [pool ordered? f arg-seqs] + (let [[shutdown? pool] (impl/->threadpool pool) + ;; Use map to handle the argument sequences. + args (apply map vector (map impl/unchunk arg-seqs)) + ;; Pre-declare the canceller because it needs the tasks but the tasks + ;; need it too. + canceller (promise) + ;; Set up queues of tasks and results + [task-q tasks] (impl/queue-seq) + [unordered-results-q unordered-results] (impl/queue-seq) + ;; This is how we'll actually make things go. + start-task (fn [i a later-tasks] + ;; We can't directly make a future add itself to a + ;; queue. Instead, we use a promise for indirection. + (let [p (promise)] + (deliver p (future-call + pool + (with-meta + ;; Try to run the task, but definitely + ;; add the future to the queue. + #(try + (let [result (apply f a)] + (impl/queue-seq-add! + unordered-results-q @p) + result) + ;; Even if we had an error running + ;; the task, make sure the future + ;; shows up in the queue. + (catch Throwable t + ;; We've still got to send that + ;; result, even if it was an + ;; exception, and we have to do it + ;; before we start the canceller. + (impl/queue-seq-add! + unordered-results-q @p) + ;; If we've had an exception, kill + ;; future and ongoing processes. + (@canceller i later-tasks) + ;; Re-throw that throwable! + (throw t))) + ;; Add the args to the function's + ;; metadata for prioritization. + {:args a}))) + @p)) + ;; Start all the tasks in a real future, so we don't block. + driver (core/future + (try + (doseq [[i a later-tasks _] + (map vector (range) args (impl/subseqs tasks) + ;; The driver thread reads from this sequence + ;; and ignores the result, just to get the side + ;; effect of blocking when the map's + ;; (imaginary) buffer is full. + (buffer-blocking-seq pool unordered-results))] + (impl/queue-seq-add! task-q (start-task i a later-tasks))) + (finally + (impl/queue-seq-end! task-q) + (when shutdown? (shutdown pool)))))] + (deliver canceller (make-canceller driver)) + ;; Read results as available. + (concat (map deref + (if ordered? + tasks + (map second (impl/lazy-co-read tasks unordered-results)))) + ;; Deref the read-future to get its exceptions, if it has any. + (lazy-seq (try @driver + ;; But if it was cancelled, the user doesn't care. + (catch CancellationException e)))))) (defn- pmap-boilerplate "Do boilerplate pmap checks, then call the real pmap function." - [pool f arg-seqs pmap-fn] + [pool ordered? f arg-seqs] (when (empty? arg-seqs) (throw (IllegalArgumentException. "pmap requires at least one sequence to map over"))) (if (serial? pool) (doall (apply map f arg-seqs)) - (pmap-fn pool f arg-seqs))) + (pmap-core pool ordered? f arg-seqs))) (defn pmap "Like clojure.core.pmap, except: @@ -373,23 +405,13 @@ serial via (doall map). This may be helpful during profiling, for example. " [pool f & arg-seqs] - (pmap-boilerplate pool f arg-seqs - ;; pmap is easy--just deref the futures. - (let [send-result (constantly nil) - read-result deref] - (pmap-core send-result read-result)))) + (pmap-boilerplate pool true f arg-seqs)) (defn upmap "Like pmap, except that the return value is a sequence of results ordered by *completion time*, not by input order." [pool f & arg-seqs] - (pmap-boilerplate pool f arg-seqs - ;; upmap is a little complex; read data out of a queue to - ;; get the earliest-available data. - (let [q (LinkedBlockingQueue.) - send-result (fn [result] (.add q result)) - read-result (fn [_] (-> q .take deref))] - (pmap-core send-result read-result)))) + (pmap-boilerplate pool false f arg-seqs)) (defn pcalls "Like clojure.core.pcalls, except it takes a threadpool. For more detail on diff --git a/src/clj/com/climate/claypoole/impl.clj b/src/clj/com/climate/claypoole/impl.clj index 6f9c5f2..06a30e6 100644 --- a/src/clj/com/climate/claypoole/impl.clj +++ b/src/clj/com/climate/claypoole/impl.clj @@ -29,6 +29,7 @@ Executors ExecutorService Future + LinkedBlockingQueue ScheduledExecutorService ThreadFactory TimeoutException @@ -214,23 +215,69 @@ (str "Claypoole functions require a threadpool, a " "number, :builtin, or :serial, not %s.") arg))))) -(defn map-indexed-with-rest - "Given f and xs, return a sequence of (f i x s) for every x in xs where - s is the results of the rest of the map. +(defn get-pool-size + "If the pool has a max size, get that; else, return nil." + [pool] + (cond + (instance? java.util.concurrent.ScheduledThreadPoolExecutor pool) + (.getCorePoolSize pool) - i.e. (map-indexed-with-rest #(do (prn [%1 %2 %3]) %2) [:x :y :z]) - would print - [0 :x (:y :z)] - [1 :y (:z)] - [2 :z ()] + (instance? java.util.concurrent.ThreadPoolExecutor pool) + (.getMaximumPoolSize pool) - This is handy for lazy sequences that might need to see later elements - in the sequence, e.g. for cancelling futures." - [f xs & [i]] - (lazy-seq - (when (not (empty? xs)) - (let [i (or i 0) - x (first xs) - more (rest xs) - results (map-indexed-with-rest f more (inc i))] - (cons (f i x results) results))))) + :else + nil)) + +;; Queue-seq needs a unique item that, when seen in a queue, indicates that the +;; sequence has ended. It uses the private object end-marker, and uses +;; identical? to check against this object's (unique) memory address. +(let [end-marker (Object.)] + + (defn- queue-reader + "Make a lazy sequence from a queue, stopping upon reading the unique + end-marker object." + [q] + (lazy-seq + (let [x (.take q)] + (when-not (identical? x end-marker) + (cons x (queue-reader q)))))) + + (defn queue-seq + "Create a queue and a lazy sequence that reads from that queue." + [] + (let [q (LinkedBlockingQueue.)] + [q (queue-reader q)])) + + (defn queue-seq-add! + "Add an item to a queue (and its lazy sequence)." + [q x] + (.add q x)) + + (defn queue-seq-end! + "End a lazy sequence reading from a queue." + [q] + (queue-seq-add! q end-marker))) + +(defn lazy-co-read + "Zip s1 and s2, stopping when s1 stops. This helps avoid potential blocking + when trying to read queue sequences. + + In particular, this will block: + (map vector + (range 10) + (concat (range 10) (lazy-seq (deref (promise))))) + even though we only can read 10 things. Lazy-co-read fixes that case by + checking the first sequence first, so this will not block: + (lazy-co-read + (range 10) + (concat (range 10) (lazy-seq (deref (promise)))))" + [s1 s2] + (lazy-seq (when-not (empty? s1) + (cons [(first s1) (first s2)] + (lazy-co-read (rest s1) (rest s2)))))) + +(defn subseqs + "Given a sequence s, return a lazy, non-head-holding sequence of + (s (drop 1 s) (drop 2 s) ... '())" + [s] + (reductions (fn [l _] (rest l)) s s)) diff --git a/test/com/climate/claypoole/impl_test.clj b/test/com/climate/claypoole/impl_test.clj index f3cc20d..918f098 100644 --- a/test/com/climate/claypoole/impl_test.clj +++ b/test/com/climate/claypoole/impl_test.clj @@ -17,9 +17,20 @@ [com.climate.claypoole.impl :as impl])) -(deftest test-map-indexed-with-rest - (is (= (impl/map-indexed-with-rest vector [:x :y :z]) - '([0 :x ([1 :y ([2 :z ()])] - [2 :z ()])] - [1 :y ([2 :z ()])] - [2 :z ()])))) +(deftest test-queue-seq + (let [[q qs] (impl/queue-seq)] + (doseq [i (range 10)] + (impl/queue-seq-add! q i)) + (impl/queue-seq-end! q) + (is (= (range 10) qs)))) + +(deftest test-lazy-co-read + (let [s1 (range 10) + s2 (concat (range 10) (lazy-seq (deref (promise))))] + (is (= (map #(list % %) (range 10)) + (impl/lazy-co-read s1 s2))))) + +(deftest test-subseqs + (let [n 10] + (is (= (impl/subseqs (range n)) + (map #(drop % (range n)) (range (inc n))))))) diff --git a/test/com/climate/claypoole_test.clj b/test/com/climate/claypoole_test.clj index c6b0250..7154d41 100644 --- a/test/com/climate/claypoole_test.clj +++ b/test/com/climate/claypoole_test.clj @@ -603,6 +603,35 @@ (deliver finish :done) @task-runner)) +(defn check-read-ahead + "Verify that this pmap function does not read too far ahead in the input + sequence, as that can cause unnecessary use of RAM." + [pmap-fn] + (let [a (atom nil) + indicator #(do (reset! a %) a) + finish (promise) + started (promise) + results (pmap-fn 4 deref + (concat ;; indicate we've started + (repeatedly 1 #(do (deliver started true) + started)) + ;; block the map + (repeat 10 finish) + ;; a long runway + (map atom (range 100)) + ;; an indicator for whether we've realized + ;; past the runway + (map indicator [:started])))] + ;; Let the tasks run + @started + ;; Let the threadpool run unchecked for a minute + (Thread/sleep 100) + ;; Verify that the indicator wasn't triggered + (is (= nil @a)) + ;; Complete the map + (deliver finish :done) + (dorun results))) + (defn check-shuts-off [pmap-like] (cp/with-shutdown! [pool 2] @@ -649,6 +678,8 @@ (when lazy? (testing (format "%s doesn't hold the head of lazy sequences" fn-name) (check-holding-thread pmap-like)) + (testing (format "%s doesn't read ahead in the input sequence" fn-name) + (check-read-ahead pmap-like)) (testing (format "%s can be chained in various threadpools" fn-name) (check-chaining pmap-like)) (testing (format "%s stops processing when an exception occurs" fn-name)