diff --git a/gbif/ingestion/ingest-gbif-beam/src/main/java/org/gbif/pipelines/ingest/pipelines/HdfsViewPipeline.java b/gbif/ingestion/ingest-gbif-beam/src/main/java/org/gbif/pipelines/ingest/pipelines/HdfsViewPipeline.java index 54176aa2a..cc85c1f80 100644 --- a/gbif/ingestion/ingest-gbif-beam/src/main/java/org/gbif/pipelines/ingest/pipelines/HdfsViewPipeline.java +++ b/gbif/ingestion/ingest-gbif-beam/src/main/java/org/gbif/pipelines/ingest/pipelines/HdfsViewPipeline.java @@ -572,7 +572,7 @@ public static void run( PipelineResult result = p.run(); if (PipelineResult.State.DONE == result.waitUntilFinish()) { - Mutex.Action action = () -> HdfsViewAvroUtils.cleanAndMove(options); + Mutex.Action action = () -> HdfsViewAvroUtils.copyAndOverwrite(options); if (options.getTestMode()) { action.execute(); } else { diff --git a/gbif/ingestion/ingest-gbif-beam/src/main/java/org/gbif/pipelines/ingest/utils/HdfsViewAvroUtils.java b/gbif/ingestion/ingest-gbif-beam/src/main/java/org/gbif/pipelines/ingest/utils/HdfsViewAvroUtils.java index 12daede18..f4636dd6d 100644 --- a/gbif/ingestion/ingest-gbif-beam/src/main/java/org/gbif/pipelines/ingest/utils/HdfsViewAvroUtils.java +++ b/gbif/ingestion/ingest-gbif-beam/src/main/java/org/gbif/pipelines/ingest/utils/HdfsViewAvroUtils.java @@ -13,29 +13,6 @@ import org.gbif.pipelines.common.beam.utils.PathBuilder; import org.gbif.pipelines.core.pojo.HdfsConfigs; import org.gbif.pipelines.core.utils.FsUtils; -import org.gbif.pipelines.io.avro.extension.ac.AudubonTable; -import org.gbif.pipelines.io.avro.extension.dwc.ChronometricAgeTable; -import org.gbif.pipelines.io.avro.extension.dwc.IdentificationTable; -import org.gbif.pipelines.io.avro.extension.dwc.MeasurementOrFactTable; -import org.gbif.pipelines.io.avro.extension.dwc.ResourceRelationshipTable; -import org.gbif.pipelines.io.avro.extension.gbif.DnaDerivedDataTable; -import org.gbif.pipelines.io.avro.extension.gbif.IdentifierTable; -import 
org.gbif.pipelines.io.avro.extension.gbif.ImageTable; -import org.gbif.pipelines.io.avro.extension.gbif.MultimediaTable; -import org.gbif.pipelines.io.avro.extension.gbif.ReferenceTable; -import org.gbif.pipelines.io.avro.extension.germplasm.GermplasmAccessionTable; -import org.gbif.pipelines.io.avro.extension.germplasm.GermplasmMeasurementScoreTable; -import org.gbif.pipelines.io.avro.extension.germplasm.GermplasmMeasurementTraitTable; -import org.gbif.pipelines.io.avro.extension.germplasm.GermplasmMeasurementTrialTable; -import org.gbif.pipelines.io.avro.extension.ggbn.AmplificationTable; -import org.gbif.pipelines.io.avro.extension.ggbn.CloningTable; -import org.gbif.pipelines.io.avro.extension.ggbn.GelImageTable; -import org.gbif.pipelines.io.avro.extension.ggbn.LoanTable; -import org.gbif.pipelines.io.avro.extension.ggbn.MaterialSampleTable; -import org.gbif.pipelines.io.avro.extension.ggbn.PermitTable; -import org.gbif.pipelines.io.avro.extension.ggbn.PreparationTable; -import org.gbif.pipelines.io.avro.extension.ggbn.PreservationTable; -import org.gbif.pipelines.io.avro.extension.obis.ExtendedMeasurementOrFactTable; @Slf4j @NoArgsConstructor(access = AccessLevel.PRIVATE) @@ -45,143 +22,87 @@ public class HdfsViewAvroUtils { * Cleans empty avro files for extensions and copies all avro files into the directory from * targetPath. Deletes pre-existing data of the dataset being processed. 
*/ - public static void cleanAndMove(InterpretationPipelineOptions options) { + public static void copyAndOverwrite(InterpretationPipelineOptions options) { if (options.getInterpretationTypes().size() == 1 && options.getInterpretationTypes().contains(OCCURRENCE.name())) { - moveOccurrence(options); + copyOccurrence(options); } else if (options.getInterpretationTypes().size() == 1 && options.getInterpretationTypes().contains(EVENT.name())) { - move(options, EVENT); + copyAndOverwrite(options, EVENT); } else { - moveAll(options); + copyAll(options); } } - private static void moveOccurrence(InterpretationPipelineOptions options) { - move(options, OCCURRENCE); + private static void copyOccurrence(InterpretationPipelineOptions options) { + copyAndOverwrite(options, OCCURRENCE); } private static void cleanAndMoveTables(InterpretationPipelineOptions opt, RecordType coreType) { - cleanMove( - opt, - coreType, - MEASUREMENT_OR_FACT_TABLE, - Extension.MEASUREMENT_OR_FACT, - MeasurementOrFactTable.class); - cleanMove( - opt, coreType, IDENTIFICATION_TABLE, Extension.IDENTIFICATION, IdentificationTable.class); - cleanMove( - opt, - coreType, - RESOURCE_RELATIONSHIP_TABLE, - Extension.RESOURCE_RELATIONSHIP, - ResourceRelationshipTable.class); - cleanMove( - opt, coreType, AMPLIFICATION_TABLE, Extension.AMPLIFICATION, AmplificationTable.class); - cleanMove(opt, coreType, CLONING_TABLE, Extension.CLONING, CloningTable.class); - cleanMove(opt, coreType, GEL_IMAGE_TABLE, Extension.GEL_IMAGE, GelImageTable.class); - cleanMove(opt, coreType, LOAN_TABLE, Extension.LOAN, LoanTable.class); - cleanMove( - opt, coreType, MATERIAL_SAMPLE_TABLE, Extension.MATERIAL_SAMPLE, MaterialSampleTable.class); - cleanMove(opt, coreType, PERMIT_TABLE, Extension.PERMIT, PermitTable.class); - cleanMove(opt, coreType, PREPARATION_TABLE, Extension.PREPARATION, PreparationTable.class); - cleanMove(opt, coreType, PRESERVATION_TABLE, Extension.PRESERVATION, PreservationTable.class); - cleanMove( - opt, - 
coreType, - GERMPLASM_MEASUREMENT_SCORE_TABLE, - Extension.GERMPLASM_MEASUREMENT_SCORE, - GermplasmMeasurementScoreTable.class); - cleanMove( - opt, - coreType, - GERMPLASM_MEASUREMENT_TRAIT_TABLE, - Extension.GERMPLASM_MEASUREMENT_TRAIT, - GermplasmMeasurementTraitTable.class); - cleanMove( - opt, - coreType, - GERMPLASM_MEASUREMENT_TRIAL_TABLE, - Extension.GERMPLASM_MEASUREMENT_TRIAL, - GermplasmMeasurementTrialTable.class); - cleanMove( - opt, - coreType, - GERMPLASM_ACCESSION_TABLE, - Extension.GERMPLASM_ACCESSION, - GermplasmAccessionTable.class); - cleanMove( - opt, - coreType, - EXTENDED_MEASUREMENT_OR_FACT_TABLE, - Extension.EXTENDED_MEASUREMENT_OR_FACT, - ExtendedMeasurementOrFactTable.class); - cleanMove( - opt, - coreType, - CHRONOMETRIC_AGE_TABLE, - Extension.CHRONOMETRIC_AGE, - ChronometricAgeTable.class); - cleanMove(opt, coreType, REFERENCE_TABLE, Extension.REFERENCE, ReferenceTable.class); - cleanMove(opt, coreType, IDENTIFIER_TABLE, Extension.IDENTIFIER, IdentifierTable.class); - cleanMove(opt, coreType, AUDUBON_TABLE, Extension.AUDUBON, AudubonTable.class); - cleanMove(opt, coreType, IMAGE_TABLE, Extension.IMAGE, ImageTable.class); - cleanMove(opt, coreType, MULTIMEDIA_TABLE, Extension.MULTIMEDIA, MultimediaTable.class); - cleanMove( - opt, - coreType, - DNA_DERIVED_DATA_TABLE, - Extension.DNA_DERIVED_DATA, - DnaDerivedDataTable.class); + copyAndOverwrite(opt, coreType, MEASUREMENT_OR_FACT_TABLE, Extension.MEASUREMENT_OR_FACT); + copyAndOverwrite(opt, coreType, IDENTIFICATION_TABLE, Extension.IDENTIFICATION); + copyAndOverwrite(opt, coreType, RESOURCE_RELATIONSHIP_TABLE, Extension.RESOURCE_RELATIONSHIP); + copyAndOverwrite(opt, coreType, AMPLIFICATION_TABLE, Extension.AMPLIFICATION); + copyAndOverwrite(opt, coreType, CLONING_TABLE, Extension.CLONING); + copyAndOverwrite(opt, coreType, GEL_IMAGE_TABLE, Extension.GEL_IMAGE); + copyAndOverwrite(opt, coreType, LOAN_TABLE, Extension.LOAN); + copyAndOverwrite(opt, coreType, MATERIAL_SAMPLE_TABLE, 
Extension.MATERIAL_SAMPLE); + copyAndOverwrite(opt, coreType, PERMIT_TABLE, Extension.PERMIT); + copyAndOverwrite(opt, coreType, PREPARATION_TABLE, Extension.PREPARATION); + copyAndOverwrite(opt, coreType, PRESERVATION_TABLE, Extension.PRESERVATION); + copyAndOverwrite( + opt, coreType, GERMPLASM_MEASUREMENT_SCORE_TABLE, Extension.GERMPLASM_MEASUREMENT_SCORE); + copyAndOverwrite( + opt, coreType, GERMPLASM_MEASUREMENT_TRAIT_TABLE, Extension.GERMPLASM_MEASUREMENT_TRAIT); + copyAndOverwrite( + opt, coreType, GERMPLASM_MEASUREMENT_TRIAL_TABLE, Extension.GERMPLASM_MEASUREMENT_TRIAL); + copyAndOverwrite(opt, coreType, GERMPLASM_ACCESSION_TABLE, Extension.GERMPLASM_ACCESSION); + copyAndOverwrite( + opt, coreType, EXTENDED_MEASUREMENT_OR_FACT_TABLE, Extension.EXTENDED_MEASUREMENT_OR_FACT); + copyAndOverwrite(opt, coreType, CHRONOMETRIC_AGE_TABLE, Extension.CHRONOMETRIC_AGE); + copyAndOverwrite(opt, coreType, REFERENCE_TABLE, Extension.REFERENCE); + copyAndOverwrite(opt, coreType, IDENTIFIER_TABLE, Extension.IDENTIFIER); + copyAndOverwrite(opt, coreType, AUDUBON_TABLE, Extension.AUDUBON); + copyAndOverwrite(opt, coreType, IMAGE_TABLE, Extension.IMAGE); + copyAndOverwrite(opt, coreType, MULTIMEDIA_TABLE, Extension.MULTIMEDIA); + copyAndOverwrite(opt, coreType, DNA_DERIVED_DATA_TABLE, Extension.DNA_DERIVED_DATA); } - private static void moveAll(InterpretationPipelineOptions options) { + private static void copyAll(InterpretationPipelineOptions options) { if (options.getCoreRecordType() == OCCURRENCE) { - move(options, OCCURRENCE); + copyAndOverwrite(options, OCCURRENCE); cleanAndMoveTables(options, OCCURRENCE); } else if (options.getCoreRecordType() == EVENT) { - move(options, EVENT); + copyAndOverwrite(options, EVENT); cleanAndMoveTables(options, EVENT); } } - private static void move(InterpretationPipelineOptions options, RecordType recordType) { + private static void copyAndOverwrite( + InterpretationPipelineOptions options, RecordType recordType) { String path = 
recordType.name().toLowerCase(); - move(options, recordType, path, path); + copyAndOverwrite(options, recordType, path, path); } - private static void cleanMove( + private static void copyAndOverwrite( InterpretationPipelineOptions options, RecordType recordType, RecordType extensionRecordType, - Extension extension, - Class avroClass) { + Extension extension) { String from = extensionRecordType.name().toLowerCase(); String to = extension.name().toLowerCase().replace("_", "") + "table"; - clean(options, recordType, extensionRecordType, avroClass); - move(options, recordType, from, to); + // Note: the former clean() empty-avro cleanup step was intentionally removed + copyAndOverwrite(options, recordType, from, to); } - private static void clean( - InterpretationPipelineOptions options, - RecordType recordType, - RecordType extensionRecordType, - Class avroClass) { - HdfsConfigs hdfsConfigs = - HdfsConfigs.create(options.getHdfsSiteConfig(), options.getCoreSiteConfig()); - - String extType = extensionRecordType.name().toLowerCase(); - String path = PathBuilder.buildFilePathViewUsingInputPath(options, recordType, extType); - FsUtils.deleteAvroFileIfEmpty(hdfsConfigs, path, avroClass); - } - - private static void move( + private static void copyAndOverwrite( InterpretationPipelineOptions options, RecordType recordType, String from, String to) { String targetPath = options.getTargetPath(); HdfsConfigs hdfsConfigs = HdfsConfigs.create(options.getHdfsSiteConfig(), options.getCoreSiteConfig()); + // Delete existing avro files in the target directory String deletePath = PathBuilder.buildPath( targetPath, recordType.name().toLowerCase(), to, options.getDatasetId() + "_*") .toString(); @@ -192,10 +113,10 @@ private static void move( String filter = PathBuilder.buildFilePathViewUsingInputPath(options, recordType, from, "*.avro"); - String movePath = + String copyPath = PathBuilder.buildPath(targetPath, recordType.name().toLowerCase(), to).toString(); - log.info("Moving files with pattern {} to {}", 
filter, movePath); - FsUtils.moveDirectory(hdfsConfigs, movePath, filter); - log.info("Files moved to {} directory", movePath); + log.info("Copying files with pattern {} to {}", filter, copyPath); + FsUtils.copyToDirectory(hdfsConfigs, copyPath, filter); + log.info("Files copied to {} directory", copyPath); } } diff --git a/gbif/ingestion/ingest-gbif-java/src/main/java/org/gbif/pipelines/ingest/java/pipelines/HdfsViewPipeline.java b/gbif/ingestion/ingest-gbif-java/src/main/java/org/gbif/pipelines/ingest/java/pipelines/HdfsViewPipeline.java index b3627778a..89410cc8c 100644 --- a/gbif/ingestion/ingest-gbif-java/src/main/java/org/gbif/pipelines/ingest/java/pipelines/HdfsViewPipeline.java +++ b/gbif/ingestion/ingest-gbif-java/src/main/java/org/gbif/pipelines/ingest/java/pipelines/HdfsViewPipeline.java @@ -853,16 +853,12 @@ public static void run(InterpretationPipelineOptions options, ExecutorService ex .write(); // Move files - Mutex.Action action = () -> HdfsViewAvroUtils.cleanAndMove(options); + Mutex.Action action = () -> HdfsViewAvroUtils.copyAndOverwrite(options); if (options.getTestMode()) { action.execute(); } else { SharedLockUtils.doHdfsPrefixLock(options, action); } - // Delete root directory of table records - FsUtils.deleteIfExist( - hdfsConfigs, PathBuilder.buildFilePathViewUsingInputPath(options, recordType)); - MetricsHandler.saveCountersToInputPathFile(options, metrics.getMetricsResult()); log.info("Pipeline has been finished - {}", LocalDateTime.now()); } diff --git a/sdks/core/src/main/java/org/gbif/pipelines/core/utils/FsUtils.java b/sdks/core/src/main/java/org/gbif/pipelines/core/utils/FsUtils.java index fcd3d924f..6be7427f1 100644 --- a/sdks/core/src/main/java/org/gbif/pipelines/core/utils/FsUtils.java +++ b/sdks/core/src/main/java/org/gbif/pipelines/core/utils/FsUtils.java @@ -203,23 +203,25 @@ public static void setOwner( } /** - * Moves a list files that match against a glob filter into a target directory. 
+ * Copies a list of files that match against a glob filter into a target directory. * * @param hdfsConfigs path to hdfs-site.xml config file * @param globFilter filter used to filter files and paths * @param targetPath target directory */ - public static void moveDirectory(HdfsConfigs hdfsConfigs, String targetPath, String globFilter) { + public static void copyToDirectory( + HdfsConfigs hdfsConfigs, String targetPath, String globFilter) { FileSystem fs = getFileSystem(hdfsConfigs, targetPath); try { FileStatus[] status = fs.globStatus(new Path(globFilter)); Path[] paths = FileUtil.stat2Paths(status); for (Path path : paths) { - boolean rename = fs.rename(path, new Path(targetPath, path.getName())); - log.info("File {} moved status - {}", path, rename); + boolean copied = + FileUtil.copy(fs, path, fs, new Path(targetPath, path.getName()), false, fs.getConf()); + log.debug("File {} copied status - {}", path, copied); } } catch (IOException e) { - log.warn("Can't copy files using filter - {}, into path - {}", globFilter, targetPath); } }