Skip to content

Commit

Permalink
Translog GC changes
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Aug 23, 2024
1 parent 049f4ea commit 18cd33e
Show file tree
Hide file tree
Showing 6 changed files with 532 additions and 55 deletions.
287 changes: 262 additions & 25 deletions server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ TranslogReader openReader(Path path, Checkpoint checkpoint) throws IOException {
*/
public static long parseIdFromFileName(Path translogFile) {
final String fileName = translogFile.getFileName().toString();
return parseIdFromFileName(fileName);
}

public static long parseIdFromFileName(String fileName) {
final Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(fileName);
if (matcher.matches()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;
import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR;

/**
* The class responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService}
Expand Down Expand Up @@ -337,35 +339,53 @@ private void deleteFileIfExists(Path filePath) throws IOException {
}
}

public TranslogTransferMetadata readMetadata(long timestamp) throws IOException {
if (timestamp < 0) {
return readMetadata();
}
return readMetadata((blobMetadataList) -> {
List<String> metadataFiles = blobMetadataList.stream().map(BlobMetadata::name).collect(Collectors.toList());
Set<String> metadataFilesMatchingTimestamp = RemoteStoreUtils.getPinnedTimestampLockedFiles(
metadataFiles,
Set.of(timestamp),
file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[3]),
TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen
);
if (metadataFilesMatchingTimestamp.isEmpty()) {
return null;
}
assert metadataFilesMatchingTimestamp.size() == 1 : "There should be only 1 metadata file matching given timestamp";
return metadataFilesMatchingTimestamp.stream().findFirst().get();
}, Integer.MAX_VALUE);
}

public TranslogTransferMetadata readMetadata() throws IOException {
return readMetadata((blobMetadataList) -> {
RemoteStoreUtils.verifyNoMultipleWriters(
blobMetadataList.stream().map(BlobMetadata::name).collect(Collectors.toList()),
TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen
);
return blobMetadataList.get(0).name();
}, METADATA_FILES_TO_FETCH);
}

private TranslogTransferMetadata readMetadata(Function<List<BlobMetadata>, String> getMetadataFileToRead, int numberOfFilesToFetch)
throws IOException {
SetOnce<TranslogTransferMetadata> metadataSetOnce = new SetOnce<>();
SetOnce<IOException> exceptionSetOnce = new SetOnce<>();
final CountDownLatch latch = new CountDownLatch(1);
LatchedActionListener<List<BlobMetadata>> latchedActionListener = new LatchedActionListener<>(
ActionListener.wrap(blobMetadataList -> {
if (blobMetadataList.isEmpty()) return;
RemoteStoreUtils.verifyNoMultipleWriters(
blobMetadataList.stream().map(BlobMetadata::name).collect(Collectors.toList()),
TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen
);
String filename = blobMetadataList.get(0).name();
boolean downloadStatus = false;
long downloadStartTime = System.nanoTime(), bytesToRead = 0;
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) {
// Capture number of bytes for stats before reading
bytesToRead = inputStream.available();
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
metadataSetOnce.set(metadataStreamWrapper.readStream(indexInput));
downloadStatus = true;
String filename = getMetadataFileToRead.apply(blobMetadataList);
if (filename == null) {
return;
}
try {
metadataSetOnce.set(readMetadata(filename));
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e);
exceptionSetOnce.set(e);
} finally {
remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
logger.debug("translogMetadataDownloadStatus={}", downloadStatus);
if (downloadStatus) {
remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead);
}
}
}, e -> {
if (e instanceof RuntimeException) {
Expand All @@ -381,12 +401,14 @@ public TranslogTransferMetadata readMetadata() throws IOException {
transferService.listAllInSortedOrder(
remoteMetadataTransferPath,
TranslogTransferMetadata.METADATA_PREFIX,
METADATA_FILES_TO_FETCH,
numberOfFilesToFetch,
latchedActionListener
);
latch.await();
if (latch.await(remoteStoreSettings.getClusterRemoteTranslogTransferTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
throw new RuntimeException("Timed out reading metadata file");
}
} catch (InterruptedException e) {
throw new IOException("Exception while reading/downloading metadafile", e);
throw new IOException("Exception while reading/downloading metadata file", e);
}

if (exceptionSetOnce.get() != null) {
Expand All @@ -396,6 +418,26 @@ public TranslogTransferMetadata readMetadata() throws IOException {
return metadataSetOnce.get();
}

public TranslogTransferMetadata readMetadata(String metadataFilename) throws IOException {
boolean downloadStatus = false;
TranslogTransferMetadata translogTransferMetadata = null;
long downloadStartTime = System.nanoTime(), bytesToRead = 0;
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, metadataFilename)) {
// Capture number of bytes for stats before reading
bytesToRead = inputStream.available();
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
translogTransferMetadata = metadataStreamWrapper.readStream(indexInput);
downloadStatus = true;
} finally {
remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
logger.debug("translogMetadataDownloadStatus={}", downloadStatus);
if (downloadStatus) {
remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead);
}
}
return translogTransferMetadata;
}

private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) throws IOException {
Map<String, String> generationPrimaryTermMap = transferSnapshot.getTranslogFileSnapshots().stream().map(s -> {
assert s instanceof TranslogFileSnapshot;
Expand Down Expand Up @@ -549,6 +591,16 @@ public void onFailure(Exception e) {
});
}

public void listTranslogMetadataFilesAsync(ActionListener<List<BlobMetadata>> listener) {
transferService.listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteMetadataTransferPath,
TranslogTransferMetadata.METADATA_PREFIX,
Integer.MAX_VALUE,
listener
);
}

public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) {
try {
transferService.listAllInSortedOrderAsync(
Expand Down Expand Up @@ -635,7 +687,7 @@ public void onFailure(Exception e) {
* @param files list of metadata files to be deleted.
* @param onCompletion runnable to run on completion of deletion regardless of success/failure.
*/
private void deleteMetadataFilesAsync(List<String> files, Runnable onCompletion) {
public void deleteMetadataFilesAsync(List<String> files, Runnable onCompletion) {
try {
transferService.deleteBlobsAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, files, new ActionListener<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,9 @@ public static TimeValue getPinnedTimestampsSchedulerInterval() {
public static TimeValue getPinnedTimestampsLookbackInterval() {
return pinnedTimestampsLookbackInterval;
}

// Visible for testing
public static void setPinnedTimestampsLookbackInterval(TimeValue pinnedTimestampsLookbackInterval) {
RemoteStoreSettings.pinnedTimestampsLookbackInterval = pinnedTimestampsLookbackInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.tests.mockfile.FilterFileChannel;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.mockito.Mockito;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.fs.FsBlobContainer;
Expand All @@ -29,9 +33,11 @@
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
Expand All @@ -40,6 +46,7 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.TestEnvironment;
import org.opensearch.gateway.remote.model.RemoteStorePinnedTimestampsBlobStore;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.MissingHistoryOperationsException;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
Expand All @@ -51,8 +58,13 @@
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.index.translog.transfer.TranslogUploadFailedException;
import org.opensearch.indices.DefaultRemoteStoreSettings;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.BlobStoreTestUtil;
import org.opensearch.repositories.fs.FsRepository;
Expand Down Expand Up @@ -95,9 +107,11 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;

import static org.mockito.ArgumentMatchers.eq;
import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
Expand Down Expand Up @@ -153,6 +167,50 @@ public void setUp() throws Exception {
// if a previous test failed we clean up things here
translogDir = createTempDir();
translog = create(translogDir);

Supplier<RepositoriesService> repositoriesServiceSupplier = mock(Supplier.class);
Settings settings = Settings.builder()
.put(Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote-repo")
.build();
RepositoriesService repositoriesService = mock(RepositoriesService.class);
when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService);
BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class);
when(repositoriesService.repository("remote-repo")).thenReturn(blobStoreRepository);

ThreadPool mockThreadPool = mock(ThreadPool.class);
when(mockThreadPool.schedule(any(), any(), any())).then(invocationOnMock -> {
Runnable runnable = invocationOnMock.getArgument(0);
runnable.run();
return null;
}).then(subsequentInvocationsOnMock -> null);

ClusterService clusterService = mock(ClusterService.class);
ClusterState clusterState = mock(ClusterState.class);
when(clusterService.state()).thenReturn(clusterState);
Metadata metadata = mock(Metadata.class);
when(clusterState.metadata()).thenReturn(metadata);
RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = new RemoteStorePinnedTimestampService(
repositoriesServiceSupplier,
settings,
mockThreadPool,
clusterService
);
RemoteStorePinnedTimestampService remoteStorePinnedTimestampServiceSpy = Mockito.spy(remoteStorePinnedTimestampService);

RemoteStorePinnedTimestampsBlobStore remoteStorePinnedTimestampsBlobStore = mock(RemoteStorePinnedTimestampsBlobStore.class);
BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class);
when(remoteStorePinnedTimestampServiceSpy.pinnedTimestampsBlobStore()).thenReturn(remoteStorePinnedTimestampsBlobStore);
when(remoteStorePinnedTimestampServiceSpy.blobStoreTransferService()).thenReturn(blobStoreTransferService);

doAnswer(invocationOnMock -> {
ActionListener<List<BlobMetadata>> actionListener = invocationOnMock.getArgument(3);
actionListener.onResponse(new ArrayList<>());
return null;
}).when(blobStoreTransferService).listAllInSortedOrder(any(), any(), eq(1), any());

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

remoteStorePinnedTimestampServiceSpy.start();
}

@Override
Expand Down Expand Up @@ -1688,13 +1746,13 @@ public void testDownloadWithRetries() throws IOException {

TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class);
RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class);
when(mockTransfer.readMetadata()).thenReturn(translogTransferMetadata);
when(mockTransfer.readMetadata(-1)).thenReturn(translogTransferMetadata);
when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker);

// Always File not found
when(mockTransfer.downloadTranslog(any(), any(), any())).thenThrow(new NoSuchFileException("File not found"));
TranslogTransferManager finalMockTransfer = mockTransfer;
assertThrows(NoSuchFileException.class, () -> RemoteFsTranslog.download(finalMockTransfer, location, logger, false));
assertThrows(NoSuchFileException.class, () -> RemoteFsTranslog.download(finalMockTransfer, location, logger, false, -1));

// File not found in first attempt . File found in second attempt.
mockTransfer = mock(TranslogTransferManager.class);
Expand All @@ -1715,7 +1773,7 @@ public void testDownloadWithRetries() throws IOException {
}).when(mockTransfer).downloadTranslog(any(), any(), any());

// no exception thrown
RemoteFsTranslog.download(mockTransfer, location, logger, false);
RemoteFsTranslog.download(mockTransfer, location, logger, false, -1);
}

// No translog data in local as well as remote, we skip creating empty translog
Expand All @@ -1728,7 +1786,7 @@ public void testDownloadWithNoTranslogInLocalAndRemote() throws IOException {
when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker);

Path[] filesBeforeDownload = FileSystemUtils.files(location);
RemoteFsTranslog.download(mockTransfer, location, logger, false);
RemoteFsTranslog.download(mockTransfer, location, logger, false, -1);
assertEquals(filesBeforeDownload, FileSystemUtils.files(location));
}

Expand All @@ -1748,7 +1806,7 @@ public void testDownloadWithTranslogOnlyInLocal() throws IOException {
Checkpoint existingCheckpoint = Translog.readCheckpoint(location);

TranslogTransferManager finalMockTransfer = mockTransfer;
RemoteFsTranslog.download(finalMockTransfer, location, logger, false);
RemoteFsTranslog.download(finalMockTransfer, location, logger, false, -1);

Path[] filesPostDownload = FileSystemUtils.files(location);
assertEquals(2, filesPostDownload.length);
Expand Down Expand Up @@ -1784,11 +1842,11 @@ public void testDownloadWithEmptyTranslogOnlyInLocal() throws IOException {
TranslogTransferManager finalMockTransfer = mockTransfer;

// download first time will ensure creating empty translog
RemoteFsTranslog.download(finalMockTransfer, location, logger, false);
RemoteFsTranslog.download(finalMockTransfer, location, logger, false, -1);
Path[] filesPostFirstDownload = FileSystemUtils.files(location);

// download on empty translog should be a no-op
RemoteFsTranslog.download(finalMockTransfer, location, logger, false);
RemoteFsTranslog.download(finalMockTransfer, location, logger, false, -1);
Path[] filesPostSecondDownload = FileSystemUtils.files(location);

assertArrayEquals(filesPostFirstDownload, filesPostSecondDownload);
Expand Down
Loading

0 comments on commit 18cd33e

Please sign in to comment.