Skip to content

Commit

Permalink
Merge pull request #301 from navikt/sf-pdl-kafka
Browse files Browse the repository at this point in the history
Sf pdl kafka
  • Loading branch information
nocturnalnematode authored Sep 18, 2024
2 parents cb6ebaa + 9896be1 commit 2f2df9e
Show file tree
Hide file tree
Showing 112 changed files with 4,215 additions and 75 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,6 @@ install.cmd

# misc
.*.sw?
*.vim
TAGS
tags
1 change: 1 addition & 0 deletions .prettierrc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"plugins": ["prettier-plugin-apex"],
"trailingComma": "none",
"printWidth": 120,
"singleQuote": true,
Expand Down
87 changes: 58 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,44 +1,73 @@
# crm-platform-integration

This package contains the `KafkaMessage__c` sObject and related Apex logic in order to receive JSON payloads representing
changes from the Kafka CDC pipeline. A trigger on the `KafkaMessage__c` sObject will create enqueue asynchronous processing
request through the asynchronous processing framework that is part of the crm-platform-base package.
This package contains the `KafkaMessage__c` sObject and related Apex logic in
order to receive JSON payloads representing changes from the Kafka CDC pipeline.
A trigger on the `KafkaMessage__c` sObject will enqueue asynchronous processing
requests through the asynchronous processing framework that is part of the
**crm-platform-base** package.

## Custom Metadata Bindings

The framework depends on two custom metadata objects in order to dynamically instruct the application how to handle the message payload.
The framework depends on two custom metadata objects in order to dynamically
instruct the application how to handle the message payload.

### AsyncRequestHandlerBinding_mdt
### `AsyncRequestHandlerBinding__mdt`

Binding between the asynchronous processing request (`AsyncRequest__c`) type created by this package and the `KafkaMessageAsyncJob` class
in order to instruct the asynchronous processing framework to call the `KafkaMessageAsyncJob` class in order to handle
requests originating from this package.
Binding between the asynchronous processing request (`AsyncRequest__c`) type
created by this package and the `KafkaMessageAsyncJob` class in order to
instruct the asynchronous processing framework to call the
`KafkaMessageAsyncJob` class in order to handle requests originating from this
package.

### KafkaMessageHandlerBinding_mdt
### `KafkaMessageHandlerBinding__mdt`

Binding between the KafkaMessage**c.Topic**c field and an Apex handler class for a given Topic in order to instruct the
application on how to handle a message payload related to a specific Kafka topic.
Binding between the `KafkaMessage__c.Topic__c` field and an Apex handler class
for a given Topic in order to instruct the application on how to handle a
message payload related to a specific Kafka topic.

## Execution Flow

1. An external application inserts a record or batch or records into the KafkaMessage\_\_c sObject
2. A trigger on the KafkaMessage**c object insert one record into the AsyncRequest**c object for each batch of up to 200
KafkaMessage\_\_c records created in a single transaction, representing a a request for asynchronous processing of the new
messages.
3. When the asynchronous processing framework processes the request, the custom metadata binding `AsyncRequestHandlerBinding_mdt`
instructs the application to handle the request using the `KafkaMessageAsyncJob` Apex class. - If no `AsyncRequestHandlerBinding_mdt` record is found corresponding to the "Kafka Message" AsyncRequestType**c value,
the `AsyncRequest**c` record is updated with an error.
4. The `KafkaMessageAsyncJob` queries for the relevant KafkaMessage**c records by the Ids stored in the async processing
request and queries the `KafkaMessageHandlerBinding_mdt` custom metadata object for registered bindings between `KafkaMessage**c.Topic**c`
values and corresponding Apex classes to handle payloads corresponding to Topic**c values. - If no `KafkaMessageHandlerBinding_mdt` record is found corresponding to the `Topic__c` value, the relevant
`KafkaMessage__c` record is updated with an error. The message kan then be retried after the error has been addressed.
5. The Apex class registered by the `KafkaMessageHandlerBinding_mdt` binding executes the business logic corresponding to the
`Topic__c` value. - If an execption occurs, the relevant`KafkaMessage__c` record is updated with an error. The message kan then be retried
after the error has been addressed.

