diff --git a/code-generation/language-python/src/main/java/org/apache/plc4x/language/python/PythonLanguageTemplateHelper.java b/code-generation/language-python/src/main/java/org/apache/plc4x/language/python/PythonLanguageTemplateHelper.java
index e1cc591d7d8..5cabeb85f84 100644
--- a/code-generation/language-python/src/main/java/org/apache/plc4x/language/python/PythonLanguageTemplateHelper.java
+++ b/code-generation/language-python/src/main/java/org/apache/plc4x/language/python/PythonLanguageTemplateHelper.java
@@ -590,49 +590,49 @@ public String getWriteBufferWriteMethodCall(String logicalName, SimpleTypeRefere
}
switch (simpleTypeReference.getBaseType()) {
case BIT:
- return "writeBuffer.WriteBit(\"" + logicalName + "\", " + fieldName + writerArgsString + ")";
+ return "write_buffer.write_bit(" + fieldName + ", \"" + logicalName + "\"" + writerArgsString + ")";
case BYTE:
- return "writeBuffer.WriteByte(\"" + logicalName + "\", " + fieldName + writerArgsString + ")";
+ return "write_buffer.write_byte(" + fieldName + ", \"" + logicalName + "\"" + writerArgsString + ")";
case UINT:
IntegerTypeReference unsignedIntegerTypeReference = (IntegerTypeReference) simpleTypeReference;
if (unsignedIntegerTypeReference.getSizeInBits() <= 8) {
- return "writeBuffer.WriteUint8(\"" + logicalName + "\", " + unsignedIntegerTypeReference.getSizeInBits() + ", " + fieldName + writerArgsString + ")";
+ return "write_buffer.write_byte(" + fieldName + ", " + unsignedIntegerTypeReference.getSizeInBits() + ", \"" + logicalName + "\"" + writerArgsString + ")";
}
if (unsignedIntegerTypeReference.getSizeInBits() <= 16) {
- return "writeBuffer.WriteUint16(\"" + logicalName + "\", " + unsignedIntegerTypeReference.getSizeInBits() + ", " + fieldName + writerArgsString + ")";
+ return "write_buffer.write_unsigned_short(" + fieldName + ", " + unsignedIntegerTypeReference.getSizeInBits() + ", \"" + logicalName + "\"" + writerArgsString + ")";
}
if (unsignedIntegerTypeReference.getSizeInBits() <= 32) {
- return "writeBuffer.WriteUint32(\"" + logicalName + "\", " + unsignedIntegerTypeReference.getSizeInBits() + ", " + fieldName + writerArgsString + ")";
+ return "write_buffer.write_unsigned_int(" + fieldName + ", " + unsignedIntegerTypeReference.getSizeInBits() + ", \"" + logicalName + "\"" + writerArgsString + ")";
}
if (unsignedIntegerTypeReference.getSizeInBits() <= 64) {
- return "writeBuffer.WriteUint64(\"" + logicalName + "\", " + unsignedIntegerTypeReference.getSizeInBits() + ", " + fieldName + writerArgsString + ")";
+ return "write_buffer.write_unsigned_long(" + fieldName + ", " + unsignedIntegerTypeReference.getSizeInBits() + ", \"" + logicalName + "\"" + writerArgsString + ")";
}
- return "writeBuffer.WriteBigInt(\"" + logicalName + "\", " + unsignedIntegerTypeReference.getSizeInBits() + ", " + fieldName + writerArgsString + ")";
+ return "write_buffer.write_unsigned_long(" + fieldName + ", " + unsignedIntegerTypeReference.getSizeInBits() + ", \"" + logicalName + "\"" + writerArgsString + ")";
case INT:
IntegerTypeReference integerTypeReference = (IntegerTypeReference) simpleTypeReference;
if (integerTypeReference.getSizeInBits() <= 8) {
- return "writeBuffer.WriteInt8(\"" + logicalName + "\", " + integerTypeReference.getSizeInBits() + ", " + fieldName + writerArgsString + ")";
+ return "write_buffer.write_signed_byte(" + fieldName + ", " + integerTypeReference.getSizeInBits() + ", \"" + logicalName + "\"" + writerArgsString + ")";
}
if (integerTypeReference.getSizeInBits() <= 16) {
- return "writeBuffer.WriteInt16(\"" + logicalName + "\", " + integerTypeReference.getSizeInBits() + ", " + fieldName + writerArgsString + ")";
+ return "write_buffer.write_short(" + fieldName + ", " + integerTypeReference.getSizeInBits() + ", \"" + logicalName + "\"" + writerArgsString + ")";
}
if (integerTypeReference.getSizeInBits() <= 32) {
- return "writeBuffer.WriteInt32(\"" + logicalName + "\", " + integerTypeReference.getSizeInBits() + ", " + fieldName + writerArgsString + ")";
+ return "write_buffer.write_int(" + fieldName + ", " + integerTypeReference.getSizeInBits() + ", \"" + logicalName + "\"" + writerArgsString + ")";
}
if (integerTypeReference.getSizeInBits() <= 64) {
- return "writeBuffer.WriteInt64(\"" + logicalName + "\", " + integerTypeReference.getSizeInBits() + ", " + fieldName + writerArgsString + ")";
+ return "write_buffer.write_long(" + fieldName + ", " + integerTypeReference.getSizeInBits() + ", \"" + logicalName + "\"" + writerArgsString + ")";
}
- return "writeBuffer.WriteBigInt(\"" + logicalName + "\", " + integerTypeReference.getSizeInBits() + ", " + fieldName + writerArgsString + ")";
+ return "write_buffer.write_long(" + fieldName + ", " + integerTypeReference.getSizeInBits() + ", \"" + logicalName + "\"" + writerArgsString + ")";
case FLOAT:
case UFLOAT:
FloatTypeReference floatTypeReference = (FloatTypeReference) simpleTypeReference;
if (floatTypeReference.getSizeInBits() <= 32) {
- return "writeBuffer.WriteFloat32(\"" + logicalName + "\", " + floatTypeReference.getSizeInBits() + ", " + fieldName + writerArgsString + ")";
+ return "write_buffer.write_float(" + fieldName + ", " + floatTypeReference.getSizeInBits() + ", \"" + logicalName + "\"" + writerArgsString + ")";
}
if (floatTypeReference.getSizeInBits() <= 64) {
- return "writeBuffer.WriteFloat64(\"" + logicalName + "\", " + floatTypeReference.getSizeInBits() + ", " + fieldName + writerArgsString + ")";
+ return "write_buffer.write_double(" + fieldName + ", " + floatTypeReference.getSizeInBits() + ", \"" + logicalName + "\"" + writerArgsString + ")";
}
- return "writeBuffer.WriteBigFloat(\"" + logicalName + "\", " + floatTypeReference.getSizeInBits() + ", " + fieldName + writerArgsString + ")";
+ return "write_buffer.write_double(" + fieldName + ", " + floatTypeReference.getSizeInBits() + ", \"" + logicalName + "\"" + writerArgsString + ")";
case STRING: {
StringTypeReference stringTypeReference = (StringTypeReference) simpleTypeReference;
final Term encodingTerm = field.getEncoding().orElse(new DefaultStringLiteral("UTF-8"));
@@ -641,8 +641,8 @@ public String getWriteBufferWriteMethodCall(String logicalName, SimpleTypeRefere
.asStringLiteral()
.orElseThrow(() -> new FreemarkerException("Encoding must be a quoted string value")).getValue();
String length = Integer.toString(simpleTypeReference.getSizeInBits());
- return "writeBuffer.WriteString(\"" + logicalName + "\", uint32(" + length + "), \"" +
- encoding + "\", " + fieldName + writerArgsString + ")";
+ return "write_buffer.write_str(" + fieldName + ", uint32(" + length + "), \"" +
+ encoding + "\", \"" + logicalName + "\"" + writerArgsString + ")";
}
case VSTRING: {
VstringTypeReference vstringTypeReference = (VstringTypeReference) simpleTypeReference;
@@ -653,13 +653,13 @@ public String getWriteBufferWriteMethodCall(String logicalName, SimpleTypeRefere
.orElseThrow(() -> new FreemarkerException("Encoding must be a quoted string value")).getValue();
String lengthExpression = toExpression(field, null, vstringTypeReference.getLengthExpression(), null, Collections.singletonList(new DefaultArgument("stringLength", new DefaultIntegerTypeReference(SimpleTypeReference.SimpleBaseType.INT, 32))), true, false);
String length = Integer.toString(simpleTypeReference.getSizeInBits());
- return "writeBuffer.WriteString(\"" + logicalName + "\", uint32(" + lengthExpression + "), \"" +
- encoding + "\", " + fieldName + writerArgsString + ")";
+ return "write_buffer.write_str(" + fieldName + ", uint32(" + lengthExpression + "), \"" +
+ encoding + "\", \"" + logicalName + "\"" + writerArgsString + ")";
}
case DATE:
case TIME:
case DATETIME:
- return "writeBuffer.WriteUint32(\"" + logicalName + "\", uint32(" + fieldName + ")" + writerArgsString + ")";
+ return "write_buffer.write_unsigned_int(uint32(" + fieldName + "), \"" + logicalName + "\")" + writerArgsString + ")";
default:
throw new FreemarkerException("Unsupported base type " + simpleTypeReference.getBaseType());
}
diff --git a/code-generation/language-python/src/main/resources/templates/python/data-io-template.python.ftlh b/code-generation/language-python/src/main/resources/templates/python/data-io-template.python.ftlh
index cbd9274e49e..c6b6e4c14df 100644
--- a/code-generation/language-python/src/main/resources/templates/python/data-io-template.python.ftlh
+++ b/code-generation/language-python/src/main/resources/templates/python/data-io-template.python.ftlh
@@ -64,6 +64,7 @@ class ${type.name}:
<@emitImport import="from loguru import logging as log" />
<@emitImport import="from abc import staticmethod" />
+ <@emitImport import="from plc4py.spi.generation.ReadBuffer import ReadBuffer" />
@staticmethod
def static_parse(read_buffer: ReadBuffer<#if type.parserArguments.isPresent()>, <#list type.parserArguments.orElseThrow() as parserArgument>${helper.camelCaseToSnakeCase(parserArgument.name)}: ${helper.getLanguageTypeNameForTypeReference(parserArgument.type, false)}<#sep>, #sep>#list>#if>):
<#assign defaultCaseOutput=false>
@@ -74,14 +75,12 @@ class ${type.name}:
<#list case.discriminatorValueTerms as discriminatorValueTerm>
<#assign discriminatorExpression=dataIoTypeDefinition.switchField.orElseThrow().discriminatorExpressions[discriminatorValueTerm?index].asLiteral().orElseThrow().asVariableLiteral().orElseThrow()>
<#assign discriminatorType=helper.getDiscriminatorTypes()[discriminatorExpression.name]>
- EvaluationHelper.equals(
- ${helper.camelCaseToSnakeCase(helper.toParseExpression(dataIoTypeDefinition.switchField.orElseThrow(), discriminatorType, discriminatorExpression, parserArguments))},
+ ${helper.camelCaseToSnakeCase(helper.toParseExpression(dataIoTypeDefinition.switchField.orElseThrow(), discriminatorType, discriminatorExpression, parserArguments))} ==
<#if discriminatorType.isEnumTypeReference()>
${helper.getLanguageTypeNameForTypeReference(discriminatorType)}.${helper.toParseExpression(dataIoTypeDefinition.switchField.orElseThrow(), discriminatorType, discriminatorValueTerm, parserArguments)}
<#else>
${helper.camelCaseToSnakeCase(helper.toParseExpression(dataIoTypeDefinition.switchField.orElseThrow(), discriminatorType, discriminatorValueTerm, parserArguments))}
#if>
- )
<#sep> and #sep>
#list>
:@compress> # ${case.name}
@@ -102,10 +101,9 @@ class ${type.name}:
<#-- If this is a count array, we can directly initialize an array with the given size -->
<#if field.isCountArrayField()>
# Count array
- if ${helper.toParseExpression(arrayField, helper.intTypeReference, arrayField.loopExpression,parserArguments)} > Integer.MAX_VALUE:
- raise ParseException("Array count of " + (${helper.toParseExpression(arrayField, helper.intTypeReference, arrayField.loopExpression,parserArguments)}) + " exceeds the maximum allowed count of " + Integer.MAX_VALUE)
-
item_count: int = int(${helper.toParseExpression(arrayField, helper.intTypeReference, arrayField.loopExpression,parserArguments)})
+ <@emitImport import="from typing import List" />
+ <@emitImport import="from plc4py.api.value.PlcValue import PlcValue" />
${helper.camelCaseToSnakeCase(arrayField.name)}: List[PlcValue] = []
for cur_item in range(item_count):
${helper.camelCaseToSnakeCase(arrayField.name)}.append(${helper.getPlcValueTypeForTypeReference(elementTypeReference)}(${helper.getLanguageTypeNameForTypeReference(elementTypeReference, false)}(<#if elementTypeReference.isSimpleTypeReference()>${helper.getReadBufferReadMethodCall(elementTypeReference.asSimpleTypeReference().orElseThrow(), "", arrayField)})<#else>${elementTypeReference.asComplexTypeReference().orElseThrow().name}IO.static_parse(read_buffer<#if elementTypeReference.params.isPresent()>, <#list elementTypeReference.params.orElseThrow() as parserArgument>(${helper.getLanguageTypeNameForTypeReference(helper.getArgumentType(elementTypeReference, parserArgument?index), true)}) (${helper.toParseExpression(arrayField, elementTypeReference, parserArgument,parserArguments)})<#sep>, #sep>#list>#if>)#if>))
@@ -117,6 +115,8 @@ class ${type.name}:
# Length array
_${helper.camelCaseToSnakeCase(arrayField.name)}_length: long = ${helper.toParseExpression(arrayField, helper.intTypeReference, arrayField.loopExpression,parserArguments)}
${helper.camelCaseToSnakeCase(arrayField.name)}_end_pos: long = read_buffer.get_pos() + _${helper.camelCaseToSnakeCase(arrayField.name)}_length
+ <@emitImport import="from typing import List" />
+ <@emitImport import="from plc4py.api.value.PlcValue import PlcValue" />
value: List[PlcValue]= []
while read_buffer.get_pos() < ${helper.camelCaseToSnakeCase(arrayField.name)}_end_pos):
value.append(<@compress single_line=true>
@@ -158,6 +158,7 @@ class ${type.name}:
# Const Field (${helper.camelCaseToSnakeCase(constField.name)})
${helper.camelCaseToSnakeCase(constField.name)}: ${helper.getNonPrimitiveLanguageTypeNameForField(constField)} = ${helper.getReadBufferReadMethodCall(constField.type.asSimpleTypeReference().orElseThrow(), "", constField)}
if ${helper.camelCaseToSnakeCase(constField.name)} != ${dataIoTypeDefinition.name}.${constField.name?upper_case}):
+ <@emitImport import="from plc4py.api.exceptions.exceptions import ParseException" />
raise ParseException("Expected constant value " + ${dataIoTypeDefinition.name}.${constField.name?upper_case} + " but got " + ${helper.camelCaseToSnakeCase(constField.name)})
<#if constField.name == "value">
@@ -214,6 +215,7 @@ class ${type.name}:
<#if case.name == "Struct">
<#-- In this case we need to wrap each field in a PlcValue that matches it's natural type -->
+ <@emitImport import="from plc4py.api.value.PlcValue import PlcValue" />
_map: Dict[str, PlcValue] = {}
<#list case.fields as field>
<#if field.isArrayField()>
@@ -223,12 +225,14 @@ class ${type.name}:
<#assign field=field.asPropertyField().orElseThrow()>
<#switch helper.getLanguageTypeNameForTypeReference(field.type)>
<#case "Boolean">
+ <@emitImport import="from plc4py.spi.values.PlcBOOL import PlcBOOL" />
_map["${field.name}"] = PlcBOOL(${field.name})
<#break>
<#case "Byte">
_map["${field.name}"] = PlcSINT(${field.name})
<#break>
<#case "Short">
+ <@emitImport import="from plc4py.spi.values.PlcINT import PlcINT" />
_map["${field.name}"] = PlcINT(${field.name})
<#break>
<#case "Integer">
@@ -313,13 +317,10 @@ class ${type.name}:
<#if outputFlavor != "passive">
<@emitImport import="from abc import staticmethod" />
+ <@emitImport import="from plc4py.spi.generation.WriteBuffer import WriteBuffer" />
+ <@emitImport import="from plc4py.api.value.PlcValue import PlcValue" />
@staticmethod
- def static_serialize(writeBuffer: WriteBuffer, _value: PlcValue<#if type.parserArguments.isPresent()>, <#list type.parserArguments.orElseThrow() as parserArgument>${parserArgument.name}: ${helper.getLanguageTypeNameForTypeReference(parserArgument.type, false)}<#sep>, #sep>#list>#if>) -> None:
- static_serialize(writeBuffer, _value<#if type.parserArguments.isPresent()>, <#list type.parserArguments.orElseThrow() as parserArgument>${parserArgument.name}<#sep>, #sep>#list>#if>, ByteOrder.BIG_ENDIAN)
-
- <@emitImport import="from abc import staticmethod" />
- @staticmethod
- def static_serialize(writeBuffer: WriteBuffer, _value: PlcValue<#if type.parserArguments.isPresent()>, <#list type.parserArguments.orElseThrow() as parserArgument>${parserArgument.name}: ${helper.getLanguageTypeNameForTypeReference(parserArgument.type, false)}<#sep>, #sep>#list>#if>, byteOrder: ByteOrder) -> None:
+ def static_serialize(write_buffer: WriteBuffer, _value: PlcValue<#if type.parserArguments.isPresent()>, <#list type.parserArguments.orElseThrow() as parserArgument>${helper.camelCaseToSnakeCase(parserArgument.name)}: ${helper.getLanguageTypeNameForTypeReference(parserArgument.type, false)}<#sep>, #sep>#list>#if>, byte_order: ByteOrder) -> None:
<#assign defaultCaseOutput=false>
<#assign dataIoTypeDefinition=type.asDataIoTypeDefinition().orElseThrow()>
<#list dataIoTypeDefinition.switchField.orElseThrow().cases as case>
@@ -328,13 +329,12 @@ class ${type.name}:
<#list case.discriminatorValueTerms as discriminatorValueTerm>
<#assign discriminatorExpression=dataIoTypeDefinition.switchField.orElseThrow().discriminatorExpressions[discriminatorValueTerm?index].asLiteral().orElseThrow().asVariableLiteral().orElseThrow()>
<#assign discriminatorType=helper.getDiscriminatorTypes()[discriminatorExpression.name]>
- EvaluationHelper.equals(
- ${helper.toParseExpression(dataIoTypeDefinition.switchField.orElseThrow(), discriminatorType, discriminatorExpression, parserArguments)},
+ ${helper.toParseExpression(dataIoTypeDefinition.switchField.orElseThrow(), discriminatorType, discriminatorExpression, parserArguments)} ==
<#if discriminatorType.isEnumTypeReference()>
${helper.getLanguageTypeNameForTypeReference(discriminatorType)}.${helper.toParseExpression(dataIoTypeDefinition.switchField.orElseThrow(), discriminatorType, discriminatorValueTerm, parserArguments)}
<#else>
${helper.toParseExpression(dataIoTypeDefinition.switchField.orElseThrow(), discriminatorType, discriminatorValueTerm, parserArguments)}
- #if>)
+ #if>
<#sep> and #sep>
#list>
: # ${case.name}@compress>
@@ -352,26 +352,27 @@ class ${type.name}:
<#if case.name == "Struct">
for val in values.getStruct().get("${arrayField.name}").getList():
<#if elementTypeReference.isByteBased()>
- value: ${helper.getLanguageTypeNameForField(arrayField)} = val.getRaw()
- writeBuffer.writeByteArray("", value)
+ value: ${helper.getLanguageTypeNameForField(arrayField)} = val.get_raw()
+ write_buffer.write_byte_array("", value)
<#else>
- value: ${helper.getLanguageTypeNameForField(arrayField)} = val.get${helper.getLanguageTypeNameForField(arrayField)?cap_first}()
+ value: ${helper.getLanguageTypeNameForField(arrayField)} = val.get_${helper.camelCaseToSnakeCase(helper.getLanguageTypeNameForField(arrayField)?cap_first)}()
${helper.getWriteBufferWriteMethodCall(elementTypeReference.asSimpleTypeReference().orElseThrow(), "value", arrayField)}
#if>
<#else>
for val in values.getList():
<#if elementTypeReference.isByteBased()>
- value: list[byte] = val.getRaw()
- writeBuffer.writeByteArray("", value)
+ <@emitImport import="from typing import List" />
+ value: list[byte] = val.get_raw()
+ write_buffer.write_byte_array("", value)
<#else>
- value: ${helper.getLanguageTypeNameForTypeReference(elementTypeReference)} = val.get${helper.getLanguageTypeNameForTypeReference(elementTypeReference)?cap_first}()
+ value: ${helper.getLanguageTypeNameForTypeReference(elementTypeReference)} = val.get_${helper.camelCaseToSnakeCase(helper.getLanguageTypeNameForTypeReference(elementTypeReference)?cap_first)}()
${helper.getWriteBufferWriteMethodCall(elementTypeReference.asSimpleTypeReference().orElseThrow(), "(" + arrayField.name + ")", arrayField)}
#if>
#if>
<#if case.name == "BOOL">
- while writeBuffer.getPos() < len(writeBuffer.getData()):
- writeBuffer.writeBit(false)
+ while write_buffer.getPos() < len(write_buffer.get_data()):
+ write_buffer.write_bit(false)
#if>
<#break>
<#case "const">
@@ -382,8 +383,8 @@ class ${type.name}:
<#case "enum">
<#assign enumField=field.asEnumField().orElseThrow()>
# Enum field (${enumField.name})
- ${enumField.name}: ${helper.getLanguageTypeNameForField(field)} = _value.get${enumField.name?cap_first}()
- ${helper.getWriteBufferWriteMethodCall(helper.getEnumBaseTypeReference(field.asTypedField().orElseThrow().type), "(" + enumField.name + ".getValue())", enumField)}
+ ${enumField.name}: ${helper.getLanguageTypeNameForField(field)} = _value.get_${helper.camelCaseToSnakeCase(enumField.name?cap_first)}()
+ ${helper.getWriteBufferWriteMethodCall(helper.getEnumBaseTypeReference(field.asTypedField().orElseThrow().type), "(" + enumField.name + ".value)", enumField)}
<#break>
<#case "manual">
<#assign manualField=field.asManualField().orElseThrow()>
@@ -399,12 +400,12 @@ class ${type.name}:
<#assign simpleField=field.asSimpleField().orElseThrow()>
# Simple Field (${simpleField.name})
<#if case.name == "Struct">
- ${simpleField.name}: ${helper.getLanguageTypeNameForField(simpleField)} = _value.getStruct().get("${simpleField.name}").get${helper.getLanguageTypeNameForField(simpleField)?cap_first}()
+ ${simpleField.name}: ${helper.getLanguageTypeNameForField(simpleField)} = _value.get_struct().get("${simpleField.name}").get${helper.getLanguageTypeNameForField(simpleField)?cap_first}()
<#else>
<#if simpleField.name == "value">
- ${simpleField.name}: ${helper.getLanguageTypeNameForField(simpleField)} = _value.get${helper.getLanguageTypeNameForField(simpleField)?cap_first}()
+ ${simpleField.name}: ${helper.getLanguageTypeNameForField(simpleField)} = _value.get_${helper.camelCaseToSnakeCase(helper.getLanguageTypeNameForField(simpleField)?cap_first)}()
<#elseif simpleField.name == "secondsSinceEpoch">
- ${simpleField.name}: ${helper.getLanguageTypeNameForField(simpleField)} = _value.get${helper.getLanguageTypeNameForField(simpleField)?cap_first}()
+ ${simpleField.name}: ${helper.getLanguageTypeNameForField(simpleField)} = _value.get_${helper.camelCaseToSnakeCase(helper.getLanguageTypeNameForField(simpleField)?cap_first)}()
<#else>
<#-- Just for now -->
${simpleField.name}: ${helper.getLanguageTypeNameForField(simpleField)} = ${helper.getNullValueForTypeReference(simpleField.type)}
@@ -413,7 +414,7 @@ class ${type.name}:
<#if simpleField.type.isSimpleTypeReference()>
${helper.getWriteBufferWriteMethodCall(simpleField.type.asSimpleTypeReference().orElseThrow(), "(" + simpleField.name + ")", simpleField)}
<#else>
- ${simpleField.type.asComplexTypeReference().orElseThrow().name}IO.staticSerialize(writeBuffer, ${simpleField.name})
+ ${simpleField.type.asComplexTypeReference().orElseThrow().name}IO.static_serialize(write_buffer, ${helper.camelCaseToSnakeCase(simpleField.name)})
#if>
<#break>
#switch>
@@ -423,15 +424,17 @@ class ${type.name}:
<@emitImport import="from abc import staticmethod" />
<@emitImport import="import math" />
+ <@emitImport import="from plc4py.api.value.PlcValue import PlcValue" />
@staticmethod
- def get_length_in_bytes(_value: PlcValue<#if type.parserArguments.isPresent()>, <#list type.parserArguments.orElseThrow() as parserArgument>${parserArgument.name}: ${helper.getLanguageTypeNameForTypeReference(parserArgument.type, false)}<#sep>, #sep>#list>#if>) -> int:
- return int(math.ceil(float(getLengthInBits(_value<#if type.parserArguments.isPresent()>, <#list type.parserArguments.orElseThrow() as parserArgument>${parserArgument.name}<#sep>, #sep>#list>#if>)) / 8.0))
+ def get_length_in_bytes(_value: PlcValue<#if type.parserArguments.isPresent()>, <#list type.parserArguments.orElseThrow() as parserArgument>${helper.camelCaseToSnakeCase(parserArgument.name)}: ${helper.getLanguageTypeNameForTypeReference(parserArgument.type, false)}<#sep>, #sep>#list>#if>) -> int:
+ return int(math.ceil(float(get_length_in_bits(_value<#if type.parserArguments.isPresent()>, <#list type.parserArguments.orElseThrow() as parserArgument>${helper.camelCaseToSnakeCase(parserArgument.name)}<#sep>, #sep>#list>#if>)) / 8.0))
<@emitImport import="from abc import staticmethod" />
+ <@emitImport import="from plc4py.api.value.PlcValue import PlcValue" />
@staticmethod
- def get_length_in_bits(_value: PlcValue<#if type.parserArguments.isPresent()>, <#list type.parserArguments.orElseThrow() as parserArgument>${parserArgument.name}: ${helper.getLanguageTypeNameForTypeReference(parserArgument.type, false)}<#sep>, #sep>#list>#if>) -> int:
- sizeInBits: int = 0
+ def get_length_in_bits(_value: PlcValue<#if type.parserArguments.isPresent()>, <#list type.parserArguments.orElseThrow() as parserArgument>${helper.camelCaseToSnakeCase(parserArgument.name)}: ${helper.getLanguageTypeNameForTypeReference(parserArgument.type, false)}<#sep>, #sep>#list>#if>) -> int:
+ size_in_bits: int = 0
<#assign defaultCaseOutput=false>
<#assign dataIoTypeDefinition=type.asDataIoTypeDefinition().orElseThrow()>
<#list dataIoTypeDefinition.switchField.orElseThrow().cases as case>
@@ -440,14 +443,12 @@ class ${type.name}:
<#list case.discriminatorValueTerms as discriminatorValueTerm>
<#assign discriminatorExpression=dataIoTypeDefinition.switchField.orElseThrow().discriminatorExpressions[discriminatorValueTerm?index].asLiteral().orElseThrow().asVariableLiteral().orElseThrow()>
<#assign discriminatorType=helper.getDiscriminatorTypes()[discriminatorExpression.name]>
- EvaluationHelper.equals(
- ${helper.toParseExpression(dataIoTypeDefinition.switchField.orElseThrow(), discriminatorType, discriminatorExpression, parserArguments)},
+ ${helper.toParseExpression(dataIoTypeDefinition.switchField.orElseThrow(), discriminatorType, discriminatorExpression, parserArguments)} ==
<#if discriminatorType.isEnumTypeReference()>
${helper.getLanguageTypeNameForTypeReference(discriminatorType)}.${helper.toParseExpression(dataIoTypeDefinition.switchField.orElseThrow(), discriminatorType, discriminatorValueTerm, parserArguments)}
<#else>
${helper.toParseExpression(dataIoTypeDefinition.switchField.orElseThrow(), discriminatorType, discriminatorValueTerm, parserArguments)}
#if>
- )
<#sep> and #sep>
#list>
@compress>: # ${case.name}
@@ -465,39 +466,39 @@ class ${type.name}:
<#elseif elementTypeReference.isComplexTypeReference()>
# TODO: Finish this!
<#else>
- sizeInBits += values.getList().size() * ${elementTypeReference.asSimpleTypeReference().orElseThrow().sizeInBits}
+ size_in_bits += values.get_list().size() * ${elementTypeReference.asSimpleTypeReference().orElseThrow().sizeInBits}
#if>
<#break>
<#case "const">
<#assign constField=field.asConstField().orElseThrow()>
# Const Field (${constField.name})
<#-- const fields are only simple type -->
- sizeInBits += ${constField.type.asSimpleTypeReference().orElseThrow().sizeInBits}
+ size_in_bits += ${constField.type.asSimpleTypeReference().orElseThrow().sizeInBits}
<#break>
<#case "enum">
<#assign enumField=field.asEnumField().orElseThrow()>
# Enum field (${enumField.name})
- sizeInBits += ${helper.getEnumFieldSimpleTypeReference(enumField.type, enumField.fieldName).sizeInBits}
+ size_in_bits += ${helper.getEnumFieldSimpleTypeReference(enumField.type, enumField.fieldName).sizeInBits}
<#break>
<#case "manual">
<#assign manualField=field.asManualField().orElseThrow()>
# Manual Field (${manualField.name})
- sizeInBits += ${helper.toSerializationExpression(manualField, helper.intTypeReference, manualField.lengthExpression, type.parserArguments.orElse(null))}
+ size_in_bits += ${helper.toSerializationExpression(manualField, helper.intTypeReference, manualField.lengthExpression, type.parserArguments.orElse(null))}
<#break>
<#case "reserved">
<#assign reservedField=field.asReservedField().orElseThrow()>
# Reserved Field
- sizeInBits += ${reservedField.type.asSimpleTypeReference().orElseThrow().sizeInBits}
+ size_in_bits += ${reservedField.type.asSimpleTypeReference().orElseThrow().sizeInBits}
<#break>
<#case "simple">
<#assign simpleField=field.asSimpleField().orElseThrow()>
# Simple Field (${simpleField.name})
- sizeInBits += ${simpleField.type.asSimpleTypeReference().orElseThrow().sizeInBits}
+ size_in_bits += ${simpleField.type.asSimpleTypeReference().orElseThrow().sizeInBits}
<#break>
#switch>
#list>
<#sep>#sep>#list>
- return sizeInBits
+ return size_in_bits
@importSectionWithContentBelow>
diff --git a/code-generation/language-python/src/main/resources/templates/python/enum-template.python.ftlh b/code-generation/language-python/src/main/resources/templates/python/enum-template.python.ftlh
index e36d4b2fe01..a82bf2438af 100644
--- a/code-generation/language-python/src/main/resources/templates/python/enum-template.python.ftlh
+++ b/code-generation/language-python/src/main/resources/templates/python/enum-template.python.ftlh
@@ -61,6 +61,7 @@ ${import}
<@emitImport import="from aenum import AutoNumberEnum" />
class ${type.name}(AutoNumberEnum):
+ <#if type.constantNames?has_content>_init_ = "value, <#list type.constantNames as constantName>${helper.camelCaseToSnakeCase(constantName)}<#sep>, #sep>#list>"#if>
<#list type.enumValues as enumValue>
${enumValue.name}<#if !type.constantNames?has_content>:#if> <@compress single_line=true>
<#if type.type.isPresent()>
diff --git a/sandbox/plc4py/plc4py/drivers/modbus/ModbusConfiguration.py b/sandbox/plc4py/plc4py/drivers/modbus/ModbusConfiguration.py
index 4b3c551a42d..25fc144cbee 100644
--- a/sandbox/plc4py/plc4py/drivers/modbus/ModbusConfiguration.py
+++ b/sandbox/plc4py/plc4py/drivers/modbus/ModbusConfiguration.py
@@ -28,3 +28,6 @@ def __init__(self, url):
if self.port is None:
self.port = 502
+
+ if "unit_identifier" not in self.parameters:
+ self.unit_identifier = 1
diff --git a/sandbox/plc4py/plc4py/drivers/modbus/ModbusConnection.py b/sandbox/plc4py/plc4py/drivers/modbus/ModbusConnection.py
index 3a5f9a77d6f..0be6561eca0 100644
--- a/sandbox/plc4py/plc4py/drivers/modbus/ModbusConnection.py
+++ b/sandbox/plc4py/plc4py/drivers/modbus/ModbusConnection.py
@@ -46,7 +46,7 @@ class ModbusConnection(PlcConnection):
def __init__(self, config: ModbusConfiguration, transport: Plc4xBaseTransport):
super().__init__(config)
- self._device: ModbusDevice = ModbusDevice()
+ self._device: ModbusDevice = ModbusDevice(self._configuration)
self._transport: Plc4xBaseTransport = transport
@staticmethod
diff --git a/sandbox/plc4py/plc4py/drivers/modbus/ModbusDevice.py b/sandbox/plc4py/plc4py/drivers/modbus/ModbusDevice.py
index c48fe1c875a..9c42acafc40 100644
--- a/sandbox/plc4py/plc4py/drivers/modbus/ModbusDevice.py
+++ b/sandbox/plc4py/plc4py/drivers/modbus/ModbusDevice.py
@@ -22,6 +22,24 @@
from dataclasses import dataclass, field
from typing import Dict, List
+from plc4py.drivers.modbus.ModbusTag import (
+ ModbusTagHoldingRegister,
+ ModbusTagCoil,
+ ModbusTagDiscreteInput,
+ ModbusTagInputRegister,
+)
+
+from plc4py.api.exceptions.exceptions import PlcRuntimeException
+from plc4py.drivers.modbus.ModbusConfiguration import ModbusConfiguration
+from plc4py.protocols.modbus.readwrite.ModbusPDUReadCoilsRequest import (
+ ModbusPDUReadCoilsRequest,
+)
+from plc4py.protocols.modbus.readwrite.ModbusPDUReadDiscreteInputsRequest import (
+ ModbusPDUReadDiscreteInputsRequest,
+)
+from plc4py.protocols.modbus.readwrite.ModbusPDUReadInputRegistersRequest import (
+ ModbusPDUReadInputRegistersRequest,
+)
from plc4py.spi.generation.WriteBuffer import WriteBufferByteBased
from plc4py.api.messages.PlcRequest import PlcReadRequest
@@ -31,12 +49,17 @@
ModbusPDUReadHoldingRegistersRequest,
)
from plc4py.protocols.modbus.readwrite.ModbusTcpADU import ModbusTcpADU
-from plc4py.utils.GenericTypes import ByteOrder
+from plc4py.utils.GenericTypes import ByteOrder, AtomicInteger
@dataclass
class ModbusDevice:
- fields: Dict[str, PlcValue] = field(default_factory=lambda: {})
+ _configuration: ModbusConfiguration
+ tags: Dict[str, PlcValue] = field(default_factory=lambda: {})
+
+ _transaction_generator: AtomicInteger = field(
+ default_factory=lambda: AtomicInteger()
+ )
async def read(
self, request: PlcReadRequest, transport: Transport
@@ -44,12 +67,38 @@ async def read(
"""
Reads one field from the Mock Device
"""
- logging.debug(f"Reading field {str(field)} from Modbus Device")
+ if len(request.tags) > 1:
+ raise NotImplementedError(
+ "The Modbus driver only supports reading single tags at once"
+ )
+ if len(request.tags) == 0:
+ raise PlcRuntimeException("No tags have been specified to read")
+ tag = request.tags[request.tag_names[0]]
+ logging.debug(f"Reading tag {str(tag)} from Modbus Device")
+
+ # Create future to be returned when a value is returned
loop = asyncio.get_running_loop()
message_future = loop.create_future()
- pdu = ModbusPDUReadHoldingRegistersRequest(0, 2)
- adu = ModbusTcpADU(False, 1, 1, pdu)
+ if isinstance(tag, ModbusTagCoil):
+ pdu = ModbusPDUReadCoilsRequest(tag.address, tag.quantity)
+ elif isinstance(tag, ModbusTagDiscreteInput):
+ pdu = ModbusPDUReadDiscreteInputsRequest(tag.address, tag.quantity)
+ elif isinstance(tag, ModbusTagInputRegister):
+ pdu = ModbusPDUReadInputRegistersRequest(tag.address, tag.quantity)
+ elif isinstance(tag, ModbusTagHoldingRegister):
+ pdu = ModbusPDUReadHoldingRegistersRequest(tag.address, tag.quantity)
+ else:
+ raise NotImplementedError(
+ "Modbus tag type not implemented " + str(tag.__class__)
+ )
+
+ adu = ModbusTcpADU(
+ False,
+ self._transaction_generator.increment(),
+ self._configuration.unit_identifier,
+ pdu,
+ )
write_buffer = WriteBufferByteBased(adu.length_in_bytes(), ByteOrder.BIG_ENDIAN)
adu.serialize(write_buffer)
@@ -62,5 +111,6 @@ async def read(
)
await message_future
+
response = PlcReadResponse(PlcResponseCode.OK, [], {})
return response
diff --git a/sandbox/plc4py/plc4py/drivers/modbus/ModbusTag.py b/sandbox/plc4py/plc4py/drivers/modbus/ModbusTag.py
index 140576c557c..93d3e658b28 100644
--- a/sandbox/plc4py/plc4py/drivers/modbus/ModbusTag.py
+++ b/sandbox/plc4py/plc4py/drivers/modbus/ModbusTag.py
@@ -26,76 +26,78 @@
from plc4py.spi.messages.PlcRequest import TagBuilder
-@dataclass
class ModbusTag(PlcTag):
- address: int
- quantity: int
- data_type: ModbusDataType
-
- ADDRESS_PATTERN: str = (
- "(?P
\d+)(:(?P[a-zA-Z_]+))?([(?P\d+)])?"
+ _ADDRESS_PATTERN: str = (
+ "(?P\d+)(:(?P[a-zA-Z_]+))?(\[(?P\d+)\])?"
)
- FIXED_DIGIT_MODBUS_PATTERN: str = (
- "(?P\d{4,5})?(:(?P[a-zA-Z_]+))?([(?P\d+)])?"
+ _FIXED_DIGIT_MODBUS_PATTERN: str = (
+ "(?P\d{4,5})?(:(?P[a-zA-Z_]+))?(\[(?P\d+)\])?"
)
- PROTOCOL_ADDRESS_OFFSET: int = 1
- REGISTER_MAX_ADDRESS: int = 65535
+ _PROTOCOL_ADDRESS_OFFSET: int = 1
+ _REGISTER_MAX_ADDRESS: int = 65535
+
+ _ADDRESS_SHORTER_PATTERN: Pattern[AnyStr] = re.compile(_FIXED_DIGIT_MODBUS_PATTERN)
+ _ADDRESS_SHORT_PATTERN: Pattern[AnyStr] = re.compile(_FIXED_DIGIT_MODBUS_PATTERN)
+ _DEFAULT_DATA_TYPE: ModbusDataType = ModbusDataType.INT
- ADDRESS_SHORTER_PATTERN: Pattern[AnyStr] = re.compile(FIXED_DIGIT_MODBUS_PATTERN)
- ADDRESS_SHORT_PATTERN: Pattern[AnyStr] = re.compile(FIXED_DIGIT_MODBUS_PATTERN)
- DEFAULT_DATA_TYPE: ModbusDataType = ModbusDataType.INT
+ _QUANTITY_MAX: int = 120
- QUANTITY_MAX: int = 120
+ def __init__(self, address: int, quantity: int, data_type: ModbusDataType):
+ self.address: int = address
+ self.quantity: int = quantity
+ self.data_type: ModbusDataType = data_type
@classmethod
def matches(cls, address_string: str):
return (
- cls.ADDRESS_PATTERN.match(address_string) is not None
- or cls.ADDRESS_SHORTER_PATTERN.match(address_string) is not None
- or cls.ADDRESS_SHORT_PATTERN.match(address_string) is not None
+ cls._ADDRESS_PATTERN.match(address_string) is not None
+ or cls._ADDRESS_SHORTER_PATTERN.match(address_string) is not None
+ or cls._ADDRESS_SHORT_PATTERN.match(address_string) is not None
)
@classmethod
def _matcher(cls, address_string):
- match = cls.ADDRESS_PATTERN.match(address_string)
+ match = cls._ADDRESS_PATTERN.match(address_string)
if match is not None:
return match
- match = cls.ADDRESS_SHORTER_PATTERN.match(address_string)
+ match = cls._ADDRESS_SHORTER_PATTERN.match(address_string)
if match is not None:
return match
- match = cls.ADDRESS_SHORT_PATTERN.match(address_string)
+ match = cls._ADDRESS_SHORT_PATTERN.match(address_string)
if match is not None:
return match
@classmethod
def create(cls, address_string):
matcher = cls._matcher(address_string)
- address: int = int(matcher.group("address")) - ModbusTag.PROTOCOL_ADDRESS_OFFSET
- if address > cls.REGISTER_MAX_ADDRESS:
+ address: int = (
+ int(matcher.group("address")) - ModbusTag._PROTOCOL_ADDRESS_OFFSET
+ )
+ if address > cls._REGISTER_MAX_ADDRESS:
raise PlcFieldParseException(
"Address must be less than or equal to "
- + str(cls.REGISTER_MAX_ADDRESS)
+ + str(cls._REGISTER_MAX_ADDRESS)
+ ". Was "
- + str(address + cls.PROTOCOL_ADDRESS_OFFSET)
+ + str(address + cls._PROTOCOL_ADDRESS_OFFSET)
)
quantity: int = (
int(matcher.group("quantity"))
if "quantity" in matcher.groupdict()
- and matcher.group("datatype") is not None
+ and matcher.group("quantity") is not None
else 1
)
- if (address + quantity) > cls.REGISTER_MAX_ADDRESS:
+ if (address + quantity) > cls._REGISTER_MAX_ADDRESS:
raise PlcFieldParseException(
"Last requested address is out of range, should be between "
- + str(cls.PROTOCOL_ADDRESS_OFFSET)
+ + str(cls._PROTOCOL_ADDRESS_OFFSET)
+ " and "
- + str(cls.REGISTER_MAX_ADDRESS)
+ + str(cls._REGISTER_MAX_ADDRESS)
+ ". Was "
- + str(address + cls.PROTOCOL_ADDRESS_OFFSET + (quantity - 1))
+ + str(address + cls._PROTOCOL_ADDRESS_OFFSET + (quantity - 1))
)
- if quantity > cls.QUANTITY_MAX:
+ if quantity > cls._QUANTITY_MAX:
raise PlcFieldParseException(
"quantity may not be larger than 2000. Was " + str(quantity)
)
@@ -110,54 +112,56 @@ def create(cls, address_string):
class ModbusTagCoil(ModbusTag):
- ADDRESS_PREFIX: str = "0x"
- ADDRESS_PATTERN: Pattern[AnyStr] = re.compile("coil:" + ModbusTag.ADDRESS_PATTERN)
- ADDRESS_SHORTER_PATTERN: Pattern[AnyStr] = re.compile(
- "0" + ModbusTag.FIXED_DIGIT_MODBUS_PATTERN
+ _ADDRESS_PREFIX: str = "0x"
+ _ADDRESS_PATTERN: Pattern[AnyStr] = re.compile("coil:" + ModbusTag._ADDRESS_PATTERN)
+ _ADDRESS_SHORTER_PATTERN: Pattern[AnyStr] = re.compile(
+ "0" + ModbusTag._FIXED_DIGIT_MODBUS_PATTERN
)
- ADDRESS_SHORT_PATTERN: Pattern[AnyStr] = re.compile(
- "0x" + ModbusTag.FIXED_DIGIT_MODBUS_PATTERN
+ _ADDRESS_SHORT_PATTERN: Pattern[AnyStr] = re.compile(
+ "0x" + ModbusTag._FIXED_DIGIT_MODBUS_PATTERN
)
- DEFAULT_DATA_TYPE: ModbusDataType = ModbusDataType.BOOL
+ _DEFAULT_DATA_TYPE: ModbusDataType = ModbusDataType.BOOL
+ _QUANTITY_MAX: int = 2000
class ModbusTagDiscreteInput(ModbusTag):
- ADDRESS_PREFIX: str = "1x"
- ADDRESS_PATTERN: Pattern[AnyStr] = re.compile(
- "discrete-input:" + ModbusTag.ADDRESS_PATTERN
+ _ADDRESS_PREFIX: str = "1x"
+ _ADDRESS_PATTERN: Pattern[AnyStr] = re.compile(
+ "discrete-input:" + ModbusTag._ADDRESS_PATTERN
)
- ADDRESS_SHORTER_PATTERN: Pattern[AnyStr] = re.compile(
- "1" + ModbusTag.FIXED_DIGIT_MODBUS_PATTERN
+ _ADDRESS_SHORTER_PATTERN: Pattern[AnyStr] = re.compile(
+ "1" + ModbusTag._FIXED_DIGIT_MODBUS_PATTERN
)
- ADDRESS_SHORT_PATTERN: Pattern[AnyStr] = re.compile(
- "1x" + ModbusTag.FIXED_DIGIT_MODBUS_PATTERN
+ _ADDRESS_SHORT_PATTERN: Pattern[AnyStr] = re.compile(
+ "1x" + ModbusTag._FIXED_DIGIT_MODBUS_PATTERN
)
- DEFAULT_DATA_TYPE: ModbusDataType = ModbusDataType.BOOL
+ _DEFAULT_DATA_TYPE: ModbusDataType = ModbusDataType.BOOL
+ _QUANTITY_MAX: int = 2000
class ModbusTagInputRegister(ModbusTag):
- ADDRESS_PREFIX: str = "3x"
- ADDRESS_PATTERN: Pattern[AnyStr] = re.compile(
- "input-register:" + ModbusTag.ADDRESS_PATTERN
+ _ADDRESS_PREFIX: str = "3x"
+ _ADDRESS_PATTERN: Pattern[AnyStr] = re.compile(
+ "input-register:" + ModbusTag._ADDRESS_PATTERN
)
- ADDRESS_SHORTER_PATTERN: Pattern[AnyStr] = re.compile(
- "3" + ModbusTag.FIXED_DIGIT_MODBUS_PATTERN
+ _ADDRESS_SHORTER_PATTERN: Pattern[AnyStr] = re.compile(
+ "3" + ModbusTag._FIXED_DIGIT_MODBUS_PATTERN
)
- ADDRESS_SHORT_PATTERN: Pattern[AnyStr] = re.compile(
- "3x" + ModbusTag.FIXED_DIGIT_MODBUS_PATTERN
+ _ADDRESS_SHORT_PATTERN: Pattern[AnyStr] = re.compile(
+ "3x" + ModbusTag._FIXED_DIGIT_MODBUS_PATTERN
)
class ModbusTagHoldingRegister(ModbusTag):
- ADDRESS_PREFIX: str = "4x"
- ADDRESS_PATTERN: Pattern[AnyStr] = re.compile(
- "holding-register:" + ModbusTag.ADDRESS_PATTERN
+ _ADDRESS_PREFIX: str = "4x"
+ _ADDRESS_PATTERN: Pattern[AnyStr] = re.compile(
+ "holding-register:" + ModbusTag._ADDRESS_PATTERN
)
- ADDRESS_SHORTER_PATTERN: Pattern[AnyStr] = re.compile(
- "4" + ModbusTag.FIXED_DIGIT_MODBUS_PATTERN
+ _ADDRESS_SHORTER_PATTERN: Pattern[AnyStr] = re.compile(
+ "4" + ModbusTag._FIXED_DIGIT_MODBUS_PATTERN
)
- ADDRESS_SHORT_PATTERN: Pattern[AnyStr] = re.compile(
- "4x" + ModbusTag.FIXED_DIGIT_MODBUS_PATTERN
+ _ADDRESS_SHORT_PATTERN: Pattern[AnyStr] = re.compile(
+ "4x" + ModbusTag._FIXED_DIGIT_MODBUS_PATTERN
)
diff --git a/sandbox/plc4py/plc4py/protocols/modbus/readwrite/DataItem.py b/sandbox/plc4py/plc4py/protocols/modbus/readwrite/DataItem.py
index ed1e6be6fc0..35d35ab4659 100644
--- a/sandbox/plc4py/plc4py/protocols/modbus/readwrite/DataItem.py
+++ b/sandbox/plc4py/plc4py/protocols/modbus/readwrite/DataItem.py
@@ -19,7 +19,11 @@
from abc import staticmethod
from loguru import logging as log
+from plc4py.api.value.PlcValue import PlcValue
from plc4py.protocols.modbus.readwrite.ModbusDataType import ModbusDataType
+from plc4py.spi.generation.ReadBuffer import ReadBuffer
+from plc4py.spi.generation.WriteBuffer import WriteBuffer
+from typing import List
import math
@@ -28,11 +32,7 @@ class DataItem:
def static_parse(
read_buffer: ReadBuffer, data_type: ModbusDataType, number_of_values: int
):
- if EvaluationHelper.equals(
- data_type, ModbusDataType.BOOL
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # BOOL
+ if data_type == ModbusDataType.BOOL and number_of_values == int(1): # BOOL
# Reserved Field (Compartmentalized so the "reserved" variable can't leak)
reserved: int = read_buffer.read_unsigned_int(15, logical_name="")
if reserved != int(0x0000):
@@ -48,28 +48,16 @@ def static_parse(
value: bool = read_buffer.read_bit("")
return PlcBOOL(value)
- if EvaluationHelper.equals(data_type, ModbusDataType.BOOL): # List
+ if data_type == ModbusDataType.BOOL: # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
value.append(PlcBOOL(bool(read_buffer.read_bit(""))))
return PlcList(value)
- if EvaluationHelper.equals(
- data_type, ModbusDataType.BYTE
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # BYTE
+ if data_type == ModbusDataType.BYTE and number_of_values == int(1): # BYTE
# Reserved Field (Compartmentalized so the "reserved" variable can't leak)
reserved: int = read_buffer.read_unsigned_short(8, logical_name="")
if reserved != int(0x00):
@@ -85,43 +73,31 @@ def static_parse(
value: int = read_buffer.read_unsigned_short(8, logical_name="")
return PlcBYTE(value)
- if EvaluationHelper.equals(data_type, ModbusDataType.BYTE): # List
+ if data_type == ModbusDataType.BYTE: # List
# Array field (value)
# Count array
- if number_of_values * int(8) > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values * int(8))
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values * int(8))
value: List[PlcValue] = []
for cur_item in range(item_count):
value.append(PlcBOOL(bool(read_buffer.read_bit(""))))
return PlcList(value)
- if EvaluationHelper.equals(data_type, ModbusDataType.WORD): # WORD
+ if data_type == ModbusDataType.WORD: # WORD
# Simple Field (value)
value: int = read_buffer.read_unsigned_int(16, logical_name="")
return PlcWORD(value)
- if EvaluationHelper.equals(data_type, ModbusDataType.DWORD): # DWORD
+ if data_type == ModbusDataType.DWORD: # DWORD
# Simple Field (value)
value: int = read_buffer.read_unsigned_long(32, logical_name="")
return PlcDWORD(value)
- if EvaluationHelper.equals(data_type, ModbusDataType.LWORD): # LWORD
+ if data_type == ModbusDataType.LWORD: # LWORD
# Simple Field (value)
value: int = read_buffer.read_unsigned_big_integer(64, logical_name="")
return PlcLWORD(value)
- if EvaluationHelper.equals(
- data_type, ModbusDataType.SINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # SINT
+ if data_type == ModbusDataType.SINT and number_of_values == int(1): # SINT
# Reserved Field (Compartmentalized so the "reserved" variable can't leak)
reserved: int = read_buffer.read_unsigned_short(8, logical_name="")
if reserved != int(0x00):
@@ -137,17 +113,9 @@ def static_parse(
value: int = read_buffer.read_signed_byte(8, logical_name="")
return PlcSINT(value)
- if EvaluationHelper.equals(data_type, ModbusDataType.SINT): # List
+ if data_type == ModbusDataType.SINT: # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -156,89 +124,49 @@ def static_parse(
)
return PlcList(value)
- if EvaluationHelper.equals(
- data_type, ModbusDataType.INT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # INT
+ if data_type == ModbusDataType.INT and number_of_values == int(1): # INT
# Simple Field (value)
value: int = read_buffer.read_short(16, logical_name="")
return PlcINT(value)
- if EvaluationHelper.equals(data_type, ModbusDataType.INT): # List
+ if data_type == ModbusDataType.INT: # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
value.append(PlcINT(int(read_buffer.read_short(16, logical_name=""))))
return PlcList(value)
- if EvaluationHelper.equals(
- data_type, ModbusDataType.DINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # DINT
+ if data_type == ModbusDataType.DINT and number_of_values == int(1): # DINT
# Simple Field (value)
value: int = read_buffer.read_int(32, logical_name="")
return PlcDINT(value)
- if EvaluationHelper.equals(data_type, ModbusDataType.DINT): # List
+ if data_type == ModbusDataType.DINT: # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
value.append(PlcDINT(int(read_buffer.read_int(32, logical_name=""))))
return PlcList(value)
- if EvaluationHelper.equals(
- data_type, ModbusDataType.LINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # LINT
+ if data_type == ModbusDataType.LINT and number_of_values == int(1): # LINT
# Simple Field (value)
value: int = read_buffer.read_long(64, logical_name="")
return PlcLINT(value)
- if EvaluationHelper.equals(data_type, ModbusDataType.LINT): # List
+ if data_type == ModbusDataType.LINT: # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
value.append(PlcLINT(int(read_buffer.read_long(64, logical_name=""))))
return PlcList(value)
- if EvaluationHelper.equals(
- data_type, ModbusDataType.USINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # USINT
+ if data_type == ModbusDataType.USINT and number_of_values == int(1): # USINT
# Reserved Field (Compartmentalized so the "reserved" variable can't leak)
reserved: int = read_buffer.read_unsigned_short(8, logical_name="")
if reserved != int(0x00):
@@ -254,17 +182,9 @@ def static_parse(
value: int = read_buffer.read_unsigned_short(8, logical_name="")
return PlcUSINT(value)
- if EvaluationHelper.equals(data_type, ModbusDataType.USINT): # List
+ if data_type == ModbusDataType.USINT: # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -273,26 +193,14 @@ def static_parse(
)
return PlcList(value)
- if EvaluationHelper.equals(
- data_type, ModbusDataType.UINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # UINT
+ if data_type == ModbusDataType.UINT and number_of_values == int(1): # UINT
# Simple Field (value)
value: int = read_buffer.read_unsigned_int(16, logical_name="")
return PlcUINT(value)
- if EvaluationHelper.equals(data_type, ModbusDataType.UINT): # List
+ if data_type == ModbusDataType.UINT: # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -301,26 +209,14 @@ def static_parse(
)
return PlcList(value)
- if EvaluationHelper.equals(
- data_type, ModbusDataType.UDINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # UDINT
+ if data_type == ModbusDataType.UDINT and number_of_values == int(1): # UDINT
# Simple Field (value)
value: int = read_buffer.read_unsigned_long(32, logical_name="")
return PlcUDINT(value)
- if EvaluationHelper.equals(data_type, ModbusDataType.UDINT): # List
+ if data_type == ModbusDataType.UDINT: # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -329,26 +225,14 @@ def static_parse(
)
return PlcList(value)
- if EvaluationHelper.equals(
- data_type, ModbusDataType.ULINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # ULINT
+ if data_type == ModbusDataType.ULINT and number_of_values == int(1): # ULINT
# Simple Field (value)
value: int = read_buffer.read_unsigned_big_integer(64, logical_name="")
return PlcULINT(value)
- if EvaluationHelper.equals(data_type, ModbusDataType.ULINT): # List
+ if data_type == ModbusDataType.ULINT: # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -359,26 +243,14 @@ def static_parse(
)
return PlcList(value)
- if EvaluationHelper.equals(
- data_type, ModbusDataType.REAL
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # REAL
+ if data_type == ModbusDataType.REAL and number_of_values == int(1): # REAL
# Simple Field (value)
value: float = read_buffer.read_float(32, logical_name="")
return PlcREAL(value)
- if EvaluationHelper.equals(data_type, ModbusDataType.REAL): # List
+ if data_type == ModbusDataType.REAL: # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -387,26 +259,14 @@ def static_parse(
)
return PlcList(value)
- if EvaluationHelper.equals(
- data_type, ModbusDataType.LREAL
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # LREAL
+ if data_type == ModbusDataType.LREAL and number_of_values == int(1): # LREAL
# Simple Field (value)
value: float = read_buffer.read_double(64, logical_name="")
return PlcLREAL(value)
- if EvaluationHelper.equals(data_type, ModbusDataType.LREAL): # List
+ if data_type == ModbusDataType.LREAL: # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -415,26 +275,14 @@ def static_parse(
)
return PlcList(value)
- if EvaluationHelper.equals(
- data_type, ModbusDataType.CHAR
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # CHAR
+ if data_type == ModbusDataType.CHAR and number_of_values == int(1): # CHAR
# Simple Field (value)
value: str = read_buffer.read_string(8, logical_name="", encoding="")
return PlcCHAR(value)
- if EvaluationHelper.equals(data_type, ModbusDataType.CHAR): # List
+ if data_type == ModbusDataType.CHAR: # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -445,26 +293,14 @@ def static_parse(
)
return PlcList(value)
- if EvaluationHelper.equals(
- data_type, ModbusDataType.WCHAR
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # WCHAR
+ if data_type == ModbusDataType.WCHAR and number_of_values == int(1): # WCHAR
# Simple Field (value)
value: str = read_buffer.read_string(16, logical_name="", encoding="")
return PlcWCHAR(value)
- if EvaluationHelper.equals(data_type, ModbusDataType.WCHAR): # List
+ if data_type == ModbusDataType.WCHAR: # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -479,421 +315,300 @@ def static_parse(
@staticmethod
def static_serialize(
- writeBuffer: WriteBuffer,
- _value: PlcValue,
- dataType: ModbusDataType,
- numberOfValues: int,
- ) -> None:
- static_serialize(
- writeBuffer, _value, dataType, numberOfValues, ByteOrder.BIG_ENDIAN
- )
-
- @staticmethod
- def static_serialize(
- writeBuffer: WriteBuffer,
+ write_buffer: WriteBuffer,
_value: PlcValue,
- dataType: ModbusDataType,
- numberOfValues: int,
- byteOrder: ByteOrder,
+ data_type: ModbusDataType,
+ number_of_values: int,
+ byte_order: ByteOrder,
) -> None:
- if EvaluationHelper.equals(
- data_type, ModbusDataType.BOOL
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # BOOL
+ if data_type == ModbusDataType.BOOL and number_of_values == int(1): # BOOL
# Reserved Field
- writeBuffer.WriteUint16("int0x0000", 15, int(0x0000))
+ write_buffer.write_unsigned_short(int(0x0000), 15, "int0x0000")
# Simple Field (value)
- value: bool = _value.getBool()
- writeBuffer.WriteBit("value", (value))
- if EvaluationHelper.equals(data_type, ModbusDataType.BOOL): # List
+ value: bool = _value.get_bool()
+ write_buffer.write_bit((value), "value")
+ if data_type == ModbusDataType.BOOL: # List
values: PlcList = _value
for val in values.getList():
- value: bool = val.getBool()
- writeBuffer.WriteBit("value", (value))
-
- if EvaluationHelper.equals(
- data_type, ModbusDataType.BYTE
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # BYTE
+ value: bool = val.get_bool()
+ write_buffer.write_bit((value), "value")
+
+ if data_type == ModbusDataType.BYTE and number_of_values == int(1): # BYTE
# Reserved Field
- writeBuffer.WriteUint8("int0x00", 8, int(0x00))
+ write_buffer.write_byte(int(0x00), 8, "int0x00")
# Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteUint8("value", 8, (value))
- if EvaluationHelper.equals(data_type, ModbusDataType.BYTE): # List
+ value: int = _value.get_int()
+ write_buffer.write_byte((value), 8, "value")
+ if data_type == ModbusDataType.BYTE: # List
values: PlcList = _value
for val in values.getList():
- value: bool = val.getBool()
- writeBuffer.WriteBit("value", (value))
-
- if EvaluationHelper.equals(data_type, ModbusDataType.WORD): # WORD
- # Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteUint16("value", 16, (value))
- if EvaluationHelper.equals(data_type, ModbusDataType.DWORD): # DWORD
- # Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteUint32("value", 32, (value))
- if EvaluationHelper.equals(data_type, ModbusDataType.LWORD): # LWORD
- # Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteUint64("value", 64, (value))
- if EvaluationHelper.equals(
- data_type, ModbusDataType.SINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # SINT
+ value: bool = val.get_bool()
+ write_buffer.write_bit((value), "value")
+
+ if data_type == ModbusDataType.WORD: # WORD
+ # Simple Field (value)
+ value: int = _value.get_int()
+ write_buffer.write_unsigned_short((value), 16, "value")
+ if data_type == ModbusDataType.DWORD: # DWORD
+ # Simple Field (value)
+ value: int = _value.get_int()
+ write_buffer.write_unsigned_int((value), 32, "value")
+ if data_type == ModbusDataType.LWORD: # LWORD
+ # Simple Field (value)
+ value: int = _value.get_int()
+ write_buffer.write_unsigned_long((value), 64, "value")
+ if data_type == ModbusDataType.SINT and number_of_values == int(1): # SINT
# Reserved Field
- writeBuffer.WriteUint8("int0x00", 8, int(0x00))
+ write_buffer.write_byte(int(0x00), 8, "int0x00")
# Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteInt8("value", 8, (value))
- if EvaluationHelper.equals(data_type, ModbusDataType.SINT): # List
+ value: int = _value.get_int()
+ write_buffer.write_signed_byte((value), 8, "value")
+ if data_type == ModbusDataType.SINT: # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteInt8("value", 8, (value))
-
- if EvaluationHelper.equals(
- data_type, ModbusDataType.INT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # INT
- # Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteInt16("value", 16, (value))
- if EvaluationHelper.equals(data_type, ModbusDataType.INT): # List
+ value: int = val.get_int()
+ write_buffer.write_signed_byte((value), 8, "value")
+
+ if data_type == ModbusDataType.INT and number_of_values == int(1): # INT
+ # Simple Field (value)
+ value: int = _value.get_int()
+ write_buffer.write_short((value), 16, "value")
+ if data_type == ModbusDataType.INT: # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteInt16("value", 16, (value))
-
- if EvaluationHelper.equals(
- data_type, ModbusDataType.DINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # DINT
- # Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteInt32("value", 32, (value))
- if EvaluationHelper.equals(data_type, ModbusDataType.DINT): # List
+ value: int = val.get_int()
+ write_buffer.write_short((value), 16, "value")
+
+ if data_type == ModbusDataType.DINT and number_of_values == int(1): # DINT
+ # Simple Field (value)
+ value: int = _value.get_int()
+ write_buffer.write_int((value), 32, "value")
+ if data_type == ModbusDataType.DINT: # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteInt32("value", 32, (value))
-
- if EvaluationHelper.equals(
- data_type, ModbusDataType.LINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # LINT
- # Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteInt64("value", 64, (value))
- if EvaluationHelper.equals(data_type, ModbusDataType.LINT): # List
+ value: int = val.get_int()
+ write_buffer.write_int((value), 32, "value")
+
+ if data_type == ModbusDataType.LINT and number_of_values == int(1): # LINT
+ # Simple Field (value)
+ value: int = _value.get_int()
+ write_buffer.write_long((value), 64, "value")
+ if data_type == ModbusDataType.LINT: # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteInt64("value", 64, (value))
-
- if EvaluationHelper.equals(
- data_type, ModbusDataType.USINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # USINT
+ value: int = val.get_int()
+ write_buffer.write_long((value), 64, "value")
+
+ if data_type == ModbusDataType.USINT and number_of_values == int(1): # USINT
# Reserved Field
- writeBuffer.WriteUint8("int0x00", 8, int(0x00))
+ write_buffer.write_byte(int(0x00), 8, "int0x00")
# Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteUint8("value", 8, (value))
- if EvaluationHelper.equals(data_type, ModbusDataType.USINT): # List
+ value: int = _value.get_int()
+ write_buffer.write_byte((value), 8, "value")
+ if data_type == ModbusDataType.USINT: # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteUint8("value", 8, (value))
-
- if EvaluationHelper.equals(
- data_type, ModbusDataType.UINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # UINT
- # Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteUint16("value", 16, (value))
- if EvaluationHelper.equals(data_type, ModbusDataType.UINT): # List
+ value: int = val.get_int()
+ write_buffer.write_byte((value), 8, "value")
+
+ if data_type == ModbusDataType.UINT and number_of_values == int(1): # UINT
+ # Simple Field (value)
+ value: int = _value.get_int()
+ write_buffer.write_unsigned_short((value), 16, "value")
+ if data_type == ModbusDataType.UINT: # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteUint16("value", 16, (value))
-
- if EvaluationHelper.equals(
- data_type, ModbusDataType.UDINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # UDINT
- # Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteUint32("value", 32, (value))
- if EvaluationHelper.equals(data_type, ModbusDataType.UDINT): # List
+ value: int = val.get_int()
+ write_buffer.write_unsigned_short((value), 16, "value")
+
+ if data_type == ModbusDataType.UDINT and number_of_values == int(1): # UDINT
+ # Simple Field (value)
+ value: int = _value.get_int()
+ write_buffer.write_unsigned_int((value), 32, "value")
+ if data_type == ModbusDataType.UDINT: # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteUint32("value", 32, (value))
-
- if EvaluationHelper.equals(
- data_type, ModbusDataType.ULINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # ULINT
- # Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteUint64("value", 64, (value))
- if EvaluationHelper.equals(data_type, ModbusDataType.ULINT): # List
+ value: int = val.get_int()
+ write_buffer.write_unsigned_int((value), 32, "value")
+
+ if data_type == ModbusDataType.ULINT and number_of_values == int(1): # ULINT
+ # Simple Field (value)
+ value: int = _value.get_int()
+ write_buffer.write_unsigned_long((value), 64, "value")
+ if data_type == ModbusDataType.ULINT: # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteUint64("value", 64, (value))
-
- if EvaluationHelper.equals(
- data_type, ModbusDataType.REAL
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # REAL
- # Simple Field (value)
- value: float = _value.getFloat()
- writeBuffer.WriteFloat32("value", 32, (value))
- if EvaluationHelper.equals(data_type, ModbusDataType.REAL): # List
+ value: int = val.get_int()
+ write_buffer.write_unsigned_long((value), 64, "value")
+
+ if data_type == ModbusDataType.REAL and number_of_values == int(1): # REAL
+ # Simple Field (value)
+ value: float = _value.get_float()
+ write_buffer.write_float((value), 32, "value")
+ if data_type == ModbusDataType.REAL: # List
values: PlcList = _value
for val in values.getList():
- value: float = val.getFloat()
- writeBuffer.WriteFloat32("value", 32, (value))
-
- if EvaluationHelper.equals(
- data_type, ModbusDataType.LREAL
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # LREAL
- # Simple Field (value)
- value: float = _value.getFloat()
- writeBuffer.WriteFloat64("value", 64, (value))
- if EvaluationHelper.equals(data_type, ModbusDataType.LREAL): # List
+ value: float = val.get_float()
+ write_buffer.write_float((value), 32, "value")
+
+ if data_type == ModbusDataType.LREAL and number_of_values == int(1): # LREAL
+ # Simple Field (value)
+ value: float = _value.get_float()
+ write_buffer.write_double((value), 64, "value")
+ if data_type == ModbusDataType.LREAL: # List
values: PlcList = _value
for val in values.getList():
- value: float = val.getFloat()
- writeBuffer.WriteFloat64("value", 64, (value))
-
- if EvaluationHelper.equals(
- data_type, ModbusDataType.CHAR
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # CHAR
- # Simple Field (value)
- value: str = _value.getStr()
- writeBuffer.WriteString("value", uint32(8), "UTF-8", (value))
- if EvaluationHelper.equals(data_type, ModbusDataType.CHAR): # List
+ value: float = val.get_float()
+ write_buffer.write_double((value), 64, "value")
+
+ if data_type == ModbusDataType.CHAR and number_of_values == int(1): # CHAR
+ # Simple Field (value)
+ value: str = _value.get_str()
+ write_buffer.write_str((value), uint32(8), "UTF-8", "value")
+ if data_type == ModbusDataType.CHAR: # List
values: PlcList = _value
for val in values.getList():
- value: str = val.getStr()
- writeBuffer.WriteString("value", uint32(8), "UTF-8", (value))
-
- if EvaluationHelper.equals(
- data_type, ModbusDataType.WCHAR
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # WCHAR
- # Simple Field (value)
- value: str = _value.getStr()
- writeBuffer.WriteString("value", uint32(16), "UTF-16", (value))
- if EvaluationHelper.equals(data_type, ModbusDataType.WCHAR): # List
+ value: str = val.get_str()
+ write_buffer.write_str((value), uint32(8), "UTF-8", "value")
+
+ if data_type == ModbusDataType.WCHAR and number_of_values == int(1): # WCHAR
+ # Simple Field (value)
+ value: str = _value.get_str()
+ write_buffer.write_str((value), uint32(16), "UTF-16", "value")
+ if data_type == ModbusDataType.WCHAR: # List
values: PlcList = _value
for val in values.getList():
- value: str = val.getStr()
- writeBuffer.WriteString("value", uint32(16), "UTF-16", (value))
+ value: str = val.get_str()
+ write_buffer.write_str((value), uint32(16), "UTF-16", "value")
@staticmethod
def get_length_in_bytes(
- _value: PlcValue, dataType: ModbusDataType, numberOfValues: int
+ _value: PlcValue, data_type: ModbusDataType, number_of_values: int
) -> int:
return int(
- math.ceil(float(getLengthInBits(_value, dataType, numberOfValues)) / 8.0)
+ math.ceil(
+ float(get_length_in_bits(_value, data_type, number_of_values)) / 8.0
+ )
)
@staticmethod
def get_length_in_bits(
- _value: PlcValue, dataType: ModbusDataType, numberOfValues: int
+ _value: PlcValue, data_type: ModbusDataType, number_of_values: int
) -> int:
- sizeInBits: int = 0
- if EvaluationHelper.equals(
- data_type, ModbusDataType.BOOL
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # BOOL
+ size_in_bits: int = 0
+ if data_type == ModbusDataType.BOOL and number_of_values == int(1): # BOOL
# Reserved Field
- sizeInBits += 15
+ size_in_bits += 15
# Simple Field (value)
- sizeInBits += 1
- if EvaluationHelper.equals(data_type, ModbusDataType.BOOL): # List
+ size_in_bits += 1
+ if data_type == ModbusDataType.BOOL: # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 1
- if EvaluationHelper.equals(
- data_type, ModbusDataType.BYTE
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # BYTE
+ size_in_bits += values.get_list().size() * 1
+ if data_type == ModbusDataType.BYTE and number_of_values == int(1): # BYTE
# Reserved Field
- sizeInBits += 8
+ size_in_bits += 8
# Simple Field (value)
- sizeInBits += 8
- if EvaluationHelper.equals(data_type, ModbusDataType.BYTE): # List
+ size_in_bits += 8
+ if data_type == ModbusDataType.BYTE: # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 1
- if EvaluationHelper.equals(data_type, ModbusDataType.WORD): # WORD
+ size_in_bits += values.get_list().size() * 1
+ if data_type == ModbusDataType.WORD: # WORD
# Simple Field (value)
- sizeInBits += 16
- if EvaluationHelper.equals(data_type, ModbusDataType.DWORD): # DWORD
+ size_in_bits += 16
+ if data_type == ModbusDataType.DWORD: # DWORD
# Simple Field (value)
- sizeInBits += 32
- if EvaluationHelper.equals(data_type, ModbusDataType.LWORD): # LWORD
+ size_in_bits += 32
+ if data_type == ModbusDataType.LWORD: # LWORD
# Simple Field (value)
- sizeInBits += 64
- if EvaluationHelper.equals(
- data_type, ModbusDataType.SINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # SINT
+ size_in_bits += 64
+ if data_type == ModbusDataType.SINT and number_of_values == int(1): # SINT
# Reserved Field
- sizeInBits += 8
+ size_in_bits += 8
# Simple Field (value)
- sizeInBits += 8
- if EvaluationHelper.equals(data_type, ModbusDataType.SINT): # List
+ size_in_bits += 8
+ if data_type == ModbusDataType.SINT: # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 8
- if EvaluationHelper.equals(
- data_type, ModbusDataType.INT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # INT
- # Simple Field (value)
- sizeInBits += 16
- if EvaluationHelper.equals(data_type, ModbusDataType.INT): # List
+ size_in_bits += values.get_list().size() * 8
+ if data_type == ModbusDataType.INT and number_of_values == int(1): # INT
+ # Simple Field (value)
+ size_in_bits += 16
+ if data_type == ModbusDataType.INT: # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 16
- if EvaluationHelper.equals(
- data_type, ModbusDataType.DINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # DINT
- # Simple Field (value)
- sizeInBits += 32
- if EvaluationHelper.equals(data_type, ModbusDataType.DINT): # List
+ size_in_bits += values.get_list().size() * 16
+ if data_type == ModbusDataType.DINT and number_of_values == int(1): # DINT
+ # Simple Field (value)
+ size_in_bits += 32
+ if data_type == ModbusDataType.DINT: # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 32
- if EvaluationHelper.equals(
- data_type, ModbusDataType.LINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # LINT
- # Simple Field (value)
- sizeInBits += 64
- if EvaluationHelper.equals(data_type, ModbusDataType.LINT): # List
+ size_in_bits += values.get_list().size() * 32
+ if data_type == ModbusDataType.LINT and number_of_values == int(1): # LINT
+ # Simple Field (value)
+ size_in_bits += 64
+ if data_type == ModbusDataType.LINT: # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 64
- if EvaluationHelper.equals(
- data_type, ModbusDataType.USINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # USINT
+ size_in_bits += values.get_list().size() * 64
+ if data_type == ModbusDataType.USINT and number_of_values == int(1): # USINT
# Reserved Field
- sizeInBits += 8
+ size_in_bits += 8
# Simple Field (value)
- sizeInBits += 8
- if EvaluationHelper.equals(data_type, ModbusDataType.USINT): # List
+ size_in_bits += 8
+ if data_type == ModbusDataType.USINT: # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 8
- if EvaluationHelper.equals(
- data_type, ModbusDataType.UINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # UINT
- # Simple Field (value)
- sizeInBits += 16
- if EvaluationHelper.equals(data_type, ModbusDataType.UINT): # List
+ size_in_bits += values.get_list().size() * 8
+ if data_type == ModbusDataType.UINT and number_of_values == int(1): # UINT
+ # Simple Field (value)
+ size_in_bits += 16
+ if data_type == ModbusDataType.UINT: # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 16
- if EvaluationHelper.equals(
- data_type, ModbusDataType.UDINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # UDINT
- # Simple Field (value)
- sizeInBits += 32
- if EvaluationHelper.equals(data_type, ModbusDataType.UDINT): # List
+ size_in_bits += values.get_list().size() * 16
+ if data_type == ModbusDataType.UDINT and number_of_values == int(1): # UDINT
+ # Simple Field (value)
+ size_in_bits += 32
+ if data_type == ModbusDataType.UDINT: # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 32
- if EvaluationHelper.equals(
- data_type, ModbusDataType.ULINT
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # ULINT
- # Simple Field (value)
- sizeInBits += 64
- if EvaluationHelper.equals(data_type, ModbusDataType.ULINT): # List
+ size_in_bits += values.get_list().size() * 32
+ if data_type == ModbusDataType.ULINT and number_of_values == int(1): # ULINT
+ # Simple Field (value)
+ size_in_bits += 64
+ if data_type == ModbusDataType.ULINT: # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 64
- if EvaluationHelper.equals(
- data_type, ModbusDataType.REAL
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # REAL
- # Simple Field (value)
- sizeInBits += 32
- if EvaluationHelper.equals(data_type, ModbusDataType.REAL): # List
+ size_in_bits += values.get_list().size() * 64
+ if data_type == ModbusDataType.REAL and number_of_values == int(1): # REAL
+ # Simple Field (value)
+ size_in_bits += 32
+ if data_type == ModbusDataType.REAL: # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 32
- if EvaluationHelper.equals(
- data_type, ModbusDataType.LREAL
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # LREAL
- # Simple Field (value)
- sizeInBits += 64
- if EvaluationHelper.equals(data_type, ModbusDataType.LREAL): # List
+ size_in_bits += values.get_list().size() * 32
+ if data_type == ModbusDataType.LREAL and number_of_values == int(1): # LREAL
+ # Simple Field (value)
+ size_in_bits += 64
+ if data_type == ModbusDataType.LREAL: # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 64
- if EvaluationHelper.equals(
- data_type, ModbusDataType.CHAR
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # CHAR
- # Simple Field (value)
- sizeInBits += 8
- if EvaluationHelper.equals(data_type, ModbusDataType.CHAR): # List
+ size_in_bits += values.get_list().size() * 64
+ if data_type == ModbusDataType.CHAR and number_of_values == int(1): # CHAR
+ # Simple Field (value)
+ size_in_bits += 8
+ if data_type == ModbusDataType.CHAR: # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 8
- if EvaluationHelper.equals(
- data_type, ModbusDataType.WCHAR
- ) and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # WCHAR
- # Simple Field (value)
- sizeInBits += 16
- if EvaluationHelper.equals(data_type, ModbusDataType.WCHAR): # List
+ size_in_bits += values.get_list().size() * 8
+ if data_type == ModbusDataType.WCHAR and number_of_values == int(1): # WCHAR
+ # Simple Field (value)
+ size_in_bits += 16
+ if data_type == ModbusDataType.WCHAR: # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 16
- return sizeInBits
+ size_in_bits += values.get_list().size() * 16
+ return size_in_bits
diff --git a/sandbox/plc4py/plc4py/protocols/modbus/readwrite/ModbusDataType.py b/sandbox/plc4py/plc4py/protocols/modbus/readwrite/ModbusDataType.py
index 19e7f9f406e..0ab173983c6 100644
--- a/sandbox/plc4py/plc4py/protocols/modbus/readwrite/ModbusDataType.py
+++ b/sandbox/plc4py/plc4py/protocols/modbus/readwrite/ModbusDataType.py
@@ -22,6 +22,7 @@
class ModbusDataType(AutoNumberEnum):
+ _init_ = "value, data_type_size"
BOOL = (1, int(2))
BYTE = (2, int(2))
WORD = (3, int(2))
diff --git a/sandbox/plc4py/plc4py/protocols/simulated/readwrite/DataItem.py b/sandbox/plc4py/plc4py/protocols/simulated/readwrite/DataItem.py
index 809018f1160..f5f32855e49 100644
--- a/sandbox/plc4py/plc4py/protocols/simulated/readwrite/DataItem.py
+++ b/sandbox/plc4py/plc4py/protocols/simulated/readwrite/DataItem.py
@@ -19,54 +19,38 @@
from abc import staticmethod
from loguru import logging as log
+from plc4py.api.value.PlcValue import PlcValue
+from plc4py.spi.generation.ReadBuffer import ReadBuffer
+from plc4py.spi.generation.WriteBuffer import WriteBuffer
+from typing import List
import math
class DataItem:
@staticmethod
def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int):
- if EvaluationHelper.equals(data_type, "_bool") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # BOOL
+ if data_type == "_bool" and number_of_values == int(1): # BOOL
# Simple Field (value)
value: bool = read_buffer.read_bit("")
return PlcBOOL(value)
- if EvaluationHelper.equals(data_type, "_bool"): # List
+ if data_type == "_bool": # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
value.append(PlcBOOL(bool(read_buffer.read_bit(""))))
return PlcList(value)
- if EvaluationHelper.equals(data_type, "_byte") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # BYTE
+ if data_type == "_byte" and number_of_values == int(1): # BYTE
# Simple Field (value)
value: int = read_buffer.read_unsigned_short(8, logical_name="")
return PlcBYTE(value)
- if EvaluationHelper.equals(data_type, "_byte"): # List
+ if data_type == "_byte": # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -75,24 +59,14 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int)
)
return PlcList(value)
- if EvaluationHelper.equals(data_type, "_word") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # WORD
+ if data_type == "_word" and number_of_values == int(1): # WORD
# Simple Field (value)
value: int = read_buffer.read_unsigned_int(16, logical_name="")
return PlcWORD(value)
- if EvaluationHelper.equals(data_type, "_word"): # List
+ if data_type == "_word": # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -101,24 +75,14 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int)
)
return PlcList(value)
- if EvaluationHelper.equals(data_type, "_dword") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # DWORD
+ if data_type == "_dword" and number_of_values == int(1): # DWORD
# Simple Field (value)
value: int = read_buffer.read_unsigned_long(32, logical_name="")
return PlcDWORD(value)
- if EvaluationHelper.equals(data_type, "_dword"): # List
+ if data_type == "_dword": # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -127,24 +91,14 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int)
)
return PlcList(value)
- if EvaluationHelper.equals(data_type, "_lword") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # LWORD
+ if data_type == "_lword" and number_of_values == int(1): # LWORD
# Simple Field (value)
value: int = read_buffer.read_unsigned_big_integer(64, logical_name="")
return PlcLWORD(value)
- if EvaluationHelper.equals(data_type, "_lword"): # List
+ if data_type == "_lword": # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -155,24 +109,14 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int)
)
return PlcList(value)
- if EvaluationHelper.equals(data_type, "_sint") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # SINT
+ if data_type == "_sint" and number_of_values == int(1): # SINT
# Simple Field (value)
value: int = read_buffer.read_signed_byte(8, logical_name="")
return PlcSINT(value)
- if EvaluationHelper.equals(data_type, "_sint"): # List
+ if data_type == "_sint": # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -181,96 +125,56 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int)
)
return PlcList(value)
- if EvaluationHelper.equals(data_type, "_int") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # INT
+ if data_type == "_int" and number_of_values == int(1): # INT
# Simple Field (value)
value: int = read_buffer.read_short(16, logical_name="")
return PlcINT(value)
- if EvaluationHelper.equals(data_type, "_int"): # List
+ if data_type == "_int": # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
value.append(PlcINT(int(read_buffer.read_short(16, logical_name=""))))
return PlcList(value)
- if EvaluationHelper.equals(data_type, "_dint") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # DINT
+ if data_type == "_dint" and number_of_values == int(1): # DINT
# Simple Field (value)
value: int = read_buffer.read_int(32, logical_name="")
return PlcDINT(value)
- if EvaluationHelper.equals(data_type, "_dint"): # List
+ if data_type == "_dint": # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
value.append(PlcDINT(int(read_buffer.read_int(32, logical_name=""))))
return PlcList(value)
- if EvaluationHelper.equals(data_type, "_lint") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # LINT
+ if data_type == "_lint" and number_of_values == int(1): # LINT
# Simple Field (value)
value: int = read_buffer.read_long(64, logical_name="")
return PlcLINT(value)
- if EvaluationHelper.equals(data_type, "_lint"): # List
+ if data_type == "_lint": # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
value.append(PlcLINT(int(read_buffer.read_long(64, logical_name=""))))
return PlcList(value)
- if EvaluationHelper.equals(data_type, "_usint") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # USINT
+ if data_type == "_usint" and number_of_values == int(1): # USINT
# Simple Field (value)
value: int = read_buffer.read_unsigned_short(8, logical_name="")
return PlcUSINT(value)
- if EvaluationHelper.equals(data_type, "_usint"): # List
+ if data_type == "_usint": # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -279,24 +183,14 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int)
)
return PlcList(value)
- if EvaluationHelper.equals(data_type, "_uint") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # UINT
+ if data_type == "_uint" and number_of_values == int(1): # UINT
# Simple Field (value)
value: int = read_buffer.read_unsigned_int(16, logical_name="")
return PlcUINT(value)
- if EvaluationHelper.equals(data_type, "_uint"): # List
+ if data_type == "_uint": # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -305,24 +199,14 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int)
)
return PlcList(value)
- if EvaluationHelper.equals(data_type, "_udint") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # UDINT
+ if data_type == "_udint" and number_of_values == int(1): # UDINT
# Simple Field (value)
value: int = read_buffer.read_unsigned_long(32, logical_name="")
return PlcUDINT(value)
- if EvaluationHelper.equals(data_type, "_udint"): # List
+ if data_type == "_udint": # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -331,24 +215,14 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int)
)
return PlcList(value)
- if EvaluationHelper.equals(data_type, "_ulint") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # ULINT
+ if data_type == "_ulint" and number_of_values == int(1): # ULINT
# Simple Field (value)
value: int = read_buffer.read_unsigned_big_integer(64, logical_name="")
return PlcULINT(value)
- if EvaluationHelper.equals(data_type, "_ulint"): # List
+ if data_type == "_ulint": # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -359,24 +233,14 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int)
)
return PlcList(value)
- if EvaluationHelper.equals(data_type, "_real") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # REAL
+ if data_type == "_real" and number_of_values == int(1): # REAL
# Simple Field (value)
value: float = read_buffer.read_float(32, logical_name="")
return PlcREAL(value)
- if EvaluationHelper.equals(data_type, "_real"): # List
+ if data_type == "_real": # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -385,24 +249,14 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int)
)
return PlcList(value)
- if EvaluationHelper.equals(data_type, "_lreal") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # LREAL
+ if data_type == "_lreal" and number_of_values == int(1): # LREAL
# Simple Field (value)
value: float = read_buffer.read_double(64, logical_name="")
return PlcLREAL(value)
- if EvaluationHelper.equals(data_type, "_lreal"): # List
+ if data_type == "_lreal": # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -411,24 +265,14 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int)
)
return PlcList(value)
- if EvaluationHelper.equals(data_type, "_char") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # CHAR
+ if data_type == "_char" and number_of_values == int(1): # CHAR
# Simple Field (value)
value: str = read_buffer.read_string(8, logical_name="", encoding="")
return PlcCHAR(value)
- if EvaluationHelper.equals(data_type, "_char"): # List
+ if data_type == "_char": # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -439,24 +283,14 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int)
)
return PlcList(value)
- if EvaluationHelper.equals(data_type, "_wchar") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # WCHAR
+ if data_type == "_wchar" and number_of_values == int(1): # WCHAR
# Simple Field (value)
value: str = read_buffer.read_string(16, logical_name="", encoding="")
return PlcWCHAR(value)
- if EvaluationHelper.equals(data_type, "_wchar"): # List
+ if data_type == "_wchar": # List
# Array field (value)
# Count array
- if number_of_values > Integer.MAX_VALUE:
- raise ParseException(
- "Array count of "
- + (number_of_values)
- + " exceeds the maximum allowed count of "
- + Integer.MAX_VALUE
- )
-
item_count: int = int(number_of_values)
value: List[PlcValue] = []
for cur_item in range(item_count):
@@ -467,12 +301,12 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int)
)
return PlcList(value)
- if EvaluationHelper.equals(data_type, "_string"): # STRING
+ if data_type == "_string": # STRING
# Simple Field (value)
value: str = read_buffer.read_string(255, logical_name="", encoding="")
return PlcSTRING(value)
- if EvaluationHelper.equals(data_type, "_wstring"): # STRING
+ if data_type == "_wstring": # STRING
# Simple Field (value)
value: str = read_buffer.read_string(255, logical_name="", encoding="")
@@ -481,401 +315,329 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int)
@staticmethod
def static_serialize(
- writeBuffer: WriteBuffer, _value: PlcValue, dataType: str, numberOfValues: int
- ) -> None:
- static_serialize(
- writeBuffer, _value, dataType, numberOfValues, ByteOrder.BIG_ENDIAN
- )
-
- @staticmethod
- def static_serialize(
- writeBuffer: WriteBuffer,
+ write_buffer: WriteBuffer,
_value: PlcValue,
- dataType: str,
- numberOfValues: int,
- byteOrder: ByteOrder,
+ data_type: str,
+ number_of_values: int,
+ byte_order: ByteOrder,
) -> None:
- if EvaluationHelper.equals(data_type, "BOOL") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # BOOL
+ if data_type == "BOOL" and number_of_values == int(1): # BOOL
# Simple Field (value)
- value: bool = _value.getBool()
- writeBuffer.WriteBit("value", (value))
- if EvaluationHelper.equals(data_type, "BOOL"): # List
+ value: bool = _value.get_bool()
+ write_buffer.write_bit((value), "value")
+ if data_type == "BOOL": # List
values: PlcList = _value
for val in values.getList():
- value: bool = val.getBool()
- writeBuffer.WriteBit("value", (value))
+ value: bool = val.get_bool()
+ write_buffer.write_bit((value), "value")
- if EvaluationHelper.equals(data_type, "BYTE") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # BYTE
+ if data_type == "BYTE" and number_of_values == int(1): # BYTE
# Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteUint8("value", 8, (value))
- if EvaluationHelper.equals(data_type, "BYTE"): # List
+ value: int = _value.get_int()
+ write_buffer.write_byte((value), 8, "value")
+ if data_type == "BYTE": # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteUint8("value", 8, (value))
+ value: int = val.get_int()
+ write_buffer.write_byte((value), 8, "value")
- if EvaluationHelper.equals(data_type, "WORD") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # WORD
+ if data_type == "WORD" and number_of_values == int(1): # WORD
# Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteUint16("value", 16, (value))
- if EvaluationHelper.equals(data_type, "WORD"): # List
+ value: int = _value.get_int()
+ write_buffer.write_unsigned_short((value), 16, "value")
+ if data_type == "WORD": # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteUint16("value", 16, (value))
+ value: int = val.get_int()
+ write_buffer.write_unsigned_short((value), 16, "value")
- if EvaluationHelper.equals(data_type, "DWORD") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # DWORD
+ if data_type == "DWORD" and number_of_values == int(1): # DWORD
# Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteUint32("value", 32, (value))
- if EvaluationHelper.equals(data_type, "DWORD"): # List
+ value: int = _value.get_int()
+ write_buffer.write_unsigned_int((value), 32, "value")
+ if data_type == "DWORD": # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteUint32("value", 32, (value))
+ value: int = val.get_int()
+ write_buffer.write_unsigned_int((value), 32, "value")
- if EvaluationHelper.equals(data_type, "LWORD") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # LWORD
+ if data_type == "LWORD" and number_of_values == int(1): # LWORD
# Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteUint64("value", 64, (value))
- if EvaluationHelper.equals(data_type, "LWORD"): # List
+ value: int = _value.get_int()
+ write_buffer.write_unsigned_long((value), 64, "value")
+ if data_type == "LWORD": # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteUint64("value", 64, (value))
+ value: int = val.get_int()
+ write_buffer.write_unsigned_long((value), 64, "value")
- if EvaluationHelper.equals(data_type, "SINT") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # SINT
+ if data_type == "SINT" and number_of_values == int(1): # SINT
# Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteInt8("value", 8, (value))
- if EvaluationHelper.equals(data_type, "SINT"): # List
+ value: int = _value.get_int()
+ write_buffer.write_signed_byte((value), 8, "value")
+ if data_type == "SINT": # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteInt8("value", 8, (value))
+ value: int = val.get_int()
+ write_buffer.write_signed_byte((value), 8, "value")
- if EvaluationHelper.equals(data_type, "INT") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # INT
+ if data_type == "INT" and number_of_values == int(1): # INT
# Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteInt16("value", 16, (value))
- if EvaluationHelper.equals(data_type, "INT"): # List
+ value: int = _value.get_int()
+ write_buffer.write_short((value), 16, "value")
+ if data_type == "INT": # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteInt16("value", 16, (value))
+ value: int = val.get_int()
+ write_buffer.write_short((value), 16, "value")
- if EvaluationHelper.equals(data_type, "DINT") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # DINT
+ if data_type == "DINT" and number_of_values == int(1): # DINT
# Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteInt32("value", 32, (value))
- if EvaluationHelper.equals(data_type, "DINT"): # List
+ value: int = _value.get_int()
+ write_buffer.write_int((value), 32, "value")
+ if data_type == "DINT": # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteInt32("value", 32, (value))
+ value: int = val.get_int()
+ write_buffer.write_int((value), 32, "value")
- if EvaluationHelper.equals(data_type, "LINT") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # LINT
+ if data_type == "LINT" and number_of_values == int(1): # LINT
# Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteInt64("value", 64, (value))
- if EvaluationHelper.equals(data_type, "LINT"): # List
+ value: int = _value.get_int()
+ write_buffer.write_long((value), 64, "value")
+ if data_type == "LINT": # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteInt64("value", 64, (value))
+ value: int = val.get_int()
+ write_buffer.write_long((value), 64, "value")
- if EvaluationHelper.equals(data_type, "USINT") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # USINT
+ if data_type == "USINT" and number_of_values == int(1): # USINT
# Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteUint8("value", 8, (value))
- if EvaluationHelper.equals(data_type, "USINT"): # List
+ value: int = _value.get_int()
+ write_buffer.write_byte((value), 8, "value")
+ if data_type == "USINT": # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteUint8("value", 8, (value))
+ value: int = val.get_int()
+ write_buffer.write_byte((value), 8, "value")
- if EvaluationHelper.equals(data_type, "UINT") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # UINT
+ if data_type == "UINT" and number_of_values == int(1): # UINT
# Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteUint16("value", 16, (value))
- if EvaluationHelper.equals(data_type, "UINT"): # List
+ value: int = _value.get_int()
+ write_buffer.write_unsigned_short((value), 16, "value")
+ if data_type == "UINT": # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteUint16("value", 16, (value))
+ value: int = val.get_int()
+ write_buffer.write_unsigned_short((value), 16, "value")
- if EvaluationHelper.equals(data_type, "UDINT") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # UDINT
+ if data_type == "UDINT" and number_of_values == int(1): # UDINT
# Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteUint32("value", 32, (value))
- if EvaluationHelper.equals(data_type, "UDINT"): # List
+ value: int = _value.get_int()
+ write_buffer.write_unsigned_int((value), 32, "value")
+ if data_type == "UDINT": # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteUint32("value", 32, (value))
+ value: int = val.get_int()
+ write_buffer.write_unsigned_int((value), 32, "value")
- if EvaluationHelper.equals(data_type, "ULINT") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # ULINT
+ if data_type == "ULINT" and number_of_values == int(1): # ULINT
# Simple Field (value)
- value: int = _value.getInt()
- writeBuffer.WriteUint64("value", 64, (value))
- if EvaluationHelper.equals(data_type, "ULINT"): # List
+ value: int = _value.get_int()
+ write_buffer.write_unsigned_long((value), 64, "value")
+ if data_type == "ULINT": # List
values: PlcList = _value
for val in values.getList():
- value: int = val.getInt()
- writeBuffer.WriteUint64("value", 64, (value))
+ value: int = val.get_int()
+ write_buffer.write_unsigned_long((value), 64, "value")
- if EvaluationHelper.equals(data_type, "REAL") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # REAL
+ if data_type == "REAL" and number_of_values == int(1): # REAL
# Simple Field (value)
- value: float = _value.getFloat()
- writeBuffer.WriteFloat32("value", 32, (value))
- if EvaluationHelper.equals(data_type, "REAL"): # List
+ value: float = _value.get_float()
+ write_buffer.write_float((value), 32, "value")
+ if data_type == "REAL": # List
values: PlcList = _value
for val in values.getList():
- value: float = val.getFloat()
- writeBuffer.WriteFloat32("value", 32, (value))
+ value: float = val.get_float()
+ write_buffer.write_float((value), 32, "value")
- if EvaluationHelper.equals(data_type, "LREAL") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # LREAL
+ if data_type == "LREAL" and number_of_values == int(1): # LREAL
# Simple Field (value)
- value: float = _value.getFloat()
- writeBuffer.WriteFloat64("value", 64, (value))
- if EvaluationHelper.equals(data_type, "LREAL"): # List
+ value: float = _value.get_float()
+ write_buffer.write_double((value), 64, "value")
+ if data_type == "LREAL": # List
values: PlcList = _value
for val in values.getList():
- value: float = val.getFloat()
- writeBuffer.WriteFloat64("value", 64, (value))
+ value: float = val.get_float()
+ write_buffer.write_double((value), 64, "value")
- if EvaluationHelper.equals(data_type, "CHAR") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # CHAR
+ if data_type == "CHAR" and number_of_values == int(1): # CHAR
# Simple Field (value)
- value: str = _value.getStr()
- writeBuffer.WriteString("value", uint32(8), "UTF-8", (value))
- if EvaluationHelper.equals(data_type, "CHAR"): # List
+ value: str = _value.get_str()
+ write_buffer.write_str((value), uint32(8), "UTF-8", "value")
+ if data_type == "CHAR": # List
values: PlcList = _value
for val in values.getList():
- value: str = val.getStr()
- writeBuffer.WriteString("value", uint32(8), "UTF-8", (value))
+ value: str = val.get_str()
+ write_buffer.write_str((value), uint32(8), "UTF-8", "value")
- if EvaluationHelper.equals(data_type, "WCHAR") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # WCHAR
+ if data_type == "WCHAR" and number_of_values == int(1): # WCHAR
# Simple Field (value)
- value: str = _value.getStr()
- writeBuffer.WriteString("value", uint32(16), "UTF-16", (value))
- if EvaluationHelper.equals(data_type, "WCHAR"): # List
+ value: str = _value.get_str()
+ write_buffer.write_str((value), uint32(16), "UTF-16", "value")
+ if data_type == "WCHAR": # List
values: PlcList = _value
for val in values.getList():
- value: str = val.getStr()
- writeBuffer.WriteString("value", uint32(16), "UTF-16", (value))
+ value: str = val.get_str()
+ write_buffer.write_str((value), uint32(16), "UTF-16", "value")
- if EvaluationHelper.equals(data_type, "STRING"): # STRING
+ if data_type == "STRING": # STRING
# Simple Field (value)
- value: str = _value.getStr()
- writeBuffer.WriteString("value", uint32(255), "UTF-8", (value))
- if EvaluationHelper.equals(data_type, "WSTRING"): # STRING
+ value: str = _value.get_str()
+ write_buffer.write_str((value), uint32(255), "UTF-8", "value")
+ if data_type == "WSTRING": # STRING
# Simple Field (value)
- value: str = _value.getStr()
- writeBuffer.WriteString("value", uint32(255), "UTF-16", (value))
+ value: str = _value.get_str()
+ write_buffer.write_str((value), uint32(255), "UTF-16", "value")
@staticmethod
def get_length_in_bytes(
- _value: PlcValue, dataType: str, numberOfValues: int
+ _value: PlcValue, data_type: str, number_of_values: int
) -> int:
return int(
- math.ceil(float(getLengthInBits(_value, dataType, numberOfValues)) / 8.0)
+ math.ceil(
+ float(get_length_in_bits(_value, data_type, number_of_values)) / 8.0
+ )
)
@staticmethod
- def get_length_in_bits(_value: PlcValue, dataType: str, numberOfValues: int) -> int:
- sizeInBits: int = 0
- if EvaluationHelper.equals(data_type, "BOOL") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # BOOL
+ def get_length_in_bits(
+ _value: PlcValue, data_type: str, number_of_values: int
+ ) -> int:
+ size_in_bits: int = 0
+ if data_type == "BOOL" and number_of_values == int(1): # BOOL
# Simple Field (value)
- sizeInBits += 1
- if EvaluationHelper.equals(data_type, "BOOL"): # List
- values: PlcList = _value
- sizeInBits += values.getList().size() * 1
- if EvaluationHelper.equals(data_type, "BYTE") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # BYTE
+ size_in_bits += 1
+ if data_type == "BOOL": # List
+ values: PlcList = _value
+ size_in_bits += values.get_list().size() * 1
+ if data_type == "BYTE" and number_of_values == int(1): # BYTE
# Simple Field (value)
- sizeInBits += 8
- if EvaluationHelper.equals(data_type, "BYTE"): # List
- values: PlcList = _value
- sizeInBits += values.getList().size() * 8
- if EvaluationHelper.equals(data_type, "WORD") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # WORD
+ size_in_bits += 8
+ if data_type == "BYTE": # List
+ values: PlcList = _value
+ size_in_bits += values.get_list().size() * 8
+ if data_type == "WORD" and number_of_values == int(1): # WORD
# Simple Field (value)
- sizeInBits += 16
- if EvaluationHelper.equals(data_type, "WORD"): # List
- values: PlcList = _value
- sizeInBits += values.getList().size() * 16
- if EvaluationHelper.equals(data_type, "DWORD") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # DWORD
+ size_in_bits += 16
+ if data_type == "WORD": # List
+ values: PlcList = _value
+ size_in_bits += values.get_list().size() * 16
+ if data_type == "DWORD" and number_of_values == int(1): # DWORD
# Simple Field (value)
- sizeInBits += 32
- if EvaluationHelper.equals(data_type, "DWORD"): # List
- values: PlcList = _value
- sizeInBits += values.getList().size() * 32
- if EvaluationHelper.equals(data_type, "LWORD") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # LWORD
+ size_in_bits += 32
+ if data_type == "DWORD": # List
+ values: PlcList = _value
+ size_in_bits += values.get_list().size() * 32
+ if data_type == "LWORD" and number_of_values == int(1): # LWORD
# Simple Field (value)
- sizeInBits += 64
- if EvaluationHelper.equals(data_type, "LWORD"): # List
+ size_in_bits += 64
+ if data_type == "LWORD": # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 64
- if EvaluationHelper.equals(data_type, "SINT") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # SINT
+ size_in_bits += values.get_list().size() * 64
+ if data_type == "SINT" and number_of_values == int(1): # SINT
# Simple Field (value)
- sizeInBits += 8
- if EvaluationHelper.equals(data_type, "SINT"): # List
+ size_in_bits += 8
+ if data_type == "SINT": # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 8
- if EvaluationHelper.equals(data_type, "INT") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # INT
+ size_in_bits += values.get_list().size() * 8
+ if data_type == "INT" and number_of_values == int(1): # INT
# Simple Field (value)
- sizeInBits += 16
- if EvaluationHelper.equals(data_type, "INT"): # List
+ size_in_bits += 16
+ if data_type == "INT": # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 16
- if EvaluationHelper.equals(data_type, "DINT") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # DINT
+ size_in_bits += values.get_list().size() * 16
+ if data_type == "DINT" and number_of_values == int(1): # DINT
# Simple Field (value)
- sizeInBits += 32
- if EvaluationHelper.equals(data_type, "DINT"): # List
+ size_in_bits += 32
+ if data_type == "DINT": # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 32
- if EvaluationHelper.equals(data_type, "LINT") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # LINT
+ size_in_bits += values.get_list().size() * 32
+ if data_type == "LINT" and number_of_values == int(1): # LINT
# Simple Field (value)
- sizeInBits += 64
- if EvaluationHelper.equals(data_type, "LINT"): # List
+ size_in_bits += 64
+ if data_type == "LINT": # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 64
- if EvaluationHelper.equals(data_type, "USINT") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # USINT
+ size_in_bits += values.get_list().size() * 64
+ if data_type == "USINT" and number_of_values == int(1): # USINT
# Simple Field (value)
- sizeInBits += 8
- if EvaluationHelper.equals(data_type, "USINT"): # List
+ size_in_bits += 8
+ if data_type == "USINT": # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 8
- if EvaluationHelper.equals(data_type, "UINT") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # UINT
+ size_in_bits += values.get_list().size() * 8
+ if data_type == "UINT" and number_of_values == int(1): # UINT
# Simple Field (value)
- sizeInBits += 16
- if EvaluationHelper.equals(data_type, "UINT"): # List
+ size_in_bits += 16
+ if data_type == "UINT": # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 16
- if EvaluationHelper.equals(data_type, "UDINT") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # UDINT
+ size_in_bits += values.get_list().size() * 16
+ if data_type == "UDINT" and number_of_values == int(1): # UDINT
# Simple Field (value)
- sizeInBits += 32
- if EvaluationHelper.equals(data_type, "UDINT"): # List
+ size_in_bits += 32
+ if data_type == "UDINT": # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 32
- if EvaluationHelper.equals(data_type, "ULINT") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # ULINT
+ size_in_bits += values.get_list().size() * 32
+ if data_type == "ULINT" and number_of_values == int(1): # ULINT
# Simple Field (value)
- sizeInBits += 64
- if EvaluationHelper.equals(data_type, "ULINT"): # List
+ size_in_bits += 64
+ if data_type == "ULINT": # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 64
- if EvaluationHelper.equals(data_type, "REAL") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # REAL
+ size_in_bits += values.get_list().size() * 64
+ if data_type == "REAL" and number_of_values == int(1): # REAL
# Simple Field (value)
- sizeInBits += 32
- if EvaluationHelper.equals(data_type, "REAL"): # List
+ size_in_bits += 32
+ if data_type == "REAL": # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 32
- if EvaluationHelper.equals(data_type, "LREAL") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # LREAL
+ size_in_bits += values.get_list().size() * 32
+ if data_type == "LREAL" and number_of_values == int(1): # LREAL
# Simple Field (value)
- sizeInBits += 64
- if EvaluationHelper.equals(data_type, "LREAL"): # List
+ size_in_bits += 64
+ if data_type == "LREAL": # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 64
- if EvaluationHelper.equals(data_type, "CHAR") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # CHAR
+ size_in_bits += values.get_list().size() * 64
+ if data_type == "CHAR" and number_of_values == int(1): # CHAR
# Simple Field (value)
- sizeInBits += 8
- if EvaluationHelper.equals(data_type, "CHAR"): # List
+ size_in_bits += 8
+ if data_type == "CHAR": # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 8
- if EvaluationHelper.equals(data_type, "WCHAR") and EvaluationHelper.equals(
- number_of_values, int(1)
- ): # WCHAR
+ size_in_bits += values.get_list().size() * 8
+ if data_type == "WCHAR" and number_of_values == int(1): # WCHAR
# Simple Field (value)
- sizeInBits += 16
- if EvaluationHelper.equals(data_type, "WCHAR"): # List
+ size_in_bits += 16
+ if data_type == "WCHAR": # List
values: PlcList = _value
- sizeInBits += values.getList().size() * 16
- if EvaluationHelper.equals(data_type, "STRING"): # STRING
+ size_in_bits += values.get_list().size() * 16
+ if data_type == "STRING": # STRING
# Simple Field (value)
- sizeInBits += 255
- if EvaluationHelper.equals(data_type, "WSTRING"): # STRING
+ size_in_bits += 255
+ if data_type == "WSTRING": # STRING
# Simple Field (value)
- sizeInBits += 255
- return sizeInBits
+ size_in_bits += 255
+ return size_in_bits
diff --git a/sandbox/plc4py/plc4py/protocols/simulated/readwrite/SimulatedDataTypeSizes.py b/sandbox/plc4py/plc4py/protocols/simulated/readwrite/SimulatedDataTypeSizes.py
index a9811d8a673..9c391b4351e 100644
--- a/sandbox/plc4py/plc4py/protocols/simulated/readwrite/SimulatedDataTypeSizes.py
+++ b/sandbox/plc4py/plc4py/protocols/simulated/readwrite/SimulatedDataTypeSizes.py
@@ -22,6 +22,7 @@
class SimulatedDataTypeSizes(AutoNumberEnum):
+ _init_ = "value, data_type_size"
BOOL = (1, int(1))
BYTE = (2, int(1))
WORD = (3, int(2))
diff --git a/sandbox/plc4py/plc4py/utils/GenericTypes.py b/sandbox/plc4py/plc4py/utils/GenericTypes.py
index c1fb8ed5e12..e4f1aa19c20 100644
--- a/sandbox/plc4py/plc4py/utils/GenericTypes.py
+++ b/sandbox/plc4py/plc4py/utils/GenericTypes.py
@@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
#
+import threading
from dataclasses import dataclass
from enum import Enum, auto
from typing import Generator
@@ -60,3 +61,23 @@ def get_short_name(order):
@dataclass
class ByteOrderAware:
byte_order: ByteOrder
+
+
+class AtomicInteger:
+ def __init__(self, seed=0):
+ self._value = seed
+ self._lock = threading.Lock()
+
+ def increment(self, num=1):
+ with self._lock:
+ self._value += num
+ return self._value
+
+ def decrement(self, num=1):
+ with self._lock:
+ self._value -= num
+ return self._value
+
+ @property
+ def value(self):
+ return self._value
diff --git a/sandbox/plc4py/tests/unit/plc4py/drivers/modbus/test_modbus_connection.py b/sandbox/plc4py/tests/unit/plc4py/drivers/modbus/test_modbus_connection.py
index c848f875000..2d686d7d4dd 100644
--- a/sandbox/plc4py/tests/unit/plc4py/drivers/modbus/test_modbus_connection.py
+++ b/sandbox/plc4py/tests/unit/plc4py/drivers/modbus/test_modbus_connection.py
@@ -28,11 +28,11 @@ async def manual_test_plc_driver_modbus_connect():
assert not connection.is_connected()
-async def manual_test_plc_driver_modbus_read():
+async def test_plc_driver_modbus_read():
driver_manager = PlcDriverManager()
async with driver_manager.connection("modbus://127.0.0.1:5555") as connection:
with connection.read_request_builder() as builder:
- builder.add_item("Random Tag", "holding-register:1")
+ builder.add_item("Random Tag", "holding-register:1[10]")
request = builder.build()
future = connection.execute(request)
await future