Skip to content

Commit

Permalink
Implement manual 'FindIntersection' for Hydra producer.
Browse files Browse the repository at this point in the history
  We do this by skipping blocks because we don't really have much alternative. This is therefore relatively unsound for very long running head. Though is it more sound that actually replaying all events from that head onto an existing database; so it's kind of a best effort solution.
  • Loading branch information
KtorZ committed Oct 13, 2023
1 parent 6a6c767 commit 397a4d8
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 20 deletions.
1 change: 1 addition & 0 deletions modules/websockets-json/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ library:
- bytestring
- connection
- exceptions
- io-classes
- websockets
- wuss
4 changes: 2 additions & 2 deletions modules/websockets-json/src/Network/WebSockets/Json.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import Prelude
import Control.Exception
( Exception
)
import Control.Monad.Catch
import Control.Monad.Class.MonadThrow
( MonadThrow (..)
)
import Control.Monad.IO.Class
Expand Down Expand Up @@ -54,7 +54,7 @@ receiveJson
receiveJson ws decoder = do
bytes <- liftIO (WS.receiveData ws)
either
(\(path, err) -> throwM $ MalformedOrUnexpectedResponse bytes path err)
(\(path, err) -> throwIO $ MalformedOrUnexpectedResponse bytes path err)
pure
(Json.eitherDecodeWith Json.jsonEOF (Json.iparse decoder) bytes)

Expand Down
1 change: 1 addition & 0 deletions modules/websockets-json/websockets-json.cabal

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

124 changes: 109 additions & 15 deletions src/Kupo/App/ChainSync/Hydra.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,23 @@ module Kupo.App.ChainSync.Hydra

import Kupo.Prelude

import Control.Exception.Safe
( MonadThrow
, throwM
)
import Kupo.App.Mailbox
( Mailbox
, putHighFrequencyMessage
)
import Kupo.Control.MonadSTM
( MonadSTM (..)
)
import Kupo.Control.MonadThrow
( MonadThrow (..)
)
import Kupo.Data.Cardano
( Point
, Tip
, TransactionId
, WithOrigin (..)
, getPointSlotNo
, getTipSlotNo
)
import Kupo.Data.ChainSync
( IntersectionNotFoundException (..)
Expand All @@ -38,7 +40,7 @@ import Kupo.Data.Hydra
, mkHydraBlock
)
import Kupo.Data.PartialBlock
( PartialBlock
( PartialBlock (..)
, PartialTransaction (..)
)

Expand All @@ -50,26 +52,112 @@ runChainSyncClient
:: forall m.
( MonadIO m
, MonadSTM m
, MonadThrow m, MonadThrow (STM m))
, MonadThrow m
, MonadThrow (STM m)
)
=> Mailbox m (Tip, PartialBlock) (Tip, Point)
-> m () -- An action to run before the main loop starts.
-> [Point]
-> WS.Connection
-> m IntersectionNotFoundException
runChainSyncClient mailbox beforeMainLoop _pts ws = do
runChainSyncClient mailbox beforeMainLoop pts ws = do
beforeMainLoop
TransactionStore{push, pop} <- newTransactionStore
forever $ do
WS.receiveJson ws decodeHydraMessage >>= \case
flip evalStateT (sortOn getPointSlotNo pts) $ forever $ do
lift (WS.receiveJson ws decodeHydraMessage) >>= \case
HeadIsOpen{genesisTxs} -> do
atomically (putHighFrequencyMessage mailbox (mkHydraBlock 0 genesisTxs))
handleBlock $ mkHydraBlock 0 genesisTxs
TxValid{tx} -> do
push tx
lift $ push tx
SnapshotConfirmed{ snapshot = Snapshot { number, confirmedTransactionIds }} -> do
txs <- pop confirmedTransactionIds
atomically (putHighFrequencyMessage mailbox (mkHydraBlock number txs))
txs <- lift $ pop confirmedTransactionIds
handleBlock $ mkHydraBlock number txs
SomethingElse -> do
pure ()
where
-- Hydra doesn't provide any mechanism to negotiate an intersection point and receive snapshots
-- only from a certain point. So we 'fake' it by skippping blocks until we have caught up with our
-- state.
--
-- - We start from a (possibly empty) ascending list of checkpoints that we have already processed.
-- - For each 'block' received from the Hydra head, we have one of three situation.
--
-- 1. Its slot number is lower than the current cursor (i.e. smallest checkpoint)
-- 2. Its slot number is equal to the current cursor
-- 3. Its slot number is higher than the current cursor.
--
-- - In the case of (1), it's simple[NOTE^A], we have already seen that block and we can just skip it. The
-- cursor doesn't move as we haven't caught up with that checkpoint yet.
--
-- - In the case of (2), we have two possibilities:
--
-- 1. The block header hash and the checkpoint header hash are identical
-- -> This means we've found a block that's on both chains. We can skip it as we already know
-- of it. We can move to the next checkpoint too.
--
-- 2. The block header hash and the checkpoint header hash are different
-- -> This means we (the indexer) is on a fork. But we don't have enough information to know
-- where to rollback to. So we throw an exception.
-- Note that this truly is *exceptional* because Hydra heads cannot fork in principle. The
-- only way we end up here is if someone tries to connect an already synchronized index to a
-- new head which happen to have different chain histories. And that's a very valid reason to
-- crash.
--
-- - The case of (3) is in principle _impossible_ because a Hydra snapshots are monotically
-- increasing with no gaps. So we can't possibly hit that scenario without first hitting (2).
--
-- - When the list of checkpoints is empty, it means we have caught up with the chain and any new
-- block is relevant to the indexer.
--
--
-- NOTE^A: It is "simple" because we assume that there is not two different Hydra "chains" that
-- might have two equal blocks. However, in principle, we should verify that the discarded blocks
-- still form a chain up to the last known block we stored. Otherwise, there could be a scenario
-- where we have stored: b1, b2, b3 but receive b1', b2', b3. If are waiting for a snapshot number
-- at 3, then we will discard b1' and b2' even though they are actually different from what we
-- stored.
--
-- This can really only happen if we have concomittantly:
-- (a) Someone restarting the indexer on a *different* Hydra head.
-- (b) ALL of our selected checkpoint being identical on both heads.
--
-- Without (b), the condition 2.2 would already throw an exception. Given the scenario we're in,
-- we consider this quite unlikely.
handleBlock
:: (Tip, PartialBlock)
-> StateT [Point] m ()
handleBlock (tip, block) = do
get >>= \case
-- No checkpoints, all blocks are relevant.
[] -> do
lift $ atomically (putHighFrequencyMessage mailbox (tip, block))