1. An external application inserts a record or batch or records into the
`KafkaMessage__c` sObject
2. A trigger on the `KafkaMessage__c` object insert one record into the
`AsyncRequest__c` object for each batch of up to 200 `KafkaMessage__c`
records created in a single transaction, representing a request for
asynchronous processing of the new messages.
3. When the asynchronous processing framework processes the request, the custom
metadata binding `AsyncRequestHandlerBinding__mdt` instructs the application
to handle the request using the `KafkaMessageAsyncJob` Apex class. - If no
`AsyncRequestHandlerBinding__mdt` record is found corresponding to the "Kafka
Message" `AsyncRequestType__c` value, the `AsyncRequest__c` record is updated
with an error.
4. The `KafkaMessageAsyncJob` queries for the relevant `KafkaMessage__c` records
by the Ids stored in the async processing request and queries the
`KafkaMessageHandlerBinding__mdt` custom metadata object for registered
bindings between `KafkaMessage__c.Topic__c` values and corresponding Apex
classes to handle payloads corresponding to `Topic__c` values. - If no
`KafkaMessageHandlerBinding__mdt` record is found corresponding to the
`Topic__c` value, the relevant `KafkaMessage__c` record is updated with an
error. The message can then be retried after the error has been addressed.
i. If `KafkaMessageHandlerBinding__mdt.SandboxOverrideTopic__c` exists, it is
its value which will correspond with `KafkaMessage__c.Topic__c` in scratch
orgs and sandboxes. `KafkaMessageHandlerBinding__mdt.Topic__c` will in this
case remain unused.
5. The Apex class registered by the `KafkaMessageHandlerBinding__mdt` binding
executes the business logic corresponding to the `Topic__c` value. If an
exception occurs, the relevant `KafkaMessage__c` record is updated with an
error. The message can then be retried after the error has been addressed.

## Synchronous kafka message handling

