Skip to content

Commit

Permalink
Add JSON-ification to several foundational config-state representatio…
Browse files Browse the repository at this point in the history
…ns, plus encapsulated convience method `JobState.getJobIdFromProps`
  • Loading branch information
phet committed Oct 26, 2023
1 parent 0769c0c commit c09873c
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.StringWriter;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.gson.stream.JsonWriter;

import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.Watermark;
Expand Down Expand Up @@ -365,6 +367,57 @@ public int hashCode() {
return result;
}

/** @return Stringified form, in pretty-printed JSON */
public String toJsonString() {
StringWriter stringWriter = new StringWriter();
try (JsonWriter jsonWriter = new JsonWriter(stringWriter)) {
jsonWriter.setIndent("\t");
this.toJson(jsonWriter);
} catch (IOException ioe) {
// Ignored
}
return stringWriter.toString();
}

public void toJson(JsonWriter jsonWriter) throws IOException {
jsonWriter.beginObject();

jsonWriter.name("id").value(this.getId());
jsonWriter.name("properties");
jsonWriter.beginObject();
for (String key : this.getPropertyNames()) {
jsonWriter.name(key).value(this.getProp(key));
}
jsonWriter.endObject();

jsonWriter.name("extract");
jsonWriter.beginObject();
jsonWriter.name("extractId").value(this.getExtract().getId());
jsonWriter.name("extractProperties");
jsonWriter.beginObject();
for (String key : this.getExtract().getPropertyNames()) {
jsonWriter.name(key).value(this.getExtract().getProp(key));
}
jsonWriter.endObject();

State prevTableState = this.getExtract().getPreviousTableState();
if (prevTableState != null) {
jsonWriter.name("extractPrevTableState");
jsonWriter.beginObject();
jsonWriter.name("prevStateId").value(prevTableState.getId());
jsonWriter.name("prevStateProperties");
jsonWriter.beginObject();
for (String key : prevTableState.getPropertyNames()) {
jsonWriter.name(key).value(prevTableState.getProp(key));
}
jsonWriter.endObject();
jsonWriter.endObject();
}
jsonWriter.endObject();

jsonWriter.endObject();
}

public String getOutputFilePath() {
// Search for the properties in the workunit.
// This search for the property first in State and then in the Extract of this workunit.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.gobblin.data.management.copy;

import com.google.common.cache.Cache;
import java.io.IOException;
import java.io.StringWriter;
import java.util.List;
import java.util.Map;

Expand All @@ -34,8 +34,10 @@

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.stream.JsonWriter;

import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
Expand Down Expand Up @@ -132,6 +134,51 @@ public CopyableFile(FileStatus origin, Path destination, OwnerAndPermission dest
this.datasetOutputPath = datasetOutputPath;
}

/** @return Stringified form, including metadata, in pretty-printed JSON */
public String toJsonString() {
return toJsonString(true);
}

public String toJsonString(boolean includeMetadata) {
StringWriter stringWriter = new StringWriter();
try (JsonWriter jsonWriter = new JsonWriter(stringWriter)) {
jsonWriter.setIndent("\t");
this.toJson(jsonWriter, includeMetadata);
} catch (IOException ioe) {
// Ignored
}
return stringWriter.toString();
}

public void toJson(JsonWriter jsonWriter, boolean includeMetadata) throws IOException {
jsonWriter.beginObject();

jsonWriter
.name("file set").value(this.getFileSet())
.name("origin").value(this.getOrigin().toString())
.name("destination").value(this.getDestination().toString())
.name("destinationOwnerAndPermission").value(this.getDestinationOwnerAndPermission().toString())
// TODO:
// this.ancestorsOwnerAndPermission
// this.checksum
// this.preserve
// this.dataFileVersionStrategy
// this.originTimestamp
// this.upstreamTimestamp
.name("datasetOutputPath").value(this.getDatasetOutputPath().toString());

if (includeMetadata && this.getAdditionalMetadata() != null) {
jsonWriter.name("metadata");
jsonWriter.beginObject();
for (Map.Entry<String, String> entry : this.getAdditionalMetadata().entrySet()) {
jsonWriter.name(entry.getKey()).value(entry.getValue());
}
jsonWriter.endObject();
}

jsonWriter.endObject();
}

/**
* Set file system based source and destination dataset for this {@link CopyableFile}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;


Expand Down Expand Up @@ -140,10 +139,9 @@ public JobContext(Properties jobProps, Logger logger, SharedResourcesBroker<Gobb
"A job must have a job name specified by job.name");

this.jobName = JobState.getJobNameFromProps(jobProps);
this.jobId = jobProps.containsKey(ConfigurationKeys.JOB_ID_KEY) ? jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY)
: JobLauncherUtils.newJobId(this.jobName);
this.jobId = JobState.getJobIdFromProps(jobProps);
jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, this.jobId); // in case not yet directly defined as such
this.jobSequence = Long.toString(Id.Job.parse(this.jobId).getSequence());
jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, this.jobId);

this.jobBroker = instanceBroker.newSubscopedBuilder(new JobScopeInstance(this.jobName, this.jobId))
.withOverridingConfig(ConfigUtils.propertiesToConfig(jobProps)).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ImmutableProperties;
import org.apache.gobblin.util.JobLauncherUtils;


/**
Expand Down Expand Up @@ -185,6 +186,11 @@ public static String getJobNameFromProps(Properties props) {
return props.getProperty(ConfigurationKeys.JOB_NAME_KEY);
}

public static String getJobIdFromProps(Properties props) {
return props.containsKey(ConfigurationKeys.JOB_ID_KEY) ? props.getProperty(ConfigurationKeys.JOB_ID_KEY)
: JobLauncherUtils.newJobId(JobState.getJobNameFromProps(props));
}

public static String getJobGroupFromState(State state) {
return state.getProp(ConfigurationKeys.JOB_GROUP_KEY);
}
Expand Down Expand Up @@ -589,10 +595,15 @@ public int hashCode() {

@Override
public String toString() {
return toJsonString(false);
}

/** @return Stringified form, in pretty-printed JSON */
public String toJsonString(boolean includeProperties) {
StringWriter stringWriter = new StringWriter();
try (JsonWriter jsonWriter = new JsonWriter(stringWriter)) {
jsonWriter.setIndent("\t");
this.toJson(jsonWriter, false);
this.toJson(jsonWriter, includeProperties);
} catch (IOException ioe) {
// Ignored
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.StringWriter;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -298,6 +299,22 @@ public int hashCode() {
return result;
}

/** @return Stringified form, including all properties, in pretty-printed JSON */
public String toJsonString() {
return toJsonString(true);
}

public String toJsonString(boolean includeProperties) {
StringWriter stringWriter = new StringWriter();
try (JsonWriter jsonWriter = new JsonWriter(stringWriter)) {
jsonWriter.setIndent("\t");
this.toJson(jsonWriter, includeProperties);
} catch (IOException ioe) {
// Ignored
}
return stringWriter.toString();
}

/**
* Convert this {@link TaskState} to a json document.
*
Expand Down

0 comments on commit c09873c

Please sign in to comment.