Skip to content

Commit

Permalink
Moving on with dataflow compiler and encoder
Browse files Browse the repository at this point in the history
  • Loading branch information
fvilla committed Oct 18, 2024
1 parent ebfe1e3 commit 621a85d
Show file tree
Hide file tree
Showing 9 changed files with 301 additions and 343 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.integratedmodelling.klab.api.data.mediation.ValueMediator;
import org.integratedmodelling.klab.api.geometry.Geometry;
import org.integratedmodelling.klab.api.knowledge.Artifact;
import org.integratedmodelling.klab.api.knowledge.ObservationStrategy;
import org.integratedmodelling.klab.api.lang.kim.KimClassification;
import org.integratedmodelling.klab.api.lang.kim.KimLookupTable;
import org.integratedmodelling.klab.api.lang.kim.KimObservable;
Expand All @@ -16,7 +17,7 @@

/**
* FIXME all this is obsolete. Should be a light wrapper without all those methods.
*
* <p>
* A contextualizable is the declaration of a resource that can be compiled into a processing step for a
* dataflow. In k.IM this can represent:
* <p>
Expand All @@ -36,7 +37,7 @@
* It is the runtime's task to turn any computable resource into a uniform k.DL
* service call. The call produces a IContextualizer that is inserted in a
* dataflow.
*
* <p>
* FIXME this should merely be a tag interface that tags standard KimAssets. The contextualization
* mode/trigger should be kept in the model independently.
*
Expand Down Expand Up @@ -108,7 +109,7 @@ public enum Trigger {
/**
* The data structure describing interactive parameters. It's a javabean with only strings for values, so
* that it can be easily serialized for communication.
*
* <p>
* FIXME we should just keep an annotation for this and use it if it's there. No reason for this to be
* here, either.
*
Expand Down Expand Up @@ -215,7 +216,7 @@ public void setLabel(String label) {
public String toString() {
return "InteractiveParameter [id=" + id + ", functionId=" + functionId + ", description=" + description
+ ", label=" + label + ", type=" + type + ", initialValue=" + initialValue + ", values" +
"=" + values
"=" + values
+ "]";
}

Expand Down Expand Up @@ -341,6 +342,14 @@ public void setSectionDescription(String sectionDescription) {
*/
String getResourceUrn();

/**
* Contextualization requires a trip back to the resolver to resolve a contextualization strategy
* in the current contextualization context.
*
* @return
*/
ObservationStrategy getObservationStrategy();

