Skip to content

Commit

Permalink
Fix pmap exception stopping bug
Browse files Browse the repository at this point in the history
This fixes the issue where pmap would keep running tasks even after one
threw an exception.
  • Loading branch information
leon-barrett committed May 25, 2014
1 parent 19c6c3a commit d162563
Showing 1 changed file with 99 additions and 69 deletions.
168 changes: 99 additions & 69 deletions src/clj/com/climate/claypoole.clj
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
PriorityThreadpoolImpl]
[java.util.concurrent
Callable
CancellationException
ExecutorService
Future
LinkedBlockingQueue
Expand Down Expand Up @@ -263,7 +264,91 @@
serial. This may be helpful during profiling, for example.
"
[pool & body]
`(future-call ~pool (^{:once true} fn [] ~@body)))
`(future-call ~pool (^{:once true} fn future-body [] ~@body)))

(defn- make-canceller
"Creates a function to cancel a bunch of futures."
[future-reader future-seq]
(let [first-already-cancelled (atom Long/MAX_VALUE)]
(fn [i]
(let [cancel-end @first-already-cancelled]
;; Don't re-kill futures we've already zapped to prevent an O(n^2)
;; explosion.
(when (< i cancel-end)
(swap! first-already-cancelled min i)
;; Kill the future reader.
(future-cancel future-reader)
;; Stop the tasks above i before cancel-end.
(doseq [f (->> future-seq (take cancel-end) (drop (inc i)))]
(future-cancel f)))))))

(defn- pmap-core
"Given functions to customize for pmap or upmap, create a function that does
the hard work of pmap."
[send-result read-result]
(fn [pool f arg-seqs]
(let [[shutdown? pool] (impl/->threadpool pool)
;; Use map to handle the argument sequences.
args (apply map vector (map impl/unchunk arg-seqs))
;; Pre-declare the canceller because it needs the tasks but the tasks
;; need it too.
canceller (promise)
start-task (fn [i a]
;; We can't directly make a future add itself to a
;; queue. Instead, we use a promise for indirection.
(let [p (promise)]
(deliver p (future-call
pool
(with-meta
;; Try to run the task, but
;; definitely add the future to
;; the queue.
#(try
(let [result (apply f a)]
(send-result @p)
result)
;; Even if we had an exception
;; running the task, make sure the
;; future shows up in the queue.
(catch Exception e
;; We've still got to send that
;; result, even if it was an
;; exception, and we have to do it
;; before we start the canceller.
(send-result @p)
;; If we've had an exception, kill
;; future and ongoing processes.
(@canceller i)
(throw e)))
;; Add the args to the function's
;; metadata for prioritization.
{:args a})))
@p))
futures (map-indexed start-task args)
;; Start all the tasks in a real future, so we don't block.
read-future (core/future
(try
;; Force all those futures to start.
(dorun futures)
;; If we created a temporary pool, shut it down.
(finally (when shutdown? (shutdown pool)))))]
(deliver canceller (make-canceller read-future futures))
;; Read results as available.
(concat (map read-result futures)
;; Deref the read-future to get its exceptions, if it has any.
(lazy-seq (try @read-future
;; But if it was cancelled, the user doesn't care.
(catch CancellationException e)))))))

(defn- pmap-boilerplate
"Do boilerplate pmap checks, then call the real pmap function."
[pool f arg-seqs pmap-fn]
(when (empty? arg-seqs)
(throw (IllegalArgumentException.
"pmap requires at least one sequence to map over")))
(if (serial? pool)
(doall (apply map f arg-seqs))
(pmap-fn pool f arg-seqs)))

(defn pmap
"Like clojure.core.pmap, except:
Expand All @@ -286,78 +371,23 @@
serial via (doall map). This may be helpful during profiling, for example.
"
[pool f & arg-seqs]
(when (empty? arg-seqs)
(throw (IllegalArgumentException.
"pmap requires at least one sequence to map over")))
(if (serial? pool)
(doall (apply map f arg-seqs))
(let [[shutdown? pool] (impl/->threadpool pool)
;; Use map to handle the argument sequences.
args (apply map vector (map impl/unchunk arg-seqs))
futures (for [a args]
(future-call pool
(with-meta #(apply f a)
{:args a})))
;; Start eagerly parallel processing.
read-future (core/future
(try
(dorun futures)
(finally (when shutdown? (shutdown pool)))))]
;; Read results as available.
(concat (map deref futures)
;; Deref the reading future to get its exceptions, if it had any.
(lazy-seq (deref read-future))))))
(pmap-boilerplate pool f arg-seqs
;; pmap is easy--just deref the futures.
(let [send-result (constantly nil)
read-result deref]
(pmap-core send-result read-result))))

(defn upmap
"Like pmap, except the return value is a sequence of results ordered by
"Like pmap, except that the return value is a sequence of results ordered by
*completion time*, not by input order."
[pool f & arg-seqs]
(when (empty? arg-seqs)
(throw (IllegalArgumentException.
"upmap requires at least one sequence to map over")))
(if (serial? pool)
(doall (apply map f arg-seqs))
(let [[shutdown? pool] (impl/->threadpool pool)
;; Use map to handle the argument sequences.
args (apply map vector (map impl/unchunk arg-seqs))
q (LinkedBlockingQueue.)
;; Start eagerly parallel processing.
read-future (core/future
;; Try to run schedule all the tasks, but definitely
;; shutdown the pool if necessary.
(try
(doseq [a args
:let [p (promise)]]
;; Try to schedule one task, but definitely add
;; something to the queue for the task.
(try
;; We can't directly make a future add itself to
;; a queue. Instead, we use a promise for
;; indirection.
(deliver p (future-call
pool
(with-meta
;; Try to run the task, but
;; definitely add the future to
;; the queue.
#(try
(apply f a)
;; Even if we had an exception
;; running the task, make sure
;; the future shows up in the
;; queue.
(finally (.add q @p)))
{:args a})))
;; If we had an exception scheduling a task,
;; let's plan to re-throw that at queue read
;; time.
(catch Exception e
(.add q (delay (throw e))))))
(finally (when shutdown? (shutdown pool)))))]
;; Read results as available.
(concat (for [_ args] (-> q .take deref))
;; Deref the reading future to get its exceptions, if it had any.
(lazy-seq (deref read-future))))))
(pmap-boilerplate pool f arg-seqs
;; upmap is a little complex; read data out of a queue to
;; get the earliest-available data.
(let [q (LinkedBlockingQueue.)
send-result (fn [result] (.add q result))
read-result (fn [_] (-> q .take deref))]
(pmap-core send-result read-result))))

(defn pcalls
"Like clojure.core.pcalls, except it takes a threadpool. For more detail on
Expand Down

0 comments on commit d162563

Please sign in to comment.