From 3c6064d18e7289c6edb060eabbafc1fc8cf7125a Mon Sep 17 00:00:00 2001 From: PSNAppZ Date: Fri, 4 Oct 2024 12:46:58 +0530 Subject: [PATCH] Mapper resolution task to update the status in corresponding disbursement control --- .../tasks/mapper_resolution_task.py | 91 +++++++++++++------ 1 file changed, 63 insertions(+), 28 deletions(-) diff --git a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mapper_resolution_task.py b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mapper_resolution_task.py index 942a956..5bd56de 100644 --- a/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mapper_resolution_task.py +++ b/openg2p-g2p-bridge-celery-workers/src/openg2p_g2p_bridge_celery_workers/tasks/mapper_resolution_task.py @@ -32,7 +32,10 @@ def mapper_resolution_worker(mapper_resolution_batch_id: str): session.execute( select(DisbursementBatchControl).filter( DisbursementBatchControl.mapper_resolution_batch_id - == mapper_resolution_batch_id + == mapper_resolution_batch_id, + DisbursementBatchControl.mapper_status.in_( + [ProcessStatus.PENDING.value, ProcessStatus.ERROR.value] + ), ) ) .scalars() @@ -43,6 +46,10 @@ def mapper_resolution_worker(mapper_resolution_batch_id: str): control.beneficiary_id: control.disbursement_id for control in disbursement_batch_controls } + beneficiary_control_map = { + control.beneficiary_id: control.id + for control in disbursement_batch_controls + } loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: @@ -69,7 +76,10 @@ def mapper_resolution_worker(mapper_resolution_batch_id: str): return process_and_store_resolution( - mapper_resolution_batch_id, resolve_response, beneficiary_disbursement_map + mapper_resolution_batch_id, + resolve_response, + beneficiary_disbursement_map, + beneficiary_control_map, ) @@ -97,13 +107,17 @@ async def make_resolve_request(disbursement_batch_controls): def process_and_store_resolution( - mapper_resolution_batch_id, resolve_response, beneficiary_disbursement_map + mapper_resolution_batch_id, + resolve_response, + beneficiary_disbursement_map, + beneficiary_control_map, ): _logger.info("Processing and storing resolution") resolve_helper = ResolveHelper.get_component() session_maker = sessionmaker(bind=_engine, expire_on_commit=False) with session_maker() as session: details_list = [] + batch_has_error = False for single_response in resolve_response.message.resolve_response: disbursement_id = beneficiary_disbursement_map.get(single_response.id) if disbursement_id and ( @@ -130,37 +144,58 @@ def process_and_store_resolution( email_wallet_provider=deconstructed_fa.get("email_wallet_provider"), active=True, ) + # Update corresponding disbursement control to processed + session.query(DisbursementBatchControl).filter( + DisbursementBatchControl.disbursement_id == disbursement_id + ).update( + { + DisbursementBatchControl.mapper_status: ProcessStatus.PROCESSED, + DisbursementBatchControl.latest_error_code: None, + } + ) details_list.append(details) else: _logger.error( f"Failed to resolve the request for beneficiary: {single_response.id}" ) - session.query(MapperResolutionBatchStatus).filter( - MapperResolutionBatchStatus.mapper_resolution_batch_id - == mapper_resolution_batch_id - ).update( - { - MapperResolutionBatchStatus.resolution_status: ProcessStatus.PENDING, - MapperResolutionBatchStatus.latest_error_code: f"Failed to resolve the request for beneficiary: {single_response.id}", - MapperResolutionBatchStatus.resolution_attempts: MapperResolutionBatchStatus.resolution_attempts - + 1, - } - ) - session.commit() - return + # Update corresponding disbursement control to failed + disbursement_id = beneficiary_disbursement_map.get(single_response.id) + if disbursement_id: + session.query(DisbursementBatchControl).filter( + DisbursementBatchControl.disbursement_id == disbursement_id + ).update( + { + DisbursementBatchControl.mapper_status: ProcessStatus.ERROR, + DisbursementBatchControl.latest_error_code: f"Failed to resolve the request for beneficiary: {single_response.id}", + } + ) + batch_has_error = True session.add_all(details_list) - session.query(MapperResolutionBatchStatus).filter( - MapperResolutionBatchStatus.mapper_resolution_batch_id - == mapper_resolution_batch_id - ).update( - { - MapperResolutionBatchStatus.resolution_status: ProcessStatus.PROCESSED, - MapperResolutionBatchStatus.resolution_time_stamp: datetime.utcnow(), - MapperResolutionBatchStatus.latest_error_code: None, - MapperResolutionBatchStatus.resolution_attempts: MapperResolutionBatchStatus.resolution_attempts - + 1, - } - ) + if not batch_has_error: + session.query(MapperResolutionBatchStatus).filter( + MapperResolutionBatchStatus.mapper_resolution_batch_id + == mapper_resolution_batch_id + ).update( + { + MapperResolutionBatchStatus.resolution_status: ProcessStatus.PROCESSED, + MapperResolutionBatchStatus.resolution_time_stamp: datetime.utcnow(), + MapperResolutionBatchStatus.latest_error_code: None, + MapperResolutionBatchStatus.resolution_attempts: MapperResolutionBatchStatus.resolution_attempts + + 1, + } + ) + else: + session.query(MapperResolutionBatchStatus).filter( + MapperResolutionBatchStatus.mapper_resolution_batch_id + == mapper_resolution_batch_id + ).update( + { + MapperResolutionBatchStatus.resolution_status: ProcessStatus.PENDING, + MapperResolutionBatchStatus.latest_error_code: f"Failed to resolve the request for beneficiary: {single_response.id}", + MapperResolutionBatchStatus.resolution_attempts: MapperResolutionBatchStatus.resolution_attempts + + 1, + } + ) _logger.info("Stored the resolution") session.commit()