diff --git a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/app.py b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/app.py index 7c0327c..ecc5b50 100644 --- a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/app.py +++ b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/app.py @@ -17,11 +17,15 @@ AccountStatementController, DisbursementController, DisbursementEnvelopeController, + DisbursementEnvelopeStatusController, + DisbursementStatusController, ) from .services import ( AccountStatementService, DisbursementEnvelopeService, + DisbursementEnvelopeStatusService, DisbursementService, + DisbursementStatusService, ) _logger = logging.getLogger(_config.logging_default_logger_name) @@ -33,9 +37,13 @@ def initialize(self, **kwargs): DisbursementEnvelopeService() DisbursementService() AccountStatementService() + DisbursementStatusService() + DisbursementEnvelopeStatusService() DisbursementEnvelopeController().post_init() DisbursementController().post_init() AccountStatementController().post_init() + DisbursementStatusController().post_init() + DisbursementEnvelopeStatusController().post_init() def migrate_database(self, args): super().migrate_database(args) diff --git a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/controllers/__init__.py b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/controllers/__init__.py index 7b6d08e..aa51e32 100644 --- a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/controllers/__init__.py +++ b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/controllers/__init__.py @@ -1,3 +1,5 @@ from .account_statement import AccountStatementController from .disbursement import DisbursementController from .disbursement_envelope import DisbursementEnvelopeController +from .disbursement_envelope_status import DisbursementEnvelopeStatusController +from .disbursement_status import DisbursementStatusController diff --git a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/controllers/disbursement.py b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/controllers/disbursement.py index 984d3c3..3f6b4a0 100644 --- a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/controllers/disbursement.py +++ b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/controllers/disbursement.py @@ -26,13 +26,13 @@ def __init__(self, **kwargs): self.router.add_api_route( "/create_disbursements", self.create_disbursements, - responses={200: {"model": DisbursementRequest}}, + responses={200: {"model": DisbursementResponse}}, methods=["POST"], ) self.router.add_api_route( "/cancel_disbursements", self.cancel_disbursements, - responses={200: {"model": DisbursementRequest}}, + responses={200: {"model": DisbursementResponse}}, methods=["POST"], ) @@ -50,14 +50,14 @@ async def create_disbursements( _logger.error("Error creating disbursements") error_response: DisbursementResponse = ( await self.disbursement_service.construct_disbursement_error_response( - e.code, e.disbursement_payloads + disbursement_request, e.code, e.disbursement_payloads ) ) return error_response disbursement_response: DisbursementResponse = ( await self.disbursement_service.construct_disbursement_success_response( - disbursement_payloads + disbursement_request, disbursement_payloads ) ) _logger.info("Disbursements created successfully") @@ -78,14 +78,14 @@ async def cancel_disbursements( _logger.error("Error cancelling disbursements") error_response: DisbursementResponse = ( await self.disbursement_service.construct_disbursement_error_response( - e.code, e.disbursement_payloads + disbursement_request, e.code, e.disbursement_payloads ) ) return error_response disbursement_response: DisbursementResponse = ( await self.disbursement_service.construct_disbursement_success_response( - disbursement_payloads + disbursement_request, disbursement_payloads ) ) _logger.info("Disbursements cancelled successfully") diff --git a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/controllers/disbursement_envelope.py b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/controllers/disbursement_envelope.py index 92256d7..80be6a3 100644 --- a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/controllers/disbursement_envelope.py +++ b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/controllers/disbursement_envelope.py @@ -25,13 +25,13 @@ def __init__(self, **kwargs): self.router.add_api_route( "/create_disbursement_envelope", self.create_disbursement_envelope, - responses={200: {"model": DisbursementEnvelopeRequest}}, + responses={200: {"model": DisbursementEnvelopeResponse}}, methods=["POST"], ) self.router.add_api_route( "/cancel_disbursement_envelope", self.cancel_disbursement_envelope, - responses={200: {"model": DisbursementEnvelopeRequest}}, + responses={200: {"model": DisbursementEnvelopeResponse}}, methods=["POST"], ) @@ -48,12 +48,12 @@ async def create_disbursement_envelope( except DisbursementEnvelopeException as e: _logger.error("Error creating disbursement envelope") error_response: DisbursementEnvelopeResponse = await self.disbursement_envelope_service.construct_disbursement_envelope_error_response( - e.code + disbursement_envelope_request, e.code ) return error_response disbursement_envelope_response: DisbursementEnvelopeResponse = await self.disbursement_envelope_service.construct_disbursement_envelope_success_response( - disbursement_envelope_payload + disbursement_envelope_request, disbursement_envelope_payload ) _logger.info("Disbursement envelope created successfully") return disbursement_envelope_response @@ -71,12 +71,12 @@ async def cancel_disbursement_envelope( except DisbursementEnvelopeException as e: _logger.error("Error cancelling disbursement envelope") error_response: DisbursementEnvelopeResponse = await self.disbursement_envelope_service.construct_disbursement_envelope_error_response( - e.code + disbursement_envelope_request, e.code ) return error_response disbursement_envelope_response: DisbursementEnvelopeResponse = await self.disbursement_envelope_service.construct_disbursement_envelope_success_response( - disbursement_envelope_payload + disbursement_envelope_request, disbursement_envelope_payload ) _logger.info("Disbursement envelope cancelled successfully") return disbursement_envelope_response diff --git a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/controllers/disbursement_envelope_status.py b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/controllers/disbursement_envelope_status.py new file mode 100644 index 0000000..a7e377a --- /dev/null +++ b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/controllers/disbursement_envelope_status.py @@ -0,0 +1,54 @@ +import logging + +from openg2p_fastapi_common.controller import BaseController +from openg2p_g2p_bridge_models.errors.exceptions import DisbursementStatusException +from openg2p_g2p_bridge_models.schemas import ( + DisbursementEnvelopeBatchStatusPayload, + DisbursementEnvelopeStatusRequest, + DisbursementEnvelopeStatusResponse, +) + +from ..config import Settings +from ..services import DisbursementEnvelopeStatusService + +_config = Settings.get_config() +_logger = logging.getLogger(_config.logging_default_logger_name) + + +class DisbursementEnvelopeStatusController(BaseController): + def __init__(self, **kwargs): + super().__init__(**kwargs) + + self.disbursement_envelope_status_service = ( + DisbursementEnvelopeStatusService.get_component() + ) + self.router.tags += ["G2P Bridge Disbursement Envelope Status"] + + self.router.add_api_route( + "/get_disbursement_envelope_status", + self.get_disbursement_envelope_status, + responses={200: {"model": DisbursementEnvelopeStatusResponse}}, + methods=["POST"], + ) + + async def get_disbursement_envelope_status( + self, disbursement_envelope_status_request: DisbursementEnvelopeStatusRequest + ) -> DisbursementEnvelopeStatusResponse: + _logger.info("Getting disbursement envelope batch status payload") + try: + disbursement_envelope_batch_status_payload: DisbursementEnvelopeBatchStatusPayload = await self.disbursement_envelope_status_service.get_disbursement_envelope_batch_status( + disbursement_envelope_status_request + ) + disbursement_status_response: DisbursementEnvelopeStatusResponse = await self.disbursement_envelope_status_service.construct_disbursement_envelope_success_response( + disbursement_envelope_status_request, + disbursement_envelope_batch_status_payload, + ) + return disbursement_status_response + + except DisbursementStatusException as e: + _logger.error(f"Error in getting disbursement envelope status: {e}") + error_response: DisbursementEnvelopeStatusResponse = await self.disbursement_envelope_status_service.construct_disbursement_envelope_status_error_response( + disbursement_envelope_status_request, + e.code, + ) + return error_response diff --git a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/controllers/disbursement_status.py b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/controllers/disbursement_status.py new file mode 100644 index 0000000..6d6c2ee --- /dev/null +++ b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/controllers/disbursement_status.py @@ -0,0 +1,52 @@ +import logging +from typing import List + +from openg2p_fastapi_common.controller import BaseController +from openg2p_g2p_bridge_models.errors.exceptions import DisbursementException +from openg2p_g2p_bridge_models.schemas import ( + DisbursementStatusPayload, + DisbursementStatusRequest, + DisbursementStatusResponse, +) + +from ..config import Settings +from ..services import DisbursementStatusService + +_config = Settings.get_config() +_logger = logging.getLogger(_config.logging_default_logger_name) + + +class DisbursementStatusController(BaseController): + def __init__(self, **kwargs): + super().__init__(**kwargs) + + self.disbursement_service = DisbursementStatusService.get_component() + self.router.tags += ["G2P Bridge Disbursement Status"] + + self.router.add_api_route( + "/get_disbursement_status", + self.get_disbursement_status, + responses={200: {"model": DisbursementStatusResponse}}, + methods=["POST"], + ) + + async def get_disbursement_status( + self, disbursement_status_request: DisbursementStatusRequest + ) -> DisbursementStatusResponse: + _logger.info("Retrieving disbursement envelope status") + try: + disbursement_status_payloads: List[ + DisbursementStatusPayload + ] = await self.disbursement_service.get_disbursement_status_payloads( + disbursement_status_request + ) + disbursement_status_response: DisbursementStatusResponse = await self.disbursement_service.construct_disbursement_status_success_response( + disbursement_status_request, disbursement_status_payloads + ) + _logger.info("Disbursements cancelled successfully") + return disbursement_status_response + except DisbursementException as e: + error_response: DisbursementStatusResponse = await self.disbursement_service.construct_disbursement_status_error_response( + disbursement_status_request, e.code + ) + return error_response diff --git a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/__init__.py b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/__init__.py index 994afe3..5b2ea1c 100644 --- a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/__init__.py +++ b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/__init__.py @@ -1,3 +1,5 @@ from .account_statement import AccountStatementService from .disbursement import DisbursementService from .disbursement_envelope import DisbursementEnvelopeService +from .disbursement_envelope_status import DisbursementEnvelopeStatusService +from .disbursement_status import DisbursementStatusService diff --git a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/account_statement.py b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/account_statement.py index 15ffa31..d09d5f7 100644 --- a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/account_statement.py +++ b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/account_statement.py @@ -7,7 +7,8 @@ from openg2p_fastapi_common.service import BaseService from openg2p_g2p_bridge_models.errors.codes import G2PBridgeErrorCodes from openg2p_g2p_bridge_models.models import AccountStatement, AccountStatementLob -from openg2p_g2p_bridge_models.schemas import AccountStatementResponse, ResponseStatus +from openg2p_g2p_bridge_models.schemas import AccountStatementResponse +from openg2p_g2pconnect_common_lib.schemas import StatusEnum, SyncResponseHeader from sqlalchemy.ext.asyncio import async_sessionmaker from ..config import Settings @@ -53,9 +54,13 @@ async def construct_account_statement_success_response( ) -> AccountStatementResponse: _logger.info("Constructing account statement success response") return AccountStatementResponse( - response_status=ResponseStatus.SUCCESS, - statement_id=statement_id, - error_code="", + header=SyncResponseHeader( + message_id="", + message_ts=datetime.now().isoformat(), + action="", + status=StatusEnum.succ, + ), + message=statement_id, ) async def construct_account_statement_error_response( @@ -63,7 +68,11 @@ async def construct_account_statement_error_response( ) -> AccountStatementResponse: _logger.error("Constructing account statement error response") return AccountStatementResponse( - response_status=ResponseStatus.FAILURE, - statement_id="", - error_code=code.value, + header=SyncResponseHeader( + message_id="", + message_ts=datetime.now().isoformat(), + action="", + status=StatusEnum.rjct, + ), + message=code.value, ) diff --git a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement.py b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement.py index 19185a4..92971ce 100644 --- a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement.py +++ b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement.py @@ -23,7 +23,10 @@ DisbursementPayload, DisbursementRequest, DisbursementResponse, - ResponseStatus, +) +from openg2p_g2pconnect_common_lib.schemas import ( + StatusEnum, + SyncResponseHeader, ) from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.future import select @@ -44,23 +47,23 @@ async def create_disbursements( try: await self.validate_disbursement_envelope( session=session, - disbursement_payloads=disbursement_request.request_payload, + disbursement_payloads=disbursement_request.message, ) except DisbursementException as e: _logger.error(f"Error validating disbursement envelope: {str(e)}") raise e is_error_free = await self.validate_disbursement_request( - disbursement_payloads=disbursement_request.request_payload + disbursement_payloads=disbursement_request.message ) if not is_error_free: _logger.error("Error validating disbursement request") raise DisbursementException( code=G2PBridgeErrorCodes.INVALID_DISBURSEMENT_PAYLOAD, - disbursement_payloads=disbursement_request.request_payload, + disbursement_payloads=disbursement_request.message, ) disbursements: List[Disbursement] = await self.construct_disbursements( - disbursement_payloads=disbursement_request.request_payload + disbursement_payloads=disbursement_request.message ) disbursement_batch_controls: List[ DisbursementBatchControl @@ -109,7 +112,7 @@ async def create_disbursements( session.add(bank_disbursement_batch_status) await session.commit() _logger.info("Disbursements Created Successfully!") - return disbursement_request.request_payload + return disbursement_request.message async def update_disbursement_envelope_batch_status(self, disbursements, session): _logger.info("Updating Disbursement Envelope Batch Status") @@ -306,25 +309,38 @@ async def validate_disbursement_envelope( async def construct_disbursement_error_response( self, + disbursement_request: DisbursementRequest, code: G2PBridgeErrorCodes, disbursement_payloads: List[DisbursementPayload], ) -> DisbursementResponse: _logger.info("Constructing Disbursement Error Response") disbursement_response: DisbursementResponse = DisbursementResponse( - response_status=ResponseStatus.FAILURE, - response_payload=disbursement_payloads, - response_error_code=code.value, + header=SyncResponseHeader( + message_id=disbursement_request.header.message_id, + message_ts=datetime.now().isoformat(), + action=disbursement_request.header.action, + status=StatusEnum.rjct, + status_reason_message=code.value, + ), + message=disbursement_payloads, ) _logger.info("Disbursement Error Response Constructed!") return disbursement_response async def construct_disbursement_success_response( - self, disbursement_payloads: List[DisbursementPayload] + self, + disbursement_request: DisbursementRequest, + disbursement_payloads: List[DisbursementPayload], ) -> DisbursementResponse: _logger.info("Constructing Disbursement Success Response") disbursement_response: DisbursementResponse = DisbursementResponse( - response_status=ResponseStatus.SUCCESS, - response_payload=disbursement_payloads, + header=SyncResponseHeader( + message_id=disbursement_request.header.message_id, + message_ts=datetime.now().isoformat(), + action=disbursement_request.header.action, + status=StatusEnum.succ, + ), + message=disbursement_payloads, ) _logger.info("Disbursement Success Response Constructed!") return disbursement_response @@ -336,14 +352,14 @@ async def cancel_disbursements( session_maker = async_sessionmaker(dbengine.get(), expire_on_commit=False) async with session_maker() as session: is_payload_valid = await self.validate_request_payload( - disbursement_payloads=disbursement_request.request_payload + disbursement_payloads=disbursement_request.message ) if not is_payload_valid: _logger.error("Error validating disbursement request") raise DisbursementException( code=G2PBridgeErrorCodes.INVALID_DISBURSEMENT_PAYLOAD, - disbursement_payloads=disbursement_request.request_payload, + disbursement_payloads=disbursement_request.message, ) disbursements_in_db: List[ @@ -353,12 +369,12 @@ async def cancel_disbursements( _logger.error("Disbursements not found in DB") raise DisbursementException( code=G2PBridgeErrorCodes.INVALID_DISBURSEMENT_ID, - disbursement_payloads=disbursement_request.request_payload, + disbursement_payloads=disbursement_request.message, ) try: await self.check_for_single_envelope( - disbursements_in_db, disbursement_request.request_payload + disbursements_in_db, disbursement_request.message ) except DisbursementException as e: _logger.error(f"Error checking for single envelope: {str(e)}") @@ -367,7 +383,7 @@ async def cancel_disbursements( try: await self.validate_envelope_for_disbursement_cancellation( disbursements_in_db=disbursements_in_db, - disbursement_payloads=disbursement_request.request_payload, + disbursement_payloads=disbursement_request.message, session=session, ) except DisbursementException as e: @@ -384,7 +400,7 @@ async def cancel_disbursements( if invalid_disbursements_exist: raise DisbursementException( code=G2PBridgeErrorCodes.INVALID_DISBURSEMENT_PAYLOAD, - disbursement_payloads=disbursement_request.request_payload, + disbursement_payloads=disbursement_request.message, ) for disbursement in disbursements_in_db: @@ -422,7 +438,7 @@ async def cancel_disbursements( session.add(disbursement_envelope_batch_status) await session.commit() _logger.info("Disbursements Cancelled Successfully!") - return disbursement_request.request_payload + return disbursement_request.message async def check_for_single_envelope( self, disbursements_in_db, disbursement_payloads @@ -446,7 +462,7 @@ async def check_for_invalid_disbursements( ) -> bool: _logger.info("Checking for Invalid Disbursements") invalid_disbursements_exist = False - for disbursement_payload in disbursement_request.request_payload: + for disbursement_payload in disbursement_request.message: if disbursement_payload.disbursement_id not in [ disbursement.disbursement_id for disbursement in disbursements_in_db ]: @@ -478,7 +494,7 @@ async def fetch_disbursements_from_db( Disbursement.disbursement_id.in_( [ str(disbursement_payload.disbursement_id) - for disbursement_payload in disbursement_request.request_payload + for disbursement_payload in disbursement_request.message ] ) ) diff --git a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement_envelope.py b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement_envelope.py index 5cdebfa..ded87e5 100644 --- a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement_envelope.py +++ b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement_envelope.py @@ -19,7 +19,10 @@ DisbursementEnvelopePayload, DisbursementEnvelopeRequest, DisbursementEnvelopeResponse, - ResponseStatus, +) +from openg2p_g2pconnect_common_lib.schemas import ( + StatusEnum, + SyncResponseHeader, ) from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.future import select @@ -42,8 +45,10 @@ async def create_disbursement_envelope( except DisbursementEnvelopeException as e: raise e - disbursement_envelope: DisbursementEnvelope = await self.construct_disbursement_envelope( - disbursement_envelope_payload=disbursement_envelope_request.request_payload + disbursement_envelope: DisbursementEnvelope = ( + await self.construct_disbursement_envelope( + disbursement_envelope_payload=disbursement_envelope_request.message + ) ) disbursement_envelope_batch_status: DisbursementEnvelopeBatchStatus = ( @@ -58,7 +63,7 @@ async def create_disbursement_envelope( await session.commit() disbursement_envelope_payload: DisbursementEnvelopePayload = ( - disbursement_envelope_request.request_payload + disbursement_envelope_request.message ) disbursement_envelope_payload.disbursement_envelope_id = ( disbursement_envelope.disbursement_envelope_id @@ -73,7 +78,7 @@ async def cancel_disbursement_envelope( session_maker = async_sessionmaker(dbengine.get(), expire_on_commit=False) async with session_maker() as session: disbursement_envelope_payload: DisbursementEnvelopePayload = ( - disbursement_envelope_request.request_payload + disbursement_envelope_request.message ) disbursement_envelope_id: str = ( disbursement_envelope_payload.disbursement_envelope_id @@ -117,26 +122,41 @@ async def cancel_disbursement_envelope( return disbursement_envelope_payload async def construct_disbursement_envelope_success_response( - self, disbursement_envelope_payload: DisbursementEnvelopePayload + self, + disbursement_envelope_request: DisbursementEnvelopeRequest, + disbursement_envelope_payload: DisbursementEnvelopePayload, ) -> DisbursementEnvelopeResponse: _logger.info("Constructing disbursement envelope success response") disbursement_envelope_response: DisbursementEnvelopeResponse = ( DisbursementEnvelopeResponse( - response_status=ResponseStatus.SUCCESS, - response_payload=disbursement_envelope_payload, + header=SyncResponseHeader( + message_id=disbursement_envelope_request.header.message_id, + message_ts=datetime.now().isoformat(), + action=disbursement_envelope_request.header.action, + status=StatusEnum.succ, + ), + message=disbursement_envelope_payload, ) ) _logger.info("Disbursement envelope success response constructed") return disbursement_envelope_response async def construct_disbursement_envelope_error_response( - self, error_code: G2PBridgeErrorCodes + self, + disbursement_envelope_request: DisbursementEnvelopeRequest, + error_code: G2PBridgeErrorCodes, ) -> DisbursementEnvelopeResponse: _logger.error("Constructing disbursement envelope error response") disbursement_envelope_response: DisbursementEnvelopeResponse = ( DisbursementEnvelopeResponse( - response_status=ResponseStatus.FAILURE, - response_error_code=error_code.value, + header=SyncResponseHeader( + message_id=disbursement_envelope_request.header.message_id, + message_ts=datetime.now().isoformat(), + action=disbursement_envelope_request.header.action, + status=StatusEnum.rjct, + status_reason_message=error_code.value, + ), + message={}, ) ) _logger.error("Disbursement envelope error response constructed") @@ -148,7 +168,7 @@ async def validate_envelope_request( ) -> bool: _logger.info("Validating disbursement envelope request") disbursement_envelope_payload: DisbursementEnvelopePayload = ( - disbursement_envelope_request.request_payload + disbursement_envelope_request.message ) if ( disbursement_envelope_payload.benefit_program_mnemonic is None diff --git a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement_envelope_status.py b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement_envelope_status.py new file mode 100644 index 0000000..762972c --- /dev/null +++ b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement_envelope_status.py @@ -0,0 +1,113 @@ +import logging +from datetime import datetime + +from openg2p_fastapi_common.context import dbengine +from openg2p_fastapi_common.service import BaseService +from openg2p_g2p_bridge_models.errors.codes import G2PBridgeErrorCodes +from openg2p_g2p_bridge_models.errors.exceptions import DisbursementStatusException +from openg2p_g2p_bridge_models.models import ( + DisbursementEnvelopeBatchStatus, +) +from openg2p_g2p_bridge_models.schemas import ( + DisbursementEnvelopeBatchStatusPayload, + DisbursementEnvelopeStatusRequest, + DisbursementEnvelopeStatusResponse, + DisbursementStatusRequest, +) +from openg2p_g2pconnect_common_lib.schemas import ( + StatusEnum, + SyncResponseHeader, +) +from sqlalchemy.ext.asyncio import async_sessionmaker +from sqlalchemy.future import select + +from ..config import Settings + +_config = Settings.get_config() +_logger = logging.getLogger(_config.logging_default_logger_name) + + +class DisbursementEnvelopeStatusService(BaseService): + async def get_disbursement_envelope_batch_status( + self, disbursement_envelope_status_request: DisbursementEnvelopeStatusRequest + ) -> DisbursementEnvelopeBatchStatusPayload: + session_maker = async_sessionmaker(dbengine.get(), expire_on_commit=False) + async with session_maker() as session: + try: + _logger.info( + f"Retrieving disbursement envelope status for {disbursement_envelope_status_request.message}" + ) + disbursement_envelope_batch_status = ( + ( + await session.execute( + select(DisbursementEnvelopeBatchStatus).where( + DisbursementEnvelopeBatchStatus.disbursement_envelope_id + == disbursement_envelope_status_request.message + ) + ) + ) + .scalars() + .first() + ) + + if disbursement_envelope_batch_status is None: + raise DisbursementStatusException( + message="Disbursement envelope not found", + code=G2PBridgeErrorCodes.DISBURSEMENT_ENVELOPE_NOT_FOUND, + ) + disbursement_envelope_batch_status_payload = DisbursementEnvelopeBatchStatusPayload( + disbursement_envelope_id=disbursement_envelope_batch_status.disbursement_envelope_id, + number_of_disbursements_received=disbursement_envelope_batch_status.number_of_disbursements_received, + total_disbursement_amount_received=disbursement_envelope_batch_status.total_disbursement_amount_received, + funds_available_with_bank=disbursement_envelope_batch_status.funds_available_with_bank, + funds_available_latest_timestamp=disbursement_envelope_batch_status.funds_available_latest_timestamp, + funds_available_latest_error_code=disbursement_envelope_batch_status.funds_available_latest_error_code, + funds_available_attempts=disbursement_envelope_batch_status.funds_available_attempts, + funds_blocked_with_bank=disbursement_envelope_batch_status.funds_blocked_with_bank, + funds_blocked_latest_timestamp=disbursement_envelope_batch_status.funds_blocked_latest_timestamp, + funds_blocked_latest_error_code=disbursement_envelope_batch_status.funds_blocked_latest_error_code, + funds_blocked_attempts=disbursement_envelope_batch_status.funds_blocked_attempts, + funds_blocked_reference_number=disbursement_envelope_batch_status.funds_blocked_reference_number, + id_mapper_resolution_required=disbursement_envelope_batch_status.id_mapper_resolution_required, + number_of_disbursements_shipped=disbursement_envelope_batch_status.number_of_disbursements_shipped, + number_of_disbursements_reconciled=disbursement_envelope_batch_status.number_of_disbursements_reconciled, + number_of_disbursements_reversed=disbursement_envelope_batch_status.number_of_disbursements_reversed, + ) + return disbursement_envelope_batch_status_payload + except DisbursementStatusException as e: + _logger.error("Error retrieving disbursement envelope status") + raise e + + async def construct_disbursement_envelope_status_error_response( + self, + disbursement_status_request: DisbursementStatusRequest, + code: str, + ) -> DisbursementEnvelopeStatusResponse: + response = DisbursementEnvelopeStatusResponse( + header=SyncResponseHeader( + message_id=disbursement_status_request.header.message_id, + message_ts=datetime.now().isoformat(), + action=disbursement_status_request.header.action, + status=StatusEnum.rjct, + status_reason_message=code, + ), + message={}, + ) + + return response + + async def construct_disbursement_envelope_status_success_response( + self, + disbursement_status_request: DisbursementStatusRequest, + disbursement_envelope_batch_status_payload: DisbursementEnvelopeBatchStatusPayload, + ) -> DisbursementEnvelopeStatusResponse: + response = DisbursementEnvelopeStatusResponse( + header=SyncResponseHeader( + message_id=disbursement_status_request.header.message_id, + message_ts=datetime.now().isoformat(), + action=disbursement_status_request.header.action, + status=StatusEnum.succ, + ), + message=disbursement_envelope_batch_status_payload, + ) + return response diff --git a/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement_status.py b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement_status.py new file mode 100644 index 0000000..9c3e064 --- /dev/null +++ b/openg2p-g2p-bridge-api/src/openg2p_g2p_bridge_api/services/disbursement_status.py @@ -0,0 +1,169 @@ +import logging +from datetime import datetime +from typing import List + +from openg2p_fastapi_common.context import dbengine +from openg2p_fastapi_common.service import BaseService +from openg2p_g2p_bridge_models.errors.exceptions import DisbursementStatusException +from openg2p_g2p_bridge_models.models import ( + DisbursementErrorRecon, + DisbursementRecon, +) +from openg2p_g2p_bridge_models.schemas import ( + DisbursementEnvelopeStatusResponse, + DisbursementErrorReconPayload, + DisbursementReconPayload, + DisbursementReconRecords, + DisbursementStatusPayload, + DisbursementStatusRequest, + DisbursementStatusResponse, +) +from openg2p_g2pconnect_common_lib.schemas import ( + StatusEnum, + SyncResponseHeader, +) +from sqlalchemy.ext.asyncio import async_sessionmaker +from sqlalchemy.future import select + +from ..config import Settings + +_config = Settings.get_config() +_logger = logging.getLogger(_config.logging_default_logger_name) + + +class DisbursementStatusService(BaseService): + async def get_disbursement_status_payloads( + self, disbursement_status_request: DisbursementStatusRequest + ) -> List[DisbursementStatusPayload]: + session_maker = async_sessionmaker(dbengine.get(), expire_on_commit=False) + async with session_maker() as session: + try: + disbursement_status_payloads = [] + for disbursement_id in disbursement_status_request.message: + disbursement_recon_records = ( + await self.get_disbursement_recon_records( + session, disbursement_id + ) + ) + disbursement_status_payload = DisbursementStatusPayload( + disbursement_id=disbursement_id, + disbursement_recon_records=disbursement_recon_records, + ) + disbursement_status_payloads.append(disbursement_status_payload) + return disbursement_status_payloads + except DisbursementStatusException as e: + _logger.error("Error in getting disbursement status") + raise e + + async def get_disbursement_recon_records( + self, session, disbursement_id: str + ) -> DisbursementReconRecords: + disbursement_recon_payloads = [] + disbursement_error_recon_payloads = [] + + disbursement_recon_payloads_from_db = ( + ( + await session.execute( + select(DisbursementRecon).where( + DisbursementRecon.disbursement_id == disbursement_id + ) + ) + ) + .scalars() + .all() + ) + + for disbursement_recon_payload in disbursement_recon_payloads_from_db: + disbursement_recon_payloads.append( + DisbursementReconPayload( + bank_disbursement_batch_id=disbursement_recon_payload.bank_disbursement_batch_id, + disbursement_id=disbursement_recon_payload.disbursement_id, + disbursement_envelope_id=disbursement_recon_payload.disbursement_envelope_id, + beneficiary_name_from_bank=disbursement_recon_payload.beneficiary_name_from_bank, + remittance_reference_number=disbursement_recon_payload.remittance_reference_number, + remittance_statement_id=disbursement_recon_payload.remittance_statement_id, + remittance_statement_number=disbursement_recon_payload.remittance_statement_number, + remittance_statement_sequence=disbursement_recon_payload.remittance_statement_sequence, + remittance_entry_sequence=disbursement_recon_payload.remittance_entry_sequence, + remittance_entry_date=disbursement_recon_payload.remittance_entry_date, + remittance_value_date=disbursement_recon_payload.remittance_value_date, + reversal_found=disbursement_recon_payload.reversal_found, + reversal_statement_id=disbursement_recon_payload.reversal_statement_id, + reversal_statement_number=disbursement_recon_payload.reversal_statement_number, + reversal_statement_sequence=disbursement_recon_payload.reversal_statement_sequence, + reversal_entry_sequence=disbursement_recon_payload.reversal_entry_sequence, + reversal_entry_date=disbursement_recon_payload.reversal_entry_date, + reversal_value_date=disbursement_recon_payload.reversal_value_date, + reversal_reason=disbursement_recon_payload.reversal_reason, + ) + ) + + disbursement_error_recon_payloads_from_db = ( + ( + await session.execute( + select(DisbursementErrorRecon).where( + DisbursementErrorRecon.disbursement_id == disbursement_id + ) + ) + ) + .scalars() + .all() + ) + + for ( + disbursement_error_recon_payload + ) in disbursement_error_recon_payloads_from_db: + disbursement_error_recon_payloads.append( + DisbursementErrorReconPayload( + statement_id=disbursement_error_recon_payload.statement_id, + statement_number=disbursement_error_recon_payload.statement_number, + statement_sequence=disbursement_error_recon_payload.statement_sequence, + entry_sequence=disbursement_error_recon_payload.entry_sequence, + entry_date=disbursement_error_recon_payload.entry_date, + value_date=disbursement_error_recon_payload.value_date, + error_reason=disbursement_error_recon_payload.error_reason, + disbursement_id=disbursement_error_recon_payload.disbursement_id, + bank_reference_number=disbursement_error_recon_payload.bank_reference_number, + ) + ) + + disbursement_recon_records = DisbursementReconRecords( + disbursement_recon_payloads=disbursement_recon_payloads, + disbursement_error_recon_payloads=disbursement_error_recon_payloads, + ) + + return disbursement_recon_records + + async def construct_disbursement_status_error_response( + self, + disbursement_status_request: DisbursementStatusRequest, + code: str, + ) -> DisbursementStatusResponse: + response = DisbursementEnvelopeStatusResponse( + header=SyncResponseHeader( + message_id=disbursement_status_request.header.message_id, + message_ts=datetime.now().isoformat(), + action=disbursement_status_request.header.action, + status=StatusEnum.rjct, + status_reason_message=code, + ), + message={}, + ) + + return response + + async def construct_disbursement_status_success_response( + self, + disbursement_status_request: DisbursementStatusRequest, + disbursement_status_payloads: List[DisbursementStatusPayload], + ) -> DisbursementStatusResponse: + response = DisbursementStatusResponse( + header=SyncResponseHeader( + message_id=disbursement_status_request.header.message_id, + message_ts=datetime.now().isoformat(), + action=disbursement_status_request.header.action, + status=StatusEnum.succ, + ), + message=disbursement_status_payloads, + ) + return response diff --git a/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py b/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py index 6a82900..a1ef39d 100644 --- a/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py +++ b/openg2p-g2p-bridge-bank-connectors/src/openg2p_g2p_bridge_bank_connectors/bank_connectors/example_bank_connector.py @@ -1,5 +1,5 @@ -from typing import List, Optional import logging +from typing import List, Optional import httpx from openg2p_g2p_bridge_models.models import ( @@ -21,6 +21,7 @@ _config = Settings.get_config() _logger = logging.getLogger(_config.logging_default_logger_name) + class BankPaymentPayload(BaseModel): payment_reference_number: str remitting_account: str @@ -137,7 +138,9 @@ def block_funds(self, account_number, currency, amount) -> BlockFundsResponse: def initiate_payment( self, disbursement_payment_payloads: List[DisbursementPaymentPayload] ) -> PaymentResponse: - _logger.info(f"Initiating payment for {len(disbursement_payment_payloads)} disbursements") + _logger.info( + f"Initiating payment for {len(disbursement_payment_payloads)} disbursements" + ) try: with httpx.Client() as client: bank_payment_payloads = [] @@ -178,9 +181,9 @@ def initiate_payment( data = response.json() if data["status"] == "success": - _logger.info(f"Payment initiated successfully") + _logger.info("Payment initiated successfully") return PaymentResponse(status=PaymentStatus.SUCCESS, error_code="") - _logger.error(f"Payment initiation failed") + _logger.error("Payment initiation failed") return PaymentResponse( status=PaymentStatus.ERROR, error_code=data.get("error_message", "") ) @@ -191,10 +194,15 @@ def initiate_payment( def retrieve_disbursement_id( self, bank_reference: str, customer_reference: str, narratives: str ) -> str: + _logger.info( + f"Retrieving disbursement id for bank_reference: {bank_reference}, customer_reference: {customer_reference}" + ) return customer_reference def retrieve_beneficiary_name(self, narratives: str) -> str: + _logger.info(f"Retrieving beneficiary name from narratives: {narratives}") return narratives[3] def retrieve_reversal_reason(self, narratives: str) -> str: - return narratives[5] + _logger.info(f"Retrieving reversal reason from narratives: {narratives}") + return narratives[-1] diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/helpers/resolve_helper.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/helpers/resolve_helper.py index d3db5d0..50830ad 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/helpers/resolve_helper.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/helpers/resolve_helper.py @@ -1,9 +1,9 @@ import enum +import logging import re import uuid from datetime import datetime from typing import List -import logging from openg2p_fastapi_common.service import BaseService from openg2p_g2p_bridge_models.models import MapperResolvedFaType @@ -20,6 +20,7 @@ _config = Settings.get_config() _logger = logging.getLogger(_config.logging_default_logger_name) + class FAKeys(enum.Enum): account_number = "account_number" bank_code = "bank_code" @@ -52,7 +53,9 @@ def construct_single_resolve_request(self, id: str) -> SingleResolveRequest: def construct_resolve_request( self, single_resolve_requests: List[SingleResolveRequest] ) -> ResolveRequest: - _logger.info(f"Constructing resolve request for {len(single_resolve_requests)} single resolve requests") + _logger.info( + f"Constructing resolve request for {len(single_resolve_requests)} single resolve requests" + ) resolve_request_message = ResolveRequestMessage( transaction_id=str(uuid.uuid4()), resolve_request=single_resolve_requests, @@ -70,7 +73,9 @@ def construct_resolve_request( ), message=resolve_request_message, ) - _logger.info(f"Constructed resolve request for {len(single_resolve_requests)} single resolve requests") + _logger.info( + f"Constructed resolve request for {len(single_resolve_requests)} single resolve requests" + ) return resolve_request def _deconstruct(self, value: str, strategy: str) -> List[KeyValuePair]: diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py index 5c2bb71..bf09ca2 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/disburse_funds_from_bank.py @@ -169,6 +169,9 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): if payment_response.status == PaymentStatus.SUCCESS: batch_status.disbursement_status = ProcessStatus.PROCESSED.value batch_status.latest_error_code = None + envelope_batch_status.number_of_disbursements_shipped += len( + payment_payloads + ) else: batch_status.disbursement_status = ProcessStatus.PENDING.value batch_status.latest_error_code = payment_response.error_code @@ -183,5 +186,7 @@ def disburse_funds_from_bank_worker(bank_disbursement_batch_id: str): batch_status.latest_error_code = str(e) batch_status.disbursement_attempts += 1 - _logger.info(f"Disbursing funds with bank for batch: {bank_disbursement_batch_id} completed") + _logger.info( + f"Disbursing funds with bank for batch: {bank_disbursement_batch_id} completed" + ) session.commit() diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py index 146521f..cfece5a 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mt940_processor.py @@ -1,5 +1,6 @@ import logging from datetime import datetime +from typing import List import mt940 from openg2p_g2p_bridge_bank_connectors.bank_connectors import BankConnectorFactory @@ -11,7 +12,9 @@ AccountStatement, AccountStatementLob, BenefitProgramConfiguration, + Disbursement, DisbursementBatchControl, + DisbursementEnvelopeBatchStatus, DisbursementErrorRecon, DisbursementRecon, ProcessStatus, @@ -126,6 +129,7 @@ def mt940_processor_worker(statement_id: str): debit_credit_indicator, entry_sequence, transaction, + session, ) parsed_transactions_d.append(parsed_transaction) @@ -135,100 +139,43 @@ def mt940_processor_worker(statement_id: str): debit_credit_indicator, entry_sequence, transaction, + session, ) parsed_transactions_rd.append(parsed_transaction) # End of for loop of mt940 statement transactions disbursement_error_recons = [] - disbursement_recons = [] + disbursement_recons_d = [] # Process only debit transactions - for parsed_transaction in parsed_transactions_d: - bank_disbursement_batch_id = get_bank_batch_id( - parsed_transaction, session - ) - - if not bank_disbursement_batch_id: - disbursement_error_recons.append( - construct_disbursement_error_recon( - statement_id, - account_statement.statement_number, - account_statement.sequence_number, - parsed_transaction, - G2PBridgeErrorCodes.INVALID_DISBURSEMENT_ID, - ) - ) - continue - - disbursement_recon = get_disbursement_recon(parsed_transaction, session) - - if disbursement_recon: - disbursement_error_recons.append( - construct_disbursement_error_recon( - statement_id, - account_statement.statement_number, - account_statement.sequence_number, - parsed_transaction, - G2PBridgeErrorCodes.DUPLICATE_DISBURSEMENT, - ) - ) - continue - - disbursement_recon = construct_new_disbursement_recon( - bank_disbursement_batch_id, - parsed_transaction, - statement_id, - account_statement.statement_number, - account_statement.sequence_number, - ) - disbursement_recons.append(disbursement_recon) + process_debit_transactions( + account_statement, + disbursement_error_recons, + disbursement_recons_d, + parsed_transactions_d, + session, + statement_id, + ) - # End of for loop for parsed transactions - debit - session.add_all(disbursement_recons) - session.add_all(disbursement_error_recons) + # Add disbursement_recons_d to session before processing reversal transactions + session.add_all(disbursement_recons_d) # Start processing reversal transactions - rd - for parsed_transaction in parsed_transactions_rd: - bank_disbursement_batch_id = get_bank_batch_id( - parsed_transaction, session - ) + disbursement_recons_rd = [] + process_reversal_of_debits( + account_statement, + disbursement_error_recons, + disbursement_recons_rd, + parsed_transactions_rd, + session, + statement_id, + ) - if not bank_disbursement_batch_id: - disbursement_error_recons.append( - construct_disbursement_error_recon( - statement_id, - account_statement.statement_number, - account_statement.sequence_number, - parsed_transaction, - G2PBridgeErrorCodes.INVALID_DISBURSEMENT_ID, - ) - ) - continue - - disbursement_recon = get_disbursement_recon(parsed_transaction, session) - - if not disbursement_recon: - disbursement_error_recons.append( - construct_disbursement_error_recon( - statement_id, - account_statement.statement_number, - account_statement.sequence_number, - parsed_transaction, - G2PBridgeErrorCodes.INVALID_REVERSAL, - ) - ) - else: - update_existing_disbursement_recon( - disbursement_recon, - parsed_transaction, - statement_id, - account_statement.statement_number, - account_statement.sequence_number, - ) - disbursement_recons.append(disbursement_recon) + session.add_all(disbursement_recons_rd) + + update_envelope_batch_status_reconciled(disbursement_recons_d, session) + update_envelope_batch_status_reversed(disbursement_recons_rd, session) - # End of for loop for parsed transactions - rd - session.add_all(disbursement_recons) session.add_all(disbursement_error_recons) # Update account statement with parsed data @@ -257,6 +204,99 @@ def mt940_processor_worker(statement_id: str): raise e +def process_reversal_of_debits( + account_statement, + disbursement_error_recons, + disbursement_recons_rd, + parsed_transactions_rd, + session, + statement_id, +): + for parsed_transaction in parsed_transactions_rd: + bank_disbursement_batch_id = get_bank_batch_id(parsed_transaction, session) + + if not bank_disbursement_batch_id: + disbursement_error_recons.append( + construct_disbursement_error_recon( + statement_id, + account_statement.statement_number, + account_statement.sequence_number, + parsed_transaction, + G2PBridgeErrorCodes.INVALID_DISBURSEMENT_ID, + ) + ) + continue + + disbursement_recon = get_disbursement_recon(parsed_transaction, session) + + if not disbursement_recon: + disbursement_error_recons.append( + construct_disbursement_error_recon( + statement_id, + account_statement.statement_number, + account_statement.sequence_number, + parsed_transaction, + G2PBridgeErrorCodes.INVALID_REVERSAL, + ) + ) + else: + update_existing_disbursement_recon( + disbursement_recon, + parsed_transaction, + statement_id, + account_statement.statement_number, + account_statement.sequence_number, + ) + disbursement_recons_rd.append(disbursement_recon) + + +def process_debit_transactions( + account_statement, + disbursement_error_recons, + disbursement_recons_d, + parsed_transactions_d, + session, + statement_id, +): + for parsed_transaction in parsed_transactions_d: + bank_disbursement_batch_id = get_bank_batch_id(parsed_transaction, session) + + if not bank_disbursement_batch_id: + disbursement_error_recons.append( + construct_disbursement_error_recon( + statement_id, + account_statement.statement_number, + account_statement.sequence_number, + parsed_transaction, + G2PBridgeErrorCodes.INVALID_DISBURSEMENT_ID, + ) + ) + continue + + disbursement_recon = get_disbursement_recon(parsed_transaction, session) + + if disbursement_recon: + disbursement_error_recons.append( + construct_disbursement_error_recon( + statement_id, + account_statement.statement_number, + account_statement.sequence_number, + parsed_transaction, + G2PBridgeErrorCodes.DUPLICATE_DISBURSEMENT, + ) + ) + continue + + disbursement_recon = construct_new_disbursement_recon( + bank_disbursement_batch_id, + parsed_transaction, + statement_id, + account_statement.statement_number, + account_statement.sequence_number, + ) + disbursement_recons_d.append(disbursement_recon) + + def get_disbursement_recon(parsed_transaction, session): disbursement_recon = ( session.query(DisbursementRecon) @@ -331,6 +371,7 @@ def construct_new_disbursement_recon( disbursement_recon = DisbursementRecon( bank_disbursement_batch_id=bank_disbursement_batch_id, disbursement_id=parsed_transaction["disbursement_id"], + disbursement_envelope_id=parsed_transaction["disbursement_envelope_id"], beneficiary_name_from_bank=parsed_transaction["beneficiary_name_from_bank"], remittance_reference_number=parsed_transaction["remittance_reference_number"], remittance_statement_id=statement_id, @@ -345,10 +386,7 @@ def construct_new_disbursement_recon( def construct_parsed_transaction( - bank_connector, - debit_credit_indicator, - entry_sequence, - transaction, + bank_connector, debit_credit_indicator, entry_sequence, transaction, session ) -> dict: parsed_transaction = {} transaction_amount = transaction.data["amount"].amount @@ -358,6 +396,7 @@ def construct_parsed_transaction( disbursement_id = bank_connector.retrieve_disbursement_id( remittance_reference_number, customer_reference, narratives ) + disbursement_envelope_id = get_disbursement_envelope_id(disbursement_id, session) beneficiary_name_from_bank = None remittance_entry_sequence = None remittance_entry_date = None @@ -388,6 +427,7 @@ def construct_parsed_transaction( parsed_transaction.update( { "disbursement_id": disbursement_id, + "disbursement_envelope_id": disbursement_envelope_id, "transaction_amount": transaction_amount, "debit_credit_indicator": debit_credit_indicator, "beneficiary_name_from_bank": beneficiary_name_from_bank, @@ -403,3 +443,80 @@ def construct_parsed_transaction( } ) return parsed_transaction + + +def get_disbursement_envelope_id(disbursement_id, session): + disbursement = ( + session.query(Disbursement) + .filter(Disbursement.disbursement_id == disbursement_id) + .first() + ) + + return disbursement.disbursement_envelope_id + + +def update_envelope_batch_status_reconciled( + disbursement_recons: List[DisbursementRecon], session +): + # Get the unique disbursement envelope ids and count of disbursements + disbursement_envelope_id_count = {} + for disbursement_recon in disbursement_recons: + if ( + disbursement_recon.disbursement_envelope_id + in disbursement_envelope_id_count + ): + disbursement_envelope_id_count[ + disbursement_recon.disbursement_envelope_id + ] += 1 + else: + disbursement_envelope_id_count[ + disbursement_recon.disbursement_envelope_id + ] = 1 + + # Update the disbursement envelope batch status + for disbursement_envelope_id, count in disbursement_envelope_id_count.items(): + disbursement_envelope_batch_status = ( + session.query(DisbursementEnvelopeBatchStatus) + .filter( + DisbursementEnvelopeBatchStatus.disbursement_envelope_id + == disbursement_envelope_id + ) + .first() + ) + disbursement_envelope_batch_status.number_of_disbursements_reconciled += count + session.add(disbursement_envelope_batch_status) + + +def update_envelope_batch_status_reversed( + disbursement_recons: List[DisbursementRecon], session +): + # Get the unique disbursement envelope ids and count of disbursements + disbursement_envelope_id_count = {} + for disbursement_recon in disbursement_recons: + if ( + disbursement_recon.disbursement_envelope_id + in disbursement_envelope_id_count + ): + disbursement_envelope_id_count[ + disbursement_recon.disbursement_envelope_id + ] += 1 + else: + disbursement_envelope_id_count[ + disbursement_recon.disbursement_envelope_id + ] = 1 + + # Update the disbursement envelope batch status + for disbursement_envelope_id, count in disbursement_envelope_id_count.items(): + _logger.info( + f"Disbursement envelope id: {disbursement_envelope_id}, count: {count}" + ) + disbursement_envelope_batch_status = ( + session.query(DisbursementEnvelopeBatchStatus) + .filter( + DisbursementEnvelopeBatchStatus.disbursement_envelope_id + == disbursement_envelope_id + ) + .first() + ) + disbursement_envelope_batch_status.number_of_disbursements_reversed += count + session.add(disbursement_envelope_batch_status) diff --git a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/errors/exceptions.py b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/errors/exceptions.py index b9c01c7..d614a2b 100644 --- a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/errors/exceptions.py +++ b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/errors/exceptions.py @@ -29,3 +29,14 @@ def __init__(self, code: G2PBridgeErrorCodes, message: Optional[str] = None): self.code: G2PBridgeErrorCodes = code self.message: Optional[str] = message super().__init__(self.message) + + +class DisbursementStatusException(Exception): + def __init__( + self, + code: G2PBridgeErrorCodes, + message: Optional[str] = None, + ): + self.code: G2PBridgeErrorCodes = code + self.message: Optional[str] = message + super().__init__(code, self.message) diff --git a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/models/account_statement.py b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/models/account_statement.py index fe1022f..8eb48f6 100644 --- a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/models/account_statement.py +++ b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/models/account_statement.py @@ -42,6 +42,7 @@ class DisbursementRecon(BaseORMModelWithTimes): __tablename__ = "disbursement_recons" bank_disbursement_batch_id: Mapped[str] = mapped_column(String, index=True) disbursement_id: Mapped[str] = mapped_column(String, index=True, unique=True) + disbursement_envelope_id: Mapped[str] = mapped_column(String, nullable=True) beneficiary_name_from_bank: Mapped[str] = mapped_column(String, nullable=True) remittance_reference_number: Mapped[str] = mapped_column( diff --git a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/models/disbursement_envelope.py b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/models/disbursement_envelope.py index 312eb84..be590f5 100644 --- a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/models/disbursement_envelope.py +++ b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/models/disbursement_envelope.py @@ -90,4 +90,5 @@ class DisbursementEnvelopeBatchStatus(BaseORMModelWithTimes): id_mapper_resolution_required: Mapped[bool] = mapped_column(Boolean, default=True) number_of_disbursements_shipped: Mapped[int] = mapped_column(Integer, default=0) - number_of_disbursements_failed: Mapped[int] = mapped_column(Integer, default=0) + number_of_disbursements_reconciled: Mapped[int] = mapped_column(Integer, default=0) + number_of_disbursements_reversed: Mapped[int] = mapped_column(Integer, default=0) diff --git a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/__init__.py b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/__init__.py index dae3d03..fb542af 100644 --- a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/__init__.py +++ b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/__init__.py @@ -9,5 +9,14 @@ DisbursementEnvelopeRequest, DisbursementEnvelopeResponse, ) -from .request import BridgeRequest -from .response import ResponseStatus +from .disbursement_status import ( + DisbursementEnvelopeBatchStatusPayload, + DisbursementEnvelopeStatusRequest, + DisbursementEnvelopeStatusResponse, + DisbursementErrorReconPayload, + DisbursementReconPayload, + DisbursementReconRecords, + DisbursementStatusPayload, + DisbursementStatusRequest, + DisbursementStatusResponse, +) diff --git a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/account_statement.py b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/account_statement.py index 1a0230c..f299d36 100644 --- a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/account_statement.py +++ b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/account_statement.py @@ -1,8 +1,8 @@ from typing import Optional -from .response import BridgeResponse +from openg2p_g2pconnect_common_lib.schemas import SyncResponse -class AccountStatementResponse(BridgeResponse): +class AccountStatementResponse(SyncResponse): statement_id: Optional[str] = None response_error_code: Optional[str] = None diff --git a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/disbursement.py b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/disbursement.py index 57f5e8b..87cbbc9 100644 --- a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/disbursement.py +++ b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/disbursement.py @@ -1,11 +1,10 @@ import datetime from typing import List, Optional +from openg2p_g2pconnect_common_lib.schemas import Request, SyncResponse from pydantic import BaseModel from ..models import CancellationStatus -from .request import BridgeRequest -from .response import BridgeResponse class DisbursementPayload(BaseModel): @@ -22,9 +21,9 @@ class DisbursementPayload(BaseModel): response_error_codes: Optional[List[str]] = None -class DisbursementRequest(BridgeRequest): - request_payload: List[DisbursementPayload] +class DisbursementRequest(Request): + message: List[DisbursementPayload] -class DisbursementResponse(BridgeResponse): - response_payload: Optional[List[DisbursementPayload]] = None +class DisbursementResponse(SyncResponse): + message: Optional[List[DisbursementPayload]] = None diff --git a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/disbursement_envelope.py b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/disbursement_envelope.py index 624bd2c..675224f 100644 --- a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/disbursement_envelope.py +++ b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/disbursement_envelope.py @@ -1,11 +1,10 @@ import datetime from typing import Optional +from openg2p_g2pconnect_common_lib.schemas import Request, SyncResponse from pydantic import BaseModel from ..models import DisbursementFrequency -from .request import BridgeRequest -from .response import BridgeResponse class DisbursementEnvelopePayload(BaseModel): @@ -21,9 +20,9 @@ class DisbursementEnvelopePayload(BaseModel): disbursement_schedule_date: Optional[datetime.date] = None -class DisbursementEnvelopeRequest(BridgeRequest): - request_payload: DisbursementEnvelopePayload +class DisbursementEnvelopeRequest(Request): + message: DisbursementEnvelopePayload -class DisbursementEnvelopeResponse(BridgeResponse): - response_payload: Optional[DisbursementEnvelopePayload] = None +class DisbursementEnvelopeResponse(SyncResponse): + message: Optional[DisbursementEnvelopePayload] = None diff --git a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/disbursement_status.py b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/disbursement_status.py new file mode 100644 index 0000000..3f67472 --- /dev/null +++ b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/disbursement_status.py @@ -0,0 +1,94 @@ +import datetime +from typing import List, Optional + +from openg2p_g2pconnect_common_lib.schemas import Request, SyncResponse +from pydantic import BaseModel + +from ..errors.codes import G2PBridgeErrorCodes +from ..models import FundsAvailableWithBankEnum, FundsBlockedWithBankEnum + + +class DisbursementStatusRequest(Request): + message: List[str] + + +class DisbursementReconPayload(BaseModel): + bank_disbursement_batch_id: str + disbursement_id: str + disbursement_envelope_id: Optional[str] = None + beneficiary_name_from_bank: Optional[str] = None + + remittance_reference_number: Optional[str] = None + remittance_statement_id: Optional[str] = None + remittance_statement_number: Optional[str] = None + remittance_statement_sequence: Optional[str] = None + remittance_entry_sequence: Optional[str] = None + remittance_entry_date: Optional[datetime.datetime] = None + remittance_value_date: Optional[datetime.datetime] = None + + reversal_found: Optional[bool] = None + reversal_statement_id: Optional[str] = None + reversal_statement_number: Optional[str] = None + reversal_statement_sequence: Optional[str] = None + reversal_entry_sequence: Optional[str] = None + reversal_entry_date: Optional[datetime.datetime] = None + reversal_value_date: Optional[datetime.datetime] = None + reversal_reason: Optional[str] = None + + +class DisbursementErrorReconPayload(BaseModel): + statement_id: Optional[str] = None + statement_number: Optional[str] = None + statement_sequence: Optional[str] = None + entry_sequence: Optional[str] = None + entry_date: Optional[datetime.datetime] = None + value_date: Optional[datetime.datetime] = None + error_reason: Optional[G2PBridgeErrorCodes] = None + disbursement_id: str + bank_reference_number: Optional[str] = None + + +class DisbursementReconRecords(BaseModel): + disbursement_recon_payloads: Optional[List[DisbursementReconPayload]] = None + disbursement_error_recon_payloads: Optional[ + List[DisbursementErrorReconPayload] + ] = None + + +class DisbursementStatusPayload(BaseModel): + disbursement_id: str + disbursement_recon_records: Optional[DisbursementReconRecords] = None + + +class DisbursementStatusResponse(SyncResponse): + message: Optional[List[DisbursementStatusPayload]] = None + + +class DisbursementEnvelopeStatusRequest(Request): + message: str + + +class DisbursementEnvelopeBatchStatusPayload(BaseModel): + disbursement_envelope_id: str + number_of_disbursements_received: int + total_disbursement_amount_received: int + + funds_available_with_bank: FundsAvailableWithBankEnum + funds_available_latest_timestamp: Optional[datetime.datetime] = None + funds_available_latest_error_code: Optional[str] = None + funds_available_attempts: int + + funds_blocked_with_bank: FundsBlockedWithBankEnum + funds_blocked_latest_timestamp: Optional[datetime.datetime] = None + funds_blocked_latest_error_code: Optional[str] = None + funds_blocked_attempts: int + funds_blocked_reference_number: Optional[str] = None + + id_mapper_resolution_required: Optional[bool] = None + number_of_disbursements_shipped: int + number_of_disbursements_reconciled: int + number_of_disbursements_reversed: int + + +class DisbursementEnvelopeStatusResponse(SyncResponse): + message: Optional[DisbursementEnvelopeBatchStatusPayload] = None diff --git a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/request.py b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/request.py deleted file mode 100644 index 6b6c3cc..0000000 --- a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/request.py +++ /dev/null @@ -1,18 +0,0 @@ -from typing import Optional - -from pydantic import BaseModel - - -class RequestHeader(BaseModel): - pass - - -class RequestPagination(BaseModel): - request_page: Optional[int] - page_size: Optional[int] - - -class BridgeRequest(BaseModel): - request_header: Optional[RequestHeader] = None - request_pagination: Optional[RequestPagination] = None - request_payload: object diff --git a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/response.py b/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/response.py deleted file mode 100644 index 99f0b55..0000000 --- a/openg2p-g2p-bridge-models/src/openg2p_g2p_bridge_models/schemas/response.py +++ /dev/null @@ -1,30 +0,0 @@ -from enum import Enum -from typing import Optional - -from pydantic import BaseModel - -from ..errors.codes import G2PBridgeErrorCodes - - -class ResponseHeader(BaseModel): - pass - - -class ResponseStatus(Enum): - SUCCESS = "success" - FAILURE = "failure" - - -class ResponsePagination(BaseModel): - current_page: Optional[int] = None - page_size: Optional[int] = None - total_elements: Optional[int] = None - - -class BridgeResponse(BaseModel): - response_header: Optional[ResponseHeader] = None - response_status: ResponseStatus - response_error_code: Optional[G2PBridgeErrorCodes] = None - response_message: Optional[str] = None - response_pagination: Optional[ResponsePagination] = ResponsePagination() - response_payload: Optional[object] = None