From 68957717d9a4337378752c2238dc266296be5377 Mon Sep 17 00:00:00 2001 From: Marek Biczysko <34810846+MarekBiczysko@users.noreply.github.com> Date: Tue, 15 Oct 2024 15:12:33 +0200 Subject: [PATCH] dev_fix_rdi_merge_deduplication (#4323) --- .../apps/registration_datahub/celery_tasks.py | 37 ------- .../services/biometric_deduplication.py | 67 ++++-------- .../registration_datahub/tasks/rdi_merge.py | 101 ++++++++++-------- .../test_biometric_deduplication_service.py | 28 +++-- .../registration_datahub/test_celery_tasks.py | 17 --- .../test_deduplication.py | 1 - .../registration_datahub/test_rdi_merge.py | 23 ++++ 7 files changed, 114 insertions(+), 160 deletions(-) diff --git a/src/hct_mis_api/apps/registration_datahub/celery_tasks.py b/src/hct_mis_api/apps/registration_datahub/celery_tasks.py index 0860bc02cb..3f50d1a93d 100644 --- a/src/hct_mis_api/apps/registration_datahub/celery_tasks.py +++ b/src/hct_mis_api/apps/registration_datahub/celery_tasks.py @@ -482,26 +482,6 @@ def deduplication_engine_process(self: Any, program_id: str) -> None: raise -@app.task(bind=True, default_retry_delay=60, max_retries=3) -@sentry_tags -@log_start_and_end -def create_grievance_tickets_for_dedup_engine_results(self: Any, rdi_id: str) -> None: - from hct_mis_api.apps.registration_datahub.services.biometric_deduplication import ( - BiometricDeduplicationService, - ) - - rdi = RegistrationDataImport.objects.get(id=rdi_id) - program = Program.objects.get(id=rdi.program_id) - set_sentry_business_area_tag(program.business_area.name) - - try: - rdi = RegistrationDataImport.objects.get(id=rdi_id) - BiometricDeduplicationService().create_grievance_tickets_for_duplicates(rdi) - except Exception as e: - logger.exception(e) - raise - - @app.task(bind=True, default_retry_delay=60, max_retries=3) @log_start_and_end @sentry_tags @@ -523,20 +503,3 @@ def fetch_biometric_deduplication_results_and_process(self: Any, deduplication_s except Exception as e: logger.exception(e) raise - - -@app.task(bind=True, default_retry_delay=60, max_retries=3) -@sentry_tags -@log_start_and_end -def update_rdis_deduplication_engine_statistics(self: Any, program_id: str) -> None: - from hct_mis_api.apps.registration_datahub.services.biometric_deduplication import ( - BiometricDeduplicationService, - ) - - program = Program.objects.get(id=program_id) - set_sentry_business_area_tag(program.business_area.name) - try: - BiometricDeduplicationService().update_rdis_deduplication_statistics(program_id) - except Exception as e: - logger.exception(e) - raise diff --git a/src/hct_mis_api/apps/registration_datahub/services/biometric_deduplication.py b/src/hct_mis_api/apps/registration_datahub/services/biometric_deduplication.py index 952244e7b4..89d061f75b 100644 --- a/src/hct_mis_api/apps/registration_datahub/services/biometric_deduplication.py +++ b/src/hct_mis_api/apps/registration_datahub/services/biometric_deduplication.py @@ -21,6 +21,7 @@ IgnoredFilenamesPair, SimilarityPair, ) +from hct_mis_api.apps.utils.models import MergeStatusModel logger = logging.getLogger(__name__) @@ -198,24 +199,21 @@ def store_rdis_deduplication_statistics(self, deduplication_set_id: str) -> None ) rdi.save(update_fields=["dedup_engine_batch_duplicates", "dedup_engine_golden_record_duplicates"]) - def update_rdis_deduplication_statistics(self, program_id: str) -> None: - program = Program.objects.get(id=program_id) + def update_rdis_deduplication_statistics(self, program: Program, exclude_rdi: RegistrationDataImport) -> None: rdis = RegistrationDataImport.objects.filter( status=RegistrationDataImport.IN_REVIEW, program=program, deduplication_engine_status=RegistrationDataImport.DEDUP_ENGINE_FINISHED, - ) + ).exclude(id=exclude_rdi.id) for rdi in rdis: - rdi.dedup_engine_batch_duplicates = self.get_duplicate_individuals_for_rdi_against_batch_count(rdi) rdi.dedup_engine_golden_record_duplicates = self.get_duplicate_individuals_for_rdi_against_population_count( rdi ) - rdi.save(update_fields=["dedup_engine_batch_duplicates", "dedup_engine_golden_record_duplicates"]) + rdi.save(update_fields=["dedup_engine_golden_record_duplicates"]) def get_duplicates_for_rdi_against_batch( self, rdi: RegistrationDataImport ) -> QuerySet[DeduplicationEngineSimilarityPair]: - """Used in RDI statistics""" rdi_individuals = PendingIndividual.objects.filter(registration_data_import=rdi).only("id") return DeduplicationEngineSimilarityPair.objects.filter( Q(individual1__in=rdi_individuals) & Q(individual2__in=rdi_individuals), @@ -223,7 +221,6 @@ def get_duplicates_for_rdi_against_batch( ).distinct() def get_duplicate_individuals_for_rdi_against_batch_count(self, rdi: RegistrationDataImport) -> int: - """Used in RDI statistics""" duplicates = self.get_duplicates_for_rdi_against_batch(rdi) unique_individuals = set() @@ -234,60 +231,36 @@ def get_duplicate_individuals_for_rdi_against_batch_count(self, rdi: Registratio return len(unique_individuals) - def get_duplicates_for_merged_rdi_against_population( - self, rdi: RegistrationDataImport - ) -> QuerySet[DeduplicationEngineSimilarityPair]: - """Used in Grievance tickets creation for merging RDI""" - rdi_pending_individuals = PendingIndividual.objects.filter(is_removed=False, registration_data_import=rdi).only( - "id" - ) - other_pending_individuals = PendingIndividual.objects.filter(is_removed=False, program=rdi.program).exclude( - id__in=rdi_pending_individuals - ) - - return ( - DeduplicationEngineSimilarityPair.objects.filter( - Q(individual1__in=rdi_pending_individuals) | Q(individual2__in=rdi_pending_individuals), - Q(individual1__duplicate=False) & Q(individual2__duplicate=False), - Q(individual1__withdrawn=False) & Q(individual2__withdrawn=False), - program=rdi.program, - ) - .exclude(Q(individual1__in=other_pending_individuals) | Q(individual2__in=other_pending_individuals)) - .distinct() - ) - def get_duplicates_for_rdi_against_population( - self, rdi: RegistrationDataImport + self, rdi: RegistrationDataImport, rdi_merged: bool = False ) -> QuerySet[DeduplicationEngineSimilarityPair]: - """Used in RDI statistics""" - rdi_pending_individuals = PendingIndividual.objects.filter(is_removed=False, registration_data_import=rdi).only( - "id" - ) - other_pending_individuals = PendingIndividual.objects.filter(is_removed=False, program=rdi.program).exclude( - id__in=rdi_pending_individuals - ) - - from hct_mis_api.apps.utils.models import MergeStatusModel + if rdi_merged: + rdi_individuals = Individual.objects.filter(registration_data_import=rdi).only("id") + other_pending_rdis_individuals = PendingIndividual.objects.filter(program=rdi.program) + else: + rdi_individuals = PendingIndividual.objects.filter(registration_data_import=rdi).only("id") + other_pending_rdis_individuals = PendingIndividual.objects.filter(program=rdi.program).exclude( + id__in=rdi_individuals + ) return ( DeduplicationEngineSimilarityPair.objects.filter( - Q(individual1__in=rdi_pending_individuals) | Q(individual2__in=rdi_pending_individuals), + Q(individual1__in=rdi_individuals) | Q(individual2__in=rdi_individuals), Q(individual1__duplicate=False) & Q(individual2__duplicate=False), Q(individual1__withdrawn=False) & Q(individual2__withdrawn=False), Q(individual1__rdi_merge_status=MergeStatusModel.MERGED) | Q(individual2__rdi_merge_status=MergeStatusModel.MERGED), program=rdi.program, ) - .exclude(Q(individual1__in=other_pending_individuals) | Q(individual2__in=other_pending_individuals)) + .exclude( + Q(individual1__in=other_pending_rdis_individuals) | Q(individual2__in=other_pending_rdis_individuals) + ) .distinct() ) def get_duplicate_individuals_for_rdi_against_population_count(self, rdi: RegistrationDataImport) -> int: - """Used in RDI statistics""" - duplicates = self.get_duplicates_for_rdi_against_population(rdi) - rdi_individuals = PendingIndividual.objects.filter(is_removed=False, registration_data_import=rdi).values_list( - "id", flat=True - ) + duplicates = self.get_duplicates_for_rdi_against_population(rdi, rdi_merged=False) + rdi_individuals = PendingIndividual.objects.filter(registration_data_import=rdi).values_list("id", flat=True) unique_individuals = set() @@ -305,7 +278,7 @@ def create_grievance_tickets_for_duplicates(self, rdi: RegistrationDataImport) - create_needs_adjudication_tickets_for_biometrics, ) - deduplication_pairs = self.get_duplicates_for_merged_rdi_against_population(rdi) + deduplication_pairs = self.get_duplicates_for_rdi_against_population(rdi, rdi_merged=True) create_needs_adjudication_tickets_for_biometrics(deduplication_pairs, rdi) diff --git a/src/hct_mis_api/apps/registration_datahub/tasks/rdi_merge.py b/src/hct_mis_api/apps/registration_datahub/tasks/rdi_merge.py index e454cc6562..37089328dd 100644 --- a/src/hct_mis_api/apps/registration_datahub/tasks/rdi_merge.py +++ b/src/hct_mis_api/apps/registration_datahub/tasks/rdi_merge.py @@ -38,10 +38,9 @@ KoboImportedSubmission, RegistrationDataImport, ) -from hct_mis_api.apps.registration_datahub.celery_tasks import ( - create_grievance_tickets_for_dedup_engine_results, - deduplicate_documents, - update_rdis_deduplication_engine_statistics, +from hct_mis_api.apps.registration_datahub.celery_tasks import deduplicate_documents +from hct_mis_api.apps.registration_datahub.services.biometric_deduplication import ( + BiometricDeduplicationService, ) from hct_mis_api.apps.registration_datahub.signals import rdi_merged from hct_mis_api.apps.registration_datahub.tasks.deduplicate import DeduplicateTask @@ -271,12 +270,46 @@ def execute(self, registration_data_import_id: str) -> None: logger.info( f"RDI:{registration_data_import_id} Populated index for {len(household_ids)} households" ) + + imported_delivery_mechanism_data = PendingDeliveryMechanismData.objects.filter( + individual_id__in=individual_ids, + ) + self._create_grievance_tickets_for_delivery_mechanisms_errors( + imported_delivery_mechanism_data, obj_hct + ) + + imported_delivery_mechanism_data.update(rdi_merge_status=MergeStatusModel.MERGED) + PendingIndividualRoleInHousehold.objects.filter( + household_id__in=household_ids, individual_id__in=individual_ids + ).update(rdi_merge_status=MergeStatusModel.MERGED) + PendingBankAccountInfo.objects.filter(individual_id__in=individual_ids).update( + rdi_merge_status=MergeStatusModel.MERGED + ) + PendingDocument.objects.filter(individual_id__in=individual_ids).update( + rdi_merge_status=MergeStatusModel.MERGED + ) + PendingIndividualRoleInHousehold.objects.filter(individual_id__in=individual_ids).update( + rdi_merge_status=MergeStatusModel.MERGED + ) + PendingHousehold.objects.filter(id__in=household_ids).update( + rdi_merge_status=MergeStatusModel.MERGED + ) + PendingIndividual.objects.filter(id__in=individual_ids).update( + rdi_merge_status=MergeStatusModel.MERGED + ) + populate_index( + Individual.objects.filter(registration_data_import=obj_hct), + get_individual_doc(obj_hct.business_area.slug), + ) + + individuals = evaluate_qs( + Individual.objects.filter(registration_data_import=obj_hct).select_for_update().order_by("pk") + ) + households = evaluate_qs( + Household.objects.filter(registration_data_import=obj_hct).select_for_update().order_by("pk") + ) + if not obj_hct.business_area.postpone_deduplication: - individuals = evaluate_qs( - PendingIndividual.objects.filter(registration_data_import=obj_hct) - .select_for_update() - .order_by("pk") - ) DeduplicateTask( obj_hct.business_area.slug, obj_hct.program.id ).deduplicate_individuals_against_population(individuals) @@ -327,8 +360,9 @@ def execute(self, registration_data_import_id: str) -> None: deduplicate_documents() # synchronously deduplicate biometrics if obj_hct.program.biometric_deduplication_enabled: - create_grievance_tickets_for_dedup_engine_results(obj_hct.id) - update_rdis_deduplication_engine_statistics(obj_hct.program.id) + dedupe_service = BiometricDeduplicationService() + dedupe_service.create_grievance_tickets_for_duplicates(obj_hct) + dedupe_service.update_rdis_deduplication_statistics(obj_hct.program, exclude_rdi=obj_hct) obj_hct.update_needs_adjudication_tickets_statistic() obj_hct.status = RegistrationDataImport.MERGED @@ -339,32 +373,6 @@ def execute(self, registration_data_import_id: str) -> None: self._update_household_collections(households, obj_hct) self._update_individual_collections(individuals, obj_hct) - imported_delivery_mechanism_data = PendingDeliveryMechanismData.objects.filter( - individual_id__in=individual_ids, - ) - self._create_grievance_tickets_for_delivery_mechanisms_errors( - imported_delivery_mechanism_data, obj_hct - ) - imported_delivery_mechanism_data.update(rdi_merge_status=MergeStatusModel.MERGED) - PendingIndividualRoleInHousehold.objects.filter( - household_id__in=household_ids, individual_id__in=individual_ids - ).update(rdi_merge_status=MergeStatusModel.MERGED) - PendingBankAccountInfo.objects.filter(individual_id__in=individual_ids).update( - rdi_merge_status=MergeStatusModel.MERGED - ) - PendingDocument.objects.filter(individual_id__in=individual_ids).update( - rdi_merge_status=MergeStatusModel.MERGED - ) - PendingIndividualRoleInHousehold.objects.filter(individual_id__in=individual_ids).update( - rdi_merge_status=MergeStatusModel.MERGED - ) - households.update(rdi_merge_status=MergeStatusModel.MERGED) - individuals.update(rdi_merge_status=MergeStatusModel.MERGED) - - populate_index( - Individual.objects.filter(registration_data_import=obj_hct), - get_individual_doc(obj_hct.business_area.slug), - ) logger.info( f"RDI:{registration_data_import_id} Populated index for {len(individual_ids)} individuals" ) @@ -411,9 +419,11 @@ def _update_household_collections(self, households: list, rdi: RegistrationDataI # if this is the 2nd representation - the collection is created now for the new representation and the existing one for household in households: # find other household with the same unicef_id and group them in the same collection - household_from_collection = Household.objects.filter( - unicef_id=household.unicef_id, business_area=rdi.business_area - ).first() + household_from_collection = ( + Household.objects.filter(unicef_id=household.unicef_id, business_area=rdi.business_area) + .exclude(registration_data_import=rdi) + .first() + ) if household_from_collection: if collection := household_from_collection.household_collection: household.household_collection = collection @@ -431,9 +441,14 @@ def _update_individual_collections(self, individuals: list, rdi: RegistrationDat individuals_to_update = [] for individual in individuals: # find other individual with the same unicef_id and group them in the same collection - individual_from_collection = Individual.objects.filter( - unicef_id=individual.unicef_id, business_area=rdi.business_area - ).first() + individual_from_collection = ( + Individual.objects.filter( + unicef_id=individual.unicef_id, + business_area=rdi.business_area, + ) + .exclude(registration_data_import=rdi) + .first() + ) if individual_from_collection: if collection := individual_from_collection.individual_collection: individual.individual_collection = collection diff --git a/tests/unit/apps/registration_datahub/test_biometric_deduplication_service.py b/tests/unit/apps/registration_datahub/test_biometric_deduplication_service.py index 9086a23d74..5225a2459d 100644 --- a/tests/unit/apps/registration_datahub/test_biometric_deduplication_service.py +++ b/tests/unit/apps/registration_datahub/test_biometric_deduplication_service.py @@ -352,7 +352,6 @@ def test_mark_rdis_as(self) -> None: def test_get_duplicates_for_rdi_against_population(self) -> None: self.program.deduplication_set_id = uuid.uuid4() - self.program.business_area.biometric_deduplication_threshold = 0.6 self.program.business_area.save() self.program.save() @@ -400,8 +399,7 @@ def test_get_duplicates_for_rdi_against_population(self) -> None: ] service.store_similarity_pairs(str(self.program.deduplication_set_id), similarity_pairs) - duplicates = service.get_duplicates_for_rdi_against_population(rdi1) - + duplicates = service.get_duplicates_for_rdi_against_population(rdi1, rdi_merged=False) assert len(duplicates) == 2 assert list( duplicates.order_by("similarity_score").values("individual1", "individual2", "similarity_score") @@ -423,7 +421,6 @@ def test_get_duplicates_for_rdi_against_population(self) -> None: def test_get_duplicates_for_merged_rdi_against_population(self) -> None: self.program.deduplication_set_id = uuid.uuid4() - self.program.business_area.biometric_deduplication_threshold = 0.6 self.program.business_area.save() self.program.save() @@ -455,10 +452,6 @@ def test_get_duplicates_for_merged_rdi_against_population(self) -> None: ind5.save() ind6.save() - for ind in [ind3, ind4]: - ind.rdi_merge_status = MergeStatusModel.PENDING - ind.save() - service = BiometricDeduplicationService() similarity_pairs = [ SimilarityPair(score=0.7, first=ind1.id, second=ind2.id), # within merged rdi1 @@ -471,7 +464,7 @@ def test_get_duplicates_for_merged_rdi_against_population(self) -> None: ] service.store_similarity_pairs(str(self.program.deduplication_set_id), similarity_pairs) - duplicates = service.get_duplicates_for_merged_rdi_against_population(rdi2) + duplicates = service.get_duplicates_for_rdi_against_population(rdi2, rdi_merged=True) assert len(duplicates) == 3 assert list( @@ -571,8 +564,8 @@ def test_create_grievance_tickets_for_duplicates( ) service = BiometricDeduplicationService() - service.get_duplicates_for_merged_rdi_against_population = mock.MagicMock() - service.get_duplicates_for_merged_rdi_against_population.return_value = [] + service.get_duplicates_for_rdi_against_population = mock.MagicMock() + service.get_duplicates_for_rdi_against_population.return_value = [] service.create_grievance_tickets_for_duplicates(rdi1) create_needs_adjudication_tickets_for_biometrics_mock.assert_called_once_with([], rdi1) @@ -663,18 +656,23 @@ def test_update_rdis_deduplication_statistics(self) -> None: program=self.program, deduplication_engine_status=RegistrationDataImport.DEDUP_ENGINE_FINISHED, status=RegistrationDataImport.IN_REVIEW, + dedup_engine_batch_duplicates=5, + dedup_engine_golden_record_duplicates=6, + ) + rdi2 = RegistrationDataImportFactory( + program=self.program, + deduplication_engine_status=RegistrationDataImport.DEDUP_ENGINE_FINISHED, + status=RegistrationDataImport.IN_REVIEW, ) - service.get_duplicate_individuals_for_rdi_against_batch_count = mock.Mock(return_value=8) service.get_duplicate_individuals_for_rdi_against_population_count = mock.Mock(return_value=9) - service.update_rdis_deduplication_statistics(self.program.id) + service.update_rdis_deduplication_statistics(self.program, exclude_rdi=rdi2) - service.get_duplicate_individuals_for_rdi_against_batch_count.assert_called_once_with(rdi1) service.get_duplicate_individuals_for_rdi_against_population_count.assert_called_once_with(rdi1) rdi1.refresh_from_db() - assert rdi1.dedup_engine_batch_duplicates == 8 + assert rdi1.dedup_engine_batch_duplicates == 5 assert rdi1.dedup_engine_golden_record_duplicates == 9 @patch( diff --git a/tests/unit/apps/registration_datahub/test_celery_tasks.py b/tests/unit/apps/registration_datahub/test_celery_tasks.py index 386b3097bd..22ca8c8a1d 100644 --- a/tests/unit/apps/registration_datahub/test_celery_tasks.py +++ b/tests/unit/apps/registration_datahub/test_celery_tasks.py @@ -51,7 +51,6 @@ RegistrationDataImport, ) from hct_mis_api.apps.registration_datahub.celery_tasks import ( - create_grievance_tickets_for_dedup_engine_results, deduplication_engine_process, fetch_biometric_deduplication_results_and_process, merge_registration_data_import_task, @@ -1094,22 +1093,6 @@ def test_deduplication_engine_process_task( mock_upload_and_process.assert_called_once_with(self.program) - @patch.dict( - "os.environ", - {"DEDUPLICATION_ENGINE_API_KEY": "dedup_api_key", "DEDUPLICATION_ENGINE_API_URL": "http://dedup-fake-url.com"}, - ) - @patch( - "hct_mis_api.apps.registration_datahub.services.biometric_deduplication.BiometricDeduplicationService" - ".create_grievance_tickets_for_duplicates" - ) - def test_create_grievance_tickets_for_dedup_engine_results_task( - self, - mock_create_tickets: Mock, - ) -> None: - create_grievance_tickets_for_dedup_engine_results(str(self.registration_data_import.id)) - - mock_create_tickets.assert_called_once_with(self.registration_data_import) - @patch.dict( "os.environ", {"DEDUPLICATION_ENGINE_API_KEY": "dedup_api_key", "DEDUPLICATION_ENGINE_API_URL": "http://dedup-fake-url.com"}, diff --git a/tests/unit/apps/registration_datahub/test_deduplication.py b/tests/unit/apps/registration_datahub/test_deduplication.py index 56a6bceb7b..64d2571f08 100644 --- a/tests/unit/apps/registration_datahub/test_deduplication.py +++ b/tests/unit/apps/registration_datahub/test_deduplication.py @@ -304,7 +304,6 @@ def test_batch_deduplication(self) -> None: ) -@disabled_locally_test class TestGoldenRecordDeduplication(TestCase): fixtures = (f"{settings.PROJECT_ROOT}/apps/geo/fixtures/data.json",) diff --git a/tests/unit/apps/registration_datahub/test_rdi_merge.py b/tests/unit/apps/registration_datahub/test_rdi_merge.py index cb9dce774d..9e343de178 100644 --- a/tests/unit/apps/registration_datahub/test_rdi_merge.py +++ b/tests/unit/apps/registration_datahub/test_rdi_merge.py @@ -593,6 +593,29 @@ def test_merging_external_collector(self) -> None: with capture_on_commit_callbacks(execute=True): RdiMergeTask().execute(self.rdi.pk) + @patch.dict( + "os.environ", + {"DEDUPLICATION_ENGINE_API_KEY": "dedup_api_key", "DEDUPLICATION_ENGINE_API_URL": "http://dedup-fake-url.com"}, + ) + @mock.patch( + "hct_mis_api.apps.registration_datahub.services.biometric_deduplication.BiometricDeduplicationService.create_grievance_tickets_for_duplicates" + ) + @mock.patch( + "hct_mis_api.apps.registration_datahub.services.biometric_deduplication.BiometricDeduplicationService.update_rdis_deduplication_statistics" + ) + def test_merge_biometric_deduplication_enabled( + self, + update_rdis_deduplication_statistics_mock: mock.Mock, + create_grievance_tickets_for_duplicates_mock: mock.Mock, + ) -> None: + program = self.rdi.program + program.biometric_deduplication_enabled = True + program.save() + with capture_on_commit_callbacks(execute=True): + RdiMergeTask().execute(self.rdi.pk) + create_grievance_tickets_for_duplicates_mock.assert_called_once_with(self.rdi) + update_rdis_deduplication_statistics_mock.assert_called_once_with(program, exclude_rdi=self.rdi) + class TestRdiMergeTaskDeliveryMechanismData(TestCase): fixtures = [