Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#155] New table: journal_persistence_ids #206

Merged
merged 34 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
f18c56d
WIP
tiagomota Oct 8, 2021
6a9d567
Add migration to populate journal_persistence_ids table
tiagomota Oct 8, 2021
8394331
Only update journal_persistence_ids when necessary
tiagomota Oct 11, 2021
181b520
Increase patience config timeout
tiagomota Oct 11, 2021
34a9757
Styling
tiagomota Oct 12, 2021
b170202
Use journal_persistence_ids on ReadJournal#allPersistenceIds query. A…
tiagomota Oct 12, 2021
cf781c0
Use journal_persistence_ids for maxJournalSequenceQuery
tiagomota Oct 13, 2021
949383e
Revert usage of journal_persistence_ids on other queries besides high…
tiagomota Oct 13, 2021
09bea45
Keep highestSequenceNrForPersistenceId journal query and comment the …
tiagomota Oct 13, 2021
41cee5d
Re-add trigger that checks if max-sequence-number on journal_persiste…
tiagomota Oct 13, 2021
dfb426a
Fix tests by making them respect sequence_number order
tiagomota Oct 14, 2021
de1e319
Remove unused artifacts and add comment on journal metadata migration…
tiagomota Oct 25, 2021
085424d
Partition journal_persistence_ids table by hash
tiagomota Nov 3, 2021
76dafe5
Use correct value for hash partitioning modulus
tiagomota Nov 3, 2021
b2736d1
Hash by persistence_id only
tiagomota Nov 8, 2021
f3771e7
Solve flaky test on PartitionedJournalSpecTestCases.
tiagomota Nov 10, 2021
12843e0
Add test that verifies the execution of the of the new triggers
tiagomota Nov 10, 2021
9d5fb07
Prefer IDENTITY column to SERIAL
tiagomota Nov 11, 2021
dcc1f71
Rename new table to journal_metadata
tiagomota Jul 4, 2023
6579531
Use journal_metadata table on messagesQuery and highestSequenceNrForP…
tiagomota Jul 4, 2023
7c0c540
Make usage of journal metadata optional through configuration
tiagomota Jul 5, 2023
1590265
Fix typo
tiagomota Jul 6, 2023
26db186
Remove unused journal queries
tiagomota Jul 6, 2023
e883da4
Proper distinction between FlatReadJournal and PartitionedReadJournal…
tiagomota Jul 6, 2023
5c8970f
Update documentation
tiagomota Jul 6, 2023
0b0e694
Missing updates of dao classpaths in tests
tiagomota Jul 20, 2023
b903463
Simplify migrations necessary for 0.6.0
tiagomota Aug 2, 2023
7d96c33
Solve issue of min_ordering on upsert
tiagomota Aug 4, 2023
43a3772
Improve migration documentation
tiagomota Aug 7, 2023
a70d627
Use -1 instead of 0 as min_ordering default value
tiagomota Aug 7, 2023
9631cea
Add test cases suggested on review
tiagomota Aug 8, 2023
8debe8b
Remove redundant filter clause
tiagomota Aug 29, 2023
9bd44bf
Fix test
tiagomota Aug 29, 2023
513e3ca
Prepare 0.6.0-RC1 release
tiagomota Aug 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 2 additions & 54 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,62 +113,10 @@ Example partition names: `j_myActor_0`, `j_myActor_1`, `j_worker_0` etc.
Keep in mind that the default maximum length for a table name in Postgres is 63 bytes, so you should avoid any non-ascii characters in your `persistenceId`s and keep the `prefix` reasonably short.

> :warning: Once any of the partitioning setting under `postgres-journal.tables.journal.partitions` branch is settled, you should never change it. Otherwise you might end up with PostgresExceptions caused by table name or range conflicts.
## Migration

### Migration from akka-persistence-jdbc 4.0.0
It is possible to migrate existing journals from Akka Persistence JDBC 4.0.0.
Since we decided to extract metadata from the serialized payload and store it in a separate column it is not possible to migrate exiting journal and snapshot store using plain SQL scripts.

#### How migration works
Each journal event and snapshot has to be read, deserialized, metadata and tags must be extracted and then everything stored in the new table.

We provide you with an optional artifact, `akka-persistence-postgres-migration` that brings to your project the necessary classes to automate the above process.

**Important**: Our util classes neither drop nor update any old data. Original tables will be still there but renamed with an `old_` prefix. It's up to you when to drop them.

