Skip to content

Commit

Permalink
Merge pull request #2 from PSNAppz/1.0.0
Browse files Browse the repository at this point in the history
Refactoring + added logging
  • Loading branch information
venky-ganapathy authored Aug 8, 2024
2 parents ca7c485 + 5d6f13e commit ee1b731
Show file tree
Hide file tree
Showing 24 changed files with 300 additions and 154 deletions.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async def upload_mt940(self, statement_file: UploadFile) -> str:

statement_lob = AccountStatementLob(
statement_id=statement_id,
statement_lob=str(statement_file),
statement_lob=str(statement_file.decode("utf-8")),
active=True,
)
session.add(statement_lob)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ async def construct_disbursements(
disbursements: List[Disbursement] = []
for disbursement_payload in disbursement_payloads:
disbursement = Disbursement(
disbursement_id=str(int(time.time() * 1000)),
disbursement_id=str(int(time.time() * 1000000)),
disbursement_envelope_id=str(
disbursement_payload.disbursement_envelope_id
),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import List, Optional
import logging

import httpx
from openg2p_g2p_bridge_models.models import (
Expand All @@ -18,7 +19,7 @@
from ..config import Settings

_config = Settings.get_config()

_logger = logging.getLogger(_config.logging_default_logger_name)

class BankPaymentPayload(BaseModel):
payment_reference_number: str
Expand Down Expand Up @@ -51,11 +52,14 @@ class BankPaymentPayload(BaseModel):


class ExampleBankConnector(BankConnectorInterface):
def check_funds(self, account_no, currency, amount) -> CheckFundsResponse:
def check_funds(self, account_number, currency, amount) -> CheckFundsResponse:
_logger.info(
f"Checking funds availability for account_number: {account_number}, currency: {currency}, amount: {amount}"
)
try:
with httpx.Client() as client:
request_data = {
"account_number": account_no,
"account_number": account_number,
"account_currency": currency,
"total_funds_needed": amount,
}
Expand All @@ -66,22 +70,34 @@ def check_funds(self, account_no, currency, amount) -> CheckFundsResponse:

data = response.json()
if data["status"] == "success":
_logger.info(
f"Funds available for account_number: {account_number}, currency: {currency}, amount: {amount}"
)
return CheckFundsResponse(
status=FundsAvailableWithBankEnum.FUNDS_AVAILABLE, error_code=""
)
_logger.info(
f"Funds not available for account_number: {account_number}, currency: {currency}, amount: {amount}"
)
return CheckFundsResponse(
status=FundsAvailableWithBankEnum.FUNDS_NOT_AVAILABLE, error_code=""
)
except httpx.HTTPStatusError as e:
_logger.error(
f"Error checking funds availability for account_number: {account_number}, currency: {currency}, amount: {amount}"
)
return CheckFundsResponse(
status=FundsAvailableWithBankEnum.PENDING_CHECK, error_code=str(e)
)

def block_funds(self, account_no, currency, amount) -> BlockFundsResponse:
def block_funds(self, account_number, currency, amount) -> BlockFundsResponse:
_logger.info(
f"Blocking funds for account_number: {account_number}, currency: {currency}, amount: {amount}"
)
try:
with httpx.Client() as client:
request_data = {
"account_no": account_no,
"account_number": account_number,
"currency": currency,
"amount": amount,
}
Expand All @@ -92,17 +108,26 @@ def block_funds(self, account_no, currency, amount) -> BlockFundsResponse:

data = response.json()
if data["status"] == "success":
_logger.info(
f"Funds blocked for account_number: {account_number}, currency: {currency}, amount: {amount}"
)
return BlockFundsResponse(
status=FundsBlockedWithBankEnum.FUNDS_BLOCK_SUCCESS,
block_reference_no=data["block_reference_no"],
error_code="",
)
_logger.error(
f"Funds block failed for account_number: {account_number}, currency: {currency}, amount: {amount}"
)
return BlockFundsResponse(
status=FundsBlockedWithBankEnum.FUNDS_BLOCK_FAILURE,
block_reference_no="",
error_code=data.get("error_code", ""),
)
except httpx.HTTPStatusError as e:
_logger.error(
f"Error blocking funds for account_number: {account_number}, currency: {currency}, amount: {amount}"
)
return BlockFundsResponse(
status=FundsBlockedWithBankEnum.FUNDS_BLOCK_FAILURE,
block_reference_no="",
Expand All @@ -112,6 +137,7 @@ def block_funds(self, account_no, currency, amount) -> BlockFundsResponse:
def initiate_payment(
self, disbursement_payment_payloads: List[DisbursementPaymentPayload]
) -> PaymentResponse:
_logger.info(f"Initiating payment for {len(disbursement_payment_payloads)} disbursements")
try:
with httpx.Client() as client:
bank_payment_payloads = []
Expand Down Expand Up @@ -143,7 +169,7 @@ def initiate_payment(
)
bank_payment_payloads.append(bank_payment_payload.model_dump())

request_data = {"initiate_payment_payloads": bank_payment_payloads}
request_data = bank_payment_payloads

response = client.post(
_config.funds_disbursement_url_example_bank, json=request_data
Expand All @@ -152,11 +178,14 @@ def initiate_payment(

data = response.json()
if data["status"] == "success":
_logger.info(f"Payment initiated successfully")
return PaymentResponse(status=PaymentStatus.SUCCESS, error_code="")
_logger.error(f"Payment initiation failed")
return PaymentResponse(
status=PaymentStatus.ERROR, error_code=data.get("error_message", "")
)
except httpx.HTTPStatusError as e:
_logger.error(f"Error initiating payment: {e}")
return PaymentResponse(status=PaymentStatus.ERROR, error_code=str(e))

def retrieve_disbursement_id(
Expand All @@ -165,7 +194,7 @@ def retrieve_disbursement_id(
return customer_reference

def retrieve_beneficiary_name(self, narratives: str) -> str:
return narratives[0]
return narratives[3]

def retrieve_reversal_reason(self, narratives: str) -> str:
return narratives[1]
return narratives[5]
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ class PaymentResponse(BaseModel):


class BankConnectorInterface(BaseService):
def check_funds(self, account_no, currency, amount) -> CheckFundsResponse:
def check_funds(self, account_number, currency, amount) -> CheckFundsResponse:
raise NotImplementedError()

def block_funds(self, account_no, currency, amount) -> BlockFundsResponse:
def block_funds(self, account_number, currency, amount) -> BlockFundsResponse:
raise NotImplementedError()

def initiate_payment(
Expand Down
10 changes: 10 additions & 0 deletions openg2p-g2p-bridge-celery-beat-producers/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,13 @@ G2P_BRIDGE_CELERY_TASKS_DB_DBNAME=openg2p_g2p_bridge_db
G2P_BRIDGE_BANK_DECONSTRUCT_STRATEGY="bank_(?P<account_number>\d+)_(?P<bank_code>\d+)_(?P<branch_code>\d+)_(?P<account_type>\w+)"
G2P_BRIDGE_MOBILE_WALLET_DECONSTRUCT_STRATEGY="mobile_(?P<mobile_number>\d+)_(?P<mobile_wallet_provider>\w+)"
G2P_BRIDGE_EMAIL_WALLET_DECONSTRUCT_STRATEGY="email_(?P<email_address>\w+)_(?P<email_wallet_provider>\w+)"
G2P_BRIDGE_MAPPER_RESOLVE_ATTEMPTS=3
G2P_BRIDGE_FUNDS_AVAILABLE_CHECK_ATTEMPTS=3
G2P_BRIDGE_FUNDS_BLOCKED_ATTEMPTS=3
G2P_BRIDGE_FUNDS_DISBURSEMENT_ATTEMPTS=3
G2P_BRIDGE_STATEMENT_PROCESS_ATTEMPTS=3
G2P_BRIDGE_MAPPER_RESOLVE_FREQUENCY=3600
G2P_BRIDGE_FUNDS_AVAILABLE_CHECK_FREQUENCY=3600
G2P_BRIDGE_FUNDS_BLOCKED_FREQUENCY=3600
G2P_BRIDGE_FUNDS_DISBURSEMENT_FREQUENCY=3600
G2P_BRIDGE_MT940_PROCESSOR_FREQUENCY=3600
1 change: 1 addition & 0 deletions openg2p-g2p-bridge-celery-beat-producers/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,4 @@ docs/_build/
# Ignore secret files and env
.secrets.*
.env
*.db
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def get_engine():


celery_app = Celery(
"g2p_bridge_celery_tasks",
"g2p_bridge_celery_beat_producer",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0",
include=["openg2p_g2p_bridge_celery_beat_producers.tasks"],
Expand All @@ -54,5 +54,9 @@ def get_engine():
"task": "disburse_funds_from_bank_beat_producer",
"schedule": _config.funds_disbursement_frequency,
},
"mt940_processor_beat_producer": {
"task": "mt940_processor_beat_producer",
"schedule": _config.mt940_processor_frequency,
},
}
celery_app.conf.timezone = "UTC"
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,14 @@ class Settings(BaseSettings):
db_dbname: str = "openg2p_g2p_bridge_db"
db_driver: str = "postgresql"

mapper_resolve_api_url: str = ""

mapper_resolve_attempts: int = 3
funds_available_check_attempts: int = 3
funds_blocked_attempts: int = 3
funds_disbursement_attempts: int = 3
statement_process_attempts: int = 3

mapper_resolve_frequency: int = 10
funds_available_check_frequency: int = 10
funds_blocked_frequency: int = 10
funds_disbursement_frequency: int = 10
statement_process_frequency: int = 3600

bank_fa_deconstruct_strategy: str = ""
mobile_wallet_deconstruct_strategy: str = ""
email_wallet_deconstruct_strategy: str = ""
mapper_resolve_frequency: int = 3600
funds_available_check_frequency: int = 3600
funds_blocked_frequency: int = 3600
funds_disbursement_frequency: int = 3600
mt940_processor_frequency: int = 3600
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,5 @@ def block_funds_with_bank_beat_producer():
args=(envelope.disbursement_envelope_id,),
queue="g2p_bridge_celery_worker_tasks",
)

_logger.info("Completed checking for envelopes to block funds with bank")
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

@celery_app.task(name="check_funds_with_bank_beat_producer")
def check_funds_with_bank_beat_producer():
_logger.info("Checking funds with bank")
session_maker = sessionmaker(bind=_engine, expire_on_commit=False)

with session_maker() as session:
envelopes = (
session.execute(
Expand Down Expand Up @@ -62,8 +62,13 @@ def check_funds_with_bank_beat_producer():
)

for envelope in envelopes:
_logger.info(
f"Sending task to check funds with bank for envelope {envelope.disbursement_envelope_id}"
)
celery_app.send_task(
"check_funds_with_bank_worker",
args=(envelope.disbursement_envelope_id,),
queue="g2p_bridge_celery_worker_tasks",
)

_logger.info("Checking funds with bank beat tasks push completed")
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

@celery_app.task(name="disburse_funds_from_bank_beat_producer")
def disburse_funds_from_bank_beat_producer():
_logger.info("Running disburse_funds_from_bank_beat_producer")
session_maker = sessionmaker(bind=_engine, expire_on_commit=False)
with session_maker() as session:
envelopes = (
Expand All @@ -48,7 +49,6 @@ def disburse_funds_from_bank_beat_producer():
.scalars()
.all()
)

for envelope in envelopes:
pending_batches = (
session.execute(
Expand All @@ -68,8 +68,15 @@ def disburse_funds_from_bank_beat_producer():
)

for batch in pending_batches:
_logger.info(
f"Sending task to disburse funds for batch {batch.bank_disbursement_batch_id}"
)
celery_app.send_task(
"disburse_funds_from_bank_worker",
(batch.bank_disbursement_batch_id,),
queue="g2p_bridge_celery_worker_tasks",
)

_logger.info(
f"Sent tasks to disburse funds for {len(pending_batches)} batches"
)
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

@celery_app.task(name="mapper_resolution_beat_producer")
def mapper_resolution_beat_producer():
_logger.info("Running mapper_resolution_beat_producer")
session_maker = sessionmaker(bind=_engine, expire_on_commit=False)

with session_maker() as session:
mapper_resolution_batch_statuses = (
session.execute(
Expand All @@ -36,8 +36,13 @@ def mapper_resolution_beat_producer():
)

for mapper_resolution_batch_status in mapper_resolution_batch_statuses:
_logger.info(
f"Sending mapper_resolution_worker task for mapper_resolution_batch_id: {mapper_resolution_batch_status.mapper_resolution_batch_id}"
)
celery_app.send_task(
"mapper_resolution_worker",
args=[mapper_resolution_batch_status.mapper_resolution_batch_id],
queue="g2p_bridge_celery_worker_tasks",
)

_logger.info("Finished mapper_resolution_beat_producer")
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

@celery_app.task(name="mt940_processor_beat_producer")
def mt940_processor_beat_producer():
_logger.info("Running mt940_processor_beat_producer")
session_maker = sessionmaker(bind=_engine, expire_on_commit=False)
with session_maker() as session:
account_statements = (
Expand All @@ -35,8 +36,13 @@ def mt940_processor_beat_producer():
)

for statement in account_statements:
_logger.info(
f"Sending mt940_processor_worker task for statement_id: {statement.statement_id}"
)
celery_app.send_task(
"mt940_processor_worker",
args=[statement.statement_id],
queue="g2p_bridge_celery_worker_tasks",
)

_logger.info("Finished mt940_processor_beat_producer")
9 changes: 6 additions & 3 deletions openg2p-g2p-bridge-celery-workers/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ G2P_BRIDGE_CELERY_TASKS_PORT=8001
G2P_BRIDGE_CELERY_TASKS_WORKER_TYPE=gunicorn
G2P_BRIDGE_CELERY_TASKS_NO_OF_WORKERS=1
G2P_BRIDGE_CELERY_TASKS_DB_DBNAME=openg2p_g2p_bridge_db
G2P_BRIDGE_BANK_DECONSTRUCT_STRATEGY="bank_(?P<account_number>\d+)_(?P<bank_code>\d+)_(?P<branch_code>\d+)_(?P<account_type>\w+)"
G2P_BRIDGE_MOBILE_WALLET_DECONSTRUCT_STRATEGY="mobile_(?P<mobile_number>\d+)_(?P<mobile_wallet_provider>\w+)"
G2P_BRIDGE_EMAIL_WALLET_DECONSTRUCT_STRATEGY="email_(?P<email_address>\w+)_(?P<email_wallet_provider>\w+)"
G2P_BRIDGE_CELERY_TASKS_BANK_FA_DECONSTRUCT_STRATEGY="bank_(?P<account_number>\d+)_(?P<bank_code>\d+)_(?P<branch_code>\d+)_(?P<account_type>\w+)"
G2P_BRIDGE_CELERY_MOBILE_WALLET_DECONSTRUCT_STRATEGY="mobile_(?P<mobile_number>\d+)_(?P<mobile_wallet_provider>\w+)"
G2P_BRIDGE_CELERY_EMAIL_WALLET_DECONSTRUCT_STRATEGY="email_(?P<email_address>\w+)_(?P<email_wallet_provider>\w+)"
G2P_BRIDGE_CELERY_PRODUCERS_FUNDS_AVAILABLE_CHECK_URL_EXAMPLE_BANK="http://127.0.0.1:8003/check_funds"
G2P_BRIDGE_CELERY_PRODUCERS_FUNDS_BLOCK_URL_EXAMPLE_BANK="http://127.0.0.1:8003/block_funds"
G2P_BRIDGE_CELERY_PRODUCERS_FUNDS_DISBURSEMENT_URL_EXAMPLE_BANK="http://127.0.0.1:8003/initiate_payment"
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def get_engine():


celery_app = Celery(
"g2p_bridge_celery_worker_tasks",
"g2p_bridge_celery_worker",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0",
include=["openg2p_g2p_bridge_celery_workers.tasks"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ class Settings(BaseSettings):

mapper_resolve_api_url: str = ""

mapper_resolve_attempts: int = 3
funds_available_check_attempts: int = 3
funds_blocked_attempts: int = 3
funds_disbursement_attempts: int = 3
statement_process_attempts: int = 3

bank_fa_deconstruct_strategy: str = ""
mobile_wallet_deconstruct_strategy: str = ""
email_wallet_deconstruct_strategy: str = ""
Loading

0 comments on commit ee1b731

Please sign in to comment.