Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel bgzip cram #13

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
12 changes: 9 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,23 @@ execute_process(COMMAND whoami OUTPUT_VARIABLE USER OUTPUT_STRIP_TRAILING_WHITES

add_definitions(-DVERSION="${PROJECT_VERSION}" -DUSER="${USER}" -DDATE="${DATE}")

set(CMAKE_FIND_LIBRARY_SUFFIXES ".a;${CMAKE_FIND_LIBRARY_SUFFIXES}") # Prefer libz.a when both are available
#set(CMAKE_FIND_LIBRARY_SUFFIXES ".a;${CMAKE_FIND_LIBRARY_SUFFIXES}") # Prefer libz.a when both are available

find_package(OpenMP)
if (OPENMP_FOUND)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${OpenMP_C_FLAGS}")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${OpenMP_CXX_FLAGS}")
endif()


find_package(OpenSSL REQUIRED)
find_package(CURL REQUIRED)
find_package(LibLZMA REQUIRED)
find_package(BZip2 REQUIRED)

find_package(ZLIB REQUIRED)
find_library(STATGEN_LIBRARY StatGen)
find_library(HTS_LIBRARY hts)

add_executable(minimac4
src/Analysis.cpp
src/Analysis.h
Expand All @@ -46,7 +51,8 @@ add_executable(minimac4
src/Estimation.cpp
src/Estimation.h)

target_link_libraries(minimac4 ${STATGEN_LIBRARY} ${ZLIB_LIBRARIES})
#target_link_libraries(minimac4 ${STATGEN_LIBRARY} ${HTS_LIBRARY} ${ZLIB_LIBRARIES} CURL::libcurl OpenSSL::SSL OpenSSL::Crypto ZLIB::ZLIB LibLZMA::LibLZMA BZip2::BZip2)
target_link_libraries(minimac4 ${STATGEN_LIBRARY} ${HTS_LIBRARY} ${ZLIB_LIBRARIES})

install(TARGETS minimac4 RUNTIME DESTINATION bin)

Expand Down
4 changes: 2 additions & 2 deletions dep/libstatgen.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ project(libStatGen VERSION 1.0.0)

#execute_process(COMMAND ./configure --prefix=${CMAKE_INSTALL_PREFIX} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})

add_custom_target(libStatGen ALL COMMAND ${CMAKE_COMMAND} -E env CPATH=${CGET_PREFIX}/include make WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} COMMENT "Builing libStatGen ...")
add_custom_target(libStatGen ALL COMMAND ${CMAKE_COMMAND} -E env CPATH=${CGET_PREFIX}/include make WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} COMMENT "Building libStatGen ...")

file(GLOB_RECURSE LSG_HEADER_LIST "bam/*.h" "fastq/*.h" "general/*.h" "glf/*.h" "samtools/*.h" "vcf/*.h")
file(GLOB_RECURSE LSG_HEADER_LIST "bam/*.h" "fastq/*.h" "general/*.h" "glf/*.h" "vcf/*.h")
install(FILES ${LSG_HEADER_LIST} DESTINATION include)

if (BUILD_SHARED_LIBS)
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
zlib,https://zlib.net/zlib-1.2.11.tar.gz --hash md5:1c9f62f0778697a09d36121ead88e08e
statgen/libStatGen --cmake dep/libstatgen.cmake
#statgen/libStatGen --cmake dep/libstatgen.cmake
daheise/libStatGen@parallel-bgzip-cram --cmake dep/libstatgen.cmake
98 changes: 63 additions & 35 deletions src/Analysis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void MyTokenize(vector<string> &result, const char *input, const char *delimiter

}



bool Analysis::CreateRecombinationMap()
{
Expand Down Expand Up @@ -77,9 +77,9 @@ bool Analysis::CreateRecombinationMap()
SecondIndex++;
}
int h=0;

return true;

}


Expand All @@ -102,7 +102,7 @@ String Analysis::RunAnalysis(String &Reffilename, String &Tarfilename, String &R
cout << "\n Program Exiting ... \n\n";
return "Genetic.Map.Load.Error";
}

if (!targetPanel.ScaffoldGWAStoReference(referencePanel,*MyAllVariables))
{
cout << "\n Program Exiting ... \n\n";
Expand Down Expand Up @@ -357,13 +357,17 @@ void Analysis::AppendtoMainLooVcfFaster(int ChunkNo, int MaxIndex)
tempVariant.altAlleleString.c_str());
VcfPrintStringLength+=sprintf(VcfPrintStringPointer + VcfPrintStringLength,"\tTYPED\tGT:LDS");

vector<string> lines(MaxIndex);
#pragma omp parallel for
for(int j=1;j<=MaxIndex;j++)
{
line.clear();
vcfLoodosepartialList[j-1]->readLine(line);
VcfPrintStringLength+=sprintf(VcfPrintStringPointer + VcfPrintStringLength,"%s",line.c_str());

lines[j-1].clear();
vcfLoodosepartialList[j-1]->readLine(lines[j-1]);
}
for(int j=1;j<=MaxIndex;j++){
VcfPrintStringLength+=sprintf(VcfPrintStringPointer + VcfPrintStringLength,"%s",lines[j-1].c_str());
}