#### How to use plugin provided migrations
##### Add akka-persistence-migration to your project
Add the following to your `build.sbt`
```
libraryDependencies += "com.swissborg" %% "akka-persistence-postgres-migration" % "0.5.0"
```
For a maven project add:
```xml
<dependency>
<groupId>com.swisborg</groupId>
<artifactId>akka-persistence-postgres-migration_2.13</artifactId>
<version>0.5.0</version>
</dependency>
```
to your `pom.xml`.

##### Create and run migrations:
```scala
import akka.persistence.postgres.migration.journal.Jdbc4JournalMigration
import akka.persistence.postgres.migration.snapshot.Jdbc4SnapshotStoreMigration

for {
_ <- new Jdbc4JournalMigration(config).run()
_ <- new Jdbc4SnapshotStoreMigration(config).run()
} yield ()
```
**Very important note**: The migration has to be finished before your application starts any persistent actors!

It's your choice whether you want to trigger migration manually or (recommended) leverage a database version control system of your choice (e.g. Flyway).

#### Examples
An example Flyway-based migration can be found in the demo app: https://github.com/mkubala/demo-akka-persistence-postgres/blob/master/src/main/scala/com/github/mkubala/FlywayMigrationExample.scala

### Migration from akka-persistence-postgres 0.4.0 to 0.5.0
New indices need to be created on each partition, to avoid locking production databases for too long, it should be done in 2 steps:
1. manually create indices CONCURRENTLY,
2. deploy new release with migration scripts.

#### Manually create indices CONCURRENTLY
Execute DDL statements produced by the [sample migration script](scripts/migration-0.5.0/partitioned/1-add-indices-manually.sql), adapt top level variables to match your journal configuration before executing.
## Migration

