Skip to content

Commit

Permalink
Fix/wip branch (#10)
Browse files Browse the repository at this point in the history
* Wip

* wip

* wip

* wip

* wip

* Wip

* Replace prints with logs. Lint satisfy. Bug fixes

* Renamed some fields and methods

* Wip server broadcast and stream client

* merge main

* Add rollback interface. Clean logs

* WIP processing commands

* Pass lint

* fix branch

---------

Co-authored-by: dPunisher <[email protected]>
  • Loading branch information
ToniRamirezM and dpunish3r authored Sep 7, 2023
1 parent eb5d71e commit 277ff4c
Showing 1 changed file with 77 additions and 27 deletions.
104 changes: 77 additions & 27 deletions datastreamer/streamserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,31 @@ const (
CmdHeader Command = 3

// Client status
csStarted ClientStatus = 1
csStopped ClientStatus = 2
csKilled ClientStatus = 0xff
csStarting ClientStatus = 1
csStarted ClientStatus = 2
csStopped ClientStatus = 3
csKilled ClientStatus = 0xff

// Atomic operation status
aoNone AOStatus = 0
aoStarted AOStatus = 1
aoCommitting AOStatus = 2
aoNone AOStatus = 0
aoStarted AOStatus = 1
aoCommitting AOStatus = 2
aoRollbacking AOStatus = 3

// Entry types (events)
EtStartL2Block EntryType = 1
EtExecuteL2Tx EntryType = 2
)

// StrClientStatus provides a human-readable name for each ClientStatus
// value, used when logging client state.
var StrClientStatus = map[ClientStatus]string{
	csStarting: "Starting",
	csStarted:  "Started",
	csStopped:  "Stopped",
	csKilled:   "Killed",
}

type StreamServer struct {
port uint16 // server stream port
fileName string // stream file name
Expand Down Expand Up @@ -154,37 +165,35 @@ func (s *StreamServer) handleConnection(conn net.Conn) {
status: csStopped,
}

cli := s.clients[clientId]

reader := bufio.NewReader(conn)
for {
// Read command
command, err := readFullUint64(reader)
if err != nil {
cli.status = csKilled
return //TODO
s.killClient(clientId)
return
}
// Read stream type
stUint64, err := readFullUint64(reader)
if err != nil {
cli.status = csKilled
return //TODO
s.killClient(clientId)
return
}
st := StreamType(stUint64)

// Check stream type
if st != s.streamType {
log.Error("Mismatch stream type, killed: ", clientId)
cli.status = csKilled
return //TODO
s.killClient(clientId)
return
}

// Manage the requested command
log.Infof("Command %d received from %s", command, clientId)
err = s.processCommand(Command(command), clientId)
if err != nil {
// Kill client connection
cli.status = csKilled
s.killClient(clientId)
return
}
}
Expand Down Expand Up @@ -230,7 +239,7 @@ func (s *StreamServer) AddStreamEntry(etype EntryType, data []uint8) (uint64, er
}

func (s *StreamServer) CommitAtomicOp() error {
log.Debug("!!!Commit Tx")
log.Debug("!!!Commit AtomicOp")
s.atomicOp.status = aoCommitting

// Update header in the file (commit new entries)
Expand All @@ -245,11 +254,20 @@ func (s *StreamServer) CommitAtomicOp() error {
return nil
}

// RollbackAtomicOp aborts the atomic operation currently in progress by
// moving it to the rollbacking state. Always returns nil.
// NOTE(review): the actual undo of already-written entries is still
// pending (TODO in original) — the file is not touched here.
func (s *StreamServer) RollbackAtomicOp() error {
	log.Debug("!!!Rollback AtomicOp")
	s.atomicOp.status = aoRollbacking

	// TODO: work

	return nil
}

func (s *StreamServer) broadcastAtomicOp() {
// For each connected and started client
log.Debug("Broadcast clients length: ", len(s.clients))
for id, cli := range s.clients {
log.Debugf("Client %s status %d", id, cli.status)
log.Infof("Client %s status %d[%s]", id, cli.status, StrClientStatus[cli.status])
if cli.status != csStarted {
continue
}
Expand All @@ -258,22 +276,34 @@ func (s *StreamServer) broadcastAtomicOp() {
log.Debug("Streaming to: ", id)
writer := bufio.NewWriter(cli.conn)
for _, entry := range s.atomicOp.entries {
log.Debug("Sending data entry: ", entry.entryNum)
log.Debugf("Sending data entry %d to %s", entry.entryNum, id)
binaryEntry := encodeFileEntryToBinary(entry)

// Send the file entry
// Send the file data entry
_, err := writer.Write(binaryEntry)
if err != nil {
log.Error("Error sending file entry")
// TODO: kill client
// Kill client connection
log.Errorf("Error sending entry to %s", id)
s.killClient(id)
}

// Flush buffers
err = writer.Flush()
if err != nil {
log.Errorf("Error flushing socket data to %s", id)
s.killClient(id)
}
}
writer.Flush()
}

s.atomicOp.status = aoNone
}

// killClient marks the given client as killed and closes its connection.
// A clientId with no entry in the clients map is a no-op, which makes the
// function safe to call more than once for the same client (it is invoked
// from both handleConnection and the broadcast path).
// NOTE(review): s.clients appears to be touched from multiple goroutines
// without visible locking — confirm a mutex guards it elsewhere.
func (s *StreamServer) killClient(clientId string) {
	cli, ok := s.clients[clientId]
	if !ok {
		// Unknown or already-removed client: without this guard the nil
		// map-miss value would panic on the field assignment below.
		return
	}
	cli.status = csKilled
	// Best effort: the peer may already have dropped the connection, so a
	// Close error is expected here and deliberately ignored.
	_ = cli.conn.Close()
}

func (s *StreamServer) processCommand(command Command, clientId string) error {
cli := s.clients[clientId]

Expand All @@ -287,8 +317,12 @@ func (s *StreamServer) processCommand(command Command, clientId string) error {
log.Error("Stream to client already started!")
err = errors.New("client already started")
} else {
cli.status = csStarted
// TODO
// Perform work of start command
cli.status = csStarting
err = s.processCmdStart(clientId)
if err == nil {
cli.status = csStarted
}
}

case CmdStop:
Expand Down Expand Up @@ -321,6 +355,20 @@ func (s *StreamServer) processCommand(command Command, clientId string) error {
return err
}

// processCmdStart handles the start command for a client: it reads the
// entry number the client wants to stream from. On a read failure the
// client is killed and the error is returned; otherwise nil.
// NOTE(review): this wraps the connection in a fresh bufio.Reader, separate
// from the one in handleConnection — bytes buffered by either reader are
// invisible to the other, which can drop parameter bytes. Confirm intended.
func (s *StreamServer) processCmdStart(clientId string) error {
	// The start command carries one uint64 parameter: the starting entry.
	connReader := bufio.NewReader(s.clients[clientId].conn)
	fromEntry, err := readFullUint64(connReader)
	if err != nil {
		s.killClient(clientId)
		return err
	}

	log.Infof("Starting entry %d for client %s", fromEntry, clientId)
	return nil
}

// Send the response to a command that is a result entry
func (s *StreamServer) sendResultEntry(errorNum uint32, errorStr string, clientId string) error {
// Prepare the result entry
Expand All @@ -332,7 +380,7 @@ func (s *StreamServer) sendResultEntry(errorNum uint32, errorStr string, clientI
errorNum: errorNum,
errorStr: byteSlice,
}
PrintResultEntry(entry) // TODO: remove
// PrintResultEntry(entry) // TODO: remove

// Convert struct to binary bytes
binaryEntry := encodeResultEntryToBinary(entry)
Expand All @@ -343,10 +391,12 @@ func (s *StreamServer) sendResultEntry(errorNum uint32, errorStr string, clientI
writer := bufio.NewWriter(conn)
_, err := writer.Write(binaryEntry)
if err != nil {
log.Error("Error sending result entry")
log.Errorf("Error sending result entry to %s", clientId)
s.killClient(clientId)
return err
}
writer.Flush()

writer.Flush()
return nil
}

Expand Down

0 comments on commit 277ff4c

Please sign in to comment.