VcfPrintStringLength+=sprintf(VcfPrintStringPointer + VcfPrintStringLength,"\n");

}
Expand Down Expand Up @@ -408,6 +412,24 @@ void Analysis::AppendtoMainLooVcfFaster(int ChunkNo, int MaxIndex)

void Analysis::AppendtoMainVcfFaster(int ChunkNo, int MaxIndex)
{
char bgzf_partial_mode[16];
char bgzf_final_mode[16];
// To avoid threadbombing on reads, only use more than one CPU on reads
// if there is more than one CPU per file to read.
int readingCpus = (MyAllVariables->myModelVariables.cpus-1)/MaxIndex;
if(readingCpus > 1){
snprintf(bgzf_partial_mode, 16-2, "r@%d", readingCpus);
} else {
snprintf(bgzf_partial_mode, 16-2, "r@%d", 1);
}

// Use all available CPUs for writing
if(MyAllVariables->myModelVariables.cpus > 1){
snprintf(bgzf_final_mode, 16-2, "a@%d", MyAllVariables->myModelVariables.cpus);
} else {
// Otherwise, do a single threaded read.
snprintf(bgzf_final_mode, 16-2, "a@%d", 1);
}

VcfPrintStringLength=0;

Expand All @@ -418,7 +440,7 @@ void Analysis::AppendtoMainVcfFaster(int ChunkNo, int MaxIndex)
cout<<endl;

vector<IFILE> vcfdosepartialList(MaxIndex);
vcfdosepartial = ifopen(MyAllVariables->myOutFormat.OutPrefix + ".dose.vcf" + (MyAllVariables->myOutFormat.gzip ? ".gz" : ""), "a", MyAllVariables->myOutFormat.gzip ?InputFile::BGZF:InputFile::UNCOMPRESSED);
vcfdosepartial = ifopen(MyAllVariables->myOutFormat.OutPrefix + ".dose.vcf" + (MyAllVariables->myOutFormat.gzip ? ".gz" : ""), bgzf_final_mode, MyAllVariables->myOutFormat.gzip ?InputFile::BGZF:InputFile::UNCOMPRESSED);


for(int i=1;i<=MaxIndex;i++)
Expand All @@ -429,7 +451,7 @@ void Analysis::AppendtoMainVcfFaster(int ChunkNo, int MaxIndex)
strs1<<(ChunkNo+1);
tempFileIndex+=(".chunk."+(string)(strs1.str())+".dose.part." +
(string)(strs.str())+".vcf.gz");
vcfdosepartialList[i-1] = ifopen(tempFileIndex.c_str(), "r");
vcfdosepartialList[i-1] = ifopen(tempFileIndex.c_str(), bgzf_partial_mode);
if (!vcfdosepartialList[i-1])
{
std::cout << "Error: Could not open temporary file. Ensure that `ulimit -n` is at least greater than " << (MaxIndex + 5) << "." << std::endl;
Expand Down Expand Up @@ -469,14 +491,17 @@ void Analysis::AppendtoMainVcfFaster(int ChunkNo, int MaxIndex)


VcfPrintStringLength+=sprintf(VcfPrintStringPointer + VcfPrintStringLength,"\t%s",MyAllVariables->myOutFormat.formatStringForVCF.c_str());
vector<string> lines(MaxIndex);
#pragma omp parallel for
for(int j=1;j<=MaxIndex;j++)
{
line.clear();
vcfdosepartialList[j-1]->readLine(line);
VcfPrintStringLength+=sprintf(VcfPrintStringPointer + VcfPrintStringLength,"%s",line.c_str());

lines[j-1].clear();
vcfdosepartialList[j-1]->readLine(lines[j-1]);
}
for(int j=1;j<=MaxIndex;j++){
VcfPrintStringLength+=sprintf(VcfPrintStringPointer + VcfPrintStringLength,"%s",lines[j-1].c_str());
}
VcfPrintStringLength+=sprintf(VcfPrintStringPointer + VcfPrintStringLength,"\n");
VcfPrintStringLength+=sprintf(VcfPrintStringPointer + VcfPrintStringLength,"\n");

}

Expand All @@ -503,12 +528,15 @@ void Analysis::AppendtoMainVcfFaster(int ChunkNo, int MaxIndex)
freq, freq > 0.5 ? 1.0 - freq : freq);

VcfPrintStringLength+=sprintf(VcfPrintStringPointer + VcfPrintStringLength,"\t%s",MyAllVariables->myOutFormat.formatStringForVCF.c_str());
vector<string> lines(MaxIndex);
#pragma omp parallel for
for(int j=1;j<=MaxIndex;j++)
{
line.clear();
vcfdosepartialList[j-1]->readLine(line);
VcfPrintStringLength+=sprintf(VcfPrintStringPointer + VcfPrintStringLength,"%s",line.c_str());

lines[j-1].clear();
vcfdosepartialList[j-1]->readLine(lines[j-1]);
}
for(int j=1;j<=MaxIndex;j++){
VcfPrintStringLength+=sprintf(VcfPrintStringPointer + VcfPrintStringLength,"%s",lines[j-1].c_str());
}
VcfPrintStringLength+=sprintf(VcfPrintStringPointer + VcfPrintStringLength,"\n");
}
Expand Down Expand Up @@ -1292,7 +1320,7 @@ void Analysis::GetCurrentPanelReady(int ChunkNo, HaplotypeSet &ThisRefPanel,
if(i==(ThisRefPanel.NoBlocks-1))
Mapper[j]=i;
}

}


