Skip to content

Commit

Permalink
Fix PCES copy bugs. (#10057) (#10062)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley authored Nov 21, 2023
1 parent d13b121 commit a54f6f9
Show file tree
Hide file tree
Showing 2 changed files with 280 additions and 272 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
/*
* Copyright (C) 2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.swirlds.platform.event.preconsensus;

import static com.swirlds.common.io.utility.FileUtils.executeAndRename;
import static com.swirlds.logging.LogMarker.EXCEPTION;
import static com.swirlds.logging.LogMarker.STATE_TO_DISK;

import com.swirlds.common.context.PlatformContext;
import com.swirlds.common.system.NodeId;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* Operations for copying preconsensus event files. Is not fully thread safe, best effort only. Race conditions can
* cause the copy to fail, but if it does fail it will fail atomically and in a way that is recoverable.
*/
public final class BestEffortPreconsensusEventFileCopy {

private static final Logger logger = LogManager.getLogger(BestEffortPreconsensusEventFileCopy.class);

/**
* The number of times to attempt to copy the last PCES file. Access to this file is not really coordinated between
* this logic and the code responsible for managing PCES file lifecycle, and so there is a small chance that the
* file moves when we attempt to make a copy. However, this probability is fairly small, and it is very unlikely
* that we will be unable to snatch a copy in time with a few retries.
*/
private static final int COPY_PCES_MAX_RETRIES = 10;

private BestEffortPreconsensusEventFileCopy() {}

/**
* Copy preconsensus event files into the signed state directory. Copying these files is not thread safe and may
* fail as a result. This method retries several times if a failure is encountered. Success is not guaranteed, but
* success or failure is atomic and will not throw an exception.
*
* @param platformContext the platform context
* @param selfId the id of this node
* @param destinationDirectory the directory where the state is being written
* @param minimumGenerationNonAncient the minimum generation of events that are not ancient, with respect to the
* state that is being written
* @param round the round of the state that is being written
*/
public static void copyPreconsensusEventStreamFilesRetryOnFailure(
@NonNull final PlatformContext platformContext,
@NonNull final NodeId selfId,
@NonNull final Path destinationDirectory,
final long minimumGenerationNonAncient,
final long round) {

final boolean copyPreconsensusStream = platformContext
.getConfiguration()
.getConfigData(PreconsensusEventStreamConfig.class)
.copyRecentStreamToStateSnapshots();
if (!copyPreconsensusStream) {
// PCES copying is disabled
return;
}

final Path pcesDestination =
destinationDirectory.resolve("preconsensus-events").resolve(Long.toString(selfId.id()));

int triesRemaining = COPY_PCES_MAX_RETRIES;
while (triesRemaining > 0) {
triesRemaining--;
try {
executeAndRename(
pcesDestination,
temporaryDirectory -> copyPreconsensusEventStreamFiles(
platformContext, selfId, temporaryDirectory, minimumGenerationNonAncient));

return;
} catch (final IOException e) {
if (triesRemaining > 0) {
logger.warn(STATE_TO_DISK.getMarker(), "Unable to copy PCES files. Retrying.");
} else {
logger.error(
EXCEPTION.getMarker(),
"Unable to copy the last PCES file after {} retries. "
+ "PCES files will not be written into the state snapshot for round {}.",
COPY_PCES_MAX_RETRIES,
round,
e);
}
}
}
}

/**
* Copy preconsensus event files into the signed state directory. These files are necessary for the platform to use
* the state file as a starting point. Note: starting a node using the PCES files in the state directory does not
* guarantee that there is no data loss (i.e. there may be transactions that reach consensus after the state
* snapshot), but it does allow a node to start up and participate in gossip.
*
* <p>
* This general strategy is not very elegant is very much a hack. But it will allow us to do migration testing using
* real production states and streams, in the short term. In the longer term we should consider alternate and
* cleaner strategies.
*
* @param platformContext the platform context
* @param selfId the id of this node
* @param destinationDirectory the directory where the PCES files should be written
* @param minimumGenerationNonAncient the minimum generation of events that are not ancient, with respect to the
* state that is being written
*/
private static void copyPreconsensusEventStreamFiles(
@NonNull final PlatformContext platformContext,
@NonNull final NodeId selfId,
@NonNull final Path destinationDirectory,
final long minimumGenerationNonAncient)
throws IOException {

final List<PreconsensusEventFile> allFiles = gatherPreconsensusFilesOnDisk(selfId, platformContext);
if (allFiles.isEmpty()) {
return;
}

// Sort by sequence number
Collections.sort(allFiles);

// Discard all files that either have an incorrect origin or that do not contain non-ancient events.
final List<PreconsensusEventFile> filesToCopy =
getRequiredPreconsensusFiles(allFiles, minimumGenerationNonAncient);
if (filesToCopy.isEmpty()) {
return;
}

copyPreconsensusFileList(filesToCopy, destinationDirectory);
}

/**
* Get the preconsensus files that we need to copy to a state. We need any file that has a matching origin and that
* contains non-ancient events (w.r.t. the state).
*
* @param allFiles all PCES files on disk
* @param minimumGenerationNonAncient the minimum generation of events that are not ancient, with respect to the
* state that is being written
* @return the list of files to copy
*/
@NonNull
private static List<PreconsensusEventFile> getRequiredPreconsensusFiles(
@NonNull final List<PreconsensusEventFile> allFiles, final long minimumGenerationNonAncient) {

final List<PreconsensusEventFile> filesToCopy = new ArrayList<>();
final PreconsensusEventFile lastFile = allFiles.get(allFiles.size() - 1);
for (final PreconsensusEventFile file : allFiles) {
if (file.getOrigin() == lastFile.getOrigin()
&& file.getMaximumGeneration() >= minimumGenerationNonAncient) {
filesToCopy.add(file);
}
}

if (filesToCopy.isEmpty()) {
logger.warn(
STATE_TO_DISK.getMarker(),
"No preconsensus event files meeting specified criteria found to copy. "
+ "Minimum generation non-ancient: {}",
minimumGenerationNonAncient);
} else if (filesToCopy.size() == 1) {
logger.info(
STATE_TO_DISK.getMarker(),
"""
Found 1 preconsensus event file meeting specified criteria to copy.
Minimum generation non-ancient: {}
File: {}
""",
minimumGenerationNonAncient,
filesToCopy.get(0).getPath());
} else {
logger.info(
STATE_TO_DISK.getMarker(),
"""
Found {} preconsensus event files meeting specified criteria to copy.
Minimum generation non-ancient: {}
First file to copy: {}
Last file to copy: {}
""",
filesToCopy.size(),
minimumGenerationNonAncient,
filesToCopy.get(0).getPath(),
filesToCopy.get(filesToCopy.size() - 1).getPath());
}

return filesToCopy;
}

/**
* Gather all PCES files on disk.
*
* @param selfId the id of this node
* @param platformContext the platform context
* @return a list of all PCES files on disk
*/
@NonNull
private static List<PreconsensusEventFile> gatherPreconsensusFilesOnDisk(
@NonNull final NodeId selfId, @NonNull final PlatformContext platformContext) throws IOException {
final List<PreconsensusEventFile> allFiles = new ArrayList<>();
final Path preconsensusEventStreamDirectory =
PreconsensusEventFileManager.getDatabaseDirectory(platformContext, selfId);
try (final Stream<Path> stream = Files.walk(preconsensusEventStreamDirectory)) {
stream.filter(Files::isRegularFile).forEach(path -> {
try {
allFiles.add(PreconsensusEventFile.of(path));
} catch (final IOException e) {
// Ignore, this will get thrown for each file that is not a PCES file
}
});
}

if (allFiles.isEmpty()) {
logger.warn(STATE_TO_DISK.getMarker(), "No preconsensus event files found to copy");
} else if (allFiles.size() == 1) {
logger.info(
STATE_TO_DISK.getMarker(),
"""
Found 1 preconsensus file on disk.
File: {}""",
allFiles.get(0).getPath());
} else {
logger.info(
STATE_TO_DISK.getMarker(),
"""
Found {} preconsensus files on disk.
First file: {}
Last file: {}""",
allFiles.size(),
allFiles.get(0).getPath(),
allFiles.get(allFiles.size() - 1).getPath());
}

return allFiles;
}

/**
* Copy a list of preconsensus event files into a directory.
*
* @param filesToCopy the files to copy
* @param pcesDestination the directory where the files should be copied
*/
private static void copyPreconsensusFileList(
@NonNull final List<PreconsensusEventFile> filesToCopy, @NonNull final Path pcesDestination)
throws IOException {
logger.info(STATE_TO_DISK.getMarker(), "Copying {} preconsensus event file(s)", filesToCopy.size());

// Copy files from newest to oldest. Newer files are more likely to be modified concurrently by the
// PCES file writer and are more likely to fail. If we fail to copy files, it's better to fail early
// so that we can retry again more quickly.
for (int index = filesToCopy.size() - 1; index >= 0; index--) {
final PreconsensusEventFile file = filesToCopy.get(index);
Files.copy(file.getPath(), pcesDestination.resolve(file.getFileName()));
}

logger.info(STATE_TO_DISK.getMarker(), "Finished copying {} preconsensus event file(s)", filesToCopy.size());
}
}
Loading

0 comments on commit a54f6f9

Please sign in to comment.