diff --git a/src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java b/src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java index 23d5df60a..d01f788ff 100644 --- a/src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java +++ b/src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java @@ -285,7 +285,7 @@ private void createInferenceListForMapTypeInput(Object sourceValue, List if (sourceValue instanceof Map) { ((Map) sourceValue).forEach((k, v) -> createInferenceListForMapTypeInput(v, texts)); } else if (sourceValue instanceof List) { - texts.addAll(((List) sourceValue)); + ((List) sourceValue).stream().filter(Objects::nonNull).forEach(texts::add); } else { if (sourceValue == null) return; texts.add(sourceValue.toString()); diff --git a/src/test/java/org/opensearch/neuralsearch/processor/InferenceProcessorTests.java b/src/test/java/org/opensearch/neuralsearch/processor/InferenceProcessorTests.java index cd2d0816a..d08ec495a 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/InferenceProcessorTests.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/InferenceProcessorTests.java @@ -66,7 +66,7 @@ public void test_batchExecute_emptyInput() { verify(clientAccessor, never()).inferenceSentences(anyString(), anyList(), any()); } - public void test_batchExecute_allFailedValidation() { + public void test_batchExecuteWithEmpty_allFailedValidation() { final int docCount = 2; TestInferenceProcessor processor = new TestInferenceProcessor(createMockVectorResult(), BATCH_SIZE, null); List wrapperList = createIngestDocumentWrappers(docCount); @@ -79,6 +79,32 @@ public void test_batchExecute_allFailedValidation() { assertEquals(docCount, captor.getValue().size()); for (int i = 0; i < docCount; ++i) { assertNotNull(captor.getValue().get(i).getException()); + assertEquals( + "list type field [key1] has empty string, cannot process it", + captor.getValue().get(i).getException().getMessage() + ); + assertEquals(wrapperList.get(i).getIngestDocument(), captor.getValue().get(i).getIngestDocument()); + } + verify(clientAccessor, never()).inferenceSentences(anyString(), anyList(), any()); + } + + public void test_batchExecuteWithNull_allFailedValidation() { + final int docCount = 2; + TestInferenceProcessor processor = new TestInferenceProcessor(createMockVectorResult(), BATCH_SIZE, null); + List wrapperList = createIngestDocumentWrappers(docCount); + wrapperList.get(0).getIngestDocument().setFieldValue("key1", Arrays.asList(null, "value1")); + wrapperList.get(1).getIngestDocument().setFieldValue("key1", Arrays.asList(null, "value1")); + Consumer resultHandler = mock(Consumer.class); + processor.batchExecute(wrapperList, resultHandler); + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + verify(resultHandler).accept(captor.capture()); + assertEquals(docCount, captor.getValue().size()); + for (int i = 0; i < docCount; ++i) { + assertNotNull(captor.getValue().get(i).getException()); + assertEquals( + "list type field [key1] has null, cannot process it", + captor.getValue().get(i).getException().getMessage() + ); assertEquals(wrapperList.get(i).getIngestDocument(), captor.getValue().get(i).getIngestDocument()); } verify(clientAccessor, never()).inferenceSentences(anyString(), anyList(), any()); diff --git a/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java b/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java index 8fd87f091..4afa4031d 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java @@ -310,5 +310,14 @@ private void ingestBatchDocumentWithBulk(String idPrefix, int docCount, Set> itemMap = (Map>) item; + if (itemMap.get("index").get("error") != null) { + failedDocCount++; + } + } + assertEquals(failedIds.size(), failedDocCount); } } diff --git a/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java b/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java index 7ad0e63f8..e6fb45d2a 100644 --- a/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java +++ b/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java @@ -203,6 +203,7 @@ protected void loadModel(final String modelId) throws Exception { isComplete = checkComplete(taskQueryResult); Thread.sleep(DEFAULT_TASK_RESULT_QUERY_INTERVAL_IN_MILLISECOND); } + assertTrue(isComplete); } /**