diff --git a/.gitignore b/.gitignore
index 4f29282f..ded8721b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,4 @@ dist-newstyle
 ## Dist
 /dist
 *.tar.gz
+/cabal.project.local
diff --git a/.gitmodules b/.gitmodules
index 59aab0a9..f0a525ac 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -4,3 +4,6 @@
 [submodule "test/ogmios"]
 	path = test/vectors/ogmios
 	url = git@github.com:cardanosolutions/ogmios.git
+[submodule "test/vectors/hydra"]
+	path = test/vectors/hydra
+	url = git@github.com:input-output-hk/hydra.git
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e30d9797..89625c1b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,31 @@
+### [2.7.0] - 2023-10-13
+
+#### Added
+
+- Support for indexing a Hydra head.
+
+  Use `--hydra-host` and `--hydra-port` to connect `kupo` to a `hydra-node`.
+
+  > **Note**:
+  >
+  > Hydra heads do not actually form a 'chain'; Hydra has no blocks, but
+  > snapshots, which are akin to blocks, so we treat them as such. The block
+  > header hash we opted for is arbitrary: it is a hash of all the
+  > transaction ids present in the snapshot.
+  >
+  > Indexing a head also comes with diminished capabilities: Hydra has no
+  > protocol to query metadata, for example, and Kupo does not store them.
+  > So, unlike with Ogmios or cardano-node, metadata cannot be retrieved
+  > when Hydra is used as a chain producer.
+
+#### Changed
+
+- N/A
+
+#### Removed
+
+- N/A
+
 ### [2.6.1] - 2023-08-30
 
 #### Added
diff --git a/cabal.project b/cabal.project
index b4d730d7..a76958fe 100644
--- a/cabal.project
+++ b/cabal.project
@@ -23,7 +23,6 @@ test-show-details: direct
 
 package kupo
   tests: true
-  flags: +production
 
 package websockets-json
   tests: false
diff --git a/docs/api/latest.yaml b/docs/api/latest.yaml
index 85401b4b..3867186b 100644
--- a/docs/api/latest.yaml
+++ b/docs/api/latest.yaml
@@ -224,6 +224,25 @@ info:
 
     Kupo will synchronize data directly from Ogmios! Neat isn't it?
 
