Skip to content

Commit

Permalink
Merge branch 'release/1.2.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
zambrovski committed Jun 16, 2023
2 parents 4ce8dd2 + 6af1c7a commit 5d01a1c
Show file tree
Hide file tree
Showing 34 changed files with 148 additions and 83 deletions.
2 changes: 1 addition & 1 deletion .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ updates:
directory: "/"
schedule:
interval: daily
open-pull-requests-limit: 10
open-pull-requests-limit: 15
default-labels:
- "Type: dependencies"
4 changes: 2 additions & 2 deletions .github/workflows/development.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ jobs:

# Build
- name: Build with Maven
run: ./mvnw clean verify -U -B -T4
run: ./mvnw clean verify -U -B -T4 -ntp

# itest
- name: Run itest
run: ./mvnw integration-test failsafe:verify -Pitest -U -B -T4
run: ./mvnw integration-test failsafe:verify -Pitest -U -B -T4 -ntp

- name: Upload coverage to Codecov
if: github.event_name == 'push' && github.actor != 'dependabot[bot]'
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ jobs:

# Build
- name: Build with Maven
run: ./mvnw clean verify -U -B -T4
run: ./mvnw clean verify -U -B -T4 -ntp

# Publish release
- name: Deploy a new release version to Maven Central
run: ./mvnw clean deploy -B -DskipTests -DskipExamples -Prelease -Dgpg.keyname="${{ secrets.GPG_KEYNAME }}" -Dgpg.passphrase="${{ secrets.GPG_PASSPHRASE }}"
run: ./mvnw clean deploy -B -ntp -DskipTests -DskipExamples -Prelease -Dgpg.keyname="${{ secrets.GPG_KEYNAME }}" -Dgpg.passphrase="${{ secrets.GPG_PASSPHRASE }}"
env:
OSS_CENTRAL_USERNAME: "${{ secrets.SONATYPE_USERNAME }}"
OSS_CENTRAL_PASSWORD: "${{ secrets.SONATYPE_PASSWORD }}"
Expand Down
6 changes: 3 additions & 3 deletions .mvn/wrapper/maven-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.8.4/apache-maven-3.8.4-bin.zip
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.2/apache-maven-3.9.2-bin.zip
wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar
2 changes: 1 addition & 1 deletion bom/bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-root</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
6 changes: 3 additions & 3 deletions bom/parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-root</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand All @@ -14,7 +14,7 @@
<name>POM / Parent</name>

