Skip to content

Commit

Permalink
dev_fix_rdi_merge_deduplication (#4323)
Browse files Browse the repository at this point in the history
  • Loading branch information
MarekBiczysko authored and marekbiczysko committed Oct 17, 2024
1 parent 3908f65 commit 6895771
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 160 deletions.
37 changes: 0 additions & 37 deletions src/hct_mis_api/apps/registration_datahub/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,26 +482,6 @@ def deduplication_engine_process(self: Any, program_id: str) -> None:
raise


@app.task(bind=True, default_retry_delay=60, max_retries=3)
@sentry_tags
@log_start_and_end
def create_grievance_tickets_for_dedup_engine_results(self: Any, rdi_id: str) -> None:
    """Celery task: create grievance tickets for the biometric-deduplication
    duplicates detected in a registration data import.

    Args:
        rdi_id: primary key of the ``RegistrationDataImport`` to process.

    Any exception is logged and re-raised so celery can retry (up to
    ``max_retries`` with ``default_retry_delay`` seconds between attempts).
    """
    # Imported lazily to avoid a circular import at module load time.
    from hct_mis_api.apps.registration_datahub.services.biometric_deduplication import (
        BiometricDeduplicationService,
    )

    rdi = RegistrationDataImport.objects.get(id=rdi_id)
    program = Program.objects.get(id=rdi.program_id)
    set_sentry_business_area_tag(program.business_area.name)

    try:
        # Fix: reuse the RDI fetched above instead of re-querying the same row.
        BiometricDeduplicationService().create_grievance_tickets_for_duplicates(rdi)
    except Exception as e:
        logger.exception(e)
        raise


@app.task(bind=True, default_retry_delay=60, max_retries=3)
@log_start_and_end
@sentry_tags
Expand All @@ -523,20 +503,3 @@ def fetch_biometric_deduplication_results_and_process(self: Any, deduplication_s
except Exception as e:
logger.exception(e)
raise


@app.task(bind=True, default_retry_delay=60, max_retries=3)
@sentry_tags
@log_start_and_end
def update_rdis_deduplication_engine_statistics(self: Any, program_id: str) -> None:
    """Celery task: refresh deduplication-engine statistics for a program's RDIs.

    Args:
        program_id: primary key of the ``Program`` whose RDIs are updated.

    Failures are logged and re-raised so celery's retry policy applies.
    """
    # Local import breaks the circular dependency with the service module.
    from hct_mis_api.apps.registration_datahub.services.biometric_deduplication import (
        BiometricDeduplicationService,
    )

    target_program = Program.objects.get(id=program_id)
    set_sentry_business_area_tag(target_program.business_area.name)
    try:
        service = BiometricDeduplicationService()
        service.update_rdis_deduplication_statistics(program_id)
    except Exception as exc:
        logger.exception(exc)
        raise
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
IgnoredFilenamesPair,
SimilarityPair,
)
from hct_mis_api.apps.utils.models import MergeStatusModel

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -198,32 +199,28 @@ def store_rdis_deduplication_statistics(self, deduplication_set_id: str) -> None
)
rdi.save(update_fields=["dedup_engine_batch_duplicates", "dedup_engine_golden_record_duplicates"])

def update_rdis_deduplication_statistics(self, program: Program, exclude_rdi: RegistrationDataImport) -> None:
    """Recompute golden-record duplicate counts for the program's other pending RDIs.

    After *exclude_rdi* has been merged, every RDI of *program* that is still
    in review and has finished engine deduplication may see its duplicate
    count against the population change, so that counter is refreshed here.

    Args:
        program: the program whose in-review RDIs are refreshed.
        exclude_rdi: the just-merged RDI, skipped by the refresh.

    NOTE(review): batch duplicates are internal to each RDI and presumably
    unaffected by another RDI's merge, so only the golden-record counter is
    recomputed and persisted — confirm against callers.
    """
    rdis = RegistrationDataImport.objects.filter(
        status=RegistrationDataImport.IN_REVIEW,
        program=program,
        deduplication_engine_status=RegistrationDataImport.DEDUP_ENGINE_FINISHED,
    ).exclude(id=exclude_rdi.id)
    for rdi in rdis:
        rdi.dedup_engine_golden_record_duplicates = self.get_duplicate_individuals_for_rdi_against_population_count(
            rdi
        )
        rdi.save(update_fields=["dedup_engine_golden_record_duplicates"])