Expand Down Expand Up @@ -1406,7 +1434,7 @@ void Analysis::GetCurrentPanelReady(int ChunkNo, HaplotypeSet &ThisRefPanel,

ThisRefPanel.CalculateAlleleFreq();
ThisTarPanel.CalculateGWASOnlyAlleleFreq();

if(!MyAllVariables->myModelVariables.minimac3)
{
int time_prev = time(0);
Expand Down Expand Up @@ -1434,20 +1462,20 @@ void Analysis::GetCurrentPanelReady(int ChunkNo, HaplotypeSet &ThisRefPanel,
thisDataFast.MainMarkovModel[i].AssignPanels(ThisRefPanel,ThisRefChipOnlyPanel,ThisTarPanel,ThisTarPanel,MyAllVariables);
thisDataFast.MainMarkovModel[i].initializeMatrices();

}
}
}
else
{
int time_prev = time(0);



for(int i=0;i<MyAllVariables->myModelVariables.cpus;i++)
{
thisDataFast.MainMarkovModel[i].AssignPanels(ThisRefPanel,ThisTarPanel,MyAllVariables);
thisDataFast.MainMarkovModel[i].initializeMatricesMinimac3();

}
}
}


Expand Down Expand Up @@ -1677,8 +1705,8 @@ void Analysis::GetCurrentPanelReady(int ChunkNo, HaplotypeSet &ThisRefPanel,

}



bool Analysis::CheckGeneticMapFile()
{
std::cout << "\n Checking genetic map file : "<<MyAllVariables->myHapDataVariables.mapFile << endl<<endl;
Expand All @@ -1689,9 +1717,9 @@ bool Analysis::CheckGeneticMapFile()
vector<string> Pieces(4);
vector<double> AppendPiece(2);
double PrevSumVal=0.0;

if(fileStream)
{
{
while(fileStream->readLine(line)!=-1)
{
MyTokenize(Pieces, line.c_str(), tabSep,4);
Expand All @@ -1715,21 +1743,21 @@ bool Analysis::CheckGeneticMapFile()
cout << "\n Program Exiting ... \n\n";
return false;
}


if(GeneticMapData.size()<2)
{
cout << "\n ERROR !!! Chromosome "<<targetPanel.finChromosome <<" not found in genetic map file : " << MyAllVariables->myHapDataVariables.mapFile << endl;
cout << "\n Program Exiting ... \n\n";
return false;
}

ifclose(fileStream);

cout<<" Successful !!! "<<endl;
return true;
return true;
}

String Analysis::CheckValidity(String &Reffilename, String &Tarfilename, String &Recomfilename, String &Errorfilename)
{
cout<<endl<<endl;
Expand Down
8 changes: 6 additions & 2 deletions src/DosageData.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <stdio.h>
#include "DosageData.h"
#include "HaplotypeSet.h"

Expand All @@ -13,7 +14,10 @@ void DosageData::FlushPartialVcf(int NovcfParts)
PrintEmpStringLength=0;

int time_Start = time(0);

char bgzf_mode[16];
// Use all available CPUs for writing. Parallel writes work best with large
// print buffers.
snprintf(bgzf_mode, 16-3, "wb@%d", MyAllVariables->myModelVariables.cpus);

string PartialVcfFileName(MyAllVariables->myOutFormat.OutPrefix);
string PartialMetaVcfFileName(MyAllVariables->myOutFormat.OutPrefix);
Expand All @@ -23,7 +27,7 @@ void DosageData::FlushPartialVcf(int NovcfParts)
PartialVcfFileName+=(".chunk."+(string)(strs1.str())+ ".dose.part."+ (string)(strs.str())+".vcf.gz");
PartialMetaVcfFileName+=(".chunk."+(string)(strs1.str())+ ".empiricalDose.part."+ (string)(strs.str())+".vcf.gz");

IFILE vcfdosepartial = ifopen(PartialVcfFileName.c_str(), "wb", InputFile::BGZF);
IFILE vcfdosepartial = ifopen(PartialVcfFileName.c_str(), bgzf_mode, InputFile::BGZF);
IFILE vcfLoodosepartial = NULL;

if(MyAllVariables->myOutFormat.meta)
Expand Down
8 changes: 4 additions & 4 deletions src/MyVariables.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ class ModelVariable
int minimac3;
bool referenceEstimates;
String intermediate;


ModelVariable()
{
intermediate="";
Expand Down Expand Up @@ -560,8 +560,8 @@ class HaplotypeDataVariables
}
}



CHR=chr;
START=start;
END=end;
Expand Down