diff --git a/CHANGELOG.md b/CHANGELOG.md index 2fc1757a..89625c1b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,30 @@ -### [2.7.0] - UNRELEASED +### [2.7.0] - 2023-10-13 #### Added -- Support for indexing a Hydra head. Use `--hydra-host` and `--hydra-port` to - connect `kupo` to a `hydra-node`. +- Support for indexing a Hydra head. + + Use `--hydra-host` and `--hydra-port` to connect `kupo` to a `hydra-node`. + + > **Note**: + > + > Hydra heads do not actually form a 'chain'; Hydra doesn't have blocks, but + > snapshots which are akin to blocks so we treat them as such. The block + > header hash we opted for is arbitrary and is a hash of all the transaction + > id present in the snapshot. + > + > It also comes with diminished capabilities since Hydra doesn't have any + > protocol to query metadata for example and Kupo does not store them. So + > unlike Ogmios or Cardano-node, metadata cannot be retrieved when Hydra is + > used as a chain producer. + +#### Changed + +- N/A + +#### Removed + +- N/A ### [2.6.1] - 2023-08-30 diff --git a/modules/websockets-json/package.yaml b/modules/websockets-json/package.yaml index b6e0053f..782d1f9e 100644 --- a/modules/websockets-json/package.yaml +++ b/modules/websockets-json/package.yaml @@ -28,5 +28,6 @@ library: - bytestring - connection - exceptions + - io-classes - websockets - wuss diff --git a/modules/websockets-json/src/Network/WebSockets/Json.hs b/modules/websockets-json/src/Network/WebSockets/Json.hs index e6de1729..16e906c6 100644 --- a/modules/websockets-json/src/Network/WebSockets/Json.hs +++ b/modules/websockets-json/src/Network/WebSockets/Json.hs @@ -13,7 +13,7 @@ import Prelude import Control.Exception ( Exception ) -import Control.Monad.Catch +import Control.Monad.Class.MonadThrow ( MonadThrow (..) ) import Control.Monad.IO.Class @@ -54,7 +54,7 @@ receiveJson receiveJson ws decoder = do bytes <- liftIO (WS.receiveData ws) either - (\(path, err) -> throwM $ MalformedOrUnexpectedResponse bytes path err) + (\(path, err) -> throwIO $ MalformedOrUnexpectedResponse bytes path err) pure (Json.eitherDecodeWith Json.jsonEOF (Json.iparse decoder) bytes) diff --git a/modules/websockets-json/websockets-json.cabal b/modules/websockets-json/websockets-json.cabal index b6266164..71b01af3 100644 --- a/modules/websockets-json/websockets-json.cabal +++ b/modules/websockets-json/websockets-json.cabal @@ -77,6 +77,7 @@ library , bytestring , connection , exceptions + , io-classes , websockets , wuss default-language: Haskell2010 diff --git a/src/Kupo/App.hs b/src/Kupo/App.hs index 7edd32c8..02737390 100644 --- a/src/Kupo/App.hs +++ b/src/Kupo/App.hs @@ -2,6 +2,7 @@ -- License, v. 2.0. If a copy of the MPL was not distributed with this -- file, You can obtain one at http://mozilla.org/MPL/2.0/. +{-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE RecordWildCards #-} module Kupo.App @@ -77,6 +78,7 @@ import Kupo.Data.Cardano , distanceToTip , getPoint , getPointSlotNo + , pattern GenesisPoint ) import Kupo.Data.ChainSync ( ForcedRollbackHandler (..) @@ -173,11 +175,19 @@ newProducer tr chainProducer callback = do Hydra{hydraHost, hydraPort} -> do logWith tr ConfigurationHydra{hydraHost, hydraPort} mailbox <- atomically (newMailbox mailboxCapacity) + callback forcedRollbackCallback mailbox $ \_tracerChainSync checkpoints statusToggle -> do let runHydra pts beforeMainLoop onIntersectionNotFound continuation = do res <- race (Hydra.connect statusToggle hydraHost hydraPort $ - Hydra.runChainSyncClient mailbox beforeMainLoop pts + -- NOTE: Kupo does not generally index the genesis 'block' / configuration. + -- That's true for the normal case with ogmios or cardano-node; but for + -- Hydra, we might want to. + -- + -- This is debatable and should be backed by use-cases, and may also be + -- confusing since normally Kupo starts indexing only the blocks that + -- follows the given checkpoints. But now we make an exception for genesis. + Hydra.runChainSyncClient mailbox beforeMainLoop (filter (/= GenesisPoint) pts) ) (atomically (takeTMVar forcedRollbackVar)) case res of diff --git a/src/Kupo/App/ChainSync/Hydra.hs b/src/Kupo/App/ChainSync/Hydra.hs index 8d9cc9ec..46641d54 100644 --- a/src/Kupo/App/ChainSync/Hydra.hs +++ b/src/Kupo/App/ChainSync/Hydra.hs @@ -12,10 +12,6 @@ module Kupo.App.ChainSync.Hydra import Kupo.Prelude -import Control.Exception.Safe - ( MonadThrow - , throwM - ) import Kupo.App.Mailbox ( Mailbox , putHighFrequencyMessage @@ -23,10 +19,16 @@ import Kupo.App.Mailbox import Kupo.Control.MonadSTM ( MonadSTM (..) ) +import Kupo.Control.MonadThrow + ( MonadThrow (..) + ) import Kupo.Data.Cardano ( Point , Tip , TransactionId + , WithOrigin (..) + , getPointSlotNo + , getTipSlotNo ) import Kupo.Data.ChainSync ( IntersectionNotFoundException (..) @@ -38,7 +40,7 @@ import Kupo.Data.Hydra , mkHydraBlock ) import Kupo.Data.PartialBlock - ( PartialBlock + ( PartialBlock (..) , PartialTransaction (..) ) @@ -50,26 +52,112 @@ runChainSyncClient :: forall m. ( MonadIO m , MonadSTM m - , MonadThrow m, MonadThrow (STM m)) + , MonadThrow m + , MonadThrow (STM m) + ) => Mailbox m (Tip, PartialBlock) (Tip, Point) -> m () -- An action to run before the main loop starts. -> [Point] -> WS.Connection -> m IntersectionNotFoundException -runChainSyncClient mailbox beforeMainLoop _pts ws = do +runChainSyncClient mailbox beforeMainLoop pts ws = do beforeMainLoop TransactionStore{push, pop} <- newTransactionStore - forever $ do - WS.receiveJson ws decodeHydraMessage >>= \case + flip evalStateT (sortOn getPointSlotNo pts) $ forever $ do + lift (WS.receiveJson ws decodeHydraMessage) >>= \case HeadIsOpen{genesisTxs} -> do - atomically (putHighFrequencyMessage mailbox (mkHydraBlock 0 genesisTxs)) + handleBlock $ mkHydraBlock 0 genesisTxs TxValid{tx} -> do - push tx + lift $ push tx SnapshotConfirmed{ snapshot = Snapshot { number, confirmedTransactionIds }} -> do - txs <- pop confirmedTransactionIds - atomically (putHighFrequencyMessage mailbox (mkHydraBlock number txs)) + txs <- lift $ pop confirmedTransactionIds + handleBlock $ mkHydraBlock number txs SomethingElse -> do pure () + where + -- Hydra doesn't provide any mechanism to negotiate an intersection point and receive snapshots + -- only from a certain point. So we 'fake' it by skippping blocks until we have caught up with our + -- state. + -- + -- - We start from a (possibly empty) ascending list of checkpoints that we have already processed. + -- - For each 'block' received from the Hydra head, we have one of three situation. + -- + -- 1. Its slot number is lower than the current cursor (i.e. smallest checkpoint) + -- 2. Its slot number is equal to the current cursor + -- 3. Its slot number is higher than the current cursor. + -- + -- - In the case of (1), it's simple[NOTE^A], we have already seen that block and we can just skip it. The + -- cursor doesn't move as we haven't caught up with that checkpoint yet. + -- + -- - In the case of (2), we have two possibilities: + -- + -- 1. The block header hash and the checkpoint header hash are identical + -- -> This means we've found a block that's on both chains. We can skip it as we already know + -- of it. We can move to the next checkpoint too. + -- + -- 2. The block header hash and the checkpoint header hash are different + -- -> This means we (the indexer) is on a fork. But we don't have enough information to know + -- where to rollback to. So we throw an exception. + -- Note that this truly is *exceptional* because Hydra heads cannot fork in principle. The + -- only way we end up here is if someone tries to connect an already synchronized index to a + -- new head which happen to have different chain histories. And that's a very valid reason to + -- crash. + -- + -- - The case of (3) is in principle _impossible_ because a Hydra snapshots are monotically + -- increasing with no gaps. So we can't possibly hit that scenario without first hitting (2). + -- + -- - When the list of checkpoints is empty, it means we have caught up with the chain and any new + -- block is relevant to the indexer. + -- + -- + -- NOTE^A: It is "simple" because we assume that there is not two different Hydra "chains" that + -- might have two equal blocks. However, in principle, we should verify that the discarded blocks + -- still form a chain up to the last known block we stored. Otherwise, there could be a scenario + -- where we have stored: b1, b2, b3 but receive b1', b2', b3. If are waiting for a snapshot number + -- at 3, then we will discard b1' and b2' even though they are actually different from what we + -- stored. + -- + -- This can really only happen if we have concomittantly: + -- (a) Someone restarting the indexer on a *different* Hydra head. + -- (b) ALL of our selected checkpoint being identical on both heads. + -- + -- Without (b), the condition 2.2 would already throw an exception. Given the scenario we're in, + -- we consider this quite unlikely. + handleBlock + :: (Tip, PartialBlock) + -> StateT [Point] m () + handleBlock (tip, block) = do + get >>= \case + -- No checkpoints, all blocks are relevant. + [] -> do + lift $ atomically (putHighFrequencyMessage mailbox (tip, block)) + + requestedPoints@(cursor:_) -> do + -- Block is before a known checkpoint, ignoring. + if | slot < getPointSlotNo cursor -> do + return () + + -- Block is at checkpoint + | slot == getPointSlotNo cursor -> do + if point == cursor then do + -- Checkpoint matches, we skip the block and move the cursor + modify' (drop 1) + else + lift $ throwIO $ IntersectionNotFound + { requestedPoints = At . getPointSlotNo <$> requestedPoints + , tip = At . getTipSlotNo $ tip + } + + -- Block is after a known checkpoint, absurd + | otherwise -> do + error $ "impossible: received a block *after* a known checkpoint? \ + \There isn't supposed to be any gaps in Hydra snapshots \ + \however this suggests that there was one.\ + \We choked on: " <> show point <> ", while expecting a \ + \snapshot at: " <> show cursor <> "." + where + point = blockPoint block + slot = getPointSlotNo point connect :: ConnectionStatusToggle IO @@ -96,7 +184,13 @@ newtype TransactionStoreException = TransactionNotInStore { transactionId :: Tra instance Exception TransactionStoreException -newTransactionStore :: (Monad m, MonadSTM m, MonadThrow (STM m)) => m (TransactionStore m) +newTransactionStore + :: forall m. + ( Monad m + , MonadSTM m + , MonadThrow (STM m) + ) + => m (TransactionStore m) newTransactionStore = do store <- atomically $ newTVar mempty pure TransactionStore @@ -107,7 +201,7 @@ newTransactionStore = do txMap <- readTVar store forM ids $ \id -> do case Map.lookup id txMap of - Nothing -> throwM $ TransactionNotInStore id + Nothing -> throwIO $ TransactionNotInStore id Just tx -> do writeTVar store (Map.delete id txMap) pure tx diff --git a/src/Kupo/App/ChainSync/Ogmios.hs b/src/Kupo/App/ChainSync/Ogmios.hs index bfab209e..02fd8d02 100644 --- a/src/Kupo/App/ChainSync/Ogmios.hs +++ b/src/Kupo/App/ChainSync/Ogmios.hs @@ -12,9 +12,6 @@ module Kupo.App.ChainSync.Ogmios import Kupo.Prelude -import Control.Exception.Safe - ( MonadThrow - ) import Kupo.App.Mailbox ( Mailbox , putHighFrequencyMessage @@ -23,6 +20,9 @@ import Kupo.App.Mailbox import Kupo.Control.MonadSTM ( MonadSTM (..) ) +import Kupo.Control.MonadThrow + ( MonadThrow (..) + ) import Kupo.Data.Cardano ( Point , SlotNo diff --git a/src/Kupo/Data/Hydra.hs b/src/Kupo/Data/Hydra.hs index daa9d022..6278efc4 100644 --- a/src/Kupo/Data/Hydra.hs +++ b/src/Kupo/Data/Hydra.hs @@ -44,6 +44,7 @@ import Kupo.Data.Cardano , scriptFromBytes , scriptHashFromText , transactionIdFromText + , transactionIdToBytes , unsafeHeaderHashFromBytes , unsafeValueFromList , withReferences @@ -63,7 +64,7 @@ import qualified Data.Aeson.Key as Key import qualified Data.Aeson.KeyMap as KeyMap import qualified Data.Aeson.Types as Json import qualified Data.ByteString as BS -import qualified Data.ByteString.Builder as BS +import qualified Data.ByteString.Builder as B import qualified Data.Map.Strict as Map import qualified Data.Text as Text @@ -83,8 +84,9 @@ data Snapshot = Snapshot mkHydraBlock :: Word64 -> [PartialTransaction] -> (Tip, PartialBlock) mkHydraBlock number txs = do let - headerHash = number - & hashWith @Blake2b_256 (toStrict . BS.toLazyByteString . BS.word64BE) + headerHash = txs + & foldr (\PartialTransaction{id} -> (B.byteString (transactionIdToBytes id) <>)) mempty + & hashWith @Blake2b_256 (toStrict . B.toLazyByteString) & hashToBytes & unsafeHeaderHashFromBytes