diff --git a/program_admin/__init__.py b/program_admin/__init__.py index 36b62a2..4561252 100644 --- a/program_admin/__init__.py +++ b/program_admin/__init__.py @@ -1,3 +1,4 @@ +import asyncio import json import os from pathlib import Path @@ -47,6 +48,8 @@ "pythtest": "https://api.pythtest.pyth.network", } +MAX_CONCURRENT_TRANSACTIONS = 50 + class ProgramAdmin: network: Network @@ -260,6 +263,8 @@ async def sync( # Sync product/price accounts + product_transactions: List[asyncio.Task[None]] = [] + product_updates: bool = False for jump_symbol, _price_account_map in ref_permissions.items(): @@ -278,12 +283,28 @@ async def sync( instructions.extend(product_instructions) if send_transactions: - await self.send_transaction(product_instructions, product_keypairs) + product_transactions.append( + asyncio.create_task( + self.send_transaction( + product_instructions, product_keypairs + ) + ) + ) + + if len(product_transactions) == MAX_CONCURRENT_TRANSACTIONS: + await asyncio.gather(*product_transactions) + product_transactions = [] + + if product_transactions: + await asyncio.gather(*product_transactions) if product_updates: await self.refresh_program_accounts() # Sync publishers + + publisher_transactions = [] + for jump_symbol, _price_account_map in ref_permissions.items(): ref_product = ref_products[jump_symbol] # type: ignore @@ -297,7 +318,18 @@ async def sync( if price_instructions: instructions.extend(price_instructions) if send_transactions: - await self.send_transaction(price_instructions, price_keypairs) + publisher_transactions.append( + asyncio.create_task( + self.send_transaction(price_instructions, price_keypairs) + ) + ) + + if len(publisher_transactions) == MAX_CONCURRENT_TRANSACTIONS: + await asyncio.gather(*publisher_transactions) + publisher_transactions = [] + + if publisher_transactions: + await asyncio.gather(*publisher_transactions) return instructions