Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into feature-httpsink-skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
mallikagogoi7 committed Jun 26, 2023
2 parents 4bac6ea + cb5cb60 commit 458e4e0
Show file tree
Hide file tree
Showing 12 changed files with 833 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ public Boolean hasTags(final List<String> tagsList) {

@Override
public void addTags(final List<String> newTags) {
    // A null list is treated as "nothing to add" rather than an error.
    if (newTags != null) {
        tags.addAll(newTags);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,18 @@ void fromEventMetadata_returns_matching_EventMetadata() {
assertThat(copiedMetadata.getAttributes(), not(sameInstance(attributes)));
}

@Test
public void testEventMetadata_withNullTags() {
    final String eventType = UUID.randomUUID().toString();

    final EventMetadata metadata = DefaultEventMetadata.builder()
            .withEventType(eventType)
            .build();
    assertThat(metadata, notNullValue());

    // Adding a null tag list must be a silent no-op, leaving the tag set empty.
    metadata.addTags(null);
    assertThat(metadata.getTags(), equalTo(Collections.emptySet()));
}

@Test
public void testBuild_withTags() {
final String testEventType = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
try {
bufferAccumulator.flush();
} catch (final Exception e) {
LOG.error("Failed writing remaining OpenSearch documents to buffer due to: {}", e.getMessage());
LOG.error("Failed flushing remaining OpenSearch documents to buffer due to: {}", e.getMessage());
}

// todo: This API call is failing with sigv4 enabled due to a mismatch in the signature. Tracking issue (https://github.com/opensearch-project/opensearch-java/issues/521)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,40 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartition;
import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException;
import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotOwnedException;
import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME;

/**
* ScrollWorker polls the source cluster via scroll contexts.
*/
public class ScrollWorker implements SearchWorker {

private static final Logger LOG = LoggerFactory.getLogger(ScrollWorker.class);
private static final int STANDARD_BACKOFF_MILLIS = 30_000;
private static final Duration BACKOFF_ON_SCROLL_LIMIT_REACHED = Duration.ofSeconds(120);
static final String SCROLL_TIME_PER_BATCH = "1m";

private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
Expand All @@ -37,6 +62,102 @@ public ScrollWorker(final SearchAccessor searchAccessor,

@Override
public void run() {
    // Poll for index partitions until the thread is interrupted (shutdown signal).
    while (!Thread.currentThread().isInterrupted()) {
        final Optional<SourcePartition<OpenSearchIndexProgressState>> indexPartition = sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier);

        if (indexPartition.isEmpty()) {
            try {
                Thread.sleep(STANDARD_BACKOFF_MILLIS);
                continue;
            } catch (final InterruptedException e) {
                // Fixed copy-paste: this is the ScrollWorker, not the PitWorker.
                LOG.info("The ScrollWorker was interrupted while sleeping after acquiring no indices to process, stopping processing");
                // Restore the interrupt flag so callers can observe the interruption.
                Thread.currentThread().interrupt();
                return;
            }
        }

        try {
            processIndex(indexPartition.get());

            sourceCoordinator.closePartition(
                    indexPartition.get().getPartitionKey(),
                    openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(),
                    openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount());
        } catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) {
            LOG.warn("ScrollWorker received an exception from the source coordinator. There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage());
            sourceCoordinator.giveUpPartitions();
        } catch (final SearchContextLimitException e) {
            LOG.warn("Received SearchContextLimitExceeded exception for index {}. Giving up index and waiting {} seconds before retrying: {}",
                    indexPartition.get().getPartitionKey(), BACKOFF_ON_SCROLL_LIMIT_REACHED.getSeconds(), e.getMessage());
            sourceCoordinator.giveUpPartitions();
            try {
                Thread.sleep(BACKOFF_ON_SCROLL_LIMIT_REACHED.toMillis());
            } catch (final InterruptedException ex) {
                // Restore the interrupt flag before stopping.
                Thread.currentThread().interrupt();
                return;
            }
        } catch (final RuntimeException e) {
            LOG.error("Unknown exception while processing index '{}':", indexPartition.get().getPartitionKey(), e);
            sourceCoordinator.giveUpPartitions();
        }
    }
}

/**
 * Reads all documents from a single index partition via a scroll context,
 * writing each batch into the buffer and deleting the scroll when done.
 */
private void processIndex(final SourcePartition<OpenSearchIndexProgressState> openSearchIndexPartition) {
    final String indexName = openSearchIndexPartition.getPartitionKey();

    final Integer batchSize = openSearchSourceConfiguration.getSearchConfiguration().getBatchSize();

    // Open the scroll context; the create call also returns the first page of documents.
    final CreateScrollResponse createScrollResponse = searchAccessor.createScroll(CreateScrollRequest.builder()
            .withScrollTime(SCROLL_TIME_PER_BATCH)
            .withSize(batchSize)
            .withIndex(indexName)
            .build());

    writeDocumentsToBuffer(createScrollResponse.getDocuments());

    SearchScrollResponse searchScrollResponse = null;

    // A page smaller than batchSize is the last page, so only keep scrolling after a full page.
    if (createScrollResponse.getDocuments().size() == batchSize) {
        do {
            try {
                searchScrollResponse = searchAccessor.searchWithScroll(SearchScrollRequest.builder()
                        .withScrollId(Objects.nonNull(searchScrollResponse) && Objects.nonNull(searchScrollResponse.getScrollId()) ? searchScrollResponse.getScrollId() : createScrollResponse.getScrollId())
                        .withScrollTime(SCROLL_TIME_PER_BATCH)
                        .build());

                writeDocumentsToBuffer(searchScrollResponse.getDocuments());
                sourceCoordinator.saveProgressStateForPartition(indexName, null);
            } catch (final Exception e) {
                // Clean up the scroll context before propagating so server-side contexts are not leaked.
                deleteScroll(createScrollResponse.getScrollId());
                throw e;
            }
        } while (searchScrollResponse.getDocuments().size() == batchSize);
    }

    deleteScroll(createScrollResponse.getScrollId());

    try {
        bufferAccumulator.flush();
    } catch (final Exception e) {
        LOG.error("Failed flushing remaining OpenSearch documents to buffer due to: {}", e.getMessage());
    }
}

/** Wraps each document Event in a Record and adds it to the buffer accumulator. */
private void writeDocumentsToBuffer(final List<Event> documents) {
    for (final Event document : documents) {
        final Record<Event> record = new Record<>(document);
        try {
            bufferAccumulator.add(record);
        } catch (Exception e) {
            LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}",
                    record.getData().getMetadata().getAttribute(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME),
                    record.getData().getMetadata().getAttribute(INDEX_METADATA_ATTRIBUTE_NAME), e.getMessage());
        }
    }
}

// todo: This API call is failing with sigv4 enabled due to a mismatch in the signature. Tracking issue (https://github.com/opensearch-project/opensearch-java/issues/521)
private void deleteScroll(final String scrollId) {
    final DeleteScrollRequest deleteRequest = DeleteScrollRequest.builder()
            .withScrollId(scrollId)
            .build();
    searchAccessor.deleteScroll(deleteRequest);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import org.opensearch.client.opensearch._types.Time;
import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery;
import org.opensearch.client.opensearch._types.query_dsl.Query;
import org.opensearch.client.opensearch.core.ClearScrollRequest;
import org.opensearch.client.opensearch.core.ClearScrollResponse;
import org.opensearch.client.opensearch.core.ScrollRequest;
import org.opensearch.client.opensearch.core.SearchRequest;
import org.opensearch.client.opensearch.core.SearchResponse;
import org.opensearch.client.opensearch.core.pit.CreatePitRequest;
Expand All @@ -33,13 +36,14 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.NoSearchContextSearchRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -54,6 +58,7 @@ public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory
private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAccessor.class);

static final String PIT_RESOURCE_LIMIT_ERROR_TYPE = "rejected_execution_exception";
static final String SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE = "Trying to create too many scroll contexts";

private final OpenSearchClient openSearchClient;
private final SearchContextType searchContextType;
Expand Down Expand Up @@ -128,19 +133,66 @@ public void deletePit(final DeletePointInTimeRequest deletePointInTimeRequest) {

@Override
public CreateScrollResponse createScroll(final CreateScrollRequest createScrollRequest) {
    final SearchResponse<ObjectNode> response;
    try {
        response = openSearchClient.search(SearchRequest.of(builder -> builder
                .scroll(Time.of(t -> t.time(createScrollRequest.getScrollTime())))
                .size(createScrollRequest.getSize())
                .index(createScrollRequest.getIndex())), ObjectNode.class);
    } catch (final OpenSearchException e) {
        LOG.error("There was an error creating a scroll context for OpenSearch: ", e);
        throw e;
    } catch (final IOException e) {
        LOG.error("There was an error creating a scroll context for OpenSearch: ", e);
        // Surface scroll-context exhaustion distinctly so the worker can back off and retry.
        if (isDueToScrollLimitExceeded(e)) {
            throw new SearchContextLimitException(String.format("There was an error creating a new scroll context for index '%s': %s",
                    createScrollRequest.getIndex(), e.getMessage()));
        }

        throw new RuntimeException(e);
    }

    return CreateScrollResponse.builder()
            .withCreationTime(Instant.now().toEpochMilli())
            .withScrollId(response.scrollId())
            .withDocuments(getDocumentsFromResponse(response))
            .build();
}

@Override
public SearchScrollResponse searchWithScroll(final SearchScrollRequest searchScrollRequest) {
    final SearchResponse<ObjectNode> response;
    try {
        response = openSearchClient.scroll(ScrollRequest.of(builder -> builder
                .scrollId(searchScrollRequest.getScrollId())
                .scroll(Time.of(t -> t.time(searchScrollRequest.getScrollTime())))), ObjectNode.class);
    } catch (final OpenSearchException e) {
        LOG.error("There was an error searching with a scroll context for OpenSearch: ", e);
        throw e;
    } catch (final IOException e) {
        LOG.error("There was an error searching with a scroll context for OpenSearch: ", e);
        throw new RuntimeException(e);
    }

    return SearchScrollResponse.builder()
            .withScrollId(response.scrollId())
            .withDocuments(getDocumentsFromResponse(response))
            .build();
}

@Override
public void deleteScroll(final DeleteScrollRequest deleteScrollRequest) {
    final String scrollId = deleteScrollRequest.getScrollId();
    try {
        final ClearScrollResponse clearScrollResponse = openSearchClient.clearScroll(ClearScrollRequest.of(builder -> builder.scrollId(scrollId)));
        if (clearScrollResponse.succeeded()) {
            LOG.debug("Successfully deleted scroll context with id {}", scrollId);
        } else {
            LOG.warn("Scroll context with id {} was not deleted successfully. It will expire from timing out on its own", scrollId);
        }
    } catch (final IOException | RuntimeException e) {
        // Deletion is best-effort: the server will eventually expire the context on its own.
        LOG.error("There was an error deleting the scroll context with id {} for OpenSearch. It will expire from timing out : ", scrollId, e);
    }
}

@Override
Expand Down Expand Up @@ -177,16 +229,15 @@ private boolean isDueToPitLimitExceeded(final OpenSearchException e) {
&& PIT_RESOURCE_LIMIT_ERROR_TYPE.equals(e.error().causedBy().type());
}

/**
 * Returns true when the IOException indicates the cluster's scroll context limit was hit.
 * IOException messages may be null (e.g. wrapped transport failures), so guard before matching.
 */
private boolean isDueToScrollLimitExceeded(final IOException e) {
    return Objects.nonNull(e.getMessage()) && e.getMessage().contains(SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE);
}

private SearchWithSearchAfterResults searchWithSearchAfter(final SearchRequest searchRequest) {
try {
final SearchResponse<ObjectNode> searchResponse = openSearchClient.search(searchRequest, ObjectNode.class);

final List<Event> documents = searchResponse.hits().hits().stream()
.map(hit -> JacksonEvent.builder()
.withData(hit.source())
.withEventMetadataAttributes(Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(), INDEX_METADATA_ATTRIBUTE_NAME, hit.index()))
.withEventType(EventType.DOCUMENT.toString()).build())
.collect(Collectors.toList());
final List<Event> documents = getDocumentsFromResponse(searchResponse);

final List<String> nextSearchAfter = Objects.nonNull(searchResponse.hits().hits()) && !searchResponse.hits().hits().isEmpty() ?
searchResponse.hits().hits().get(searchResponse.hits().hits().size() - 1).sort() :
Expand All @@ -200,4 +251,13 @@ private SearchWithSearchAfterResults searchWithSearchAfter(final SearchRequest s
throw new RuntimeException(e);
}
}

/** Converts each search hit into a JacksonEvent carrying the document id and index name as metadata. */
private List<Event> getDocumentsFromResponse(final SearchResponse<ObjectNode> searchResponse) {
    return searchResponse.hits().hits()
            .stream()
            .map(hit -> JacksonEvent.builder()
                    .withEventType(EventType.DOCUMENT.toString())
                    .withData(hit.source())
                    .withEventMetadataAttributes(Map.of(
                            DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(),
                            INDEX_METADATA_ATTRIBUTE_NAME, hit.index()))
                    .build())
            .collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,55 @@

/**
 * Request model for opening a new scroll context against a source cluster index.
 * Modeled after https://opensearch.org/docs/latest/api-reference/scroll/ and
 * https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html
 */
public class CreateScrollRequest {

    private final String index;
    private final String scrollTime;
    private final Integer size;

    private CreateScrollRequest(final Builder builder) {
        this.index = builder.index;
        this.size = builder.size;
        this.scrollTime = builder.scrollTime;
    }

    public static Builder builder() {
        return new Builder();
    }

    /** Name of the index the scroll context will be created for. */
    public String getIndex() {
        return index;
    }

    /** Maximum number of documents per scroll batch. */
    public Integer getSize() {
        return size;
    }

    /** Keep-alive duration of the scroll context (e.g. "1m"). */
    public String getScrollTime() {
        return scrollTime;
    }

    /** Fluent builder for {@link CreateScrollRequest}. */
    public static class Builder {

        private String index;
        private Integer size;
        private String scrollTime;

        public Builder withSize(final Integer size) {
            this.size = size;
            return this;
        }

        public Builder withIndex(final String index) {
            this.index = index;
            return this;
        }

        public Builder withScrollTime(final String scrollTime) {
            this.scrollTime = scrollTime;
            return this;
        }

        public CreateScrollRequest build() {
            return new CreateScrollRequest(this);
        }
    }
}
Loading

0 comments on commit 458e4e0

Please sign in to comment.