Skip to content

Commit

Permalink
Fix/s7async (#1451)
Browse files Browse the repository at this point in the history
* Change type transfer in cyclic subscription from byte[] to PlcValue. Accepts cyclic subscription to bits.

* Add short pattern to tag subscription.

* Corrects short tag handling in CYC subscriptions. In observation./karaf

* Modified the cyclical subscription system. TODO time base management fpr CYC.

* Fixed time base handling for cyclical subscriptions. Subscription routine for changes is added experimentally.

---------

Co-authored-by: Cesar Garcia <[email protected]>
  • Loading branch information
glcj and ceos01 authored Mar 15, 2024
1 parent 13fcae7 commit 5a1f768
Show file tree
Hide file tree
Showing 30 changed files with 761 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,7 @@ public enum KnxManufacturer {
M_ABB___RESERVED((int) 670, (int) 43954, (String) "ABB - reserved"),
M_BUSCH_JAEGER_ELEKTRO___RESERVED(
(int) 671, (int) 43959, (String) "Busch-Jaeger Elektro - reserved");

private static final Map<Integer, KnxManufacturer> map;

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import static io.netty.buffer.Unpooled.wrappedBuffer;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.api.model.PlcTag;
Expand All @@ -38,6 +39,37 @@
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.plc4x.java.api.model.PlcSubscriptionTag;
import org.apache.plc4x.java.api.types.PlcValueType;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.BOOL;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.CHAR;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.DATE;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.DATE_AND_TIME;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.DINT;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.DT;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.DWORD;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.INT;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.LDT;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.LINT;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.LREAL;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.LTIME;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.LTOD;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.LWORD;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.REAL;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.S5TIME;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.SINT;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.TIME_OF_DAY;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.TOD;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.UDINT;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.ULINT;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.USINT;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.WCHAR;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.WORD;
import static org.apache.plc4x.java.s7.readwrite.TransportSize.WSTRING;
import org.apache.plc4x.java.s7.readwrite.tag.S7SubscriptionTag;
import org.apache.plc4x.java.s7.readwrite.tag.S7Tag;
import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionTag;
import org.apache.plc4x.java.spi.values.PlcValueHandler;

public class S7CyclicEvent implements S7Event {

Expand Down Expand Up @@ -68,34 +100,16 @@ public S7CyclicEvent(PlcSubscriptionRequest request, short jobid, S7PayloadUserD
map.put(Fields.TIMESTAMP.name(), this.timeStamp);
map.put(Fields.JOBID.name(), jobid);
map.put(Fields.ITEMSCOUNT.name(), event.getItemsCount());
int[] n = new int[1];
int[] n = new int[1];

request.getTagNames().forEach(tagname -> {
int i = n[0];
map.put(Fields.RETURNCODE_.name() + i, event.getItems().get(i).getReturnCode().getValue());
map.put(Fields.TRANSPORTSIZE_.name() + i, event.getItems().get(i).getTransportSize().getValue());
byte[] buffer = new byte[event.getItems().get(i).getData().size()];
j = 0;
event.getItems().get(i).getData().forEach(s -> {
buffer[j] = s.byteValue();
j++;
});
map.put(tagname, buffer);
n[0]++;
map.put(tagname, DataToPlcValue(tagname, request, event.getItems().get(i).getData()));
n[0]++;
});


// for (int i=0; i<event.getItemsCount(); i++){
// //map.put(Fields.RETURNCODE_.name()+i, event.getItems()[i].getReturnCode().getValue());
// map.put(Fields.RETURNCODE_.name()+i, event.getItems().get(i).getReturnCode().getValue());
// map.put(Fields.TRANSPORTSIZE_.name()+i, event.getItems().get(i).getTransportSize().getValue());
// byte[] buffer = new byte[event.getItems().get(i).getData().size()];
// j = 0;
// event.getItems().get(i).getData().forEach(s->{
// buffer[j] = s.byteValue();
// j ++;
// });
// map.put(Fields.DATA_.name()+i, buffer);
// }
}

public S7CyclicEvent(PlcSubscriptionRequest request, short jobid, S7PayloadUserDataItemCyclicServicesChangeDrivenPush event) {
Expand All @@ -107,30 +121,15 @@ public S7CyclicEvent(PlcSubscriptionRequest request, short jobid, S7PayloadUserD
map.put(Fields.JOBID.name(), jobid);
map.put(Fields.ITEMSCOUNT.name(), event.getItemsCount());
int[] n = new int[1];

request.getTagNames().forEach(tagname -> {
int i = n[0];
map.put(Fields.RETURNCODE_.name() + i, event.getItems().get(i).getReturnCode().getValue());
map.put(Fields.TRANSPORTSIZE_.name() + i, event.getItems().get(i).getTransportSize().getValue());
byte[] buffer = new byte[event.getItems().get(i).getData().size()];
j = 0;
event.getItems().get(i).getData().forEach(s -> {
buffer[j] = s.byteValue();
j++;
});
map.put(tagname, buffer);
n[0]++;
map.put(tagname, DataToPlcValue(tagname, request, event.getItems().get(i).getData()));
n[0]++;
});
// for (int i=0; i<event.getItemsCount(); i++){
// map.put(Fields.RETURNCODE_.name()+i, event.getItems().get(i).getReturnCode().getValue());
// map.put(Fields.TRANSPORTSIZE_.name()+i, event.getItems().get(i).getTransportSize().getValue());
// byte[] buffer = new byte[event.getItems().get(i).getData().size()];
// j = 0;
// event.getItems().get(i).getData().forEach(s->{
// buffer[j] = s.byteValue();
// j ++;
// });
// map.put(Fields.DATA_.name()+i, buffer);
// }

}

public S7CyclicEvent(PlcSubscriptionRequest request, short jobid, S7PayloadUserDataItemCyclicServicesSubscribeResponse event) {
Expand All @@ -142,30 +141,14 @@ public S7CyclicEvent(PlcSubscriptionRequest request, short jobid, S7PayloadUserD
map.put(Fields.JOBID.name(), jobid);
map.put(Fields.ITEMSCOUNT.name(), event.getItemsCount());
int[] n = new int[1];

request.getTagNames().forEach(tagname -> {
int i = n[0];
map.put(Fields.RETURNCODE_.name() + i, event.getItems().get(i).getReturnCode().getValue());
map.put(Fields.TRANSPORTSIZE_.name() + i, event.getItems().get(i).getTransportSize().getValue());
byte[] buffer = new byte[event.getItems().get(i).getData().size()];
j = 0;
event.getItems().get(i).getData().forEach(s -> {
buffer[j] = s.byteValue();
j++;
});
map.put(tagname, buffer);
n[0]++;
});
// for (int i=0; i<event.getItemsCount(); i++){
// map.put(Fields.RETURNCODE_.name()+i, event.getItems().get(i).getReturnCode().getValue());
// map.put(Fields.TRANSPORTSIZE_.name()+i, event.getItems().get(i).getTransportSize().getValue());
// byte[] buffer = new byte[event.getItems().get(i).getData().size()];
// j = 0;
// event.getItems().get(i).getData().forEach(s->{
// buffer[j] = s.byteValue();
// j ++;
// });
// map.put(Fields.DATA_.name()+i, buffer);
// }
map.put(tagname, DataToPlcValue(tagname, request, event.getItems().get(i).getData()));
n[0]++;
});
}

public S7CyclicEvent(PlcSubscriptionRequest request, short jobid, S7PayloadUserDataItemCyclicServicesChangeDrivenSubscribeResponse event) {
Expand All @@ -177,30 +160,14 @@ public S7CyclicEvent(PlcSubscriptionRequest request, short jobid, S7PayloadUserD
map.put(Fields.JOBID.name(), jobid);
map.put(Fields.ITEMSCOUNT.name(), event.getItemsCount());
int[] n = new int[1];

request.getTagNames().forEach(tagname -> {
int i = n[0];
map.put(Fields.RETURNCODE_.name() + i, event.getItems().get(i).getReturnCode().getValue());
map.put(Fields.TRANSPORTSIZE_.name() + i, event.getItems().get(i).getTransportSize().getValue());
byte[] buffer = new byte[event.getItems().get(i).getData().size()];
j = 0;
event.getItems().get(i).getData().forEach(s -> {
buffer[j] = s.byteValue();
j++;
});
map.put(tagname, buffer);
n[0]++;
});
// for (int i=0; i<event.getItemsCount(); i++){
// map.put(Fields.RETURNCODE_.name()+i, event.getItems().get(i).getReturnCode().getValue());
// map.put(Fields.TRANSPORTSIZE_.name()+i, event.getItems().get(i).getTransportSize().getValue());
// byte[] buffer = new byte[event.getItems().get(i).getData().size()];
// j = 0;
// event.getItems().get(i).getData().forEach(s->{
// buffer[j] = s.byteValue();
// j ++;
// });
// map.put(Fields.DATA_.name()+i, buffer);
// }
map.put(tagname, DataToPlcValue(tagname, request, event.getItems().get(i).getData()));
n[0]++;
});
}

@Override
Expand All @@ -225,7 +192,12 @@ public PlcValue getAsPlcValue() {

@Override
public PlcValue getPlcValue(String name) {
throw new UnsupportedOperationException("Not supported yet.");
if (request.getTagNames().contains(name)) {
PlcValue plcvalue = (PlcValue) map.get(name);
plcvalue.getRaw();
return plcvalue;
}
return null;
}

@Override
Expand Down Expand Up @@ -775,4 +747,150 @@ public PlcResponseCode getResponseCode(String name) {
throw new UnsupportedOperationException("Not supported yet.");
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final S7CyclicEvent other = (S7CyclicEvent) obj;

for (String tag:request.getTagNames()) {
final PlcValue othervalue = other.getPlcValue(tag);
if (othervalue == null) return false;
final PlcValue localvalue = (PlcValue) getPlcValue(tag);
if (Arrays.equals(localvalue.getRaw(), othervalue.getRaw()) == false){
return false;
}
};

return true;
}


private static PlcValue DataToPlcValue(String tagname, PlcSubscriptionRequest request, List<Short> data){

int[] i = new int[1];

final byte[] buffer = new byte[data.size()];
i[0] = 0;

data.forEach( b -> {
buffer[i[0]] = b.byteValue();
i[0]++;
});

ByteBuf bb = wrappedBuffer(buffer);


final DefaultPlcSubscriptionTag dpst = (DefaultPlcSubscriptionTag) request.getTag(tagname);
final S7SubscriptionTag subTag = (S7SubscriptionTag) dpst.getTag();
final S7Tag[] s7Tags = subTag.getS7Tags();

PlcValue plcValue = null;

switch(s7Tags[0].getDataType()){
case BOOL:;

Boolean[] bools = new Boolean[s7Tags[0].getNumberOfElements()];
for (int iter = 0; iter < s7Tags[0].getNumberOfElements(); iter++ )
bools[iter] = bb.readBoolean();
plcValue = PlcValueHandler.of(bools);
break;
case BYTE:;
Byte[] bytes = new Byte[bb.capacity()];
for (Byte b:bytes)
b = Byte.valueOf(bb.readByte());
plcValue = PlcValueHandler.of(bytes);
break;
case WORD:;
break;
case DWORD:;
break;
case LWORD:;
break;
case INT:;
Short[] shorts = new Short[s7Tags[0].getNumberOfElements()];
for (int iter = 0; iter < s7Tags[0].getNumberOfElements(); iter ++)
shorts[iter] = bb.readShort();
plcValue = PlcValueHandler.of(shorts);
break;
case UINT:;
break;
case SINT:;
break;
case USINT:;
break;
case DINT:;
Integer[] integers = new Integer[bb.capacity() / Integer.SIZE];
for (Integer di:integers) di = Integer.valueOf(bb.readInt());
plcValue = PlcValueHandler.of(integers);
break;
case UDINT:;
break;
case LINT:;
Long[] longs = new Long[bb.capacity() / Long.SIZE];
for (Long l:longs) l = bb.readLong();
plcValue = PlcValueHandler.of(longs);
break;
case ULINT:;
break;
case REAL:;
Float[] floats = new Float[bb.capacity() / Float.SIZE];
for (Float f:floats) f = bb.readFloat();
plcValue = PlcValueHandler.of(floats);
break;
case LREAL:;
Double[] doubles = new Double[bb.capacity() / Double.SIZE];
for (Double d:doubles) d = bb.readDouble();
plcValue = PlcValueHandler.of(doubles);
break;
case CHAR:;
break;
case WCHAR:;
break;
case STRING:;
break;
case WSTRING:;
break;
case S5TIME:;
break;
case TIME:;
break;
case LTIME:;
break;
case DATE:;
break;
case TIME_OF_DAY:;
break;
case TOD:;
break;
case LTIME_OF_DAY:;
break;
case LTOD:;
break;
case DATE_AND_TIME:;
break;
case DT:;
break;
case DATE_AND_LTIME:;
break;
case LDT:;
break;
case DTL:;
break;
}

return plcValue;

};




}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;
import org.apache.plc4x.java.api.exceptions.PlcUnsupportedOperationException;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.s7.readwrite.utils.S7PlcSubscriptionRequest;
import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionRequest;

/**
* This object generates the main connection and includes the management
Expand Down Expand Up @@ -430,4 +434,12 @@ public CompletableFuture<? extends PlcPingResponse> ping() {
return null;
}

@Override
public PlcSubscriptionRequest.Builder subscriptionRequestBuilder() {
if (!isSubscribeSupported()) {
throw new PlcUnsupportedOperationException("The connection does not support subscription");
}
return new S7PlcSubscriptionRequest.Builder(this, getPlcTagHandler());
}

}
Loading

0 comments on commit 5a1f768

Please sign in to comment.