-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.nf
168 lines (138 loc) · 4.92 KB
/
main.nf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
// Opt in to Nextflow DSL2 syntax (the process/workflow blocks below rely on it).
nextflow.enable.dsl=2
// Workflow version string; interpolated into the help text, the version message
// and the completion summary printed by this script.
version = "2.0.0"
/**
 * Print the command-line usage text for the workflow to standard output.
 *
 * The column names listed for --input_list and --ref are the CSV headers this
 * script reads when it builds its input channels.
 *
 * @param version workflow version string shown in the banner
 */
def help_message(String version) {
    // stripIndent() removes the common leading indentation so the text prints flush-left.
    def usage = """
        ==============================================
        Imputation Analysis (GLIMPSE) ~ ${version}
        ==============================================
        Mandatory arguments:
          --input_list   Path to CSV file with header columns run_id, sample_id, sample_cram (one sample per line; a '<sample_cram>.crai' index is expected next to each CRAM).
          --ref          Path to CSV file with header columns chr_no, chunk_txt, ref_bin (one chromosome per line).
          --outdir       Path of the output directory.
        Additional arguments:
          --help         Show this help message
          --version      Show the version of the workflow
        """.stripIndent()
    println(usage)
}
/**
 * Print a one-line banner with the workflow name and version.
 *
 * @param version workflow version string
 */
def version_message(String version) {
    def banner = "Imputation Analysis (GLIMPSE) ~ ${version}"
    println(banner)
}
/**
 * Handle the informational flags: print the help text or the version banner
 * and terminate the JVM with status 0. Returns normally when neither flag is set.
 *
 * @param params  parameter map; only the `help` and `version` entries are read
 * @param version workflow version string passed on to the printers
 */
def help_or_version(Map params, String version) {
    // Nothing requested: fall through to the normal run.
    if (!params.help && !params.version) {
        return
    }
    // --help wins when both flags are given.
    if (params.help) {
        help_message(version)
    } else {
        version_message(version)
    }
    System.exit(0)
}
/**
 * Build the default parameter map.
 *
 * Every known option starts as `false`, so an option the user did not supply
 * evaluates as falsy in later checks.
 *
 * @return a new map with keys help, version, input_list, ref and outdir
 */
def default_params() {
    return [
        help      : false,
        version   : false,
        input_list: false,
        ref       : false,
        outdir    : false,
    ]
}
/**
 * Merge user-supplied parameters over the defaults, handle --help/--version,
 * and verify that all mandatory parameters are present.
 *
 * The process may exit inside this call: with status 0 when help or version is
 * requested, or with status 1 when a mandatory parameter is missing.
 *
 * @param params   user-supplied parameters (entries here override the defaults)
 * @param workflow workflow metadata; accepted for interface compatibility, not read here
 * @return the merged parameter map
 */
def check_params(Map params, nextflow.script.WorkflowMetadata workflow) {
    // Declared with `def` so the merged map stays local instead of being written
    // into the script binding as a hidden side effect of this function.
    def merged = default_params() + params

    // Print help or version messages if requested (exits when it does).
    help_or_version(merged, version)

    // Mandatory parameter checks.
    ['input_list', 'ref', 'outdir'].each { name ->
        check_mandatory_parameter(merged, name)
    }
    return merged
}
/**
 * Return the value of a mandatory parameter, or abort the run when it is missing.
 *
 * A parameter counts as missing when its value is falsy (absent, false, null,
 * or an empty string). In that case an error naming the parameter is printed
 * and the JVM exits with status 1.
 *
 * @param params         parameter map to look the value up in
 * @param parameter_name key of the mandatory parameter (without the leading `--`)
 * @return the parameter's value when present
 */
def check_mandatory_parameter(Map params, String parameter_name) {
    def value = params[parameter_name]
    if (!value) {
        // Name the offending option so the user does not have to guess which one is missing.
        println "ERROR: Missing mandatory argument '--${parameter_name}'; specify '--help' for usage instructions"
        System.exit(1)
    }
    return value
}
// Validate the command-line parameters once; the merged map is used by the
// channels below and by the publishDir directives of the processes.
final_params = check_params(params, workflow)

//Inputs
// Reference genome FASTA. Defaults to the project's S3 copy and can be overridden
// with --ref_fasta; the index is expected next to it with a '.fai' suffix.
ref_fasta_path = (final_params.ref_fasta ?: "s3://nalagenetics-npm-grids-collaboration/SG10K_imputation/reference/Homo_sapiens_assembly38.fasta").toString()

// One tuple per row of the sample sheet: (run_id, sample_id, CRAM, CRAM index).
samples_ch = Channel
    .fromPath(final_params.input_list)
    .splitCsv(header: true)
    .map { row -> tuple(row.run_id, row.sample_id, file(row.sample_cram), file(row.sample_cram + ".crai")) }

// One tuple per row of the reference sheet: (chr_no, chunk file, reference bin).
detail_ch = Channel
    .fromPath(final_params.ref)
    .splitCsv(header: true)
    .map { row -> tuple(row.chr_no, file(row.chunk_txt), file(row.ref_bin)) }

ref_fasta = Channel
    .fromPath(ref_fasta_path)
