diff --git a/openg2p-g2p-bridge-celery-beat-producers/.env.example b/openg2p-g2p-bridge-celery-beat-producers/.env.example index 95f705f..389735a 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/.env.example +++ b/openg2p-g2p-bridge-celery-beat-producers/.env.example @@ -17,4 +17,4 @@ G2P_BRIDGE_MAPPER_RESOLVE_FREQUENCY=3600 G2P_BRIDGE_FUNDS_AVAILABLE_CHECK_FREQUENCY=3600 G2P_BRIDGE_FUNDS_BLOCKED_FREQUENCY=3600 G2P_BRIDGE_FUNDS_DISBURSEMENT_FREQUENCY=3600 -G2P_BRIDGE_MT940_PROCESSOR_FREQUENCY=3600 \ No newline at end of file +G2P_BRIDGE_MT940_PROCESSOR_FREQUENCY=3600 diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/block_funds_with_bank.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/block_funds_with_bank.py index e424be8..fbcd519 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/block_funds_with_bank.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/block_funds_with_bank.py @@ -73,4 +73,5 @@ def block_funds_with_bank_beat_producer(): args=(envelope.disbursement_envelope_id,), queue="g2p_bridge_celery_worker_tasks", ) + _logger.info("Completed checking for envelopes to block funds with bank") diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/check_funds_with_bank_task.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/check_funds_with_bank_task.py index 2d855d6..8383c82 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/check_funds_with_bank_task.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/check_funds_with_bank_task.py @@ -20,8 +20,8 @@ @celery_app.task(name="check_funds_with_bank_beat_producer") def check_funds_with_bank_beat_producer(): + _logger.info("Checking funds with bank") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) - with session_maker() as session: envelopes = ( session.execute( @@ -62,8 +62,13 @@ def check_funds_with_bank_beat_producer(): ) for envelope in envelopes: + _logger.info( + f"Sending task to check funds with bank for envelope {envelope.disbursement_envelope_id}" + ) celery_app.send_task( "check_funds_with_bank_worker", args=(envelope.disbursement_envelope_id,), queue="g2p_bridge_celery_worker_tasks", ) + + _logger.info("Checking funds with bank beat tasks push completed") diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/disburse_funds_from_bank.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/disburse_funds_from_bank.py index e91ece9..930a03a 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/disburse_funds_from_bank.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/disburse_funds_from_bank.py @@ -22,6 +22,7 @@ @celery_app.task(name="disburse_funds_from_bank_beat_producer") def disburse_funds_from_bank_beat_producer(): + _logger.info("Running disburse_funds_from_bank_beat_producer") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: envelopes = ( @@ -67,8 +68,15 @@ def disburse_funds_from_bank_beat_producer(): ) for batch in pending_batches: + _logger.info( + f"Sending task to disburse funds for batch {batch.bank_disbursement_batch_id}" + ) celery_app.send_task( "disburse_funds_from_bank_worker", (batch.bank_disbursement_batch_id,), queue="g2p_bridge_celery_worker_tasks", ) + + _logger.info( + f"Sent tasks to disburse funds for {len(pending_batches)} batches" + ) diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mapper_resolution_task.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mapper_resolution_task.py index f4fa723..0c8d928 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mapper_resolution_task.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mapper_resolution_task.py @@ -17,8 +17,8 @@ @celery_app.task(name="mapper_resolution_beat_producer") def mapper_resolution_beat_producer(): + _logger.info("Running mapper_resolution_beat_producer") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) - with session_maker() as session: mapper_resolution_batch_statuses = ( session.execute( @@ -36,8 +36,13 @@ def mapper_resolution_beat_producer(): ) for mapper_resolution_batch_status in mapper_resolution_batch_statuses: + _logger.info( + f"Sending mapper_resolution_worker task for mapper_resolution_batch_id: {mapper_resolution_batch_status.mapper_resolution_batch_id}" + ) celery_app.send_task( "mapper_resolution_worker", args=[mapper_resolution_batch_status.mapper_resolution_batch_id], queue="g2p_bridge_celery_worker_tasks", ) + + _logger.info("Finished mapper_resolution_beat_producer") diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mt940_processor.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mt940_processor.py index e8d0ff3..c21d38d 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mt940_processor.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mt940_processor.py @@ -17,6 +17,7 @@ @celery_app.task(name="mt940_processor_beat_producer") def mt940_processor_beat_producer(): + _logger.info("Running mt940_processor_beat_producer") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: account_statements = ( @@ -35,8 +36,13 @@ def mt940_processor_beat_producer(): ) for statement in account_statements: + _logger.info( + f"Sending mt940_processor_worker task for statement_id: {statement.statement_id}" + ) celery_app.send_task( "mt940_processor_worker", args=[statement.statement_id], queue="g2p_bridge_celery_worker_tasks", ) + + _logger.info("Finished mt940_processor_beat_producer") diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/helpers/resolve_helper.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/helpers/resolve_helper.py index 4c4bf2e..d3db5d0 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/helpers/resolve_helper.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/helpers/resolve_helper.py @@ -3,6 +3,7 @@ import uuid from datetime import datetime from typing import List +import logging from openg2p_fastapi_common.service import BaseService from openg2p_g2p_bridge_models.models import MapperResolvedFaType @@ -17,7 +18,7 @@ from ..config import Settings _config = Settings.get_config() - +_logger = logging.getLogger(_config.logging_default_logger_name) class FAKeys(enum.Enum): account_number = "account_number" @@ -38,17 +39,20 @@ class KeyValuePair(BaseModel): class ResolveHelper(BaseService): def construct_single_resolve_request(self, id: str) -> SingleResolveRequest: + _logger.info(f"Constructing single resolve request for ID: {id}") single_resolve_request = SingleResolveRequest( reference_id=str(uuid.uuid4()), timestamp=datetime.now(), id=id, scope="details", ) + _logger.info(f"Constructed single resolve request for ID: {id}") return single_resolve_request def construct_resolve_request( self, single_resolve_requests: List[SingleResolveRequest] ) -> ResolveRequest: + _logger.info(f"Constructing resolve request for {len(single_resolve_requests)} single resolve requests") resolve_request_message = ResolveRequestMessage( transaction_id=str(uuid.uuid4()), resolve_request=single_resolve_requests, @@ -66,10 +70,11 @@ def construct_resolve_request( ), message=resolve_request_message, ) - + _logger.info(f"Constructed resolve request for {len(single_resolve_requests)} single resolve requests") return resolve_request def _deconstruct(self, value: str, strategy: str) -> List[KeyValuePair]: + _logger.info(f"Deconstructing ID/FA: {value}") regex_res = re.match(strategy, value) deconstructed_list = [] if regex_res: @@ -79,24 +84,30 @@ def _deconstruct(self, value: str, strategy: str) -> List[KeyValuePair]: KeyValuePair(key=k, value=v) for k, v in regex_res.items() ] except Exception as e: + _logger.error(f"Error while deconstructing ID/FA: {e}") raise ValueError("Error while deconstructing ID/FA") from e + _logger.info(f"Deconstructed ID/FA: {value}") return deconstructed_list def deconstruct_fa(self, fa: str) -> dict: - deconstruct_strategy = self.get_deconstruct_strategy(fa) + _logger.info(f"Deconstructing FA: {fa}") + deconstruct_strategy = self._get_deconstruct_strategy(fa) if deconstruct_strategy: deconstructed_pairs = self._deconstruct(fa, deconstruct_strategy) deconstructed_fa = { pair.key.value: pair.value for pair in deconstructed_pairs } + _logger.info(f"Deconstructed FA: {fa}") return deconstructed_fa return {} - def get_deconstruct_strategy(self, fa: str) -> str: + def _get_deconstruct_strategy(self, fa: str) -> str: + _logger.info(f"Getting deconstruction strategy for FA: {fa}") if fa.endswith(MapperResolvedFaType.BANK_ACCOUNT.value): return _config.bank_fa_deconstruct_strategy elif fa.endswith(MapperResolvedFaType.MOBILE_WALLET.value): return _config.mobile_wallet_fa_deconstruct_strategy elif fa.endswith(MapperResolvedFaType.EMAIL_WALLET.value): return _config.email_wallet_fa_deconstruct_strategy + _logger.info(f"Deconstruction strategy not found for FA: {fa}") return "" diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/check_funds_with_bank_task.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/check_funds_with_bank_task.py index 80433b9..b8cd118 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/check_funds_with_bank_task.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/check_funds_with_bank_task.py @@ -22,6 +22,7 @@ @celery_app.task(name="check_funds_with_bank_worker") def check_funds_with_bank_worker(disbursement_envelope_id: str): + _logger.info(f"Checking funds with bank for envelope: {disbursement_envelope_id}") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: @@ -102,5 +103,7 @@ def check_funds_with_bank_worker(disbursement_envelope_id: str): e ) disbursement_envelope_batch_status.funds_available_attempts += 1 - + _logger.info( + f"Checked funds with bank for envelope: {disbursement_envelope_id}" + ) session.commit() diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py index 029fd55..5c2bb71 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py @@ -29,6 +29,7 @@ @celery_app.task(name="disburse_funds_from_bank_worker") def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): + _logger.info(f"Disbursing funds with bank for batch: {bank_disbursement_batch_id}") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: @@ -42,6 +43,9 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): ) if not batch_status: + _logger.error( + f"Bank Disbursement Batch Status not found for batch: {bank_disbursement_batch_id}" + ) return disbursement_envelope_id = batch_status.disbursement_envelope_id @@ -55,6 +59,9 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): ) if not envelope: + _logger.error( + f"Disbursement Envelope not found for envelope: {disbursement_envelope_id}" + ) return envelope_batch_status = ( @@ -67,6 +74,9 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): ) if not envelope_batch_status: + _logger.error( + f"Disbursement Envelope Batch Status not found for envelope: {disbursement_envelope_id}" + ) return disbursement_batch_controls = ( @@ -167,9 +177,11 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): batch_status.disbursement_attempts += 1 except Exception as e: + _logger.error(f"Error disbursing funds with bank: {str(e)}") batch_status.disbursement_status = ProcessStatus.PENDING.value batch_status.disbursement_timestamp = datetime.utcnow() batch_status.latest_error_code = str(e) batch_status.disbursement_attempts += 1 + _logger.info(f"Disbursing funds with bank for batch: {bank_disbursement_batch_id} completed") session.commit() diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mapper_resolution_task.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mapper_resolution_task.py index 1e6a59b..942a956 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mapper_resolution_task.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mapper_resolution_task.py @@ -24,6 +24,7 @@ @celery_app.task(name="mapper_resolution_worker") def mapper_resolution_worker(mapper_resolution_batch_id: str): + _logger.info(f"Resolving the mapper resolution batch: {mapper_resolution_batch_id}") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: @@ -52,6 +53,7 @@ def mapper_resolution_worker(mapper_resolution_batch_id: str): loop.close() if not resolve_response: + _logger.error(f"Failed to resolve the request: {error_msg}") session.query(MapperResolutionBatchStatus).filter( MapperResolutionBatchStatus.mapper_resolution_batch_id == mapper_resolution_batch_id @@ -72,6 +74,7 @@ def mapper_resolution_worker(mapper_resolution_batch_id: str): async def make_resolve_request(disbursement_batch_controls): + _logger.info("Making resolve request") resolve_helper = ResolveHelper.get_component() single_resolve_requests = [ @@ -88,6 +91,7 @@ async def make_resolve_request(disbursement_batch_controls): ) return resolve_response, None except Exception as e: + _logger.error(f"Failed to resolve the request: {e}") error_msg = f"Failed to resolve the request: {e}" return None, error_msg @@ -95,6 +99,7 @@ async def make_resolve_request(disbursement_batch_controls): def process_and_store_resolution( mapper_resolution_batch_id, resolve_response, beneficiary_disbursement_map ): + _logger.info("Processing and storing resolution") resolve_helper = ResolveHelper.get_component() session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: @@ -157,4 +162,5 @@ def process_and_store_resolution( + 1, } ) + _logger.info("Stored the resolution") session.commit() diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py index 5195d97..146521f 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py @@ -94,6 +94,9 @@ def mt940_processor_worker(statement_id: str): .first() ) if not benefit_program_configuration: + _logger.error( + f"Benefit program configuration not found for account number: {account_statement.account_number}" + ) account_statement.statement_process_status = ProcessStatus.ERROR account_statement.statement_process_error_code = ( G2PBridgeErrorCodes.INVALID_ACCOUNT_NUMBER.value @@ -141,7 +144,9 @@ def mt940_processor_worker(statement_id: str): # Process only debit transactions for parsed_transaction in parsed_transactions_d: - bank_disbursement_batch_id = get_bank_batch_id(parsed_transaction, session) + bank_disbursement_batch_id = get_bank_batch_id( + parsed_transaction, session + ) if not bank_disbursement_batch_id: disbursement_error_recons.append( @@ -184,7 +189,9 @@ def mt940_processor_worker(statement_id: str): # Start processing reversal transactions - rd for parsed_transaction in parsed_transactions_rd: - bank_disbursement_batch_id = get_bank_batch_id(parsed_transaction, session) + bank_disbursement_batch_id = get_bank_batch_id( + parsed_transaction, session + ) if not bank_disbursement_batch_id: disbursement_error_recons.append( @@ -254,8 +261,7 @@ def get_disbursement_recon(parsed_transaction, session): disbursement_recon = ( session.query(DisbursementRecon) .filter( - DisbursementRecon.disbursement_id - == parsed_transaction["disbursement_id"] + DisbursementRecon.disbursement_id == parsed_transaction["disbursement_id"] ) .first() )