From f28b6736cac1349e3061be6ce0064a64e06ef75f Mon Sep 17 00:00:00 2001 From: KtorZ Date: Fri, 13 Oct 2023 13:52:27 +0200 Subject: [PATCH] Implement manual 'FindIntersection' for Hydra producer. We do this by skipping blocks because we don't really have much alternative. This is therefore relatively unsound for very long running head. Though is it more sound that actually replaying all events from that head onto an existing database; so it's kind of a best effort solution. --- CHANGELOG.md | 27 +++- modules/websockets-json/package.yaml | 1 + .../src/Network/WebSockets/Json.hs | 4 +- modules/websockets-json/websockets-json.cabal | 1 + src/Kupo/App.hs | 12 +- src/Kupo/App/ChainSync/Hydra.hs | 124 +++++++++++++++--- src/Kupo/App/ChainSync/Ogmios.hs | 6 +- src/Kupo/Data/Hydra.hs | 8 +- 8 files changed, 156 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2fc1757a..89625c1b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,30 @@ -### [2.7.0] - UNRELEASED +### [2.7.0] - 2023-10-13 #### Added -- Support for indexing a Hydra head. Use `--hydra-host` and `--hydra-port` to - connect `kupo` to a `hydra-node`. +- Support for indexing a Hydra head. + + Use `--hydra-host` and `--hydra-port` to connect `kupo` to a `hydra-node`. + + > **Note**: + > + > Hydra heads do not actually form a 'chain'; Hydra doesn't have blocks, but + > snapshots which are akin to blocks so we treat them as such. The block + > header hash we opted for is arbitrary and is a hash of all the transaction + > id present in the snapshot. + > + > It also comes with diminished capabilities since Hydra doesn't have any + > protocol to query metadata for example and Kupo does not store them. So + > unlike Ogmios or Cardano-node, metadata cannot be retrieved when Hydra is + > used as a chain producer. + +#### Changed + +- N/A + +#### Removed + +- N/A ### [2.6.1] - 2023-08-30 diff --git a/modules/websockets-json/package.yaml b/modules/websockets-json/package.yaml index b6e0053f..782d1f9e 100644 --- a/modules/websockets-json/package.yaml +++ b/modules/websockets-json/package.yaml @@ -28,5 +28,6 @@ library: - bytestring - connection - exceptions + - io-classes - websockets - wuss diff --git a/modules/websockets-json/src/Network/WebSockets/Json.hs b/modules/websockets-json/src/Network/WebSockets/Json.hs index e6de1729..16e906c6 100644 --- a/modules/websockets-json/src/Network/WebSockets/Json.hs +++ b/modules/websockets-json/src/Network/WebSockets/Json.hs @@ -13,7 +13,7 @@ import Prelude import Control.Exception ( Exception ) -import Control.Monad.Catch +import Control.Monad.Class.MonadThrow ( MonadThrow (..) ) import Control.Monad.IO.Class @@ -54,7 +54,7 @@ receiveJson receiveJson ws decoder = do bytes <- liftIO (WS.receiveData ws) either - (\(path, err) -> throwM $ MalformedOrUnexpectedResponse bytes path err) + (\(path, err) -> throwIO $ MalformedOrUnexpectedResponse bytes path err) pure (Json.eitherDecodeWith Json.jsonEOF (Json.iparse decoder) bytes) diff --git a/modules/websockets-json/websockets-json.cabal b/modules/websockets-json/websockets-json.cabal index b6266164..71b01af3 100644 --- a/modules/websockets-json/websockets-json.cabal +++ b/modules/websockets-json/websockets-json.cabal @@ -77,6 +77,7 @@ library , bytestring , connection , exceptions + , io-classes , websockets , wuss default-language: Haskell2010 diff --git a/src/Kupo/App.hs b/src/Kupo/App.hs index 7edd32c8..02737390 100644 --- a/src/Kupo/App.hs +++ b/src/Kupo/App.hs @@ -2,6 +2,7 @@ -- License, v. 2.0. If a copy of the MPL was not distributed with this -- file, You can obtain one at http://mozilla.org/MPL/2.0/. +{-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE RecordWildCards #-} module Kupo.App @@ -77,6 +78,7 @@ import Kupo.Data.Cardano , distanceToTip , getPoint , getPointSlotNo + , pattern GenesisPoint ) import Kupo.Data.ChainSync ( ForcedRollbackHandler (..) @@ -173,11 +175,19 @@ newProducer tr chainProducer callback = do Hydra{hydraHost, hydraPort} -> do logWith tr ConfigurationHydra{hydraHost, hydraPort} mailbox <- atomically (newMailbox mailboxCapacity) + callback forcedRollbackCallback mailbox $ \_tracerChainSync checkpoints statusToggle -> do let runHydra pts beforeMainLoop onIntersectionNotFound continuation = do res <- race (Hydra.connect statusToggle hydraHost hydraPort $ - Hydra.runChainSyncClient mailbox beforeMainLoop pts + -- NOTE: Kupo does not generally index the genesis 'block' / configuration. + -- That's true for the normal case with ogmios or cardano-node; but for + -- Hydra, we might want to. + -- + -- This is debatable and should be backed by use-cases, and may also be + -- confusing since normally Kupo starts indexing only the blocks that + -- follows the given checkpoints. But now we make an exception for genesis. + Hydra.runChainSyncClient mailbox beforeMainLoop (filter (/= GenesisPoint) pts) ) (atomically (takeTMVar forcedRollbackVar)) case res of diff --git a/src/Kupo/App/ChainSync/Hydra.hs b/src/Kupo/App/ChainSync/Hydra.hs index 8d9cc9ec..46641d54 100644 --- a/src/Kupo/App/ChainSync/Hydra.hs +++ b/src/Kupo/App/ChainSync/Hydra.hs @@ -12,10 +12,6 @@ module Kupo.App.ChainSync.Hydra import Kupo.Prelude -import Control.Exception.Safe - ( MonadThrow - , throwM - ) import Kupo.App.Mailbox ( Mailbox , putHighFrequencyMessage @@ -23,10 +19,16 @@ import Kupo.App.Mailbox import Kupo.Control.MonadSTM ( MonadSTM (..) ) +import Kupo.Control.MonadThrow + ( MonadThrow (..) + ) import Kupo.Data.Cardano ( Point , Tip , TransactionId + , WithOrigin (..) + , getPointSlotNo + , getTipSlotNo ) import Kupo.Data.ChainSync ( IntersectionNotFoundException (..) @@ -38,7 +40,7 @@ import Kupo.Data.Hydra , mkHydraBlock ) import Kupo.Data.PartialBlock - ( PartialBlock + ( PartialBlock (..) , PartialTransaction (..) ) @@ -50,26 +52,112 @@ runChainSyncClient :: forall m. ( MonadIO m , MonadSTM m - , MonadThrow m, MonadThrow (STM m)) + , MonadThrow m + , MonadThrow (STM m) + ) => Mailbox m (Tip, PartialBlock) (Tip, Point) -> m () -- An action to run before the main loop starts. -> [Point] -> WS.Connection -> m IntersectionNotFoundException -runChainSyncClient mailbox beforeMainLoop _pts ws = do +runChainSyncClient mailbox beforeMainLoop pts ws = do beforeMainLoop TransactionStore{push, pop} <- newTransactionStore - forever $ do - WS.receiveJson ws decodeHydraMessage >>= \case + flip evalStateT (sortOn getPointSlotNo pts) $ forever $ do + lift (WS.receiveJson ws decodeHydraMessage) >>= \case HeadIsOpen{genesisTxs} -> do - atomically (putHighFrequencyMessage mailbox (mkHydraBlock 0 genesisTxs)) + handleBlock $ mkHydraBlock 0 genesisTxs TxValid{tx} -> do - push tx + lift $ push tx SnapshotConfirmed{ snapshot = Snapshot { number, confirmedTransactionIds }} -> do - txs <- pop confirmedTransactionIds - atomically (putHighFrequencyMessage mailbox (mkHydraBlock number txs)) + txs <- lift $ pop confirmedTransactionIds + handleBlock $ mkHydraBlock number txs SomethingElse -> do pure () + where + -- Hydra doesn't provide any mechanism to negotiate an intersection point and receive snapshots + -- only from a certain point. So we 'fake' it by skippping blocks until we have caught up with our + -- state. + -- + -- - We start from a (possibly empty) ascending list of checkpoints that we have already processed. + -- - For each 'block' received from the Hydra head, we have one of three situation. + -- + -- 1. Its slot number is lower than the current cursor (i.e. smallest checkpoint) + -- 2. Its slot number is equal to the current cursor + -- 3. Its slot number is higher than the current cursor. + -- + -- - In the case of (1), it's simple[NOTE^A], we have already seen that block and we can just skip it. The + -- cursor doesn't move as we haven't caught up with that checkpoint yet. + -- + -- - In the case of (2), we have two possibilities: + -- + -- 1. The block header hash and the checkpoint header hash are identical + -- -> This means we've found a block that's on both chains. We can skip it as we already know + -- of it. We can move to the next checkpoint too. + -- + -- 2. The block header hash and the checkpoint header hash are different + -- -> This means we (the indexer) is on a fork. But we don't have enough information to know + -- where to rollback to. So we throw an exception. + -- Note that this truly is *exceptional* because Hydra heads cannot fork in principle. The + -- only way we end up here is if someone tries to connect an already synchronized index to a + -- new head which happen to have different chain histories. And that's a very valid reason to + -- crash. + -- + -- - The case of (3) is in principle _impossible_ because a Hydra snapshots are monotically + -- increasing with no gaps. So we can't possibly hit that scenario without first hitting (2). + -- + -- - When the list of checkpoints is empty, it means we have caught up with the chain and any new + -- block is relevant to the indexer. + -- + -- + -- NOTE^A: It is "simple" because we assume that there is not two different Hydra "chains" that + -- might have two equal blocks. However, in principle, we should verify that the discarded blocks + -- still form a chain up to the last known block we stored. Otherwise, there could be a scenario + -- where we have stored: b1, b2, b3 but receive b1', b2', b3. If are waiting for a snapshot number + -- at 3, then we will discard b1' and b2' even though they are actually different from what we + -- stored. + -- + -- This can really only happen if we have concomittantly: + -- (a) Someone restarting the indexer on a *different* Hydra head. + -- (b) ALL of our selected checkpoint being identical on both heads. + -- + -- Without (b), the condition 2.2 would already throw an exception. Given the scenario we're in, + -- we consider this quite unlikely. + handleBlock + :: (Tip, PartialBlock) + -> StateT [Point] m () + handleBlock (tip, block) = do + get >>= \case + -- No checkpoints, all blocks are relevant. + [] -> do + lift $ atomically (putHighFrequencyMessage mailbox (tip, block)) + + requestedPoints@(cursor:_) -> do + -- Block is before a known checkpoint, ignoring. + if | slot < getPointSlotNo cursor -> do + return () + + -- Block is at checkpoint + | slot == getPointSlotNo cursor -> do + if point == cursor then do + -- Checkpoint matches, we skip the block and move the cursor + modify' (drop 1) + else + lift $ throwIO $ IntersectionNotFound + { requestedPoints = At . getPointSlotNo <$> requestedPoints + , tip = At . getTipSlotNo $ tip + } + + -- Block is after a known checkpoint, absurd + | otherwise -> do + error $ "impossible: received a block *after* a known checkpoint? \ + \There isn't supposed to be any gaps in Hydra snapshots \ + \however this suggests that there was one.\ + \We choked on: " <> show point <> ", while expecting a \ + \snapshot at: " <> show cursor <> "." + where + point = blockPoint block + slot = getPointSlotNo point connect :: ConnectionStatusToggle IO @@ -96,7 +184,13 @@ newtype TransactionStoreException = TransactionNotInStore { transactionId :: Tra instance Exception TransactionStoreException -newTransactionStore :: (Monad m, MonadSTM m, MonadThrow (STM m)) => m (TransactionStore m) +newTransactionStore + :: forall m. + ( Monad m + , MonadSTM m + , MonadThrow (STM m) + ) + => m (TransactionStore m) newTransactionStore = do store <- atomically $ newTVar mempty pure TransactionStore @@ -107,7 +201,7 @@ newTransactionStore = do txMap <- readTVar store forM ids $ \id -> do case Map.lookup id txMap of - Nothing -> throwM $ TransactionNotInStore id + Nothing -> throwIO $ TransactionNotInStore id Just tx -> do writeTVar store (Map.delete id txMap) pure tx diff --git a/src/Kupo/App/ChainSync/Ogmios.hs b/src/Kupo/App/ChainSync/Ogmios.hs index bfab209e..02fd8d02 100644 --- a/src/Kupo/App/ChainSync/Ogmios.hs +++ b/src/Kupo/App/ChainSync/Ogmios.hs @@ -12,9 +12,6 @@ module Kupo.App.ChainSync.Ogmios import Kupo.Prelude -import Control.Exception.Safe - ( MonadThrow - ) import Kupo.App.Mailbox ( Mailbox , putHighFrequencyMessage @@ -23,6 +20,9 @@ import Kupo.App.Mailbox import Kupo.Control.MonadSTM ( MonadSTM (..) ) +import Kupo.Control.MonadThrow + ( MonadThrow (..) + ) import Kupo.Data.Cardano ( Point , SlotNo diff --git a/src/Kupo/Data/Hydra.hs b/src/Kupo/Data/Hydra.hs index daa9d022..6278efc4 100644 --- a/src/Kupo/Data/Hydra.hs +++ b/src/Kupo/Data/Hydra.hs @@ -44,6 +44,7 @@ import Kupo.Data.Cardano , scriptFromBytes , scriptHashFromText , transactionIdFromText + , transactionIdToBytes , unsafeHeaderHashFromBytes , unsafeValueFromList , withReferences @@ -63,7 +64,7 @@ import qualified Data.Aeson.Key as Key import qualified Data.Aeson.KeyMap as KeyMap import qualified Data.Aeson.Types as Json import qualified Data.ByteString as BS -import qualified Data.ByteString.Builder as BS +import qualified Data.ByteString.Builder as B import qualified Data.Map.Strict as Map import qualified Data.Text as Text @@ -83,8 +84,9 @@ data Snapshot = Snapshot mkHydraBlock :: Word64 -> [PartialTransaction] -> (Tip, PartialBlock) mkHydraBlock number txs = do let - headerHash = number - & hashWith @Blake2b_256 (toStrict . BS.toLazyByteString . BS.word64BE) + headerHash = txs + & foldr (\PartialTransaction{id} -> (B.byteString (transactionIdToBytes id) <>)) mempty + & hashWith @Blake2b_256 (toStrict . B.toLazyByteString) & hashToBytes & unsafeHeaderHashFromBytes