Skip to content

Commit

Permalink
[#145][#146] feat(client): add client side schema/table manipulation …
Browse files Browse the repository at this point in the history
…support (#202)

### What changes were proposed in this pull request?

This PR adds the support of schema and table operations for Graviton
client.

### Why are the changes needed?

With this, user could manipulate schema and tables through Java client.

Fix: #145 
Fix: #146 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Add new UTs to cover the code.
  • Loading branch information
jerryshao authored Aug 10, 2023
1 parent 46b1cb6 commit b595f60
Show file tree
Hide file tree
Showing 10 changed files with 1,233 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
import com.datastrato.graviton.dto.MetalakeDTO;
import com.datastrato.graviton.dto.requests.CatalogUpdateRequest;
import com.datastrato.graviton.dto.requests.MetalakeUpdateRequest;
import com.datastrato.graviton.dto.requests.SchemaUpdateRequest;
import com.datastrato.graviton.dto.requests.TableUpdateRequest;
import com.datastrato.graviton.rel.SchemaChange;
import com.datastrato.graviton.rel.TableChange;

class DTOConverters {
private DTOConverters() {}
Expand Down Expand Up @@ -92,4 +96,83 @@ static CatalogUpdateRequest toCatalogUpdateRequest(CatalogChange change) {
"Unknown change type: " + change.getClass().getSimpleName());
}
}

static SchemaUpdateRequest toSchemaUpdateRequest(SchemaChange change) {
if (change instanceof SchemaChange.SetProperty) {
return new SchemaUpdateRequest.SetSchemaPropertyRequest(
((SchemaChange.SetProperty) change).getProperty(),
((SchemaChange.SetProperty) change).getValue());

} else if (change instanceof SchemaChange.RemoveProperty) {
return new SchemaUpdateRequest.RemoveSchemaPropertyRequest(
((SchemaChange.RemoveProperty) change).getProperty());

} else {
throw new IllegalArgumentException(
"Unknown change type: " + change.getClass().getSimpleName());
}
}

static TableUpdateRequest toTableUpdateRequest(TableChange change) {
if (change instanceof TableChange.RenameTable) {
return new TableUpdateRequest.RenameTableRequest(
((TableChange.RenameTable) change).getNewName());

} else if (change instanceof TableChange.UpdateComment) {
return new TableUpdateRequest.UpdateTableCommentRequest(
((TableChange.UpdateComment) change).getNewComment());

} else if (change instanceof TableChange.SetProperty) {
return new TableUpdateRequest.SetTablePropertyRequest(
((TableChange.SetProperty) change).getProperty(),
((TableChange.SetProperty) change).getValue());

} else if (change instanceof TableChange.RemoveProperty) {
return new TableUpdateRequest.RemoveTablePropertyRequest(
((TableChange.RemoveProperty) change).getProperty());

} else if (change instanceof TableChange.ColumnChange) {
return toColumnUpdateRequest((TableChange.ColumnChange) change);

} else {
throw new IllegalArgumentException(
"Unknown change type: " + change.getClass().getSimpleName());
}
}

private static TableUpdateRequest toColumnUpdateRequest(TableChange.ColumnChange change) {
if (change instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
return new TableUpdateRequest.AddTableColumnRequest(
addColumn.fieldNames(),
addColumn.getDataType(),
addColumn.getComment(),
addColumn.getPosition());

} else if (change instanceof TableChange.RenameColumn) {
TableChange.RenameColumn renameColumn = (TableChange.RenameColumn) change;
return new TableUpdateRequest.RenameTableColumnRequest(
renameColumn.fieldNames(), renameColumn.getNewName());

} else if (change instanceof TableChange.UpdateColumnType) {
return new TableUpdateRequest.UpdateTableColumnTypeRequest(
change.fieldNames(), ((TableChange.UpdateColumnType) change).getNewDataType());

} else if (change instanceof TableChange.UpdateColumnComment) {
return new TableUpdateRequest.UpdateTableColumnCommentRequest(
change.fieldNames(), ((TableChange.UpdateColumnComment) change).getNewComment());

} else if (change instanceof TableChange.UpdateColumnPosition) {
return new TableUpdateRequest.UpdateTableColumnPositionRequest(
change.fieldNames(), ((TableChange.UpdateColumnPosition) change).getPosition());

} else if (change instanceof TableChange.DeleteColumn) {
return new TableUpdateRequest.DeleteTableColumnRequest(
change.fieldNames(), ((TableChange.DeleteColumn) change).getIfExists());

} else {
throw new IllegalArgumentException(
"Unknown column change type: " + change.getClass().getSimpleName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
* The ErrorHandler class is an abstract class specialized for handling ErrorResponse objects.
* Subclasses of ErrorHandler must implement the parseResponse method to provide custom parsing
* logic for different types of errors.
*
* @param <ErrorResponse> The type of ErrorResponse that this ErrorHandler handles.
*/
public abstract class ErrorHandler implements Consumer<ErrorResponse> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@
import com.datastrato.graviton.exceptions.MetalakeAlreadyExistsException;
import com.datastrato.graviton.exceptions.NoSuchCatalogException;
import com.datastrato.graviton.exceptions.NoSuchMetalakeException;
import com.datastrato.graviton.exceptions.NoSuchSchemaException;
import com.datastrato.graviton.exceptions.NoSuchTableException;
import com.datastrato.graviton.exceptions.NonEmptySchemaException;
import com.datastrato.graviton.exceptions.NotFoundException;
import com.datastrato.graviton.exceptions.RESTException;
import com.datastrato.graviton.exceptions.SchemaAlreadyExistsException;
import com.datastrato.graviton.exceptions.TableAlreadyExistsException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import java.util.List;
Expand Down Expand Up @@ -41,6 +46,24 @@ public static Consumer<ErrorResponse> catalogErrorHandler() {
return CatalogErrorHandler.INSTANCE;
}

/**
* Creates an error handler specific to Schema operations.
*
* @return A Consumer representing the Schema error handler.
*/
public static Consumer<ErrorResponse> schemaErrorHandler() {
return SchemaErrorHandler.INSTANCE;
}

/**
* Creates an error handler specific to Table operations.
*
* @return A Consumer representing the Table error handler.
*/
public static Consumer<ErrorResponse> tableErrorHandler() {
return TableErrorHandler.INSTANCE;
}

/**
* Creates a generic error handler for REST requests.
*
Expand Down Expand Up @@ -83,30 +106,99 @@ private static String formatErrorMessage(ErrorResponse errorResponse) {
}
}

/** Error handler specific to Table operations. */
private static class TableErrorHandler extends RestErrorHandler {
private static final ErrorHandler INSTANCE = new TableErrorHandler();

@Override
public void accept(ErrorResponse errorResponse) {
String errorMessage = formatErrorMessage(errorResponse);

switch (errorResponse.getCode()) {
case ErrorConstants.ILLEGAL_ARGUMENTS_CODE:
throw new IllegalArgumentException(errorMessage);

case ErrorConstants.NOT_FOUND_CODE:
if (errorResponse.getType().equals(NoSuchSchemaException.class.getSimpleName())) {
throw new NoSuchSchemaException(errorMessage);
} else if (errorResponse.getType().equals(NoSuchTableException.class.getSimpleName())) {
throw new NoSuchTableException(errorMessage);
} else {
throw new NotFoundException(errorMessage);
}

case ErrorConstants.ALREADY_EXISTS_CODE:
throw new TableAlreadyExistsException(errorMessage);

case ErrorConstants.INTERNAL_ERROR_CODE:
throw new RuntimeException(errorMessage);
}

super.accept(errorResponse);
}
}

/** Error handler specific to Schema operations. */
private static class SchemaErrorHandler extends RestErrorHandler {
private static final ErrorHandler INSTANCE = new SchemaErrorHandler();

@Override
public void accept(ErrorResponse errorResponse) {
String errorMessage = formatErrorMessage(errorResponse);

switch (errorResponse.getCode()) {
case ErrorConstants.ILLEGAL_ARGUMENTS_CODE:
throw new IllegalArgumentException(errorMessage);

case ErrorConstants.NOT_FOUND_CODE:
if (errorResponse.getType().equals(NoSuchCatalogException.class.getSimpleName())) {
throw new NoSuchCatalogException(errorMessage);
} else if (errorResponse.getType().equals(NoSuchSchemaException.class.getSimpleName())) {
throw new NoSuchSchemaException(errorMessage);
} else {
throw new NotFoundException(errorMessage);
}

case ErrorConstants.ALREADY_EXISTS_CODE:
throw new SchemaAlreadyExistsException(errorMessage);

case ErrorConstants.NON_EMPTY_CODE:
throw new NonEmptySchemaException(errorMessage);

case ErrorConstants.INTERNAL_ERROR_CODE:
throw new RuntimeException(errorMessage);
}

super.accept(errorResponse);
}
}

/** Error handler specific to Catalog operations. */
private static class CatalogErrorHandler extends RestErrorHandler {
private static final ErrorHandler INSTANCE = new CatalogErrorHandler();

@Override
public void accept(ErrorResponse errorResponse) {
String errorMessage = formatErrorMessage(errorResponse);

switch (errorResponse.getCode()) {
case ErrorConstants.ILLEGAL_ARGUMENTS_CODE:
throw new IllegalArgumentException(formatErrorMessage(errorResponse));
throw new IllegalArgumentException(errorMessage);

case ErrorConstants.NOT_FOUND_CODE:
if (errorResponse.getType().equals(NoSuchMetalakeException.class.getSimpleName())) {
throw new NoSuchMetalakeException(formatErrorMessage(errorResponse));
throw new NoSuchMetalakeException(errorMessage);
} else if (errorResponse.getType().equals(NoSuchCatalogException.class.getSimpleName())) {
throw new NoSuchCatalogException(formatErrorMessage(errorResponse));
throw new NoSuchCatalogException(errorMessage);
} else {
throw new NotFoundException(formatErrorMessage(errorResponse));
throw new NotFoundException(errorMessage);
}

case ErrorConstants.ALREADY_EXISTS_CODE:
throw new CatalogAlreadyExistsException(formatErrorMessage(errorResponse));
throw new CatalogAlreadyExistsException(errorMessage);

case ErrorConstants.INTERNAL_ERROR_CODE:
throw new RuntimeException(formatErrorMessage(errorResponse));
throw new RuntimeException(errorMessage);
}

super.accept(errorResponse);
Expand All @@ -119,18 +211,20 @@ private static class MetalakeErrorHandler extends RestErrorHandler {

@Override
public void accept(ErrorResponse errorResponse) {
String errorMessage = formatErrorMessage(errorResponse);

switch (errorResponse.getCode()) {
case ErrorConstants.ILLEGAL_ARGUMENTS_CODE:
throw new IllegalArgumentException(formatErrorMessage(errorResponse));
throw new IllegalArgumentException(errorMessage);

case ErrorConstants.NOT_FOUND_CODE:
throw new NoSuchMetalakeException(formatErrorMessage(errorResponse));
throw new NoSuchMetalakeException(errorMessage);

case ErrorConstants.ALREADY_EXISTS_CODE:
throw new MetalakeAlreadyExistsException(formatErrorMessage(errorResponse));
throw new MetalakeAlreadyExistsException(errorMessage);

case ErrorConstants.INTERNAL_ERROR_CODE:
throw new RuntimeException(formatErrorMessage(errorResponse));
throw new RuntimeException(errorMessage);
}

super.accept(errorResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Graviton Metalake is the top-level metadata repository for users. It contains a list of catalogs
* as sub-level metadata collections. With {@link GravitonMetaLake}, users can list, create, load,
* alter and drop a catalog with specified identifier.
*/
public class GravitonMetaLake extends MetalakeDTO implements SupportsCatalogs {

private static final Logger LOG = LoggerFactory.getLogger(GravitonMetaLake.class);
Expand All @@ -46,6 +51,13 @@ public class GravitonMetaLake extends MetalakeDTO implements SupportsCatalogs {
this.restClient = restClient;
}

/**
* List all the catalogs under this metalake with specified namespace.
*
* @param namespace The namespace to list the catalogs under it.
* @return A list of {@link NameIdentifier} of the catalogs under the specified namespace.
* @throws NoSuchMetalakeException if the metalake with specified namespace does not exist.
*/
@Override
public NameIdentifier[] listCatalogs(Namespace namespace) throws NoSuchMetalakeException {
validateCatalogNamespace(namespace);
Expand All @@ -61,6 +73,13 @@ public NameIdentifier[] listCatalogs(Namespace namespace) throws NoSuchMetalakeE
return resp.identifiers();
}

/**
* Load the catalog with specified identifier.
*
* @param ident The identifier of the catalog to load.
* @return The {@link Catalog} with specified identifier.
* @throws NoSuchCatalogException if the catalog with specified identifier does not exist.
*/
@Override
public Catalog loadCatalog(NameIdentifier ident) throws NoSuchCatalogException {
validateCatalogIdentifier(ident);
Expand All @@ -76,6 +95,17 @@ public Catalog loadCatalog(NameIdentifier ident) throws NoSuchCatalogException {
return DTOConverters.toCatalog(resp.getCatalog(), restClient);
}

/**
* Create a new catalog with specified identifier, type, comment and properties.
*
* @param ident The identifier of the catalog.
* @param type The type of the catalog.
* @param comment The comment of the catalog.
* @param properties The properties of the catalog.
* @return The created {@link Catalog}.
* @throws NoSuchMetalakeException if the metalake with specified namespace does not exist.
* @throws CatalogAlreadyExistsException if the catalog with specified identifier already exists.
*/
@Override
public Catalog createCatalog(
NameIdentifier ident, Catalog.Type type, String comment, Map<String, String> properties)
Expand All @@ -97,6 +127,15 @@ public Catalog createCatalog(
return DTOConverters.toCatalog(resp.getCatalog(), restClient);
}

/**
* Alter the catalog with specified identifier by applying the changes.
*
* @param ident the identifier of the catalog.
* @param changes the changes to apply to the catalog.
* @return the altered {@link Catalog}.
* @throws NoSuchCatalogException if the catalog with specified identifier does not exist.
* @throws IllegalArgumentException if the changes are invalid.
*/
@Override
public Catalog alterCatalog(NameIdentifier ident, CatalogChange... changes)
throws NoSuchCatalogException, IllegalArgumentException {
Expand All @@ -121,6 +160,12 @@ public Catalog alterCatalog(NameIdentifier ident, CatalogChange... changes)
return DTOConverters.toCatalog(resp.getCatalog(), restClient);
}

/**
* Drop the catalog with specified identifier.
*
* @param ident the identifier of the catalog.
* @return true if the catalog is dropped successfully, false otherwise.
*/
@Override
public boolean dropCatalog(NameIdentifier ident) {
validateCatalogIdentifier(ident);
Expand Down
Loading

0 comments on commit b595f60

Please sign in to comment.