#### Deploy new release with migration scripts
See [sample flyway migration script](scripts/migration-0.5.0/partitioned/2-add-indices-flyway.sql) and adapt top level variables to match your journal configuration.
Please see the documentation regarding migrations [here](https://swissborg.github.io/akka-persistence-postgres/migration).

## Contributing
We are also always looking for contributions and new ideas, so if you’d like to join the project, check out the [open issues](https://github.com/SwissBorg/akka-persistence-postgres/issues), or post your own suggestions!
Expand Down
45 changes: 42 additions & 3 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,17 @@ postgres-journal {
metadata = "metadata"
}
}

# Used to hold journal information that can be used to speed up queries
journalMetadata {
tableName = "journal_metadata"
schemaName = ""
columnNames = {
persistenceId = "persistence_id"
maxSequenceNumber = "max_sequence_number"
maxOrdering = "max_ordering"
minOrdering = "min_ordering"
}
}
tags {
tableName = "tags"
schameName = ""
Expand Down Expand Up @@ -176,6 +186,14 @@ postgres-journal {
# to the same value for these other journals.
use-shared-db = null

# This setting can be used to enable the usage of the data being stored
# at the journal_metadata table, in order to speed up some queries that would
# solely use the journal table.
# In case the metadata table does not hold the required information (not available yet),
# the logic fallback to the journal-only queries.
# This setting is disabled by default.
use-journal-metadata = false

slick {

db {
Expand Down Expand Up @@ -358,7 +376,18 @@ postgres-read-journal {
# to the same value for these other journals.
use-shared-db = null

dao = "akka.persistence.postgres.query.dao.ByteArrayReadJournalDao"
# This setting can be used to enable the usage of the data being stored
# at the journal_metadata table, in order to speed up some queries that would
# solely use the journal table.
# In case the metadata table does not hold the required information (not available yet),
# the logic fallback to the journal-only queries.
# This setting is disabled by default.
use-journal-metadata = false


# Replace with "akka.persistence.postgres.query.dao.PartitionedReadJournalDao" in order to leverage dedicated queries to
# partitioned journal.
dao = "akka.persistence.postgres.query.dao.FlatReadJournalDao"

# Confguration for akka.persistence.postgres.tag.TagIdResolver
tags {
Expand Down Expand Up @@ -402,7 +431,17 @@ postgres-read-journal {
message = "message"
}
}

# Used to hold journal information that can be used to speed up queries
journalMetadata {
tableName = "journal_metadata"
schemaName = ""
columnNames = {
persistenceId = "persistence_id"
maxSequenceNumber = "max_sequence_number"
maxOrdering = "max_ordering"
minOrdering = "min_ordering"
}
}
tags {
tableName = "tags"
schameName = ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import scala.concurrent.duration._

object ConfigKeys {
val useSharedDb = "use-shared-db"
val useJournalMetadata = "use-journal-metadata"
}

class SlickConfiguration(config: Config) {
Expand Down Expand Up @@ -49,6 +50,26 @@ class JournalTableConfiguration(config: Config) {
override def toString: String = s"JournalTableConfiguration($tableName,$schemaName,$columnNames)"
}

class JournalMetadataTableColumnNames(config: Config) {
private val cfg = config.asConfig("tables.journalMetadata.columnNames")
val id: String = cfg.as[String]("id", "id")
val persistenceId: String = cfg.as[String]("persistenceId", "persistence_id")
val maxSequenceNumber: String = cfg.as[String]("maxSequenceNumber", "max_sequence_number")
val maxOrdering: String = cfg.as[String]("maxOrdering", "max_ordering")
val minOrdering: String = cfg.as[String]("minOrdering", "min_ordering")

override def toString: String =
s"JournalMetadataTableColumnNames($id,$persistenceId,$maxSequenceNumber,$maxOrdering,$minOrdering)"
}

class JournalMetadataTableConfiguration(config: Config) {
private val cfg = config.asConfig("tables.journalMetadata")
val tableName: String = cfg.as[String]("tableName", "journal_metadata")
lomigmegard marked this conversation as resolved.
Show resolved Hide resolved
val schemaName: Option[String] = cfg.as[String]("schemaName").trim
val columnNames: JournalMetadataTableColumnNames = new JournalMetadataTableColumnNames(config)
override def toString: String = s"JournalMetadataTableConfiguration($tableName,$schemaName,$columnNames)"
}

class SnapshotTableColumnNames(config: Config) {
private val cfg = config.asConfig("tables.snapshot.columnNames")
val persistenceId: String = cfg.as[String]("persistenceId", "persistence_id")
Expand Down Expand Up @@ -86,7 +107,7 @@ class TagsTableConfiguration(config: Config) {
}

class JournalPluginConfig(config: Config) {
val dao: String = config.asString("dao", "akka.persistence.postgres.dao.bytea.journal.FlatJournalDao")
val dao: String = config.asString("dao", "akka.persistence.postgres.journal.dao.FlatJournalDao")
override def toString: String = s"JournalPluginConfig($dao)"
}

Expand All @@ -101,12 +122,12 @@ class BaseByteArrayJournalDaoConfig(config: Config) {
}

class ReadJournalPluginConfig(config: Config) {
val dao: String = config.as[String]("dao", "akka.persistence.postgres.dao.bytea.readjournal.ByteArrayReadJournalDao")
val dao: String = config.as[String]("dao", "akka.persistence.postgres.query.dao.FlatReadJournalDao")
override def toString: String = s"ReadJournalPluginConfig($dao)"
}

class SnapshotPluginConfig(config: Config) {
val dao: String = config.as[String]("dao", "akka.persistence.postgres.dao.bytea.snapshot.ByteArraySnapshotDao")
val dao: String = config.as[String]("dao", "akka.persistence.postgres.snapshot.dao.ByteArraySnapshotDao")
override def toString: String = s"SnapshotPluginConfig($dao)"
}

Expand All @@ -122,13 +143,16 @@ class TagsConfig(config: Config) {
class JournalConfig(config: Config) {
val partitionsConfig = new JournalPartitionsConfiguration(config)
val journalTableConfiguration = new JournalTableConfiguration(config)
val journalMetadataTableConfiguration = new JournalMetadataTableConfiguration(config)
val pluginConfig = new JournalPluginConfig(config)
val daoConfig = new BaseByteArrayJournalDaoConfig(config)
val tagsConfig = new TagsConfig(config)
val tagsTableConfiguration = new TagsTableConfiguration(config)
val useSharedDb: Option[String] = config.asOptionalNonEmptyString(ConfigKeys.useSharedDb)
val useJournalMetadata: Boolean = config.asBoolean(ConfigKeys.useJournalMetadata, false)

override def toString: String =
s"JournalConfig($journalTableConfiguration,$pluginConfig,$tagsConfig,$partitionsConfig,$useSharedDb)"
s"JournalConfig($journalTableConfiguration,$journalMetadataTableConfiguration,$pluginConfig,$tagsConfig,$partitionsConfig,$useSharedDb,$useJournalMetadata)"
}

class SnapshotConfig(config: Config) {
Expand Down Expand Up @@ -156,6 +180,7 @@ case class JournalSequenceRetrievalConfig(

class ReadJournalConfig(config: Config) {
val journalTableConfiguration = new JournalTableConfiguration(config)
val journalMetadataTableConfiguration = new JournalMetadataTableConfiguration(config)
val journalSequenceRetrievalConfiguration = JournalSequenceRetrievalConfig(config)
val pluginConfig = new ReadJournalPluginConfig(config)
val tagsConfig = new TagsConfig(config)
Expand All @@ -164,7 +189,8 @@ class ReadJournalConfig(config: Config) {
val maxBufferSize: Int = config.as[String]("max-buffer-size", "500").toInt
val addShutdownHook: Boolean = config.asBoolean("add-shutdown-hook", true)
val includeDeleted: Boolean = config.as[Boolean]("includeLogicallyDeleted", true)
val useJournalMetadata: Boolean = config.asBoolean(ConfigKeys.useJournalMetadata, false)

override def toString: String =
s"ReadJournalConfig($journalTableConfiguration,$pluginConfig,$refreshInterval,$maxBufferSize,$addShutdownHook,$includeDeleted)"
s"ReadJournalConfig($journalTableConfiguration,$journalMetadataTableConfiguration,$pluginConfig,$refreshInterval,$maxBufferSize,$addShutdownHook,$includeDeleted,$useJournalMetadata)"
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import akka.stream.scaladsl.{ Keep, Sink, Source }
import akka.stream.{ Materializer, OverflowStrategy, QueueOfferResult }
import akka.{ Done, NotUsed }
import org.slf4j.{ Logger, LoggerFactory }
import slick.dbio.DBIOAction
import slick.jdbc.JdbcBackend._

import scala.collection.immutable._
Expand All @@ -39,6 +40,9 @@ trait BaseByteArrayJournalDao extends JournalDaoWithUpdates with BaseJournalDaoW

val logger: Logger = LoggerFactory.getLogger(this.getClass)

lazy val metadataQueries: JournalMetadataQueries = new JournalMetadataQueries(
JournalMetadataTable(journalConfig.journalMetadataTableConfiguration))

// This logging may block since we don't control how the user will configure logback
// We can't use a Akka logging neither because we don't have an ActorSystem in scope and
// we should not introduce another dependency here.
Expand Down Expand Up @@ -137,10 +141,22 @@ trait BaseByteArrayJournalDao extends JournalDaoWithUpdates with BaseJournalDaoW
private def highestMarkedSequenceNr(persistenceId: String) =
queries.highestMarkedSequenceNrForPersistenceId(persistenceId).result

override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
for {
maybeHighestSeqNo <- db.run(queries.highestSequenceNrForPersistenceId(persistenceId).result)
} yield maybeHighestSeqNo.getOrElse(0L)
override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
val query = if (journalConfig.useJournalMetadata) {
metadataQueries.highestSequenceNrForPersistenceId(persistenceId).result.headOption.flatMap {
case Some(maxSequenceNr) =>
// return the stored max sequence nr on journal metadata table
DBIOAction.successful(Some(maxSequenceNr))
case None =>
// journal metadata do not have information for this persistenceId -> fallback to standard behaviour
queries.highestSequenceNrForPersistenceId(persistenceId).result
}
} else
queries.highestSequenceNrForPersistenceId(persistenceId).result

// Default to 0L when nothing is found for this persistenceId
db.run(query).map(_.getOrElse(0L))
}

override def messages(
persistenceId: String,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package akka.persistence.postgres.journal.dao

import slick.lifted.TableQuery

class JournalMetadataQueries(journalMetadataTable: TableQuery[JournalMetadataTable]) {
import akka.persistence.postgres.db.ExtendedPostgresProfile.api._

private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Query[Rep[Long], Long, Seq] = {
journalMetadataTable.filter(_.persistenceId === persistenceId).map(_.maxSequenceNumber).take(1)
}

val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _)

private def _minAndMaxOrderingForPersistenceId(
persistenceId: Rep[String]): Query[(Rep[Long], Rep[Long]), (Long, Long), Seq] =
journalMetadataTable.filter(_.persistenceId === persistenceId).take(1).map(r => (r.minOrdering, r.maxOrdering))

val minAndMaxOrderingForPersistenceId = Compiled(_minAndMaxOrderingForPersistenceId _)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ class JournalQueries(journalTable: TableQuery[JournalTable]) {
def writeJournalRows(xs: Seq[JournalRow]): FixedSqlAction[Option[Int], NoStream, slick.dbio.Effect.Write] =
compiledJournalTable ++= xs.sortBy(_.sequenceNumber)

private def selectAllJournalForPersistenceId(persistenceId: Rep[String]) =
journalTable.filter(_.persistenceId === persistenceId).sortBy(_.sequenceNumber.desc)

def delete(persistenceId: String, toSequenceNr: Long): FixedSqlAction[Int, NoStream, slick.dbio.Effect.Write] = {
journalTable.filter(_.persistenceId === persistenceId).filter(_.sequenceNumber <= toSequenceNr).delete
}
Expand Down Expand Up @@ -58,16 +55,6 @@ class JournalQueries(journalTable: TableQuery[JournalTable]) {

val highestMarkedSequenceNrForPersistenceId = Compiled(_highestMarkedSequenceNrForPersistenceId _)

private def _selectByPersistenceIdAndMaxSequenceNumber(persistenceId: Rep[String], maxSequenceNr: Rep[Long]) =
selectAllJournalForPersistenceId(persistenceId).filter(_.sequenceNumber <= maxSequenceNr)

val selectByPersistenceIdAndMaxSequenceNumber = Compiled(_selectByPersistenceIdAndMaxSequenceNumber _)

private def _allPersistenceIdsDistinct: Query[Rep[String], String, Seq] =
journalTable.map(_.persistenceId).distinct

val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct)

private def _messagesQuery(
persistenceId: Rep[String],
fromSequenceNr: Rep[Long],
Expand All @@ -81,6 +68,24 @@ class JournalQueries(journalTable: TableQuery[JournalTable]) {
.sortBy(_.sequenceNumber.asc)
.take(max)

private def _messagesOrderingBoundedQuery(
persistenceId: Rep[String],
fromSequenceNr: Rep[Long],
toSequenceNr: Rep[Long],
max: ConstColumn[Long],
minOrdering: Rep[Long],
maxOrdering: Rep[Long]): Query[JournalTable, JournalRow, Seq] =
journalTable
.filter(_.persistenceId === persistenceId)
.filter(_.deleted === false)
.filter(_.sequenceNumber >= fromSequenceNr)
.filter(_.sequenceNumber <= toSequenceNr)
.filter(_.ordering >= minOrdering)
.filter(_.ordering <= maxOrdering)
.sortBy(_.sequenceNumber.asc)
.take(max)

val messagesQuery = Compiled(_messagesQuery _)

val messagesOrderingBoundedQuery = Compiled(_messagesOrderingBoundedQuery _)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package akka.persistence.postgres
package journal.dao

import akka.persistence.postgres.config.JournalTableConfiguration
import akka.persistence.postgres.config.{ JournalMetadataTableConfiguration, JournalTableConfiguration }
import akka.persistence.postgres.db.ExtendedPostgresProfile.api._
import io.circe.Json

Expand Down Expand Up @@ -90,3 +90,31 @@ object NestedPartitionsJournalTable {
def apply(journalTableCfg: JournalTableConfiguration): TableQuery[JournalTable] =
FlatJournalTable.apply(journalTableCfg)
}

class JournalMetadataTable(_tableTag: Tag, journalMetadataTableCfg: JournalMetadataTableConfiguration)
extends Table[JournalMetadataRow](
_tableTag,
_schemaName = journalMetadataTableCfg.schemaName,
_tableName = journalMetadataTableCfg.tableName) {
override def * = (
id,
persistenceId,
maxSequenceNumber,
minOrdering,
maxOrdering) <> (JournalMetadataRow.tupled, JournalMetadataRow.unapply)

val id: Rep[Long] = column[Long](journalMetadataTableCfg.columnNames.id)
val persistenceId: Rep[String] =
column[String](journalMetadataTableCfg.columnNames.persistenceId, O.Length(255, varying = true))
val maxSequenceNumber: Rep[Long] = column[Long](journalMetadataTableCfg.columnNames.maxSequenceNumber)
val minOrdering: Rep[Long] = column[Long](journalMetadataTableCfg.columnNames.minOrdering)
val maxOrdering: Rep[Long] = column[Long](journalMetadataTableCfg.columnNames.maxOrdering)

val pk = primaryKey(s"${tableName}_pk", persistenceId)
}

object JournalMetadataTable {
def apply(
journalMetadataTableCfg: JournalMetadataTableConfiguration): TableQuery[JournalMetadataTable] =
TableQuery(tag => new JournalMetadataTable(tag, journalMetadataTableCfg))
}
Loading
Loading