Column data for a dataset doesn't seem to be captured right #2968

Open
Doehla opened this issue Nov 4, 2024 · 1 comment


Doehla commented Nov 4, 2024

I noticed something odd in the UI, and after looking at the endpoint response I think it points to a larger issue.

Steps to reproduce:
  • Ran a script that created a DatasetEvent (from OpenLineage), which reported the Option[T] data types. This event was manually emitted to the Marquez endpoint.
  • Ran a Spark job with OpenLineage's OpenLineageSparkListener to do a small data operation, which appears to have reported the non-Option[T] data types. These events were automatically emitted to the Marquez endpoint.
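
For context, the manually emitted event from the first step can be sketched roughly as below. This is not the original script: the Marquez URL, the producer URI, and the `schemaURL` spec version are assumptions for illustration, and the field list is trimmed to two columns.

```python
import json
import urllib.request
from datetime import datetime, timezone

# Assumed local Marquez address; adjust for your deployment.
MARQUEZ_LINEAGE_URL = "http://localhost:5000/api/v1/lineage"
# Hypothetical producer URI for a hand-written script.
PRODUCER = "https://example.com/manual-dataset-event-script"


def build_dataset_event(namespace, name, fields):
    """Build an OpenLineage DatasetEvent payload carrying only a schema facet."""
    return {
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "producer": PRODUCER,
        # Assumed spec version; use the one that matches your client.
        "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/DatasetEvent",
        "dataset": {
            "namespace": namespace,
            "name": name,
            "facets": {
                "schema": {
                    "_producer": PRODUCER,
                    "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet",
                    "fields": fields,
                }
            },
        },
    }


def emit(event, url=MARQUEZ_LINEAGE_URL):
    """POST the event to Marquez. Not called here; it needs a running server."""
    request = urllib.request.Request(
        url,
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


event = build_dataset_event(
    "file",
    "/opt/spark/data/sensor_data.json",
    [
        {"name": "id", "type": "Option[long]"},
        {"name": "sensor_type", "type": "Option[string]"},
    ],
)
```

Calling `emit(event)` against a running Marquez instance would send it; the Spark job in the second step then reports the same columns with plain `long` and `string` types.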
Observations:

Both the Option[T] and non-Option[T] data types appear to be associated with the same schema definition and columns; the second set was simply added to the first. Not sure whether this was intentional:
image

Only 1 reported schema exists:
image

Looking at the JSON payload that captures the table data, we retrieve this:

{
  "id": {
    "namespace": "file",
    "name": "/opt/spark/data/sensor_data.json"
  },
  "type": "DB_TABLE",
  "name": "/opt/spark/data/sensor_data.json",
  "physicalName": "/opt/spark/data/sensor_data.json",
  "createdAt": "2024-11-01T19:38:51.440991Z",
  "updatedAt": "2024-11-01T19:39:43.043Z",
  "namespace": "file",
  "sourceName": "/opt/spark/data/sensor_data.json",
  "fields": [
    {
      "name": "id",
      "type": "Option[long]",
      "tags": [],
      "description": null
    },
    {
      "name": "location",
      "type": "Option[struct]",
      "tags": [],
      "description": null
    },
    {
      "name": "readings",
      "type": "Option[struct]",
      "tags": [],
      "description": null
    },
    {
      "name": "sensor_type",
      "type": "Option[string]",
      "tags": [],
      "description": null
    },
    {
      "name": "status",
      "type": "Option[string]",
      "tags": [],
      "description": null
    },
    {
      "name": "timestamp",
      "type": "Option[string]",
      "tags": [],
      "description": null
    },
    {
      "name": "id",
      "type": "long",
      "tags": [],
      "description": null
    },
    {
      "name": "location",
      "type": "struct",
      "tags": [],
      "description": null
    },
    {
      "name": "readings",
      "type": "struct",
      "tags": [],
      "description": null
    },
    {
      "name": "sensor_type",
      "type": "string",
      "tags": [],
      "description": null
    },
    {
      "name": "status",
      "type": "string",
      "tags": [],
      "description": null
    },
    {
      "name": "timestamp",
      "type": "string",
      "tags": [],
      "description": null
    }
  ],
  "tags": [],
  "lastModifiedAt": null,
  "lastLifecycleState": "CREATE",
  "description": null,
  "currentVersion": "6ef09594-6305-4a39-88f5-e6f48fff5f2c",
  "columnLineage": null,
  "facets": {
    "schema": {
      "fields": [
        {
          "name": "id",
          "type": "long"
        },
        {
          "name": "location",
          "type": "struct",
          "fields": [
            {
              "name": "altitude",
              "type": "double"
            },
            {
              "name": "latitude",
              "type": "double"
            },
            {
              "name": "longitude",
              "type": "double"
            }
          ]
        },
        {
          "name": "readings",
          "type": "struct",
          "fields": [
            {
              "name": "humidity",
              "type": "double"
            },
            {
              "name": "other_data",
              "type": "struct",
              "fields": [
                {
                  "name": "current",
                  "type": "double"
                },
                {
                  "name": "signal_strength",
                  "type": "long"
                },
                {
                  "name": "voltage",
                  "type": "double"
                }
              ]
            },
            {
              "name": "pressure",
              "type": "double"
            },
            {
              "name": "temperature",
              "type": "double"
            }
          ]
        },
        {
          "name": "sensor_type",
          "type": "string"
        },
        {
          "name": "status",
          "type": "string"
        },
        {
          "name": "timestamp",
          "type": "string"
        }
      ],
      "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
      "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet"
    },
    "dataSource": {
      "uri": "file",
      "name": "file",
      "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
      "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet"
    },
    "lifecycleStateChange": {
      "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/client/python",
      "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/LifecycleStateChangeDatasetFacet.json#/$defs/LifecycleStateChangeDatasetFacet",
      "lifecycleStateChange": "CREATE"
    },
    "file": {
      "hash": "150cd27894500bc7b2ec5dae2a664512",
      "Creation Date": "2024-10-23T22:23:48.447890",
      "Modified Date": "2024-09-30T20:48:42.520602",
      "File Size (Mb)": 0.0795145034790039,
      "Last Access Date": "2024-10-25T17:14:03.251640"
    },
    "ownership": {
      "owners": [
        {
          "name": "XXX"
        }
      ],
      "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/client/python",
      "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/OwnershipDatasetFacet.json#/$defs/OwnershipDatasetFacet"
    }
  },
  "deleted": false
}

Here, the schema facet was updated from the first supplied value to the new value, while the other facets that Spark did not send were kept intact (which is great, since we want to keep those). The overwriting of the schema I can understand, as the Spark job provided one too. It would be great if the nullable flag were captured in the data types of the events emitted from OpenLineage; wanting to record nullability is why the first call used Option[T] types. However, the top-level fields data has been added together, even though the same column names are used. Interestingly, the graph visual doesn't show these repeated columns.
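
A quick way to confirm the duplication from the response is to group the top-level `fields` entries by column name. This is a minimal sketch that works on the parsed JSON; the `duplicated_fields` helper is made up for illustration, and the sample is a trimmed copy of the payload above.

```python
from collections import defaultdict


def duplicated_fields(dataset):
    """Map each column name that appears more than once to its reported types."""
    types_by_name = defaultdict(list)
    for field in dataset.get("fields", []):
        types_by_name[field["name"]].append(field["type"])
    return {name: types for name, types in types_by_name.items() if len(types) > 1}


# Trimmed copy of the dataset response shown above.
dataset = {
    "fields": [
        {"name": "id", "type": "Option[long]"},
        {"name": "status", "type": "Option[string]"},
        {"name": "id", "type": "long"},
        {"name": "status", "type": "string"},
    ]
}

print(duplicated_fields(dataset))
# {'id': ['Option[long]', 'long'], 'status': ['Option[string]', 'string']}
```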

What I would expect:

Since the schema is being overwritten, I would expect the fields to be updated too rather than added together. With the data types changing, I could see this being captured as a versioned change. That said, there might be a case for keeping some additive behavior in the code, to merge schemas together when multiple events are emitted that each contain a partial schema definition.
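
To make the two options concrete, here is a small sketch of "overwrite" versus "merge by column name". It only illustrates the semantics I have in mind and is not how Marquez implements either; both function names are hypothetical.

```python
def replace_fields(existing, incoming):
    """Overwrite semantics: the latest event's fields win outright."""
    return list(incoming)


def merge_fields(existing, incoming):
    """Additive semantics keyed by column name.

    Incoming definitions override earlier ones with the same name;
    columns seen only in earlier events are kept.
    """
    merged = {field["name"]: field for field in existing}
    for field in incoming:
        merged[field["name"]] = field
    return list(merged.values())


existing = [
    {"name": "id", "type": "Option[long]"},
    {"name": "status", "type": "Option[string]"},
]
incoming = [
    {"name": "id", "type": "long"},
    {"name": "timestamp", "type": "string"},
]

replaced = replace_fields(existing, incoming)  # id, timestamp
merged = merge_fields(existing, incoming)      # id (long), status, timestamp
```

Either way, each column name ends up with exactly one type, unlike the response above where `id` appears as both `Option[long]` and `long`.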

As things stand, I think the correct path forward is for us to ignore the nullable flag on the data and not create Option[T] data types to represent it. It would still be nice to have the possibility of missing data captured in the schema.
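
On the emitting side, dropping the wrapper could look like the sketch below. The helper names are made up, and recording nullability in the field's free-text `description` is just one assumed workaround, not something OpenLineage or Marquez prescribes.

```python
import re

# Matches type strings of the form "Option[T]" and captures T.
_OPTION_PATTERN = re.compile(r"^Option\[(.+)\]$")


def split_nullable(type_name):
    """Return (base_type, nullable) for a type string such as 'Option[long]'."""
    match = _OPTION_PATTERN.match(type_name)
    if match:
        return match.group(1), True
    return type_name, False


def normalize_field(field):
    """Report the plain type so it lines up with what the Spark job emits."""
    base_type, nullable = split_nullable(field["type"])
    normalized = {"name": field["name"], "type": base_type}
    if nullable:
        # Assumption: keep the nullability hint in the description text.
        normalized["description"] = "nullable"
    return normalized


fields = [
    {"name": "id", "type": "Option[long]"},
    {"name": "status", "type": "string"},
]
normalized_fields = [normalize_field(field) for field in fields]
```

With this, the manual event and the Spark events would report the same `type` for each column, so the two sources no longer disagree.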


boring-cyborg bot commented Nov 4, 2024

Thanks for opening your first issue in the Marquez project! Please be sure to follow the issue template!
