Add exception handling and retry to uncaught exceptions, catch IndexNotFoundException for os source (opensearch-project#3250)

Signed-off-by: Taylor Gray <[email protected]>
graytaylor0 authored Aug 28, 2023
1 parent 76cb807 commit 930a382
Showing 13 changed files with 488 additions and 128 deletions.
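
Both workers in this commit get the same two-layer structure: the existing per-index catch blocks stay inside the loop body, and a new outer catch-all wraps the whole iteration so that an exception thrown while acquiring the next partition backs off and retries instead of killing the worker thread. A minimal standalone sketch of that pattern (class, method, and backoff names and values here are illustrative, not taken from this diff):

    import java.util.Optional;

    // Sketch of the retry pattern this commit applies: the whole body of the
    // polling loop sits in a catch-all, so an uncaught failure backs off and
    // retries rather than ending the thread.
    public class RetryLoopSketch implements Runnable {
        private static final long STANDARD_BACKOFF_MILLIS = 30_000L; // illustrative value

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    final Optional<String> partition = acquirePartition();
                    if (partition.isEmpty()) {
                        Thread.sleep(STANDARD_BACKOFF_MILLIS);
                        continue;
                    }
                    process(partition.get());
                } catch (final InterruptedException e) {
                    return; // stop processing when the worker is interrupted
                } catch (final Exception e) {
                    // uncaught failure: back off, then loop around and retry
                    try {
                        Thread.sleep(STANDARD_BACKOFF_MILLIS);
                    } catch (final InterruptedException ex) {
                        return;
                    }
                }
            }
        }

        private Optional<String> acquirePartition() { return Optional.empty(); }
        private void process(final String partitionKey) { }
    }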
OpenSearchIndexProgressState.java

@@ -6,6 +6,7 @@
 package org.opensearch.dataprepper.plugins.source.opensearch;

 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;

 import java.time.Instant;
@@ -14,9 +15,16 @@

 public class OpenSearchIndexProgressState {

+    @JsonInclude(JsonInclude.Include.NON_NULL)
     private String pitId;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     private Long pitCreationTime;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     private Long keepAlive;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     private List<String> searchAfter;

     public OpenSearchIndexProgressState() {
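
For context, @JsonInclude(JsonInclude.Include.NON_NULL) tells Jackson to skip a field whose value is null when serializing, presumably so that a progress state without an active PIT (as with the search_after worker) no longer carries "pitId": null and friends. A standalone sketch of the effect, assuming the state is serialized with a plain Jackson ObjectMapper:

    import com.fasterxml.jackson.annotation.JsonInclude;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class NonNullExample {
        static class State {
            @JsonInclude(JsonInclude.Include.NON_NULL)
            public String pitId;      // stays null when no PIT is open
            public long version = 1;
        }

        public static void main(String[] args) throws Exception {
            // pitId is null, so it is omitted entirely: {"version":1}
            System.out.println(new ObjectMapper().writeValueAsString(new State()));
        }
    }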
NoSearchContextWorker.java

@@ -17,6 +17,7 @@
 import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
 import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration;
 import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
+import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException;
 import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.NoSearchContextSearchRequest;
 import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults;
 import org.slf4j.Logger;
@@ -56,32 +57,47 @@ public NoSearchContextWorker(final SearchAccessor searchAccessor,
     @Override
     public void run() {
         while (!Thread.currentThread().isInterrupted()) {
-            final Optional<SourcePartition<OpenSearchIndexProgressState>> indexPartition = sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier);
-
-            if (indexPartition.isEmpty()) {
-                try {
-                    Thread.sleep(STANDARD_BACKOFF_MILLIS);
-                    continue;
-                } catch (final InterruptedException e) {
-                    LOG.info("The NoContextSearchWorker was interrupted while sleeping after acquiring no indices to process, stopping processing");
-                    return;
-                }
-            }
-
-            try {
-                processIndex(indexPartition.get());
-
-                sourceCoordinator.closePartition(
-                        indexPartition.get().getPartitionKey(),
-                        openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(),
-                        openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount());
-            } catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) {
-                LOG.warn("PitWorker received an exception from the source coordinator. There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage());
-                sourceCoordinator.giveUpPartitions();
-            } catch (final RuntimeException e) {
-                LOG.error("Unknown exception while processing index '{}':", indexPartition.get().getPartitionKey(), e);
-                sourceCoordinator.giveUpPartitions();
-            }
+            try {
+
+                final Optional<SourcePartition<OpenSearchIndexProgressState>> indexPartition = sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier);
+
+                if (indexPartition.isEmpty()) {
+                    try {
+                        Thread.sleep(STANDARD_BACKOFF_MILLIS);
+                        continue;
+                    } catch (final InterruptedException e) {
+                        LOG.info("The search_after worker was interrupted while sleeping after acquiring no indices to process, stopping processing");
+                        return;
+                    }
+                }
+
+                try {
+                    processIndex(indexPartition.get());
+
+                    sourceCoordinator.closePartition(
+                            indexPartition.get().getPartitionKey(),
+                            openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(),
+                            openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount());
+                } catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) {
+                    LOG.warn("The search_after worker received an exception from the source coordinator. There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage());
+                    sourceCoordinator.giveUpPartitions();
+                } catch (final IndexNotFoundException e) {
+                    LOG.warn("{}, marking index as complete and continuing processing", e.getMessage());
+                    sourceCoordinator.completePartition(indexPartition.get().getPartitionKey());
+                } catch (final Exception e) {
+                    LOG.error("Unknown exception while processing index '{}', moving on to another index:", indexPartition.get().getPartitionKey(), e);
+                    sourceCoordinator.giveUpPartitions();
+                }
+            } catch (final Exception e) {
+                LOG.error("Received an exception while trying to get index to process with search_after, backing off and retrying", e);
+                try {
+                    Thread.sleep(STANDARD_BACKOFF_MILLIS);
+                } catch (final InterruptedException ex) {
+                    LOG.info("The search_after worker was interrupted before backing off and retrying, stopping processing");
+                    return;
+                }
+            }
         }
     }
@@ -100,26 +116,21 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op

         // todo: Pass query and sort options from SearchConfiguration to the search request
         do {
-            try {
-                searchWithSearchAfterResults = searchAccessor.searchWithoutSearchContext(NoSearchContextSearchRequest.builder()
-                        .withIndex(indexName)
-                        .withPaginationSize(searchConfiguration.getBatchSize())
-                        .withSearchAfter(getSearchAfter(openSearchIndexProgressState, searchWithSearchAfterResults))
-                        .build());
-
-                searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> {
-                    try {
-                        bufferAccumulator.add(record);
-                    } catch (Exception e) {
-                        LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}",
-                                record.getData().getMetadata().getAttribute(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME),
-                                record.getData().getMetadata().getAttribute(INDEX_METADATA_ATTRIBUTE_NAME), e.getMessage());
-                    }
-                });
-            } catch (final Exception e) {
-                LOG.error("Received an exception while searching with no search context for index '{}'", indexName);
-                throw new RuntimeException(e);
-            }
+            searchWithSearchAfterResults = searchAccessor.searchWithoutSearchContext(NoSearchContextSearchRequest.builder()
+                    .withIndex(indexName)
+                    .withPaginationSize(searchConfiguration.getBatchSize())
+                    .withSearchAfter(getSearchAfter(openSearchIndexProgressState, searchWithSearchAfterResults))
+                    .build());
+
+            searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> {
+                try {
+                    bufferAccumulator.add(record);
+                } catch (Exception e) {
+                    LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}",
+                            record.getData().getMetadata().getAttribute(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME),
+                            record.getData().getMetadata().getAttribute(INDEX_METADATA_ATTRIBUTE_NAME), e.getMessage());
+                }
+            });

             openSearchIndexProgressState.setSearchAfter(searchWithSearchAfterResults.getNextSearchAfter());
             sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);
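
Note that the new catch (final IndexNotFoundException e) arm must sit ahead of the broader catch so that a deleted or missing index marks the partition complete rather than being given up and retried. The exception class itself is outside this diff; since it is caught without a throws clause on processIndex, it is evidently unchecked, so its shape is presumably something like this (assumed sketch only, package taken from the import above):

    package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions;

    // Assumed shape: the real class body is not shown in this diff.
    public class IndexNotFoundException extends RuntimeException {
        public IndexNotFoundException(final String message) {
            super(message);
        }
    }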
@@ -110,7 +110,7 @@ private boolean shouldIndexBeProcessed(final String indexName) {
         final List<OpenSearchIndex> includedIndices = indexParametersConfiguration.getIncludedIndices();
         final List<OpenSearchIndex> excludedIndices = indexParametersConfiguration.getExcludedIndices();

-        final boolean matchesIncludedPattern = includedIndices.isEmpty() || doesIndexMatchPattern(includedIndices, indexName);
+        final boolean matchesIncludedPattern = (Objects.isNull(includedIndices) || includedIndices.isEmpty()) || doesIndexMatchPattern(includedIndices, indexName);
         final boolean matchesExcludePattern = doesIndexMatchPattern(excludedIndices, indexName);
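
The one-line fix above matters when no include patterns are configured: getIncludedIndices() can then return null, and the old expression dereferenced it with .isEmpty(). A reduced example of the two checks (hypothetical values, not from the diff):

    import java.util.List;
    import java.util.Objects;

    public class IncludeCheckExample {
        public static void main(String[] args) {
            final List<String> includedIndices = null; // no include patterns configured

            // Old check: includedIndices.isEmpty() throws NullPointerException here.
            // New check: short-circuits on the null test, so "no includes" means match-all.
            final boolean matchesIncludedPattern =
                    Objects.isNull(includedIndices) || includedIndices.isEmpty();
            System.out.println(matchesIncludedPattern); // true
        }
    }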
PitWorker.java

@@ -16,6 +16,7 @@
 import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
 import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration;
 import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
+import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException;
 import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException;
 import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest;
 import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse;
@@ -70,40 +71,53 @@ public PitWorker(final SearchAccessor searchAccessor,
     @Override
     public void run() {
         while (!Thread.currentThread().isInterrupted()) {
-            final Optional<SourcePartition<OpenSearchIndexProgressState>> indexPartition = sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier);
-
-            if (indexPartition.isEmpty()) {
-                try {
-                    Thread.sleep(STANDARD_BACKOFF_MILLIS);
-                    continue;
-                } catch (final InterruptedException e) {
-                    LOG.info("The PitWorker was interrupted while sleeping after acquiring no indices to process, stopping processing");
-                    return;
-                }
-            }
-
-            try {
-                processIndex(indexPartition.get());
-
-                sourceCoordinator.closePartition(
-                        indexPartition.get().getPartitionKey(),
-                        openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(),
-                        openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount());
-            } catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) {
-                LOG.warn("PitWorker received an exception from the source coordinator. There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage());
-                sourceCoordinator.giveUpPartitions();
-            } catch (final SearchContextLimitException e) {
-                LOG.warn("Received SearchContextLimitExceeded exception for index {}. Giving up index and waiting {} seconds before retrying: {}",
-                        indexPartition.get().getPartitionKey(), BACKOFF_ON_PIT_LIMIT_REACHED.getSeconds(), e.getMessage());
-                sourceCoordinator.giveUpPartitions();
-                try {
-                    Thread.sleep(BACKOFF_ON_PIT_LIMIT_REACHED.toMillis());
-                } catch (final InterruptedException ex) {
-                    return;
-                }
-            } catch (final RuntimeException e) {
-                LOG.error("Unknown exception while processing index '{}':", indexPartition.get().getPartitionKey(), e);
-                sourceCoordinator.giveUpPartitions();
-            }
+            try {
+                final Optional<SourcePartition<OpenSearchIndexProgressState>> indexPartition = sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier);
+
+                if (indexPartition.isEmpty()) {
+                    try {
+                        Thread.sleep(STANDARD_BACKOFF_MILLIS);
+                        continue;
+                    } catch (final InterruptedException e) {
+                        LOG.info("The PitWorker was interrupted while sleeping after acquiring no indices to process, stopping processing");
+                        return;
+                    }
+                }
+
+                try {
+                    processIndex(indexPartition.get());
+
+                    sourceCoordinator.closePartition(
+                            indexPartition.get().getPartitionKey(),
+                            openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(),
+                            openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount());
+                } catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) {
+                    LOG.warn("PitWorker received an exception from the source coordinator. There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage());
+                    sourceCoordinator.giveUpPartitions();
+                } catch (final SearchContextLimitException e) {
+                    LOG.warn("Received SearchContextLimitExceeded exception for index {}. Giving up index and waiting {} seconds before retrying: {}",
+                            indexPartition.get().getPartitionKey(), BACKOFF_ON_PIT_LIMIT_REACHED.getSeconds(), e.getMessage());
+                    sourceCoordinator.giveUpPartitions();
+                    try {
+                        Thread.sleep(BACKOFF_ON_PIT_LIMIT_REACHED.toMillis());
+                    } catch (final InterruptedException ex) {
+                        return;
+                    }
+                } catch (final IndexNotFoundException e) {
+                    LOG.warn("{}, marking index as complete and continuing processing", e.getMessage());
+                    sourceCoordinator.completePartition(indexPartition.get().getPartitionKey());
+                } catch (final RuntimeException e) {
+                    LOG.error("Unknown exception while processing index '{}':", indexPartition.get().getPartitionKey(), e);
+                    sourceCoordinator.giveUpPartitions();
+                }
+            } catch (final Exception e) {
+                LOG.error("Received an exception while trying to get index to process with PIT, backing off and retrying", e);
+                try {
+                    Thread.sleep(STANDARD_BACKOFF_MILLIS);
+                } catch (final InterruptedException ex) {
+                    LOG.info("The PitWorker was interrupted before backing off and retrying, stopping processing");
+                    return;
+                }
+            }
         }
     }
@@ -137,27 +151,22 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op

         // todo: Pass query and sort options from SearchConfiguration to the search request
         do {
-            try {
-                searchWithSearchAfterResults = searchAccessor.searchWithPit(SearchPointInTimeRequest.builder()
-                        .withPitId(openSearchIndexProgressState.getPitId())
-                        .withKeepAlive(EXTEND_KEEP_ALIVE_TIME)
-                        .withPaginationSize(searchConfiguration.getBatchSize())
-                        .withSearchAfter(getSearchAfter(openSearchIndexProgressState, searchWithSearchAfterResults))
-                        .build());
-
-                searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> {
-                    try {
-                        bufferAccumulator.add(record);
-                    } catch (Exception e) {
-                        LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}",
-                                record.getData().getMetadata().getAttribute(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME),
-                                record.getData().getMetadata().getAttribute(INDEX_METADATA_ATTRIBUTE_NAME), e.getMessage());
-                    }
-                });
-            } catch (final Exception e) {
-                LOG.error("Received an exception while searching with PIT for index '{}'", indexName);
-                throw new RuntimeException(e);
-            }
+            searchWithSearchAfterResults = searchAccessor.searchWithPit(SearchPointInTimeRequest.builder()
+                    .withPitId(openSearchIndexProgressState.getPitId())
+                    .withKeepAlive(EXTEND_KEEP_ALIVE_TIME)
+                    .withPaginationSize(searchConfiguration.getBatchSize())
+                    .withSearchAfter(getSearchAfter(openSearchIndexProgressState, searchWithSearchAfterResults))
+                    .build());
+
+            searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> {
+                try {
+                    bufferAccumulator.add(record);
+                } catch (Exception e) {
+                    LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}",
+                            record.getData().getMetadata().getAttribute(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME),
+                            record.getData().getMetadata().getAttribute(INDEX_METADATA_ATTRIBUTE_NAME), e.getMessage());
+                }
+            });

             openSearchIndexProgressState.setSearchAfter(searchWithSearchAfterResults.getNextSearchAfter());
             openSearchIndexProgressState.setKeepAlive(Duration.ofMillis(openSearchIndexProgressState.getKeepAlive()).plus(EXTEND_KEEP_ALIVE_DURATION).toMillis());
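
At the end of processIndex, the stored keep-alive is extended by EXTEND_KEEP_ALIVE_DURATION on each pass, so the progress state accumulates how long the PIT has been kept alive in total. The Duration arithmetic in isolation (the real constant's value is not shown in this extract; one minute is a stand-in):

    import java.time.Duration;

    public class KeepAliveArithmetic {
        public static void main(String[] args) {
            final Duration extendKeepAliveDuration = Duration.ofMinutes(1); // stand-in value
            long keepAliveMillis = Duration.ofMinutes(10).toMillis();

            // Mirrors: state.setKeepAlive(Duration.ofMillis(state.getKeepAlive())
            //                  .plus(EXTEND_KEEP_ALIVE_DURATION).toMillis());
            keepAliveMillis = Duration.ofMillis(keepAliveMillis).plus(extendKeepAliveDuration).toMillis();
            System.out.println(keepAliveMillis); // 660000 (11 minutes)
        }
    }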