Skip to content

Commit

Permalink
Fix Unit tests for FlintIndexReader (#2238) (#2239)
Browse files Browse the repository at this point in the history
(cherry picked from commit 55e8e84)

Signed-off-by: Vamsi Manohar <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 6b221c3 commit 3297502
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 47 deletions.
1 change: 0 additions & 1 deletion spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.spark.asyncquery.exceptions.*',
'org.opensearch.sql.spark.dispatcher.model.*',
'org.opensearch.sql.spark.flint.FlintIndexType',
'org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl'
]
limit {
counter = 'LINE'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
@AllArgsConstructor
public class FlintIndexMetadataReaderImpl implements FlintIndexMetadataReader {

protected static final String META_KEY = "_meta";
protected static final String PROPERTIES_KEY = "properties";
protected static final String ENV_KEY = "env";
protected static final String JOB_ID_KEY = "SERVERLESS_EMR_JOB_ID";

private final Client client;

@Override
Expand All @@ -22,12 +27,12 @@ public String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails) {
try {
MappingMetadata mappingMetadata = mappingsResponse.mappings().get(indexName);
Map<String, Object> mappingSourceMap = mappingMetadata.getSourceAsMap();
Map<String, Object> metaMap = (Map<String, Object>) mappingSourceMap.get("_meta");
Map<String, Object> propertiesMap = (Map<String, Object>) metaMap.get("properties");
Map<String, Object> envMap = (Map<String, Object>) propertiesMap.get("env");
return (String) envMap.get("SERVERLESS_EMR_JOB_ID");
Map<String, Object> metaMap = (Map<String, Object>) mappingSourceMap.get(META_KEY);
Map<String, Object> propertiesMap = (Map<String, Object>) metaMap.get(PROPERTIES_KEY);
Map<String, Object> envMap = (Map<String, Object>) propertiesMap.get(ENV_KEY);
return (String) envMap.get(JOB_ID_KEY);
} catch (NullPointerException npe) {
throw new IllegalArgumentException("Index doesn't exist");
throw new IllegalArgumentException("Provided Index doesn't exist");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
public enum FlintIndexType {
SKIPPING("skipping_index"),
COVERING("index"),
MATERIALIZED("materialized_view");
MATERIALIZED_VIEW("materialized_view");

private final String suffix;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Map;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
Expand All @@ -31,26 +32,86 @@ public class FlintIndexMetadataReaderImplTest {
@Mock(answer = RETURNS_DEEP_STUBS)
private Client client;

// TODO FIX this
@SneakyThrows
// @Test
void testGetJobIdFromFlintIndexMetadata() {
@Test
void testGetJobIdFromFlintSkippingIndexMetadata() {
URL url =
Resources.getResource(
"flint-index-mappings/flint_my_glue_default_http_logs_size_year_covering_index.json");
"flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json");
String mappings = Resources.toString(url, Charsets.UTF_8);
String indexName = "flint_my_glue_default_http_logs_size_year_covering_index";
String indexName = "flint_mys3_default_http_logs_skipping_index";
mockNodeClientIndicesMappings(indexName, mappings);
FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client);
String jobId =
flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(
new IndexDetails(
"size_year",
new FullyQualifiedTableName("my_glue.default.http_logs"),
null,
new FullyQualifiedTableName("mys3.default.http_logs"),
false,
true,
FlintIndexType.SKIPPING));
Assertions.assertEquals("00fdmvv9hp8u0o0q", jobId);
}

@SneakyThrows
@Test
void testGetJobIdFromFlintCoveringIndexMetadata() {
URL url =
Resources.getResource("flint-index-mappings/flint_mys3_default_http_logs_cv1_index.json");
String mappings = Resources.toString(url, Charsets.UTF_8);
String indexName = "flint_mys3_default_http_logs_cv1_index";
mockNodeClientIndicesMappings(indexName, mappings);
FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client);
String jobId =
flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(
new IndexDetails(
"cv1",
new FullyQualifiedTableName("mys3.default.http_logs"),
false,
true,
FlintIndexType.COVERING));
Assertions.assertEquals("00fdlum58g9g1g0q", jobId);
Assertions.assertEquals("00fdmvv9hp8u0o0q", jobId);
}

@SneakyThrows
@Test
void testGetJobIDWithNPEException() {
URL url = Resources.getResource("flint-index-mappings/npe_mapping.json");
String mappings = Resources.toString(url, Charsets.UTF_8);
String indexName = "flint_mys3_default_http_logs_cv1_index";
mockNodeClientIndicesMappings(indexName, mappings);
FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client);
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class,
() ->
flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(
new IndexDetails(
"cv1",
new FullyQualifiedTableName("mys3.default.http_logs"),
false,
true,
FlintIndexType.COVERING)));
Assertions.assertEquals("Provided Index doesn't exist", illegalArgumentException.getMessage());
}

@SneakyThrows
@Test
void testGetJobIdFromUnsupportedIndex() {
FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client);
UnsupportedOperationException unsupportedOperationException =
Assertions.assertThrows(
UnsupportedOperationException.class,
() ->
flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(
new IndexDetails(
"cv1",
new FullyQualifiedTableName("mys3.default.http_logs"),
false,
true,
FlintIndexType.MATERIALIZED_VIEW)));
Assertions.assertEquals(
"Unsupported Index Type : MATERIALIZED_VIEW", unsupportedOperationException.getMessage());
}

