Skip to content

Commit

Permalink
Wire in cache
Browse files Browse the repository at this point in the history
  • Loading branch information
janosmeszaros committed Jan 8, 2022
1 parent 0b4a68e commit ae1f797
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 59 deletions.
33 changes: 16 additions & 17 deletions src/porsas/async.clj
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
(ns porsas.async
(:require [porsas.core :as p])
(:require [porsas.cache :as cache]
[porsas.core :as p])
(:import (io.vertx.pgclient PgPool PgConnectOptions)
(io.vertx.sqlclient PoolOptions Tuple RowSet)
io.vertx.sqlclient.impl.ArrayTuple
io.vertx.pgclient.impl.RowImpl
(io.vertx.core Vertx Handler AsyncResult VertxOptions)
(java.util Collection HashMap Map)
java.util.Collection
(clojure.lang PersistentVector)
(java.util.concurrent CompletableFuture Executor CompletionStage)
(java.util.function Function)))
Expand Down Expand Up @@ -100,21 +101,19 @@
| key | description |
| --------------|-------------|
| `:row` | Optional function of `tuple->value` or a [[RowCompiler]] to convert rows into values
| `:cache` | Optional [[java.util.Map]] instance to hold the compiled rowmappers"
| `:cache` | Optional [[porsas.cache/Cache]] instance to hold the compiled rowmappers"
([] (context {}))
([{:keys [row cache] :or {cache (HashMap.)}}]
(let [cache (or cache (reify Map (get [_ _]) (put [_ _ _]) (entrySet [_])))
->row (fn [sql ^RowSet rs]
(let [cols (col-map rs)
row (cond
(satisfies? p/RowCompiler row) (p/compile-row row (map last cols))
row row
:else (p/rs->map-of-cols cols))]
(.put ^Map cache sql row)
row))]
([{:keys [row cache]}]
(let [cache (or cache (cache/create-cache))
->row (fn [_sql ^RowSet rs]
(let [cols (col-map rs)]
(cond
(satisfies? p/RowCompiler row) (p/compile-row row (map last cols))
row row
:else (p/rs->map-of-cols cols))))]
(reify
p/Cached
(cache [_] (into {} cache))
cache/Cached
(cache [_] (cache/elements cache))
Context
(-query-one [_ pool sqlvec]
(let [sql (-get-sql sqlvec)
Expand All @@ -130,7 +129,7 @@
it (.iterator rs)]
(if-not (.hasNext it)
(.complete cf nil)
(let [row (or (.get ^Map cache sql) (->row sql rs))]
(let [row (cache/lookup-or-set cache sql #(->row %1 rs))]
(.complete cf (row (.next it))))))
(.completeExceptionally cf (.cause ^AsyncResult res)))))))
cf))
Expand All @@ -146,7 +145,7 @@
(if (.succeeded ^AsyncResult res)
(let [rs ^RowSet (.result ^AsyncResult res)
it (.iterator rs)
row (or (.get ^Map cache sql) (->row sql rs))]
row (cache/lookup-or-set cache sql #(->row %1 rs))]
(loop [res []]
(if (.hasNext it)
(recur (conj res (row (.next it))))
Expand Down
5 changes: 5 additions & 0 deletions src/porsas/cache.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
(lookup-or-set [this k value-fn] "Lookup a value in the cache based on `k` and if not found set its value based on `value-fn` and returns it.")
(elements [this] "Returns the actual state of the cache."))

(defprotocol Cached
(cache [this]))

(defrecord CoreCache [a]
Cache
(lookup-or-set [this k value-fn]
(c/lookup-or-miss (:a this) k value-fn))
(elements [this] (into {} @(:a this))))

(defn create-cache []
(->CoreCache (c/basic-cache-factory {})))
3 changes: 0 additions & 3 deletions src/porsas/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
(defprotocol GetValue
(get-value [this i]))

(defprotocol Cached
(cache [this]))

;;
;; Implementation
;;
Expand Down
48 changes: 23 additions & 25 deletions src/porsas/jdbc.clj
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
(ns porsas.jdbc
(:require [porsas.core :as p])
(:require [porsas.cache :as cache]
[porsas.core :as p])
(:import (java.sql Connection PreparedStatement ResultSet ResultSetMetaData)
(javax.sql DataSource)
(java.util Iterator Map)
java.util.Iterator
(clojure.lang PersistentVector)))

;;
Expand Down Expand Up @@ -118,42 +119,39 @@
| --------------|-------------|
| `:row` | Optional function of `rs->value` or a [[RowCompiler]] to convert rows into values
| `:key` | Optional function of `rs-meta i->key` to create key for map-results
| `:cache` | Optional [[java.util.Map]] instance to hold the compiled rowmappers"
| `:cache` | Optional [[porsas.cache/Cache]] instance to hold the compiled rowmappers"
([] (context {}))
([{:keys [row key cache] :or {key (unqualified-key)
cache (java.util.HashMap.)}}]
(let [cache (or cache (reify Map (get [_ _]) (put [_ _ _]) (entrySet [_])))
->row (fn [sql rs]
(let [cols (col-map rs key)
row (cond
(satisfies? p/RowCompiler row) (p/compile-row row (map last cols))
row row
:else (p/rs->map-of-cols cols))]
(.put ^Map cache sql row)
row))]
([{:keys [row key cache] :or {key (unqualified-key)}}]
(let [c (or cache (cache/create-cache))
->row (fn [_sql rs]
(let [cols (col-map rs key)]
(cond
(satisfies? p/RowCompiler row) (p/compile-row row (map last cols))
row row
:else (p/rs->map-of-cols cols))))]
(reify
p/Cached
(cache [_] (into {} cache))
cache/Cached
(cache [_] (cache/elements cache))
Context
(-query-one [_ connection sqlvec]
(let [sql (-get-sql sqlvec)
(let [sql (-get-sql sqlvec)
params (-get-parameter-iterator sqlvec)
ps (.prepareStatement ^Connection connection sql)]
ps (.prepareStatement ^Connection connection sql)]
(try
(prepare! ps params)
(let [rs (.executeQuery ps)
row (or (.get ^Map cache sql) (->row sql rs))]
(if (.next rs) (row rs)))
(let [rs (.executeQuery ps)
row (cache/lookup-or-set c sql #(->row %1 rs))]
(when (.next rs) (row rs)))
(finally
(.close ps)))))
(-query [_ connection sqlvec]
(let [sql (-get-sql sqlvec)
it (-get-parameter-iterator sqlvec)
ps (.prepareStatement ^Connection connection sql)]
it (-get-parameter-iterator sqlvec)
ps (.prepareStatement ^Connection connection sql)]
(try
(prepare! ps it)
(let [rs (.executeQuery ps)
row (or (.get ^Map cache sql) (->row sql rs))]
(let [rs (.executeQuery ps)
row (cache/lookup-or-set c sql #(->row %1 rs))]
(loop [res []]
(if (.next rs)
(recur (conj res (row rs)))
Expand Down
17 changes: 7 additions & 10 deletions src/porsas/next.clj
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
(ns porsas.next
(:require [next.jdbc.result-set :as rs]
[porsas.jdbc :as pj]
[porsas.core :as p])
(:import (java.sql ResultSet)
(java.util HashMap)))
[porsas.cache :as cache]
[porsas.core :as p]
[porsas.jdbc :as pj])
(:import (java.sql ResultSet)))

(defn caching-row-builder
"A [[next.jdbc.result-set/RowBuilder]] implementation using porsas. WIP."
([]
(caching-row-builder (pj/qualified-key)))
([key]
(let [cache (HashMap.)] ;; TODO: make bounded
(let [cache (cache/create-cache)]
(fn [^ResultSet rs opts]
(let [sql (:next.jdbc/sql-string opts)
->row (or (.get cache sql)
(let [->row (p/rs-> 1 nil (map last (pj/col-map rs key)))]
(.put cache sql ->row)
->row))]
->row (cache/lookup-or-set cache sql (fn [_] (p/rs-> 1 nil (map last (pj/col-map rs key)))))]
(reify
p/Cached
cache/Cached
(cache [_] (into {} cache))
rs/RowBuilder
(->row [_] (->row rs))
Expand Down
7 changes: 3 additions & 4 deletions test/porsas/async_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
[manifold.deferred :as d])
(:import io.vertx.pgclient.PgPool))

(def pool-opts {:uri "postgresql://localhost:5432/porsas"
:user "user"
:password "password"})
(def pool-opts {:uri "postgresql://localhost:5432/porsas"
:user "postgres"})

(t/deftest async
(let [pool (pa/pool pool-opts)]
Expand All @@ -17,7 +16,7 @@
(pa/then :name)
(deref))))

(t/is (= "io.reactiverse.pgclient.PgException: relation \"non_existing\" does not exist"
(t/is (= "io.vertx.pgclient.PgException: ERROR: relation \"non_existing\" does not exist (42P01)"
(-> (pa/query-one pool ["SELECT * from non_existing where id=$1" 1])
(pa/then :name)
(pa/catch #(-> % .getMessage))
Expand Down

0 comments on commit ae1f797

Please sign in to comment.