Skip to content

Commit

Permalink
fixing integration tests
Browse files Browse the repository at this point in the history
Signed-off-by: Julien Le Dem <[email protected]>
  • Loading branch information
julienledem committed Nov 2, 2023
1 parent 95d43e4 commit 53ff595
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 74 deletions.
58 changes: 14 additions & 44 deletions api/src/test/java/marquez/OpenLineageIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@

package marquez;

import static java.util.Arrays.asList;
import static marquez.db.LineageTestUtils.PRODUCER_URL;
import static marquez.db.LineageTestUtils.SCHEMA_URL;
import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
Expand All @@ -27,9 +27,7 @@
import io.openlineage.client.OpenLineage.RunFacetsBuilder;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.nio.charset.Charset;
import java.time.Instant;
import java.time.ZoneId;
Expand All @@ -50,19 +48,17 @@
import lombok.extern.slf4j.Slf4j;
import marquez.api.JdbiUtils;
import marquez.client.MarquezClient;
import marquez.client.MarquezClient.ParentLineage;
import marquez.client.models.Dataset;
import marquez.client.models.DatasetVersion;
import marquez.client.models.Job;
import marquez.client.models.JobId;
import marquez.client.models.LineageEvent;
import marquez.client.models.Run;
import marquez.common.Utils;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.db.LineageTestUtils;
import marquez.service.models.DatasetEvent;
import marquez.service.models.JobEvent;
import marquez.service.models.NodeId;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.jdbi.v3.core.Jdbi;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -323,26 +319,13 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentRunFacet()
List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
assertThat(runsList).isNotEmpty().hasSize(1);

marquez.common.models.JobId jobId =
new marquez.common.models.JobId(NamespaceName.of(NAMESPACE_NAME), JobName.of(dagName));
String nodeId = NodeId.of(jobId).getValue();
HttpRequest request =
HttpRequest.newBuilder()
.uri(URI.create(baseUrl + "/api/v1/lineage/direct?parentJobNodeId=" + nodeId))
.header("Content-Type", "application/json")
.GET()
.build();

HttpResponse<String> resp;
try {
resp = http2.send(request, BodyHandlers.ofString());
ParentLineage directLineage = client.getDirectLineage(new JobId(NAMESPACE_NAME, dagName));
assertThat(directLineage.parent().getNamespace()).isEqualTo(NAMESPACE_NAME);
assertThat(directLineage.parent().getName()).isEqualTo(dagName);
assertThat(directLineage.children()).size().isEqualTo(2);

assertEquals(200, resp.statusCode(), resp.body());
assertTrue(resp.body().contains("task1"), resp.body());
assertTrue(resp.body().contains("task2"), resp.body());
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(directLineage.children().stream().map(c -> c.job().getName()).sorted().toList())
.isEqualTo(asList("the_dag.task1", "the_dag.task2"));
}

@Test
Expand Down Expand Up @@ -418,26 +401,13 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentAndParentRunF
List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
assertThat(runsList).isNotEmpty().hasSize(1);

marquez.common.models.JobId jobId =
new marquez.common.models.JobId(NamespaceName.of(NAMESPACE_NAME), JobName.of(dagName));
String nodeId = NodeId.of(jobId).getValue();
HttpRequest request =
HttpRequest.newBuilder()
.uri(URI.create(baseUrl + "/api/v1/lineage/direct?parentJobNodeId=" + nodeId))
.header("Content-Type", "application/json")
.GET()
.build();

HttpResponse<String> resp;
try {
resp = http2.send(request, BodyHandlers.ofString());
ParentLineage directLineage = client.getDirectLineage(new JobId(NAMESPACE_NAME, dagName));
assertThat(directLineage.parent().getNamespace()).isEqualTo(NAMESPACE_NAME);
assertThat(directLineage.parent().getName()).isEqualTo(dagName);
assertThat(directLineage.children()).size().isEqualTo(2);

assertEquals(200, resp.statusCode(), resp.body());
assertTrue(resp.body().contains("task1"), resp.body());
assertTrue(resp.body().contains("task2"), resp.body());
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(directLineage.children().stream().map(c -> c.job().getName()).sorted().toList())
.isEqualTo(asList("the_dag.task1", "the_dag.task2"));
}

