diff --git a/openg2p-g2p-bridge-celery-workers/.gitignore b/.gitignore similarity index 100% rename from openg2p-g2p-bridge-celery-workers/.gitignore rename to .gitignore diff --git a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/account_statement.py b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/account_statement.py index 6869345..15ffa31 100644 --- a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/account_statement.py +++ b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/account_statement.py @@ -39,7 +39,7 @@ async def upload_mt940(self, statement_file: UploadFile) -> str: statement_lob = AccountStatementLob( statement_id=statement_id, - statement_lob=str(statement_file), + statement_lob=str(statement_file.decode("utf-8")), active=True, ) session.add(statement_lob) diff --git a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement.py b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement.py index b17ed81..19185a4 100644 --- a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement.py +++ b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement.py @@ -141,7 +141,7 @@ async def construct_disbursements( disbursements: List[Disbursement] = [] for disbursement_payload in disbursement_payloads: disbursement = Disbursement( - disbursement_id=str(int(time.time() * 1000)), + disbursement_id=str(int(time.time() * 1000000)), disbursement_envelope_id=str( disbursement_payload.disbursement_envelope_id ), diff --git a/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py b/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py index 33fd60b..6a82900 100644 --- a/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py +++ b/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py @@ -1,4 +1,5 @@ from typing import List, Optional +import logging import httpx from openg2p_g2p_bridge_models.models import ( @@ -18,7 +19,7 @@ from ..config import Settings _config = Settings.get_config() - +_logger = logging.getLogger(_config.logging_default_logger_name) class BankPaymentPayload(BaseModel): payment_reference_number: str @@ -51,11 +52,14 @@ class BankPaymentPayload(BaseModel): class ExampleBankConnector(BankConnectorInterface): - def check_funds(self, account_no, currency, amount) -> CheckFundsResponse: + def check_funds(self, account_number, currency, amount) -> CheckFundsResponse: + _logger.info( + f"Checking funds availability for account_number: {account_number}, currency: {currency}, amount: {amount}" + ) try: with httpx.Client() as client: request_data = { - "account_number": account_no, + "account_number": account_number, "account_currency": currency, "total_funds_needed": amount, } @@ -66,22 +70,34 @@ def check_funds(self, account_no, currency, amount) -> CheckFundsResponse: data = response.json() if data["status"] == "success": + _logger.info( + f"Funds available for account_number: {account_number}, currency: {currency}, amount: {amount}" + ) return CheckFundsResponse( status=FundsAvailableWithBankEnum.FUNDS_AVAILABLE, error_code="" ) + _logger.info( + f"Funds not available for account_number: {account_number}, currency: {currency}, amount: {amount}" + ) return CheckFundsResponse( status=FundsAvailableWithBankEnum.FUNDS_NOT_AVAILABLE, error_code="" ) except httpx.HTTPStatusError as e: + _logger.error( + f"Error checking funds availability for account_number: {account_number}, currency: {currency}, amount: {amount}" + ) return CheckFundsResponse( status=FundsAvailableWithBankEnum.PENDING_CHECK, error_code=str(e) ) - def block_funds(self, account_no, currency, amount) -> BlockFundsResponse: + def block_funds(self, account_number, currency, amount) -> BlockFundsResponse: + _logger.info( + f"Blocking funds for account_number: {account_number}, currency: {currency}, amount: {amount}" + ) try: with httpx.Client() as client: request_data = { - "account_no": account_no, + "account_number": account_number, "currency": currency, "amount": amount, } @@ -92,17 +108,26 @@ def block_funds(self, account_no, currency, amount) -> BlockFundsResponse: data = response.json() if data["status"] == "success": + _logger.info( + f"Funds blocked for account_number: {account_number}, currency: {currency}, amount: {amount}" + ) return BlockFundsResponse( status=FundsBlockedWithBankEnum.FUNDS_BLOCK_SUCCESS, block_reference_no=data["block_reference_no"], error_code="", ) + _logger.error( + f"Funds block failed for account_number: {account_number}, currency: {currency}, amount: {amount}" + ) return BlockFundsResponse( status=FundsBlockedWithBankEnum.FUNDS_BLOCK_FAILURE, block_reference_no="", error_code=data.get("error_code", ""), ) except httpx.HTTPStatusError as e: + _logger.error( + f"Error blocking funds for account_number: {account_number}, currency: {currency}, amount: {amount}" + ) return BlockFundsResponse( status=FundsBlockedWithBankEnum.FUNDS_BLOCK_FAILURE, block_reference_no="", @@ -112,6 +137,7 @@ def block_funds(self, account_no, currency, amount) -> BlockFundsResponse: def initiate_payment( self, disbursement_payment_payloads: List[DisbursementPaymentPayload] ) -> PaymentResponse: + _logger.info(f"Initiating payment for {len(disbursement_payment_payloads)} disbursements") try: with httpx.Client() as client: bank_payment_payloads = [] @@ -143,7 +169,7 @@ def initiate_payment( ) bank_payment_payloads.append(bank_payment_payload.model_dump()) - request_data = {"initiate_payment_payloads": bank_payment_payloads} + request_data = bank_payment_payloads response = client.post( _config.funds_disbursement_url_example_bank, json=request_data @@ -152,11 +178,14 @@ def initiate_payment( data = response.json() if data["status"] == "success": + _logger.info(f"Payment initiated successfully") return PaymentResponse(status=PaymentStatus.SUCCESS, error_code="") + _logger.error(f"Payment initiation failed") return PaymentResponse( status=PaymentStatus.ERROR, error_code=data.get("error_message", "") ) except httpx.HTTPStatusError as e: + _logger.error(f"Error initiating payment: {e}") return PaymentResponse(status=PaymentStatus.ERROR, error_code=str(e)) def retrieve_disbursement_id( @@ -165,7 +194,7 @@ def retrieve_disbursement_id( return customer_reference def retrieve_beneficiary_name(self, narratives: str) -> str: - return narratives[0] + return narratives[3] def retrieve_reversal_reason(self, narratives: str) -> str: - return narratives[1] + return narratives[5] diff --git a/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_interface/bank_connector_interface.py b/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_interface/bank_connector_interface.py index 34e7064..54171d3 100644 --- a/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_interface/bank_connector_interface.py +++ b/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_interface/bank_connector_interface.py @@ -60,10 +60,10 @@ class PaymentResponse(BaseModel): class BankConnectorInterface(BaseService): - def check_funds(self, account_no, currency, amount) -> CheckFundsResponse: + def check_funds(self, account_number, currency, amount) -> CheckFundsResponse: raise NotImplementedError() - def block_funds(self, account_no, currency, amount) -> BlockFundsResponse: + def block_funds(self, account_number, currency, amount) -> BlockFundsResponse: raise NotImplementedError() def initiate_payment( diff --git a/openg2p-g2p-bridge-celery-beat-producers/.env.example b/openg2p-g2p-bridge-celery-beat-producers/.env.example index 6d3466b..389735a 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/.env.example +++ b/openg2p-g2p-bridge-celery-beat-producers/.env.example @@ -8,3 +8,13 @@ G2P_BRIDGE_CELERY_TASKS_DB_DBNAME=openg2p_g2p_bridge_db G2P_BRIDGE_BANK_DECONSTRUCT_STRATEGY="bank_(?P\d+)_(?P\d+)_(?P\d+)_(?P\w+)" G2P_BRIDGE_MOBILE_WALLET_DECONSTRUCT_STRATEGY="mobile_(?P\d+)_(?P\w+)" G2P_BRIDGE_EMAIL_WALLET_DECONSTRUCT_STRATEGY="email_(?P\w+)_(?P\w+)" +G2P_BRIDGE_MAPPER_RESOLVE_ATTEMPTS=3 +G2P_BRIDGE_FUNDS_AVAILABLE_CHECK_ATTEMPTS=3 +G2P_BRIDGE_FUNDS_BLOCKED_ATTEMPTS=3 +G2P_BRIDGE_FUNDS_DISBURSEMENT_ATTEMPTS=3 +G2P_BRIDGE_STATEMENT_PROCESS_ATTEMPTS=3 +G2P_BRIDGE_MAPPER_RESOLVE_FREQUENCY=3600 +G2P_BRIDGE_FUNDS_AVAILABLE_CHECK_FREQUENCY=3600 +G2P_BRIDGE_FUNDS_BLOCKED_FREQUENCY=3600 +G2P_BRIDGE_FUNDS_DISBURSEMENT_FREQUENCY=3600 +G2P_BRIDGE_MT940_PROCESSOR_FREQUENCY=3600 diff --git a/openg2p-g2p-bridge-celery-beat-producers/.gitignore b/openg2p-g2p-bridge-celery-beat-producers/.gitignore index f5e0368..380911c 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/.gitignore +++ b/openg2p-g2p-bridge-celery-beat-producers/.gitignore @@ -79,3 +79,4 @@ docs/_build/ # Ignore secret files and env .secrets.* .env +*.db diff --git a/openg2p-g2p-bridge-celery-beat-producers/celerybeat-schedule.db b/openg2p-g2p-bridge-celery-beat-producers/celerybeat-schedule.db deleted file mode 100644 index a092bf0..0000000 Binary files a/openg2p-g2p-bridge-celery-beat-producers/celerybeat-schedule.db and /dev/null differ diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/app.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/app.py index 3bc1216..440a052 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/app.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/app.py @@ -31,7 +31,7 @@ def get_engine(): celery_app = Celery( - "g2p_bridge_celery_tasks", + "g2p_bridge_celery_beat_producer", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0", include=["openg2p_g2p_bridge_celery_beat_producers.tasks"], @@ -54,5 +54,9 @@ def get_engine(): "task": "disburse_funds_from_bank_beat_producer", "schedule": _config.funds_disbursement_frequency, }, + "mt940_processor_beat_producer": { + "task": "mt940_processor_beat_producer", + "schedule": _config.mt940_processor_frequency, + }, } celery_app.conf.timezone = "UTC" diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/config.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/config.py index a6ebba1..c3ce2a4 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/config.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/config.py @@ -20,20 +20,14 @@ class Settings(BaseSettings): db_dbname: str = "openg2p_g2p_bridge_db" db_driver: str = "postgresql" - mapper_resolve_api_url: str = "" - mapper_resolve_attempts: int = 3 funds_available_check_attempts: int = 3 funds_blocked_attempts: int = 3 funds_disbursement_attempts: int = 3 statement_process_attempts: int = 3 - mapper_resolve_frequency: int = 10 - funds_available_check_frequency: int = 10 - funds_blocked_frequency: int = 10 - funds_disbursement_frequency: int = 10 - statement_process_frequency: int = 3600 - - bank_fa_deconstruct_strategy: str = "" - mobile_wallet_deconstruct_strategy: str = "" - email_wallet_deconstruct_strategy: str = "" + mapper_resolve_frequency: int = 3600 + funds_available_check_frequency: int = 3600 + funds_blocked_frequency: int = 3600 + funds_disbursement_frequency: int = 3600 + mt940_processor_frequency: int = 3600 diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/block_funds_with_bank.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/block_funds_with_bank.py index e424be8..fbcd519 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/block_funds_with_bank.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/block_funds_with_bank.py @@ -73,4 +73,5 @@ def block_funds_with_bank_beat_producer(): args=(envelope.disbursement_envelope_id,), queue="g2p_bridge_celery_worker_tasks", ) + _logger.info("Completed checking for envelopes to block funds with bank") diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/check_funds_with_bank_task.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/check_funds_with_bank_task.py index 2d855d6..8383c82 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/check_funds_with_bank_task.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/check_funds_with_bank_task.py @@ -20,8 +20,8 @@ @celery_app.task(name="check_funds_with_bank_beat_producer") def check_funds_with_bank_beat_producer(): + _logger.info("Checking funds with bank") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) - with session_maker() as session: envelopes = ( session.execute( @@ -62,8 +62,13 @@ def check_funds_with_bank_beat_producer(): ) for envelope in envelopes: + _logger.info( + f"Sending task to check funds with bank for envelope {envelope.disbursement_envelope_id}" + ) celery_app.send_task( "check_funds_with_bank_worker", args=(envelope.disbursement_envelope_id,), queue="g2p_bridge_celery_worker_tasks", ) + + _logger.info("Checking funds with bank beat tasks push completed") diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/disburse_funds_from_bank.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/disburse_funds_from_bank.py index ae17c81..930a03a 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/disburse_funds_from_bank.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/disburse_funds_from_bank.py @@ -22,6 +22,7 @@ @celery_app.task(name="disburse_funds_from_bank_beat_producer") def disburse_funds_from_bank_beat_producer(): + _logger.info("Running disburse_funds_from_bank_beat_producer") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: envelopes = ( @@ -48,7 +49,6 @@ def disburse_funds_from_bank_beat_producer(): .scalars() .all() ) - for envelope in envelopes: pending_batches = ( session.execute( @@ -68,8 +68,15 @@ def disburse_funds_from_bank_beat_producer(): ) for batch in pending_batches: + _logger.info( + f"Sending task to disburse funds for batch {batch.bank_disbursement_batch_id}" + ) celery_app.send_task( "disburse_funds_from_bank_worker", (batch.bank_disbursement_batch_id,), queue="g2p_bridge_celery_worker_tasks", ) + + _logger.info( + f"Sent tasks to disburse funds for {len(pending_batches)} batches" + ) diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mapper_resolution_task.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mapper_resolution_task.py index f4fa723..0c8d928 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mapper_resolution_task.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mapper_resolution_task.py @@ -17,8 +17,8 @@ @celery_app.task(name="mapper_resolution_beat_producer") def mapper_resolution_beat_producer(): + _logger.info("Running mapper_resolution_beat_producer") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) - with session_maker() as session: mapper_resolution_batch_statuses = ( session.execute( @@ -36,8 +36,13 @@ def mapper_resolution_beat_producer(): ) for mapper_resolution_batch_status in mapper_resolution_batch_statuses: + _logger.info( + f"Sending mapper_resolution_worker task for mapper_resolution_batch_id: {mapper_resolution_batch_status.mapper_resolution_batch_id}" + ) celery_app.send_task( "mapper_resolution_worker", args=[mapper_resolution_batch_status.mapper_resolution_batch_id], queue="g2p_bridge_celery_worker_tasks", ) + + _logger.info("Finished mapper_resolution_beat_producer") diff --git a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mt940_processor.py b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mt940_processor.py index e8d0ff3..c21d38d 100644 --- a/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mt940_processor.py +++ b/openg2p-g2p-bridge-celery-beat-producers/src/openg2p_g2p_bridge_celery_beat_producers/tasks/mt940_processor.py @@ -17,6 +17,7 @@ @celery_app.task(name="mt940_processor_beat_producer") def mt940_processor_beat_producer(): + _logger.info("Running mt940_processor_beat_producer") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: account_statements = ( @@ -35,8 +36,13 @@ def mt940_processor_beat_producer(): ) for statement in account_statements: + _logger.info( + f"Sending mt940_processor_worker task for statement_id: {statement.statement_id}" + ) celery_app.send_task( "mt940_processor_worker", args=[statement.statement_id], queue="g2p_bridge_celery_worker_tasks", ) + + _logger.info("Finished mt940_processor_beat_producer") diff --git a/openg2p-g2p-bridge-celery-workers/.env.example b/openg2p-g2p-bridge-celery-workers/.env.example index 6d3466b..d729b54 100644 --- a/openg2p-g2p-bridge-celery-workers/.env.example +++ b/openg2p-g2p-bridge-celery-workers/.env.example @@ -5,6 +5,9 @@ G2P_BRIDGE_CELERY_TASKS_PORT=8001 G2P_BRIDGE_CELERY_TASKS_WORKER_TYPE=gunicorn G2P_BRIDGE_CELERY_TASKS_NO_OF_WORKERS=1 G2P_BRIDGE_CELERY_TASKS_DB_DBNAME=openg2p_g2p_bridge_db -G2P_BRIDGE_BANK_DECONSTRUCT_STRATEGY="bank_(?P\d+)_(?P\d+)_(?P\d+)_(?P\w+)" -G2P_BRIDGE_MOBILE_WALLET_DECONSTRUCT_STRATEGY="mobile_(?P\d+)_(?P\w+)" -G2P_BRIDGE_EMAIL_WALLET_DECONSTRUCT_STRATEGY="email_(?P\w+)_(?P\w+)" +G2P_BRIDGE_CELERY_TASKS_BANK_FA_DECONSTRUCT_STRATEGY="bank_(?P\d+)_(?P\d+)_(?P\d+)_(?P\w+)" +G2P_BRIDGE_CELERY_MOBILE_WALLET_DECONSTRUCT_STRATEGY="mobile_(?P\d+)_(?P\w+)" +G2P_BRIDGE_CELERY_EMAIL_WALLET_DECONSTRUCT_STRATEGY="email_(?P\w+)_(?P\w+)" +G2P_BRIDGE_CELERY_PRODUCERS_FUNDS_AVAILABLE_CHECK_URL_EXAMPLE_BANK="http://127.0.0.1:8003/check_funds" +G2P_BRIDGE_CELERY_PRODUCERS_FUNDS_BLOCK_URL_EXAMPLE_BANK="http://127.0.0.1:8003/block_funds" +G2P_BRIDGE_CELERY_PRODUCERS_FUNDS_DISBURSEMENT_URL_EXAMPLE_BANK="http://127.0.0.1:8003/initiate_payment" diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/app.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/app.py index d2a12e6..f41f6d7 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/app.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/app.py @@ -34,7 +34,7 @@ def get_engine(): celery_app = Celery( - "g2p_bridge_celery_worker_tasks", + "g2p_bridge_celery_worker", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0", include=["openg2p_g2p_bridge_celery_workers.tasks"], diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/config.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/config.py index 6eea2fd..19e6504 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/config.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/config.py @@ -22,12 +22,6 @@ class Settings(BaseSettings): mapper_resolve_api_url: str = "" - mapper_resolve_attempts: int = 3 - funds_available_check_attempts: int = 3 - funds_blocked_attempts: int = 3 - funds_disbursement_attempts: int = 3 - statement_process_attempts: int = 3 - bank_fa_deconstruct_strategy: str = "" mobile_wallet_deconstruct_strategy: str = "" email_wallet_deconstruct_strategy: str = "" diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/helpers/resolve_helper.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/helpers/resolve_helper.py index 4c4bf2e..d3db5d0 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/helpers/resolve_helper.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/helpers/resolve_helper.py @@ -3,6 +3,7 @@ import uuid from datetime import datetime from typing import List +import logging from openg2p_fastapi_common.service import BaseService from openg2p_g2p_bridge_models.models import MapperResolvedFaType @@ -17,7 +18,7 @@ from ..config import Settings _config = Settings.get_config() - +_logger = logging.getLogger(_config.logging_default_logger_name) class FAKeys(enum.Enum): account_number = "account_number" @@ -38,17 +39,20 @@ class KeyValuePair(BaseModel): class ResolveHelper(BaseService): def construct_single_resolve_request(self, id: str) -> SingleResolveRequest: + _logger.info(f"Constructing single resolve request for ID: {id}") single_resolve_request = SingleResolveRequest( reference_id=str(uuid.uuid4()), timestamp=datetime.now(), id=id, scope="details", ) + _logger.info(f"Constructed single resolve request for ID: {id}") return single_resolve_request def construct_resolve_request( self, single_resolve_requests: List[SingleResolveRequest] ) -> ResolveRequest: + _logger.info(f"Constructing resolve request for {len(single_resolve_requests)} single resolve requests") resolve_request_message = ResolveRequestMessage( transaction_id=str(uuid.uuid4()), resolve_request=single_resolve_requests, @@ -66,10 +70,11 @@ def construct_resolve_request( ), message=resolve_request_message, ) - + _logger.info(f"Constructed resolve request for {len(single_resolve_requests)} single resolve requests") return resolve_request def _deconstruct(self, value: str, strategy: str) -> List[KeyValuePair]: + _logger.info(f"Deconstructing ID/FA: {value}") regex_res = re.match(strategy, value) deconstructed_list = [] if regex_res: @@ -79,24 +84,30 @@ def _deconstruct(self, value: str, strategy: str) -> List[KeyValuePair]: KeyValuePair(key=k, value=v) for k, v in regex_res.items() ] except Exception as e: + _logger.error(f"Error while deconstructing ID/FA: {e}") raise ValueError("Error while deconstructing ID/FA") from e + _logger.info(f"Deconstructed ID/FA: {value}") return deconstructed_list def deconstruct_fa(self, fa: str) -> dict: - deconstruct_strategy = self.get_deconstruct_strategy(fa) + _logger.info(f"Deconstructing FA: {fa}") + deconstruct_strategy = self._get_deconstruct_strategy(fa) if deconstruct_strategy: deconstructed_pairs = self._deconstruct(fa, deconstruct_strategy) deconstructed_fa = { pair.key.value: pair.value for pair in deconstructed_pairs } + _logger.info(f"Deconstructed FA: {fa}") return deconstructed_fa return {} - def get_deconstruct_strategy(self, fa: str) -> str: + def _get_deconstruct_strategy(self, fa: str) -> str: + _logger.info(f"Getting deconstruction strategy for FA: {fa}") if fa.endswith(MapperResolvedFaType.BANK_ACCOUNT.value): return _config.bank_fa_deconstruct_strategy elif fa.endswith(MapperResolvedFaType.MOBILE_WALLET.value): return _config.mobile_wallet_fa_deconstruct_strategy elif fa.endswith(MapperResolvedFaType.EMAIL_WALLET.value): return _config.email_wallet_fa_deconstruct_strategy + _logger.info(f"Deconstruction strategy not found for FA: {fa}") return "" diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/check_funds_with_bank_task.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/check_funds_with_bank_task.py index 80433b9..b8cd118 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/check_funds_with_bank_task.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/check_funds_with_bank_task.py @@ -22,6 +22,7 @@ @celery_app.task(name="check_funds_with_bank_worker") def check_funds_with_bank_worker(disbursement_envelope_id: str): + _logger.info(f"Checking funds with bank for envelope: {disbursement_envelope_id}") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: @@ -102,5 +103,7 @@ def check_funds_with_bank_worker(disbursement_envelope_id: str): e ) disbursement_envelope_batch_status.funds_available_attempts += 1 - + _logger.info( + f"Checked funds with bank for envelope: {disbursement_envelope_id}" + ) session.commit() diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py index 5975706..5c2bb71 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py @@ -29,6 +29,7 @@ @celery_app.task(name="disburse_funds_from_bank_worker") def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): + _logger.info(f"Disbursing funds with bank for batch: {bank_disbursement_batch_id}") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: @@ -42,6 +43,9 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): ) if not batch_status: + _logger.error( + f"Bank Disbursement Batch Status not found for batch: {bank_disbursement_batch_id}" + ) return disbursement_envelope_id = batch_status.disbursement_envelope_id @@ -55,6 +59,9 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): ) if not envelope: + _logger.error( + f"Disbursement Envelope not found for envelope: {disbursement_envelope_id}" + ) return envelope_batch_status = ( @@ -67,6 +74,9 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): ) if not envelope_batch_status: + _logger.error( + f"Disbursement Envelope Batch Status not found for envelope: {disbursement_envelope_id}" + ) return disbursement_batch_controls = ( @@ -110,6 +120,7 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): payment_payloads.append( DisbursementPaymentPayload( + disbursement_id=disbursement.disbursement_id, remitting_account=benefit_program_configuration.sponsor_bank_account_number, remitting_account_currency=benefit_program_configuration.sponsor_bank_account_currency, payment_amount=disbursement.disbursement_amount, @@ -166,9 +177,11 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): batch_status.disbursement_attempts += 1 except Exception as e: + _logger.error(f"Error disbursing funds with bank: {str(e)}") batch_status.disbursement_status = ProcessStatus.PENDING.value batch_status.disbursement_timestamp = datetime.utcnow() batch_status.latest_error_code = str(e) batch_status.disbursement_attempts += 1 + _logger.info(f"Disbursing funds with bank for batch: {bank_disbursement_batch_id} completed") session.commit() diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mapper_resolution_task.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mapper_resolution_task.py index 1e6a59b..942a956 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mapper_resolution_task.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mapper_resolution_task.py @@ -24,6 +24,7 @@ @celery_app.task(name="mapper_resolution_worker") def mapper_resolution_worker(mapper_resolution_batch_id: str): + _logger.info(f"Resolving the mapper resolution batch: {mapper_resolution_batch_id}") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: @@ -52,6 +53,7 @@ def mapper_resolution_worker(mapper_resolution_batch_id: str): loop.close() if not resolve_response: + _logger.error(f"Failed to resolve the request: {error_msg}") session.query(MapperResolutionBatchStatus).filter( MapperResolutionBatchStatus.mapper_resolution_batch_id == mapper_resolution_batch_id @@ -72,6 +74,7 @@ def mapper_resolution_worker(mapper_resolution_batch_id: str): async def make_resolve_request(disbursement_batch_controls): + _logger.info("Making resolve request") resolve_helper = ResolveHelper.get_component() single_resolve_requests = [ @@ -88,6 +91,7 @@ async def make_resolve_request(disbursement_batch_controls): ) return resolve_response, None except Exception as e: + _logger.error(f"Failed to resolve the request: {e}") error_msg = f"Failed to resolve the request: {e}" return None, error_msg @@ -95,6 +99,7 @@ async def make_resolve_request(disbursement_batch_controls): def process_and_store_resolution( mapper_resolution_batch_id, resolve_response, beneficiary_disbursement_map ): + _logger.info("Processing and storing resolution") resolve_helper = ResolveHelper.get_component() session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: @@ -157,4 +162,5 @@ def process_and_store_resolution( + 1, } ) + _logger.info("Stored the resolution") session.commit() diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py index 328424d..146521f 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py @@ -28,6 +28,7 @@ @celery_app.task(name="mt940_processor_worker") def mt940_processor_worker(statement_id: str): + _logger.info(f"Processing account statement with statement_id: {statement_id}") session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: @@ -50,16 +51,12 @@ def mt940_processor_worker(statement_id: str): return try: - # Set BalanceBase scope to Transaction - mt940.tags.BalanceBase.scope = mt940.models.Transaction - # Parsing header section account_number_parser = mt940.tags.AccountIdentification() statement_number_parser = mt940.tags.StatementNumber() transaction_reference_parser = mt940.tags.TransactionReferenceNumber() statement_parser = mt940.tags.Statement() - mt940_statement = mt940.models.Transactions( processors={ "pre_statement": [mt940.processors.add_currency_pre_processor("")], @@ -75,27 +72,18 @@ def mt940_processor_worker(statement_id: str): mt940_statement.parse(lob.statement_lob) account_statement.account_number = mt940_statement.data.get( - "account_number", "" + "account_identification", "" ) account_statement.reference_number = mt940_statement.data.get( - "reference", "" + "transaction_reference", "" ) - statement_number_and_sequence = mt940_statement.data.get( - "number", "" - ).split("/") - account_statement.statement_number = ( - statement_number_and_sequence[0] - if statement_number_and_sequence - else "" + account_statement.statement_number = mt940_statement.data.get( + "statement_number", "" ) - account_statement.sequence_number = ( - statement_number_and_sequence[1] - if len(statement_number_and_sequence) > 1 - else "" + account_statement.sequence_number = mt940_statement.data.get( + "sequence_number", "" ) - - # TODO: Refactor code - + _logger.info("Parsed account statement header") # Get the benefit program configuration benefit_program_configuration = ( session.query(BenefitProgramConfiguration) @@ -105,11 +93,13 @@ def mt940_processor_worker(statement_id: str): ) .first() ) - if not benefit_program_configuration: + _logger.error( + f"Benefit program configuration not found for account number: {account_statement.account_number}" + ) account_statement.statement_process_status = ProcessStatus.ERROR account_statement.statement_process_error_code = ( - G2PBridgeErrorCodes.INVALID_ACCOUNT_NUMBER + G2PBridgeErrorCodes.INVALID_ACCOUNT_NUMBER.value ) account_statement.statement_process_timestamp = datetime.utcnow() account_statement.statement_process_attempts += 1 @@ -123,90 +113,123 @@ def mt940_processor_worker(statement_id: str): ) # Parsing transactions - parsed_transactions = [] + parsed_transactions_d = [] + parsed_transactions_rd = [] entry_sequence = 0 - for transaction in mt940_statement.transactions: + for transaction in mt940_statement: entry_sequence += 1 debit_credit_indicator = transaction.data["status"] - if debit_credit_indicator in ["D", "RD"]: + if debit_credit_indicator in ["D"]: parsed_transaction = construct_parsed_transaction( bank_connector, debit_credit_indicator, entry_sequence, transaction, ) - parsed_transactions.append(parsed_transaction) + parsed_transactions_d.append(parsed_transaction) - # End of for loop of mt940 statement transactions + if debit_credit_indicator in ["RD"]: + parsed_transaction = construct_parsed_transaction( + bank_connector, + debit_credit_indicator, + entry_sequence, + transaction, + ) + parsed_transactions_rd.append(parsed_transaction) + # End of for loop of mt940 statement transactions disbursement_error_recons = [] disbursement_recons = [] - for parsed_transaction in parsed_transactions: - bank_disbursement_batch_id = ( - session.query(DisbursementBatchControl) - .filter( - DisbursementBatchControl.disbursement_id - == parsed_transaction["disbursement_id"] - ) - .first() - .bank_disbursement_batch_id + + # Process only debit transactions + for parsed_transaction in parsed_transactions_d: + bank_disbursement_batch_id = get_bank_batch_id( + parsed_transaction, session ) if not bank_disbursement_batch_id: disbursement_error_recons.append( construct_disbursement_error_recon( + statement_id, + account_statement.statement_number, + account_statement.sequence_number, parsed_transaction, G2PBridgeErrorCodes.INVALID_DISBURSEMENT_ID, ) ) continue - disbursement_recon = ( - session.query(DisbursementRecon) - .filter( - DisbursementRecon.disbursement_id - == parsed_transaction["disbursement_id"] - ) - .first() - ) + disbursement_recon = get_disbursement_recon(parsed_transaction, session) - if ( - disbursement_recon - and parsed_transaction["debit_credit_indicator"] == "D" - ): + if disbursement_recon: disbursement_error_recons.append( construct_disbursement_error_recon( + statement_id, + account_statement.statement_number, + account_statement.sequence_number, parsed_transaction, G2PBridgeErrorCodes.DUPLICATE_DISBURSEMENT, ) ) continue - if ( - not disbursement_recon - and parsed_transaction["debit_credit_indicator"] == "RD" - ): + disbursement_recon = construct_new_disbursement_recon( + bank_disbursement_batch_id, + parsed_transaction, + statement_id, + account_statement.statement_number, + account_statement.sequence_number, + ) + disbursement_recons.append(disbursement_recon) + + # End of for loop for parsed transactions - debit + session.add_all(disbursement_recons) + session.add_all(disbursement_error_recons) + + # Start processing reversal transactions - rd + for parsed_transaction in parsed_transactions_rd: + bank_disbursement_batch_id = get_bank_batch_id( + parsed_transaction, session + ) + + if not bank_disbursement_batch_id: disbursement_error_recons.append( construct_disbursement_error_recon( - parsed_transaction, G2PBridgeErrorCodes.INVALID_REVERSAL + statement_id, + account_statement.statement_number, + account_statement.sequence_number, + parsed_transaction, + G2PBridgeErrorCodes.INVALID_DISBURSEMENT_ID, ) ) continue - if parsed_transaction["debit_credit_indicator"] == "D": - disbursement_recon = construct_new_disbursement_recon( - bank_disbursement_batch_id, - parsed_transaction, + disbursement_recon = get_disbursement_recon(parsed_transaction, session) + + if not disbursement_recon: + disbursement_error_recons.append( + construct_disbursement_error_recon( + statement_id, + account_statement.statement_number, + account_statement.sequence_number, + parsed_transaction, + G2PBridgeErrorCodes.INVALID_REVERSAL, + ) ) - disbursement_recons.append(disbursement_recon) - elif parsed_transaction["debit_credit_indicator"] == "RD": + else: update_existing_disbursement_recon( - disbursement_recon, parsed_transaction + disbursement_recon, + parsed_transaction, + statement_id, + account_statement.statement_number, + account_statement.sequence_number, ) disbursement_recons.append(disbursement_recon) - # End of for loop for parsed transactions + # End of for loop for parsed transactions - rd + session.add_all(disbursement_recons) + session.add_all(disbursement_error_recons) # Update account statement with parsed data account_statement.statement_process_status = ProcessStatus.PROCESSED @@ -215,43 +238,81 @@ def mt940_processor_worker(statement_id: str): account_statement.statement_process_attempts += 1 session.add(account_statement) - session.add_all(disbursement_recons) - session.add_all(disbursement_error_recons) + session.commit() + _logger.info( + f"Processed account statement for account number: {account_statement.account_number}" + ) except Exception as e: + _logger.error( + f"Error processing account statement for statement id: {statement_id}" + f" with error: {str(e)}", + ) account_statement.statement_process_status = ProcessStatus.PENDING account_statement.statement_process_error_code = str(e) account_statement.statement_process_timestamp = datetime.utcnow() account_statement.statement_process_attempts += 1 session.commit() + raise e -def construct_disbursement_error_recon(parsed_transaction, g2p_bridge_error_code): +def get_disbursement_recon(parsed_transaction, session): + disbursement_recon = ( + session.query(DisbursementRecon) + .filter( + DisbursementRecon.disbursement_id == parsed_transaction["disbursement_id"] + ) + .first() + ) + return disbursement_recon + + +def get_bank_batch_id(parsed_transaction, session): + bank_disbursement_batch_id = ( + session.query(DisbursementBatchControl) + .filter( + DisbursementBatchControl.disbursement_id + == parsed_transaction["disbursement_id"] + ) + .first() + .bank_disbursement_batch_id + ) + return bank_disbursement_batch_id + + +def construct_disbursement_error_recon( + statement_id, + statement_number, + statement_sequence, + parsed_transaction, + g2p_bridge_error_code, +): return DisbursementErrorRecon( - disbursement_id="", - bank_reference_number=parsed_transaction["remittance_reference_number"], - statement_id=parsed_transaction["remittance_statement_number"], - statement_number=parsed_transaction["remittance_statement_number"], - statement_sequence=parsed_transaction["remittance_statement_sequence"], + statement_id=statement_id, + statement_number=statement_number, + statement_sequence=statement_sequence, entry_sequence=parsed_transaction["remittance_entry_sequence"], entry_date=parsed_transaction["remittance_entry_date"], value_date=parsed_transaction["remittance_value_date"], error_reason=g2p_bridge_error_code, + disbursement_id=parsed_transaction["disbursement_id"], + bank_reference_number=parsed_transaction["remittance_reference_number"], + active=True, ) -def update_existing_disbursement_recon(disbursement_recon, parsed_transaction): +def update_existing_disbursement_recon( + disbursement_recon, + parsed_transaction, + statement_id, + statement_number, + statement_sequence, +): disbursement_recon.reversal_found = True - disbursement_recon.reversal_statement_id = parsed_transaction[ - "reversal_statement_number" - ] - disbursement_recon.reversal_statement_number = parsed_transaction[ - "reversal_statement_number" - ] - disbursement_recon.reversal_statement_sequence = parsed_transaction[ - "reversal_statement_sequence" - ] + disbursement_recon.reversal_statement_id = statement_id + disbursement_recon.reversal_statement_number = statement_number + disbursement_recon.reversal_statement_sequence = statement_sequence disbursement_recon.reversal_entry_sequence = parsed_transaction[ "reversal_entry_sequence" ] @@ -260,20 +321,25 @@ def update_existing_disbursement_recon(disbursement_recon, parsed_transaction): disbursement_recon.reversal_reason = parsed_transaction["reversal_reason"] -def construct_new_disbursement_recon(bank_disbursement_batch_id, parsed_transaction): +def construct_new_disbursement_recon( + bank_disbursement_batch_id, + parsed_transaction, + statement_id, + statement_number, + statement_sequence, +): disbursement_recon = DisbursementRecon( bank_disbursement_batch_id=bank_disbursement_batch_id, disbursement_id=parsed_transaction["disbursement_id"], beneficiary_name_from_bank=parsed_transaction["beneficiary_name_from_bank"], remittance_reference_number=parsed_transaction["remittance_reference_number"], - remittance_statement_id=parsed_transaction["remittance_statement_number"], - remittance_statement_number=parsed_transaction["remittance_statement_number"], - remittance_statement_sequence=parsed_transaction[ - "remittance_statement_sequence" - ], + remittance_statement_id=statement_id, + remittance_statement_number=statement_number, + remittance_statement_sequence=statement_sequence, remittance_entry_sequence=parsed_transaction["remittance_entry_sequence"], remittance_entry_date=parsed_transaction["remittance_entry_date"], remittance_value_date=parsed_transaction["remittance_value_date"], + active=True, ) return disbursement_recon @@ -293,15 +359,11 @@ def construct_parsed_transaction( remittance_reference_number, customer_reference, narratives ) beneficiary_name_from_bank = None - remittance_statement_number = None - remittance_statement_sequence = None remittance_entry_sequence = None remittance_entry_date = None remittance_value_date = None reversal_found = False - reversal_statement_number = None - reversal_statement_sequence = None reversal_entry_sequence = None reversal_entry_date = None reversal_value_date = None @@ -312,16 +374,12 @@ def construct_parsed_transaction( beneficiary_name_from_bank = bank_connector.retrieve_beneficiary_name( narratives ) - remittance_statement_number = transaction.data["statement_number"] - remittance_statement_sequence = transaction.data["sequence_number"] remittance_entry_sequence = entry_sequence remittance_entry_date = transaction.data["entry_date"] remittance_value_date = transaction.data["date"] if debit_credit_indicator == "RD": reversal_found = True - reversal_statement_number = transaction.data["statement_number"] - reversal_statement_sequence = transaction.data["sequence_number"] reversal_entry_sequence = entry_sequence reversal_entry_date = transaction.data["entry_date"] reversal_value_date = transaction.data["date"] @@ -329,23 +387,19 @@ def construct_parsed_transaction( parsed_transaction.update( { - disbursement_id: disbursement_id, - transaction_amount: transaction_amount, - debit_credit_indicator: debit_credit_indicator, - beneficiary_name_from_bank: beneficiary_name_from_bank, - remittance_reference_number: remittance_reference_number, - remittance_statement_number: remittance_statement_number, - remittance_statement_sequence: remittance_statement_sequence, - remittance_entry_sequence: remittance_entry_sequence, - remittance_entry_date: remittance_entry_date, - remittance_value_date: remittance_value_date, - reversal_found: reversal_found, - reversal_statement_number: reversal_statement_number, - reversal_statement_sequence: reversal_statement_sequence, - reversal_entry_sequence: reversal_entry_sequence, - reversal_entry_date: reversal_entry_date, - reversal_value_date: reversal_value_date, - reversal_reason: reversal_reason, + "disbursement_id": disbursement_id, + "transaction_amount": transaction_amount, + "debit_credit_indicator": debit_credit_indicator, + "beneficiary_name_from_bank": beneficiary_name_from_bank, + "remittance_reference_number": remittance_reference_number, + "remittance_entry_sequence": remittance_entry_sequence, + "remittance_entry_date": remittance_entry_date, + "remittance_value_date": remittance_value_date, + "reversal_found": reversal_found, + "reversal_entry_sequence": reversal_entry_sequence, + "reversal_entry_date": reversal_entry_date, + "reversal_value_date": reversal_value_date, + "reversal_reason": reversal_reason, } ) return parsed_transaction diff --git a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/models/disbursement.py b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/models/disbursement.py index f6ffe14..dedd13e 100644 --- a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/models/disbursement.py +++ b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/models/disbursement.py @@ -49,10 +49,10 @@ class DisbursementBatchControl(BaseORMModelWithTimes): disbursement_envelope_id: Mapped[str] = mapped_column(String, index=True) beneficiary_id: Mapped[str] = mapped_column(String) bank_disbursement_batch_id = mapped_column( - UUID, nullable=True, default=None, index=True, unique=True + UUID, nullable=True, default=None, index=True ) mapper_resolution_batch_id = mapped_column( - UUID, nullable=True, default=None, index=True, unique=True + UUID, nullable=True, default=None, index=True )