Omit parquet field for a schema with no nested fields
Snowplow users commonly use schemas with no nested fields. Previously,
we created a string column for such schemas and loaded the literal
string `{}` into it. There is no benefit to loading this redundant data.

By omitting the column for these schemas, we support schema evolution if
the user ever adds a nested field to the empty schema.

For empty schemas with `additionalProperties: true` we retain the old
behaviour of loading the original JSON as a string.
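
A rough sketch of the new behaviour (not part of this commit; it assumes
schema-ddl's `Schema.parse` helper with the circe implicits in scope, and
the field name is illustrative):

import io.circe.literal._
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._
import com.snowplowanalytics.iglu.schemaddl.parquet.Field

// Empty schema that forbids extra properties: no column is created at all,
// so adding a nested field later is an ordinary column addition.
val closed = Schema.parse(json"""{"type": "object", "additionalProperties": false}""").get
Field.build("empty_entity", closed, enforceValuePresence = false)
// => None

// Empty schema that allows extra properties: keep the old behaviour and
// load the original JSON into a single JSON-typed column.
val open = Schema.parse(json"""{"type": "object", "additionalProperties": true}""").get
Field.build("empty_entity", open, enforceValuePresence = false)
// => Some(field) where field.fieldType is Type.Json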
istreeter committed Apr 19, 2024
1 parent 256a8f7 commit 662512a
Showing 11 changed files with 189 additions and 137 deletions.
@@ -268,8 +268,11 @@ private[schemaddl] object Mutate {
 
     val additionalProperties = schema.`type`.map(_.asUnion.value) match {
       case Some(set) if set.contains(CommonProperties.Type.Object) =>
-        Some(ObjectProperty.AdditionalProperties.AdditionalPropertiesAllowed(false))
-      case _ => None
+        if (hasDefinedProperties(schema))
+          Some(ObjectProperty.AdditionalProperties.AdditionalPropertiesAllowed(false))
+        else
+          schema.additionalProperties
+      case _ => schema.additionalProperties
     }
 
     schema.copy(
@@ -283,4 +286,12 @@
     )
   }
 
+  private def hasDefinedProperties(schema: Schema): Boolean =
+    schema.properties match {
+      case Some(p) if p.value.nonEmpty =>
+        true
+      case _ =>
+        (schema.oneOf.iterator.flatMap(_.value) ++ schema.anyOf.iterator.flatMap(_.value)).exists(hasDefinedProperties)
+    }
+
 }
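
For intuition, a spec-style sketch of the new Mutate behaviour (illustrative,
not from this commit; Mutate is private[schemaddl], so a real check would live
next to MutateSpec and reuse its SpecHelpers):

val withNestedField = SpecHelpers.parseSchema(
  """
  |{
  |"type": "object",
  |"properties": {"a": {"type": "string"}}
  |}
  """.stripMargin)

// A schema with defined properties is still closed for storage:
Mutate.forStorage(withNestedField).additionalProperties must_==
  Some(ObjectProperty.AdditionalProperties.AdditionalPropertiesAllowed(false))

val emptyOpen = SpecHelpers.parseSchema(
  """
  |{
  |"type": "object",
  |"additionalProperties": true
  |}
  """.stripMargin)

// An empty schema now keeps its original additionalProperties, letting the
// field builder choose a JSON column (true) or no column at all (false):
Mutate.forStorage(emptyOpen).additionalProperties must_==
  Some(ObjectProperty.AdditionalProperties.AdditionalPropertiesAllowed(true))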
@@ -14,7 +14,7 @@ package com.snowplowanalytics.iglu.schemaddl.parquet
 
 import io.circe._
 import cats.implicits._
-import cats.data.{ValidatedNel, Validated}
+import cats.data.{NonEmptyList, ValidatedNel, Validated}
 import cats.Semigroup
 
 import java.time.{Instant, LocalDate}
@@ -33,7 +33,7 @@ trait Caster[A] {
   def decimalValue(unscaled: BigInt, details: Type.Decimal): A
   def dateValue(v: LocalDate): A
   def timestampValue(v: Instant): A
-  def structValue(vs: List[Caster.NamedValue[A]]): A
+  def structValue(vs: NonEmptyList[Caster.NamedValue[A]]): A
   def arrayValue(vs: List[A]): A
 }
 
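
Implementations of Caster must adopt the new signature; a minimal sketch for
a hypothetical circe-JSON caster, showing only the struct case (the
NonEmptyList encodes that a struct always has at least one field):

import cats.data.NonEmptyList
import io.circe.Json

// Before: def structValue(vs: List[Caster.NamedValue[Json]]): Json
def structValue(vs: NonEmptyList[Caster.NamedValue[Json]]): Json =
  Json.fromFields(vs.toList.map { case Caster.NamedValue(n, v) => (n, v) })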
@@ -12,9 +12,12 @@
  */
 package com.snowplowanalytics.iglu.schemaddl.parquet
 
+import cats.data.NonEmptyList
+import cats.implicits._
+
 import com.snowplowanalytics.iglu.schemaddl.StringUtils
 import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
-import com.snowplowanalytics.iglu.schemaddl.jsonschema.properties.{ArrayProperty, CommonProperties}
+import com.snowplowanalytics.iglu.schemaddl.jsonschema.properties.{ArrayProperty, CommonProperties, ObjectProperty}
 import com.snowplowanalytics.iglu.schemaddl.jsonschema.mutate.Mutate
 
 case class Field(name: String,
@@ -28,19 +31,17 @@ object Field {
                  fieldType: Type,
                  nullability: Type.Nullability): Field = Field(name, fieldType, nullability, Set(name))
 
-  def build(name: String, topSchema: Schema, enforceValuePresence: Boolean): Field = {
-    val constructedType = buildType(Mutate.forStorage(topSchema))
-    val nullability = isFieldNullable(constructedType.nullability, enforceValuePresence)
-
-    Field(name, constructedType.value, nullability)
-  }
-
-  def buildRepeated(name: String, itemSchema: Schema, enforceItemPresence: Boolean, nullability: Type.Nullability): Field = {
-    val constructedType = buildType(Mutate.forStorage(itemSchema))
-    val itemNullability = isFieldNullable(constructedType.nullability, enforceItemPresence)
-
-    Field(name, Type.Array(constructedType.value, itemNullability), nullability)
-  }
+  def build(name: String, topSchema: Schema, enforceValuePresence: Boolean): Option[Field] =
+    buildType(Mutate.forStorage(topSchema)).map { constructedType =>
+      val nullability = isFieldNullable(constructedType.nullability, enforceValuePresence)
+      Field(name, constructedType.value, nullability)
+    }
+
+  def buildRepeated(name: String, itemSchema: Schema, enforceItemPresence: Boolean, nullability: Type.Nullability): Option[Field] =
+    buildType(Mutate.forStorage(itemSchema)).map { constructedType =>
+      val itemNullability = isFieldNullable(constructedType.nullability, enforceItemPresence)
+      Field(name, Type.Array(constructedType.value, itemNullability), nullability)
+    }
 
   def normalize(field: Field): Field = {
     val fieldType = field.fieldType match {
@@ -53,20 +54,18 @@ object Field {
     field.copy(name = normalizeName(field), fieldType = fieldType)
   }
 
-  private def collapseDuplicateFields(normFields: List[Field]): List[Field] = {
+  private def collapseDuplicateFields(normFields: NonEmptyList[Field]): NonEmptyList[Field] = {
     val endMap = normFields
       .groupBy(_.name)
       .map { case (key, fs) =>
         // Use `min` to deterministically pick the same accessor each time when choosing the type
-        val lowest = fs.minBy(f => f.accessors.min)
-        (key, lowest.copy(accessors = fs.flatMap(_.accessors).toSet))
+        val lowest = fs.minimumBy(f => f.accessors.min)
+        (key, lowest.copy(accessors = fs.iterator.flatMap(_.accessors).toSet))
       }
     normFields
       .map(_.name)
       .distinct
-      .foldLeft(List.empty[Field])(
-        (acc, name) => acc :+ endMap(name)
-      )
+      .map(endMap(_))
   }
 
   private[parquet] def normalizeName(field: Field): String =
@@ -100,55 +99,69 @@ object Field {
       }
   }
 
-  private def buildType(topSchema: Schema): NullableType = {
+  private def buildType(topSchema: Schema): Option[NullableType] = {
     topSchema.`type` match {
       case Some(types) if types.possiblyWithNull(CommonProperties.Type.Object) =>
-        NullableType(
-          value = buildObjectType(topSchema),
-          nullability = JsonNullability.extractFrom(types)
-        )
+        buildObjectType(topSchema).map { objectType =>
+          NullableType(
+            value = objectType,
+            nullability = JsonNullability.extractFrom(types)
+          )
+        }
 
       case Some(types) if types.possiblyWithNull(CommonProperties.Type.Array) =>
-        NullableType(
-          value = buildArrayType(topSchema),
-          nullability = JsonNullability.extractFrom(types)
-        )
+        buildArrayType(topSchema).map { arrayType =>
+          NullableType(
+            value = arrayType,
+            nullability = JsonNullability.extractFrom(types)
+          )
+        }
 
       case _ =>
         provideSuggestions(topSchema) match {
-          case Some(matchingSuggestion) => matchingSuggestion
-          case None => jsonType(topSchema)
+          case Some(matchingSuggestion) => Some(matchingSuggestion)
+          case None => Some(jsonType(topSchema))
         }
     }
   }
 
-  private def buildObjectType(topSchema: Schema): Type = {
-    val subfields = topSchema.properties.map(_.value).getOrElse(Map.empty)
-    if (subfields.nonEmpty) {
-      val requiredKeys = topSchema.required.toList.flatMap(_.value)
-      val structFields = subfields
-        .toList
-        .map { case (key, schema) =>
-          val isSubfieldRequired = requiredKeys.contains(key)
-          val constructedType = buildType(schema)
-          val nullability = isFieldNullable(constructedType.nullability, isSubfieldRequired)
-          Field(key, constructedType.value, nullability)
-        }
-        .sortBy(_.name)
-      Type.Struct(structFields)
-    } else {
-      Type.Json
-    }
-  }
+  private def buildObjectType(topSchema: Schema): Option[Type] = {
+    val requiredKeys = topSchema.required.toList.flatMap(_.value)
+    val subfields = topSchema.properties
+      .iterator
+      .flatMap(_.value.toList)
+      .flatMap { case (key, schema) =>
+        buildType(schema).map(key -> _)
+      }.map { case (key, constructedType) =>
+        val isSubfieldRequired = requiredKeys.contains(key)
+        val nullability = isFieldNullable(constructedType.nullability, isSubfieldRequired)
+        Field(key, constructedType.value, nullability)
+      }
+      .toList
+      .sortBy(_.name)
+
+    NonEmptyList.fromList(subfields) match {
+      case Some(nel) =>
+        Some(Type.Struct(nel))
+      case None =>
+        topSchema.additionalProperties match {
+          case Some(ObjectProperty.AdditionalProperties.AdditionalPropertiesAllowed(false)) =>
+            None
+          case _ =>
+            Some(Type.Json)
+        }
+    }
+  }
 
-  private def buildArrayType(topSchema: Schema): Type.Array = {
+  private def buildArrayType(topSchema: Schema): Option[Type.Array] = {
     topSchema.items match {
       case Some(ArrayProperty.Items.ListItems(schema)) =>
-        val typeOfArrayItem = buildType(schema)
-        val nullability = isFieldNullable(typeOfArrayItem.nullability, true)
-        Type.Array(typeOfArrayItem.value, nullability)
+        buildType(schema).map { typeOfArrayItem =>
+          val nullability = isFieldNullable(typeOfArrayItem.nullability, true)
+          Type.Array(typeOfArrayItem.value, nullability)
+        }
       case _ =>
-        Type.Array(Type.Json, Type.Nullability.Nullable)
+        Some(Type.Array(Type.Json, Type.Nullability.Nullable))
     }
   }
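
Callers of Field.build and Field.buildRepeated now receive an Option; a
hedged sketch of how downstream code might adapt (the helper name is
illustrative):

import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.parquet.Field

// None (an omitted column) simply contributes nothing, so an empty closed
// schema disappears from the table instead of becoming a `{}` string column.
def columnsFor(name: String, schema: Schema): List[Field] =
  Field.build(name, schema, enforceValuePresence = true).toList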
@@ -13,6 +13,7 @@
 package com.snowplowanalytics.iglu.schemaddl.parquet
 
 import io.circe._
+import cats.data.NonEmptyList
 
 import java.time.{Instant, LocalDate}
 
@@ -49,9 +50,9 @@ object FieldValue {
       DecimalValue(BigDecimal(unscaled, details.scale), details.precision)
     def dateValue(v: LocalDate): FieldValue = DateValue(java.sql.Date.valueOf(v))
     def timestampValue(v: Instant): FieldValue = TimestampValue(java.sql.Timestamp.from(v))
-    def structValue(vs: List[Caster.NamedValue[FieldValue]]): FieldValue =
+    def structValue(vs: NonEmptyList[Caster.NamedValue[FieldValue]]): FieldValue =
       StructValue {
-        vs.map {
+        vs.toList.map {
          case Caster.NamedValue(n, v) => NamedValue(n, v)
        }
      }
@@ -1,6 +1,7 @@
 package com.snowplowanalytics.iglu.schemaddl.parquet
 
 import cats.Show
+import cats.data.NonEmptyList
 import cats.syntax.all._
 import com.snowplowanalytics.iglu.schemaddl.parquet.Type.{Array, Struct}
 
@@ -85,16 +86,16 @@ object Migrations {
         // Comparing struct target fields to the source. This will detect additions.
         val reverseMigration = targetFields.map(tgtField => MigrationFieldPair(tgtField.name :: path, tgtField, sourceStruct.focus(tgtField.name)).migrations)
 
-        migrations ++= forwardMigration.flatMap(_.migrations)
+        migrations ++= forwardMigration.iterator.flatMap(_.migrations)
 
-        migrations ++= reverseMigration.flatMap(_.migrations.flatMap {
+        migrations ++= reverseMigration.iterator.flatMap(_.migrations.flatMap {
           case KeyRemoval(path, value) => List(KeyAddition(path, value))
           case _ => Nil // discard the modifications as they would have been detected in forward migration
         })
 
-        val tgtFields = reverseMigration.traverse(_.result).toList.flatten
+        val tgtFields = reverseMigration.toList.traverse(_.result).toList.flatten
         val tgtFieldNames = tgtFields.map(_.name)
-        val allSrcFields = forwardMigration.traverse(_.result).toList.flatten
+        val allSrcFields = forwardMigration.toList.traverse(_.result).toList.flatten
         val allSrcFieldMap = allSrcFields.map(f => f.name -> f).toMap
         // swap fields in src and target as they would be rearranged in nested structs or arrays
         val reorderedTgtFields = tgtFields.map { t =>
@@ -104,13 +105,15 @@ object Migrations {
             case _ => t
           }
         }
-        val srcFields = allSrcFields.filter(srcField => !tgtFieldNames.contains(srcField.name)).map(
+        val srcFields: List[Field] = allSrcFields.filter(srcField => !tgtFieldNames.contains(srcField.name)).map(
          // drop not null constrains from removed fields.
          _.copy(nullability = Type.Nullability.Nullable)
        )
 
        // failed migration would produce no fields in source
-        if (allSrcFields.isEmpty) None else Type.Struct(reorderedTgtFields ++ srcFields).some
+        NonEmptyList.fromList(reorderedTgtFields ::: srcFields).map { nonEmpty =>
+          Type.Struct(nonEmpty)
+        }
 
       case _ => addIncompatibleType()
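
The rewritten final expression relies on NonEmptyList.fromList, which folds
the old isEmpty guard into the type; a quick standalone illustration:

import cats.data.NonEmptyList

NonEmptyList.fromList(List(1, 2))       // Some(NonEmptyList(1, 2))
NonEmptyList.fromList(List.empty[Int])  // None, replacing the explicit isEmpty check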
@@ -13,6 +13,7 @@
 package com.snowplowanalytics.iglu.schemaddl.parquet
 
 import cats.Eq
+import cats.data.NonEmptyList
 
 sealed trait Type extends Product with Serializable
 
@@ -26,7 +27,7 @@ object Type {
   case class Decimal(precision: DecimalPrecision, scale: Int) extends Type
   case object Date extends Type
   case object Timestamp extends Type
-  case class Struct(fields: List[Field]) extends Type
+  case class Struct(fields: NonEmptyList[Field]) extends Type
   case class Array(element: Type, nullability: Nullability) extends Type
 
   /* Fallback type for when json schema does not map to a parquet primitive type (e.g. unions)
@@ -386,8 +386,7 @@ class MutateSpec extends org.specs2.Specification {
     val expected = SpecHelpers.parseSchema(
       """
        |{
-        |"type": "object",
-        |"additionalProperties": false
+        |"type": "object"
        |}
      """.stripMargin)
     Mutate.forStorage(input) must_== expected
@@ -87,7 +87,7 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
 
   def e5 = {
     val inputJson = json"""{"foo": 42, "bar": true}"""
-    val inputField = Type.Struct(List(
+    val inputField = Type.Struct(NonEmptyList.of(
       Field("foo", Type.Integer, Nullable),
       Field("bar", Type.Boolean, Required)))
 
@@ -100,7 +100,7 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
 
   def e6 = {
     val inputJson = json"""{"bar": true}"""
-    val inputField = Type.Struct(List(
+    val inputField = Type.Struct(NonEmptyList.of(
       Field("foo", Type.Integer, Nullable),
       Field("bar", Type.Boolean, Required)))
 
@@ -135,7 +135,7 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
      "undefined": 42
    }"""
 
-    val inputField = Type.Struct(List(
+    val inputField = Type.Struct(NonEmptyList.of(
       Field("someBool", Type.Boolean, Required),
       Field("repeatedInt", Type.Array(Type.Integer, Required), Required)))
 
@@ -158,13 +158,13 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
      }
    }"""
 
-    val inputField = Type.Struct(List(
+    val inputField = Type.Struct(NonEmptyList.of(
       Field("someBool", Type.Boolean, Required),
-      Field("nested", Type.Struct(List(
+      Field("nested", Type.Struct(NonEmptyList.of(
        Field("str", Type.String, Required),
        Field("int", Type.Integer, Nullable),
-        Field("deep", Type.Struct(List(Field("str", Type.String, Nullable))), Required),
-        Field("arr", Type.Array(Type.Struct(List(Field("a", Type.String, Required))), Required), Required)
+        Field("deep", Type.Struct(NonEmptyList.of(Field("str", Type.String, Nullable))), Required),
+        Field("arr", Type.Array(Type.Struct(NonEmptyList.of(Field("a", Type.String, Required))), Required), Required)
      )), Nullable)
    ))
 
@@ -190,7 +190,7 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
      "optional": null
    }"""
 
-    val inputField = Type.Struct(List(Field("optional", Type.String, Nullable)))
+    val inputField = Type.Struct(NonEmptyList.of(Field("optional", Type.String, Nullable)))
 
     val expected = StructValue(List(
       NamedValue("optional", NullValue)
@@ -204,7 +204,7 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
      "unionType": ["this", "is", "fallback", "strategy"]
    }"""
 
-    val inputField = Type.Struct(List(Field("unionType", Type.Json, Nullable)))
+    val inputField = Type.Struct(NonEmptyList.of(Field("unionType", Type.Json, Nullable)))
 
     val expected = StructValue(List(
       NamedValue("union_type", JsonValue(json"""["this","is","fallback","strategy"]"""))
@@ -269,7 +269,7 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
   }
 
   def e16 = {
-    val inputField = Type.Struct(List(
+    val inputField = Type.Struct(NonEmptyList.of(
       Field("xyz", Type.Integer, Nullable, Set("xyz", "XYZ"))))
 
     List(
@@ -288,7 +288,7 @@ class CasterSpec extends org.specs2.Specification { def is = s2"""
 
   def e17 = {
     val inputJson = json"""[{"id": null}]"""
-    val fieldType = Type.Array(Type.Struct(List(Field("id", Type.String, Required, Set("id")))), Required)
+    val fieldType = Type.Array(Type.Struct(NonEmptyList.of(Field("id", Type.String, Required, Set("id")))), Required)
 
     val expected = NonEmptyList.one(WrongType(Json.Null, Type.String))
     Caster.cast(caster, Field("top", fieldType, Nullable), inputJson) must beInvalid(expected)