Skip to content

Commit

Permalink
Fix bulk ingest NPE with empty pipeline (opensearch-project#15033)
Browse files Browse the repository at this point in the history
Signed-off-by: Liyun Xiu <[email protected]>
  • Loading branch information
chishui committed Aug 6, 2024
1 parent f980924 commit b47b401
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Fixed
- Fix constraint bug which allows more primary shards than average primary shards per index ([#14908](https://github.com/opensearch-project/OpenSearch/pull/14908))
- Fix NPE when bulk ingest with empty pipeline ([#15033](https://github.com/opensearch-project/OpenSearch/pull/15033))
- Fix missing value of FieldSort for unsigned_long ([#14963](https://github.com/opensearch-project/OpenSearch/pull/14963))
- Fix delete index template failed when the index template matches a data stream but is unused ([#15080](https://github.com/opensearch-project/OpenSearch/pull/15080))

Expand Down
10 changes: 9 additions & 1 deletion server/src/main/java/org/opensearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ private void innerBatchExecute(
Consumer<List<IngestDocumentWrapper>> handler
) {
if (pipeline.getProcessors().isEmpty()) {
handler.accept(null);
handler.accept(toIngestDocumentWrappers(slots, indexRequests));
return;
}

Expand Down Expand Up @@ -1271,6 +1271,14 @@ private static IngestDocumentWrapper toIngestDocumentWrapper(int slot, IndexRequ
return new IngestDocumentWrapper(slot, toIngestDocument(indexRequest), null);
}

private static List<IngestDocumentWrapper> toIngestDocumentWrappers(List<Integer> slots, List<IndexRequest> indexRequests) {
List<IngestDocumentWrapper> ingestDocumentWrappers = new ArrayList<>();
for (int i = 0; i < slots.size(); ++i) {
ingestDocumentWrappers.add(toIngestDocumentWrapper(slots.get(i), indexRequests.get(i)));
}
return ingestDocumentWrappers;
}

private static Map<Integer, IndexRequest> createSlotIndexRequestMap(List<Integer> slots, List<IndexRequest> indexRequests) {
Map<Integer, IndexRequest> slotIndexRequestMap = new HashMap<>();
for (int i = 0; i < slots.size(); ++i) {
Expand Down
36 changes: 36 additions & 0 deletions server/src/test/java/org/opensearch/ingest/IngestServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1995,6 +1995,42 @@ public void testExecuteBulkRequestInBatchWithDefaultBatchSize() {
verify(mockCompoundProcessor, never()).execute(any(), any());
}

public void testExecuteEmptyPipelineInBatch() throws Exception {
IngestService ingestService = createWithProcessors(emptyMap());
PutPipelineRequest putRequest = new PutPipelineRequest(
"_id",
new BytesArray("{\"processors\": [], \"description\": \"_description\"}"),
MediaTypeRegistry.JSON
);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
BulkRequest bulkRequest = new BulkRequest();
IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
bulkRequest.add(indexRequest1);
IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
bulkRequest.add(indexRequest2);
IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
bulkRequest.add(indexRequest3);
IndexRequest indexRequest4 = new IndexRequest("_index").id("_id4").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
bulkRequest.add(indexRequest4);
bulkRequest.batchSize(4);
final Map<Integer, Exception> failureHandler = new HashMap<>();
final Map<Thread, Exception> completionHandler = new HashMap<>();
ingestService.executeBulkRequest(
4,
bulkRequest.requests(),
failureHandler::put,
completionHandler::put,
indexReq -> {},
Names.WRITE,
bulkRequest
);
assertTrue(failureHandler.isEmpty());
assertEquals(Set.of(Thread.currentThread()), completionHandler.keySet());
}

public void testPrepareBatches_same_index_pipeline() {
IngestService.IndexRequestWrapper wrapper1 = createIndexRequestWrapper("index1", Collections.singletonList("p1"));
IngestService.IndexRequestWrapper wrapper2 = createIndexRequestWrapper("index1", Collections.singletonList("p1"));
Expand Down

0 comments on commit b47b401

Please sign in to comment.