To process incoming kafka messages in a synchronous context the following pattern should be followed:
1. Definition of a separate platform event with the exact data model as i.e. defined [here](https://github.com/navikt/crm-platform-oppgave/tree/master/force-app/main/default/objects/Kafka_Oppgave_Event__e).
To process incoming kafka messages in a synchronous context the following
pattern should be followed:
1. Definition of a separate platform event with the exact data model as i.e.
defined
[here](https://github.com/navikt/crm-platform-oppgave/tree/master/force-app/main/default/objects/Kafka_Oppgave_Event__e).
2. Create a trigger and separate trigger handler to process the incoming events.
3. The processing itself should be implemented using the IKafkaMessageConsumer interface such that error handling can be performed easily storing failed events as KafkaMessage__c records. An example of this can be veiwed [here](https://github.com/navikt/crm-platform-oppgave/blob/master/force-app/main/default/classes/kafka/CRM_KafkaOppgaveEventHandler.cls) where *doEventTransform* performs the transformation from the custom event to the KafkaMessage__c model and the failed events are stored as KafkaMessage__c records in an error status.
3. The processing itself should be implemented using the `IKafkaMessageConsumer`
interface such that error handling can be performed easily storing failed
events as `KafkaMessage__c` records. An example of this can be viewed
[here](https://github.com/navikt/crm-platform-oppgave/blob/master/force-app/main/default/classes/kafka/CRM_KafkaOppgaveEventHandler.cls)
where *doEventTransform* performs the transformation from the custom event to
the `KafkaMessage__c` model and the failed events are stored as
`KafkaMessage__c` records in an error status.
2 changes: 1 addition & 1 deletion dummy-data/ManageTestData.apex
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
//Start executing AsyncRequests => Trigger handle the KafkaMessages
Database.executeBatch(new AsyncRequestBatchable(),50);
Database.executeBatch(new AsyncRequestBatchable(),50);
34 changes: 32 additions & 2 deletions dummy-data/kafkaMessages/KafkaMessages.json

Large diffs are not rendered by default.

43 changes: 35 additions & 8 deletions force-app/main/default/classes/KafkaMessageHandler.cls
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
public inherited sharing class KafkaMessageHandler extends MyTriggers {

private final static Boolean IS_SANDBOX = [SELECT IsSandbox FROM Organization][0].IsSandbox;

/**
* Finds and executes and filter class bound to the topic through KafkaMessageFilterBinding__mdt
*/
Expand Down Expand Up @@ -88,6 +91,35 @@ public inherited sharing class KafkaMessageHandler extends MyTriggers {
}
}

/**
* Fetches a map of handler bindings based on their topic, to be used to set
* ApexJobType__c and Priority__c.
* @param topicSet topics gathered from inserted kafka messages, which may
* correspond to either Topic__c or SandboxOverrideTopic__c in
* KafkaMessageHandlerBinding__mdt.
* @return Mapping from topic to binding.
*/
private static Map<String, KafkaMessageHandlerBinding__mdt> getHandlerBindingsByTopic(Set<String> topicSet) {
Map<String, KafkaMessageHandlerBinding__mdt> handlerBindingByTopic = new Map<String, KafkaMessageHandlerBinding__mdt>();
for (KafkaMessageHandlerBinding__mdt binding : [
SELECT Id, Topic__c, Priority__c, ApexJobType__c
FROM KafkaMessageHandlerBinding__mdt
WHERE Topic__c = :topicSet
]) {
handlerBindingByTopic.put(binding.Topic__c, binding);
}
if (IS_SANDBOX) {
for (KafkaMessageHandlerBinding__mdt binding : [
SELECT Id, SandboxOverrideTopic__c, Priority__c, ApexJobType__c
FROM KafkaMessageHandlerBinding__mdt
WHERE SandboxOverrideTopic__c = :topicSet
]) {
handlerBindingByTopic.put(binding.SandboxOverrideTopic__c, binding);
}
}
return handlerBindingByTopic;
}

/**
* Instantiates AsyncRequest__c records, including setting values for the CRM_Priority__c field based on
* priority associated with the related Kafka Message Handler Binding record for a given messages CRM_Topic__c value
Expand All @@ -99,14 +131,9 @@ public inherited sharing class KafkaMessageHandler extends MyTriggers {
if (!KafkaIntegrationCustomSetting.processingEnabled || recordIdsToProcessByTopic.size() == 0)
return asyncRequests;

Map<String, KafkaMessageHandlerBinding__mdt> handlerBindingByTopic = new Map<String, KafkaMessageHandlerBinding__mdt>();
for (KafkaMessageHandlerBinding__mdt binding : [
SELECT Id, Topic__c, Priority__c, ApexJobType__c
FROM KafkaMessageHandlerBinding__mdt
WHERE Topic__c = :recordIdsToProcessByTopic.keySet()
]) {
handlerBindingByTopic.put(binding.Topic__c, binding);
}
Map<String, KafkaMessageHandlerBinding__mdt> handlerBindingByTopic = getHandlerBindingsByTopic(
recordIdsToProcessByTopic.keySet()
);

for (String topic : recordIdsToProcessByTopic.keySet()) {
if (!handlerBindingByTopic.containsKey(topic)) {
Expand Down
34 changes: 34 additions & 0 deletions force-app/main/default/classes/KafkaMessageHandlerTest.cls
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Tests methods specific to KafkaMessageHandler. See KafkaMessageServiceTest
* for similar tests.
*/
@IsTest
public with sharing class KafkaMessageHandlerTest {
/**
* Tests that the topic given in SandboxOverrideTopic__c is picked up by the
* handler. KafkaMessageHandlerBinding__mdt can not be modified in tests, so
* we assume the values are set correctly; CRM_Priority__c is set to 20 by
* default, but to 19 where the SandboxOverrideTopic__c matches the topic
* given here.
*/
@IsTest
private static void handleSandboxOverrideTopicPriority() {
// These topics will only be picked up in sandboxes, so only run there.
if ([SELECT IsSandbox FROM Organization][0].IsSandbox) {
KafkaMessage__c msg = new KafkaMessage__c(
CRM_Key__c = '1792160394037',
CRM_Topic__c = 'pdl.pdl-persondokument-tagged-v1',
CRM_Value__c = null
);
insert msg;
Test.startTest();
new KafkaMessageService(new List<KafkaMessage__c>{ msg }).handleMessages();
Test.stopTest();
System.assertEquals(
KafkaMessageService.STATUS_PROCESSED,
[SELECT Id, CRM_Status__c FROM KafkaMessage__c WHERE Id = :msg.Id LIMIT 1].CRM_Status__c
);
System.assertEquals(19, [SELECT CRM_Priority__c FROM AsyncRequest__c LIMIT 1].CRM_Priority__c);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<ApexClass xmlns="http://soap.sforce.com/2006/04/metadata">
<apiVersion>59.0</apiVersion>
<status>Active</status>
</ApexClass>
40 changes: 33 additions & 7 deletions force-app/main/default/classes/KafkaMessageService.cls
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,63 @@ public with sharing class KafkaMessageService {

private Map<String, IKafkaMessageConsumer> handlerBindings = new Map<String, IKafkaMessageConsumer>();
private List<KafkaMessage__c> messages;
private final static Boolean IS_SANDBOX = [SELECT IsSandbox FROM Organization][0].IsSandbox;

// Before Insert constructor - takes a list of Kafka Messages that don't yet have a record id
public KafkaMessageService(List<KafkaMessage__c> messages) {
this.messages = messages;
}

public void handleMessages() {
/**
* Maps each topic to its appropriate consumer.
* In prod, map every Topic__c.
* In dev, when the topic differs from that in prod, e.g. foo.bar-q2 instead
* of foo.bar, this topic is specified in SandboxOverrideTopic__c, which
* then takes precedence.
*/
private void setupHandlerBindings() {
for (KafkaMessageHandlerBinding__mdt binding : [
SELECT Id, Topic__c, ApexClass__c
SELECT Id, Topic__c, SandboxOverrideTopic__c, ApexClass__c
FROM KafkaMessageHandlerBinding__mdt
]) {
handlerBindings.put(
binding.Topic__c,
(IKafkaMessageConsumer) Type.forName(binding.ApexClass__c).newInstance()
);
if (IS_SANDBOX && String.isNotBlank(binding.SandboxOverrideTopic__c)) {
handlerBindings.put(
binding.SandboxOverrideTopic__c,
(IKafkaMessageConsumer) Type.forName(binding.ApexClass__c).newInstance()
);
} else {
handlerBindings.put(
binding.Topic__c,
(IKafkaMessageConsumer) Type.forName(binding.ApexClass__c).newInstance()
);
}
}
}

/**
* Sorts messages into their respective topics. Resets the status and error
* fields, which honestly should probably already be pending and '',
* respectively.
*/
private Map<String, List<KafkaMessage__c>> getMessagesByTopic() {
Map<String, List<KafkaMessage__c>> messagesByTopic = new Map<String, List<KafkaMessage__c>>();
for (KafkaMessage__c msg : messages) {
// Reset processing status fields
msg.CRM_Status__c = KafkaMessageService.STATUS_PENDING;
msg.CRM_ErrorMessage__c = '';

// Build map of messages by topic
if (messagesByTopic.containsKey(msg.CRM_Topic__c)) {
messagesByTopic.get(msg.CRM_Topic__c).add(msg);
} else {
messagesByTopic.put(msg.CRM_Topic__c, new List<KafkaMessage__c>{ msg });
}
}
return messagesByTopic;
}

public void handleMessages() {
setupHandlerBindings();
Map<String, List<KafkaMessage__c>> messagesByTopic = getMessagesByTopic();
List<KafkaMessage__c> messagesToUpdate = new List<KafkaMessage__c>();

for (String topic : messagesByTopic.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?xml version="1.0" encoding="UTF-8" ?>
<?xml version="1.0" encoding="UTF-8"?>
<Layout xmlns="http://soap.sforce.com/2006/04/metadata">
<layoutSections>
<customLabel>false</customLabel>
Expand Down Expand Up @@ -26,6 +26,10 @@
<behavior>Required</behavior>
<field>Topic__c</field>
</layoutItems>
<layoutItems>
<behavior>Edit</behavior>
<field>SandboxOverrideTopic__c</field>
</layoutItems>
<layoutItems>
<behavior>Edit</behavior>
<field>Priority__c</field>
Expand Down Expand Up @@ -70,9 +74,9 @@
<customLabel>false</customLabel>
<detailHeading>false</detailHeading>
<editHeading>false</editHeading>
<layoutColumns />
<layoutColumns />
<layoutColumns />
<layoutColumns/>
<layoutColumns/>
<layoutColumns/>
<style>CustomLinks</style>
</layoutSections>
<showEmailCheckbox>false</showEmailCheckbox>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8" ?>
<CustomField xmlns="http://soap.sforce.com/2006/04/metadata">
<fullName>SandboxOverrideTopic__c</fullName>
<description>Overrides Topic__c when in a sandbox</description>
<externalId>false</externalId>
<fieldManageability>DeveloperControlled</fieldManageability>
<inlineHelpText>When it is not desirable that all events on the topic given in Topic__c is consumed in a sandbox, this field specifies an alternative topic.</inlineHelpText>
<label>Sandbox Override Topic</label>
<length>255</length>
<required>false</required>
<type>Text</type>
<unique>false</unique>
</CustomField>
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
<columns>MasterLabel</columns>
<columns>DeveloperName</columns>
<columns>Topic__c</columns>
<columns>SandboxOverrideTopic__c</columns>
<columns>Priority__c</columns>
<columns>ApexClass__c</columns>
<columns>Description__c</columns>
Expand Down
1 change: 1 addition & 0 deletions force-app/pdl-apexTypes/classes/PDL_Adressebeskyttelse.cls
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
public with sharing class PDL_Adressebeskyttelse {

@TestVisible
public PDL_AdressebeskyttelseGradering gradering{ get;}
public PDL_Folkeregistermetadata folkeregistermetadata{ get;}
public PDL_Metadata metadata{ get;}
Expand Down
1 change: 1 addition & 0 deletions force-app/pdl-apexTypes/classes/PDL_Bostedsadresse.cls
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ public with sharing class PDL_Bostedsadresse {
public Datetime gyldigFraOgMed{ get;}
public Datetime gyldigTilOgMed{ get;}
public String coAdressenavn{ get;}
@TestVisible
public PDL_Vegadresse vegadresse{ get;}
public PDL_Matrikkeladresse matrikkeladresse{ get;}
public PDL_UtenlandskAdresse utenlandskAdresse{ get;}
Expand Down
2 changes: 2 additions & 0 deletions force-app/pdl-apexTypes/classes/PDL_Doedsfall.cls
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
public with sharing class PDL_Doedsfall {

@TestVisible
public Date doedsdato{ get;}
@TestVisible
public PDL_Metadata metadata{ get;}
public PDL_Folkeregistermetadata folkeregistermetadata{ get;}

Expand Down
1 change: 1 addition & 0 deletions force-app/pdl-apexTypes/classes/PDL_Foedsel.cls
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
public with sharing class PDL_Foedsel {

public Integer foedselsaar{ get;}
@TestVisible
public Date foedselsdato{ get;}
public String foedeland{ get;}
public String foedested{ get;}
Expand Down
5 changes: 5 additions & 0 deletions force-app/pdl-apexTypes/classes/PDL_Fullmakt.cls
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
public with sharing class PDL_Fullmakt {

@TestVisible
public String motpartsPersonident{ get;}
@TestVisible
public PDL_FullmaktsRolle motpartsRolle{ get;}
@TestVisible
public String[] omraader{ get;}
@TestVisible
public Date gyldigFraOgMed{ get;}
@TestVisible
public Date gyldigTilOgMed{ get;}
public PDL_Metadata metadata{ get;}

Expand Down
Loading

0 comments on commit 2f2df9e

Please sign in to comment.