Skip to content

Commit

Permalink
review errors level (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
ToniRamirezM authored Oct 17, 2024
1 parent 766c3e3 commit 59e340f
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 43 deletions.
3 changes: 1 addition & 2 deletions datastreamer/streambookmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func (b *StreamBookmark) GetBookmark(bookmark []byte) (uint64, error) {
// Get the bookmark from DB
entry, err := b.db.Get(bookmark, nil)
if errors.Is(err, leveldb.ErrNotFound) {
// log.Infof("Bookmark not found [%v]: %v", bookmark, err)
return 0, err
} else if err != nil {
log.Errorf("Error getting bookmark [%v]: %w", bookmark, err)
Expand Down Expand Up @@ -100,7 +99,7 @@ func (b *StreamBookmark) PrintDump() error {
iter.Release()

// Log total
log.Infof("Number of bookmarks: [%d]", count)
log.Debugf("Number of bookmarks: [%d]", count)

return err
}
32 changes: 19 additions & 13 deletions datastreamer/streamclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ func (c *StreamClient) Start() error {
go c.readEntries()

// Goroutine to consume streaming entries
go c.getStreaming()
go func() {
err := c.getStreaming()
if err != nil {
log.Errorf("%s Error while getting streaming: %v", c.ID, err)
}
}()

// Flag stared
c.started = true
Expand All @@ -103,7 +108,7 @@ func (c *StreamClient) connectServer() bool {
for !c.connected {
c.conn, err = net.Dial("tcp", c.server)
if err != nil {
log.Infof("Error connecting to server %s: %v", c.server, err)
log.Errorf("Error connecting to server %s: %v", c.server, err)
time.Sleep(defaultTimeout)
continue
} else {
Expand Down Expand Up @@ -177,7 +182,7 @@ func (c *StreamClient) ExecCommandGetBookmark(fromBookmark []byte) (FileEntry, e
// execCommand executes a valid client TCP command with deferred command result possibility
func (c *StreamClient) execCommand(cmd Command, deferredResult bool,
fromEntry uint64, fromBookmark []byte) (HeaderEntry, FileEntry, error) {
log.Infof("%s Executing command %d[%s]...", c.ID, cmd, StrCommand[cmd])
log.Debugf("%s Executing command %d[%s]...", c.ID, cmd, StrCommand[cmd])
header := HeaderEntry{}
entry := FileEntry{}

Expand Down Expand Up @@ -207,14 +212,14 @@ func (c *StreamClient) execCommand(cmd Command, deferredResult bool,
// Send the command parameters
switch cmd {
case CmdStart:
log.Infof("%s ...from entry %d", c.ID, fromEntry)
log.Debugf("%s ...from entry %d", c.ID, fromEntry)
// Send starting/from entry number
err = writeFullUint64(fromEntry, c.conn)
if err != nil {
return header, entry, err
}
case CmdStartBookmark:
log.Infof("%s ...from bookmark [%v]", c.ID, fromBookmark)
log.Debugf("%s ...from bookmark [%v]", c.ID, fromBookmark)
// Send starting/from bookmark length
err = writeFullUint32(uint32(len(fromBookmark)), c.conn)
if err != nil {
Expand All @@ -226,14 +231,14 @@ func (c *StreamClient) execCommand(cmd Command, deferredResult bool,
return header, entry, err
}
case CmdEntry:
log.Infof("%s ...get entry %d", c.ID, fromEntry)
log.Debugf("%s ...get entry %d", c.ID, fromEntry)
// Send entry to retrieve
err = writeFullUint64(fromEntry, c.conn)
if err != nil {
return header, entry, err
}
case CmdBookmark:
log.Infof("%s ...get bookmark [%v]", c.ID, fromBookmark)
log.Debugf("%s ...get bookmark [%v]", c.ID, fromBookmark)
// Send bookmark length
err = writeFullUint32(uint32(len(fromBookmark)), c.conn)
if err != nil {
Expand Down Expand Up @@ -529,35 +534,36 @@ func (c *StreamClient) readEntries() {
func (c *StreamClient) getResult(cmd Command) ResultEntry {
// Get result entry
r := <-c.results
log.Infof("%s Result %d[%s] received for command %d[%s]", c.ID, r.errorNum, r.errorStr, cmd, StrCommand[cmd])
log.Debugf("%s Result %d[%s] received for command %d[%s]", c.ID, r.errorNum, r.errorStr, cmd, StrCommand[cmd])
return r
}

// getHeader consumes a header entry
func (c *StreamClient) getHeader() HeaderEntry {
h := <-c.headers
log.Infof("%s Header received info: TotalEntries[%d], TotalLength[%d], Version[%d], SystemID[%d]",
log.Debugf("%s Header received info: TotalEntries[%d], TotalLength[%d], Version[%d], SystemID[%d]",
c.ID, h.TotalEntries, h.TotalLength, h.Version, h.SystemID)
return h
}

// getEntry consumes a entry from commands response
func (c *StreamClient) getEntry() FileEntry {
e := <-c.entryRsp
log.Infof("%s Entry received info: Number[%d]", c.ID, e.Number)
log.Debugf("%s Entry received info: Number[%d]", c.ID, e.Number)
return e
}

// getStreaming consumes streaming data entries
func (c *StreamClient) getStreaming() {
func (c *StreamClient) getStreaming() error {
for {
e := <-c.entries
c.nextEntry = e.Number + 1

// Process the data entry
err := c.processEntry(&e, c, c.relayServer)
if err != nil {
log.Fatalf("%s Processing entry %d: %s. HALTED!", c.ID, e.Number, err.Error())
log.Errorf("%s Processing entry %d: %s. Exiting getStream function", c.ID, e.Number, err.Error())
return err
}
}
}
Expand Down Expand Up @@ -597,6 +603,6 @@ func (c *StreamClient) IsStarted() bool {
// PrintReceivedEntry prints received entry (default callback function)
func PrintReceivedEntry(e *FileEntry, c *StreamClient, s *StreamServer) error {
// Log data entry fields
log.Infof("Data entry(%s): %d | %d | %d | %d", c.ID, e.Number, e.Length, e.Type, len(e.Data))
log.Debugf("Data entry(%s): %d | %d | %d | %d", c.ID, e.Number, e.Length, e.Type, len(e.Data))
return nil
}
5 changes: 2 additions & 3 deletions datastreamer/streamfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (f *StreamFile) initializeFile() error {
for i := 1; i <= initPages; i++ {
err = f.createPage(f.pageSize)
if err != nil {
log.Error("Eror creating page")
log.Error("Error creating page")
return err
}
}
Expand Down Expand Up @@ -580,7 +580,6 @@ func (f *StreamFile) AddFileEntry(e FileEntry) error {
f.header.TotalEntries++
f.mutexHeader.Unlock()

// printHeaderEntry(f.header)
return nil
}

Expand Down Expand Up @@ -656,7 +655,7 @@ func DecodeBinaryToFileEntry(b []byte) (FileEntry, error) {
func (f *StreamFile) iteratorFrom(entryNum uint64, readOnly bool) (*iteratorFile, error) {
// Check starting entry number
if entryNum >= f.writtenHead.TotalEntries {
log.Infof("Invalid starting entry number for iterator")
log.Error("Invalid starting entry number for iterator")
return nil, ErrInvalidEntryNumber
}

Expand Down
50 changes: 25 additions & 25 deletions datastreamer/streamserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (s *StreamServer) checkClientInactivity() {
s.mutexClients.Unlock()

for clientID := range clientsToKill {
log.Infof("killing inactive client %s", clientID)
log.Warnf("killing inactive client %s", clientID)
s.killClient(clientID)
}
}
Expand Down Expand Up @@ -331,7 +331,7 @@ func (s *StreamServer) handleConnection(conn net.Conn) {

// Check stream type
if st != s.streamType {
log.Errorf("Mismatch stream type, killed: %s", clientID)
log.Errorf("Mismatch stream type: client %s killed", clientID)
s.killClient(clientID)
return
}
Expand Down Expand Up @@ -464,7 +464,7 @@ func (s *StreamServer) CommitAtomicOp() error {
// No atomic operation in progress
s.clearAtomicOp()

log.Infof("committed datastream atomic operation, startEntry: %d, time: %v", s.atomicOp.startEntry, time.Since(start))
log.Debugf("committed datastream atomic operation, startEntry: %d, time: %v", s.atomicOp.startEntry, time.Since(start))

return nil
}
Expand All @@ -474,7 +474,7 @@ func (s *StreamServer) RollbackAtomicOp() error {
start := time.Now().UnixNano()
defer log.Debugf("RollbackAtomicOp process time: %vns", time.Now().UnixNano()-start)

log.Infof("rollback datastream atomic operation, startEntry: %d", s.atomicOp.startEntry)
log.Debugf("rollback datastream atomic operation, startEntry: %d", s.atomicOp.startEntry)
if s.atomicOp.status != aoStarted {
log.Errorf("Rollback not allowed, AtomicOp is not in the started state")
return ErrRollbackNotAllowed
Expand Down Expand Up @@ -734,7 +734,7 @@ func (s *StreamServer) broadcastAtomicOp() {
}
}

log.Infof("sent datastream entries, count: %d, clients: %d, time: %v, clients-ip: {%s}",
log.Debugf("sent datastream entries, count: %d, clients: %d, time: %v, clients-ip: {%s}",
len(broadcastOp.entries), len(s.clients), time.Since(start), sClients)
}
}
Expand Down Expand Up @@ -878,11 +878,11 @@ func (s *StreamServer) processCmdStart(client *client) error {
client.fromEntry = fromEntry

// Log
log.Infof("Client %s command Start from %d", client.clientID, fromEntry)
log.Debugf("Client %s command Start from %d", client.clientID, fromEntry)

// Check received param
if fromEntry > s.nextEntry && fromEntry > s.initEntry {
log.Infof("Start command invalid from entry %d for client %s", fromEntry, client.clientID)
log.Errorf("Start command invalid from entry %d for client %s", fromEntry, client.clientID)
err = ErrStartCommandInvalidParamFromEntry
_ = s.sendResultEntry(uint32(CmdErrBadFromEntry), StrCommandErrors[CmdErrBadFromEntry], client)
return err
Expand Down Expand Up @@ -912,7 +912,7 @@ func (s *StreamServer) processCmdStartBookmark(client *client) error {

// Check maximum length allowed
if length > maxBookmarkLength {
log.Infof("Client %s exceeded [%d] maximum allowed length [%d] for a bookmark.",
log.Errorf("Client %s exceeded [%d] maximum allowed length [%d] for a bookmark.",
client.clientID, length, maxBookmarkLength)
return ErrBookmarkMaxLength
}
Expand All @@ -924,12 +924,12 @@ func (s *StreamServer) processCmdStartBookmark(client *client) error {
}

// Log
log.Infof("Client %s command StartBookmark [%v]", client.clientID, bookmark)
log.Debugf("Client %s command StartBookmark [%v]", client.clientID, bookmark)

// Get bookmark
entryNum, err := s.bookmark.GetBookmark(bookmark)
if err != nil {
log.Infof("StartBookmark command invalid from bookmark %v for client %s: %v", bookmark, client.clientID, err)
log.Errorf("StartBookmark command invalid from bookmark %v for client %s: %v", bookmark, client.clientID, err)
err = ErrStartBookmarkInvalidParamFromBookmark
_ = s.sendResultEntry(uint32(CmdErrBadFromBookmark), StrCommandErrors[CmdErrBadFromBookmark], client)
return err
Expand All @@ -942,7 +942,7 @@ func (s *StreamServer) processCmdStartBookmark(client *client) error {
}

// Stream entries data from the entry number marked by the bookmark
log.Infof("Client %s Bookmark [%v] is the entry number [%d]", client.clientID, bookmark, entryNum)
log.Debugf("Client %s Bookmark [%v] is the entry number [%d]", client.clientID, bookmark, entryNum)
if entryNum < s.nextEntry {
err = s.streamingFromEntry(client, entryNum)
}
Expand All @@ -953,7 +953,7 @@ func (s *StreamServer) processCmdStartBookmark(client *client) error {
// processCmdStop processes the TCP Stop command from the clients
func (s *StreamServer) processCmdStop(client *client) error {
// Log
log.Infof("Client %s command Stop", client.clientID)
log.Debugf("Client %s command Stop", client.clientID)

// Send a command result entry OK
err := s.sendResultEntry(0, "OK", client)
Expand All @@ -963,7 +963,7 @@ func (s *StreamServer) processCmdStop(client *client) error {
// processCmdHeader processes the TCP Header command from the clients
func (s *StreamServer) processCmdHeader(client *client) error {
// Log
log.Infof("Client %s command Header", client.clientID)
log.Debugf("Client %s command Header", client.clientID)

// Send a command result entry OK
err := s.sendResultEntry(0, "OK", client)
Expand All @@ -982,7 +982,7 @@ func (s *StreamServer) processCmdHeader(client *client) error {
err = ErrNilConnection
}
if err != nil {
log.Warnf("Error sending header entry to %s: %v", client.clientID, err)
log.Errorf("Error sending header entry to %s: %v", client.clientID, err)
return err
}
return nil
Expand All @@ -997,7 +997,7 @@ func (s *StreamServer) processCmdEntry(client *client) error {
}

// Log
log.Infof("Client %s command Entry %d", client.clientID, entryNumber)
log.Debugf("Client %s command Entry %d", client.clientID, entryNumber)

// Send a command result entry OK
err = s.sendResultEntry(0, "OK", client)
Expand All @@ -1008,7 +1008,7 @@ func (s *StreamServer) processCmdEntry(client *client) error {
// Get the requested entry
entry, err := s.GetEntry(entryNumber)
if err != nil {
log.Infof("Error getting entry, not found? %d: %v", entryNumber, err)
log.Warnf("Entry not found %d: %v", entryNumber, err)
entry = FileEntry{}
entry.Length = FixedSizeFileEntry
entry.Type = EntryTypeNotFound
Expand All @@ -1023,7 +1023,7 @@ func (s *StreamServer) processCmdEntry(client *client) error {
err = ErrNilConnection
}
if err != nil {
log.Warnf("Error sending entry to %s: %v", client.clientID, err)
log.Errorf("Error sending entry to %s: %v", client.clientID, err)
return err
}

Expand All @@ -1040,7 +1040,7 @@ func (s *StreamServer) processCmdBookmark(client *client) error {

// Check maximum length allowed
if length > maxBookmarkLength {
log.Infof("Client %s exceeded [%d] maximum allowed length [%d] for a bookmark.",
log.Errorf("Client %s exceeded [%d] maximum allowed length [%d] for a bookmark.",
client.clientID, length, maxBookmarkLength)
return ErrBookmarkMaxLength
}
Expand All @@ -1052,7 +1052,7 @@ func (s *StreamServer) processCmdBookmark(client *client) error {
}

// Log
log.Infof("Client %s command Bookmark %v", client.clientID, bookmark)
log.Debugf("Client %s command Bookmark %v", client.clientID, bookmark)

// Send a command result entry OK
err = s.sendResultEntry(0, "OK", client)
Expand All @@ -1063,7 +1063,7 @@ func (s *StreamServer) processCmdBookmark(client *client) error {
// Get the requested bookmark
entry, err := s.GetFirstEventAfterBookmark(bookmark)
if err != nil {
log.Infof("Error getting bookmark, not found? %v: %v", bookmark, err)
log.Warnf("Entry not found %v: %v", bookmark, err)
entry = FileEntry{}
entry.Length = FixedSizeFileEntry
entry.Type = EntryTypeNotFound
Expand All @@ -1078,7 +1078,7 @@ func (s *StreamServer) processCmdBookmark(client *client) error {
err = ErrNilConnection
}
if err != nil {
log.Warnf("Error sending entry to %s: %v", client.clientID, err)
log.Errorf("Error sending entry to %s: %v", client.clientID, err)
return err
}

Expand All @@ -1088,7 +1088,7 @@ func (s *StreamServer) processCmdBookmark(client *client) error {
// streamingFromEntry sends to the client the stream data starting from the requested entry number
func (s *StreamServer) streamingFromEntry(client *client, fromEntry uint64) error {
// Log
log.Infof("SYNCING %s from entry %d...", client.clientID, fromEntry)
log.Debugf("SYNCING %s from entry %d...", client.clientID, fromEntry)

// Start file stream iterator
iterator, err := s.streamFile.iteratorFrom(fromEntry, true)
Expand Down Expand Up @@ -1117,11 +1117,11 @@ func (s *StreamServer) streamingFromEntry(client *client, fromEntry uint64) erro
err = ErrNilConnection
}
if err != nil {
log.Warnf("Error sending entry %d to %s: %v", iterator.Entry.Number, client.clientID, err)
log.Errorf("Error sending entry %d to %s: %v", iterator.Entry.Number, client.clientID, err)
return err
}
}
log.Infof("Synced %s until %d!", client.clientID, iterator.Entry.Number)
log.Debugf("Synced %s until %d!", client.clientID, iterator.Entry.Number)

// Close iterator
s.streamFile.iteratorEnd(iterator)
Expand Down Expand Up @@ -1153,7 +1153,7 @@ func (s *StreamServer) sendResultEntry(errorNum uint32, errorStr string, client
err = ErrNilConnection
}
if err != nil {
log.Warnf("Error sending result entry to %s: %v", client.clientID, err)
log.Errorf("Error sending result entry to %s: %v", client.clientID, err)
return err
}
return nil
Expand Down

0 comments on commit 59e340f

Please sign in to comment.