Skip to content

Commit

Permalink
Wire in cache
Browse files Browse the repository at this point in the history
  • Loading branch information
janosmeszaros committed Dec 12, 2021
1 parent 0b4a68e commit af34eab
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 71 deletions.
51 changes: 26 additions & 25 deletions src/porsas/async.clj
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
(ns porsas.async
(:require [porsas.core :as p])
(:import (io.vertx.pgclient PgPool PgConnectOptions)
(io.vertx.sqlclient PoolOptions Tuple RowSet)
io.vertx.sqlclient.impl.ArrayTuple
io.vertx.pgclient.impl.RowImpl
(io.vertx.core Vertx Handler AsyncResult VertxOptions)
(java.util Collection HashMap Map)
(clojure.lang PersistentVector)
(java.util.concurrent CompletableFuture Executor CompletionStage)
(java.util.function Function)))
(:require
[porsas.cache :as cache]
[porsas.core :as p])
(:import
(clojure.lang PersistentVector)
(io.vertx.core Vertx Handler AsyncResult VertxOptions)
(io.vertx.pgclient PgPool PgConnectOptions)
io.vertx.pgclient.impl.RowImpl
(io.vertx.sqlclient PoolOptions Tuple RowSet)
io.vertx.sqlclient.impl.ArrayTuple
(java.util Collection)
(java.util.concurrent CompletableFuture Executor CompletionStage)
(java.util.function Function)))