requestedPoints@(cursor:_) -> do
-- Block is before a known checkpoint, ignoring.
if | slot < getPointSlotNo cursor -> do
return ()

-- Block is at checkpoint
| slot == getPointSlotNo cursor -> do
if point == cursor then
-- Checkpoint matches, we skip the block and move the cursor
modify' (drop 1)
else
lift $ throwIO $ IntersectionNotFound
{ requestedPoints = At . getPointSlotNo <$> requestedPoints
, tip = At . getTipSlotNo $ tip
}

-- Block is after a known checkpoint, absurd
| otherwise -> do
error $ "impossible: received a block *after* a known checkpoint? \
\There isn't supposed to be any gaps in Hydra snapshots \
\however this suggests that there was one.\
\We choked on: " <> show point <> ", while expecting a \
\snapshot at: " <> show cursor <> "."
where
point = blockPoint block
slot = getPointSlotNo point

connect
:: ConnectionStatusToggle IO
Expand All @@ -96,7 +184,13 @@ newtype TransactionStoreException = TransactionNotInStore { transactionId :: Tra

instance Exception TransactionStoreException

newTransactionStore :: (Monad m, MonadSTM m, MonadThrow (STM m)) => m (TransactionStore m)
newTransactionStore
:: forall m.
( Monad m
, MonadSTM m
, MonadThrow (STM m)
)
=> m (TransactionStore m)
newTransactionStore = do
store <- atomically $ newTVar mempty
pure TransactionStore
Expand All @@ -107,7 +201,7 @@ newTransactionStore = do
txMap <- readTVar store
forM ids $ \id -> do
case Map.lookup id txMap of
Nothing -> throwM $ TransactionNotInStore id
Nothing -> throwIO $ TransactionNotInStore id
Just tx -> do
writeTVar store (Map.delete id txMap)
pure tx
Expand Down
6 changes: 3 additions & 3 deletions src/Kupo/App/ChainSync/Ogmios.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ module Kupo.App.ChainSync.Ogmios

import Kupo.Prelude

import Control.Exception.Safe
( MonadThrow
)
import Kupo.App.Mailbox
( Mailbox
, putHighFrequencyMessage
Expand All @@ -23,6 +20,9 @@ import Kupo.App.Mailbox
import Kupo.Control.MonadSTM
( MonadSTM (..)
)
import Kupo.Control.MonadThrow
( MonadThrow (..)
)
import Kupo.Data.Cardano
( Point
, SlotNo
Expand Down

0 comments on commit 397a4d8

Please sign in to comment.