Skip to content

Commit

Permalink
Merge branch 'feat/182-consume-offering' of github.com:sovity/edc-ext…
Browse files Browse the repository at this point in the history
…ensions into feat/182-consume-offering

feat: merge latest upstream changes
  • Loading branch information
M-Fitzke committed Jul 21, 2023
2 parents d16b97d + b36056e commit 235ab2f
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 86 deletions.
2 changes: 0 additions & 2 deletions extensions/wrapper/wrapper/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ dependencies {
testImplementation("org.junit.jupiter:junit-jupiter-params:5.9.3")
testImplementation("org.awaitility:awaitility:4.2.0")

//TODO move to own module?
testImplementation("${edcGroup}:iam-mock:${edcVersion}")
testImplementation("${edcGroup}:dsp:${edcVersion}")
testImplementation("${edcGroup}:management-api:${edcVersion}")
Expand All @@ -60,7 +59,6 @@ val openapiFileDir = "${project.buildDir}/swagger"
val openapiFileFilename = "edc-api-wrapper.yaml"
val openapiFile = "$openapiFileDir/$openapiFileFilename"

//TODO move to own module?
tasks.register("printClasspath") {
doLast {
println(sourceSets["main"].runtimeClasspath.asPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public static WrapperExtensionContext buildContext(
transformerRegistry.register(new DataRequestToDataRequestDtoTransformer());
transformerRegistry.register(new TransferProcessToTransferProcessOutputDtoTransformer());
var consumptionService = new ConsumptionService(negotiationService, transferProcessService,
contractNegotiationStore, transferProcessStore, transformerRegistry);
contractNegotiationStore, transferProcessStore, transformerRegistry, policyMappingService);

negotiationObservable.registerListener(new ContractNegotiationConsumptionListener(consumptionService));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,41 @@
package de.sovity.edc.ext.wrapper.api.usecase.model;

import de.sovity.edc.ext.wrapper.api.common.model.PolicyDto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.types.domain.DataAddress;

import java.util.Map;

/**
* Input for a consumption process.
*
* @author Ronja Quensel
*/
@AllArgsConstructor
@NoArgsConstructor
@Builder(toBuilder = true)
@Getter
public class ConsumptionInputDto {
String connectorId;
String connectorAddress;
String offerId;
String assetId;
Policy policy; //TODO is still using EDC model: policy as input for negotiation requires target attribute (= asset id)
DataAddress dataDestination;
/** ID of the provider. */
private String connectorId;

/** Address of the provider. */
private String connectorAddress;

/** ID of the offer which is the basis for the consumption request. */
private String offerId;

/** ID of the asset that is requested. */
private String assetId;

/** Policy used as the basis for the contract negotiation. */
private PolicyDto policy;

/**
* Destination where the data should be transferred. For reference, see
* {@link org.eclipse.edc.spi.types.domain.DataAddress}
*/
private Map<String, String> dataDestination;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,26 @@
import java.util.List;

/**
* DTO Class for ConsumerOutput
* DTO holding information about a consumption process.
*
* @author Steffen Biehs
* @author Steffen Biehs, Ronja Quensel
*/
@NoArgsConstructor
@AllArgsConstructor
@Getter
public class ConsumptionOutputDto {
/** ID of the process. */
private String id;

/** The input that was used to start this process. */
private ConsumptionInputDto input;

/** Collection of errors that may have occurred during this process. */
private List<String> errors;

/** Information about the contract negotiation associated with this process. */
private ContractNegotiationOutputDto contractNegotiation;

/** Information about the transfer process associated with this process. */
private TransferProcessOutputDto transferProcess;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,29 @@
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.TransferRequest;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.web.spi.exception.InvalidRequestException;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static java.lang.String.format;
import static java.util.UUID.randomUUID;
import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE;

/**
* Service for managing consumption processes (= contract negotiation and subsequent data transfer).
*
* @author Ronja Quensel
*/
@RequiredArgsConstructor
@Slf4j
public class ConsumptionService {
Expand All @@ -39,7 +51,16 @@ public class ConsumptionService {
private final ContractNegotiationStore contractNegotiationStore;
private final TransferProcessStore transferProcessStore;
private final TypeTransformerRegistry transformerRegistry;

private final PolicyMappingService policyMappingService;

/**
* Starts a consumption process for the asset specified in the input. Validates the input and
* then triggers a contract negotiation with the specified provider. After the negotiation
* is finalized, the corresponding transfer will be started after a call-back.
*
* @param consumptionInputDto the input for the process.
* @return the process id.
*/
public String startConsumptionProcess(ConsumptionInputDto consumptionInputDto) {
//TODO generate ID
var id = "id";
Expand All @@ -49,11 +70,13 @@ public String startConsumptionProcess(ConsumptionInputDto consumptionInputDto) {

validateInput(consumptionInputDto);

//TODO transformer
var policy = policyMappingService.policyDtoToPolicy(consumptionInputDto.getPolicy())
.withTarget(consumptionInputDto.getAssetId());

var contractOffer = ContractOffer.Builder.newInstance()
.id(consumptionInputDto.getOfferId())
.assetId(consumptionInputDto.getAssetId())
.policy(consumptionInputDto.getPolicy())
.policy(policy)
.providerId("urn:connector:" + consumptionInputDto.getConnectorId())
.build();

Expand All @@ -76,18 +99,25 @@ public String startConsumptionProcess(ConsumptionInputDto consumptionInputDto) {
return id;
}

/**
* Method used for callback after the contract negotiation has been finalized. Will be called
* by a corresponding listener. Starts the transfer as defined in the original process input.
*
* @param contractNegotiation the finalized contract negotiation.
*/
public void negotiationConfirmed(ContractNegotiation contractNegotiation) {
var process = findByNegotiation(contractNegotiation);

if (process != null) {
var agreementId = contractNegotiation.getContractAgreement().getId();

var destination = createDataAddress(process.getInput().getDataDestination());
var dataRequest = DataRequest.Builder.newInstance()
.id(randomUUID().toString())
.connectorId(process.getInput().getConnectorId())
.connectorAddress(process.getInput().getConnectorAddress())
.protocol("dataspace-protocol-http")
.dataDestination(process.getInput().getDataDestination())
.dataDestination(destination)
.assetId(process.getInput().getAssetId())
.contractId(agreementId)
.build();
Expand All @@ -105,6 +135,15 @@ public void negotiationConfirmed(ContractNegotiation contractNegotiation) {
}
}

/**
* Returns information about a consumption process. Retrieves the corresponding contract
* negotiation and transfer process, transforms them to an output format and returns them
* together with other persisted information about the consumption process like the original
* input.
*
* @param id the process id.
* @return information about the process.
*/
public ConsumptionOutputDto getConsumptionProcess(String id) {
var process = consumptionProcesses.get(id);
if (process == null) {
Expand All @@ -114,15 +153,15 @@ public ConsumptionOutputDto getConsumptionProcess(String id) {
var negotiationDto = Optional.ofNullable(process.getContractNegotiationId())
.map(contractNegotiationStore::findById)
.map(cn -> transformerRegistry.transform(cn, ContractNegotiationOutputDto.class))
.map(this::logIfFailedResult)
.map(this::throwIfFailedResult)
.filter(Result::succeeded)
.map(Result::getContent)
.orElse(null);

var transferProcessDto = Optional.ofNullable(process.getTransferProcessId())
.map(transferProcessStore::findById)
.map(tp -> transformerRegistry.transform(tp, TransferProcessOutputDto.class))
.map(this::logIfFailedResult)
.map(this::throwIfFailedResult)
.filter(Result::succeeded)
.map(Result::getContent)
.orElse(null);
Expand Down Expand Up @@ -150,8 +189,12 @@ private void validateInput(ConsumptionInputDto input) {
if (input.getPolicy() == null)
throw new InvalidRequestException(format(message, "policy"));

if (input.getDataDestination() == null)
var destination = input.getDataDestination();
if (destination == null)
throw new InvalidRequestException(format(message, "dataDestination"));

if (!destination.containsKey("type") && !destination.containsKey(EDC_NAMESPACE + "type"))
throw new InvalidRequestException("dataDestination must have type property.");
}

private ConsumptionDto findByNegotiation(ContractNegotiation contractNegotiation) {
Expand All @@ -163,10 +206,34 @@ private ConsumptionDto findByNegotiation(ContractNegotiation contractNegotiation
.orElse(null);
}

private <T> Result<T> logIfFailedResult(Result<T> result) {
private DataAddress createDataAddress(Map<String, String> properties) {
var nameSpacedProperties = properties.entrySet().stream()
.map(entry -> {
if (isValidUri(entry.getKey())) {
return new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue());
}
var key = EDC_NAMESPACE + entry.getKey();
return new AbstractMap.SimpleEntry<>(key, entry.getValue());
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return DataAddress.Builder.newInstance()
.properties(nameSpacedProperties)
.build();
}

private boolean isValidUri(String string) {
try {
new URI(string);
return true;
} catch (URISyntaxException e) {
return false;
}
}

private <T> Result<T> throwIfFailedResult(Result<T> result) {
if (result.failed()) {
log.error(format("Failed to transform contract negotiation: %s",
result.getFailureDetail()));
var message = "Failed to transform contract negotiation or transfer process: %s";
throw new EdcException(format(message, result.getFailureDetail()));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@
import de.sovity.edc.ext.wrapper.api.usecase.model.PolicyDefinitionRequestDto;
import jakarta.json.JsonArray;
import org.eclipse.edc.junit.extensions.EdcRuntimeExtension;
import org.eclipse.edc.policy.model.Action;
import org.eclipse.edc.policy.model.Permission;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

Expand All @@ -22,7 +18,6 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static de.sovity.edc.ext.wrapper.api.usecase.services.PolicyMappingService.ACTION_TYPE;
import static io.restassured.RestAssured.given;
import static io.restassured.http.ContentType.JSON;
import static java.util.UUID.randomUUID;
Expand Down Expand Up @@ -113,7 +108,7 @@ void consumeOffering() {
.extract()
.path("id");

// wait until transfer has been terminated (due to unsupported data address type)
// wait until transfer has been terminated (will be due to unsupported data address type)
await().atMost(45, TimeUnit.SECONDS).untilAsserted(() -> given()
.baseUri(consumerManagementUrl.toString())
.contentType(JSON)
Expand Down Expand Up @@ -171,18 +166,10 @@ private ConsumptionInputDto consumptionInputDto() {
.connectorAddress(providerProtocolUrl.toString())
.offerId(contractDefinitionId + ":" + assetId + ":" + randomUUID())
.assetId(assetId)
.policy(Policy.Builder.newInstance()
.permission(Permission.Builder.newInstance()
.action(Action.Builder.newInstance()
.type(ACTION_TYPE)
.build())
.target(assetId)
.build())
.target(assetId)
.build())
.dataDestination(DataAddress.Builder.newInstance()
.type("test")
.policy(PolicyDto.builder()
.permission(PermissionDto.builder().build())
.build())
.dataDestination(Map.of("type", "test"))
.build();
}
}
Loading

0 comments on commit 235ab2f

Please sign in to comment.