From 888c1247bd447779d190c8f8cd9dfabd4d43b137 Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Wed, 12 Feb 2014 10:51:13 -0800 Subject: [PATCH 01/20] Made with-shutdown! take variable # of pools Made with-shutdown! take a variable number of pools. Also changed README to make with-shutdown! more prominent. --- README.md | 23 ++++++++++++----------- src/com/climate/claypoole.clj | 27 +++++++++++++++++---------- test/com/climate/claypoole_test.clj | 26 ++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index bca8e3c..7585138 100644 --- a/README.md +++ b/README.md @@ -79,17 +79,18 @@ possible. ```clojure (require '[com.climate.claypoole :as cp]) -(def net-pool (cp/threadpool 100)) -(def cpu-pool (cp/threadpool (cp/ncpus))) -;; Unordered pmap doesn't return output in the same order as the input(!), but -;; that means we can start using service2 as soon as possible. -(def service1-responses (cp/upmap net-pool service1-request myinputs)) -(def service2-responses (cp/upmap net-pool service2-request service1-responses)) -(def results (cp/upmap cpu-pool handle-response service2-responses)) -;; ...eventually... -;; The JVM doesn't automatically clean up threads for us. -(cp/shutdown net-pool) -(cp/shutdown cpu-pool) +;; We'll use the with-shutdown! form to guarantee that pools are cleaned up. +(cp/with-shutdown! [net-pool (cp/threadpool 100) + cpu-pool (cp/threadpool (cp/ncpus))] + ;; Unordered pmap doesn't return output in the same order as the input(!), + ;; but that means we can start using service2 as soon as possible. + (def service1-resps (cp/upmap net-pool service1-request myinputs)) + (def service2-resps (cp/upmap net-pool service2-request service1-resps)) + (def results (cp/upmap cpu-pool handle-response service2-resps)) + ;; ...eventually... + ;; Make sure we make sure the computation is complete before we shutdown the + ;; pools. 
+ (doall results)) ``` Claypoole provides ordered and unordered parallel `for` macros. Note that only diff --git a/src/com/climate/claypoole.clj b/src/com/climate/claypoole.clj index f087160..48ab90b 100644 --- a/src/com/climate/claypoole.clj +++ b/src/com/climate/claypoole.clj @@ -96,8 +96,9 @@ (with-shutdown! [pool (theadpool 6)] (doall (pmap pool identity (range 1000)))) - (with-shutdown! [pool 6] - (doall (pmap pool identity (range 1000)))) + (with-shutdown! [pool1 6 + pool2 :serial] + (doall (pmap pool1 identity (range 1000)))) Bad example: @@ -105,14 +106,20 @@ ;; Some of these tasks may be killed! (pmap pool identity (range 1000))) " - [[pool-sym pool-init] & body] - `(let [pool-init# ~pool-init] - (let [[_# ~pool-sym] (impl/->threadpool pool-init#)] - (try - ~@body - (finally - (when (threadpool? ~pool-sym) - (shutdown! ~pool-sym))))))) + [pool-syms-and-inits & body] + (when-not (even? (count pool-syms-and-inits)) + (throw (IllegalArgumentException. + "with-shutdown! requires an even number of binding forms"))) + (if (empty? pool-syms-and-inits) + `(do ~@body) + (let [[pool-sym pool-init & more] pool-syms-and-inits] + `(let [pool-init# ~pool-init + [_# ~pool-sym] (impl/->threadpool pool-init#)] + (try + (with-shutdown! ~more ~@body) + (finally + (when (threadpool? ~pool-sym) + (shutdown! ~pool-sym)))))))) (defn- serial? "Check if we should run this computation in serial." diff --git a/test/com/climate/claypoole_test.clj b/test/com/climate/claypoole_test.clj index 9f60717..bf66574 100644 --- a/test/com/climate/claypoole_test.clj +++ b/test/com/climate/claypoole_test.clj @@ -118,6 +118,32 @@ (Thread/sleep 5) (is (.isDone @@fp)) (is (thrown? ExecutionException (deref @@fp))))))) + (testing "With-shutdown! works with any number of threadpools" + (let [input (range 100)] + (is (= input + (cp/with-shutdown! [] + (map identity input)))) + (is (= input + (cp/with-shutdown! [p1 4] + (->> input + (cp/pmap p1 identity) + doall)))) + (is (= input + (cp/with-shutdown! 
[p1 4 + p2 3] + (->> input + (cp/pmap p1 identity) + (cp/pmap p2 identity) + doall)))) + (is (= input + (cp/with-shutdown! [p1 4 + p2 3 + p3 5] + (->> input + (cp/pmap p1 identity) + (cp/pmap p2 identity) + (cp/pmap p3 identity) + doall)))))) (testing "Invalid with-shutdown! arguments" (is (thrown? IllegalArgumentException (cp/with-shutdown! [pool 1.5] nil))) From b5a99cc1d9a347d441d40e8f7fa55bf357a78146 Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Wed, 12 Feb 2014 10:55:00 -0800 Subject: [PATCH 02/20] Clarify why John Claypoole links to Betsy Ross. --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 7585138..7e23f25 100644 --- a/README.md +++ b/README.md @@ -202,8 +202,8 @@ like so: ## Why the name "Claypoole"? -The claypoole library is named after [John -Claypoole](http://en.wikipedia.org/wiki/Betsy_Ross) for reasons that are at +The claypoole library is named after [John Claypoole (Betsy Ross's third +husband)](http://en.wikipedia.org/wiki/Betsy_Ross) for reasons that are at best obscure. ## License From 4cd73def6238a2c253daa53a1be000e5659a02a4 Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Thu, 13 Feb 2014 17:11:09 -0800 Subject: [PATCH 03/20] Add priority threadpools. 
--- .gitignore | 1 + README.md | 41 +++- project.clj | 4 +- src/{ => clj}/com/climate/claypoole.clj | 123 ++++++++-- src/{ => clj}/com/climate/claypoole/impl.clj | 85 ++++++- .../climate/claypoole/impl/Prioritized.java | 19 ++ .../claypoole/impl/PriorityFutureTask.java | 51 +++++ .../impl/PriorityThreadpoolImpl.java | 75 +++++++ test/com/climate/claypoole_test.clj | 210 ++++++++++++++++-- 9 files changed, 564 insertions(+), 45 deletions(-) rename src/{ => clj}/com/climate/claypoole.clj (70%) rename src/{ => clj}/com/climate/claypoole/impl.clj (63%) create mode 100644 src/java/com/climate/claypoole/impl/Prioritized.java create mode 100644 src/java/com/climate/claypoole/impl/PriorityFutureTask.java create mode 100644 src/java/com/climate/claypoole/impl/PriorityThreadpoolImpl.java diff --git a/.gitignore b/.gitignore index 6e2b14e..96cf2b0 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ pom.xml *.jar .*.swp .lein-failures +.lein-plugins .lein-repl-history .nrepl-port /classes/ diff --git a/README.md b/README.md index 7e23f25..8ebe5b5 100644 --- a/README.md +++ b/README.md @@ -108,6 +108,16 @@ binding will be done in the calling thread. (myfn x y))) ``` +Claypoole also lets you prioritize your backlog of tasks. Higher-priority tasks +will be assigned to threads first. See advanced usage in a section below. + +```clojure +(def pool (cp/priority-threadpool 10)) +(def task1 (cp/future (cp/with-priority pool 1000) (myfn 1))) +(def task2 (cp/future (cp/with-priority pool 0) (myfn 2))) +(def moretasks (cp/pmap (cp/with-priority pool 10) myfn (range 3 10))) +``` + ## Do I really need to manage all those threadpools? You don't need to specifically declare a threadpool. Instead, you can just give @@ -170,7 +180,8 @@ exits. You can create a daemon threadpool via: To construct a threadpool, use the `threadpool` function. It takes optional keyword arguments allowing you to change the thread names, their daemon status, -and their priority. +and their priority. 
(NOTE: Thread priority is a system-level thing that depends +on the OS; it is not the same as the task priority, described below.) ```clojure (def pool (cp/threadpool (cp/ncpus) @@ -200,6 +211,34 @@ like so: (cp/pmap pool myfn inputs))) ``` +## How can I prioritize my tasks? + +You can create a threadpool that respects priorities by creating a +`priority-threadpool`: + +```clojure +(def p1 (cp/priority-threadpool 5)) +(def p2 (cp/priority-threadpool 5 :default-priority -10)) +``` + +Then, use functions `with-priority` and `with-priority-fn` to set the priority +of your tasks: + +```clojure +(cp/future (cp/with-priority p1 100) (myfn)) +;; Nothing bad happens if you nest with-priority. The outermost one "wins"; +;; this task runs at priority 2. +(cp/future (cp/with-priority (cp-with-priority p1 1) 2) (myfn)) +;; For pmaps, you can use a priority function. This will run 3 tasks at +priorities 6, 5, and 4, respectively. +(cp/upmap (cp/with-priority-fn p1 (fn [x y] x) + [6 5 4] [1 2 3]) +;; For for loops, you can use the special :priority binding, which must be the +;; last for binding. +(cp/upfor p1 [i (range 10) + :priority (- i)] + (myfn i)) +``` + ## Why the name "Claypoole"? The claypoole library is named after [John Claypoole (Betsy Ross's third diff --git a/project.clj b/project.clj index 2f270d8..5e7fd9b 100644 --- a/project.clj +++ b/project.clj @@ -12,13 +12,15 @@ ;; and limitations under the License. (defproject com.climate/claypoole - "0.1.0" + "0.2.0-SNAPSHOT" :description "Claypoole: Threadpool tools for Clojure." :url "http://github.com/TheClimateCorporation/claypoole/" :license {:name "Apache License Version 2.0" :url http://www.apache.org/licenses/LICENSE-2.0 :distribution :repo} :min-lein-version "2.0.0" + :source-paths ["src/clj"] + :java-source-paths ["src/java"] :pedantic? 
:warn :dependencies [[org.clojure/clojure "1.5.1"]] :plugins [[jonase/eastwood "0.1.0"]]) diff --git a/src/com/climate/claypoole.clj b/src/clj/com/climate/claypoole.clj similarity index 70% rename from src/com/climate/claypoole.clj rename to src/clj/com/climate/claypoole.clj index 48ab90b..13318e3 100644 --- a/src/com/climate/claypoole.clj +++ b/src/clj/com/climate/claypoole.clj @@ -18,11 +18,17 @@ [clojure.core :as core] [com.climate.claypoole.impl :as impl]) (:import + [com.climate.claypoole.impl + PriorityThreadpool + PriorityThreadpoolImpl] [java.util.concurrent Callable ExecutorService Future - LinkedBlockingQueue])) + LinkedBlockingQueue + PriorityBlockingQueue + ThreadPoolExecutor + TimeUnit])) (def ^:dynamic *parallel* @@ -51,18 +57,68 @@ :thread-priority, an integer in [Thread/MIN_PRIORITY, Thread/MAX_PRIORITY] giving the priority of each thread, defaults to the priority of the current thread" - ([] (threadpool (+ 2 (ncpus)))) - ([n & {:keys [daemon thread-priority] pool-name :name}] - (impl/threadpool n - :daemon daemon - :name pool-name - :thread-priority thread-priority))) + ;; NOTE: Although I'm repeating myself, I list all the threadpool-factory + ;; arguments explicitly for API clarity. + [n & {:keys [daemon thread-priority] pool-name :name}] + (impl/threadpool n + :daemon daemon + :name pool-name + :thread-priority thread-priority)) + +(defn priority-threadpool + "Make a threadpool that chooses tasks based on their priorities. + + Assign priorities to tasks by wrapping the pool with with-priority or + with-priority-fn. + + You can also set a default priority with keyword argument :default-priority. + Otherwise, this uses the same keyword arguments as threadpool." + ^PriorityThreadpool + [n & {:keys [default-priority] :as args + :or {default-priority 0}}] + (PriorityThreadpool. + (PriorityThreadpoolImpl. n + ;; Use our thread factory options. 
+ (impl/apply-map impl/thread-factory args) + default-priority) + (constantly default-priority))) + +(defn with-priority-fn + "Make a priority-threadpool wrapper that uses a given priority function. + + The priority function is applied to a pmap'd function's arguments. e.g. + + (upmap (with-priority-fn p first) + [6 5 4] [1 2 3]) + + will use pool p to run tasks [(+ 6 1) (+ 5 2) (+ 4 3)] + with priorities [6 5 4]." + ^PriorityThreadpool [^PriorityThreadpool pool priority-fn] + (let [^PriorityThreadpoolImpl pool* (.pool pool)] + (PriorityThreadpool. pool* priority-fn))) + +(defn with-priority + "Make a priority-threadpool wrapper with a given fixed priority. + + All tasks run with this pool wrapper will have the given priority. e.g. + + (def t1 (future (with-priority p 1) 1)) + (def t2 (future (with-priority p 2) 2)) + (def t3 (future (with-priority p 3) 3)) + + will use pool p to run these tasks with priorities 1, 2, and 3 respectively." + ^ExecutorService [^ExecutorService pool priority] + (with-priority-fn pool (constantly priority))) (defn threadpool? "Returns true iff the argument is a threadpool." [pool] (instance? ExecutorService pool)) +(defn priority-threadpool? + "Returns true iff the argument is a priority-threadpool." + [pool] + (instance? PriorityThreadpool pool)) + (defn shutdown "Syntactic sugar to stop a pool cleanly. This will stop the pool from accepting any new requests." @@ -199,11 +255,17 @@ serial via (doall map). This may be helpful during profiling, for example. " [pool f & arg-seqs] + (when (empty? arg-seqs) + (throw (IllegalArgumentException. + "pmap requires at least one sequence to map over"))) (if (serial? pool) (doall (apply map f arg-seqs)) (let [[shutdown? pool] (impl/->threadpool pool) args (apply map vector (map impl/unchunk arg-seqs)) - futures (map (fn [a] (future pool (apply f a))) args) + futures (map (fn [a] (future-call pool + (with-meta #(apply f a) + {:args a}))) + args) ;; Start eagerly parallel processing. 
read-future (core/future (try @@ -218,6 +280,9 @@ "Like pmap, except the return value is a sequence of results ordered by *completion time*, not by input order." [pool f & arg-seqs] + (when (empty? arg-seqs) + (throw (IllegalArgumentException. + "upmap requires at least one sequence to map over"))) (if (serial? pool) (doall (apply map f arg-seqs)) (let [[shutdown? pool] (impl/->threadpool pool) @@ -228,9 +293,12 @@ (try (doseq [a args :let [p (promise)]] - (deliver p (future pool (try - (apply f a) - (finally (.add q @p)))))) + (deliver p (future-call pool + (with-meta + #(try + (apply f a) + (finally (.add q @p))) + {:args a})))) (finally (when shutdown? (shutdown pool)))))] ;; Read results as available. (concat (for [_ args] (-> q .take deref)) @@ -263,6 +331,26 @@ [pool & exprs] `(upcalls ~pool ~@(for [e exprs] `(fn [] ~e)))) +(defn- pfor-internal + "Do the messy parsing of the :priority from the for bindings." + [pool bindings body pmap-fn-sym] + (when (vector? pool) + (throw (IllegalArgumentException. + (str "Got a vector instead of a pool--" + "did you forget to use a threadpool?")))) + (if-not (= :priority (first (take-last 2 bindings))) + ;; If there's no priority, everything is simple. + `(~pmap-fn-sym ~pool #(%) (for ~bindings (fn [] ~@body))) + ;; If there's a priority, God help us--let's pull that thing out. + (let [bindings* (vec (drop-last 2 bindings)) + priority-value (last bindings)] + `(let [pool# (with-priority-fn ~pool (fn [_# p#] p#)) + [fns# priorities#] (apply map vector + (for ~bindings* + [(fn [priority#] ~@body) + ~priority-value]))] + (~pmap-fn-sym pool# #(%1 %2) fns# priorities#))))) + (defmacro pfor "A parallel version of for. It is like for, except it takes a threadpool and is parallel. 
For more detail on its parallelism and on its threadpool @@ -272,10 +360,12 @@ in serial, so while this will call complex-computation in parallel: (pfor pool [i (range 1000)] (complex-computation i)) this will not have useful parallelism: - (pfor pool [i (range 1000) :let [result (complex-computation i)] result) + (pfor pool [i (range 1000) :let [result (complex-computation i)]] result) + + You can use the special binding :priority to set the priorities of the tasks. " [pool bindings & body] - `(apply pcalls ~pool (for ~bindings (fn [] ~@body)))) + (pfor-internal pool bindings body `pmap)) (defmacro upfor "An unordered, parallel version of for. It is like for, except it takes a @@ -291,6 +381,11 @@ (upfor pool [i (range 1000) :let [result (complex-computation i)]] result) + + You can use the special binding :priority to set the priorities of the tasks. + (upfor (priority-threadpool 10) [i (range 1000) + :priority (inc i)] + (complex-computation i)) " [pool bindings & body] - `(apply upcalls ~pool (for ~bindings (fn [] ~@body)))) + (pfor-internal pool bindings body `upmap)) diff --git a/src/com/climate/claypoole/impl.clj b/src/clj/com/climate/claypoole/impl.clj similarity index 63% rename from src/com/climate/claypoole/impl.clj rename to src/clj/com/climate/claypoole/impl.clj index 88079b8..a50b18e 100644 --- a/src/com/climate/claypoole/impl.clj +++ b/src/clj/com/climate/claypoole/impl.clj @@ -16,7 +16,17 @@ (:require [clojure.core :as core]) (:import + [clojure.lang + IFn] + [com.climate.claypoole.impl + Prioritized + PriorityThreadpoolImpl] + [java.util + Collection + Comparator + List] [java.util.concurrent + ExecutionException Executors ExecutorService Future @@ -26,12 +36,15 @@ (defn binding-conveyor-fn - "Like clojure.core/binding-conveyor-fn." + "Like clojure.core/binding-conveyor-fn for resetting bindings to run a + function in another thread." 
[f] (let [frame (clojure.lang.Var/cloneThreadBindingFrame)] - (fn [] - (clojure.lang.Var/resetThreadBindingFrame frame) - (f)))) + (with-meta + (fn [] + (clojure.lang.Var/resetThreadBindingFrame frame) + (f)) + (meta f)))) (defn deref-future "Like clojure.core/deref-future." @@ -85,6 +98,13 @@ [] (format "claypoole-%d" (swap! threadpool-id inc))) +(defn apply-map + "Apply a function that takes keyword arguments to a map of arguments." + [f & args] + (let [args (drop-last args) + arg-map (last args)] + (apply f (concat args (mapcat identity arg-map))))) + (defn thread-factory "Create a ThreadFactory with options including thread daemon status, the thread name format (a string for format with one integer), and a thread @@ -92,7 +112,8 @@ ^ThreadFactory [& {:keys [daemon thread-priority] pool-name :name}] (let [daemon* (boolean daemon) pool-name* (or pool-name (default-threadpool-name)) - thread-priority* (or thread-priority (.getPriority (Thread/currentThread)))] + thread-priority* (or thread-priority + (.getPriority (Thread/currentThread)))] (let [default-factory (Executors/defaultThreadFactory) ;; The previously-used thread ID. Start at -1 so we can just use the ;; return value of (swap! inc). @@ -119,12 +140,58 @@ See docs in com.climate.claypoole/threadpool. " - [n & {:keys [daemon thread-priority] pool-name :name}] - (let [factory (thread-factory :daemon daemon - :name pool-name - :thread-priority thread-priority)] + [n & args] + (let [factory (apply thread-factory args)] (Executors/newFixedThreadPool n factory))) +(defn- prioritize + "Apply a priority function to a task. + + Note that this re-throws all priority-fn exceptions as ExecutionExceptions. + That shouldn't mess anything up because the caller re-throws it as an + ExecutionException anyway." + [task, ^IFn priority-fn] + (let [priority (try + (long (apply priority-fn (-> task meta :args))) + (catch Exception e + (throw (ExecutionException. 
+ "Priority function exception" e))))] + (reify + Callable + (call [_] (.call ^Callable task)) + Runnable + (run [_] (.run ^Runnable task)) + Prioritized + (getPriority [_] priority)))) + +;; A Threadpool that has a priority function. +(deftype PriorityThreadpool [^PriorityThreadpoolImpl pool, ^IFn priority-fn] + ExecutorService + (^boolean awaitTermination [_, ^long timeout, ^TimeUnit unit] + (.awaitTermination pool timeout unit)) + (^List invokeAll [_, ^Collection tasks] + (.invokeAll pool tasks)) + (^List invokeAll [_, ^Collection tasks, ^long timeout, ^TimeUnit unit] + (.invokeAll pool tasks timeout unit)) + (^Object invokeAny [_, ^Collection tasks] + (.invokeAny pool tasks)) + (^Object invokeAny [_, ^Collection tasks, ^long timeout, ^TimeUnit unit] + (.invokeAny pool tasks timeout unit)) + (^boolean isShutdown [_] + (.isShutdown pool)) + (^boolean isTerminated [_] + (.isTerminated pool)) + (shutdown [_] + (.shutdown pool)) + (^List shutdownNow [_] + (.shutdownNow pool)) + (^Future submit [_, ^Runnable task] + (.submit pool ^Runnable (prioritize task priority-fn))) + (^Future submit [_, ^Runnable task, ^Object result] + (.submit pool ^Runnable (prioritize task priority-fn) result)) + (^Future submit [_ ^Callable task] + (.submit pool ^Callable (prioritize task priority-fn)))) + (defn ->threadpool "Convert the argument into a threadpool, leaving the special keyword :serial alone. diff --git a/src/java/com/climate/claypoole/impl/Prioritized.java b/src/java/com/climate/claypoole/impl/Prioritized.java new file mode 100644 index 0000000..caf14aa --- /dev/null +++ b/src/java/com/climate/claypoole/impl/Prioritized.java @@ -0,0 +1,19 @@ +// The Climate Corporation licenses this file to you under under the Apache +// License, Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// See the NOTICE file distributed with this work for additional information +// regarding copyright ownership. Unless required by applicable law or agreed +// to in writing, software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions +// and limitations under the License. + +package com.climate.claypoole.impl; + +/** Any object with a priority. */ +public interface Prioritized { + public long getPriority(); +} diff --git a/src/java/com/climate/claypoole/impl/PriorityFutureTask.java b/src/java/com/climate/claypoole/impl/PriorityFutureTask.java new file mode 100644 index 0000000..d4ff8fd --- /dev/null +++ b/src/java/com/climate/claypoole/impl/PriorityFutureTask.java @@ -0,0 +1,51 @@ +// The Climate Corporation licenses this file to you under under the Apache +// License, Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// See the NOTICE file distributed with this work for additional information +// regarding copyright ownership. Unless required by applicable law or agreed +// to in writing, software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions +// and limitations under the License. + +package com.climate.claypoole.impl; + +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; + +/** A prioritized, sortable FutureTask. 
*/ +public class PriorityFutureTask + extends FutureTask + implements Prioritized, Comparable { + private long priority; + + public PriorityFutureTask(Runnable runnable, V value, long priority) { + super(runnable, value); + this.priority = priority; + } + + public PriorityFutureTask(Callable callable, long priority) { + super(callable); + this.priority = priority; + } + + @Override + public int compareTo(Prioritized other) { + // Sort for descending order. + if (this.priority > other.getPriority()) { + return -1; + } else if (this.priority == other.getPriority()) { + return 0; + } else { + return 1; + } + } + + @Override + public long getPriority() { + return this.priority; + } +} diff --git a/src/java/com/climate/claypoole/impl/PriorityThreadpoolImpl.java b/src/java/com/climate/claypoole/impl/PriorityThreadpoolImpl.java new file mode 100644 index 0000000..335aadf --- /dev/null +++ b/src/java/com/climate/claypoole/impl/PriorityThreadpoolImpl.java @@ -0,0 +1,75 @@ +// The Climate Corporation licenses this file to you under under the Apache +// License, Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// See the NOTICE file distributed with this work for additional information +// regarding copyright ownership. Unless required by applicable law or agreed +// to in writing, software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions +// and limitations under the License. 
+ +package com.climate.claypoole.impl; + +import java.util.concurrent.Callable; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** A fixed-size threadpool that does tasks in priority order. + * + * Submitted tasks can have their own priority by implementing Prioritized; + * otherwise, they are assigned the pool's default priority. + */ +public class PriorityThreadpoolImpl extends ThreadPoolExecutor { + public PriorityThreadpoolImpl(int poolSize) { + this(poolSize, 0); + } + + public PriorityThreadpoolImpl(int poolSize, long defaultPriority) { + super(poolSize, poolSize, 0, TimeUnit.MILLISECONDS, + new PriorityBlockingQueue(poolSize)); + this.defaultPriority = defaultPriority; + } + + public PriorityThreadpoolImpl(int poolSize, ThreadFactory threadFactory, + long defaultPriority) { + this(poolSize, poolSize, 0, TimeUnit.MILLISECONDS, threadFactory, + defaultPriority); + } + + public PriorityThreadpoolImpl(int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, + ThreadFactory threadFactory, long defaultPriority) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, + new PriorityBlockingQueue(corePoolSize), threadFactory); + this.defaultPriority = defaultPriority; + } + + protected long getPriority(Object o) { + if (o instanceof Prioritized) { + return ((Prioritized) o).getPriority(); + } + return defaultPriority; + } + + @Override + protected RunnableFuture newTaskFor(Runnable runnable, T value) { + return new PriorityFutureTask(runnable, value, getPriority(runnable)); + } + + @Override + protected RunnableFuture newTaskFor(Callable callable) { + return new PriorityFutureTask(callable, getPriority(callable)); + } + + public long getDefaultPriority() { + return defaultPriority; + } + + protected long defaultPriority; +} diff --git 
a/test/com/climate/claypoole_test.clj b/test/com/climate/claypoole_test.clj index bf66574..d4c614d 100644 --- a/test/com/climate/claypoole_test.clj +++ b/test/com/climate/claypoole_test.clj @@ -22,6 +22,28 @@ ExecutorService])) +(defn check-threadpool-options + [pool-constructor] + (cp/with-shutdown! [pool (pool-constructor 4)] + (dotimes [_ 8] (.submit pool #(inc 1))) + (let [factory (.getThreadFactory pool) + start (promise) + thread (.newThread factory #(deref start))] + (is (false? (.isDaemon thread))) + (is (not (empty? (re-find #"claypoole-[0-9]*-4" (.getName thread))))) + (is (= (.getPriority (Thread/currentThread)) (.getPriority thread))))) + (cp/with-shutdown! [pool (pool-constructor 4 + :daemon true + :name "fiberpond" + :thread-priority 4)] + (dotimes [_ 8] (.submit pool #(inc 1))) + (let [factory (.getThreadFactory pool) + start (promise) + thread (.newThread factory #(deref start))] + (is (true? (.isDaemon thread))) + (is (= "fiberpond-4" (.getName thread))) + (is (= 4 (.getPriority thread)))))) + (deftest test-threadpool (testing "Basic threadpool creation" (cp/with-shutdown! [pool (cp/threadpool 4)] @@ -29,30 +51,178 @@ (dotimes [_ 8] (.submit pool #(inc 1))) (is (= 4 (.getPoolSize pool))))) (testing "Threadpool options" - (cp/with-shutdown! [pool (cp/threadpool 4)] - (dotimes [_ 8] (.submit pool #(inc 1))) - (let [factory (.getThreadFactory pool) - start (promise) - thread (.newThread factory #(deref start))] - (is (false? (.isDaemon thread))) - (is (not (empty? (re-find #"claypoole-[0-9]*-4" (.getName thread))))) - (is (= (.getPriority (Thread/currentThread)) (.getPriority thread))))) - (cp/with-shutdown! [pool (cp/threadpool 4 - :daemon true - :name "fiberpond" - :thread-priority 4)] - (dotimes [_ 8] (.submit pool #(inc 1))) - (let [factory (.getThreadFactory pool) - start (promise) - thread (.newThread factory #(deref start))] - (is (true? 
(.isDaemon thread))) - (is (= "fiberpond-4" (.getName thread))) - (is (= 4 (.getPriority thread))))))) + (check-threadpool-options cp/threadpool))) + +(defn- sorted*? + "Is a sequence sorted?" + [x] + (= x (sort x))) + +(deftest test-priority-threadpool + (testing "Priority threadpool ordering is mostly in order" + (cp/with-shutdown! [pool (cp/priority-threadpool 1)] + (let [start (promise) + completed (atom []) + tasks (doall + (for [i (range 10)] + (do + (cp/future (cp/with-priority pool i) + (deref start) + (swap! completed conj i)))))] + ;; start tasks + (Thread/sleep 5) + (deliver start true) + ;; Wait for tasks to complete + (doseq [f tasks] (deref f)) + (is (= [0 9 8 7 6 5 4 3 2 1] + @completed))))) + (testing "Priority threadpool ordering is ordered with unordered inputs." + (cp/with-shutdown! [pool (cp/priority-threadpool 1)] + (let [start (promise) + completed (atom []) + tasks (doall + (for [i (shuffle (range 100))] + (cp/future (cp/with-priority pool i) + (deref start) + (swap! completed conj i))))] + ;; start tasks + (deliver start true) + ;; Wait for tasks to complete + (doseq [f tasks] (deref f)) + (is (sorted*? + (-> completed + deref + ;; The first task will be one at random, so drop it + rest + reverse)))))) + (testing "Priority threadpool default priority." + (cp/with-shutdown! [pool (cp/priority-threadpool 1 :default-priority 50)] + (let [start (promise) + completed (atom []) + run (fn [result] (deref start) (swap! 
completed conj result)) + first-task (cp/future pool (run :first)) + tasks (doall + (for [i [1 100]] + (cp/future (cp/with-priority pool i) (run i)))) + default-task (cp/future pool (run :default))] + ;; start tasks + (deliver start true) + ;; Wait for tasks to complete + (doseq [f tasks] (deref f)) + (deref default-task) + (is (= [:first 100 :default 1] @completed))))) + (testing "Priority threadpool options" + (check-threadpool-options cp/threadpool))) + +(deftest test-with-priority-fn + (testing "with-priority-fn works for simple upmap" + (cp/with-shutdown! [pool (cp/priority-threadpool 1)] + (let [start (promise) + results (cp/upmap (cp/with-priority-fn pool identity) + (fn [i] + (deref start) + i) + (range 10))] + ;; start tasks + (deliver start true) + (is (= [0 9 8 7 6 5 4 3 2 1] + results))))) + (testing "with-priority-fn throws sensible exceptions" + (cp/with-shutdown! [pool (cp/priority-threadpool 2)] + (is (thrown-with-msg? + Exception #"Priority function exception" + (cp/pmap (cp/with-priority-fn pool identity) + (fn [x y] (+ x y)) + (range 10) (range 10)))) + (is (thrown-with-msg? + Exception #"Priority function exception" + (cp/future (cp/with-priority-fn pool identity) + 1)))))) + +(deftest test-for-priority + (testing "pfor uses priority" + (cp/with-shutdown! [pool (cp/priority-threadpool 1)] + (let [start (promise) + completed (atom []) + tasks (cp/pfor pool + [i (range 100) + :priority (inc i)] + (deref start) + (swap! completed conj i) + i)] + (Thread/sleep 5) + (deliver start true) + (dorun tasks) + ;; Just worry about the rest of the tasks; the first one may be out of + ;; order. + (is (sorted*? (reverse (rest @completed)))) + (is (= (range 100) tasks))))) + (testing "upfor uses priority" + (cp/with-shutdown! [pool (cp/priority-threadpool 1)] + (let [start (promise) + completed (atom []) + tasks (cp/upfor pool + [i (range 100) + :priority (inc i)] + (deref start) + (swap! 
completed conj i) + i)] + (Thread/sleep 5) + (deliver start true) + (dorun tasks) + ;; Just worry about the rest of the tasks; the first one may be out of + ;; order. + (is (sorted*? (reverse (rest @completed)))) + (is (= @completed tasks)))))) + +(deftest test-priority-nonIObj + (testing "A priority pool should work on any sort of Callable." + (cp/with-shutdown! [pool (cp/priority-threadpool 1)] + (let [start (promise) + results (atom []) + run (fn [x] (deref start) (swap! results conj x))] + ;; Dummy task, always runs first. + (cp/future (cp/with-priority pool 100) + (run 100)) + ;; Runnables. + (.submit (cp/with-priority pool 1) + (reify Runnable (run [_] (run 1)))) + (.submit (cp/with-priority pool 10) + (reify Runnable (run [_] (run 10)))) + ;; Runnables with return value. + (.submit (cp/with-priority pool 2) + (reify Runnable (run [_] (run 2))) + :return-value) + (.submit (cp/with-priority pool 9) + (reify Runnable (run [_] (run 9))) + :return-value) + (cp/future (cp/with-priority pool 6) + (run 6)) + (cp/future (cp/with-priority pool 11) + (run 11)) + ;; Callables + (.submit (cp/with-priority pool 3) + (reify Callable (call [_] (run 3)))) + (.submit (cp/with-priority pool 8) + (reify Callable (call [_] (run 8)))) + ;; And another couple IFns for good measure + (cp/future (cp/with-priority pool 5) + (run 5)) + (cp/future (cp/with-priority pool 7) + (run 7)) + ;; Make them go. + (Thread/sleep 5) + (deliver start true) + ;; Check the results + (Thread/sleep 5) + (is (sorted*? (reverse @results))))))) (deftest test-threadpool? (testing "Basic threadpool?" - (cp/with-shutdown! [pool 4] + (cp/with-shutdown! [pool 4 + priority-pool (cp/priority-threadpool 4)] (is (true? (cp/threadpool? pool))) + (is (true? (cp/threadpool? priority-pool))) (is (false? (cp/threadpool? :serial))) (is (false? (cp/threadpool? nil))) (is (false? (cp/threadpool? 
1)))))) From 1c731095cfc99ba7698620dddc988c46378a7527 Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Fri, 14 Feb 2014 09:08:18 -0800 Subject: [PATCH 04/20] README cleanups. --- README.md | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 8ebe5b5..a3c85bb 100644 --- a/README.md +++ b/README.md @@ -88,8 +88,7 @@ possible. (def service2-resps (cp/upmap net-pool service2-request service1-resps)) (def results (cp/upmap cpu-pool handle-response service2-resps)) ;; ...eventually... - ;; Make sure we make sure the computation is complete before we shutdown the - ;; pools. + ;; Make sure the computation is complete before we shutdown the pools. (doall results)) ``` @@ -109,7 +108,8 @@ binding will be done in the calling thread. ``` Claypoole also lets you prioritize your backlog of tasks. Higher-priority tasks -will be assigned to threads first. See advanced usage in a section below. +will be assigned to threads first. Here's an example; there is a more detailed +description below. ```clojure (def pool (cp/priority-threadpool 10)) @@ -180,8 +180,10 @@ exits. You can create a daemon threadpool via: To construct a threadpool, use the `threadpool` function. It takes optional keyword arguments allowing you to change the thread names, their daemon status, -and their priority. (NOTE: Thread priority is a system-level thing that depends -on the OS; it is not the same as the task priority, described below.) +and their priority. (NOTE: Thread priority is [a system-level property that +depends on the +OS](http://oreilly.com/catalog/expjava/excerpt/#EXJ-CH-6-SECT-4); it is not the +same as the task priority, described below.) ```clojure (def pool (cp/threadpool (cp/ncpus) @@ -213,7 +215,7 @@ like so: ## How can I prioritize my tasks? 
-You can create a threadpool that respects priorities by creating a +You can create a threadpool that respects task priorities by creating a `priority-threadpool`: ```clojure From 9ef4257c4e63c7b0a7c0f112ae9bc4a9095c6dde Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Fri, 14 Feb 2014 10:04:41 -0800 Subject: [PATCH 05/20] Fix description of which priority is used --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a3c85bb..17c13e6 100644 --- a/README.md +++ b/README.md @@ -228,7 +228,7 @@ of your tasks: ```clojure (cp/future (cp/with-priority p1 100) (myfn)) -;; Nothing bad happens if you nest with-priority. The outermost one "wins"; +;; Nothing bad happens if you nest with-priority. The innermost one "wins"; ;; this task runs at priority 2. (cp/future (cp/with-priority (cp-with-priority p1 1) 2) (myfn)) ;; For pmaps, you can use a priority function. This will run 3 tasks at From d6d6be50267d46a0561ea0fee5efdff23c8a623a Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Fri, 14 Feb 2014 10:13:14 -0800 Subject: [PATCH 06/20] Made priority-fn act on *seq* of arguments --- README.md | 6 +++--- src/clj/com/climate/claypoole.clj | 2 +- src/clj/com/climate/claypoole/impl.clj | 2 +- test/com/climate/claypoole_test.clj | 8 +++++--- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 17c13e6..d48d7a9 100644 --- a/README.md +++ b/README.md @@ -231,9 +231,9 @@ of your tasks: ;; Nothing bad happens if you nest with-priority. The innermost one "wins"; ;; this task runs at priority 2. (cp/future (cp/with-priority (cp-with-priority p1 1) 2) (myfn)) -;; For pmaps, you can use a priority function. This will run 3 tasks at -priorities 6, 5, and 4, respectively. -(cp/upmap (cp/with-priority-fn p1 (fn [x y] x) + [6 5 4] [1 2 3]) +;; For pmaps, you can use a priority function, which is called with your list of +;; arguments. This will run 3 tasks at priorities 6, 5, and 4, respectively. 
+(cp/upmap (cp/with-priority-fn p1 first) + [6 5 4] [1 2 3]) ;; For for loops, you can use the special :priority binding, which must be the ;; last for binding. (cp/upfor p1 [i (range 10) diff --git a/src/clj/com/climate/claypoole.clj b/src/clj/com/climate/claypoole.clj index 13318e3..a06725c 100644 --- a/src/clj/com/climate/claypoole.clj +++ b/src/clj/com/climate/claypoole.clj @@ -344,7 +344,7 @@ ;; If there's a priority, God help us--let's pull that thing out. (let [bindings* (vec (drop-last 2 bindings)) priority-value (last bindings)] - `(let [pool# (with-priority-fn ~pool (fn [_# p#] p#)) + `(let [pool# (with-priority-fn ~pool second) [fns# priorities#] (apply map vector (for ~bindings* [(fn [priority#] ~@body) diff --git a/src/clj/com/climate/claypoole/impl.clj b/src/clj/com/climate/claypoole/impl.clj index a50b18e..5d2e8e3 100644 --- a/src/clj/com/climate/claypoole/impl.clj +++ b/src/clj/com/climate/claypoole/impl.clj @@ -152,7 +152,7 @@ ExecutionException anyway." [task, ^IFn priority-fn] (let [priority (try - (long (apply priority-fn (-> task meta :args))) + (long (priority-fn (-> task meta :args))) (catch Exception e (throw (ExecutionException. "Priority function exception" e))))] diff --git a/test/com/climate/claypoole_test.clj b/test/com/climate/claypoole_test.clj index d4c614d..bf13c5b 100644 --- a/test/com/climate/claypoole_test.clj +++ b/test/com/climate/claypoole_test.clj @@ -118,7 +118,7 @@ (testing "with-priority-fn works for simple upmap" (cp/with-shutdown! [pool (cp/priority-threadpool 1)] (let [start (promise) - results (cp/upmap (cp/with-priority-fn pool identity) + results (cp/upmap (cp/with-priority-fn pool first) (fn [i] (deref start) i) @@ -131,12 +131,14 @@ (cp/with-shutdown! [pool (cp/priority-threadpool 2)] (is (thrown-with-msg? Exception #"Priority function exception" - (cp/pmap (cp/with-priority-fn pool identity) + ;; Arity exception. 
+ (cp/pmap (cp/with-priority-fn pool (fn [] 0)) (fn [x y] (+ x y)) (range 10) (range 10)))) (is (thrown-with-msg? + ;; No arguments passed to priority function. Exception #"Priority function exception" - (cp/future (cp/with-priority-fn pool identity) + (cp/future (cp/with-priority-fn pool first) 1)))))) (deftest test-for-priority From 0cf8c01c8091dbf24c99be9fa4f58b7a640b1894 Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Fri, 14 Feb 2014 10:34:33 -0800 Subject: [PATCH 07/20] Additional docs on threadpools Also fixed my "fix" of innermost/outermost priority. --- README.md | 12 ++++++++++-- src/clj/com/climate/claypoole.clj | 26 +++++++++++++++++++++----- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index d48d7a9..a96af29 100644 --- a/README.md +++ b/README.md @@ -224,11 +224,11 @@ You can create a threadpool that respects task priorities by creating a ``` Then, use functions `with-priority` and `with-priority-fn` to set the priority -of your tasks: +of your tasks, or just set the `:priority` in your `for` loop: ```clojure (cp/future (cp/with-priority p1 100) (myfn)) -;; Nothing bad happens if you nest with-priority. The innermost one "wins"; +;; Nothing bad happens if you nest with-priority. The outermost one "wins"; ;; this task runs at priority 2. (cp/future (cp/with-priority (cp-with-priority p1 1) 2) (myfn)) ;; For pmaps, you can use a priority function, which is called with your list of @@ -241,6 +241,14 @@ of your tasks: (myfn i)) ``` +## What about Java interoperability? + +Under the hood, threadpools are just instances of +`java.util.concurrent.ExecutorService`. You can use any `ExecutorService` in +place of a threadpool, and you can use a threadpool just as you would an +`ExecutorService`. This means you can create custom threadpools and use them +easily. + ## Why the name "Claypoole"? 
The claypoole library is named after [John Claypoole (Betsy Ross's third diff --git a/src/clj/com/climate/claypoole.clj b/src/clj/com/climate/claypoole.clj index a06725c..974618f 100644 --- a/src/clj/com/climate/claypoole.clj +++ b/src/clj/com/climate/claypoole.clj @@ -12,7 +12,11 @@ ;; and limitations under the License. (ns com.climate.claypoole - "Threadpool tools for Clojure." + "Threadpool tools for Clojure. + + A threadpool is just an ExecutorService with a fixed number of threads. In + general, you can use your own ExecutorService in place of any threadpool, and + you can treat a threadpool as you would any other ExecutorService." (:refer-clojure :exclude [future future-call pcalls pmap pvalues]) (:require [clojure.core :as core] @@ -47,6 +51,10 @@ (defn threadpool "Make a threadpool. It should be shutdown when no longer needed. + A threadpool is just an ExecutorService with a fixed number of threads. In + general, you can use your own ExecutorService in place of any threadpool, and + you can treat a threadpool as you would any other ExecutorService. + This takes optional keyword arguments: :daemon, a boolean indicating whether the threads are daemon threads, which will automatically die when the JVM exits, defaults to @@ -69,10 +77,11 @@ "Make a threadpool that chooses tasks based on their priorities. Assign priorities to tasks by wrapping the pool with with-priority or - with-priority-fn. + with-priority-fn. You can also set a default priority with keyword argument + :default-priority. - You can also set a default priority with keyword argument :default-priority. - Otherwise, this uses the same keyword arguments as threadpool." + Otherwise, this uses the same keyword arguments as threadpool, and functions + just like any other ExecutorService." 
^PriorityThreadpool [n & {:keys [default-priority] :as args :or {default-priority 0}}] @@ -105,7 +114,14 @@ (def t2 (future (with-priority p 2) 2)) (def t3 (future (with-priority p 3) 3)) - will use pool p to run these tasks with priorities 1, 2, and 3 respectively." + will use pool p to run these tasks with priorities 1, 2, and 3 respectively. + + If you nest priorities, the outermost one \"wins\", so this task will be run + at priority 3: + + (def wp (with-priority p 1)) + (def t1 (future (with-priority (with-priority wp 2) 3) :result)) + " ^ExecutorService [^ExecutorService pool priority] (with-priority-fn pool (constantly priority))) From 2013938f654dd23a4a1596c74f54a00c275784f6 Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Fri, 14 Feb 2014 13:28:50 -0800 Subject: [PATCH 08/20] Fix lazy read bug. Input sequences were not actually read lazily! I added a test and caught it, so I fixed it also. --- src/clj/com/climate/claypoole.clj | 48 ++++++---------- src/clj/com/climate/claypoole/impl.clj | 1 - test/com/climate/claypoole_test.clj | 79 ++++++++++++++++++-------- 3 files changed, 74 insertions(+), 54 deletions(-) diff --git a/src/clj/com/climate/claypoole.clj b/src/clj/com/climate/claypoole.clj index 974618f..f87043c 100644 --- a/src/clj/com/climate/claypoole.clj +++ b/src/clj/com/climate/claypoole.clj @@ -29,10 +29,7 @@ Callable ExecutorService Future - LinkedBlockingQueue - PriorityBlockingQueue - ThreadPoolExecutor - TimeUnit])) + LinkedBlockingQueue])) (def ^:dynamic *parallel* @@ -221,6 +218,7 @@ ^ExecutorService pool* pool ^Callable f* (impl/binding-conveyor-fn f) fut (.submit pool* f*)] + ;; Make an object just like Clojure futures. (reify clojure.lang.IDeref (deref [_] (impl/deref-future fut)) @@ -278,10 +276,10 @@ (doall (apply map f arg-seqs)) (let [[shutdown? 
pool] (impl/->threadpool pool) args (apply map vector (map impl/unchunk arg-seqs)) - futures (map (fn [a] (future-call pool - (with-meta #(apply f a) - {:args a}))) - args) + futures (for [a args] + (future-call pool + (with-meta #(apply f a) + {:args a}))) ;; Start eagerly parallel processing. read-future (core/future (try @@ -290,7 +288,7 @@ ;; Read results as available. (concat (map deref futures) ;; Deref the reading future to get its exceptions, if it had any. - @read-future)))) + (lazy-seq (deref read-future)))))) (defn upmap "Like pmap, except the return value is a sequence of results ordered by @@ -309,6 +307,9 @@ (try (doseq [a args :let [p (promise)]] + ;; We can't directly make a future add itself to a + ;; queue. Instead, we use a promise for + ;; indirection. (deliver p (future-call pool (with-meta #(try @@ -319,7 +320,7 @@ ;; Read results as available. (concat (for [_ args] (-> q .take deref)) ;; Deref the reading future to get its exceptions, if it had any. - @read-future)))) + (lazy-seq (deref read-future)))))) (defn pcalls "Like clojure.core.pcalls, except it takes a threadpool. For more detail on @@ -378,30 +379,17 @@ this will not have useful parallelism: (pfor pool [i (range 1000) :let [result (complex-computation i)]] result) - You can use the special binding :priority to set the priorities of the tasks. + You can use the special binding :priority (which must be the last binding) to + set the priorities of the tasks. + (upfor (priority-threadpool 10) [i (range 1000) + :priority (inc i)] + (complex-computation i)) " [pool bindings & body] (pfor-internal pool bindings body `pmap)) (defmacro upfor - "An unordered, parallel version of for. It is like for, except it takes a - threadpool, is parallel, and returns its results ordered by completion time. - For more detail on its parallelism and on its threadpool argument, see - upmap. 
- - Note that while the body is executed in parallel, the bindings are executed - in serial, so while this will call complex-computation in parallel: - (upfor pool [i (range 1000)] - (complex-computation i)) - this will not have useful parallelism: - (upfor pool [i (range 1000) - :let [result (complex-computation i)]] - result) - - You can use the special binding :priority to set the priorities of the tasks. - (upfor (priority-threadpool 10) [i (range 1000) - :priority (inc i)] - (complex-computation i)) - " + "Like pfor, except the return value is a sequence of results ordered by + *completion time*, not by input order." [pool bindings & body] (pfor-internal pool bindings body `upmap)) diff --git a/src/clj/com/climate/claypoole/impl.clj b/src/clj/com/climate/claypoole/impl.clj index 5d2e8e3..8ef814b 100644 --- a/src/clj/com/climate/claypoole/impl.clj +++ b/src/clj/com/climate/claypoole/impl.clj @@ -23,7 +23,6 @@ PriorityThreadpoolImpl] [java.util Collection - Comparator List] [java.util.concurrent ExecutionException diff --git a/test/com/climate/claypoole_test.clj b/test/com/climate/claypoole_test.clj index bf13c5b..e33e19a 100644 --- a/test/com/climate/claypoole_test.clj +++ b/test/com/climate/claypoole_test.clj @@ -70,7 +70,7 @@ (deref start) (swap! completed conj i)))))] ;; start tasks - (Thread/sleep 5) + (Thread/sleep 50) (deliver start true) ;; Wait for tasks to complete (doseq [f tasks] (deref f)) @@ -132,14 +132,14 @@ (is (thrown-with-msg? Exception #"Priority function exception" ;; Arity exception. - (cp/pmap (cp/with-priority-fn pool (fn [] 0)) - (fn [x y] (+ x y)) - (range 10) (range 10)))) + (doall + (cp/pmap (cp/with-priority-fn pool (fn [] 0)) + (fn [x y] (+ x y)) + (range 10) (range 10))))) (is (thrown-with-msg? ;; No arguments passed to priority function. 
Exception #"Priority function exception" - (cp/future (cp/with-priority-fn pool first) - 1)))))) + (deref (cp/future (cp/with-priority-fn pool first) 1))))))) (deftest test-for-priority (testing "pfor uses priority" @@ -152,7 +152,7 @@ (deref start) (swap! completed conj i) i)] - (Thread/sleep 5) + (Thread/sleep 50) (deliver start true) (dorun tasks) ;; Just worry about the rest of the tasks; the first one may be out of @@ -169,7 +169,7 @@ (deref start) (swap! completed conj i) i)] - (Thread/sleep 5) + (Thread/sleep 50) (deliver start true) (dorun tasks) ;; Just worry about the rest of the tasks; the first one may be out of @@ -213,10 +213,10 @@ (cp/future (cp/with-priority pool 7) (run 7)) ;; Make them go. - (Thread/sleep 5) + (Thread/sleep 50) (deliver start true) ;; Check the results - (Thread/sleep 5) + (Thread/sleep 50) (is (sorted*? (reverse @results))))))) (deftest test-threadpool? @@ -236,15 +236,15 @@ result (promise) f (.submit pool #(deliver result (deref start)))] (is (false? (cp/shutdown? pool))) - (Thread/sleep 5) + (Thread/sleep 50) ;; Make sure the threadpool starts shutting down but doesn't complete ;; until the threads finish. (cp/shutdown pool) (is (true? (cp/shutdown? pool))) (is (false? (.isTerminated pool))) - (Thread/sleep 5) + (Thread/sleep 50) (deliver start true) - (Thread/sleep 5) + (Thread/sleep 50) (is (true? (.isTerminated pool))) (is (true? @result)))) (testing "Shutdown does not affect builtin threadpool" @@ -257,12 +257,12 @@ start (promise) f (.submit pool #(deref start))] (is (false? (cp/shutdown? pool))) - (Thread/sleep 5) + (Thread/sleep 50) ;; Make sure the threadpool completes shutting down immediately. (cp/shutdown! pool) (is (true? (cp/shutdown? pool))) ;; It can take some time for the threadpool to kill the threads. - (Thread/sleep 5) + (Thread/sleep 50) (is (true? (.isTerminated pool))) (is (.isDone f)) (is (thrown? 
ExecutionException (deref f))))) @@ -280,14 +280,14 @@ (deliver outside-pool pool) ;; Use a future to avoid blocking on the :serial case. (deliver fp (future (.submit pool #(deref start)))) - (Thread/sleep 5)) + (Thread/sleep 50)) ;; Make sure outside of the with-shutdown block the pool is properly ;; killed. (when-not (keyword? arg) (is (true? (cp/shutdown? @outside-pool))) - (Thread/sleep 5) + (Thread/sleep 50) (is (true? (.isTerminated @outside-pool))) (deliver start true) - (Thread/sleep 5) + (Thread/sleep 50) (is (.isDone @@fp)) (is (thrown? ExecutionException (deref @@fp))))))) (testing "With-shutdown! works with any number of threadpools" @@ -371,6 +371,31 @@ ;; order. (sort input)))))))) +(defn check-lazy-read + "Check that a pmap function reads lazily" + [pmap-like] + (let [n 10] + (cp/with-shutdown! [pool n] + (let [pool (cp/threadpool n) + first-inputs (range n) + second-inputs (range n (* n 2)) + ;; The input will have a pause after n items. + pause (promise) + input (concat first-inputs (map deref [pause]) second-inputs) + ;; We'll record what tasks have been started so we can make sure + ;; all of them are started. + started (atom #{}) + results (pmap-like pool + (fn [i] (swap! started conj i) i) + input)] + ;; All of the first set of tasks should have started after 50ms. + (Thread/sleep 50) + (is (= @started (set first-inputs))) + (deliver pause (- n 0.5)) + (Thread/sleep 50) + (is (= @started (set results) (set input))))))) + + (defn check-fn-exception "Check that a pmap function correctly passes exceptions caused by the function." @@ -466,7 +491,7 @@ ;; Check the results (is (= (map inc inputs) (sort @results))) ;; Wait for the thread to be shutdown. - (Thread/sleep 5) + (Thread/sleep 50) (when should-be-shutdown? (is (true? (cp/shutdown? @apool)))) (when should-we-shutdown? @@ -529,13 +554,17 @@ (testing "basic pmap test" (cp/with-shutdown! 
[pool 3] (is (= (range 1 11) (cp/pmap pool inc (range 10)))))) - (check-all "pmap" cp/pmap true)) + (check-all "pmap" cp/pmap true) + (testing "pmap reads lazily" + (check-lazy-read cp/pmap))) (deftest test-upmap (testing "basic upmap test" (cp/with-shutdown! [pool 3] (is (= (range 1 11) (sort (cp/upmap pool inc (range 10))))))) - (check-all "upmap" cp/upmap false)) + (check-all "upmap" cp/upmap false) + (testing "upmap reads lazily" + (check-lazy-read cp/upmap))) (deftest test-pcalls (testing "basic pcalls test" @@ -605,7 +634,9 @@ pool [i input] (work i)))] - (check-all "pfor" pmap-like true))) + (check-all "pfor" pmap-like true) + (testing "pfor reads lazily" + (check-lazy-read pmap-like)))) (deftest test-upfor (testing "basic upfor test" @@ -617,4 +648,6 @@ pool [i input] (work i)))] - (check-all "upfor" pmap-like false))) + (check-all "upfor" pmap-like false) + (testing "upfor reads lazily" + (check-lazy-read pmap-like)))) From 20c0bf3fe33e8a5697b6574939664ada58962940 Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Fri, 14 Feb 2014 13:56:48 -0800 Subject: [PATCH 09/20] Minor cleanups and comments. --- src/clj/com/climate/claypoole.clj | 4 ++ src/clj/com/climate/claypoole/impl.clj | 37 ++++++++++--------- .../climate/claypoole/impl/Prioritized.java | 2 +- .../impl/PriorityThreadpoolImpl.java | 2 +- test/com/climate/claypoole_test.clj | 14 ++++--- 5 files changed, 34 insertions(+), 25 deletions(-) diff --git a/src/clj/com/climate/claypoole.clj b/src/clj/com/climate/claypoole.clj index f87043c..adfe47e 100644 --- a/src/clj/com/climate/claypoole.clj +++ b/src/clj/com/climate/claypoole.clj @@ -275,6 +275,7 @@ (if (serial? pool) (doall (apply map f arg-seqs)) (let [[shutdown? pool] (impl/->threadpool pool) + ;; Use map to handle the argument sequences. args (apply map vector (map impl/unchunk arg-seqs)) futures (for [a args] (future-call pool @@ -300,6 +301,7 @@ (if (serial? pool) (doall (apply map f arg-seqs)) (let [[shutdown? 
pool] (impl/->threadpool pool) + ;; Use map to handle the argument sequences. args (apply map vector (map impl/unchunk arg-seqs)) q (LinkedBlockingQueue.) ;; Start eagerly parallel processing. @@ -362,6 +364,8 @@ (let [bindings* (vec (drop-last 2 bindings)) priority-value (last bindings)] `(let [pool# (with-priority-fn ~pool second) + ;; We can't just make functions; we have to have the priority as + ;; an argument to work with the priority-fn. [fns# priorities#] (apply map vector (for ~bindings* [(fn [priority#] ~@body) diff --git a/src/clj/com/climate/claypoole/impl.clj b/src/clj/com/climate/claypoole/impl.clj index 8ef814b..45b5444 100644 --- a/src/clj/com/climate/claypoole/impl.clj +++ b/src/clj/com/climate/claypoole/impl.clj @@ -63,9 +63,7 @@ clojure.lang.IDeref (deref [_] result) clojure.lang.IBlockingDeref - (deref - [_ timeout-ms timeout-val] - result) + (deref [_ timeout-ms timeout-val] result) clojure.lang.IPending (isRealized [_] true) Future @@ -119,11 +117,10 @@ thread-id (atom -1)] (reify ThreadFactory (^Thread newThread [_ ^Runnable r] - (let [t (.newThread default-factory r)] - (doto t - (.setDaemon daemon*) - (.setName (str pool-name* "-" (swap! thread-id inc))) - (.setPriority thread-priority*)))))))) + (doto (.newThread default-factory r) + (.setDaemon daemon*) + (.setName (str pool-name* "-" (swap! thread-id inc))) + (.setPriority thread-priority*))))))) (defn unchunk "Takes a seqable and returns a lazy sequence that is maximally lazy. @@ -137,18 +134,21 @@ (defn threadpool "Make a threadpool. It should be shutdown when no longer needed. - See docs in com.climate.claypoole/threadpool. - " + See docs in com.climate.claypoole/threadpool." [n & args] - (let [factory (apply thread-factory args)] - (Executors/newFixedThreadPool n factory))) + (Executors/newFixedThreadPool n (apply thread-factory args))) (defn- prioritize "Apply a priority function to a task. Note that this re-throws all priority-fn exceptions as ExecutionExceptions. 
That shouldn't mess anything up because the caller re-throws it as an - ExecutionException anyway." + ExecutionException anyway. + + For simplicity, prioritize reifies both Callable and Runnable, rather than + having one prioritize function for each of those types. That means, for + example, that if you prioritize a Runnable that is not also a Callable, you + might want to cast the result to Runnable or otherwise use it carefully." [task, ^IFn priority-fn] (let [priority (try (long (priority-fn (-> task meta :args))) @@ -163,19 +163,20 @@ Prioritized (getPriority [_] priority)))) -;; A Threadpool that has a priority function. +;; A Threadpool that applies a priority function to tasks and uses a +;; PriorityThreadpoolImpl to run them. (deftype PriorityThreadpool [^PriorityThreadpoolImpl pool, ^IFn priority-fn] ExecutorService (^boolean awaitTermination [_, ^long timeout, ^TimeUnit unit] (.awaitTermination pool timeout unit)) (^List invokeAll [_, ^Collection tasks] - (.invokeAll pool tasks)) + (.invokeAll pool (map #(prioritize % priority-fn) tasks))) (^List invokeAll [_, ^Collection tasks, ^long timeout, ^TimeUnit unit] - (.invokeAll pool tasks timeout unit)) + (.invokeAll pool (map #(prioritize % priority-fn) tasks) timeout unit)) (^Object invokeAny [_, ^Collection tasks] - (.invokeAny pool tasks)) + (.invokeAny pool (map #(prioritize % priority-fn) tasks))) (^Object invokeAny [_, ^Collection tasks, ^long timeout, ^TimeUnit unit] - (.invokeAny pool tasks timeout unit)) + (.invokeAny pool (map #(prioritize % priority-fn) tasks) timeout unit)) (^boolean isShutdown [_] (.isShutdown pool)) (^boolean isTerminated [_] diff --git a/src/java/com/climate/claypoole/impl/Prioritized.java b/src/java/com/climate/claypoole/impl/Prioritized.java index caf14aa..cbd2785 100644 --- a/src/java/com/climate/claypoole/impl/Prioritized.java +++ b/src/java/com/climate/claypoole/impl/Prioritized.java @@ -13,7 +13,7 @@ package com.climate.claypoole.impl; -/** Any object with a priority. 
*/ +/** An object with a priority. */ public interface Prioritized { public long getPriority(); } diff --git a/src/java/com/climate/claypoole/impl/PriorityThreadpoolImpl.java b/src/java/com/climate/claypoole/impl/PriorityThreadpoolImpl.java index 335aadf..ee169e6 100644 --- a/src/java/com/climate/claypoole/impl/PriorityThreadpoolImpl.java +++ b/src/java/com/climate/claypoole/impl/PriorityThreadpoolImpl.java @@ -22,7 +22,7 @@ /** A fixed-size threadpool that does tasks in priority order. * - * Submitted tasks can have their own priority by implementing Prioritized; + * Submitted tasks have their own priority if they implement Prioritized; * otherwise, they are assigned the pool's default priority. */ public class PriorityThreadpoolImpl extends ThreadPoolExecutor { diff --git a/test/com/climate/claypoole_test.clj b/test/com/climate/claypoole_test.clj index e33e19a..0025ca9 100644 --- a/test/com/climate/claypoole_test.clj +++ b/test/com/climate/claypoole_test.clj @@ -17,6 +17,9 @@ [com.climate.claypoole :as cp] [com.climate.claypoole.impl :as impl]) (:import + [com.climate.claypoole.impl + ;; Import to make Eastwood happy. + PriorityThreadpool] [java.util.concurrent ExecutionException ExecutorService])) @@ -132,7 +135,7 @@ (is (thrown-with-msg? Exception #"Priority function exception" ;; Arity exception. - (doall + (dorun (cp/pmap (cp/with-priority-fn pool (fn [] 0)) (fn [x y] (+ x y)) (range 10) (range 10))))) @@ -405,7 +408,7 @@ inputs [0 1 2 3 :4 5 6 7 8 9]] (is (thrown-with-msg? Exception #"keyword found" - (doall (pmap-like pool + (dorun (pmap-like pool (fn [i] (if (keyword? i) (throw (Exception. "keyword found")) @@ -426,7 +429,7 @@ (range 200))] (is (thrown-with-msg? Exception #"deliberate" - (doall (pmap-like pool inc inputs)))) + (dorun (pmap-like pool inc inputs)))) (.shutdown pool))) (defn check-maximum-parallelism-one-case @@ -518,10 +521,11 @@ (deftest test-future (testing "basic future test" (cp/with-shutdown! 
[pool 3] - (let [f (cp/future + (let [a (atom false) + f (cp/future pool ;; Body can contain multiple elements. - 1 + (reset! a true) (range 10))] (is (= @f (range 10)))))) (testing "future threadpool args" From 3ec92cfad335a4afbca18b02b453f8eda60c7e31 Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Fri, 14 Feb 2014 15:49:41 -0800 Subject: [PATCH 10/20] Updated changelog. --- CHANGES.txt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGES.txt b/CHANGES.txt index 8f25186..a0a1266 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,10 @@ CHANGES +0.2.0 + +- Added priority threadpools +- Made with-shutdown! option take multiple threadpools + 0.1.1 - Fixed major bug where pmap etc. blocked on reading the entire input stream From a07fa39dc02fb2606617e1ca54b5975fd2781227 Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Tue, 18 Feb 2014 09:28:51 -0800 Subject: [PATCH 11/20] Made README pmap description positive --- README.md | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index c9cdcff..7ad9865 100644 --- a/README.md +++ b/README.md @@ -18,24 +18,24 @@ parallelism). Instead, people tend to fall back on Java. For instance, on the recommendation is to create an `ExecutorService` and call its `invokeAll` method. -Clojure's `pmap` function is another example of a simple parallelism tool that -was insufficiently flexible for our needs. - -* `pmap` uses a hardcoded number of threads (ncpus + 2). `pmap` is designed for - CPU-bound tasks, so it may not give adequate parallelism for latency-bound - tasks like network requests. -* `pmap` is lazy. It will only work a little ahead of where its output is - being read. However, usually we want parallelism so we can get things done as - fast as possible, in which case we want to do work eagerly. -* `pmap` is always ordered. Sometimes, to reduce latency, we just want to get - the results of tasks in the order they complete. 
- On the other hand, we do not need the flexible building blocks that are [`core.async`](https://github.com/clojure/core.async); we just want to run some simple tasks in parallel. Similarly, [`reducers`](http://clojure.org/reducers) is elegant, but we can't control its level of parallelism. +Essentially, we wanted a `pmap` function that improves on the original in +several ways: + +* We should be able to set the size of the threadpool `pmap` uses, so it would + be tunable for non-CPU-bound tasks like network requests. +* We should be able to share a threadpool between multiple `pmap`s to control + the amount of simultaneous work we're doing. +* We would like it to be streaming rather than lazy, so we can start it going + and expect it to work in the background without using `doall`. +* We would like to be able to do an unordered `pmap`, so that we can start + handling the first response as fast as possible. + ## How do I use claypoole? To use claypoole, make a threadpool via `threadpool` and use it with From 81b51a01097521b50df219efcab77673f5d31324 Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Tue, 18 Feb 2014 10:55:54 -0800 Subject: [PATCH 12/20] Test and fix bug for upmap scheduling exceptions --- src/clj/com/climate/claypoole.clj | 36 +++++++++++++++++++++-------- test/com/climate/claypoole_test.clj | 22 ++++++++++++++++-- 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/src/clj/com/climate/claypoole.clj b/src/clj/com/climate/claypoole.clj index adfe47e..41c0e4c 100644 --- a/src/clj/com/climate/claypoole.clj +++ b/src/clj/com/climate/claypoole.clj @@ -306,18 +306,36 @@ q (LinkedBlockingQueue.) ;; Start eagerly parallel processing. read-future (core/future + ;; Try to run schedule all the tasks, but definitely + ;; shutdown the pool if necessary. (try (doseq [a args :let [p (promise)]] - ;; We can't directly make a future add itself to a - ;; queue. Instead, we use a promise for - ;; indirection. 
- (deliver p (future-call pool - (with-meta - #(try - (apply f a) - (finally (.add q @p))) - {:args a})))) + ;; Try to schedule one task, but definitely add + ;; something to the queue for the task. + (try + ;; We can't directly make a future add itself to + ;; a queue. Instead, we use a promise for + ;; indirection. + (deliver p (future-call + pool + (with-meta + ;; Try to run the task, but + ;; definitely add the future to + ;; the queue. + #(try + (apply f a) + ;; Even if we had an exception + ;; running the task, make sure + ;; the future shows up in the + ;; queue. + (finally (.add q @p))) + {:args a}))) + ;; If we had an exception scheduling a task, + ;; let's plan to re-throw that at queue read + ;; time. + (catch Exception e + (.add q (delay (throw e)))))) (finally (when shutdown? (shutdown pool)))))] ;; Read results as available. (concat (for [_ args] (-> q .take deref)) diff --git a/test/com/climate/claypoole_test.clj b/test/com/climate/claypoole_test.clj index 0025ca9..c6bd5f5 100644 --- a/test/com/climate/claypoole_test.clj +++ b/test/com/climate/claypoole_test.clj @@ -398,6 +398,19 @@ (Thread/sleep 50) (is (= @started (set results) (set input))))))) +(defn check-shutdown-exceptions + "Check that exceptions are thrown when tasks go to a shutdown pool." + [pmap-like] + (cp/with-shutdown! [pool 2] + (let [input (range 4) + start (promise) + delayed-input (map (fn [i] (deref start) i) input) + results (future (pmap-like pool identity + (concat input delayed-input)))] + (Thread/sleep 50) + (cp/shutdown pool) + (deliver start true) + (is (thrown? 
Exception (doall @results)))))) (defn check-fn-exception "Check that a pmap function correctly passes exceptions caused by the @@ -515,7 +528,10 @@ (check-->threadpool pmap-like)) (testing (format "%s is made serial by binding cp/*parallel* to false" fn-name) - (check-*parallel*-disables pmap-like))) + (check-*parallel*-disables pmap-like)) + (testing (format "%s throws exceptions when tasks are sent to a shutdown pool" + fn-name) + (check-shutdown-exceptions pmap-like))) (deftest test-future @@ -552,7 +568,9 @@ pmap-like n pool) (when shutdown? (.shutdown pool)))) (testing "Binding cp/*parallel* can disable parallelism in future" - (check-*parallel*-disables pmap-like)))) + (check-*parallel*-disables pmap-like)) + (testing "Future throws exceptions when tasks are sent to a shutdown pool" + (check-shutdown-exceptions pmap-like)))) (deftest test-pmap (testing "basic pmap test" From 58481772ddab4e71152822db94aa494bad32c7fc Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Tue, 18 Feb 2014 11:29:49 -0800 Subject: [PATCH 13/20] Test chaining of pmaps. --- test/com/climate/claypoole_test.clj | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/test/com/climate/claypoole_test.clj b/test/com/climate/claypoole_test.clj index c6bd5f5..e50ffe1 100644 --- a/test/com/climate/claypoole_test.clj +++ b/test/com/climate/claypoole_test.clj @@ -398,6 +398,27 @@ (Thread/sleep 50) (is (= @started (set results) (set input))))))) +(defn check-chaining + "Check that we can chain pmaps." + [pmap-like] + (cp/with-shutdown! [p1 (cp/threadpool 2) + p2 (cp/threadpool 4)] + (is (= (range 1 11) + (->> (range 10) + (pmap-like p1 inc) + sort))) + (is (= (range 2 12) + (->> (range 10) + (pmap-like p1 inc) + (pmap-like p1 inc) + sort))) + (is (= (range 3 13) + (->> (range 10) + (pmap-like p1 inc) + (pmap-like p1 inc) + (pmap-like p2 inc) + sort))))) + (defn check-shutdown-exceptions "Check that exceptions are thrown when tasks go to a shutdown pool." 
[pmap-like] @@ -531,7 +552,9 @@ (check-*parallel*-disables pmap-like)) (testing (format "%s throws exceptions when tasks are sent to a shutdown pool" fn-name) - (check-shutdown-exceptions pmap-like))) + (check-shutdown-exceptions pmap-like)) + (testing (format "%s can be chained in various threadpools" fn-name) + (check-chaining pmap-like))) (deftest test-future @@ -570,7 +593,9 @@ (testing "Binding cp/*parallel* can disable parallelism in future" (check-*parallel*-disables pmap-like)) (testing "Future throws exceptions when tasks are sent to a shutdown pool" - (check-shutdown-exceptions pmap-like)))) + (check-shutdown-exceptions pmap-like)) + (testing "Futures can be chained in various threadpools." + (check-chaining pmap-like)))) (deftest test-pmap (testing "basic pmap test" From 42e487edccc613fb4486e6b750865d362208e372 Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Wed, 19 Feb 2014 16:15:17 -0800 Subject: [PATCH 14/20] Expose thread-factory. --- src/clj/com/climate/claypoole.clj | 10 ++++++++++ src/clj/com/climate/claypoole/impl.clj | 6 +++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/clj/com/climate/claypoole.clj b/src/clj/com/climate/claypoole.clj index 41c0e4c..15f6c93 100644 --- a/src/clj/com/climate/claypoole.clj +++ b/src/clj/com/climate/claypoole.clj @@ -45,6 +45,16 @@ [] (.. Runtime getRuntime availableProcessors)) +(defn thread-factory + "Create a ThreadFactory with keyword options including thread daemon status + :daemon, the thread name format :name (a string for format with one integer), + and a thread priority :thread-priority. + + This is exposed as a public function because it's handy if you're + instantiating your own ExecutorServices." + [& args] + (apply impl/thread-factory args)) + (defn threadpool "Make a threadpool. It should be shutdown when no longer needed. 
diff --git a/src/clj/com/climate/claypoole/impl.clj b/src/clj/com/climate/claypoole/impl.clj index 45b5444..a6f9780 100644 --- a/src/clj/com/climate/claypoole/impl.clj +++ b/src/clj/com/climate/claypoole/impl.clj @@ -103,9 +103,9 @@ (apply f (concat args (mapcat identity arg-map))))) (defn thread-factory - "Create a ThreadFactory with options including thread daemon status, the - thread name format (a string for format with one integer), and a thread - priority." + "Create a ThreadFactory with keyword options including thread daemon status + :daemon, the thread name format :name (a string for format with one integer), + and a thread priority :thread-priority." ^ThreadFactory [& {:keys [daemon thread-priority] pool-name :name}] (let [daemon* (boolean daemon) pool-name* (or pool-name (default-threadpool-name)) From cde4b6186654116c922e4839654c4dbc1f2780ce Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Thu, 20 Feb 2014 10:09:44 -0800 Subject: [PATCH 15/20] Use ScheduledExecutorService --- src/clj/com/climate/claypoole.clj | 11 +++++++++-- src/clj/com/climate/claypoole/impl.clj | 7 +++++-- test/com/climate/claypoole_test.clj | 10 ++++++++-- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/src/clj/com/climate/claypoole.clj b/src/clj/com/climate/claypoole.clj index 15f6c93..e4a6c07 100644 --- a/src/clj/com/climate/claypoole.clj +++ b/src/clj/com/climate/claypoole.clj @@ -29,7 +29,8 @@ Callable ExecutorService Future - LinkedBlockingQueue])) + LinkedBlockingQueue + ScheduledExecutorService])) (def ^:dynamic *parallel* @@ -71,7 +72,13 @@ \"name-1\", etc. Defaults to \"claypoole-[pool-number]\". :thread-priority, an integer in [Thread/MIN_PRIORITY, Thread/MAX_PRIORITY] giving the priority of each thread, defaults to the priority of - the current thread" + the current thread + + Note: Returns a ScheduledExecutorService rather than just an ExecutorService + because it's the same thing with a few bonus features." 
+ ;; NOTE: The Clojure compiler doesn't seem to like the tests if we don't + ;; fully expand this typename. + ^java.util.concurrent.ScheduledExecutorService ;; NOTE: Although I'm repeating myself, I list all the threadpool-factory ;; arguments explicitly for API clarity. [n & {:keys [daemon thread-priority] pool-name :name}] diff --git a/src/clj/com/climate/claypoole/impl.clj b/src/clj/com/climate/claypoole/impl.clj index a6f9780..b3af458 100644 --- a/src/clj/com/climate/claypoole/impl.clj +++ b/src/clj/com/climate/claypoole/impl.clj @@ -29,6 +29,7 @@ Executors ExecutorService Future + ScheduledExecutorService ThreadFactory TimeoutException TimeUnit])) @@ -135,8 +136,10 @@ "Make a threadpool. It should be shutdown when no longer needed. See docs in com.climate.claypoole/threadpool." - [n & args] - (Executors/newFixedThreadPool n (apply thread-factory args))) + ^ScheduledExecutorService [n & args] + ;; Return a ScheduledThreadPool rather than a FixedThreadPool because it's + ;; the same thing with some bonus features. + (Executors/newScheduledThreadPool n (apply thread-factory args))) (defn- prioritize "Apply a priority function to a task. diff --git a/test/com/climate/claypoole_test.clj b/test/com/climate/claypoole_test.clj index e50ffe1..c496f47 100644 --- a/test/com/climate/claypoole_test.clj +++ b/test/com/climate/claypoole_test.clj @@ -25,6 +25,11 @@ ExecutorService])) +(defn callable + "Just a cast." + ^Callable [^clojure.lang.IFn f] + f) + (defn check-threadpool-options [pool-constructor] (cp/with-shutdown! [pool (pool-constructor 4)] @@ -237,7 +242,8 @@ (let [pool (cp/threadpool 4) start (promise) result (promise) - f (.submit pool #(deliver result (deref start)))] + myf #(deliver result (deref start)) + f (.submit pool (callable #(deliver result (deref start))))] (is (false? (cp/shutdown? pool))) (Thread/sleep 50) ;; Make sure the threadpool starts shutting down but doesn't complete @@ -258,7 +264,7 @@ (testing "Basic shutdown!" 
(let [pool (cp/threadpool 4) start (promise) - f (.submit pool #(deref start))] + f (.submit pool (callable #(deref start)))] (is (false? (cp/shutdown? pool))) (Thread/sleep 50) ;; Make sure the threadpool completes shutting down immediately. From 7ca0e6104460abfecda266a1579f9fd583e944f4 Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Thu, 20 Feb 2014 15:14:53 -0800 Subject: [PATCH 16/20] Addressed Sebastian's first comments. --- README.md | 3 ++- test/com/climate/claypoole_test.clj | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 7ad9865..4926438 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,8 @@ several ways: * We should be able to share a threadpool between multiple `pmap`s to control the amount of simultaneous work we're doing. * We would like it to be streaming rather than lazy, so we can start it going - and expect it to work in the background without using `doall`. + and expect it to work in the background without explicitly consuming the + results. * We would like to be able to do an unordered `pmap`, so that we can start handling the first response as fast as possible. diff --git a/test/com/climate/claypoole_test.clj b/test/com/climate/claypoole_test.clj index c496f47..4920fcb 100644 --- a/test/com/climate/claypoole_test.clj +++ b/test/com/climate/claypoole_test.clj @@ -242,7 +242,6 @@ (let [pool (cp/threadpool 4) start (promise) result (promise) - myf #(deliver result (deref start)) f (.submit pool (callable #(deliver result (deref start))))] (is (false? (cp/shutdown? pool))) (Thread/sleep 50) @@ -572,7 +571,8 @@ ;; Body can contain multiple elements. (reset! a true) (range 10))] - (is (= @f (range 10)))))) + (is (= @f (range 10))) + (is (true? @a))))) (testing "future threadpool args" (is (thrown? IllegalArgumentException (cp/future 3 (inc 1)))) (is (thrown? 
IllegalArgumentException (cp/future nil (inc 1)))) From b72bab992f1d3b6c3e80d5f875f4573c5eb207ab Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Thu, 20 Feb 2014 15:30:29 -0800 Subject: [PATCH 17/20] Resolved most of Sebastian's comments --- .../com/climate/claypoole/impl/PriorityFutureTask.java | 8 +------- .../climate/claypoole/impl/PriorityThreadpoolImpl.java | 2 ++ test/com/climate/claypoole_test.clj | 7 +++---- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/src/java/com/climate/claypoole/impl/PriorityFutureTask.java b/src/java/com/climate/claypoole/impl/PriorityFutureTask.java index d4ff8fd..79d0254 100644 --- a/src/java/com/climate/claypoole/impl/PriorityFutureTask.java +++ b/src/java/com/climate/claypoole/impl/PriorityFutureTask.java @@ -35,13 +35,7 @@ public PriorityFutureTask(Callable callable, long priority) { @Override public int compareTo(Prioritized other) { // Sort for descending order. - if (this.priority > other.getPriority()) { - return -1; - } else if (this.priority == other.getPriority()) { - return 0; - } else { - return 1; - } + return Long.compare(other.getPriority(), this.priority); } @Override diff --git a/src/java/com/climate/claypoole/impl/PriorityThreadpoolImpl.java b/src/java/com/climate/claypoole/impl/PriorityThreadpoolImpl.java index ee169e6..ed58e60 100644 --- a/src/java/com/climate/claypoole/impl/PriorityThreadpoolImpl.java +++ b/src/java/com/climate/claypoole/impl/PriorityThreadpoolImpl.java @@ -50,6 +50,8 @@ public PriorityThreadpoolImpl(int corePoolSize, int maximumPoolSize, this.defaultPriority = defaultPriority; } + /** Get the priority of an object, using our defaultPriority as a backup. 
+ */ protected long getPriority(Object o) { if (o instanceof Prioritized) { return ((Prioritized) o).getPriority(); diff --git a/test/com/climate/claypoole_test.clj b/test/com/climate/claypoole_test.clj index 4920fcb..366ccc9 100644 --- a/test/com/climate/claypoole_test.clj +++ b/test/com/climate/claypoole_test.clj @@ -73,10 +73,9 @@ completed (atom []) tasks (doall (for [i (range 10)] - (do - (cp/future (cp/with-priority pool i) - (deref start) - (swap! completed conj i)))))] + (cp/future (cp/with-priority pool i) + (deref start) + (swap! completed conj i))))] ;; start tasks (Thread/sleep 50) (deliver start true) From d4ef79c092a4ea5aefe29acd38a5f91ff4eecfda Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Thu, 20 Feb 2014 15:41:45 -0800 Subject: [PATCH 18/20] priority-fn is applied to list of args --- README.md | 4 ++-- src/clj/com/climate/claypoole.clj | 4 ++-- src/clj/com/climate/claypoole/impl.clj | 2 +- test/com/climate/claypoole_test.clj | 8 ++++++-- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 4926438..bb29bd2 100644 --- a/README.md +++ b/README.md @@ -232,9 +232,9 @@ of your tasks, or just set the `:priority` in your `for` loop: ;; Nothing bad happens if you nest with-priority. The outermost one "wins"; ;; this task runs at priority 2. (cp/future (cp/with-priority (cp-with-priority p1 1) 2) (myfn)) -;; For pmaps, you can use a priority function, which is called with your list of +;; For pmaps, you can use a priority function, which is called with your ;; arguments. This will run 3 tasks at priorities 6, 5, and 4, respectively. -(cp/upmap (cp/with-priority-fn p1 first) + [6 5 4] [1 2 3]) +(cp/upmap (cp/with-priority-fn p1 (fn [x _] x)) + [6 5 4] [1 2 3]) ;; For for loops, you can use the special :priority binding, which must be the ;; last for binding. 
(cp/upfor p1 [i (range 10) diff --git a/src/clj/com/climate/claypoole.clj b/src/clj/com/climate/claypoole.clj index e4a6c07..bdd0d42 100644 --- a/src/clj/com/climate/claypoole.clj +++ b/src/clj/com/climate/claypoole.clj @@ -111,7 +111,7 @@ The priority function is applied to a pmap'd function's arguments. e.g. - (upmap (with-priority-fn p first) + [6 5 4] [1 2 3]) + (upmap (with-priority-fn p (fn [x _] x)) + [6 5 4] [1 2 3]) will use pool p to run tasks [(+ 6 1) (+ 5 2) (+ 4 3)] with priorities [6 5 4]." @@ -398,7 +398,7 @@ ;; If there's a priority, God help us--let's pull that thing out. (let [bindings* (vec (drop-last 2 bindings)) priority-value (last bindings)] - `(let [pool# (with-priority-fn ~pool second) + `(let [pool# (with-priority-fn ~pool (fn [_# p#] p#)) ;; We can't just make functions; we have to have the priority as ;; an argument to work with the priority-fn. [fns# priorities#] (apply map vector diff --git a/src/clj/com/climate/claypoole/impl.clj b/src/clj/com/climate/claypoole/impl.clj index b3af458..cf0ba31 100644 --- a/src/clj/com/climate/claypoole/impl.clj +++ b/src/clj/com/climate/claypoole/impl.clj @@ -154,7 +154,7 @@ might want to cast the result to Runnable or otherwise use it carefully." [task, ^IFn priority-fn] (let [priority (try - (long (priority-fn (-> task meta :args))) + (long (apply priority-fn (-> task meta :args))) (catch Exception e (throw (ExecutionException. "Priority function exception" e))))] diff --git a/test/com/climate/claypoole_test.clj b/test/com/climate/claypoole_test.clj index 366ccc9..751bc2f 100644 --- a/test/com/climate/claypoole_test.clj +++ b/test/com/climate/claypoole_test.clj @@ -125,12 +125,14 @@ (testing "with-priority-fn works for simple upmap" (cp/with-shutdown! 
[pool (cp/priority-threadpool 1)] (let [start (promise) - results (cp/upmap (cp/with-priority-fn pool first) + results (cp/upmap (cp/with-priority-fn + pool (fn [& args] (first args))) (fn [i] (deref start) i) (range 10))] ;; start tasks + (Thread/sleep 50) (deliver start true) (is (= [0 9 8 7 6 5 4 3 2 1] results))))) @@ -146,7 +148,9 @@ (is (thrown-with-msg? ;; No arguments passed to priority function. Exception #"Priority function exception" - (deref (cp/future (cp/with-priority-fn pool first) 1))))))) + (deref (cp/future (cp/with-priority-fn + pool (fn [& args] (first args))) + 1))))))) (deftest test-for-priority (testing "pfor uses priority" From 6e9136dbc1f04f959bfd88a964b3bee4c3f78907 Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Thu, 20 Feb 2014 15:47:01 -0800 Subject: [PATCH 19/20] Fix lint warning --- test/com/climate/claypoole_test.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/com/climate/claypoole_test.clj b/test/com/climate/claypoole_test.clj index 751bc2f..442cff1 100644 --- a/test/com/climate/claypoole_test.clj +++ b/test/com/climate/claypoole_test.clj @@ -439,7 +439,7 @@ (Thread/sleep 50) (cp/shutdown pool) (deliver start true) - (is (thrown? Exception (doall @results)))))) + (is (thrown? Exception (dorun @results)))))) (defn check-fn-exception "Check that a pmap function correctly passes exceptions caused by the From 9068ac56561786c4fa4a0d4a29d96882df002108 Mon Sep 17 00:00:00 2001 From: Leon Barrett Date: Thu, 20 Feb 2014 15:47:25 -0800 Subject: [PATCH 20/20] Version bump --- README.md | 2 +- project.clj | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index bb29bd2..496704d 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ The claypoole library provides threadpool-based parallel versions of Clojure functions such as `pmap`, `future`, and `for`. Claypoole is available in the Clojars repository. Just use this leiningen -dependency: `[com.climate/claypoole "0.1.1"]`. 
+dependency: `[com.climate/claypoole "0.2.0"]`. ## Why do you use claypoole? diff --git a/project.clj b/project.clj index 5e7fd9b..f08cf62 100644 --- a/project.clj +++ b/project.clj @@ -12,7 +12,7 @@ ;; and limitations under the License. (defproject com.climate/claypoole - "0.2.0-SNAPSHOT" + "0.2.0" :description "Claypoole: Threadpool tools for Clojure." :url "http://github.com/TheClimateCorporation/claypoole/" :license {:name "Apache License Version 2.0"