Skip to content

Commit

Permalink
Cloudify RevertSam (#1949)
Browse files Browse the repository at this point in the history
Cloudify RevertSam
  • Loading branch information
takutosato authored Aug 1, 2024
1 parent d8d87c9 commit 77f4b09
Show file tree
Hide file tree
Showing 20 changed files with 436 additions and 148 deletions.
54 changes: 49 additions & 5 deletions src/main/java/picard/nio/PicardBucketUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
import htsjdk.io.IOPath;
import htsjdk.samtools.util.FileExtensions;
import htsjdk.utils.ValidationUtils;
import picard.PicardException;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
Expand All @@ -25,11 +23,14 @@ public class PicardBucketUtils {
public static final String HDFS_SCHEME = "hdfs";
public static final String FILE_SCHEME = "file";

// This Picard test staging bucket has a TTL of 180 days (DeleteAction with Age = 180)
public static final String GCLOUD_PICARD_STAGING_DIRECTORY = "gs://hellbender-test-logs/staging/picard/";

// slashes omitted since hdfs paths seem to only have 1 slash which would be weirder to include than no slashes
private PicardBucketUtils(){} //private so that no one will instantiate this class

/**
* Get a temporary file path based on the prefix and extension provided
* Get a temporary file path based on the prefix and extension provided.
* This file (and possible indexes associated with it) will be scheduled for deletion on shutdown.
*
* @param directory the directory where the temporary file will be placed. May be null.
Expand Down Expand Up @@ -67,19 +68,62 @@ public static PicardHtsPath getTempFilePath(final String directory, String prefi
}
}

// For local temp file, directory should be null.
/**
 * Overload of getTempFilePath that takes the directory as an {@link IOPath} instead of a String.
 * The directory is converted to its URI string and everything is forwarded to
 * {@link #getTempFilePath(String, String, String)}.
 *
 * @param directory the directory where the temporary file will be placed. May be null, in which case
 *                  null is forwarded to the String overload (the value that overload is given for
 *                  local temp files) instead of failing with a NullPointerException here.
 * @param prefix    forwarded unchanged to the String overload
 * @param extension forwarded unchanged to the String overload
 * @return the path produced by the String overload
 *
 * @see #getTempFilePath(String, String, String)
 */
public static PicardHtsPath getTempFilePath(final IOPath directory, String prefix, final String extension){
    // A String-typed local keeps overload resolution unambiguous even when the value is null.
    final String directoryUri = (directory == null) ? null : directory.getURIString();
    return getTempFilePath(directoryUri, prefix, extension);
}

/**
 * Convenience overload for callers that do not need a file-name prefix: delegates to
 * {@link #getTempFilePath(String, String, String)} with the empty string as the prefix.
 *
 * @param directory forwarded unchanged as the directory argument
 * @param extension forwarded unchanged as the extension argument
 * @return the path produced by the three-argument overload
 *
 * @see #getTempFilePath(String, String, String)
 */
public static PicardHtsPath getTempFilePath(String directory, String extension){
    final String noPrefix = "";
    return getTempFilePath(directory, noPrefix, extension);
}

/**
 * Requests a temporary file path on the local filesystem by delegating to
 * {@link #getTempFilePath(String, String, String)} with a null directory.
 *
 * @param prefix    forwarded unchanged as the prefix argument
 * @param extension forwarded unchanged as the extension argument
 * @return the path produced by the three-argument overload
 *
 * @see #getTempFilePath(String, String, String)
 */
public static PicardHtsPath getLocalTempFilePath(final String prefix, final String extension){
    // Typed as String so that the String overload (not the IOPath one) is selected for the null directory.
    final String localDirectory = null;
    return getTempFilePath(localDirectory, prefix, extension);
}

/**
 * Creates a PicardHtsPath object to a "directory" on Google Cloud Storage (GCS) with a randomly generated URI.
 *
 * Note that the notion of directories does not exist in GCS. Thus, by "directory,"
 * we mean a path object with a randomly generated URI ending in "/", which
 * the caller can use as a root URI/path for other files to be created e.g. via PicardHtsPath::resolve.
 *
 * Note that this method does *not* create an actual directory/file on GCS that one can write to, delete, or otherwise manipulate.
 *
 * See: https://stackoverflow.com/questions/51892343/google-gsutil-create-folder
 *
 * @param relativePath The relative location for the new "directory" under the hardcoded staging bucket
 *                     ({@link #GCLOUD_PICARD_STAGING_DIRECTORY}, which has a TTL set) e.g. "test/RevertSam/".
 *                     Must be non-null and end in a forward slash '/'; anything else is rejected by
 *                     ValidationUtils.validateArg before a path is built.
 * @return A PicardHtsPath object to a randomly generated "directory" e.g. "gs://hellbender-test-logs/staging/picard/test/RevertSam/{randomly-generated-string}/"
 */
public static PicardHtsPath getRandomGCSDirectory(final String relativePath){
    // Checking for null inside the validation reports a descriptive argument error instead of a bare NullPointerException.
    ValidationUtils.validateArg(relativePath != null && relativePath.endsWith("/"),
            "relativePath must be non-null and end in a forward slash '/': " + relativePath);

    // Empty prefix and "/" suffix: the random component becomes the last path element and the URI ends in "/".
    return PicardHtsPath.fromPath(PicardBucketUtils.randomRemotePath(GCLOUD_PICARD_STAGING_DIRECTORY + relativePath, "", "/"));
}

/**
* Picks a random name, by putting some random letters between "prefix" and "suffix".
*
* @param stagingLocation The folder where you want the file to be. Must start with "gs://" or "hdfs://"
* @param prefix The beginning of the file name
* @param suffix The end of the file name, e.g. ".tmp"
*/
private static Path randomRemotePath(String stagingLocation, String prefix, String suffix) {
public static Path randomRemotePath(String stagingLocation, String prefix, String suffix) {
if (isGcsUrl(stagingLocation)) {
return getPathOnGcs(stagingLocation).resolve(prefix + UUID.randomUUID() + suffix);
} else if (isHadoopUrl(stagingLocation)) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/picard/nio/PicardHtsPath.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public Path toPath() {
*/
public static PicardHtsPath fromPath(final Path path){
Objects.requireNonNull(path);
return new PicardHtsPath(new HtsPath(path.toUri().toString()));
return new PicardHtsPath(path.toUri().toString());
}

/**
Expand Down
132 changes: 76 additions & 56 deletions src/main/java/picard/sam/RevertSam.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package picard.sam;

import htsjdk.io.IOPath;
import htsjdk.samtools.BAMRecordCodec;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMFileHeader.SortOrder;
Expand All @@ -34,6 +35,7 @@
import htsjdk.samtools.SAMRecordQueryNameComparator;
import htsjdk.samtools.SAMTag;
import htsjdk.samtools.SAMUtils;
import htsjdk.samtools.SamInputResource;
import htsjdk.samtools.SamReader;
import htsjdk.samtools.SamReaderFactory;
import htsjdk.samtools.ValidationStringency;
Expand All @@ -57,14 +59,14 @@
import picard.cmdline.CommandLineProgram;
import picard.cmdline.StandardOptionDefinitions;
import picard.cmdline.programgroups.ReadDataManipulationProgramGroup;
import picard.nio.PicardBucketUtils;
import picard.nio.PicardHtsPath;
import picard.util.TabbedInputParser;
import picard.util.TabbedTextFileWithHeaderParser;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.*;
Expand Down Expand Up @@ -148,10 +150,14 @@ public class RevertSam extends CommandLineProgram {
public PicardHtsPath INPUT;

@Argument(mutex = {"OUTPUT_MAP"}, shortName = StandardOptionDefinitions.OUTPUT_SHORT_NAME, doc = "The output SAM/BAM/CRAM file to create, or an output directory if OUTPUT_BY_READGROUP is true.")
public File OUTPUT;
public PicardHtsPath OUTPUT;

@Argument(mutex = {"OUTPUT"}, shortName = "OM", doc = "Tab separated file with two columns, READ_GROUP_ID and OUTPUT, providing file mapping only used if OUTPUT_BY_READGROUP is true.")
public File OUTPUT_MAP;
public PicardHtsPath OUTPUT_MAP;

public static final String READ_GROUP_ID_COLUMN_NAME = "READ_GROUP_ID";
public static final String OUTPUT_COLUMN_NAME = "OUTPUT";


@Argument(shortName = "OBR", doc = "When true, outputs each read group in a separate file.")
public boolean OUTPUT_BY_READGROUP = false;
Expand Down Expand Up @@ -257,10 +263,13 @@ protected String[] customCommandLineValidation() {

protected int doWork() {
IOUtil.assertFileIsReadable(INPUT.toPath());
ValidationUtil.assertWritable(OUTPUT, OUTPUT_BY_READGROUP);
// Writability check is done for local files only
if (OUTPUT != null && OUTPUT.getScheme().equals(PicardBucketUtils.FILE_SCHEME)) {
ValidationUtil.assertWritable(OUTPUT.toPath(), OUTPUT_BY_READGROUP);
}

final boolean sanitizing = SANITIZE;
final SamReader in = SamReaderFactory.makeDefault().referenceSequence(REFERENCE_SEQUENCE).validationStringency(VALIDATION_STRINGENCY).open(INPUT.toPath());
final SamReader in = SamReaderFactory.makeDefault().referenceSequence(referenceSequence.getReferencePath()).validationStringency(VALIDATION_STRINGENCY).open(SamInputResource.of(INPUT.toPath())); // tsato: confirm this won't break piped input
final SAMFileHeader inHeader = in.getFileHeader();
ValidationUtil.validateHeaderOverrides(inHeader, SAMPLE_ALIAS, LIBRARY_NAME);

Expand All @@ -273,7 +282,7 @@ protected int doWork() {
final SAMFileHeader singleOutHeader = createOutHeader(inHeader, SORT_ORDER, REMOVE_ALIGNMENT_INFORMATION);
inHeader.getReadGroups().forEach(readGroup -> singleOutHeader.addReadGroup(readGroup));

final Map<String, File> outputMap;
final Map<String, Path> outputMap;
final Map<String, SAMFileHeader> headerMap;
if (OUTPUT_BY_READGROUP) {
if (inHeader.getReadGroups().isEmpty()) {
Expand All @@ -287,7 +296,12 @@ protected int doWork() {
defaultExtension = "." + OUTPUT_BY_READGROUP_FILE_FORMAT.toString();
}

outputMap = createOutputMap(OUTPUT_MAP, OUTPUT, defaultExtension, inHeader.getReadGroups());
if (OUTPUT_MAP != null){
outputMap = readOutputMap(OUTPUT_MAP.toPath());
} else {
outputMap = createOutputMapFromReadGroups(inHeader.getReadGroups(), OUTPUT.toPath(), defaultExtension);
}

ValidationUtil.assertAllReadGroupsMapped(outputMap, inHeader.getReadGroups());
headerMap = createHeaderMap(inHeader, SORT_ORDER, REMOVE_ALIGNMENT_INFORMATION);
} else {
Expand All @@ -300,7 +314,7 @@ protected int doWork() {
}

final SAMFileWriterFactory factory = new SAMFileWriterFactory();
final RevertSamWriter out = new RevertSamWriter(OUTPUT_BY_READGROUP, headerMap, outputMap, singleOutHeader, OUTPUT, presorted, factory, REFERENCE_SEQUENCE);
final RevertSamWriter out = new RevertSamWriter(OUTPUT_BY_READGROUP, headerMap, outputMap, singleOutHeader, OUTPUT == null ? null : OUTPUT.toPath(), presorted, factory, referenceSequence.getReferencePath());

////////////////////////////////////////////////////////////////////////////
// Build a sorting collection to use if we are sanitizing
Expand Down Expand Up @@ -335,8 +349,8 @@ protected int doWork() {
final Map<SAMReadGroupRecord, FastqQualityFormat> readGroupToFormat;
final Path referenceSequencePath;
try {
if (REFERENCE_SEQUENCE != null) {
referenceSequencePath = REFERENCE_SEQUENCE.toPath();
if (referenceSequence.getReferencePath() != null) {
referenceSequencePath = referenceSequence.getReferencePath();
} else {
referenceSequencePath = null;
}
Expand Down Expand Up @@ -557,41 +571,32 @@ private void overwriteLibrary(final List<SAMReadGroupRecord> readGroups, final S
readGroups.forEach(rg -> rg.setLibrary(libraryName));
}

static Map<String, File> createOutputMap(
final File outputMapFile,
final File outputDir,
final String defaultExtension,
final List<SAMReadGroupRecord> readGroups) {

final Map<String, File> outputMap;
if (outputMapFile != null) {
outputMap = createOutputMapFromFile(outputMapFile);
} else {
outputMap = createOutputMap(readGroups, outputDir, defaultExtension);
}
return outputMap;
}
public static Map<String, Path> readOutputMap(final Path outputMapFile) {
final Map<String, Path> outputMap = new HashMap<>();

private static Map<String, File> createOutputMapFromFile(final File outputMapFile) {
final Map<String, File> outputMap = new HashMap<>();
final TabbedTextFileWithHeaderParser parser = new TabbedTextFileWithHeaderParser(outputMapFile);
for (final TabbedTextFileWithHeaderParser.Row row : parser) {
final String id = row.getField("READ_GROUP_ID");
final String output = row.getField("OUTPUT");
final File outputPath = new File(output);
outputMap.put(id, outputPath);
try (final TabbedInputParser intermediateParser = new TabbedInputParser(false, Files.newInputStream(outputMapFile));
final TabbedTextFileWithHeaderParser parser = new TabbedTextFileWithHeaderParser(intermediateParser)){
for(final TabbedTextFileWithHeaderParser.Row row : parser) {
final String id = row.getField(READ_GROUP_ID_COLUMN_NAME);
final String output = row.getField(OUTPUT_COLUMN_NAME);
final Path outputPath = new PicardHtsPath(output).toPath();
outputMap.put(id, outputPath);
}
CloserUtil.close(parser);
return outputMap;
} catch (IOException e){
throw new PicardException("Encountered an error while creating an output map", e);
}
CloserUtil.close(parser);
return outputMap;
}

private static Map<String, File> createOutputMap(final List<SAMReadGroupRecord> readGroups, final File outputDir, final String extension) {
final Map<String, File> outputMap = new HashMap<>();
// Builds a map from each read group ID to an output path named {id}{extension} inside the specified directory
public static Map<String, Path> createOutputMapFromReadGroups(final List<SAMReadGroupRecord> readGroups, final Path outputDir, final String extension) {
final Map<String, Path> outputMap = new HashMap<>();
for (final SAMReadGroupRecord readGroup : readGroups) {
final String id = readGroup.getId();
final String fileName = id + extension;
final Path outputPath = Paths.get(outputDir.toString(), fileName);
outputMap.put(id, outputPath.toFile());
final Path outputPath = outputDir.resolve(fileName);
outputMap.put(id, outputPath);
}
return outputMap;
}
Expand Down Expand Up @@ -681,19 +686,19 @@ private static class RevertSamWriter {
RevertSamWriter(
final boolean outputByReadGroup,
final Map<String, SAMFileHeader> headerMap,
final Map<String, File> outputMap,
final Map<String, Path> outputMap,
final SAMFileHeader singleOutHeader,
final File singleOutput,
final Path singleOutput,
final boolean presorted,
final SAMFileWriterFactory factory,
final File referenceFasta) {
final Path referenceFasta) {

this.outputByReadGroup = outputByReadGroup;
if (outputByReadGroup) {
singleWriter = null;
for (final Map.Entry<String, File> outputMapEntry : outputMap.entrySet()) {
for (final Map.Entry<String, Path> outputMapEntry : outputMap.entrySet()) {
final String readGroupId = outputMapEntry.getKey();
final File output = outputMapEntry.getValue();
final Path output = outputMapEntry.getValue();
final SAMFileHeader header = headerMap.get(readGroupId);
final SAMFileWriter writer = factory.makeWriter(header, presorted, output, referenceFasta);
writerMap.put(readGroupId, writer);
Expand Down Expand Up @@ -787,19 +792,29 @@ static void validateSanitizeSortOrder(final boolean sanitize, final SAMFileHeade
}
}

static void validateOutputParams(final boolean outputByReadGroup, final File output, final File outputMap, final List<String> errors) {
/**
 * Validates the OUTPUT / OUTPUT_MAP arguments by dispatching to the validator that matches the output mode.
 *
 * @param outputByReadGroup when true, validateOutputParamsByReadGroup is used; otherwise validateOutputParamsNotByReadGroup
 * @param output Points to the output file, or to the output directory when writing by read group. May be null.
 * @param outputMap Points to the tsv-file containing the (read group, output path) pair in each row. May be null.
 * @param errors list that the selected validator appends its error messages to
 */
static void validateOutputParams(final boolean outputByReadGroup, final IOPath output, final IOPath outputMap, final List<String> errors) {
    if (!outputByReadGroup) {
        validateOutputParamsNotByReadGroup(output, outputMap, errors);
        return;
    }
    validateOutputParamsByReadGroup(output, outputMap, errors);
}

static void validateOutputParamsByReadGroup(final File output, final File outputMap, final List<String> errors) {
// This method assumes that the caller has checked that OUTPUT_BY_READGROUP is true.
static void validateOutputParamsByReadGroup(final IOPath output, final IOPath outputMap, final List<String> errors) {
if (output != null) {
if (!Files.isDirectory(output.toPath())) {
// If the file is local, check that OUTPUT is a directory
if (output.getScheme().equals(PicardBucketUtils.FILE_SCHEME) && !Files.isDirectory(output.toPath())) {
errors.add("When OUTPUT_BY_READGROUP=true and OUTPUT is provided, it must be a directory: " + output);
}

return;
}
// output is null if we reached here
Expand All @@ -811,13 +826,18 @@ static void validateOutputParamsByReadGroup(final File output, final File output
errors.add("Cannot read OUTPUT_MAP " + outputMap);
return;
}
final TabbedTextFileWithHeaderParser parser = new TabbedTextFileWithHeaderParser(outputMap);
if (!ValidationUtil.isOutputMapHeaderValid(parser.columnLabelsList())) {
errors.add("Invalid header: " + outputMap + ". Must be a tab-separated file with READ_GROUP_ID as first column and OUTPUT as second column.");

try (final TabbedInputParser intermediaryParser = new TabbedInputParser(false, Files.newInputStream(outputMap.toPath()));
final TabbedTextFileWithHeaderParser parser = new TabbedTextFileWithHeaderParser(intermediaryParser)){
if (!ValidationUtil.isOutputMapHeaderValid(parser.columnLabelsList())) {
errors.add("Invalid header: " + outputMap + ". Must be a tab-separated file with READ_GROUP_ID as first column and OUTPUT as second column.");
}
} catch (IOException e){
throw new PicardException("Encountered an exception while parsing the output map", e);
}
}

static void validateOutputParamsNotByReadGroup(final File output, final File outputMap, final List<String> errors) {
static void validateOutputParamsNotByReadGroup(final IOPath output, final IOPath outputMap, final List<String> errors) {
if (outputMap != null) {
errors.add("Cannot provide OUTPUT_MAP when OUTPUT_BY_READGROUP=false. Provide OUTPUT instead.");
}
Expand Down Expand Up @@ -862,7 +882,7 @@ static void validateHeaderOverrides(
}
}

static void assertWritable(final File output, final boolean outputByReadGroup) {
static void assertWritable(final Path output, final boolean outputByReadGroup) {
if (outputByReadGroup) {
if (output != null) {
IOUtil.assertDirectoryIsWritable(output);
Expand All @@ -872,10 +892,10 @@ static void assertWritable(final File output, final boolean outputByReadGroup) {
}
}

static void assertAllReadGroupsMapped(final Map<String, File> outputMap, final List<SAMReadGroupRecord> readGroups) {
static void assertAllReadGroupsMapped(final Map<String, Path> outputMap, final List<SAMReadGroupRecord> readGroups) {
for (final SAMReadGroupRecord readGroup : readGroups) {
final String id = readGroup.getId();
final File output = outputMap.get(id);
final Path output = outputMap.get(id);
if (output == null) {
throw new PicardException("Read group id " + id + " not found in OUTPUT_MAP " + outputMap);
}
Expand All @@ -884,8 +904,8 @@ static void assertAllReadGroupsMapped(final Map<String, File> outputMap, final L

static boolean isOutputMapHeaderValid(final List<String> columnLabels) {
return columnLabels.size() >= 2 &&
"READ_GROUP_ID".equals(columnLabels.get(0)) &&
"OUTPUT".equals(columnLabels.get(1));
READ_GROUP_ID_COLUMN_NAME.equals(columnLabels.get(0)) &&
OUTPUT_COLUMN_NAME.equals(columnLabels.get(1));
}
}
}
Loading

0 comments on commit 77f4b09

Please sign in to comment.