forked from joelplourde3/clin-pipelines
-
Notifications
You must be signed in to change notification settings - Fork 0
/
FileImport.scala
70 lines (63 loc) · 2.98 KB
/
FileImport.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package bio.ferlab.clin.etl
import bio.ferlab.clin.etl.conf.FerloadConf
import bio.ferlab.clin.etl.fhir.FhirClient.buildFhirClients
import bio.ferlab.clin.etl.fhir.IClinFhirClient
import bio.ferlab.clin.etl.s3.S3Utils
import bio.ferlab.clin.etl.s3.S3Utils.buildS3Client
import bio.ferlab.clin.etl.task.fileimport.model.{FileEntry, Metadata, TBundle}
import bio.ferlab.clin.etl.task.fileimport.{BuildBundle, CheckS3Data}
import ca.uhn.fhir.rest.client.api.IGenericClient
import cats.data.ValidatedNel
import cats.implicits.catsSyntaxTuple2Semigroupal
import software.amazon.awssdk.services.s3.S3Client
import java.time.LocalDateTime
/**
 * Command-line entry point of the file import task.
 *
 * Expected arguments: `<prefix> [true|false]`, where the optional second argument is the
 * dry-run flag (defaults to `false` when omitted).
 *
 * The S3 client and the FHIR clients are built from the loaded configuration, then [[run]]
 * is invoked inside `withReport` for the configured input bucket and the given prefix.
 */
object FileImport extends App {

  withSystemExit {
    withLog {
      withConf { conf =>
        val (prefix, dryRun) = parseArgs(args)
        implicit val s3Client: S3Client = buildS3Client(conf.aws)
        val (clinClient, client) = buildFhirClients(conf.fhir, conf.keycloak)
        val bucket = conf.aws.bucketName
        withReport(bucket, prefix) { reportPath =>
          run(bucket, prefix, conf.aws.outputBucketName, conf.aws.outputPrefix, reportPath, dryRun)(s3Client, client, clinClient, conf.ferload)
        }
      }
    }
  }

  /**
   * Extracts the input prefix and the dry-run flag from the command-line arguments.
   *
   * @param arguments raw command-line arguments
   * @return the prefix and the dry-run flag (`false` when the flag is omitted)
   * @throws IllegalArgumentException when the arguments are not `<prefix>` optionally followed by
   *                                  the literal `true` or `false`
   */
  private def parseArgs(arguments: Array[String]): (String, Boolean) = arguments match {
    case Array(b) => (b, false)
    case Array(b, "true") => (b, true)
    case Array(b, "false") => (b, false)
    // Without this case an unexpected command line fails with a bare scala.MatchError,
    // which gives the operator no indication of what was expected.
    case _ =>
      throw new IllegalArgumentException(
        s"Usage: FileImport <prefix> [true|false] (received ${arguments.length} argument(s): ${arguments.mkString(" ")})"
      )
  }

  /**
   * Writes two objects under `reportPath` in `inputBucket`, describing the changes about to be applied:
   *  - `bundle.json`: the output of `bundle.print()`
   *  - `files.csv`: one `key,id` line per file entry
   *
   * @param inputBucket bucket in which the two objects are written
   * @param reportPath  key prefix under which the two objects are written
   * @param bundle      bundle whose printed form is stored
   * @param files       file entries listed in `files.csv`
   * @param s3          S3 client; not referenced explicitly here, presumably consumed implicitly by
   *                    `S3Utils.writeContent` (TODO confirm)
   * @param client      FHIR client; not referenced explicitly in this method body
   */
  def writeAheadLog(inputBucket: String, reportPath: String, bundle: TBundle, files: Seq[FileEntry])(implicit s3: S3Client, client: IGenericClient): Unit = {
    S3Utils.writeContent(inputBucket, s"$reportPath/bundle.json", bundle.print())
    val filesToCSV = files.map(f => s"${f.key},${f.id}").mkString("\n")
    S3Utils.writeContent(inputBucket, s"$reportPath/files.csv", filesToCSV)
  }

  /**
   * Validates the metadata file and the S3 file entries found under `inputPrefix`, then, unless
   * `dryRun` is set, writes the ahead log, copies the files to `outputBucket` and saves the bundle.
   *
   * If an `Exception` is thrown while applying the changes, `CheckS3Data.revert` is called for the
   * files on `outputBucket` and the exception is rethrown.
   *
   * @param inputBucket  bucket holding the metadata file and the files to import
   * @param inputPrefix  key prefix of the batch to import in `inputBucket`
   * @param outputBucket bucket the files are copied to
   * @param outputPrefix prefix passed to `CheckS3Data.loadFileEntries`
   * @param reportPath   key prefix under which the ahead log is written in `inputBucket`
   * @param dryRun       when `true`, only the validations are performed and nothing is written
   * @return the validation errors, or the value produced for a valid input: the result of
   *         `bundle.save()` for a real run, the validated `(bundle, files)` pair for a dry run
   */
  def run(inputBucket: String, inputPrefix: String, outputBucket: String, outputPrefix: String, reportPath: String, dryRun: Boolean)(implicit s3: S3Client, client: IGenericClient, clinFhirClient: IClinFhirClient, ferloadConf: FerloadConf) = {
    val metadata: ValidatedNel[String, Metadata] = Metadata.validateMetadataFile(inputBucket, inputPrefix)
    metadata.andThen { m: Metadata =>
      val rawFileEntries = CheckS3Data.loadRawFileEntries(inputBucket, inputPrefix)
      val fileEntries = CheckS3Data.loadFileEntries(m, rawFileEntries, outputPrefix)
      val results = (BuildBundle.validate(m, fileEntries), CheckS3Data.validateFileEntries(rawFileEntries, fileEntries))
      if (!dryRun) {
        results
          .mapN { (bundle, files) =>
            try {
              // Record the intended changes (FHIR bundle and S3 objects) before applying them,
              // so that they remain available if the distributed transaction fails partway.
              writeAheadLog(inputBucket, reportPath, bundle, files)
              CheckS3Data.copyFiles(files, outputBucket)
              bundle.save()
            } catch {
              case e: Exception =>
                CheckS3Data.revert(files, outputBucket)
                throw e
            }
          }
      } else {
        // Dry run: surface the validation outcome without touching S3 or FHIR.
        results
          .mapN { (bundle, files) => (bundle, files) }
      }
    }
  }
}