Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provide simple implementation of one-level lineage optimized for parent jobs #2657

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
15 changes: 15 additions & 0 deletions api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,21 @@ public Response getLineage(
return Response.ok(lineageService.lineage(nodeId, depth, true)).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
@Path("/lineage/direct")
public Response getDirectLineage(@QueryParam("parentJobNodeId") @NotNull NodeId parentJobNodeId) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please mind updating openapi.yaml and changeling

if (!parentJobNodeId.isJobType()) {
throw new IllegalArgumentException("Only job expected, got " + parentJobNodeId.getValue());
}
throwIfNotExists(parentJobNodeId);
return Response.ok(lineageService.parentDirectLineage(parentJobNodeId.asJobId())).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public static String stringOrNull(final ResultSet results, final String column)
/**
 * Returns the string value of {@code column}, throwing when the value is absent.
 *
 * <p>NOTE(review): the check also fires when the column exists but holds SQL NULL, so the
 * "no column found" message can be misleading in that case.
 *
 * @param results the row to read from
 * @param column the column label
 * @return the non-null string value of the column
 * @throws IllegalArgumentException if the column's value is null
 * @throws SQLException if the column label is invalid or the result set is closed
 */
public static String stringOrThrow(final ResultSet results, final String column)
    throws SQLException {
  final Object raw = results.getObject(column);
  if (raw == null) {
    throw new IllegalArgumentException("no column found for " + column);
  }
  return results.getString(column);
}
Expand Down
42 changes: 42 additions & 0 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import marquez.common.models.DatasetId;
import marquez.common.models.JobId;
import marquez.db.mappers.DatasetDataMapper;
import marquez.db.mappers.DirectLineageEdgeMapper;
import marquez.db.mappers.JobDataMapper;
import marquez.db.mappers.JobRowMapper;
import marquez.db.mappers.RunMapper;
Expand All @@ -25,8 +28,19 @@
@RegisterRowMapper(JobDataMapper.class)
@RegisterRowMapper(RunMapper.class)
@RegisterRowMapper(JobRowMapper.class)
@RegisterRowMapper(DirectLineageEdgeMapper.class)
public interface LineageDao {

/** A flat collection of one-level lineage edges as returned by the direct-lineage query. */
public record DirectLineage(Collection<DirectLineageEdge> edges) {}

/**
 * One denormalized row of the direct-lineage query: a child job ({@code job1}), the dataset it
 * touches and in which direction, plus another job touching the same dataset (in the opposite
 * direction) together with that job's parent. All fields after {@code job1} may be null because
 * the query builds them from LEFT JOINs.
 *
 * <p>NOTE(review): {@code direction}/{@code direction2} carry raw io_type strings
 * ("INPUT"/"OUTPUT", as consumed by LineageService); consider the existing IOType enum instead.
 */
public record DirectLineageEdge(
    JobId job1,
    String direction,
    DatasetId dataset,
    String direction2,
    JobId job2,
    JobId job2parent) {}

/**
* Fetch all of the jobs that consume or produce the datasets that are consumed or produced by the
* input jobIds. This returns a single layer from the BFS using datasets as edges. Jobs that have
Expand Down Expand Up @@ -79,6 +93,34 @@ SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids
""")
Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);

/**
 * Returns one level of lineage for all the children jobs of the given parent: each child job,
 * the datasets it reads/writes, and the other jobs touching those datasets. Rows come back
 * denormalized (one row per job/dataset/other-job combination); columns after the child job are
 * null when the corresponding LEFT JOIN found no match.
 *
 * <p>NOTE(review): job2_parent_namespace selects jv2.namespace_name (the other job's own
 * namespace), i.e. it assumes a parent job lives in the same namespace as its child — confirm.
 *
 * @param parentJobNamespace the namespace of the parent
 * @param parentJobName the name of the parent
 * @return edges from job to dataset to job
 */
@SqlQuery(
"""
SELECT
jobs.namespace_name AS job_namespace, jobs."name" AS job_name,
jvim.io_type AS io1,
d.namespace_name AS ds_namespace, d."name" AS ds_name,
jvim2.io_type AS io2,
jv2.namespace_name AS job2_namespace, jv2.job_name AS job2_name,
jv2.namespace_name AS job2_parent_namespace, j2.parent_job_name AS job2_parent_name
FROM jobs_view jobs
INNER JOIN job_versions jv ON jv.uuid = jobs.current_version_uuid
LEFT JOIN job_versions_io_mapping jvim ON jvim.job_version_uuid = jobs.current_version_uuid
LEFT JOIN datasets d ON d.uuid = jvim.dataset_uuid
LEFT JOIN job_versions_io_mapping jvim2 ON jvim2.dataset_uuid = d.uuid AND jvim2.job_version_uuid <> jvim.job_version_uuid AND jvim2.io_type <> jvim.io_type
LEFT JOIN job_versions jv2 ON jv2.uuid = jvim2.job_version_uuid
LEFT JOIN jobs_view j2 ON jv2.job_uuid = j2.uuid
WHERE jobs.namespace_name = :parentJobNamespace AND jobs.parent_job_name = :parentJobName ;
""")
Collection<DirectLineageEdge> getDirectLineageFromParent(
    String parentJobNamespace, String parentJobName);

@SqlQuery(
"""
SELECT ds.*, dv.fields, dv.lifecycle_state
Expand Down
54 changes: 54 additions & 0 deletions api/src/main/java/marquez/db/mappers/DirectLineageEdgeMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/
package marquez.db.mappers;

import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.stringOrThrow;

import java.sql.ResultSet;
import java.sql.SQLException;
import lombok.NonNull;
import marquez.common.models.DatasetId;
import marquez.common.models.DatasetName;
import marquez.common.models.JobId;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.db.LineageDao.DirectLineageEdge;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

/** Maps one result-set row of the direct-lineage query to a {@link DirectLineageEdge}. */
public final class DirectLineageEdgeMapper implements RowMapper<DirectLineageEdge> {

  @Override
  public DirectLineageEdge map(@NonNull ResultSet results, @NonNull StatementContext context)
      throws SQLException {
    // The child job columns are always present; everything else comes from LEFT JOINs in the
    // query and may be null, so each group is guarded by a null check on its namespace column.
    JobId job1 =
        JobId.of(
            NamespaceName.of(stringOrThrow(results, "job_namespace")),
            JobName.of(stringOrThrow(results, "job_name")));
    String io1 = stringOrNull(results, "io1");

    String dsNamespace = stringOrNull(results, "ds_namespace");
    DatasetId ds =
        dsNamespace == null
            ? null
            : new DatasetId(
                NamespaceName.of(dsNamespace),
                // ds_name is non-null whenever ds_namespace is (same joined row); stringOrThrow
                // (consistent with job_name/job2_name) fails with a clear message instead of a
                // downstream NPE inside DatasetName.of.
                DatasetName.of(stringOrThrow(results, "ds_name")));

    String io2 = stringOrNull(results, "io2");
    String job2Namespace = stringOrNull(results, "job2_namespace");
    JobId job2 =
        job2Namespace == null
            ? null
            : JobId.of(
                NamespaceName.of(job2Namespace), JobName.of(stringOrThrow(results, "job2_name")));

    String job2ParentNamespace = stringOrNull(results, "job2_parent_namespace");
    JobId job2Parent =
        job2ParentNamespace == null
            ? null
            : JobId.of(
                NamespaceName.of(job2ParentNamespace),
                JobName.of(stringOrThrow(results, "job2_parent_name")));
    return new DirectLineageEdge(job1, io1, ds, io2, job2, job2Parent);
  }
}
80 changes: 80 additions & 0 deletions api/src/main/java/marquez/service/LineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@

package marquez.service;

import static java.util.stream.Collectors.filtering;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;

import com.google.common.base.Functions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Maps;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -41,13 +47,87 @@

@Slf4j
public class LineageService extends DelegatingLineageDao {

/** A job together with its parent job (parent may be null when the job has none). */
public record JobWithParent(JobId job, JobId parent) {}

/**
 * A dataset with the jobs consuming and producing it; either collection may be null when the
 * grouped query results contained no entries for that direction.
 */
public record DatasetLineage(
    DatasetId dataset,
    Collection<JobWithParent> consumers,
    Collection<JobWithParent> producers) {}

/** A child job with its input/output dataset lineage (either side may be null). */
public record ChildLineage(
    JobId job, Collection<DatasetLineage> inputs, Collection<DatasetLineage> outputs) {}

/** One level of lineage for all the children of {@code parent}. */
public record ParentLineage(JobId parent, Collection<ChildLineage> children) {}

private final JobDao jobDao;

/**
 * Creates a lineage service backed by the given DAOs.
 *
 * @param delegate the lineage DAO all {@link DelegatingLineageDao} calls are forwarded to
 * @param jobDao DAO for job lookups used by other methods of this service
 */
public LineageService(LineageDao delegate, JobDao jobDao) {
  super(delegate);
  this.jobDao = jobDao;
}

/**
 * This method is specialized for returning one level of lineage from a parent job. It finds all
 * the children of the provided parent node. It then finds the input and output datasets those
 * children read and write. It finally returns the other jobs consuming or producing those
 * datasets (and their parent).
 *
 * @param parentJobId the parent job
 * @return 1 level of lineage for all the children jobs of the given parent
 */
public ParentLineage parentDirectLineage(JobId parentJobId) {
  log.debug("Attempting to get lineage for parent job '{}'", parentJobId);

  // One denormalized row per (child job, dataset, other job) combination.
  Collection<DirectLineageEdge> directLineageFromParent =
      getDirectLineageFromParent(
          parentJobId.getNamespace().getValue(), parentJobId.getName().getValue());

  // Regroup flat edges into a tree: child job -> IO direction ("INPUT"/"OUTPUT") -> dataset
  // -> other job's IO direction -> other jobs (with parents). The filtering steps drop the
  // null levels produced by the LEFT JOINs in the SQL.
  Map<JobId, Map<String, Map<DatasetId, Map<String, List<JobWithParent>>>>> grouped =
      directLineageFromParent.stream()
          .collect(
              groupingBy(
                  DirectLineageEdge::job1,
                  filtering(
                      e -> e.direction() != null,
                      groupingBy(
                          DirectLineageEdge::direction,
                          filtering(
                              e -> e.dataset() != null,
                              groupingBy(
                                  DirectLineageEdge::dataset,
                                  filtering(
                                      e -> e.direction2() != null,
                                      groupingBy(
                                          DirectLineageEdge::direction2,
                                          mapping(
                                              e -> new JobWithParent(e.job2(), e.job2parent()),
                                              toList())))))))));

  // Flatten the grouped map into the response shape, one ChildLineage per child job.
  List<ChildLineage> children =
      grouped.entrySet().stream()
          .map(
              e ->
                  new ChildLineage(
                      e.getKey(),
                      toDatasetLineages(e.getValue().get("INPUT")),
                      toDatasetLineages(e.getValue().get("OUTPUT"))))
          .collect(toList());
  return new ParentLineage(parentJobId, children);
}

/**
 * Converts the grouped map for one IO direction of a child job into {@link DatasetLineage}
 * entries. Returns null (not an empty collection) when the child had no datasets in that
 * direction, mirroring the absent key in the grouped map.
 */
private Collection<DatasetLineage> toDatasetLineages(
    Map<DatasetId, Map<String, List<JobWithParent>>> datasets) {
  if (datasets == null) {
    return null;
  }
  return datasets.entrySet().stream()
      .map(
          entry -> {
            Map<String, List<JobWithParent>> byDirection = entry.getValue();
            return new DatasetLineage(
                entry.getKey(), byDirection.get("INPUT"), byDirection.get("OUTPUT"));
          })
      .collect(toList());
}

// TODO make input parameters easily extendable if adding more options like 'withJobFacets'
public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
log.debug("Attempting to get lineage for node '{}' with depth '{}'", nodeId.getValue(), depth);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public JavaType typeFromId(DatabindContext context, String id) throws IOExceptio
.filter(s -> s.getName().equals(type))
.findAny()
.map(EventSchemaURL::getSubType)
.map(p -> (Class) p)
.orElse(LINEAGE_EVENT.subType);

return context.constructSpecializedType(superType, subType);
Expand Down
18 changes: 18 additions & 0 deletions api/src/test/java/marquez/OpenLineageIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package marquez;

import static java.util.Arrays.asList;
import static marquez.db.LineageTestUtils.PRODUCER_URL;
import static marquez.db.LineageTestUtils.SCHEMA_URL;
import static org.assertj.core.api.Assertions.as;
Expand Down Expand Up @@ -47,6 +48,7 @@
import lombok.extern.slf4j.Slf4j;
import marquez.api.JdbiUtils;
import marquez.client.MarquezClient;
import marquez.client.MarquezClient.ParentLineage;
import marquez.client.models.Dataset;
import marquez.client.models.DatasetVersion;
import marquez.client.models.Job;
Expand Down Expand Up @@ -316,6 +318,14 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentRunFacet()
.hasFieldOrPropertyWithValue("parentJobName", null);
List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
assertThat(runsList).isNotEmpty().hasSize(1);

ParentLineage directLineage = client.getDirectLineage(new JobId(NAMESPACE_NAME, dagName));
assertThat(directLineage.parent().getNamespace()).isEqualTo(NAMESPACE_NAME);
assertThat(directLineage.parent().getName()).isEqualTo(dagName);
assertThat(directLineage.children()).size().isEqualTo(2);

assertThat(directLineage.children().stream().map(c -> c.job().getName()).sorted().toList())
.isEqualTo(asList("the_dag.task1", "the_dag.task2"));
}

@Test
Expand Down Expand Up @@ -390,6 +400,14 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentAndParentRunF
.hasFieldOrPropertyWithValue("parentJobName", null);
List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
assertThat(runsList).isNotEmpty().hasSize(1);

ParentLineage directLineage = client.getDirectLineage(new JobId(NAMESPACE_NAME, dagName));
assertThat(directLineage.parent().getNamespace()).isEqualTo(NAMESPACE_NAME);
assertThat(directLineage.parent().getName()).isEqualTo(dagName);
assertThat(directLineage.children()).size().isEqualTo(2);

assertThat(directLineage.children().stream().map(c -> c.job().getName()).sorted().toList())
.isEqualTo(asList("the_dag.task1", "the_dag.task2"));
}

@Test
Expand Down
18 changes: 18 additions & 0 deletions api/src/test/java/marquez/db/LineageDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
import static marquez.db.LineageTestUtils.newDatasetFacet;
import static marquez.db.LineageTestUtils.writeDownstreamLineage;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import com.google.common.base.Functions;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
Expand All @@ -29,6 +32,7 @@
import java.util.stream.Stream;
import marquez.api.JdbiUtils;
import marquez.common.models.JobType;
import marquez.db.LineageDao.DirectLineageEdge;
import marquez.db.LineageTestUtils.DatasetConsumerJob;
import marquez.db.LineageTestUtils.JobLineage;
import marquez.db.models.JobRow;
Expand Down Expand Up @@ -165,6 +169,12 @@ public void testGetLineage() {
.containsAll(
expected.getOutput().map(ds -> ds.getDatasetRow().getUuid()).stream()::iterator);
}

Collection<DirectLineageEdge> FromParent =
lineageDao.getDirectLineageFromParent(
disjointJob.getJob().getNamespaceName(), disjointJob.getJob().getName());
assertNotNull(FromParent);
assertTrue(FromParent.toString(), FromParent.size() == 0);
}

@Test
Expand Down Expand Up @@ -311,6 +321,14 @@ public void testGetLineageWithJobThatHasNoDownstreamConsumers() {
assertThat(lineage).hasSize(1).contains(writeJob.getJob().getUuid());
}

@Test
public void testGetFromParent() {
  // Seeds a lineage graph whose parent job is ("namespace", "name") with two child edges.
  FacetTestUtils.createLineageWithFacets(openLineageDao);
  // lowerCamelCase local + AssertJ (used throughout this class) instead of JUnit4 assertTrue:
  // hasSize reports the actual contents on failure, replacing the manual toString() message.
  Collection<DirectLineageEdge> fromParent =
      lineageDao.getDirectLineageFromParent("namespace", "name");
  assertThat(fromParent).hasSize(2);
}

@Test
public void testGetLineageWithJobThatHasNoDatasets() {

Expand Down
Loading
Loading