Skip to content

Commit

Permalink
Translog GC changes
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Aug 26, 2024
1 parent 782438e commit 262c0e6
Show file tree
Hide file tree
Showing 6 changed files with 493 additions and 73 deletions.
284 changes: 259 additions & 25 deletions server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ TranslogReader openReader(Path path, Checkpoint checkpoint) throws IOException {
*/
public static long parseIdFromFileName(Path translogFile) {
    // Reduce the path to the name of its final element and delegate to the String overload.
    return parseIdFromFileName(translogFile.getFileName().toString());
}

public static long parseIdFromFileName(String fileName) {
final Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(fileName);
if (matcher.matches()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;
import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR;

/**
* The class responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService}
Expand Down Expand Up @@ -337,35 +339,53 @@ private void deleteFileIfExists(Path filePath) throws IOException {
}
}

/**
 * Reads the translog metadata file selected for the given timestamp.
 * <p>
 * A negative {@code timestamp} delegates to {@link #readMetadata()}. Otherwise all listed metadata
 * files (limit {@code Integer.MAX_VALUE}) are handed to
 * {@code RemoteStoreUtils.getPinnedTimestampLockedFiles} together with the single timestamp; the
 * per-file timestamp is taken from the fourth {@code METADATA_SEPARATOR}-delimited token of the file
 * name, passed through {@code RemoteStoreUtils.invertLong}.
 *
 * @param timestamp timestamp to match; a negative value selects the no-arg overload's behaviour
 * @return the metadata read from the selected file; when no file matches, the selector yields
 *         {@code null} and no file is read (presumably resulting in a {@code null} return)
 * @throws IOException if listing or reading the metadata fails
 */
public TranslogTransferMetadata readMetadata(long timestamp) throws IOException {
    if (timestamp < 0) {
        return readMetadata();
    }
    final Function<List<BlobMetadata>, String> pickFileForTimestamp = listedBlobs -> {
        final List<String> candidateNames = listedBlobs.stream().map(BlobMetadata::name).collect(Collectors.toList());
        final Set<String> matches = RemoteStoreUtils.getPinnedTimestampLockedFiles(
            candidateNames,
            Set.of(timestamp),
            name -> RemoteStoreUtils.invertLong(name.split(METADATA_SEPARATOR)[3]),
            TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen
        );
        if (matches.isEmpty() == false) {
            assert matches.size() == 1 : "There should be only 1 metadata file matching given timestamp";
            return matches.iterator().next();
        }
        // Nothing matched the timestamp: signal "no file to read" to the shared reader.
        return null;
    };
    return readMetadata(pickFileForTimestamp, Integer.MAX_VALUE);
}

/**
 * Reads the first metadata file returned by the sorted listing, fetching at most
 * {@code METADATA_FILES_TO_FETCH} entries.
 * <p>
 * Before selecting the file, the listed names are checked with
 * {@code RemoteStoreUtils.verifyNoMultipleWriters}.
 *
 * @return the metadata read from the first listed file
 * @throws IOException if listing or reading the metadata fails
 */
public TranslogTransferMetadata readMetadata() throws IOException {
    final Function<List<BlobMetadata>, String> pickFirstListed = listedBlobs -> {
        final List<String> listedNames = listedBlobs.stream().map(BlobMetadata::name).collect(Collectors.toList());
        RemoteStoreUtils.verifyNoMultipleWriters(listedNames, TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen);
        return listedBlobs.get(0).name();
    };
    return readMetadata(pickFirstListed, METADATA_FILES_TO_FETCH);
}

private TranslogTransferMetadata readMetadata(Function<List<BlobMetadata>, String> getMetadataFileToRead, int numberOfFilesToFetch)
throws IOException {
SetOnce<TranslogTransferMetadata> metadataSetOnce = new SetOnce<>();
SetOnce<IOException> exceptionSetOnce = new SetOnce<>();
final CountDownLatch latch = new CountDownLatch(1);
LatchedActionListener<List<BlobMetadata>> latchedActionListener = new LatchedActionListener<>(
ActionListener.wrap(blobMetadataList -> {
if (blobMetadataList.isEmpty()) return;
RemoteStoreUtils.verifyNoMultipleWriters(
blobMetadataList.stream().map(BlobMetadata::name).collect(Collectors.toList()),
TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen
);
String filename = blobMetadataList.get(0).name();
boolean downloadStatus = false;
long downloadStartTime = System.nanoTime(), bytesToRead = 0;
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) {
// Capture number of bytes for stats before reading
bytesToRead = inputStream.available();
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
metadataSetOnce.set(metadataStreamWrapper.readStream(indexInput));
downloadStatus = true;
String filename = getMetadataFileToRead.apply(blobMetadataList);
if (filename == null) {
return;
}
try {
metadataSetOnce.set(readMetadata(filename));
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e);
exceptionSetOnce.set(e);
} finally {
remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
logger.debug("translogMetadataDownloadStatus={}", downloadStatus);
if (downloadStatus) {
remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead);
}
}
}, e -> {
if (e instanceof RuntimeException) {
Expand All @@ -381,12 +401,14 @@ public TranslogTransferMetadata readMetadata() throws IOException {
transferService.listAllInSortedOrder(
remoteMetadataTransferPath,
TranslogTransferMetadata.METADATA_PREFIX,
METADATA_FILES_TO_FETCH,
numberOfFilesToFetch,
latchedActionListener
);
latch.await();
if (latch.await(remoteStoreSettings.getClusterRemoteTranslogTransferTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
throw new RuntimeException("Timed out reading metadata file");
}
} catch (InterruptedException e) {
throw new IOException("Exception while reading/downloading metadafile", e);
throw new IOException("Exception while reading/downloading metadata file", e);
}

if (exceptionSetOnce.get() != null) {
Expand All @@ -396,6 +418,26 @@ public TranslogTransferMetadata readMetadata() throws IOException {
return metadataSetOnce.get();
}

/**
 * Downloads the named metadata file from {@code remoteMetadataTransferPath} and parses it.
 * <p>
 * Download time is always reported to the transfer tracker; downloaded bytes are reported only
 * when the file was both read and parsed successfully.
 *
 * @param metadataFilename name of the metadata blob to download
 * @return the parsed metadata
 * @throws IOException if the download or parsing fails
 */
public TranslogTransferMetadata readMetadata(String metadataFilename) throws IOException {
    boolean downloadStatus = false;
    long bytesRead = 0;
    final long downloadStartTime = System.nanoTime();
    try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, metadataFilename)) {
        // InputStream.available() is only an estimate of what can be read without blocking,
        // so take the stat from the number of bytes actually read instead.
        final byte[] metadataBytes = inputStream.readAllBytes();
        bytesRead = metadataBytes.length;
        final TranslogTransferMetadata translogTransferMetadata = metadataStreamWrapper.readStream(
            new ByteArrayIndexInput("metadata file", metadataBytes)
        );
        downloadStatus = true;
        return translogTransferMetadata;
    } finally {
        remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
        logger.debug("translogMetadataDownloadStatus={}", downloadStatus);
        if (downloadStatus) {
            remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesRead);
        }
    }
}

private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) throws IOException {
Map<String, String> generationPrimaryTermMap = transferSnapshot.getTranslogFileSnapshots().stream().map(s -> {
assert s instanceof TranslogFileSnapshot;
Expand Down Expand Up @@ -549,6 +591,16 @@ public void onFailure(Exception e) {
});
}

/**
 * Asynchronously lists the blobs under {@code remoteMetadataTransferPath} whose names start with
 * {@code TranslogTransferMetadata.METADATA_PREFIX}, using the {@code REMOTE_PURGE} thread pool name.
 *
 * @param listener receives the listed blob metadata, or the failure
 */
public void listTranslogMetadataFilesAsync(ActionListener<List<BlobMetadata>> listener) {
    // Integer.MAX_VALUE is passed as the limit argument, i.e. the listing is effectively uncapped.
    final int noLimit = Integer.MAX_VALUE;
    transferService.listAllInSortedOrderAsync(
        ThreadPool.Names.REMOTE_PURGE,
        remoteMetadataTransferPath,
        TranslogTransferMetadata.METADATA_PREFIX,
        noLimit,
        listener
    );
}

public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) {
try {
transferService.listAllInSortedOrderAsync(
Expand Down Expand Up @@ -635,7 +687,7 @@ public void onFailure(Exception e) {
* @param files list of metadata files to be deleted.
* @param onCompletion runnable to run on completion of deletion regardless of success/failure.
*/
private void deleteMetadataFilesAsync(List<String> files, Runnable onCompletion) {
public void deleteMetadataFilesAsync(List<String> files, Runnable onCompletion) {
try {
transferService.deleteBlobsAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, files, new ActionListener<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ public static TimeValue getPinnedTimestampsLookbackInterval() {
return pinnedTimestampsLookbackInterval;
}

// Visible for testing
/**
 * Overwrites the static {@code pinnedTimestampsLookbackInterval} field of {@code RemoteStoreSettings}.
 * The field is static, so the new value is seen by every reader of
 * {@code getPinnedTimestampsLookbackInterval()}.
 *
 * @param pinnedTimestampsLookbackInterval the value to store in the static field
 */
public static void setPinnedTimestampsLookbackInterval(TimeValue pinnedTimestampsLookbackInterval) {
RemoteStoreSettings.pinnedTimestampsLookbackInterval = pinnedTimestampsLookbackInterval;
}

/**
 * Returns the current value of the static {@code isPinnedTimestampsEnabled} flag.
 *
 * @return the flag's current value
 */
public static boolean isPinnedTimestampsEnabled() {
return isPinnedTimestampsEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,12 +489,16 @@ ChannelFactory getChannelFactory() {
// Trims from remote now
translog.trimUnreferencedReaders();
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
assertEquals(
6,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
Set<String> dataFiles = blobStoreTransferService.listAll(
getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))
);
assertEquals(10, dataFiles.size());
blobStoreTransferService.deleteBlobs(
getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get())),
new ArrayList<>(dataFiles)
);

}

}

public void testReadLocation() throws IOException {
Expand Down Expand Up @@ -671,13 +675,13 @@ public void testSimpleOperationsUpload() throws Exception {
assertThat(snapshot.totalOperations(), equalTo(ops.size()));
}

assertEquals(4, translog.allUploaded().size());
assertEquals(2, translog.allUploaded().size());

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 }));
assertEquals(6, translog.allUploaded().size());
assertEquals(4, translog.allUploaded().size());

translog.rollGeneration();
assertEquals(6, translog.allUploaded().size());
assertEquals(4, translog.allUploaded().size());

Set<String> mdFiles = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR));
assertEquals(2, mdFiles.size());
Expand Down Expand Up @@ -724,7 +728,7 @@ public void testSimpleOperationsUpload() throws Exception {
assertBusy(() -> {
assertEquals(4, translog.allUploaded().size());
assertEquals(
4,
6,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
});
Expand All @@ -736,9 +740,9 @@ public void testSimpleOperationsUpload() throws Exception {
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
assertEquals(1, translog.readers.size());
assertBusy(() -> {
assertEquals(4, translog.allUploaded().size());
assertEquals(2, translog.allUploaded().size());
assertEquals(
4,
6,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
});
Expand All @@ -755,9 +759,9 @@ public void testSimpleOperationsUpload() throws Exception {
assertEquals(1, translog.readers.size());
assertEquals(1, translog.stats().estimatedNumberOfOperations());
assertBusy(() -> {
assertEquals(4, translog.allUploaded().size());
assertEquals(2, translog.allUploaded().size());
assertEquals(
4,
6,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
});
Expand All @@ -775,7 +779,7 @@ public void testMetadataFileDeletion() throws Exception {
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
assertEquals(1, translog.readers.size());
}
assertBusy(() -> assertEquals(4, translog.allUploaded().size()));
assertBusy(() -> assertEquals(2, translog.allUploaded().size()));
assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));
int moreDocs = randomIntBetween(3, 10);
logger.info("numDocs={} moreDocs={}", numDocs, moreDocs);
Expand Down Expand Up @@ -873,7 +877,7 @@ public void testDrainSync() throws Exception {
assertBusy(() -> assertEquals(0, latch.getCount()));
assertEquals(0, translog.availablePermits());
slowDown.setSleepSeconds(0);
assertEquals(6, translog.allUploaded().size());
assertEquals(4, translog.allUploaded().size());
assertEquals(2, translog.readers.size());
Set<String> mdFiles = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR));

Expand All @@ -882,7 +886,7 @@ public void testDrainSync() throws Exception {
translog.trimUnreferencedReaders();
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
assertEquals(1, translog.readers.size());
assertEquals(6, translog.allUploaded().size());
assertEquals(2, translog.allUploaded().size());
assertEquals(mdFiles, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)));

// Case 4 - After drainSync, if an upload is attempted, we do not upload to remote store.
Expand All @@ -892,21 +896,21 @@ public void testDrainSync() throws Exception {
new Translog.Index(String.valueOf(2), 2, primaryTerm.get(), new byte[] { 1 })
);
assertEquals(1, translog.readers.size());
assertEquals(6, translog.allUploaded().size());
assertEquals(2, translog.allUploaded().size());
assertEquals(mdFiles, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)));

// Refill the permits back
Releasables.close(releasable);
addToTranslogAndListAndUpload(translog, ops, new Translog.Index(String.valueOf(3), 3, primaryTerm.get(), new byte[] { 1 }));
assertEquals(2, translog.readers.size());
assertEquals(8, translog.allUploaded().size());
assertEquals(4, translog.allUploaded().size());
assertEquals(3, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size());

translog.setMinSeqNoToKeep(3);
translog.trimUnreferencedReaders();
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
assertEquals(1, translog.readers.size());
assertBusy(() -> assertEquals(4, translog.allUploaded().size()));
assertBusy(() -> assertEquals(2, translog.allUploaded().size()));
assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));
}

Expand Down Expand Up @@ -1688,13 +1692,13 @@ public void testDownloadWithRetries() throws IOException {

TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class);
RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class);
when(mockTransfer.readMetadata()).thenReturn(translogTransferMetadata);
when(mockTransfer.readMetadata(-1)).thenReturn(translogTransferMetadata);
when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker);

