From 33ae961c5ca06692b7b68899673752c41cdfd107 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 3 Oct 2023 17:22:17 +0200 Subject: [PATCH] Fix: reprocess failed instance messages in migration script (#460) Problem: nodes that do not support instances yet will have marked instance messages as rejected. Solution: use the migration script to put these messages back in the pending queue and reprocess them once the node restarts. --- .../0017_f9fa39b6bdef_vm_instances.py | 40 ++++++++++++++++++- src/aleph/types/message_status.py | 19 +++++++-- 2 files changed, 54 insertions(+), 5 deletions(-) diff --git a/deployment/migrations/versions/0017_f9fa39b6bdef_vm_instances.py b/deployment/migrations/versions/0017_f9fa39b6bdef_vm_instances.py index 2e7f54ece..d39711f98 100644 --- a/deployment/migrations/versions/0017_f9fa39b6bdef_vm_instances.py +++ b/deployment/migrations/versions/0017_f9fa39b6bdef_vm_instances.py @@ -17,6 +17,43 @@ depends_on = None +def reprocess_failed_instance_messages(): + op.execute( + """ + INSERT INTO pending_messages(item_hash, type, chain, sender, signature, item_type, item_content, time, channel, + reception_time, check_message, retries, tx_hash, fetched, next_attempt) + (SELECT rm.item_hash, + 'INSTANCE', + rm.message ->> 'chain', + rm.message ->> 'sender', + rm.message ->> 'signature', + rm.message ->> 'item_type', + rm.message ->> 'item_content', + to_timestamp((rm.message ->> 'time')::numeric), + rm.message ->> 'channel', + ms.reception_time, + true, + 0, + null, + false, + now() + FROM rejected_messages rm + JOIN message_status ms on rm.item_hash = ms.item_hash + WHERE message ->> 'type' = 'INSTANCE') + """ + ) + op.execute( + """ + UPDATE message_status + SET status = 'pending' + FROM aleph.public.rejected_messages rm + WHERE message_status.item_hash = rm.item_hash + AND rm.message ->> 'type' = 'INSTANCE' + """ + ) + op.execute("DELETE FROM rejected_messages WHERE message->>'type' = 'INSTANCE'") + + def recreate_cost_views(): op.execute("DROP VIEW costs_view") op.execute("DROP VIEW program_costs_view") @@ -235,6 +272,7 @@ def upgrade() -> None: "INSERT INTO error_codes(code, description) VALUES (304, 'VM volume parent is larger than the child volume')" ) + reprocess_failed_instance_messages() # ### end Alembic commands ### @@ -397,7 +435,7 @@ def downgrade() -> None: op.execute("ALTER INDEX ix_vms_owner RENAME TO ix_programs_owner") op.drop_column("programs", "program_type") - op.drop_column('programs', 'authorized_keys') + op.drop_column("programs", "authorized_keys") # Drop the parent column for persistent VMs op.drop_column("program_machine_volumes", "parent") diff --git a/src/aleph/types/message_status.py b/src/aleph/types/message_status.py index 772abc139..1d92e46ab 100644 --- a/src/aleph/types/message_status.py +++ b/src/aleph/types/message_status.py @@ -61,6 +61,10 @@ def __init__( errors = [errors] super().__init__(errors) + def __str__(self): + # TODO: reimplement for each exception subtype + return self.__class__.__name__ + def details(self) -> Optional[Dict[str, Any]]: errors = self.args[0] return {"errors": errors} if errors else None @@ -301,9 +305,11 @@ def details(self) -> Optional[Dict[str, Any]]: class InsufficientBalanceException(InvalidMessageException): """ - You don't have enough in balance + The user does not have enough Aleph tokens to process the message. """ + error_code = ErrorCode.BALANCE_INSUFFICIENT + def __init__( self, balance: Decimal, @@ -314,6 +320,11 @@ def __init__( def details(self) -> Optional[Dict[str, Any]]: # Note: cast to string to keep the precision - return {"errors": [{"required_balance": str(self.required_balance), "account_balance": str(self.balance)}]} - - error_code = ErrorCode.BALANCE_INSUFFICIENT + return { + "errors": [ + { + "required_balance": str(self.required_balance), + "account_balance": str(self.balance), + } + ] + }