Handle map and array types binary record data #3434

Open · wants to merge 7 commits into base: master

Conversation

umeshdangat

No description provided.

@leonardBang
Contributor

Thanks for your contribution @umeshdangat. @loserwang1024, would you like to help review this PR?

@umeshdangat
Author

I was writing a custom Flink CDC source connector. The input data has complex nested types. That is how I ran into the issue of BinaryRecordData not supporting complex types such as ARRAY and MAP, with AbstractBinaryWriter not supporting them either. Most of the code here is copied from the Flink source (including the relevant tests).

I was also trying to eventually get my data into Paimon, using the Paimon sink, so I have made the corresponding changes to PaimonWriterHelper as well to handle complex types.

Lastly, I came across a bug caused by a circular dependency between the DataFieldSerializer, DataTypeSerializer, and RowTypeSerializer classes, which causes an NPE for certain RowTypes such as nested row types. I have baked that change into this PR, but I also have a separate one here with more explanation.

Contributor

@loserwang1024 loserwang1024 left a comment


Hi @umeshdangat, thanks for your hard work. The PR is large and touches critical code, so I have only reviewed part of BinaryArrayData for now. Here are some comments.

// such as is string, map, etc.
return 8;
case TIMESTAMP_WITH_TIME_ZONE:
throw new UnsupportedOperationException();
Contributor

Why support TIMESTAMP_WITH_LOCAL_TIME_ZONE but not TIMESTAMP_WITH_TIME_ZONE?

/** Offset for Arrays. */
private static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);

private static final int BOOLEAN_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(boolean[].class);
Contributor

Since an array contains only one element type, why do we need every kind of ${type}_ARRAY_OFFSET? I think one offset is enough.

Author

We maintain a separate offset for each primitive type because the implementation provides methods that convert the binary data into specific primitive-type arrays, such as toShortArray() and toIntArray(). Each primitive array class can have a different base offset in memory, which is why we initialize the respective offsets:

  • SHORT_ARRAY_OFFSET for short[]
  • INT_ARRAY_OFFSET for int[]
  • and similarly for the other types: boolean, long, float, and double.

These offsets are used to copy the binary data accurately into the appropriate primitive-type arrays.
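To illustrate the point, here is a minimal standalone sketch (not the PR's code) that queries each primitive array class for its own base offset via sun.misc.Unsafe. On common HotSpot JVMs the values often happen to coincide, but the API makes no such guarantee, which is why each type is queried separately:

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class ArrayOffsets {
    private static final Unsafe UNSAFE = getUnsafe();

    // Base offset = number of object-header bytes before element 0 of each array type.
    public static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
    public static final int SHORT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(short[].class);
    public static final int INT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(int[].class);
    public static final int DOUBLE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(double[].class);

    // Classic reflection idiom for obtaining the Unsafe singleton.
    private static Unsafe getUnsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            return (Unsafe) f.get(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unsafe unavailable", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("byte[]   offset: " + BYTE_ARRAY_BASE_OFFSET);
        System.out.println("short[]  offset: " + SHORT_ARRAY_OFFSET);
        System.out.println("int[]    offset: " + INT_ARRAY_OFFSET);
        System.out.println("double[] offset: " + DOUBLE_ARRAY_OFFSET);
    }
}
```

Copy routines such as toShortArray() then pass the matching offset as the destination pointer when copying raw bytes into a freshly allocated primitive array.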


public BinaryArrayData() {}

private void assertIndexIsValid(int ordinal) {
Contributor

What about renaming ordinal to index? The same naming style as BinaryRecordData would be easier to read.

Author

done.

final int size = BinarySegmentUtils.getInt(segments, offset);
assert size >= 0 : "size (" + size + ") should >= 0";

this.size = size;
Contributor

Set sizeInBytes, offset, and segments via super.pointTo.

Author

done.


/**
* A binary implementation of {@link ArrayData} which is backed by {@link MemorySegment}s.
*
Contributor

Please add more description, like what BinaryRecordData does. For example: the first 4 bytes store the size (or length) of the array...

Author

done.

private static final int DOUBLE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(double[].class);

public static int calculateHeaderInBytes(int numFields) {
return 4 + ((numFields + 31) / 32) * 4;
Contributor

I don't understand what this means. Could you please give me more details?

  1. What is the meaning of each term in this formula?
  2. Why not ((4 + numFields + 31) / 32) * 4?
  3. Do you mean each header slot is 4 bytes? But in calculateFixLengthPartSize, BIGINT or DOUBLE returns 8.

Author

@umeshdangat umeshdangat Jul 5, 2024


As per my understanding:
The calculateHeaderInBytes method determines the size, in bytes, of the header of a binary array. The header consists of the size of the array AND a bitmap indicating which elements are null.

  • Fixed-size part: The leading 4 is the number of bytes used to store the size of the array. This is a fixed overhead for any binary array, regardless of the number of elements.
  • Null bitmap: The second part, ((numFields + 31) / 32) * 4, is the number of bytes required to store the null bitmap.

Why ((numFields + 31) / 32) * 4?

numFields: the number of elements in the array. Each element needs 1 bit in the null bitmap.
"+ 31" and "/ 32": with integer division, (numFields + 31) / 32 computes ceil(numFields / 32), i.e. the number of 32-bit words needed to hold one null bit per element. The "+ 31" rounds up to the next word boundary so we always allocate enough bits; the "/ 32" converts bits into a count of 32-bit words.
"* 4": converts words into bytes.

Example: numFields = 64

  1. The fixed part is 4 bytes holding the integer value 64.
  2. (64 + 31) / 32 = 95 / 32 = 2 (integer division), then 2 words * 4 = 8 bytes for the bitmap. That is 8 * 8 = 64 bits, exactly enough for 64 entries (65 entries would push this to a third word, i.e. 12 bytes).

Overall, the formula at the top of this file,

[size(int)] + [null bits(4-byte word boundaries)] + [values or offset&length] + [variable length part],

describes how the data is stored.

Example 1: array of 64 integers
[4 bytes (size)] + [8 bytes (null bits)] + [256 bytes (values)] = 268 bytes total

Example 2: array of 64 variable-length elements
[4 bytes (size)] + [8 bytes (null bits)] + [64 * 8 bytes (offset & length)] + [variable-length data]
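As a quick sanity check, the formula quoted in the diff can be exercised in isolation (a standalone sketch, not the PR's class):

```java
public class HeaderSize {

    /**
     * Header layout: 4 bytes for the element count, followed by a null bitmap
     * padded out to whole 4-byte words (one null bit per element).
     */
    public static int calculateHeaderInBytes(int numFields) {
        // (numFields + 31) / 32 is integer division, i.e. ceil(numFields / 32):
        // the number of 32-bit words needed for the null bitmap.
        return 4 + ((numFields + 31) / 32) * 4;
    }

    public static void main(String[] args) {
        System.out.println(calculateHeaderInBytes(1));   // 4 + 1 word  = 8
        System.out.println(calculateHeaderInBytes(64));  // 4 + 2 words = 12
        System.out.println(calculateHeaderInBytes(65));  // 4 + 3 words = 16
    }
}
```

Note how the header grows by one 4-byte word each time numFields crosses a multiple of 32.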

Author

added docString to this method.

checkNoNull();
short[] values = new short[size];
BinarySegmentUtils.copyToUnsafe(
segments, elementOffset, values, SHORT_ARRAY_OFFSET, size * 2);
Contributor

Why not get SHORT_ARRAY_OFFSET where it is used rather than at initialization?

Author

Are you suggesting removing the static variable SHORT_ARRAY_OFFSET and computing it directly instead?

Contributor

I just wonder why it was designed like this.

@umeshdangat
Author

@loserwang1024 thanks for looking into the PR.

I think the BinaryRecordData class is, for the most part, a port of Flink's BinaryRowData.

Following that code, I tried to keep my code consistent with Flink's BinaryArrayData and BinaryMapData.

@loserwang1024
Contributor

@umeshdangat, thanks for your explanation. More comment detail in the Javadocs is always a good thing.


github-actions bot commented Oct 7, 2024

This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs.

@github-actions github-actions bot added the Stale label Oct 7, 2024