Skip to content

Commit

Permalink
Add Delete QueryGroup API Logic (#14735)
Browse files Browse the repository at this point in the history
* Add Delete QueryGroup API Logic
Signed-off-by: Ruirui Zhang <[email protected]>

* modify changelog
Signed-off-by: Ruirui Zhang <[email protected]>

* include comments from create pr
Signed-off-by: Ruirui Zhang <[email protected]>

* remove delete all
Signed-off-by: Ruirui Zhang <[email protected]>

* rebase and address comments
Signed-off-by: Ruirui Zhang <[email protected]>

* rebase
Signed-off-by: Ruirui Zhang <[email protected]>

* address comments
Signed-off-by: Ruirui Zhang <[email protected]>

* address comments
Signed-off-by: Ruirui Zhang <[email protected]>

* address comments
Signed-off-by: Ruirui Zhang <[email protected]>

* add UT coverage
Signed-off-by: Ruirui Zhang <[email protected]>
  • Loading branch information
ruai0511 committed Aug 22, 2024
1 parent d5a6c0b commit ed65482
Show file tree
Hide file tree
Showing 17 changed files with 636 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs (([#14972](https://github.com/opensearch-project/OpenSearch/pull/14972))
- [Workload Management] Add queryGroupId to Task ([14708](https://github.com/opensearch-project/OpenSearch/pull/14708))
- Add setting to ignore throttling nodes for allocation of unassigned primaries in remote restore ([#14991](https://github.com/opensearch-project/OpenSearch/pull/14991))
- [Workload Management] Add Delete QueryGroup API Logic ([#14735](https://github.com/opensearch-project/OpenSearch/pull/14735))
- [Streaming Indexing] Enhance RestClient with a new streaming API support ([#14437](https://github.com/opensearch-project/OpenSearch/pull/14437))
- Add basic aggregation support for derived fields ([#14618](https://github.com/opensearch-project/OpenSearch/pull/14618))
- [Workload Management] Add Create QueryGroup API Logic ([#14680](https://github.com/opensearch-project/OpenSearch/pull/14680))- [Workload Management] Add Create QueryGroup API Logic ([#14680](https://github.com/opensearch-project/OpenSearch/pull/14680))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,29 @@
import org.opensearch.action.ActionRequest;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.inject.Module;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.plugin.wlm.action.CreateQueryGroupAction;
import org.opensearch.plugin.wlm.action.DeleteQueryGroupAction;
import org.opensearch.plugin.wlm.action.GetQueryGroupAction;
import org.opensearch.plugin.wlm.action.TransportCreateQueryGroupAction;
import org.opensearch.plugin.wlm.action.TransportDeleteQueryGroupAction;
import org.opensearch.plugin.wlm.action.TransportGetQueryGroupAction;
import org.opensearch.plugin.wlm.rest.RestCreateQueryGroupAction;
import org.opensearch.plugin.wlm.rest.RestDeleteQueryGroupAction;
import org.opensearch.plugin.wlm.rest.RestGetQueryGroupAction;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;

import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

Expand All @@ -46,7 +51,8 @@ public WorkloadManagementPlugin() {}
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return List.of(
new ActionPlugin.ActionHandler<>(CreateQueryGroupAction.INSTANCE, TransportCreateQueryGroupAction.class),
new ActionPlugin.ActionHandler<>(GetQueryGroupAction.INSTANCE, TransportGetQueryGroupAction.class)
new ActionPlugin.ActionHandler<>(GetQueryGroupAction.INSTANCE, TransportGetQueryGroupAction.class),
new ActionPlugin.ActionHandler<>(DeleteQueryGroupAction.INSTANCE, TransportDeleteQueryGroupAction.class)
);
}

Expand All @@ -60,11 +66,16 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of(new RestCreateQueryGroupAction(), new RestGetQueryGroupAction());
return List.of(new RestCreateQueryGroupAction(), new RestGetQueryGroupAction(), new RestDeleteQueryGroupAction());
}

@Override
public List<Setting<?>> getSettings() {
return List.of(QueryGroupPersistenceService.MAX_QUERY_GROUP_COUNT);
}

@Override
public Collection<Module> createGuiceModules() {
return List.of(new WorkloadManagementPluginModule());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm;

import org.opensearch.common.inject.AbstractModule;
import org.opensearch.common.inject.Singleton;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;

/**
* Guice Module to manage WorkloadManagement related objects
*/
public class WorkloadManagementPluginModule extends AbstractModule {

/**
* Constructor for WorkloadManagementPluginModule
*/
public WorkloadManagementPluginModule() {}

@Override
protected void configure() {
// Bind QueryGroupPersistenceService as a singleton to ensure a single instance is used,
// preventing multiple throttling key registrations in the constructor.
bind(QueryGroupPersistenceService.class).in(Singleton.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm.action;

import org.opensearch.action.ActionType;
import org.opensearch.action.support.master.AcknowledgedResponse;

/**
* Transport action for delete QueryGroup
*
* @opensearch.experimental
*/
public class DeleteQueryGroupAction extends ActionType<AcknowledgedResponse> {

/**
/**
* An instance of DeleteQueryGroupAction
*/
public static final DeleteQueryGroupAction INSTANCE = new DeleteQueryGroupAction();

/**
* Name for DeleteQueryGroupAction
*/
public static final String NAME = "cluster:admin/opensearch/wlm/query_group/_delete";

/**
* Default constructor
*/
private DeleteQueryGroupAction() {
super(NAME, AcknowledgedResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm.action;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.master.AcknowledgedRequest;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Request for delete QueryGroup
*
* @opensearch.experimental
*/
public class DeleteQueryGroupRequest extends AcknowledgedRequest<DeleteQueryGroupRequest> {
private final String name;

/**
* Default constructor for DeleteQueryGroupRequest
* @param name - name for the QueryGroup to get
*/
public DeleteQueryGroupRequest(String name) {
this.name = name;
}

/**
* Constructor for DeleteQueryGroupRequest
* @param in - A {@link StreamInput} object
*/
public DeleteQueryGroupRequest(StreamInput in) throws IOException {
super(in);
name = in.readOptionalString();
}

@Override
public ActionRequestValidationException validate() {
if (name == null) {
ActionRequestValidationException actionRequestValidationException = new ActionRequestValidationException();
actionRequestValidationException.addValidationError("QueryGroup name is missing");
return actionRequestValidationException;
}
return null;
}

/**
* Name getter
*/
public String getName() {
return name;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

/**
Expand All @@ -24,7 +23,6 @@
*/
public class TransportCreateQueryGroupAction extends HandledTransportAction<CreateQueryGroupRequest, CreateQueryGroupResponse> {

private final ThreadPool threadPool;
private final QueryGroupPersistenceService queryGroupPersistenceService;

/**
Expand All @@ -33,25 +31,21 @@ public class TransportCreateQueryGroupAction extends HandledTransportAction<Crea
* @param actionName - action name
* @param transportService - a {@link TransportService} object
* @param actionFilters - a {@link ActionFilters} object
* @param threadPool - a {@link ThreadPool} object
* @param queryGroupPersistenceService - a {@link QueryGroupPersistenceService} object
*/
@Inject
public TransportCreateQueryGroupAction(
String actionName,
TransportService transportService,
ActionFilters actionFilters,
ThreadPool threadPool,
QueryGroupPersistenceService queryGroupPersistenceService
) {
super(CreateQueryGroupAction.NAME, transportService, actionFilters, CreateQueryGroupRequest::new);
this.threadPool = threadPool;
this.queryGroupPersistenceService = queryGroupPersistenceService;
}

@Override
protected void doExecute(Task task, CreateQueryGroupRequest request, ActionListener<CreateQueryGroupResponse> listener) {
threadPool.executor(ThreadPool.Names.SAME)
.execute(() -> queryGroupPersistenceService.persistInClusterStateMetadata(request.getQueryGroup(), listener));
queryGroupPersistenceService.persistInClusterStateMetadata(request.getQueryGroup(), listener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm.action;

import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;

/**
* Transport action for delete QueryGroup
*
* @opensearch.experimental
*/
public class TransportDeleteQueryGroupAction extends TransportClusterManagerNodeAction<DeleteQueryGroupRequest, AcknowledgedResponse> {

private final QueryGroupPersistenceService queryGroupPersistenceService;

/**
* Constructor for TransportDeleteQueryGroupAction
*
* @param clusterService - a {@link ClusterService} object
* @param transportService - a {@link TransportService} object
* @param actionFilters - a {@link ActionFilters} object
* @param threadPool - a {@link ThreadPool} object
* @param indexNameExpressionResolver - a {@link IndexNameExpressionResolver} object
* @param queryGroupPersistenceService - a {@link QueryGroupPersistenceService} object
*/
@Inject
public TransportDeleteQueryGroupAction(
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver,
QueryGroupPersistenceService queryGroupPersistenceService
) {
super(
DeleteQueryGroupAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
DeleteQueryGroupRequest::new,
indexNameExpressionResolver
);
this.queryGroupPersistenceService = queryGroupPersistenceService;
}

@Override
protected void clusterManagerOperation(
DeleteQueryGroupRequest request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) throws Exception {
queryGroupPersistenceService.deleteInClusterStateMetadata(request, listener);
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected AcknowledgedResponse read(StreamInput in) throws IOException {
return new AcknowledgedResponse(in);
}

@Override
protected ClusterBlockException checkBlock(DeleteQueryGroupRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm.rest;

import org.opensearch.client.node.NodeClient;
import org.opensearch.plugin.wlm.action.DeleteQueryGroupAction;
import org.opensearch.plugin.wlm.action.DeleteQueryGroupRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

import java.io.IOException;
import java.util.List;

import static org.opensearch.rest.RestRequest.Method.DELETE;

/**
* Rest action to delete a QueryGroup
*
* @opensearch.experimental
*/
public class RestDeleteQueryGroupAction extends BaseRestHandler {

/**
* Constructor for RestDeleteQueryGroupAction
*/
public RestDeleteQueryGroupAction() {}

@Override
public String getName() {
return "delete_query_group";
}

/**
* The list of {@link Route}s that this RestHandler is responsible for handling.
*/
@Override
public List<Route> routes() {
return List.of(new Route(DELETE, "_wlm/query_group/{name}"));
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
DeleteQueryGroupRequest deleteQueryGroupRequest = new DeleteQueryGroupRequest(request.param("name"));
deleteQueryGroupRequest.clusterManagerNodeTimeout(
request.paramAsTime("cluster_manager_timeout", deleteQueryGroupRequest.clusterManagerNodeTimeout())
);
deleteQueryGroupRequest.timeout(request.paramAsTime("timeout", deleteQueryGroupRequest.timeout()));
return channel -> client.execute(DeleteQueryGroupAction.INSTANCE, deleteQueryGroupRequest, new RestToXContentListener<>(channel));
}
}
Loading

0 comments on commit ed65482

Please sign in to comment.