Skip to content

Commit

Permalink
fix(plc4go/modbus): Delete elements in the loop, and the index is dec… (
Browse files Browse the repository at this point in the history
#1028)

* fix(plc4go/modbus): Delete elements in the loop, and the index is decremented by 1 at the same time

* test(plc4go/spi): add test for DefaultCodec failure not passing message to all handlers.

---------

Co-authored-by: Sebastian Rühl <[email protected]>
  • Loading branch information
hongjinlin and sruehl authored Jul 12, 2023
1 parent 21aeafa commit a359a2f
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 5 deletions.
13 changes: 8 additions & 5 deletions plc4go/spi/default/DefaultCodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ func (m *defaultCodec) HandleMessages(message spi.Message) bool {
defer m.expectationsChangeMutex.Unlock()
messageHandled := false
m.log.Trace().Msgf("Current number of expectations: %d", len(m.expectations))
for index, expectation := range m.expectations {
for i := 0; i < len(m.expectations); i++ {
expectation := m.expectations[i]
m.log.Trace().Msgf("Checking expectation %s", expectation)
// Check if the current message matches the expectations
// If it does, let it handle the message.
Expand All @@ -267,12 +268,14 @@ func (m *defaultCodec) HandleMessages(message spi.Message) bool {
}
continue
}
m.log.Trace().Msg("message handled")
messageHandled = true
// If this is the last element of the list remove it differently than if it's before that
if (index + 1) == len(m.expectations) {
m.expectations = m.expectations[:index]
} else if (index + 1) < len(m.expectations) {
m.expectations = append(m.expectations[:index], m.expectations[index+1:]...)
if (i + 1) == len(m.expectations) {
m.expectations = m.expectations[:i]
} else if (i + 1) < len(m.expectations) {
m.expectations = append(m.expectations[:i], m.expectations[i+1:]...)
i--
}
} else {
m.log.Trace().Stringer("expectation", expectation).Msg("doesn't accept message")
Expand Down
128 changes: 128 additions & 0 deletions plc4go/spi/default/DefaultCodec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"github.com/apache/plc4x/plc4go/spi/testutils"
"github.com/rs/zerolog/log"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -366,6 +367,7 @@ func Test_defaultCodec_Connect(t *testing.T) {
defaultIncomingMessageChannel: tt.fields.defaultIncomingMessageChannel,
expectations: tt.fields.expectations,
customMessageHandling: tt.fields.customMessageHandling,
log: testutils.ProduceTestingLogger(t),
}
tt.wantErr(t, m.Connect(), fmt.Sprintf("Connect()"))
})
Expand Down Expand Up @@ -432,6 +434,7 @@ func Test_defaultCodec_ConnectWithContext(t *testing.T) {
defaultIncomingMessageChannel: tt.fields.defaultIncomingMessageChannel,
expectations: tt.fields.expectations,
customMessageHandling: tt.fields.customMessageHandling,
log: testutils.ProduceTestingLogger(t),
}
tt.wantErr(t, m.ConnectWithContext(tt.args.ctx), fmt.Sprintf("ConnectWithContext(%v)", tt.args.ctx))
})
Expand Down Expand Up @@ -482,6 +485,7 @@ func Test_defaultCodec_Disconnect(t *testing.T) {
defaultIncomingMessageChannel: tt.fields.defaultIncomingMessageChannel,
expectations: tt.fields.expectations,
customMessageHandling: tt.fields.customMessageHandling,
log: testutils.ProduceTestingLogger(t),
}
if tt.manipulator != nil {
tt.manipulator(t, c)
Expand Down Expand Up @@ -533,6 +537,7 @@ func Test_defaultCodec_Expect(t *testing.T) {
defaultIncomingMessageChannel: tt.fields.defaultIncomingMessageChannel,
expectations: tt.fields.expectations,
customMessageHandling: tt.fields.customMessageHandling,
log: testutils.ProduceTestingLogger(t),
}
tt.wantErr(t, m.Expect(tt.args.ctx, tt.args.acceptsMessage, tt.args.handleMessage, tt.args.handleError, tt.args.ttl), fmt.Sprintf("Expect(%v, func(), func(), func(), %v)", tt.args.ctx, tt.args.ttl))
})
Expand Down Expand Up @@ -565,6 +570,7 @@ func Test_defaultCodec_GetDefaultIncomingMessageChannel(t *testing.T) {
defaultIncomingMessageChannel: tt.fields.defaultIncomingMessageChannel,
expectations: tt.fields.expectations,
customMessageHandling: tt.fields.customMessageHandling,
log: testutils.ProduceTestingLogger(t),
}
assert.Equalf(t, tt.want, m.GetDefaultIncomingMessageChannel(), "GetDefaultIncomingMessageChannel()")
})
Expand Down Expand Up @@ -597,6 +603,7 @@ func Test_defaultCodec_GetTransportInstance(t *testing.T) {
defaultIncomingMessageChannel: tt.fields.defaultIncomingMessageChannel,
expectations: tt.fields.expectations,
customMessageHandling: tt.fields.customMessageHandling,
log: testutils.ProduceTestingLogger(t),
}
assert.Equalf(t, tt.want, m.GetTransportInstance(), "GetTransportInstance()")
})
Expand All @@ -619,6 +626,7 @@ func Test_defaultCodec_HandleMessages(t *testing.T) {
name string
fields fields
args args
setup func(t *testing.T, fields *fields, args *args)
want bool
}{
{
Expand Down Expand Up @@ -679,19 +687,134 @@ func Test_defaultCodec_HandleMessages(t *testing.T) {
return nil
},
},
&defaultExpectation{ // not accept
AcceptsMessage: func(_ spi.Message) bool {
return false
},
HandleMessage: func(_ spi.Message) error {
return nil
},
},
&defaultExpectation{ // accepts
AcceptsMessage: func(_ spi.Message) bool {
return true
},
HandleMessage: func(_ spi.Message) error {
return nil
},
},
},
},
want: true,
},
{
name: "handle some (ensure everyone get's it)",
setup: func(t *testing.T, fields *fields, args *args) {
accept1 := atomic.Bool{}
accept2 := atomic.Bool{}
accept3 := atomic.Bool{}
accept4 := atomic.Bool{}
accept5 := atomic.Bool{}
accept6 := atomic.Bool{}
t.Cleanup(func() {
assert.True(t, accept1.Load(), "accept1 not called")
assert.True(t, accept2.Load(), "accept2 not called")
assert.True(t, accept3.Load(), "accept3 not called")
assert.True(t, accept4.Load(), "accept4 not called")
assert.True(t, accept5.Load(), "accept5 not called")
assert.True(t, accept6.Load(), "accept6 not called")
})
fields.expectations = []spi.Expectation{
&defaultExpectation{ // doesn't accept
AcceptsMessage: func(_ spi.Message) bool {
return false
},
},
&defaultExpectation{ // accepts but fails // accept1
AcceptsMessage: func(_ spi.Message) bool {
return true
},
HandleMessage: func(_ spi.Message) error {
accept1.Store(true)
return errors.New("oh noes")
},
HandleError: func(err error) error {
return nil
},
},
&defaultExpectation{ // accepts but fails and fails to handle the error // accept2
AcceptsMessage: func(_ spi.Message) bool {
return true
},
HandleMessage: func(_ spi.Message) error {
accept2.Store(true)
return errors.New("oh noes")
},
HandleError: func(err error) error {
return errors.New("I failed completely")
},
},
&defaultExpectation{ // accepts // accept3
AcceptsMessage: func(_ spi.Message) bool {
return true
},
HandleMessage: func(_ spi.Message) error {
accept3.Store(true)
return nil
},
},
&defaultExpectation{ // accepts // accept4
AcceptsMessage: func(_ spi.Message) bool {
return true
},
HandleMessage: func(_ spi.Message) error {
accept4.Store(true)
return nil
},
},
&defaultExpectation{ // not accept // accept5
AcceptsMessage: func(_ spi.Message) bool {
return true
},
HandleMessage: func(_ spi.Message) error {
accept5.Store(true)
return nil
},
},
&defaultExpectation{ // not accept
AcceptsMessage: func(_ spi.Message) bool {
return false
},
HandleMessage: func(_ spi.Message) error {
return nil
},
},
&defaultExpectation{ // accepts // accept6
AcceptsMessage: func(_ spi.Message) bool {
return true
},
HandleMessage: func(_ spi.Message) error {
accept6.Store(true)
return nil
},
},
}
},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.fields, &tt.args)
}
m := &defaultCodec{
DefaultCodecRequirements: tt.fields.DefaultCodecRequirements,
transportInstance: tt.fields.transportInstance,
defaultIncomingMessageChannel: tt.fields.defaultIncomingMessageChannel,
expectations: tt.fields.expectations,
customMessageHandling: tt.fields.customMessageHandling,
log: testutils.ProduceTestingLogger(t),
}
assert.Equalf(t, tt.want, m.HandleMessages(tt.args.message), "HandleMessages(%v)", tt.args.message)
})
Expand Down Expand Up @@ -724,6 +847,7 @@ func Test_defaultCodec_IsRunning(t *testing.T) {
defaultIncomingMessageChannel: tt.fields.defaultIncomingMessageChannel,
expectations: tt.fields.expectations,
customMessageHandling: tt.fields.customMessageHandling,
log: testutils.ProduceTestingLogger(t),
}
assert.Equalf(t, tt.want, m.IsRunning(), "IsRunning()")
})
Expand Down Expand Up @@ -799,6 +923,7 @@ func Test_defaultCodec_SendRequest(t *testing.T) {
defaultIncomingMessageChannel: tt.fields.defaultIncomingMessageChannel,
expectations: tt.fields.expectations,
customMessageHandling: tt.fields.customMessageHandling,
log: testutils.ProduceTestingLogger(t),
}
tt.wantErr(t, m.SendRequest(tt.args.ctx, tt.args.message, tt.args.acceptsMessage, tt.args.handleMessage, tt.args.handleError, tt.args.ttl), fmt.Sprintf("SendRequest(%v, %v, func(), func(), func(), %v)", tt.args.ctx, tt.args.message, tt.args.ttl))
})
Expand Down Expand Up @@ -872,6 +997,7 @@ func Test_defaultCodec_TimeoutExpectations(t *testing.T) {
defaultIncomingMessageChannel: tt.fields.defaultIncomingMessageChannel,
expectations: tt.fields.expectations,
customMessageHandling: tt.fields.customMessageHandling,
log: testutils.ProduceTestingLogger(t),
}
m.TimeoutExpectations(tt.args.now)
})
Expand Down Expand Up @@ -1222,6 +1348,7 @@ func Test_defaultCodec_Work(t *testing.T) {
defaultIncomingMessageChannel: tt.fields.defaultIncomingMessageChannel,
expectations: tt.fields.expectations,
customMessageHandling: tt.fields.customMessageHandling,
log: testutils.ProduceTestingLogger(t),
}
if tt.manipulator != nil {
tt.manipulator(t, m)
Expand Down Expand Up @@ -1294,6 +1421,7 @@ func Test_defaultCodec_String(t *testing.T) {
defaultIncomingMessageChannel: tt.fields.defaultIncomingMessageChannel,
expectations: tt.fields.expectations,
customMessageHandling: tt.fields.customMessageHandling,
log: testutils.ProduceTestingLogger(t),
}
assert.Equalf(t, tt.want, m.String(), "String()")
})
Expand Down

0 comments on commit a359a2f

Please sign in to comment.