#Persistent Buffer

PersistentBuffer is the first of a series of practical Akka Streams flow components. It works like the Akka Streams buffer, with the difference that the content of the buffer is stored in a series of memory-mapped files in the directory given at construction of the PersistentBuffer. This allows the buffer to be virtually limitless in size and to keep its content off the JVM heap, while still delivering throughput in the range of a million messages per second.


The following dependencies are required for Persistent Buffer to work:

"org.squbs" %% "squbs-pattern" % squbsVersion,
"net.openhft" % "chronicle-queue" % "4.4.4"


The following example shows the use of PersistentBuffer in a stream:

implicit val serializer = QueueSerializer[ByteString]()
val source = Source(1 to 1000000).map { n => ByteString(s"Hello $n") }
val buffer = new PersistentBuffer[ByteString](new File("/tmp/myqueue"))
val counter = Flow[Any].map( _ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)
val countFuture = source.via(buffer).runWith(counter)

This version shows the same in a GraphDSL:

implicit val serializer = QueueSerializer[ByteString]()
val source = Source(1 to 1000000).map { n => ByteString(s"Hello $n") }
val buffer = new PersistentBuffer[ByteString](new File("/tmp/myqueue"))
val counter = Flow[Any].map( _ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)
val streamGraph = RunnableGraph.fromGraph(GraphDSL.create(counter) { implicit builder =>
  sink =>
    import GraphDSL.Implicits._
    source ~> buffer ~> sink
    ClosedShape
})
val countFuture = streamGraph.run()

##Back-Pressure

PersistentBuffer does not back-pressure upstream. It takes all the stream elements given to it and grows its storage by adding, or rotating to, new queue files. It has no means to limit the buffer size or to determine the available storage capacity. Downstream back-pressure is honored as per Akka Streams and Reactive Streams requirements.
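Because the buffer itself imposes no size limit, the application may want to watch the persist directory on its own. The following is a minimal sketch using only the Java standard library; `BufferDiskUsage`, `usedBytes`, and `needsAttention` are hypothetical names and are not part of squbs. Note that memory-mapped queue files are allocated in blocks, so file lengths reflect allocated space rather than the number of unconsumed elements.

```scala
import java.io.File

// Hypothetical helper, not part of squbs: inspects the directory passed to PersistentBuffer.
object BufferDiskUsage {

  /** Total length in bytes of the regular files directly inside `dir` (0 if it cannot be listed). */
  def usedBytes(dir: File): Long =
    Option(dir.listFiles()).getOrElse(Array.empty[File])
      .filter(_.isFile)
      .map(_.length())
      .sum

  /** True when the queue files exceed `maxBytes` or the volume has less than `minFreeBytes` usable. */
  def needsAttention(dir: File, maxBytes: Long, minFreeBytes: Long): Boolean =
    usedBytes(dir) > maxBytes || dir.getUsableSpace < minFreeBytes
}
```

A check such as `BufferDiskUsage.needsAttention(new File("/tmp/myqueue"), maxBytes, minFreeBytes)` could then drive an alert or throttle the upstream producer; the thresholds are for the application to choose.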

##Failure & Recovery

Due to its persistent nature, PersistentBuffer can recover from abrupt stream shutdowns, stream failures, JVM failures, or even system failures. When a stream is restarted with a PersistentBuffer on the same directory, the buffer first emits the stored elements that were not yet consumed, followed by the newly added elements. Only the elements that had been consumed from the buffer but had not finished processing at the time of the previous failure or shutdown are lost.

Since the buffer is backed by local storage, whether spinning disks or SSDs, its performance and durability depend on those of the storage. A system malfunction or storage corruption may cause total loss of all elements in the buffer. The durability of this buffer should therefore not be assumed to match that of databases or other off-host persistent stores; it trades some durability for much higher performance.

##Commit Guarantee

In case of an unexpected failure, all elements that have been emitted from the buffer but have not yet reached a sink stage are effectively lost. Where such data loss must be avoided, adding a commit stage after the stages that may fail can help.

By default, the auto-commit property is set to true to keep the buffer simple. Setting auto-commit to false and using a commit stage can help solve the problem above.

This example shows the usage with auto-commit disabled:

implicit val serializer = QueueSerializer[ByteString]()
val source = Source(1 to 1000000).map { n => ByteString(s"Hello $n") }
val tempPath = new File("/tmp/myqueue")
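val config = ConfigFactory.parseMap {
  // parseMap expects a java.util.Map; asJava needs import scala.collection.JavaConverters._
  Map[String, AnyRef](
    "persist-dir" -> tempPath.getAbsolutePath,
    "auto-commit" -> Boolean.box(false)
  ).asJava
}
val buffer = new PersistentBuffer[ByteString](config)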
val commit = buffer.commit
val flowSink = // do some transformation or a sink flow with expected failure
val counter = Flow[Any].map( _ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)
val streamGraph = RunnableGraph.fromGraph(GraphDSL.create(counter) { implicit builder =>
  sink =>
    import GraphDSL.Implicits._
    // ensures that records are reprocessed when something fails in the transform flow
    source ~> buffer ~> flowSink ~> commit ~> sink
    ClosedShape
})
val countFuture = streamGraph.run()
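The difference between the two modes can be illustrated with a toy in-memory model. This is only a sketch of the semantics described in this section, not the squbs implementation: `ToyBuffer`, `CommitDemo`, and all their members are hypothetical, and the real commit stage works per element as it flows through the stream.

```scala
// Toy in-memory model of the read and commit positions; NOT the squbs implementation.
final class ToyBuffer[T](elements: Vector[T], autoCommit: Boolean) {
  private var readIndex = 0      // next element to emit
  private var committedIndex = 0 // position that survives a restart

  /** Emits the next element, if any. With auto-commit it counts as done immediately. */
  def poll(): Option[T] =
    if (readIndex >= elements.length) None
    else {
      val element = elements(readIndex)
      readIndex += 1
      if (autoCommit) committedIndex = readIndex
      Some(element)
    }

  /** What a commit stage signals: everything emitted so far has been fully processed. */
  def commit(): Unit = { committedIndex = readIndex }

  /** Simulates a crash followed by a restart: reading resumes at the last committed position. */
  def restart(): Unit = { readIndex = committedIndex }
}

object CommitDemo {
  /** Emits one element, crashes before it is committed, and returns what is emitted after restart. */
  def firstAfterCrash(autoCommit: Boolean): Option[String] = {
    val buffer = new ToyBuffer(Vector("a", "b", "c"), autoCommit)
    buffer.poll()    // "a" is emitted and is still being processed downstream...
    buffer.restart() // ...when the stream fails and is restarted
    buffer.poll()
  }
}
```

In this model, `CommitDemo.firstAfterCrash(autoCommit = true)` yields `Some("b")`, so the in-flight element "a" is lost, while `CommitDemo.firstAfterCrash(autoCommit = false)` yields `Some("a")`, so it is replayed. Downstream stages should therefore tolerate seeing an element more than once when auto-commit is disabled.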

##Space Management

A typical directory for persisting the queue looks like the following:

$ ls -l
-rw-r--r--  1 squbs_user     110054053  83886080 May 17 20:00 20160518.cq4
-rw-r--r--  1 squbs_user     110054053      8192 May 17 20:00 tailer.idx

Queue files created for persisting stream elements can be cleaned up by calling clearStorage. This function cleans up the resources of a queue file once all the elements in that file have been consumed. Schedule a periodic call to this function based on your buffer configuration.
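One way to schedule such a periodic call is a plain `ScheduledExecutorService`, sketched below with the Java standard library only. `PeriodicCleanup` and its `schedule` method are hypothetical names; the `cleanup` function is where the clearStorage call would go, and its exact receiver and signature should be taken from the squbs API rather than from this sketch.

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
import scala.util.control.NonFatal

// Hypothetical helper, not part of squbs.
object PeriodicCleanup {

  /** Runs `cleanup` every `periodMillis` on `scheduler` until the returned handle is cancelled. */
  def schedule(scheduler: ScheduledExecutorService, periodMillis: Long)
              (cleanup: () => Unit): ScheduledFuture[_] = {
    val task = new Runnable {
      // An exception escaping run() would suppress all later executions, so it is caught here.
      override def run(): Unit =
        try cleanup()
        catch { case NonFatal(e) => System.err.println(s"Queue cleanup failed: $e") }
    }
    scheduler.scheduleAtFixedRate(task, periodMillis, periodMillis, TimeUnit.MILLISECONDS)
  }
}
```

A sensible period depends on the roll cycle: with a daily roll cycle a queue file can only become fully consumed about once a day, so running the cleanup much more often than that gains little.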


##Configuration

The queue can be created by passing just the location of the persistent directory, keeping all other configuration at its defaults, as in the first examples above. Alternatively, it can be created by passing a Config object at construction. The Config object is a standard HOCON configuration. The following example shows constructing a PersistentBuffer using a Config:

val configText =
  """
    | persist-dir = /tmp/myQueue
    | roll-cycle = xlarge_daily
    | wire-type = compressed_binary
    | block-size = 80m
  """.stripMargin
val config = ConfigFactory.parseString(configText)

// Construct the buffer using a Config.
val buffer = new PersistentBuffer[ByteString](config)

The following configuration properties are used for the PersistentBuffer:

persist-dir = /tmp/myQueue # Required
roll-cycle = daily         # Optional, defaults to daily
wire-type = binary         # Optional, defaults to binary
block-size = 80m           # Optional, defaults to 64m
index-spacing = 16k        # Optional, defaults to roll-cycle's spacing 
index-count = 16           # Optional, defaults to roll-cycle's count

Roll-cycle can be specified in lower or upper case. Supported values for roll-cycle are as follows:

| Roll Cycle   | Capacity                      |
|--------------|-------------------------------|
| MINUTELY     | 64 million entries per minute |
| HOURLY       | 256 million entries per hour  |
| SMALL_DAILY  | 512 million entries per day   |
| DAILY        | 4 billion entries per day     |
| LARGE_DAILY  | 32 billion entries per day    |
| XLARGE_DAILY | 2 trillion entries per day    |
| HUGE_DAILY   | 256 trillion entries per day  |
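As a worked example of reading these capacities: a sustained rate of one million messages per second amounts to 1,000,000 × 86,400 = 86.4 billion entries per day, which exceeds LARGE_DAILY and calls for XLARGE_DAILY. The sketch below does the same arithmetic for the daily cycles. `RollCycleChooser` is a hypothetical helper, and the capacities are the rounded figures listed above rather than exact limits.

```scala
// Hypothetical helper, not part of squbs or Chronicle Queue.
object RollCycleChooser {

  // Daily roll cycles with the rounded capacities (entries per day) listed above.
  val dailyCycles: Seq[(String, Long)] = Seq(
    "SMALL_DAILY"  -> 512000000L,
    "DAILY"        -> 4000000000L,
    "LARGE_DAILY"  -> 32000000000L,
    "XLARGE_DAILY" -> 2000000000000L,
    "HUGE_DAILY"   -> 256000000000000L
  )

  /** Smallest daily roll cycle whose capacity covers a sustained rate, or None if none does. */
  def forRate(messagesPerSecond: Long): Option[String] = {
    val perDay = messagesPerSecond * 86400L // seconds in a day
    dailyCycles.collectFirst { case (name, capacity) if capacity >= perDay => name }
  }
}
```

For instance, `RollCycleChooser.forRate(1000000L)` returns `Some("XLARGE_DAILY")`, matching the calculation above. Leaving headroom above the expected rate is advisable, since traffic is rarely uniform over a day.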

Wire-type can be specified in lower or upper case. Supported values for wire-type are as follows:

  • TEXT
  • BINARY
  • COMPRESSED_BINARY
  • JSON
  • RAW
  • CSV

The memory sizes such as block-size and index-spacing are specified according to the memory size format defined in the HOCON specification.
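To see what such a value resolves to, it can be read back with the Typesafe Config API. The sketch below assumes the `com.typesafe.config` library is on the classpath, which the `ConfigFactory` examples above already rely on; `MemorySizeDemo` is a hypothetical name. In HOCON, single-letter units such as `k` and `m` are powers of two, whereas `kB` and `MB` are powers of ten.

```scala
import com.typesafe.config.ConfigFactory

// Hypothetical demo object; only ConfigFactory.parseString and Config.getBytes are library calls.
object MemorySizeDemo {
  private val config = ConfigFactory.parseString(
    """
      |block-size = 80m
      |index-spacing = 16k
    """.stripMargin)

  // 80m is 80 * 1024 * 1024 bytes; 16k is 16 * 1024 bytes.
  val blockSize: Long = config.getBytes("block-size")
  val indexSpacing: Long = config.getBytes("index-spacing")

  def main(args: Array[String]): Unit =
    println(s"block-size = $blockSize bytes, index-spacing = $indexSpacing bytes")
}
```

Here `blockSize` is 83886080, which is also the size of the `.cq4` file shown in the directory listing under Space Management.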

##Serialization

A QueueSerializer[T] needs to be implicitly provided for a PersistentBuffer[T], as seen in the examples above:

implicit val serializer = QueueSerializer[ByteString]()

The QueueSerializer[T]() call produces a serializer for your target type. It depends on the serialization and deserialization of the underlying infrastructure.

###Implementing a Serializer

To control the persistent format in the queue at a fine-grained level, you may want to implement your own serializer as follows:

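case class Person(name: String, age: Int)

class PersonSerializer extends QueueSerializer[Person] {

  // Reads the fields in the same order in which writeElement wrote them.
  override def readElement(wire: WireIn): Option[Person] = {
    for {
      name <- Option(wire.read().`object`(classOf[String]))
      age <- Option(wire.read().int32)
    } yield { Person(name, age) }
  }

  override def writeElement(element: Person, wire: WireOut): Unit = {
    wire.write().`object`(classOf[String], element.name)
    wire.write().int32(element.age)
  }
}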

To use this serializer, just declare it implicitly before constructing the PersistentBuffer as follows:

implicit val serializer = new PersonSerializer()
val buffer = new PersistentBuffer[Person](new File("/tmp/myqueue"))

##Broadcast Buffer

BroadcastBuffer is a variant of the persistent buffer. It works similarly to PersistentBuffer, except that stream elements are broadcast to multiple output ports. Hence it is a combination of the buffer and broadcast stages. The configuration takes an additional parameter named output-ports, which specifies the number of output ports.

A broadcast buffer is especially useful when stream elements are to be emitted from each output port at an independent rate, depending on the speed of the downstream demand on that port.

val configText =
  """
    | persist-dir = /tmp/myQueue
    | roll-cycle = xlarge_daily
    | wire-type = compressed_binary
    | block-size = 80m
    | output-ports = 3
    | auto-commit = false
  """.stripMargin
val config = ConfigFactory.parseString(configText)

// Construct the buffer using a Config.
val bcBuffer = new BroadcastBuffer[ByteString](config)


implicit val serializer = QueueSerializer[ByteString]()

val in = Source(1 to 100000)
val flowCounter = Flow[Any].map(_ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)

// `transform` (a flow producing ByteString) and `merge` (a fan-in stage with one
// input per output port) are assumed to be defined elsewhere.
val streamGraph = RunnableGraph.fromGraph(GraphDSL.create(flowCounter) { implicit builder =>
      sink =>
        import GraphDSL.Implicits._
        val buffer = new BroadcastBuffer[ByteString](config)
        val commit = buffer.commit
        val bcBuffer = builder.add(buffer)
        val mr = builder.add(merge)
        in ~> transform ~> bcBuffer ~> commit ~> mr ~> sink
                           bcBuffer ~> commit ~> mr
                           bcBuffer ~> commit ~> mr
        ClosedShape
    })
val countFuture = streamGraph.run()


PersistentBuffer utilizes Chronicle-Queue 4.x as high-performance memory-mapped queue persistence.