def get_duplicates_for_rdi_against_batch(
    self, rdi: RegistrationDataImport
) -> QuerySet[DeduplicationEngineSimilarityPair]:
    """Return similarity pairs whose BOTH sides belong to *rdi*.

    Used in RDI statistics.
    """
    batch_individuals = PendingIndividual.objects.filter(registration_data_import=rdi).only("id")
    pairs = DeduplicationEngineSimilarityPair.objects.filter(
        Q(individual1__in=batch_individuals),
        Q(individual2__in=batch_individuals),
        program=rdi.program,
    )
    return pairs.distinct()

def get_duplicate_individuals_for_rdi_against_batch_count(self, rdi: RegistrationDataImport) -> int:
"""Used in RDI statistics"""
duplicates = self.get_duplicates_for_rdi_against_batch(rdi)

unique_individuals = set()
Expand All @@ -234,60 +231,36 @@ def get_duplicate_individuals_for_rdi_against_batch_count(self, rdi: Registratio

return len(unique_individuals)

def get_duplicates_for_merged_rdi_against_population(
    self, rdi: RegistrationDataImport
) -> QuerySet[DeduplicationEngineSimilarityPair]:
    """Similarity pairs linking *rdi*'s pending individuals to the population.

    Used in grievance-ticket creation when merging an RDI.  A pair qualifies
    when at least one side belongs to *rdi*, neither side is flagged as a
    duplicate or withdrawn, and neither side belongs to a different pending
    RDI of the same program.
    """
    in_rdi = PendingIndividual.objects.filter(is_removed=False, registration_data_import=rdi).only(
        "id"
    )
    outside_rdi = PendingIndividual.objects.filter(is_removed=False, program=rdi.program).exclude(
        id__in=in_rdi
    )

    touches_rdi = Q(individual1__in=in_rdi) | Q(individual2__in=in_rdi)
    not_duplicate = Q(individual1__duplicate=False) & Q(individual2__duplicate=False)
    not_withdrawn = Q(individual1__withdrawn=False) & Q(individual2__withdrawn=False)
    touches_other_pending = Q(individual1__in=outside_rdi) | Q(individual2__in=outside_rdi)

    candidates = DeduplicationEngineSimilarityPair.objects.filter(
        touches_rdi,
        not_duplicate,
        not_withdrawn,
        program=rdi.program,
    )
    return candidates.exclude(touches_other_pending).distinct()

def get_duplicates_for_rdi_against_population(
    self, rdi: RegistrationDataImport, rdi_merged: bool = False
) -> QuerySet[DeduplicationEngineSimilarityPair]:
    """Similarity pairs linking *rdi*'s individuals to the program population.

    Args:
        rdi: the registration data import to evaluate.
        rdi_merged: when ``False`` (RDI statistics) the RDI's individuals are
            still pending; when ``True`` (grievance-ticket creation after a
            merge) they are already merged ``Individual`` rows.

    A pair qualifies when at least one side belongs to *rdi*, neither side is
    flagged as a duplicate or withdrawn, at least one side is already merged
    into the population, and — via the final ``exclude`` — neither side
    belongs to another still-pending RDI of the program.
    """
    # Local import mirrors the service's lazy-import convention.
    from hct_mis_api.apps.utils.models import MergeStatusModel

    if rdi_merged:
        rdi_individuals = Individual.objects.filter(registration_data_import=rdi).only("id")
        other_pending_rdis_individuals = PendingIndividual.objects.filter(program=rdi.program)
    else:
        rdi_individuals = PendingIndividual.objects.filter(registration_data_import=rdi).only("id")
        other_pending_rdis_individuals = PendingIndividual.objects.filter(program=rdi.program).exclude(
            id__in=rdi_individuals
        )

    return (
        DeduplicationEngineSimilarityPair.objects.filter(
            Q(individual1__in=rdi_individuals) | Q(individual2__in=rdi_individuals),
            Q(individual1__duplicate=False) & Q(individual2__duplicate=False),
            Q(individual1__withdrawn=False) & Q(individual2__withdrawn=False),
            Q(individual1__rdi_merge_status=MergeStatusModel.MERGED)
            | Q(individual2__rdi_merge_status=MergeStatusModel.MERGED),
            program=rdi.program,
        )
        .exclude(
            Q(individual1__in=other_pending_rdis_individuals) | Q(individual2__in=other_pending_rdis_individuals)
        )
        .distinct()
    )

def get_duplicate_individuals_for_rdi_against_population_count(self, rdi: RegistrationDataImport) -> int:
"""Used in RDI statistics"""
duplicates = self.get_duplicates_for_rdi_against_population(rdi)
rdi_individuals = PendingIndividual.objects.filter(is_removed=False, registration_data_import=rdi).values_list(
"id", flat=True
)
duplicates = self.get_duplicates_for_rdi_against_population(rdi, rdi_merged=False)
rdi_individuals = PendingIndividual.objects.filter(registration_data_import=rdi).values_list("id", flat=True)

unique_individuals = set()

Expand All @@ -305,7 +278,7 @@ def create_grievance_tickets_for_duplicates(self, rdi: RegistrationDataImport) -
create_needs_adjudication_tickets_for_biometrics,
)

