
Merge pull request #206 from SwissBorg/155-journal-persistence-ids-table
[#155] New table: journal_persistence_ids
tiagomota authored Aug 29, 2023
2 parents be8c983 + 513e3ca commit 2bfafae
Showing 43 changed files with 1,051 additions and 146 deletions.
60 changes: 4 additions & 56 deletions README.md
@@ -21,15 +21,15 @@ You can read more about DAOs and schema variants in [the official documentation]
To use `akka-persistence-postgres` in your SBT project, add the following to your `build.sbt`:

```scala
libraryDependencies += "com.swissborg" %% "akka-persistence-postgres" % "0.5.0"
libraryDependencies += "com.swissborg" %% "akka-persistence-postgres" % "0.6.0-RC1"
```

For a Maven project, add:
```xml
<dependency>
<groupId>com.swissborg</groupId>
<artifactId>akka-persistence-postgres_2.13</artifactId>
<version>0.5.0</version>
<version>0.6.0-RC1</version>
</dependency>
```
to your `pom.xml`.
@@ -113,62 +113,10 @@ Example partition names: `j_myActor_0`, `j_myActor_1`, `j_worker_0` etc.
Keep in mind that the default maximum length for a table name in Postgres is 63 bytes, so you should avoid any non-ASCII characters in your `persistenceId`s and keep the `prefix` reasonably short.

> :warning: Once any of the partitioning settings under the `postgres-journal.tables.journal.partitions` branch is settled, you should never change it. Otherwise you might end up with `PostgresException`s caused by table name or range conflicts.

## Migration

### Migration from akka-persistence-jdbc 4.0.0
It is possible to migrate existing journals from Akka Persistence JDBC 4.0.0.
Since we decided to extract metadata from the serialized payload and store it in a separate column, it is not possible to migrate an existing journal and snapshot store using plain SQL scripts.

#### How migration works
Each journal event and snapshot has to be read and deserialized, metadata and tags must be extracted, and then everything has to be stored in the new table.

We provide an optional artifact, `akka-persistence-postgres-migration`, that brings into your project the classes necessary to automate the above process.

**Important**: Our util classes neither drop nor update any old data. The original tables will still be there, renamed with an `old_` prefix. It's up to you when to drop them.

#### How to use the plugin-provided migrations
##### Add akka-persistence-postgres-migration to your project
Add the following to your `build.sbt`:
```scala
libraryDependencies += "com.swissborg" %% "akka-persistence-postgres-migration" % "0.5.0"
```
For a Maven project, add:
```xml
<dependency>
<groupId>com.swissborg</groupId>
<artifactId>akka-persistence-postgres-migration_2.13</artifactId>
<version>0.5.0</version>
</dependency>
```
to your `pom.xml`.

##### Create and run migrations:
```scala
import akka.persistence.postgres.migration.journal.Jdbc4JournalMigration
import akka.persistence.postgres.migration.snapshot.Jdbc4SnapshotStoreMigration

for {
  _ <- new Jdbc4JournalMigration(config).run()
  _ <- new Jdbc4SnapshotStoreMigration(config).run()
} yield ()
```
**Very important note**: The migration has to be finished before your application starts any persistent actors!

It's your choice whether to trigger the migration manually or (recommended) to leverage a database version control system such as Flyway.
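
For illustration, here's a minimal sketch of triggering the migration through Flyway's Java API (the connection settings and the plain programmatic call are hypothetical; see the demo app linked below for a complete, working setup):

```scala
import org.flywaydb.core.Flyway

// Hypothetical connection settings -- adapt to your environment.
val flyway = Flyway
  .configure()
  .dataSource("jdbc:postgresql://localhost:5432/mydb", "user", "password")
  .load()

// Run all pending migrations (including the journal/snapshot migrations above,
// if wired in as code-based migrations) before any persistent actor starts.
flyway.migrate()
```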

#### Examples
An example Flyway-based migration can be found in the demo app: https://github.com/mkubala/demo-akka-persistence-postgres/blob/master/src/main/scala/com/github/mkubala/FlywayMigrationExample.scala

### Migration from akka-persistence-postgres 0.4.0 to 0.5.0
New indices need to be created on each partition. To avoid locking production databases for too long, this should be done in 2 steps:
1. manually create the indices `CONCURRENTLY`,
2. deploy the new release with migration scripts.

#### Manually create indices CONCURRENTLY
Execute the DDL statements produced by the [sample migration script](scripts/migration-0.5.0/partitioned/1-add-indices-manually.sql), adapting the top-level variables to match your journal configuration before executing.
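
For illustration only, the generated statements take roughly the following shape, here wrapped in Slick (the partition and index names below are hypothetical; always execute the DDL produced by the script for your configuration):

```scala
import slick.jdbc.PostgresProfile.api._

// Hypothetical partition/index names. CONCURRENTLY builds the index
// without taking the long table lock a plain CREATE INDEX would need.
val addIndex = sqlu"""
  CREATE INDEX CONCURRENTLY IF NOT EXISTS j_myactor_0_pid_seq_idx
  ON j_myActor_0 (persistence_id, sequence_number)
"""
```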

#### Deploy new release with migration scripts
See the [sample Flyway migration script](scripts/migration-0.5.0/partitioned/2-add-indices-flyway.sql) and adapt the top-level variables to match your journal configuration.
Please see the documentation regarding migrations [here](https://swissborg.github.io/akka-persistence-postgres/migration).

## Contributing
We are always looking for contributions and new ideas, so if you'd like to join the project, check out the [open issues](https://github.com/SwissBorg/akka-persistence-postgres/issues) or post your own suggestions!
45 changes: 42 additions & 3 deletions core/src/main/resources/reference.conf
@@ -126,7 +126,17 @@ postgres-journal {
metadata = "metadata"
}
}

# Used to hold journal information that can be used to speed up queries
journalMetadata {
  tableName = "journal_metadata"
  schemaName = ""
  columnNames = {
    persistenceId = "persistence_id"
    maxSequenceNumber = "max_sequence_number"
    maxOrdering = "max_ordering"
    minOrdering = "min_ordering"
  }
}
tags {
tableName = "tags"
schemaName = ""
@@ -176,6 +186,14 @@ postgres-journal {
# to the same value for these other journals.
use-shared-db = null

# This setting can be used to enable the usage of the data stored
# in the journal_metadata table, in order to speed up some queries that would
# otherwise solely use the journal table.
# In case the metadata table does not yet hold the required information,
# the logic falls back to the journal-only queries.
# This setting is disabled by default.
use-journal-metadata = false

slick {

db {
@@ -358,7 +376,18 @@ postgres-read-journal {
# to the same value for these other journals.
use-shared-db = null

dao = "akka.persistence.postgres.query.dao.ByteArrayReadJournalDao"
# This setting can be used to enable the usage of the data stored
# in the journal_metadata table, in order to speed up some queries that would
# otherwise solely use the journal table.
# In case the metadata table does not yet hold the required information,
# the logic falls back to the journal-only queries.
# This setting is disabled by default.
use-journal-metadata = false

# Replace with "akka.persistence.postgres.query.dao.PartitionedReadJournalDao" in order to leverage dedicated queries to
# the partitioned journal.
dao = "akka.persistence.postgres.query.dao.FlatReadJournalDao"

# Configuration for akka.persistence.postgres.tag.TagIdResolver
tags {
@@ -402,7 +431,17 @@ postgres-read-journal {
message = "message"
}
}

# Used to hold journal information that can be used to speed up queries
journalMetadata {
  tableName = "journal_metadata"
  schemaName = ""
  columnNames = {
    persistenceId = "persistence_id"
    maxSequenceNumber = "max_sequence_number"
    maxOrdering = "max_ordering"
    minOrdering = "min_ordering"
  }
}
tags {
tableName = "tags"
schemaName = ""
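For reference, enabling the new flag from application code can be sketched like this (the override-with-fallback pattern is illustrative; the keys are the ones introduced above):

```scala
import com.typesafe.config.ConfigFactory

// Sketch: turn on journal-metadata-backed queries on top of the defaults.
val config = ConfigFactory
  .parseString("""
    postgres-journal.use-journal-metadata = true
    postgres-read-journal.use-journal-metadata = true
  """)
  .withFallback(ConfigFactory.load())
```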
@@ -12,6 +12,7 @@ import scala.concurrent.duration._

object ConfigKeys {
  val useSharedDb = "use-shared-db"
  val useJournalMetadata = "use-journal-metadata"
}

class SlickConfiguration(config: Config) {
@@ -49,6 +50,26 @@ class JournalTableConfiguration(config: Config) {
override def toString: String = s"JournalTableConfiguration($tableName,$schemaName,$columnNames)"
}

class JournalMetadataTableColumnNames(config: Config) {
  private val cfg = config.asConfig("tables.journalMetadata.columnNames")
  val id: String = cfg.as[String]("id", "id")
  val persistenceId: String = cfg.as[String]("persistenceId", "persistence_id")
  val maxSequenceNumber: String = cfg.as[String]("maxSequenceNumber", "max_sequence_number")
  val maxOrdering: String = cfg.as[String]("maxOrdering", "max_ordering")
  val minOrdering: String = cfg.as[String]("minOrdering", "min_ordering")

  override def toString: String =
    s"JournalMetadataTableColumnNames($id,$persistenceId,$maxSequenceNumber,$maxOrdering,$minOrdering)"
}

class JournalMetadataTableConfiguration(config: Config) {
  private val cfg = config.asConfig("tables.journalMetadata")
  val tableName: String = cfg.as[String]("tableName", "journal_metadata")
  val schemaName: Option[String] = cfg.as[String]("schemaName").trim
  val columnNames: JournalMetadataTableColumnNames = new JournalMetadataTableColumnNames(config)
  override def toString: String = s"JournalMetadataTableConfiguration($tableName,$schemaName,$columnNames)"
}

class SnapshotTableColumnNames(config: Config) {
private val cfg = config.asConfig("tables.snapshot.columnNames")
val persistenceId: String = cfg.as[String]("persistenceId", "persistence_id")
@@ -86,7 +107,7 @@ class TagsTableConfiguration(config: Config) {
}

class JournalPluginConfig(config: Config) {
  val dao: String = config.asString("dao", "akka.persistence.postgres.dao.bytea.journal.FlatJournalDao")
  val dao: String = config.asString("dao", "akka.persistence.postgres.journal.dao.FlatJournalDao")
  override def toString: String = s"JournalPluginConfig($dao)"
}

@@ -101,12 +122,12 @@ class BaseByteArrayJournalDaoConfig(config: Config) {
}

class ReadJournalPluginConfig(config: Config) {
val dao: String = config.as[String]("dao", "akka.persistence.postgres.dao.bytea.readjournal.ByteArrayReadJournalDao")
val dao: String = config.as[String]("dao", "akka.persistence.postgres.query.dao.FlatReadJournalDao")
override def toString: String = s"ReadJournalPluginConfig($dao)"
}

class SnapshotPluginConfig(config: Config) {
val dao: String = config.as[String]("dao", "akka.persistence.postgres.dao.bytea.snapshot.ByteArraySnapshotDao")
val dao: String = config.as[String]("dao", "akka.persistence.postgres.snapshot.dao.ByteArraySnapshotDao")
override def toString: String = s"SnapshotPluginConfig($dao)"
}

@@ -122,13 +143,16 @@ class TagsConfig(config: Config) {
class JournalConfig(config: Config) {
  val partitionsConfig = new JournalPartitionsConfiguration(config)
  val journalTableConfiguration = new JournalTableConfiguration(config)
  val journalMetadataTableConfiguration = new JournalMetadataTableConfiguration(config)
  val pluginConfig = new JournalPluginConfig(config)
  val daoConfig = new BaseByteArrayJournalDaoConfig(config)
  val tagsConfig = new TagsConfig(config)
  val tagsTableConfiguration = new TagsTableConfiguration(config)
  val useSharedDb: Option[String] = config.asOptionalNonEmptyString(ConfigKeys.useSharedDb)
  val useJournalMetadata: Boolean = config.asBoolean(ConfigKeys.useJournalMetadata, false)

  override def toString: String =
    s"JournalConfig($journalTableConfiguration,$pluginConfig,$tagsConfig,$partitionsConfig,$useSharedDb)"
    s"JournalConfig($journalTableConfiguration,$journalMetadataTableConfiguration,$pluginConfig,$tagsConfig,$partitionsConfig,$useSharedDb,$useJournalMetadata)"
}

class SnapshotConfig(config: Config) {
@@ -156,6 +180,7 @@ case class JournalSequenceRetrievalConfig(

class ReadJournalConfig(config: Config) {
  val journalTableConfiguration = new JournalTableConfiguration(config)
  val journalMetadataTableConfiguration = new JournalMetadataTableConfiguration(config)
  val journalSequenceRetrievalConfiguration = JournalSequenceRetrievalConfig(config)
  val pluginConfig = new ReadJournalPluginConfig(config)
  val tagsConfig = new TagsConfig(config)
@@ -164,7 +189,8 @@ class ReadJournalConfig(config: Config) {
  val maxBufferSize: Int = config.as[String]("max-buffer-size", "500").toInt
  val addShutdownHook: Boolean = config.asBoolean("add-shutdown-hook", true)
  val includeDeleted: Boolean = config.as[Boolean]("includeLogicallyDeleted", true)
  val useJournalMetadata: Boolean = config.asBoolean(ConfigKeys.useJournalMetadata, false)

  override def toString: String =
    s"ReadJournalConfig($journalTableConfiguration,$pluginConfig,$refreshInterval,$maxBufferSize,$addShutdownHook,$includeDeleted)"
    s"ReadJournalConfig($journalTableConfiguration,$journalMetadataTableConfiguration,$pluginConfig,$refreshInterval,$maxBufferSize,$addShutdownHook,$includeDeleted,$useJournalMetadata)"
}
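
As a rough usage sketch of these configuration classes (assuming they live under `akka.persistence.postgres.config` and the journal is configured at the standard `postgres-journal` path):

```scala
import akka.persistence.postgres.config.JournalConfig
import com.typesafe.config.ConfigFactory

// Sketch: materialize the journal configuration from the plugin's subtree.
val journalConfig = new JournalConfig(ConfigFactory.load().getConfig("postgres-journal"))
journalConfig.useJournalMetadata // false unless overridden in application.conf
```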
@@ -15,6 +15,7 @@ import akka.stream.scaladsl.{ Keep, Sink, Source }
import akka.stream.{ Materializer, OverflowStrategy, QueueOfferResult }
import akka.{ Done, NotUsed }
import org.slf4j.{ Logger, LoggerFactory }
import slick.dbio.DBIOAction
import slick.jdbc.JdbcBackend._

import scala.collection.immutable._
@@ -39,6 +40,9 @@ trait BaseByteArrayJournalDao extends JournalDaoWithUpdates with BaseJournalDaoW

  val logger: Logger = LoggerFactory.getLogger(this.getClass)

  lazy val metadataQueries: JournalMetadataQueries = new JournalMetadataQueries(
    JournalMetadataTable(journalConfig.journalMetadataTableConfiguration))

  // This logging may block since we don't control how the user will configure logback
  // We can't use Akka logging either, because we don't have an ActorSystem in scope and
  // we should not introduce another dependency here.
@@ -137,10 +141,22 @@ trait BaseByteArrayJournalDao extends JournalDaoWithUpdates with BaseJournalDaoW
  private def highestMarkedSequenceNr(persistenceId: String) =
    queries.highestMarkedSequenceNrForPersistenceId(persistenceId).result

  override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
    for {
      maybeHighestSeqNo <- db.run(queries.highestSequenceNrForPersistenceId(persistenceId).result)
    } yield maybeHighestSeqNo.getOrElse(0L)
  override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
    val query = if (journalConfig.useJournalMetadata) {
      metadataQueries.highestSequenceNrForPersistenceId(persistenceId).result.headOption.flatMap {
        case Some(maxSequenceNr) =>
          // return the max sequence nr stored in the journal metadata table
          DBIOAction.successful(Some(maxSequenceNr))
        case None =>
          // journal metadata does not have information for this persistenceId -> fall back to standard behaviour
          queries.highestSequenceNrForPersistenceId(persistenceId).result
      }
    } else
      queries.highestSequenceNrForPersistenceId(persistenceId).result

    // Default to 0L when nothing is found for this persistenceId
    db.run(query).map(_.getOrElse(0L))
  }

  override def messages(
      persistenceId: String,
@@ -0,0 +1,19 @@
package akka.persistence.postgres.journal.dao

import slick.lifted.TableQuery

class JournalMetadataQueries(journalMetadataTable: TableQuery[JournalMetadataTable]) {
  import akka.persistence.postgres.db.ExtendedPostgresProfile.api._

  private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Query[Rep[Long], Long, Seq] = {
    journalMetadataTable.filter(_.persistenceId === persistenceId).map(_.maxSequenceNumber).take(1)
  }

  val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _)

  private def _minAndMaxOrderingForPersistenceId(
      persistenceId: Rep[String]): Query[(Rep[Long], Rep[Long]), (Long, Long), Seq] =
    journalMetadataTable.filter(_.persistenceId === persistenceId).take(1).map(r => (r.minOrdering, r.maxOrdering))

  val minAndMaxOrderingForPersistenceId = Compiled(_minAndMaxOrderingForPersistenceId _)
}
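
A small sketch of how such a compiled query might be executed (assuming a Slick `Database` handle and the `JournalMetadataTable` wiring from this commit):

```scala
import scala.concurrent.Future
import akka.persistence.postgres.db.ExtendedPostgresProfile.api._

// Sketch: look up the cached max sequence number for a persistence id.
// None means journal_metadata has no row for that id yet.
def maxSeqNrFromMetadata(db: Database, queries: JournalMetadataQueries)(
    persistenceId: String): Future[Option[Long]] =
  db.run(queries.highestSequenceNrForPersistenceId(persistenceId).result.headOption)
```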
@@ -19,9 +19,6 @@ class JournalQueries(journalTable: TableQuery[JournalTable]) {
  def writeJournalRows(xs: Seq[JournalRow]): FixedSqlAction[Option[Int], NoStream, slick.dbio.Effect.Write] =
    compiledJournalTable ++= xs.sortBy(_.sequenceNumber)

  private def selectAllJournalForPersistenceId(persistenceId: Rep[String]) =
    journalTable.filter(_.persistenceId === persistenceId).sortBy(_.sequenceNumber.desc)

  def delete(persistenceId: String, toSequenceNr: Long): FixedSqlAction[Int, NoStream, slick.dbio.Effect.Write] = {
    journalTable.filter(_.persistenceId === persistenceId).filter(_.sequenceNumber <= toSequenceNr).delete
  }
@@ -58,16 +55,6 @@ class JournalQueries(journalTable: TableQuery[JournalTable]) {

  val highestMarkedSequenceNrForPersistenceId = Compiled(_highestMarkedSequenceNrForPersistenceId _)

  private def _selectByPersistenceIdAndMaxSequenceNumber(persistenceId: Rep[String], maxSequenceNr: Rep[Long]) =
    selectAllJournalForPersistenceId(persistenceId).filter(_.sequenceNumber <= maxSequenceNr)

  val selectByPersistenceIdAndMaxSequenceNumber = Compiled(_selectByPersistenceIdAndMaxSequenceNumber _)

  private def _allPersistenceIdsDistinct: Query[Rep[String], String, Seq] =
    journalTable.map(_.persistenceId).distinct

  val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct)

  private def _messagesQuery(
      persistenceId: Rep[String],
      fromSequenceNr: Rep[Long],
@@ -81,6 +68,24 @@ class JournalQueries(journalTable: TableQuery[JournalTable]) {
      .sortBy(_.sequenceNumber.asc)
      .take(max)

  private def _messagesOrderingBoundedQuery(
      persistenceId: Rep[String],
      fromSequenceNr: Rep[Long],
      toSequenceNr: Rep[Long],
      max: ConstColumn[Long],
      minOrdering: Rep[Long],
      maxOrdering: Rep[Long]): Query[JournalTable, JournalRow, Seq] =
    journalTable
      .filter(_.persistenceId === persistenceId)
      .filter(_.deleted === false)
      .filter(_.sequenceNumber >= fromSequenceNr)
      .filter(_.sequenceNumber <= toSequenceNr)
      .filter(_.ordering >= minOrdering)
      .filter(_.ordering <= maxOrdering)
      .sortBy(_.sequenceNumber.asc)
      .take(max)

  val messagesQuery = Compiled(_messagesQuery _)

  val messagesOrderingBoundedQuery = Compiled(_messagesOrderingBoundedQuery _)
}
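
To show how the pieces fit together, here is an illustrative combination (not the plugin's actual DAO code) of the metadata bounds with the new ordering-bounded query, falling back to the unbounded query when no metadata row exists yet; it assumes the same package as the classes above so that `JournalRow` resolves:

```scala
import scala.concurrent.{ ExecutionContext, Future }
import akka.persistence.postgres.db.ExtendedPostgresProfile.api._

// Illustrative sketch: bound the scan by the [minOrdering, maxOrdering] range
// recorded in journal_metadata, letting Postgres prune irrelevant partitions.
def boundedMessages(db: Database, queries: JournalQueries, metadataQueries: JournalMetadataQueries)(
    persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
    implicit ec: ExecutionContext): Future[Seq[JournalRow]] =
  db.run(metadataQueries.minAndMaxOrderingForPersistenceId(persistenceId).result.headOption).flatMap {
    case Some((minOrdering, maxOrdering)) =>
      db.run(
        queries
          .messagesOrderingBoundedQuery((persistenceId, fromSequenceNr, toSequenceNr, max, minOrdering, maxOrdering))
          .result)
    case None => // no metadata yet -> fall back to the plain query
      db.run(queries.messagesQuery((persistenceId, fromSequenceNr, toSequenceNr, max)).result)
  }
```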
