From 59e340fb2456b0d1bc4501240b4e7a01ff5fde61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= <58293609+ToniRamirezM@users.noreply.github.com> Date: Thu, 17 Oct 2024 15:28:49 +0200 Subject: [PATCH] review errors level (#150) --- datastreamer/streambookmark.go | 3 +- datastreamer/streamclient.go | 32 +++++++++++++--------- datastreamer/streamfile.go | 5 ++-- datastreamer/streamserver.go | 50 +++++++++++++++++----------------- 4 files changed, 47 insertions(+), 43 deletions(-) diff --git a/datastreamer/streambookmark.go b/datastreamer/streambookmark.go index f1e84a4..01b1817 100644 --- a/datastreamer/streambookmark.go +++ b/datastreamer/streambookmark.go @@ -57,7 +57,6 @@ func (b *StreamBookmark) GetBookmark(bookmark []byte) (uint64, error) { // Get the bookmark from DB entry, err := b.db.Get(bookmark, nil) if errors.Is(err, leveldb.ErrNotFound) { - // log.Infof("Bookmark not found [%v]: %v", bookmark, err) return 0, err } else if err != nil { log.Errorf("Error getting bookmark [%v]: %w", bookmark, err) @@ -100,7 +99,7 @@ func (b *StreamBookmark) PrintDump() error { iter.Release() // Log total - log.Infof("Number of bookmarks: [%d]", count) + log.Debugf("Number of bookmarks: [%d]", count) return err } diff --git a/datastreamer/streamclient.go b/datastreamer/streamclient.go index 0a0e51e..e85512a 100644 --- a/datastreamer/streamclient.go +++ b/datastreamer/streamclient.go @@ -87,7 +87,12 @@ func (c *StreamClient) Start() error { go c.readEntries() // Goroutine to consume streaming entries - go c.getStreaming() + go func() { + err := c.getStreaming() + if err != nil { + log.Errorf("%s Error while getting streaming: %v", c.ID, err) + } + }() // Flag stared c.started = true @@ -103,7 +108,7 @@ func (c *StreamClient) connectServer() bool { for !c.connected { c.conn, err = net.Dial("tcp", c.server) if err != nil { - log.Infof("Error connecting to server %s: %v", c.server, err) + log.Errorf("Error connecting to server %s: %v", c.server, err) time.Sleep(defaultTimeout) continue } else { @@ -177,7 +182,7 @@ func (c *StreamClient) ExecCommandGetBookmark(fromBookmark []byte) (FileEntry, e // execCommand executes a valid client TCP command with deferred command result possibility func (c *StreamClient) execCommand(cmd Command, deferredResult bool, fromEntry uint64, fromBookmark []byte) (HeaderEntry, FileEntry, error) { - log.Infof("%s Executing command %d[%s]...", c.ID, cmd, StrCommand[cmd]) + log.Debugf("%s Executing command %d[%s]...", c.ID, cmd, StrCommand[cmd]) header := HeaderEntry{} entry := FileEntry{} @@ -207,14 +212,14 @@ func (c *StreamClient) execCommand(cmd Command, deferredResult bool, // Send the command parameters switch cmd { case CmdStart: - log.Infof("%s ...from entry %d", c.ID, fromEntry) + log.Debugf("%s ...from entry %d", c.ID, fromEntry) // Send starting/from entry number err = writeFullUint64(fromEntry, c.conn) if err != nil { return header, entry, err } case CmdStartBookmark: - log.Infof("%s ...from bookmark [%v]", c.ID, fromBookmark) + log.Debugf("%s ...from bookmark [%v]", c.ID, fromBookmark) // Send starting/from bookmark length err = writeFullUint32(uint32(len(fromBookmark)), c.conn) if err != nil { @@ -226,14 +231,14 @@ func (c *StreamClient) execCommand(cmd Command, deferredResult bool, return header, entry, err } case CmdEntry: - log.Infof("%s ...get entry %d", c.ID, fromEntry) + log.Debugf("%s ...get entry %d", c.ID, fromEntry) // Send entry to retrieve err = writeFullUint64(fromEntry, c.conn) if err != nil { return header, entry, err } case CmdBookmark: - log.Infof("%s ...get bookmark [%v]", c.ID, fromBookmark) + log.Debugf("%s ...get bookmark [%v]", c.ID, fromBookmark) // Send bookmark length err = writeFullUint32(uint32(len(fromBookmark)), c.conn) if err != nil { @@ -529,14 +534,14 @@ func (c *StreamClient) readEntries() { func (c *StreamClient) getResult(cmd Command) ResultEntry { // Get result entry r := <-c.results - log.Infof("%s Result %d[%s] received for command %d[%s]", c.ID, r.errorNum, r.errorStr, cmd, StrCommand[cmd]) + log.Debugf("%s Result %d[%s] received for command %d[%s]", c.ID, r.errorNum, r.errorStr, cmd, StrCommand[cmd]) return r } // getHeader consumes a header entry func (c *StreamClient) getHeader() HeaderEntry { h := <-c.headers - log.Infof("%s Header received info: TotalEntries[%d], TotalLength[%d], Version[%d], SystemID[%d]", + log.Debugf("%s Header received info: TotalEntries[%d], TotalLength[%d], Version[%d], SystemID[%d]", c.ID, h.TotalEntries, h.TotalLength, h.Version, h.SystemID) return h } @@ -544,12 +549,12 @@ func (c *StreamClient) getHeader() HeaderEntry { // getEntry consumes a entry from commands response func (c *StreamClient) getEntry() FileEntry { e := <-c.entryRsp - log.Infof("%s Entry received info: Number[%d]", c.ID, e.Number) + log.Debugf("%s Entry received info: Number[%d]", c.ID, e.Number) return e } // getStreaming consumes streaming data entries -func (c *StreamClient) getStreaming() { +func (c *StreamClient) getStreaming() error { for { e := <-c.entries c.nextEntry = e.Number + 1 @@ -557,7 +562,8 @@ func (c *StreamClient) getStreaming() { // Process the data entry err := c.processEntry(&e, c, c.relayServer) if err != nil { - log.Fatalf("%s Processing entry %d: %s. HALTED!", c.ID, e.Number, err.Error()) + log.Errorf("%s Processing entry %d: %s. Exiting getStream function", c.ID, e.Number, err.Error()) + return err } } } @@ -597,6 +603,6 @@ func (c *StreamClient) IsStarted() bool { // PrintReceivedEntry prints received entry (default callback function) func PrintReceivedEntry(e *FileEntry, c *StreamClient, s *StreamServer) error { // Log data entry fields - log.Infof("Data entry(%s): %d | %d | %d | %d", c.ID, e.Number, e.Length, e.Type, len(e.Data)) + log.Debugf("Data entry(%s): %d | %d | %d | %d", c.ID, e.Number, e.Length, e.Type, len(e.Data)) return nil } diff --git a/datastreamer/streamfile.go b/datastreamer/streamfile.go index 705dffd..b27f3f4 100644 --- a/datastreamer/streamfile.go +++ b/datastreamer/streamfile.go @@ -215,7 +215,7 @@ func (f *StreamFile) initializeFile() error { for i := 1; i <= initPages; i++ { err = f.createPage(f.pageSize) if err != nil { - log.Error("Eror creating page") + log.Error("Error creating page") return err } } @@ -580,7 +580,6 @@ func (f *StreamFile) AddFileEntry(e FileEntry) error { f.header.TotalEntries++ f.mutexHeader.Unlock() - // printHeaderEntry(f.header) return nil } @@ -656,7 +655,7 @@ func DecodeBinaryToFileEntry(b []byte) (FileEntry, error) { func (f *StreamFile) iteratorFrom(entryNum uint64, readOnly bool) (*iteratorFile, error) { // Check starting entry number if entryNum >= f.writtenHead.TotalEntries { - log.Infof("Invalid starting entry number for iterator") + log.Error("Invalid starting entry number for iterator") return nil, ErrInvalidEntryNumber } diff --git a/datastreamer/streamserver.go b/datastreamer/streamserver.go index 63b0f60..6816fb8 100644 --- a/datastreamer/streamserver.go +++ b/datastreamer/streamserver.go @@ -263,7 +263,7 @@ func (s *StreamServer) checkClientInactivity() { s.mutexClients.Unlock() for clientID := range clientsToKill { - log.Infof("killing inactive client %s", clientID) + log.Warnf("killing inactive client %s", clientID) s.killClient(clientID) } } @@ -331,7 +331,7 @@ func (s *StreamServer) handleConnection(conn net.Conn) { // Check stream type if st != s.streamType { - log.Errorf("Mismatch stream type, killed: %s", clientID) + log.Errorf("Mismatch stream type: client %s killed", clientID) s.killClient(clientID) return } @@ -464,7 +464,7 @@ func (s *StreamServer) CommitAtomicOp() error { // No atomic operation in progress s.clearAtomicOp() - log.Infof("committed datastream atomic operation, startEntry: %d, time: %v", s.atomicOp.startEntry, time.Since(start)) + log.Debugf("committed datastream atomic operation, startEntry: %d, time: %v", s.atomicOp.startEntry, time.Since(start)) return nil } @@ -474,7 +474,7 @@ func (s *StreamServer) RollbackAtomicOp() error { start := time.Now().UnixNano() defer log.Debugf("RollbackAtomicOp process time: %vns", time.Now().UnixNano()-start) - log.Infof("rollback datastream atomic operation, startEntry: %d", s.atomicOp.startEntry) + log.Debugf("rollback datastream atomic operation, startEntry: %d", s.atomicOp.startEntry) if s.atomicOp.status != aoStarted { log.Errorf("Rollback not allowed, AtomicOp is not in the started state") return ErrRollbackNotAllowed @@ -734,7 +734,7 @@ func (s *StreamServer) broadcastAtomicOp() { } } - log.Infof("sent datastream entries, count: %d, clients: %d, time: %v, clients-ip: {%s}", + log.Debugf("sent datastream entries, count: %d, clients: %d, time: %v, clients-ip: {%s}", len(broadcastOp.entries), len(s.clients), time.Since(start), sClients) } } @@ -878,11 +878,11 @@ func (s *StreamServer) processCmdStart(client *client) error { client.fromEntry = fromEntry // Log - log.Infof("Client %s command Start from %d", client.clientID, fromEntry) + log.Debugf("Client %s command Start from %d", client.clientID, fromEntry) // Check received param if fromEntry > s.nextEntry && fromEntry > s.initEntry { - log.Infof("Start command invalid from entry %d for client %s", fromEntry, client.clientID) + log.Errorf("Start command invalid from entry %d for client %s", fromEntry, client.clientID) err = ErrStartCommandInvalidParamFromEntry _ = s.sendResultEntry(uint32(CmdErrBadFromEntry), StrCommandErrors[CmdErrBadFromEntry], client) return err @@ -912,7 +912,7 @@ func (s *StreamServer) processCmdStartBookmark(client *client) error { // Check maximum length allowed if length > maxBookmarkLength { - log.Infof("Client %s exceeded [%d] maximum allowed length [%d] for a bookmark.", + log.Errorf("Client %s exceeded [%d] maximum allowed length [%d] for a bookmark.", client.clientID, length, maxBookmarkLength) return ErrBookmarkMaxLength } @@ -924,12 +924,12 @@ func (s *StreamServer) processCmdStartBookmark(client *client) error { } // Log - log.Infof("Client %s command StartBookmark [%v]", client.clientID, bookmark) + log.Debugf("Client %s command StartBookmark [%v]", client.clientID, bookmark) // Get bookmark entryNum, err := s.bookmark.GetBookmark(bookmark) if err != nil { - log.Infof("StartBookmark command invalid from bookmark %v for client %s: %v", bookmark, client.clientID, err) + log.Errorf("StartBookmark command invalid from bookmark %v for client %s: %v", bookmark, client.clientID, err) err = ErrStartBookmarkInvalidParamFromBookmark _ = s.sendResultEntry(uint32(CmdErrBadFromBookmark), StrCommandErrors[CmdErrBadFromBookmark], client) return err @@ -942,7 +942,7 @@ func (s *StreamServer) processCmdStartBookmark(client *client) error { } // Stream entries data from the entry number marked by the bookmark - log.Infof("Client %s Bookmark [%v] is the entry number [%d]", client.clientID, bookmark, entryNum) + log.Debugf("Client %s Bookmark [%v] is the entry number [%d]", client.clientID, bookmark, entryNum) if entryNum < s.nextEntry { err = s.streamingFromEntry(client, entryNum) } @@ -953,7 +953,7 @@ func (s *StreamServer) processCmdStartBookmark(client *client) error { // processCmdStop processes the TCP Stop command from the clients func (s *StreamServer) processCmdStop(client *client) error { // Log - log.Infof("Client %s command Stop", client.clientID) + log.Debugf("Client %s command Stop", client.clientID) // Send a command result entry OK err := s.sendResultEntry(0, "OK", client) @@ -963,7 +963,7 @@ func (s *StreamServer) processCmdStop(client *client) error { // processCmdHeader processes the TCP Header command from the clients func (s *StreamServer) processCmdHeader(client *client) error { // Log - log.Infof("Client %s command Header", client.clientID) + log.Debugf("Client %s command Header", client.clientID) // Send a command result entry OK err := s.sendResultEntry(0, "OK", client) @@ -982,7 +982,7 @@ func (s *StreamServer) processCmdHeader(client *client) error { err = ErrNilConnection } if err != nil { - log.Warnf("Error sending header entry to %s: %v", client.clientID, err) + log.Errorf("Error sending header entry to %s: %v", client.clientID, err) return err } return nil @@ -997,7 +997,7 @@ func (s *StreamServer) processCmdEntry(client *client) error { } // Log - log.Infof("Client %s command Entry %d", client.clientID, entryNumber) + log.Debugf("Client %s command Entry %d", client.clientID, entryNumber) // Send a command result entry OK err = s.sendResultEntry(0, "OK", client) @@ -1008,7 +1008,7 @@ func (s *StreamServer) processCmdEntry(client *client) error { // Get the requested entry entry, err := s.GetEntry(entryNumber) if err != nil { - log.Infof("Error getting entry, not found? %d: %v", entryNumber, err) + log.Warnf("Entry not found %d: %v", entryNumber, err) entry = FileEntry{} entry.Length = FixedSizeFileEntry entry.Type = EntryTypeNotFound @@ -1023,7 +1023,7 @@ func (s *StreamServer) processCmdEntry(client *client) error { err = ErrNilConnection } if err != nil { - log.Warnf("Error sending entry to %s: %v", client.clientID, err) + log.Errorf("Error sending entry to %s: %v", client.clientID, err) return err } @@ -1040,7 +1040,7 @@ func (s *StreamServer) processCmdBookmark(client *client) error { // Check maximum length allowed if length > maxBookmarkLength { - log.Infof("Client %s exceeded [%d] maximum allowed length [%d] for a bookmark.", + log.Errorf("Client %s exceeded [%d] maximum allowed length [%d] for a bookmark.", client.clientID, length, maxBookmarkLength) return ErrBookmarkMaxLength } @@ -1052,7 +1052,7 @@ func (s *StreamServer) processCmdBookmark(client *client) error { } // Log - log.Infof("Client %s command Bookmark %v", client.clientID, bookmark) + log.Debugf("Client %s command Bookmark %v", client.clientID, bookmark) // Send a command result entry OK err = s.sendResultEntry(0, "OK", client) @@ -1063,7 +1063,7 @@ func (s *StreamServer) processCmdBookmark(client *client) error { // Get the requested bookmark entry, err := s.GetFirstEventAfterBookmark(bookmark) if err != nil { - log.Infof("Error getting bookmark, not found? %v: %v", bookmark, err) + log.Warnf("Entry not found %v: %v", bookmark, err) entry = FileEntry{} entry.Length = FixedSizeFileEntry entry.Type = EntryTypeNotFound @@ -1078,7 +1078,7 @@ func (s *StreamServer) processCmdBookmark(client *client) error { err = ErrNilConnection } if err != nil { - log.Warnf("Error sending entry to %s: %v", client.clientID, err) + log.Errorf("Error sending entry to %s: %v", client.clientID, err) return err } @@ -1088,7 +1088,7 @@ func (s *StreamServer) processCmdBookmark(client *client) error { // streamingFromEntry sends to the client the stream data starting from the requested entry number func (s *StreamServer) streamingFromEntry(client *client, fromEntry uint64) error { // Log - log.Infof("SYNCING %s from entry %d...", client.clientID, fromEntry) + log.Debugf("SYNCING %s from entry %d...", client.clientID, fromEntry) // Start file stream iterator iterator, err := s.streamFile.iteratorFrom(fromEntry, true) @@ -1117,11 +1117,11 @@ func (s *StreamServer) streamingFromEntry(client *client, fromEntry uint64) erro err = ErrNilConnection } if err != nil { - log.Warnf("Error sending entry %d to %s: %v", iterator.Entry.Number, client.clientID, err) + log.Errorf("Error sending entry %d to %s: %v", iterator.Entry.Number, client.clientID, err) return err } } - log.Infof("Synced %s until %d!", client.clientID, iterator.Entry.Number) + log.Debugf("Synced %s until %d!", client.clientID, iterator.Entry.Number) // Close iterator s.streamFile.iteratorEnd(iterator) @@ -1153,7 +1153,7 @@ func (s *StreamServer) sendResultEntry(errorNum uint32, errorStr string, client err = ErrNilConnection } if err != nil { - log.Warnf("Error sending result entry to %s: %v", client.clientID, err) + log.Errorf("Error sending result entry to %s: %v", client.clientID, err) return err } return nil