Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomasBrady committed Nov 6, 2024
1 parent 2a1d084 commit 01b7bb7
Show file tree
Hide file tree
Showing 23 changed files with 351 additions and 104 deletions.
51 changes: 50 additions & 1 deletion src/catchup/ApplyCheckpointWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ ApplyCheckpointWork::openInputFiles()
mTxIn.open(ti.localPath_nogz());
mTxHistoryEntry = TransactionHistoryEntry();
mHeaderHistoryEntry = LedgerHeaderHistoryEntry();
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS)
{
mTxResultIn.close();
FileTransferInfo tri(mDownloadDir, FileType::HISTORY_FILE_TYPE_RESULTS,
mCheckpoint);
CLOG_DEBUG(History, "Replaying transaction results from {}",
tri.localPath_nogz());
mTxResultIn.open(tri.localPath_nogz());
mTxHistoryResultEntry = TransactionHistoryResultEntry{};
}
mFilesOpen = true;
}

Expand Down Expand Up @@ -141,6 +151,39 @@ ApplyCheckpointWork::getCurrentTxSet()
return TxSetXDRFrame::makeEmpty(lm.getLastClosedLedgerHeader());
}

// Returns the transaction result set recorded in the results file for the
// ledger immediately following the last closed ledger, or std::nullopt when
// the file holds no entry for that ledger.
std::optional<TransactionResultSet>
ApplyCheckpointWork::getCurrentTxResultSet()
{
    ZoneScoped;
    // The ledger about to be applied is the one right after the LCL.
    auto const seq = mApp.getLedgerManager().getLastClosedLedgerNum() + 1;

    // Always examine the entry already held in mTxHistoryResultEntry before
    // reading another one from the stream. The order matters: history
    // archives have "gaps" (ledgers with empty tx sets are not uploaded), so
    // the entry read during a previous call may belong to a ledger that is
    // still ahead of us and must not be discarded.
    for (;;)
    {
        auto const entrySeq = mTxHistoryResultEntry.ledgerSeq;
        if (entrySeq == seq)
        {
            CLOG_DEBUG(History, "Loaded txresultset for ledger {}", seq);
            return std::make_optional(mTxHistoryResultEntry.txResultSet);
        }
        if (entrySeq > seq)
        {
            // The held entry is for a later ledger: keep it for a later call.
            break;
        }

        // The held entry is stale; report it and try to fetch the next one.
        CLOG_DEBUG(History, "Advancing past txresultset for ledger {}",
                   entrySeq);
        if (!mTxResultIn || !mTxResultIn.readOne(mTxHistoryResultEntry))
        {
            // Stream is not usable or has no more entries.
            break;
        }
    }

    CLOG_DEBUG(History, "No txresultset for ledger {}", seq);
    return std::nullopt;
}

std::shared_ptr<LedgerCloseData>
ApplyCheckpointWork::getNextLedgerCloseData()
{
Expand Down Expand Up @@ -219,6 +262,12 @@ ApplyCheckpointWork::getNextLedgerCloseData()
CLOG_DEBUG(History, "Ledger {} has {} transactions", header.ledgerSeq,
txset->sizeTxTotal());

std::optional<TransactionResultSet> txres = std::nullopt;
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS)
{
txres = getCurrentTxResultSet();
}

// We've verified the ledgerHeader (in the "trusted part of history"
// sense) in CATCHUP_VERIFY phase; we now need to check that the
// txhash we're about to apply is the one denoted by that ledger
Expand Down Expand Up @@ -249,7 +298,7 @@ ApplyCheckpointWork::getNextLedgerCloseData()

return std::make_shared<LedgerCloseData>(
header.ledgerSeq, txset, header.scpValue,
std::make_optional<Hash>(mHeaderHistoryEntry.hash));
std::make_optional<Hash>(mHeaderHistoryEntry.hash), txres);
}

BasicWork::State
Expand Down
27 changes: 17 additions & 10 deletions src/catchup/ApplyCheckpointWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,24 @@ class TmpDir;
struct LedgerHeaderHistoryEntry;