@Test
Expand Down
1 change: 0 additions & 1 deletion api/src/test/java/marquez/db/LineageTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.validation.Valid;

import lombok.Builder;
import lombok.Value;
import marquez.common.Utils;
Expand Down
63 changes: 40 additions & 23 deletions api/src/test/java/marquez/service/LineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

package marquez.service;

import static marquez.db.LineageTestUtils.*;
import static marquez.db.LineageTestUtils.NAMESPACE;
import static marquez.db.LineageTestUtils.PRODUCER_URL;
import static marquez.db.LineageTestUtils.SCHEMA_URL;
import static marquez.db.LineageTestUtils.newDatasetFacet;
import static marquez.db.LineageTestUtils.writeDownstreamLineage;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -17,7 +19,6 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;

import marquez.api.JdbiUtils;
import marquez.common.models.DatasetName;
import marquez.common.models.InputDatasetVersion;
Expand Down Expand Up @@ -53,7 +54,6 @@
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.ObjectAssert;
import org.jdbi.v3.core.Jdbi;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -446,8 +446,18 @@ public void testLineageForOrphanedDataset() {
public void testParentLineage() {
String parentJobName1 = "parentJob1";
String parentJobName2 = "parentJob2";
ParentRunFacet parentRunFacet1 = new ParentRunFacet(PRODUCER_URL, SCHEMA_URL, new RunLink(UUID.randomUUID().toString()), JobLink.builder().namespace(NAMESPACE).name(parentJobName1).build());
ParentRunFacet parentRunFacet2 = new ParentRunFacet(PRODUCER_URL, SCHEMA_URL, new RunLink(UUID.randomUUID().toString()), JobLink.builder().namespace(NAMESPACE).name(parentJobName2).build());
ParentRunFacet parentRunFacet1 =
new ParentRunFacet(
PRODUCER_URL,
SCHEMA_URL,
new RunLink(UUID.randomUUID().toString()),
JobLink.builder().namespace(NAMESPACE).name(parentJobName1).build());
ParentRunFacet parentRunFacet2 =
new ParentRunFacet(
PRODUCER_URL,
SCHEMA_URL,
new RunLink(UUID.randomUUID().toString()),
JobLink.builder().namespace(NAMESPACE).name(parentJobName2).build());
UpdateLineageRow writeJob =
LineageTestUtils.createLineageRow(
openLineageDao,
Expand All @@ -470,27 +480,34 @@ public void testParentLineage() {
parentRunFacet2);

ParentLineage parentLineage =
lineageService.parentDirectLineage(JobId.of(new NamespaceName(NAMESPACE), new JobName(parentJobName1)));
lineageService.parentDirectLineage(
JobId.of(new NamespaceName(NAMESPACE), new JobName(parentJobName1)));
assertEquals(NAMESPACE, parentLineage.parent().getNamespace().getValue());
assertEquals(parentJobName1, parentLineage.parent().getName().getValue());
assertEquals(1, parentLineage.children().size());
parentLineage.children().forEach(
c -> {
assertEquals("parentJob1.writeJob", c.job().getName().getValue());
assertNull(c.inputs());
c.outputs().forEach(
i -> {
assertEquals(dataset.getName(), i.dataset().getName().getValue());
i.consumers().forEach( co -> {
assertThat(co.job().getName().getValue()).matches("parentJob2.readJob.*<-commonDataset");
assertThat(co.parent().getName().getValue()).isEqualTo("parentJob2");
// we don't go further than one level and don't see downstreamJob and finalConsumer
});
assertNull(i.producers());
}
);
}
);
parentLineage
.children()
.forEach(
c -> {
assertEquals("parentJob1.writeJob", c.job().getName().getValue());
assertNull(c.inputs());
c.outputs()
.forEach(
i -> {
assertEquals(dataset.getName(), i.dataset().getName().getValue());
i.consumers()
.forEach(
co -> {
assertThat(co.job().getName().getValue())
.matches("parentJob2.readJob.*<-commonDataset");
assertThat(co.parent().getName().getValue())
.isEqualTo("parentJob2");
// we don't go further than one level and don't see downstreamJob
// and finalConsumer
});
assertNull(i.producers());
});
});
}

private boolean jobNameEquals(Node node, String writeJob) {
Expand Down
24 changes: 24 additions & 0 deletions clients/java/src/main/java/marquez/client/MarquezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.net.URL;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Set;
Expand All @@ -35,9 +36,11 @@
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import marquez.client.models.Dataset;
import marquez.client.models.DatasetId;
import marquez.client.models.DatasetMeta;
import marquez.client.models.DatasetVersion;
import marquez.client.models.Job;
import marquez.client.models.JobId;
import marquez.client.models.JobMeta;
import marquez.client.models.JobVersion;
import marquez.client.models.LineageEvent;
Expand Down Expand Up @@ -125,6 +128,11 @@ public Lineage getLineage(NodeId nodeId, int depth) {
return Lineage.fromJson(bodyAsJson);
}

/**
 * Fetches the direct lineage of the given parent job.
 *
 * <p>The request URL is built by {@code url.toDirectLineageUrl(parentJobId)}; the body returned
 * by {@code http.get(...)} is parsed with {@link ParentLineage#fromJson(String)}.
 *
 * @param parentJobId identifier of the parent job whose direct lineage is requested
 * @return the {@link ParentLineage} parsed from the response body
 */
public ParentLineage getDirectLineage(JobId parentJobId) {
  final URL directLineageUrl = url.toDirectLineageUrl(parentJobId);
  return ParentLineage.fromJson(http.get(directLineageUrl));
}

public Lineage getColumnLineage(NodeId nodeId) {
return getColumnLineage(nodeId, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
}
Expand Down Expand Up @@ -703,4 +711,20 @@ String toJson() {
return Utils.toJson(this);
}
}

/**
 * A job identifier paired with the identifier of its parent job.
 *
 * @param job identifier of the job itself
 * @param parent identifier of the job's parent (NOTE(review): nullability for jobs without a
 *     parent is not visible here — confirm against the API response)
 */
public record JobWithParent(JobId job, JobId parent) {}

/**
 * A dataset together with the jobs listed as its consumers and producers.
 *
 * @param dataset identifier of the dataset
 * @param consumers jobs (with their parents) listed as consumers of the dataset
 * @param producers jobs (with their parents) listed as producers of the dataset
 *     (NOTE(review): either collection is presumably null when absent from the JSON — confirm)
 */
public record DatasetLineage(
DatasetId dataset,
Collection<JobWithParent> consumers,
Collection<JobWithParent> producers) {}

/**
 * Lineage of one child job: the job's identifier plus its input and output datasets.
 *
 * @param job identifier of the child job
 * @param inputs lineage of the datasets listed as inputs of the job
 * @param outputs lineage of the datasets listed as outputs of the job
 *     (NOTE(review): either collection is presumably null when absent from the JSON — confirm)
 */
public record ChildLineage(
JobId job, Collection<DatasetLineage> inputs, Collection<DatasetLineage> outputs) {}

/**
 * Direct lineage of a parent job: the parent's identifier and the lineage of its children.
 *
 * @param parent identifier of the parent job
 * @param children lineage entries, one per child job
 */
public record ParentLineage(JobId parent, Collection<ChildLineage> children) {
  /**
   * Deserializes a {@link ParentLineage} from its JSON representation via {@code Utils.fromJson}.
   *
   * @param json JSON document to parse
   * @return the parsed {@link ParentLineage}
   */
  static ParentLineage fromJson(final String json) {
    final TypeReference<ParentLineage> parentLineageType = new TypeReference<>() {};
    return Utils.fromJson(json, parentLineageType);
  }
}
}
4 changes: 4 additions & 0 deletions clients/java/src/main/java/marquez/client/MarquezPathV1.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ static String lineagePath() {
return path("/lineage/");
}

/**
 * Returns the path of the direct lineage endpoint.
 *
 * @return the result of passing {@code "/lineage/direct"} to {@code path(...)}
 */
static String directLineagePath() {
  final String directLineageTemplate = "/lineage/direct";
  return path(directLineageTemplate);
}

static String columnLineagePath() {
return path("/column-lineage/");
}
Expand Down
25 changes: 19 additions & 6 deletions clients/java/src/main/java/marquez/client/MarquezUrl.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import static marquez.client.MarquezPathV1.datasetPath;
import static marquez.client.MarquezPathV1.datasetTagPath;
import static marquez.client.MarquezPathV1.datasetVersionPath;
import static marquez.client.MarquezPathV1.directLineagePath;
import static marquez.client.MarquezPathV1.fieldTagPath;
import static marquez.client.MarquezPathV1.jobPath;
import static marquez.client.MarquezPathV1.jobVersionPath;
Expand All @@ -31,8 +32,6 @@
import static marquez.client.MarquezPathV1.searchPath;
import static marquez.client.MarquezPathV1.sourcePath;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -41,13 +40,20 @@
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.Map;

import javax.annotation.Nullable;

import org.apache.http.client.utils.URIBuilder;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;

import lombok.NonNull;
import marquez.client.models.JobId;
import marquez.client.models.NodeId;
import marquez.client.models.RunState;
import marquez.client.models.SearchFilter;
import marquez.client.models.SearchSort;
import org.apache.http.client.utils.URIBuilder;

class MarquezUrl {

Expand Down Expand Up @@ -197,7 +203,7 @@ URL toCreateTagsUrl(String name) {

URL toSearchUrl(
@NonNull String query, @Nullable SearchFilter filter, @Nullable SearchSort sort, int limit) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
final ImmutableMap.Builder<String, Object> queryParams = new ImmutableMap.Builder<>();
queryParams.put("q", query);
if (filter != null) {
queryParams.put("filter", filter);
Expand All @@ -210,17 +216,24 @@ URL toSearchUrl(
}

/**
 * Builds the lineage URL for a node.
 *
 * <p>The query string carries {@code nodeId} (the node id's string value) and {@code depth}
 * (rendered as a decimal string).
 *
 * @param nodeId node whose lineage is requested
 * @param depth value sent as the {@code depth} query parameter
 * @return the URL produced by {@code from(lineagePath(), ...)} with the query parameters above
 */
URL toLineageUrl(NodeId nodeId, int depth) {
  // Single, fully parameterized builder: the raw-typed declaration of the same local was
  // dropped because declaring the variable twice in one scope does not compile.
  final ImmutableMap.Builder<String, Object> queryParams = new ImmutableMap.Builder<>();
  queryParams.put("nodeId", nodeId.getValue());
  queryParams.put("depth", String.valueOf(depth));
  return from(lineagePath(), queryParams.build());
}

/**
 * Builds the column lineage URL for a node.
 *
 * <p>The query string carries {@code nodeId} (the node id's string value), {@code depth} and
 * {@code withDownstream}, the latter two rendered as strings.
 *
 * @param nodeId node whose column lineage is requested
 * @param depth value sent as the {@code depth} query parameter
 * @param withDownstream value sent as the {@code withDownstream} query parameter
 * @return the URL produced by {@code from(columnLineagePath(), ...)} with the query parameters
 *     above
 */
URL toColumnLineageUrl(NodeId nodeId, int depth, boolean withDownstream) {
  // Single, fully parameterized builder: the raw-typed declaration of the same local was
  // dropped because declaring the variable twice in one scope does not compile.
  final ImmutableMap.Builder<String, Object> queryParams = new ImmutableMap.Builder<>();
  queryParams.put("nodeId", nodeId.getValue());
  queryParams.put("depth", String.valueOf(depth));
  queryParams.put("withDownstream", String.valueOf(withDownstream));
  return from(columnLineagePath(), queryParams.build());
}

/**
 * Builds the direct lineage URL for a parent job.
 *
 * <p>The job id is converted to a node id and its string value is sent as the {@code
 * parentJobNodeId} query parameter.
 *
 * @param parentJobId identifier of the parent job; must not be null
 * @return the URL produced by {@code from(directLineagePath(), ...)} with that query parameter
 */
public URL toDirectLineageUrl(@NonNull JobId parentJobId) {
  final String parentNodeIdValue = NodeId.of(parentJobId).getValue();
  return from(
      directLineagePath(),
      new ImmutableMap.Builder<String, Object>()
          .put("parentJobNodeId", parentNodeIdValue)
          .build());
}

}

0 comments on commit 53ff595

Please sign in to comment.