Skip to content

Commit

Permalink
feat(plc4go/bacnet): npdu wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Aug 21, 2024
1 parent fcd5173 commit adc013a
Show file tree
Hide file tree
Showing 22 changed files with 1,119 additions and 181 deletions.
61 changes: 58 additions & 3 deletions plc4go/internal/bacnetip/CommunicationsModule.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ package bacnetip

import (
"fmt"
"strconv"
"strings"

"github.com/apache/plc4x/plc4go/spi"

"github.com/pkg/errors"
"github.com/rs/zerolog"
"strconv"
)

// maps of named clients and servers
Expand All @@ -42,16 +45,60 @@ func init() {
elementMap = make(map[int]*ApplicationServiceElement)
}

type IPCI interface {
fmt.Stringer
GetPDUUserData() spi.Message
GetPDUSource() *Address
SetPDUSource(source *Address)
GetPDUDestination() *Address
SetPDUDestination(*Address)
Update(pci Arg) error
}

type __PCI struct {
pduUserData spi.Message
pduUserData spi.Message // TODO: should that be PDUUserData rater than spi.Message and do we need another field... lets see...
pduSource *Address
pduDestination *Address
}

var _ IPCI = (*__PCI)(nil)

func new__PCI(pduUserData spi.Message, pduSource *Address, pduDestination *Address) *__PCI {
return &__PCI{pduUserData, pduSource, pduDestination}
}

func (p *__PCI) GetPDUUserData() spi.Message {
return p.pduUserData
}

func (p *__PCI) GetPDUSource() *Address {
return p.pduSource
}

func (p *__PCI) SetPDUSource(source *Address) {
p.pduSource = source
}

func (p *__PCI) GetPDUDestination() *Address {
return p.pduDestination
}

func (p *__PCI) SetPDUDestination(destination *Address) {
p.pduDestination = destination
}

func (p *__PCI) Update(pci Arg) error {
switch pci := pci.(type) {
case IPCI:
p.pduUserData = pci.GetPDUUserData()
p.pduSource = pci.GetPDUSource()
p.pduDestination = pci.GetPDUDestination()
return nil
default:
return errors.Errorf("invalid IPCI type %T", pci)
}
}

