Skip to content

Commit

Permalink
maxConnections config
Browse files Browse the repository at this point in the history
  • Loading branch information
ARR552 committed Nov 12, 2024
1 parent 59e340f commit 28740b0
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 9 deletions.
3 changes: 2 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func runServer(ctx *cli.Context) error {
// Create stream server
s, err := datastreamer.NewServer(uint16(port), streamerVersion, streamerSystemID, StSequencer, file,
time.Duration(writeTimeout)*time.Millisecond, time.Duration(inactivityTimeout)*time.Second,
5*time.Second, nil) //nolint:mnd
5*time.Second, nil, 0) //nolint:mnd
if err != nil {
return err
}
Expand Down Expand Up @@ -750,6 +750,7 @@ func checkEntryBlockSanity(
return err
}
blockNum := l2Block.Number
log.Debug("L2BlockNum: ", blockNum)
//Check previous End Block
if sanityBlockEnd != blockNum {
log.Warnf(`(X) SANITY CHECK failed (%d): BlockStart but the previous one is not closed yet?
Expand Down
2 changes: 1 addition & 1 deletion datastreamer/datastreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func TestServer(t *testing.T) {
panic(err)
}
streamServer, err = datastreamer.NewServer(config.Port, 1, 137, streamType,
config.Filename, config.WriteTimeout, config.InactivityTimeout, 5*time.Second, &config.Log)
config.Filename, config.WriteTimeout, config.InactivityTimeout, 5*time.Second, &config.Log, 100)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion datastreamer/streamrelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func NewRelay(server string, port uint16, version uint8, systemID uint64,

// Create server side
r.server, err = NewServer(port, version, systemID, streamType, fileName, writeTimeout,
inactivityTimeout, inactivityCheckInterval, cfg)
inactivityTimeout, inactivityCheckInterval, cfg, 0)
if err != nil {
log.Errorf("Error creating relay server side: %v", err)
return nil, err
Expand Down
13 changes: 7 additions & 6 deletions datastreamer/streamserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type CommandError uint32
const EntryTypeNotFound = math.MaxUint32

const (
maxConnections = 100 // Maximum number of connected clients
streamBuffer = 256 // Buffers for the stream channel
maxBookmarkLength = 16 // Maximum number of bytes for a bookmark
)
Expand Down Expand Up @@ -116,6 +115,7 @@ type StreamServer struct {
// Time interval to check for client connections that have reached
// the inactivity timeout and kill them
inactivityCheckInterval time.Duration
maxConnections uint32
started bool // Flag server started

version uint8
Expand Down Expand Up @@ -165,7 +165,7 @@ type ResultEntry struct {
// NewServer creates a new data stream server
func NewServer(port uint16, version uint8, systemID uint64, streamType StreamType, fileName string,
writeTimeout time.Duration, inactivityTimeout time.Duration, inactivityCheckInterval time.Duration,
cfg *log.Config) (*StreamServer, error) {
cfg *log.Config, maxConnections uint32) (*StreamServer, error) {
// Create the server data stream
s := StreamServer{
port: port,
Expand All @@ -174,6 +174,7 @@ func NewServer(port uint16, version uint8, systemID uint64, streamType StreamTyp
inactivityTimeout: inactivityTimeout,
inactivityCheckInterval: inactivityCheckInterval,
started: false,
maxConnections: maxConnections,

version: version,
systemID: systemID,
Expand Down Expand Up @@ -284,8 +285,8 @@ func (s *StreamServer) waitConnections() {
}

// Check max connections allowed
if s.getSafeClientsLen() >= maxConnections {
log.Warnf("Unable to accept client connection, maximum number of connections reached (%d)", maxConnections)
if s.maxConnections != 0 && s.getSafeClientsLen() >= s.maxConnections {
log.Warnf("Unable to accept client connection, maximum number of connections reached (%d)", s.maxConnections)
conn.Close()
time.Sleep(timeout)
continue
Expand Down Expand Up @@ -1165,10 +1166,10 @@ func (s *StreamServer) getSafeClient(clientID string) *client {
return s.clients[clientID]
}

func (s *StreamServer) getSafeClientsLen() int {
func (s *StreamServer) getSafeClientsLen() uint32 {
s.mutexClients.RLock()
defer s.mutexClients.RUnlock()
return len(s.clients)
return uint32(len(s.clients))
}

// BookmarkPrintDump prints all bookmarks
Expand Down

0 comments on commit 28740b0

Please sign in to comment.