Skip to content

Commit

Permalink
apacheGH-41594: [Go] Support reading date64 type & properly validat…
Browse files Browse the repository at this point in the history
…e list-like types (apache#41595)

This PR includes 2 fixes:
1. support reading `date64` columns (as write is supported)
2. properly validate list-like data types (list of unsupported is unsupported)

### Rationale for this change

See apache#41594

### What changes are included in this PR?

1. Added `date64` reading & conversion funcs similar to `date32`
2. Refactored date type validation

### Are these changes tested?

a55cd53

### Are there any user-facing changes?

No.

* GitHub Issue: apache#41594

Authored-by: candiduslynx <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
candiduslynx authored and vibhatha committed May 25, 2024
1 parent 559fcf0 commit ee4fb91
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 121 deletions.
40 changes: 25 additions & 15 deletions go/arrow/csv/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,21 +239,31 @@ func WithStringsReplacer(replacer *strings.Replacer) Option {

func validate(schema *arrow.Schema) {
for i, f := range schema.Fields() {
switch ft := f.Type.(type) {
case *arrow.BooleanType:
case *arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type:
case *arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type:
case *arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type:
case *arrow.StringType, *arrow.LargeStringType:
case *arrow.TimestampType:
case *arrow.Date32Type, *arrow.Date64Type:
case *arrow.Decimal128Type, *arrow.Decimal256Type:
case *arrow.ListType, *arrow.LargeListType, *arrow.FixedSizeListType:
case *arrow.BinaryType, *arrow.LargeBinaryType, *arrow.FixedSizeBinaryType:
case arrow.ExtensionType:
case *arrow.NullType:
default:
panic(fmt.Errorf("arrow/csv: field %d (%s) has invalid data type %T", i, f.Name, ft))
if !typeSupported(f.Type) {
panic(fmt.Errorf("arrow/csv: field %d (%s) has invalid data type %T", i, f.Name, f.Type))
}
}
}

func typeSupported(dt arrow.DataType) bool {
switch dt := dt.(type) {
case *arrow.BooleanType:
case *arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type:
case *arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type:
case *arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type:
case *arrow.StringType, *arrow.LargeStringType:
case *arrow.TimestampType:
case *arrow.Date32Type, *arrow.Date64Type:
case *arrow.Decimal128Type, *arrow.Decimal256Type:
case *arrow.MapType:
return false
case arrow.ListLikeType:
return typeSupported(dt.Elem())
case *arrow.BinaryType, *arrow.LargeBinaryType, *arrow.FixedSizeBinaryType:
case arrow.ExtensionType:
case *arrow.NullType:
default:
return false
}
return true
}
74 changes: 29 additions & 45 deletions go/arrow/csv/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,10 @@ func (r *Reader) initFieldConverter(bldr array.Builder) func(string) {
return func(str string) {
r.parseDate32(bldr, str)
}
case *arrow.Date64Type:
return func(str string) {
r.parseDate64(bldr, str)
}
case *arrow.Time32Type:
return func(str string) {
r.parseTime32(bldr, str, dt.Unit)
Expand All @@ -486,17 +490,13 @@ func (r *Reader) initFieldConverter(bldr array.Builder) func(string) {
return func(str string) {
r.parseDecimal256(bldr, str, dt.Precision, dt.Scale)
}
case *arrow.ListType:
return func(s string) {
r.parseList(bldr, s)
}
case *arrow.LargeListType:
case *arrow.FixedSizeListType:
return func(s string) {
r.parseLargeList(bldr, s)
r.parseFixedSizeList(bldr.(*array.FixedSizeListBuilder), s, int(dt.Len()))
}
case *arrow.FixedSizeListType:
case arrow.ListLikeType:
return func(s string) {
r.parseFixedSizeList(bldr, s, int(dt.Len()))
r.parseListLike(bldr.(array.ListLikeBuilder), s)
}
case *arrow.BinaryType:
return func(s string) {
Expand Down Expand Up @@ -740,81 +740,67 @@ func (r *Reader) parseDate32(field array.Builder, str string) {
field.(*array.Date32Builder).Append(arrow.Date32FromTime(tm))
}

func (r *Reader) parseTime32(field array.Builder, str string, unit arrow.TimeUnit) {
func (r *Reader) parseDate64(field array.Builder, str string) {
if r.isNull(str) {
field.AppendNull()
return
}

val, err := arrow.Time32FromString(str, unit)
tm, err := time.Parse("2006-01-02", str)
if err != nil && r.err == nil {
r.err = err
field.AppendNull()
return
}
field.(*array.Time32Builder).Append(val)
field.(*array.Date64Builder).Append(arrow.Date64FromTime(tm))
}

func (r *Reader) parseDecimal128(field array.Builder, str string, prec, scale int32) {
func (r *Reader) parseTime32(field array.Builder, str string, unit arrow.TimeUnit) {
if r.isNull(str) {
field.AppendNull()
return
}

val, err := decimal128.FromString(str, prec, scale)
val, err := arrow.Time32FromString(str, unit)
if err != nil && r.err == nil {
r.err = err
field.AppendNull()
return
}
field.(*array.Decimal128Builder).Append(val)
field.(*array.Time32Builder).Append(val)
}

func (r *Reader) parseDecimal256(field array.Builder, str string, prec, scale int32) {
func (r *Reader) parseDecimal128(field array.Builder, str string, prec, scale int32) {
if r.isNull(str) {
field.AppendNull()
return
}

val, err := decimal256.FromString(str, prec, scale)
val, err := decimal128.FromString(str, prec, scale)
if err != nil && r.err == nil {
r.err = err
field.AppendNull()
return
}
field.(*array.Decimal256Builder).Append(val)
field.(*array.Decimal128Builder).Append(val)
}

func (r *Reader) parseList(field array.Builder, str string) {
func (r *Reader) parseDecimal256(field array.Builder, str string, prec, scale int32) {
if r.isNull(str) {
field.AppendNull()
return
}
if !(strings.HasPrefix(str, "{") && strings.HasSuffix(str, "}")) {
r.err = errors.New("invalid list format. should start with '{' and end with '}'")
return
}
str = strings.Trim(str, "{}")
listBldr := field.(*array.ListBuilder)
listBldr.Append(true)
if len(str) == 0 {
// we don't want to create the csv reader if we already know the
// string is empty
return
}
valueBldr := listBldr.ValueBuilder()
reader := csv.NewReader(strings.NewReader(str))
items, err := reader.Read()
if err != nil {

val, err := decimal256.FromString(str, prec, scale)
if err != nil && r.err == nil {
r.err = err
field.AppendNull()
return
}
for _, str := range items {
r.initFieldConverter(valueBldr)(str)
}
field.(*array.Decimal256Builder).Append(val)
}

func (r *Reader) parseLargeList(field array.Builder, str string) {
func (r *Reader) parseListLike(field array.ListLikeBuilder, str string) {
if r.isNull(str) {
field.AppendNull()
return
Expand All @@ -824,14 +810,13 @@ func (r *Reader) parseLargeList(field array.Builder, str string) {
return
}
str = strings.Trim(str, "{}")
largeListBldr := field.(*array.LargeListBuilder)
largeListBldr.Append(true)
field.Append(true)
if len(str) == 0 {
// we don't want to create the csv reader if we already know the
// string is empty
return
}
valueBldr := largeListBldr.ValueBuilder()
valueBldr := field.ValueBuilder()
reader := csv.NewReader(strings.NewReader(str))
items, err := reader.Read()
if err != nil {
Expand All @@ -843,7 +828,7 @@ func (r *Reader) parseLargeList(field array.Builder, str string) {
}
}

func (r *Reader) parseFixedSizeList(field array.Builder, str string, n int) {
func (r *Reader) parseFixedSizeList(field *array.FixedSizeListBuilder, str string, n int) {
if r.isNull(str) {
field.AppendNull()
return
Expand All @@ -853,14 +838,13 @@ func (r *Reader) parseFixedSizeList(field array.Builder, str string, n int) {
return
}
str = strings.Trim(str, "{}")
fixedSizeListBldr := field.(*array.FixedSizeListBuilder)
fixedSizeListBldr.Append(true)
field.Append(true)
if len(str) == 0 {
// we don't want to create the csv reader if we already know the
// string is empty
return
}
valueBldr := fixedSizeListBldr.ValueBuilder()
valueBldr := field.ValueBuilder()
reader := csv.NewReader(strings.NewReader(str))
items, err := reader.Read()
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions go/arrow/csv/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ func testCSVReader(t *testing.T, filepath string, withHeader bool, stringsCanBeN
{Name: "large_binary", Type: arrow.BinaryTypes.LargeBinary},
{Name: "fixed_size_binary", Type: &arrow.FixedSizeBinaryType{ByteWidth: 3}},
{Name: "uuid", Type: types.NewUUIDType()},
{Name: "date32", Type: arrow.PrimitiveTypes.Date32},
{Name: "date64", Type: arrow.PrimitiveTypes.Date64},
},
nil,
)
Expand Down Expand Up @@ -420,6 +422,8 @@ rec[0]["binary"]: ["\x00\x01\x02"]
rec[0]["large_binary"]: ["\x00\x01\x02"]
rec[0]["fixed_size_binary"]: ["\x00\x01\x02"]
rec[0]["uuid"]: ["00000000-0000-0000-0000-000000000001"]
rec[0]["date32"]: [19121]
rec[0]["date64"]: [1652054400000]
rec[1]["bool"]: [false]
rec[1]["i8"]: [-2]
rec[1]["i16"]: [-2]
Expand All @@ -442,6 +446,8 @@ rec[1]["binary"]: [(null)]
rec[1]["large_binary"]: [(null)]
rec[1]["fixed_size_binary"]: [(null)]
rec[1]["uuid"]: ["00000000-0000-0000-0000-000000000002"]
rec[1]["date32"]: [19121]
rec[1]["date64"]: [1652054400000]
rec[2]["bool"]: [(null)]
rec[2]["i8"]: [(null)]
rec[2]["i16"]: [(null)]
Expand All @@ -464,6 +470,8 @@ rec[2]["binary"]: [(null)]
rec[2]["large_binary"]: [(null)]
rec[2]["fixed_size_binary"]: [(null)]
rec[2]["uuid"]: [(null)]
rec[2]["date32"]: [(null)]
rec[2]["date64"]: [(null)]
`, str1Value, str1Value, str2Value, str2Value)
got, want := out.String(), want
require.Equal(t, want, got)
Expand Down
8 changes: 4 additions & 4 deletions go/arrow/csv/testdata/header.csv
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
#
bool;i8;i16;i32;i64;u8;u16;u32;u64;f16;f32;f64;str;large_str;ts;list(i64);large_list(i64);fixed_size_list(i64);binary;large_binary;fixed_size_binary;uuid
true;-1;-1;-1;-1;1;1;1;1;1.1;1.1;1.1;str-1;str-1;2022-05-09T00:01:01;{1,2,3};{1,2,3};{1,2,3};AAEC;AAEC;AAEC;00000000-0000-0000-0000-000000000001
false;-2;-2;-2;-2;2;2;2;2;2.2;2.2;2.2;;;2022-05-09T23:59:59;{};{};{4,5,6};;;;00000000-0000-0000-0000-000000000002
null;NULL;null;N/A;;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null
bool;i8;i16;i32;i64;u8;u16;u32;u64;f16;f32;f64;str;large_str;ts;list(i64);large_list(i64);fixed_size_list(i64);binary;large_binary;fixed_size_binary;uuid;date32;date64
true;-1;-1;-1;-1;1;1;1;1;1.1;1.1;1.1;str-1;str-1;2022-05-09T00:01:01;{1,2,3};{1,2,3};{1,2,3};AAEC;AAEC;AAEC;00000000-0000-0000-0000-000000000001;2022-05-09;2022-05-09
false;-2;-2;-2;-2;2;2;2;2;2.2;2.2;2.2;;;2022-05-09T23:59:59;{};{};{4,5,6};;;;00000000-0000-0000-0000-000000000002;2022-05-09;2022-05-09
null;NULL;null;N/A;;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null
8 changes: 4 additions & 4 deletions go/arrow/csv/testdata/types.csv
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
#
## supported types: bool;int8;int16;int32;int64;uint8;uint16;uint32;uint64;float16;float32;float64;string;large_string;timestamp;list(i64);large_list(i64);fixed_size_list(i64);binary;large_binary;fixed_size_binary;uuid
true;-1;-1;-1;-1;1;1;1;1;1.1;1.1;1.1;str-1;str-1;2022-05-09T00:01:01;{1,2,3};{1,2,3};{1,2,3};AAEC;AAEC;AAEC;00000000-0000-0000-0000-000000000001
false;-2;-2;-2;-2;2;2;2;2;2.2;2.2;2.2;;;2022-05-09T23:59:59;{};{};{4,5,6};;;;00000000-0000-0000-0000-000000000002
null;NULL;null;N/A;;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null
## supported types: bool;int8;int16;int32;int64;uint8;uint16;uint32;uint64;float16;float32;float64;string;large_string;timestamp;list(i64);large_list(i64);fixed_size_list(i64);binary;large_binary;fixed_size_binary;uuid;date32;date64
true;-1;-1;-1;-1;1;1;1;1;1.1;1.1;1.1;str-1;str-1;2022-05-09T00:01:01;{1,2,3};{1,2,3};{1,2,3};AAEC;AAEC;AAEC;00000000-0000-0000-0000-000000000001;2022-05-09;2022-05-09
false;-2;-2;-2;-2;2;2;2;2;2.2;2.2;2.2;;;2022-05-09T23:59:59;{};{};{4,5,6};;;;00000000-0000-0000-0000-000000000002;2022-05-09;2022-05-09
null;NULL;null;N/A;;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null;null
69 changes: 16 additions & 53 deletions go/arrow/csv/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/apache/arrow/go/v17/arrow/array"
)

func (w *Writer) transformColToStringArr(typ arrow.DataType, col arrow.Array, stringsReplacer func(string)string) []string {
func (w *Writer) transformColToStringArr(typ arrow.DataType, col arrow.Array, stringsReplacer func(string) string) []string {
res := make([]string, col.Len())
switch typ.(type) {
case *arrow.BooleanType:
Expand Down Expand Up @@ -215,62 +215,25 @@ func (w *Writer) transformColToStringArr(typ arrow.DataType, col arrow.Array, st
res[i] = w.nullValue
}
}
case *arrow.ListType:
arr := col.(*array.List)
listVals, offsets := arr.ListValues(), arr.Offsets()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
list := array.NewSlice(listVals, int64(offsets[i]), int64(offsets[i+1]))
var b bytes.Buffer
b.Write([]byte{'{'})
writer := csv.NewWriter(&b)
writer.Write(w.transformColToStringArr(list.DataType(), list, stringsReplacer))
writer.Flush()
b.Truncate(b.Len() - 1)
b.Write([]byte{'}'})
res[i] = b.String()
list.Release()
} else {
res[i] = w.nullValue
}
}
case *arrow.LargeListType:
arr := col.(*array.LargeList)
listVals, offsets := arr.ListValues(), arr.Offsets()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
list := array.NewSlice(listVals, int64(offsets[i]), int64(offsets[i+1]))
var b bytes.Buffer
b.Write([]byte{'{'})
writer := csv.NewWriter(&b)
writer.Write(w.transformColToStringArr(list.DataType(), list, stringsReplacer))
writer.Flush()
b.Truncate(b.Len() - 1)
b.Write([]byte{'}'})
res[i] = b.String()
list.Release()
} else {
res[i] = w.nullValue
}
}
case *arrow.FixedSizeListType:
arr := col.(*array.FixedSizeList)
case arrow.ListLikeType:
arr := col.(array.ListLike)
listVals := arr.ListValues()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
list := array.NewSlice(listVals, int64((arr.Len()-1)*i), int64((arr.Len()-1)*(i+1)))
var b bytes.Buffer
b.Write([]byte{'{'})
writer := csv.NewWriter(&b)
writer.Write(w.transformColToStringArr(list.DataType(), list, stringsReplacer))
writer.Flush()
b.Truncate(b.Len() - 1)
b.Write([]byte{'}'})
res[i] = b.String()
list.Release()
} else {
if arr.IsNull(i) {
res[i] = w.nullValue
continue
}
start, end := arr.ValueOffsets(i)
list := array.NewSlice(listVals, start, end)
var b bytes.Buffer
b.Write([]byte{'{'})
writer := csv.NewWriter(&b)
writer.Write(w.transformColToStringArr(list.DataType(), list, stringsReplacer))
writer.Flush()
b.Truncate(b.Len() - 1)
b.Write([]byte{'}'})
res[i] = b.String()
list.Release()
}
case *arrow.BinaryType:
arr := col.(*array.Binary)
Expand Down

0 comments on commit ee4fb91

Please sign in to comment.