Skip to content

Commit

Permalink
Translog GC changes
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Aug 26, 2024
1 parent 2ad4143 commit eb06b7f
Show file tree
Hide file tree
Showing 6 changed files with 537 additions and 60 deletions.
287 changes: 262 additions & 25 deletions server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ TranslogReader openReader(Path path, Checkpoint checkpoint) throws IOException {
*/
public static long parseIdFromFileName(Path translogFile) {
final String fileName = translogFile.getFileName().toString();
return parseIdFromFileName(fileName);
}

public static long parseIdFromFileName(String fileName) {
final Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(fileName);
if (matcher.matches()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;
import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR;

/**
* The class responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService}
Expand Down Expand Up @@ -337,35 +339,53 @@ private void deleteFileIfExists(Path filePath) throws IOException {
}
}

public TranslogTransferMetadata readMetadata(long timestamp) throws IOException {
if (timestamp < 0) {
return readMetadata();
}
return readMetadata((blobMetadataList) -> {
List<String> metadataFiles = blobMetadataList.stream().map(BlobMetadata::name).collect(Collectors.toList());
Set<String> metadataFilesMatchingTimestamp = RemoteStoreUtils.getPinnedTimestampLockedFiles(
metadataFiles,
Set.of(timestamp),
file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[3]),
TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen
);
if (metadataFilesMatchingTimestamp.isEmpty()) {
return null;
}
assert metadataFilesMatchingTimestamp.size() == 1 : "There should be only 1 metadata file matching given timestamp";
return metadataFilesMatchingTimestamp.stream().findFirst().get();
}, Integer.MAX_VALUE);
}

public TranslogTransferMetadata readMetadata() throws IOException {
return readMetadata((blobMetadataList) -> {
RemoteStoreUtils.verifyNoMultipleWriters(
blobMetadataList.stream().map(BlobMetadata::name).collect(Collectors.toList()),
TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen
);
return blobMetadataList.get(0).name();
}, METADATA_FILES_TO_FETCH);
}

private TranslogTransferMetadata readMetadata(Function<List<BlobMetadata>, String> getMetadataFileToRead, int numberOfFilesToFetch)
throws IOException {
SetOnce<TranslogTransferMetadata> metadataSetOnce = new SetOnce<>();
SetOnce<IOException> exceptionSetOnce = new SetOnce<>();
final CountDownLatch latch = new CountDownLatch(1);
LatchedActionListener<List<BlobMetadata>> latchedActionListener = new LatchedActionListener<>(
ActionListener.wrap(blobMetadataList -> {
if (blobMetadataList.isEmpty()) return;
RemoteStoreUtils.verifyNoMultipleWriters(
blobMetadataList.stream().map(BlobMetadata::name).collect(Collectors.toList()),
TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen
);
String filename = blobMetadataList.get(0).name();
boolean downloadStatus = false;
long downloadStartTime = System.nanoTime(), bytesToRead = 0;
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) {
// Capture number of bytes for stats before reading
bytesToRead = inputStream.available();
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
metadataSetOnce.set(metadataStreamWrapper.readStream(indexInput));
downloadStatus = true;
String filename = getMetadataFileToRead.apply(blobMetadataList);
if (filename == null) {
return;
}
try {
metadataSetOnce.set(readMetadata(filename));
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e);
exceptionSetOnce.set(e);
} finally {
remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
logger.debug("translogMetadataDownloadStatus={}", downloadStatus);
if (downloadStatus) {
remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead);
}
}
}, e -> {
if (e instanceof RuntimeException) {
Expand All @@ -381,12 +401,14 @@ public TranslogTransferMetadata readMetadata() throws IOException {
transferService.listAllInSortedOrder(
remoteMetadataTransferPath,
TranslogTransferMetadata.METADATA_PREFIX,
METADATA_FILES_TO_FETCH,
numberOfFilesToFetch,
latchedActionListener
);
latch.await();
if (latch.await(remoteStoreSettings.getClusterRemoteTranslogTransferTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
throw new RuntimeException("Timed out reading metadata file");
}
} catch (InterruptedException e) {
throw new IOException("Exception while reading/downloading metadafile", e);
throw new IOException("Exception while reading/downloading metadata file", e);
}

if (exceptionSetOnce.get() != null) {
Expand All @@ -396,6 +418,26 @@ public TranslogTransferMetadata readMetadata() throws IOException {
return metadataSetOnce.get();
}

public TranslogTransferMetadata readMetadata(String metadataFilename) throws IOException {
boolean downloadStatus = false;
TranslogTransferMetadata translogTransferMetadata = null;
long downloadStartTime = System.nanoTime(), bytesToRead = 0;
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, metadataFilename)) {
// Capture number of bytes for stats before reading
bytesToRead = inputStream.available();
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
translogTransferMetadata = metadataStreamWrapper.readStream(indexInput);
downloadStatus = true;
} finally {
remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
logger.debug("translogMetadataDownloadStatus={}", downloadStatus);
if (downloadStatus) {
remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead);
}
}
return translogTransferMetadata;
}

private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) throws IOException {
Map<String, String> generationPrimaryTermMap = transferSnapshot.getTranslogFileSnapshots().stream().map(s -> {
assert s instanceof TranslogFileSnapshot;
Expand Down Expand Up @@ -549,6 +591,16 @@ public void onFailure(Exception e) {
});
}

public void listTranslogMetadataFilesAsync(ActionListener<List<BlobMetadata>> listener) {
transferService.listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteMetadataTransferPath,
TranslogTransferMetadata.METADATA_PREFIX,
Integer.MAX_VALUE,
listener
);
}

public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) {
try {
transferService.listAllInSortedOrderAsync(
Expand Down Expand Up @@ -635,7 +687,7 @@ public void onFailure(Exception e) {
* @param files list of metadata files to be deleted.
* @param onCompletion runnable to run on completion of deletion regardless of success/failure.
*/
private void deleteMetadataFilesAsync(List<String> files, Runnable onCompletion) {
public void deleteMetadataFilesAsync(List<String> files, Runnable onCompletion) {
try {
transferService.deleteBlobsAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, files, new ActionListener<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,9 @@ public static TimeValue getPinnedTimestampsSchedulerInterval() {
public static TimeValue getPinnedTimestampsLookbackInterval() {
return pinnedTimestampsLookbackInterval;
}

// Visible for testing
public static void setPinnedTimestampsLookbackInterval(TimeValue pinnedTimestampsLookbackInterval) {
RemoteStoreSettings.pinnedTimestampsLookbackInterval = pinnedTimestampsLookbackInterval;
}
}
Loading

0 comments on commit eb06b7f

Please sign in to comment.