-
Notifications
You must be signed in to change notification settings - Fork 4
/
main.nf
151 lines (120 loc) · 5.4 KB
/
main.nf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/env nextflow
Channel.fromPath(params.gwas_ss_tsv)
.ifEmpty { error "Cannot find gwas_vcf: ${params.gwas_vcf}" }
.splitCsv(header: true, sep: '\t', strip: true)
.map{row -> [ row.gwas_id, file(row.gwas_vcf)]}
.set { gwas_vcf_ch }
Channel.fromPath(params.gwas_lift_chain)
.ifEmpty { error "Cannot find gwas_vcf: ${params.gwas_lift_chain}" }
.set { gwas_lift_chain_ch }
Channel.fromPath(params.hg38_ref_genome)
.ifEmpty { error "Cannot find gwas_vcf: ${params.hg38_ref_genome}" }
.set { hg38_ref_genome_ch }
// Channel.fromPath(params.gene_variant_list)
// .ifEmpty { error "Cannot find gene_variant_list: ${params.gene_variant_list}" }
// .set { gene_variant_list_ch }
Channel.fromPath(params.qtl_ss_tsv)
.ifEmpty { error "Cannot find gene_variant_list: ${params.eqtl_summ_stats_path}" }
.splitCsv(header: true, sep: '\t', strip: true)
.map{row -> [ row.qtl_subset, file(row.qtl_ss), file(row.qtl_ss_index), file(row.qtl_credible_set_or_perm)]}
.set{ eqtl_summ_stats_raw_ch }
// .branch {
// extract_lead_var_pairs_ch: params.use_permutation
// eqtl_summ_stats_ch: !params.use_permutation
// }
if(params.use_permutation){
eqtl_summ_stats_raw_ch.set { extract_lead_var_pairs_ch }
} else {
eqtl_summ_stats_raw_ch.set { eqtl_summ_stats_ch }
}
process lift_to_GRCh38{
tag "${gwas_id}"
//storeDir "/gpfs/hpc/projects/eQTLCatalogue/coloc/GRCh38_conv_GWAS_15Sept2020"
publishDir "${params.outdir}/GRCh38_conv/", mode: 'copy'
container 'quay.io/eqtlcatalogue/genimpute:v20.06.1'
input:
tuple val(gwas_id), file(gwas_vcf) from gwas_vcf_ch
file gwas_lift_chain from gwas_lift_chain_ch.collect()
file hg38_ref_genome from hg38_ref_genome_ch.collect()
output:
tuple val(gwas_id), file("${gwas_vcf.simpleName}.GRCh38.vcf") into tabix_index_ch
script:
"""
CrossMap.py vcf $gwas_lift_chain $gwas_vcf $hg38_ref_genome ${gwas_vcf.simpleName}.GRCh38.vcf
"""
}
process tabix_index_gwas{
tag "${gwas_id}"
//storeDir "/gpfs/hpc/projects/eQTLCatalogue/coloc/GRCh38_conv_GWAS_15Sept2020"
publishDir "${params.outdir}/GRCh38_conv/", mode: 'copy'
container = 'quay.io/eqtlcatalogue/qtlmap:v20.05.1'
input:
tuple val(gwas_id), file(vcf_GRCh38) from tabix_index_ch
output:
tuple val(gwas_id), file("${vcf_GRCh38.simpleName}.GRCh38.sorted.vcf.gz"), file("${vcf_GRCh38.simpleName}.GRCh38.sorted.vcf.gz.tbi") into gwas_summstats_GRCh38
script:
"""
bcftools sort -o ${vcf_GRCh38.simpleName}.GRCh38.sorted.vcf.gz -O z $vcf_GRCh38
tabix -p vcf ${vcf_GRCh38.simpleName}.GRCh38.sorted.vcf.gz
"""
}
if (params.use_permutation) {
process extract_lead_var_pairs{
tag "${qtl_subset}"
publishDir "${params.outdir}/leadpairs/", mode: 'copy', pattern: "*.leadpairs.tsv"
container = 'quay.io/eqtlcatalogue/colocalisation:v21.01.1'
input:
tuple val(qtl_subset), file(eqtl_ss), file(eqtl_ss_index), file(perm_res) from extract_lead_var_pairs_ch
output:
tuple val(qtl_subset), file(eqtl_ss), file(eqtl_ss_index), file("${qtl_subset}.leadpairs.tsv") into eqtl_summ_stats_ch
script:
"""
#!/usr/bin/env Rscript
library(dplyr)
permutation_df <-readr::read_tsv('$perm_res', trim_ws = TRUE)
permutation_df <- permutation_df %>%
mutate(FDR = p.adjust(p = p_beta, method = 'fdr')) %>%
filter(FDR < 0.01)
readr::write_tsv(permutation_df %>% select(molecular_trait_id, variant, chromosome, position), '${qtl_subset}.leadpairs.tsv')
"""
}
}
process run_coloc{
tag "${gwas_id}_${qtl_subset}"
// publishDir "${params.outdir}/coloc_results_batch/", mode: 'copy'
container 'quay.io/eqtlcatalogue/colocalisation:v21.01.1'
input:
each batch_index from 1..params.n_batches
tuple val(qtl_subset), file(eqtl_ss), file(eqtl_ss_index), file(lead_pairs), val(gwas_id), file(gwas_sumstats), file(gwas_sumstats_index) from eqtl_summ_stats_ch.combine(gwas_summstats_GRCh38)
// file lead_pairs from gene_variant_list_ch.collect()
output:
tuple val(gwas_id), val("${gwas_id}_${qtl_subset}"), file("${gwas_id}_${qtl_subset}_${batch_index}_${params.n_batches}.tsv") into batch_files_merge_coloc_results
script:
"""
Rscript $baseDir/bin/run_coloc_batch.R \\
--gwas_sumstats $gwas_sumstats \\
--gwas_id $gwas_id \\
--qtl_sumstats $eqtl_ss \\
--qtl_subset $qtl_subset \\
--lead_pairs $lead_pairs \\
--window_coloc ${params.cis_window} \\
--chunk '$batch_index ${params.n_batches}' \\
--output_prefix ${gwas_id}_${qtl_subset}_${batch_index}_${params.n_batches}.tsv \\
--outdir .
"""
}
process merge_coloc_results{
publishDir "${params.outdir}/coloc_results_merged/${gwas_id}", mode: 'copy'
container 'quay.io/eqtlcatalogue/qtlmap:v20.05.1'
input:
tuple gwas_id, gwas_qtl_subset, file(gwas_qtl_subset_coloc_results_batch_files) from batch_files_merge_coloc_results.groupTuple(by: [1, 0], size: params.n_batches, sort: true)
output:
tuple gwas_qtl_subset, file("${gwas_qtl_subset}.txt.gz")
script:
"""
awk 'NR == 1 || FNR > 1{print}' ${gwas_qtl_subset_coloc_results_batch_files.join(' ')} | bgzip -c > ${gwas_qtl_subset}.txt.gz
"""
}
workflow.onComplete {
println ( workflow.success ? "Done!" : "Oops ... something went wrong" )
}