From 9534c5c6ea3db87752ee1ae58b381ceca9609956 Mon Sep 17 00:00:00 2001 From: Tiago Mota Date: Wed, 13 Oct 2021 15:06:52 +0100 Subject: [PATCH] Re-add trigger that checks if max-sequence-number on journal_persistence_ids is bigger, before updating the table. --- .../postgres/nested-partitions-schema.sql | 25 +++++++++++++++++-- .../schema/postgres/partitioned-schema.sql | 25 +++++++++++++++++-- .../schema/postgres/plain-schema.sql | 25 +++++++++++++++++-- .../migration/journal/JournalSchema.scala | 24 ++++++++++++++++-- .../postgres/migration/MigrationTest.scala | 3 +++ ...unction-update-journal-persistence-ids.sql | 4 +-- ...tion-check-persistence-id-max-sequence.sql | 18 +++++++++++++ ...gger-check-persistence-id-max-sequence.sql | 19 ++++++++++++++ 8 files changed, 133 insertions(+), 10 deletions(-) create mode 100644 scripts/migration-0.6.0/4-create-function-check-persistence-id-max-sequence.sql create mode 100644 scripts/migration-0.6.0/5-create-trigger-check-persistence-id-max-sequence.sql diff --git a/core/src/test/resources/schema/postgres/nested-partitions-schema.sql b/core/src/test/resources/schema/postgres/nested-partitions-schema.sql index c207a029..fa4f347b 100644 --- a/core/src/test/resources/schema/postgres/nested-partitions-schema.sql +++ b/core/src/test/resources/schema/postgres/nested-partitions-schema.sql @@ -65,6 +65,8 @@ CREATE TABLE IF NOT EXISTS public.snapshot PRIMARY KEY (persistence_id, sequence_number) ); +DROP TRIGGER IF EXISTS trig_check_persistence_id_max_sequence_number ON public.journal_persistence_ids; +DROP FUNCTION IF EXISTS public.check_persistence_id_max_sequence_number(); DROP TRIGGER IF EXISTS trig_update_journal_persistence_ids ON public.journal; DROP FUNCTION IF EXISTS public.update_journal_persistence_ids(); DROP TABLE IF EXISTS public.journal_persistence_ids; @@ -85,8 +87,8 @@ BEGIN VALUES (NEW.persistence_id, NEW.sequence_number, NEW.ordering, NEW.ordering) ON CONFLICT (persistence_id) DO UPDATE SET - max_sequence_number = GREATEST(public.journal_persistence_ids.max_sequence_number, NEW.sequence_number), - max_ordering = GREATEST(public.journal_persistence_ids.max_ordering, NEW.ordering), + max_sequence_number = NEW.sequence_number, + max_ordering = NEW.ordering, min_ordering = LEAST(public.journal_persistence_ids.min_ordering, NEW.ordering); RETURN NEW; @@ -98,3 +100,22 @@ CREATE TRIGGER trig_update_journal_persistence_ids AFTER INSERT ON public.journal FOR EACH ROW EXECUTE PROCEDURE public.update_journal_persistence_ids(); + +CREATE OR REPLACE FUNCTION public.check_persistence_id_max_sequence_number() RETURNS TRIGGER AS +$$ +DECLARE +BEGIN + IF NEW.max_sequence_number <= OLD.max_sequence_number THEN + RAISE EXCEPTION 'New max_sequence_number not higher than previous value'; + END IF; + + RETURN NEW; +END; +$$ +LANGUAGE plpgsql; + + +CREATE TRIGGER trig_check_persistence_id_max_sequence_number + BEFORE UPDATE ON public.journal_persistence_ids + FOR EACH ROW + EXECUTE PROCEDURE public.check_persistence_id_max_sequence_number(); diff --git a/core/src/test/resources/schema/postgres/partitioned-schema.sql b/core/src/test/resources/schema/postgres/partitioned-schema.sql index 3745692d..ce28662e 100644 --- a/core/src/test/resources/schema/postgres/partitioned-schema.sql +++ b/core/src/test/resources/schema/postgres/partitioned-schema.sql @@ -66,6 +66,8 @@ CREATE TABLE IF NOT EXISTS public.snapshot PRIMARY KEY (persistence_id, sequence_number) ); +DROP TRIGGER IF EXISTS trig_check_persistence_id_max_sequence_number ON public.journal_persistence_ids; +DROP FUNCTION IF EXISTS public.check_persistence_id_max_sequence_number(); DROP TRIGGER IF EXISTS trig_update_journal_persistence_ids ON public.journal; DROP FUNCTION IF EXISTS public.update_journal_persistence_ids(); DROP TABLE IF EXISTS public.journal_persistence_ids; @@ -86,8 +88,8 @@ BEGIN VALUES (NEW.persistence_id, NEW.sequence_number, NEW.ordering, NEW.ordering) ON CONFLICT (persistence_id) DO UPDATE SET - max_sequence_number = GREATEST(public.journal_persistence_ids.max_sequence_number, NEW.sequence_number), - max_ordering = GREATEST(public.journal_persistence_ids.max_ordering, NEW.ordering), + max_sequence_number = NEW.sequence_number, + max_ordering = NEW.ordering, min_ordering = LEAST(public.journal_persistence_ids.min_ordering, NEW.ordering); RETURN NEW; @@ -99,3 +101,22 @@ CREATE TRIGGER trig_update_journal_persistence_ids AFTER INSERT ON public.journal FOR EACH ROW EXECUTE PROCEDURE public.update_journal_persistence_ids(); + +CREATE OR REPLACE FUNCTION public.check_persistence_id_max_sequence_number() RETURNS TRIGGER AS +$$ +DECLARE +BEGIN + IF NEW.max_sequence_number <= OLD.max_sequence_number THEN + RAISE EXCEPTION 'New max_sequence_number not higher than previous value'; + END IF; + + RETURN NEW; +END; +$$ +LANGUAGE plpgsql; + + +CREATE TRIGGER trig_check_persistence_id_max_sequence_number + BEFORE UPDATE ON public.journal_persistence_ids + FOR EACH ROW + EXECUTE PROCEDURE public.check_persistence_id_max_sequence_number(); diff --git a/core/src/test/resources/schema/postgres/plain-schema.sql b/core/src/test/resources/schema/postgres/plain-schema.sql index b80579e7..7faaf53e 100644 --- a/core/src/test/resources/schema/postgres/plain-schema.sql +++ b/core/src/test/resources/schema/postgres/plain-schema.sql @@ -39,6 +39,8 @@ CREATE TABLE IF NOT EXISTS public.snapshot PRIMARY KEY (persistence_id, sequence_number) ); +DROP TRIGGER IF EXISTS trig_check_persistence_id_max_sequence_number ON public.journal_persistence_ids; +DROP FUNCTION IF EXISTS public.check_persistence_id_max_sequence_number(); DROP TRIGGER IF EXISTS trig_update_journal_persistence_ids ON public.journal; DROP FUNCTION IF EXISTS public.update_journal_persistence_ids(); DROP TABLE IF EXISTS public.journal_persistence_ids; @@ -59,8 +61,8 @@ BEGIN VALUES (NEW.persistence_id, NEW.sequence_number, NEW.ordering, NEW.ordering) ON CONFLICT (persistence_id) DO UPDATE SET - max_sequence_number = GREATEST(public.journal_persistence_ids.max_sequence_number, NEW.sequence_number), - max_ordering = GREATEST(public.journal_persistence_ids.max_ordering, NEW.ordering), + max_sequence_number = NEW.sequence_number, + max_ordering = NEW.ordering, min_ordering = LEAST(public.journal_persistence_ids.min_ordering, NEW.ordering); RETURN NEW; @@ -72,3 +74,22 @@ CREATE TRIGGER trig_update_journal_persistence_ids AFTER INSERT ON public.journal FOR EACH ROW EXECUTE PROCEDURE public.update_journal_persistence_ids(); + +CREATE OR REPLACE FUNCTION public.check_persistence_id_max_sequence_number() RETURNS TRIGGER AS +$$ +DECLARE +BEGIN + IF NEW.max_sequence_number <= OLD.max_sequence_number THEN + RAISE EXCEPTION 'New max_sequence_number not higher than previous value'; + END IF; + + RETURN NEW; +END; +$$ +LANGUAGE plpgsql; + + +CREATE TRIGGER trig_check_persistence_id_max_sequence_number + BEFORE UPDATE ON public.journal_persistence_ids + FOR EACH ROW + EXECUTE PROCEDURE public.check_persistence_id_max_sequence_number(); diff --git a/migration/src/main/scala/akka/persistence/postgres/migration/journal/JournalSchema.scala b/migration/src/main/scala/akka/persistence/postgres/migration/journal/JournalSchema.scala index 77a9dbf9..06754505 100644 --- a/migration/src/main/scala/akka/persistence/postgres/migration/journal/JournalSchema.scala +++ b/migration/src/main/scala/akka/persistence/postgres/migration/journal/JournalSchema.scala @@ -108,8 +108,8 @@ private[journal] trait JournalSchema { VALUES (NEW.#$jPersistenceId, NEW.#$sequenceNumber, NEW.#$ordering, NEW.#$ordering) ON CONFLICT (#$persistenceId) DO UPDATE SET - #$maxSequenceNumber = GREATEST(#$fullTableName.#$maxSequenceNumber, NEW.#$sequenceNumber), - #$maxOrdering = GREATEST(#$fullTableName.#$maxOrdering, NEW.#$ordering), + #$maxSequenceNumber = NEW.#$sequenceNumber, + #$maxOrdering = NEW.#$ordering, #$minOrdering = LEAST(#$fullTableName.#$minOrdering, NEW.#$ordering); RETURN NEW; @@ -130,6 +130,26 @@ private[journal] trait JournalSchema { FOR EACH ROW EXECUTE PROCEDURE #$schema.update_journal_persistence_ids(); """ + + _ <- sqlu""" + CREATE OR REPLACE FUNCTION #$schema.check_persistence_id_max_sequence_number() RETURNS TRIGGER AS $$$$ + DECLARE + BEGIN + IF NEW.#$maxSequenceNumber <= OLD.#$maxSequenceNumber THEN + RAISE EXCEPTION 'New max_sequence_number not higher than previous value'; + END IF; + + RETURN NEW; + END; + $$$$ LANGUAGE plpgsql; + """ + + _ <- sqlu""" + CREATE TRIGGER trig_check_persistence_id_max_sequence_number + BEFORE UPDATE ON #$fullTableName + FOR EACH ROW + EXECUTE PROCEDURE #$schema.check_persistence_id_max_sequence_number(); + """ } yield () } } diff --git a/migration/src/test/scala/akka/persistence/postgres/migration/MigrationTest.scala b/migration/src/test/scala/akka/persistence/postgres/migration/MigrationTest.scala index 2fa8e3a3..bd4da7bc 100644 --- a/migration/src/test/scala/akka/persistence/postgres/migration/MigrationTest.scala +++ b/migration/src/test/scala/akka/persistence/postgres/migration/MigrationTest.scala @@ -190,6 +190,9 @@ trait PrepareDatabase extends BeforeAndAfterEach with BeforeAndAfterAll with Sca _ <- sqlu"""DROP TABLE IF EXISTS migration.#$journalTableName""" _ <- sqlu"""DROP TRIGGER IF EXISTS trig_update_journal_persistence_ids ON migration.#$journalTableName""" _ <- sqlu"""DROP FUNCTION IF EXISTS migration.update_journal_persistence_ids()""" + _ <- + sqlu"""DROP TRIGGER IF EXISTS trig_check_persistence_id_max_sequence_number ON migration.#$journalPersistenceIdsTableName""" + _ <- sqlu"""DROP FUNCTION IF EXISTS migration.check_persistence_id_max_sequence_number()""" _ <- sqlu"""DROP TABLE IF EXISTS migration.#$journalPersistenceIdsTableName""" _ <- sqlu"""CREATE TABLE IF NOT EXISTS migration.#$journalTableName ( diff --git a/scripts/migration-0.6.0/2-create-function-update-journal-persistence-ids.sql b/scripts/migration-0.6.0/2-create-function-update-journal-persistence-ids.sql index ec65a32a..61a11de1 100644 --- a/scripts/migration-0.6.0/2-create-function-update-journal-persistence-ids.sql +++ b/scripts/migration-0.6.0/2-create-function-update-journal-persistence-ids.sql @@ -26,8 +26,8 @@ BEGIN jpi_table := schema || '.' || jpi_table_name; cols := jpi_persistence_id_column || ', ' || jpi_max_sequence_number_column || ', ' || jpi_max_ordering_column || ', ' || jpi_min_ordering_column; vals := '($1).' || j_persistence_id_column || ', ($1).' || j_sequence_number_column || ', ($1).' || j_ordering_column || ',($1).' || j_ordering_column; - upds := jpi_max_sequence_number_column || ' = GREATEST(' || jpi_table || '.' || jpi_max_sequence_number_column || ', ($1).' || j_sequence_number_column || '), ' || - jpi_max_ordering_column || ' = GREATEST(' || jpi_table || '.' || jpi_max_ordering_column || ', ($1).' || j_ordering_column || '), ' || + upds := jpi_max_sequence_number_column || ' = ($1).' || j_sequence_number_column || ', ' || + jpi_max_ordering_column || ' = ($1).' || j_ordering_column || ', ' || jpi_min_ordering_column || ' = LEAST(' || jpi_table || '.' || jpi_min_ordering_column || ', ($1).' || j_ordering_column || ')'; sql := 'INSERT INTO ' || jpi_table || ' (' || cols || ') VALUES (' || vals || ') ' || diff --git a/scripts/migration-0.6.0/4-create-function-check-persistence-id-max-sequence.sql b/scripts/migration-0.6.0/4-create-function-check-persistence-id-max-sequence.sql new file mode 100644 index 00000000..3846e1ce --- /dev/null +++ b/scripts/migration-0.6.0/4-create-function-check-persistence-id-max-sequence.sql @@ -0,0 +1,18 @@ +-- replace schema value if required +CREATE OR REPLACE FUNCTION public.check_persistence_id_max_sequence_number() RETURNS TRIGGER AS +$$ +DECLARE + -- replace with appropriate values + jpi_max_sequence_number_column CONSTANT TEXT := 'max_sequence_number'; + + -- variables + sql TEXT; +BEGIN + sql := 'IF NEW.' || jpi_max_sequence_number_column || ' <= OLD.' || jpi_max_sequence_number_column || ' THEN + RAISE EXCEPTION ''New max_sequence_number not higher than previous value''; + END IF;'; + + EXECUTE sql USING NEW; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; diff --git a/scripts/migration-0.6.0/5-create-trigger-check-persistence-id-max-sequence.sql b/scripts/migration-0.6.0/5-create-trigger-check-persistence-id-max-sequence.sql new file mode 100644 index 00000000..f20618d0 --- /dev/null +++ b/scripts/migration-0.6.0/5-create-trigger-check-persistence-id-max-sequence.sql @@ -0,0 +1,19 @@ +DO $$ +DECLARE + -- replace with appropriate values + schema CONSTANT TEXT := 'public'; + jpi_table_name CONSTANT TEXT := 'journal_persistence_ids'; + + -- variables + jpi_table TEXT; + sql TEXT; +BEGIN + jpi_table := schema || '.' || jpi_table_name; + + sql := 'CREATE TRIGGER trig_check_persistence_id_max_sequence_number + BEFORE UPDATE ON ' || jpi_table || ' FOR EACH ROW + EXECUTE PROCEDURE ' || schema || '.check_persistence_id_max_sequence_number()'; + + EXECUTE sql; +END ; +$$ LANGUAGE plpgsql;