// Always File not found
when(mockTransfer.downloadTranslog(any(), any(), any())).thenThrow(new NoSuchFileException("File not found"));
TranslogTransferManager finalMockTransfer = mockTransfer;
assertThrows(NoSuchFileException.class, () -> RemoteFsTranslog.download(finalMockTransfer, location, logger, false));
assertThrows(NoSuchFileException.class, () -> RemoteFsTranslog.download(finalMockTransfer, location, logger, false, -1));

// File not found in first attempt. File found in second attempt.
mockTransfer = mock(TranslogTransferManager.class);
Expand All @@ -1715,7 +1719,7 @@ public void testDownloadWithRetries() throws IOException {
}).when(mockTransfer).downloadTranslog(any(), any(), any());

// no exception thrown
RemoteFsTranslog.download(mockTransfer, location, logger, false);
RemoteFsTranslog.download(mockTransfer, location, logger, false, -1);
}

// No translog data in local as well as remote, we skip creating empty translog
Expand All @@ -1728,7 +1732,7 @@ public void testDownloadWithNoTranslogInLocalAndRemote() throws IOException {
when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker);

Path[] filesBeforeDownload = FileSystemUtils.files(location);
RemoteFsTranslog.download(mockTransfer, location, logger, false);
RemoteFsTranslog.download(mockTransfer, location, logger, false, -1);
assertEquals(filesBeforeDownload, FileSystemUtils.files(location));
}

