#1078 keeping table files
fmendezh committed Oct 11, 2024
1 parent 250ee38 commit 6f1eab0
Showing 4 changed files with 58 additions and 139 deletions.
@@ -572,7 +572,7 @@ public static void run(
     PipelineResult result = p.run();
 
     if (PipelineResult.State.DONE == result.waitUntilFinish()) {
-      Mutex.Action action = () -> HdfsViewAvroUtils.cleanAndMove(options);
+      Mutex.Action action = () -> HdfsViewAvroUtils.copyAndOverwrite(options);
       if (options.getTestMode()) {
         action.execute();
       } else {
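Both of the changed pipelines wrap this call in the same guard: the copy runs as a Mutex.Action, directly in test mode, otherwise under a shared HDFS-prefix lock so concurrent pipelines cannot interleave writes into the same view directory. Below is a minimal sketch of that shape, assuming Mutex.Action is a zero-argument functional interface and using a plain lock as a stand-in for SharedLockUtils.doHdfsPrefixLock; the bodies are illustrative, not the gbif/pipelines implementation.

import java.util.concurrent.locks.ReentrantLock;

public class LockedCopySketch {

  // Assumed shape of Mutex.Action: a zero-argument callback.
  @FunctionalInterface
  interface Action {
    void execute();
  }

  // Stand-in for SharedLockUtils.doHdfsPrefixLock: run the action while
  // holding a lock that serializes writers on the same target prefix.
  private static final ReentrantLock PREFIX_LOCK = new ReentrantLock();

  static void doPrefixLock(Action action) {
    PREFIX_LOCK.lock();
    try {
      action.execute();
    } finally {
      PREFIX_LOCK.unlock();
    }
  }

  public static void main(String[] args) {
    boolean testMode = false;
    Action action = () -> System.out.println("copyAndOverwrite(options) runs here");
    if (testMode) {
      action.execute(); // test mode: no coordination needed
    } else {
      doPrefixLock(action); // production: one writer per prefix at a time
    }
  }
}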
@@ -13,29 +13,6 @@
 import org.gbif.pipelines.common.beam.utils.PathBuilder;
 import org.gbif.pipelines.core.pojo.HdfsConfigs;
 import org.gbif.pipelines.core.utils.FsUtils;
-import org.gbif.pipelines.io.avro.extension.ac.AudubonTable;
-import org.gbif.pipelines.io.avro.extension.dwc.ChronometricAgeTable;
-import org.gbif.pipelines.io.avro.extension.dwc.IdentificationTable;
-import org.gbif.pipelines.io.avro.extension.dwc.MeasurementOrFactTable;
-import org.gbif.pipelines.io.avro.extension.dwc.ResourceRelationshipTable;
-import org.gbif.pipelines.io.avro.extension.gbif.DnaDerivedDataTable;
-import org.gbif.pipelines.io.avro.extension.gbif.IdentifierTable;
-import org.gbif.pipelines.io.avro.extension.gbif.ImageTable;
-import org.gbif.pipelines.io.avro.extension.gbif.MultimediaTable;
-import org.gbif.pipelines.io.avro.extension.gbif.ReferenceTable;
-import org.gbif.pipelines.io.avro.extension.germplasm.GermplasmAccessionTable;
-import org.gbif.pipelines.io.avro.extension.germplasm.GermplasmMeasurementScoreTable;
-import org.gbif.pipelines.io.avro.extension.germplasm.GermplasmMeasurementTraitTable;
-import org.gbif.pipelines.io.avro.extension.germplasm.GermplasmMeasurementTrialTable;
-import org.gbif.pipelines.io.avro.extension.ggbn.AmplificationTable;
-import org.gbif.pipelines.io.avro.extension.ggbn.CloningTable;
-import org.gbif.pipelines.io.avro.extension.ggbn.GelImageTable;
-import org.gbif.pipelines.io.avro.extension.ggbn.LoanTable;
-import org.gbif.pipelines.io.avro.extension.ggbn.MaterialSampleTable;
-import org.gbif.pipelines.io.avro.extension.ggbn.PermitTable;
-import org.gbif.pipelines.io.avro.extension.ggbn.PreparationTable;
-import org.gbif.pipelines.io.avro.extension.ggbn.PreservationTable;
-import org.gbif.pipelines.io.avro.extension.obis.ExtendedMeasurementOrFactTable;
 
 @Slf4j
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
@@ -45,143 +22,87 @@ public class HdfsViewAvroUtils {
   /**
    * Cleans empty avro files for extensions and copies all avro files into the directory from
    * targetPath. Deletes pre-existing data of the dataset being processed.
    */
-  public static void cleanAndMove(InterpretationPipelineOptions options) {
+  public static void copyAndOverwrite(InterpretationPipelineOptions options) {
     if (options.getInterpretationTypes().size() == 1
         && options.getInterpretationTypes().contains(OCCURRENCE.name())) {
-      moveOccurrence(options);
+      copyOccurrence(options);
     } else if (options.getInterpretationTypes().size() == 1
         && options.getInterpretationTypes().contains(EVENT.name())) {
-      move(options, EVENT);
+      copyAndOverwrite(options, EVENT);
     } else {
-      moveAll(options);
+      copyAll(options);
     }
   }
 
-  private static void moveOccurrence(InterpretationPipelineOptions options) {
-    move(options, OCCURRENCE);
+  private static void copyOccurrence(InterpretationPipelineOptions options) {
+    copyAndOverwrite(options, OCCURRENCE);
   }
 
   private static void cleanAndMoveTables(InterpretationPipelineOptions opt, RecordType coreType) {
 
-    cleanMove(
-        opt,
-        coreType,
-        MEASUREMENT_OR_FACT_TABLE,
-        Extension.MEASUREMENT_OR_FACT,
-        MeasurementOrFactTable.class);
-    cleanMove(
-        opt, coreType, IDENTIFICATION_TABLE, Extension.IDENTIFICATION, IdentificationTable.class);
-    cleanMove(
-        opt,
-        coreType,
-        RESOURCE_RELATIONSHIP_TABLE,
-        Extension.RESOURCE_RELATIONSHIP,
-        ResourceRelationshipTable.class);
-    cleanMove(
-        opt, coreType, AMPLIFICATION_TABLE, Extension.AMPLIFICATION, AmplificationTable.class);
-    cleanMove(opt, coreType, CLONING_TABLE, Extension.CLONING, CloningTable.class);
-    cleanMove(opt, coreType, GEL_IMAGE_TABLE, Extension.GEL_IMAGE, GelImageTable.class);
-    cleanMove(opt, coreType, LOAN_TABLE, Extension.LOAN, LoanTable.class);
-    cleanMove(
-        opt, coreType, MATERIAL_SAMPLE_TABLE, Extension.MATERIAL_SAMPLE, MaterialSampleTable.class);
-    cleanMove(opt, coreType, PERMIT_TABLE, Extension.PERMIT, PermitTable.class);
-    cleanMove(opt, coreType, PREPARATION_TABLE, Extension.PREPARATION, PreparationTable.class);
-    cleanMove(opt, coreType, PRESERVATION_TABLE, Extension.PRESERVATION, PreservationTable.class);
-    cleanMove(
-        opt,
-        coreType,
-        GERMPLASM_MEASUREMENT_SCORE_TABLE,
-        Extension.GERMPLASM_MEASUREMENT_SCORE,
-        GermplasmMeasurementScoreTable.class);
-    cleanMove(
-        opt,
-        coreType,
-        GERMPLASM_MEASUREMENT_TRAIT_TABLE,
-        Extension.GERMPLASM_MEASUREMENT_TRAIT,
-        GermplasmMeasurementTraitTable.class);
-    cleanMove(
-        opt,
-        coreType,
-        GERMPLASM_MEASUREMENT_TRIAL_TABLE,
-        Extension.GERMPLASM_MEASUREMENT_TRIAL,
-        GermplasmMeasurementTrialTable.class);
-    cleanMove(
-        opt,
-        coreType,
-        GERMPLASM_ACCESSION_TABLE,
-        Extension.GERMPLASM_ACCESSION,
-        GermplasmAccessionTable.class);
-    cleanMove(
-        opt,
-        coreType,
-        EXTENDED_MEASUREMENT_OR_FACT_TABLE,
-        Extension.EXTENDED_MEASUREMENT_OR_FACT,
-        ExtendedMeasurementOrFactTable.class);
-    cleanMove(
-        opt,
-        coreType,
-        CHRONOMETRIC_AGE_TABLE,
-        Extension.CHRONOMETRIC_AGE,
-        ChronometricAgeTable.class);
-    cleanMove(opt, coreType, REFERENCE_TABLE, Extension.REFERENCE, ReferenceTable.class);
-    cleanMove(opt, coreType, IDENTIFIER_TABLE, Extension.IDENTIFIER, IdentifierTable.class);
-    cleanMove(opt, coreType, AUDUBON_TABLE, Extension.AUDUBON, AudubonTable.class);
-    cleanMove(opt, coreType, IMAGE_TABLE, Extension.IMAGE, ImageTable.class);
-    cleanMove(opt, coreType, MULTIMEDIA_TABLE, Extension.MULTIMEDIA, MultimediaTable.class);
-    cleanMove(
-        opt,
-        coreType,
-        DNA_DERIVED_DATA_TABLE,
-        Extension.DNA_DERIVED_DATA,
-        DnaDerivedDataTable.class);
+    copyAndOverwrite(opt, coreType, MEASUREMENT_OR_FACT_TABLE, Extension.MEASUREMENT_OR_FACT);
+    copyAndOverwrite(opt, coreType, IDENTIFICATION_TABLE, Extension.IDENTIFICATION);
+    copyAndOverwrite(opt, coreType, RESOURCE_RELATIONSHIP_TABLE, Extension.RESOURCE_RELATIONSHIP);
+    copyAndOverwrite(opt, coreType, AMPLIFICATION_TABLE, Extension.AMPLIFICATION);
+    copyAndOverwrite(opt, coreType, CLONING_TABLE, Extension.CLONING);
+    copyAndOverwrite(opt, coreType, GEL_IMAGE_TABLE, Extension.GEL_IMAGE);
+    copyAndOverwrite(opt, coreType, LOAN_TABLE, Extension.LOAN);
+    copyAndOverwrite(opt, coreType, MATERIAL_SAMPLE_TABLE, Extension.MATERIAL_SAMPLE);
+    copyAndOverwrite(opt, coreType, PERMIT_TABLE, Extension.PERMIT);
+    copyAndOverwrite(opt, coreType, PREPARATION_TABLE, Extension.PREPARATION);
+    copyAndOverwrite(opt, coreType, PRESERVATION_TABLE, Extension.PRESERVATION);
+    copyAndOverwrite(
+        opt, coreType, GERMPLASM_MEASUREMENT_SCORE_TABLE, Extension.GERMPLASM_MEASUREMENT_SCORE);
+    copyAndOverwrite(
+        opt, coreType, GERMPLASM_MEASUREMENT_TRAIT_TABLE, Extension.GERMPLASM_MEASUREMENT_TRAIT);
+    copyAndOverwrite(
+        opt, coreType, GERMPLASM_MEASUREMENT_TRIAL_TABLE, Extension.GERMPLASM_MEASUREMENT_TRIAL);
+    copyAndOverwrite(opt, coreType, GERMPLASM_ACCESSION_TABLE, Extension.GERMPLASM_ACCESSION);
+    copyAndOverwrite(
+        opt, coreType, EXTENDED_MEASUREMENT_OR_FACT_TABLE, Extension.EXTENDED_MEASUREMENT_OR_FACT);
+    copyAndOverwrite(opt, coreType, CHRONOMETRIC_AGE_TABLE, Extension.CHRONOMETRIC_AGE);
+    copyAndOverwrite(opt, coreType, REFERENCE_TABLE, Extension.REFERENCE);
+    copyAndOverwrite(opt, coreType, IDENTIFIER_TABLE, Extension.IDENTIFIER);
+    copyAndOverwrite(opt, coreType, AUDUBON_TABLE, Extension.AUDUBON);
+    copyAndOverwrite(opt, coreType, IMAGE_TABLE, Extension.IMAGE);
+    copyAndOverwrite(opt, coreType, MULTIMEDIA_TABLE, Extension.MULTIMEDIA);
+    copyAndOverwrite(opt, coreType, DNA_DERIVED_DATA_TABLE, Extension.DNA_DERIVED_DATA);
   }
 
-  private static void moveAll(InterpretationPipelineOptions options) {
+  private static void copyAll(InterpretationPipelineOptions options) {
     if (options.getCoreRecordType() == OCCURRENCE) {
-      move(options, OCCURRENCE);
+      copyAndOverwrite(options, OCCURRENCE);
       cleanAndMoveTables(options, OCCURRENCE);
     } else if (options.getCoreRecordType() == EVENT) {
-      move(options, EVENT);
+      copyAndOverwrite(options, EVENT);
       cleanAndMoveTables(options, EVENT);
     }
   }
 
-  private static void move(InterpretationPipelineOptions options, RecordType recordType) {
+  private static void copyAndOverwrite(
+      InterpretationPipelineOptions options, RecordType recordType) {
     String path = recordType.name().toLowerCase();
-    move(options, recordType, path, path);
+    copyAndOverwrite(options, recordType, path, path);
   }
 
-  private static <T> void cleanMove(
+  private static <T> void copyAndOverwrite(
       InterpretationPipelineOptions options,
       RecordType recordType,
       RecordType extensionRecordType,
-      Extension extension,
-      Class<T> avroClass) {
+      Extension extension) {
     String from = extensionRecordType.name().toLowerCase();
     String to = extension.name().toLowerCase().replace("_", "") + "table";
-    clean(options, recordType, extensionRecordType, avroClass);
-    move(options, recordType, from, to);
+    // clean(options, recordType, extensionRecordType, avroClass);
+    copyAndOverwrite(options, recordType, from, to);
   }
 
-  private static <T> void clean(
-      InterpretationPipelineOptions options,
-      RecordType recordType,
-      RecordType extensionRecordType,
-      Class<T> avroClass) {
-    HdfsConfigs hdfsConfigs =
-        HdfsConfigs.create(options.getHdfsSiteConfig(), options.getCoreSiteConfig());
-
-    String extType = extensionRecordType.name().toLowerCase();
-    String path = PathBuilder.buildFilePathViewUsingInputPath(options, recordType, extType);
-    FsUtils.deleteAvroFileIfEmpty(hdfsConfigs, path, avroClass);
-  }
-
-  private static void move(
+  private static void copyAndOverwrite(
      InterpretationPipelineOptions options, RecordType recordType, String from, String to) {
     String targetPath = options.getTargetPath();
     HdfsConfigs hdfsConfigs =
         HdfsConfigs.create(options.getHdfsSiteConfig(), options.getCoreSiteConfig());
 
     // Delete existing avro files in the target directory
     String deletePath =
         PathBuilder.buildPath(
                 targetPath, recordType.name().toLowerCase(), to, options.getDatasetId() + "_*")
@@ -192,10 +113,10 @@ private static void move(
     String filter =
         PathBuilder.buildFilePathViewUsingInputPath(options, recordType, from, "*.avro");
 
-    String movePath =
+    String copyPath =
         PathBuilder.buildPath(targetPath, recordType.name().toLowerCase(), to).toString();
-    log.info("Moving files with pattern {} to {}", filter, movePath);
-    FsUtils.moveDirectory(hdfsConfigs, movePath, filter);
-    log.info("Files moved to {} directory", movePath);
+    log.info("Copying files with pattern {} to {}", filter, copyPath);
+    FsUtils.copyToDirectory(hdfsConfigs, copyPath, filter);
+    log.info("Files copied to {} directory", copyPath);
   }
 }
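cleanAndMoveTables now fans out one copyAndOverwrite call per extension table, and each target directory name is derived from the Extension constant: lowercase it, strip underscores, and append "table". A small sketch of that rule and the resulting layout under targetPath; the enum is a trimmed stand-in for GBIF's Extension, and <targetPath> and <datasetId> are placeholders following the buildPath call in the diff.

public class TableDirectorySketch {

  // Trimmed stand-in for org.gbif.api.vocabulary.Extension.
  enum Extension {
    MEASUREMENT_OR_FACT,
    EXTENDED_MEASUREMENT_OR_FACT,
    DNA_DERIVED_DATA
  }

  // Mirrors the rule in the diff:
  // extension.name().toLowerCase().replace("_", "") + "table"
  static String tableDirectory(Extension extension) {
    return extension.name().toLowerCase().replace("_", "") + "table";
  }

  public static void main(String[] args) {
    for (Extension e : Extension.values()) {
      // MEASUREMENT_OR_FACT -> measurementorfacttable, etc.
      System.out.println("<targetPath>/occurrence/" + tableDirectory(e) + "/<datasetId>_*.avro");
    }
  }
}

Note that the clean call is only commented out, not replaced: empty extension avro files are no longer deleted before the copy, and the now-unused clean helper, its avroClass parameter, and the table imports at the top of the file go away with it.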
@@ -853,16 +853,12 @@ public static void run(InterpretationPipelineOptions options, ExecutorService ex
         .write();
 
     // Move files
-    Mutex.Action action = () -> HdfsViewAvroUtils.cleanAndMove(options);
+    Mutex.Action action = () -> HdfsViewAvroUtils.copyAndOverwrite(options);
     if (options.getTestMode()) {
       action.execute();
     } else {
       SharedLockUtils.doHdfsPrefixLock(options, action);
     }
-    // Delete root directory of table records
-    FsUtils.deleteIfExist(
-        hdfsConfigs, PathBuilder.buildFilePathViewUsingInputPath(options, recordType));
-
     MetricsHandler.saveCountersToInputPathFile(options, metrics.getMetricsResult());
     log.info("Pipeline has been finished - {}", LocalDateTime.now());
   }
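The lines removed here were the other half of the old move: once the avro files had been renamed away, the view directory under the input path was deleted. With the copy-based flow the source files stay in place, which is the point of the commit title. A minimal sketch of what the removed FsUtils.deleteIfExist step did, assuming it is a recursive delete over the Hadoop FileSystem API; the name comes from the diff, the body is illustrative.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DeleteIfExistSketch {

  // Assumed behavior of FsUtils.deleteIfExist: recursively remove a directory
  // if present. This is the cleanup the commit drops, so the table avro files
  // under the input path now survive the pipeline run.
  static boolean deleteIfExist(FileSystem fs, String directory) throws IOException {
    Path path = new Path(directory);
    return fs.exists(path) && fs.delete(path, true); // true = recursive
  }

  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    System.out.println("deleted: " + deleteIfExist(fs, "/tmp/hdfsview/occurrence"));
  }
}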
@@ -203,23 +203,25 @@ public static void setOwner(
   }
 
   /**
-   * Moves a list files that match against a glob filter into a target directory.
+   * Copies files matching a glob filter into a target directory.
    *
    * @param hdfsConfigs path to hdfs-site.xml config file
    * @param globFilter filter used to filter files and paths
   * @param targetPath target directory
    */
-  public static void moveDirectory(HdfsConfigs hdfsConfigs, String targetPath, String globFilter) {
+  public static void copyToDirectory(
+      HdfsConfigs hdfsConfigs, String targetPath, String globFilter) {
     FileSystem fs = getFileSystem(hdfsConfigs, targetPath);
     try {
       FileStatus[] status = fs.globStatus(new Path(globFilter));
       Path[] paths = FileUtil.stat2Paths(status);
       for (Path path : paths) {
-        boolean rename = fs.rename(path, new Path(targetPath, path.getName()));
-        log.info("File {} moved status - {}", path, rename);
+        boolean copied =
+            FileUtil.copy(fs, path, fs, new Path(targetPath, path.getName()), false, fs.getConf());
+        log.debug("File {} copied status - {}", path, copied);
       }
     } catch (IOException e) {
-      log.warn("Can't move files using filter - {}, into path - {}", globFilter, targetPath);
+      log.warn("Can't copy files using filter - {}, into path - {}", globFilter, targetPath);
     }
   }
 
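This is where the semantics actually change: fs.rename moves a file and leaves nothing at the source, while Hadoop's FileUtil.copy with deleteSource = false duplicates it, so the table files remain under the input path after the view is refreshed. A usage sketch of the new helper's shape against a local FileSystem; the copyToDirectory body mirrors the diff, and the paths are illustrative.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class CopyGlobSketch {

  // Same shape as the new FsUtils.copyToDirectory: expand the glob, then copy
  // each match into targetPath while keeping the source (deleteSource = false).
  static void copyToDirectory(FileSystem fs, String targetPath, String globFilter)
      throws IOException {
    FileStatus[] status = fs.globStatus(new Path(globFilter));
    if (status == null) {
      return; // nothing matched the pattern
    }
    for (Path path : FileUtil.stat2Paths(status)) {
      boolean copied =
          FileUtil.copy(fs, path, fs, new Path(targetPath, path.getName()), false, fs.getConf());
      System.out.println("File " + path + " copied: " + copied);
    }
  }

  public static void main(String[] args) throws IOException {
    // Local filesystem for the sketch; the real helper resolves HDFS configs.
    FileSystem fs = FileSystem.getLocal(new Configuration());
    copyToDirectory(fs, "/tmp/hdfsview/occurrence", "/tmp/input/occurrence/*.avro");
  }
}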
