dev_fix_rdi_merge_deduplication (#4323) #4344

Merged
merged 1 commit into from
Oct 17, 2024
37 changes: 0 additions & 37 deletions src/hct_mis_api/apps/registration_datahub/celery_tasks.py
@@ -482,26 +482,6 @@ def deduplication_engine_process(self: Any, program_id: str) -> None:
raise


@app.task(bind=True, default_retry_delay=60, max_retries=3)
@sentry_tags
@log_start_and_end
def create_grievance_tickets_for_dedup_engine_results(self: Any, rdi_id: str) -> None:
from hct_mis_api.apps.registration_datahub.services.biometric_deduplication import (
BiometricDeduplicationService,
)

rdi = RegistrationDataImport.objects.get(id=rdi_id)
program = Program.objects.get(id=rdi.program_id)
set_sentry_business_area_tag(program.business_area.name)

try:
rdi = RegistrationDataImport.objects.get(id=rdi_id)
BiometricDeduplicationService().create_grievance_tickets_for_duplicates(rdi)
except Exception as e:
logger.exception(e)
raise


@app.task(bind=True, default_retry_delay=60, max_retries=3)
@log_start_and_end
@sentry_tags
@@ -523,20 +503,3 @@ def fetch_biometric_deduplication_results_and_process(self: Any, deduplication_s
except Exception as e:
logger.exception(e)
raise


@app.task(bind=True, default_retry_delay=60, max_retries=3)
@sentry_tags
@log_start_and_end
def update_rdis_deduplication_engine_statistics(self: Any, program_id: str) -> None:
from hct_mis_api.apps.registration_datahub.services.biometric_deduplication import (
BiometricDeduplicationService,
)

program = Program.objects.get(id=program_id)
set_sentry_business_area_tag(program.business_area.name)
try:
BiometricDeduplicationService().update_rdis_deduplication_statistics(program_id)
except Exception as e:
logger.exception(e)
raise
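
The two tasks deleted above, `create_grievance_tickets_for_dedup_engine_results` and `update_rdis_deduplication_engine_statistics`, are replaced by direct synchronous calls on `BiometricDeduplicationService` inside the RDI merge task (see the rdi_merge.py changes below). A minimal sketch of the new call pattern; the helper function and the `RegistrationDataImport` import path are illustrative assumptions:

```python
# Sketch only: mirrors the synchronous calls added in rdi_merge.py further down.
from hct_mis_api.apps.registration_data.models import RegistrationDataImport  # path assumed
from hct_mis_api.apps.registration_datahub.services.biometric_deduplication import (
    BiometricDeduplicationService,
)


def run_biometric_dedup_on_merge(rdi: RegistrationDataImport) -> None:
    """Hypothetical helper: what the removed celery tasks now do inline during merge."""
    if rdi.program.biometric_deduplication_enabled:
        service = BiometricDeduplicationService()
        # Needs-adjudication tickets for the RDI being merged ...
        service.create_grievance_tickets_for_duplicates(rdi)
        # ... then refresh duplicate statistics for the program's other
        # in-review RDIs, skipping the one that was just merged.
        service.update_rdis_deduplication_statistics(rdi.program, exclude_rdi=rdi)
```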
src/hct_mis_api/apps/registration_datahub/services/biometric_deduplication.py
@@ -21,6 +21,7 @@
IgnoredFilenamesPair,
SimilarityPair,
)
from hct_mis_api.apps.utils.models import MergeStatusModel

logger = logging.getLogger(__name__)

@@ -198,32 +199,28 @@ def store_rdis_deduplication_statistics(self, deduplication_set_id: str) -> None
)
rdi.save(update_fields=["dedup_engine_batch_duplicates", "dedup_engine_golden_record_duplicates"])

def update_rdis_deduplication_statistics(self, program_id: str) -> None:
program = Program.objects.get(id=program_id)
def update_rdis_deduplication_statistics(self, program: Program, exclude_rdi: RegistrationDataImport) -> None:
rdis = RegistrationDataImport.objects.filter(
status=RegistrationDataImport.IN_REVIEW,
program=program,
deduplication_engine_status=RegistrationDataImport.DEDUP_ENGINE_FINISHED,
)
).exclude(id=exclude_rdi.id)
for rdi in rdis:
rdi.dedup_engine_batch_duplicates = self.get_duplicate_individuals_for_rdi_against_batch_count(rdi)
rdi.dedup_engine_golden_record_duplicates = self.get_duplicate_individuals_for_rdi_against_population_count(
rdi
)
rdi.save(update_fields=["dedup_engine_batch_duplicates", "dedup_engine_golden_record_duplicates"])
rdi.save(update_fields=["dedup_engine_golden_record_duplicates"])

def get_duplicates_for_rdi_against_batch(
self, rdi: RegistrationDataImport
) -> QuerySet[DeduplicationEngineSimilarityPair]:
"""Used in RDI statistics"""
rdi_individuals = PendingIndividual.objects.filter(registration_data_import=rdi).only("id")
return DeduplicationEngineSimilarityPair.objects.filter(
Q(individual1__in=rdi_individuals) & Q(individual2__in=rdi_individuals),
program=rdi.program,
).distinct()

def get_duplicate_individuals_for_rdi_against_batch_count(self, rdi: RegistrationDataImport) -> int:
"""Used in RDI statistics"""
duplicates = self.get_duplicates_for_rdi_against_batch(rdi)

unique_individuals = set()
@@ -234,60 +231,36 @@ def get_duplicate_individuals_for_rdi_against_batch_count(self, rdi: Registratio

return len(unique_individuals)

def get_duplicates_for_merged_rdi_against_population(
self, rdi: RegistrationDataImport
) -> QuerySet[DeduplicationEngineSimilarityPair]:
"""Used in Grievance tickets creation for merging RDI"""
rdi_pending_individuals = PendingIndividual.objects.filter(is_removed=False, registration_data_import=rdi).only(
"id"
)
other_pending_individuals = PendingIndividual.objects.filter(is_removed=False, program=rdi.program).exclude(
id__in=rdi_pending_individuals
)

return (
DeduplicationEngineSimilarityPair.objects.filter(
Q(individual1__in=rdi_pending_individuals) | Q(individual2__in=rdi_pending_individuals),
Q(individual1__duplicate=False) & Q(individual2__duplicate=False),
Q(individual1__withdrawn=False) & Q(individual2__withdrawn=False),
program=rdi.program,
)
.exclude(Q(individual1__in=other_pending_individuals) | Q(individual2__in=other_pending_individuals))
.distinct()
)

def get_duplicates_for_rdi_against_population(
self, rdi: RegistrationDataImport
self, rdi: RegistrationDataImport, rdi_merged: bool = False
) -> QuerySet[DeduplicationEngineSimilarityPair]:
"""Used in RDI statistics"""
rdi_pending_individuals = PendingIndividual.objects.filter(is_removed=False, registration_data_import=rdi).only(
"id"
)
other_pending_individuals = PendingIndividual.objects.filter(is_removed=False, program=rdi.program).exclude(
id__in=rdi_pending_individuals
)

from hct_mis_api.apps.utils.models import MergeStatusModel
if rdi_merged:
rdi_individuals = Individual.objects.filter(registration_data_import=rdi).only("id")
other_pending_rdis_individuals = PendingIndividual.objects.filter(program=rdi.program)
else:
rdi_individuals = PendingIndividual.objects.filter(registration_data_import=rdi).only("id")
other_pending_rdis_individuals = PendingIndividual.objects.filter(program=rdi.program).exclude(
id__in=rdi_individuals
)

return (
DeduplicationEngineSimilarityPair.objects.filter(
Q(individual1__in=rdi_pending_individuals) | Q(individual2__in=rdi_pending_individuals),
Q(individual1__in=rdi_individuals) | Q(individual2__in=rdi_individuals),
Q(individual1__duplicate=False) & Q(individual2__duplicate=False),
Q(individual1__withdrawn=False) & Q(individual2__withdrawn=False),
Q(individual1__rdi_merge_status=MergeStatusModel.MERGED)
| Q(individual2__rdi_merge_status=MergeStatusModel.MERGED),
program=rdi.program,
)
.exclude(Q(individual1__in=other_pending_individuals) | Q(individual2__in=other_pending_individuals))
.exclude(
Q(individual1__in=other_pending_rdis_individuals) | Q(individual2__in=other_pending_rdis_individuals)
)
.distinct()
)

def get_duplicate_individuals_for_rdi_against_population_count(self, rdi: RegistrationDataImport) -> int:
"""Used in RDI statistics"""
duplicates = self.get_duplicates_for_rdi_against_population(rdi)
rdi_individuals = PendingIndividual.objects.filter(is_removed=False, registration_data_import=rdi).values_list(
"id", flat=True
)
duplicates = self.get_duplicates_for_rdi_against_population(rdi, rdi_merged=False)
rdi_individuals = PendingIndividual.objects.filter(registration_data_import=rdi).values_list("id", flat=True)

unique_individuals = set()

@@ -305,7 +278,7 @@ def create_grievance_tickets_for_duplicates(self, rdi: RegistrationDataImport) -
create_needs_adjudication_tickets_for_biometrics,
)

deduplication_pairs = self.get_duplicates_for_merged_rdi_against_population(rdi)
deduplication_pairs = self.get_duplicates_for_rdi_against_population(rdi, rdi_merged=True)

create_needs_adjudication_tickets_for_biometrics(deduplication_pairs, rdi)

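With the removal of `get_duplicates_for_merged_rdi_against_population`, both code paths now go through `get_duplicates_for_rdi_against_population` and pick the individual model via the new `rdi_merged` flag. A short usage sketch; the `rdi_id` value and the `RegistrationDataImport` import path are assumptions:

```python
from hct_mis_api.apps.registration_data.models import RegistrationDataImport  # path assumed
from hct_mis_api.apps.registration_datahub.services.biometric_deduplication import (
    BiometricDeduplicationService,
)

rdi = RegistrationDataImport.objects.get(id=rdi_id)  # rdi_id supplied by the caller (assumed)
service = BiometricDeduplicationService()

# Statistics path: the RDI is still pending, so the query is built from
# PendingIndividual records and excludes pairs involving other pending RDIs.
pending_pairs = service.get_duplicates_for_rdi_against_population(rdi, rdi_merged=False)

# Merge path: the RDI's individuals have already been flipped to MERGED, so the
# query starts from Individual instead of PendingIndividual. This replaces the
# removed get_duplicates_for_merged_rdi_against_population().
merged_pairs = service.get_duplicates_for_rdi_against_population(rdi, rdi_merged=True)
```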
101 changes: 58 additions & 43 deletions src/hct_mis_api/apps/registration_datahub/tasks/rdi_merge.py
@@ -38,10 +38,9 @@
KoboImportedSubmission,
RegistrationDataImport,
)
from hct_mis_api.apps.registration_datahub.celery_tasks import (
create_grievance_tickets_for_dedup_engine_results,
deduplicate_documents,
update_rdis_deduplication_engine_statistics,
from hct_mis_api.apps.registration_datahub.celery_tasks import deduplicate_documents
from hct_mis_api.apps.registration_datahub.services.biometric_deduplication import (
BiometricDeduplicationService,
)
from hct_mis_api.apps.registration_datahub.signals import rdi_merged
from hct_mis_api.apps.registration_datahub.tasks.deduplicate import DeduplicateTask
@@ -271,12 +270,46 @@ def execute(self, registration_data_import_id: str) -> None:
logger.info(
f"RDI:{registration_data_import_id} Populated index for {len(household_ids)} households"
)

imported_delivery_mechanism_data = PendingDeliveryMechanismData.objects.filter(
individual_id__in=individual_ids,
)
self._create_grievance_tickets_for_delivery_mechanisms_errors(
imported_delivery_mechanism_data, obj_hct
)

imported_delivery_mechanism_data.update(rdi_merge_status=MergeStatusModel.MERGED)
PendingIndividualRoleInHousehold.objects.filter(
household_id__in=household_ids, individual_id__in=individual_ids
).update(rdi_merge_status=MergeStatusModel.MERGED)
PendingBankAccountInfo.objects.filter(individual_id__in=individual_ids).update(
rdi_merge_status=MergeStatusModel.MERGED
)
PendingDocument.objects.filter(individual_id__in=individual_ids).update(
rdi_merge_status=MergeStatusModel.MERGED
)
PendingIndividualRoleInHousehold.objects.filter(individual_id__in=individual_ids).update(
rdi_merge_status=MergeStatusModel.MERGED
)
PendingHousehold.objects.filter(id__in=household_ids).update(
rdi_merge_status=MergeStatusModel.MERGED
)
PendingIndividual.objects.filter(id__in=individual_ids).update(
rdi_merge_status=MergeStatusModel.MERGED
)
populate_index(
Individual.objects.filter(registration_data_import=obj_hct),
get_individual_doc(obj_hct.business_area.slug),
)

individuals = evaluate_qs(
Individual.objects.filter(registration_data_import=obj_hct).select_for_update().order_by("pk")
)
households = evaluate_qs(
Household.objects.filter(registration_data_import=obj_hct).select_for_update().order_by("pk")
)

if not obj_hct.business_area.postpone_deduplication:
individuals = evaluate_qs(
PendingIndividual.objects.filter(registration_data_import=obj_hct)
.select_for_update()
.order_by("pk")
)
DeduplicateTask(
obj_hct.business_area.slug, obj_hct.program.id
).deduplicate_individuals_against_population(individuals)
@@ -327,8 +360,9 @@ def execute(self, registration_data_import_id: str) -> None:
deduplicate_documents()
# synchronously deduplicate biometrics
if obj_hct.program.biometric_deduplication_enabled:
create_grievance_tickets_for_dedup_engine_results(obj_hct.id)
update_rdis_deduplication_engine_statistics(obj_hct.program.id)
dedupe_service = BiometricDeduplicationService()
dedupe_service.create_grievance_tickets_for_duplicates(obj_hct)
dedupe_service.update_rdis_deduplication_statistics(obj_hct.program, exclude_rdi=obj_hct)

obj_hct.update_needs_adjudication_tickets_statistic()
obj_hct.status = RegistrationDataImport.MERGED
@@ -339,32 +373,6 @@ def execute(self, registration_data_import_id: str) -> None:
self._update_household_collections(households, obj_hct)
self._update_individual_collections(individuals, obj_hct)

imported_delivery_mechanism_data = PendingDeliveryMechanismData.objects.filter(
individual_id__in=individual_ids,
)
self._create_grievance_tickets_for_delivery_mechanisms_errors(
imported_delivery_mechanism_data, obj_hct
)
imported_delivery_mechanism_data.update(rdi_merge_status=MergeStatusModel.MERGED)
PendingIndividualRoleInHousehold.objects.filter(
household_id__in=household_ids, individual_id__in=individual_ids
).update(rdi_merge_status=MergeStatusModel.MERGED)
PendingBankAccountInfo.objects.filter(individual_id__in=individual_ids).update(
rdi_merge_status=MergeStatusModel.MERGED
)
PendingDocument.objects.filter(individual_id__in=individual_ids).update(
rdi_merge_status=MergeStatusModel.MERGED
)
PendingIndividualRoleInHousehold.objects.filter(individual_id__in=individual_ids).update(
rdi_merge_status=MergeStatusModel.MERGED
)
households.update(rdi_merge_status=MergeStatusModel.MERGED)
individuals.update(rdi_merge_status=MergeStatusModel.MERGED)

populate_index(
Individual.objects.filter(registration_data_import=obj_hct),
get_individual_doc(obj_hct.business_area.slug),
)
logger.info(
f"RDI:{registration_data_import_id} Populated index for {len(individual_ids)} individuals"
)
@@ -411,9 +419,11 @@ def _update_household_collections(self, households: list, rdi: RegistrationDataI
# if this is the 2nd representation - the collection is created now for the new representation and the existing one
for household in households:
# find other household with the same unicef_id and group them in the same collection
household_from_collection = Household.objects.filter(
unicef_id=household.unicef_id, business_area=rdi.business_area
).first()
household_from_collection = (
Household.objects.filter(unicef_id=household.unicef_id, business_area=rdi.business_area)
.exclude(registration_data_import=rdi)
.first()
)
if household_from_collection:
if collection := household_from_collection.household_collection:
household.household_collection = collection
@@ -431,9 +441,14 @@ def _update_individual_collections(self, individuals: list, rdi: RegistrationDat
individuals_to_update = []
for individual in individuals:
# find other individual with the same unicef_id and group them in the same collection
individual_from_collection = Individual.objects.filter(
unicef_id=individual.unicef_id, business_area=rdi.business_area
).first()
individual_from_collection = (
Individual.objects.filter(
unicef_id=individual.unicef_id,
business_area=rdi.business_area,
)
.exclude(registration_data_import=rdi)
.first()
)
if individual_from_collection:
if collection := individual_from_collection.individual_collection:
individual.individual_collection = collection
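
In `_update_household_collections` and `_update_individual_collections`, the unicef_id lookup now excludes the RDI being merged. Because the pending records are flipped to `MERGED` earlier in `execute()` in this PR, the old lookup could match the very household or individual that was just merged; the exclusion keeps the collection grouping pointed at representations from earlier imports. A minimal sketch of the household variant; import paths are assumed from the project layout:

```python
from typing import Optional

from hct_mis_api.apps.household.models import Household  # path assumed
from hct_mis_api.apps.registration_data.models import RegistrationDataImport  # path assumed


def find_other_representation(household: Household, rdi: RegistrationDataImport) -> Optional[Household]:
    """Hypothetical helper mirroring the lookup used for collection grouping."""
    return (
        Household.objects.filter(
            unicef_id=household.unicef_id,
            business_area=rdi.business_area,
        )
        # Skip records created by this very import, which are already MERGED at
        # this point in the flow; only earlier representations should match.
        .exclude(registration_data_import=rdi)
        .first()
    )
```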