Skip to content

Commit

Permalink
[Feature] Import/Export For Table, DatabaseSchema, Databases, Service (
Browse files Browse the repository at this point in the history
…open-metadata#15816)

* - Add Import Export Separation for GlossaryTerms

* - Fixed Table Resource Test

* - Review Comment #2

* - GlossaryTestFix, Glossary does not allow Tier Tags

* - Database Schema Tests Fix

* - Create Database, DatabaseSchema, DatabaseService import entity if not exists

* - Fix Test for Database DatabaseSchema, Table
  • Loading branch information
mohityadav766 authored Apr 6, 2024
1 parent 61bc856 commit 5a88d15
Show file tree
Hide file tree
Showing 16 changed files with 615 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,45 @@ public static List<String> addTagLabels(List<String> csvRecord, List<TagLabel> t
csvRecord.add(
nullOrEmpty(tags)
? null
: tags.stream().map(TagLabel::getTagFQN).collect(Collectors.joining(FIELD_SEPARATOR)));
: tags.stream()
.filter(
tagLabel ->
tagLabel.getSource().equals(TagLabel.TagSource.CLASSIFICATION)
&& !tagLabel.getTagFQN().split("\\.")[0].equals("Tier")
&& !tagLabel.getLabelType().equals(TagLabel.LabelType.DERIVED))
.map(TagLabel::getTagFQN)
.collect(Collectors.joining(FIELD_SEPARATOR)));

return csvRecord;
}

/**
 * Appends a single CSV field containing the glossary-term labels found in {@code tags}.
 *
 * <p>Labels are kept when their source is {@code TagLabel.TagSource.GLOSSARY} and the first
 * dot-separated segment of their FQN is not {@code "Tier"}. The FQNs of the kept labels are
 * joined with {@code FIELD_SEPARATOR}. When {@code nullOrEmpty(tags)} holds, {@code null} is
 * appended instead; when tags are present but none match, the joined result is an empty string.
 *
 * @param csvRecord the record under construction; it is mutated in place
 * @param tags all tag labels attached to the entity, may be null or empty
 * @return the same {@code csvRecord} instance, to allow chaining
 */
public static List<String> addGlossaryTerms(List<String> csvRecord, List<TagLabel> tags) {
  csvRecord.add(
      nullOrEmpty(tags)
          ? null
          : tags.stream()
              .filter(
                  tagLabel ->
                      // Identity comparison is the idiomatic enum check and, unlike
                      // getSource().equals(...), does not throw when the source is unset.
                      tagLabel.getSource() == TagLabel.TagSource.GLOSSARY
                          // NOTE(review): this also drops terms of a glossary literally named
                          // "Tier" - confirm that exclusion is intended for glossary sources.
                          && !tagLabel.getTagFQN().split("\\.")[0].equals("Tier"))
              .map(TagLabel::getTagFQN)
              .collect(Collectors.joining(FIELD_SEPARATOR)));

  return csvRecord;
}

/**
 * Appends a single CSV field containing the tier labels found in {@code tags}.
 *
 * <p>Labels are kept when their source is {@code TagLabel.TagSource.CLASSIFICATION} and the
 * first dot-separated segment of their FQN is exactly {@code "Tier"}. The FQNs of the kept
 * labels are joined with {@code FIELD_SEPARATOR}. When {@code nullOrEmpty(tags)} holds,
 * {@code null} is appended instead; when tags are present but none match, the joined result is
 * an empty string.
 *
 * @param csvRecord the record under construction; it is mutated in place
 * @param tags all tag labels attached to the entity, may be null or empty
 * @return the same {@code csvRecord} instance, to allow chaining
 */
