Skip to content

Commit

Permalink
Mapper resolution task to update the status in corresponding disburse…
Browse files Browse the repository at this point in the history
…ment control
  • Loading branch information
PSNAppz committed Oct 4, 2024
1 parent b57c5ab commit 3c6064d
Showing 1 changed file with 63 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ def mapper_resolution_worker(mapper_resolution_batch_id: str):
session.execute(
select(DisbursementBatchControl).filter(
DisbursementBatchControl.mapper_resolution_batch_id
== mapper_resolution_batch_id
== mapper_resolution_batch_id,
DisbursementBatchControl.mapper_status.in_(
[ProcessStatus.PENDING.value, ProcessStatus.ERROR.value]
),
)
)
.scalars()
Expand All @@ -43,6 +46,10 @@ def mapper_resolution_worker(mapper_resolution_batch_id: str):
control.beneficiary_id: control.disbursement_id
for control in disbursement_batch_controls
}
beneficiary_control_map = {
control.beneficiary_id: control.id
for control in disbursement_batch_controls
}
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
Expand All @@ -69,7 +76,10 @@ def mapper_resolution_worker(mapper_resolution_batch_id: str):
return

process_and_store_resolution(
mapper_resolution_batch_id, resolve_response, beneficiary_disbursement_map
mapper_resolution_batch_id,
resolve_response,
beneficiary_disbursement_map,
beneficiary_control_map,
)


Expand Down Expand Up @@ -97,13 +107,17 @@ async def make_resolve_request(disbursement_batch_controls):


def process_and_store_resolution(
mapper_resolution_batch_id, resolve_response, beneficiary_disbursement_map
mapper_resolution_batch_id,
resolve_response,
beneficiary_disbursement_map,
beneficiary_control_map,
):
_logger.info("Processing and storing resolution")
resolve_helper = ResolveHelper.get_component()
session_maker = sessionmaker(bind=_engine, expire_on_commit=False)
with session_maker() as session:
details_list = []
batch_has_error = False
for single_response in resolve_response.message.resolve_response:
disbursement_id = beneficiary_disbursement_map.get(single_response.id)
if disbursement_id and (
Expand All @@ -130,37 +144,58 @@ def process_and_store_resolution(
email_wallet_provider=deconstructed_fa.get("email_wallet_provider"),
active=True,
)
# Update corresponding disbursement control to processed
session.query(DisbursementBatchControl).filter(
DisbursementBatchControl.disbursement_id == disbursement_id
).update(
{
DisbursementBatchControl.mapper_status: ProcessStatus.PROCESSED,
DisbursementBatchControl.latest_error_code: None,
}
)
details_list.append(details)
else:
_logger.error(
f"Failed to resolve the request for beneficiary: {single_response.id}"
)
session.query(MapperResolutionBatchStatus).filter(
MapperResolutionBatchStatus.mapper_resolution_batch_id
== mapper_resolution_batch_id
).update(
{
MapperResolutionBatchStatus.resolution_status: ProcessStatus.PENDING,
MapperResolutionBatchStatus.latest_error_code: f"Failed to resolve the request for beneficiary: {single_response.id}",
MapperResolutionBatchStatus.resolution_attempts: MapperResolutionBatchStatus.resolution_attempts
+ 1,
}
)
session.commit()
return
# Update corresponding disbursement control to failed
disbursement_id = beneficiary_disbursement_map.get(single_response.id)
if disbursement_id:
session.query(DisbursementBatchControl).filter(
DisbursementBatchControl.disbursement_id == disbursement_id
).update(
{
DisbursementBatchControl.mapper_status: ProcessStatus.ERROR,
DisbursementBatchControl.latest_error_code: f"Failed to resolve the request for beneficiary: {single_response.id}",
}
)
batch_has_error = True

session.add_all(details_list)
session.query(MapperResolutionBatchStatus).filter(
MapperResolutionBatchStatus.mapper_resolution_batch_id
== mapper_resolution_batch_id
).update(
{
MapperResolutionBatchStatus.resolution_status: ProcessStatus.PROCESSED,
MapperResolutionBatchStatus.resolution_time_stamp: datetime.utcnow(),
MapperResolutionBatchStatus.latest_error_code: None,
MapperResolutionBatchStatus.resolution_attempts: MapperResolutionBatchStatus.resolution_attempts
+ 1,
}
)
if not batch_has_error:
session.query(MapperResolutionBatchStatus).filter(
MapperResolutionBatchStatus.mapper_resolution_batch_id
== mapper_resolution_batch_id
).update(
{
MapperResolutionBatchStatus.resolution_status: ProcessStatus.PROCESSED,
MapperResolutionBatchStatus.resolution_time_stamp: datetime.utcnow(),
MapperResolutionBatchStatus.latest_error_code: None,
MapperResolutionBatchStatus.resolution_attempts: MapperResolutionBatchStatus.resolution_attempts
+ 1,
}
)
else:
session.query(MapperResolutionBatchStatus).filter(
MapperResolutionBatchStatus.mapper_resolution_batch_id
== mapper_resolution_batch_id
).update(
{
MapperResolutionBatchStatus.resolution_status: ProcessStatus.PENDING,
MapperResolutionBatchStatus.latest_error_code: f"Failed to resolve the request for beneficiary: {single_response.id}",
MapperResolutionBatchStatus.resolution_attempts: MapperResolutionBatchStatus.resolution_attempts
+ 1,
}
)
_logger.info("Stored the resolution")
session.commit()

0 comments on commit 3c6064d

Please sign in to comment.