Skip to content

Commit

Permalink
fix(plc4go/bacnet): several bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Aug 13, 2024
1 parent 77dfb25 commit 19f5db7
Show file tree
Hide file tree
Showing 10 changed files with 661 additions and 100 deletions.
2 changes: 1 addition & 1 deletion plc4go/internal/bacnetip/ApplicationLayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (s *SSM) RestartTimer(millis uint) {
}

delta := time.Millisecond * time.Duration(millis)
s.InstallTask(nil, &delta)
s.InstallTask(InstallTaskOptions{Delta: &delta})
}

// setState This function is called when the derived class wants to change state
Expand Down
2 changes: 1 addition & 1 deletion plc4go/internal/bacnetip/ApplicationModule.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func NewApplication(localLog zerolog.Logger, localDevice *LocalDeviceObject, dev
if !a._startupDisabled {
for _, fn := range a.CapabilityFunctions("startup") {
localLog.Debug().Interface("fn", fn).Msg("startup fn")
Deferred(fn)
Deferred(fn, NoArgs, NoKWArgs)
}
}
return a, nil
Expand Down
18 changes: 12 additions & 6 deletions plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,12 @@ func (b *BIPSimple) String() string {
func (b *BIPSimple) Indication(args Args, kwargs KWArgs) error {
b.log.Debug().Stringer("Args", args).Stringer("KWArgs", kwargs).Msg("Indication")
pdu := args.Get0PDU()
if pdu == nil {
return errors.New("no pdu")
}
if pdu.GetPDUDestination() == nil {
return errors.New("no pdu destination")
}

// check for local stations
switch pdu.GetPDUDestination().AddrType {
Expand Down Expand Up @@ -557,7 +563,7 @@ func NewBIPForeign(localLog zerolog.Logger, addr *Address, ttl *int, sapID *int,
b.bbmdTimeToLive = nil

// used in tracking active registration timeouts
b.registrationTimeoutTask = OneShotFunction(b._registration_expired)
b.registrationTimeoutTask = OneShotFunction(b._registration_expired, NoArgs, NoKWArgs)

// registration provided
if addr != nil {
Expand Down Expand Up @@ -745,7 +751,7 @@ func (b *BIPForeign) register(addr Address, ttl int) error {
// install this task to do registration renewal according to the TTL
// and stop tracking any active registration timeouts
var taskTime time.Time
b.InstallTask(&taskTime, nil)
b.InstallTask(InstallTaskOptions{When: &taskTime})
b._stop_track_registration()
return nil
}
Expand Down Expand Up @@ -776,7 +782,7 @@ func (b *BIPForeign) unregister() {
b._stop_track_registration()
}

// processTask is called when the registration request should be sent to the BBMD.
// ProcessTask is called when the registration request should be sent to the BBMD.
func (b *BIPForeign) ProcessTask() error {
pdu := NewPDU(readWriteModel.NewBVLCRegisterForeignDevice(uint16(*b.bbmdTimeToLive)), WithPDUDestination(b.bbmdAddress))

Expand All @@ -787,7 +793,7 @@ func (b *BIPForeign) ProcessTask() error {

// schedule the next registration renewal
var delta = time.Duration(*b.bbmdTimeToLive) * time.Second
b.InstallTask(nil, &delta)
b.InstallTask(InstallTaskOptions{Delta: &delta})
return nil
}

Expand All @@ -800,15 +806,15 @@ func (b *BIPForeign) ProcessTask() error {
// definitely not registered anymore.
func (b *BIPForeign) _start_track_registration() {
var delta = time.Duration(*b.bbmdTimeToLive)*time.Second + (30 * time.Second)
b.registrationTimeoutTask.InstallTask(nil, &delta)
b.registrationTimeoutTask.InstallTask(InstallTaskOptions{Delta: &delta})
}

func (b *BIPForeign) _stop_track_registration() {
b.registrationTimeoutTask.SuspendTask()
}

// _registration_expired is called when detecting that foreign device registration has definitely expired.
func (b *BIPForeign) _registration_expired() error {
func (b *BIPForeign) _registration_expired(_ Args, _ KWArgs) error {
b.registrationStatus = -1 // Unregistered
b._stop_track_registration()
return nil
Expand Down
2 changes: 1 addition & 1 deletion plc4go/internal/bacnetip/Capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewCapability() *Capability {
type Collector struct {
}

func (c *Collector) CapabilityFunctions(fn string) []func() error {
func (c *Collector) CapabilityFunctions(fn string) []func(args Args, kwargs KWArgs) error {
// TODO: implement
return nil
}
89 changes: 75 additions & 14 deletions plc4go/internal/bacnetip/Core.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,25 @@ import (
"os/signal"
"syscall"
"time"

"github.com/rs/zerolog"
)

var running bool
var spin = 10 * time.Millisecond
var sleepTime = 0 * time.Nanosecond
var DeferredFunctions []func() error

type deferredFunctionTuple struct {
fn func(args Args, kwargs KWArgs) error
args Args
kwargs KWArgs
}

var DeferredFunctions []deferredFunctionTuple

var ErrorCallback func(err error)

func init() {
func run() {
running = true
go func() {
c := make(chan os.Signal, 1)
Expand All @@ -46,14 +55,17 @@ func init() {
go func() {
for running {
// get the next task
task, delta := _taskManager.GetNextTask()
var delta time.Duration
task, taskDelta := _taskManager.GetNextTask()
if task != nil {
_taskManager.ProcessTask(task)
}

// if delta is None, there are no Tasks, default to spinning
if delta == 0 {
if taskDelta == nil {
delta = spin
} else {
delta = *taskDelta
}

// there may be threads around, sleep for a bit
Expand All @@ -74,27 +86,76 @@ func init() {
time.Sleep(delta)

// check for deferred functions
for len(DeferredFunctions) > 0 {
fnlist := DeferredFunctions
// empty list
DeferredFunctions = nil
for _, fnTuple := range fnlist {
fn := fnTuple.fn
args := fnTuple.args
kwargs := fnTuple.kwargs
if err := fn(args, kwargs); err != nil {
if ErrorCallback != nil {
ErrorCallback(err)
}
}
}
}
}
}()
}

// RunOnce makes a pass through the scheduled tasks and deferred functions just
//
// like the run() function but without the asyncore call (so there is no
// socket IO activity) and the timers.
func RunOnce(localLog zerolog.Logger) {
localLog.Trace().Msg("run_once")
taskManager := NewTaskManager(localLog)

for {
// get the next task
var task TaskRequirements
task, delta := taskManager.GetNextTask()
var displayDelta time.Duration
if delta != nil {
displayDelta = *delta
}
localLog.Debug().Stringer("task", task).Dur("delta", displayDelta).Msg("task")

// if there is a task to process, do it
if task != nil {
taskManager.ProcessTask(task)
}

// check for deferred functions

for len(DeferredFunctions) > 0 {
// get a reference to the list
fnlist := DeferredFunctions
// empty list
DeferredFunctions = nil
for _, fn := range fnlist {
if err := fn(); err != nil {

// call the functions
for _, fnTuple := range fnlist {
fn := fnTuple.fn
args := fnTuple.args
kwargs := fnTuple.kwargs
if err := fn(args, kwargs); err != nil {
if ErrorCallback != nil {
ErrorCallback(err)
}
}
}
}
}()
}

func RunOnce() {
// TODO: implement me
if delta == nil || *delta != 0 {
break
}
}
}

func Deferred(fn func() error) {
func Deferred(fn func(args Args, kwargs KWArgs) error, args Args, kwargs KWArgs) {
// append it to the list
DeferredFunctions = append(DeferredFunctions, fn)
DeferredFunctions = append(DeferredFunctions, deferredFunctionTuple{fn, args, kwargs})

// trigger the task manager event
// TODO: there is no trigger
Expand Down
16 changes: 8 additions & 8 deletions plc4go/internal/bacnetip/IOCBModule.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,15 +747,15 @@ func (i *IOQController) CompleteIO(iocb _IOCB, msg PDU) error {
i.state = IOQControllerStates_CTRL_WAITING
stateLog.Debug().Timestamp().Str("name", i.name).Msg("waiting")

task := FunctionTask(i._waitTrigger)
task.InstallTask(nil, &i.waitTime)
task := FunctionTask(i._waitTrigger, NoArgs, NoKWArgs)
task.InstallTask(InstallTaskOptions{Delta: &i.waitTime})
} else {
// change our state
i.state = IOQControllerStates_CTRL_IDLE
stateLog.Debug().Timestamp().Str("name", i.name).Msg("idle")

// look for more to do
Deferred(i._trigger)
Deferred(i._trigger, NoArgs, NoKWArgs)
}

return nil
Expand Down Expand Up @@ -784,12 +784,12 @@ func (i *IOQController) AbortIO(iocb _IOCB, err error) error {
stateLog.Debug().Timestamp().Str("name", i.name).Msg("idle")

// look for more to do
Deferred(i._trigger)
Deferred(i._trigger, NoArgs, NoKWArgs)
return nil
}

// _trigger Called to launch the next request in the queue
func (i *IOQController) _trigger() error {
func (i *IOQController) _trigger(_ Args, _ KWArgs) error {
i.log.Debug().Msg("_trigger")

// if we are busy, do nothing
Expand Down Expand Up @@ -821,13 +821,13 @@ func (i *IOQController) _trigger() error {

// if we're idle, call again
if i.state == IOQControllerStates_CTRL_IDLE {
Deferred(i._trigger)
Deferred(i._trigger, NoArgs, NoKWArgs)
}
return nil
}

// _waitTrigger is called to launch the next request in the queue
func (i *IOQController) _waitTrigger() error {
func (i *IOQController) _waitTrigger(_ Args, _ KWArgs) error {
i.log.Debug().Msg("_waitTrigger")

// make sure we are waiting
Expand All @@ -841,7 +841,7 @@ func (i *IOQController) _waitTrigger() error {
stateLog.Debug().Timestamp().Str("name", i.name).Msg("idle")

// look for more to do
return i._trigger()
return i._trigger(NoArgs, NoKWArgs)
}

//go:generate go run ../../tools/plc4xgenerator/gen.go -type=SieveQueue
Expand Down
4 changes: 2 additions & 2 deletions plc4go/internal/bacnetip/NetworkService.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ func NewNetworkServiceElement(localLog zerolog.Logger, eid *int, startupDisabled

// if starting up is enabled defer our startup function
if !startupDisabled {
Deferred(n.Startup)
Deferred(n.Startup, NoArgs, NoKWArgs)
}
return n, nil
}
Expand All @@ -925,7 +925,7 @@ func (n *NetworkServiceElement) String() string {
return fmt.Sprintf("NetworkServiceElement(TBD...)") // TODO: fill some info here
}

func (n *NetworkServiceElement) Startup() error {
func (n *NetworkServiceElement) Startup(_ Args, _ KWArgs) error {
n.log.Debug().Msg("Startup")

// reference the service access point
Expand Down
Loading

0 comments on commit 19f5db7

Please sign in to comment.