Skip to content

Commit

Permalink
Merge pull request #17 from TheClimateCorporation/fix-readahead
Browse files Browse the repository at this point in the history
Fix readahead memory leak
  • Loading branch information
leon-barrett committed Oct 30, 2014
2 parents 9181aef + 41d3c73 commit aecf892
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 99 deletions.
172 changes: 97 additions & 75 deletions src/clj/com/climate/claypoole.clj
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@
handy for testing."
true)

(def ^:dynamic *default-pmap-buffer*
"This is an advanced configuration option. You probably don't need to set
this!
When doing a pmap, Claypoole pushes input tasks into the threadpool. It
normally tries to keep the threadpool full, plus it adds a buffer of size
nthreads. If it can't find out the number of threads in the threadpool, it
just tries to keep *default-pmap-buffer* tasks in the pool."
200)

(defn ncpus
"Get the number of available CPUs."
[]
Expand Down Expand Up @@ -268,89 +278,111 @@
`(future-call ~pool (^{:once true} fn future-body [] ~@body)))

(defn- make-canceller
"Creates a function to cancel a bunch of futures."
[future-reader]
"Creates a function to cancel a pmap."
[driver]
(let [first-already-cancelled (atom Long/MAX_VALUE)]
(fn [i future-seq]
(fn [i later-tasks]
(let [cancel-end @first-already-cancelled]
;; Don't re-kill futures we've already zapped to prevent an O(n^2)
;; explosion.
(when (< i cancel-end)
(swap! first-already-cancelled min i)
;; Kill the future reader.
(future-cancel future-reader)
;; Kill the pmap driver thread.
(future-cancel driver)
;; Stop the tasks above i before cancel-end.
(doseq [f (->> future-seq (take (- cancel-end i)))]
(doseq [f (->> later-tasks rest (take (- cancel-end i)))]
(future-cancel f)))))))

(defn- buffer-blocking-seq
"Make a lazy sequence that blocks when the map's (imaginary) buffer is full."
[pool unordered-results]
(let [buffer-size (if-let [pool-size (impl/get-pool-size pool)]
(* 2 pool-size)
*default-pmap-buffer*)]
(concat (repeat buffer-size nil)
unordered-results)))

(defn- pmap-core
"Given functions to customize for pmap or upmap, create a function that does
the hard work of pmap."
[send-result read-result]
(fn [pool f arg-seqs]
(let [[shutdown? pool] (impl/->threadpool pool)
;; Use map to handle the argument sequences.
args (apply map vector (map impl/unchunk arg-seqs))
;; Pre-declare the canceller because it needs the tasks but the tasks
;; need it too.
canceller (promise)
start-task (fn [i a later-tasks]
;; We can't directly make a future add itself to a
;; queue. Instead, we use a promise for indirection.
(let [p (promise)]
(deliver p (future-call
pool
(with-meta
;; Try to run the task, but
;; definitely add the future to
;; the queue.
#(try
(let [result (apply f a)]
(send-result @p)
result)
;; Even if we had an error
;; running the task, make sure the
;; future shows up in the queue.
(catch Throwable t
;; We've still got to send that
;; result, even if it was an
;; exception, and we have to do it
;; before we start the canceller.
(send-result @p)
;; If we've had an exception, kill
;; future and ongoing processes.
(@canceller i later-tasks)
;; Re-throw that throwable!
(throw t)))
;; Add the args to the function's
;; metadata for prioritization.
{:args a})))
@p))
futures (impl/map-indexed-with-rest start-task args)
;; Start all the tasks in a real future, so we don't block.
read-future (core/future
(try
;; Force all those futures to start.
(dorun futures)
;; If we created a temporary pool, shut it down.
(finally (when shutdown? (shutdown pool)))))]
(deliver canceller (make-canceller read-future))
;; Read results as available.
(concat (map read-result futures)
;; Deref the read-future to get its exceptions, if it has any.
(lazy-seq (try @read-future
;; But if it was cancelled, the user doesn't care.
(catch CancellationException e)))))))
[pool ordered? f arg-seqs]
(let [[shutdown? pool] (impl/->threadpool pool)
;; Use map to handle the argument sequences.
args (apply map vector (map impl/unchunk arg-seqs))
;; Pre-declare the canceller because it needs the tasks but the tasks
;; need it too.
canceller (promise)
;; Set up queues of tasks and results
[task-q tasks] (impl/queue-seq)
[unordered-results-q unordered-results] (impl/queue-seq)
;; This is how we'll actually make things go.
start-task (fn [i a later-tasks]
;; We can't directly make a future add itself to a
;; queue. Instead, we use a promise for indirection.
(let [p (promise)]
(deliver p (future-call
pool
(with-meta
;; Try to run the task, but definitely
;; add the future to the queue.
#(try
(let [result (apply f a)]
(impl/queue-seq-add!
unordered-results-q @p)
result)
;; Even if we had an error running
;; the task, make sure the future
;; shows up in the queue.
(catch Throwable t
;; We've still got to send that
;; result, even if it was an
;; exception, and we have to do it
;; before we start the canceller.
(impl/queue-seq-add!
unordered-results-q @p)
;; If we've had an exception, kill
;; future and ongoing processes.
(@canceller i later-tasks)
;; Re-throw that throwable!
(throw t)))
;; Add the args to the function's
;; metadata for prioritization.
{:args a})))
@p))
;; Start all the tasks in a real future, so we don't block.
driver (core/future
(try
(doseq [[i a later-tasks _]
(map vector (range) args (impl/subseqs tasks)
;; The driver thread reads from this sequence
;; and ignores the result, just to get the side
;; effect of blocking when the map's
;; (imaginary) buffer is full.
(buffer-blocking-seq pool unordered-results))]
(impl/queue-seq-add! task-q (start-task i a later-tasks)))
(finally
(impl/queue-seq-end! task-q)
(when shutdown? (shutdown pool)))))]
(deliver canceller (make-canceller driver))
;; Read results as available.
(concat (map deref
(if ordered?
tasks
(map second (impl/lazy-co-read tasks unordered-results))))
;; Deref the read-future to get its exceptions, if it has any.
(lazy-seq (try @driver
;; But if it was cancelled, the user doesn't care.
(catch CancellationException e))))))

(defn- pmap-boilerplate
"Do boilerplate pmap checks, then call the real pmap function."
[pool f arg-seqs pmap-fn]
[pool ordered? f arg-seqs]
(when (empty? arg-seqs)
(throw (IllegalArgumentException.
"pmap requires at least one sequence to map over")))
(if (serial? pool)
(doall (apply map f arg-seqs))
(pmap-fn pool f arg-seqs)))
(pmap-core pool ordered? f arg-seqs)))

(defn pmap
"Like clojure.core.pmap, except:
Expand All @@ -373,23 +405,13 @@
serial via (doall map). This may be helpful during profiling, for example.
"
[pool f & arg-seqs]
(pmap-boilerplate pool f arg-seqs
;; pmap is easy--just deref the futures.
(let [send-result (constantly nil)
read-result deref]
(pmap-core send-result read-result))))
(pmap-boilerplate pool true f arg-seqs))

(defn upmap
"Like pmap, except that the return value is a sequence of results ordered by
*completion time*, not by input order."
[pool f & arg-seqs]
(pmap-boilerplate pool f arg-seqs
;; upmap is a little complex; read data out of a queue to
;; get the earliest-available data.
(let [q (LinkedBlockingQueue.)
send-result (fn [result] (.add q result))
read-result (fn [_] (-> q .take deref))]
(pmap-core send-result read-result))))
(pmap-boilerplate pool false f arg-seqs))

(defn pcalls
"Like clojure.core.pcalls, except it takes a threadpool. For more detail on
Expand Down
83 changes: 65 additions & 18 deletions src/clj/com/climate/claypoole/impl.clj
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
Executors
ExecutorService
Future
LinkedBlockingQueue
ScheduledExecutorService
ThreadFactory
TimeoutException
Expand Down Expand Up @@ -214,23 +215,69 @@
(str "Claypoole functions require a threadpool, a "
"number, :builtin, or :serial, not %s.") arg)))))

(defn map-indexed-with-rest
"Given f and xs, return a sequence of (f i x s) for every x in xs where
s is the results of the rest of the map.
(defn get-pool-size
"If the pool has a max size, get that; else, return nil."
[pool]
(cond
(instance? java.util.concurrent.ScheduledThreadPoolExecutor pool)
(.getCorePoolSize pool)

i.e. (map-indexed-with-rest #(do (prn [%1 %2 %3]) %2) [:x :y :z])
would print
[0 :x (:y :z)]
[1 :y (:z)]
[2 :z ()]
(instance? java.util.concurrent.ThreadPoolExecutor pool)
(.getMaximumPoolSize pool)

This is handy for lazy sequences that might need to see later elements
in the sequence, e.g. for cancelling futures."
[f xs & [i]]
(lazy-seq
(when (not (empty? xs))
(let [i (or i 0)
x (first xs)
more (rest xs)
results (map-indexed-with-rest f more (inc i))]
(cons (f i x results) results)))))
:else
nil))

;; Queue-seq needs a unique item that, when seen in a queue, indicates that the
;; sequence has ended. It uses the private object end-marker, and uses
;; identical? to check against this object's (unique) memory address.
(let [end-marker (Object.)]

(defn- queue-reader
"Make a lazy sequence from a queue, stopping upon reading the unique
end-marker object."
[q]
(lazy-seq
(let [x (.take q)]
(when-not (identical? x end-marker)
(cons x (queue-reader q))))))

(defn queue-seq
"Create a queue and a lazy sequence that reads from that queue."
[]
(let [q (LinkedBlockingQueue.)]
[q (queue-reader q)]))

(defn queue-seq-add!
"Add an item to a queue (and its lazy sequence)."
[q x]
(.add q x))

(defn queue-seq-end!
"End a lazy sequence reading from a queue."
[q]
(queue-seq-add! q end-marker)))

(defn lazy-co-read
"Zip s1 and s2, stopping when s1 stops. This helps avoid potential blocking
when trying to read queue sequences.
In particular, this will block:
(map vector
(range 10)
(concat (range 10) (lazy-seq (deref (promise)))))
even though we only can read 10 things. Lazy-co-read fixes that case by
checking the first sequence first, so this will not block:
(lazy-co-read
(range 10)
(concat (range 10) (lazy-seq (deref (promise)))))"
[s1 s2]
(lazy-seq (when-not (empty? s1)
(cons [(first s1) (first s2)]
(lazy-co-read (rest s1) (rest s2))))))

(defn subseqs
"Given a sequence s, return a lazy, non-head-holding sequence of
(s (drop 1 s) (drop 2 s) ... '())"
[s]
(reductions (fn [l _] (rest l)) s s))
23 changes: 17 additions & 6 deletions test/com/climate/claypoole/impl_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,20 @@
[com.climate.claypoole.impl :as impl]))


(deftest test-map-indexed-with-rest
(is (= (impl/map-indexed-with-rest vector [:x :y :z])
'([0 :x ([1 :y ([2 :z ()])]
[2 :z ()])]
[1 :y ([2 :z ()])]
[2 :z ()]))))
(deftest test-queue-seq
(let [[q qs] (impl/queue-seq)]
(doseq [i (range 10)]
(impl/queue-seq-add! q i))
(impl/queue-seq-end! q)
(is (= (range 10) qs))))

(deftest test-lazy-co-read
(let [s1 (range 10)
s2 (concat (range 10) (lazy-seq (deref (promise))))]
(is (= (map #(list % %) (range 10))
(impl/lazy-co-read s1 s2)))))

(deftest test-subseqs
(let [n 10]
(is (= (impl/subseqs (range n))
(map #(drop % (range n)) (range (inc n)))))))
31 changes: 31 additions & 0 deletions test/com/climate/claypoole_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,35 @@
(deliver finish :done)
@task-runner))

(defn check-read-ahead
"Verify that this pmap function does not read too far ahead in the input
sequence, as that can cause unnecessary use of RAM."
[pmap-fn]
(let [a (atom nil)
indicator #(do (reset! a %) a)
finish (promise)
started (promise)
results (pmap-fn 4 deref
(concat ;; indicate we've started
(repeatedly 1 #(do (deliver started true)
started))
;; block the map
(repeat 10 finish)
;; a long runway
(map atom (range 100))
;; an indicator for whether we've realized
;; past the runway
(map indicator [:started])))]
;; Let the tasks run
@started
;; Let the threadpool run unchecked for a minute
(Thread/sleep 100)
;; Verify that the indicator wasn't triggered
(is (= nil @a))
;; Complete the map
(deliver finish :done)
(dorun results)))

(defn check-shuts-off
[pmap-like]
(cp/with-shutdown! [pool 2]
Expand Down Expand Up @@ -649,6 +678,8 @@
(when lazy?
(testing (format "%s doesn't hold the head of lazy sequences" fn-name)
(check-holding-thread pmap-like))
(testing (format "%s doesn't read ahead in the input sequence" fn-name)
(check-read-ahead pmap-like))
(testing (format "%s can be chained in various threadpools" fn-name)
(check-chaining pmap-like))
(testing (format "%s stops processing when an exception occurs" fn-name)
Expand Down

0 comments on commit aecf892

Please sign in to comment.