diff --git a/.gitignore b/.gitignore index 6e2b14e..96cf2b0 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ pom.xml *.jar .*.swp .lein-failures +.lein-plugins .lein-repl-history .nrepl-port /classes/ diff --git a/CHANGES.txt b/CHANGES.txt index 8f25186..a0a1266 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,10 @@ CHANGES +0.2.0 + +- Added priority threadpools +- Made with-shutdown! option take multiple threadpools + 0.1.1 - Fixed major bug where pmap etc. blocked on reading the entire input stream diff --git a/README.md b/README.md index a42e32e..496704d 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ The claypoole library provides threadpool-based parallel versions of Clojure functions such as `pmap`, `future`, and `for`. Claypoole is available in the Clojars repository. Just use this leiningen -dependency: `[com.climate/claypoole "0.1.1"]`. +dependency: `[com.climate/claypoole "0.2.0"]`. ## Why do you use claypoole? @@ -18,24 +18,25 @@ parallelism). Instead, people tend to fall back on Java. For instance, on the recommendation is to create an `ExecutorService` and call its `invokeAll` method. -Clojure's `pmap` function is another example of a simple parallelism tool that -was insufficiently flexible for our needs. - -* `pmap` uses a hardcoded number of threads (ncpus + 2). `pmap` is designed for - CPU-bound tasks, so it may not give adequate parallelism for latency-bound - tasks like network requests. -* `pmap` is lazy. It will only work a little ahead of where its output is - being read. However, usually we want parallelism so we can get things done as - fast as possible, in which case we want to do work eagerly. -* `pmap` is always ordered. Sometimes, to reduce latency, we just want to get - the results of tasks in the order they complete. - On the other hand, we do not need the flexible building blocks that are [`core.async`](https://github.com/clojure/core.async); we just want to run some simple tasks in parallel. 
Similarly, [`reducers`](http://clojure.org/reducers) is elegant, but we can't control its level of parallelism. +Essentially, we wanted a `pmap` function that improves on the original in +several ways: + +* We should be able to set the size of the threadpool `pmap` uses, so it would + be tunable for non-CPU-bound tasks like network requests. +* We should be able to share a threadpool between multiple `pmap`s to control + the amount of simultaneous work we're doing. +* We would like it to be streaming rather than lazy, so we can start it going + and expect it to work in the background without explicitly consuming the + results. +* We would like to be able to do an unordered `pmap`, so that we can start + handling the first response as fast as possible. + ## How do I use claypoole? To use claypoole, make a threadpool via `threadpool` and use it with @@ -79,17 +80,17 @@ possible. ```clojure (require '[com.climate.claypoole :as cp]) -(def net-pool (cp/threadpool 100)) -(def cpu-pool (cp/threadpool (cp/ncpus))) -;; Unordered pmap doesn't return output in the same order as the input(!), but -;; that means we can start using service2 as soon as possible. -(def service1-responses (cp/upmap net-pool service1-request myinputs)) -(def service2-responses (cp/upmap net-pool service2-request service1-responses)) -(def results (cp/upmap cpu-pool handle-response service2-responses)) -;; ...eventually... -;; The JVM doesn't automatically clean up threads for us. -(cp/shutdown net-pool) -(cp/shutdown cpu-pool) +;; We'll use the with-shutdown! form to guarantee that pools are cleaned up. +(cp/with-shutdown! [net-pool (cp/threadpool 100) + cpu-pool (cp/threadpool (cp/ncpus))] + ;; Unordered pmap doesn't return output in the same order as the input(!), + ;; but that means we can start using service2 as soon as possible. 
+ (def service1-resps (cp/upmap net-pool service1-request myinputs)) + (def service2-resps (cp/upmap net-pool service2-request service1-resps)) + (def results (cp/upmap cpu-pool handle-response service2-resps)) + ;; ...eventually... + ;; Make sure the computation is complete before we shut down the pools. + (doall results)) ``` Claypoole provides ordered and unordered parallel `for` macros. Note that only @@ -107,6 +108,17 @@ binding will be done in the calling thread. (myfn x y))) ``` +Claypoole also lets you prioritize your backlog of tasks. Higher-priority tasks +will be assigned to threads first. Here's an example; there is a more detailed +description below. + +```clojure +(def pool (cp/priority-threadpool 10)) +(def task1 (cp/future (cp/with-priority pool 1000) (myfn 1))) +(def task2 (cp/future (cp/with-priority pool 0) (myfn 2))) +(def moretasks (cp/pmap (cp/with-priority pool 10) myfn (range 3 10))) +``` + ## Do I really need to manage all those threadpools? You don't need to specifically declare a threadpool. Instead, you can just give @@ -169,7 +181,10 @@ exits. You can create a daemon threadpool via: To construct a threadpool, use the `threadpool` function. It takes optional keyword arguments allowing you to change the thread names, their daemon status, -and their priority. +and their priority. (NOTE: Thread priority is [a system-level property that +depends on the +OS](http://oreilly.com/catalog/expjava/excerpt/#EXJ-CH-6-SECT-4); it is not the +same as the task priority, described below.) ```clojure (def pool (cp/threadpool (cp/ncpus) @@ -199,10 +214,46 @@ like so: (cp/pmap pool myfn inputs))) ``` +## How can I prioritize my tasks? 
+ +You can create a threadpool that respects task priorities by creating a +`priority-threadpool`: + +```clojure +(def p1 (cp/priority-threadpool 5)) +(def p2 (cp/priority-threadpool 5 :default-priority -10)) +``` + +Then, use functions `with-priority` and `with-priority-fn` to set the priority +of your tasks, or just set the `:priority` in your `for` loop: + +```clojure +(cp/future (cp/with-priority p1 100) (myfn)) +;; Nothing bad happens if you nest with-priority. The outermost one "wins"; +;; this task runs at priority 2. +(cp/future (cp/with-priority (cp/with-priority p1 1) 2) (myfn)) +;; For pmaps, you can use a priority function, which is called with your +;; arguments. This will run 3 tasks at priorities 6, 5, and 4, respectively. +(cp/upmap (cp/with-priority-fn p1 (fn [x _] x)) + [6 5 4] [1 2 3]) +;; For for loops, you can use the special :priority binding, which must be the +;; last for binding. +(cp/upfor p1 [i (range 10) + :priority (- i)] + (myfn i)) +``` + +## What about Java interoperability? + +Under the hood, threadpools are just instances of +`java.util.concurrent.ExecutorService`. You can use any `ExecutorService` in +place of a threadpool, and you can use a threadpool just as you would an +`ExecutorService`. This means you can create custom threadpools and use them +easily. + ## Why the name "Claypoole"? -The claypoole library is named after [John -Claypoole](http://en.wikipedia.org/wiki/Betsy_Ross) for reasons that are at +The claypoole library is named after [John Claypoole (Betsy Ross's third +husband)](http://en.wikipedia.org/wiki/Betsy_Ross) for reasons that are at best obscure. ## License diff --git a/project.clj b/project.clj index ddc8798..f08cf62 100644 --- a/project.clj +++ b/project.clj @@ -12,13 +12,15 @@ ;; and limitations under the License. (defproject com.climate/claypoole - "0.1.1" + "0.2.0" :description "Claypoole: Threadpool tools for Clojure." 
:url "http://github.com/TheClimateCorporation/claypoole/" :license {:name "Apache License Version 2.0" :url http://www.apache.org/licenses/LICENSE-2.0 :distribution :repo} :min-lein-version "2.0.0" + :source-paths ["src/clj"] + :java-source-paths ["src/java"] :pedantic? :warn :dependencies [[org.clojure/clojure "1.5.1"]] :plugins [[jonase/eastwood "0.1.0"]]) diff --git a/src/com/climate/claypoole.clj b/src/clj/com/climate/claypoole.clj similarity index 53% rename from src/com/climate/claypoole.clj rename to src/clj/com/climate/claypoole.clj index a9b95ea..bdd0d42 100644 --- a/src/com/climate/claypoole.clj +++ b/src/clj/com/climate/claypoole.clj @@ -12,17 +12,25 @@ ;; and limitations under the License. (ns com.climate.claypoole - "Threadpool tools for Clojure." + "Threadpool tools for Clojure. + + A threadpool is just an ExecutorService with a fixed number of threads. In + general, you can use your own ExecutorService in place of any threadpool, and + you can treat a threadpool as you would any other ExecutorService." (:refer-clojure :exclude [future future-call pcalls pmap pvalues]) (:require [clojure.core :as core] [com.climate.claypoole.impl :as impl]) (:import + [com.climate.claypoole.impl + PriorityThreadpool + PriorityThreadpoolImpl] [java.util.concurrent Callable ExecutorService Future - LinkedBlockingQueue])) + LinkedBlockingQueue + ScheduledExecutorService])) (def ^:dynamic *parallel* @@ -38,9 +46,23 @@ [] (.. Runtime getRuntime availableProcessors)) +(defn thread-factory + "Create a ThreadFactory with keyword options including thread daemon status + :daemon, the thread name format :name (a string for format with one integer), + and a thread priority :thread-priority. + + This is exposed as a public function because it's handy if you're + instantiating your own ExecutorServices." + [& args] + (apply impl/thread-factory args)) + (defn threadpool "Make a threadpool. It should be shutdown when no longer needed. 
+ A threadpool is just an ExecutorService with a fixed number of threads. In + general, you can use your own ExecutorService in place of any threadpool, and + you can treat a threadpool as you would any other ExecutorService. + This takes optional keyword arguments: :daemon, a boolean indicating whether the threads are daemon threads, which will automatically die when the JVM exits, defaults to @@ -50,19 +72,83 @@ \"name-1\", etc. Defaults to \"claypoole-[pool-number]\". :thread-priority, an integer in [Thread/MIN_PRIORITY, Thread/MAX_PRIORITY] giving the priority of each thread, defaults to the priority of - the current thread" - ([] (threadpool (+ 2 (ncpus)))) - ([n & {:keys [daemon thread-priority] pool-name :name}] - (impl/threadpool n - :daemon daemon - :name pool-name - :thread-priority thread-priority))) + the current thread + + Note: Returns a ScheduledExecutorService rather than just an ExecutorService + because it's the same thing with a few bonus features." + ;; NOTE: The Clojure compiler doesn't seem to like the tests if we don't + ;; fully expand this typename. + ^java.util.concurrent.ScheduledExecutorService + ;; NOTE: Although I'm repeating myself, I list all the threadpool-factory + ;; arguments explicitly for API clarity. + [n & {:keys [daemon thread-priority] pool-name :name}] + (impl/threadpool n + :daemon daemon + :name pool-name + :thread-priority thread-priority)) + +(defn priority-threadpool + "Make a threadpool that chooses tasks based on their priorities. + + Assign priorities to tasks by wrapping the pool with with-priority or + with-priority-fn. You can also set a default priority with keyword argument + :default-priority. + + Otherwise, this uses the same keyword arguments as threadpool, and functions + just like any other ExecutorService." + ^PriorityThreadpool + [n & {:keys [default-priority] :as args + :or {default-priority 0}}] + (PriorityThreadpool. + (PriorityThreadpoolImpl. n + ;; Use our thread factory options. 
+ (impl/apply-map impl/thread-factory args) + default-priority) + (constantly default-priority))) + +(defn with-priority-fn + "Make a priority-threadpool wrapper that uses a given priority function. + + The priority function is applied to a pmap'd function's arguments. e.g. + + (upmap (with-priority-fn p (fn [x _] x)) + [6 5 4] [1 2 3]) + + will use pool p to run tasks [(+ 6 1) (+ 5 2) (+ 4 3)] + with priorities [6 5 4]." + ^PriorityThreadpool [^PriorityThreadpool pool priority-fn] + (let [^PriorityThreadpoolImpl pool* (.pool pool)] + (PriorityThreadpool. pool* priority-fn))) + +(defn with-priority + "Make a priority-threadpool wrapper with a given fixed priority. + + All tasks run with this pool wrapper will have the given priority. e.g. + + (def t1 (future (with-priority p 1) 1)) + (def t2 (future (with-priority p 2) 2)) + (def t3 (future (with-priority p 3) 3)) + + will use pool p to run these tasks with priorities 1, 2, and 3 respectively. + + If you nest priorities, the outermost one \"wins\", so this task will be run + at priority 3: + + (def wp (with-priority p 1)) + (def t1 (future (with-priority (with-priority wp 2) 3) :result)) + " + ^ExecutorService [^ExecutorService pool priority] + (with-priority-fn pool (constantly priority))) (defn threadpool? "Returns true iff the argument is a threadpool." [pool] (instance? ExecutorService pool)) +(defn priority-threadpool? + "Returns true iff the argument is a priority-threadpool." + [pool] + (instance? PriorityThreadpool pool)) + (defn shutdown "Syntactic sugar to stop a pool cleanly. This will stop the pool from accepting any new requests." @@ -96,8 +182,9 @@ (with-shutdown! [pool (theadpool 6)] (doall (pmap pool identity (range 1000)))) - (with-shutdown! [pool 6] - (doall (pmap pool identity (range 1000)))) + (with-shutdown! [pool1 6 + pool2 :serial] + (doall (pmap pool1 identity (range 1000)))) Bad example: @@ -105,14 +192,20 @@ ;; Some of these tasks may be killed! 
(pmap pool identity (range 1000))) " - [[pool-sym pool-init] & body] - `(let [pool-init# ~pool-init] - (let [[_# ~pool-sym] (impl/->threadpool pool-init#)] - (try - ~@body - (finally - (when (threadpool? ~pool-sym) - (shutdown! ~pool-sym))))))) + [pool-syms-and-inits & body] + (when-not (even? (count pool-syms-and-inits)) + (throw (IllegalArgumentException. + "with-shutdown! requires an even number of binding forms"))) + (if (empty? pool-syms-and-inits) + `(do ~@body) + (let [[pool-sym pool-init & more] pool-syms-and-inits] + `(let [pool-init# ~pool-init + [_# ~pool-sym] (impl/->threadpool pool-init#)] + (try + (with-shutdown! ~more ~@body) + (finally + (when (threadpool? ~pool-sym) + (shutdown! ~pool-sym)))))))) (defn- serial? "Check if we should run this computation in serial." @@ -142,6 +235,7 @@ ^ExecutorService pool* pool ^Callable f* (impl/binding-conveyor-fn f) fut (.submit pool* f*)] + ;; Make an object just like Clojure futures. (reify clojure.lang.IDeref (deref [_] (impl/deref-future fut)) @@ -192,11 +286,18 @@ serial via (doall map). This may be helpful during profiling, for example. " [pool f & arg-seqs] + (when (empty? arg-seqs) + (throw (IllegalArgumentException. + "pmap requires at least one sequence to map over"))) (if (serial? pool) (doall (apply map f arg-seqs)) (let [[shutdown? pool] (impl/->threadpool pool) + ;; Use map to handle the argument sequences. args (apply map vector (map impl/unchunk arg-seqs)) - futures (map (fn [a] (future pool (apply f a))) args) + futures (for [a args] + (future-call pool + (with-meta #(apply f a) + {:args a}))) ;; Start eagerly parallel processing. read-future (core/future (try @@ -211,19 +312,47 @@ "Like pmap, except the return value is a sequence of results ordered by *completion time*, not by input order." [pool f & arg-seqs] + (when (empty? arg-seqs) + (throw (IllegalArgumentException. + "upmap requires at least one sequence to map over"))) (if (serial? pool) (doall (apply map f arg-seqs)) (let [[shutdown? 
pool] (impl/->threadpool pool) + ;; Use map to handle the argument sequences. args (apply map vector (map impl/unchunk arg-seqs)) q (LinkedBlockingQueue.) ;; Start eagerly parallel processing. read-future (core/future + ;; Try to schedule all the tasks, but definitely + ;; shutdown the pool if necessary. (try (doseq [a args :let [p (promise)]] - (deliver p (future pool (try - (apply f a) - (finally (.add q @p)))))) + ;; Try to schedule one task, but definitely add + ;; something to the queue for the task. + (try + ;; We can't directly make a future add itself to + ;; a queue. Instead, we use a promise for + ;; indirection. + (deliver p (future-call + pool + (with-meta + ;; Try to run the task, but + ;; definitely add the future to + ;; the queue. + #(try + (apply f a) + ;; Even if we had an exception + ;; running the task, make sure + ;; the future shows up in the + ;; queue. + (finally (.add q @p))) + {:args a}))) + ;; If we had an exception scheduling a task, + ;; let's plan to re-throw that at queue read + ;; time. + (catch Exception e + (.add q (delay (throw e)))))) (finally (when shutdown? (shutdown pool)))))] ;; Read results as available. (concat (for [_ args] (-> q .take deref)) @@ -256,6 +385,28 @@ [pool & exprs] `(upcalls ~pool ~@(for [e exprs] `(fn [] ~e)))) +(defn- pfor-internal + "Do the messy parsing of the :priority from the for bindings." + [pool bindings body pmap-fn-sym] + (when (vector? pool) + (throw (IllegalArgumentException. + (str "Got a vector instead of a pool--" + "did you forget to use a threadpool?")))) + (if-not (= :priority (first (take-last 2 bindings))) + ;; If there's no priority, everything is simple. + `(~pmap-fn-sym ~pool #(%) (for ~bindings (fn [] ~@body))) + ;; If there's a priority, God help us--let's pull that thing out. 
+ (let [bindings* (vec (drop-last 2 bindings)) + priority-value (last bindings)] + `(let [pool# (with-priority-fn ~pool (fn [_# p#] p#)) + ;; We can't just make functions; we have to have the priority as + ;; an argument to work with the priority-fn. + [fns# priorities#] (apply map vector + (for ~bindings* + [(fn [priority#] ~@body) + ~priority-value]))] + (~pmap-fn-sym pool# #(%1 %2) fns# priorities#))))) + (defmacro pfor "A parallel version of for. It is like for, except it takes a threadpool and is parallel. For more detail on its parallelism and on its threadpool @@ -265,25 +416,19 @@ in serial, so while this will call complex-computation in parallel: (pfor pool [i (range 1000)] (complex-computation i)) this will not have useful parallelism: - (pfor pool [i (range 1000) :let [result (complex-computation i)] result) + (pfor pool [i (range 1000) :let [result (complex-computation i)]] result) + + You can use the special binding :priority (which must be the last binding) to + set the priorities of the tasks. + (upfor (priority-threadpool 10) [i (range 1000) + :priority (inc i)] + (complex-computation i)) " [pool bindings & body] - `(apply pcalls ~pool (for ~bindings (fn [] ~@body)))) + (pfor-internal pool bindings body `pmap)) (defmacro upfor - "An unordered, parallel version of for. It is like for, except it takes a - threadpool, is parallel, and returns its results ordered by completion time. - For more detail on its parallelism and on its threadpool argument, see - upmap. - - Note that while the body is executed in parallel, the bindings are executed - in serial, so while this will call complex-computation in parallel: - (upfor pool [i (range 1000)] - (complex-computation i)) - this will not have useful parallelism: - (upfor pool [i (range 1000) - :let [result (complex-computation i)]] - result) - " + "Like pfor, except the return value is a sequence of results ordered by + *completion time*, not by input order." 
[pool bindings & body] - `(apply upcalls ~pool (for ~bindings (fn [] ~@body)))) + (pfor-internal pool bindings body `upmap)) diff --git a/src/com/climate/claypoole/impl.clj b/src/clj/com/climate/claypoole/impl.clj similarity index 51% rename from src/com/climate/claypoole/impl.clj rename to src/clj/com/climate/claypoole/impl.clj index 88079b8..cf0ba31 100644 --- a/src/com/climate/claypoole/impl.clj +++ b/src/clj/com/climate/claypoole/impl.clj @@ -16,22 +16,35 @@ (:require [clojure.core :as core]) (:import + [clojure.lang + IFn] + [com.climate.claypoole.impl + Prioritized + PriorityThreadpoolImpl] + [java.util + Collection + List] [java.util.concurrent + ExecutionException Executors ExecutorService Future + ScheduledExecutorService ThreadFactory TimeoutException TimeUnit])) (defn binding-conveyor-fn - "Like clojure.core/binding-conveyor-fn." + "Like clojure.core/binding-conveyor-fn for resetting bindings to run a + function in another thread." [f] (let [frame (clojure.lang.Var/cloneThreadBindingFrame)] - (fn [] - (clojure.lang.Var/resetThreadBindingFrame frame) - (f)))) + (with-meta + (fn [] + (clojure.lang.Var/resetThreadBindingFrame frame) + (f)) + (meta f)))) (defn deref-future "Like clojure.core/deref-future." @@ -51,9 +64,7 @@ clojure.lang.IDeref (deref [_] result) clojure.lang.IBlockingDeref - (deref - [_ timeout-ms timeout-val] - result) + (deref [_ timeout-ms timeout-val] result) clojure.lang.IPending (isRealized [_] true) Future @@ -85,25 +96,32 @@ [] (format "claypoole-%d" (swap! threadpool-id inc))) +(defn apply-map + "Apply a function that takes keyword arguments to a map of arguments." + [f & args*] + (let [args (drop-last args*) + arg-map (last args*)] + (apply f (concat args (mapcat identity arg-map))))) + (defn thread-factory - "Create a ThreadFactory with options including thread daemon status, the - thread name format (a string for format with one integer), and a thread - priority." 
+ "Create a ThreadFactory with keyword options including thread daemon status + :daemon, the thread name format :name (a string for format with one integer), + and a thread priority :thread-priority." ^ThreadFactory [& {:keys [daemon thread-priority] pool-name :name}] (let [daemon* (boolean daemon) pool-name* (or pool-name (default-threadpool-name)) - thread-priority* (or thread-priority (.getPriority (Thread/currentThread)))] + thread-priority* (or thread-priority + (.getPriority (Thread/currentThread)))] (let [default-factory (Executors/defaultThreadFactory) ;; The previously-used thread ID. Start at -1 so we can just use the ;; return value of (swap! inc). thread-id (atom -1)] (reify ThreadFactory (^Thread newThread [_ ^Runnable r] - (let [t (.newThread default-factory r)] - (doto t - (.setDaemon daemon*) - (.setName (str pool-name* "-" (swap! thread-id inc))) - (.setPriority thread-priority*)))))))) + (doto (.newThread default-factory r) + (.setDaemon daemon*) + (.setName (str pool-name* "-" (swap! thread-id inc))) + (.setPriority thread-priority*))))))) (defn unchunk "Takes a seqable and returns a lazy sequence that is maximally lazy. @@ -117,13 +135,65 @@ (defn threadpool "Make a threadpool. It should be shutdown when no longer needed. - See docs in com.climate.claypoole/threadpool. - " - [n & {:keys [daemon thread-priority] pool-name :name}] - (let [factory (thread-factory :daemon daemon - :name pool-name - :thread-priority thread-priority)] - (Executors/newFixedThreadPool n factory))) + See docs in com.climate.claypoole/threadpool." + ^ScheduledExecutorService [n & args] + ;; Return a ScheduledThreadPool rather than a FixedThreadPool because it's + ;; the same thing with some bonus features. + (Executors/newScheduledThreadPool n (apply thread-factory args))) + +(defn- prioritize + "Apply a priority function to a task. + + Note that this re-throws all priority-fn exceptions as ExecutionExceptions. 
+ That shouldn't mess anything up because the caller re-throws it as an + ExecutionException anyway. + + For simplicity, prioritize reifies both Callable and Runnable, rather than + having one prioritize function for each of those types. That means, for + example, that if you prioritize a Runnable that is not also a Callable, you + might want to cast the result to Runnable or otherwise use it carefully." + [task, ^IFn priority-fn] + (let [priority (try + (long (apply priority-fn (-> task meta :args))) + (catch Exception e + (throw (ExecutionException. + "Priority function exception" e))))] + (reify + Callable + (call [_] (.call ^Callable task)) + Runnable + (run [_] (.run ^Runnable task)) + Prioritized + (getPriority [_] priority)))) + +;; A Threadpool that applies a priority function to tasks and uses a +;; PriorityThreadpoolImpl to run them. +(deftype PriorityThreadpool [^PriorityThreadpoolImpl pool, ^IFn priority-fn] + ExecutorService + (^boolean awaitTermination [_, ^long timeout, ^TimeUnit unit] + (.awaitTermination pool timeout unit)) + (^List invokeAll [_, ^Collection tasks] + (.invokeAll pool (map #(prioritize % priority-fn) tasks))) + (^List invokeAll [_, ^Collection tasks, ^long timeout, ^TimeUnit unit] + (.invokeAll pool (map #(prioritize % priority-fn) tasks) timeout unit)) + (^Object invokeAny [_, ^Collection tasks] + (.invokeAny pool (map #(prioritize % priority-fn) tasks))) + (^Object invokeAny [_, ^Collection tasks, ^long timeout, ^TimeUnit unit] + (.invokeAny pool (map #(prioritize % priority-fn) tasks) timeout unit)) + (^boolean isShutdown [_] + (.isShutdown pool)) + (^boolean isTerminated [_] + (.isTerminated pool)) + (shutdown [_] + (.shutdown pool)) + (^List shutdownNow [_] + (.shutdownNow pool)) + (^Future submit [_, ^Runnable task] + (.submit pool ^Runnable (prioritize task priority-fn))) + (^Future submit [_, ^Runnable task, ^Object result] + (.submit pool ^Runnable (prioritize task priority-fn) result)) + (^Future submit [_ ^Callable task] 
+ (.submit pool ^Callable (prioritize task priority-fn)))) (defn ->threadpool "Convert the argument into a threadpool, leaving the special keyword :serial diff --git a/src/java/com/climate/claypoole/impl/Prioritized.java b/src/java/com/climate/claypoole/impl/Prioritized.java new file mode 100644 index 0000000..cbd2785 --- /dev/null +++ b/src/java/com/climate/claypoole/impl/Prioritized.java @@ -0,0 +1,19 @@ +// The Climate Corporation licenses this file to you under the Apache +// License, Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// See the NOTICE file distributed with this work for additional information +// regarding copyright ownership. Unless required by applicable law or agreed +// to in writing, software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions +// and limitations under the License. + +package com.climate.claypoole.impl; + +/** An object with a priority. */ +public interface Prioritized { + public long getPriority(); +} diff --git a/src/java/com/climate/claypoole/impl/PriorityFutureTask.java b/src/java/com/climate/claypoole/impl/PriorityFutureTask.java new file mode 100644 index 0000000..79d0254 --- /dev/null +++ b/src/java/com/climate/claypoole/impl/PriorityFutureTask.java @@ -0,0 +1,45 @@ +// The Climate Corporation licenses this file to you under the Apache +// License, Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// See the NOTICE file distributed with this work for additional information +// regarding copyright ownership. 
Unless required by applicable law or agreed +// to in writing, software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions +// and limitations under the License. + +package com.climate.claypoole.impl; + +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; + +/** A prioritized, sortable FutureTask. */ +public class PriorityFutureTask<V> + extends FutureTask<V> + implements Prioritized, Comparable<Prioritized> { + private long priority; + + public PriorityFutureTask(Runnable runnable, V value, long priority) { + super(runnable, value); + this.priority = priority; + } + + public PriorityFutureTask(Callable<V> callable, long priority) { + super(callable); + this.priority = priority; + } + + @Override + public int compareTo(Prioritized other) { + // Sort for descending order. + return Long.compare(other.getPriority(), this.priority); + } + + @Override + public long getPriority() { + return this.priority; + } +} diff --git a/src/java/com/climate/claypoole/impl/PriorityThreadpoolImpl.java b/src/java/com/climate/claypoole/impl/PriorityThreadpoolImpl.java new file mode 100644 index 0000000..ed58e60 --- /dev/null +++ b/src/java/com/climate/claypoole/impl/PriorityThreadpoolImpl.java @@ -0,0 +1,77 @@ +// The Climate Corporation licenses this file to you under the Apache +// License, Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// See the NOTICE file distributed with this work for additional information +// regarding copyright ownership. Unless required by applicable law or agreed +// to in writing, software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions +// and limitations under the License. + +package com.climate.claypoole.impl; + +import java.util.concurrent.Callable; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** A fixed-size threadpool that does tasks in priority order. + * + * Submitted tasks have their own priority if they implement Prioritized; + * otherwise, they are assigned the pool's default priority. + */ +public class PriorityThreadpoolImpl extends ThreadPoolExecutor { + public PriorityThreadpoolImpl(int poolSize) { + this(poolSize, 0); + } + + public PriorityThreadpoolImpl(int poolSize, long defaultPriority) { + super(poolSize, poolSize, 0, TimeUnit.MILLISECONDS, + new PriorityBlockingQueue<Runnable>(poolSize)); + this.defaultPriority = defaultPriority; + } + + public PriorityThreadpoolImpl(int poolSize, ThreadFactory threadFactory, + long defaultPriority) { + this(poolSize, poolSize, 0, TimeUnit.MILLISECONDS, threadFactory, + defaultPriority); + } + + public PriorityThreadpoolImpl(int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, + ThreadFactory threadFactory, long defaultPriority) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, + new PriorityBlockingQueue<Runnable>(corePoolSize), threadFactory); + this.defaultPriority = defaultPriority; + } + + /** Get the priority of an object, using our defaultPriority as a backup. 
+ */ + protected long getPriority(Object o) { + if (o instanceof Prioritized) { + return ((Prioritized) o).getPriority(); + } + return defaultPriority; + } + + @Override + protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { + return new PriorityFutureTask<T>(runnable, value, getPriority(runnable)); + } + + @Override + protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { + return new PriorityFutureTask<T>(callable, getPriority(callable)); + } + + public long getDefaultPriority() { + return defaultPriority; + } + + protected long defaultPriority; +} diff --git a/test/com/climate/claypoole_test.clj b/test/com/climate/claypoole_test.clj index e98d467..442cff1 100644 --- a/test/com/climate/claypoole_test.clj +++ b/test/com/climate/claypoole_test.clj @@ -17,11 +17,41 @@ [com.climate.claypoole :as cp] [com.climate.claypoole.impl :as impl]) (:import + [com.climate.claypoole.impl + ;; Import to make Eastwood happy. + PriorityThreadpool] [java.util.concurrent ExecutionException ExecutorService])) +(defn callable + "Just a cast." + ^Callable [^clojure.lang.IFn f] + f) + +(defn check-threadpool-options + [pool-constructor] + (cp/with-shutdown! [pool (pool-constructor 4)] + (dotimes [_ 8] (.submit pool #(inc 1))) + (let [factory (.getThreadFactory pool) + start (promise) + thread (.newThread factory #(deref start))] + (is (false? (.isDaemon thread))) + (is (not (empty? (re-find #"claypoole-[0-9]*-4" (.getName thread))))) + (is (= (.getPriority (Thread/currentThread)) (.getPriority thread))))) + (cp/with-shutdown! [pool (pool-constructor 4 + :daemon true + :name "fiberpond" + :thread-priority 4)] + (dotimes [_ 8] (.submit pool #(inc 1))) + (let [factory (.getThreadFactory pool) + start (promise) + thread (.newThread factory #(deref start))] + (is (true? (.isDaemon thread))) + (is (= "fiberpond-4" (.getName thread))) + (is (= 4 (.getPriority thread)))))) + (deftest test-threadpool (testing "Basic threadpool creation" (cp/with-shutdown! 
[pool (cp/threadpool 4)] @@ -29,30 +59,183 @@ (dotimes [_ 8] (.submit pool #(inc 1))) (is (= 4 (.getPoolSize pool))))) (testing "Threadpool options" - (cp/with-shutdown! [pool (cp/threadpool 4)] - (dotimes [_ 8] (.submit pool #(inc 1))) - (let [factory (.getThreadFactory pool) - start (promise) - thread (.newThread factory #(deref start))] - (is (false? (.isDaemon thread))) - (is (not (empty? (re-find #"claypoole-[0-9]*-4" (.getName thread))))) - (is (= (.getPriority (Thread/currentThread)) (.getPriority thread))))) - (cp/with-shutdown! [pool (cp/threadpool 4 - :daemon true - :name "fiberpond" - :thread-priority 4)] - (dotimes [_ 8] (.submit pool #(inc 1))) - (let [factory (.getThreadFactory pool) - start (promise) - thread (.newThread factory #(deref start))] - (is (true? (.isDaemon thread))) - (is (= "fiberpond-4" (.getName thread))) - (is (= 4 (.getPriority thread))))))) + (check-threadpool-options cp/threadpool))) + +(defn- sorted*? + "Is a sequence sorted?" + [x] + (= x (sort x))) + +(deftest test-priority-threadpool + (testing "Priority threadpool ordering is mostly in order" + (cp/with-shutdown! [pool (cp/priority-threadpool 1)] + (let [start (promise) + completed (atom []) + tasks (doall + (for [i (range 10)] + (cp/future (cp/with-priority pool i) + (deref start) + (swap! completed conj i))))] + ;; start tasks + (Thread/sleep 50) + (deliver start true) + ;; Wait for tasks to complete + (doseq [f tasks] (deref f)) + (is (= [0 9 8 7 6 5 4 3 2 1] + @completed))))) + (testing "Priority threadpool ordering is ordered with unordered inputs." + (cp/with-shutdown! [pool (cp/priority-threadpool 1)] + (let [start (promise) + completed (atom []) + tasks (doall + (for [i (shuffle (range 100))] + (cp/future (cp/with-priority pool i) + (deref start) + (swap! completed conj i))))] + ;; start tasks + (deliver start true) + ;; Wait for tasks to complete + (doseq [f tasks] (deref f)) + (is (sorted*? 
+ (-> completed + deref + ;; The first task will be one at random, so drop it + rest + reverse)))))) + (testing "Priority threadpool default priority." + (cp/with-shutdown! [pool (cp/priority-threadpool 1 :default-priority 50)] + (let [start (promise) + completed (atom []) + run (fn [result] (deref start) (swap! completed conj result)) + first-task (cp/future pool (run :first)) + tasks (doall + (for [i [1 100]] + (cp/future (cp/with-priority pool i) (run i)))) + default-task (cp/future pool (run :default))] + ;; start tasks + (deliver start true) + ;; Wait for tasks to complete + (doseq [f tasks] (deref f)) + (deref default-task) + (is (= [:first 100 :default 1] @completed))))) + (testing "Priority threadpool options" + (check-threadpool-options cp/threadpool))) + +(deftest test-with-priority-fn + (testing "with-priority-fn works for simple upmap" + (cp/with-shutdown! [pool (cp/priority-threadpool 1)] + (let [start (promise) + results (cp/upmap (cp/with-priority-fn + pool (fn [& args] (first args))) + (fn [i] + (deref start) + i) + (range 10))] + ;; start tasks + (Thread/sleep 50) + (deliver start true) + (is (= [0 9 8 7 6 5 4 3 2 1] + results))))) + (testing "with-priority-fn throws sensible exceptions" + (cp/with-shutdown! [pool (cp/priority-threadpool 2)] + (is (thrown-with-msg? + Exception #"Priority function exception" + ;; Arity exception. + (dorun + (cp/pmap (cp/with-priority-fn pool (fn [] 0)) + (fn [x y] (+ x y)) + (range 10) (range 10))))) + (is (thrown-with-msg? + ;; No arguments passed to priority function. + Exception #"Priority function exception" + (deref (cp/future (cp/with-priority-fn + pool (fn [& args] (first args))) + 1))))))) + +(deftest test-for-priority + (testing "pfor uses priority" + (cp/with-shutdown! [pool (cp/priority-threadpool 1)] + (let [start (promise) + completed (atom []) + tasks (cp/pfor pool + [i (range 100) + :priority (inc i)] + (deref start) + (swap! 
completed conj i) + i)] + (Thread/sleep 50) + (deliver start true) + (dorun tasks) + ;; Just worry about the rest of the tasks; the first one may be out of + ;; order. + (is (sorted*? (reverse (rest @completed)))) + (is (= (range 100) tasks))))) + (testing "upfor uses priority" + (cp/with-shutdown! [pool (cp/priority-threadpool 1)] + (let [start (promise) + completed (atom []) + tasks (cp/upfor pool + [i (range 100) + :priority (inc i)] + (deref start) + (swap! completed conj i) + i)] + (Thread/sleep 50) + (deliver start true) + (dorun tasks) + ;; Just worry about the rest of the tasks; the first one may be out of + ;; order. + (is (sorted*? (reverse (rest @completed)))) + (is (= @completed tasks)))))) + +(deftest test-priority-nonIObj + (testing "A priority pool should work on any sort of Callable." + (cp/with-shutdown! [pool (cp/priority-threadpool 1)] + (let [start (promise) + results (atom []) + run (fn [x] (deref start) (swap! results conj x))] + ;; Dummy task, always runs first. + (cp/future (cp/with-priority pool 100) + (run 100)) + ;; Runnables. + (.submit (cp/with-priority pool 1) + (reify Runnable (run [_] (run 1)))) + (.submit (cp/with-priority pool 10) + (reify Runnable (run [_] (run 10)))) + ;; Runnables with return value. + (.submit (cp/with-priority pool 2) + (reify Runnable (run [_] (run 2))) + :return-value) + (.submit (cp/with-priority pool 9) + (reify Runnable (run [_] (run 9))) + :return-value) + (cp/future (cp/with-priority pool 6) + (run 6)) + (cp/future (cp/with-priority pool 11) + (run 11)) + ;; Callables + (.submit (cp/with-priority pool 3) + (reify Callable (call [_] (run 3)))) + (.submit (cp/with-priority pool 8) + (reify Callable (call [_] (run 8)))) + ;; And another couple IFns for good measure + (cp/future (cp/with-priority pool 5) + (run 5)) + (cp/future (cp/with-priority pool 7) + (run 7)) + ;; Make them go. + (Thread/sleep 50) + (deliver start true) + ;; Check the results + (Thread/sleep 50) + (is (sorted*? 
(reverse @results))))))) (deftest test-threadpool? (testing "Basic threadpool?" - (cp/with-shutdown! [pool 4] + (cp/with-shutdown! [pool 4 + priority-pool (cp/priority-threadpool 4)] (is (true? (cp/threadpool? pool))) + (is (true? (cp/threadpool? priority-pool))) (is (false? (cp/threadpool? :serial))) (is (false? (cp/threadpool? nil))) (is (false? (cp/threadpool? 1)))))) @@ -62,7 +245,7 @@ (let [pool (cp/threadpool 4) start (promise) result (promise) - f (.submit pool #(deliver result (deref start)))] + f (.submit pool (callable #(deliver result (deref start))))] (is (false? (cp/shutdown? pool))) (Thread/sleep 50) ;; Make sure the threadpool starts shutting down but doesn't complete @@ -83,7 +266,7 @@ (testing "Basic shutdown!" (let [pool (cp/threadpool 4) start (promise) - f (.submit pool #(deref start))] + f (.submit pool (callable #(deref start)))] (is (false? (cp/shutdown? pool))) (Thread/sleep 50) ;; Make sure the threadpool completes shutting down immediately. @@ -118,6 +301,32 @@ (Thread/sleep 50) (is (.isDone @@fp)) (is (thrown? ExecutionException (deref @@fp))))))) + (testing "With-shutdown! works with any number of threadpools" + (let [input (range 100)] + (is (= input + (cp/with-shutdown! [] + (map identity input)))) + (is (= input + (cp/with-shutdown! [p1 4] + (->> input + (cp/pmap p1 identity) + doall)))) + (is (= input + (cp/with-shutdown! [p1 4 + p2 3] + (->> input + (cp/pmap p1 identity) + (cp/pmap p2 identity) + doall)))) + (is (= input + (cp/with-shutdown! [p1 4 + p2 3 + p3 5] + (->> input + (cp/pmap p1 identity) + (cp/pmap p2 identity) + (cp/pmap p3 identity) + doall)))))) (testing "Invalid with-shutdown! arguments" (is (thrown? IllegalArgumentException (cp/with-shutdown! [pool 1.5] nil))) @@ -197,6 +406,41 @@ (Thread/sleep 50) (is (= @started (set results) (set input))))))) +(defn check-chaining + "Check that we can chain pmaps." + [pmap-like] + (cp/with-shutdown! 
[p1 (cp/threadpool 2) + p2 (cp/threadpool 4)] + (is (= (range 1 11) + (->> (range 10) + (pmap-like p1 inc) + sort))) + (is (= (range 2 12) + (->> (range 10) + (pmap-like p1 inc) + (pmap-like p1 inc) + sort))) + (is (= (range 3 13) + (->> (range 10) + (pmap-like p1 inc) + (pmap-like p1 inc) + (pmap-like p2 inc) + sort))))) + +(defn check-shutdown-exceptions + "Check that exceptions are thrown when tasks go to a shutdown pool." + [pmap-like] + (cp/with-shutdown! [pool 2] + (let [input (range 4) + start (promise) + delayed-input (map (fn [i] (deref start) i) input) + results (future (pmap-like pool identity + (concat input delayed-input)))] + (Thread/sleep 50) + (cp/shutdown pool) + (deliver start true) + (is (thrown? Exception (dorun @results)))))) + (defn check-fn-exception "Check that a pmap function correctly passes exceptions caused by the function." @@ -206,7 +450,7 @@ inputs [0 1 2 3 :4 5 6 7 8 9]] (is (thrown-with-msg? Exception #"keyword found" - (doall (pmap-like pool + (dorun (pmap-like pool (fn [i] (if (keyword? i) (throw (Exception. "keyword found")) @@ -227,7 +471,7 @@ (range 200))] (is (thrown-with-msg? Exception #"deliberate" - (doall (pmap-like pool inc inputs)))) + (dorun (pmap-like pool inc inputs)))) (.shutdown pool))) (defn check-maximum-parallelism-one-case @@ -313,18 +557,25 @@ (check-->threadpool pmap-like)) (testing (format "%s is made serial by binding cp/*parallel* to false" fn-name) - (check-*parallel*-disables pmap-like))) + (check-*parallel*-disables pmap-like)) + (testing (format "%s throws exceptions when tasks are sent to a shutdown pool" + fn-name) + (check-shutdown-exceptions pmap-like)) + (testing (format "%s can be chained in various threadpools" fn-name) + (check-chaining pmap-like))) (deftest test-future (testing "basic future test" (cp/with-shutdown! [pool 3] - (let [f (cp/future + (let [a (atom false) + f (cp/future pool ;; Body can contain multiple elements. - 1 + (reset! 
a true) (range 10))] - (is (= @f (range 10)))))) + (is (= @f (range 10))) + (is (true? @a))))) (testing "future threadpool args" (is (thrown? IllegalArgumentException (cp/future 3 (inc 1)))) (is (thrown? IllegalArgumentException (cp/future nil (inc 1)))) @@ -349,7 +600,11 @@ pmap-like n pool) (when shutdown? (.shutdown pool)))) (testing "Binding cp/*parallel* can disable parallelism in future" - (check-*parallel*-disables pmap-like)))) + (check-*parallel*-disables pmap-like)) + (testing "Future throws exceptions when tasks are sent to a shutdown pool" + (check-shutdown-exceptions pmap-like)) + (testing "Futures can be chained in various threadpools." + (check-chaining pmap-like)))) (deftest test-pmap (testing "basic pmap test"