Skip to content

Commit

Permalink
Feature/wip (#7)
Browse files Browse the repository at this point in the history
* Replace prints with logs. Lint satisfy. Bug fixes

* Renamed some fields and methods

* Wip server broadcast and stream client

* Merge main

---------

Co-authored-by: Toni Ramírez <[email protected]>
  • Loading branch information
dpunish3r and ToniRamirezM authored Sep 6, 2023
1 parent bac1043 commit f15e597
Show file tree
Hide file tree
Showing 5 changed files with 327 additions and 162 deletions.
41 changes: 35 additions & 6 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
"github.com/0xPolygonHermez/zkevm-data-streamer/log"
Expand All @@ -24,16 +25,20 @@ func main() {
}

// Create clients
go datastreamer.NewClient(1)
go startNewClient()
go startNewClient()

// Fake stream data
time.Sleep(2 * time.Second)

// ------------------------------------------------------------
// Fake Sequencer data
data := make([]byte, 32)
for i := 0; i < 32; i++ {
data[i] = byte(i)
}

// Start tx
err = s.StartStreamTx()
// Start atomic operation
err = s.StartAtomicOp()
if err != nil {
log.Error(">> App error! StartStreamTx")
return
Expand All @@ -49,12 +54,13 @@ func main() {
log.Info(">> App info. Added entry:", entry)
}

// Commit tx
err = s.CommitStreamTx()
// Commit atomic operation
err = s.CommitAtomicOp()
if err != nil {
log.Error(">> App error! CommitStreamTx")
return
}
// ------------------------------------------------------------

// Wait for ctl+c
log.Info(">> Press Control+C to finish...")
Expand All @@ -64,3 +70,26 @@ func main() {

log.Info(">> App end")
}

func startNewClient() {
// Create client
log.Debug("### New client")
c, err := datastreamer.NewClient("127.0.0.1:1337")
if err != nil {
return
}

// Start client (connect to the server)
log.Debug("### Start client")
err = c.Start()
if err != nil {
return
}

// Start streaming receive (execute command Start)
log.Debug("### Send Start command client")
err = c.ExecCommand(datastreamer.CmdStart)
if err != nil {
return
}
}
154 changes: 106 additions & 48 deletions datastreamer/streamclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,111 +6,169 @@ import (
"errors"
"io"
"net"
"time"

"github.com/0xPolygonHermez/zkevm-data-streamer/log"
)

const (
server = "127.0.0.1:1337"
)
type StreamClient struct {
server string
conn net.Conn
id string
}

func NewClient(server string) (StreamClient, error) {
// Create the client data stream
c := StreamClient{
server: server,
id: "",
}
return c, nil
}

// For Development
func NewClient(i int) {
func (c *StreamClient) Start() error {
// Connect to server
conn, err := net.Dial("tcp", server)
var err error
c.conn, err = net.Dial("tcp", c.server)
if err != nil {
log.Error("**Error connecting to server:", server, err)
return
log.Error("Error connecting to server: ", c.server, err)
return err
}

defer conn.Close()
log.Info("**Connected to server:", server)
c.id = c.conn.LocalAddr().String()
log.Infof("%s Connected to server: %s", c.id, c.server)
return nil
}

// Send the command and stream type
err = writeFullUint64(uint64(i), conn)
func (c *StreamClient) ExecCommand(cmd Command) error {
// Send command
err := writeFullUint64(uint64(cmd), c.conn)
if err != nil {
log.Error(err)
return
log.Errorf("%s ", c.id, err)
return err
}
err = writeFullUint64(StSequencer, conn)
// Send stream type
err = writeFullUint64(StSequencer, c.conn)
if err != nil {
log.Error(err)
return
log.Errorf("%s ", c.id, err)
return err
}

// Read server result entry for the command
_, err = readResultEntry(conn)
r, err := c.readResultEntry()
if err != nil {
log.Error(err)
return
log.Errorf("%s ", c.id, err)
return err
}

// Read from server
readFromServer(conn)
log.Infof("%s Result %d[%s] received for command %d", c.id, r.errorNum, r.errorStr, cmd)

// Streaming receive goroutine
if cmd == CmdStart {
go c.streamingReceive()
}

return nil
}

func readFromServer(conn net.Conn) {
client := conn.LocalAddr().String()
buffer := make([]byte, 1024)
func (c *StreamClient) streamingReceive() {
defer c.conn.Close()

for {
n, err := conn.Read(buffer)
// Wait next data entry streamed
d, err := c.readDataEntry()
if err != nil {
if err == io.EOF {
log.Errorf("**client %s: no more data", client)
return
}
log.Errorf("**client %s:error reading from server:%s", client, err)
time.Sleep(2 * time.Second)
continue
return
}

log.Infof("**client %s:message from server:[%s]", client, buffer[:n])
log.Infof("%s Received entry number %d: ", c.id, d.entryNum)
}
}

func (c *StreamClient) readDataEntry() (FileEntry, error) {
d := FileEntry{}
reader := bufio.NewReader(c.conn)

// Read fixed size fields
buffer := make([]byte, FixedSizeFileEntry)
_, err := io.ReadFull(reader, buffer)
if err != nil {
if err == io.EOF {
log.Errorf("%s Server close connection", c.id)
} else {
log.Errorf("%s Error reading from server: ", c.id, err)
}
return d, err
}

// Read variable field (data)
length := binary.BigEndian.Uint32(buffer[1:5])
if length < FixedSizeFileEntry {
log.Errorf("%s Error reading data entry", c.id)
return d, errors.New("error reading data entry")
}

bufferAux := make([]byte, length-FixedSizeFileEntry)
_, err = io.ReadFull(reader, bufferAux)
if err != nil {
if err == io.EOF {
log.Errorf("%s Server close connection", c.id)
} else {
log.Errorf("%s Error reading from server: ", c.id, err)
}
return d, err
}
buffer = append(buffer, bufferAux...)

// Decode binary data entry
d, err = DecodeBinaryToFileEntry(buffer)
if err != nil {
return d, err
}

return d, nil
}

func writeFullUint64(value uint64, conn net.Conn) error {
buffer := make([]byte, 8)
binary.BigEndian.PutUint64(buffer, uint64(value))

_, err := conn.Write(buffer)
if err != nil {
log.Error("**Error sending to server:", err)
log.Errorf("%s Error sending to server: ", conn.RemoteAddr().String(), err)
return err
}
return nil
}

func readResultEntry(conn net.Conn) (ResultEntry, error) {
func (c *StreamClient) readResultEntry() (ResultEntry, error) {
e := ResultEntry{}
reader := bufio.NewReader(conn)
reader := bufio.NewReader(c.conn)

// Read fixed fields (isentry 1byte, length 4bytes, errNum 4bytes)
buffer := make([]byte, 9)
// Read fixed size fields
buffer := make([]byte, FixedSizeResultEntry)
_, err := io.ReadFull(reader, buffer)
if err != nil {
if err == io.EOF {
log.Warn("**Server close connection")
log.Errorf("%s Server close connection", c.id)
} else {
log.Error("**Error reading from server:", err)
log.Errorf("%s Error reading from server: ", c.id, err)
}
return e, err
}

// Read variable field (errStr)
length := binary.BigEndian.Uint32(buffer[1:5])
if length < 10 {
log.Error("**Error reading result entry")
if length < FixedSizeResultEntry {
log.Errorf("%s Error reading result entry", c.id)
return e, errors.New("error reading result entry")
}

bufferAux := make([]byte, length-9)
bufferAux := make([]byte, length-FixedSizeResultEntry)
_, err = io.ReadFull(reader, bufferAux)
if err != nil {
if err == io.EOF {
log.Warn("**Server close connection")
log.Errorf("%s Server close connection", c.id)
} else {
log.Error("**Error reading from server:", err)
log.Errorf("%s Error reading from server: ", c.id, err)
}
return e, err
}
Expand Down
Loading

0 comments on commit f15e597

Please sign in to comment.