Add support for byte array decimal fields (#97)
Problem
=======
Address #91

Solution
========
When encountering "Decimal" fields represented by byte arrays, parse
them into raw buffers.

Change summary:
---------------
- Added code to parse "Decimal" type fields represented by byte arrays
(fixed length or non-fixed length) into raw buffer values for further
client-side processing (see the decoding sketch after this list).
- Added two test cases verifying the added code.
- Loosened the precision check to allow values greater than 18 for
byte-array-represented "Decimal" fields.

Steps to Verify:
----------------
- Use the library to open a parquet file that contains a "Decimal"
field represented by a byte array whose precision is greater than 18.
- Before the change, the library throws an error saying precision cannot
be greater than 18.
- After the change, the library parses those fields into their raw buffer
values and returns records normally (see the usage sketch after this list).
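
A hedged sketch of those verification steps using the library's reader API; the package name and file name are assumptions for illustration:

import * as parquet from "@dsnp/parquetjs";

async function main() {
  const reader = await parquet.ParquetReader.openFile("decimals_precision_38.parquet");
  const cursor = reader.getCursor();
  let record = await cursor.next();
  while (record) {
    console.log(record); // byte-array "Decimal" fields now arrive as raw Buffers
    record = await cursor.next();
  }
  await reader.close();
}

main().catch(console.error);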

---------

Co-authored-by: Wil Wade <[email protected]>
YECHUNAN and wilwade authored Aug 14, 2023
1 parent ac5257d commit c07e7e8
Showing 14 changed files with 785 additions and 643 deletions.
2 changes: 1 addition & 1 deletion esbuild-serve.js
@@ -11,7 +11,7 @@ require('esbuild')
}, {
entryPoints: ['parquet.ts'],
outfile: 'main.js',
-define: {"process.env.NODE_DEBUG": false, "process.env.NODE_ENV": "\"production\"", global: "window" },
+define: {"process.env.NODE_DEBUG": "false", "process.env.NODE_ENV": "\"production\"", global: "window" },
platform: 'browser',
plugins: [compressionBrowserPlugin,wasmPlugin],
sourcemap: "external",
5 changes: 3 additions & 2 deletions esbuild.js
@@ -6,7 +6,7 @@ const baseConfig = {
bundle: true,
entryPoints: ['parquet.ts'],
define: {
"process.env.NODE_DEBUG": false,
"process.env.NODE_DEBUG": "false",
"process.env.NODE_ENV": "\"production\"",
global: "window"
},
@@ -21,7 +21,7 @@ const testConfig = {
bundle: true,
entryPoints: ['test/browser/main.ts'],
define: {
"process.env.NODE_DEBUG": false,
"process.env.NODE_DEBUG": "false",
"process.env.NODE_ENV": "\"production\"",
global: "window"
},
@@ -61,6 +61,7 @@ Promise.all(targets.map(esbuild.build))
})
.catch(e => {
console.error("Finished with errors: ", e.toString());
+process.exit(1);
});


2 changes: 1 addition & 1 deletion lib/reader.ts
@@ -720,7 +720,7 @@ function decodeStatisticsValue(value: any, column: ParquetField | Options) {
}

if (column.originalType) {
-value = parquet_types.fromPrimitive(column.originalType, value);
+value = parquet_types.fromPrimitive(column.originalType, value, column);
}
return value;
}
13 changes: 6 additions & 7 deletions lib/schema.ts
@@ -146,7 +146,7 @@ function buildFields(schema: SchemaDefinition, rLevelParentMax?: number, dLevelP
nameWithPath = `${path}.${nameWithPath}`
}

-const typeDef = opts.type ? parquet_types.PARQUET_LOGICAL_TYPES[opts.type] : undefined;
+const typeDef = opts.type ? parquet_types.getParquetTypeDataObject(opts.type, opts) : undefined;
if (!typeDef) {
fieldErrors.push(`Invalid parquet type: ${(opts.type || "missing type")}, for Column: ${nameWithPath}`);
continue;
@@ -172,7 +172,7 @@ if (typeDef.originalType === 'DECIMAL') {
if (typeDef.originalType === 'DECIMAL') {
// Default scale to 0 per https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
if (typeof opts.scale === "undefined") opts.scale = 0;
-fieldErrors = fieldErrors.concat(errorsForDecimalOpts(typeDef.originalType, opts, nameWithPath));
+fieldErrors = fieldErrors.concat(errorsForDecimalOpts(typeDef.originalType, typeDef.primitiveType, opts, nameWithPath));
}

/* add to schema */
@@ -219,7 +219,7 @@ function isDefined<T>(val: T | undefined): val is T {
return val !== undefined;
}

-function errorsForDecimalOpts(type: string, opts: FieldDefinition, columnName: string): string[] {
+function errorsForDecimalOpts(type: string, primitiveType: string | undefined, opts: FieldDefinition, columnName: string): string[] {
const fieldErrors = []
if(opts.precision === undefined || opts.precision < 1) {
fieldErrors.push(
@@ -231,9 +231,9 @@ function errorsForDecimalOpts(type: string, opts: FieldDefinition, columnName: s
`invalid schema for type: ${type}, for Column: ${columnName}, precision must be an integer`
);
}
-else if (opts.precision > 18) {
+else if (primitiveType === "INT64" && opts.precision > 18) {
fieldErrors.push(
-`invalid schema for type: ${type}, for Column: ${columnName}, can not handle precision over 18`
+`invalid schema for type: ${type} and primitive type: ${primitiveType} for Column: ${columnName}, can not handle precision over 18`
);
}
if (typeof opts.scale === "undefined" || opts.scale < 0) {
@@ -246,8 +246,7 @@ function errorsForDecimalOpts(type: string, opts: FieldDefinition, columnName: s
`invalid schema for type: ${type}, for Column: ${columnName}, scale must be an integer`
);
}
-// Default precision to 18 if it is undefined as that is a different error
-else if (opts.scale > (opts.precision || 18)) {
+else if (opts.precision !== undefined && opts.scale > opts.precision) {
fieldErrors.push(
`invalid schema or precision for type: ${type}, for Column: ${columnName}, precision must be greater than or equal to scale`
);
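A hedged illustration of what the loosened check now accepts; the schema shapes follow this library's ParquetSchema, but the package import, column names, and numbers are assumptions:

import * as parquet from "@dsnp/parquetjs";

const schema = new parquet.ParquetSchema({
  // No typeLength and precision > 18: maps to BYTE_ARRAY, accepted after this change.
  big: { type: "DECIMAL", precision: 38, scale: 9 },
  // typeLength given: maps to FIXED_LEN_BYTE_ARRAY, precision > 18 also accepted.
  fixed: { type: "DECIMAL", precision: 38, scale: 9, typeLength: 16 },
  // No typeLength and precision <= 18: stays INT64-backed, still capped at 18.
  small: { type: "DECIMAL", precision: 18, scale: 2 },
});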
5 changes: 3 additions & 2 deletions lib/shred.ts
@@ -151,7 +151,7 @@ function shredRecordInternal(fields: Record<string, ParquetField>, record: Recor
field.dLevelMax);
} else {
data[path].distinct_values!.add(values[i]);
-data[path].values!.push(parquet_types.toPrimitive(fieldType as string, values[i]));
+data[path].values!.push(parquet_types.toPrimitive(fieldType as string, values[i], field));
data[path].rlevels!.push(rlvl_i);
data[path].dlevels!.push(field.dLevelMax);
data[path].count! += 1;
@@ -205,7 +205,8 @@ export const materializeRecords = function(schema: ParquetSchema, buffer: Record
if (dLevel === field.dLevelMax) {
value = parquet_types.fromPrimitive(
field.originalType || field.primitiveType,
-values.next().value);
+values.next().value,
+field);
}

records[rLevels[0] - 1] = records[rLevels[0] - 1] || {};
107 changes: 84 additions & 23 deletions lib/types.ts
@@ -1,25 +1,84 @@
'use strict';
// Thanks to https://github.com/kbajalc/parquets for some of the code.
import * as BSON from "bson"
-import { PrimitiveType, OriginalType, ParquetType } from "./declare"

-type ParquetTypeData = {
-[Property in ParquetType]: {
-primitiveType?: PrimitiveType,
-toPrimitive: Function,
-fromPrimitive?: Function,
-originalType?: OriginalType,
-typeLength?: number
-}
-}
+import { PrimitiveType, OriginalType, ParquetType, FieldDefinition, ParquetField } from "./declare"
+import { Options } from "./codec/types";

+type ParquetTypeDataObject = {
+primitiveType?: PrimitiveType,
+toPrimitive: Function,
+fromPrimitive?: Function,
+originalType?: OriginalType,
+typeLength?: number
+};

interface INTERVAL {
months: number,
days: number,
milliseconds: number
}

-export const PARQUET_LOGICAL_TYPES: ParquetTypeData = {
+export function getParquetTypeDataObject(type: ParquetType, field?: ParquetField | Options | FieldDefinition): ParquetTypeDataObject {
+if (type === 'DECIMAL') {
+if (field?.typeLength !== undefined) {
+return {
+primitiveType: 'FIXED_LEN_BYTE_ARRAY',
+originalType: 'DECIMAL',
+typeLength: field.typeLength,
+toPrimitive: toPrimitive_FIXED_LEN_BYTE_ARRAY_DECIMAL
+};
+} else if (field?.precision !== undefined && field.precision > 18) {
+return {
+primitiveType: 'BYTE_ARRAY',
+originalType: 'DECIMAL',
+typeLength: field.typeLength,
+toPrimitive: toPrimitive_BYTE_ARRAY_DECIMAL
+};
+} else {
+return {
+primitiveType: 'INT64',
+originalType: 'DECIMAL',
+toPrimitive: toPrimitive_INT64
+};
+}
+} else {
+return PARQUET_LOGICAL_TYPE_DATA[type];
+}
+}

+const PARQUET_LOGICAL_TYPES = new Set<string>([
+'BOOLEAN',
+'INT32',
+'INT64',
+'INT96',
+'FLOAT',
+'DOUBLE',
+'BYTE_ARRAY',
+'FIXED_LEN_BYTE_ARRAY',
+'UTF8',
+'ENUM',
+'TIME_MILLIS',
+'TIME_MICROS',
+'DATE',
+'TIMESTAMP_MILLIS',
+'TIMESTAMP_MICROS',
+'UINT_8',
+'UINT_16',
+'UINT_32',
+'UINT_64',
+'INT_8',
+'INT_16',
+'INT_32',
+'INT_64',
+'DECIMAL',
+'JSON',
+'BSON',
+'INTERVAL',
+'MAP',
+'LIST'
+] satisfies ParquetType[])

+const PARQUET_LOGICAL_TYPE_DATA: { [logicalType: string]: ParquetTypeDataObject } = {
'BOOLEAN': {
primitiveType: 'BOOLEAN',
toPrimitive: toPrimitive_BOOLEAN,
@@ -133,11 +192,6 @@ export const PARQUET_LOGICAL_TYPES: ParquetTypeData = {
originalType: 'INT_64',
toPrimitive: toPrimitive_INT64
},
-'DECIMAL': {
-primitiveType: 'INT64',
-originalType: 'DECIMAL',
-toPrimitive: toPrimitive_INT64
-},
'JSON': {
primitiveType: 'BYTE_ARRAY',
originalType: 'JSON',
@@ -173,31 +227,30 @@ export const PARQUET_LOGICAL_TYPES: ParquetTypeData = {
* @returns if type is a valid Parquet Type
*/
function isParquetType(type: string | undefined): type is ParquetType {
-return type !== undefined && (type in PARQUET_LOGICAL_TYPES);
+return type !== undefined && PARQUET_LOGICAL_TYPES.has(type);
}

/**
* Convert a value from it's native representation to the internal/underlying
* primitive type
*/
-export function toPrimitive(type: string | undefined, value: unknown) {
+export function toPrimitive(type: string | undefined, value: unknown, field?: ParquetField | Options) {
if (!isParquetType(type)) {
throw 'invalid type: ' + type || "undefined";
}

-return PARQUET_LOGICAL_TYPES[type].toPrimitive(value);
+return getParquetTypeDataObject(type, field).toPrimitive(value);
}

/**
* Convert a value from it's internal/underlying primitive representation to
* the native representation
*/
-export function fromPrimitive(type: string | undefined, value: unknown) {
+export function fromPrimitive(type: string | undefined, value: unknown, field?: ParquetField | Options) {
if (!isParquetType(type)) {
throw 'invalid type: ' + type || "undefined";
}

-const typeFromPrimitive = PARQUET_LOGICAL_TYPES[type].fromPrimitive
+const typeFromPrimitive = getParquetTypeDataObject(type, field).fromPrimitive
if (typeFromPrimitive !== undefined) {
return typeFromPrimitive(value)
} else {
@@ -350,6 +403,14 @@ function toPrimitive_INT96(value: number | bigint | string) {
}
}

+function toPrimitive_FIXED_LEN_BYTE_ARRAY_DECIMAL(value: Array<number>) {
+return Buffer.from(value);
+}

+function toPrimitive_BYTE_ARRAY_DECIMAL(value: Array<number>) {
+return Buffer.from(value);
+}

function toPrimitive_MAP(value: any) {
return value;
}
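To make the new dispatch concrete, a sketch of which branch getParquetTypeDataObject takes for a few hypothetical "Decimal" field definitions (import path as in this repo; results abbreviated, read straight from the function above):

import { getParquetTypeDataObject } from "./lib/types";

// typeLength present: fixed-length byte array branch.
getParquetTypeDataObject("DECIMAL", { typeLength: 16, precision: 38, scale: 2 });
// -> { primitiveType: 'FIXED_LEN_BYTE_ARRAY', originalType: 'DECIMAL', typeLength: 16, ... }

// precision > 18 without typeLength: variable-length byte array branch.
getParquetTypeDataObject("DECIMAL", { precision: 38, scale: 2 });
// -> { primitiveType: 'BYTE_ARRAY', originalType: 'DECIMAL', ... }

// precision <= 18: INT64 branch, the pre-change behavior.
getParquetTypeDataObject("DECIMAL", { precision: 10, scale: 2 });
// -> { primitiveType: 'INT64', originalType: 'DECIMAL', ... }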
4 changes: 2 additions & 2 deletions lib/writer.ts
@@ -6,7 +6,7 @@ import * as parquet_codec from './codec'
import * as parquet_compression from './compression'
import * as parquet_types from './types'
import * as bloomFilterWriter from "./bloomFilterIO/bloomFilterWriter"
-import { WriterOptions, ParquetCodec, ParquetField, ColumnMetaDataExt, RowGroupExt, Page } from './declare'
+import { WriterOptions, ParquetCodec, ParquetField, ColumnMetaDataExt, RowGroupExt, Page, FieldDefinition } from './declare'
import { Options } from './codec/types'
import { ParquetSchema } from './schema'
import Int64 from 'node-int64'
@@ -386,7 +386,7 @@ function encodeStatisticsValue(value: any, column: ParquetField | Options) {
return Buffer.alloc(0);
}
if (column.originalType) {
-value = parquet_types.toPrimitive(column.originalType,value);
+value = parquet_types.toPrimitive(column.originalType, value, column);
}
if (column.primitiveType !== 'BYTE_ARRAY') {
value = encodeValues(column.primitiveType!,'PLAIN',[value],column);