diff --git a/README.md b/README.md index caecfa8..258f10b 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ Kuvasz (pronounce as [ˈkuvɒs]) is an ancient hungarian breed of livestock & gu - Uptime & latency monitoring with a configurable interval - Email notifications through SMTP - Slack notifications through webhoooks +- Telegram notifications through the Bot API - Configurable data retention period ### Under development 🚧 diff --git a/examples/docker-compose/docker-compose.yml b/examples/docker-compose/docker-compose.yml index 49d2f17..378aef7 100644 --- a/examples/docker-compose/docker-compose.yml +++ b/examples/docker-compose/docker-compose.yml @@ -28,3 +28,6 @@ services: ENABLE_SLACK_EVENT_HANDLER: 'true' SLACK_WEBHOOK_URL: 'https://your.slack-webhook.url' DATA_RETENTION_DAYS: 30 + ENABLE_TELEGRAM_EVENT_HANDLER: 'true' + TELEGRAM_API_TOKEN: '1232312321321:GJKGHjhklfdhsklHKLFH' + TELEGRAM_CHAT_ID: '1234567890' diff --git a/examples/k8s/kuvasz.configmap.yml b/examples/k8s/kuvasz.configmap.yml index f42f842..f7bdc18 100644 --- a/examples/k8s/kuvasz.configmap.yml +++ b/examples/k8s/kuvasz.configmap.yml @@ -18,3 +18,4 @@ data: slack_event_handler_enabled: "true" slack_webhook_url: "https://your.slack-webhook.url" data_retention_days: "30" + telegram_event_handler_enabled: "true" diff --git a/examples/k8s/kuvasz.deployment.yaml b/examples/k8s/kuvasz.deployment.yaml index 3275e72..ac6bc3d 100644 --- a/examples/k8s/kuvasz.deployment.yaml +++ b/examples/k8s/kuvasz.deployment.yaml @@ -132,3 +132,18 @@ spec: configMapKeyRef: name: kuvasz-config key: data_retention_days + - name: ENABLE_TELEGRAM_EVENT_HANDLER + valueFrom: + configMapKeyRef: + name: kuvasz-config + key: telegram_event_handler_enabled + - name: TELEGRAM_API_TOKEN + valueFrom: + secretKeyRef: + name: telegram-credentials + key: api-token + - name: TELEGRAM_CHAT_ID + valueFrom: + secretKeyRef: + name: telegram-credentials + key: chat-id diff --git a/src/main/kotlin/com/kuvaszuptime/kuvasz/config/handlers/TelegramEventHandlerConfig.kt b/src/main/kotlin/com/kuvaszuptime/kuvasz/config/handlers/TelegramEventHandlerConfig.kt new file mode 100644 index 0000000..71b4530 --- /dev/null +++ b/src/main/kotlin/com/kuvaszuptime/kuvasz/config/handlers/TelegramEventHandlerConfig.kt @@ -0,0 +1,18 @@ +package com.kuvaszuptime.kuvasz.config.handlers + +import io.micronaut.context.annotation.ConfigurationProperties +import io.micronaut.core.annotation.Introspected +import javax.inject.Singleton +import javax.validation.constraints.NotBlank + +@ConfigurationProperties("handler-config.telegram-event-handler") +@Singleton +@Introspected +class TelegramEventHandlerConfig { + + @NotBlank + var token: String = "" + + @NotBlank + var chatId: String = "" +} diff --git a/src/main/kotlin/com/kuvaszuptime/kuvasz/handlers/TelegramEventHandler.kt b/src/main/kotlin/com/kuvaszuptime/kuvasz/handlers/TelegramEventHandler.kt new file mode 100644 index 0000000..bb97dcf --- /dev/null +++ b/src/main/kotlin/com/kuvaszuptime/kuvasz/handlers/TelegramEventHandler.kt @@ -0,0 +1,84 @@ +package com.kuvaszuptime.kuvasz.handlers + +import com.kuvaszuptime.kuvasz.config.handlers.TelegramEventHandlerConfig +import com.kuvaszuptime.kuvasz.models.MonitorDownEvent +import com.kuvaszuptime.kuvasz.models.MonitorUpEvent +import com.kuvaszuptime.kuvasz.models.TelegramAPIMessage +import com.kuvaszuptime.kuvasz.models.UptimeMonitorEvent +import com.kuvaszuptime.kuvasz.models.runWhenStateChanges +import com.kuvaszuptime.kuvasz.models.toEmoji +import com.kuvaszuptime.kuvasz.models.toStructuredMessage +import com.kuvaszuptime.kuvasz.services.EventDispatcher +import com.kuvaszuptime.kuvasz.services.TelegramAPIService +import io.micronaut.context.annotation.Context +import io.micronaut.context.annotation.Requires +import io.micronaut.http.HttpResponse +import io.micronaut.http.client.exceptions.HttpClientResponseException +import io.micronaut.scheduling.TaskExecutors +import io.micronaut.scheduling.annotation.ExecuteOn +import io.reactivex.Flowable +import org.slf4j.LoggerFactory + +@Context +@Requires(property = "handler-config.telegram-event-handler.enabled", value = "true") +class TelegramEventHandler( + private val telegramAPIService: TelegramAPIService, + private val telegramEventHandlerConfig: TelegramEventHandlerConfig, + private val eventDispatcher: EventDispatcher +) { + companion object { + private val logger = LoggerFactory.getLogger(TelegramEventHandler::class.java) + } + + init { + subscribeToEvents() + } + + @ExecuteOn(TaskExecutors.IO) + private fun subscribeToEvents() { + eventDispatcher.subscribeToMonitorUpEvents { event -> + logger.debug("A MonitorUpEvent has been received for monitor with ID: ${event.monitor.id}") + event.runWhenStateChanges { telegramAPIService.sendMessage(it.toTelegramMessage()).handleResponse() } + } + eventDispatcher.subscribeToMonitorDownEvents { event -> + logger.debug("A MonitorDownEvent has been received for monitor with ID: ${event.monitor.id}") + event.runWhenStateChanges { telegramAPIService.sendMessage(it.toTelegramMessage()).handleResponse() } + } + } + + private fun UptimeMonitorEvent.toTelegramMessage(): TelegramAPIMessage = + TelegramAPIMessage( + text = "${toEmoji()} ${toHTMLMessage()}", + chat_id = telegramEventHandlerConfig.chatId + ) + + private fun Flowable>.handleResponse() = + subscribe( + { + logger.debug("A Telegram message to your configured webhook has been successfully sent") + }, + { ex -> + if (ex is HttpClientResponseException) { + val responseBody = ex.response.getBody(String::class.java) + logger.error("Telegram message cannot be delivered due to an error: $responseBody") + } + } + ) + + private fun UptimeMonitorEvent.toHTMLMessage() = + when (this) { + is MonitorUpEvent -> toStructuredMessage().let { details -> + listOfNotNull( + "${details.summary}", + "${details.latency}", + details.previousDownTime.orNull() + ) + } + is MonitorDownEvent -> toStructuredMessage().let { details -> + listOfNotNull( + "${details.summary}", + details.previousUpTime.orNull() + ) + } + }.joinToString("\n") +} diff --git a/src/main/kotlin/com/kuvaszuptime/kuvasz/models/TelegramAPIMessage.kt b/src/main/kotlin/com/kuvaszuptime/kuvasz/models/TelegramAPIMessage.kt new file mode 100644 index 0000000..68e10f2 --- /dev/null +++ b/src/main/kotlin/com/kuvaszuptime/kuvasz/models/TelegramAPIMessage.kt @@ -0,0 +1,12 @@ +package com.kuvaszuptime.kuvasz.models + +import io.micronaut.core.annotation.Introspected + +@Suppress("ConstructorParameterNaming") +@Introspected +data class TelegramAPIMessage( + val chat_id: String, + val text: String, + val disable_web_page_preview: Boolean = true, + val parse_mode: String = "HTML" +) diff --git a/src/main/kotlin/com/kuvaszuptime/kuvasz/services/TelegramAPIService.kt b/src/main/kotlin/com/kuvaszuptime/kuvasz/services/TelegramAPIService.kt new file mode 100644 index 0000000..3f7f2a9 --- /dev/null +++ b/src/main/kotlin/com/kuvaszuptime/kuvasz/services/TelegramAPIService.kt @@ -0,0 +1,41 @@ +package com.kuvaszuptime.kuvasz.services + +import com.kuvaszuptime.kuvasz.config.handlers.TelegramEventHandlerConfig +import com.kuvaszuptime.kuvasz.models.TelegramAPIMessage +import io.micronaut.context.annotation.Requires +import io.micronaut.context.event.ShutdownEvent +import io.micronaut.core.type.Argument +import io.micronaut.http.HttpRequest +import io.micronaut.http.HttpResponse +import io.micronaut.http.client.RxHttpClient +import io.micronaut.runtime.event.annotation.EventListener +import io.reactivex.Flowable +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +@Requires(property = "handler-config.telegram-event-handler.enabled", value = "true") +class TelegramAPIService @Inject constructor( + telegramEventHandlerConfig: TelegramEventHandlerConfig, + private val httpClient: RxHttpClient +) { + private val url = "https://api.telegram.org/bot" + telegramEventHandlerConfig.token + "/sendMessage" + + companion object { + private const val RETRY_COUNT = 3L + } + + fun sendMessage(message: TelegramAPIMessage): Flowable> { + val request: HttpRequest = HttpRequest.POST(url, message) + + return httpClient + .exchange(request, Argument.STRING, Argument.STRING) + .retry(RETRY_COUNT) + } + + @EventListener + @Suppress("UNUSED_PARAMETER") + internal fun onShutdownEvent(event: ShutdownEvent) { + httpClient.close() + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index eb14667..99b6a14 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -89,6 +89,10 @@ handler-config: slack-event-handler: enabled: ${ENABLE_SLACK_EVENT_HANDLER:`false`} webhook-url: ${SLACK_WEBHOOK_URL} + telegram-event-handler: + enabled: ${ENABLE_TELEGRAM_EVENT_HANDLER:`false`} + token: ${TELEGRAM_API_TOKEN} + chat-id: ${TELEGRAM_CHAT_ID} --- admin-auth: username: ${ADMIN_USER} diff --git a/src/test/kotlin/com/kuvaszuptime/kuvasz/config/TelegramEventHandlerConfigTest.kt b/src/test/kotlin/com/kuvaszuptime/kuvasz/config/TelegramEventHandlerConfigTest.kt new file mode 100644 index 0000000..8a1720a --- /dev/null +++ b/src/test/kotlin/com/kuvaszuptime/kuvasz/config/TelegramEventHandlerConfigTest.kt @@ -0,0 +1,65 @@ +package com.kuvaszuptime.kuvasz.config + +import io.kotest.assertions.exceptionToMessage +import io.kotest.assertions.throwables.shouldThrow +import io.kotest.core.spec.style.BehaviorSpec +import io.kotest.matchers.string.shouldContain +import io.micronaut.context.ApplicationContext +import io.micronaut.context.env.PropertySource +import io.micronaut.context.exceptions.BeanInstantiationException + +class TelegramEventHandlerConfigTest : BehaviorSpec({ + given("a TelegramEventHandlerConfig bean") { + `when`("there is no API token in the configuration") { + val properties = PropertySource.of( + "test", + mapOf( + "handler-config.telegram-event-handler.enabled" to "true", + "handler-config.telegram-event-handler.chat-id" to "chat-id" + ) + ) + then("ApplicationContext should throw a BeanInstantiationException") { + val exception = shouldThrow { + ApplicationContext.run(properties) + } + exceptionToMessage(exception) shouldContain + "Bean definition [com.kuvaszuptime.kuvasz.handlers.TelegramEventHandler] could not be loaded" + } + } + + `when`("there is no chat ID in the configuration") { + val properties = PropertySource.of( + "test", + mapOf( + "handler-config.telegram-event-handler.enabled" to "true", + "handler-config.telegram-event-handler.token" to "your-token" + ) + ) + then("ApplicationContext should throw a BeanInstantiationException") { + val exception = shouldThrow { + ApplicationContext.run(properties) + } + exceptionToMessage(exception) shouldContain + "Bean definition [com.kuvaszuptime.kuvasz.handlers.TelegramEventHandler] could not be loaded" + } + } + + `when`("chat ID and API token are empty strings") { + val properties = PropertySource.of( + "test", + mapOf( + "handler-config.telegram-event-handler.enabled" to "true", + "handler-config.telegram-event-handler.token" to "", + "handler-config.telegram-event-handler.chat-id" to "" + ) + ) + then("ApplicationContext should throw a BeanInstantiationException") { + val exception = shouldThrow { + ApplicationContext.run(properties) + } + exceptionToMessage(exception) shouldContain + "Bean definition [com.kuvaszuptime.kuvasz.handlers.TelegramEventHandler] could not be loaded" + } + } + } +}) diff --git a/src/test/kotlin/com/kuvaszuptime/kuvasz/handlers/TelegramEventHandlerTest.kt b/src/test/kotlin/com/kuvaszuptime/kuvasz/handlers/TelegramEventHandlerTest.kt new file mode 100644 index 0000000..4e0f509 --- /dev/null +++ b/src/test/kotlin/com/kuvaszuptime/kuvasz/handlers/TelegramEventHandlerTest.kt @@ -0,0 +1,257 @@ +package com.kuvaszuptime.kuvasz.handlers + +import arrow.core.Option +import com.kuvaszuptime.kuvasz.DatabaseBehaviorSpec +import com.kuvaszuptime.kuvasz.config.handlers.TelegramEventHandlerConfig +import com.kuvaszuptime.kuvasz.mocks.createMonitor +import com.kuvaszuptime.kuvasz.models.MonitorDownEvent +import com.kuvaszuptime.kuvasz.models.MonitorUpEvent +import com.kuvaszuptime.kuvasz.models.TelegramAPIMessage +import com.kuvaszuptime.kuvasz.repositories.MonitorRepository +import com.kuvaszuptime.kuvasz.repositories.UptimeEventRepository +import com.kuvaszuptime.kuvasz.services.EventDispatcher +import com.kuvaszuptime.kuvasz.services.TelegramAPIService +import com.kuvaszuptime.kuvasz.tables.UptimeEvent +import io.kotest.assertions.throwables.shouldNotThrowAny +import io.kotest.core.test.TestCase +import io.kotest.core.test.TestResult +import io.kotest.matchers.string.shouldContain +import io.micronaut.core.type.Argument +import io.micronaut.http.HttpResponse +import io.micronaut.http.HttpStatus +import io.micronaut.http.client.RxHttpClient +import io.micronaut.http.client.exceptions.HttpClientResponseException +import io.micronaut.test.annotation.MicronautTest +import io.mockk.clearAllMocks +import io.mockk.every +import io.mockk.mockk +import io.mockk.slot +import io.mockk.spyk +import io.mockk.verify +import io.reactivex.Flowable + +@MicronautTest +class TelegramEventHandlerTest( + private val eventDispatcher: EventDispatcher, + private val monitorRepository: MonitorRepository, + private val uptimeEventRepository: UptimeEventRepository +) : DatabaseBehaviorSpec() { + private val mockHttpClient = mockk() + + init { + val eventHandlerConfig = TelegramEventHandlerConfig().apply { + token = "my_token" + chatId = "@channel" + } + val telegramAPIService = TelegramAPIService(eventHandlerConfig, mockHttpClient); + val apiServiceSpy = spyk(telegramAPIService, recordPrivateCalls = true) + TelegramEventHandler(apiServiceSpy, eventHandlerConfig, eventDispatcher) + + given("the TelegramEventHandler") { + `when`("it receives a MonitorUpEvent and There is no previous event for the monitor") { + val monitor = createMonitor(monitorRepository) + val event = MonitorUpEvent( + monitor = monitor, + status = HttpStatus.OK, + latency = 1000, + previousEvent = Option.empty() + ) + mockHttpResponse(HttpStatus.OK) + + eventDispatcher.dispatch(event); + + then("it should send a message about the event") { + val slot = slot(); + + verify(exactly = 1) { apiServiceSpy.sendMessage(capture(slot)) } + slot.captured.text shouldContain "Your monitor \"testMonitor\" (http://irrelevant.com) is UP (200)" + } + } + + `when`("it receives a MonitorDownEvent and there is no previous event for the monitor") { + val monitor = createMonitor(monitorRepository) + val event = MonitorDownEvent( + monitor = monitor, + status = HttpStatus.INTERNAL_SERVER_ERROR, + error = Throwable(), + previousEvent = Option.empty() + ) + mockHttpResponse(HttpStatus.OK) + + eventDispatcher.dispatch(event) + + then("it should send a message about the event") { + val slot = slot() + + verify(exactly = 1) { apiServiceSpy.sendMessage(capture(slot)) } + slot.captured.text shouldContain "Your monitor \"testMonitor\" (http://irrelevant.com) is DOWN" + } + } + + `when`("it receives a MonitorUpEvent and there is a previous event with the same status") { + val monitor = createMonitor(monitorRepository) + val firstEvent = MonitorUpEvent( + monitor = monitor, + status = HttpStatus.OK, + latency = 1000, + previousEvent = Option.empty() + ) + mockHttpResponse(HttpStatus.OK) + eventDispatcher.dispatch(firstEvent) + val firstUptimeRecord = uptimeEventRepository.fetchOne(UptimeEvent.UPTIME_EVENT.MONITOR_ID, monitor.id) + + val secondEvent = MonitorUpEvent( + monitor = monitor, + status = HttpStatus.OK, + latency = 1200, + previousEvent = Option.just(firstUptimeRecord) + ) + eventDispatcher.dispatch(secondEvent) + + then("it should send only one notification about them") { + val slot = slot() + + verify(exactly = 1) { apiServiceSpy.sendMessage(capture(slot)) } + slot.captured.text shouldContain "Latency: 1000ms" + } + } + + `when`("it receives a MonitorDownEvent and there is a previous event with the same status") { + val monitor = createMonitor(monitorRepository) + val firstEvent = MonitorDownEvent( + monitor = monitor, + status = HttpStatus.INTERNAL_SERVER_ERROR, + error = Throwable("First error"), + previousEvent = Option.empty() + ) + mockHttpResponse(HttpStatus.OK) + eventDispatcher.dispatch(firstEvent) + val firstUptimeRecord = uptimeEventRepository.fetchOne(UptimeEvent.UPTIME_EVENT.MONITOR_ID, monitor.id) + + val secondEvent = MonitorDownEvent( + monitor = monitor, + status = HttpStatus.NOT_FOUND, + error = Throwable("Second error"), + previousEvent = Option.just(firstUptimeRecord) + ) + eventDispatcher.dispatch(secondEvent) + + then("it should send only one notification about them") { + val slot = slot() + + verify(exactly = 1) { apiServiceSpy.sendMessage(capture(slot)) } + slot.captured.text shouldContain "(500)" + } + } + + `when`("it receives a MonitorUpEvent and there is a previous event with different status") { + val monitor = createMonitor(monitorRepository) + val firstEvent = MonitorDownEvent( + monitor = monitor, + status = HttpStatus.INTERNAL_SERVER_ERROR, + previousEvent = Option.empty(), + error = Throwable() + ) + mockHttpResponse(HttpStatus.OK) + eventDispatcher.dispatch(firstEvent) + val firstUptimeRecord = uptimeEventRepository.fetchOne(UptimeEvent.UPTIME_EVENT.MONITOR_ID, monitor.id) + + val secondEvent = MonitorUpEvent( + monitor = monitor, + status = HttpStatus.OK, + latency = 1000, + previousEvent = Option.just(firstUptimeRecord) + ) + eventDispatcher.dispatch(secondEvent) + + then("it should send two different notifications about them") { + val notificationsSent = mutableListOf() + + verify(exactly = 2) { apiServiceSpy.sendMessage(capture(notificationsSent)) } + notificationsSent[0].text shouldContain "is DOWN (500)" + notificationsSent[1].text shouldContain "Latency: 1000ms" + notificationsSent[1].text shouldContain "is UP (200)" + } + } + + `when`("it receives a MonitorDownEvent and there is a previous event with different status") { + val monitor = createMonitor(monitorRepository) + val firstEvent = MonitorUpEvent( + monitor = monitor, + status = HttpStatus.OK, + latency = 1000, + previousEvent = Option.empty() + ) + mockHttpResponse(HttpStatus.OK) + eventDispatcher.dispatch(firstEvent) + val firstUptimeRecord = uptimeEventRepository.fetchOne(UptimeEvent.UPTIME_EVENT.MONITOR_ID, monitor.id) + + val secondEvent = MonitorDownEvent( + monitor = monitor, + status = HttpStatus.INTERNAL_SERVER_ERROR, + previousEvent = Option.just(firstUptimeRecord), + error = Throwable() + ) + eventDispatcher.dispatch(secondEvent) + + then("it should send two different notifications about them") { + val notificationsSent = mutableListOf() + + verify(exactly = 2) { apiServiceSpy.sendMessage(capture(notificationsSent)) } + notificationsSent[0].text shouldContain "Latency: 1000ms" + notificationsSent[0].text shouldContain "is UP (200)" + notificationsSent[1].text shouldContain "is DOWN (500)" + } + } + + `when`("it receives an event but an error happens when it calls the API") { + val monitor = createMonitor(monitorRepository) + val event = MonitorUpEvent( + monitor = monitor, + status = HttpStatus.OK, + latency = 1000, + previousEvent = Option.empty() + ) + mockHttpErrorResponse(HttpStatus.BAD_REQUEST, "bad_request") + + then("it should send a message about the event") { + val slot = slot() + + shouldNotThrowAny { eventDispatcher.dispatch(event) } + verify(exactly = 1) { apiServiceSpy.sendMessage(capture(slot)) } + slot.captured.text shouldContain "Your monitor \"testMonitor\" (http://irrelevant.com) is UP (200)" + } + } + } + + } + + override fun afterTest(testCase: TestCase, result: TestResult) { + clearAllMocks() + super.afterTest(testCase, result) + } + + private fun mockHttpResponse(status: HttpStatus, body: String = "") { + every { + mockHttpClient.exchange( + any(), + Argument.STRING, + Argument.STRING + ) + } returns Flowable.just( + HttpResponse.status(status).body(body) + ) + } + + private fun mockHttpErrorResponse(status: HttpStatus, body: String = "") { + every { + mockHttpClient.exchange( + any(), + Argument.STRING, + Argument.STRING + ) + } returns Flowable.error( + HttpClientResponseException("error", HttpResponse.status(status).body(body)) + ) + } +}