Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provide simple implementation of one-level lineage optimized for parent jobs #2657

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
16 changes: 16 additions & 0 deletions api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,22 @@ public Response getLineage(
return Response.ok(lineageService.lineage(nodeId, depth, true)).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
@Path("/lineage/direct")
public Response getSimpleLineage(
    @QueryParam("parentJobNodeId") @NotNull NodeId parentJobNodeId) {
  // Only job nodes can act as a parent in the job hierarchy; reject anything else up front.
  // NOTE(review): an unmapped IllegalArgumentException typically surfaces as HTTP 500 in
  // Dropwizard/JAX-RS — confirm an exception mapper exists, or consider a 400-producing type.
  if (parentJobNodeId.isJobType()) {
    throwIfNotExists(parentJobNodeId);
    return Response.ok(lineageService.parentDirectLineage(parentJobNodeId.asJobId())).build();
  }
  throw new IllegalArgumentException("Only job expected, got " + parentJobNodeId.getValue());
}

@Timed
@ResponseMetered
@ExceptionMetered
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public static String stringOrNull(final ResultSet results, final String column)
/**
 * Returns the string value of {@code column}, throwing when the value is SQL NULL.
 *
 * <p>NOTE(review): the message says "no column found" but the condition actually detects a
 * null value in an existing column (a truly missing column label raises SQLException from
 * getObject) — confirm the intended wording.
 *
 * @param results the row to read from
 * @param column the column label to read
 * @return the non-null string value of the column
 * @throws IllegalArgumentException if the column value is null
 * @throws SQLException if the column label is invalid or the result set is closed
 */
public static String stringOrThrow(final ResultSet results, final String column)
    throws SQLException {
  if (results.getObject(column) == null) {
    throw new IllegalArgumentException("no column found for " + column);
  }
  return results.getString(column);
}
Expand Down
57 changes: 54 additions & 3 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,42 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.BindList;
import org.jdbi.v3.sqlobject.statement.SqlQuery;

import marquez.common.models.DatasetId;
import marquez.common.models.JobId;
import marquez.db.mappers.DatasetDataMapper;
import marquez.db.mappers.JobDataMapper;
import marquez.db.mappers.JobRowMapper;
import marquez.db.mappers.RunMapper;
import marquez.db.mappers.SimpleLineageEdgeMapper;
import marquez.service.models.DatasetData;
import marquez.service.models.JobData;
import marquez.service.models.Run;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.BindList;
import org.jdbi.v3.sqlobject.statement.SqlQuery;

@RegisterRowMapper(DatasetDataMapper.class)
@RegisterRowMapper(JobDataMapper.class)
@RegisterRowMapper(RunMapper.class)
@RegisterRowMapper(JobRowMapper.class)
@RegisterRowMapper(SimpleLineageEdgeMapper.class)
public interface LineageDao {

public record SimpleLineage(Collection<SimpleLineageEdge> edges) {
}

public record SimpleLineageEdge(
JobId job1,
String direction,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not using existing IOType enum? It took me some time to understand what direction is.
Would it make sense to replace job1,job2 with job, upstreamJob?

DatasetId dataset,
String direction2,
JobId job2,
JobId job2parent
) {

}
/**
* Fetch all of the jobs that consume or produce the datasets that are consumed or produced by the
* input jobIds. This returns a single layer from the BFS using datasets as edges. Jobs that have
Expand Down Expand Up @@ -79,6 +98,38 @@ SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids
""")
Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);

/**
 * 1 level of lineage for all the children jobs of the given parent.
 *
 * <p>Joins each child job's current version to its datasets (via job_versions_io_mapping),
 * then back out to any other job version touching the same dataset with the opposite io type,
 * and finally to that other job's parent. LEFT JOINs keep children with no IO and datasets
 * with no other consumer/producer, so most columns past the first two may be null.
 *
 * @param parentJobNamespace the namespace of the parent
 * @param parentJobName the name of the parent
 * @return edges from job to dataset to job
 */
@SqlQuery(
    """
    SELECT
    jobs.namespace_name AS job_namespace, jobs.simple_name AS job_name,
    jvim.io_type AS io1,
    d.namespace_name AS ds_namespace, d."name" AS ds_name,
    jvim2.io_type AS io2,
    jv2.namespace_name AS job2_namespace, jv2.job_name AS job2_name,
    pj.namespace_name AS job2_parent_namespace, pj.simple_name AS job2_parent_name
    FROM jobs
    INNER JOIN job_versions jv ON jv.uuid = jobs.current_version_uuid
    LEFT JOIN job_versions_io_mapping jvim ON jvim.job_version_uuid = jobs.current_version_uuid
    LEFT JOIN datasets d ON d.uuid = jvim.dataset_uuid
    LEFT JOIN job_versions_io_mapping jvim2 ON jvim2.dataset_uuid = d.uuid AND jvim2.job_version_uuid <> jvim.job_version_uuid and jvim2.io_type <> jvim.io_type
    LEFT JOIN job_versions jv2 ON jv2.uuid = jvim2.job_version_uuid
    LEFT JOIN jobs j2 ON jv2.job_uuid = j2.uuid
    LEFT JOIN jobs pj ON j2.parent_job_uuid = pj.uuid
    WHERE jobs.parent_job_uuid IN (
    SELECT uuid AS parent_job_uuid
    FROM jobs
    WHERE namespace_name=:parentJobNamespace and simple_name=:parentJobName
    );
    """)
Collection<SimpleLineageEdge> getDirectLineageFromParent(String parentJobNamespace, String parentJobName);

@SqlQuery(
"""
SELECT ds.*, dv.fields, dv.lifecycle_state
Expand Down
36 changes: 36 additions & 0 deletions api/src/main/java/marquez/db/mappers/SimpleLineageEdgeMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package marquez.db.mappers;

import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.stringOrThrow;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

import lombok.NonNull;
import marquez.common.models.DatasetId;
import marquez.common.models.DatasetName;
import marquez.common.models.JobId;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.db.LineageDao.SimpleLineageEdge;

/**
 * Maps one row of {@code LineageDao.getDirectLineageFromParent} to a {@link SimpleLineageEdge}.
 *
 * <p>The query LEFT JOINs datasets and far-side jobs, so every column group past the first job
 * may be null; each group's namespace column is used as the null sentinel for the whole group.
 */
public final class SimpleLineageEdgeMapper implements RowMapper<SimpleLineageEdge> {
  @Override
  public SimpleLineageEdge map(@NonNull ResultSet results, @NonNull StatementContext context)
      throws SQLException {
    // The child job is produced by an INNER JOIN, so its columns are always present.
    JobId job1 =
        JobId.of(
            NamespaceName.of(stringOrThrow(results, "job_namespace")),
            JobName.of(stringOrThrow(results, "job_name")));
    String io1 = stringOrNull(results, "io1");
    // If the dataset namespace is present, the name must be too — use stringOrThrow for a
    // clear error instead of passing null into DatasetName.of (consistent with job columns).
    String dsNamespace = stringOrNull(results, "ds_namespace");
    DatasetId ds =
        dsNamespace == null
            ? null
            : new DatasetId(
                NamespaceName.of(dsNamespace), DatasetName.of(stringOrThrow(results, "ds_name")));
    String io2 = stringOrNull(results, "io2");
    String job2Namespace = stringOrNull(results, "job2_namespace");
    JobId job2 =
        job2Namespace == null
            ? null
            : JobId.of(
                NamespaceName.of(job2Namespace), JobName.of(stringOrThrow(results, "job2_name")));
    String job2ParentNamespace = stringOrNull(results, "job2_parent_namespace");
    JobId job2parent =
        job2ParentNamespace == null
            ? null
            : JobId.of(
                NamespaceName.of(job2ParentNamespace),
                JobName.of(stringOrThrow(results, "job2_parent_name")));
    return new SimpleLineageEdge(job1, io1, ds, io2, job2, job2parent);
  }
}

76 changes: 72 additions & 4 deletions api/src/main/java/marquez/service/LineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

package marquez.service;

import com.google.common.base.Functions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Maps;
import static java.util.stream.Collectors.filtering;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -21,6 +23,12 @@
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.base.Functions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Maps;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.common.models.DatasetId;
Expand All @@ -41,13 +49,73 @@

@Slf4j
public class LineageService extends DelegatingLineageDao {

/** A job paired with its parent job; parent may be null when the job has no parent. */
public record JobWithParent(JobId job, JobId parent) {
}

/** A dataset with the jobs consuming and producing it (each with its parent). */
public record DatasetLineage(DatasetId dataset, Collection<JobWithParent> consumers, Collection<JobWithParent> producers) {
}

/** A child job with per-dataset lineage for its inputs and outputs. */
public record ChildLineage(JobId job, Collection<DatasetLineage> inputs, Collection<DatasetLineage> outputs) {
}

/** One level of lineage: a parent job and the lineage of each of its children. */
public record ParentLineage(JobId parent, Collection<ChildLineage> children) {
}

// NOTE(review): jobDao is not referenced in the code visible here — confirm it is used
// elsewhere in this class, otherwise it can be dropped.
private final JobDao jobDao;

public LineageService(LineageDao delegate, JobDao jobDao) {
super(delegate);
this.jobDao = jobDao;
}

/**
* This method is specialized for returning one level of lineage from a parent job.
* It finds all the children of the provided parent node
* It then finds the input and output datasets those children write to.
* It finally returns the other jobs consuming or producing those datasets (and their parent).
* @param parentJobId the parent job
* @return 1 level of lineage for all the children jobs of the given parent
*/
public ParentLineage parentDirectLineage(JobId parentJobId) {
pawel-big-lebowski marked this conversation as resolved.
Show resolved Hide resolved
log.debug("Attempting to get lineage for parent job '{}'", parentJobId);

Collection<SimpleLineageEdge> directLineageFromParent =
getDirectLineageFromParent(parentJobId.getNamespace().getValue(), parentJobId.getName().getValue());


Map<JobId, Map<String, Map<DatasetId, Map<String, List<JobWithParent>>>>> grouped =
directLineageFromParent.stream().collect(
groupingBy(SimpleLineageEdge::job1,
filtering(e -> e.direction() != null,
groupingBy(SimpleLineageEdge::direction,
filtering(e -> e.dataset() != null,
groupingBy(SimpleLineageEdge::dataset,
filtering(e -> e.direction2() != null,
groupingBy(SimpleLineageEdge::direction2,
mapping(e -> new JobWithParent(e.job2(), e.job2parent()),
toList())))))))));

List<ChildLineage> children = grouped.entrySet().stream().map(
e -> new ChildLineage(
e.getKey(),
toDatasetLineages(e.getValue().get("INPUT")),
toDatasetLineages(e.getValue().get("OUTPUT"))
)
).collect(toList());
return new ParentLineage(parentJobId, children);
}

/**
 * Converts a per-dataset grouping (dataset -> io direction -> jobs) into DatasetLineage values.
 * Returns null when the input map is null (child had no IO in that direction) — callers and
 * the serialized response observe that null, so it is preserved deliberately.
 */
private Collection<DatasetLineage> toDatasetLineages(Map<DatasetId, Map<String, List<JobWithParent>>> datasets) {
  if (datasets == null) {
    return null;
  }
  return datasets.entrySet().stream()
      .map(
          entry -> {
            Map<String, List<JobWithParent>> byDirection = entry.getValue();
            return new DatasetLineage(
                entry.getKey(), byDirection.get("INPUT"), byDirection.get("OUTPUT"));
          })
      .collect(toList());
}

// TODO make input parameters easily extendable if adding more options like 'withJobFacets'
public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
log.debug("Attempting to get lineage for node '{}' with depth '{}'", nodeId.getValue(), depth);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public JavaType typeFromId(DatabindContext context, String id) throws IOExceptio
.filter(s -> s.getName().equals(type))
.findAny()
.map(EventSchemaURL::getSubType)
.map(p -> (Class)p)
.orElse(LINEAGE_EVENT.subType);

return context.constructSpecializedType(superType, subType);
Expand Down
Loading
Loading