Skip to content

Commit

Permalink
Implement NoSearchContextWorker to search with search_after and not u…
Browse files Browse the repository at this point in the history
…se pit or scroll, allow override with search_context_type parameter (opensearch-project#2873)

* Implement NoSearchContextWorker to search with search_after and not use pit or scroll, allow override with search_context_type parameter

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Jun 15, 2023
1 parent 98f7ce7 commit c7bfc2e
Show file tree
Hide file tree
Showing 16 changed files with 642 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.NoSearchContextWorker;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.OpenSearchIndexPartitionCreationSupplier;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.ScrollWorker;
Expand Down Expand Up @@ -76,6 +77,9 @@ public void start() {
case SCROLL:
searchWorker = new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier);
break;
case NONE:
searchWorker = new NoSearchContextWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier);
break;
default:
throw new IllegalArgumentException(
String.format("Search context type must be POINT_IN_TIME or SCROLL, type %s was given instead",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,21 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.validation.constraints.AssertTrue;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class SearchConfiguration {

private static final ObjectMapper objectMapper = new ObjectMapper();
private static final Logger LOG = LoggerFactory.getLogger(SearchConfiguration.class);

// TODO: Should we default this to NONE and remove the version lookup to determine scroll or point-in-time as the default behavior?
@JsonProperty("search_context_type")
private SearchContextType searchContextType;

@JsonProperty("batch_size")
private Integer batchSize = 1000;

Expand All @@ -28,6 +34,9 @@ public class SearchConfiguration {
@JsonIgnore
private Map<String, Object> queryMap;

public SearchContextType getSearchContextType() {
return searchContextType;
}

public Integer getBatchSize() {
return batchSize;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch.worker;

import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartition;
import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException;
import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotOwnedException;
import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.NoSearchContextSearchRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME;

/**
 * NoSearchContextWorker polls the source coordinator for index partitions and reads
 * documents from OpenSearch using plain search_after pagination, without creating any
 * server-side search context (no point-in-time, no scroll).
 *
 * <p>Progress (the last search_after sort values) is persisted in
 * {@link OpenSearchIndexProgressState} so processing can resume after a crash or
 * partition hand-off, at the cost of possible duplicate documents.
 */
public class NoSearchContextWorker implements SearchWorker, Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(NoSearchContextWorker.class);

    // How long to sleep when no index partition is currently available to process.
    private static final int STANDARD_BACKOFF_MILLIS = 30_000;

    private final SearchAccessor searchAccessor;
    private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
    private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
    private final BufferAccumulator<Record<Event>> bufferAccumulator;
    private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier;

    public NoSearchContextWorker(final SearchAccessor searchAccessor,
                                 final OpenSearchSourceConfiguration openSearchSourceConfiguration,
                                 final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
                                 final BufferAccumulator<Record<Event>> bufferAccumulator,
                                 final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier) {
        this.searchAccessor = searchAccessor;
        this.sourceCoordinator = sourceCoordinator;
        this.openSearchSourceConfiguration = openSearchSourceConfiguration;
        this.bufferAccumulator = bufferAccumulator;
        this.openSearchIndexPartitionCreationSupplier = openSearchIndexPartitionCreationSupplier;
    }

    /**
     * Main processing loop: repeatedly acquires an index partition, processes it to
     * completion, and closes it. Runs until the thread is interrupted. Coordinator
     * conflicts and unexpected failures cause the partition to be given up so another
     * worker (or a later iteration) can retry it.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            final Optional<SourcePartition<OpenSearchIndexProgressState>> indexPartition = sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier);

            if (indexPartition.isEmpty()) {
                try {
                    Thread.sleep(STANDARD_BACKOFF_MILLIS);
                    continue;
                } catch (final InterruptedException e) {
                    LOG.info("The NoSearchContextWorker was interrupted while sleeping after acquiring no indices to process, stopping processing");
                    // Restore the interrupt status so callers up the stack can observe it.
                    Thread.currentThread().interrupt();
                    return;
                }
            }

            try {
                processIndex(indexPartition.get());

                sourceCoordinator.closePartition(
                        indexPartition.get().getPartitionKey(),
                        openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(),
                        openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount());
            } catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) {
                // Losing ownership mid-index means another worker may re-read documents we already wrote.
                LOG.warn("NoSearchContextWorker received an exception from the source coordinator. There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage());
                sourceCoordinator.giveUpPartitions();
            } catch (final RuntimeException e) {
                LOG.error("Unknown exception while processing index '{}':", indexPartition.get().getPartitionKey(), e);
                sourceCoordinator.giveUpPartitions();
            }
        }
    }

    /**
     * Reads all documents from a single index partition in batches of
     * {@code search_configuration.batch_size} using search_after pagination, writing
     * each document to the buffer and checkpointing the search_after cursor after
     * every page. Stops when a page comes back smaller than the batch size.
     *
     * @param openSearchIndexPartition the partition whose key is the index name
     * @throws RuntimeException if a search request against the index fails
     */
    private void processIndex(final SourcePartition<OpenSearchIndexProgressState> openSearchIndexPartition) {
        final String indexName = openSearchIndexPartition.getPartitionKey();
        Optional<OpenSearchIndexProgressState> openSearchIndexProgressStateOptional = openSearchIndexPartition.getPartitionState();

        if (openSearchIndexProgressStateOptional.isEmpty()) {
            openSearchIndexProgressStateOptional = Optional.of(initializeProgressState());
        }

        final OpenSearchIndexProgressState openSearchIndexProgressState = openSearchIndexProgressStateOptional.get();

        final SearchConfiguration searchConfiguration = openSearchSourceConfiguration.getSearchConfiguration();
        SearchWithSearchAfterResults searchWithSearchAfterResults = null;

        // todo: Pass query and sort options from SearchConfiguration to the search request
        do {
            try {
                searchWithSearchAfterResults = searchAccessor.searchWithoutSearchContext(NoSearchContextSearchRequest.builder()
                        .withIndex(indexName)
                        .withPaginationSize(searchConfiguration.getBatchSize())
                        .withSearchAfter(getSearchAfter(openSearchIndexProgressState, searchWithSearchAfterResults))
                        .build());

                searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> {
                    try {
                        bufferAccumulator.add(record);
                    } catch (Exception e) {
                        // Best-effort: log and continue so one bad document does not abort the index.
                        LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}",
                                record.getData().getMetadata().getAttribute(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME),
                                record.getData().getMetadata().getAttribute(INDEX_METADATA_ATTRIBUTE_NAME), e.getMessage());
                    }
                });
            } catch (final Exception e) {
                LOG.error("Received an exception while searching with no search context for index '{}'", indexName, e);
                throw new RuntimeException(e);
            }

            // Checkpoint the cursor so a restart resumes from the last completed page.
            openSearchIndexProgressState.setSearchAfter(searchWithSearchAfterResults.getNextSearchAfter());
            sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);
        } while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize());

        try {
            bufferAccumulator.flush();
        } catch (final Exception e) {
            LOG.error("Failed writing remaining OpenSearch documents to buffer due to: {}", e.getMessage());
        }
    }

    // Fresh state for an index that has never been (partially) processed.
    private OpenSearchIndexProgressState initializeProgressState() {
        return new OpenSearchIndexProgressState();
    }

    /**
     * Resolves the search_after cursor for the next request: the previous page's
     * sort values when one exists, otherwise the checkpointed value from the
     * progress state (which is null for a fresh index, meaning "start from the top").
     */
    private List<String> getSearchAfter(final OpenSearchIndexProgressState openSearchIndexProgressState, final SearchWithSearchAfterResults searchWithSearchAfterResults) {
        if (Objects.isNull(searchWithSearchAfterResults)) {
            return openSearchIndexProgressState.getSearchAfter();
        }

        return searchWithSearchAfterResults.getNextSearchAfter();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResults;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -133,19 +133,19 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
}