public static List<String> addTagTiers(List<String> csvRecord, List<TagLabel> tags) {
  csvRecord.add(
      nullOrEmpty(tags)
          ? null
          : tags.stream()
              .filter(
                  tagLabel ->
                      // Identity comparison is the idiomatic enum check and, unlike
                      // getSource().equals(...), does not throw when the source is unset.
                      tagLabel.getSource() == TagLabel.TagSource.CLASSIFICATION
                          && tagLabel.getTagFQN().split("\\.")[0].equals("Tier"))
              .map(TagLabel::getTagFQN)
              .collect(Collectors.joining(FIELD_SEPARATOR)));

  return csvRecord;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.commons.csv.CSVFormat.Builder;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.EntityInterface;
Expand Down Expand Up @@ -275,20 +276,26 @@ protected final List<EntityReference> getEntityReferences(
}

protected final List<TagLabel> getTagLabels(
CSVPrinter printer, CSVRecord csvRecord, int fieldNumber) throws IOException {
CSVPrinter printer,
CSVRecord csvRecord,
List<Pair<Integer, TagSource>> fieldNumbersWithSource)
throws IOException {
if (!processRecord) {
return null;
}
List<EntityReference> refs = getEntityReferences(printer, csvRecord, fieldNumber, Entity.TAG);
if (!processRecord || nullOrEmpty(refs)) {
return null;
}
List<TagLabel> tagLabels = new ArrayList<>();
for (EntityReference ref : refs) {
tagLabels.add(
new TagLabel()
.withSource(TagSource.CLASSIFICATION)
.withTagFQN(ref.getFullyQualifiedName()));
for (Pair<Integer, TagSource> pair : fieldNumbersWithSource) {
int fieldNumbers = pair.getLeft();
TagSource source = pair.getRight();
List<EntityReference> refs =
source == TagSource.CLASSIFICATION
? getEntityReferences(printer, csvRecord, fieldNumbers, Entity.TAG)
: getEntityReferences(printer, csvRecord, fieldNumbers, Entity.GLOSSARY_TERM);
if (processRecord && !nullOrEmpty(refs)) {
for (EntityReference ref : refs) {
tagLabels.add(new TagLabel().withSource(source).withTagFQN(ref.getFullyQualifiedName()));
}
}
}
return tagLabels;
}
Expand Down Expand Up @@ -391,6 +398,7 @@ protected void createEntity(CSVPrinter resultsPrinter, CSVRecord csvRecord, T en
responseStatus = response.getStatus();
} catch (Exception ex) {
importFailure(resultsPrinter, ex.getMessage(), csvRecord);
importResult.setStatus(ApiStatus.FAILURE);
return;
}
} else { // Dry run don't create the entity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package org.openmetadata.service.jdbi3;

import static org.openmetadata.csv.CsvUtil.addField;
import static org.openmetadata.csv.CsvUtil.addGlossaryTerms;
import static org.openmetadata.csv.CsvUtil.addOwner;
import static org.openmetadata.csv.CsvUtil.addTagLabels;
import static org.openmetadata.csv.CsvUtil.addTagTiers;
import static org.openmetadata.service.Entity.DATABASE_SCHEMA;

import java.io.IOException;
Expand All @@ -26,6 +28,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.csv.EntityCsv;
import org.openmetadata.schema.EntityInterface;
Expand All @@ -36,6 +39,7 @@
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.schema.type.csv.CsvDocumentation;
import org.openmetadata.schema.type.csv.CsvFile;
import org.openmetadata.schema.type.csv.CsvHeader;
Expand Down Expand Up @@ -116,7 +120,12 @@ public String exportToCsv(String name, String user) throws IOException {
@Override
public CsvImportResult importFromCsv(String name, String csv, boolean dryRun, String user)
throws IOException {
Database database = getByName(null, name, Fields.EMPTY_FIELDS); // Validate glossary name
Database database =
getByName(
null,
name,
getFields(
"service")); // Validate database name, and get service needed in case of create
DatabaseCsv databaseCsv = new DatabaseCsv(database, user);
return databaseCsv.importCsv(csv, dryRun);
}
Expand Down Expand Up @@ -234,22 +243,33 @@ protected void createEntity(CSVPrinter printer, List<CSVRecord> csvRecords) thro
try {
schema = Entity.getEntityByName(DATABASE_SCHEMA, schemaFqn, "*", Include.NON_DELETED);
} catch (Exception ex) {
importFailure(printer, entityNotFound(0, DATABASE_SCHEMA, schemaFqn), csvRecord);
processRecord = false;
return;
LOG.warn("Database Schema not found: {}, it will be created with Import.", schemaFqn);
schema =
new DatabaseSchema()
.withDatabase(database.getEntityReference())
.withService(database.getService());
}

// Headers: name, displayName, description, owner, tags, retentionPeriod, sourceUrl, domain
// Headers: name, displayName, description, owner, tags, glossaryTerms, tiers, retentionPeriod,
// sourceUrl, domain
// Field 1,2,3,8,9 - database schema name, displayName, description, retentionPeriod,
// sourceUrl
List<TagLabel> tagLabels =
getTagLabels(
printer,
csvRecord,
List.of(
Pair.of(4, TagLabel.TagSource.CLASSIFICATION),
Pair.of(5, TagLabel.TagSource.GLOSSARY),
Pair.of(6, TagLabel.TagSource.CLASSIFICATION)));
schema
.withName(csvRecord.get(0))
.withDisplayName(csvRecord.get(1))
.withDescription(csvRecord.get(2))
.withOwner(getOwner(printer, csvRecord, 3))
.withTags(getTagLabels(printer, csvRecord, 4))
.withRetentionPeriod(csvRecord.get(5))
.withSourceUrl(csvRecord.get(6))
.withDomain(getEntityReference(printer, csvRecord, 7, Entity.DOMAIN));
.withTags(tagLabels)
.withRetentionPeriod(csvRecord.get(7))
.withSourceUrl(csvRecord.get(8))
.withDomain(getEntityReference(printer, csvRecord, 9, Entity.DOMAIN));
if (processRecord) {
createEntity(printer, csvRecord, schema);
}
Expand All @@ -264,6 +284,8 @@ protected void addRecord(CsvFile csvFile, DatabaseSchema entity) {
addField(recordList, entity.getDescription());
addOwner(recordList, entity.getOwner());
addTagLabels(recordList, entity.getTags());
addGlossaryTerms(recordList, entity.getTags());
addTagTiers(recordList, entity.getTags());
addField(recordList, entity.getRetentionPeriod());
addField(recordList, entity.getSourceUrl());
String domain =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package org.openmetadata.service.jdbi3;

import static org.openmetadata.csv.CsvUtil.addField;
import static org.openmetadata.csv.CsvUtil.addGlossaryTerms;
import static org.openmetadata.csv.CsvUtil.addOwner;
import static org.openmetadata.csv.CsvUtil.addTagLabels;
import static org.openmetadata.csv.CsvUtil.addTagTiers;
import static org.openmetadata.schema.type.Include.ALL;
import static org.openmetadata.service.Entity.DATABASE_SCHEMA;
import static org.openmetadata.service.Entity.TABLE;
Expand All @@ -29,6 +31,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.csv.EntityCsv;
import org.openmetadata.schema.EntityInterface;
Expand All @@ -39,6 +42,7 @@
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.schema.type.csv.CsvDocumentation;
import org.openmetadata.schema.type.csv.CsvFile;
import org.openmetadata.schema.type.csv.CsvHeader;
Expand Down Expand Up @@ -188,7 +192,8 @@ public String exportToCsv(String name, String user) throws IOException {
@Override
public CsvImportResult importFromCsv(String name, String csv, boolean dryRun, String user)
throws IOException {
DatabaseSchema schema = getByName(null, name, Fields.EMPTY_FIELDS); // Validate database schema
DatabaseSchema schema =
getByName(null, name, getFields("database,service")); // Validate database schema
return new DatabaseSchemaCsv(schema, user).importCsv(csv, dryRun);
}

Expand Down Expand Up @@ -266,21 +271,35 @@ protected void createEntity(CSVPrinter printer, List<CSVRecord> csvRecords) thro
try {
table = Entity.getEntityByName(TABLE, tableFqn, "*", Include.NON_DELETED);
} catch (Exception ex) {
importFailure(printer, entityNotFound(0, TABLE, tableFqn), csvRecord);
processRecord = false;
return;
LOG.warn("Table not found: {}, it will be created with Import.", tableFqn);
table =
new Table()
.withService(schema.getService())
.withDatabase(schema.getDatabase())
.withDatabaseSchema(schema.getEntityReference());
}

// Headers: name, displayName, description, owner, tags, retentionPeriod, sourceUrl, domain
// Headers: name, displayName, description, owner, tags, glossaryTerms, tiers, retentionPeriod,
// sourceUrl, domain
// Field 1,2,3,8,9 - table name, displayName, description, retentionPeriod, sourceUrl
List<TagLabel> tagLabels =
getTagLabels(
printer,
csvRecord,
List.of(
Pair.of(4, TagLabel.TagSource.CLASSIFICATION),
Pair.of(5, TagLabel.TagSource.GLOSSARY),
Pair.of(6, TagLabel.TagSource.CLASSIFICATION)));
table
.withName(csvRecord.get(0))
.withDisplayName(csvRecord.get(1))
.withDescription(csvRecord.get(2))
.withOwner(getOwner(printer, csvRecord, 3))
.withTags(getTagLabels(printer, csvRecord, 4))
.withRetentionPeriod(csvRecord.get(5))
.withSourceUrl(csvRecord.get(6))
.withDomain(getEntityReference(printer, csvRecord, 7, Entity.DOMAIN));
.withTags(tagLabels)
.withRetentionPeriod(csvRecord.get(7))
.withSourceUrl(csvRecord.get(8))
.withColumns(new ArrayList<>())
.withDomain(getEntityReference(printer, csvRecord, 9, Entity.DOMAIN));

if (processRecord) {
createEntity(printer, csvRecord, table);
Expand All @@ -296,6 +315,8 @@ protected void addRecord(CsvFile csvFile, Table entity) {
addField(recordList, entity.getDescription());
addOwner(recordList, entity.getOwner());
addTagLabels(recordList, entity.getTags());
addGlossaryTerms(recordList, entity.getTags());
addTagTiers(recordList, entity.getTags());
addField(recordList, entity.getRetentionPeriod());
addField(recordList, entity.getSourceUrl());
String domain =
Expand Down
Loading

0 comments on commit 5a88d15

Please sign in to comment.