fasta_index = Channel
    .fromPath(ref_fasta_path + ".fai")

// Cartesian product: every reference row paired with every sample row.
phase_inputs = detail_ch.combine(samples_ch)
//Processes
// Run the GLIMPSE phase wrapper for one (reference row, sample) combination.
//
// Input : chr_id, chunk file, reference bin, run/sample ids, sample CRAM and its
//         index, reference FASTA and its index.
// Output: (chr_id, run_id, sample_id, all *.bcf files, all *.bcf.csi files) produced
//         in the task directory, emitted as `phase_out` and copied under
//         <outdir>/<run_id>_<sample_id>/imputation/phase.
process impute_phase {
    tag "${run_id}_${sample_id}_${chr_id}"
    publishDir "${final_params.outdir}/${run_id}_${sample_id}/imputation/phase", mode: "copy"

    input:
    // `path` replaces the legacy `file` qualifier, matching the other processes in this script.
    // sample_index and fasta_index are staged but not passed on the command line;
    // presumably the tools locate them next to the CRAM/FASTA — TODO confirm.
    tuple val(chr_id), path(chunk_txt), path(ref_bin), val(run_id), val(sample_id), path(sample_cram), path(sample_index), path(ref_fasta), path(fasta_index)

    output:
    tuple val(chr_id), val(run_id), val(sample_id), path("*.bcf"), path("*.bcf.csi"), emit: phase_out

    script:
    """
    python /wrapper/GLIMPSE_phase.py -S ${run_id}_${sample_id} -C ${chr_id} -I ${sample_cram} -R ${ref_bin} -c ${chunk_txt} -F ${ref_fasta}
    """
}
// Ligate the phased BCF files of one (chromosome, sample) into a single BCF.
//
// Writes a version-sorted list of the input BCFs to <prefix>.txt, passes that
// list to GLIMPSE2_ligate, then indexes the merged result with bcftools.
// Emits (run_id, sample_id, merged BCF, its index) as `ligate_out` and the
// list file as `txt_out`; outputs are copied under
// <outdir>/<run_id>_<sample_id>/imputation/ligate.
process impute_ligate {
    tag "${run_id}_${sample_id}_${chr_id}"
    publishDir "${final_params.outdir}/${run_id}_${sample_id}/imputation/ligate", mode: "copy"

    input:
    tuple val(chr_id), val(run_id), val(sample_id), path(bcf), path(index)

    output:
    tuple val(run_id), val(sample_id), path("*.merged.bcf"), path("*.merged.bcf.csi"), emit: ligate_out
    path("*.txt"), emit: txt_out

    script:
    // Common stem shared by the list file and the merged BCF.
    def prefix = "${run_id}_${sample_id}_${chr_id}"
    """
    for f in ${bcf}; do echo "\${f}"; done | sort -V > ${prefix}.txt
    GLIMPSE2_ligate --input ${prefix}.txt --output ${prefix}.merged.bcf
    bcftools index -f ${prefix}.merged.bcf
    """
}
// Concatenate the merged BCFs of one sample into a single final BCF and index it.
//
// Emits (run_id, sample_id, final BCF, its index) as `concat_out`; outputs are
// copied under <outdir>/<run_id>_<sample_id>/imputation/final.
process impute_concat {
    tag "${run_id}_${sample_id}"
    publishDir "${final_params.outdir}/${run_id}_${sample_id}/imputation/final", mode: "copy"

    input:
    tuple val(run_id), val(sample_id), path(bcf), path(index)

    output:
    tuple val(run_id), val(sample_id), path("*.final.bcf"), path("*.final.bcf.csi"), emit: concat_out

    script:
    // Name of the concatenated output file for this sample.
    def final_bcf = "${run_id}_${sample_id}.final.bcf"
    """
    bcftools concat -a ${bcf} -o ${final_bcf}
    bcftools index -f ${final_bcf}
    """
}
//Workflow
// Entry workflow: phase -> ligate -> concat.
workflow {
    // Pair the reference FASTA with its index, then attach the pair to every phase input.
    reference_ch = ref_fasta.combine(fasta_index)
    impute_phase(phase_inputs.combine(reference_ch))

    impute_ligate(impute_phase.out.phase_out)

    // Collect the merged BCFs by (run_id, sample_id) before the final concat.
    merged_by_sample = impute_ligate.out.ligate_out.groupTuple(by: [0, 1])
    impute_concat(merged_by_sample)
}
// Completion handler: print an execution summary on success, or the error
// report and exit status on failure.
workflow.onComplete {
    // The message text is kept flush-left because it is printed verbatim.
    def report
    if (workflow.success) {
        report = """
==============================================
Imputation Analysis (GLIMPSE) ~ EXECUTION SUMMARY
==============================================
Workflow : ${workflow.scriptName} ${version}
Completed at: ${workflow.complete}
Duration : ${workflow.duration}
Success : ${workflow.success}
workDir : ${workflow.workDir}
exit status : ${workflow.exitStatus}
"""
    } else {
        report = """
Failed: ${workflow.errorReport}
exit status : ${workflow.exitStatus}
"""
    }
    println(report)
}