rxmon

To react to error conditions properly, you first need to express what counts as an error in your application. The most obvious way is to model the monitored world as a set of streams provided by the running application, such as the number of incoming requests or the number of errors. Given a set of primitive streams, it is easy to compose them into the final "monitorable" stream. This module provides combinators for composing RxJava Observables. In addition, Akka actors may send events to the monitor, and those events are exposed as an Observable stream.

Details

The combinators that can be used to construct the final monitored Observable include:

  • binary operations +, -, * with a constant or another Observable for Numeric Observables.

  • exponentiation operation ^ with an Int power for Numeric Observables.

  • comparison operations <, > with a constant or another Observable for Numeric Observables.

  • logical operations &&, ||, ^ for two Boolean Observables.

  • max, min, avg over a specified Duration for Numeric Observables.

  • drv giving an Observable of the derivative for Numeric Observables. This is used to model variable growth.

  • always, never for Boolean Observables: each yields true only if its operand Observable stays true (always) or false (never) for the whole of the given Duration.

  • count the number of ticks of any Observable.

  • watchdog Boolean Observable emitting true when the source Observable doesn't produce values for a specified duration.
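
To make the semantics of two of these combinators concrete, here is a minimal sketch that models a stream as a time-ordered list of (timestamp in milliseconds, value) samples in plain Scala. It is not rxmon's implementation and does not use RxJava: the object name `CombinatorSketch`, the `Sample` alias, the per-second unit of the derivative, and the treatment of a window that has not yet fully elapsed are all assumptions made for illustration.

```scala
// Illustrative model only; none of these definitions come from rxmon.
object CombinatorSketch {
  // A sample is a (timestampMillis, value) pair; a stream is a time-ordered Seq of samples.
  type Sample[A] = (Long, A)

  // drv: difference quotient between consecutive samples, expressed per second.
  // Pairs with a non-increasing timestamp are skipped to avoid dividing by zero.
  def drv(xs: Seq[Sample[Double]]): Seq[Sample[Double]] =
    xs.zip(xs.drop(1)).collect {
      case ((t0, v0), (t1, v1)) if t1 > t0 =>
        (t1, (v1 - v0) * 1000.0 / (t1 - t0))
    }

  // always: at each sample time t, true only if every sample in (t - window, t] is true.
  // Assumption: early samples are judged on whatever part of the window exists so far.
  def always(xs: Seq[Sample[Boolean]], windowMillis: Long): Seq[Sample[Boolean]] =
    xs.map { case (t, _) =>
      val inWindow = xs.filter { case (u, _) => u > t - windowMillis && u <= t }
      (t, inWindow.forall(_._2))
    }
}
```

Under this model, a single false sample keeps `always` false until that sample has left the window, which is what makes it useful for suppressing alerts on momentary spikes.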

import scala.concurrent.duration._ // provides the 1.minutes syntax used below
import org.matmexrhino.rxmon._
import Monitoring._

class MyMonitoring extends Registry {
  val actorErrors: Observable[Unit] = register[Unit]("some_error")
  val tooManyErrors: Observable[Boolean] =
    (count(actorErrors, 1.minutes) > 100).always(1.minutes)
  tooManyErrors.whenTrue { () =>
    // alert or something else
  }
}

...

// Some actor that might err:
registry ! ListEntries 
def receive = {
  ...
  case EntriesResponse(map) =>
    val errorsCollector = map("some_error")
    ...
}

Batching

To prevent network congestion, it's possible to aggregate statistics on the local node and send only the aggregated results. Note that aggregation naturally smooths the curve and suppresses accidental hiccups. The batching scheme must be chosen carefully so that it is consistent with the final rx stream. The following batching modes are supported:

  • max, min, avg for Numeric variables.
  • ever, never for Boolean variables.
  • tick counts the number of ticks of a Unit variable and outputs an Int.
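
The batching modes above can be pictured as simple reductions over the samples collected locally during one batch interval. The following is a sketch in plain Scala, not rxmon's batching code: the object and function names are hypothetical, and the reading of ever as "true if any sample was true" and never as "true if no sample was true" is an assumption.

```scala
// Illustrative only; these helpers are not part of rxmon.
object BatchingSketch {
  // Numeric modes: reduce a non-empty batch of local samples to a single number.
  def batchMax(xs: Seq[Double]): Double = xs.max
  def batchMin(xs: Seq[Double]): Double = xs.min
  def batchAvg(xs: Seq[Double]): Double = xs.sum / xs.size

  // Boolean modes (assumed semantics, see the paragraph above).
  def batchEver(xs: Seq[Boolean]): Boolean = xs.exists(identity)
  def batchNever(xs: Seq[Boolean]): Boolean = !xs.exists(identity)

  // tick: the number of Unit events seen during the batch interval.
  def batchTick(xs: Seq[Unit]): Int = xs.size
}
```

This also shows why the batching mode has to match the final stream. Take the raw samples 1, 9, 2, 2 split into the batches (1, 9) and (2, 2): with avg batching the monitor receives 5 and 2, so a downstream max sees 5, while with max batching it sees 9, the true maximum of the raw data.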

Monitoring message queue

In addition to user-defined metrics, it's possible to monitor the health of Akka itself:

import org.matmexrhino.rxmon.QueueMessages._

target ! Size     // sends the current size of the message queue to target.
target ! Enqueues // sends the number of enqueues in the queue to target.
target ! Dequeues // sends the number of dequeues in the queue to target.

To transform those messages into numbers, declare the mailbox type and register it:

class MyMailbox(settings: ActorSystem.Settings, config: Config) extends MailboxType {
  def create(owner: Option[ActorRef], system: Option[ActorSystem]) = new MyMessageQueue(system.get)
}
class MyMessageQueue(val system: ActorSystem) extends UnboundedMailbox.MessageQueue with QueueSizeReporter

akka.actor.default-mailbox {
  mailbox-type = MyMailbox
}
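
As a rough picture of what a reporting queue has to track in order to answer Size, Enqueues and Dequeues, here is a sketch that is independent of Akka. It is not the library's QueueSizeReporter: the class `CountingQueue` and its methods are hypothetical, and deriving the size from the two counters is an assumption made for the example.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for a reporting message queue; not rxmon's QueueSizeReporter.
class CountingQueue[A <: AnyRef] {
  private val queue = new ConcurrentLinkedQueue[A]()
  private val enqueueCount = new AtomicLong(0L)
  private val dequeueCount = new AtomicLong(0L)

  // Adds a message and counts the enqueue.
  def enqueue(a: A): Unit = {
    queue.offer(a)
    enqueueCount.incrementAndGet()
    ()
  }

  // Returns None when the queue is empty; only successful dequeues are counted.
  def dequeue(): Option[A] = {
    val head = Option(queue.poll())
    if (head.isDefined) dequeueCount.incrementAndGet()
    head
  }

  def enqueues: Long = enqueueCount.get()
  def dequeues: Long = dequeueCount.get()

  // Derived from the two counters; under concurrent access this is only approximate.
  def size: Long = enqueues - dequeues
}
```

Keeping monotonic counters rather than a single size value lets a monitor derive both throughput and backlog, for example by applying drv to the enqueue count.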

Referencing

This project is published on Bintray.

To reference from sbt:

resolvers += "bintray-vigdorchik" at "http://dl.bintray.com/vigdorchik/maven"

libraryDependencies += "org.matmexrhino" %% "rxmon" % "0.3.0"

To reference from Maven:

Add the repository to Maven:

<repository>
  <id>bintray-vigdorchik</id>
  <url>http://dl.bintray.com/vigdorchik/maven</url>
</repository>

Resolve the library:

<dependency>
  <groupId>org.matmexrhino</groupId>
  <artifactId>rxmon_2.10</artifactId>
  <version>0.1.0</version>
</dependency>