func (p *__PCI) deepCopy() *__PCI {
pduUserData := p.pduUserData // those are immutable so no copy needed
pduSource := p.pduSource
Expand All @@ -68,7 +115,15 @@ func (p *__PCI) deepCopy() *__PCI {
}

func (p *__PCI) String() string {
return fmt.Sprintf("__PCI{pduUserData:\n%s\n, pduSource: %s, pduDestination: %s}", p.pduUserData, p.pduSource, p.pduDestination)
pduUserDataString := ""
if p.pduUserData != nil {
pduUserDataString = p.pduUserData.String()
if strings.Contains(pduUserDataString, "\n") {
pduUserDataString = "\n" + pduUserDataString + "\n"
}
pduUserDataString = "pduUserData: " + pduUserDataString + " ,"
}
return fmt.Sprintf("__PCI{%spduSource: %s, pduDestination: %s}", pduUserDataString, p.pduSource, p.pduDestination)
}

// _Client is an interface used for documentation
Expand Down
3 changes: 2 additions & 1 deletion plc4go/internal/bacnetip/IOCBModule.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ package bacnetip
import (
"container/heap"
"fmt"
"github.com/rs/zerolog"
"sync"
"time"

"github.com/apache/plc4x/plc4go/spi/utils"

"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

Expand Down
1 change: 1 addition & 0 deletions plc4go/internal/bacnetip/LocalDevice.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package bacnetip

import (
"fmt"

readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
)

Expand Down
9 changes: 6 additions & 3 deletions plc4go/internal/bacnetip/NetworkService.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ package bacnetip
import (
"bytes"
"fmt"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"math"
"slices"
"time"

readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"

"github.com/pkg/errors"
"github.com/rs/zerolog"
)

type RouterStatus uint8
Expand Down Expand Up @@ -520,6 +522,7 @@ func (n *NetworkServiceAccessPoint) Indication(args Args, kwargs KWArgs) error {
panic("not implemented yet")
}

// TODO: should us the one in NPDU
func buildNPDU(hopCount uint8, source *Address, destination *Address, expectingReply bool, networkPriority readWriteModel.NPDUNetworkPriority, nlm readWriteModel.NLM, apdu readWriteModel.APDU) (readWriteModel.NPDU, error) {
switch {
case nlm != nil && apdu != nil:
Expand Down
121 changes: 83 additions & 38 deletions plc4go/internal/bacnetip/PDU.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,12 +853,20 @@ func NewGlobalBroadcast(route *Address) *Address {
return g
}

type PCI interface {
IPCI
GetExpectingReply() bool
GetNetworkPriority() readWriteModel.NPDUNetworkPriority
}

type _PCI struct {
*__PCI
expectingReply bool
networkPriority readWriteModel.NPDUNetworkPriority
}

var _ PCI = (*_PCI)(nil)

func newPCI(msg spi.Message, pduSource *Address, pduDestination *Address, expectingReply bool, networkPriority readWriteModel.NPDUNetworkPriority) *_PCI {
return &_PCI{
new__PCI(msg, pduSource, pduDestination),
Expand All @@ -867,6 +875,28 @@ func newPCI(msg spi.Message, pduSource *Address, pduDestination *Address, expect
}
}

func (p *_PCI) Update(pci Arg) error {
if err := p.__PCI.Update(pci); err != nil {
return errors.Wrap(err, "error updating __PCI")
}
switch pci := pci.(type) {
case PCI:
p.expectingReply = pci.GetExpectingReply()
p.networkPriority = pci.GetNetworkPriority()
return nil
default:
return errors.Errorf("invalid PCI type %T", pci)
}
}

func (p *_PCI) GetExpectingReply() bool {
return p.expectingReply
}

func (p *_PCI) GetNetworkPriority() readWriteModel.NPDUNetworkPriority {
return p.networkPriority
}

func (p *_PCI) deepCopy() *_PCI {
__pci := p.__PCI.deepCopy()
expectingReply := p.expectingReply
Expand All @@ -879,6 +909,7 @@ func (p *_PCI) String() string {
}

type PDUData interface {
SetPduData([]byte)
GetPduData() []byte
Get() (byte, error)
GetShort() (int16, error)
Expand Down Expand Up @@ -914,6 +945,10 @@ func NewPDUData(args Args) PDUData {
return newPDUData(args[0].(_PDUDataRequirements))
}

func (d *_PDUData) SetPduData(data []byte) {
d.cachedData = data
}

func (d *_PDUData) GetPduData() []byte {
d.checkCache()
return d.cachedData
Expand Down Expand Up @@ -1001,26 +1036,57 @@ func (d *_PDUData) deepCopy() *_PDUData {
return &copyPDUData
}

type APCI interface {
PCI
}

type _APCI struct {
*_PCI
}

func newAPCI(msg spi.Message, pduSource *Address, pduDestination *Address, expectingReply bool, networkPriority readWriteModel.NPDUNetworkPriority) *_APCI {
return &_APCI{
_PCI: newPCI(msg, pduSource, pduDestination, expectingReply, networkPriority),
}
}

func (a *_APCI) Update(apci Arg) error {
if err := a._PCI.Update(apci); err != nil {
return errors.Wrap(err, "error updating _PCI")
}
switch pci := apci.(type) {
case APCI:
// TODO: update coordinates....
return nil
default:
return errors.Errorf("invalid APCI type %T", pci)
}
}

func (a *_APCI) deepCopy() *_APCI {
_pci := a._PCI.deepCopy()
return &_APCI{_pci}
}

func (a *_APCI) String() string {
return fmt.Sprintf("APCI{%s}", a._PCI)
}

type PDU interface {
fmt.Stringer
GetMessage() spi.Message
GetPDUSource() *Address
SetPDUSource(source *Address)
GetPDUDestination() *Address
SetPDUDestination(*Address)
GetExpectingReply() bool
GetNetworkPriority() readWriteModel.NPDUNetworkPriority
APCI
PDUData
GetMessage() spi.Message // TODO: check if we still need that... ()
DeepCopy() PDU
}

type _PDU struct {
*_PCI
*_APCI
*_PDUData
}

func NewPDU(msg spi.Message, pduOptions ...PDUOption) PDU {
p := &_PDU{
_PCI: newPCI(msg, nil, nil, false, readWriteModel.NPDUNetworkPriority_NORMAL_MESSAGE),
_APCI: newAPCI(msg, nil, nil, false, readWriteModel.NPDUNetworkPriority_NORMAL_MESSAGE),
}
for _, option := range pduOptions {
option(p)
Expand All @@ -1032,7 +1098,7 @@ func NewPDU(msg spi.Message, pduOptions ...PDUOption) PDU {
func NewPDUFromPDU(pdu PDU, pduOptions ...PDUOption) PDU {
msg := pdu.(*_PDU).pduUserData
p := &_PDU{
_PCI: newPCI(msg, pdu.GetPDUSource(), pdu.GetPDUDestination(), pdu.GetExpectingReply(), pdu.GetNetworkPriority()),
_APCI: newAPCI(msg, pdu.GetPDUSource(), pdu.GetPDUDestination(), pdu.GetExpectingReply(), pdu.GetNetworkPriority()),
}
for _, option := range pduOptions {
option(p)
Expand All @@ -1043,7 +1109,7 @@ func NewPDUFromPDU(pdu PDU, pduOptions ...PDUOption) PDU {

func NewPDUFromPDUWithNewMessage(pdu PDU, msg spi.Message, pduOptions ...PDUOption) PDU {
p := &_PDU{
_PCI: newPCI(msg, pdu.GetPDUSource(), pdu.GetPDUDestination(), pdu.GetExpectingReply(), pdu.GetNetworkPriority()),
_APCI: newAPCI(msg, pdu.GetPDUSource(), pdu.GetPDUDestination(), pdu.GetExpectingReply(), pdu.GetNetworkPriority()),
}
for _, option := range pduOptions {
option(p)
Expand All @@ -1054,7 +1120,7 @@ func NewPDUFromPDUWithNewMessage(pdu PDU, msg spi.Message, pduOptions ...PDUOpti

func NewPDUWithAllOptions(msg spi.Message, pduSource *Address, pduDestination *Address, expectingReply bool, networkPriority readWriteModel.NPDUNetworkPriority) *_PDU {
p := &_PDU{
_PCI: newPCI(msg, pduSource, pduDestination, expectingReply, networkPriority),
_APCI: newAPCI(msg, pduSource, pduDestination, expectingReply, networkPriority),
}
p._PDUData = newPDUData(p)
return p
Expand Down Expand Up @@ -1087,6 +1153,9 @@ func WithPDUNetworkPriority(networkPriority readWriteModel.NPDUNetworkPriority)
}

func (p *_PDU) getPDUData() []byte {
if p.GetMessage() == nil {
return nil
}
writeBufferByteBased := utils.NewWriteBufferByteBased()
if err := p.GetMessage().SerializeWithWriteBuffer(context.Background(), writeBufferByteBased); err != nil {
panic(err) // TODO: graceful handle
Expand All @@ -1098,32 +1167,8 @@ func (p *_PDU) GetMessage() spi.Message {
return p.pduUserData
}

func (p *_PDU) GetPDUSource() *Address {
return p.pduSource
}

func (p *_PDU) SetPDUSource(source *Address) {
p.pduSource = source
}

func (p *_PDU) GetPDUDestination() *Address {
return p.pduDestination
}

func (p *_PDU) SetPDUDestination(destination *Address) {
p.pduDestination = destination
}

func (p *_PDU) GetExpectingReply() bool {
return p.expectingReply
}

func (p *_PDU) GetNetworkPriority() readWriteModel.NPDUNetworkPriority {
return p.networkPriority
}

func (p *_PDU) deepCopy() *_PDU {
return &_PDU{_PCI: p._PCI.deepCopy(), _PDUData: p._PDUData.deepCopy()}
return &_PDU{_APCI: p._APCI.deepCopy(), _PDUData: p._PDUData.deepCopy()}
}

func (p *_PDU) DeepCopy() PDU {
Expand Down
11 changes: 6 additions & 5 deletions plc4go/internal/bacnetip/UDPCommunicationsModule.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ package bacnetip
import (
"context"
"fmt"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/rs/zerolog"
"net"
"time"

"github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
"github.com/apache/plc4x/plc4go/spi/options"

"github.com/libp2p/go-reuseport"
"github.com/pkg/errors"
"github.com/rs/zerolog"
)

//go:generate go run ../../tools/plc4xgenerator/gen.go -type=UDPActor
Expand Down Expand Up @@ -228,7 +229,7 @@ func (d *UDPDirector) AddActor(actor *UDPActor) {

// tell the ASE there is a new client
if d.serviceElement != nil {
if err := d.SapRequest(NoArgs, NewKWArgs(kwAddActor, actor)); err != nil {
if err := d.SapRequest(NoArgs, NewKWArgs(KWAddActor, actor)); err != nil {
d.log.Error().Err(err).Msg("Error in add actor")
}
}
Expand All @@ -242,7 +243,7 @@ func (d *UDPDirector) DelActor(actor *UDPActor) {

// tell the ASE the client has gone away
if d.serviceElement != nil {
if err := d.SapRequest(NoArgs, NewKWArgs(kwDelActor, actor)); err != nil {
if err := d.SapRequest(NoArgs, NewKWArgs(KWDelActor, actor)); err != nil {
d.log.Error().Err(err).Msg("Error in del actor")
}
}
Expand All @@ -255,7 +256,7 @@ func (d *UDPDirector) GetActor(address Address) *UDPActor {
func (d *UDPDirector) ActorError(actor *UDPActor, err error) {
// tell the ASE the actor had an error
if d.serviceElement != nil {
if err := d.SapRequest(NoArgs, NewKWArgs(kwActorError, actor, kwError, err)); err != nil {
if err := d.SapRequest(NoArgs, NewKWArgs(KWActorError, actor, KWError, err)); err != nil {
d.log.Error().Err(err).Msg("Error in actor error")
}
}
Expand Down
Loading

0 comments on commit adc013a

Please sign in to comment.