Skip to content

Commit

Permalink
Detect and handle rollbacks on the RPC nodes (#886)
Browse files Browse the repository at this point in the history
* Implement the RPC rollback feature

* Resolve conflicts

* Fix TestStreamClientGetLatestL2Block
  • Loading branch information
Stefan-Ethernal authored Sep 20, 2024
1 parent 292a2ec commit c69caf5
Show file tree
Hide file tree
Showing 16 changed files with 1,129 additions and 151 deletions.
35 changes: 23 additions & 12 deletions zk/datastream/client/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,27 @@ func (c *StreamClient) sendHeaderCmd() error {
return nil
}

// sendStartBookmarkCmd sends a start command to the server, indicating
// that the client wishes to start streaming from the given bookmark
func (c *StreamClient) sendStartBookmarkCmd(bookmark []byte) error {
err := c.sendCommand(CmdStartBookmark)
if err != nil {
return err
// sendBookmarkCmd sends either CmdStartBookmark or CmdBookmark for the provided bookmark value.
// In case streaming parameter is set to true, the CmdStartBookmark is sent, otherwise the CmdBookmark.
func (c *StreamClient) sendBookmarkCmd(bookmark []byte, streaming bool) error {
// in case we want to stream the entries, CmdStartBookmark is sent, otherwise CmdBookmark command
command := CmdStartBookmark
if !streaming {
command = CmdBookmark
}

// Send starting/from entry number
if err := writeFullUint32ToConn(c.conn, uint32(len(bookmark))); err != nil {
// Send the command
if err := c.sendCommand(command); err != nil {
return err
}
if err := writeBytesToConn(c.conn, bookmark); err != nil {

// Send bookmark length
if err := writeFullUint32ToConn(c.conn, uint32(len(bookmark))); err != nil {
return err
}

return nil
// Send the bookmark to retrieve
return writeBytesToConn(c.conn, bookmark)
}

// sendStartCmd sends a start command to the server, indicating
Expand All @@ -51,11 +55,18 @@ func (c *StreamClient) sendStartCmd(from uint64) error {
}

// Send starting/from entry number
if err := writeFullUint64ToConn(c.conn, from); err != nil {
return writeFullUint64ToConn(c.conn, from)
}

// sendEntryCmd sends the get data stream entry by number command to a TCP connection
func (c *StreamClient) sendEntryCmd(entryNum uint64) error {
// Send CmdEntry command
if err := c.sendCommand(CmdEntry); err != nil {
return err
}

return nil
// Send entry number
return writeFullUint64ToConn(c.conn, entryNum)
}

// sendHeaderCmd sends the header command to the server.
Expand Down
Loading

0 comments on commit c69caf5

Please sign in to comment.