Commit 4c0505d
Vector instead of List for parquet types and values (#205)
In modern snowplow loaders we have already switched to using Vector when
manipulating data structures. We use Vector because we commonly need to
join together fields from different schemas.

But even the new loader code still has lots of `list.toVector`. If we
change to Vector in schema-ddl then we can eliminate a lot of those
unnecessary conversions.
istreeter committed May 8, 2024
1 parent c8f5729 commit 4c0505d
Showing 8 changed files with 77 additions and 78 deletions.
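
The performance intuition behind the commit message comes down to the cost model: `List` is optimised for prepending and head access, while `Vector` gives effectively constant-time indexed access and cheap concatenation, which suits code that repeatedly joins field collections from different schemas. A minimal standalone sketch of the idea (the `fieldsOf` helper and schema names are hypothetical stand-ins, not schema-ddl APIs):

```scala
import cats.data.NonEmptyVector

// Hypothetical stand-ins for illustration only; not schema-ddl APIs.
final case class Field(name: String)

def fieldsOf(schema: String): Vector[Field] =
  Vector(Field(s"${schema}_id"), Field(s"${schema}_value"))

// Joining fields from two schemas is a plain `++`, with no
// `list.toVector` conversion at the schema-ddl boundary:
val joined: Vector[Field] = fieldsOf("atomic") ++ fieldsOf("contexts")

// A non-empty collection is recovered only where an API demands it:
val nonEmpty: Option[NonEmptyVector[Field]] = NonEmptyVector.fromVector(joined)
```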
Caster.scala
@@ -14,7 +14,7 @@ package com.snowplowanalytics.iglu.schemaddl.parquet
 
 import io.circe._
 import cats.implicits._
-import cats.data.{NonEmptyList, ValidatedNel, Validated}
+import cats.data.{NonEmptyVector, ValidatedNel, Validated}
 import cats.Semigroup
 
 import java.time.{Instant, LocalDate}
@@ -33,8 +33,8 @@ trait Caster[A] {
   def decimalValue(unscaled: BigInt, details: Type.Decimal): A
   def dateValue(v: LocalDate): A
   def timestampValue(v: Instant): A
-  def structValue(vs: NonEmptyList[Caster.NamedValue[A]]): A
-  def arrayValue(vs: List[A]): A
+  def structValue(vs: NonEmptyVector[Caster.NamedValue[A]]): A
+  def arrayValue(vs: Vector[A]): A
 }
 
 object Caster {
@@ -120,7 +120,6 @@ object Caster {
   private def castArray[A](caster: Caster[A], array: Type.Array, value: Json): Result[A] =
     value.asArray match {
       case Some(values) => values
-        .toList
         .map {
           case Json.Null =>
             if (array.nullability.nullable) caster.nullValue.validNel
@@ -170,7 +169,7 @@
   /** Part of `castStruct`, mapping sub-fields of a JSON object into `FieldValue`s */
   private def castStructField[A](caster: Caster[A], field: Field, jsonObject: Map[String, Json]): ValidatedNel[CastError, NamedValue[A]] = {
     val ca = field.accessors
-      .toList
+      .iterator
      .map { name =>
        jsonObject.get(name) match {
          case Some(Json.Null) => CastAccumulate[A](None, true)
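
Two of the removals above are more than type swaps. circe's `Json.asArray` already returns `Option[Vector[Json]]`, so the deleted `.toList` in `castArray` was converting away from the parser's native representation; and in `castStructField`, `.iterator` walks the accessor set without materialising an intermediate collection. A small sketch of the `asArray` shape, assuming circe-core and circe-parser on the classpath:

```scala
import io.circe.{Json, parser}

val json: Json = parser.parse("""["x", "y", "z"]""").getOrElse(Json.Null)

// asArray is Option[Vector[Json]]: map over the Vector directly,
// no .toList needed before element-wise casting.
val upper: Option[Vector[Json]] =
  json.asArray.map(_.map(j => Json.fromString(j.asString.getOrElse("").toUpperCase)))
```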
Field.scala
@@ -12,7 +12,7 @@
  */
 package com.snowplowanalytics.iglu.schemaddl.parquet
 
-import cats.data.NonEmptyList
+import cats.data.NonEmptyVector
 import cats.implicits._
 
 import com.snowplowanalytics.iglu.schemaddl.StringUtils
@@ -54,7 +54,7 @@ object Field {
     field.copy(name = normalizeName(field), fieldType = fieldType)
   }
 
-  private def collapseDuplicateFields(normFields: NonEmptyList[Field]): NonEmptyList[Field] = {
+  private def collapseDuplicateFields(normFields: NonEmptyVector[Field]): NonEmptyVector[Field] = {
    val endMap = normFields
      .groupBy(_.name)
      .map { case (key, fs) =>
@@ -137,10 +137,10 @@ object Field {
          val nullability = isFieldNullable(constructedType.nullability, isSubfieldRequired)
          Field(key, constructedType.value, nullability)
        }
-        .toList
+        .toVector
        .sortBy(_.name)
 
-      NonEmptyList.fromList(subfields) match {
+      NonEmptyVector.fromVector(subfields) match {
        case Some(nel) =>
          Some(Type.Struct(nel))
        case None =>
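
`NonEmptyVector.fromVector` mirrors `NonEmptyList.fromList`: it returns an `Option`, yielding `None` for an empty input (here, an object with no usable sub-fields). The binder in the unchanged `case Some(nel)` keeps its old name, but it is now a `NonEmptyVector`. A standalone sketch of the pattern, cats only:

```scala
import cats.data.NonEmptyVector

val subfields: Vector[String] = Vector("b", "a").sorted

NonEmptyVector.fromVector(subfields) match {
  case Some(nev) => println(s"struct with ${nev.length} fields") // struct with 2 fields
  case None      => println("no subfields, no struct")
}
```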
FieldValue.scala
@@ -13,7 +13,7 @@
 package com.snowplowanalytics.iglu.schemaddl.parquet
 
 import io.circe._
-import cats.data.NonEmptyList
+import cats.data.NonEmptyVector
 
 import java.time.{Instant, LocalDate}
 
@@ -33,8 +33,8 @@ object FieldValue {
   case class DecimalValue(value: BigDecimal, precision: Type.DecimalPrecision) extends FieldValue
   case class TimestampValue(value: java.sql.Timestamp) extends FieldValue
   case class DateValue(value: java.sql.Date) extends FieldValue
-  case class StructValue(values: List[NamedValue]) extends FieldValue
-  case class ArrayValue(values: List[FieldValue]) extends FieldValue
+  case class StructValue(values: Vector[NamedValue]) extends FieldValue
+  case class ArrayValue(values: Vector[FieldValue]) extends FieldValue
   /* Part of [[StructValue]] */
   case class NamedValue(name: String, value: FieldValue)
 
@@ -50,13 +50,13 @@ object FieldValue {
       DecimalValue(BigDecimal(unscaled, details.scale), details.precision)
     def dateValue(v: LocalDate): FieldValue = DateValue(java.sql.Date.valueOf(v))
     def timestampValue(v: Instant): FieldValue = TimestampValue(java.sql.Timestamp.from(v))
-    def structValue(vs: NonEmptyList[Caster.NamedValue[FieldValue]]): FieldValue =
+    def structValue(vs: NonEmptyVector[Caster.NamedValue[FieldValue]]): FieldValue =
       StructValue {
-        vs.toList.map {
+        vs.toVector.map {
          case Caster.NamedValue(n, v) => NamedValue(n, v)
        }
      }
-    def arrayValue(vs: List[FieldValue]): FieldValue = ArrayValue(vs)
+    def arrayValue(vs: Vector[FieldValue]): FieldValue = ArrayValue(vs)
   }
 
 /**
Migrations.scala
@@ -1,7 +1,7 @@
 package com.snowplowanalytics.iglu.schemaddl.parquet
 
 import cats.Show
-import cats.data.NonEmptyList
+import cats.data.NonEmptyVector
 import cats.syntax.all._
 import com.snowplowanalytics.iglu.schemaddl.parquet.Type.{Array, Struct}
 
@@ -93,9 +93,9 @@ object Migrations {
       case _ => Nil // discard the modifications as they would have been detected in forward migration
     })
 
-    val tgtFields = reverseMigration.toList.traverse(_.result).toList.flatten
+    val tgtFields = reverseMigration.toVector.traverse(_.result).toVector.flatten
     val tgtFieldNames = tgtFields.map(_.name)
-    val allSrcFields = forwardMigration.toList.traverse(_.result).toList.flatten
+    val allSrcFields = forwardMigration.toVector.traverse(_.result).toVector.flatten
     val allSrcFieldMap = allSrcFields.map(f => f.name -> f).toMap
     // swap fields in src and target as they would be rearranged in nested structs or arrays
     val reorderedTgtFields = tgtFields.map { t =>
@@ -105,13 +105,13 @@ object Migrations {
        case _ => t
      }
    }
-    val srcFields: List[Field] = allSrcFields.filter(srcField => !tgtFieldNames.contains(srcField.name)).map(
+    val srcFields: Vector[Field] = allSrcFields.filter(srcField => !tgtFieldNames.contains(srcField.name)).map(
      // drop not null constrains from removed fields.
      _.copy(nullability = Type.Nullability.Nullable)
    )
 
    // failed migration would produce no fields in source
-    NonEmptyList.fromList(reorderedTgtFields ::: srcFields).map { nonEmpty =>
+    NonEmptyVector.fromVector(reorderedTgtFields ++ srcFields).map { nonEmpty =>
      Type.Struct(nonEmpty)
    }
 
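
One edit in this file that looks behavioural is purely syntactic: `:::` is `List`'s concatenation operator and does not exist on `Vector`, so the combined fields are now joined with the general `++`. For example:

```scala
// `:::` is List-only; Vector concatenation uses `++`:
val asList:   List[Int]   = List(1, 2) ::: List(3)
val asVector: Vector[Int] = Vector(1, 2) ++ Vector(3)
// both yield the elements 1, 2, 3 in order
```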
Type.scala
@@ -13,7 +13,7 @@
 package com.snowplowanalytics.iglu.schemaddl.parquet
 
 import cats.Eq
-import cats.data.NonEmptyList
+import cats.data.NonEmptyVector
 
 sealed trait Type extends Product with Serializable
 
@@ -27,7 +27,7 @@ object Type {
   case class Decimal(precision: DecimalPrecision, scale: Int) extends Type
   case object Date extends Type
   case object Timestamp extends Type
-  case class Struct(fields: NonEmptyList[Field]) extends Type
+  case class Struct(fields: NonEmptyVector[Field]) extends Type
   case class Array(element: Type, nullability: Nullability) extends Type
 
   /* Fallback type for when json schema does not map to a parquet primitive type (e.g. unions)
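
Since `Type.Struct` is public API, this is a source-breaking change for any caller constructing structs. A rough before/after sketch, assuming the import paths that the spec below appears to use:

```scala
import cats.data.NonEmptyVector
import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, Type}
import com.snowplowanalytics.iglu.schemaddl.parquet.Type.Nullability.{Nullable, Required}

// Before this commit:
//   Type.Struct(NonEmptyList.of(Field("foo", Type.Integer, Nullable), ...))

// After this commit:
val struct: Type =
  Type.Struct(NonEmptyVector.of(
    Field("foo", Type.Integer, Nullable),
    Field("bar", Type.Boolean, Required)))
```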
CasterSpec.scala
@@ -14,7 +14,7 @@ package com.snowplowanalytics.iglu.schemaddl.parquet
 
 import io.circe._
 import io.circe.literal._
-import cats.data.NonEmptyList
+import cats.data.{NonEmptyVector, NonEmptyList}
 import org.specs2.matcher.ValidatedMatchers._
 import org.specs2.matcher.MatchResult
 
@@ -87,11 +87,11 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
 
   def e5 = {
     val inputJson = json"""{"foo": 42, "bar": true}"""
-    val inputField = Type.Struct(NonEmptyList.of(
+    val inputField = Type.Struct(NonEmptyVector.of(
       Field("foo", Type.Integer, Nullable),
       Field("bar", Type.Boolean, Required)))
 
-    val expected = StructValue(List(
+    val expected = StructValue(Vector(
       NamedValue("foo", IntValue(42)),
       NamedValue("bar", BooleanValue(true))
     ))
@@ -100,11 +100,11 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
 
   def e6 = {
     val inputJson = json"""{"bar": true}"""
-    val inputField = Type.Struct(NonEmptyList.of(
+    val inputField = Type.Struct(NonEmptyVector.of(
       Field("foo", Type.Integer, Nullable),
       Field("bar", Type.Boolean, Required)))
 
-    val expected = StructValue(List(
+    val expected = StructValue(Vector(
       NamedValue("foo", NullValue),
       NamedValue("bar", BooleanValue(true))
     ))
@@ -115,15 +115,15 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
     val inputJson = json"""["x", "y", "z"]"""
     val inputField = Type.Array(Type.String, Type.Nullability.Required)
 
-    val expected = ArrayValue(List(StringValue("x"), StringValue("y"), StringValue("z")))
+    val expected = ArrayValue(Vector(StringValue("x"), StringValue("y"), StringValue("z")))
     testCast(inputField, inputJson, expected)
   }
 
   def e8 = {
     val inputJson = json"""["x", "y", null]"""
     val inputField = Type.Array(Type.String, Type.Nullability.Nullable)
 
-    val expected = ArrayValue(List(StringValue("x"), StringValue("y"), NullValue))
+    val expected = ArrayValue(Vector(StringValue("x"), StringValue("y"), NullValue))
     testCast(inputField, inputJson, expected)
   }
 
@@ -135,13 +135,13 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
        "undefined": 42
      }"""
 
-    val inputField = Type.Struct(NonEmptyList.of(
+    val inputField = Type.Struct(NonEmptyVector.of(
       Field("someBool", Type.Boolean, Required),
       Field("repeatedInt", Type.Array(Type.Integer, Required), Required)))
 
-    val expected = StructValue(List(
+    val expected = StructValue(Vector(
       NamedValue("some_bool", BooleanValue(true)),
-      NamedValue("repeated_int", ArrayValue(List(IntValue(1), IntValue(5), IntValue(2))))
+      NamedValue("repeated_int", ArrayValue(Vector(IntValue(1), IntValue(5), IntValue(2))))
     ))
     testCast(inputField, inputJson, expected)
   }
@@ -158,25 +158,25 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
        }
      }"""
 
-    val inputField = Type.Struct(NonEmptyList.of(
+    val inputField = Type.Struct(NonEmptyVector.of(
       Field("someBool", Type.Boolean, Required),
-      Field("nested", Type.Struct(NonEmptyList.of(
+      Field("nested", Type.Struct(NonEmptyVector.of(
        Field("str", Type.String, Required),
        Field("int", Type.Integer, Nullable),
-        Field("deep", Type.Struct(NonEmptyList.of(Field("str", Type.String, Nullable))), Required),
-        Field("arr", Type.Array(Type.Struct(NonEmptyList.of(Field("a", Type.String, Required))), Required), Required)
+        Field("deep", Type.Struct(NonEmptyVector.of(Field("str", Type.String, Nullable))), Required),
+        Field("arr", Type.Array(Type.Struct(NonEmptyVector.of(Field("a", Type.String, Required))), Required), Required)
      )), Nullable)
    ))
 
-    val expected = StructValue(List(
+    val expected = StructValue(Vector(
       NamedValue("some_bool", BooleanValue(true)),
-      NamedValue("nested", StructValue(List(
+      NamedValue("nested", StructValue(Vector(
        NamedValue("str", StringValue("foo bar")),
        NamedValue("int", IntValue(3)),
-        NamedValue("deep", StructValue(List(NamedValue("str", StringValue("foo"))))),
-        NamedValue("arr", ArrayValue(List(
-          StructValue(List(NamedValue("a", StringValue("b")))),
-          StructValue(List(NamedValue("a", StringValue("d"))))
+        NamedValue("deep", StructValue(Vector(NamedValue("str", StringValue("foo"))))),
+        NamedValue("arr", ArrayValue(Vector(
+          StructValue(Vector(NamedValue("a", StringValue("b")))),
+          StructValue(Vector(NamedValue("a", StringValue("d"))))
        )))
      )))
    ))
@@ -190,9 +190,9 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
        "optional": null
      }"""
 
-    val inputField = Type.Struct(NonEmptyList.of(Field("optional", Type.String, Nullable)))
+    val inputField = Type.Struct(NonEmptyVector.of(Field("optional", Type.String, Nullable)))
 
-    val expected = StructValue(List(
+    val expected = StructValue(Vector(
       NamedValue("optional", NullValue)
     ))
     testCast(inputField, inputJson, expected)
@@ -204,9 +204,9 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
        "unionType": ["this", "is", "fallback", "strategy"]
      }"""
 
-    val inputField = Type.Struct(NonEmptyList.of(Field("unionType", Type.Json, Nullable)))
+    val inputField = Type.Struct(NonEmptyVector.of(Field("unionType", Type.Json, Nullable)))
 
-    val expected = StructValue(List(
+    val expected = StructValue(Vector(
       NamedValue("union_type", JsonValue(json"""["this","is","fallback","strategy"]"""))
     ))
     testCast(inputField, inputJson, expected)
@@ -215,7 +215,7 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
   def e13 = {
     def testInvalidCast(fieldType: Type, value: Json) =
       Caster.cast(caster, Field("top", fieldType, Required), value) must beInvalid
-    List(
+    Vector(
      testInvalidCast(Type.String, json"""42"""),
      testInvalidCast(Type.String, json"""true"""),
      testInvalidCast(Type.Boolean, json""""hello""""),
@@ -237,7 +237,7 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
      (bd.underlying.unscaledValue.longValue must_== expectedLong) and
        (bd.scale must_== expectedScale)
    }
-    List(
+    Vector(
      testDecimal(Type.Decimal(Digits9, 2), json"87.98", 8798, 2),
      testDecimal(Type.Decimal(Digits9, 2), json"-87.98", -8798, 2),
      testDecimal(Type.Decimal(Digits9, 2), json"87.98000", 8798, 2),
@@ -257,7 +257,7 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
   def e15 = {
     def testInvalidCast(decimal: Type.Decimal, value: Json) =
       Caster.cast(caster, Field("top", decimal, Required), value) must beInvalid
-    List(
+    Vector(
      testInvalidCast(Type.Decimal(Digits9, 2), json"""12.1234"""),
      testInvalidCast(Type.Decimal(Digits9, 2), json"""-12.1234"""),
      testInvalidCast(Type.Decimal(Digits9, 2), json"""123456789.12"""),
@@ -269,15 +269,15 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
   }
 
   def e16 = {
-    val inputField = Type.Struct(NonEmptyList.of(
+    val inputField = Type.Struct(NonEmptyVector.of(
       Field("xyz", Type.Integer, Nullable, Set("xyz", "XYZ"))))
 
-    List(
-      json"""{"xyz": 42, "XYZ": "invalid"}""" -> StructValue(List(NamedValue("xyz", IntValue(42)))),
-      json"""{"XYZ": 42, "xyz": "invalid"}""" -> StructValue(List(NamedValue("xyz", IntValue(42)))),
-      json"""{"xyz": null, "XYZ": "invalid"}""" -> StructValue(List(NamedValue("xyz", NullValue))),
-      json"""{"XYZ": null, "xyz": "invalid"}""" -> StructValue(List(NamedValue("xyz", NullValue))),
-      json"""{"XYZ": "invalid"}""" -> StructValue(List(NamedValue("xyz", NullValue))),
+    Vector(
+      json"""{"xyz": 42, "XYZ": "invalid"}""" -> StructValue(Vector(NamedValue("xyz", IntValue(42)))),
+      json"""{"XYZ": 42, "xyz": "invalid"}""" -> StructValue(Vector(NamedValue("xyz", IntValue(42)))),
+      json"""{"xyz": null, "XYZ": "invalid"}""" -> StructValue(Vector(NamedValue("xyz", NullValue))),
+      json"""{"XYZ": null, "xyz": "invalid"}""" -> StructValue(Vector(NamedValue("xyz", NullValue))),
+      json"""{"XYZ": "invalid"}""" -> StructValue(Vector(NamedValue("xyz", NullValue))),
    )
    .map { case (json, expected) =>
      testCast(inputField, json, expected)
@@ -288,7 +288,7 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
 
   def e17 = {
     val inputJson = json"""[{"id": null}]"""
-    val fieldType = Type.Array(Type.Struct(NonEmptyList.of(Field("id", Type.String, Required, Set("id")))), Required)
+    val fieldType = Type.Array(Type.Struct(NonEmptyVector.of(Field("id", Type.String, Required, Set("id")))), Required)
 
     val expected = NonEmptyList.one(WrongType(Json.Null, Type.String))
     Caster.cast(caster, Field("top", fieldType, Nullable), inputJson) must beInvalid(expected)
ExampleFieldValue.scala
@@ -13,7 +13,7 @@
 package com.snowplowanalytics.iglu.schemaddl.parquet
 
 import io.circe.Json
-import cats.data.NonEmptyList
+import cats.data.NonEmptyVector
 
 import java.time.{LocalDate, Instant}
 
@@ -31,8 +31,8 @@ object ExampleFieldValue {
   case class DecimalValue(value: BigDecimal, precision: Type.DecimalPrecision) extends ExampleFieldValue
   case class TimestampValue(value: java.sql.Timestamp) extends ExampleFieldValue
   case class DateValue(value: java.sql.Date) extends ExampleFieldValue
-  case class StructValue(values: List[Caster.NamedValue[ExampleFieldValue]]) extends ExampleFieldValue
-  case class ArrayValue(values: List[ExampleFieldValue]) extends ExampleFieldValue
+  case class StructValue(values: Vector[Caster.NamedValue[ExampleFieldValue]]) extends ExampleFieldValue
+  case class ArrayValue(values: Vector[ExampleFieldValue]) extends ExampleFieldValue
 
   val caster: Caster[ExampleFieldValue] = new Caster[ExampleFieldValue] {
     def nullValue: ExampleFieldValue = NullValue
@@ -46,7 +46,7 @@ object ExampleFieldValue {
       DecimalValue(BigDecimal(unscaled, details.scale), details.precision)
     def dateValue(v: LocalDate): ExampleFieldValue = DateValue(java.sql.Date.valueOf(v))
     def timestampValue(v: Instant): ExampleFieldValue = TimestampValue(java.sql.Timestamp.from(v))
-    def structValue(vs: NonEmptyList[Caster.NamedValue[ExampleFieldValue]]): ExampleFieldValue = StructValue(vs.toList)
-    def arrayValue(vs: List[ExampleFieldValue]): ExampleFieldValue = ArrayValue(vs)
+    def structValue(vs: NonEmptyVector[Caster.NamedValue[ExampleFieldValue]]): ExampleFieldValue = StructValue(vs.toVector)
+    def arrayValue(vs: Vector[ExampleFieldValue]): ExampleFieldValue = ArrayValue(vs)
   }
 }