diff --git a/JeMPI_Apps/JeMPI_API/src/main/java/org/jembi/jempi/api/HttpServer.java b/JeMPI_Apps/JeMPI_API/src/main/java/org/jembi/jempi/api/HttpServer.java index 6d42e48b8..cd3661987 100644 --- a/JeMPI_Apps/JeMPI_API/src/main/java/org/jembi/jempi/api/HttpServer.java +++ b/JeMPI_Apps/JeMPI_API/src/main/java/org/jembi/jempi/api/HttpServer.java @@ -84,11 +84,19 @@ private Route createJeMPIRoutes( http)), path(GlobalConstants.SEGMENT_POST_FILTER_GIDS, () -> Routes.postFilterGids(actorSystem, backEnd)), - path(GlobalConstants.SEGMENT_PROXY_CR_REGISTER, + path(GlobalConstants.SEGMENT_PROXY_POST_LINK_INTERACTION, + () -> Routes.postLinkInteraction(AppConfig.LINKER_IP, + AppConfig.LINKER_HTTP_PORT, + http)), + path(GlobalConstants.SEGMENT_PROXY_POST_LINK_INTERACTION_TO_GID, + () -> Routes.postLinkInteractionToGid(AppConfig.LINKER_IP, + AppConfig.LINKER_HTTP_PORT, + http)), + path(GlobalConstants.SEGMENT_PROXY_POST_CR_REGISTER, () -> Routes.postCrRegister(AppConfig.LINKER_IP, AppConfig.LINKER_HTTP_PORT, http)), - path(GlobalConstants.SEGMENT_PROXY_CR_FIND, + path(GlobalConstants.SEGMENT_PROXY_POST_CR_FIND, () -> Routes.postCrFind(AppConfig.LINKER_IP, AppConfig.LINKER_HTTP_PORT, http)), - path(GlobalConstants.SEGMENT_PROXY_CR_CANDIDATES, + path(GlobalConstants.SEGMENT_PROXY_POST_CR_CANDIDATES, () -> Routes.postCrCandidates(AppConfig.LINKER_IP, AppConfig.LINKER_HTTP_PORT, http)), path(GlobalConstants.SEGMENT_POST_FILTER_GIDS_WITH_INTERACTION_COUNT, () -> Routes.postFilterGidsWithInteractionCount(actorSystem, backEnd)))), @@ -98,8 +106,10 @@ private Route createJeMPIRoutes( () -> Routes.patchIidNewGidLink(actorSystem, backEnd)), path(GlobalConstants.SEGMENT_PATCH_IID_GID_LINK, () -> Routes.patchIidGidLink(actorSystem, backEnd)), - path(GlobalConstants.SEGMENT_PROXY_CR_UPDATE_FIELDS, - () -> Routes.patchCrUpdateFields(AppConfig.LINKER_IP, AppConfig.LINKER_HTTP_PORT, http)))), + path(GlobalConstants.SEGMENT_PROXY_PATCH_CR_UPDATE_FIELDS, + () -> Routes.patchCrUpdateFields(AppConfig.LINKER_IP, + AppConfig.LINKER_HTTP_PORT, + http)))), get(() -> concat(path(GlobalConstants.SEGMENT_COUNT_GOLDEN_RECORDS, () -> Routes.countGoldenRecords(actorSystem, backEnd)), path(GlobalConstants.SEGMENT_COUNT_INTERACTIONS, diff --git a/JeMPI_Apps/JeMPI_Controller/src/main/java/org/jembi/jempi/controller/HttpServer.java b/JeMPI_Apps/JeMPI_Controller/src/main/java/org/jembi/jempi/controller/HttpServer.java index c50149eea..adefbeec9 100644 --- a/JeMPI_Apps/JeMPI_Controller/src/main/java/org/jembi/jempi/controller/HttpServer.java +++ b/JeMPI_Apps/JeMPI_Controller/src/main/java/org/jembi/jempi/controller/HttpServer.java @@ -12,14 +12,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jembi.jempi.AppConfig; +import org.jembi.jempi.shared.models.ApiModels; import org.jembi.jempi.shared.models.GlobalConstants; -import org.jembi.jempi.shared.models.LinkInteractionSyncBody; -import org.jembi.jempi.shared.models.LinkInteractionToGidSyncBody; -import org.jembi.jempi.shared.utils.AppUtils; import java.util.Locale; import java.util.concurrent.CompletionStage; +import static org.jembi.jempi.shared.utils.AppUtils.OBJECT_MAPPER; + public final class HttpServer extends AllDirectives { private static final Logger LOGGER = LogManager.getLogger(HttpServer.class); @@ -42,7 +42,7 @@ void open( LOGGER.info("Server online at http://{}:{}", "0.0.0.0", AppConfig.CONTROLLER_HTTP_PORT); } - private CompletionStage postLinkInteraction(final LinkInteractionSyncBody body) throws JsonProcessingException { + private CompletionStage postLinkInteraction(final ApiModels.LinkInteractionSyncBody body) throws JsonProcessingException { final HttpRequest request; request = HttpRequest .create(String.format(Locale.ROOT, @@ -51,12 +51,12 @@ private CompletionStage postLinkInteraction(final LinkInteractionS AppConfig.LINKER_HTTP_PORT, GlobalConstants.SEGMENT_PROXY_POST_LINK_INTERACTION)) .withMethod(HttpMethods.POST) - .withEntity(ContentTypes.APPLICATION_JSON, AppUtils.OBJECT_MAPPER.writeValueAsBytes(body)); + .withEntity(ContentTypes.APPLICATION_JSON, OBJECT_MAPPER.writeValueAsBytes(body)); final var stage = http.singleRequest(request); return stage.thenApply(response -> response); } - private CompletionStage postLinkInteractionToGid(final LinkInteractionToGidSyncBody body) throws JsonProcessingException { + private CompletionStage postLinkInteractionToGid(final ApiModels.LinkInteractionToGidSyncBody body) throws JsonProcessingException { final var request = HttpRequest .create(String.format(Locale.ROOT, "http://%s:%d/JeMPI/%s", @@ -64,7 +64,7 @@ private CompletionStage postLinkInteractionToGid(final LinkInterac AppConfig.LINKER_HTTP_PORT, GlobalConstants.SEGMENT_PROXY_POST_LINK_INTERACTION_TO_GID)) .withMethod(HttpMethods.POST) - .withEntity(ContentTypes.APPLICATION_JSON, AppUtils.OBJECT_MAPPER.writeValueAsBytes(body)); + .withEntity(ContentTypes.APPLICATION_JSON, OBJECT_MAPPER.writeValueAsBytes(body)); final var stage = http.singleRequest(request); return stage.thenApply(response -> response); } @@ -78,7 +78,7 @@ private CompletionStage getMU() { } private Route routeLinkInteraction() { - return entity(Jackson.unmarshaller(LinkInteractionSyncBody.class), + return entity(Jackson.unmarshaller(ApiModels.LinkInteractionSyncBody.class), obj -> { try { LOGGER.debug("{}", obj); @@ -94,17 +94,17 @@ private Route routeLinkInteraction() { } private Route routeLinkInteractionToGid() { - return entity(Jackson.unmarshaller(LinkInteractionToGidSyncBody.class), + return entity(Jackson.unmarshaller(ApiModels.LinkInteractionToGidSyncBody.class), obj -> { try { return onComplete(postLinkInteractionToGid(obj), response -> response.isSuccess() ? complete(response.get()) - : complete(StatusCodes.IM_A_TEAPOT)); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))); } catch (JsonProcessingException e) { LOGGER.error(e.getLocalizedMessage(), e); - return complete(StatusCodes.IM_A_TEAPOT); } + return complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)); }); } diff --git a/JeMPI_Apps/JeMPI_LibAPI/pom.xml b/JeMPI_Apps/JeMPI_LibAPI/pom.xml index e7e66dd24..fc960b1ef 100644 --- a/JeMPI_Apps/JeMPI_LibAPI/pom.xml +++ b/JeMPI_Apps/JeMPI_LibAPI/pom.xml @@ -194,8 +194,8 @@ org.apache.maven.plugins maven-compiler-plugin - 17 - 17 + 21 + 21 --enable-preview diff --git a/JeMPI_Apps/JeMPI_LibAPI/src/main/java/org/jembi/jempi/libapi/Routes.java b/JeMPI_Apps/JeMPI_LibAPI/src/main/java/org/jembi/jempi/libapi/Routes.java index aa5cbcd07..1fb739cf7 100644 --- a/JeMPI_Apps/JeMPI_LibAPI/src/main/java/org/jembi/jempi/libapi/Routes.java +++ b/JeMPI_Apps/JeMPI_LibAPI/src/main/java/org/jembi/jempi/libapi/Routes.java @@ -77,7 +77,7 @@ public static Route countRecords( new ApiModels.ApiNumberOfRecords(result.get().goldenRecords(), result.get().patientRecords()), JSON_MARSHALLER) - : complete(StatusCodes.IM_A_TEAPOT)); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))); } public static Route getGidsPaged( @@ -93,7 +93,7 @@ public static Route getGidsPaged( ? complete(StatusCodes.OK, result.get(), JSON_MARSHALLER) - : complete(StatusCodes.IM_A_TEAPOT)))); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))))); } public static Route getGoldenRecordAuditTrail( @@ -105,7 +105,7 @@ public static Route getGoldenRecordAuditTrail( ? complete(StatusCodes.OK, ApiModels.ApiAuditTrail.fromAuditTrail(result.get().auditTrail()), JSON_MARSHALLER) - : complete(StatusCodes.IM_A_TEAPOT))); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)))); } public static Route getInteractionAuditTrail( @@ -117,7 +117,7 @@ public static Route getInteractionAuditTrail( ? complete(StatusCodes.OK, ApiModels.ApiAuditTrail.fromAuditTrail(result.get().auditTrail()), JSON_MARSHALLER) - : complete(StatusCodes.IM_A_TEAPOT))); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)))); } public static Route patchIidNewGidLink( @@ -137,7 +137,7 @@ public static Route patchIidNewGidLink( linkInfo -> complete(StatusCodes.OK, linkInfo, JSON_MARSHALLER)) - : complete(StatusCodes.IM_A_TEAPOT)))); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))))); } public static Route patchIidGidLink( @@ -167,7 +167,7 @@ public static Route patchIidGidLink( StatusCodes.OK, linkInfo, JSON_MARSHALLER)) - : complete(StatusCodes.IM_A_TEAPOT)))))); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))))))); } public static Route countGoldenRecords( @@ -182,7 +182,7 @@ public static Route countGoldenRecords( count -> complete(StatusCodes.OK, new ApiModels.ApiGoldenRecordCount(count), JSON_MARSHALLER)) - : complete(StatusCodes.IM_A_TEAPOT)); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))); } public static Route countInteractions( @@ -197,7 +197,7 @@ public static Route countInteractions( count -> complete(StatusCodes.OK, new ApiModels.ApiInteractionCount(count), JSON_MARSHALLER)) - : complete(StatusCodes.IM_A_TEAPOT)); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))); } public static Route getGidsAll( @@ -206,7 +206,7 @@ public static Route getGidsAll( return onComplete(Ask.getGidsAll(actorSystem, backEnd), result -> result.isSuccess() ? complete(StatusCodes.OK, result.get(), JSON_MARSHALLER) - : complete(StatusCodes.IM_A_TEAPOT)); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))); } public static Route getNotifications( @@ -227,7 +227,7 @@ public static Route getNotifications( ? complete(StatusCodes.OK, result.get(), JSON_MARSHALLER) - : complete(StatusCodes.IM_A_TEAPOT)))))); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))))))); } public static Route getExpandedGoldenRecordsUsingParameterList( @@ -246,7 +246,7 @@ public static Route getExpandedGoldenRecordsUsingParameterList( .map(ApiModels.ApiExpandedGoldenRecord::fromExpandedGoldenRecord) .toList(), JSON_MARSHALLER)) - : complete(StatusCodes.IM_A_TEAPOT)); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))); }); } @@ -267,7 +267,7 @@ public static Route getExpandedGoldenRecordsFromUsingCSV( .map(ApiModels.ApiExpandedGoldenRecord::fromExpandedGoldenRecord) .toList(), JSON_MARSHALLER)) - : complete(StatusCodes.IM_A_TEAPOT)); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))); }); } @@ -287,7 +287,7 @@ public static Route getExpandedInteractionsUsingCSV( .map(ApiModels.ApiExpandedInteraction::fromExpandedInteraction) .toList(), JSON_MARSHALLER)) - : complete(StatusCodes.IM_A_TEAPOT)); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))); }); } @@ -305,7 +305,7 @@ public static Route getExpandedGoldenRecord( ApiModels.ApiExpandedGoldenRecord.fromExpandedGoldenRecord( goldenRecord), Jackson.marshaller(OBJECT_MAPPER))) - : complete(StatusCodes.IM_A_TEAPOT)); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))); } public static Route getInteraction( @@ -321,7 +321,7 @@ public static Route getInteraction( patientRecord -> complete(StatusCodes.OK, ApiModels.ApiInteraction.fromInteraction(patientRecord), JSON_MARSHALLER)) - : complete(StatusCodes.IM_A_TEAPOT)); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))); } public static Route postUpdateNotification( @@ -333,7 +333,7 @@ public static Route postUpdateNotification( final var updateResponse = response.get(); return complete(StatusCodes.OK, updateResponse, JSON_MARSHALLER); } else { - return complete(StatusCodes.IM_A_TEAPOT); + return complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)); } })); } @@ -355,7 +355,7 @@ public static Route postUploadCsvFile( (info, file) -> onComplete(Ask.postUploadCsvFile(actorSystem, backEnd, info, file), response -> response.isSuccess() ? complete(StatusCodes.OK) - : complete(StatusCodes.IM_A_TEAPOT)))); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))))); } public static Route postSimpleSearch( @@ -377,7 +377,7 @@ public static Route postSimpleSearch( final var eventSearchRsp = response.get(); return complete(StatusCodes.OK, eventSearchRsp, JSON_MARSHALLER); } else { - return complete(StatusCodes.IM_A_TEAPOT); + return complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)); } })); } @@ -396,7 +396,7 @@ public static Route postFilterGids( final var eventSearchRsp = response.get(); return complete(StatusCodes.OK, eventSearchRsp, JSON_MARSHALLER); } else { - return complete(StatusCodes.IM_A_TEAPOT); + return complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)); } })); } @@ -415,7 +415,7 @@ public static Route postFilterGidsWithInteractionCount( final var eventSearchRsp = response.get(); return complete(StatusCodes.OK, eventSearchRsp, JSON_MARSHALLER); } else { - return complete(StatusCodes.IM_A_TEAPOT); + return complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)); } })); } @@ -435,7 +435,7 @@ public static Route postCustomSearch( final var eventSearchRsp = response.get(); return complete(StatusCodes.OK, eventSearchRsp, JSON_MARSHALLER); } else { - return complete(StatusCodes.IM_A_TEAPOT); + return complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)); } })); } @@ -469,10 +469,10 @@ public static Route proxyPostCalculateScores( http, obj), response -> response.isSuccess() ? complete(response.get()) - : complete(StatusCodes.IM_A_TEAPOT)); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))); } catch (JsonProcessingException e) { LOGGER.error(e.getLocalizedMessage(), e); - return complete(StatusCodes.IM_A_TEAPOT); + return complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)); } }); } @@ -505,10 +505,10 @@ public static Route proxyGetCandidatesWithScore( return onComplete(proxyGetCandidatesWithScore(linkerIP, linkerPort, http, iid), response -> response.isSuccess() ? complete(response.get()) - : complete(StatusCodes.IM_A_TEAPOT)); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))); } catch (JsonProcessingException e) { LOGGER.error(e.getLocalizedMessage(), e); - return complete(StatusCodes.IM_A_TEAPOT); + return complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)); } }); } @@ -523,7 +523,7 @@ private static CompletionStage patchCrUpdateFieldsProxy( "http://%s:%d/JeMPI/%s", linkerIP, linkerPort, - GlobalConstants.SEGMENT_PROXY_CR_UPDATE_FIELDS)) + GlobalConstants.SEGMENT_PROXY_PATCH_CR_UPDATE_FIELDS)) .withMethod(HttpMethods.PATCH) .withEntity(ContentTypes.APPLICATION_JSON, OBJECT_MAPPER.writeValueAsBytes(body)); final var stage = http.singleRequest(request); @@ -540,13 +540,49 @@ private static CompletionStage postCrRegisterProxy( "http://%s:%d/JeMPI/%s", linkerIP, linkerPort, - GlobalConstants.SEGMENT_PROXY_CR_REGISTER)) + GlobalConstants.SEGMENT_PROXY_POST_CR_REGISTER)) .withMethod(HttpMethods.POST) .withEntity(ContentTypes.APPLICATION_JSON, OBJECT_MAPPER.writeValueAsBytes(body)); final var stage = http.singleRequest(request); return stage.thenApply(response -> response); } + private static CompletionStage postLinkInteractionProxy( + final String linkerIP, + final Integer linkerPort, + final Http http, + final ApiModels.LinkInteractionSyncBody body) throws JsonProcessingException { + final var request = HttpRequest + .create(String.format(Locale.ROOT, + "http://%s:%d/JeMPI/%s", + linkerIP, + linkerPort, + GlobalConstants.SEGMENT_PROXY_POST_LINK_INTERACTION)) + .withMethod(HttpMethods.POST) + .withEntity(ContentTypes.APPLICATION_JSON, OBJECT_MAPPER.writeValueAsBytes(body)); + final var stage = http.singleRequest(request); + return stage.thenApply(response -> response); + } + + private static CompletionStage postLinkInteractionToGidProxy( + final String linkerIP, + final Integer linkerPort, + final Http http, + final ApiModels.LinkInteractionToGidSyncBody body) throws JsonProcessingException { + final var request = HttpRequest + .create(String.format(Locale.ROOT, + "http://%s:%d/JeMPI/%s", + linkerIP, + linkerPort, + GlobalConstants.SEGMENT_PROXY_POST_LINK_INTERACTION_TO_GID)) + .withMethod(HttpMethods.POST) + .withEntity(ContentTypes.APPLICATION_JSON, OBJECT_MAPPER.writeValueAsBytes(body)); + final var stage = http.singleRequest(request); + return stage.thenApply(response -> response); + } + + + private static CompletionStage postCrCandidatesProxy( final String linkerIP, final Integer linkerPort, @@ -557,7 +593,7 @@ private static CompletionStage postCrCandidatesProxy( "http://%s:%d/JeMPI/%s", linkerIP, linkerPort, - GlobalConstants.SEGMENT_PROXY_CR_CANDIDATES)) + GlobalConstants.SEGMENT_PROXY_POST_CR_CANDIDATES)) .withMethod(HttpMethods.POST) .withEntity(ContentTypes.APPLICATION_JSON, OBJECT_MAPPER.writeValueAsBytes(body)); final var stage = http.singleRequest(request); @@ -577,7 +613,7 @@ private static CompletionStage postCrFindProxy( "http://%s:%d/JeMPI/%s", linkerIP, linkerPort, - GlobalConstants.SEGMENT_PROXY_CR_FIND)) + GlobalConstants.SEGMENT_PROXY_POST_CR_FIND)) .withMethod(HttpMethods.POST) .withEntity(ContentTypes.APPLICATION_JSON, OBJECT_MAPPER.writeValueAsBytes(body)); final var stage = http.singleRequest(request); @@ -598,10 +634,10 @@ public static Route patchCrUpdateFields( return onComplete(patchCrUpdateFieldsProxy(linkerIP, linkerPort, http, apiCrUpdateFields), response -> response.isSuccess() ? complete(response.get()) - : complete(StatusCodes.IM_A_TEAPOT)); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))); } catch (JsonProcessingException e) { LOGGER.error(e.getLocalizedMessage(), e); - return complete(StatusCodes.IM_A_TEAPOT); + return complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)); } }); } @@ -618,7 +654,7 @@ public static Route postCrFind( return onComplete(postCrFindProxy(linkerIP, linkerPort, http, apiCrFind), response -> response.isSuccess() ? complete(response.get()) - : complete(StatusCodes.IM_A_TEAPOT)); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))); } catch (JsonProcessingException e) { LOGGER.error(e.getLocalizedMessage(), e); return complete(StatusCodes.IM_A_TEAPOT); @@ -637,10 +673,10 @@ public static Route postCrCandidates( return onComplete(postCrCandidatesProxy(linkerIP, linkerPort, http, apiCrCandidates), response -> response.isSuccess() ? complete(response.get()) - : complete(StatusCodes.IM_A_TEAPOT)); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))); } catch (JsonProcessingException e) { LOGGER.error(e.getLocalizedMessage(), e); - return complete(StatusCodes.IM_A_TEAPOT); + return complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)); } }); } @@ -656,7 +692,25 @@ public static Route postCrRegister( return onComplete(postCrRegisterProxy(linkerIP, linkerPort, http, apiCrRegister), response -> response.isSuccess() ? complete(response.get()) - : complete(StatusCodes.IM_A_TEAPOT)); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))); + } catch (JsonProcessingException e) { + LOGGER.error(e.getLocalizedMessage(), e); + return complete(StatusCodes.NO_CONTENT); + } + }); + } + + public static Route postLinkInteraction( + final String linkerIP, + final Integer linkerPort, + final Http http) { + return entity(Jackson.unmarshaller(OBJECT_MAPPER, ApiModels.LinkInteractionSyncBody.class), + linkInteractionSyncBody -> { + try { + return onComplete(postLinkInteractionProxy(linkerIP, linkerPort, http, linkInteractionSyncBody), + response -> response.isSuccess() + ? complete(response.get()) + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))); } catch (JsonProcessingException e) { LOGGER.error(e.getLocalizedMessage(), e); return complete(StatusCodes.NO_CONTENT); @@ -664,4 +718,23 @@ public static Route postCrRegister( }); } + public static Route postLinkInteractionToGid( + final String linkerIP, + final Integer linkerPort, + final Http http) { + return entity(Jackson.unmarshaller(OBJECT_MAPPER, ApiModels.LinkInteractionToGidSyncBody.class), + apiLinkInteractionToGid -> { + try { + return onComplete(postLinkInteractionToGidProxy(linkerIP, linkerPort, http, apiLinkInteractionToGid), + response -> response.isSuccess() + ? complete(response.get()) + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))); + } catch (JsonProcessingException e) { + LOGGER.error(e.getLocalizedMessage(), e); + return complete(StatusCodes.NO_CONTENT); + } + }); + } + + } diff --git a/JeMPI_Apps/JeMPI_LibMPI/src/main/java/org/jembi/jempi/libmpi/dgraph/DgraphMutations.java b/JeMPI_Apps/JeMPI_LibMPI/src/main/java/org/jembi/jempi/libmpi/dgraph/DgraphMutations.java index e4a6ddee4..a2fd9a06d 100644 --- a/JeMPI_Apps/JeMPI_LibMPI/src/main/java/org/jembi/jempi/libmpi/dgraph/DgraphMutations.java +++ b/JeMPI_Apps/JeMPI_LibMPI/src/main/java/org/jembi/jempi/libmpi/dgraph/DgraphMutations.java @@ -43,7 +43,7 @@ LinkInfo addNewDGraphInteraction(final Interaction interaction) { LOGGER.error("Failed to insert golden record"); return null; } - return new LinkInfo(grUID, result.interactionUID, 1.0F); + return new LinkInfo(grUID, result.interactionUID, result.sourceUID, 1.0F); } boolean updateGoldenRecordField( @@ -306,7 +306,7 @@ Either linkToNewGoldenRecord( interaction.demographicData(), interaction.interactionId(), interaction.sourceId().uid(), score, new CustomUniqueGoldenRecordData(interaction.uniqueInteractionData())); - return Either.right(new LinkInfo(newGoldenID, interactionId, score)); + return Either.right(new LinkInfo(newGoldenID, interactionId, interaction.sourceId().uid(), score)); } Either updateLink( @@ -330,7 +330,7 @@ Either updateLink( final var scoreList = new ArrayList(); scoreList.add(new DgraphPairWithScore(newGoldenId, interactionId, score)); addScoreFacets(scoreList); - return Either.right(new LinkInfo(newGoldenId, interactionId, score)); + return Either.right(new LinkInfo(newGoldenId, interactionId, null, score)); // FIX: need to return the source id } LinkInfo linkDGraphInteraction( @@ -348,7 +348,7 @@ LinkInfo linkDGraphInteraction( addSourceId(interactionScoreList.get(0).goldenUID(), result.sourceUID); final var grUID = interactionScoreList.get(0).goldenUID(); final var theScore = interactionScoreList.get(0).score(); - return new LinkInfo(grUID, result.interactionUID, theScore); + return new LinkInfo(grUID, result.interactionUID, result.sourceUID, theScore); } Option createSchema() { diff --git a/JeMPI_Apps/JeMPI_LibMPI/src/main/java/org/jembi/jempi/libmpi/postgresql/LibPostgresql.java b/JeMPI_Apps/JeMPI_LibMPI/src/main/java/org/jembi/jempi/libmpi/postgresql/LibPostgresql.java index 7ed235e64..d448b8c51 100644 --- a/JeMPI_Apps/JeMPI_LibMPI/src/main/java/org/jembi/jempi/libmpi/postgresql/LibPostgresql.java +++ b/JeMPI_Apps/JeMPI_LibMPI/src/main/java/org/jembi/jempi/libmpi/postgresql/LibPostgresql.java @@ -274,7 +274,7 @@ public LinkInfo createInteractionAndLinkToExistingGoldenRecord( eid, Edge.EdgeName.GID2IID, new FacetScore(goldenIdScore.score())); - return new LinkInfo(goldenIdScore.goldenId(), eid.toString(), goldenIdScore.score()); + return new LinkInfo(goldenIdScore.goldenId(), eid.toString(), null, goldenIdScore.score()); } public LinkInfo createInteractionAndLinkToClonedGoldenRecord( @@ -287,7 +287,7 @@ public LinkInfo createInteractionAndLinkToClonedGoldenRecord( Edge.createEdge(gid, sid, Edge.EdgeName.GID2SID); Edge.createEdge(gid, iid, Edge.EdgeName.GID2IID, new FacetScore(score)); - return new LinkInfo(gid.toString(), iid.toString(), score); + return new LinkInfo(gid.toString(), iid.toString(), null, score); } public void startTransaction() { diff --git a/JeMPI_Apps/JeMPI_LibShared/pom.xml b/JeMPI_Apps/JeMPI_LibShared/pom.xml index 8f01f34d0..c158bde89 100644 --- a/JeMPI_Apps/JeMPI_LibShared/pom.xml +++ b/JeMPI_Apps/JeMPI_LibShared/pom.xml @@ -31,6 +31,26 @@ commons-text + + com.typesafe.akka + akka-stream_${scala.tools.version} + + + + com.typesafe.akka + akka-http_${scala.tools.version} + + + + com.typesafe.akka + akka-http-jackson_${scala.tools.version} + + + + ch.megard + akka-http-cors_${scala.tools.version} + + com.fasterxml.jackson.core jackson-annotations diff --git a/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/ApiModels.java b/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/ApiModels.java index 8f3920d4a..12fd5cf0e 100644 --- a/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/ApiModels.java +++ b/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/ApiModels.java @@ -1,20 +1,56 @@ package org.jembi.jempi.shared.models; +import akka.http.javadsl.model.HttpResponse; +import akka.http.javadsl.model.StatusCode; +import akka.http.javadsl.model.StatusCodes; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.commons.lang3.ObjectUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; +import static org.jembi.jempi.shared.utils.AppUtils.OBJECT_MAPPER; + public abstract class ApiModels { + private static final Logger LOGGER = LogManager.getLogger(ApiModels.class); private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS"; + public static HttpResponse getHttpErrorResponse(final StatusCode statusCode) { + try { + var entity = OBJECT_MAPPER.writeValueAsBytes(new ApiError()); + return HttpResponse.create() + .withStatus(statusCode) + .withEntity(entity); + } catch (JsonProcessingException e) { + LOGGER.error(e.getLocalizedMessage(), e); + return HttpResponse.create().withStatus(StatusCodes.INTERNAL_SERVER_ERROR); + } + } + public interface ApiPaginatedResultSet { } + public record ApiError( + + @JsonProperty("module") String module, + @JsonProperty("class") String klass, + @JsonProperty("line_number") Integer lineNumber) { + + public ApiError() { + this(Thread.currentThread().getStackTrace()[3].getModuleName(), + Thread.currentThread().getStackTrace()[3].getClassName(), + Thread.currentThread().getStackTrace()[3].getLineNumber()); + } + + + } + public record ApiGoldenRecordCount(Long count) { } @@ -69,6 +105,27 @@ public record ApiCrRegisterRequest( CustomDemographicData demographicData) { } + + @JsonInclude(JsonInclude.Include.NON_NULL) + public record LinkInteractionSyncBody( + String stan, + ExternalLinkRange externalLinkRange, + Float matchThreshold, + CustomSourceId sourceId, + CustomUniqueInteractionData uniqueInteractionData, + CustomDemographicData demographicData) { + } + + @JsonInclude(JsonInclude.Include.NON_NULL) + public record LinkInteractionToGidSyncBody( + String stan, + CustomSourceId sourceId, + CustomUniqueInteractionData uniqueInteractionData, + CustomDemographicData demographicData, + String gid) { + } + + @JsonInclude(JsonInclude.Include.NON_NULL) public record ApiCrRegisterResponse(LinkInfo linkInfo) { } diff --git a/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/GlobalConstants.java b/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/GlobalConstants.java index 6fc224db6..332333650 100644 --- a/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/GlobalConstants.java +++ b/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/GlobalConstants.java @@ -47,10 +47,10 @@ public final class GlobalConstants { public static final String SEGMENT_POST_FILTER_GIDS = "filter-gids"; public static final String SEGMENT_POST_FILTER_GIDS_WITH_INTERACTION_COUNT = "filter-gids-interaction"; - public static final String SEGMENT_PROXY_CR_REGISTER = "cr-register"; - public static final String SEGMENT_PROXY_CR_FIND = "cr-find"; - public static final String SEGMENT_PROXY_CR_CANDIDATES = "cr-candidates"; - public static final String SEGMENT_PROXY_CR_UPDATE_FIELDS = "cr-update-fields"; + public static final String SEGMENT_PROXY_POST_CR_REGISTER = "cr-register"; + public static final String SEGMENT_PROXY_POST_CR_FIND = "cr-find"; + public static final String SEGMENT_PROXY_POST_CR_CANDIDATES = "cr-candidates"; + public static final String SEGMENT_PROXY_PATCH_CR_UPDATE_FIELDS = "cr-update-fields"; public static final String SEGMENT_PROXY_GET_CANDIDATES_WITH_SCORES = "candidate-golden-records"; diff --git a/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/LinkInfo.java b/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/LinkInfo.java index 9bdc91a04..2c20e6243 100644 --- a/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/LinkInfo.java +++ b/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/LinkInfo.java @@ -3,5 +3,6 @@ public record LinkInfo( String goldenUID, String interactionUID, + String sourceUID, float score) { } diff --git a/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/LinkInteractionSyncBody.java b/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/LinkInteractionSyncBody.java deleted file mode 100644 index ff7b24c0e..000000000 --- a/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/LinkInteractionSyncBody.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.jembi.jempi.shared.models; - -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; - -@JsonInclude(JsonInclude.Include.NON_NULL) -public record LinkInteractionSyncBody( - @JsonProperty("stan") String stan, - @JsonProperty("externalLinkRange") ExternalLinkRange externalLinkRange, - @JsonProperty("matchThreshold") Float matchThreshold, - @JsonProperty("patientRecord") Interaction interaction) { -} diff --git a/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/LinkInteractionToGidSyncBody.java b/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/LinkInteractionToGidSyncBody.java deleted file mode 100644 index 9ea4bcccf..000000000 --- a/JeMPI_Apps/JeMPI_LibShared/src/main/java/org/jembi/jempi/shared/models/LinkInteractionToGidSyncBody.java +++ /dev/null @@ -1,11 +0,0 @@ -package org.jembi.jempi.shared.models; - -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; - -@JsonInclude(JsonInclude.Include.NON_NULL) -public record LinkInteractionToGidSyncBody( - @JsonProperty("stan") String stan, - @JsonProperty("patientRecord") Interaction interaction, - @JsonProperty("gid") String gid) { -} diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/Ask.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/Ask.java index 3df6907a0..240e83290 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/Ask.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/Ask.java @@ -8,8 +8,6 @@ import org.jembi.jempi.linker.backend.BackEnd; import org.jembi.jempi.shared.models.ApiModels; import org.jembi.jempi.shared.models.InteractionEnvelop; -import org.jembi.jempi.shared.models.LinkInteractionSyncBody; -import org.jembi.jempi.shared.models.LinkInteractionToGidSyncBody; import java.util.concurrent.CompletionStage; @@ -93,7 +91,7 @@ static CompletionStage patchCrUpdateField( static CompletionStage postLinkInteraction( final ActorSystem actorSystem, final ActorRef backEnd, - final LinkInteractionSyncBody body) { + final ApiModels.LinkInteractionSyncBody body) { CompletionStage stage = AskPattern.ask(backEnd, replyTo -> new BackEnd.SyncLinkInteractionRequest( body, @@ -129,7 +127,7 @@ static CompletionStage findCandidates( static CompletionStage postLinkPatientToGid( final ActorSystem actorSystem, final ActorRef backEnd, - final LinkInteractionToGidSyncBody body) { + final ApiModels.LinkInteractionToGidSyncBody body) { CompletionStage stage = AskPattern.ask(backEnd, replyTo -> new BackEnd.SyncLinkInteractionToGidRequest( body, diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/HttpServer.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/HttpServer.java index 1c1eda3d3..952cf8a75 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/HttpServer.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/HttpServer.java @@ -43,7 +43,7 @@ private Route createRoute( final ActorSystem actorSystem, final ActorRef backEnd) { return pathPrefix("JeMPI", - () -> concat(patch(() -> path(GlobalConstants.SEGMENT_PROXY_CR_UPDATE_FIELDS, + () -> concat(patch(() -> path(GlobalConstants.SEGMENT_PROXY_PATCH_CR_UPDATE_FIELDS, () -> Routes.proxyPatchCrUpdateField(actorSystem, backEnd))), post(() -> concat(path(GlobalConstants.SEGMENT_PROXY_POST_LINK_INTERACTION, () -> Routes.proxyPostLinkInteraction(actorSystem, backEnd)), @@ -51,11 +51,11 @@ private Route createRoute( () -> Routes.proxyPostLinkInteractionToGID(actorSystem, backEnd)), path(GlobalConstants.SEGMENT_PROXY_POST_CALCULATE_SCORES, () -> Routes.proxyPostCalculateScores(actorSystem, backEnd)), - path(GlobalConstants.SEGMENT_PROXY_CR_CANDIDATES, + path(GlobalConstants.SEGMENT_PROXY_POST_CR_CANDIDATES, () -> Routes.proxyGetCrCandidates(actorSystem, backEnd)), - path(GlobalConstants.SEGMENT_PROXY_CR_FIND, + path(GlobalConstants.SEGMENT_PROXY_POST_CR_FIND, () -> Routes.proxyGetCrFind(actorSystem, backEnd)), - path(GlobalConstants.SEGMENT_PROXY_CR_REGISTER, + path(GlobalConstants.SEGMENT_PROXY_POST_CR_REGISTER, () -> Routes.proxyPostCrRegister(actorSystem, backEnd)))), get(() -> concat(// path("mu", () -> Routes.routeMU(actorSystem, backEnd)), path(GlobalConstants.SEGMENT_PROXY_GET_CANDIDATES_WITH_SCORES, diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/Routes.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/Routes.java index b12247628..df67b4196 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/Routes.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/Routes.java @@ -3,24 +3,34 @@ import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; import akka.http.javadsl.marshallers.jackson.Jackson; +import akka.http.javadsl.model.StatusCode; import akka.http.javadsl.model.StatusCodes; import akka.http.javadsl.server.Route; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.jembi.jempi.libmpi.MpiGeneralError; import org.jembi.jempi.libmpi.MpiServiceError; import org.jembi.jempi.linker.backend.BackEnd; import org.jembi.jempi.shared.models.ApiModels; import org.jembi.jempi.shared.models.CustomMU; -import org.jembi.jempi.shared.models.LinkInteractionSyncBody; -import org.jembi.jempi.shared.models.LinkInteractionToGidSyncBody; import static akka.http.javadsl.server.Directives.*; import static org.jembi.jempi.shared.utils.AppUtils.OBJECT_MAPPER; final class Routes { + private static final Logger LOGGER = LogManager.getLogger(Routes.class); + private Routes() { } + static StatusCode logHttpError( + final StatusCode code, + final String log) { + LOGGER.debug("{}", log); + return code; + } + static Route mapError(final MpiGeneralError obj) { return switch (obj) { case MpiServiceError.InteractionIdDoesNotExistError e -> complete(StatusCodes.BAD_REQUEST, e, Jackson.marshaller()); @@ -49,35 +59,17 @@ static Route proxyGetCandidatesWithScore( candidateList -> complete(StatusCodes.OK, candidateList, Jackson.marshaller())) - : complete(StatusCodes.IM_A_TEAPOT)))); - } - - static Route proxyPostLinkInteraction( - final ActorSystem actorSystem, - final ActorRef backEnd) { - return entity(Jackson.unmarshaller(LinkInteractionSyncBody.class), - obj -> onComplete(Ask.postLinkInteraction(actorSystem, backEnd, obj), response -> { - if (response.isSuccess()) { - final var eventLinkPatientSyncRsp = response.get(); - return complete(StatusCodes.OK, - new ApiModels.ApiExtendedLinkInfo(eventLinkPatientSyncRsp.stan(), - eventLinkPatientSyncRsp.linkInfo(), - eventLinkPatientSyncRsp.externalLinkCandidateList()), - Jackson.marshaller()); - } else { - return complete(StatusCodes.IM_A_TEAPOT); - } - })); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT))))); } static Route proxyPostLinkInteractionToGID( final ActorSystem actorSystem, final ActorRef backEnd) { - return entity(Jackson.unmarshaller(LinkInteractionToGidSyncBody.class), + return entity(Jackson.unmarshaller(ApiModels.LinkInteractionToGidSyncBody.class), obj -> onComplete(Ask.postLinkPatientToGid(actorSystem, backEnd, obj), response -> response.isSuccess() ? complete(StatusCodes.OK, response.get(), Jackson.marshaller()) - : complete(StatusCodes.IM_A_TEAPOT))); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)))); } static Route proxyPostCalculateScores( @@ -87,18 +79,9 @@ static Route proxyPostCalculateScores( obj -> onComplete(Ask.postCalculateScores(actorSystem, backEnd, obj), response -> response.isSuccess() ? complete(StatusCodes.OK, response.get(), Jackson.marshaller()) - : complete(StatusCodes.IM_A_TEAPOT))); + : complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)))); } -// static Route routeMU( -// final ActorSystem actorSystem, -// final ActorRef backEnd) { -// return onComplete(Ask.getMU(actorSystem, backEnd), -// response -> response.isSuccess() -// ? complete(StatusCodes.OK, response.get().mu(), Jackson.marshaller()) -// : complete(StatusCodes.IM_A_TEAPOT)); -// } - static Route proxyGetCrCandidates( final ActorSystem actorSystem, final ActorRef backEnd) { @@ -113,7 +96,7 @@ static Route proxyGetCrCandidates( new ApiModels.ApiCrCandidatesResponse(rsp.goldenRecords().get()), Jackson.marshaller(OBJECT_MAPPER)); } else { - return complete(StatusCodes.IM_A_TEAPOT); + return complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)); } })); } @@ -132,7 +115,7 @@ static Route proxyGetCrFind( new ApiModels.ApiCrCandidatesResponse(rsp.goldenRecords().get()), Jackson.marshaller(OBJECT_MAPPER)); } else { - return complete(StatusCodes.IM_A_TEAPOT); + return complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)); } })); } @@ -153,7 +136,25 @@ static Route proxyPostCrRegister( Jackson.marshaller(OBJECT_MAPPER)); } } else { - return complete(StatusCodes.IM_A_TEAPOT); + return complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)); + } + })); + } + + static Route proxyPostLinkInteraction( + final ActorSystem actorSystem, + final ActorRef backEnd) { + return entity(Jackson.unmarshaller(OBJECT_MAPPER, ApiModels.LinkInteractionSyncBody.class), + obj -> onComplete(Ask.postLinkInteraction(actorSystem, backEnd, obj), response -> { + if (response.isSuccess()) { + final var eventLinkPatientSyncRsp = response.get(); + return complete(StatusCodes.OK, + new ApiModels.ApiExtendedLinkInfo(eventLinkPatientSyncRsp.stan(), + eventLinkPatientSyncRsp.linkInfo(), + eventLinkPatientSyncRsp.externalLinkCandidateList()), + Jackson.marshaller()); + } else { + return complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)); } })); } @@ -174,7 +175,7 @@ static Route proxyPatchCrUpdateField( Jackson.marshaller()); } } else { - return complete(StatusCodes.IM_A_TEAPOT); + return complete(ApiModels.getHttpErrorResponse(StatusCodes.IM_A_TEAPOT)); } })); } diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java index 90385bf93..b9b389ca0 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/BackEnd.java @@ -8,14 +8,12 @@ import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; import io.vavr.control.Either; -import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.core.config.Configurator; import org.jembi.jempi.AppConfig; import org.jembi.jempi.libmpi.LibMPI; -import org.jembi.jempi.libmpi.LibMPIClientInterface; import org.jembi.jempi.libmpi.MpiGeneralError; import org.jembi.jempi.shared.kafka.MyKafkaProducer; import org.jembi.jempi.shared.models.*; @@ -92,7 +90,11 @@ private void openMPI(final boolean useDGraph) { AppConfig.KAFKA_BOOTSTRAP_SERVERS, "CLIENT_ID_LINKER-" + UUID.randomUUID()); } else { - libMPI = new LibMPI(String.format(Locale.ROOT, "jdbc:postgresql://%s:%d/%s", AppConfig.POSTGRESQL_IP, AppConfig.POSTGRESQL_PORT, AppConfig.POSTGRESQL_DATABASE), + libMPI = new LibMPI(String.format(Locale.ROOT, + "jdbc:postgresql://%s:%d/%s", + AppConfig.POSTGRESQL_IP, + AppConfig.POSTGRESQL_PORT, + AppConfig.POSTGRESQL_DATABASE), AppConfig.POSTGRESQL_USER, AppConfig.POSTGRESQL_PASSWORD, AppConfig.KAFKA_BOOTSTRAP_SERVERS, @@ -109,7 +111,7 @@ private void openMPI(final boolean useDGraph) { public Receive createReceive() { return newReceiveBuilder().onMessage(AsyncLinkInteractionRequest.class, this::asyncLinkInteractionHandler) .onMessage(SyncLinkInteractionRequest.class, this::syncLinkInteractionHandler) - .onMessage(SyncLinkInteractionToGidRequest.class, this::syncLinkInteractionToGidHandler) +// .onMessage(SyncLinkInteractionToGidRequest.class, this::syncLinkInteractionToGidHandler) .onMessage(CalculateScoresRequest.class, this::calculateScoresHandler) .onMessage(TeaTimeRequest.class, this::teaTimeHandler) .onMessage(WorkTimeRequest.class, this::workTimeHandler) @@ -138,15 +140,36 @@ private Behavior crFind(final CrFindRequest req) { return Behaviors.same(); } + private Behavior crUpdateField(final CrUpdateFieldRequest req) { + final var result = LinkerCR.crUpdateField(libMPI, req.crUpdateFields); + req.replyTo.tell(new CrUpdateFieldResponse(result)); + return Behaviors.same(); + } + private Behavior crRegister(final CrRegisterRequest req) { final var result = LinkerCR.crRegister(libMPI, req.crRegister); req.replyTo.tell(new CrRegisterResponse(result)); return Behaviors.same(); } - private Behavior crUpdateField(final CrUpdateFieldRequest req) { - final var result = LinkerCR.crUpdateField(libMPI, req.crUpdateFields); - req.replyTo.tell(new CrUpdateFieldResponse(result)); + private Behavior syncLinkInteractionHandler(final SyncLinkInteractionRequest request) { + final var listLinkInfo = + LinkerDWH.linkInteraction(libMPI, + new Interaction(null, + request.link.sourceId(), + request.link.uniqueInteractionData(), + request.link.demographicData()), + request.link.externalLinkRange(), + request.link.matchThreshold() == null + ? AppConfig.LINKER_MATCH_THRESHOLD + : request.link.matchThreshold()); + request.replyTo.tell(new SyncLinkInteractionResponse(request.link.stan(), + listLinkInfo.isLeft() + ? listLinkInfo.getLeft() + : null, + listLinkInfo.isRight() + ? listLinkInfo.get() + : null)); return Behaviors.same(); } @@ -174,22 +197,8 @@ private Behavior asyncLinkInteractionHandler(final AsyncLinkInteraction }); } - private Behavior syncLinkInteractionHandler(final SyncLinkInteractionRequest request) { - final var listLinkInfo = - LinkerDWH.linkInteraction(libMPI, - request.link.interaction(), - request.link.externalLinkRange(), - request.link.matchThreshold()); - request.replyTo.tell(new SyncLinkInteractionResponse(request.link.stan(), - listLinkInfo.isLeft() - ? listLinkInfo.getLeft() - : null, - listLinkInfo.isRight() - ? listLinkInfo.get() - : null)); - return Behaviors.same(); - } +/* private Behavior syncLinkInteractionToGidHandler(final SyncLinkInteractionToGidRequest request) { final LinkInfo linkInfo; final var interaction = request.link.interaction(); @@ -229,6 +238,7 @@ private Behavior syncLinkInteractionToGidHandler(final SyncLinkInteract request.replyTo.tell(new SyncLinkInteractionToGidResponse(request.link.stan(), linkInfo)); return Behaviors.same(); } +*/ private Behavior workTimeHandler(final WorkTimeRequest request) { LOGGER.info("WORK TIME"); @@ -343,7 +353,7 @@ public record CalculateScoresResponse( } public record SyncLinkInteractionRequest( - LinkInteractionSyncBody link, + ApiModels.LinkInteractionSyncBody link, ActorRef replyTo) implements Request { } @@ -354,7 +364,7 @@ public record SyncLinkInteractionResponse( } public record SyncLinkInteractionToGidRequest( - LinkInteractionToGidSyncBody link, + ApiModels.LinkInteractionToGidSyncBody link, ActorRef replyTo) implements Request { } diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerDWH.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerDWH.java index afa60d2b1..994f1f5b0 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerDWH.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerDWH.java @@ -118,6 +118,7 @@ static Either> linkInteraction( final Interaction interaction, final ExternalLinkRange externalLinkRange, final float matchThreshold_) { + if (!CustomLinkerDeterministic.canApplyLinking(interaction.demographicData())) { libMPI.startTransaction(); if (CustomLinkerDeterministic.DETERMINISTIC_DO_MATCHING || CustomLinkerProbabilistic.PROBABILISTIC_DO_MATCHING) { diff --git a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerProbabilistic.java b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerProbabilistic.java index 7f9e8a3b7..1b42fea09 100644 --- a/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerProbabilistic.java +++ b/JeMPI_Apps/JeMPI_Linker/src/main/java/org/jembi/jempi/linker/backend/LinkerProbabilistic.java @@ -1,6 +1,5 @@ package org.jembi.jempi.linker.backend; -import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.similarity.JaccardSimilarity; import org.apache.commons.text.similarity.JaroWinklerSimilarity; @@ -8,7 +7,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jembi.jempi.shared.models.CustomMU; -import org.jembi.jempi.shared.utils.AppUtils; import java.util.List; import java.util.stream.IntStream; @@ -202,37 +200,21 @@ private static List computeWeights(final int n) { if (n % 2 == 0) { final var k = n / 2; final var z = 1.0F / k; - final var w = IntStream.range(0, n) - .mapToDouble(i -> abs(1.0 - (z * i))) - .boxed() - .map(Double::floatValue) - .toList(); - if (LOGGER.isDebugEnabled()) { - try { - LOGGER.debug("{}", AppUtils.OBJECT_MAPPER.writeValueAsString(w)); - } catch (JsonProcessingException e) { - LOGGER.error(e.getLocalizedMessage(), e); - } - } - return w; + return IntStream.range(0, n) + .mapToDouble(i -> abs(1.0 - (z * i))) + .boxed() + .map(Double::floatValue) + .toList(); } else { final var k = (n + 1) / 2; final var z = 1.0F / k; - final var w = IntStream.range(0, n) - .mapToDouble(i -> abs(1.0 - (z * (i < k - ? i - : i + 1)))) - .boxed() - .map(Double::floatValue) - .toList(); - if (LOGGER.isDebugEnabled()) { - try { - LOGGER.debug("{}", AppUtils.OBJECT_MAPPER.writeValueAsString(w)); - } catch (JsonProcessingException e) { - LOGGER.error(e.getLocalizedMessage(), e); - } - } - return w; + return IntStream.range(0, n) + .mapToDouble(i -> abs(1.0 - (z * (i < k + ? i + : i + 1)))) + .boxed() + .map(Double::floatValue) + .toList(); } }