diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index a8f6f2f..56469b4 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -29,6 +29,9 @@ jobs: distribution: 'adopt' java-version: '11' + - name: Clone Submodules + run: make jedis + - name: Install clojure cli uses: DeLaGuardo/setup-clojure@master with: diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..50b0211 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "vendor/jedis"] + path = vendor/jedis + url = https://github.com/moclojer/jedis.git diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..bc5f924 --- /dev/null +++ b/Makefile @@ -0,0 +1,5 @@ +jedis: + git submodule update --init --recursive --remote + cd vendor/jedis && make mvn-package-no-tests + +all: jedis diff --git a/README.md b/README.md index 647a4ed..7f7178e 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,151 @@ com.moclojer/rq {:mvn/version "0.x.x"} > see the versions distributed on clojars -## example +## building from source + +We build Jedis ourselves to enable building queue functions directly using reflection. This approach ensures full compatibility with our library's features. + +### prerequisites + +- Make sure you have Java JDK (version X.X or higher) installed +- Ensure you have Make installed on your system + +### build steps + +1. Clone the repository: `git clone [repository URL]` +2. Navigate to the project directory: `cd clj-rq` +3. Run the build command: `make jedis` + +After running `make jedis`, the library will be built and ready to be linked with your project. Linking in this context means that the built Jedis library will be properly referenced and used by clj-rq when you include it in your project. + +## how clj-rq works under the hood + +The `clj-rq` library leverages the `->wrap-method` macro to dynamically generate queue functions by wrapping methods from the Jedis library. This approach ensures that the library is always up-to-date with the latest changes in Jedis, providing enhanced security and compatibility. + +The `->wrap-method` macro is defined in `src/com/moclojer/internal/reflection.clj` and is used in `src/com/moclojer/rq/queue.clj` to generate the queue functions. By using reflection, the library can dynamically adapt to changes in the Jedis API, ensuring that the generated functions are always in sync with the underlying Jedis methods. + +This dynamic generation process is a key differentiator of the `clj-rq` library, making it more secure and future-proof compared to other libraries that rely on static function definitions. + +## functions + +This section outlines the key functions available in the clj-rq library, covering both queue and pub/sub operations. For detailed descriptions and examples of each function, please refer to the specific subsections below. + +### queue + +The `clj-rq` library provides a set of queue functions that are dynamically generated by wrapping methods from the Jedis library. These functions are defined in `src/com/moclojer/rq/queue.clj` and include: + +- `push!`: Adds elements to the queue. +- `pop!`: Removes and returns elements from the queue. +- `bpop!`: Blocks until an element is available to pop from the queue. +- `index`: Retrieves an element at a specific index in the queue. +- `range`: Retrieves a range of elements from the queue. +- `set!`: Sets the value of an element at a specific index in the queue. +- `len`: Returns the length of the queue. +- `rem!`: Removes elements from the queue. +- `insert!`: Inserts an element into the queue at a specific position. +- `trim!`: Trims the queue to a specified range. + +#### common options + +All these functions share common options, such as specifying the queue name and handling encoding/decoding of messages. The options are passed as arguments to the functions and allow for flexible configuration. + +#### examples + +- **push!**: This function adds an element to the queue. It supports options for specifying the direction (left or right) and encoding the message before pushing it to the queue. + +> [!WARNING] +> The element or elements to be pushed into a queue has to be passed inside a sequentiable (a vector for example). + +```clojure +(rq-queue/push! client "my-queue" ["message"] {:direction :left}) +``` + +- **pop!**: This function removes and returns an element from the queue. It supports options for specifying the direction (left or right) and decoding the message after popping it from the queue. + +```clojure +(rq-queue/pop! client "my-queue" {:direction :right}) +``` + +- **bpop!**: This function blocks until an element is available to pop from the queue. It is useful in scenarios where you need to wait for new messages to arrive. + +```clojure +(rq-queue/bpop! client "my-queue" {:timeout 5}) +``` + +- **index**: This function retrieves an element at a specific index in the queue. It supports options for decoding the retrieved message. + +```clojure +(rq-queue/index client "my-queue" 0) +``` + +- **range**: This function retrieves a range of elements from the queue. It supports options for decoding the retrieved messages. + +```clojure +(rq-queue/range client "my-queue" 0 -1) +``` + +- **set!**: This function sets the value of an element at a specific index in the queue. It supports options for encoding the message before setting it. + +```clojure +(rq-queue/set! client "my-queue" 0 "new-message") +``` + +- **len**: This function returns the length of the queue. It is useful for monitoring the size of the queue. + + ```clojure + (rq-queue/len client "my-queue") + ``` + +- **rem!**: This function removes elements from the queue based on a specified pattern. It supports options for specifying the number of elements to remove. + +```clojure +(rq-queue/rem! client "my-queue" "message" {:count 2}) +``` + +- **insert!**: This function inserts an element into the queue at a specific position. It supports options for encoding the message before inserting it. + +```clojure +(rq-queue/insert! client "my-queue" "pivot-message" "new-message" {:position :before}) +``` + +- **trim!**: This function trims the queue to a specified range. It is useful for maintaining the size of the queue within certain limits. + +```clojure +(rq-queue/trim! client "my-queue" 0 10) +``` + +### pubsub + +The `clj-rq` library provides a set of pub/sub functions that facilitate message publishing and subscription in a Redis-backed system. These functions are defined in `src/com/moclojer/rq/pubsub.clj` and include: + +- `publish!`: Publishes a message to a specified channel. +- `group-handlers-by-channel`: Groups message handlers by their associated channels. +- `create-listener`: Creates a listener that processes messages from subscribed channels. +- `unarquive-channel!`: Unarchives a channel, making it active again. +- `pack-workers-channels`: Packs worker channels into a format suitable for processing. +- `subscribe!`: Subscribes to one or more channels and processes incoming messages. + +#### examples + +- **publish!**: This function publishes a message to a specified channel. It is used to send messages to subscribers listening on that channel. + +```clojure +(rq-pubsub/publish! client "my-channel" "Hello, World!") +``` + +- **subscribe!**: This function subscribes to one or more channels and processes incoming messages using the provided handlers. + +```clojure +(rq-pubsub/subscribe! client ["my-channel"] handlers) +``` + +- **unarquive-channel!**: This function unarchives a channel, making it active again. It is useful for reactivating channels that were previously archived. + +```clojure +(rq-pubsub/unarquive-channel! client "my-channel") +``` + +## complete example ```clojure (ns rq.example @@ -34,9 +178,12 @@ com.moclojer/rq {:mvn/version "0.x.x"} (def *redis-pool* (rq/create-client "redis://localhost:6379/0")) ;; queue -(queue/push! *redis-pool* "my-queue" {:now (java.time.LocalDateTime/now) - :foo "bar"}) -(println :size (queue/llen *redis-pool* "my-queue")) +(queue/push! *redis-pool* "my-queue" + ;; has to be an array of the elements to push + [{:now (java.time.LocalDateTime/now) + :foo "bar"}]) + +(println :size (queue/len *redis-pool* "my-queue")) (prn :popped (queue/pop! *redis-pool* "my-queue")) ;; pub/sub @@ -50,9 +197,10 @@ com.moclojer/rq {:mvn/version "0.x.x"} (pubsub/subscribe! *redis-pool* my-workers) (pubsub/publish! *redis-pool* "my-channel" "hello world") -(pubsub/publish! *redis-pool* "my-other-channel" {:my "moclojer team" - :data "app.moclojer.com" - :hello "maybe you'll like this website"}) +(pubsub/publish! *redis-pool* "my-other-channel" + {:my "moclojer team" + :data "app.moclojer.com" + :hello "maybe you'll like this website"}) (rq/close-client *redis-pool*) ``` @@ -86,6 +234,8 @@ sequenceDiagram User->>Client: close-client client Client-->>Logger: log closing client Client-->>User: confirm client closure -``` +``` + +--- -Read more about the project [here](docs/README.md). +Made with 💜 by [moclojer](https://moclojer.com). diff --git a/deps.edn b/deps.edn index f1426fa..c3e7366 100644 --- a/deps.edn +++ b/deps.edn @@ -1,8 +1,11 @@ -{:paths ["src"] +{:paths ["src" "resources"] :deps - {redis.clients/jedis {:mvn/version "5.1.2"} + {redis.clients/jedis {#_#_:mvn/version "5.1.2" + :local/root "vendor/jedis/target/jedis-5.2.0-SNAPSHOT.jar"} org.clojure/tools.logging {:mvn/version "1.3.0"} - ch.qos.logback/logback-classic {:mvn/version "1.5.6"}} + ch.qos.logback/logback-classic {:mvn/version "1.5.6"} + camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.3"} + org.clojure/data.json {:mvn/version "2.5.0"}} :aliases {;; clj -A:dev -m com.moclojer.rq diff --git a/resources/command-allowlist.edn b/resources/command-allowlist.edn new file mode 100644 index 0000000..0469b54 --- /dev/null +++ b/resources/command-allowlist.edn @@ -0,0 +1,4 @@ +#{"lpush" "rpush" "lpop" "rpop" "brpop" + "blpop" "lrange" "lindex" "lset" "lrem" + "llen" "linsert" "ltrim" "rpoplpush" + "brpoplpush" "lmove"} diff --git a/src/com/moclojer/internal/reflection.clj b/src/com/moclojer/internal/reflection.clj new file mode 100644 index 0000000..5b884b8 --- /dev/null +++ b/src/com/moclojer/internal/reflection.clj @@ -0,0 +1,103 @@ +(ns com.moclojer.internal.reflection + (:require + [camel-snake-kebab.core :as csk] + [clojure.string :as str] + [com.moclojer.rq.adapters :as adapters])) + +(defn unpack-parameter + [parameter] + {:type (.. parameter getType getName) + :name (csk/->kebab-case (.getName parameter))}) + +(defn unpack-method + [method] + {:name (csk/->kebab-case (.getName method)) + :parameters (map unpack-parameter (.getParameters method))}) + +(defn underload-methods + "Given a list of overloaded `methods`, returns each one's parameter + list that matches given its `paramlist`." + [paramlist methods] + (reduce + (fn [underloaded-methods {:keys [name parameters]}] + (let [allowed-params (get paramlist name) + param-names (map :name parameters)] + (if (and (= (count parameters) (count allowed-params)) + (every? #(some #{%} param-names) allowed-params)) + (assoc underloaded-methods name parameters) + underloaded-methods))) + {} methods)) + +(defn get-klazz-methods + [klazz allowmap] + (let [allowlist (set (keys allowmap)) + paramlist (reduce-kv + (fn [acc name method] + (assoc acc name (second method))) + {} allowmap)] + (->> (.getMethods klazz) + (map unpack-method) + (filter #(contains? allowlist (:name %))) + (underload-methods paramlist)))) + +(defmacro ->wrap-method + "Wraps given jedis `method` and its respective `parameters` into a + common function for this library, which includes, besides the wrapped + function itself, options like key pattern and encoding/decoding." + [method parameters allowmap] + (let [wrapped-method (clojure.string/replace method #"[`0-9]" "") + base-doc (str "Wraps redis.clients.jedis.JedisPooled." wrapped-method) + param-syms (map #(-> % :name symbol) parameters) + [doc _ enc dec] (get allowmap method ["" nil :none :none])] + `(defn ~(symbol method) + ~(str base-doc \newline doc) + + ~(-> (into ['client] param-syms) + (conj '& 'options)) + + (let [~{:keys ['pattern 'encoding 'decoding] + :or {'pattern :rq + 'encoding enc + 'decoding dec}} ~'options + + ~'result ~(->> (reduce + (fn [acc par] + (->> (cond + (= par 'key) + `(com.moclojer.rq.adapters/pack-pattern + ~'pattern ~par) + + (some #{'value 'string + 'args 'pivot} [par]) + `(com.moclojer.rq.adapters/encode + ~'encoding ~par) + + :else par) + (conj acc))) + [] + param-syms) + (into [(symbol (str "." wrapped-method)) '@client]) + (seq))] + (try + (com.moclojer.rq.adapters/decode ~'decoding ~'result) + (catch ~'Exception ~'e + (.printStackTrace ~'e) + ~'result)))))) + +(comment + (get-klazz-methods + redis.clients.jedis.JedisPooled + {"rpop" ["hello" ["key" "count"] :edn-array :none]}) + + (require '[clojure.pprint :refer [pprint]]) + (let [allowmap {"linsert" ["Inserts a message into a queue in reference to given pivot" + ["key" "where" "pivot" "value"] :edn :none]} + [method parameters] (first + (get-klazz-methods + redis.clients.jedis.JedisPooled + allowmap))] + (pprint + (macroexpand-1 `(->wrap-method ~method ~parameters ~allowmap)))) + + ;; + ) diff --git a/src/com/moclojer/rq/adapters.clj b/src/com/moclojer/rq/adapters.clj new file mode 100644 index 0000000..7f62431 --- /dev/null +++ b/src/com/moclojer/rq/adapters.clj @@ -0,0 +1,119 @@ +(ns com.moclojer.rq.adapters + (:require + [clojure.data.json :as json] + [clojure.edn :as edn] + [clojure.string :as str] + [clojure.tools.logging :as log]) + (:import + [redis.clients.jedis.args ListPosition])) + +(def patterns + {:none "" + :rq "rq:" + :pubsub "rq:pubsub:" + :pending "rq:pubsub:pending:"}) + +(defn- pattern->str + "Adapts given pattern keyword to a known internal pattern. Raises + an exception if invalid." + [pattern] + (or (get patterns pattern) + (throw (ex-info (str "No pattern named " pattern) + {:cause :illegal-argument + :value pattern + :expected (keys patterns)})))) + +(defn pack-pattern + [pattern queue-name] + (log/debug :packing pattern queue-name) + (str (pattern->str pattern) queue-name)) + +(defn unpack-pattern + [pattern queue-name] + (log/debug :unpacking pattern queue-name) + (let [prefix (pattern->str pattern)] + (if (str/starts-with? queue-name prefix) + (subs queue-name (count prefix)) + (do + (log/warn :invalid-prefix + :queue-name queue-name + :expected-prefix prefix) + queue-name)))) + +(def encoding-fns + {:none identity + :edn pr-str + :json json/write-str + :array #(into-array (map pr-str %)) + :edn-array #(into-array (map pr-str %)) + :json-array #(into-array (map json/write-str %))}) + +(defn- keyword-enc->fn + [enc] + (or (get encoding-fns enc) + (throw (ex-info (str "No encoding " (name enc)) + {:cause :illegal-argument + :value enc + :expected (set (keys encoding-fns))})))) + +(defn encode + [enc message] + (log/debug :encoding enc message) + ((cond + (keyword? enc) (keyword-enc->fn enc) + (fn? enc) enc + :else (throw (ex-info + (str "`encoding` must be either keyword or function") + {:cause :illegal-argument + :value enc + :expected #{keyword? fn?}}))) + message)) + +(def decoding-fns + (let [json-dec-fn #(json/read-str % :key-fn keyword) + array? #(or (seq? %) + (some-> % class .isArray) + (instance? java.util.ArrayList %))] + {:none identity + :edn edn/read-string + :json json-dec-fn + :array #(if (array? %) + (vec %) + [%]) + :edn-array #(if (array? %) + (vec (map edn/read-string %)) + [(edn/read-string %)]) + :json-array #(if (array? %) + (vec (map json-dec-fn %)) + [(json-dec-fn %)])})) + +(defn- keyword-dec->fn + [dec] + (or (get decoding-fns dec) + (throw (ex-info (str "No decoding " (name dec)) + {:cause :illegal-argument + :value dec + :expected (set (keys decoding-fns))})))) + +(defn decode + [dec message] + (log/debug :decoding dec message) + ((cond + (keyword? dec) (keyword-dec->fn dec) + (fn? dec) dec + :else (throw (ex-info + (str "`decoding` must be either keyword or function") + {:cause :illegal-argument + :value dec + :expected #{keyword? fn?}}))) + message)) + +(defn ->list-position + [pos] + (or (get {:before ListPosition/BEFORE + :after ListPosition/AFTER} + pos) + (throw (ex-info (str "No list position named " pos) + {:cause :illegal-argument + :value pos + :expected #{:before :after}})))) diff --git a/src/com/moclojer/rq/pubsub.clj b/src/com/moclojer/rq/pubsub.clj index c879c61..a848e77 100644 --- a/src/com/moclojer/rq/pubsub.clj +++ b/src/com/moclojer/rq/pubsub.clj @@ -2,21 +2,23 @@ (:require [clojure.edn :as edn] [clojure.tools.logging :as log] - [com.moclojer.rq.queue :as queue] - [com.moclojer.rq.utils :as utils]) + [com.moclojer.rq.adapters :as adapters] + [com.moclojer.rq.queue :as queue]) (:import [redis.clients.jedis JedisPubSub] [redis.clients.jedis.exceptions JedisConnectionException])) (defn publish! - "Publish a message to a channel. When `consumer-min` isn't isn't met, + "Publish a message to a channel. When `consumer-min` isn't met, archives the message. Returns whether or not `consumer-min` was met." [client channel message & options] (let [{:keys [consumer-min] :or {consumer-min 1} :as opts} options - consumer-count (.publish @client (utils/pack-pattern :pubsub channel) - (pr-str message)) + consumer-count (.publish + @client + (adapters/pack-pattern :pubsub channel) + (pr-str message)) consumer-met? (>= consumer-count consumer-min) debug-args {:channel channel :message message @@ -28,7 +30,7 @@ (do (log/warn "published message, but didn't meet min consumers. archiving..." debug-args) - (queue/push! client channel message :pattern :pending))) + (queue/push! client channel [message] {:pattern :pending}))) consumer-met?)) @@ -40,9 +42,9 @@ {} workers)) (defn create-listener - "Create a listener for the pubsub. It will be entry point for any published - data, being responsible for routing the right consumer. However, that's on - the enduser." + "Create a listener for the pubsub. It will be entry point for any + published data, being responsible for routing the right consumer. + However, that's on the enduser." [workers] (let [handlers-by-channel (group-handlers-by-channel workers)] (proxy [JedisPubSub] [] @@ -60,13 +62,13 @@ :message message})))))) (defn unarquive-channel! - "Unarquives every pending message from given `channel`, calling `on-msg-fn` - on each of them." + "Unarquives every pending message from given `channel`, calling + `on-msg-fn` on each of them." [client channel on-msg-fn] (loop [message-count 0] - (if-let [?message (queue/pop! client channel - :direction :r - :pattern :pending)] + (if-let [?message (first + (queue/pop! client channel 1 + {:pattern :pending}))] (do (try (on-msg-fn ?message) @@ -85,7 +87,9 @@ (defn pack-workers-channels [workers] - (map #(update % :channel (partial utils/pack-pattern :pubsub)) workers)) + (map + #(update % :channel (partial adapters/pack-pattern :pubsub)) + workers)) (defn subscribe! "Subscribe given `workers` to their respective channels. @@ -112,19 +116,28 @@ :as opts} options] (doseq [channel (map :channel workers)] - (unarquive-channel! client channel - #(.onMessage listener (utils/pack-pattern :pubsub channel) %))) + (unarquive-channel! + client channel + #(.onMessage + listener + (adapters/pack-pattern :pubsub channel) + %))) (let [sub-fn #(try - (.subscribe @client listener (into-array packed-channels)) + (.subscribe @client listener + (into-array packed-channels)) + (log/debug "subscribed workers to channels" {:channels packed-channels :options opts}) + (catch JedisConnectionException e + (log/warn "subscriber connection got killed. trying to reconnect..." {:channels packed-channels :exception e :ex-message (.getMessage e)}) + (Thread/sleep reconnect-sleep) (apply subscribe! [client workers options])))] (if blocking? diff --git a/src/com/moclojer/rq/queue.clj b/src/com/moclojer/rq/queue.clj index aa15028..5fc3b78 100644 --- a/src/com/moclojer/rq/queue.clj +++ b/src/com/moclojer/rq/queue.clj @@ -1,323 +1,102 @@ (ns com.moclojer.rq.queue - (:refer-clojure :exclude [pop!]) + (:refer-clojure :exclude [pop! range]) (:require - [clojure.edn :as edn] - [clojure.tools.logging :as log] - [com.moclojer.rq.utils :as utils] - [clojure.string :as s])) + [com.moclojer.internal.reflection :as reflection] + [com.moclojer.rq.adapters :as adapters])) + +;; The allowlisted redis commands followed by their respective +;; documentation, param names and default encoding/decoding formats. +;; `lpush` for example encodes a given `value` through the `:edn-array`, +;; and decodes the result through the `:none` format (`identity`). + +(def allowmap + {"lpush" ["Pushes a message into a queue" + ["key" "string"] :edn-array :none] + "rpush" ["Pushes a message into a queue" + ["key" "string"] :edn-array :none] + "lpop" ["Left-Pops a message from a queue" + ["key" "count"] :none :edn-array] + "rpop" ["Right-Pops a message from a queue" + ["key" "count"] :none :edn-array] + "brpop" ["Right-Pops a message from a queue (blocking)" + ["timeout" "key"] :none :edn-array] + "blpop" ["Left-Pops a message from a queue (blocking)" + ["timeout" "key"] :none :edn-array] + "lindex" ["Get the element from a queue at given index" + ["key", "index"] :none :edn-array] + "lrange" ["Get the elements from a queue" + ["key" "start" "stop"] :none :edn-array] + "lset" ["Sets the element from a queue at given index" + ["key" "index" "value"] :edn :none] + "lrem" ["Removes matching count of given message from a queue" + ["key" "count" "value"] :edn :none] + "llen" ["Gets the length of a queue" + ["key"] :none :none] + "linsert" ["Inserts a message into a queue in reference to given pivot" + ["key" "where" "pivot" "value"] :edn :none] + "ltrim" ["Trim a queue between the given limit values" + ["key" "start" "stop"] + :none :none]}) + +(doseq [[method parameters] (reflection/get-klazz-methods + redis.clients.jedis.JedisPooled + allowmap)] + (eval `(reflection/->wrap-method ~method ~parameters ~allowmap))) + +;; --- directional --- (defn push! - "Push a message into a queue. - - Options: - - - direction: Direction to push the message (:l or :r) - - pattern: Pattern for the queue name" - [client queue-name message & options] - (let [{:keys [direction pattern _at _in _retry _retry-delay] - :or {direction :l - pattern :rq} - :as opts} options - packed-queue-name (utils/pack-pattern pattern queue-name) - encoded-message (into-array [(pr-str message)]) - pushed-count (if (= direction :l) - (.lpush @client packed-queue-name encoded-message) - (.rpush @client packed-queue-name encoded-message))] - - (log/debug "pushed to queue" - {:client client - :queue-name packed-queue-name - :message message - :options opts - :pushed-count pushed-count}) - - pushed-count)) + [client queue-name values & [options]] + (let [{:keys [direction] + :or {direction :l}} options + push-fn (if (= direction :l) lpush rpush)] + (apply push-fn [client queue-name values options]))) (defn pop! - "Pop a message from a queue. - - Parameters: - - client: Redis client - - queue-name: Name of the queue - - options: - - direction: Direction to pop the message (:l or :r) - - pattern: Pattern for the queue name" - [client queue-name & options] - (let [{:keys [direction pattern] - :or {direction :l - pattern :rq} - :as opts} options - packed-queue-name (utils/pack-pattern pattern queue-name) - message (if (= direction :l) - (.lpop @client packed-queue-name) - (.rpop @client packed-queue-name))] - - (when message - (log/debug "popped from queue" - {:client client - :queue-name packed-queue-name - :options opts - :message message}) - - (edn/read-string message)))) - -(defn llen - "Get the size of a queue. - - Parameters: - - client: Redis client - - queue-name: Name of the queue - - options: Optional parameters, including: - - pattern: Pattern for the queue name" - [client queue-name & options] - (let [{:keys [pattern] - :or {pattern :rq}} options - packed-queue-name (utils/pack-pattern pattern queue-name)] - (.llen @client packed-queue-name))) + [client queue-name count & [options]] + (let [{:keys [direction timeout] + :or {direction :r}} options + pop-fn (if (= direction :r) + (if timeout brpop rpop) + (if timeout blpop lpop)) + num (or timeout count)] + (apply pop-fn (flatten [client + (if timeout + [num queue-name] + [queue-name num]) + options])))) (defn bpop! - "Pop a message from a queue. Blocking if necessary. - - Parameters: - - client: Redis client - - queue-name: Name of the queue - - tmot: Blocking timeout - - options: - - direction: Direction to pop the message (:l or :r) - - pattern: Pattern for the queue name" - [client queue-name tmot & {:keys [direction pattern] - :or {direction :l pattern :rq}}] - (let [packed-queue-name (utils/pack-pattern pattern queue-name) - return (if (= direction :l) - (.blpop @client tmot packed-queue-name) - (.brpop @client tmot packed-queue-name))] - (when return - (let [message (second return)] - (log/debug "popped from queue" - {:client client - :queue-name packed-queue-name - :options {:direction direction :pattern pattern} - :message message}) - (edn/read-string message))))) - -(defn lindex - "Return a element in a specified index - - Parameters: - - client: Redis client - - queue-name: Name of the queue - - index: specific index to access" - [client queue-name index & options] - (let [{:keys [pattern] - :or {pattern :rq}} options - packed-queue-name (utils/pack-pattern pattern queue-name) - return (.lindex @client packed-queue-name index)] - - (let [message (clojure.edn/read-string return)] - (log/debug "message found" - {:client client - :queue-name packed-queue-name - :index index - :message message}) - message))) - -(defn lset - "Set a new message in a specified index - - Parameters: - - client: Redis client - - queue-name: Name of the queue - - index: specific index to access - - message: new msg to be added" - [client queue-name index message & options] - (let [{:keys [pattern] - :or {pattern :rq} :as opts} options - packed-queue-name (utils/pack-pattern pattern queue-name) - encoded-message (pr-str message) - return (.lset @client packed-queue-name index encoded-message)] - - (log/debug "set in queue" - {:client client - :queue-name packed-queue-name - :message (str encoded-message) - :index index - :options opts - :return return}) - return)) - -(defn lrem - "Removes a specified occurance of the message given (if any) - - Parameters: - - client: Redis client - - queue-name: Name of the queue - - msg: new msg to be added - - cnt: count - count > 0: Remove elements equal to element moving from head to tail. - count < 0: Remove elements equal to element moving from tail to head. - count = 0: Remove all elements equal to element." - [client queue-name cnt msg & options] - (let [{:keys [pattern] - :or {pattern :rq}} options - packed-queue-name (utils/pack-pattern pattern queue-name) - encoded-message (pr-str msg) - return (.lrem @client packed-queue-name cnt encoded-message)] - - (log/debug "removed from queue" - {:client client - :queue-name queue-name - :msg msg - :count cnt - :return return}) - return)) - -(defn linsert - "insert a message before the first occurance of a pivot given. - - parameters: - - client: redis client - - queue-name: name of the queue - - msg: new msg to be added - - pivot: pivot message to be added before or after - - options: - - pos (keywords): - - before: insert the message before the pivot - - after: insert the message after the pivot" - [client queue-name pivot msg & options] - (let [{:keys [pos pattern] - :or {pos :before - pattern :rq} :as opts} options - packed-queue-name (utils/pack-pattern pattern queue-name) - encoded-message (pr-str msg) - encoded-pivot (pr-str pivot) - encoded-pos (if (= pos :before) - redis.clients.jedis.args.ListPosition/BEFORE - redis.clients.jedis.args.ListPosition/AFTER) - return (.linsert @client packed-queue-name encoded-pos encoded-pivot encoded-message)] - (log/debug "inserted in queue" - {:client client - :queue-name queue-name - :msg encoded-message - :opts opts - :return return}) - return)) - -(defn lrange - "Return an entire range given min and max indexes - - Parameters: - - client: Redis client - - queue-name: Name of the queue - - floor: floor index - - ceil: ceiling index" - [client queue-name floor ceil & options] - (let [{:keys [pattern] - :or {pattern :rq} :as opts} options - packed-queue-name (utils/pack-pattern pattern queue-name) - return (.lrange @client packed-queue-name floor ceil)] - (log/debug "queue specified range" - {:client client - :queue-name packed-queue-name - :opts opts - :result return}) - (mapv clojure.edn/read-string return))) - -(defn ltrim - "Trim a list to the specified range. - - Parameters: - - client: Redis client - - queue-name: Name of the queue - - start: start index - - stop: stop index - - options: - - pattern: pattern to pack the queue name" - [client queue-name start stop & options] - (let [{:keys [pattern] - :or {pattern :rq} :as opts} options - packed-queue-name (utils/pack-pattern pattern queue-name)] - (let [return (.ltrim @client packed-queue-name start stop)] - (log/debug "queue trimmed" - {:client client - :queue-name queue-name - :opts opts - :result return}) - return))) - -(defn rpoplpush - "Remove the last element in a list and append it to another list. - - Parameters: - - client: Redis client - - source-queue: Name of the source queue - - destination-queue: Name of the destination queue - - options: - - pattern: pattern to pack the queue names" - [client source-queue destination-queue & options] - (let [{:keys [pattern] - :or {pattern :rq}} options - packed-source-queue (utils/pack-pattern pattern source-queue) - packed-destination-queue (utils/pack-pattern pattern destination-queue) - return (.rpoplpush @client packed-source-queue packed-destination-queue)] - (log/debug "rpoplpush operation" - {:client client - :source-queue packed-source-queue - :destination-queue packed-destination-queue - :result return}) - return)) - -(defn brpoplpush - "Remove the last element in a list and append it to another list, blocking if necessary. - - Parameters: - - client: Redis client - - source-queue: Name of the source queue - - destination-queue: Name of the destination queue - - timeout: timeout in seconds - - options: - - pattern: pattern to pack the queue names" - [client source-queue destination-queue timeout & options] - (let [{:keys [pattern] - :or {pattern :rq}} options - packed-source-queue (utils/pack-pattern pattern source-queue) - packed-destination-queue (utils/pack-pattern pattern destination-queue) - result (.brpoplpush @client packed-source-queue packed-destination-queue timeout)] - (log/debug "brpoplpush operation" - {:client client - :source-queue packed-source-queue - :destination-queue packed-destination-queue - :timeout timeout - :result result}) - result)) - -(defn lmove - "Atomically return and remove the first/last element of the source list, and push the element as the first/last element of the destination list. - - Parameters: - - client: Redis client - - source-queue: Name of the source queue - - destination-queue: Name of the destination queue - - wherefrom: 'LEFT' or 'RIGHT' - - whereto: 'LEFT' or 'RIGHT' - - options: - - pattern: pattern to pack the queue names" - [client source-queue destination-queue wherefrom whereto & options] - (let [{:keys [pattern] - :or {pattern :rq}} options - packed-source-queue (utils/pack-pattern pattern source-queue) - packed-destination-queue (utils/pack-pattern pattern destination-queue) - from-direction (if (= wherefrom "LEFT") - redis.clients.jedis.args.ListDirection/LEFT - redis.clients.jedis.args.ListDirection/RIGHT) - to-direction (if (= whereto "LEFT") - redis.clients.jedis.args.ListDirection/LEFT - redis.clients.jedis.args.ListDirection/RIGHT) - result (.lmove @client packed-source-queue packed-destination-queue from-direction to-direction)] - (log/debug "lmove operation" - {:client client - :source-queue packed-source-queue - :destination-queue packed-destination-queue - :from-direction from-direction - :to-direction to-direction - :result result}) - result)) + [client queue-name timeout & [options]] + (apply pop! [client queue-name count + (assoc options :timeout timeout)])) + +(defn index + [client queue-name index & [options]] + (first (apply lindex [client queue-name index options]))) + +(defn range + [client queue-name start stop & [options]] + (apply lrange [client queue-name start stop options])) + +(defn set! + [client queue-name index value & [options]] + (apply lset [client queue-name index value options])) + +(defn len + [client queue-name & [options]] + (apply llen [client queue-name options])) + +(defn rem! + [client queue-name count value & [options]] + (apply lrem [client queue-name count value options])) + +(defn insert! + [client queue-name where pivot value & [options]] + (apply linsert [client queue-name + (adapters/->list-position where) + pivot value options])) + +(defn trim! + [client queue-name start stop & [options]] + (apply ltrim [client queue-name start stop options])) diff --git a/src/com/moclojer/rq/utils.clj b/src/com/moclojer/rq/utils.clj deleted file mode 100644 index 1bf2eca..0000000 --- a/src/com/moclojer/rq/utils.clj +++ /dev/null @@ -1,28 +0,0 @@ -(ns com.moclojer.rq.utils - (:require - [clojure.string :as s])) - -(defn- pattern->str - "Adapts given pattern keyword to a know internal pattern. Raises - an exception if invalid." - [pattern] - (let [patterns {:none "" - :rq "rq:" - :pubsub "rq:pubsub:" - :pending "rq:pubsub:pending:"}] - (or (get-in patterns [pattern]) - (throw (ex-info (str "No pattern named " pattern) - {:cause :illegal-argument - :value pattern - :expected (keys patterns)}))))) - -(defn pack-pattern - [pattern queue-name] - (str (pattern->str pattern) queue-name)) - -(defn unpack-pattern - [pattern queue-name] - (subs queue-name (count (pattern->str pattern)))) - - - diff --git a/test/com/moclojer/rq/adapters_test.clj b/test/com/moclojer/rq/adapters_test.clj new file mode 100644 index 0000000..96e1a5a --- /dev/null +++ b/test/com/moclojer/rq/adapters_test.clj @@ -0,0 +1,44 @@ +(ns com.moclojer.rq.adapters-test + (:require + [clojure.string :as str] + [clojure.test :as t] + [com.moclojer.rq.adapters :as adapters])) + +(t/deftest pattern->str-test + (t/are [expected pattern queue-name] (= expected + (adapters/pack-pattern + pattern queue-name)) + "my-queue" :none "my-queue" + "rq:my-queue" :rq "my-queue" + "rq:pubsub:my-queue" :pubsub "my-queue" + "rq:pubsub:pending:my-queue" :pending "my-queue") + + (t/are [expected pattern queue-name] (= expected + (adapters/unpack-pattern + pattern queue-name)) + "my-queue" :none "my-queue" + "my-queue" :rq "rq:my-queue" + "my-queue" :pubsub "rq:pubsub:my-queue" + "my-queue" :pending "rq:pubsub:pending:my-queue")) + +(t/deftest encode-test + (t/testing "keyword encoders" + [(t/is (= "hello world" (adapters/encode :none "hello world"))) + (t/is (= "{:hello? true}" (adapters/encode :edn {:hello? true}))) + (t/is (= "{\"hello?\":true}" (adapters/encode :json {:hello? true}))) + (t/is (= ["3" "true"] (vec (adapters/encode :array [3 true])))) + (t/is (= ["{\"hello?\":true}"] (vec (adapters/encode + :json-array + [{:hello? true}]))))]) + (t/testing "function encoder" + (t/is (= "HELLO WORLD" (adapters/encode str/upper-case "hello world"))))) + +(t/deftest decode-test + (t/are [expected decoding value] (= expected + (adapters/decode decoding value)) + "hello world" :none "hello world" + {:hello? true} :edn "{:hello? true}" + {:hello? true} :json "{\"hello?\":true}" + ["3" "true"] :array (into-array ["3" "true"]) + [3 true] :edn-array (into-array ["3" "true"]) + [{:hello? true}] :json-array (into-array ["{\"hello?\":true}"]))) diff --git a/test/com/moclojer/rq/pubsub_test.clj b/test/com/moclojer/rq/pubsub_test.clj index c3a6ab0..5d4decd 100644 --- a/test/com/moclojer/rq/pubsub_test.clj +++ b/test/com/moclojer/rq/pubsub_test.clj @@ -3,12 +3,12 @@ [clojure.test :as t] [com.moclojer.rq :as rq] [com.moclojer.rq.pubsub :as rq-pubsub] - [com.moclojer.test-utils :as utils])) + [com.moclojer.test-helpers :as helpers])) (defn build-workers [qtt state] (let [channels (repeatedly qtt #(str (random-uuid))) - messages (repeatedly qtt utils/gen-message) + messages (repeatedly qtt helpers/gen-message) chans-msgs (zipmap channels messages)] {:chans-msgs chans-msgs :msgs messages @@ -18,23 +18,22 @@ (swap! state conj msg))}) chans-msgs)})) -;; (t/deftest pubsub-test (let [client (rq/create-client "redis://localhost:6379")] (t/testing "archiving/unarchiving" (let [channel (str (random-uuid)) - message (utils/gen-message) + message (helpers/gen-message) state (atom nil)] (rq-pubsub/publish! client channel message) (Thread/sleep 500) - (rq-pubsub/unarquive-channel! client channel (fn [msg] - (reset! state msg))) + (rq-pubsub/unarquive-channel! client channel + (fn [msg] (reset! state msg))) (t/is (= message @state)))) (t/testing "unarchiving after subscribing" (let [channel (str (random-uuid)) - message (utils/gen-message) + message (helpers/gen-message) state (atom nil)] (rq-pubsub/publish! client channel message) (rq-pubsub/publish! client channel message) diff --git a/test/com/moclojer/rq/queue_test.clj b/test/com/moclojer/rq/queue_test.clj index 2c39a86..3b1d168 100644 --- a/test/com/moclojer/rq/queue_test.clj +++ b/test/com/moclojer/rq/queue_test.clj @@ -3,107 +3,95 @@ [clojure.test :as t] [com.moclojer.rq :as rq] [com.moclojer.rq.queue :as rq-queue] - [com.moclojer.test-utils :as utils])) + [com.moclojer.test-helpers :as helpers])) (t/deftest queue-test (let [client (rq/create-client "redis://localhost:6379") queue-name (str (random-uuid)) - another-queue-name (str (random-uuid)) - message (utils/gen-message) - another-message (utils/gen-message)] - - (t/testing "raw" - (rq-queue/push! client queue-name message) - (rq-queue/push! client queue-name another-message) - (t/is (= 2 (rq-queue/llen client queue-name))) - (t/is (= message (rq-queue/pop! client queue-name {:direction :r})))) - - (t/testing "direction" - (rq-queue/push! client queue-name message :direction :r) - (t/is (= message (rq-queue/pop! client queue-name :direction :r)))) - - (t/testing "pattern" - (rq-queue/push! client queue-name message :pattern :pending) - (t/is (= message (rq-queue/pop! client queue-name :pattern :pending)))) - - (t/testing "bpop! left" - (while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l})))) - (rq-queue/push! client queue-name message) - (let [popped-message (rq-queue/bpop! client queue-name 1 {:direction :l})] - (t/is (= message popped-message)) - (t/is (= 0 (rq-queue/llen client queue-name))))) - - (t/testing "bpop! right" - (while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :r})))) - (rq-queue/push! client queue-name message) - (let [popped-message (rq-queue/bpop! client queue-name 1 {:direction :r})] - (t/is (= message popped-message)) - (t/is (= 0 (rq-queue/llen client queue-name))))) - - (t/testing "lindex" - (rq-queue/push! client queue-name message) - (t/is (= message (rq-queue/lindex client queue-name 0)))) - - (t/testing "lset" - (while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l})))) - (rq-queue/push! client queue-name message) - (rq-queue/push! client queue-name another-message) - (rq-queue/lset client queue-name 0 another-message) - (t/is (= another-message (rq-queue/lindex client queue-name 0))) - (rq-queue/lset client queue-name 1 message) - (t/is (= message (rq-queue/lindex client queue-name 1))) - (rq-queue/pop! client queue-name :direction :l) - (rq-queue/pop! client queue-name :direction :l)) - - (t/testing "lrem" - (rq-queue/push! client queue-name message) - (rq-queue/lrem client queue-name 1 message) - (t/is (= 0 (rq-queue/llen client queue-name)))) - - (t/testing "linsert" - (rq-queue/push! client queue-name message) - (rq-queue/linsert client queue-name message another-message :pos :before) - (t/is (= another-message (rq-queue/lindex client queue-name 0))) - (rq-queue/pop! client queue-name :direction :l) - (rq-queue/pop! client queue-name :direction :l)) - - (t/testing "lrange" - (rq-queue/push! client queue-name message) - (rq-queue/push! client queue-name another-message) - (let [result (rq-queue/lrange client queue-name 0 1)] - (t/is (= [message another-message] (reverse result)))) - (rq-queue/pop! client queue-name :direction :l) - (rq-queue/pop! client queue-name :direction :l)) - - (t/testing "ltrim" - (let [base-message {:test "hello", :my/test2 "123", :foobar ["321"]} - message (assoc base-message :uuid (java.util.UUID/randomUUID)) - another-message (assoc base-message :uuid (java.util.UUID/randomUUID))] - (rq-queue/push! client queue-name message) - (rq-queue/push! client queue-name another-message) - (t/is (= "OK" (rq-queue/ltrim client queue-name 1 -1))) - (let [result (rq-queue/lrange client queue-name 0 -1)] + message (helpers/gen-message) + message2 (helpers/gen-message)] + + [(t/testing "simple" + (rq-queue/push! client queue-name [message message2]) + (t/is (= 2 (rq-queue/len client queue-name))) + (t/is (= [message message2] + (rq-queue/pop! client queue-name 2)))) + + (t/testing "direction" + ;; pushing from the right, then reverse popping from the left + (rq-queue/push! client queue-name [message message2] + {:direction :r}) + (t/is (= [message message2] + (rq-queue/pop! client queue-name 2 + {:direction :l})))) + + (t/testing "pattern" + (rq-queue/push! client queue-name [message] + {:pattern :pending}) + (t/is (= [message] + (rq-queue/pop! client queue-name 1 + {:pattern :pending})))) + + (t/testing "blocking" + (rq-queue/push! client queue-name [message]) + (t/is (= message + (second (rq-queue/bpop! client queue-name 1))))) + + (t/testing "index" + (rq-queue/push! client queue-name [message]) + (t/is (= message (rq-queue/index client queue-name 0))) + (rq-queue/pop! client queue-name 1)) + + (t/testing "range" + (rq-queue/push! client queue-name [message message2]) + (t/is (= [message2 message] + (rq-queue/range client queue-name 0 -1))) + (rq-queue/pop! client queue-name 2)) + + (t/testing "set!" + (rq-queue/push! client queue-name [message message2]) + (rq-queue/set! client queue-name 0 message2) + (rq-queue/set! client queue-name 1 message) + (t/is (= [message message2] (rq-queue/pop! client queue-name 2)))) + + (t/testing "rem!" + (rq-queue/push! client queue-name [message message message]) + (rq-queue/rem! client queue-name 3 message) + (t/is (= 0 (rq-queue/len client queue-name)))) + + (t/testing "insert!" + (rq-queue/push! client queue-name [message]) + (rq-queue/insert! client queue-name :before message message2) + (t/is (= [message message2] (rq-queue/pop! client queue-name 2)))) + + (t/testing "trim!" + (let [base-message {:test "hello", :my/test2 "123", :foobar ["321"]} + message (assoc base-message :uuid (random-uuid)) + another-message (assoc base-message :uuid (random-uuid))] + (rq-queue/push! client queue-name [another-message message]) + [(t/is (= "OK" (rq-queue/trim! client queue-name 1 -1))) (t/is (= [(dissoc another-message :uuid)] - (map #(dissoc % :uuid) result))))) - (rq-queue/pop! client queue-name :direction :l) - (rq-queue/pop! client queue-name :direction :l)) - - (t/testing "rpoplpush" - (rq-queue/push! client queue-name message) - (rq-queue/rpoplpush client queue-name another-queue-name) - (t/is (= 0 (rq-queue/llen client queue-name))) - (t/is (= message (rq-queue/pop! client another-queue-name :direction :l)))) - - (t/testing "brpoplpush" - (rq-queue/push! client queue-name message) - (rq-queue/brpoplpush client queue-name another-queue-name 1) - (t/is (= 0 (rq-queue/llen client queue-name))) - (t/is (= message (rq-queue/pop! client another-queue-name :direction :l)))) - - (t/testing "lmove" - (rq-queue/push! client queue-name message) - (rq-queue/lmove client queue-name another-queue-name "LEFT" "RIGHT") - (t/is (= 0 (rq-queue/llen client queue-name))) - (t/is (= message (rq-queue/pop! client another-queue-name :direction :r)))) + (map #(dissoc % :uuid) + (rq-queue/range client queue-name 0 -1))))]) + (rq-queue/pop! client queue-name 2))] (rq/close-client client))) + +(comment + (def my-client (rq/create-client "redis://localhost:6379")) + + (rq-queue/push! my-client "my-queue2" [{:hello true}]) + + (rq-queue/insert! my-client "my-queue2" :before {:hello true} {:bye false}) + + (rq-queue/range my-client "my-queue2" 0 -1) + + (rq-queue/len my-client "my-queue2") + + (rq-queue/pop! my-client "my-queue2" 2) + + (rq/close-client my-client) + + (rq/close-client my-client) + ;; + ) diff --git a/test/com/moclojer/rq/utils_test.clj b/test/com/moclojer/rq/utils_test.clj deleted file mode 100644 index 9854f32..0000000 --- a/test/com/moclojer/rq/utils_test.clj +++ /dev/null @@ -1,17 +0,0 @@ -(ns com.moclojer.rq.utils-test - (:require - [clojure.test :as t] - [com.moclojer.rq.utils :as utils])) - -(t/deftest pattern->str-test - (t/testing "packing" - [(t/is "my-queue" (utils/pack-pattern :none "my-queue")) - (t/is "rq:my-queue" (utils/pack-pattern :rq "my-queue")) - (t/is "rq:pubsub:my-queue" (utils/pack-pattern :pubsub "my-queue")) - (t/is "rq:pubsub:pending:my-queue" (utils/pack-pattern :pending "my-queue"))]) - - (t/testing "unpacking" - [(t/is "my-queue" (utils/unpack-pattern :none "my-queue")) - (t/is "my-queue" (utils/unpack-pattern :rq "rq:my-queue")) - (t/is "my-queue" (utils/unpack-pattern :pubsub "rq:pubsub:my-queue")) - (t/is "my-queue" (utils/unpack-pattern :pending "rq:pubsub:pending:my-queue"))])) diff --git a/test/com/moclojer/test_utils.clj b/test/com/moclojer/test_helpers.clj similarity index 84% rename from test/com/moclojer/test_utils.clj rename to test/com/moclojer/test_helpers.clj index b49e226..a86d83e 100644 --- a/test/com/moclojer/test_utils.clj +++ b/test/com/moclojer/test_helpers.clj @@ -1,4 +1,4 @@ -(ns com.moclojer.test-utils) +(ns com.moclojer.test-helpers) (defn gen-message "Generates a fuzzy message" diff --git a/vendor/jedis b/vendor/jedis new file mode 160000 index 0000000..288617c --- /dev/null +++ b/vendor/jedis @@ -0,0 +1 @@ +Subproject commit 288617c70c63c16f4c4cb121290117ce5d042139