Skip to content

Commit

Permalink
replace asyncio in fee an trace extraction with concurrent.futures
Browse files Browse the repository at this point in the history
  • Loading branch information
Tommel71 committed Jul 12, 2024
1 parent 8022d4b commit 0821a1e
Showing 1 changed file with 77 additions and 3 deletions.
80 changes: 77 additions & 3 deletions src/graphsenselib/ingest/tron/export_traces_job.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import concurrent
import logging
import time
from typing import List

import grpc
Expand Down Expand Up @@ -157,7 +159,9 @@ def run(self):

return traces, fees

async def fetch_and_process_block(self, i, wallet_stub, retries=5, timeout=3 * 60):
async def fetch_and_process_block_old(
self, i, wallet_stub, retries=5, timeout=3 * 60
):
# attempt = 0

# while attempt < retries:
Expand All @@ -178,7 +182,7 @@ async def fetch_and_process_block(self, i, wallet_stub, retries=5, timeout=3 * 6

# raise Exception(f"Failed to fetch block {i} after {retries} attempts")

def run_parallel(self):
def run_parallel_old(self):
async def run_async():
async with grpc.aio.insecure_channel(self.grpc_endpoint) as channel:
logger.debug("Connected to gRPC endpoint")
Expand All @@ -187,7 +191,7 @@ async def run_async():

async def sem_fetch_and_process_block(i):
async with semaphore:
ret = await self.fetch_and_process_block(i, wallet_stub)
ret = await self.fetch_and_process_block_old(i, wallet_stub)
if i % 1000 == 0:
logger.debug(f"Loaded Block {i}")
return ret
Expand Down Expand Up @@ -233,3 +237,73 @@ async def sem_fetch_and_process_block(i):
"Failed to fetch block range "
f" {self.start_block} {self.end_block} after {retries} attempts"
)

def fetch_and_process_block(self, i, wallet_stub, retries=5, timeout=3 * 60):
attempt = 0
while attempt < retries:
try:
block = wallet_stub.GetTransactionInfoByBlockNum(
NumberMessage(num=i), timeout=timeout
)
traces_per_block = decode_block_to_traces(i, block)
fees_per_block = decode_fees(i, block)
return traces_per_block, fees_per_block
except grpc.RpcError as e:
logger.error(f"Error fetching block {i}, attempt {attempt + 1}: {e}")
logger.error(e)
logger.error("Retrying...")
attempt += 1
time.sleep(1) # Backoff before retry

raise Exception(f"Failed to fetch block {i} after {retries} attempts")

def run_parallel(self):

def run_parallel():
def fetch_and_process_block_wrapper(i):
return self.fetch_and_process_block(i, wallet_stub)

with grpc.insecure_channel(self.grpc_endpoint) as channel:
logger.debug("Connected to gRPC endpoint")
wallet_stub = WalletStub(channel)

with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
blocks_data = list(
executor.map(
fetch_and_process_block_wrapper,
range(self.start_block, self.end_block + 1),
)
)
import itertools

traces = list(
itertools.chain(*[block_data[0] for block_data in blocks_data])
)
fees = list(
itertools.chain(*[block_data[1] for block_data in blocks_data])
)

logger.debug(
f"Got traces for {self.start_block} - {self.end_block}."
)
return traces, fees

attempt = 0
retries = 5

while attempt < retries:
try:
return run_parallel()
except Exception as e:
logger.error(
f"Error fetching block range {self.start_block} "
f"{self.end_block}, attempt {attempt + 1}: {e}"
)
logger.error(e)
logger.error("Retrying...")
attempt += 1

raise Exception(
f"Failed to fetch block range {self.start_block} {self.end_block} after "
f"{retries} attempts"
)

0 comments on commit 0821a1e

Please sign in to comment.