Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft of metadata API with timestamp tracking capabilities. #1725

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.plc4x.java.api.messages;

import org.apache.plc4x.java.api.metadata.Metadata.Key;
import org.apache.plc4x.java.api.metadata.time.TimeSource;

/**
* High level definition of common metadata keys which can occur across multiple drivers.
*/
public interface PlcMetadataKeys {

Key<Long> TIMESTAMP = Key.of("timestamp", Long.class);
Key<TimeSource> TIMESTAMP_SOURCE = Key.of("timestamp_source", TimeSource.class);

Key<Long> RECEIVE_TIMESTAMP = Key.of("receive_timestamp", Long.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.plc4x.java.api.messages;

import org.apache.plc4x.java.api.metadata.Metadata;
import org.apache.plc4x.java.api.model.PlcTag;
import org.apache.plc4x.java.api.types.PlcResponseCode;

Expand All @@ -38,4 +39,9 @@ public interface PlcTagResponse extends PlcResponse {

PlcResponseCode getResponseCode(String name);

/**
* Returns tag level metadata information.
*/
Metadata getTagMetadata(String name);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.plc4x.java.api.metadata;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Metadata {

public static class Key<T> {

private final String key;
private final Class<T> type;

protected Key(String key, Class<T> type) {
this.key = key;
this.type = type;
}

public String getKey() {
return key;
}

public boolean validate(Object value) {
return type.isInstance(value);
}

public static <T> Key<T> of(String key, Class<T> type) {
return new Key<>(key, type);
}

}

public final static Metadata EMPTY = new Metadata(Collections.emptyMap());

private final Metadata parent;
private final Map<Key<?>, Object> values;

Metadata(Map<Key<?>, Object> values) {
this(values, EMPTY);
}

public Metadata(Map<Key<?>, Object> values, Metadata parent) {
this.parent = parent;
this.values = values;
}

public Set<Key<?>> keys() {
Set<Key<?>> keys = new LinkedHashSet<>(values.keySet());
keys.addAll(parent.keys());
return Collections.unmodifiableSet(keys);
}

public Object getValue(Key<?> key) {
Object value = values.get(key);
if (value == null) {
return parent.getValue(key);
}
return value;
}

public static class Builder {
private final Logger logger = LoggerFactory.getLogger(Builder.class);

private final Map<Key<?>, Object> values = new HashMap<>();
private final Metadata parent;

public Builder() {
this(Metadata.EMPTY);
}

public Builder(Metadata parent) {
this.parent = parent;
}

public <T> Builder put(Key<T> key, T value) {
if (!key.validate(value)) {
logger.debug("Ignore metadata value {}, it does not match constraints imposed by key {}", value, key);
return this;
}

values.put(key, value);
return this;
}

public Metadata build() {
return new Metadata(values, parent);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.plc4x.java.api.metadata.time;

public enum TimeSource {

// Time information is assumed by PLC4X itself
ASSUMPTION,
// Time comes from software layer, kernel driver and similar
SOFTWARE,
// Time can is confronted through hardware i.e. microcontroller
HARDWARE,
// Other source of time which fall into separate truthiness category
OTHER

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.plc4x.java.api.exceptions.PlcInvalidTagException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.*;
import org.apache.plc4x.java.api.metadata.Metadata;
import org.apache.plc4x.java.api.metadata.time.TimeSource;
import org.apache.plc4x.java.api.model.*;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.api.types.PlcSubscriptionType;
Expand Down Expand Up @@ -1476,20 +1478,29 @@ protected void decode(ConversationContext<AmsTCPPacket> context, AmsTCPPacket ms
if (msg.getUserdata() instanceof AdsDeviceNotificationRequest) {
AdsDeviceNotificationRequest notificationData = (AdsDeviceNotificationRequest) msg.getUserdata();
List<AdsStampHeader> stamps = notificationData.getAdsStampHeaders();
long receiveTs = System.currentTimeMillis();
for (AdsStampHeader stamp : stamps) {
// convert Windows FILETIME format to unix epoch
long unixEpochTimestamp = stamp.getTimestamp().divide(BigInteger.valueOf(10000L)).longValue() - 11644473600000L;
// result metadata
Metadata eventMetadata = new Metadata.Builder()
.put(PlcMetadataKeys.RECEIVE_TIMESTAMP, receiveTs)
.put(PlcMetadataKeys.TIMESTAMP, unixEpochTimestamp)
.put(PlcMetadataKeys.TIMESTAMP_SOURCE, TimeSource.SOFTWARE)
.build();
List<AdsNotificationSample> samples = stamp.getAdsNotificationSamples();
for (AdsNotificationSample sample : samples) {
long handle = sample.getNotificationHandle();
for (DefaultPlcConsumerRegistration registration : consumers.keySet()) {
for (PlcSubscriptionHandle subscriptionHandle : registration.getSubscriptionHandles()) {
if (subscriptionHandle instanceof AdsSubscriptionHandle) {
AdsSubscriptionHandle adsHandle = (AdsSubscriptionHandle) subscriptionHandle;
if (adsHandle.getNotificationHandle() == handle)
consumers.get(registration).accept(
new DefaultPlcSubscriptionEvent(Instant.ofEpochMilli(unixEpochTimestamp),
convertSampleToPlc4XResult(adsHandle, sample.getData())));
if (adsHandle.getNotificationHandle() == handle) {
Map<String, Metadata> metadata = new HashMap<>();
Instant timestamp = Instant.ofEpochMilli(unixEpochTimestamp);
DefaultPlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(timestamp, convertSampleToPlc4XResult(adsHandle, sample.getData(), metadata, eventMetadata));
consumers.get(registration).accept(event);
}
}
}
}
Expand All @@ -1498,12 +1509,14 @@ protected void decode(ConversationContext<AmsTCPPacket> context, AmsTCPPacket ms
}
}

private Map<String, ResponseItem<PlcValue>> convertSampleToPlc4XResult(AdsSubscriptionHandle subscriptionHandle, byte[] data) throws
private Map<String, ResponseItem<PlcValue>> convertSampleToPlc4XResult(AdsSubscriptionHandle subscriptionHandle, byte[] data,
Map<String, Metadata> tagMetadata, Metadata metadata) throws
ParseException {
Map<String, ResponseItem<PlcValue>> values = new HashMap<>();
ReadBufferByteBased readBuffer = new ReadBufferByteBased(data, ByteOrder.LITTLE_ENDIAN);
values.put(subscriptionHandle.getTagName(), new ResponseItem<>(PlcResponseCode.OK,
DataItem.staticParse(readBuffer, getPlcValueTypeForAdsDataType(subscriptionHandle.getAdsDataType()), data.length)));
tagMetadata.put(subscriptionHandle.getTagName(), new Metadata.Builder(metadata).build());
return values;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.plc4x.java.opcua;

import org.apache.plc4x.java.api.metadata.Metadata;
import org.apache.plc4x.java.api.metadata.Metadata.Key;
import org.apache.plc4x.java.opcua.tag.OpcuaQualityStatus;

/**
* OPC UA level metadata keys.
*/
public interface OpcMetadataKeys {

Key<OpcuaQualityStatus> QUALITY = Metadata.Key.of("opcua_quality", OpcuaQualityStatus.class);

Key<Long> SERVER_TIMESTAMP = Metadata.Key.of("opcua_server_timestamp", Long.class);
Key<Long> SOURCE_TIMESTAMP = Metadata.Key.of("opcua_source_timestamp", Long.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.*;
import org.apache.plc4x.java.api.metadata.Metadata;
import org.apache.plc4x.java.api.metadata.time.TimeSource;
import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.api.types.PlcValueType;
import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.java.opcua.OpcMetadataKeys;
import org.apache.plc4x.java.opcua.config.OpcuaConfiguration;
import org.apache.plc4x.java.opcua.context.Conversation;
import org.apache.plc4x.java.opcua.context.OpcuaDriverContext;
import org.apache.plc4x.java.opcua.context.SecureChannel;
import org.apache.plc4x.java.opcua.readwrite.*;
import org.apache.plc4x.java.opcua.tag.OpcuaQualityStatus;
import org.apache.plc4x.java.opcua.tag.OpcuaTag;
import org.apache.plc4x.java.spi.ConversationContext;
import org.apache.plc4x.java.spi.Plc4xProtocolBase;
Expand Down Expand Up @@ -196,6 +200,10 @@ private SecureChannel createSecureChannel(ConversationContext<OpcuaAPU> context,
public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
LOGGER.trace("Reading Value");

Metadata responseMetadata = new Metadata.Builder()
.put(PlcMetadataKeys.RECEIVE_TIMESTAMP, System.currentTimeMillis())
.build();

DefaultPlcReadRequest request = (DefaultPlcReadRequest) readRequest;
RequestHeader requestHeader = conversation.createRequestHeader();

Expand Down Expand Up @@ -225,7 +233,10 @@ public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
transaction.submit(() -> {
conversation.submit(opcuaReadRequest, ReadResponse.class).whenComplete((response, error) -> bridge(transaction, future, response, error));
});
return future.thenApply(response -> new DefaultPlcReadResponse(request, readResponse(request.getTagNames(), response.getResults())));
return future.thenApply(response -> {
Map<String, Metadata> metadata = new LinkedHashMap<>();
return new DefaultPlcReadResponse(request, readResponse(request.getTagNames(), response.getResults(), metadata, responseMetadata), metadata);
});
}

static NodeId generateNodeId(OpcuaTag tag) {
Expand All @@ -250,14 +261,15 @@ static NodeId generateNodeId(OpcuaTag tag) {
return nodeId;
}

public Map<String, ResponseItem<PlcValue>> readResponse(LinkedHashSet<String> tagNames, List<DataValue> results) {
public Map<String, ResponseItem<PlcValue>> readResponse(LinkedHashSet<String> tagNames, List<DataValue> results, Map<String, Metadata> metadata, Metadata responseMetadata) {
PlcResponseCode responseCode = null; // initialize variable
Map<String, ResponseItem<PlcValue>> response = new HashMap<>();
int count = 0;
for (String tagName : tagNames) {
PlcValue value = null;
if (results.get(count).getValueSpecified()) {
Variant variant = results.get(count).getValue();
DataValue dataValue = results.get(count);
if (dataValue.getValueSpecified()) {
Variant variant = dataValue.getValue();
LOGGER.trace("Response of type {}", variant.getClass().toString());
if (variant instanceof VariantBoolean) {
byte[] array = ((VariantBoolean) variant).getValue();
Expand Down Expand Up @@ -410,12 +422,22 @@ public Map<String, ResponseItem<PlcValue>> readResponse(LinkedHashSet<String> ta
responseCode = PlcResponseCode.OK;
}
} else {
StatusCode statusCode = results.get(count).getStatusCode();
StatusCode statusCode = dataValue.getStatusCode();
responseCode = mapOpcStatusCode(statusCode.getStatusCode(), PlcResponseCode.UNSUPPORTED);
LOGGER.error("Error while reading value from OPC UA server error code: {}", results.get(count).getStatusCode().toString());
LOGGER.error("Error while reading value from OPC UA server error code:- " + dataValue.getStatusCode().toString());
}
count++;

Metadata tagMetadata = new Metadata.Builder(responseMetadata)
.put(OpcMetadataKeys.QUALITY, new OpcuaQualityStatus(dataValue.getStatusCode()))
.put(OpcMetadataKeys.SERVER_TIMESTAMP, dataValue.getServerTimestamp())
.put(OpcMetadataKeys.SOURCE_TIMESTAMP, dataValue.getSourceTimestamp())
.put(PlcMetadataKeys.TIMESTAMP, dataValue.getServerTimestamp())
.put(PlcMetadataKeys.TIMESTAMP_SOURCE, TimeSource.SOFTWARE)
.build();
response.put(tagName, new ResponseItem<>(responseCode, value));
metadata.put(tagName, tagMetadata);

count++;
}
return response;
}
Expand Down
Loading
Loading