Skip to content

Commit

Permalink
Fix: reprocess failed instance messages in migration script (#460)
Browse files Browse the repository at this point in the history
Problem: nodes that do not support instances yet will have marked
instance messages as rejected.

Solution: use the migration script to put these messages back in the
pending queue and reprocess them once the node restarts.
  • Loading branch information
odesenfans authored Oct 3, 2023
1 parent ce4cc1b commit 33ae961
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 5 deletions.
40 changes: 39 additions & 1 deletion deployment/migrations/versions/0017_f9fa39b6bdef_vm_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,43 @@
depends_on = None


def reprocess_failed_instance_messages():
op.execute(
"""
INSERT INTO pending_messages(item_hash, type, chain, sender, signature, item_type, item_content, time, channel,
reception_time, check_message, retries, tx_hash, fetched, next_attempt)
(SELECT rm.item_hash,
'INSTANCE',
rm.message ->> 'chain',
rm.message ->> 'sender',
rm.message ->> 'signature',
rm.message ->> 'item_type',
rm.message ->> 'item_content',
to_timestamp((rm.message ->> 'time')::numeric),
rm.message ->> 'channel',
ms.reception_time,
true,
0,
null,
false,
now()
FROM rejected_messages rm
JOIN message_status ms on rm.item_hash = ms.item_hash
WHERE message ->> 'type' = 'INSTANCE')
"""
)
op.execute(
"""
UPDATE message_status
SET status = 'pending'
FROM aleph.public.rejected_messages rm
WHERE message_status.item_hash = rm.item_hash
AND rm.message ->> 'type' = 'INSTANCE'
"""
)
op.execute("DELETE FROM rejected_messages WHERE message->>'type' = 'INSTANCE'")


def recreate_cost_views():
op.execute("DROP VIEW costs_view")
op.execute("DROP VIEW program_costs_view")
Expand Down Expand Up @@ -235,6 +272,7 @@ def upgrade() -> None:
"INSERT INTO error_codes(code, description) VALUES (304, 'VM volume parent is larger than the child volume')"
)

reprocess_failed_instance_messages()
# ### end Alembic commands ###


Expand Down Expand Up @@ -397,7 +435,7 @@ def downgrade() -> None:
op.execute("ALTER INDEX ix_vms_owner RENAME TO ix_programs_owner")

op.drop_column("programs", "program_type")
op.drop_column('programs', 'authorized_keys')
op.drop_column("programs", "authorized_keys")

# Drop the parent column for persistent VMs
op.drop_column("program_machine_volumes", "parent")
Expand Down
19 changes: 15 additions & 4 deletions src/aleph/types/message_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ def __init__(
errors = [errors]
super().__init__(errors)

def __str__(self):
# TODO: reimplement for each exception subtype
return self.__class__.__name__

def details(self) -> Optional[Dict[str, Any]]:
errors = self.args[0]
return {"errors": errors} if errors else None
Expand Down Expand Up @@ -301,9 +305,11 @@ def details(self) -> Optional[Dict[str, Any]]:

class InsufficientBalanceException(InvalidMessageException):
"""
You don't have enough in balance
The user does not have enough Aleph tokens to process the message.
"""

error_code = ErrorCode.BALANCE_INSUFFICIENT

def __init__(
self,
balance: Decimal,
Expand All @@ -314,6 +320,11 @@ def __init__(

def details(self) -> Optional[Dict[str, Any]]:
# Note: cast to string to keep the precision
return {"errors": [{"required_balance": str(self.required_balance), "account_balance": str(self.balance)}]}

error_code = ErrorCode.BALANCE_INSUFFICIENT
return {
"errors": [
{
"required_balance": str(self.required_balance),
"account_balance": str(self.balance),
}
]
}

0 comments on commit 33ae961

Please sign in to comment.