Skip to content

Commit

Permalink
improved timeout handling
Browse files Browse the repository at this point in the history
  • Loading branch information
soad003 committed Jul 10, 2024
1 parent eae1bdb commit 3957729
Showing 1 changed file with 51 additions and 45 deletions.
96 changes: 51 additions & 45 deletions src/graphsenselib/ingest/tron/export_traces_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,75 +158,81 @@ def run(self):
return traces, fees

async def fetch_and_process_block(self, i, wallet_stub, retries=5, timeout=60):
attempt = 0

while attempt < retries:
try:
block = await asyncio.wait_for(
wallet_stub.GetTransactionInfoByBlockNum(
NumberMessage(num=i), timeout=timeout
),
timeout=timeout,
)
traces_per_block = decode_block_to_traces(i, block)
fees_per_block = decode_fees(i, block)
return traces_per_block, fees_per_block

except (grpc.RpcError, asyncio.TimeoutError) as e:
logger.error(f"Error fetching block {i}, attempt {attempt + 1}: {e}")
logger.error(e)
logger.error("Retrying...")
attempt += 1
await asyncio.sleep(1) # Backoff before retry

raise Exception(f"Failed to fetch block {i} after {retries} attempts")
# attempt = 0

# while attempt < retries:
# try:
block = await asyncio.wait_for(
wallet_stub.GetTransactionInfoByBlockNum(
NumberMessage(num=i), timeout=timeout
),
timeout=timeout,
)
traces_per_block = decode_block_to_traces(i, block)
fees_per_block = decode_fees(i, block)
return traces_per_block, fees_per_block

# except (grpc.RpcError, asyncio.TimeoutError) as e:
# logger.error(f"Error fetching block {i}, attempt {attempt + 1}: {e}")
# logger.error(e)
# logger.error("Retrying...")
# attempt += 1
# await asyncio.sleep(1) # Backoff before retry

# raise Exception(f"Failed to fetch block {i} after {retries} attempts")

def run_parallel(self):
loop = asyncio.get_event_loop()

async def run_async():
retries = 3
attempt = 0
async with grpc.aio.insecure_channel(self.grpc_endpoint) as channel:
logger.debug("Connected to gRPC endpoint")
wallet_stub = WalletStub(channel)
semaphore = asyncio.Semaphore(30)

async def sem_fetch_and_process_block(i):
logger.debug(f"Loading Block {i}")
async with semaphore:
logger.debug(f"Loading enter sem Block {i}")
ret = await self.fetch_and_process_block(i, wallet_stub)
logger.debug(f"Loaded Block {i}")
if i % 1000 == 0:
logger.debug(f"Loaded Block {i}")
return ret

tasks = [
sem_fetch_and_process_block(i)
for i in range(self.start_block, self.end_block + 1)
]
while attempt < retries:
try:
async with asyncio.timeout(60 * 15):
results = await asyncio.gather(*tasks)
attempt = 0
except asyncio.TimeoutError as e:
logger.error(
f"Error fetching block range {self.start_block} "
f"-- {self.end_block}, attempt {attempt + 1}: {e}"
)
logger.error(e)
logger.error("Retrying...")
attempt += 1

results = await asyncio.gather(*tasks)
traces = []
fees = []

logger.debug(f"Got traces for {self.start_block} - {self.end_block}.")

# Unpacking results
for traces_per_block, fees_per_block in results:
traces.extend(traces_per_block)
fees.extend(fees_per_block)

return traces, fees

# Execute the asynchronous run inside a synchronous function
return loop.run_until_complete(run_async())
loop = asyncio.get_event_loop()

attempt = 0
retries = 5
timeout = 15 * 60

while attempt < retries:
try:
return loop.run_until_complete(
asyncio.wait_for(run_async(), timeout=15 * 60)
)
except (grpc.RpcError, asyncio.TimeoutError) as e:
logger.error(
f"Error fetching block range in {timeout}s "
f"{self.start_block} {self.end_block}, attempt {attempt + 1}: {e}"
)
logger.error(e)
logger.error("Retrying...")
attempt += 1

raise Exception(
"Failed to fetch block range "
f" {self.start_block} {self.end_block} after {retries} attempts"
)

0 comments on commit 3957729

Please sign in to comment.