Skip to content

Commit

Permalink
when consumer is closed, give a signal
Browse files Browse the repository at this point in the history
  • Loading branch information
tuantrannav committed Nov 2, 2023
1 parent 4c6a3a6 commit a85e87a
Showing 1 changed file with 3 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class KafkaRapid(
private val running = AtomicBoolean(Stopped)
private val ready = AtomicBoolean(false)
private val producerClosed = AtomicBoolean(false)
private val consumerClosed = AtomicBoolean(false)

private val stringDeserializer = StringDeserializer()
private val stringSerializer = StringSerializer()
Expand Down Expand Up @@ -58,7 +59,7 @@ class KafkaRapid(

fun isRunning() = running.get()
fun isReady() = isRunning() && ready.get()
fun isConsumerClosed() = !isRunning()
fun isConsumerClosed() = consumerClosed.get()
fun isProducerClosed() = producerClosed.get()

override fun publish(message: String) {
Expand Down Expand Up @@ -231,6 +232,7 @@ class KafkaRapid(
} else {
log.info("stopped consuming messages after receiving stop signal")
}
consumerClosed.set(true)
tryAndLog(consumer::unsubscribe)
tryAndLog(consumer::close)
}
Expand Down

0 comments on commit a85e87a

Please sign in to comment.