WIP: New mode to use transaction results to skip failed transaction and signature verification in catchup #4536

Open: wants to merge 3 commits into base: master

Changes from 2 commits
51 changes: 50 additions & 1 deletion src/catchup/ApplyCheckpointWork.cpp
@@ -96,6 +96,16 @@ ApplyCheckpointWork::openInputFiles()
mTxIn.open(ti.localPath_nogz());
mTxHistoryEntry = TransactionHistoryEntry();
mHeaderHistoryEntry = LedgerHeaderHistoryEntry();
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS)
{
mTxResultIn.close();
FileTransferInfo tri(mDownloadDir, FileType::HISTORY_FILE_TYPE_RESULTS,
mCheckpoint);
CLOG_DEBUG(History, "Replaying transaction results from {}",
tri.localPath_nogz());
mTxResultIn.open(tri.localPath_nogz());
mTxHistoryResultEntry = TransactionHistoryResultEntry{};
}
mFilesOpen = true;
}

@@ -141,6 +151,39 @@ ApplyCheckpointWork::getCurrentTxSet()
return TxSetXDRFrame::makeEmpty(lm.getLastClosedLedgerHeader());
}

std::optional<TransactionResultSet>
ApplyCheckpointWork::getCurrentTxResultSet()
{
ZoneScoped;
auto& lm = mApp.getLedgerManager();
auto seq = lm.getLastClosedLedgerNum() + 1;

// Check mTxResultSet prior to loading next result set.
// This order is important because it accounts for ledger "gaps"
// in the history archives (which are caused by ledgers with empty tx
// sets, as those are not uploaded).
do
{
if (mTxHistoryResultEntry.ledgerSeq < seq)
{
CLOG_DEBUG(History, "Advancing past txresultset for ledger {}",
mTxHistoryResultEntry.ledgerSeq);
}
else if (mTxHistoryResultEntry.ledgerSeq > seq)
{
break;
}
else
{
releaseAssert(mTxHistoryResultEntry.ledgerSeq == seq);
CLOG_DEBUG(History, "Loaded txresultset for ledger {}", seq);
return std::make_optional(mTxHistoryResultEntry.txResultSet);
}
} while (mTxResultIn && mTxResultIn.readOne(mTxHistoryResultEntry));
CLOG_DEBUG(History, "No txresultset for ledger {}", seq);
return std::nullopt;
}

std::shared_ptr<LedgerCloseData>
ApplyCheckpointWork::getNextLedgerCloseData()
{
@@ -219,6 +262,12 @@ ApplyCheckpointWork::getNextLedgerCloseData()
CLOG_DEBUG(History, "Ledger {} has {} transactions", header.ledgerSeq,
txset->sizeTxTotal());

std::optional<TransactionResultSet> txres = std::nullopt;
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS)
{
txres = getCurrentTxResultSet();
}

// We've verified the ledgerHeader (in the "trusted part of history"
// sense) in CATCHUP_VERIFY phase; we now need to check that the
// txhash we're about to apply is the one denoted by that ledger
@@ -249,7 +298,7 @@ ApplyCheckpointWork::getNextLedgerCloseData()

return std::make_shared<LedgerCloseData>(
header.ledgerSeq, txset, header.scpValue,
std::make_optional<Hash>(mHeaderHistoryEntry.hash));
std::make_optional<Hash>(mHeaderHistoryEntry.hash), txres);
}

BasicWork::State
27 changes: 17 additions & 10 deletions src/catchup/ApplyCheckpointWork.h
@@ -21,20 +21,24 @@ class TmpDir;
struct LedgerHeaderHistoryEntry;

/**
* This class is responsible for applying transactions stored in files on
* temporary directory (downloadDir) to local ledger. It requires two sets of
* files - ledgers and transactions - int .xdr format. Transaction files are
* used to read transactions that will be used and ledger files are used to
This class is responsible for applying transactions stored in files in the
temporary directory (downloadDir) to the local ledger. It requires two sets
of files - ledgers and transactions - in .xdr format. Transaction files are
used to read transactions that will be applied and ledger files are used to
check that ledger hashes match.
*
* It may also require a third set of files - transaction results - to use in
* accelerated replay, where failed transactions are not applied and successful
* transactions are applied without verifying their signatures.
*
* In each run it skips or applies transactions from one ledger. Skipping occurs
* when ledger to be applied is older than LCL from local ledger. At LCL
* boundary checks are made to confirm that ledgers from files knit up with
* LCL. If everything is OK, an apply ledger operation is performed. Then
* another check is made - if new local ledger matches corresponding ledger from
* file.
when the ledger to be applied is older than the LCL of the local ledger. At
the LCL boundary, checks are made to confirm that the ledgers from the files
knit up with the LCL. If everything is OK, an apply ledger operation is
performed. Then another check is made - whether the new local ledger matches
the corresponding ledger from the file.
*
* Constructor of this class takes some important parameters:
* The constructor of this class takes some important parameters:
* * downloadDir - directory containing ledger and transaction files
* * range - LedgerRange to apply, must be checkpoint-aligned,
* and cover at most one checkpoint.
@@ -48,7 +52,9 @@ class ApplyCheckpointWork : public BasicWork

XDRInputFileStream mHdrIn;
XDRInputFileStream mTxIn;
XDRInputFileStream mTxResultIn;
TransactionHistoryEntry mTxHistoryEntry;
TransactionHistoryResultEntry mTxHistoryResultEntry;
Contributor:
To avoid footguns, results should be optional
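
A minimal sketch of what that could look like, assuming the reviewer means wrapping the entry itself in std::optional (member and type names are taken from this diff; the exact shape is an assumption, not the final change):

```cpp
// Hypothetical sketch: only hold a result entry while the results file is
// actually in use, so callers cannot read a default-constructed entry.
std::optional<TransactionHistoryResultEntry> mTxHistoryResultEntry;

// In openInputFiles(), engage it only behind the new mode:
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS)
{
    mTxResultIn.open(tri.localPath_nogz());
    mTxHistoryResultEntry = TransactionHistoryResultEntry{};
}
else
{
    mTxHistoryResultEntry.reset();
}
```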

LedgerHeaderHistoryEntry mHeaderHistoryEntry;
OnFailureCallback mOnFailure;

Expand All @@ -57,6 +63,7 @@ class ApplyCheckpointWork : public BasicWork
std::shared_ptr<ConditionalWork> mConditionalWork;

TxSetXDRFrameConstPtr getCurrentTxSet();
std::optional<TransactionResultSet> getCurrentTxResultSet();
void openInputFiles();

std::shared_ptr<LedgerCloseData> getNextLedgerCloseData();
15 changes: 8 additions & 7 deletions src/catchup/CatchupConfiguration.h
@@ -13,7 +13,7 @@
namespace stellar
{

// Each catchup can be configured by two parameters destination ledger
// Each catchup can be configured by two parameters: destination ledger
// (and its hash, if known) and count of ledgers to apply.
// Value of count can be adjusted in different ways during catchup. If applying
// count ledgers would mean going before the last closed ledger - it is
@@ -31,12 +31,13 @@ namespace stellar
// and catchup to that instead of destination ledger. This is useful when
// doing offline commandline catchups with stellar-core catchup command.
//
// Catchup can be done in two modes - ONLINE nad OFFLINE. In ONLINE mode node
// is connected to the network. If receives ledgers during catchup and applies
// them after history is applied. Also additional closing ledger is required
// to mark catchup as complete and node as synced. In OFFLINE mode node is not
// connected to network, so new ledgers are not being externalized. Only
// buckets and transactions from history archives are applied.
// Catchup can be done in two modes - ONLINE and OFFLINE. In ONLINE mode, the
Contributor:
Why did the formatting change?

Contributor Author:
I reworded it a bit, which changed the line lengths and formatting.

// node is connected to the network. It receives ledgers during catchup and
// applies them after history is applied. Also, an additional closing ledger is
// required to mark catchup as complete and the node as synced. In OFFLINE mode,
// the node is not connected to the network, so new ledgers are not being
// externalized. Only buckets and transactions from history archives are
// applied.
class CatchupConfiguration
{
public:
14 changes: 7 additions & 7 deletions src/catchup/CatchupWork.h
@@ -24,22 +24,22 @@ using WorkSeqPtr = std::shared_ptr<WorkSequence>;

// CatchupWork does all the necessary work to perform any type of catchup.
// It accepts CatchupConfiguration structure to know from which ledger to which
// one do the catchup and if it involves only applying ledgers or ledgers and
// one to do the catchup, and whether it involves only applying ledgers or ledgers and
// buckets.
//
// First thing it does is to get a history state which allows to calculate
// proper destination ledger (in case CatchupConfiguration::CURRENT) was used
// and to get list of buckets that should be in database on that ledger.
// First, it gets a history state, which allows it to calculate a
Contributor:
thanks for documentation fixes!

// proper destination ledger (in case CatchupConfiguration::CURRENT was used)
// and get a list of buckets that should be in the database on that ledger.
//
// Next step is downloading and verifying ledgers (if verifyMode is set to
// VERIFY_BUFFERED_LEDGERS it can also verify against ledgers currently
// Next, it downloads and verifies ledgers (if verifyMode is set to
// VERIFY_BUFFERED_LEDGERS, it can also verify against ledgers currently
// buffered in LedgerManager).
//
// Then, depending on configuration, it can download, verify and apply buckets
// (as in MINIMAL and RECENT catchups), and then download and apply
// transactions (as in COMPLETE and RECENT catchups).
//
// After that, catchup is done and node can replay buffered ledgers and take
// After that, catchup is done and the node can replay buffered ledgers and take
// part in consensus protocol.

class CatchupWork : public Work
97 changes: 50 additions & 47 deletions src/catchup/DownloadApplyTxsWork.cpp
@@ -43,44 +43,40 @@ DownloadApplyTxsWork::yieldMoreWork()
{
throw std::runtime_error("Work has no more children to iterate over!");
}
std::vector<FileType> fileTypesToDownload{
FileType::HISTORY_FILE_TYPE_TRANSACTIONS};
std::vector<std::shared_ptr<BasicWork>> downloadSeq;
std::vector<FileTransferInfo> filesToTransfer;
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS)
{
fileTypesToDownload.emplace_back(FileType::HISTORY_FILE_TYPE_RESULTS);
}
for (auto const& fileType : fileTypesToDownload)
Contributor:
nit: code would be easier to follow if you kept the getAndUnzip work creation as is, and appended results work to the sequence separately behind the CATCHUP_SKIP_KNOWN_RESULTS flag (it's only two file types, so I think the loop can be avoided)
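
For reference, a rough sketch of the structure this nit seems to suggest, reusing names from the diff (this is an assumption about the reviewer's intent, not an agreed-upon change):

```cpp
// Keep the transactions download exactly as before...
FileTransferInfo ft(mDownloadDir, FileType::HISTORY_FILE_TYPE_TRANSACTIONS,
                    mCheckpointToQueue);
filesToTransfer.emplace_back(ft);
downloadSeq.emplace_back(
    std::make_shared<GetAndUnzipRemoteFileWork>(mApp, ft, mArchive));

// ...and append the results download separately, only when the new mode is
// enabled, instead of looping over a vector of file types.
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS)
{
    FileTransferInfo results(mDownloadDir, FileType::HISTORY_FILE_TYPE_RESULTS,
                             mCheckpointToQueue);
    filesToTransfer.emplace_back(results);
    downloadSeq.emplace_back(std::make_shared<GetAndUnzipRemoteFileWork>(
        mApp, results, mArchive));
}
```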

{
CLOG_INFO(History,
Contributor:
nit: this log is repeated twice, we don't need to log this for the results

"Downloading, unzipping and applying {} for checkpoint {}",
typeString(fileType), mCheckpointToQueue);
FileTransferInfo ft(mDownloadDir, fileType, mCheckpointToQueue);
filesToTransfer.emplace_back(ft);
downloadSeq.emplace_back(
std::make_shared<GetAndUnzipRemoteFileWork>(mApp, ft, mArchive));
}

CLOG_INFO(History,
"Downloading, unzipping and applying {} for checkpoint {}",
typeString(FileType::HISTORY_FILE_TYPE_TRANSACTIONS),
mCheckpointToQueue);
FileTransferInfo ft(mDownloadDir, FileType::HISTORY_FILE_TYPE_TRANSACTIONS,
mCheckpointToQueue);
auto getAndUnzip =
std::make_shared<GetAndUnzipRemoteFileWork>(mApp, ft, mArchive);
OnFailureCallback cb = [archive = mArchive, filesToTransfer]() {
Contributor:
Please keep the implementation of OnFailureCallback the way it was before: mArchive can be null, in which case GetAndUnzipRemoteFileWork will pick a random archive. With the current change, core would crash de-referencing a null pointer.
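
One possible null-safe shape, keeping the weak_ptr pattern from the removed code while covering both files (the getFileWeaks vector is a hypothetical helper introduced only for this sketch, assumed to be filled alongside downloadSeq):

```cpp
// Resolve the archive from each download work at failure time instead of
// capturing mArchive directly, so a null mArchive is never dereferenced.
std::vector<std::weak_ptr<GetAndUnzipRemoteFileWork>> getFileWeaks;
OnFailureCallback cb = [getFileWeaks, filesToTransfer]() {
    for (size_t i = 0;
         i < getFileWeaks.size() && i < filesToTransfer.size(); ++i)
    {
        if (auto getFile = getFileWeaks[i].lock())
        {
            if (auto archive = getFile->getArchive())
            {
                CLOG_ERROR(History,
                           "Archive {} maybe contains corrupt file {}",
                           archive->getName(),
                           filesToTransfer[i].remoteName());
            }
        }
    }
};
```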

for (auto const& ft : filesToTransfer)
{
CLOG_ERROR(History, "Archive {} maybe contains corrupt file {}",
archive->getName(), ft.remoteName());
}
};

auto const& hm = mApp.getHistoryManager();
auto low = hm.firstLedgerInCheckpointContaining(mCheckpointToQueue);
auto high = std::min(mCheckpointToQueue, mRange.last());

TmpDir const& dir = mDownloadDir;
uint32_t checkpoint = mCheckpointToQueue;
auto getFileWeak = std::weak_ptr<GetAndUnzipRemoteFileWork>(getAndUnzip);

OnFailureCallback cb = [getFileWeak, checkpoint, &dir]() {
auto getFile = getFileWeak.lock();
if (getFile)
{
auto archive = getFile->getArchive();
if (archive)
{
FileTransferInfo ti(
dir, FileType::HISTORY_FILE_TYPE_TRANSACTIONS, checkpoint);
CLOG_ERROR(History, "Archive {} maybe contains corrupt file {}",
archive->getName(), ti.remoteName());
}
}
};

auto apply = std::make_shared<ApplyCheckpointWork>(
mApp, mDownloadDir, LedgerRange::inclusive(low, high), cb);

std::vector<std::shared_ptr<BasicWork>> seq{getAndUnzip};

auto maybeWaitForMerges = [](Application& app) {
if (app.getConfig().CATCHUP_WAIT_MERGES_TX_APPLY_FOR_TESTING)
{
@@ -98,8 +94,10 @@
{
auto prev = mLastYieldedWork;
bool pqFellBehind = false;
auto applyName = apply->getName();
Contributor:
unused

auto predicate = [prev, pqFellBehind, waitForPublish = mWaitForPublish,
maybeWaitForMerges](Application& app) mutable {
maybeWaitForMerges,
applyName](Application& app) mutable {
if (!prev)
{
throw std::runtime_error("Download and apply txs: related Work "
@@ -130,37 +128,42 @@
}
return res && maybeWaitForMerges(app);
};
seq.push_back(std::make_shared<ConditionalWork>(
downloadSeq.push_back(std::make_shared<ConditionalWork>(
mApp, "conditional-" + apply->getName(), predicate, apply));
}
else
{
seq.push_back(std::make_shared<ConditionalWork>(
downloadSeq.push_back(std::make_shared<ConditionalWork>(
mApp, "wait-merges" + apply->getName(), maybeWaitForMerges, apply));
}

seq.push_back(std::make_shared<WorkWithCallback>(
downloadSeq.push_back(std::make_shared<WorkWithCallback>(
Contributor:
nit: why the rename? seq includes both download and apply

mApp, "delete-transactions-" + std::to_string(mCheckpointToQueue),
[ft](Application& app) {
try
[filesToTransfer](Application& app) {
for (auto const& ft : filesToTransfer)
{
std::filesystem::remove(
std::filesystem::path(ft.localPath_nogz()));
CLOG_DEBUG(History, "Deleted transactions {}",
CLOG_DEBUG(History, "Deleting transactions {}",
Contributor:
nit: the log is misleading, since we're deleting transactions and results

ft.localPath_nogz());
return true;
}
catch (std::filesystem::filesystem_error const& e)
{
CLOG_ERROR(History, "Could not delete transactions {}: {}",
ft.localPath_nogz(), e.what());
return false;
try
{
std::filesystem::remove(
std::filesystem::path(ft.localPath_nogz()));
CLOG_DEBUG(History, "Deleted transactions {}",
ft.localPath_nogz());
}
catch (std::filesystem::filesystem_error const& e)
{
CLOG_ERROR(History, "Could not delete transactions {}: {}",
ft.localPath_nogz(), e.what());
return false;
}
}
return true;
}));

auto nextWork = std::make_shared<WorkSequence>(
mApp, "download-apply-" + std::to_string(mCheckpointToQueue), seq,
BasicWork::RETRY_NEVER);
mApp, "download-apply-" + std::to_string(mCheckpointToQueue),
downloadSeq, BasicWork::RETRY_NEVER);
mCheckpointToQueue += mApp.getHistoryManager().getCheckpointFrequency();
mLastYieldedWork = nextWork;
return nextWork;
9 changes: 5 additions & 4 deletions src/herder/LedgerCloseData.cpp
@@ -14,14 +14,15 @@ using namespace std;
namespace stellar
{

LedgerCloseData::LedgerCloseData(uint32_t ledgerSeq,
TxSetXDRFrameConstPtr txSet,
StellarValue const& v,
std::optional<Hash> const& expectedLedgerHash)
LedgerCloseData::LedgerCloseData(
uint32_t ledgerSeq, TxSetXDRFrameConstPtr txSet, StellarValue const& v,
std::optional<Hash> const& expectedLedgerHash,
std::optional<TransactionResultSet> const& expectedResults)
: mLedgerSeq(ledgerSeq)
, mTxSet(txSet)
, mValue(v)
, mExpectedLedgerHash(expectedLedgerHash)
, mExpectedResults(expectedResults)
{
releaseAssert(txSet->getContentsHash() == mValue.txSetHash);
}
10 changes: 9 additions & 1 deletion src/herder/LedgerCloseData.h
@@ -26,7 +26,9 @@ class LedgerCloseData
public:
LedgerCloseData(
uint32_t ledgerSeq, TxSetXDRFrameConstPtr txSet, StellarValue const& v,
std::optional<Hash> const& expectedLedgerHash = std::nullopt);
std::optional<Hash> const& expectedLedgerHash = std::nullopt,
std::optional<TransactionResultSet> const& expectedResults =
std::nullopt);

uint32_t
getLedgerSeq() const
@@ -48,6 +50,11 @@
{
return mExpectedLedgerHash;
}
std::optional<TransactionResultSet> const&
getExpectedResults() const
{
return mExpectedResults;
}

StoredDebugTransactionSet
toXDR() const
@@ -82,6 +89,7 @@ class LedgerCloseData
TxSetXDRFrameConstPtr mTxSet;
StellarValue mValue;
std::optional<Hash> mExpectedLedgerHash;
std::optional<TransactionResultSet> mExpectedResults;
};

std::string stellarValueToString(Config const& c, StellarValue const& sv);