-
Notifications
You must be signed in to change notification settings - Fork 970
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: New mode to use transaction results to skip failed transaction and signature verification in catchup #4536
base: master
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,7 +13,7 @@ | |
namespace stellar | ||
{ | ||
|
||
// Each catchup can be configured by two parameters destination ledger | ||
// Each catchup can be configured by two parameters: destination ledger | ||
// (and its hash, if known) and count of ledgers to apply. | ||
// Value of count can be adjusted in different ways during catchup. If applying | ||
// count ledgers would mean going before the last closed ledger - it is | ||
|
@@ -31,12 +31,13 @@ namespace stellar | |
// and catchup to that instead of destination ledger. This is useful when | ||
// doing offline commandline catchups with stellar-core catchup command. | ||
// | ||
// Catchup can be done in two modes - ONLINE nad OFFLINE. In ONLINE mode node | ||
// is connected to the network. If receives ledgers during catchup and applies | ||
// them after history is applied. Also additional closing ledger is required | ||
// to mark catchup as complete and node as synced. In OFFLINE mode node is not | ||
// connected to network, so new ledgers are not being externalized. Only | ||
// buckets and transactions from history archives are applied. | ||
// Catchup can be done in two modes - ONLINE and OFFLINE. In ONLINE mode, the | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did the formatting change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I reworded it a bit, which changed the line lengths and formatting. |
||
// node is connected to the network. If receives ledgers during catchup and | ||
// applies them after history is applied. Also, an additional closing ledger is | ||
// required to mark catchup as complete and the node as synced. In OFFLINE mode, | ||
// the node is not connected to network, so new ledgers are not being | ||
// externalized. Only buckets and transactions from history archives are | ||
// applied. | ||
class CatchupConfiguration | ||
{ | ||
public: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,22 +24,22 @@ using WorkSeqPtr = std::shared_ptr<WorkSequence>; | |
|
||
// CatchupWork does all the necessary work to perform any type of catchup. | ||
// It accepts CatchupConfiguration structure to know from which ledger to which | ||
// one do the catchup and if it involves only applying ledgers or ledgers and | ||
// one to do the catchup and if it involves only applying ledgers or ledgers and | ||
// buckets. | ||
// | ||
// First thing it does is to get a history state which allows to calculate | ||
// proper destination ledger (in case CatchupConfiguration::CURRENT) was used | ||
// and to get list of buckets that should be in database on that ledger. | ||
// First, it gets a history state, which allows it to calculate a | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks for documentation fixes! |
||
// proper destination ledger (in case CatchupConfiguration::CURRENT) | ||
// and get a list of buckets that should be in the database on that ledger. | ||
// | ||
// Next step is downloading and verifying ledgers (if verifyMode is set to | ||
// VERIFY_BUFFERED_LEDGERS it can also verify against ledgers currently | ||
// Next, it downloads and verifies ledgers (if verifyMode is set to | ||
// VERIFY_BUFFERED_LEDGERS, it can also verify against ledgers currently | ||
// buffered in LedgerManager). | ||
// | ||
// Then, depending on configuration, it can download, verify and apply buckets | ||
// (as in MINIMAL and RECENT catchups), and then download and apply | ||
// transactions (as in COMPLETE and RECENT catchups). | ||
// | ||
// After that, catchup is done and node can replay buffered ledgers and take | ||
// After that, catchup is done and the node can replay buffered ledgers and take | ||
// part in consensus protocol. | ||
|
||
class CatchupWork : public Work | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,44 +43,40 @@ DownloadApplyTxsWork::yieldMoreWork() | |
{ | ||
throw std::runtime_error("Work has no more children to iterate over!"); | ||
} | ||
std::vector<FileType> fileTypesToDownload{ | ||
FileType::HISTORY_FILE_TYPE_TRANSACTIONS}; | ||
std::vector<std::shared_ptr<BasicWork>> downloadSeq; | ||
std::vector<FileTransferInfo> filesToTransfer; | ||
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS) | ||
{ | ||
fileTypesToDownload.emplace_back(FileType::HISTORY_FILE_TYPE_RESULTS); | ||
} | ||
for (auto const& fileType : fileTypesToDownload) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: code would be easier to follow if you kept the |
||
{ | ||
CLOG_INFO(History, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: this log is repeated twice, we don't need to log this for the results |
||
"Downloading, unzipping and applying {} for checkpoint {}", | ||
typeString(fileType), mCheckpointToQueue); | ||
FileTransferInfo ft(mDownloadDir, fileType, mCheckpointToQueue); | ||
filesToTransfer.emplace_back(ft); | ||
downloadSeq.emplace_back( | ||
std::make_shared<GetAndUnzipRemoteFileWork>(mApp, ft, mArchive)); | ||
} | ||
|
||
CLOG_INFO(History, | ||
"Downloading, unzipping and applying {} for checkpoint {}", | ||
typeString(FileType::HISTORY_FILE_TYPE_TRANSACTIONS), | ||
mCheckpointToQueue); | ||
FileTransferInfo ft(mDownloadDir, FileType::HISTORY_FILE_TYPE_TRANSACTIONS, | ||
mCheckpointToQueue); | ||
auto getAndUnzip = | ||
std::make_shared<GetAndUnzipRemoteFileWork>(mApp, ft, mArchive); | ||
OnFailureCallback cb = [archive = mArchive, filesToTransfer]() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please keep the implementation of OnFailureCallback the way it was before: mArchive can be null, in which case |
||
for (auto const& ft : filesToTransfer) | ||
{ | ||
CLOG_ERROR(History, "Archive {} maybe contains corrupt file {}", | ||
archive->getName(), ft.remoteName()); | ||
} | ||
}; | ||
|
||
auto const& hm = mApp.getHistoryManager(); | ||
auto low = hm.firstLedgerInCheckpointContaining(mCheckpointToQueue); | ||
auto high = std::min(mCheckpointToQueue, mRange.last()); | ||
|
||
TmpDir const& dir = mDownloadDir; | ||
uint32_t checkpoint = mCheckpointToQueue; | ||
auto getFileWeak = std::weak_ptr<GetAndUnzipRemoteFileWork>(getAndUnzip); | ||
|
||
OnFailureCallback cb = [getFileWeak, checkpoint, &dir]() { | ||
auto getFile = getFileWeak.lock(); | ||
if (getFile) | ||
{ | ||
auto archive = getFile->getArchive(); | ||
if (archive) | ||
{ | ||
FileTransferInfo ti( | ||
dir, FileType::HISTORY_FILE_TYPE_TRANSACTIONS, checkpoint); | ||
CLOG_ERROR(History, "Archive {} maybe contains corrupt file {}", | ||
archive->getName(), ti.remoteName()); | ||
} | ||
} | ||
}; | ||
|
||
auto apply = std::make_shared<ApplyCheckpointWork>( | ||
mApp, mDownloadDir, LedgerRange::inclusive(low, high), cb); | ||
|
||
std::vector<std::shared_ptr<BasicWork>> seq{getAndUnzip}; | ||
|
||
auto maybeWaitForMerges = [](Application& app) { | ||
if (app.getConfig().CATCHUP_WAIT_MERGES_TX_APPLY_FOR_TESTING) | ||
{ | ||
|
@@ -98,8 +94,10 @@ DownloadApplyTxsWork::yieldMoreWork() | |
{ | ||
auto prev = mLastYieldedWork; | ||
bool pqFellBehind = false; | ||
auto applyName = apply->getName(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unused |
||
auto predicate = [prev, pqFellBehind, waitForPublish = mWaitForPublish, | ||
maybeWaitForMerges](Application& app) mutable { | ||
maybeWaitForMerges, | ||
applyName](Application& app) mutable { | ||
if (!prev) | ||
{ | ||
throw std::runtime_error("Download and apply txs: related Work " | ||
|
@@ -130,37 +128,42 @@ DownloadApplyTxsWork::yieldMoreWork() | |
} | ||
return res && maybeWaitForMerges(app); | ||
}; | ||
seq.push_back(std::make_shared<ConditionalWork>( | ||
downloadSeq.push_back(std::make_shared<ConditionalWork>( | ||
mApp, "conditional-" + apply->getName(), predicate, apply)); | ||
} | ||
else | ||
{ | ||
seq.push_back(std::make_shared<ConditionalWork>( | ||
downloadSeq.push_back(std::make_shared<ConditionalWork>( | ||
mApp, "wait-merges" + apply->getName(), maybeWaitForMerges, apply)); | ||
} | ||
|
||
seq.push_back(std::make_shared<WorkWithCallback>( | ||
downloadSeq.push_back(std::make_shared<WorkWithCallback>( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: why the rename? |
||
mApp, "delete-transactions-" + std::to_string(mCheckpointToQueue), | ||
[ft](Application& app) { | ||
try | ||
[filesToTransfer](Application& app) { | ||
for (auto const& ft : filesToTransfer) | ||
{ | ||
std::filesystem::remove( | ||
std::filesystem::path(ft.localPath_nogz())); | ||
CLOG_DEBUG(History, "Deleted transactions {}", | ||
CLOG_DEBUG(History, "Deleting transactions {}", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: the log is misleading, since we're deleting transactions and results |
||
ft.localPath_nogz()); | ||
return true; | ||
} | ||
catch (std::filesystem::filesystem_error const& e) | ||
{ | ||
CLOG_ERROR(History, "Could not delete transactions {}: {}", | ||
ft.localPath_nogz(), e.what()); | ||
return false; | ||
try | ||
{ | ||
std::filesystem::remove( | ||
std::filesystem::path(ft.localPath_nogz())); | ||
CLOG_DEBUG(History, "Deleted transactions {}", | ||
ft.localPath_nogz()); | ||
} | ||
catch (std::filesystem::filesystem_error const& e) | ||
{ | ||
CLOG_ERROR(History, "Could not delete transactions {}: {}", | ||
ft.localPath_nogz(), e.what()); | ||
return false; | ||
} | ||
} | ||
return true; | ||
})); | ||
|
||
auto nextWork = std::make_shared<WorkSequence>( | ||
mApp, "download-apply-" + std::to_string(mCheckpointToQueue), seq, | ||
BasicWork::RETRY_NEVER); | ||
mApp, "download-apply-" + std::to_string(mCheckpointToQueue), | ||
downloadSeq, BasicWork::RETRY_NEVER); | ||
mCheckpointToQueue += mApp.getHistoryManager().getCheckpointFrequency(); | ||
mLastYieldedWork = nextWork; | ||
return nextWork; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To avoid footguns, results should be optional