Skip to content

Commit

Permalink
Rework Hydra transaction store API to use list.
Browse files Browse the repository at this point in the history
  So that we remove many elements from the store in a single atomic transaction.
  • Loading branch information
KtorZ committed Oct 12, 2023
1 parent 0d1009a commit 35fe6c2
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 39 deletions.
61 changes: 31 additions & 30 deletions src/Kupo/App/ChainSync/Hydra.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import Control.Exception.Safe
( MonadThrow
, throwM
)
import qualified Data.Map as Map
import Kupo.App.Mailbox
( Mailbox
, putHighFrequencyMessage
Expand All @@ -42,6 +41,8 @@ import Kupo.Data.PartialBlock
( PartialBlock
, PartialTransaction (..)
)

import qualified Data.Map as Map
import qualified Network.WebSockets as WS
import qualified Network.WebSockets.Json as WS

Expand All @@ -57,17 +58,18 @@ runChainSyncClient
-> m IntersectionNotFoundException
runChainSyncClient mailbox beforeMainLoop _pts ws = do
beforeMainLoop
TransactionStore{pushTx, popTxById} <- newTransactionStore
TransactionStore{push, pop} <- newTransactionStore
forever $ do
WS.receiveJson ws decodeHydraMessage >>= \case
HeadIsOpen{genesisTxs} ->
HeadIsOpen{genesisTxs} -> do
atomically (putHighFrequencyMessage mailbox (mkHydraBlock 0 genesisTxs))
TxValid{tx} ->
pushTx tx
TxValid{tx} -> do
push tx
SnapshotConfirmed{ snapshot = Snapshot { number, confirmedTransactionIds }} -> do
txs <- mapM popTxById confirmedTransactionIds
txs <- pop confirmedTransactionIds
atomically (putHighFrequencyMessage mailbox (mkHydraBlock number txs))
SomethingElse -> pure ()
SomethingElse -> do
pure ()

connect
:: ConnectionStatusToggle IO
Expand All @@ -79,35 +81,34 @@ connect ConnectionStatusToggle{toggleConnected} host port action =
WS.runClientWith host port "/"
WS.defaultConnectionOptions [] (\ws -> toggleConnected >> action ws)

newtype TransactionStoreException = TransactionNotInStore { transactionId :: TransactionId }
deriving (Eq, Show)

instance Exception TransactionStoreException

-- | Handle to store and later retrieve transaction.
data TransactionStore m = TransactionStore
{ -- | Store a transaction for later retrieval.
pushTx :: PartialTransaction -> m ()
, -- | Resolves a transaction id and removes it. Throws
-- 'TransactionNotInStore' when not found.
popTxById :: MonadThrow m => TransactionId -> m PartialTransaction
push :: PartialTransaction -> m ()

, -- | Removes transactions from the store.
-- Throws 'TransactionNotInStore' when not found.
pop :: MonadThrow m => [TransactionId] -> m [PartialTransaction]
}

newtype TransactionStoreException = TransactionNotInStore { transactionId :: TransactionId }
deriving (Eq, Show)

instance Exception TransactionStoreException

newTransactionStore :: (Monad m, MonadSTM m, MonadThrow (STM m)) => m (TransactionStore m)
newTransactionStore = do
txStore <- atomically $ newTVar mempty
pure
TransactionStore
{ pushTx = \tx@PartialTransaction{id} -> atomically $ modifyTVar' txStore (Map.insert id tx)
, popTxById = \txId ->
atomically $ do
txMap <- readTVar txStore
case Map.lookup txId txMap of
Nothing -> throwM $ TransactionNotInStore txId
store <- atomically $ newTVar mempty
pure TransactionStore
{ push = \tx@PartialTransaction{id} ->
atomically $ modifyTVar' store (Map.insert id tx)
, pop = \ids ->
atomically $ do
txMap <- readTVar store
forM ids $ \id -> do
case Map.lookup id txMap of
Nothing -> throwM $ TransactionNotInStore id
Just tx -> do
writeTVar txStore (Map.delete txId txMap)
writeTVar store (Map.delete id txMap)
pure tx
}



}
8 changes: 4 additions & 4 deletions src/Kupo/App/Database.hs
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ mkDatabase tr mode longestRollback bracketConnection = Database
_otherwise -> do
withTemporaryIndex tr conn "inputsByCreatedAt" "inputs(created_at)" $ do
withTemporaryIndex tr conn "inputsBySpentAt" "inputs(spent_at)" $ do
deleteInputsIncrementally tr conn minSlotNo
-- deleteInputsIncrementally tr conn minSlotNo
traceExecute tr conn rollbackQryUpdateInputs [ minSlotNo ]
traceExecute tr conn rollbackQryDeleteCheckpoints [ minSlotNo ]
query_ conn selectMaxCheckpointQry >>= \case
Expand All @@ -845,11 +845,11 @@ mkDatabase tr mode longestRollback bracketConnection = Database
retryWhenBusy tr (constantStrategy 0.1) 1 $ withTransaction conn mode (runReaderT r conn)
}

deleteInputsIncrementally :: Tracer IO TraceConnection -> Connection -> SQLData -> IO ()
deleteInputsIncrementally tr conn minSlotNo = do
_deleteInputsIncrementally :: Tracer IO TraceConnection -> Connection -> SQLData -> IO ()
_deleteInputsIncrementally tr conn minSlotNo = do
traceExecute tr conn rollbackQryDeleteInputs [ minSlotNo ]
deleted <- changes conn
unless (deleted < pruneInputsMaxIncrement) $ deleteInputsIncrementally tr conn minSlotNo
unless (deleted < pruneInputsMaxIncrement) $ _deleteInputsIncrementally tr conn minSlotNo

insertRow
:: forall tableName.
Expand Down
10 changes: 5 additions & 5 deletions src/Kupo/Data/Hydra.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ import Kupo.Data.Cardano
, unsafeValueFromList
, withReferences
)
import Kupo.Data.Ogmios
( decodeAddress
, decodeTransactionId
)
import Kupo.Data.PartialBlock
( PartialBlock (..)
, PartialTransaction (PartialTransaction, datums, id, inputs, metadata, outputs, scripts)
Expand All @@ -47,10 +51,6 @@ import qualified Data.Aeson.Types as Json
import qualified Data.ByteString.Builder as BS
import qualified Data.Map.Strict as Map
import qualified Data.Text as Text
import Kupo.Data.Ogmios
( decodeAddress
, decodeTransactionId
)

-- Types

Expand Down Expand Up @@ -83,7 +83,7 @@ mkHydraBlock number txs = do
( Tip slotNo headerHash blockNo
, PartialBlock
{ blockPoint = BlockPoint slotNo headerHash
, blockBody = txs
, blockBody = toList txs
}
)

Expand Down

0 comments on commit 35fe6c2

Please sign in to comment.