-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Lagt til støtte for å kunne kjøre tasks direkte etter lagring #478
base: main
Are you sure you want to change the base?
The head ref may contain hidden characters: "kj\u00F8r-direkte"
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ import org.springframework.stereotype.Service | |
import java.util.concurrent.CompletableFuture | ||
import java.util.concurrent.TimeUnit | ||
import java.util.concurrent.TimeoutException | ||
import java.util.concurrent.atomic.AtomicBoolean | ||
import kotlin.math.min | ||
|
||
/** | ||
|
@@ -49,7 +50,10 @@ class TaskStepExecutorService( | |
* Pga [ThreadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown] som settes til true for å håndtere att tasker | ||
* har mulighet til å kjøre klart, så er også fortsatt mulig å legge til flere tasks, som vi ikke ønsker | ||
*/ | ||
@Volatile private var isShuttingDown = false | ||
@Volatile | ||
private var isShuttingDown = false | ||
|
||
private var isRunning = AtomicBoolean(false) | ||
|
||
override fun onApplicationEvent(event: ContextClosedEvent) { | ||
isShuttingDown = true | ||
|
@@ -58,13 +62,21 @@ class TaskStepExecutorService( | |
@Scheduled(fixedDelayString = "\${prosessering.fixedDelayString.in.milliseconds:30000}") | ||
fun pollAndExecute() { | ||
if (!enabled) return | ||
if (!isRunning.compareAndSet(false, true)) { | ||
log.debug("Kjører allerede") | ||
blommish marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return | ||
} | ||
|
||
while (true) { | ||
if (isShuttingDown) { | ||
log.info("Shutting down, does not start new pollAndExecuteTasks") | ||
return | ||
try { | ||
while (true) { | ||
if (isShuttingDown) { | ||
log.info("Shutting down, does not start new pollAndExecuteTasks") | ||
return | ||
} | ||
if (!pollAndExecuteTasks()) return | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hvorfor trekkes denne inn i while(true)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Den fantes der fra før, det har bare blitt noen flere spaces pga wrap med try catch :) |
||
} | ||
if (!pollAndExecuteTasks()) return | ||
} finally { | ||
isRunning.set(false) | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package no.nav.familie.prosessering.internal | ||
|
||
import org.slf4j.LoggerFactory | ||
import org.springframework.stereotype.Component | ||
import org.springframework.transaction.support.TransactionSynchronization | ||
|
||
@Component | ||
class TaskTransactionSynchronization(private val taskStepExecutorService: TaskStepExecutorService) : | ||
TransactionSynchronization { | ||
|
||
private val logger = LoggerFactory.getLogger(javaClass) | ||
override fun afterCommit() { | ||
logger.debug("Kaller på pollAndExecute") | ||
taskStepExecutorService.pollAndExecute() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Denne kaller ikke nødvendigvis på tasken du akkurat har lagret ned, men tar "neste på lista". Det er jo greit nok for de aller fleste tilfellene, men kan være litt misvisende at det heter Eks:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Det er sant, har fikset i 1a2837f |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
package no.nav.familie.prosessering.internal | ||
|
||
import ch.qos.logback.classic.Logger | ||
import ch.qos.logback.classic.spi.ILoggingEvent | ||
import ch.qos.logback.core.read.ListAppender | ||
import no.nav.familie.prosessering.IntegrationRunnerTest | ||
import no.nav.familie.prosessering.domene.Status | ||
import no.nav.familie.prosessering.domene.Task | ||
import no.nav.familie.prosessering.task.TaskStep2 | ||
import org.assertj.core.api.Assertions.assertThat | ||
import org.junit.jupiter.api.BeforeEach | ||
import org.junit.jupiter.api.Test | ||
import org.slf4j.LoggerFactory | ||
import org.springframework.beans.factory.annotation.Autowired | ||
import org.springframework.test.context.transaction.TestTransaction | ||
|
||
class TaskServiceIntegrationTest : IntegrationRunnerTest() { | ||
|
||
private lateinit var loggingEvents: List<ILoggingEvent> | ||
|
||
@Autowired | ||
lateinit var taskService: TaskService | ||
|
||
private val task = Task(TaskStep2.TASK_2, "{'a'='b'}") | ||
|
||
@BeforeEach | ||
fun setUp() { | ||
val listAppender = ListAppender<ILoggingEvent>() | ||
(LoggerFactory.getLogger(TaskTransactionSynchronization::class.java) as Logger).addAppender(listAppender) | ||
listAppender.start() | ||
loggingEvents = listAppender.list | ||
} | ||
|
||
@Test | ||
fun `save skal ikke kjøre task direkte`() { | ||
val task = taskService.save(task) | ||
TestTransaction.flagForCommit() | ||
TestTransaction.end() | ||
|
||
Thread.sleep(2000) | ||
|
||
assertThat(taskService.findById(task.id).status).isEqualTo(Status.UBEHANDLET) | ||
assertThat(loggingEvents.filter { it.message == "Kaller på pollAndExecute" }).isEmpty() | ||
} | ||
|
||
@Test | ||
fun `saveAndRun skal kjøre task direkte`() { | ||
val task = taskService.saveAndRun(task) | ||
TestTransaction.flagForCommit() | ||
TestTransaction.end() | ||
|
||
Thread.sleep(2000) | ||
|
||
assertThat(taskService.findById(task.id).status).isEqualTo(Status.FERDIG) | ||
assertThat(loggingEvents.filter { it.message == "Kaller på pollAndExecute" }).hasSize(1) | ||
} | ||
|
||
@Test | ||
fun `saveAndRun skal kjøre task direkte, og kun en gang per tråd`() { | ||
taskService.saveAndRun(task) | ||
taskService.saveAndRun(task) | ||
TestTransaction.flagForCommit() | ||
TestTransaction.end() | ||
|
||
Thread.sleep(500) | ||
|
||
assertThat(taskService.findAll().map { it.status }).containsExactly(Status.FERDIG, Status.FERDIG) | ||
assertThat(loggingEvents.filter { it.message == "Kaller på pollAndExecute" }).hasSize(1) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<configuration> | ||
|
||
<!-- override spring base logging pattern --> | ||
<property name="CONSOLE_LOG_PATTERN" | ||
value="%d [%-5level] [%thread] %logger{5} %replace(- [%X{consumerId}, %X{callId}, %X{userId}] ){'- \[, , \] ',''}- %m%n"/> | ||
<include resource="org/springframework/boot/logging/logback/base.xml"/> | ||
|
||
|
||
<logger name="no" level="TRACE"/> | ||
</configuration> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hva gjør dette?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stemmer det at dette overstyrer task synkronisteringen slik at vi overstyrer
afterCommit
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overstyrer ikke, vi legger til en hendelse i afterCommit