/**
* Resources such as expressions or URN-specified remote computations may have requirements that must be
* satisfied within the model where the computation appears. These will be made available in appropriate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,26 @@
*/
public interface Actuator extends Serializable, RuntimeAsset {

enum Type {
/**
* Resolve an existing observation identified by the same ID of this actuator
*/
RESOLVE,
/**
* Resolve an observation that should be created in the runtime if it is not
* present. This is relevant when the dataflow is executed from a stored serialized
* form.
*/
OBSERVE,
/**
* The actuator merely references an observation that is handled by another observation and has been
* contextualized when this actuator is encountered.
*/
REFERENCE
}

default RuntimeAsset.Type classify() {
return Type.ACTUATOR;
return RuntimeAsset.Type.ACTUATOR;
}

/**
Expand All @@ -83,6 +101,13 @@ default RuntimeAsset.Type classify() {
*/
long getId();

/**
* Used to discriminate observations as k.LAB-build vs. user-provided when compiling the actuator into a persistent serializable form.
*
* @return
*/
Actuator.Type getActuatorType();

/**
* All actuators have a name that corresponds 1-to-1 to the semantics it was created to resolve
* (observable reference name). The only case for duplication of the same name is when a direct
Expand Down Expand Up @@ -180,17 +205,6 @@ default RuntimeAsset.Type classify() {
*/
String getStrategyUrn();

/**
* If true, this actuator is a reference to another which has been defined before it in order of
* computation, and has produced its observation by the time this actuator is called into a
* contextualization. It only serves as a placeholder with a possibly different alias to define the local
* identifier of the original observation. Reference actuators are otherwise empty, with no children and
* no computation.
*
* @return
*/
boolean isReference();

/**
* The actuator's geometry is a merge of the native coverage of all models downstream of the actuator.
* Dealing with different coverages within a model is the responsibility of the runtime. A null or empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.integratedmodelling.klab.api.data.mediation.ValueMediator;
import org.integratedmodelling.klab.api.geometry.Geometry;
import org.integratedmodelling.klab.api.knowledge.Artifact;
import org.integratedmodelling.klab.api.knowledge.ObservationStrategy;
import org.integratedmodelling.klab.api.lang.Contextualizable;
import org.integratedmodelling.klab.api.lang.ExpressionCode;
import org.integratedmodelling.klab.api.lang.ServiceCall;
Expand Down Expand Up @@ -38,6 +39,7 @@ public class ContextualizableImpl extends KimStatementImpl implements Contextual
private Parameters<String> parameters = Parameters.create();
private Collection<String> interactiveParameters = new ArrayList<>();
private Contextualizable condition;
private ObservationStrategy observationStrategy;
private Pair<ValueMediator, ValueMediator> conversion;
private boolean negated;
private boolean mediation;
Expand Down Expand Up @@ -263,6 +265,15 @@ public void setEmpty(boolean empty) {
this.empty = empty;
}

@Override
public ObservationStrategy getObservationStrategy() {
return observationStrategy;
}

public void setObservationStrategy(ObservationStrategy observationStrategy) {
this.observationStrategy = observationStrategy;
}

@Override
public void visit(Visitor visitor) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,15 @@ public class ActuatorImpl implements Actuator {
@Serial
private static final long serialVersionUID = 2500101522003062757L;
private long id;
// private long timestamp;
private String name;
// private String alias;
private Artifact.Type type;
private Observable observable;
private String strategyUrn;
private List<Actuator> children = new ArrayList<>();
private List<ServiceCall> computation = new ArrayList<>();
// private boolean input;
// private boolean output;
private boolean reference;
// private boolean deferred;
// private String observer;
private Geometry coverage = Geometry.EMPTY;
private Parameters<String> data = Parameters.create();
// private Queue<ObservationStrategy> deferrals = new ConcurrentLinkedQueue<>();
private Actuator.Type actuatorType;

@Override
public long getId() {
Expand All @@ -43,11 +36,6 @@ public String getName() {
return this.name;
}

// @Override
// public String getAlias() {
// return this.alias;
// }

@Override
public Artifact.Type getType() {
return this.type;
Expand All @@ -68,17 +56,6 @@ public List<ServiceCall> getComputation() {
return this.computation;
}

// @Override
// public boolean isInput() {
// return this.input;
// }
//
// @Override
// public boolean isOutput() {
// return this.output;
// }


@Override
public String getStrategyUrn() {
return strategyUrn;
Expand All @@ -88,11 +65,6 @@ public void setStrategyUrn(String strategyUrn) {
this.strategyUrn = strategyUrn;
}

@Override
public boolean isReference() {
return this.reference;
}

@Override
public Geometry getCoverage() {
return this.coverage;
Expand All @@ -107,18 +79,9 @@ public void setId(long id) {
this.id = id;
}

// public void setTimestamp(long timestamp) {
// this.timestamp = timestamp;
// }

public void setName(String name) {
this.name = name;
}
//
// public void setAlias(String alias) {
// this.alias = alias;
// }

public void setType(Artifact.Type type) {
this.type = type;
}
Expand All @@ -134,19 +97,6 @@ public void setChildren(List<Actuator> children) {
public void setComputation(List<ServiceCall> computation) {
this.computation = computation;
}

// public void setInput(boolean input) {
// this.input = input;
// }
//
// public void setOutput(boolean output) {
// this.output = output;
// }

public void setReference(boolean reference) {
this.reference = reference;
}

public void setCoverage(Geometry coverage) {
this.coverage = coverage;
}
Expand All @@ -155,28 +105,14 @@ public void setData(Parameters<String> data) {
this.data = data;
}

// @Override
// public String getObserver() {
// return observer;
// }
//
// @Override
// public Queue<ObservationStrategy> getDeferrals() {
// return this.deferrals;
// }

// public void setObserver(String observer) {
// this.observer = observer;
// }

// @Override
// public boolean isDeferred() {
// return deferred;
// }
//
// public void setDeferred(boolean deferred) {
// this.deferred = deferred;
// }
@Override
public Type getActuatorType() {
return actuatorType;
}

public void setActuatorType(Type actuatorType) {
this.actuatorType = actuatorType;
}

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,39 +233,39 @@ public Parallelism getParallelism(ContextScope scope) {
public boolean registerActuator(Actuator actuator, Dataflow dataflow, ContextScope scope,
DirectObservation contextObservation) {

var data = observationData.get(actuator.getId());
if (data == null && /* shouldn't happen */ !actuator.isReference()) {
data = new ObservationData();
data.actuator = actuator;
// data.observation = createObservation(actuator, contextObservation, scope);
data.scale = Scale.create(scope.getContextObservation().getGeometry());
data.contextObservation = contextObservation;

// var customScale = dataflow.getResources().get((actuator.getId() + "_dataflow"), Scale.class);
// if (customScale != null) {
// // FIXME why the heck is this an Object and I have to cast?
// data.scale = data.scale.merge((Scale) customScale, LogicalConnector.INTERSECTION);
// var data = observationData.get(actuator.getId());
// if (data == null && /* shouldn't happen */ !actuator.isReference()) {
// data = new ObservationData();
// data.actuator = actuator;
//// data.observation = createObservation(actuator, contextObservation, scope);
// data.scale = Scale.create(scope.getContextObservation().getGeometry());
// data.contextObservation = contextObservation;
//
//// var customScale = dataflow.getResources().get((actuator.getId() + "_dataflow"), Scale.class);
//// if (customScale != null) {
//// // FIXME why the heck is this an Object and I have to cast?
//// data.scale = data.scale.merge((Scale) customScale, LogicalConnector.INTERSECTION);
//// }
//
// for (Actuator child : actuator.getChildren()) {
//// if (child.isInput() && !child.getName().equals(child.getAlias())) {
//// data.localNames.put(child.getName(), child.getAlias());
//// }
// }

for (Actuator child : actuator.getChildren()) {
// if (child.isInput() && !child.getName().equals(child.getAlias())) {
// data.localNames.put(child.getName(), child.getAlias());
//
// Executor executor = null;
// for (var computation : data.actuator.getComputation()) {
// var step = createExecutor(actuator, data.observation, computation, scope, executor);
// if (executor != step) {
// data.executors.add(step);
// }
}

Executor executor = null;
for (var computation : data.actuator.getComputation()) {
var step = createExecutor(actuator, data.observation, computation, scope, executor);
if (executor != step) {
data.executors.add(step);
}
executor = step;
}

observationData.put(actuator.getId(), data);

return true;
}
// executor = step;
// }
//
// observationData.put(actuator.getId(), data);
//
// return true;
// }

return false;
}
Expand Down Expand Up @@ -490,7 +490,6 @@ private Storage createStorage(Observable observable, ContextScope scope) {
// return ret;
// }


/**
* Establish the order of execution and the possible parallelism. Each root actuator should be sorted by
* dependency and appended in order to the result list along with its order of execution. Successive roots
Expand Down Expand Up @@ -518,7 +517,7 @@ private List<Pair<Actuator, Integer>> sortComputation(Dataflow<Observation> data
Set<Actuator> group = new HashSet<>();
while (order.hasNext()) {
Actuator next = order.next();
if (!next.isReference()) {
if (next.getActuatorType() != Actuator.Type.REFERENCE) {
var data = observationData.get(next.getId());
if (!data.executors.isEmpty()) {
ret.add(Pair.of(next, (executionOrder = checkExecutionOrder(executionOrder, next,
Expand All @@ -527,7 +526,6 @@ private List<Pair<Actuator, Integer>> sortComputation(Dataflow<Observation> data
}
}
}

return ret;
}

Expand Down
Loading

0 comments on commit 621a85d

Please sign in to comment.