Expand All @@ -1748,7 +1752,7 @@ public void testDownloadWithTranslogOnlyInLocal() throws IOException {
Checkpoint existingCheckpoint = Translog.readCheckpoint(location);

TranslogTransferManager finalMockTransfer = mockTransfer;
RemoteFsTranslog.download(finalMockTransfer, location, logger, false);
RemoteFsTranslog.download(finalMockTransfer, location, logger, false, -1);

Path[] filesPostDownload = FileSystemUtils.files(location);
assertEquals(2, filesPostDownload.length);
Expand Down Expand Up @@ -1784,11 +1788,11 @@ public void testDownloadWithEmptyTranslogOnlyInLocal() throws IOException {
TranslogTransferManager finalMockTransfer = mockTransfer;

// download first time will ensure creating empty translog
RemoteFsTranslog.download(finalMockTransfer, location, logger, false);
RemoteFsTranslog.download(finalMockTransfer, location, logger, false, -1);
Path[] filesPostFirstDownload = FileSystemUtils.files(location);

// download on empty translog should be a no-op
RemoteFsTranslog.download(finalMockTransfer, location, logger, false);
RemoteFsTranslog.download(finalMockTransfer, location, logger, false, -1);
Path[] filesPostSecondDownload = FileSystemUtils.files(location);

assertArrayEquals(filesPostFirstDownload, filesPostSecondDownload);
Expand Down
Loading

0 comments on commit 262c0e6

Please sign in to comment.