From 5565337ec07281f8b58f65e275185112811357c0 Mon Sep 17 00:00:00 2001 From: kkondaka <41027584+kkondaka@users.noreply.github.com> Date: Fri, 23 Jun 2023 10:07:09 -0700 Subject: [PATCH 1/2] Fix addTags API in EventMetadata (#2926) Signed-off-by: Krishna Kondaka Co-authored-by: Krishna Kondaka --- .../model/event/DefaultEventMetadata.java | 4 +++- .../model/event/DefaultEventMetadataTest.java | 12 ++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java index 24384c2ad3..e2ce55caa2 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java @@ -97,7 +97,9 @@ public Boolean hasTags(final List tagsList) { @Override public void addTags(final List newTags) { - tags.addAll(newTags); + if (Objects.nonNull(newTags)) { + tags.addAll(newTags); + } } @Override diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java index 18f15f60d9..fa624a9e2b 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java @@ -199,6 +199,18 @@ void fromEventMetadata_returns_matching_EventMetadata() { assertThat(copiedMetadata.getAttributes(), not(sameInstance(attributes))); } + @Test + public void testEventMetadata_withNullTags() { + final String testEventType = UUID.randomUUID().toString(); + + final EventMetadata eventMetadata = DefaultEventMetadata.builder() + .withEventType(testEventType) + .build(); + assertThat(eventMetadata, notNullValue()); + eventMetadata.addTags(null); + assertThat(eventMetadata.getTags(), equalTo(Collections.emptySet())); + } + @Test public void testBuild_withTags() { final String testEventType = UUID.randomUUID().toString(); From cb5cb604cbce854e28a989d04d42f5b318b2ae59 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Fri, 23 Jun 2023 15:26:52 -0500 Subject: [PATCH 2/2] Implement searching with scroll contexts for OpenSearch (#2923) Signed-off-by: Taylor Gray --- .../source/opensearch/worker/PitWorker.java | 2 +- .../opensearch/worker/ScrollWorker.java | 123 ++++++++++- .../worker/client/OpenSearchAccessor.java | 84 ++++++-- .../client/model/CreateScrollRequest.java | 53 ++++- .../client/model/CreateScrollResponse.java | 59 ++++- .../client/model/DeleteScrollRequest.java | 35 ++- .../client/model/SearchScrollRequest.java | 43 +++- .../client/model/SearchScrollResponse.java | 47 +++- .../opensearch/worker/ScrollWorkerTest.java | 202 ++++++++++++++++++ .../worker/client/OpenSearchAccessorTest.java | 195 +++++++++++++++++ 10 files changed, 818 insertions(+), 25 deletions(-) create mode 100644 data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java index 73040240ed..f0da0ab72d 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java @@ -167,7 +167,7 @@ private void processIndex(final SourcePartition op try { bufferAccumulator.flush(); } catch (final Exception e) { - LOG.error("Failed writing remaining OpenSearch documents to buffer due to: {}", e.getMessage()); + LOG.error("Failed flushing remaining OpenSearch documents to buffer due to: {}", e.getMessage()); } // todo: This API call is failing with sigv4 enabled due to a mismatch in the signature. Tracking issue (https://github.com/opensearch-project/opensearch-java/issues/521) diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java index 96d450e515..74611ae97a 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java @@ -8,15 +8,40 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException; +import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotOwnedException; +import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollResponse; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME; /** * ScrollWorker polls the source cluster via scroll contexts. */ public class ScrollWorker implements SearchWorker { + private static final Logger LOG = LoggerFactory.getLogger(ScrollWorker.class); + private static final int STANDARD_BACKOFF_MILLIS = 30_000; + private static final Duration BACKOFF_ON_SCROLL_LIMIT_REACHED = Duration.ofSeconds(120); + static final String SCROLL_TIME_PER_BATCH = "1m"; + private final SearchAccessor searchAccessor; private final OpenSearchSourceConfiguration openSearchSourceConfiguration; private final SourceCoordinator sourceCoordinator; @@ -37,6 +62,102 @@ public ScrollWorker(final SearchAccessor searchAccessor, @Override public void run() { - // todo: implement + while (!Thread.currentThread().isInterrupted()) { + final Optional> indexPartition = sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier); + + if (indexPartition.isEmpty()) { + try { + Thread.sleep(STANDARD_BACKOFF_MILLIS); + continue; + } catch (final InterruptedException e) { + LOG.info("The PitWorker was interrupted while sleeping after acquiring no indices to process, stopping processing"); + return; + } + } + + try { + processIndex(indexPartition.get()); + + sourceCoordinator.closePartition( + indexPartition.get().getPartitionKey(), + openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(), + openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount()); + } catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) { + LOG.warn("ScrollWorker received an exception from the source coordinator. There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage()); + sourceCoordinator.giveUpPartitions(); + } catch (final SearchContextLimitException e) { + LOG.warn("Received SearchContextLimitExceeded exception for index {}. Giving up index and waiting {} seconds before retrying: {}", + indexPartition.get().getPartitionKey(), BACKOFF_ON_SCROLL_LIMIT_REACHED.getSeconds(), e.getMessage()); + sourceCoordinator.giveUpPartitions(); + try { + Thread.sleep(BACKOFF_ON_SCROLL_LIMIT_REACHED.toMillis()); + } catch (final InterruptedException ex) { + return; + } + } catch (final RuntimeException e) { + LOG.error("Unknown exception while processing index '{}':", indexPartition.get().getPartitionKey(), e); + sourceCoordinator.giveUpPartitions(); + } + } + } + + private void processIndex(final SourcePartition openSearchIndexPartition) { + final String indexName = openSearchIndexPartition.getPartitionKey(); + + final Integer batchSize = openSearchSourceConfiguration.getSearchConfiguration().getBatchSize(); + + final CreateScrollResponse createScrollResponse = searchAccessor.createScroll(CreateScrollRequest.builder() + .withScrollTime(SCROLL_TIME_PER_BATCH) + .withSize(openSearchSourceConfiguration.getSearchConfiguration().getBatchSize()) + .withIndex(indexName) + .build()); + + writeDocumentsToBuffer(createScrollResponse.getDocuments()); + + SearchScrollResponse searchScrollResponse = null; + + if (createScrollResponse.getDocuments().size() == batchSize) { + do { + try { + searchScrollResponse = searchAccessor.searchWithScroll(SearchScrollRequest.builder() + .withScrollId(Objects.nonNull(searchScrollResponse) && Objects.nonNull(searchScrollResponse.getScrollId()) ? searchScrollResponse.getScrollId() : createScrollResponse.getScrollId()) + .withScrollTime(SCROLL_TIME_PER_BATCH) + .build()); + + writeDocumentsToBuffer(searchScrollResponse.getDocuments()); + sourceCoordinator.saveProgressStateForPartition(indexName, null); + } catch (final Exception e) { + deleteScroll(createScrollResponse.getScrollId()); + throw e; + } + } while (searchScrollResponse.getDocuments().size() == batchSize); + } + + deleteScroll(createScrollResponse.getScrollId()); + + try { + bufferAccumulator.flush(); + } catch (final Exception e) { + LOG.error("Failed flushing remaining OpenSearch documents to buffer due to: {}", e.getMessage()); + } + } + + private void writeDocumentsToBuffer(final List documents) { + documents.stream().map(Record::new).forEach(record -> { + try { + bufferAccumulator.add(record); + } catch (Exception e) { + LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}", + record.getData().getMetadata().getAttribute(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME), + record.getData().getMetadata().getAttribute(INDEX_METADATA_ATTRIBUTE_NAME), e.getMessage()); + } + }); + } + + // todo: This API call is failing with sigv4 enabled due to a mismatch in the signature. Tracking issue (https://github.com/opensearch-project/opensearch-java/issues/521) + private void deleteScroll(final String scrollId) { + searchAccessor.deleteScroll(DeleteScrollRequest.builder() + .withScrollId(scrollId) + .build()); } } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java index b96910dfa5..3daf16097e 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java @@ -13,6 +13,9 @@ import org.opensearch.client.opensearch._types.Time; import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery; import org.opensearch.client.opensearch._types.query_dsl.Query; +import org.opensearch.client.opensearch.core.ClearScrollRequest; +import org.opensearch.client.opensearch.core.ClearScrollResponse; +import org.opensearch.client.opensearch.core.ScrollRequest; import org.opensearch.client.opensearch.core.SearchRequest; import org.opensearch.client.opensearch.core.SearchResponse; import org.opensearch.client.opensearch.core.pit.CreatePitRequest; @@ -33,13 +36,14 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.NoSearchContextSearchRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Instant; import java.util.Collections; import java.util.List; import java.util.Map; @@ -54,6 +58,7 @@ public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAccessor.class); static final String PIT_RESOURCE_LIMIT_ERROR_TYPE = "rejected_execution_exception"; + static final String SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE = "Trying to create too many scroll contexts"; private final OpenSearchClient openSearchClient; private final SearchContextType searchContextType; @@ -128,19 +133,66 @@ public void deletePit(final DeletePointInTimeRequest deletePointInTimeRequest) { @Override public CreateScrollResponse createScroll(final CreateScrollRequest createScrollRequest) { - //todo: implement - return null; + + SearchResponse searchResponse; + try { + searchResponse = openSearchClient.search(SearchRequest.of(request -> request + .scroll(Time.of(time -> time.time(createScrollRequest.getScrollTime()))) + .size(createScrollRequest.getSize()) + .index(createScrollRequest.getIndex())), ObjectNode.class); + } catch (final OpenSearchException e) { + LOG.error("There was an error creating a scroll context for OpenSearch: ", e); + throw e; + } catch (final IOException e) { + LOG.error("There was an error creating a scroll context for OpenSearch: ", e); + if (isDueToScrollLimitExceeded(e)) { + throw new SearchContextLimitException(String.format("There was an error creating a new scroll context for index '%s': %s", + createScrollRequest.getIndex(), e.getMessage())); + } + + throw new RuntimeException(e); + } + + return CreateScrollResponse.builder() + .withCreationTime(Instant.now().toEpochMilli()) + .withScrollId(searchResponse.scrollId()) + .withDocuments(getDocumentsFromResponse(searchResponse)) + .build(); } @Override public SearchScrollResponse searchWithScroll(final SearchScrollRequest searchScrollRequest) { - //todo: implement - return null; + SearchResponse searchResponse; + try { + searchResponse = openSearchClient.scroll(ScrollRequest.of(request -> request + .scrollId(searchScrollRequest.getScrollId()) + .scroll(Time.of(time -> time.time(searchScrollRequest.getScrollTime())))), ObjectNode.class); + } catch (final OpenSearchException e) { + LOG.error("There was an error searching with a scroll context for OpenSearch: ", e); + throw e; + } catch (final IOException e) { + LOG.error("There was an error searching with a scroll context for OpenSearch: ", e); + throw new RuntimeException(e); + } + + return SearchScrollResponse.builder() + .withScrollId(searchResponse.scrollId()) + .withDocuments(getDocumentsFromResponse(searchResponse)) + .build(); } @Override public void deleteScroll(final DeleteScrollRequest deleteScrollRequest) { - //todo: implement + try { + final ClearScrollResponse clearScrollResponse = openSearchClient.clearScroll(ClearScrollRequest.of(request -> request.scrollId(deleteScrollRequest.getScrollId()))); + if (clearScrollResponse.succeeded()) { + LOG.debug("Successfully deleted scroll context with id {}", deleteScrollRequest.getScrollId()); + } else { + LOG.warn("Scroll context with id {} was not deleted successfully. It will expire from timing out on its own", deleteScrollRequest.getScrollId()); + } + } catch (final IOException | RuntimeException e) { + LOG.error("There was an error deleting the scroll context with id {} for OpenSearch. It will expire from timing out : ", deleteScrollRequest.getScrollId(), e); + } } @Override @@ -177,16 +229,15 @@ private boolean isDueToPitLimitExceeded(final OpenSearchException e) { && PIT_RESOURCE_LIMIT_ERROR_TYPE.equals(e.error().causedBy().type()); } + private boolean isDueToScrollLimitExceeded(final IOException e) { + return e.getMessage().contains(SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE); + } + private SearchWithSearchAfterResults searchWithSearchAfter(final SearchRequest searchRequest) { try { final SearchResponse searchResponse = openSearchClient.search(searchRequest, ObjectNode.class); - final List documents = searchResponse.hits().hits().stream() - .map(hit -> JacksonEvent.builder() - .withData(hit.source()) - .withEventMetadataAttributes(Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(), INDEX_METADATA_ATTRIBUTE_NAME, hit.index())) - .withEventType(EventType.DOCUMENT.toString()).build()) - .collect(Collectors.toList()); + final List documents = getDocumentsFromResponse(searchResponse); final List nextSearchAfter = Objects.nonNull(searchResponse.hits().hits()) && !searchResponse.hits().hits().isEmpty() ? searchResponse.hits().hits().get(searchResponse.hits().hits().size() - 1).sort() : @@ -200,4 +251,13 @@ private SearchWithSearchAfterResults searchWithSearchAfter(final SearchRequest s throw new RuntimeException(e); } } + + private List getDocumentsFromResponse(final SearchResponse searchResponse) { + return searchResponse.hits().hits().stream() + .map(hit -> JacksonEvent.builder() + .withData(hit.source()) + .withEventMetadataAttributes(Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(), INDEX_METADATA_ATTRIBUTE_NAME, hit.index())) + .withEventType(EventType.DOCUMENT.toString()).build()) + .collect(Collectors.toList()); + } } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreateScrollRequest.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreateScrollRequest.java index 140eea7c2e..e48c09c1bb 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreateScrollRequest.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreateScrollRequest.java @@ -6,6 +6,55 @@ public class CreateScrollRequest { - // todo: model after https://opensearch.org/docs/latest/api-reference/scroll/ & - // https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html + private final String index; + private final String scrollTime; + private final Integer size; + + public String getIndex() { + return index; + } + + public Integer getSize() { return size; } + + public String getScrollTime() { return scrollTime; } + + private CreateScrollRequest(final CreateScrollRequest.Builder builder) { + this.index = builder.index; + this.size = builder.size; + this.scrollTime = builder.scrollTime; + } + + public static CreateScrollRequest.Builder builder() { + return new CreateScrollRequest.Builder(); + } + + public static class Builder { + + private String index; + private Integer size; + private String scrollTime; + + public Builder() { + + } + + public CreateScrollRequest.Builder withSize(final Integer size) { + this.size = size; + return this; + } + + public CreateScrollRequest.Builder withIndex(final String index) { + this.index = index; + return this; + } + + public CreateScrollRequest.Builder withScrollTime(final String scrollTime) { + this.scrollTime = scrollTime; + return this; + } + + public CreateScrollRequest build() { + return new CreateScrollRequest(this); + } + } } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreateScrollResponse.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreateScrollResponse.java index 4a08b48ddb..517bab199e 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreateScrollResponse.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreateScrollResponse.java @@ -4,8 +4,63 @@ */ package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; +import org.opensearch.dataprepper.model.event.Event; + +import java.util.List; + public class CreateScrollResponse { - // todo: model after https://opensearch.org/docs/latest/api-reference/scroll/ & - // https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html + private final String scrollId; + private final Long scrollCreationTime; + private final List documents; + + public List getDocuments() { + return documents; + } + + public String getScrollId() { + return scrollId; + } + + public Long getScrollCreationTime() { return scrollCreationTime; } + + private CreateScrollResponse(final CreateScrollResponse.Builder builder) { + this.scrollId = builder.scrollId; + this.scrollCreationTime = builder.scrollCreationTime; + this.documents = builder.documents; + } + + public static CreateScrollResponse.Builder builder() { + return new CreateScrollResponse.Builder(); + } + + public static class Builder { + + private List documents; + private String scrollId; + private Long scrollCreationTime; + + public Builder() { + + } + + public CreateScrollResponse.Builder withDocuments(final List documents) { + this.documents = documents; + return this; + } + + public CreateScrollResponse.Builder withScrollId(final String scrollId) { + this.scrollId = scrollId; + return this; + } + + public CreateScrollResponse.Builder withCreationTime(final Long scrollCreationTime) { + this.scrollCreationTime = scrollCreationTime; + return this; + } + + public CreateScrollResponse build() { + return new CreateScrollResponse(this); + } + } } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/DeleteScrollRequest.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/DeleteScrollRequest.java index 294296842f..025925e858 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/DeleteScrollRequest.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/DeleteScrollRequest.java @@ -1,11 +1,40 @@ /* - * Copyright OpenSearch Contributors + * Copyright OpenDelete Contributors * SPDX-License-Identifier: Apache-2.0 */ package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; public class DeleteScrollRequest { - // todo: model after https://opensearch.org/docs/latest/api-reference/scroll/ & - // https://www.elastic.co/guide/en/elasticsearch/reference/current/clear-scroll-api.html + private final String scrollId; + + public String getScrollId() { + return scrollId; + } + + private DeleteScrollRequest(final DeleteScrollRequest.Builder builder) { + this.scrollId = builder.scrollId; + } + + public static DeleteScrollRequest.Builder builder() { + return new DeleteScrollRequest.Builder(); + } + + public static class Builder { + + private String scrollId; + + public Builder() { + + } + + public DeleteScrollRequest.Builder withScrollId(final String scrollId) { + this.scrollId = scrollId; + return this; + } + + public DeleteScrollRequest build() { + return new DeleteScrollRequest(this); + } + } } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchScrollRequest.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchScrollRequest.java index 11bc3eece6..5aae510b24 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchScrollRequest.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchScrollRequest.java @@ -6,6 +6,45 @@ public class SearchScrollRequest { - // todo: model after: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html - // & https://opensearch.org/docs/latest/api-reference/scroll/ + private final String scrollId; + private final String scrollTime; + + public String getScrollId() { + return scrollId; + } + + public String getScrollTime() { return scrollTime; } + + private SearchScrollRequest(final SearchScrollRequest.Builder builder) { + this.scrollId = builder.scrollId; + this.scrollTime = builder.scrollTime; + } + + public static SearchScrollRequest.Builder builder() { + return new SearchScrollRequest.Builder(); + } + + public static class Builder { + + private String scrollId; + private String scrollTime; + + public Builder() { + + } + + public SearchScrollRequest.Builder withScrollId(final String scrollId) { + this.scrollId = scrollId; + return this; + } + + public SearchScrollRequest.Builder withScrollTime(final String scrollTime) { + this.scrollTime = scrollTime; + return this; + } + + public SearchScrollRequest build() { + return new SearchScrollRequest(this); + } + } } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchScrollResponse.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchScrollResponse.java index dec8725420..8f5809c03f 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchScrollResponse.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchScrollResponse.java @@ -4,8 +4,51 @@ */ package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; +import org.opensearch.dataprepper.model.event.Event; + +import java.util.List; + public class SearchScrollResponse { - // todo: model after: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html - // & https://opensearch.org/docs/latest/api-reference/scroll/ + private final String scrollId; + private final List documents; + + public String getScrollId() { + return scrollId; + } + + public List getDocuments() { return documents; } + + private SearchScrollResponse(final SearchScrollResponse.Builder builder) { + this.scrollId = builder.scrollId; + this.documents = builder.documents; + } + + public static SearchScrollResponse.Builder builder() { + return new SearchScrollResponse.Builder(); + } + + public static class Builder { + + private String scrollId; + private List documents; + + public Builder() { + + } + + public SearchScrollResponse.Builder withScrollId(final String scrollId) { + this.scrollId = scrollId; + return this; + } + + public SearchScrollResponse.Builder withDocuments(final List documents) { + this.documents = documents; + return this; + } + + public SearchScrollResponse build() { + return new SearchScrollResponse(this); + } + } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java new file mode 100644 index 0000000000..9385bf6239 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java @@ -0,0 +1,202 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.worker; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartition; +import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; +import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollResponse; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse; + +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.ScrollWorker.SCROLL_TIME_PER_BATCH; + +@ExtendWith(MockitoExtension.class) +public class ScrollWorkerTest { + + @Mock + private OpenSearchSourceConfiguration openSearchSourceConfiguration; + + @Mock + private OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier; + + @Mock + private SourceCoordinator sourceCoordinator; + + @Mock + private SearchAccessor searchAccessor; + + @Mock + private BufferAccumulator> bufferAccumulator; + + private ExecutorService executorService; + + @BeforeEach + void setup() { + executorService = Executors.newSingleThreadExecutor(); + } + + private ScrollWorker createObjectUnderTest() { + return new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier); + } + + @Test + void run_with_getNextPartition_returning_empty_will_sleep_and_exit_when_interrupted() throws InterruptedException { + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.empty()); + + + final Future future = executorService.submit(() -> createObjectUnderTest().run()); + Thread.sleep(100); + executorService.shutdown(); + future.cancel(true); + assertThat(future.isCancelled(), equalTo(true)); + + assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); + } + + @Test + void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_scroll_and_closes_that_partition() throws Exception { + final SourcePartition sourcePartition = mock(SourcePartition.class); + final String partitionKey = UUID.randomUUID().toString(); + when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); + + final String scrollId = UUID.randomUUID().toString(); + final ArgumentCaptor requestArgumentCaptor = ArgumentCaptor.forClass(CreateScrollRequest.class); + final CreateScrollResponse createScrollResponse = mock(CreateScrollResponse.class); + when(createScrollResponse.getScrollId()).thenReturn(scrollId); + when(createScrollResponse.getDocuments()).thenReturn(List.of(mock(Event.class), mock(Event.class))); + when(searchAccessor.createScroll(requestArgumentCaptor.capture())).thenReturn(createScrollResponse); + + final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class); + when(searchConfiguration.getBatchSize()).thenReturn(2); + when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration); + + final SearchScrollResponse searchScrollResponse = mock(SearchScrollResponse.class); + when(searchScrollResponse.getScrollId()).thenReturn(scrollId); + when(searchScrollResponse.getDocuments()).thenReturn(List.of(mock(Event.class), mock(Event.class))) + .thenReturn(List.of(mock(Event.class), mock(Event.class))).thenReturn(List.of(mock(Event.class))).thenReturn(List.of(mock(Event.class))); + + final ArgumentCaptor searchScrollRequestArgumentCaptor = ArgumentCaptor.forClass(SearchScrollRequest.class); + when(searchAccessor.searchWithScroll(searchScrollRequestArgumentCaptor.capture())).thenReturn(searchScrollResponse); + + doNothing().when(bufferAccumulator).add(any(Record.class)); + doNothing().when(bufferAccumulator).flush(); + + final ArgumentCaptor deleteRequestArgumentCaptor = ArgumentCaptor.forClass(DeleteScrollRequest.class); + doNothing().when(searchAccessor).deleteScroll(deleteRequestArgumentCaptor.capture()); + + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty()); + + final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); + when(schedulingParameterConfiguration.getJobCount()).thenReturn(1); + when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO); + when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); + + doNothing().when(sourceCoordinator).closePartition(partitionKey, + Duration.ZERO, 1); + + + final Future future = executorService.submit(() -> createObjectUnderTest().run()); + Thread.sleep(100); + executorService.shutdown(); + future.cancel(true); + assertThat(future.isCancelled(), equalTo(true)); + + assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); + + final CreateScrollRequest createScrollRequest = requestArgumentCaptor.getValue(); + assertThat(createScrollRequest, notNullValue()); + assertThat(createScrollRequest.getSize(), equalTo(2)); + assertThat(createScrollRequest.getIndex(), equalTo(partitionKey)); + assertThat(createScrollRequest.getScrollTime(), equalTo(SCROLL_TIME_PER_BATCH)); + + verify(searchAccessor, times(2)).searchWithScroll(any(SearchScrollRequest.class)); + verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), eq(null)); + + final List searchScrollRequests = searchScrollRequestArgumentCaptor.getAllValues(); + assertThat(searchScrollRequests.size(), equalTo(2)); + assertThat(searchScrollRequests.get(0), notNullValue()); + assertThat(searchScrollRequests.get(0).getScrollId(), equalTo(scrollId)); + assertThat(searchScrollRequests.get(0).getScrollTime(), equalTo(SCROLL_TIME_PER_BATCH)); + + assertThat(searchScrollRequests.get(1), notNullValue()); + assertThat(searchScrollRequests.get(1).getScrollId(), equalTo(scrollId)); + assertThat(searchScrollRequests.get(1).getScrollTime(), equalTo(SCROLL_TIME_PER_BATCH)); + + + final DeleteScrollRequest deleteScrollRequest = deleteRequestArgumentCaptor.getValue(); + assertThat(deleteScrollRequest, notNullValue()); + assertThat(deleteScrollRequest.getScrollId(), equalTo(scrollId)); + } + + @Test + void run_gives_up_partitions_and_waits_when_createScroll_throws_SearchContextLimitException() throws InterruptedException { + final SourcePartition sourcePartition = mock(SourcePartition.class); + final String partitionKey = UUID.randomUUID().toString(); + when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); + + final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class); + when(searchConfiguration.getBatchSize()).thenReturn(2); + when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration); + + when(searchAccessor.createScroll(any(CreateScrollRequest.class))).thenThrow(SearchContextLimitException.class); + + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)); + + + final Future future = executorService.submit(() -> createObjectUnderTest().run()); + Thread.sleep(100); + executorService.shutdown(); + future.cancel(true); + assertThat(future.isCancelled(), equalTo(true)); + + assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); + + verifyNoMoreInteractions(searchAccessor); + verify(sourceCoordinator).giveUpPartitions(); + verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt()); + } + +} diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java index 6a53004e5b..81fea58a73 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java @@ -16,6 +16,10 @@ import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch._types.ErrorCause; import org.opensearch.client.opensearch._types.OpenSearchException; +import org.opensearch.client.opensearch.core.ClearScrollRequest; +import org.opensearch.client.opensearch.core.ClearScrollResponse; +import org.opensearch.client.opensearch.core.ScrollRequest; +import org.opensearch.client.opensearch.core.ScrollResponse; import org.opensearch.client.opensearch.core.SearchRequest; import org.opensearch.client.opensearch.core.SearchResponse; import org.opensearch.client.opensearch.core.pit.CreatePitRequest; @@ -28,9 +32,14 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollResponse; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults; import java.io.IOException; @@ -49,6 +58,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.PIT_RESOURCE_LIMIT_ERROR_TYPE; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE; @ExtendWith(MockitoExtension.class) public class OpenSearchAccessorTest { @@ -83,6 +93,50 @@ void create_pit_returns_expected_create_point_in_time_response() throws IOExcept assertThat(createPointInTimeResponse.getPitId(), equalTo(pitId)); } + @Test + void create_scroll_returns_expected_create_scroll_response() throws IOException { + final String indexName = UUID.randomUUID().toString(); + final String scrollTime = UUID.randomUUID().toString(); + final Integer size = new Random().nextInt(10); + + final CreateScrollRequest createScrollRequest = mock(CreateScrollRequest.class); + when(createScrollRequest.getIndex()).thenReturn(indexName); + when(createScrollRequest.getScrollTime()).thenReturn(scrollTime); + when(createScrollRequest.getSize()).thenReturn(size); + + final String scrollId = UUID.randomUUID().toString(); + final SearchResponse searchResponse = mock(SearchResponse.class); + when(searchResponse.scrollId()).thenReturn(scrollId); + + final HitsMetadata hitsMetadata = mock(HitsMetadata.class); + final List> hits = new ArrayList<>(); + final Hit firstHit = mock(Hit.class); + when(firstHit.id()).thenReturn(UUID.randomUUID().toString()); + when(firstHit.index()).thenReturn(UUID.randomUUID().toString()); + when(firstHit.source()).thenReturn(mock(ObjectNode.class)); + + final Hit secondHit = mock(Hit.class); + when(secondHit.id()).thenReturn(UUID.randomUUID().toString()); + when(secondHit.index()).thenReturn(UUID.randomUUID().toString()); + when(secondHit.source()).thenReturn(mock(ObjectNode.class)); + + hits.add(firstHit); + hits.add(secondHit); + + when(hitsMetadata.hits()).thenReturn(hits); + when(searchResponse.hits()).thenReturn(hitsMetadata); + + final ArgumentCaptor searchRequestArgumentCaptor = ArgumentCaptor.forClass(SearchRequest.class); + + when(openSearchClient.search(searchRequestArgumentCaptor.capture(), eq(ObjectNode.class))).thenReturn(searchResponse); + + final CreateScrollResponse createScrollResponse = createObjectUnderTest().createScroll(createScrollRequest); + assertThat(createScrollResponse, notNullValue()); + assertThat(createScrollResponse.getScrollId(), equalTo(scrollId)); + assertThat(createScrollResponse.getDocuments(), notNullValue()); + assertThat(createScrollResponse.getDocuments().size(), equalTo(2)); + } + @Test void create_pit_with_exception_for_pit_limit_throws_SearchContextLimitException() throws IOException { final String indexName = UUID.randomUUID().toString(); @@ -105,6 +159,25 @@ void create_pit_with_exception_for_pit_limit_throws_SearchContextLimitException( assertThrows(SearchContextLimitException.class, () -> createObjectUnderTest().createPit(createPointInTimeRequest)); } + @Test + void create_scroll_with_exception_for_scroll_limit_throws_SearchContextLimitException() throws IOException { + final String indexName = UUID.randomUUID().toString(); + final String scrollTime = UUID.randomUUID().toString(); + final Integer size = new Random().nextInt(10); + + final CreateScrollRequest createScrollRequest = mock(CreateScrollRequest.class); + when(createScrollRequest.getIndex()).thenReturn(indexName); + when(createScrollRequest.getScrollTime()).thenReturn(scrollTime); + when(createScrollRequest.getSize()).thenReturn(size); + + final IOException exception = mock(IOException.class); + when(exception.getMessage()).thenReturn(UUID.randomUUID() + SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE + UUID.randomUUID()); + + when(openSearchClient.search(any(SearchRequest.class), eq(ObjectNode.class))).thenThrow(exception); + + assertThrows(SearchContextLimitException.class, () -> createObjectUnderTest().createScroll(createScrollRequest)); + } + @Test void createPit_throws_OpenSearch_exception_throws_that_exception() throws IOException { final String indexName = UUID.randomUUID().toString(); @@ -124,6 +197,24 @@ void createPit_throws_OpenSearch_exception_throws_that_exception() throws IOExce assertThrows(OpenSearchException.class, () -> createObjectUnderTest().createPit(createPointInTimeRequest)); } + @Test + void createScroll_throws_OpenSearch_exception_throws_that_exception() throws IOException { + final String indexName = UUID.randomUUID().toString(); + final String scrollTime = UUID.randomUUID().toString(); + final Integer size = new Random().nextInt(10); + + final CreateScrollRequest createScrollRequest = mock(CreateScrollRequest.class); + when(createScrollRequest.getIndex()).thenReturn(indexName); + when(createScrollRequest.getScrollTime()).thenReturn(scrollTime); + when(createScrollRequest.getSize()).thenReturn(size); + + final OpenSearchException exception = mock(OpenSearchException.class); + + when(openSearchClient.search(any(SearchRequest.class), eq(ObjectNode.class))).thenThrow(exception); + + assertThrows(OpenSearchException.class, () -> createObjectUnderTest().createScroll(createScrollRequest)); + } + @Test void createPit_throws_runtime_exception_throws_IO_Exception() throws IOException { final String indexName = UUID.randomUUID().toString(); @@ -138,6 +229,26 @@ void createPit_throws_runtime_exception_throws_IO_Exception() throws IOException assertThrows(RuntimeException.class, () -> createObjectUnderTest().createPit(createPointInTimeRequest)); } + @Test + void createScroll_throws_runtime_exception_when_throws_IO_Exception_that_is_not_search_context() throws IOException { + final String indexName = UUID.randomUUID().toString(); + final String scrollTime = UUID.randomUUID().toString(); + final Integer size = new Random().nextInt(10); + + final CreateScrollRequest createScrollRequest = mock(CreateScrollRequest.class); + when(createScrollRequest.getIndex()).thenReturn(indexName); + when(createScrollRequest.getScrollTime()).thenReturn(scrollTime); + when(createScrollRequest.getSize()).thenReturn(size); + + final IOException exception = mock(IOException.class); + when(exception.getMessage()).thenReturn(UUID.randomUUID().toString()); + + when(openSearchClient.search(any(SearchRequest.class), eq(ObjectNode.class))).thenThrow(exception); + + final RuntimeException exceptionThrown = assertThrows(RuntimeException.class, () -> createObjectUnderTest().createScroll(createScrollRequest)); + assertThat(exceptionThrown instanceof SearchContextLimitException, equalTo(false)); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void delete_pit_with_no_exception_does_not_throw(final boolean successful) throws IOException { @@ -156,6 +267,23 @@ void delete_pit_with_no_exception_does_not_throw(final boolean successful) throw createObjectUnderTest().deletePit(deletePointInTimeRequest); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void delete_scroll_with_no_exception_does_not_throw(final boolean successful) throws IOException { + final String scrollId = UUID.randomUUID().toString(); + + final DeleteScrollRequest deleteScrollRequest = mock(DeleteScrollRequest.class); + when(deleteScrollRequest.getScrollId()).thenReturn(scrollId); + + final ClearScrollResponse clearScrollResponse = mock(ClearScrollResponse.class); + when(clearScrollResponse.succeeded()).thenReturn(successful); + + + when(openSearchClient.clearScroll(any(ClearScrollRequest.class))).thenReturn(clearScrollResponse); + + createObjectUnderTest().deleteScroll(deleteScrollRequest); + } + @Test void delete_pit_does_not_throw_during_opensearch_exception() throws IOException { final String pitId = UUID.randomUUID().toString(); @@ -168,6 +296,19 @@ void delete_pit_does_not_throw_during_opensearch_exception() throws IOException createObjectUnderTest().deletePit(deletePointInTimeRequest); } + @Test + void delete_scroll_does_not_throw_during_opensearch_exception() throws IOException { + final String scrollId = UUID.randomUUID().toString(); + + final DeleteScrollRequest deleteScrollRequest = mock(DeleteScrollRequest.class); + when(deleteScrollRequest.getScrollId()).thenReturn(scrollId); + + + when(openSearchClient.clearScroll(any(ClearScrollRequest.class))).thenThrow(OpenSearchException.class); + + createObjectUnderTest().deleteScroll(deleteScrollRequest); + } + @Test void delete_pit_does_not_throw_exception_when_client_throws_IOException() throws IOException { final String pitId = UUID.randomUUID().toString(); @@ -180,6 +321,19 @@ void delete_pit_does_not_throw_exception_when_client_throws_IOException() throws createObjectUnderTest().deletePit(deletePointInTimeRequest); } + @Test + void delete_scroll_does_not_throw_during_IO_exception() throws IOException { + final String scrollId = UUID.randomUUID().toString(); + + final DeleteScrollRequest deleteScrollRequest = mock(DeleteScrollRequest.class); + when(deleteScrollRequest.getScrollId()).thenReturn(scrollId); + + + when(openSearchClient.clearScroll(any(ClearScrollRequest.class))).thenThrow(IOException.class); + + createObjectUnderTest().deleteScroll(deleteScrollRequest); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void search_with_pit_returns_expected_SearchPointInTimeResponse(final boolean hasSearchAfter) throws IOException { @@ -229,4 +383,45 @@ void search_with_pit_returns_expected_SearchPointInTimeResponse(final boolean ha assertThat(searchWithSearchAfterResults.getNextSearchAfter(), equalTo(secondHit.sort())); } + + @Test + void search_with_scroll_returns_expected_SearchScrollResponse() throws IOException { + final String scrollId = UUID.randomUUID().toString(); + final String scrollTime = UUID.randomUUID().toString(); + + final SearchScrollRequest searchScrollRequest = mock(SearchScrollRequest.class); + when(searchScrollRequest.getScrollId()).thenReturn(scrollId); + when(searchScrollRequest.getScrollTime()).thenReturn(scrollTime); + + final ScrollResponse searchResponse = mock(ScrollResponse.class); + final HitsMetadata hitsMetadata = mock(HitsMetadata.class); + final List> hits = new ArrayList<>(); + final Hit firstHit = mock(Hit.class); + when(firstHit.id()).thenReturn(UUID.randomUUID().toString()); + when(firstHit.index()).thenReturn(UUID.randomUUID().toString()); + when(firstHit.source()).thenReturn(mock(ObjectNode.class)); + + final Hit secondHit = mock(Hit.class); + when(secondHit.id()).thenReturn(UUID.randomUUID().toString()); + when(secondHit.index()).thenReturn(UUID.randomUUID().toString()); + when(secondHit.source()).thenReturn(mock(ObjectNode.class)); + + hits.add(firstHit); + hits.add(secondHit); + + when(hitsMetadata.hits()).thenReturn(hits); + when(searchResponse.hits()).thenReturn(hitsMetadata); + when(searchResponse.scrollId()).thenReturn(scrollId); + + final ArgumentCaptor searchRequestArgumentCaptor = ArgumentCaptor.forClass(ScrollRequest.class); + + when(openSearchClient.scroll(searchRequestArgumentCaptor.capture(), eq(ObjectNode.class))).thenReturn(searchResponse); + + final SearchScrollResponse searchScrollResponse = createObjectUnderTest().searchWithScroll(searchScrollRequest); + + assertThat(searchScrollResponse, notNullValue()); + assertThat(searchScrollResponse.getDocuments(), notNullValue()); + assertThat(searchScrollResponse.getDocuments().size(), equalTo(2)); + assertThat(searchScrollResponse.getScrollId(), equalTo(scrollId)); + } }