Skip to content

Commit

Permalink
test(plc4go): refactor cache test
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Dec 4, 2023
1 parent 3e18152 commit 2254ea1
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 79 deletions.
98 changes: 49 additions & 49 deletions plc4go/pkg/api/cache/PlcConnectionCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ type plcConnectionCache struct {
_options []options.WithOption // Used to pass them downstream
}

func (t *plcConnectionCache) onConnectionEvent(event connectionEvent) {
func (c *plcConnectionCache) onConnectionEvent(event connectionEvent) {
connectionContainerInstance := event.getConnectionContainer()
if errorEvent, ok := event.(connectionErrorEvent); ok {
if t.tracer != nil {
t.tracer.AddTrace("destroy-connection", errorEvent.getError().Error())
if c.tracer != nil {
c.tracer.AddTrace("destroy-connection", errorEvent.getError().Error())
}
t.log.Debug().Str("connectionString", connectionContainerInstance.connectionString)
c.log.Debug().Str("connectionString", connectionContainerInstance.connectionString)
}
}

Expand All @@ -129,74 +129,74 @@ func (t *plcConnectionCache) onConnectionEvent(event connectionEvent) {
///////////////////////////////////////
///////////////////////////////////////

func (t *plcConnectionCache) EnableTracer() {
t.tracer = tracer.NewTracer(
func (c *plcConnectionCache) EnableTracer() {
c.tracer = tracer.NewTracer(
"cache",
append(t._options, options.WithCustomLogger(t.log))...,
append(c._options, options.WithCustomLogger(c.log))...,
)
}

func (t *plcConnectionCache) GetTracer() tracer.Tracer {
return t.tracer
func (c *plcConnectionCache) GetTracer() tracer.Tracer {
return c.tracer
}

func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4go.PlcConnectionConnectResult {
func (c *plcConnectionCache) GetConnection(connectionString string) <-chan plc4go.PlcConnectionConnectResult {
ch := make(chan plc4go.PlcConnectionConnectResult)

go func() {
t.cacheLock.Lock()
c.cacheLock.Lock()

// If a connection for this connection string didn't exist yet, create a new container
// If a connection for this connection string didn'c exist yet, create a new container
// and make that container connect.
if _, ok := t.connections[connectionString]; !ok {
if t.tracer != nil {
t.tracer.AddTrace("get-connection", "create new cached connection")
if _, ok := c.connections[connectionString]; !ok {
if c.tracer != nil {
c.tracer.AddTrace("get-connection", "create new cached connection")
}
t.log.Debug().Str("connectionString", connectionString).Msg("Create new cached connection")
c.log.Debug().Str("connectionString", connectionString).Msg("Create new cached connection")
// Create a new connection container.
cc := newConnectionContainer(t.log, t.driverManager, connectionString)
cc := newConnectionContainer(c.log, c.driverManager, connectionString)
// Register for connection events (Like connection closed or error).
cc.addListener(t)
cc.addListener(c)
// Store the new connection container in the cache of connections.
t.connections[connectionString] = cc
c.connections[connectionString] = cc
// Initialize the connection itself.
go func(cc2 *connectionContainer) {
cc2.connect()
}(cc)
}

// Get the ConnectionContainer for this connection string.
connection := t.connections[connectionString]
connection := c.connections[connectionString]

// Release the lock again.
t.cacheLock.Unlock()
c.cacheLock.Unlock()

// Try to get a lease on this connection.
var txId string
if t.tracer != nil {
txId = t.tracer.AddTransactionalStartTrace("get-connection", "lease")
if c.tracer != nil {
txId = c.tracer.AddTransactionalStartTrace("get-connection", "lease")
}
leaseChan := connection.lease()
maximumWaitTimeout := time.NewTimer(t.maxWaitTime)
maximumWaitTimeout := time.NewTimer(c.maxWaitTime)
defer utils.CleanupTimer(maximumWaitTimeout)
select {
// Wait till we get a lease.
case connectionResponse := <-leaseChan:
t.log.Debug().Str("connectionString", connectionString).Msg("Successfully got lease to connection")
c.log.Debug().Str("connectionString", connectionString).Msg("Successfully got lease to connection")
responseTimeout := time.NewTimer(10 * time.Millisecond)
defer utils.CleanupTimer(responseTimeout)
select {
case ch <- connectionResponse:
if t.tracer != nil {
t.tracer.AddTransactionalTrace(txId, "get-connection", "success")
if c.tracer != nil {
c.tracer.AddTransactionalTrace(txId, "get-connection", "success")
}
case <-responseTimeout.C:
// Log a message, that the client has given up
if t.tracer != nil {
t.tracer.AddTransactionalTrace(txId, "get-connection", "client given up")
if c.tracer != nil {
c.tracer.AddTransactionalTrace(txId, "get-connection", "client given up")
}
close(ch)
t.log.Debug().Str("connectionString", connectionString).Msg("Client not available returning connection to cache.")
c.log.Debug().Str("connectionString", connectionString).Msg("Client not available returning connection to cache.")
// Return the connection to give another connection the chance to use it.
if connectionResponse.GetConnection() != nil {
connectionResponse.GetConnection().Close()
Expand All @@ -210,58 +210,58 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
<-leaseChan
_ = connection.returnConnection(StateIdle)
}()
if t.tracer != nil {
t.tracer.AddTransactionalTrace(txId, "get-connection", "timeout")
if c.tracer != nil {
c.tracer.AddTransactionalTrace(txId, "get-connection", "timeout")
}
t.log.Debug().Str("connectionString", connectionString).Msg("Timeout while waiting for connection.")
c.log.Debug().Str("connectionString", connectionString).Msg("Timeout while waiting for connection.")
ch <- _default.NewDefaultPlcConnectionCloseResult(nil, errors.New("timeout while waiting for connection"))
}
}()

return ch
}

func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
t.log.Debug().Msg("Closing connection cache started.")
func (c *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
c.log.Debug().Msg("Closing connection cache started.")
ch := make(chan PlcConnectionCacheCloseResult)

go func() {
t.cacheLock.Lock()
defer t.cacheLock.Unlock()
c.cacheLock.Lock()
defer c.cacheLock.Unlock()

if len(t.connections) == 0 {
if len(c.connections) == 0 {
responseDeliveryTimeout := time.NewTimer(10 * time.Millisecond)
defer utils.CleanupTimer(responseDeliveryTimeout)
select {
case ch <- newDefaultPlcConnectionCacheCloseResult(t, nil):
case ch <- newDefaultPlcConnectionCacheCloseResult(c, nil):
case <-responseDeliveryTimeout.C:
}
t.log.Debug().Msg("Closing connection cache finished.")
c.log.Debug().Msg("Closing connection cache finished.")
return
}

for _, cc := range t.connections {
for _, cc := range c.connections {
// Mark the connection as being closed to not try to re-establish it.
cc.closed = true
// Try to get a lease as this way we kow we're not closing the connection
// while some go func is still using it.
go func(container *connectionContainer) {
leaseResults := container.lease()
closeTimeout := time.NewTimer(t.maxWaitTime)
closeTimeout := time.NewTimer(c.maxWaitTime)
defer utils.CleanupTimer(closeTimeout)
select {
// We're just getting the lease as this way we can be sure nobody else is using it.
// We also really don't care if it worked, or not ... it's just an attempt of being
// We also really don'c care if it worked, or not ... it's just an attempt of being
// nice.
case _ = <-leaseResults:
t.log.Debug().Str("connectionString", container.connectionString).Msg("Gracefully closing connection ...")
c.log.Debug().Str("connectionString", container.connectionString).Msg("Gracefully closing connection ...")
// Give back the connection.
if container.connection != nil {
container.connection.Close()
}
// If we're timing out brutally kill the connection.
case <-closeTimeout.C:
t.log.Debug().Str("connectionString", container.connectionString).Msg("Forcefully closing connection ...")
c.log.Debug().Str("connectionString", container.connectionString).Msg("Forcefully closing connection ...")
// Forcefully close this connection.
if container.connection != nil {
container.connection.Close()
Expand All @@ -271,17 +271,17 @@ func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
responseDeliveryTimeout := time.NewTimer(10 * time.Millisecond)
defer utils.CleanupTimer(responseDeliveryTimeout)
select {
case ch <- newDefaultPlcConnectionCacheCloseResult(t, nil):
case ch <- newDefaultPlcConnectionCacheCloseResult(c, nil):
case <-responseDeliveryTimeout.C:
}
t.log.Debug().Msg("Closing connection cache finished.")
c.log.Debug().Msg("Closing connection cache finished.")
}(cc)
}
}()

return ch
}

func (t *plcConnectionCache) String() string {
return fmt.Sprintf("plcConnectionCache{driverManager: %s, maxLeaseTime: %s, maxWaitTime: %s, connections: %s, tracer: %s}", t.driverManager, t.maxLeaseTime, t.maxWaitTime, t.connections, t.tracer)
func (c *plcConnectionCache) String() string {
return fmt.Sprintf("plcConnectionCache{driverManager: %s, maxLeaseTime: %s, maxWaitTime: %s, connections: %s, tracer: %s}", c.driverManager, c.maxLeaseTime, c.maxWaitTime, c.connections, c.tracer)
}
Loading

0 comments on commit 2254ea1

Please sign in to comment.