Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lagt til støtte for å kunne kjøre tasks direkte etter lagring #478

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,25 @@ import no.nav.familie.prosessering.domene.TaskLoggMetadata
import no.nav.familie.prosessering.domene.TaskLoggRepository
import no.nav.familie.prosessering.domene.TaskRepository
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Lazy
import org.springframework.data.domain.Pageable
import org.springframework.data.repository.findByIdOrNull
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional
import org.springframework.transaction.support.TransactionSynchronizationManager
import java.io.IOException
import java.time.LocalDateTime

/**
* @param taskTransactionSynchronization lazy pga circular dependencies
* TaskX -> TaskService -> TaskTransactionSynchronization -> TaskStepExecutorService -> TaskWorker -> TaskX
*/
@Component
class TaskService internal constructor(
private val taskRepository: TaskRepository,
private val taskLoggRepository: TaskLoggRepository,
@Lazy
private val taskTransactionSynchronization: TaskTransactionSynchronization
) {

private val logger = LoggerFactory.getLogger(javaClass)
Expand All @@ -31,6 +39,15 @@ class TaskService internal constructor(
return taskRepository.findByIdOrNull(id) ?: error("Task med id: $id ikke funnet.")
}

/**
* Oppretter task og trigger kjøring av task etter commit
*/
@Transactional
fun saveAndRun(task: Task): Task {
TransactionSynchronizationManager.registerSynchronization(taskTransactionSynchronization)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hva gjør dette?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stemmer det at dette overstyrer task synkronisteringen slik at vi overstyrer afterCommit?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overstyrer ikke, vi legger til en hendelse i afterCommit

return save(task)
}

/**
* Brukes for å opprette task
*/
Expand Down Expand Up @@ -85,7 +102,11 @@ class TaskService internal constructor(
return taskRepository.findByStatusIn(status, page)
}

fun finnTasksMedStatus(status: List<Status>, type: String? = null, page: Pageable = Pageable.unpaged()): List<Task> {
fun finnTasksMedStatus(
status: List<Status>,
type: String? = null,
page: Pageable = Pageable.unpaged(),
): List<Task> {
return if (type == null) {
taskRepository.findByStatusIn(status, page)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.springframework.stereotype.Service
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.math.min

/**
Expand Down Expand Up @@ -49,7 +50,10 @@ class TaskStepExecutorService(
* Pga [ThreadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown] som settes til true for å håndtere att tasker
* har mulighet til å kjøre klart, så er også fortsatt mulig å legge til flere tasks, som vi ikke ønsker
*/
@Volatile private var isShuttingDown = false
@Volatile
private var isShuttingDown = false

private var isRunning = AtomicBoolean(false)

override fun onApplicationEvent(event: ContextClosedEvent) {
isShuttingDown = true
Expand All @@ -58,13 +62,21 @@ class TaskStepExecutorService(
@Scheduled(fixedDelayString = "\${prosessering.fixedDelayString.in.milliseconds:30000}")
fun pollAndExecute() {
if (!enabled) return
if (!isRunning.compareAndSet(false, true)) {
log.debug("Kjører allerede")
blommish marked this conversation as resolved.
Show resolved Hide resolved
return
}

while (true) {
if (isShuttingDown) {
log.info("Shutting down, does not start new pollAndExecuteTasks")
return
try {
while (true) {
if (isShuttingDown) {
log.info("Shutting down, does not start new pollAndExecuteTasks")
return
}
if (!pollAndExecuteTasks()) return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hvorfor trekkes denne inn i while(true)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Den fantes der fra før, det har bare blitt noen flere spaces pga wrap med try catch :)
Den er der fra før for å kunne håndtere at man skal prosessere forløpende (continuousRunning)

}
if (!pollAndExecuteTasks()) return
} finally {
isRunning.set(false)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package no.nav.familie.prosessering.internal

import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import org.springframework.transaction.support.TransactionSynchronization

@Component
class TaskTransactionSynchronization(private val taskStepExecutorService: TaskStepExecutorService) :
TransactionSynchronization {

private val logger = LoggerFactory.getLogger(javaClass)
override fun afterCommit() {
logger.debug("Kaller på pollAndExecute")
taskStepExecutorService.pollAndExecute()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Denne kaller ikke nødvendigvis på tasken du akkurat har lagret ned, men tar "neste på lista". Det er jo greit nok for de aller fleste tilfellene, men kan være litt misvisende at det heter saveAndRun - ettersom det egentlig er saveAndPoll.

Eks:

  1. EF kjører g-omregning med X tusen tasker
  2. En saveAndRun på oppdaterOppgave blir opprettet
  3. Da kommer trolig ikke oppdaterOppgaveTasken til å bli kjørt "med en gang"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Det er sant, har fikset i 1a2837f

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package no.nav.familie.prosessering.internal

import ch.qos.logback.classic.Logger
import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.read.ListAppender
import no.nav.familie.prosessering.IntegrationRunnerTest
import no.nav.familie.prosessering.domene.Status
import no.nav.familie.prosessering.domene.Task
import no.nav.familie.prosessering.task.TaskStep2
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.context.transaction.TestTransaction

class TaskServiceIntegrationTest : IntegrationRunnerTest() {

private lateinit var loggingEvents: List<ILoggingEvent>

@Autowired
lateinit var taskService: TaskService

private val task = Task(TaskStep2.TASK_2, "{'a'='b'}")

@BeforeEach
fun setUp() {
val listAppender = ListAppender<ILoggingEvent>()
(LoggerFactory.getLogger(TaskTransactionSynchronization::class.java) as Logger).addAppender(listAppender)
listAppender.start()
loggingEvents = listAppender.list
}

@Test
fun `save skal ikke kjøre task direkte`() {
val task = taskService.save(task)
TestTransaction.flagForCommit()
TestTransaction.end()

Thread.sleep(2000)

assertThat(taskService.findById(task.id).status).isEqualTo(Status.UBEHANDLET)
assertThat(loggingEvents.filter { it.message == "Kaller på pollAndExecute" }).isEmpty()
}

@Test
fun `saveAndRun skal kjøre task direkte`() {
val task = taskService.saveAndRun(task)
TestTransaction.flagForCommit()
TestTransaction.end()

Thread.sleep(2000)

assertThat(taskService.findById(task.id).status).isEqualTo(Status.FERDIG)
assertThat(loggingEvents.filter { it.message == "Kaller på pollAndExecute" }).hasSize(1)
}

@Test
fun `saveAndRun skal kjøre task direkte, og kun en gang per tråd`() {
taskService.saveAndRun(task)
taskService.saveAndRun(task)
TestTransaction.flagForCommit()
TestTransaction.end()

Thread.sleep(500)

assertThat(taskService.findAll().map { it.status }).containsExactly(Status.FERDIG, Status.FERDIG)
assertThat(loggingEvents.filter { it.message == "Kaller på pollAndExecute" }).hasSize(1)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ internal class TaskServiceTest {

private val taskRepository = mockk<TaskRepository>()
private val taskLoggRepository = mockk<TaskLoggRepository>()
private val service = TaskService(taskRepository, taskLoggRepository)
private val service = TaskService(taskRepository, taskLoggRepository, mockk())

@Test
fun tomListeGirTomtResultat() {
Expand Down
3 changes: 2 additions & 1 deletion prosessering-core/src/test/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ spring:
max-lifetime: 30000
minimum-idle: 1

prosessering.continuousRunning.enabled: true
prosessering:
continuousRunning.enabled: true
11 changes: 11 additions & 0 deletions prosessering-core/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>

<!-- override spring base logging pattern -->
<property name="CONSOLE_LOG_PATTERN"
value="%d [%-5level] [%thread] %logger{5} %replace(- [%X{consumerId}, %X{callId}, %X{userId}] ){'- \[, , \] ',''}- %m%n"/>
<include resource="org/springframework/boot/logging/logback/base.xml"/>


<logger name="no" level="TRACE"/>
</configuration>