deduplication_pairs = self.get_duplicates_for_merged_rdi_against_population(rdi)
deduplication_pairs = self.get_duplicates_for_rdi_against_population(rdi, rdi_merged=True)

create_needs_adjudication_tickets_for_biometrics(deduplication_pairs, rdi)

Expand Down
101 changes: 58 additions & 43 deletions src/hct_mis_api/apps/registration_datahub/tasks/rdi_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@
KoboImportedSubmission,
RegistrationDataImport,
)
from hct_mis_api.apps.registration_datahub.celery_tasks import (
create_grievance_tickets_for_dedup_engine_results,
deduplicate_documents,
update_rdis_deduplication_engine_statistics,
from hct_mis_api.apps.registration_datahub.celery_tasks import deduplicate_documents
from hct_mis_api.apps.registration_datahub.services.biometric_deduplication import (
BiometricDeduplicationService,
)
from hct_mis_api.apps.registration_datahub.signals import rdi_merged
from hct_mis_api.apps.registration_datahub.tasks.deduplicate import DeduplicateTask
Expand Down Expand Up @@ -271,12 +270,46 @@ def execute(self, registration_data_import_id: str) -> None:
logger.info(
f"RDI:{registration_data_import_id} Populated index for {len(household_ids)} households"
)

imported_delivery_mechanism_data = PendingDeliveryMechanismData.objects.filter(
individual_id__in=individual_ids,
)
self._create_grievance_tickets_for_delivery_mechanisms_errors(
imported_delivery_mechanism_data, obj_hct
)

imported_delivery_mechanism_data.update(rdi_merge_status=MergeStatusModel.MERGED)
PendingIndividualRoleInHousehold.objects.filter(
household_id__in=household_ids, individual_id__in=individual_ids
).update(rdi_merge_status=MergeStatusModel.MERGED)
PendingBankAccountInfo.objects.filter(individual_id__in=individual_ids).update(
rdi_merge_status=MergeStatusModel.MERGED
)
PendingDocument.objects.filter(individual_id__in=individual_ids).update(
rdi_merge_status=MergeStatusModel.MERGED
)
PendingIndividualRoleInHousehold.objects.filter(individual_id__in=individual_ids).update(
rdi_merge_status=MergeStatusModel.MERGED
)
PendingHousehold.objects.filter(id__in=household_ids).update(
rdi_merge_status=MergeStatusModel.MERGED
)
PendingIndividual.objects.filter(id__in=individual_ids).update(
rdi_merge_status=MergeStatusModel.MERGED
)
populate_index(
Individual.objects.filter(registration_data_import=obj_hct),
get_individual_doc(obj_hct.business_area.slug),
)

individuals = evaluate_qs(
Individual.objects.filter(registration_data_import=obj_hct).select_for_update().order_by("pk")
)
households = evaluate_qs(
Household.objects.filter(registration_data_import=obj_hct).select_for_update().order_by("pk")
)

if not obj_hct.business_area.postpone_deduplication:
individuals = evaluate_qs(
PendingIndividual.objects.filter(registration_data_import=obj_hct)
.select_for_update()
.order_by("pk")
)
DeduplicateTask(
obj_hct.business_area.slug, obj_hct.program.id
).deduplicate_individuals_against_population(individuals)
Expand Down Expand Up @@ -327,8 +360,9 @@ def execute(self, registration_data_import_id: str) -> None:
deduplicate_documents()
# synchronously deduplicate biometrics
if obj_hct.program.biometric_deduplication_enabled:
create_grievance_tickets_for_dedup_engine_results(obj_hct.id)
update_rdis_deduplication_engine_statistics(obj_hct.program.id)
dedupe_service = BiometricDeduplicationService()
dedupe_service.create_grievance_tickets_for_duplicates(obj_hct)
dedupe_service.update_rdis_deduplication_statistics(obj_hct.program, exclude_rdi=obj_hct)

obj_hct.update_needs_adjudication_tickets_statistic()
obj_hct.status = RegistrationDataImport.MERGED
Expand All @@ -339,32 +373,6 @@ def execute(self, registration_data_import_id: str) -> None:
self._update_household_collections(households, obj_hct)
self._update_individual_collections(individuals, obj_hct)

imported_delivery_mechanism_data = PendingDeliveryMechanismData.objects.filter(
individual_id__in=individual_ids,
)
self._create_grievance_tickets_for_delivery_mechanisms_errors(
imported_delivery_mechanism_data, obj_hct
)
imported_delivery_mechanism_data.update(rdi_merge_status=MergeStatusModel.MERGED)
PendingIndividualRoleInHousehold.objects.filter(
household_id__in=household_ids, individual_id__in=individual_ids
).update(rdi_merge_status=MergeStatusModel.MERGED)
PendingBankAccountInfo.objects.filter(individual_id__in=individual_ids).update(
rdi_merge_status=MergeStatusModel.MERGED
)
PendingDocument.objects.filter(individual_id__in=individual_ids).update(
rdi_merge_status=MergeStatusModel.MERGED
)
PendingIndividualRoleInHousehold.objects.filter(individual_id__in=individual_ids).update(
rdi_merge_status=MergeStatusModel.MERGED
)
households.update(rdi_merge_status=MergeStatusModel.MERGED)
individuals.update(rdi_merge_status=MergeStatusModel.MERGED)

populate_index(
Individual.objects.filter(registration_data_import=obj_hct),
get_individual_doc(obj_hct.business_area.slug),
)
logger.info(
f"RDI:{registration_data_import_id} Populated index for {len(individual_ids)} individuals"
)
Expand Down Expand Up @@ -411,9 +419,11 @@ def _update_household_collections(self, households: list, rdi: RegistrationDataI
# if this is the 2nd representation - the collection is created now for the new representation and the existing one
for household in households:
# find other household with the same unicef_id and group them in the same collection
household_from_collection = Household.objects.filter(
unicef_id=household.unicef_id, business_area=rdi.business_area
).first()
household_from_collection = (
Household.objects.filter(unicef_id=household.unicef_id, business_area=rdi.business_area)
.exclude(registration_data_import=rdi)
.first()
)
if household_from_collection:
if collection := household_from_collection.household_collection:
household.household_collection = collection
Expand All @@ -431,9 +441,14 @@ def _update_individual_collections(self, individuals: list, rdi: RegistrationDat
individuals_to_update = []
for individual in individuals:
# find other individual with the same unicef_id and group them in the same collection
individual_from_collection = Individual.objects.filter(
unicef_id=individual.unicef_id, business_area=rdi.business_area
).first()
individual_from_collection = (
Individual.objects.filter(
unicef_id=individual.unicef_id,
business_area=rdi.business_area,
)
.exclude(registration_data_import=rdi)
.first()
)
if individual_from_collection:
if collection := individual_from_collection.individual_collection:
individual.individual_collection = collection
Expand Down
Loading

0 comments on commit 6895771

Please sign in to comment.