@SneakyThrows
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"flint_mys3_default_http_logs_cv1_index": {
"mappings": {
"_doc": {
"_meta": {
"kind": "skipping",
"indexedColumns": [
{
"columnType": "int",
"kind": "VALUE_SET",
"columnName": "status"
}
],
"name": "flint_mys3_default_http_logs_cv1_index",
"options": {},
"source": "mys3.default.http_logs",
"version": "0.1.0",
"properties": {
"env": {
"SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p",
"SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q"
}
}
}
}
},
"settings": {
"index": {
"number_of_shards": 5,
"number_of_replicas": 0,
"max_result_window": 100,
"version": {
"created": "6050399"
}
}
},
"mapping_version": "1",
"settings_version": "1",
"aliases_version": "1"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"flint_mys3_default_http_logs_skipping_index": {
"mappings": {
"_doc": {
"_meta": {
"kind": "skipping",
"indexedColumns": [
{
"columnType": "int",
"kind": "VALUE_SET",
"columnName": "status"
}
],
"name": "flint_mys3_default_http_logs_skipping_index",
"options": {},
"source": "mys3.default.http_logs",
"version": "0.1.0",
"properties": {
"env": {
"SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p",
"SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q"
}
}
}
}
},
"settings": {
"index": {
"number_of_shards": 5,
"number_of_replicas": 0,
"max_result_window": 100,
"version": {
"created": "6050399"
}
}
},
"mapping_version": "1",
"settings_version": "1",
"aliases_version": "1"
}
}
35 changes: 35 additions & 0 deletions spark/src/test/resources/flint-index-mappings/npe_mapping.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"flint_mys3_default_http_logs_cv1_index": {
"mappings": {
"_doc": {
"_meta": {
"kind": "skipping",
"indexedColumns": [
{
"columnType": "int",
"kind": "VALUE_SET",
"columnName": "status"
}
],
"name": "flint_mys3_default_http_logs_cv1_index",
"options": {},
"source": "mys3.default.http_logs",
"version": "0.1.0"
}
}
},
"settings": {
"index": {
"number_of_shards": 5,
"number_of_replicas": 0,
"max_result_window": 100,
"version": {
"created": "6050399"
}
}
},
"mapping_version": "1",
"settings_version": "1",
"aliases_version": "1"
}
}

0 comments on commit 3297502

Please sign in to comment.