improved logging
soad003 committed Dec 20, 2023
1 parent 1fafe40 commit 1ea6bd1
Showing 3 changed files with 99 additions and 33 deletions.
61 changes: 48 additions & 13 deletions src/graphsenselib/db/cassandra.py
@@ -1,5 +1,6 @@
import hashlib
import logging
import threading
import time
from functools import wraps
from typing import Iterable, List, Optional, Sequence, Union
@@ -8,7 +9,7 @@

# Session,
from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args
from cassandra.policies import RetryPolicy
from cassandra.policies import DCAwareRoundRobinPolicy, RetryPolicy, TokenAwarePolicy
from cassandra.query import (
UNSET_VALUE,
BatchStatement,
@@ -44,14 +45,11 @@ def on_read_timeout(self, *args, **kwargs):
else:
return (self.RETHROW, None)

# def on_write_timeout(self, *args, **kwargs):
# if kwargs["retry_num"] < self.max_retries:
# logger.debug(
# "Retrying write after timeout. Attempt #" + str(kwargs["retry_num"])
# )
# return (self.RETRY, None)
# else:
# return (self.RETHROW, None)
def on_write_timeout(self, *args, **kwargs):
logger.debug(
"Write timeout on attempt #" + str(kwargs["retry_num"]) + "; not retrying."
)
return (self.RETHROW, None)

# def on_unavailable(self, *args, **kwargs):
# if kwargs["retry_num"] < self.max_retries:
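Context for the retry-policy change above: write timeouts are rethrown rather than retried because a timed-out write may already have been partially applied, and blindly retrying a non-idempotent write risks duplicates. A hedged sketch of a stricter variant that retries only writes known to be safe to replay — it uses the standard cassandra-driver `RetryPolicy` signature, but the `max_retries` attribute is illustrative and not taken from this commit:

```python
# Sketch only: retry write timeouts solely for idempotent cases.
from cassandra.policies import RetryPolicy, WriteType


class IdempotentWriteRetryPolicy(RetryPolicy):
    max_retries = 3  # illustrative knob, not from this repo

    def on_write_timeout(
        self, query, consistency, write_type,
        required_responses, received_responses, retry_num,
    ):
        # BATCH_LOG writes are safe to replay; other write types may
        # have partially applied, so rethrow instead of retrying.
        if retry_num < self.max_retries and write_type == WriteType.BATCH_LOG:
            return (self.RETRY, consistency)
        return (self.RETHROW, None)
```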
@@ -291,21 +289,27 @@ def __init__(self, db_nodes: Iterable, default_timeout=DEFAULT_TIMEOUT) -> None:
self.cluster = None
self.session = None
self.prep_stmts = {}
self._session_timeout = default_timeout
self._default_timeout = default_timeout

def clone(self) -> "CassandraDb":
return CassandraDb(self.db_nodes, self._session_timeout)
return CassandraDb(self.db_nodes, self._default_timeout)

def __repr__(self):
return f"{', '.join(self.db_nodes)}"

def connect(self):
"""Connect to given Cassandra cluster nodes."""
exec_prof = ExecutionProfile(retry_policy=ReadRetryPolicy(), request_timeout=30)
exec_prof = ExecutionProfile(
retry_policy=ReadRetryPolicy(),
request_timeout=self._default_timeout,
load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()),
)
self.cluster = Cluster(
self.db_nodes,
execution_profiles={EXEC_PROFILE_DEFAULT: exec_prof},
connect_timeout=self._session_timeout,
connect_timeout=self._default_timeout,
protocol_version=5,
compression="lz4",
)
try:
self.session = self.cluster.connect()
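The connect() change wraps DCAwareRoundRobinPolicy in TokenAwarePolicy, so each statement is routed first to a replica that owns its partition key, falling back to round-robin within the local datacenter; protocol v5 and lz4 additionally require Cassandra 4.x and the `lz4` Python package. A minimal standalone sketch of the same driver setup, with placeholder node addresses:

```python
# Sketch: token-aware, DC-aware connection setup (standard
# cassandra-driver API; addresses and timeout are illustrative).
from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy

profile = ExecutionProfile(
    # route each statement to a replica for its partition key first
    load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()),
    request_timeout=60,
)
cluster = Cluster(
    ["10.0.0.1", "10.0.0.2"],
    execution_profiles={EXEC_PROFILE_DEFAULT: profile},
    protocol_version=5,   # requires Cassandra 4.x
    compression="lz4",    # requires the `lz4` package
)
session = cluster.connect()
```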
@@ -489,6 +493,37 @@ def ingest(
if auto_none_to_unset:
items = [none_to_unset(row) for row in items]

if table == "log":
x = [
item
for item in items
if len(item["topics"]) % 2 == 0
and len(item["topics"]) > 0
and item["topics"][: len(item["topics"]) // 2]
== item["topics"][len(item["topics"]) // 2 :]
]
if len(x) > 0:
logger.warning(x)
# from collections import defaultdict

# dic = defaultdict(int)
# for i in items:
# dic[
# (
# i["block_id_group"],
# i["block_id"],
# i["topic0"].hex(),
# i["log_index"],
# )
# ] += 1

# assert len([v for v in dic.values() if v > 1]) == 0
blks = [it["block_id"] for it in items if "block_id" in it]
logger.debug(
f"{threading.get_ident()}, {table},"
f" {min(blks)}, {max(blks)}, {len(items)}"
)

self._exe_with_retries(stmt, items, concurrency=concurrency)

@needs_session
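The new guard in ingest() flags log rows whose topics list is exactly its first half repeated (e.g. `[a, b, a, b]`), a symptom of upstream duplication. The predicate, extracted as a standalone sketch with a usage example:

```python
# Sketch of the duplicate-topics heuristic used above: a topics list
# is suspicious if it is non-empty, of even length, and its first
# half equals its second half.
def topics_are_half_duplicated(topics: list) -> bool:
    n = len(topics)
    return n > 0 and n % 2 == 0 and topics[: n // 2] == topics[n // 2 :]


assert topics_are_half_duplicated(["a", "b", "a", "b"])
assert not topics_are_half_duplicated(["a", "b", "c", "d"])
assert not topics_are_half_duplicated([])
```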
31 changes: 22 additions & 9 deletions src/graphsenselib/ingest/account.py
@@ -35,7 +35,7 @@
parse_timestamp,
remove_prefix,
)
from ..utils.logging import suppress_log_level
from ..utils.logging import configure_logging, suppress_log_level
from ..utils.signals import graceful_ctlc_shutdown
from ..utils.tron import evm_to_bytes, strip_tron_prefix
from .common import (
@@ -350,7 +350,7 @@ def prepare_logs_inplace(items: Iterable, block_bucket_size: int):

# if topics contain duplicates
if (
len(item["topics"]) % 2 == 0
len(item["topics"]) % 2 == 0 and len(item["topics"]) > 0
): # todo: may be removed once we are sure that there are no duplicates
if (
item["topics"][: len(item["topics"]) // 2]
@@ -904,11 +904,13 @@ def __init__(
grpc_provider_uri: str,
is_trace_only_mode: bool,
is_update_transactions_mode: bool,
batch_size: int = WEB3_QUERY_BATCH_SIZE,
fees_only_mode: bool = False,
):
super().__init__(
http_provider_uri, provider_timeout, is_trace_only_mode, fees_only_mode
)
self.batch_size = batch_size
self.grpc_provider_uri = grpc_provider_uri
self.is_update_transactions_mode = is_update_transactions_mode

@@ -945,7 +947,7 @@ def get_source_adapter(self):
return TronStreamerAdapter(
get_connection_from_url(self.http_provider_uri, self.provider_timeout),
grpc_endpoint=self.grpc_provider_uri,
batch_size=WEB3_QUERY_BATCH_SIZE,
batch_size=self.batch_size,
max_workers=WEB3_QUERY_WORKERS,
)

@@ -973,6 +975,13 @@ def ingest_async(
):
logger.info("Writing data in parallel")

interleave_batches = 3
batch_size = (
(batch_size_user // interleave_batches)
if batch_size_user >= 3
else batch_size_user
)

# make sure that only supported sinks are selected.
if not all((x in ["cassandra", "parquet"]) for x in sink_config.keys()):
raise BadUserInputError(
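The batch-size change above splits the user-supplied batch across `interleave_batches` smaller batches so several writers can overlap, while guarding against integer division rounding the size down to zero. A sketch of the arithmetic:

```python
# Sketch: deriving the effective per-writer batch size
# (mirrors the logic above; the constant is illustrative).
INTERLEAVE_BATCHES = 3


def effective_batch_size(batch_size_user: int) -> int:
    # only split when the result stays at least 1
    if batch_size_user >= INTERLEAVE_BATCHES:
        return batch_size_user // INTERLEAVE_BATCHES
    return batch_size_user


assert effective_batch_size(30) == 10
assert effective_batch_size(2) == 2
```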
@@ -1011,6 +1020,7 @@
grpc_provider_uri,
is_trace_only_mode,
is_update_transactions_mode,
batch_size=batch_size,
fees_only_mode=fees_only_mode,
)

@@ -1060,8 +1070,8 @@ def ingest_async(

thread_context = threading.local()

def initializer_worker(thrd_ctx, db, sink_config, strategy):
logging.disable(logging.INFO)
def initializer_worker(thrd_ctx, db, sink_config, strategy, loglevel):
configure_logging(loglevel)
new_db_conn = db.clone()
new_db_conn.open()
thrd_ctx.db = new_db_conn
@@ -1086,13 +1096,16 @@ def submit_tasks(ex, thrd_ctx, tasks, data=None):
for cmd in tasks
]

interleave_batches = 3
batch_size = batch_size_user // interleave_batches

with graceful_ctlc_shutdown() as check_shutdown_initialized:
with concurrent.futures.ThreadPoolExecutor(
initializer=initializer_worker,
initargs=(thread_context, db, sink_config, transform_strategy),
initargs=(
thread_context,
db,
sink_config,
transform_strategy,
logger.getEffectiveLevel(),
),
max_workers=15, # headroom for at most 4-5 tables in parallel across the interleaved batches
) as ex:
time1 = datetime.now()
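The executor change replaces the old global `logging.disable(logging.INFO)` with per-thread setup: each worker initializer receives the parent logger's effective level along with its own cloned DB connection. A hedged, self-contained sketch of the initializer pattern — names are illustrative, not from this repo:

```python
# Sketch: propagating the parent's log level into worker threads
# via a ThreadPoolExecutor initializer.
import concurrent.futures
import logging
import threading

logger = logging.getLogger(__name__)
thread_context = threading.local()


def initializer_worker(thrd_ctx, loglevel):
    # runs once per worker thread, before any submitted task
    logging.getLogger().setLevel(loglevel)
    thrd_ctx.name = threading.current_thread().name


with concurrent.futures.ThreadPoolExecutor(
    initializer=initializer_worker,
    initargs=(thread_context, logger.getEffectiveLevel()),
    max_workers=4,
) as ex:
    ex.submit(lambda: logger.info("worker ready")).result()
```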
40 changes: 29 additions & 11 deletions src/graphsenselib/utils/logging.py
@@ -9,6 +9,9 @@

from ..config import GRAPHSENSE_DEFAULT_DATETIME_FORMAT

# create logger
logger = logging.getLogger(__name__)


@contextmanager
def suppress_log_level(loglevel: int):
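For context, suppress_log_level (imported in account.py above) is a context manager that temporarily raises the logging threshold. A plausible implementation sketch — the repo's actual version may differ:

```python
# Sketch: temporarily disable all records at or below `loglevel`,
# restoring the previous threshold on exit (assumed behavior).
import logging
from contextlib import contextmanager


@contextmanager
def suppress_log_level(loglevel: int):
    previous = logging.root.manager.disable
    logging.disable(loglevel)
    try:
        yield
    finally:
        logging.disable(previous)
```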
@@ -22,12 +25,16 @@ def suppress_log_level(loglevel: int):
def configure_logging(loglevel):
log_format = " | %(message)s"

if loglevel == 0:
loglevel = logging.WARNING
elif loglevel == 1:
loglevel = logging.INFO
elif loglevel >= 2:
loglevel = logging.DEBUG
if loglevel < 10:
# values below 10 are not valid Python log
# levels; treat them as a CLI verbosity count
if loglevel == 0:
loglevel = logging.WARNING
elif loglevel == 1:
loglevel = logging.INFO
elif loglevel >= 2:
loglevel = logging.DEBUG
log_format = " | %(name)s | %(thread)d | %(message)s"

""" RichHandler colorizes the logs """
c = Console(width=220)
@@ -48,11 +55,22 @@ def configure_logging(loglevel):
handlers=[rh],
)

logging.getLogger("cassandra").setLevel(logging.ERROR)
logging.getLogger("ethereumetl").setLevel(logging.ERROR)
logging.getLogger("Cluster").setLevel(logging.ERROR)
logging.getLogger("requests").setLevel(logging.ERROR)
logging.getLogger("urllib3").setLevel(logging.ERROR)
if loglevel <= logging.DEBUG:
logger.warning("Logging set to verbose mode.")
logging.getLogger("cassandra").setLevel(logging.DEBUG)
logging.getLogger("ethereumetl").setLevel(logging.WARNING)
logging.getLogger("web3").setLevel(logging.WARNING)
logging.getLogger("Cluster").setLevel(logging.DEBUG)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.ERROR)
logging.getLogger("ProgressLogger").setLevel(logging.ERROR)
else:
logging.getLogger("cassandra").setLevel(logging.WARNING)
logging.getLogger("ethereumetl").setLevel(logging.WARNING)
logging.getLogger("Cluster").setLevel(logging.ERROR)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.ERROR)
logging.getLogger("ProgressLogger").setLevel(logging.ERROR)
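configure_logging now distinguishes CLI verbosity counts (0, 1, 2+ from repeated -v flags) from real Python log levels, which start at 10 (DEBUG), and passes real levels through unchanged. A sketch of the mapping, assuming it mirrors the code above:

```python
# Sketch: resolving a CLI -v/-vv count or a real log level
# to a Python logging level (mirrors configure_logging above).
import logging


def resolve_loglevel(loglevel: int) -> int:
    if loglevel >= 10:          # already a real logging level
        return loglevel
    if loglevel == 0:
        return logging.WARNING  # default
    if loglevel == 1:
        return logging.INFO     # -v
    return logging.DEBUG        # -vv and beyond


assert resolve_loglevel(0) == logging.WARNING
assert resolve_loglevel(2) == logging.DEBUG
assert resolve_loglevel(logging.ERROR) == logging.ERROR
```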


class IndentLogger(logging.LoggerAdapter):
