Skip to content

Commit

Permalink
Move main observation submission to a POST endpoint (no mutation)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ferdinando Villa committed Sep 15, 2024
1 parent 39a7931 commit d136daf
Show file tree
Hide file tree
Showing 18 changed files with 155 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ interface REASONER {
String DECLARE_OBSERVABLE = REASONER_BASE + "/declare/observable";

/**
* @protocol POST for a map containing the KimConcept definition as "OBSERVABLE" and possibly
* pattern variables
* @protocol POST for a map containing the KimConcept definition as "OBSERVABLE" and possibly pattern
* variables
*/
String DECLARE_CONCEPT = REASONER_BASE + "/declare/concept";

Expand Down Expand Up @@ -339,6 +339,12 @@ interface ADMIN extends PluginAPI {
*/
String DIGITAL_TWIN_GRAPH = "/dt";

/**
* PUT endpoint to ingest and start resolving an observation. Returns the observation ID that can be
* used to follow the resolution task. Payload is a
* {@link org.integratedmodelling.klab.api.services.resolver.objects.ResolutionRequest} instance.
*/
String OBSERVE = "/observe";
}

public interface RESOURCES {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ public interface Queries {
interface GraphQL {

record Query(String queryPattern, String resultTarget, String[] variables) {}

Query OBSERVE = new Query( """
mutation Observe {
observe(observation: $observation)
}
""", "observe", new String[] {"observation"});
//
// Query OBSERVE = new Query( """
// mutation Observe {
// observe(observation: $observation)
// }
// """, "observe", new String[] {"observation"});

}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package org.integratedmodelling.klab.api.lang;

import java.io.Serializable;

/**
* Just a number with units.
*
* @author ferdinando.villa
*
*/
public interface Quantity {
public interface Quantity extends Serializable {

/**
* May be an integer or a double.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
*
* @author Ferd
*/
public interface ContextScope extends SessionScope, AutoCloseable {
public interface ContextScope extends SessionScope {

@Override
default Type getType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ default String getServiceName() {
return "klab.runtime.service";
}

/**
* Submit an observation to the runtime in the passed scope. The result is the observation ID, which
* can be used to follow the resolution task progress through AMPQ messaging (if configured) or polling.
*
* @param observation
* @param scope
* @return
*/
long submit(Observation observation, ContextScope scope);


/**
* The main function of the runtime.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import org.integratedmodelling.klab.api.collections.Parameters;
import org.integratedmodelling.klab.api.geometry.Geometry;
import org.integratedmodelling.klab.api.knowledge.Concept;
import org.integratedmodelling.klab.api.knowledge.Model;
import org.integratedmodelling.klab.api.scope.ContextScope;
import org.integratedmodelling.klab.api.services.resolver.objects.ResolutionConstraintImpl;

import java.io.Serializable;
import java.util.List;

/**
Expand Down Expand Up @@ -39,7 +39,7 @@
*
* @author Ferd
*/
public interface ResolutionConstraint {
public interface ResolutionConstraint extends Serializable {

/**
* Defines type, behavior and intended payload class for the constraint. Each class MUST be serializable
Expand Down Expand Up @@ -88,7 +88,7 @@ private Type(Class<?> dataClass, boolean incremental) {
*
* @return
*/
boolean isEmpty();
boolean empty();

Type getType();

Expand All @@ -99,7 +99,7 @@ private Type(Class<?> dataClass, boolean incremental) {
* @param <T>
* @return
*/
<T> List<T> get(Class<T> dataClass);
<T> List<T> payload(Class<T> dataClass);

/**
* Used internally to merge a constraint with a previous one and return a new constraint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import org.integratedmodelling.klab.api.collections.Parameters;
import org.integratedmodelling.klab.api.exceptions.KlabIllegalArgumentException;
import org.integratedmodelling.klab.api.services.resolver.ResolutionConstraint;
import org.integratedmodelling.klab.api.services.resources.adapters.Parameter;
import org.integratedmodelling.klab.api.utils.Utils;

import java.util.ArrayList;
Expand Down Expand Up @@ -35,7 +34,7 @@ public int size() {
}

@Override
public boolean isEmpty() {
public boolean empty() {
return data.isEmpty() || data.stream().anyMatch(Objects::isNull);
}

Expand All @@ -44,8 +43,20 @@ public Type getType() {
return type;
}

public List<Object> getData() {
return data;
}

public void setData(List<Object> data) {
this.data = data;
}

public void setType(Type type) {
this.type = type;
}

@Override
public <T> List<T> get(Class<T> dataClass) {
public <T> List<T> payload(Class<T> dataClass) {
return new Utils.Casts<Object, T>().cast(data);
}

Expand All @@ -56,16 +67,16 @@ public ResolutionConstraint merge(ResolutionConstraint constraint) {
if (type == Type.Parameters) {
for (int i = 0; i < constraint.size(); i++) {
if (data.size() > i) {
var existing = this.get(Parameters.class).get(i);
var existing = this.payload(Parameters.class).get(i);
var merged = Parameters.create(existing);
merged.putAll(constraint.get(Parameters.class).get(i));
merged.putAll(constraint.payload(Parameters.class).get(i));
ret.data.add(merged);
} else {
ret.data.add(constraint.get(Parameters.class).get(i));
ret.data.add(constraint.payload(Parameters.class).get(i));
}
}
} else {
ret.data.addAll(constraint.get(Object.class));
ret.data.addAll(constraint.payload(Object.class));
}
return ret;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ public P get() {
}
}

protected <P, T> ContextScope.Task<P, T> newMessageTrackingTask(Set<Message.MessageType> matchTypes,
Class<P> contextClass,
T value) {
return newMessageTrackingTask(matchTypes, value, null);
}

/**
* Return a future that exposes the tracking ID and produces the payload when the event message matches.
*
Expand Down Expand Up @@ -211,7 +217,8 @@ protected Channel getOrCreateChannel(Message.Queue queue) {
/*
Looks like just one channel is enough - so one connection factory, one
connection, one channel. Maybe the whole API could be simpler. Maybe channels are synchronizing? In
all cases we now can have a queue name w/o a channel so we would need to keep a hash of channels and dispose
all cases we now can have a queue name w/o a channel so we would need to keep a hash of channels
and dispose
properly.
*/
if (this.channel_ == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.integratedmodelling.klab.api.lang.kactors.KActorsBehavior;
import org.integratedmodelling.klab.api.lang.kdl.KdlDataflow;
import org.integratedmodelling.klab.api.lang.kim.*;
import org.integratedmodelling.klab.api.services.resolver.ResolutionConstraint;
import org.integratedmodelling.klab.api.services.runtime.Actuator;
import org.integratedmodelling.klab.api.services.runtime.Dataflow;
import org.integratedmodelling.klab.api.services.runtime.Message;
Expand Down Expand Up @@ -256,7 +257,7 @@ public static void configureObjectMapperForKlabTypes(ObjectMapper mapper) {
Model.class, ServiceCall.class, Observation.class,
NumericRange.class, Annotation.class, Metadata.class,
Geometry.Dimension.class, Parameters.class, Actuator.class,
Notification.LexicalContext.class,
Notification.LexicalContext.class, ResolutionConstraint.class,
KimObservationStrategy.Operation.class,
KimObservationStrategy.Filter.class, ObservationStrategy.class,
ObservationStrategy.Operation.class}) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,8 @@ public void setCurrency(String currency) {
this.currency = currency;
}

public String toString() {
return value + (unit == null ? "" : ("." + unit)) + (currency == null ? "" : ("." + currency));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ public void setValue(Number value) {
this.value = value;
}

public String toString() {
return value + (unit == null ? "" : ("." + unit)) + (currency == null ? "" : ("." + currency));
}

// @Override
// public ValueType getValueType() {
// // TODO classify which number
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.integratedmodelling.klab.api.scope.ServiceSideScope;
import org.integratedmodelling.klab.api.scope.SessionScope;
import org.integratedmodelling.klab.api.services.*;
import org.integratedmodelling.klab.api.services.resolver.objects.ResolutionRequest;
import org.integratedmodelling.klab.api.services.runtime.Channel;
import org.integratedmodelling.klab.api.services.runtime.Dataflow;
import org.integratedmodelling.klab.api.services.runtime.Message;
Expand All @@ -28,10 +29,6 @@ public class RuntimeClient extends ServiceClient implements RuntimeService {

private GraphQLClient graphClient;

// public RuntimeClient() {
// super(Type.RUNTIME);
// }

public RuntimeClient(URL url, Identity identity, List<ServiceReference> services, BiConsumer<Channel,
Message>... listeners) {
super(Type.RUNTIME, url, identity, services, listeners);
Expand Down Expand Up @@ -152,14 +149,22 @@ public String registerContext(ContextScope scope) {
return ret;
}

@Override
public long submit(Observation observation, ContextScope scope) {
ResolutionRequest resolutionRequest = new ResolutionRequest();
resolutionRequest.setObservation(observation);
resolutionRequest.setResolutionConstraints(scope.getResolutionConstraints());
return client.withScope(scope).post(ServicesAPI.RUNTIME.OBSERVE, resolutionRequest, Long.class);
}

@Override
public Provenance runDataflow(Dataflow<Observation> dataflow, ContextScope contextScope) {
return null;
}

@Override
public Capabilities capabilities(Scope scope) {
return client.get(ServicesAPI.CAPABILITIES, RuntimeCapabilitiesImpl.class);
return client.withScope(scope).get(ServicesAPI.CAPABILITIES, RuntimeCapabilitiesImpl.class);
}

public GraphQLClient graphClient() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
import org.integratedmodelling.common.services.client.runtime.RuntimeClient;
import org.integratedmodelling.common.utils.Utils;
import org.integratedmodelling.klab.api.digitaltwin.GraphModel;
import org.integratedmodelling.klab.api.knowledge.Concept;
import org.integratedmodelling.klab.api.knowledge.Observable;
import org.integratedmodelling.klab.api.knowledge.observation.DirectObservation;
import org.integratedmodelling.klab.api.knowledge.observation.Observation;
import org.integratedmodelling.klab.api.knowledge.observation.scale.Scale;
import org.integratedmodelling.klab.api.provenance.Provenance;
import org.integratedmodelling.klab.api.scope.ContextScope;
import org.integratedmodelling.klab.api.services.KlabService;
Expand Down Expand Up @@ -35,6 +33,8 @@ public ClientContextScope(ClientUserScope parent, String contextName, RuntimeSer

private ClientContextScope(ClientContextScope parent) {
super(parent, parent.name, parent.runtimeService);
// this will have been reset by super to the user's id
setId(parent.getId());
}

@Override
Expand Down Expand Up @@ -84,20 +84,10 @@ public ContextScope connect(URL remoteContext) {

@Override
public Task<Observation, Long> observe(Observation observation) {

var runtime = getService(RuntimeService.class);
if (runtime instanceof RuntimeClient runtimeClient) {
long taskId =
runtimeClient.graphClient().query(GraphModel.Queries.GraphQL.OBSERVE.queryPattern(),
GraphModel.Queries.GraphQL.OBSERVE.resultTarget(), Long.class,
this, "observation",
GraphModel.adapt(observation, this));
return newMessageTrackingTask(EnumSet.of(Message.MessageType.ResolutionAborted,
Message.MessageType.ResolutionSuccessful), taskId, this::getObservation); // event
// watcher using either messaging or queues
}

return null;
long taskId = runtime.submit(observation, this);
return newMessageTrackingTask(EnumSet.of(Message.MessageType.ResolutionAborted,
Message.MessageType.ResolutionSuccessful), Observation.class, taskId); // event
}

@Override
Expand Down Expand Up @@ -264,7 +254,7 @@ public <T extends KlabService> Collection<T> getServices(Class<T> serviceClass)
ret.resolutionConstraints.clear();
} else {
for (var constraint : resolutionConstraints) {
if (constraint == null || constraint.isEmpty()) {
if (constraint == null || constraint.empty()) {
continue;
}
if (constraint.getType().incremental && ret.resolutionConstraints.containsKey(constraint.getType())) {
Expand All @@ -290,7 +280,7 @@ public <T> T getConstraint(ResolutionConstraint.Type type, T defaultValue) {
if (constraint == null || constraint.size() == 0) {
return defaultValue;
}
return (T) constraint.get(defaultValue.getClass()).getFirst();
return (T) constraint.payload(defaultValue.getClass()).getFirst();
}

@Override
Expand All @@ -299,7 +289,7 @@ public <T> T getConstraint(ResolutionConstraint.Type type, Class<T> resultClass)
if (constraint == null || constraint.size() == 0) {
return null;
}
return (T) constraint.get(resultClass).getFirst();
return (T) constraint.payload(resultClass).getFirst();
}

@Override
Expand All @@ -308,6 +298,6 @@ public <T> List<T> getConstraints(ResolutionConstraint.Type type, Class<T> resul
if (constraint == null || constraint.size() == 0) {
return List.of();
}
return constraint.get(resultClass);
return constraint.payload(resultClass);
}
}
Loading

0 comments on commit d136daf

Please sign in to comment.