From 397a4d8a46ddbb46b6b499c96df1784462c64908 Mon Sep 17 00:00:00 2001 From: KtorZ Date: Fri, 13 Oct 2023 13:52:27 +0200 Subject: [PATCH] Implement manual 'FindIntersection' for Hydra producer. We do this by skipping blocks because we don't really have much alternative. This is therefore relatively unsound for very long running head. Though is it more sound that actually replaying all events from that head onto an existing database; so it's kind of a best effort solution. --- modules/websockets-json/package.yaml | 1 + .../src/Network/WebSockets/Json.hs | 4 +- modules/websockets-json/websockets-json.cabal | 1 + src/Kupo/App/ChainSync/Hydra.hs | 124 +++++++++++++++--- src/Kupo/App/ChainSync/Ogmios.hs | 6 +- 5 files changed, 116 insertions(+), 20 deletions(-) diff --git a/modules/websockets-json/package.yaml b/modules/websockets-json/package.yaml index b6e0053f..782d1f9e 100644 --- a/modules/websockets-json/package.yaml +++ b/modules/websockets-json/package.yaml @@ -28,5 +28,6 @@ library: - bytestring - connection - exceptions + - io-classes - websockets - wuss diff --git a/modules/websockets-json/src/Network/WebSockets/Json.hs b/modules/websockets-json/src/Network/WebSockets/Json.hs index e6de1729..16e906c6 100644 --- a/modules/websockets-json/src/Network/WebSockets/Json.hs +++ b/modules/websockets-json/src/Network/WebSockets/Json.hs @@ -13,7 +13,7 @@ import Prelude import Control.Exception ( Exception ) -import Control.Monad.Catch +import Control.Monad.Class.MonadThrow ( MonadThrow (..) ) import Control.Monad.IO.Class @@ -54,7 +54,7 @@ receiveJson receiveJson ws decoder = do bytes <- liftIO (WS.receiveData ws) either - (\(path, err) -> throwM $ MalformedOrUnexpectedResponse bytes path err) + (\(path, err) -> throwIO $ MalformedOrUnexpectedResponse bytes path err) pure (Json.eitherDecodeWith Json.jsonEOF (Json.iparse decoder) bytes) diff --git a/modules/websockets-json/websockets-json.cabal b/modules/websockets-json/websockets-json.cabal index b6266164..71b01af3 100644 --- a/modules/websockets-json/websockets-json.cabal +++ b/modules/websockets-json/websockets-json.cabal @@ -77,6 +77,7 @@ library , bytestring , connection , exceptions + , io-classes , websockets , wuss default-language: Haskell2010 diff --git a/src/Kupo/App/ChainSync/Hydra.hs b/src/Kupo/App/ChainSync/Hydra.hs index 8d9cc9ec..96facf4d 100644 --- a/src/Kupo/App/ChainSync/Hydra.hs +++ b/src/Kupo/App/ChainSync/Hydra.hs @@ -12,10 +12,6 @@ module Kupo.App.ChainSync.Hydra import Kupo.Prelude -import Control.Exception.Safe - ( MonadThrow - , throwM - ) import Kupo.App.Mailbox ( Mailbox , putHighFrequencyMessage @@ -23,10 +19,16 @@ import Kupo.App.Mailbox import Kupo.Control.MonadSTM ( MonadSTM (..) ) +import Kupo.Control.MonadThrow + ( MonadThrow (..) + ) import Kupo.Data.Cardano ( Point , Tip , TransactionId + , WithOrigin (..) + , getPointSlotNo + , getTipSlotNo ) import Kupo.Data.ChainSync ( IntersectionNotFoundException (..) @@ -38,7 +40,7 @@ import Kupo.Data.Hydra , mkHydraBlock ) import Kupo.Data.PartialBlock - ( PartialBlock + ( PartialBlock (..) , PartialTransaction (..) ) @@ -50,26 +52,112 @@ runChainSyncClient :: forall m. ( MonadIO m , MonadSTM m - , MonadThrow m, MonadThrow (STM m)) + , MonadThrow m + , MonadThrow (STM m) + ) => Mailbox m (Tip, PartialBlock) (Tip, Point) -> m () -- An action to run before the main loop starts. -> [Point] -> WS.Connection -> m IntersectionNotFoundException -runChainSyncClient mailbox beforeMainLoop _pts ws = do +runChainSyncClient mailbox beforeMainLoop pts ws = do beforeMainLoop TransactionStore{push, pop} <- newTransactionStore - forever $ do - WS.receiveJson ws decodeHydraMessage >>= \case + flip evalStateT (sortOn getPointSlotNo pts) $ forever $ do + lift (WS.receiveJson ws decodeHydraMessage) >>= \case HeadIsOpen{genesisTxs} -> do - atomically (putHighFrequencyMessage mailbox (mkHydraBlock 0 genesisTxs)) + handleBlock $ mkHydraBlock 0 genesisTxs TxValid{tx} -> do - push tx + lift $ push tx SnapshotConfirmed{ snapshot = Snapshot { number, confirmedTransactionIds }} -> do - txs <- pop confirmedTransactionIds - atomically (putHighFrequencyMessage mailbox (mkHydraBlock number txs)) + txs <- lift $ pop confirmedTransactionIds + handleBlock $ mkHydraBlock number txs SomethingElse -> do pure () + where + -- Hydra doesn't provide any mechanism to negotiate an intersection point and receive snapshots + -- only from a certain point. So we 'fake' it by skippping blocks until we have caught up with our + -- state. + -- + -- - We start from a (possibly empty) ascending list of checkpoints that we have already processed. + -- - For each 'block' received from the Hydra head, we have one of three situation. + -- + -- 1. Its slot number is lower than the current cursor (i.e. smallest checkpoint) + -- 2. Its slot number is equal to the current cursor + -- 3. Its slot number is higher than the current cursor. + -- + -- - In the case of (1), it's simple[NOTE^A], we have already seen that block and we can just skip it. The + -- cursor doesn't move as we haven't caught up with that checkpoint yet. + -- + -- - In the case of (2), we have two possibilities: + -- + -- 1. The block header hash and the checkpoint header hash are identical + -- -> This means we've found a block that's on both chains. We can skip it as we already know + -- of it. We can move to the next checkpoint too. + -- + -- 2. The block header hash and the checkpoint header hash are different + -- -> This means we (the indexer) is on a fork. But we don't have enough information to know + -- where to rollback to. So we throw an exception. + -- Note that this truly is *exceptional* because Hydra heads cannot fork in principle. The + -- only way we end up here is if someone tries to connect an already synchronized index to a + -- new head which happen to have different chain histories. And that's a very valid reason to + -- crash. + -- + -- - The case of (3) is in principle _impossible_ because a Hydra snapshots are monotically + -- increasing with no gaps. So we can't possibly hit that scenario without first hitting (2). + -- + -- - When the list of checkpoints is empty, it means we have caught up with the chain and any new + -- block is relevant to the indexer. + -- + -- + -- NOTE^A: It is "simple" because we assume that there is not two different Hydra "chains" that + -- might have two equal blocks. However, in principle, we should verify that the discarded blocks + -- still form a chain up to the last known block we stored. Otherwise, there could be a scenario + -- where we have stored: b1, b2, b3 but receive b1', b2', b3. If are waiting for a snapshot number + -- at 3, then we will discard b1' and b2' even though they are actually different from what we + -- stored. + -- + -- This can really only happen if we have concomittantly: + -- (a) Someone restarting the indexer on a *different* Hydra head. + -- (b) ALL of our selected checkpoint being identical on both heads. + -- + -- Without (b), the condition 2.2 would already throw an exception. Given the scenario we're in, + -- we consider this quite unlikely. + handleBlock + :: (Tip, PartialBlock) + -> StateT [Point] m () + handleBlock (tip, block) = do + get >>= \case + -- No checkpoints, all blocks are relevant. + [] -> do + lift $ atomically (putHighFrequencyMessage mailbox (tip, block)) + + requestedPoints@(cursor:_) -> do + -- Block is before a known checkpoint, ignoring. + if | slot < getPointSlotNo cursor -> do + return () + + -- Block is at checkpoint + | slot == getPointSlotNo cursor -> do + if point == cursor then + -- Checkpoint matches, we skip the block and move the cursor + modify' (drop 1) + else + lift $ throwIO $ IntersectionNotFound + { requestedPoints = At . getPointSlotNo <$> requestedPoints + , tip = At . getTipSlotNo $ tip + } + + -- Block is after a known checkpoint, absurd + | otherwise -> do + error $ "impossible: received a block *after* a known checkpoint? \ + \There isn't supposed to be any gaps in Hydra snapshots \ + \however this suggests that there was one.\ + \We choked on: " <> show point <> ", while expecting a \ + \snapshot at: " <> show cursor <> "." + where + point = blockPoint block + slot = getPointSlotNo point connect :: ConnectionStatusToggle IO @@ -96,7 +184,13 @@ newtype TransactionStoreException = TransactionNotInStore { transactionId :: Tra instance Exception TransactionStoreException -newTransactionStore :: (Monad m, MonadSTM m, MonadThrow (STM m)) => m (TransactionStore m) +newTransactionStore + :: forall m. + ( Monad m + , MonadSTM m + , MonadThrow (STM m) + ) + => m (TransactionStore m) newTransactionStore = do store <- atomically $ newTVar mempty pure TransactionStore @@ -107,7 +201,7 @@ newTransactionStore = do txMap <- readTVar store forM ids $ \id -> do case Map.lookup id txMap of - Nothing -> throwM $ TransactionNotInStore id + Nothing -> throwIO $ TransactionNotInStore id Just tx -> do writeTVar store (Map.delete id txMap) pure tx diff --git a/src/Kupo/App/ChainSync/Ogmios.hs b/src/Kupo/App/ChainSync/Ogmios.hs index bfab209e..02fd8d02 100644 --- a/src/Kupo/App/ChainSync/Ogmios.hs +++ b/src/Kupo/App/ChainSync/Ogmios.hs @@ -12,9 +12,6 @@ module Kupo.App.ChainSync.Ogmios import Kupo.Prelude -import Control.Exception.Safe - ( MonadThrow - ) import Kupo.App.Mailbox ( Mailbox , putHighFrequencyMessage @@ -23,6 +20,9 @@ import Kupo.App.Mailbox import Kupo.Control.MonadSTM ( MonadSTM (..) ) +import Kupo.Control.MonadThrow + ( MonadThrow (..) + ) import Kupo.Data.Cardano ( Point , SlotNo