Skip to content

Commit

Permalink
REF: index template wrapper
Browse files Browse the repository at this point in the history
Signed-off-by: George Chen <[email protected]>
  • Loading branch information
chenqi0805 committed Jul 25, 2023
1 parent 8601d8f commit 67fa0a1
Show file tree
Hide file tree
Showing 13 changed files with 1,293 additions and 418 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class ComposableIndexTemplate implements IndexTemplate {

private final Map<String, Object> indexTemplateMap;
private String name;

public ComposableIndexTemplate(final Map<String, Object> indexTemplateMap) {
this.indexTemplateMap = new HashMap<>(indexTemplateMap);
}

@Override
public void setTemplateName(final String name) {
this.name = name;

}

@Override
public void setIndexPatterns(final List<String> indexPatterns) {
indexTemplateMap.put("index_patterns", indexPatterns);
}

@Override
public void putCustomSetting(final String name, final Object value) {

}

@Override
public Optional<Long> getVersion() {
if(!indexTemplateMap.containsKey("version"))
return Optional.empty();
final Number version = (Number) indexTemplateMap.get("version");
return Optional.of(version.longValue());
}

public Map<String, Object> getIndexTemplateMap() {
return Collections.unmodifiableMap(indexTemplateMap);
}

public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.json.stream.JsonParser;
import org.opensearch.client.json.JsonpDeserializer;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.ObjectBuilderDeserializer;
import org.opensearch.client.json.ObjectDeserializer;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.indices.ExistsIndexTemplateRequest;
import org.opensearch.client.opensearch.indices.GetIndexTemplateRequest;
import org.opensearch.client.opensearch.indices.GetIndexTemplateResponse;
import org.opensearch.client.opensearch.indices.PutIndexTemplateRequest;
import org.opensearch.client.opensearch.indices.get_index_template.IndexTemplateItem;
import org.opensearch.client.opensearch.indices.put_index_template.IndexTemplateMapping;
import org.opensearch.client.transport.endpoints.BooleanResponse;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;

public class ComposableTemplateAPIWrapper implements IndexTemplateAPIWrapper<IndexTemplateItem> {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final OpenSearchClient openSearchClient;

public ComposableTemplateAPIWrapper(final OpenSearchClient openSearchClient) {
this.openSearchClient = openSearchClient;
}

@Override
public void putTemplate(final IndexTemplate indexTemplate) throws IOException {
if(!(indexTemplate instanceof ComposableIndexTemplate)) {
throw new IllegalArgumentException("Unexpected indexTemplate provided to createTemplate.");
}

final ComposableIndexTemplate composableIndexTemplate = (ComposableIndexTemplate) indexTemplate;
final String indexTemplateString = OBJECT_MAPPER.writeValueAsString(
composableIndexTemplate.getIndexTemplateMap());

final ByteArrayInputStream byteIn = new ByteArrayInputStream(
indexTemplateString.getBytes(StandardCharsets.UTF_8));
final JsonpMapper mapper = openSearchClient._transport().jsonpMapper();
final JsonParser parser = mapper.jsonProvider().createParser(byteIn);

final PutIndexTemplateRequest putIndexTemplateRequest = PutIndexTemplateRequestDeserializer
.getJsonpDeserializer(composableIndexTemplate.getName())
.deserialize(parser, mapper);

openSearchClient.indices().putIndexTemplate(putIndexTemplateRequest);
}

@Override
public Optional<IndexTemplateItem> getTemplate(final String indexTemplateName) throws IOException {
final ExistsIndexTemplateRequest existsRequest = new ExistsIndexTemplateRequest.Builder()
.name(indexTemplateName)
.build();
final BooleanResponse existsResponse = openSearchClient.indices().existsIndexTemplate(existsRequest);

if (!existsResponse.value()) {
return Optional.empty();
}

final GetIndexTemplateRequest getRequest = new GetIndexTemplateRequest.Builder()
.name(indexTemplateName)
.build();
final GetIndexTemplateResponse indexTemplateResponse = openSearchClient.indices().getIndexTemplate(getRequest);

final List<IndexTemplateItem> indexTemplateItems = indexTemplateResponse.indexTemplates();
if (indexTemplateItems.size() == 1) {
return indexTemplateItems.stream().findFirst();
} else {
throw new RuntimeException(String.format("Found zero or multiple index templates result when querying for %s",
indexTemplateName));
}
}

private static class PutIndexTemplateRequestDeserializer {
private static void setupPutIndexTemplateRequestDeserializer(final ObjectDeserializer<PutIndexTemplateRequest.Builder> objectDeserializer) {

objectDeserializer.add(PutIndexTemplateRequest.Builder::name, JsonpDeserializer.stringDeserializer(), "name");
objectDeserializer.add(PutIndexTemplateRequest.Builder::indexPatterns, JsonpDeserializer.arrayDeserializer(JsonpDeserializer.stringDeserializer()),
"index_patterns");
objectDeserializer.add(PutIndexTemplateRequest.Builder::version, JsonpDeserializer.longDeserializer(), "version");
objectDeserializer.add(PutIndexTemplateRequest.Builder::priority, JsonpDeserializer.integerDeserializer(), "priority");
objectDeserializer.add(PutIndexTemplateRequest.Builder::composedOf, JsonpDeserializer.arrayDeserializer(JsonpDeserializer.stringDeserializer()),
"composed_of");
objectDeserializer.add(PutIndexTemplateRequest.Builder::template, IndexTemplateMapping._DESERIALIZER, "template");
}

static JsonpDeserializer<PutIndexTemplateRequest> getJsonpDeserializer(final String name) {
return ObjectBuilderDeserializer
.lazy(
() -> new PutIndexTemplateRequest.Builder().name(name),
PutIndexTemplateRequestDeserializer::setupPutIndexTemplateRequestDeserializer);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.json.stream.JsonParser;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.ErrorResponse;
import org.opensearch.client.opensearch.indices.ExistsTemplateRequest;
import org.opensearch.client.opensearch.indices.GetTemplateRequest;
import org.opensearch.client.opensearch.indices.GetTemplateResponse;
import org.opensearch.client.opensearch.indices.OpenSearchIndicesClient;
import org.opensearch.client.opensearch.indices.PutTemplateRequest;
import org.opensearch.client.opensearch.indices.PutTemplateResponse;
import org.opensearch.client.opensearch.indices.TemplateMapping;
import org.opensearch.client.transport.JsonEndpoint;
import org.opensearch.client.transport.endpoints.BooleanResponse;
import org.opensearch.client.transport.endpoints.SimpleEndpoint;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class Es6IndexTemplateAPIWrapper implements IndexTemplateAPIWrapper<TemplateMapping> {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private final OpenSearchClient openSearchClient;

public Es6IndexTemplateAPIWrapper(final OpenSearchClient openSearchClient) {
this.openSearchClient = openSearchClient;
}

@Override
public void putTemplate(final IndexTemplate indexTemplate) throws IOException {
if(!(indexTemplate instanceof LegacyIndexTemplate)) {
throw new IllegalArgumentException("Unexpected indexTemplate provided to createTemplate.");
}

final Map<String, Object> templateMapping = ((LegacyIndexTemplate) indexTemplate).getTemplateMap();
final String indexTemplateString = OBJECT_MAPPER.writeValueAsString(templateMapping);

// Parse byte array to Map
final ByteArrayInputStream byteIn = new ByteArrayInputStream(
indexTemplateString.getBytes(StandardCharsets.UTF_8));
final JsonpMapper mapper = openSearchClient._transport().jsonpMapper();
final JsonParser parser = mapper.jsonProvider().createParser(byteIn);

final PutTemplateRequest putTemplateRequest = PutTemplateRequestDeserializer.getJsonpDeserializer()
.deserialize(parser, mapper);

final OpenSearchIndicesClient openSearchIndicesClient = openSearchClient.indices();
final JsonEndpoint<PutTemplateRequest, PutTemplateResponse, ErrorResponse> endpoint = es6PutTemplateEndpoint(putTemplateRequest);
openSearchIndicesClient._transport().performRequest(putTemplateRequest, endpoint, openSearchIndicesClient._transportOptions());
}

@Override
public Optional<TemplateMapping> getTemplate(final String templateName) throws IOException {
final ExistsTemplateRequest existsTemplateRequest = new ExistsTemplateRequest.Builder()
.name(templateName)
.build();
final BooleanResponse booleanResponse = openSearchClient.indices().existsTemplate(
existsTemplateRequest);
if (!booleanResponse.value()) {
return Optional.empty();
}

final GetTemplateRequest getTemplateRequest = new GetTemplateRequest.Builder()
.name(templateName)
.build();
final GetTemplateResponse response = openSearchClient.indices().getTemplate(getTemplateRequest);

if (response.result().size() == 1) {
return response.result().values().stream().findFirst();
} else {
throw new RuntimeException(String.format("Found zero or multiple index templates result when querying for %s",
templateName));
}
}

private JsonEndpoint<PutTemplateRequest, PutTemplateResponse, ErrorResponse> es6PutTemplateEndpoint(
final PutTemplateRequest putTemplateRequest) {
return new SimpleEndpoint<>(

// Request method
request -> {
return "PUT";

},

// Request path
request -> {
final int _name = 1 << 0;

int propsSet = 0;

propsSet |= _name;

if (propsSet == (_name)) {
StringBuilder buf = new StringBuilder();
buf.append("/_template");
buf.append("/");
SimpleEndpoint.pathEncode(request.name(), buf);
buf.append("?include_type_name=false");
return buf.toString();
}
throw SimpleEndpoint.noPathTemplateFound("path");

},

// Request parameters
request -> {
Map<String, String> params = new HashMap<>();
if (request.masterTimeout() != null) {
params.put("master_timeout", request.masterTimeout()._toJsonString());
}
if (request.clusterManagerTimeout() != null) {
params.put("cluster_manager_timeout", request.clusterManagerTimeout()._toJsonString());
}
if (request.flatSettings() != null) {
params.put("flat_settings", String.valueOf(request.flatSettings()));
}
if (request.create() != null) {
params.put("create", String.valueOf(request.create()));
}
if (request.timeout() != null) {
params.put("timeout", request.timeout()._toJsonString());
}
return params;

}, SimpleEndpoint.emptyMap(), true, PutTemplateResponse._DESERIALIZER);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import java.io.IOException;
import java.util.Optional;

public interface IndexTemplateAPIWrapper<T> {
void putTemplate(IndexTemplate indexTemplate) throws IOException;

Optional<T> getTemplate(String name) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion;

public class IndexTemplateAPIWrapperFactory {
public static IndexTemplateAPIWrapper getWrapper(final IndexConfiguration indexConfiguration,
final OpenSearchClient openSearchClient) {
if (DistributionVersion.ES6.equals(indexConfiguration.getDistributionVersion())) {
return new Es6IndexTemplateAPIWrapper(openSearchClient);
} else if (TemplateType.V1.equals(indexConfiguration.getTemplateType())) {
return new OpenSearchLegacyTemplateAPIWrapper(openSearchClient);
} else {
return new ComposableTemplateAPIWrapper(openSearchClient);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class LegacyIndexTemplate implements IndexTemplate {

public static final String SETTINGS_KEY = "settings";
private final Map<String, Object> templateMap;

public LegacyIndexTemplate(final Map<String, Object> templateMap) {
this.templateMap = new HashMap<>(templateMap);
if(this.templateMap.containsKey(SETTINGS_KEY)) {
final HashMap<String, Object> copiedSettings = new HashMap<>((Map<String, Object>) this.templateMap.get(SETTINGS_KEY));
this.templateMap.put(SETTINGS_KEY, copiedSettings);
}
}

@Override
public void setTemplateName(final String name) {
templateMap.put("name", name);
}

@Override
public void setIndexPatterns(final List<String> indexPatterns) {
templateMap.put("index_patterns", indexPatterns);
}

@Override
public void putCustomSetting(final String name, final Object value) {
Map<String, Object> settings = (Map<String, Object>) this.templateMap.computeIfAbsent(SETTINGS_KEY, x -> new HashMap<>());
settings.put(name, value);
}

@Override
public Optional<Long> getVersion() {
if(!templateMap.containsKey("version"))
return Optional.empty();
final Number version = (Number) templateMap.get("version");
return Optional.of(version.longValue());
}

Map<String, Object> getTemplateMap() {
return this.templateMap;
}
}
Loading

0 comments on commit 67fa0a1

Please sign in to comment.