Skip to content

Commit

Permalink
Merge pull request #42 from lensesio-dev/fix/gcp-add-headers
Browse files Browse the repository at this point in the history
Adding topic, subscription and project to headers in DEFAULT mode
  • Loading branch information
andrewstevenson authored Jun 17, 2024
2 parents 0e467ed + e9dae1a commit 48f0b66
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,27 @@ public class MinimalAndMessageAttributesHeaderMapper implements HeaderMapper {

private final MinimalHeaderMapper minimalHeaderMapping = new MinimalHeaderMapper();

private static final String HEADER_PROJECT_ID = "GCPProjectId";
private static final String HEADER_TOPIC_ID = "PubSubTopicId";
private static final String HEADER_SUBSCRIPTION_ID = "PubSubSubscriptionId";

@Override
public Map<String, String> mapHeaders(final PubSubMessageData source) {
val miniMap = minimalHeaderMapping.mapHeaders(source);
val extraMap = mapExtra(source);
val headMap = source.getMessage().getAttributesMap();
return ImmutableMap.<String, String>builder()
.putAll(miniMap)
.putAll(extraMap)
.putAll(headMap)
.build();
}

private Map<String, String> mapExtra(PubSubMessageData source) {
return Map.of(
HEADER_PROJECT_ID, source.getSourcePartition().getProjectId(),
HEADER_TOPIC_ID, source.getSourcePartition().getTopicId(),
HEADER_SUBSCRIPTION_ID, source.getSourcePartition().getSubscriptionId()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

Expand All @@ -41,7 +42,7 @@ class MinimalAndMessageAttributesHeaderMapperTest {

private static final Map<String, String> HEADERS_MAP = Map.of("attr1", "value1", "attr2", "value2");

@Mock
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private PubSubMessageData pubSubMessageData;

@Mock
Expand All @@ -55,18 +56,27 @@ void setup() {
}

@Test
void testGetHeaders() {
void testMapHeaders() {

when(pubsubMessage.getPublishTime()).thenReturn(Timestamp.newBuilder().setSeconds(PUBLISH_TIME_INSTANT
.getEpochSecond()).build());
when(pubsubMessage.getAttributesMap()).thenReturn(HEADERS_MAP);
when(pubSubMessageData.getMessage()).thenReturn(pubsubMessage);
when(pubSubMessageData.getSourcePartition().getProjectId()).thenReturn("test-project");
when(pubSubMessageData.getSourcePartition().getTopicId()).thenReturn("test-topic");
when(pubSubMessageData.getSourcePartition().getSubscriptionId()).thenReturn("test-subscription");

Map<String, String> result = minimalAndMessageAttributesHeaderMapping.mapHeaders(pubSubMessageData);

assertEquals(
ImmutableMap.builder()
.put("PublishTimestamp", String.valueOf(PUBLISH_TIME_INSTANT.getEpochSecond()))
.putAll(HEADERS_MAP).build(),
.putAll(HEADERS_MAP)
.put("GCPProjectId", "test-project") // Include expected values from mapExtra
.put("PubSubTopicId", "test-topic")
.put("PubSubSubscriptionId", "test-subscription")
.build(),
result);
}

}

0 comments on commit 48f0b66

Please sign in to comment.