Messages are getting merged #566

Open
IDUN-BogdanPi opened this issue Apr 16, 2024 · 1 comment

Comments

@IDUN-BogdanPi

Hi,

I am using Kinesis Producer for Java:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>0.15.8</version>
</dependency>

This is my send-message code snippet:

            String json = objectMapper.writeValueAsString(msg);
            log.info("Sending notification to user: {}, message :{}", msg.getPartitioningKey(), json);
            // Reuse the JSON string serialized above instead of serializing msg a second time
            byte[] messageBytes = json.getBytes(StandardCharsets.UTF_8);
            ByteBuffer data = ByteBuffer.wrap(messageBytes);
            ListenableFuture<UserRecordResult> future = kinesisProducer.addUserRecord(STREAM_NAME, msg.getPartitioningKey(), data);

            Futures.addCallback(future, new FutureCallback<>() {

The log message produced is:

Sending notification to user: 03eadfed-b2ab-45e8-8b8c-54baa6fd27f3, message :{"idunId":"03eadfed-b2ab-45e8-8b8c-54baa6fd27f3","deviceId":"49-44-55-4E-00-01","action":"recordingUpdate","recordingId":"1713264772502","status":"NOT_STARTED","timestamp":1713264772502,"partitioningKey":"03eadfed-b2ab-45e8-8b8c-54baa6fd27f3"}

But this is what ends up in Kinesis. The problem seems to appear when messages with different partitioning keys are sent: they end up "merged". As the example below shows, this should have been two distinct JSON messages.

In the log messages, the partitioning key is correct: 03eadfed-b2ab-45e8-8b8c-54baa6fd27f3
On Kinesis, however, it appears as: a

����
$03eadfed-b2ab-45e8-8b8c-54baa6fd27f3
$026e854e-4bac-41ae-97b3-3b39c66cef89�������{"idunId":"03eadfed-b2ab-45e8-8b8c-54baa6fd27f3","deviceId":"49-44-55-4E-00-01","action":"recordingUpdate","recordingId":"1713261592840","status":"PROCESSING","timestamp":1713264766170,"partitioningKey":"03eadfed-b2ab-45e8-8b8c-54baa6fd27f3"}��������{"idunId":"026e854e-4bac-41ae-97b3-3b39c66cef89","deviceId":"49-44-55-4E-00-02","action":"liveStreamInsights","recordingId":"1713264702302","partitioningKey":"026e854e-4bac-41ae-97b3-3b39c66cef89","raw_eeg":[-- removed for brevity ---]}DZ	�@�_@_+q�``}3

Any idea what might be the problem?

Thanks for your help!

@lbourdages

How do you create the producer? The KPL has an aggregation setting (AggregationEnabled) that is true by default, so it may pack several user records into a single Kinesis record.

You can disable it like so:

KinesisProducerConfiguration config = new KinesisProducerConfiguration().setAggregationEnabled(false);

return new KinesisProducer(config);
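
If you would rather keep aggregation on, it can help to confirm on the reading side that a record really is a KPL aggregate and not corrupted data. Below is a minimal sketch using only the JDK. It assumes the envelope layout described in the KPL aggregation format notes, as I understand them: a 4-byte magic prefix (0xF3 0x89 0x9A 0xC2), then the protobuf payload, then a 16-byte MD5 of that payload. The class and method names are hypothetical, and the sketch only checks the envelope; it does not decode the protobuf payload or split out the individual user records.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

/** Hypothetical helper: detects whether raw record bytes look like a KPL aggregated record. */
public class AggregatedRecordCheck {
    // Assumed magic prefix of an aggregated record.
    private static final byte[] MAGIC = {(byte) 0xF3, (byte) 0x89, (byte) 0x9A, (byte) 0xC2};
    // An MD5 digest is always 16 bytes long.
    private static final int MD5_LENGTH = 16;

    /** Computes the MD5 digest of data[offset .. offset + length). */
    public static byte[] md5(byte[] data, int offset, int length) {
        try {
            MessageDigest digest = MessageDigest.getInstance("MD5");
            digest.update(data, offset, length);
            return digest.digest();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is required to be present on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    /** Returns true if the bytes start with the magic prefix and end with a matching MD5. */
    public static boolean looksAggregated(byte[] data) {
        if (data == null || data.length < MAGIC.length + MD5_LENGTH) {
            return false;
        }
        for (int i = 0; i < MAGIC.length; i++) {
            if (data[i] != MAGIC[i]) {
                return false;
            }
        }
        int payloadEnd = data.length - MD5_LENGTH;
        byte[] expected = md5(data, MAGIC.length, payloadEnd - MAGIC.length);
        byte[] actual = Arrays.copyOfRange(data, payloadEnd, data.length);
        return Arrays.equals(expected, actual);
    }
}
```

If looksAggregated returns true for the bytes you read from the stream, the "merged" payload is most likely just an aggregate that your reader has not unpacked. Consumers built on the KCL deaggregate such records automatically; other consumers need to do it themselves.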
