Skip to content

Commit

Permalink
PPL in search API
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vamsi-amazon committed Feb 26, 2024
1 parent fcc4be3 commit e390c32
Show file tree
Hide file tree
Showing 3 changed files with 412 additions and 2 deletions.
21 changes: 19 additions & 2 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.ScriptPlugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
Expand Down Expand Up @@ -71,12 +72,15 @@
import org.opensearch.sql.opensearch.storage.script.ExpressionScriptEngine;
import org.opensearch.sql.opensearch.storage.serialization.DefaultExpressionSerializer;
import org.opensearch.sql.plugin.config.OpenSearchPluginModule;
import org.opensearch.sql.plugin.queryengine.PPLQueryEngine;
import org.opensearch.sql.plugin.queryengine.SQLQueryEngine;
import org.opensearch.sql.plugin.rest.RestPPLQueryAction;
import org.opensearch.sql.plugin.rest.RestPPLStatsAction;
import org.opensearch.sql.plugin.rest.RestQuerySettingsAction;
import org.opensearch.sql.plugin.transport.PPLQueryAction;
import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.ppl.PPLService;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.cluster.ClusterManagerEventListener;
Expand All @@ -89,13 +93,14 @@
import org.opensearch.sql.spark.transport.model.CancelAsyncQueryActionResponse;
import org.opensearch.sql.spark.transport.model.CreateAsyncQueryActionResponse;
import org.opensearch.sql.spark.transport.model.GetAsyncQueryResultActionResponse;
import org.opensearch.sql.sql.SQLService;
import org.opensearch.sql.storage.DataSourceFactory;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin {
public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, SearchPlugin {

private static final Logger LOGGER = LogManager.getLogger(SQLPlugin.class);

Expand All @@ -109,7 +114,7 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin {
private Injector injector;

public String name() {
return "sql";
return "sql_plugin";
}

public String description() {
Expand Down Expand Up @@ -222,6 +227,8 @@ public Collection<Object> createComponents(
OpenSearchSettings.RESULT_INDEX_TTL_SETTING,
OpenSearchSettings.AUTO_INDEX_MANAGEMENT_ENABLED_SETTING,
environment.settings());
PPLQueryEngine.initialize(injector.getInstance(PPLService.class));
SQLQueryEngine.initialize(injector.getInstance(SQLService.class));
return ImmutableList.of(
dataSourceService,
injector.getInstance(AsyncQueryExecutorService.class),
Expand Down Expand Up @@ -283,4 +290,14 @@ private DataSourceServiceImpl createDataSourceService() {
dataSourceMetadataStorage,
dataSourceUserAuthorizationHelper);
}

@Override
public List<QueryEngineSpec<?>> getQueryEnginesSpecs() {
return Arrays.asList( new QueryEngineSpec<>(
PPLQueryEngine.NAME, PPLQueryEngine::new, PPLQueryEngine::fromXContent),
new QueryEngineSpec<>(
SQLQueryEngine.NAME, SQLQueryEngine::new, SQLQueryEngine::fromXContent)
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package org.opensearch.sql.plugin.queryengine;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import org.apache.lucene.search.TotalHits;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.core.ParseField;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.externalengine.QueryEngine;
import org.opensearch.search.externalengine.QueryEngineExtBuilder;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.ppl.PPLService;
import org.opensearch.sql.ppl.domain.PPLQueryRequest;

public class PPLQueryEngine extends QueryEngine {

public static final String NAME = "ppl";
private static PPLService pplService;
private String query;

public static void initialize(PPLService pplService) {
PPLQueryEngine.pplService = pplService;
}

@Override
public void executeQuery(
SearchRequest searchRequest, ActionListener<SearchResponse> actionListener) {
PPLQueryRequest pplQueryRequest = new PPLQueryRequest(query, null, "_search", "json");
pplService.execute(
pplQueryRequest,
new ResponseListener<>() {
@Override
public void onResponse(ExecutionEngine.QueryResponse queryResponse) {
SearchResponse searchResponse =
transformFromQueryResponseToSearchResponse(queryResponse);
actionListener.onResponse(searchResponse);
}

@Override
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
});
}

private SearchResponse transformFromQueryResponseToSearchResponse(
ExecutionEngine.QueryResponse queryResponse) {
SearchHit[] hits = new SearchHit[0];
return new SearchResponse(
new InternalSearchResponse(
new SearchHits(hits, new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0F),
(InternalAggregations) null,
null,
null,
false,
(Boolean) null,
1,
Collections.emptyList(),
List.of(new PPLQueryEngine.PPLResponseExternalBuilder(queryResponse))),
(String) null,
0,
0,
0,
0L,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY,
null);
}

static class PPLResponseExternalBuilder extends QueryEngineExtBuilder {

static ParseField DUMMY_FIELD = new ParseField("ppl");

protected final ExecutionEngine.QueryResponse queryResponse;

public PPLResponseExternalBuilder(ExecutionEngine.QueryResponse queryResponse) {
this.queryResponse = queryResponse;
}

public PPLResponseExternalBuilder(StreamInput in) throws IOException {
this.queryResponse = null;
}

@Override
public String getWriteableName() {
return DUMMY_FIELD.getPreferredName();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString("1");
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// Serialize the schema
builder.startObject(NAME);
ArrayList<String> columnNames = new ArrayList<>();
builder.startArray("schema");
for (ExecutionEngine.Schema.Column column : queryResponse.getSchema().getColumns()) {
builder.startObject();
String columnName = getColumnName(column);
columnNames.add(columnName);
builder.field("name", columnName);
builder.field("type", column.getExprType().typeName().toLowerCase(Locale.ROOT));
builder.endObject();
}
builder.endArray();
builder.startArray("datarows");
for (ExprValue result : queryResponse.getResults()) {
builder.startArray();
for (String columnName : columnNames) {
builder.value(result.tupleValue().get(columnName).value());
}
builder.endArray();
}
builder.endArray();
builder.field("total", queryResponse.getResults().size());
builder.field("size", queryResponse.getResults().size());
builder.endObject();
return builder;
}

@Override
public int hashCode() {
return 0;
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
return true;
}

public static PPLQueryEngine.PPLResponseExternalBuilder parse(XContentParser parser)
throws IOException {
return null;
}

private String getColumnName(ExecutionEngine.Schema.Column column) {
return (column.getAlias() != null) ? column.getAlias() : column.getName();
}

}

public PPLQueryEngine(String query) {
this.query = query;
}

public PPLQueryEngine(StreamInput in) {}

@Override
public String getWriteableName() {
return NAME;
}

@Override
public void writeTo(StreamOutput out) throws IOException {}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return null;
}

public static QueryEngine fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token;
String query = "";
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
token = parser.nextToken();
if (fieldName.equals("query")) {
query = parser.textOrNull();
}
}
return new PPLQueryEngine(query);
}


}
Loading

0 comments on commit e390c32

Please sign in to comment.