+    ## --hydra-host {hostname} / --hydra-port {port-number}
+
+    Kupo can also be used to index the layer-two ledger available in a
+    [Hydra](https://github.com/input-output-hk/hydra/#readme) head. To do so,
+    use `--hydra-host` and `--hydra-port` to point to a machine running the
+    `hydra-node` of the head you want to index, and to the port of its
+    websocket API.
+
+    For example:
+
+    ```console
+    $ kupo \
+      --hydra-host 0.0.0.0 \
+      --hydra-port 4001 \
+      --since origin \
+      --match "*" \
+      --workdir ./db
+    ```
+
     ## --help
 
     In case you're lost, don't forget that a summary of this manual is available by running:
diff --git a/docs/man/README.md b/docs/man/README.md
index 9799ed6c..b18c269f 100644
--- a/docs/man/README.md
+++ b/docs/man/README.md
@@ -32,22 +32,32 @@ kupo - Fast, lightweight & configurable chain-index for Cardano.
 **--node-socket**
 : Path to the Cardano node domain socket file.
 
-  (**NOTE**: Unused when connecting to Ogmios)
+  (**NOTE**: Unused when connecting to Ogmios or Hydra)
 
 **--node-config**
 : Path to the Cardano node configuration file.
 
-  (**NOTE**: Unused when connecting to Ogmios)
+  (**NOTE**: Unused when connecting to Ogmios or Hydra)
 
 **--ogmios-host**
 : Ogmios' host address.
 
-  (**NOTE**: Unused when connecting to a Cardano node)
+  (**NOTE**: Unused when connecting to a Cardano node or Hydra)
 
 **--ogmios-port**
 : Ogmios' port.
 
-  (**NOTE**: Unused when connecting to a Cardano node)
+  (**NOTE**: Unused when connecting to a Cardano node or Hydra)
+
+**--hydra-host**
+: Hydra-node host address.
+
+  (**NOTE**: Unused when connecting to a Cardano node or Ogmios)
+
+**--hydra-port**
+: Hydra-node port.
+
+  (**NOTE**: Unused when connecting to a Cardano node or Ogmios)
 
 **--workdir**
 : Path to a working directory, where the SQLite database files are stored. By convention, the database is called kupo.sqlite.
@@ -256,6 +266,13 @@ kupo - Fast, lightweight & configurable chain-index for Cardano.
     --since 16588737.4e9bbbb67e3ae262133d94c3da5bffce7b1127fc436e7433b87668dba34c354a\
     --match addr1vyc29pvl2uyzqt8nwxrcxnf558ffm27u3d9calxn8tdudjgz4xq9p\
 
+**kupo**\
+    --hydra-host 0.0.0.0\
+    --hydra-port 4001\
+    --in-memory\
+    --since origin\
+    --match "*"
+
 # SEE ALSO
 
 Online documentation and API reference:
diff --git a/kupo.cabal b/kupo.cabal
index 57342c49..1b2bdfc6 100644
--- a/kupo.cabal
+++ b/kupo.cabal
@@ -40,13 +40,14 @@ source-repository head
 flag production
   description: Compile executables for production.
   manual: True
-  default: False
+  default: True
 
 library
   exposed-modules:
      Kupo
      Kupo.App
      Kupo.App.ChainSync
+      Kupo.App.ChainSync.Hydra
      Kupo.App.ChainSync.Node
      Kupo.App.ChainSync.Ogmios
      Kupo.App.Configuration
@@ -107,6 +108,7 @@ library
      Kupo.Data.Http.SlotRange
      Kupo.Data.Http.Status
      Kupo.Data.Http.StatusFlag
+      Kupo.Data.Hydra
      Kupo.Data.Ogmios
      Kupo.Data.PartialBlock
      Kupo.Data.Pattern
@@ -295,6 +297,7 @@ test-suite unit
      Test.Kupo.Data.Http.OrderMatchesBySpec
      Test.Kupo.Data.Http.SlotRangeSpec
      Test.Kupo.Data.OgmiosSpec
+      Test.Kupo.Data.HydraSpec
      Test.Kupo.Data.Pattern.Fixture
      Test.Kupo.Data.PatternSpec
      Test.Kupo.Data.UtxoConstraint
diff --git a/modules/websockets-json/package.yaml b/modules/websockets-json/package.yaml
index b6e0053f..782d1f9e 100644
--- a/modules/websockets-json/package.yaml
+++ b/modules/websockets-json/package.yaml
@@ -28,5 +28,6 @@ library:
   - bytestring
   - connection
   - exceptions
+  - io-classes
   - websockets
   - wuss
diff --git a/modules/websockets-json/src/Network/WebSockets/Json.hs b/modules/websockets-json/src/Network/WebSockets/Json.hs
index e6de1729..16e906c6 100644
--- a/modules/websockets-json/src/Network/WebSockets/Json.hs
+++ b/modules/websockets-json/src/Network/WebSockets/Json.hs
@@ -13,7 +13,7 @@ import Prelude
 import Control.Exception
     ( Exception
     )
-import Control.Monad.Catch
+import Control.Monad.Class.MonadThrow
     ( MonadThrow (..)
    )
 import Control.Monad.IO.Class
@@ -54,7 +54,7 @@ receiveJson
 receiveJson ws decoder = do
     bytes <- liftIO (WS.receiveData ws)
     either
-        (\(path, err) -> throwM $ MalformedOrUnexpectedResponse bytes path err)
+        (\(path, err) -> throwIO $ MalformedOrUnexpectedResponse bytes path err)
         pure
         (Json.eitherDecodeWith Json.jsonEOF (Json.iparse decoder) bytes)
diff --git a/modules/websockets-json/websockets-json.cabal b/modules/websockets-json/websockets-json.cabal
index b6266164..71b01af3 100644
--- a/modules/websockets-json/websockets-json.cabal
+++ b/modules/websockets-json/websockets-json.cabal
@@ -77,6 +77,7 @@ library
     , bytestring
     , connection
     , exceptions
+    , io-classes
     , websockets
     , wuss
  default-language: Haskell2010
diff --git a/src/Kupo/App.hs b/src/Kupo/App.hs
index 3ed3f7e4..02737390 100644
--- a/src/Kupo/App.hs
+++ b/src/Kupo/App.hs
@@ -2,6 +2,7 @@
 -- License, v. 2.0. If a copy of the MPL was not distributed with this
 -- file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
+{-# LANGUAGE PatternSynonyms #-}
 {-# LANGUAGE RecordWildCards #-}
 
 module Kupo.App
@@ -77,6 +78,7 @@ import Kupo.Data.Cardano
     , distanceToTip
     , getPoint
     , getPointSlotNo
+    , pattern GenesisPoint
     )
 import Kupo.Data.ChainSync
     ( ForcedRollbackHandler (..)
@@ -100,6 +102,9 @@ import Kupo.Data.Database
 import Kupo.Data.FetchBlock
     ( FetchBlockClient
     )
+import Kupo.Data.PartialBlock
+    ( PartialBlock
+    )
 import Kupo.Data.Pattern
     ( Codecs (..)
     , Match (..)
@@ -111,6 +116,7 @@ import Kupo.Data.Pattern
 
 import qualified Data.Map as Map
 import qualified Data.Set as Set
+import qualified Kupo.App.ChainSync.Hydra as Hydra
 import qualified Kupo.App.ChainSync.Node as Node
 import qualified Kupo.App.ChainSync.Ogmios as Ogmios
 import qualified Kupo.App.FetchBlock.Node as Node
@@ -166,6 +172,38 @@ newProducer tr chainProducer callback = do
             runOgmios checkpoints (return ()) throwIO restart
 
+        Hydra{hydraHost, hydraPort} -> do
+            logWith tr ConfigurationHydra{hydraHost, hydraPort}
+            mailbox <- atomically (newMailbox mailboxCapacity)
+
+            callback forcedRollbackCallback mailbox $ \_tracerChainSync checkpoints statusToggle -> do
+                let runHydra pts beforeMainLoop onIntersectionNotFound continuation = do
+                        res <- race
+                            (Hydra.connect statusToggle hydraHost hydraPort $
+                                -- NOTE: Kupo does not generally index the genesis 'block' / configuration.
+                                -- That's true in the normal case with Ogmios or cardano-node; but for
+                                -- Hydra, we might want to.
+                                --
+                                -- This is debatable and should be backed by use-cases, and may also be
+                                -- confusing since Kupo normally starts indexing only the blocks that
+                                -- follow the given checkpoints. But here we make an exception for genesis.
+                                Hydra.runChainSyncClient mailbox beforeMainLoop (filter (/= GenesisPoint) pts)
+                            )
+                            (atomically (takeTMVar forcedRollbackVar))
+                        case res of
+                            Left notFound ->
+                                onIntersectionNotFound notFound
+                            Right (point, handler) ->
+                                continuation point handler
+
+                let restart point handler =
+                        runHydra [point]
+                            (onSuccess handler)
+                            (\e -> onFailure handler >> throwIO e)
+                            restart
+
+                runHydra checkpoints (return ()) throwIO restart
+
         CardanoNode{nodeSocket, nodeConfig} -> do
             logWith tr ConfigurationCardanoNode{nodeSocket,nodeConfig}
             mailbox <- atomically (newMailbox mailboxCapacity)
@@ -209,6 +247,8 @@ withFetchBlockClient chainProducer callback = do
     case chainProducer of
         Ogmios{ogmiosHost, ogmiosPort} ->
             Ogmios.withFetchBlockClient ogmiosHost ogmiosPort callback
+        Hydra{} ->
+            callback @PartialBlock (\_point respond -> respond Nothing)
         CardanoNode{nodeSocket, nodeConfig} -> do
             NetworkParameters
                 { networkMagic
diff --git a/src/Kupo/App/ChainSync/Hydra.hs b/src/Kupo/App/ChainSync/Hydra.hs
new file mode 100644
index 00000000..46641d54
--- /dev/null
+++ b/src/Kupo/App/ChainSync/Hydra.hs
@@ -0,0 +1,208 @@
+-- This Source Code Form is subject to the terms of the Mozilla Public
+-- License, v. 2.0. If a copy of the MPL was not distributed with this
+-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+module Kupo.App.ChainSync.Hydra
+    ( connect
+    , runChainSyncClient
+    , newTransactionStore
+    , TransactionStore (..)
+    , TransactionStoreException (..)
+    ) where
+
+import Kupo.Prelude
+
+import Kupo.App.Mailbox
+    ( Mailbox
+    , putHighFrequencyMessage
+    )
+import Kupo.Control.MonadSTM
+    ( MonadSTM (..)
+    )
+import Kupo.Control.MonadThrow
+    ( MonadThrow (..)
+    )
+import Kupo.Data.Cardano
+    ( Point
+    , Tip
+    , TransactionId
+    , WithOrigin (..)
+    , getPointSlotNo
+    , getTipSlotNo
+    )
+import Kupo.Data.ChainSync
+    ( IntersectionNotFoundException (..)
+    )
+import Kupo.Data.Hydra
+    ( HydraMessage (..)
+    , Snapshot (..)
+    , decodeHydraMessage
+    , mkHydraBlock
+    )
+import Kupo.Data.PartialBlock
+    ( PartialBlock (..)
+    , PartialTransaction (..)
+    )
+
+import qualified Data.Map as Map
+import qualified Network.WebSockets as WS
+import qualified Network.WebSockets.Json as WS
+
+runChainSyncClient
+    :: forall m.
+        ( MonadIO m
+        , MonadSTM m
+        , MonadThrow m
+        , MonadThrow (STM m)
+        )
+    => Mailbox m (Tip, PartialBlock) (Tip, Point)
+    -> m () -- An action to run before the main loop starts.
+    -> [Point]
+    -> WS.Connection
+    -> m IntersectionNotFoundException
+runChainSyncClient mailbox beforeMainLoop pts ws = do
+    beforeMainLoop
+    TransactionStore{push, pop} <- newTransactionStore
+    flip evalStateT (sortOn getPointSlotNo pts) $ forever $ do
+        lift (WS.receiveJson ws decodeHydraMessage) >>= \case
+            HeadIsOpen{genesisTxs} -> do
+                handleBlock $ mkHydraBlock 0 genesisTxs
+            TxValid{tx} -> do
+                lift $ push tx
+            SnapshotConfirmed{ snapshot = Snapshot { number, confirmedTransactionIds }} -> do
+                txs <- lift $ pop confirmedTransactionIds
+                handleBlock $ mkHydraBlock number txs
+            SomethingElse -> do
+                pure ()
+  where
+    -- Hydra doesn't provide any mechanism to negotiate an intersection point and receive snapshots
+    -- only from a certain point. So we 'fake' it by skipping blocks until we have caught up with our
+    -- state.
+    --
+    -- - We start from a (possibly empty) ascending list of checkpoints that we have already processed.
+    -- - For each 'block' received from the Hydra head, we have one of three situations.
+    --
+    --   1. Its slot number is lower than the current cursor (i.e. smallest checkpoint)
+    --   2. Its slot number is equal to the current cursor
+    --   3. Its slot number is higher than the current cursor.
+    --
+    -- - In the case of (1), it's simple [NOTE^A]: we have already seen that block and we can just skip it.
+    --   The cursor doesn't move as we haven't caught up with that checkpoint yet.
+    --
+    -- - In the case of (2), we have two possibilities:
+    --
+    --   1. The block header hash and the checkpoint header hash are identical
+    --      -> This means we've found a block that's on both chains. We can skip it as we already know
+    --         of it. We can move to the next checkpoint too.
+    --
+    --   2. The block header hash and the checkpoint header hash are different
+    --      -> This means we (the indexer) are on a fork. But we don't have enough information to know
+    --         where to roll back to. So we throw an exception.
+    --         Note that this truly is *exceptional* because Hydra heads cannot fork in principle. The
+    --         only way we end up here is if someone tries to connect an already synchronized indexer to
+    --         a new head which happens to have a different chain history. And that's a very valid reason
+    --         to crash.
+    --
+    -- - The case of (3) is in principle _impossible_ because Hydra snapshots are monotonically
+    --   increasing with no gaps. So we can't possibly hit that scenario without first hitting (2).
+    --
+    -- - When the list of checkpoints is empty, it means we have caught up with the chain and any new
+    --   block is relevant to the indexer.
+    --
+    --
+    -- NOTE^A: It is "simple" because we assume that there are not two different Hydra "chains" that
+    -- might have two equal blocks. However, in principle, we should verify that the discarded blocks
+    -- still form a chain up to the last known block we stored. Otherwise, there could be a scenario
+    -- where we have stored: b1, b2, b3 but receive b1', b2', b3. If we are waiting for a snapshot number
+    -- at 3, then we will discard b1' and b2' even though they are actually different from what we
+    -- stored.
+    --
+    -- This can really only happen if we have concomitantly:
+    --   (a) Someone restarting the indexer on a *different* Hydra head.
+    --   (b) ALL of our selected checkpoints being identical on both heads.
+    --
+    -- Without (b), the condition 2.2 would already throw an exception. Given the scenario we're in,
+    -- we consider this quite unlikely.
+    handleBlock
+        :: (Tip, PartialBlock)
+        -> StateT [Point] m ()
+    handleBlock (tip, block) = do
+        get >>= \case
+            -- No checkpoints, all blocks are relevant.
+            [] -> do
+                lift $ atomically (putHighFrequencyMessage mailbox (tip, block))
+
+            requestedPoints@(cursor:_) -> do
+                -- Block is before a known checkpoint, ignoring.
+                if | slot < getPointSlotNo cursor -> do
+                        return ()
+
+                   -- Block is at checkpoint
+                   | slot == getPointSlotNo cursor -> do
+                        if point == cursor then do
+                            -- Checkpoint matches, we skip the block and move the cursor
+                            modify' (drop 1)
+                        else
+                            lift $ throwIO $ IntersectionNotFound
+                                { requestedPoints = At . getPointSlotNo <$> requestedPoints
+                                , tip = At . getTipSlotNo $ tip
+                                }
+
+                   -- Block is after a known checkpoint, absurd
+                   | otherwise -> do
+                        error $ "impossible: received a block *after* a known checkpoint? \
+                               \There aren't supposed to be any gaps in Hydra snapshots; \
+                               \however, this suggests that there was one. \
+                               \We choked on: " <> show point <> ", while expecting a \
+                               \snapshot at: " <> show cursor <> "."
+      where
+        point = blockPoint block
+        slot = getPointSlotNo point
+
+connect
+    :: ConnectionStatusToggle IO
+    -> String
+    -> Int
+    -> (WS.Connection -> IO a)
+    -> IO a
+connect ConnectionStatusToggle{toggleConnected} host port action =
+    WS.runClientWith host port "/"
+        WS.defaultConnectionOptions [] (\ws -> toggleConnected >> action ws)
+
+-- | Handle to store and later retrieve transactions.
+data TransactionStore m = TransactionStore
+    { -- | Store a transaction for later retrieval.
+      push :: PartialTransaction -> m ()
+
+    , -- | Removes transactions from the store.
+      -- Throws 'TransactionNotInStore' when not found.
+      pop :: MonadThrow m => [TransactionId] -> m [PartialTransaction]
+    }
+
+newtype TransactionStoreException = TransactionNotInStore { transactionId :: TransactionId }
+    deriving (Eq, Show)
+
+instance Exception TransactionStoreException
+
+newTransactionStore
+    :: forall m.
+        ( Monad m
+        , MonadSTM m
+        , MonadThrow (STM m)
+        )
+    => m (TransactionStore m)
+newTransactionStore = do
+    store <- atomically $ newTVar mempty
+    pure TransactionStore
+        { push = \tx@PartialTransaction{id} ->
+            atomically $ modifyTVar' store (Map.insert id tx)
+        , pop = \ids ->
+            atomically $ do
+                txMap <- readTVar store
+                forM ids $ \id -> do
+                    case Map.lookup id txMap of
+                        Nothing -> throwIO $ TransactionNotInStore id
+                        Just tx -> do
+                            -- Delete via 'modifyTVar'' so earlier deletions in this loop
+                            -- aren't overwritten by a stale snapshot of the map.
+                            modifyTVar' store (Map.delete id)
+                            pure tx
+        }
diff --git a/src/Kupo/App/ChainSync/Ogmios.hs b/src/Kupo/App/ChainSync/Ogmios.hs
index bfab209e..02fd8d02 100644
--- a/src/Kupo/App/ChainSync/Ogmios.hs
+++ b/src/Kupo/App/ChainSync/Ogmios.hs
@@ -12,9 +12,6 @@ module Kupo.App.ChainSync.Ogmios
 import Kupo.Prelude
 
-import Control.Exception.Safe
-    ( MonadThrow
-    )
 import Kupo.App.Mailbox
     ( Mailbox
     , putHighFrequencyMessage
     )
@@ -23,6 +20,9 @@ import Kupo.App.Mailbox
 import Kupo.Control.MonadSTM
     ( MonadSTM (..)
     )
+import Kupo.Control.MonadThrow
+    ( MonadThrow (..)
+    )
 import Kupo.Data.Cardano
     ( Point
     , SlotNo
diff --git a/src/Kupo/App/Configuration.hs b/src/Kupo/App/Configuration.hs
index a2fe7e10..1609bd72 100644
--- a/src/Kupo/App/Configuration.hs
+++ b/src/Kupo/App/Configuration.hs
@@ -215,6 +215,9 @@ data TraceConfiguration where
     ConfigurationOgmios
         :: { ogmiosHost :: String, ogmiosPort :: Int }
         -> TraceConfiguration
+    ConfigurationHydra
+        :: { hydraHost :: String, hydraPort :: Int }
+        -> TraceConfiguration
     ConfigurationCardanoNode
         :: { nodeSocket :: FilePath, nodeConfig :: FilePath }
         -> TraceConfiguration
@@ -240,6 +243,7 @@ instance HasSeverityAnnotation TraceConfiguration where
     getSeverityAnnotation = \case
         ConfigurationNetwork{} -> Info
         ConfigurationOgmios{} -> Info
+        ConfigurationHydra{} -> Info
         ConfigurationCardanoNode{} -> Info
         ConfigurationPatterns{} -> Info
         ConfigurationCheckpointsForIntersection{} -> Info
diff --git a/src/Kupo/App/Database.hs b/src/Kupo/App/Database.hs
index b818e36f..146b3a6a 100644
--- a/src/Kupo/App/Database.hs
+++ b/src/Kupo/App/Database.hs
@@ -830,7 +830,7 @@ mkDatabase tr mode longestRollback bracketConnection = Database
             _otherwise -> do
                 withTemporaryIndex tr conn "inputsByCreatedAt" "inputs(created_at)" $ do
                     withTemporaryIndex tr conn "inputsBySpentAt" "inputs(spent_at)" $ do
-                        deleteInputsIncrementally tr conn minSlotNo
+                        -- deleteInputsIncrementally tr conn minSlotNo
                         traceExecute tr conn rollbackQryUpdateInputs [ minSlotNo ]
                         traceExecute tr conn rollbackQryDeleteCheckpoints [ minSlotNo ]
                         query_ conn selectMaxCheckpointQry >>= \case
@@ -845,11 +845,11 @@ mkDatabase tr mode longestRollback bracketConnection = Database
             retryWhenBusy tr (constantStrategy 0.1) 1 $ withTransaction conn mode (runReaderT r conn)
     }
 
-deleteInputsIncrementally :: Tracer IO TraceConnection -> Connection -> SQLData -> IO ()
-deleteInputsIncrementally tr conn minSlotNo = do
+_deleteInputsIncrementally :: Tracer IO TraceConnection -> Connection -> SQLData -> IO ()
+_deleteInputsIncrementally tr conn minSlotNo = do
     traceExecute tr conn rollbackQryDeleteInputs [ minSlotNo ]
     deleted <- changes conn
-    unless (deleted < pruneInputsMaxIncrement) $ deleteInputsIncrementally tr conn minSlotNo
+    unless (deleted < pruneInputsMaxIncrement) $ _deleteInputsIncrementally tr conn minSlotNo
 
 insertRow
     :: forall tableName.
diff --git a/src/Kupo/Data/Configuration.hs b/src/Kupo/Data/Configuration.hs
index b732525f..55987bca 100644
--- a/src/Kupo/Data/Configuration.hs
+++ b/src/Kupo/Data/Configuration.hs
@@ -65,7 +65,7 @@ import qualified Data.Aeson as Json
 -- | Application-level configuration.
 data Configuration = Configuration
     { chainProducer :: ChainProducer
-    -- ^ Where the data comes from: cardano-node vs ogmios
+    -- ^ Where the data comes from: cardano-node, ogmios, or hydra
     --
     -- NOTE: There's no bang pattern on this field because we do not want it
    -- to be unnecessarily evaluated in some test scenarios (e.g. state-machine)
@@ -101,7 +101,8 @@ data Configuration = Configuration
 -- equivalent in the capabilities and information they offer; a cardano-node
 -- will have to be through a local connection (domain socket) whereas ogmios can
 -- happen _over the wire_ on a remote server but is slower overall. So both have
--- trade-offs.
+-- trade-offs. The 'hydra' chain producer is slightly different: its "chain" is
+-- actually the UTxO and transactions happening on the Hydra head's layer 2.
 data ChainProducer
     = CardanoNode
         { nodeSocket :: !FilePath
@@ -111,6 +112,10 @@ data ChainProducer
         { ogmiosHost :: !String
         , ogmiosPort :: !Int
         }
+    | Hydra
+        { hydraHost :: !String
+        , hydraPort :: !Int
+        }
     deriving (Generic, Eq, Show)
 
 -- | Database working directory. 'in-memory' runs the database in hot memory,
diff --git a/src/Kupo/Data/Hydra.hs b/src/Kupo/Data/Hydra.hs
new file mode 100644
index 00000000..6278efc4
--- /dev/null
+++ b/src/Kupo/Data/Hydra.hs
@@ -0,0 +1,365 @@
+{-# LANGUAGE PatternSynonyms #-}
+
+module Kupo.Data.Hydra where
+
+import Kupo.Prelude
+
+import Cardano.Crypto.Hash
+    ( hashFromTextAsHex
+    , hashToBytes
+    , hashWith
+    )
+import Cardano.Ledger.SafeHash
+    ( unsafeMakeSafeHash
+    )
+import Data.Aeson
+    ( (.!=)
+    , (.:)
+    , (.:?)
+    )
+import Kupo.Data.Cardano
+    ( BinaryData
+    , BlockNo (..)
+    , Datum (..)
+    , DatumHash
+    , Input
+    , Output
+    , OutputIndex
+    , OutputReference
+    , Script
+    , ScriptHash
+    , SlotNo (..)
+    , Tip
+    , TransactionId
+    , Value
+    , binaryDataFromBytes
+    , datumHashFromBytes
+    , getOutputIndex
+    , getTransactionId
+    , mkOutput
+    , mkOutputReference
+    , outputIndexFromText
+    , pattern BlockPoint
+    , pattern Tip
+    , scriptFromBytes
+    , scriptHashFromText
+    , transactionIdFromText
+    , transactionIdToBytes
+    , unsafeHeaderHashFromBytes
+    , unsafeValueFromList
+    , withReferences
+    )
+import Kupo.Data.Ogmios
+    ( decodeAddress
+    , decodeTransactionId
+    )
+import Kupo.Data.PartialBlock
+    ( PartialBlock (..)
+    , PartialTransaction (..)
+    )
+
+import qualified Codec.CBOR.Decoding as Cbor
+import qualified Codec.CBOR.Read as Cbor
+import qualified Data.Aeson.Key as Key
+import qualified Data.Aeson.KeyMap as KeyMap
+import qualified Data.Aeson.Types as Json
+import qualified Data.ByteString as BS
+import qualified Data.ByteString.Builder as B
+import qualified Data.Map.Strict as Map
+import qualified Data.Text as Text
+
+-- Types
+
+data HydraMessage
+    = HeadIsOpen { genesisTxs :: [PartialTransaction] }
+    | TxValid { tx :: PartialTransaction }
+    | SnapshotConfirmed { snapshot :: Snapshot }
+    | SomethingElse
+
+data Snapshot = Snapshot
+    { number :: Word64
+    , confirmedTransactionIds :: [TransactionId]
+    }
+
+mkHydraBlock :: Word64 -> [PartialTransaction] -> (Tip, PartialBlock)
+mkHydraBlock number txs = do
+    let
+        headerHash = txs
+            & foldr (\PartialTransaction{id} -> (B.byteString (transactionIdToBytes id) <>)) mempty
+            & hashWith @Blake2b_256 (toStrict . B.toLazyByteString)
+            & hashToBytes
+            & unsafeHeaderHashFromBytes
+
+        slotNo =
+            SlotNo number
+
+        blockNo =
+            BlockNo number
+     in
+        ( Tip slotNo headerHash blockNo
+        , PartialBlock
+            { blockPoint = BlockPoint slotNo headerHash
+            , blockBody = toList txs
+            }
+        )
+
+-- Decoders
+
+decodeHydraMessage :: Json.Value -> Json.Parser HydraMessage
+decodeHydraMessage =
+    Json.withObject "HydraMessage" $ \o -> do
+        tag <- o .: "tag"
+        case tag of
+            ("HeadIsOpen" :: Text) -> HeadIsOpen <$> decodeHeadIsOpen o
+            ("TxValid" :: Text) -> TxValid <$> (o .: "transaction" >>= decodePartialTransaction)
+            ("SnapshotConfirmed" :: Text) -> SnapshotConfirmed <$> decodeSnapshotConfirmed o
+            _ -> pure SomethingElse
+
+-- | Decode a 'HeadIsOpen' message as multiple "genesis" transactions producing the
+-- UTxO as initially available.
+decodeHeadIsOpen :: Json.Object -> Json.Parser [PartialTransaction]
+decodeHeadIsOpen o = do
+    (Json.Object utxoMap) <- o .: "utxo"
+    parsedUTxO <- forM (KeyMap.toList utxoMap) $ \(k,v) -> do
+        outRef <- decodeInput $ toJSON k
+        pure (outRef, v)
+    forM
+        (groupByTransactionId parsedUTxO)
+        (uncurry decodeGenesisTxForUTxO)
+
+groupByTransactionId
+    :: [(OutputReference, a)]
+    -> [(TransactionId, [(OutputIndex, a)])]
+groupByTransactionId =
+    Map.toList . foldr go mempty
+  where
+    go (oref, a) m =
+        Map.unionWith (<>) m $
+            Map.singleton (getTransactionId oref) [(getOutputIndex oref, a)]
+
+decodeGenesisTxForUTxO
+    :: TransactionId
+    -> [(OutputIndex, Json.Value)]
+    -> Json.Parser PartialTransaction
+decodeGenesisTxForUTxO id indexOutputs = do
+    outputs <- forM indexOutputs $ \(ix, v) -> do
+        out <- decodeOutput v
+        pure (mkOutputReference id ix, out)
+    pure PartialTransaction
+        { id
+        , inputs = []
+        , outputs
+        , datums = mempty
+        , scripts = mempty
+        , metadata = Nothing
+        }
+
+decodePartialTransaction :: Json.Value -> Json.Parser PartialTransaction
+decodePartialTransaction = Json.withObject "PartialTransaction" $ \o -> do
+    id <- o .: "id" >>= decodeTransactionId
+
+    body <- o .: "body"
+    inputs <- body .: "inputs" >>= traverse decodeInput
+    outputs <- body .:? "outputs" .!= [] >>= traverse decodeOutput
+
+    wits <- o .: "witnesses"
+    datums <- wits .:? "datums" .!= Json.Object mempty >>= decodeDatums
+    scripts <- wits .:? "scripts" .!= Json.Object mempty >>= decodeScripts
+
+    -- TODO
+    -- This is 'acceptable' for now because:
+    --
+    -- (1) This is only truly required when fetching metadata from a data-source. Kupo does not
+    -- itself store metadata, so they have no effect when folding over blocks.
+    --
+    -- (2) Hydra does not support fetching metadata of past transactions. If we wanted to support this
+    -- feature for Hydra, we would need to first deal with (1) since Hydra doesn't provide a protocol
+    -- / API for it.
+    let metadata = Nothing
+
+    pure PartialTransaction
+        { id
+        , inputs
+        , outputs = withReferences 0 id outputs
+        , datums
+        , scripts
+        , metadata
+        }
+
+decodeDatums :: Json.Value -> Json.Parser (Map DatumHash BinaryData)
+decodeDatums = Json.withObject "Datums" $
+    KeyMap.foldrWithKey
+        (\k v accum -> Map.insert
+            <$> decodeDatumHash k
+            <*> decodeBinaryData v
+            <*> accum
+        )
+        (pure mempty)
+
+decodeDatumHash
+    :: Json.Key
+    -> Json.Parser DatumHash
+decodeDatumHash k = do
+    case datumHashFromBytes <$> decodeBase16 (encodeUtf8 (Key.toText k)) of
+        Right (Just hash) ->
+            pure hash
+        Right Nothing ->
+            fail "decodeDatumHash: datumHashFromBytes failed."
+        Left e ->
+            fail (toString e)
+
+decodeInput
+    :: Json.Value
+    -> Json.Parser Input
+decodeInput = Json.withText "Input" $ \t ->
+    maybe (fail $ "failed to parse: " <> show t) pure $ do
+        (tId, tIx) <- splitInput t
+        id <- transactionIdFromText tId
+        ix <- outputIndexFromText tIx
+        pure $ mkOutputReference id ix
+  where
+    splitInput t =
+        case Text.split (== '#') t of
+            [tId, tIx] -> Just (tId, tIx)
+            _ -> Nothing
+
+decodeOutput
+    :: Json.Value
+    -> Json.Parser Output
+decodeOutput = Json.withObject "Output" $ \o -> do
+    datumHash <- o .:? "datumHash" >>=
+        traverse (fmap unsafeMakeSafeHash . decodeHash @Blake2b_256)
+    datum <- o .:? "datum"
+    mkOutput
+        <$> (o .: "address" >>= decodeAddress)
+        <*> (o .: "value" >>= decodeValue)
+        <*> case (datumHash, datum) of
+                (Just x, _) ->
+                    pure (Reference (Left x))
+                (Nothing, Just x) ->
+                    Inline . Right <$> decodeBinaryData x
+                (Nothing, Nothing) ->
+                    pure NoDatum
+        <*> (o .:?
"script" >>= traverse decodeScript) + +decodeHash + :: HashAlgorithm alg + => Json.Value + -> Json.Parser (Hash alg a) +decodeHash = + Json.parseJSON >=> maybe empty pure . hashFromTextAsHex + +decodeBinaryData + :: Json.Value + -> Json.Parser BinaryData +decodeBinaryData = Json.withText "BinaryData" $ \t -> + case binaryDataFromBytes <$> decodeBase16 (encodeUtf8 t) of + Right (Just bin) -> + pure bin + Right Nothing -> + fail "decodeBinaryData: binaryDataFromBytes failed." + Left e -> + fail (toString e) + +decodeScript + :: Json.Value + -> Json.Parser Script +decodeScript = Json.withText "Script" $ \bytes -> do + case scriptFromBytes' <$> decodeBase16 (encodeUtf8 @Text bytes) of + Right (Just s) -> + pure s + Right Nothing -> + fail "decodeScript: malformed script" + Left e -> + fail $ "decodeScript: not base16: " <> show e + where + scriptFromBytes' (toLazy -> bytes) = do + (toStrict -> script, tag) <- either (fail . show) pure $ + Cbor.deserialiseFromBytes (Cbor.decodeListLen >> Cbor.decodeWord8) bytes + maybe (fail "decodeScript: malformed script") pure $ + scriptFromBytes (BS.singleton tag <> script) + +decodeScriptInEnvelope + :: Json.Value + -> Json.Parser Script +decodeScriptInEnvelope = Json.withObject "ScriptInEnvelope" $ \o -> do + bytes <- o .: "script" >>= (.: "cborHex") >>= decodeBase16' + nestedBytes <- either (fail . show) (pure . snd) $ + Cbor.deserialiseFromBytes Cbor.decodeBytes (toLazy bytes) + o .: "scriptLanguage" >>= \case + "SimpleScriptLanguage" -> + scriptFromBytes' (BS.pack [0] <> nestedBytes) + "PlutusScriptLanguage PlutusScriptV1" -> + scriptFromBytes' (BS.pack [1] <> nestedBytes) + "PlutusScriptLanguage PlutusScriptV2" -> + scriptFromBytes' (BS.pack [2] <> nestedBytes) + "PlutusScriptLanguage PlutusScriptV3" -> + scriptFromBytes' (BS.pack [3] <> nestedBytes) + (_ :: Text) -> + fail "unrecognized script language" + where + scriptFromBytes' = + maybe (fail "decodeScript: malformed script") pure . 
scriptFromBytes + +decodeScripts :: Json.Value -> Json.Parser (Map ScriptHash Script) +decodeScripts = Json.withObject "Scripts" $ + KeyMap.foldrWithKey + (\k v accum -> Map.insert + <$> decodeScriptHash k + <*> decodeScript v + <*> accum + ) + (pure mempty) + +decodeScriptHash + :: Json.Key + -> Json.Parser ScriptHash +decodeScriptHash k = + case scriptHashFromText (Key.toText k) of + Nothing -> fail "decodeScriptHash" + Just scriptHash -> pure scriptHash + +decodeSnapshotConfirmed :: Json.Object -> Json.Parser Snapshot +decodeSnapshotConfirmed o = do + snapshot <- o .: "snapshot" + number <- snapshot .: "snapshotNumber" + confirmedTransactionIds <- snapshot .: "confirmedTransactions" >>= mapM decodeTransactionId + pure Snapshot + { number + , confirmedTransactionIds + } + +decodeValue + :: Json.Value + -> Json.Parser Value +decodeValue = Json.withObject "Value" $ \o -> do + coins <- o .: "lovelace" + assets <- KeyMap.foldrWithKey + (\k v accum -> + if k == "lovelace" then accum else do + policyId <- decodeBase16' (Key.toText k) + assets <- decodeAssets policyId v + xs <- accum + pure (assets ++ xs) + ) + (pure mempty) + o + pure (unsafeValueFromList coins assets) + where + decodeAssets + :: ByteString + -> Json.Value + -> Json.Parser [(ByteString, ByteString, Integer)] + decodeAssets policyId = + Json.withObject "Assets" $ KeyMap.foldrWithKey + (\k v accum -> do + assetId <- decodeBase16' (Key.toText k) + quantity <- parseJSON v + xs <- accum + pure ((policyId, assetId, quantity) : xs) + ) + (pure mempty) + +decodeBase16' :: Text -> Json.Parser ByteString +decodeBase16' = + either (fail . toString) pure . decodeBase16 . encodeUtf8 diff --git a/src/Kupo/Data/Ogmios.hs b/src/Kupo/Data/Ogmios.hs index 9699fe01..c403bfcf 100644 --- a/src/Kupo/Data/Ogmios.hs +++ b/src/Kupo/Data/Ogmios.hs @@ -15,6 +15,9 @@ module Kupo.Data.Ogmios , decodeFindIntersectionResponse , decodeNextBlockResponse + -- ** Cardano decoders + , decodeTransactionId + , decodeAddress ) where import Kupo.Prelude @@ -301,9 +304,9 @@ decodeScript = Json.withObject "Script" $ \o -> do Right (Just s) -> pure s Right Nothing -> - fail "decodeScript: decodePlutusV1: malformed script" + fail "decodeScript: malformed script" Left e -> - fail $ "decodeScript: decodePlutusV1: not base16: " <> show e + fail $ "decodeScript: not base16: " <> show e decodeNativeScript :: Json.Object diff --git a/src/Kupo/Options.hs b/src/Kupo/Options.hs index a4407bbb..3ff358e8 100644 --- a/src/Kupo/Options.hs +++ b/src/Kupo/Options.hs @@ -157,7 +157,7 @@ parserInfo = info (helper <*> parser) $ mempty chainProducerOption :: Parser ChainProducer chainProducerOption = - cardanoNodeOptions <|> ogmiosOptions + cardanoNodeOptions <|> ogmiosOptions <|> hydraOptions where cardanoNodeOptions = CardanoNode <$> nodeSocketOption @@ -167,6 +167,10 @@ chainProducerOption = <$> ogmiosHostOption <*> ogmiosPortOption + hydraOptions = Hydra + <$> hydraHostOption + <*> hydraPortOption + -- | --node-socket=FILEPATH nodeSocketOption :: Parser FilePath nodeSocketOption = option str $ mempty @@ -232,6 +236,21 @@ ogmiosPortOption = option auto $ mempty <> metavar "TCP/PORT" <> help "Ogmios' port." +-- | [--hydra-host=IPv4] +hydraHostOption :: Parser String +hydraHostOption = option str $ mempty + <> long "hydra-host" + <> metavar "IPv4" + <> help "Hydra-node host address to connect to." 
+    <> completer (bashCompleter "hostname")
+
+-- | [--hydra-port=TCP/PORT]
+hydraPortOption :: Parser Int
+hydraPortOption = option auto $ mempty
+    <> long "hydra-port"
+    <> metavar "TCP/PORT"
+    <> help "Hydra-node port to connect to."
+
 -- | [--since=POINT]
 sinceOption :: Parser Point
 sinceOption = option (maybeReader rdr) $ mempty
diff --git a/test/Test/Kupo/Data/HydraSpec.hs b/test/Test/Kupo/Data/HydraSpec.hs
new file mode 100644
index 00000000..d15ead5a
--- /dev/null
+++ b/test/Test/Kupo/Data/HydraSpec.hs
@@ -0,0 +1,111 @@
+-- This Source Code Form is subject to the terms of the Mozilla Public
+-- License, v. 2.0. If a copy of the MPL was not distributed with this
+-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
+{-# OPTIONS_GHC -Wno-orphans #-}
+
+module Test.Kupo.Data.HydraSpec
+    ( spec
+    ) where
+
+import Kupo.Prelude
+
+import Data.Aeson.Lens
+    ( _Array
+    , key
+    )
+import Kupo.App.ChainSync.Hydra
+    ( TransactionStore (..)
+    , TransactionStoreException (..)
+    , newTransactionStore
+    )
+import Kupo.Data.Hydra
+    ( decodeHydraMessage
+    )
+import Kupo.Data.PartialBlock
+    ( PartialTransaction (..)
+    )
+import Test.Hspec
+    ( Spec
+    , context
+    , parallel
+    , shouldBe
+    , shouldThrow
+    )
+import Test.Hspec.QuickCheck
+    ( prop
+    )
+import Test.Kupo.AppSpec
+    ( genPartialTransactions
+    )
+import Test.Kupo.Data.Generators
+    ( genOutputReference
+    )
+import Test.QuickCheck
+    ( Property
+    , counterexample
+    , label
+    , listOf1
+    , shuffle
+    , withMaxSuccess
+    )
+import Test.QuickCheck.Monadic
+    ( assert
+    , monadicIO
+    , monitor
+    , pick
+    , run
+    )
+
+import qualified Data.Aeson as Json
+import qualified Data.Aeson.Types as Json
+import qualified Data.Set as Set
+
+spec :: Spec
+spec = parallel $ do
+    context "TransactionStore" $ do
+        prop "can retrieve transactions in any order" prop_canRetrieveTxInAnyOrder
+
+    context "JSON decoders" $ do
+        context "decodeHydraMessage" $ do
+            prop "can decode test vectors" $ withMaxSuccess 1 $ do
+                prop_canDecodeFile
+                    (mapM decodeHydraMessage . getSamples)
+                    "./test/vectors/hydra/hydra-node/golden/ReasonablySized (ServerOutput (Tx BabbageEra)).json"
+
+prop_canRetrieveTxInAnyOrder
+    :: Property
+prop_canRetrieveTxInAnyOrder = monadicIO $ do
+    TransactionStore{push, pop} <- run newTransactionStore
+    txs <- pick $ do
+        txIns <- listOf1 genOutputReference
+        evalStateT genPartialTransactions (Set.fromList txIns)
+    txsWithIds <- forM txs $ \tx@PartialTransaction{id} -> do
+        run $ push tx
+        pure (tx, id)
+    monitor (label $ "Generated list length is " <> show (length txsWithIds))
+    shuffledTxs <- pick $ shuffle txsWithIds
+    run $
+        forM_ shuffledTxs $ \(tx, txId) -> do
+            txs' <- pop [txId]
+            txs' `shouldBe` [tx]
+            pop [txId] `shouldThrow` \case
+                TransactionNotInStore txId' -> txId' == txId
+
+prop_canDecodeFile
+    :: (Json.Value -> Json.Parser b)
+    -> FilePath
+    -> Property
+prop_canDecodeFile decoder vector = monadicIO $ do
+    let errDecode = "Failed to decode JSON"
+    value <- maybe (fail errDecode) pure =<< run (Json.decodeFileStrict vector)
+    case Json.parse decoder value of
+        Json.Error str -> do
+            monitor $ counterexample (decodeUtf8 (Json.encode value))
+            monitor $ counterexample str
+            assert False
+        Json.Success{} -> do
+            assert True
+
+getSamples :: Json.Value -> [Json.Value]
+getSamples v =
+    toList $ v ^. key "samples" .
_Array diff --git a/test/Test/Kupo/OptionsSpec.hs b/test/Test/Kupo/OptionsSpec.hs index 57f9f711..1b8e9d37 100644 --- a/test/Test/Kupo/OptionsSpec.hs +++ b/test/Test/Kupo/OptionsSpec.hs @@ -88,6 +88,12 @@ spec = parallel $ do , ( [ "--ogmios-port", "1337" ] , shouldFail ) + , ( [ "--hydra-host", "localhost" ] + , shouldFail + ) + , ( [ "--hydra-port", "4001" ] + , shouldFail + ) , ( defaultArgs , shouldParseAppConfiguration $ defaultConfiguration { chainProducer = CardanoNode @@ -106,6 +112,15 @@ spec = parallel $ do , workDir = InMemory } ) + , ( defaultArgs'' + , shouldParseAppConfiguration $ defaultConfiguration + { chainProducer = Hydra + { hydraHost = "localhost" + , hydraPort = 4001 + } + , workDir = InMemory + } + ) , ( filter (/= "--in-memory") defaultArgs ++ [ "--workdir", "./workdir" ] @@ -342,6 +357,13 @@ defaultArgs' = , "--in-memory" ] +defaultArgs'' :: [String] +defaultArgs'' = + [ "--hydra-host", "localhost" + , "--hydra-port", "4001" + , "--in-memory" + ] + shouldParseAppConfiguration :: Configuration -> (Either String Command-> Expectation) diff --git a/test/Test/KupoSpec.hs b/test/Test/KupoSpec.hs index 47c5f0db..e24f87d8 100644 --- a/test/Test/KupoSpec.hs +++ b/test/Test/KupoSpec.hs @@ -191,6 +191,12 @@ varOgmiosHost = "OGMIOS_HOST" varOgmiosPort :: String varOgmiosPort = "OGMIOS_PORT" +varHydraHost :: String +varHydraHost = "HYDRA_HOST" + +varHydraPort :: String +varHydraPort = "HYDRA_PORT" + type EndToEndContext = ( (Configuration -> Configuration) -> IO (Configuration, Env Kupo) , Env Kupo -> DiffTime -> IO () -> IO () @@ -202,6 +208,18 @@ endToEnd = specify spec :: Spec spec = skippableContext "End-to-end" $ do + + endToEnd "can connect" $ \(configure, runSpec, HttpClient{..}) -> do + (_cfg, env) <- configure $ \defaultCfg -> defaultCfg + { workDir = InMemory + , since = Just GenesisPoint + , patterns = fromList [MatchAny OnlyShelley] + } + runSpec env 5 $ do + waitSlot (>= 0) + matches <- getAllMatches NoStatusFlag + matches `shouldSatisfy` not . null + endToEnd "in-memory" $ \(configure, runSpec, HttpClient{..}) -> do (cfg, env) <- configure $ \defaultCfg -> defaultCfg { workDir = InMemory @@ -279,6 +297,11 @@ spec = skippableContext "End-to-end" $ do { ogmiosHost = "/dev/null" , ogmiosPort } + Hydra{hydraPort} -> + Hydra + { hydraHost = "/dev/null" + , hydraPort + } } runSpec env 5 $ do cps <- listCheckpoints @@ -501,6 +524,9 @@ spec = skippableContext "End-to-end" $ do -- - If 'varOgmiosHost' AND 'varOgmiosPort' are set, the spec items will execute against an Ogmios -- server expected to be running and available through the context defined by these variables. -- +-- - If 'varHydraHost' AND 'varHydraPort' are set, the spec items will execute against a Hydra node +-- with an open head running and available through the context defined by these variables. +-- -- If either set of variables is missing, then the spec items do not run for that item. 
skippableContext :: String -> SpecWith (Arg (EndToEndContext -> IO ())) -> Spec skippableContext prefix skippableSpec = do @@ -545,6 +571,27 @@ skippableContext prefix skippableSpec = do context ogmios $ around (withTempDirectory manager ref defaultCfg) skippableSpec _skipOtherwise -> xcontext ogmios (pure ()) + + let hydra = prefix <> " (hydra)" + runIO ((,) <$> lookupEnv varHydraHost <*> lookupEnv varHydraPort) >>= \case + (Just hydraHost, Just (Prelude.read -> hydraPort)) -> do + manager <- runIO $ newManager $ + defaultManagerSettings { managerResponseTimeout = responseTimeoutNone } + let defaultCfg = Configuration + { chainProducer = Hydra {hydraHost, hydraPort} + , workDir = InMemory + , serverHost = "127.0.0.1" + , serverPort = 0 + , since = Nothing + , patterns = fromList [] + , inputManagement = MarkSpentInputs + , longestRollback = 43200 + , garbageCollectionInterval = 180 + , deferIndexes = InstallIndexesIfNotExist + } + context hydra $ around (withTempDirectory manager ref defaultCfg) skippableSpec + _skipOtherwise -> + xcontext hydra (pure ()) where withTempDirectory :: Manager diff --git a/test/vectors/hydra b/test/vectors/hydra new file mode 160000 index 00000000..3d825b25 --- /dev/null +++ b/test/vectors/hydra @@ -0,0 +1 @@ +Subproject commit 3d825b25c511b32a663ef96df297ba2f3970e399