/**
* This class is responsible for applying transactions stored in files on
* temporary directory (downloadDir) to local ledger. It requires two sets of
* files - ledgers and transactions - int .xdr format. Transaction files are
* used to read transactions that will be used and ledger files are used to
* This class is responsible for applying transactions stored in files in the
* temporary directory (downloadDir) to the local ledger. It requires two sets
* of files - ledgers and transactions - in .xdr format. Transaction files are
* used to read transactions that will be applied and ledger files are used to
* check if ledger hashes are matching.
*
* It may also require a third set of files - transaction results - to use in
* accelerated replay, where failed transactions are not applied and successful
* transactions are applied without verifying their signatures.
*
* In each run it skips or applies transactions from one ledger. Skipping occurs
* when ledger to be applied is older than LCL from local ledger. At LCL
* boundary checks are made to confirm that ledgers from files knit up with
* LCL. If everything is OK, an apply ledger operation is performed. Then
* another check is made - if new local ledger matches corresponding ledger from
* file.
* when the ledger to be applied is older than the LCL of the local ledger. At
* LCL, boundary checks are made to confirm that the ledgers from the files knit
* up with LCL. If everything is OK, an apply ledger operation is performed.
* Then another check is made - if the new local ledger matches the
* corresponding ledger from file.
*
* Constructor of this class takes some important parameters:
* The constructor of this class takes some important parameters:
* * downloadDir - directory containing ledger and transaction files
* * range - LedgerRange to apply, must be checkpoint-aligned,
* and cover at most one checkpoint.
Expand All @@ -48,7 +52,9 @@ class ApplyCheckpointWork : public BasicWork

XDRInputFileStream mHdrIn;
XDRInputFileStream mTxIn;
XDRInputFileStream mTxResultIn;
TransactionHistoryEntry mTxHistoryEntry;
TransactionHistoryResultEntry mTxHistoryResultEntry;
LedgerHeaderHistoryEntry mHeaderHistoryEntry;
OnFailureCallback mOnFailure;

Expand All @@ -57,6 +63,7 @@ class ApplyCheckpointWork : public BasicWork
std::shared_ptr<ConditionalWork> mConditionalWork;

TxSetXDRFrameConstPtr getCurrentTxSet();
std::optional<TransactionResultSet> getCurrentTxResultSet();
void openInputFiles();

std::shared_ptr<LedgerCloseData> getNextLedgerCloseData();
Expand Down
15 changes: 8 additions & 7 deletions src/catchup/CatchupConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
namespace stellar
{

// Each catchup can be configured by two parameters destination ledger
// Each catchup can be configured by two parameters: destination ledger
// (and its hash, if known) and count of ledgers to apply.
// Value of count can be adjusted in different ways during catchup. If applying
// count ledgers would mean going before the last closed ledger - it is
Expand All @@ -31,12 +31,13 @@ namespace stellar
// and catchup to that instead of destination ledger. This is useful when
// doing offline commandline catchups with stellar-core catchup command.
//
// Catchup can be done in two modes - ONLINE nad OFFLINE. In ONLINE mode node
// is connected to the network. If receives ledgers during catchup and applies
// them after history is applied. Also additional closing ledger is required
// to mark catchup as complete and node as synced. In OFFLINE mode node is not
// connected to network, so new ledgers are not being externalized. Only
// buckets and transactions from history archives are applied.
// Catchup can be done in two modes - ONLINE and OFFLINE. In ONLINE mode, the
// node is connected to the network. It receives ledgers during catchup and
// applies them after history is applied. Also, an additional closing ledger is
// required to mark catchup as complete and the node as synced. In OFFLINE mode,
// the node is not connected to network, so new ledgers are not being
// externalized. Only buckets and transactions from history archives are
// applied.
class CatchupConfiguration
{
public:
Expand Down
14 changes: 7 additions & 7 deletions src/catchup/CatchupWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,22 @@ using WorkSeqPtr = std::shared_ptr<WorkSequence>;

// CatchupWork does all the necessary work to perform any type of catchup.
// It accepts CatchupConfiguration structure to know from which ledger to which
// one do the catchup and if it involves only applying ledgers or ledgers and
// one to do the catchup and if it involves only applying ledgers or ledgers and
// buckets.
//
// First thing it does is to get a history state which allows to calculate
// proper destination ledger (in case CatchupConfiguration::CURRENT) was used
// and to get list of buckets that should be in database on that ledger.
// First, it gets a history state, which allows it to calculate a
// proper destination ledger (in case CatchupConfiguration::CURRENT was
// used) and get a list of buckets that should be in the database on that
// ledger.
//
// Next step is downloading and verifying ledgers (if verifyMode is set to
// VERIFY_BUFFERED_LEDGERS it can also verify against ledgers currently
// Next, it downloads and verifies ledgers (if verifyMode is set to
// VERIFY_BUFFERED_LEDGERS, it can also verify against ledgers currently
// buffered in LedgerManager).
//
// Then, depending on configuration, it can download, verify and apply buckets
// (as in MINIMAL and RECENT catchups), and then download and apply
// transactions (as in COMPLETE and RECENT catchups).
//
// After that, catchup is done and node can replay buffered ledgers and take
// After that, catchup is done and the node can replay buffered ledgers and take
// part in consensus protocol.

class CatchupWork : public Work
Expand Down
97 changes: 50 additions & 47 deletions src/catchup/DownloadApplyTxsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,44 +43,40 @@ DownloadApplyTxsWork::yieldMoreWork()
{
throw std::runtime_error("Work has no more children to iterate over!");
}
std::vector<FileType> fileTypesToDownload{
FileType::HISTORY_FILE_TYPE_TRANSACTIONS};
std::vector<std::shared_ptr<BasicWork>> downloadSeq;
std::vector<FileTransferInfo> filesToTransfer;
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS)
{
fileTypesToDownload.emplace_back(FileType::HISTORY_FILE_TYPE_RESULTS);
}
for (auto const& fileType : fileTypesToDownload)
{
CLOG_INFO(History,
"Downloading, unzipping and applying {} for checkpoint {}",
typeString(fileType), mCheckpointToQueue);
FileTransferInfo ft(mDownloadDir, fileType, mCheckpointToQueue);
filesToTransfer.emplace_back(ft);
downloadSeq.emplace_back(
std::make_shared<GetAndUnzipRemoteFileWork>(mApp, ft, mArchive));
}

CLOG_INFO(History,
"Downloading, unzipping and applying {} for checkpoint {}",
typeString(FileType::HISTORY_FILE_TYPE_TRANSACTIONS),
mCheckpointToQueue);
FileTransferInfo ft(mDownloadDir, FileType::HISTORY_FILE_TYPE_TRANSACTIONS,
mCheckpointToQueue);
auto getAndUnzip =
std::make_shared<GetAndUnzipRemoteFileWork>(mApp, ft, mArchive);
OnFailureCallback cb = [archive = mArchive, filesToTransfer]() {
for (auto const& ft : filesToTransfer)
{
CLOG_ERROR(History, "Archive {} maybe contains corrupt file {}",
archive->getName(), ft.remoteName());
}
};

auto const& hm = mApp.getHistoryManager();
auto low = hm.firstLedgerInCheckpointContaining(mCheckpointToQueue);
auto high = std::min(mCheckpointToQueue, mRange.last());

TmpDir const& dir = mDownloadDir;
uint32_t checkpoint = mCheckpointToQueue;
auto getFileWeak = std::weak_ptr<GetAndUnzipRemoteFileWork>(getAndUnzip);

OnFailureCallback cb = [getFileWeak, checkpoint, &dir]() {
auto getFile = getFileWeak.lock();
if (getFile)
{
auto archive = getFile->getArchive();
if (archive)
{
FileTransferInfo ti(
dir, FileType::HISTORY_FILE_TYPE_TRANSACTIONS, checkpoint);
CLOG_ERROR(History, "Archive {} maybe contains corrupt file {}",
archive->getName(), ti.remoteName());
}
}
};

auto apply = std::make_shared<ApplyCheckpointWork>(
mApp, mDownloadDir, LedgerRange::inclusive(low, high), cb);

std::vector<std::shared_ptr<BasicWork>> seq{getAndUnzip};

auto maybeWaitForMerges = [](Application& app) {
if (app.getConfig().CATCHUP_WAIT_MERGES_TX_APPLY_FOR_TESTING)
{
Expand All @@ -98,8 +94,10 @@ DownloadApplyTxsWork::yieldMoreWork()
{
auto prev = mLastYieldedWork;
bool pqFellBehind = false;
auto applyName = apply->getName();
auto predicate = [prev, pqFellBehind, waitForPublish = mWaitForPublish,
maybeWaitForMerges](Application& app) mutable {
maybeWaitForMerges,
applyName](Application& app) mutable {
if (!prev)
{
throw std::runtime_error("Download and apply txs: related Work "
Expand Down Expand Up @@ -130,37 +128,42 @@ DownloadApplyTxsWork::yieldMoreWork()
}
return res && maybeWaitForMerges(app);
};
seq.push_back(std::make_shared<ConditionalWork>(
downloadSeq.push_back(std::make_shared<ConditionalWork>(
mApp, "conditional-" + apply->getName(), predicate, apply));
}
else
{
seq.push_back(std::make_shared<ConditionalWork>(
downloadSeq.push_back(std::make_shared<ConditionalWork>(
mApp, "wait-merges" + apply->getName(), maybeWaitForMerges, apply));
}

seq.push_back(std::make_shared<WorkWithCallback>(
downloadSeq.push_back(std::make_shared<WorkWithCallback>(
mApp, "delete-transactions-" + std::to_string(mCheckpointToQueue),
[ft](Application& app) {
try
[filesToTransfer](Application& app) {
for (auto const& ft : filesToTransfer)
{
std::filesystem::remove(
std::filesystem::path(ft.localPath_nogz()));
CLOG_DEBUG(History, "Deleted transactions {}",
CLOG_DEBUG(History, "Deleting transactions {}",
ft.localPath_nogz());
return true;
}
catch (std::filesystem::filesystem_error const& e)
{
CLOG_ERROR(History, "Could not delete transactions {}: {}",
ft.localPath_nogz(), e.what());
return false;
try
{
std::filesystem::remove(
std::filesystem::path(ft.localPath_nogz()));
CLOG_DEBUG(History, "Deleted transactions {}",
ft.localPath_nogz());
}
catch (std::filesystem::filesystem_error const& e)
{
CLOG_ERROR(History, "Could not delete transactions {}: {}",
ft.localPath_nogz(), e.what());
return false;
}
}
return true;
}));

auto nextWork = std::make_shared<WorkSequence>(
mApp, "download-apply-" + std::to_string(mCheckpointToQueue), seq,
BasicWork::RETRY_NEVER);
mApp, "download-apply-" + std::to_string(mCheckpointToQueue),
downloadSeq, BasicWork::RETRY_NEVER);
mCheckpointToQueue += mApp.getHistoryManager().getCheckpointFrequency();
mLastYieldedWork = nextWork;
return nextWork;
Expand Down
9 changes: 5 additions & 4 deletions src/herder/LedgerCloseData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ using namespace std;
namespace stellar
{

LedgerCloseData::LedgerCloseData(uint32_t ledgerSeq,
TxSetXDRFrameConstPtr txSet,
StellarValue const& v,
std::optional<Hash> const& expectedLedgerHash)
// Builds a LedgerCloseData by storing each argument in the corresponding
// member: the ledger sequence number, the transaction set, the StellarValue,
// the optional expected ledger hash and the optional expected transaction
// results.
//
// Asserts (via releaseAssert) that the contents hash of `txSet` equals the
// `txSetHash` field of the stored value, i.e. that the transaction set and
// the value passed in refer to the same set.
LedgerCloseData::LedgerCloseData(
uint32_t ledgerSeq, TxSetXDRFrameConstPtr txSet, StellarValue const& v,
std::optional<Hash> const& expectedLedgerHash,
std::optional<TransactionResultSet> const& expectedResults)
: mLedgerSeq(ledgerSeq)
, mTxSet(txSet)
, mValue(v)
, mExpectedLedgerHash(expectedLedgerHash)
, mExpectedResults(expectedResults)
{
// The tx set handed in must be the one the value commits to.
releaseAssert(txSet->getContentsHash() == mValue.txSetHash);
}
Expand Down
10 changes: 9 additions & 1 deletion src/herder/LedgerCloseData.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ class LedgerCloseData
public:
LedgerCloseData(
uint32_t ledgerSeq, TxSetXDRFrameConstPtr txSet, StellarValue const& v,
std::optional<Hash> const& expectedLedgerHash = std::nullopt);
std::optional<Hash> const& expectedLedgerHash = std::nullopt,
std::optional<TransactionResultSet> const& expectedResults =
std::nullopt);

uint32_t
getLedgerSeq() const
Expand All @@ -48,6 +50,11 @@ class LedgerCloseData
{
return mExpectedLedgerHash;
}
// Returns a const reference to the optional transaction result set stored in
// mExpectedResults; the optional is empty when no expected results were
// supplied at construction.
std::optional<TransactionResultSet> const&
getExpectedResults() const
{
return mExpectedResults;
}

StoredDebugTransactionSet
toXDR() const
Expand Down Expand Up @@ -82,6 +89,7 @@ class LedgerCloseData
TxSetXDRFrameConstPtr mTxSet;
StellarValue mValue;
std::optional<Hash> mExpectedLedgerHash;
std::optional<TransactionResultSet> mExpectedResults;
};

std::string stellarValueToString(Config const& c, StellarValue const& sv);
Expand Down
Loading

0 comments on commit 01b7bb7

Please sign in to comment.