Skip to content

Commit

Permalink
feat: wrap gen fns, match old style
Browse files Browse the repository at this point in the history
  • Loading branch information
J0sueTM committed Sep 3, 2024
1 parent 721351d commit 04b11ad
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 124 deletions.
7 changes: 4 additions & 3 deletions src/com/moclojer/internal/reflection.clj
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@
`(com.moclojer.rq.adapters/pack-pattern
~'pattern ~par)

(some #{'value 'string 'args} [par])
(some #{'value 'string
'args 'pivot} [par])
`(com.moclojer.rq.adapters/encode
~'encoding ~par)

Expand All @@ -84,8 +85,8 @@
{"rpop" ["hello" ["key" "count"] :edn-array :none]})

(require '[clojure.pprint :refer [pprint]])
(let [allowmap {"brpop" ["Left-Pops a message from a queue (blocking)"
["timeout" "key"] :none :edn-array]}
(let [allowmap {"linsert" ["Inserts a message into a queue in reference to given pivot"
["key" "where" "pivot" "value"] :edn :none]}
[method parameters] (first
(get-klazz-methods
redis.clients.jedis.JedisPooled
Expand Down
14 changes: 13 additions & 1 deletion src/com/moclojer/rq/adapters.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
(:require
[clojure.data.json :as json]
[clojure.edn :as edn]
[clojure.tools.logging :as log]))
[clojure.tools.logging :as log])
(:import
[redis.clients.jedis.args ListPosition]))

(defn- pattern->str
"Adapts given pattern keyword to a know internal pattern. Raises
Expand Down Expand Up @@ -91,3 +93,13 @@
:value dec
:expected #{keyword? fn?}})))
message))

(defn ->list-position
[pos]
(or (get {:before ListPosition/BEFORE
:after ListPosition/AFTER}
pos)
(throw (ex-info (str "No list position named " pos)
{:cause :illegal-argument
:value pos
:expected #{:before :after}}))))
110 changes: 88 additions & 22 deletions src/com/moclojer/rq/queue.clj
Original file line number Diff line number Diff line change
@@ -1,36 +1,102 @@
(ns com.moclojer.rq.queue
(:refer-clojure :exclude [pop!])
(:refer-clojure :exclude [pop! range])
(:require
[com.moclojer.internal.reflection :as reflection]))
[com.moclojer.internal.reflection :as reflection]
[com.moclojer.rq.adapters :as adapters]))

;; The allowlisted redis commands followed by their respective
;; arity, documentation and default encoding/decoding formats.
;; documentation, param names and default encoding/decoding formats.
;; `lpush` for example encodes a given `value` through the `:edn-array`,
;; and decodes the result through the `:none` format (`identity`).

(def allowmap
{"lpush" [2 "Pushes a message into a queue" :edn-array :none]
"rpush" [2 "Pushes a message into a queue" :edn-array :none]
"lpop" [2 "Left-Pops a message from a queue" :none :edn-array]
"rpop" [2 "Right-Pops a message from a queue" :none :edn-array]
"brpop" [2 "Right-Pops a message from a queue (blocking)"
:none :edn-array]
"blpop" [2 "Left-Pops a message from a queue (blocking)"
:none :edn-array]
"lrange" [3 "Get the elements from a queue" :none :edn-array]
"lindex" [2 "Get the element from a queue at given index"
:none :none]
"lset" [3 "Sets the element from a queue at given index"
:edn :none]
"lrem" [2 "Removes matching count of given message from a queue"
:edn :none]
"llen" [1 "Gets the length of a queue" :none :none]
"linsert" [4 "Inserts a message into a queue in reference to given pivot"
:edn :none]
"ltrim" [3 "Trim a queue between the given limit values"
{"lpush" ["Pushes a message into a queue"
["key" "string"] :edn-array :none]
"rpush" ["Pushes a message into a queue"
["key" "string"] :edn-array :none]
"lpop" ["Left-Pops a message from a queue"
["key" "count"] :none :edn-array]
"rpop" ["Right-Pops a message from a queue"
["key" "count"] :none :edn-array]
"brpop" ["Right-Pops a message from a queue (blocking)"
["timeout" "key"] :none :edn-array]
"blpop" ["Left-Pops a message from a queue (blocking)"
["timeout" "key"] :none :edn-array]
"lindex" ["Get the element from a queue at given index"
["key", "index"] :none :edn-array]
"lrange" ["Get the elements from a queue"
["key" "start" "stop"] :none :edn-array]
"lset" ["Sets the element from a queue at given index"
["key" "index" "value"] :edn :none]
"lrem" ["Removes matching count of given message from a queue"
["key" "count" "value"] :edn :none]
"llen" ["Gets the length of a queue"
["key"] :none :none]
"linsert" ["Inserts a message into a queue in reference to given pivot"
["key" "where" "pivot" "value"] :edn :none]
"ltrim" ["Trim a queue between the given limit values"
["key" "start" "stop"]
:none :none]})

(doseq [[method parameters] (reflection/get-klazz-methods
redis.clients.jedis.JedisPooled
allowmap)]
(eval `(reflection/->wrap-method ~method ~parameters ~allowmap)))

;; --- directional ---

(defn push!
[client queue-name values & [options]]
(let [{:keys [direction]
:or {direction :l}} options
push-fn (if (= direction :l) lpush rpush)]
(apply push-fn [client queue-name values options])))

(defn pop!
[client queue-name count & [options]]
(let [{:keys [direction timeout]
:or {direction :r}} options
pop-fn (if (= direction :r)
(if timeout brpop rpop)
(if timeout blpop lpop))
num (or timeout count)]
(apply pop-fn (flatten [client
(if timeout
[num queue-name]
[queue-name num])
options]))))

(defn bpop!
[client queue-name timeout & [options]]
(apply pop! [client queue-name count
(assoc options :timeout timeout)]))

(defn index
[client queue-name index & [options]]
(first (apply lindex [client queue-name index options])))

(defn range
[client queue-name start stop & [options]]
(apply lrange [client queue-name start stop options]))

(defn set!
[client queue-name index value & [options]]
(apply lset [client queue-name index value options]))

(defn len
[client queue-name & [options]]
(apply llen [client queue-name options]))

(defn rem!
[client queue-name count value & [options]]
(apply lrem [client queue-name count value options]))

(defn insert!
[client queue-name where pivot value & [options]]
(apply linsert [client queue-name
(adapters/->list-position where)
pivot value options]))

(defn trim!
[client queue-name start stop & [options]]
(apply ltrim [client queue-name start stop options]))
168 changes: 70 additions & 98 deletions test/com/moclojer/rq/queue_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,114 +11,86 @@
message (helpers/gen-message)
message2 (helpers/gen-message)]

[(t/testing "raw"
(rq-queue/lpush client queue-name [message message2])
(t/is (= 2 (rq-queue/llen client queue-name)))
[(t/testing "simple"
(rq-queue/push! client queue-name [message message2])
(t/is (= 2 (rq-queue/len client queue-name)))
(t/is (= [message message2]
(rq-queue/rpop client queue-name 2))))
(rq-queue/pop! client queue-name 2))))

(t/testing "direction"
;; pushing from the right, then reverse popping from the left
(rq-queue/push! client queue-name [message message2]
{:direction :r})
(t/is (= [message message2]
(rq-queue/pop! client queue-name 2
{:direction :l}))))

(t/testing "pattern"
(rq-queue/rpush client queue-name [message] {:pattern :pending})
(t/is
(= [message]
(rq-queue/rpop client queue-name 1 {:pattern :pending}))))]
(rq-queue/push! client queue-name [message]
{:pattern :pending})
(t/is (= [message]
(rq-queue/pop! client queue-name 1
{:pattern :pending}))))

(t/testing "blocking"
(rq-queue/push! client queue-name [message])
(t/is (= message
(second (rq-queue/bpop! client queue-name 1)))))

(t/testing "index"
(rq-queue/push! client queue-name [message])
(t/is (= message (rq-queue/index client queue-name 0)))
(rq-queue/pop! client queue-name 1))

(t/testing "range"
(rq-queue/push! client queue-name [message message2])
(t/is (= [message2 message]
(rq-queue/range client queue-name 0 -1)))
(rq-queue/pop! client queue-name 2))

(t/testing "set!"
(rq-queue/push! client queue-name [message message2])
(rq-queue/set! client queue-name 0 message2)
(rq-queue/set! client queue-name 1 message)
(t/is (= [message message2] (rq-queue/pop! client queue-name 2))))

(t/testing "rem!"
(rq-queue/push! client queue-name [message message message])
(rq-queue/rem! client queue-name 3 message)
(t/is (= 0 (rq-queue/len client queue-name))))

(t/testing "insert!"
(rq-queue/push! client queue-name [message])
(rq-queue/insert! client queue-name :before message message2)
(t/is (= [message message2] (rq-queue/pop! client queue-name 2))))

(t/testing "trim!"
(let [base-message {:test "hello", :my/test2 "123", :foobar ["321"]}
message (assoc base-message :uuid (random-uuid))
another-message (assoc base-message :uuid (random-uuid))]
(rq-queue/push! client queue-name [another-message message])
[(t/is (= "OK" (rq-queue/trim! client queue-name 1 -1)))
(t/is (= [(dissoc another-message :uuid)]
(map #(dissoc % :uuid)
(rq-queue/range client queue-name 0 -1))))])
(rq-queue/pop! client queue-name 2))]

(rq/close-client client)))

(comment
(t/testing "bpop! left"
(while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l}))))
(rq-queue/push! client queue-name message)
(let [popped-message (rq-queue/bpop! client queue-name 1 {:direction :l})]
(t/is (= message popped-message))
(t/is (= 0 (rq-queue/llen client queue-name)))))

(t/testing "bpop! right"
(while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :r}))))
(rq-queue/push! client queue-name message)
(let [popped-message (rq-queue/bpop! client queue-name 1 {:direction :r})]
(t/is (= message popped-message))
(t/is (= 0 (rq-queue/llen client queue-name)))))

(t/testing "lindex"
(rq-queue/push! client queue-name message)
(t/is (= message (rq-queue/lindex client queue-name 0))))

(t/testing "lset"
(while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l}))))
(rq-queue/push! client queue-name message)
(rq-queue/push! client queue-name another-message)
(rq-queue/lset client queue-name 0 another-message)
(t/is (= another-message (rq-queue/lindex client queue-name 0)))
(rq-queue/lset client queue-name 1 message)
(t/is (= message (rq-queue/lindex client queue-name 1)))
(rq-queue/pop! client queue-name :direction :l)
(rq-queue/pop! client queue-name :direction :l))

(t/testing "lrem"
(rq-queue/push! client queue-name message)
(rq-queue/lrem client queue-name 1 message)
(t/is (= 0 (rq-queue/llen client queue-name))))

(t/testing "linsert"
(rq-queue/push! client queue-name message)
(rq-queue/linsert client queue-name message another-message :pos :before)
(t/is (= another-message (rq-queue/lindex client queue-name 0)))
(rq-queue/pop! client queue-name :direction :l)
(rq-queue/pop! client queue-name :direction :l))

(t/testing "lrange"
(rq-queue/push! client queue-name message)
(rq-queue/push! client queue-name another-message)
(let [result (rq-queue/lrange client queue-name 0 1)]
(t/is (= [message another-message] (reverse result))))
(rq-queue/pop! client queue-name :direction :l)
(rq-queue/pop! client queue-name :direction :l))

(t/testing "ltrim"
(let [base-message {:test "hello", :my/test2 "123", :foobar ["321"]}
message (assoc base-message :uuid (java.util.UUID/randomUUID))
another-message (assoc base-message :uuid (java.util.UUID/randomUUID))]
(rq-queue/push! client queue-name message)
(rq-queue/push! client queue-name another-message)
(t/is (= "OK" (rq-queue/ltrim client queue-name 1 -1)))
(let [result (rq-queue/lrange client queue-name 0 -1)]
(t/is (= [(dissoc another-message :uuid)]
(map #(dissoc % :uuid) result)))))
(rq-queue/pop! client queue-name :direction :l)
(rq-queue/pop! client queue-name :direction :l))

(t/testing "rpoplpush"
(rq-queue/push! client queue-name message)
(rq-queue/rpoplpush client queue-name another-queue-name)
(t/is (= 0 (rq-queue/llen client queue-name)))
(t/is (= message (rq-queue/pop! client another-queue-name :direction :l))))

(t/testing "brpoplpush"
(rq-queue/push! client queue-name message)
(rq-queue/brpoplpush client queue-name another-queue-name 1)
(t/is (= 0 (rq-queue/llen client queue-name)))
(t/is (= message (rq-queue/pop! client another-queue-name :direction :l))))

(t/testing "lmove"
(rq-queue/push! client queue-name message)
(rq-queue/lmove client queue-name another-queue-name "LEFT" "RIGHT")
(t/is (= 0 (rq-queue/llen client queue-name)))
(t/is (= message (rq-queue/pop! client another-queue-name :direction :r)))))
(def my-client (rq/create-client "redis://localhost:6379"))

(comment
(dotimes [_ 10]
(queue-test))
(rq-queue/push! my-client "my-queue2" [{:hello true}])

(def my-client (rq/create-client "redis://localhost:6379"))
(rq-queue/insert! my-client "my-queue2" :before {:hello true} {:bye false})

(dotimes [_ 10]
(println
(rq-queue/lpush my-client "my-queue"
[(helpers/gen-message)]
{:pattern :pending})
(first (rq-queue/rpop my-client "my-queue" 1
{:pattern :pending}))))
(rq-queue/range my-client "my-queue2" 0 -1)

(rq-queue/len my-client "my-queue2")

(rq-queue/pop! my-client "my-queue2" 2)

(rq/close-client my-client)

(rq/close-client my-client)
;;
Expand Down

0 comments on commit 04b11ad

Please sign in to comment.