<properties>
<spring-boot.version>2.7.5</spring-boot.version>
<spring-boot.version>2.7.12</spring-boot.version>
<camunda-7.version>7.18.0</camunda-7.version>
<camunda-bpm-data.version>1.2.8</camunda-bpm-data.version>
<shedlock.version>4.38.0</shedlock.version>
Expand Down Expand Up @@ -80,7 +80,7 @@
<dependency>
<groupId>org.mockito.kotlin</groupId>
<artifactId>mockito-kotlin</artifactId>
<version>4.0.0</version>
<version>5.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion example/axon/flight-axon/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-example-parent</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion example/axon/hotel-axon/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-example-parent</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion example/axon/reservation-axon/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-example-parent</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion example/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-example-parent</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion example/itest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-example-parent</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-parent</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<!-- It is ok to use the extension parent, there is only Kotlin and some managed deps defined there -->
<relativePath>../bom/parent/pom.xml</relativePath>
</parent>
Expand Down
2 changes: 1 addition & 1 deletion example/spring-cloud/flight-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-example-parent</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion example/spring-cloud/hotel-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-example-parent</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion example/spring-cloud/reservation-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-example-parent</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion extension/axon/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-parent</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<relativePath>../../bom/parent/pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion extension/cockpit-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-parent</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<relativePath>../../bom/parent/pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion extension/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-parent</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<relativePath>../../bom/parent/pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ class BatchCorrelationProcessor(
.filterNot { it.correlationMessages.isEmpty() }
.forEach { batch ->
try {
logger.debug { "Correlating batch ${batch.correlationHint} containing ${batch.correlationMessages.size} messages." }
logger.debug { "Correlating batch ${batch.groupingKey} containing ${batch.correlationMessages.size} messages." }
val result: CorrelationBatchResult = correlationService.correlateBatch(batch)
logger.debug { "Processing result for batch ${batch.correlationHint}: $result" }
logger.debug { "Processing result for batch ${batch.groupingKey}: $result" }
when (result) {
is CorrelationBatchResult.Success -> {
persistenceService.success(successfulCorrelations = result.successfulCorrelations)
Expand All @@ -49,7 +49,7 @@ class BatchCorrelationProcessor(
)
)
correlationMetrics.incrementError()
logger.trace(e) { "Error processing for batch ${batch.correlationHint}" }
logger.trace(e) { "Error processing for batch ${batch.groupingKey}" }
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package io.holunda.camunda.bpm.correlate.correlation
import io.holunda.camunda.bpm.correlate.event.CorrelationHint

/**
* Correlation batch is a collection of messages with the same correlation hint.
* @param correlationHint target of the correlation.
* Correlation batch is a collection of messages with the same grouping key.
* @param groupingKey key for this batch.
* @param correlationMessages messages that should be correlated.
*/
data class CorrelationBatch(
val correlationHint: CorrelationHint,
val groupingKey: Any,
val correlationMessages: List<CorrelationMessage>
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.holunda.camunda.bpm.correlate.correlation.impl

import io.holunda.camunda.bpm.correlate.correlation.CorrelationMessage

/**
* Correlation message comparator taking message timestamp as a reference.
*/
class MessageTimestampCorrelationMessageComparator : Comparator<CorrelationMessage> {
override fun compare(left: CorrelationMessage, right: CorrelationMessage): Int =
left.messageMetaData.messageTimestamp.compareTo(right.messageMetaData.messageTimestamp)
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ data class MessageMetaData(
* Message id.
*/
val messageId: String,
/**
* Timestamp of the message.
*/
val messageTimestamp: Instant,
/**
* Payload type info.
*/
Expand All @@ -32,6 +36,7 @@ data class MessageMetaData(
*/
constructor(snippet: MessageMetaDataSnippet) : this(
messageId = requireNotNull(snippet.messageId) { "Message id must not be null" },
messageTimestamp = requireNotNull(snippet.messageTimestamp) { "Message timestamp must not be null" },
payloadTypeInfo = requireNotNull(snippet.payloadTypeInfo) { "Payload type info must not be null" },
payloadEncoding = requireNotNull(snippet.payloadEncoding) { "Payload encoding must be set" },
timeToLive = snippet.timeToLive,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,44 +37,14 @@ data class MessageMetaDataSnippet(
/**
* Reducer for snippets.
*/
fun reduce(acc: MessageMetaDataSnippet, other: MessageMetaDataSnippet): MessageMetaDataSnippet =
acc.let {
if (other.messageId != null) {
it.copy(messageId = other.messageId)
} else {
it
}
}.let {
if (other.timeToLive != null) {
it.copy(timeToLive = other.timeToLive)
} else {
it
}
}.let {
if (other.expiration != null) {
it.copy(expiration = other.expiration)
} else {
it
}
}.let {
if (other.payloadEncoding != null) {
it.copy(payloadEncoding = other.payloadEncoding)
} else {
it
}
}.let {
if (other.payloadTypeInfo != TypeInfo.UNKNOWN && it.payloadTypeInfo.overwritePossible) {
it.copy(payloadTypeInfo = other.payloadTypeInfo)
} else {
it
}
}.let {
if (other.messageTimestamp != null) {
it.copy(messageTimestamp = other.messageTimestamp)
} else {
it
}
}
fun reduce(acc: MessageMetaDataSnippet, other: MessageMetaDataSnippet): MessageMetaDataSnippet = MessageMetaDataSnippet(
messageId = other.messageId ?: acc.messageId,
timeToLive = other.timeToLive ?: acc.timeToLive,
expiration = other.expiration ?: acc.expiration,
payloadEncoding = other.payloadEncoding ?: acc.payloadEncoding,
payloadTypeInfo = if(other.payloadTypeInfo != TypeInfo.UNKNOWN && acc.payloadTypeInfo.overwritePossible) other.payloadTypeInfo else acc.payloadTypeInfo,
messageTimestamp = other.messageTimestamp ?: acc.messageTimestamp
)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ data class CorrelationHint(
/**
* Flag if only process start events should be considered.
*/
val processStart: Boolean = false
val processStart: Boolean = false,
/**
* Any value used for grouping messages into batches. Messages with equal groupingKeys will end up in the same batch.
* Defaults to correlation variables.
*/
val groupingKey: Any = correlationVariables
) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class DefaultMessagePersistenceService(
)
val messageMetaData = MessageMetaData(
messageId = entity.id,
messageTimestamp = entity.inserted,
payloadTypeInfo = typeInfo,
timeToLive = entity.timeToLiveDuration,
payloadEncoding = entity.payloadEncoding,
Expand Down Expand Up @@ -85,10 +86,10 @@ class DefaultMessagePersistenceService(
// build batches
val batches: List<CorrelationBatch> = messagesWithRetries
.keys
.groupBy(singleMessageCorrelationStrategy.correlationSelector())
.groupBy { singleMessageCorrelationStrategy.correlationSelector().invoke(it).groupingKey }
.map {
CorrelationBatch(
correlationHint = it.key,
groupingKey = it.key,
correlationMessages = it.value.sortedWith(singleMessageCorrelationStrategy.correlationMessageSorter())
)
}
Expand Down Expand Up @@ -163,7 +164,7 @@ class DefaultMessagePersistenceService(
messageRepository.insert(
MessageEntity(
id = metaData.messageId,
inserted = clock.instant(),
inserted = metaData.messageTimestamp,
payloadEncoding = metaData.payloadEncoding,
payloadTypeNamespace = metaData.payloadTypeInfo.namespace,
payloadTypeName = metaData.payloadTypeInfo.name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ fun emptyMessage() = ObjectMessage(mapOf(), "")

fun emptyMessageMetadata() = MessageMetaData(
messageId = messageId(),
messageTimestamp = Instant.now(),
payloadTypeInfo = TypeInfo.UNKNOWN,
payloadEncoding = "",
timeToLive = null,
Expand All @@ -30,6 +31,7 @@ fun rejectingFilter() = RejectingMessageFilter()
fun runningInstanceHint(processInstanceId: String) = CorrelationHint(
processDefinitionId = "DEF-ID",
processInstanceId = processInstanceId,
groupingKey = processInstanceId
)

fun correlationMessage() = CorrelationMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ internal class BatchCorrelationProcessorTest {

private val batches = arrayOf(
CorrelationBatch(
correlationHint = runningInstanceHint("instance1"),
groupingKey = runningInstanceHint("instance1").groupingKey,
correlationMessages = listOf(correlationMessage(), correlationMessage(), correlationMessage())
),
CorrelationBatch(
correlationHint = runningInstanceHint("instance2"),
groupingKey = runningInstanceHint("instance2").groupingKey,
correlationMessages = listOf(correlationMessage(), correlationMessage())
)
)
Expand Down Expand Up @@ -98,4 +98,4 @@ internal class BatchCorrelationProcessorTest {

}

}
}
Loading

0 comments on commit 5d01a1c

Please sign in to comment.