Skip to content

Commit

Permalink
Open some private API on Executor (#397)
Browse files Browse the repository at this point in the history
I don't know why some function had the XA passed as argument
since Executor instance itself maintain handle a transactor, but
it makes it more complex to call these function from the outside
so I fixed it as well.
  • Loading branch information
guillaumebort authored May 24, 2019
1 parent 315ff19 commit 44b41e7
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 15 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
val devMode = settingKey[Boolean]("Some build optimization are applied in devMode.")
val writeClasspath = taskKey[File]("Write the project classpath to a file.")

val VERSION = "0.9.7"
val VERSION = "0.9.8"

lazy val catsCore = "1.5.0"
lazy val circe = "0.10.1"
Expand Down
16 changes: 7 additions & 9 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ object RetryStrategy {
}
}

private[cuttle] sealed trait ExecutionStatus
private[cuttle] object ExecutionStatus {
sealed trait ExecutionStatus
object ExecutionStatus {
implicit val ExecutionStatusMeta: Meta[ExecutionStatus] =
Meta[Boolean].imap(x => if (x) ExecutionSuccessful else ExecutionFailed: ExecutionStatus) {
case ExecutionSuccessful => true
Expand Down Expand Up @@ -570,16 +570,15 @@ class Executor[S <: Scheduling] (
execution.toExecutionLog(executionStatus).copy(failing = Some(failingJob))
})

private[cuttle] def archivedExecutionsSize(jobs: Set[String]): IO[Int] =
def archivedExecutionsSize(jobs: Set[String]): IO[Int] =
queries.getExecutionLogSize(jobs).transact(xa)

private[cuttle] def archivedExecutions(queryContexts: Fragment,
def archivedExecutions(queryContexts: Fragment,
jobs: Set[String],
sort: String,
asc: Boolean,
offset: Int,
limit: Int,
xa: XA): IO[Seq[ExecutionLog]] =
limit: Int): IO[Seq[ExecutionLog]] =
queries.getExecutionLog(queryContexts, jobs, sort, asc, offset, limit).transact(xa)

/**
Expand All @@ -589,8 +588,7 @@ class Executor[S <: Scheduling] (
sort: String,
asc: Boolean,
offset: Int,
limit: Int,
xa: XA): IO[Seq[ExecutionLog]] =
limit: Int): IO[Seq[ExecutionLog]] =
queries.getRawExecutionLog(jobs, sort, asc, offset, limit).transact(xa)

private[cuttle] def cancelExecution(executionId: String)(implicit user: User): Unit = {
Expand All @@ -615,7 +613,7 @@ class Executor[S <: Scheduling] (
case (a: Option[ExecutionLog]) => IO.pure(a)
}

private[cuttle] def openStreams(executionId: String): fs2.Stream[IO, Byte] =
def openStreams(executionId: String): fs2.Stream[IO, Byte] =
ExecutionStreams.getStreams(executionId, queries, xa)

private[cuttle] def relaunch(jobs: Set[String])(implicit user: User): Unit = {
Expand Down
4 changes: 2 additions & 2 deletions cron/src/main/scala/com/criteo/cuttle/cron/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ package object cron {
job: CronJob,
startDate: Instant,
endDate: Instant,
limit: Int)(implicit transactor: XA) =
limit: Int) =
for {
archived <- executor.rawArchivedExecutions(Set(job.id), "", asc = false, 0, limit, transactor)
archived <- executor.rawArchivedExecutions(Set(job.id), "", asc = false, 0, limit)
running <- IO(executor.runningExecutions.collect {
case (e, status)
if e.job.id == job.id && e.context.instant.isAfter(startDate) && e.context.instant.isBefore(endDate) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject,
q.sort.column,
q.sort.asc,
q.offset,
q.limit,
xa)
q.limit)
.map(execs => execs.asJson)
case _ =>
IO(executions.asJson)
Expand Down Expand Up @@ -607,7 +606,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject,
val requestedInterval = Interval(startDate, endDate)
val contextQuery = Database.sqlGetContextsBetween(Some(startDate), Some(endDate))
val archivedExecutions =
executor.archivedExecutions(contextQuery, Set(jobId), "", asc = true, 0, Int.MaxValue, xa)
executor.archivedExecutions(contextQuery, Set(jobId), "", asc = true, 0, Int.MaxValue)
val runningExecutions = executor.runningExecutions
.filter {
case (e, _) =>
Expand Down

0 comments on commit 44b41e7

Please sign in to comment.