final SearchConfiguration searchConfiguration = openSearchSourceConfiguration.getSearchConfiguration();
SearchPointInTimeResults searchPointInTimeResults = null;
SearchWithSearchAfterResults searchWithSearchAfterResults = null;

// todo: Pass query and sort options from SearchConfiguration to the search request
do {
try {
searchPointInTimeResults = searchAccessor.searchWithPit(SearchPointInTimeRequest.builder()
searchWithSearchAfterResults = searchAccessor.searchWithPit(SearchPointInTimeRequest.builder()
.withPitId(openSearchIndexProgressState.getPitId())
.withKeepAlive(EXTEND_KEEP_ALIVE_TIME)
.withPaginationSize(searchConfiguration.getBatchSize())
.withSearchAfter(getSearchAfter(openSearchIndexProgressState, searchPointInTimeResults))
.withSearchAfter(getSearchAfter(openSearchIndexProgressState, searchWithSearchAfterResults))
.build());

searchPointInTimeResults.getDocuments().stream().map(Record::new).forEach(record -> {
searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> {
try {
bufferAccumulator.add(record);
} catch (Exception e) {
Expand All @@ -159,10 +159,10 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
throw new RuntimeException(e);
}

openSearchIndexProgressState.setSearchAfter(searchPointInTimeResults.getNextSearchAfter());
openSearchIndexProgressState.setSearchAfter(searchWithSearchAfterResults.getNextSearchAfter());
openSearchIndexProgressState.setKeepAlive(Duration.ofMillis(openSearchIndexProgressState.getKeepAlive()).plus(EXTEND_KEEP_ALIVE_DURATION).toMillis());
sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);
} while (searchPointInTimeResults.getDocuments().size() == searchConfiguration.getBatchSize());
} while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize());

try {
bufferAccumulator.flush();
Expand All @@ -178,15 +178,15 @@ private OpenSearchIndexProgressState initializeProgressState() {
return new OpenSearchIndexProgressState();
}

private List<String> getSearchAfter(final OpenSearchIndexProgressState openSearchIndexProgressState, final SearchPointInTimeResults searchPointInTimeResults) {
if (Objects.isNull(searchPointInTimeResults) && Objects.isNull(openSearchIndexProgressState.getSearchAfter())) {
private List<String> getSearchAfter(final OpenSearchIndexProgressState openSearchIndexProgressState, final SearchWithSearchAfterResults searchWithSearchAfterResults) {
if (Objects.isNull(searchWithSearchAfterResults) && Objects.isNull(openSearchIndexProgressState.getSearchAfter())) {
return null;
}

if (Objects.isNull(searchPointInTimeResults) && Objects.nonNull(openSearchIndexProgressState.getSearchAfter())) {
if (Objects.isNull(searchWithSearchAfterResults) && Objects.nonNull(openSearchIndexProgressState.getSearchAfter())) {
return openSearchIndexProgressState.getSearchAfter();
}

return searchPointInTimeResults.getNextSearchAfter();
return searchWithSearchAfterResults.getNextSearchAfter();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.NoSearchContextSearchRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResults;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;

Expand All @@ -30,7 +31,7 @@ public CreatePointInTimeResponse createPit(final CreatePointInTimeRequest create
}

@Override
public SearchPointInTimeResults searchWithPit(SearchPointInTimeRequest searchPointInTimeRequest) {
public SearchWithSearchAfterResults searchWithPit(SearchPointInTimeRequest searchPointInTimeRequest) {
//todo: implement
return null;
}
Expand All @@ -57,6 +58,11 @@ public void deleteScroll(DeleteScrollRequest deleteScrollRequest) {
//todo: implement
}

@Override
public SearchWithSearchAfterResults searchWithoutSearchContext(NoSearchContextSearchRequest noSearchContextSearchRequest) {
return null;
}

@Override
public Object getClient() {
return null;
Expand Down
Loading

0 comments on commit c7bfc2e

Please sign in to comment.