(defprotocol SQLParams
(-get-sql [this])
Expand Down Expand Up @@ -100,21 +103,19 @@
| key | description |
| --------------|-------------|
| `:row` | Optional function of `tuple->value` or a [[RowCompiler]] to convert rows into values
| `:cache` | Optional [[java.util.Map]] instance to hold the compiled rowmappers"
| `:cache` | Optional [[porsas.cache/Cache]] instance to hold the compiled rowmappers"
([] (context {}))
([{:keys [row cache] :or {cache (HashMap.)}}]
(let [cache (or cache (reify Map (get [_ _]) (put [_ _ _]) (entrySet [_])))
->row (fn [sql ^RowSet rs]
(let [cols (col-map rs)
row (cond
(satisfies? p/RowCompiler row) (p/compile-row row (map last cols))
row row
:else (p/rs->map-of-cols cols))]
(.put ^Map cache sql row)
row))]
([{:keys [row cache]}]
(let [cache (or cache (cache/create-cache))
->row (fn [_sql ^RowSet rs]
(let [cols (col-map rs)]
(cond
(satisfies? p/RowCompiler row) (p/compile-row row (map last cols))
row row
:else (p/rs->map-of-cols cols))))]
(reify
p/Cached
(cache [_] (into {} cache))
cache/Cached
(cache [_] (cache/elements cache))
Context
(-query-one [_ pool sqlvec]
(let [sql (-get-sql sqlvec)
Expand All @@ -130,7 +131,7 @@
it (.iterator rs)]
(if-not (.hasNext it)
(.complete cf nil)
(let [row (or (.get ^Map cache sql) (->row sql rs))]
(let [row (cache/lookup-or-set cache sql #(->row %1 rs))]
(.complete cf (row (.next it))))))
(.completeExceptionally cf (.cause ^AsyncResult res)))))))
cf))
Expand All @@ -146,7 +147,7 @@
(if (.succeeded ^AsyncResult res)
(let [rs ^RowSet (.result ^AsyncResult res)
it (.iterator rs)
row (or (.get ^Map cache sql) (->row sql rs))]
row (cache/lookup-or-set cache sql #(->row %1 rs))]
(loop [res []]
(if (.hasNext it)
(recur (conj res (row (.next it))))
Expand Down
5 changes: 5 additions & 0 deletions src/porsas/cache.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
(lookup-or-set [this k value-fn] "Lookup a value in the cache based on `k` and if not found set its value based on `value-fn` and returns it.")
(elements [this] "Returns the actual state of the cache."))

(defprotocol Cached
(cache [this]))

(defrecord CoreCache [a]
Cache
(lookup-or-set [this k value-fn]
(c/lookup-or-miss (:a this) k value-fn))
(elements [this] (into {} @(:a this))))

(defn create-cache []
(->CoreCache (c/basic-cache-factory {})))
3 changes: 0 additions & 3 deletions src/porsas/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
(defprotocol GetValue
(get-value [this i]))

(defprotocol Cached
(cache [this]))

;;
;; Implementation
;;
Expand Down
56 changes: 28 additions & 28 deletions src/porsas/jdbc.clj
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
(ns porsas.jdbc
(:require [porsas.core :as p])
(:import (java.sql Connection PreparedStatement ResultSet ResultSetMetaData)
(javax.sql DataSource)
(java.util Iterator Map)
(clojure.lang PersistentVector)))
(:require
[porsas.cache :as cache]
[porsas.core :as p])
(:import
(clojure.lang PersistentVector)
(java.sql Connection PreparedStatement ResultSet ResultSetMetaData)
(java.util Iterator)
(javax.sql DataSource)))

;;
;; Protocols
Expand Down Expand Up @@ -118,42 +121,39 @@
| --------------|-------------|
| `:row` | Optional function of `rs->value` or a [[RowCompiler]] to convert rows into values
| `:key` | Optional function of `rs-meta i->key` to create key for map-results
| `:cache` | Optional [[java.util.Map]] instance to hold the compiled rowmappers"
| `:cache` | Optional [[porsas.cache/Cache]] instance to hold the compiled rowmappers"
([] (context {}))
([{:keys [row key cache] :or {key (unqualified-key)
cache (java.util.HashMap.)}}]
(let [cache (or cache (reify Map (get [_ _]) (put [_ _ _]) (entrySet [_])))
->row (fn [sql rs]
(let [cols (col-map rs key)
row (cond
(satisfies? p/RowCompiler row) (p/compile-row row (map last cols))
row row
:else (p/rs->map-of-cols cols))]
(.put ^Map cache sql row)
row))]
([{:keys [row key cache] :or {key (unqualified-key)}}]
(let [c (or cache (cache/create-cache))
->row (fn [_sql rs]
(let [cols (col-map rs key)]
(cond
(satisfies? p/RowCompiler row) (p/compile-row row (map last cols))
row row
:else (p/rs->map-of-cols cols))))]
(reify
p/Cached
(cache [_] (into {} cache))
cache/Cached
(cache [_] (cache/elements cache))
Context
(-query-one [_ connection sqlvec]
(let [sql (-get-sql sqlvec)
(let [sql (-get-sql sqlvec)
params (-get-parameter-iterator sqlvec)
ps (.prepareStatement ^Connection connection sql)]
ps (.prepareStatement ^Connection connection sql)]
(try
(prepare! ps params)
(let [rs (.executeQuery ps)
row (or (.get ^Map cache sql) (->row sql rs))]
(if (.next rs) (row rs)))
(let [rs (.executeQuery ps)
row (cache/lookup-or-set c sql #(->row %1 rs))]
(when (.next rs) (row rs)))
(finally
(.close ps)))))
(-query [_ connection sqlvec]
(let [sql (-get-sql sqlvec)
it (-get-parameter-iterator sqlvec)
ps (.prepareStatement ^Connection connection sql)]
it (-get-parameter-iterator sqlvec)
ps (.prepareStatement ^Connection connection sql)]
(try
(prepare! ps it)
(let [rs (.executeQuery ps)
row (or (.get ^Map cache sql) (->row sql rs))]
(let [rs (.executeQuery ps)
row (cache/lookup-or-set c sql #(->row %1 rs))]
(loop [res []]
(if (.next rs)
(recur (conj res (row rs)))
Expand Down
21 changes: 10 additions & 11 deletions src/porsas/next.clj
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
(ns porsas.next
(:require [next.jdbc.result-set :as rs]
[porsas.jdbc :as pj]
[porsas.core :as p])
(:import (java.sql ResultSet)
(java.util HashMap)))
(:require
[next.jdbc.result-set :as rs]
[porsas.cache :as cache]
[porsas.core :as p]
[porsas.jdbc :as pj])
(:import
(java.sql ResultSet)))

(defn caching-row-builder
"A [[next.jdbc.result-set/RowBuilder]] implementation using porsas. WIP."
([]
(caching-row-builder (pj/qualified-key)))
([key]
(let [cache (HashMap.)] ;; TODO: make bounded
(let [cache (cache/create-cache)]
(fn [^ResultSet rs opts]
(let [sql (:next.jdbc/sql-string opts)
->row (or (.get cache sql)
(let [->row (p/rs-> 1 nil (map last (pj/col-map rs key)))]
(.put cache sql ->row)
->row))]
->row (cache/lookup-or-set cache sql (fn [_] (p/rs-> 1 nil (map last (pj/col-map rs key)))))]
(reify
p/Cached
cache/Cached
(cache [_] (into {} cache))
rs/RowBuilder
(->row [_] (->row rs))
Expand Down
7 changes: 3 additions & 4 deletions test/porsas/async_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
[manifold.deferred :as d])
(:import io.vertx.pgclient.PgPool))

(def pool-opts {:uri "postgresql://localhost:5432/porsas"
:user "user"
:password "password"})
(def pool-opts {:uri "postgresql://localhost:5432/porsas"
:user "postgres"})

(t/deftest async
(let [pool (pa/pool pool-opts)]
Expand All @@ -17,7 +16,7 @@
(pa/then :name)
(deref))))

(t/is (= "io.reactiverse.pgclient.PgException: relation \"non_existing\" does not exist"
(t/is (= "io.vertx.pgclient.PgException: ERROR: relation \"non_existing\" does not exist (42P01)"
(-> (pa/query-one pool ["SELECT * from non_existing where id=$1" 1])
(pa/then :name)
(pa/catch #(-> % .getMessage))
Expand Down

0 comments on commit af34eab

Please sign in to comment.