From eb5d71e4cbfeaced40f2218d52dbe7f59b3c5361 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= <58293609+ToniRamirezM@users.noreply.github.com> Date: Thu, 7 Sep 2023 13:41:49 +0200 Subject: [PATCH] make entries configurable and printable (#8) * make entries configurable and printable * fix config * fix * fix * fix --- datastreamer/entities.go | 28 ++++++++ datastreamer/streamfile.go | 18 ++--- datastreamer/streamserver.go | 128 +++++++++++++---------------------- tool/db/db.go | 6 +- tool/db/types.go | 17 +++++ tool/main.go | 23 +++++++ 6 files changed, 125 insertions(+), 95 deletions(-) create mode 100644 datastreamer/entities.go diff --git a/datastreamer/entities.go b/datastreamer/entities.go new file mode 100644 index 0000000..e683c96 --- /dev/null +++ b/datastreamer/entities.go @@ -0,0 +1,28 @@ +package datastreamer + +import ( + "fmt" + "reflect" + "unsafe" +) + +type EntityDefinition struct { + Name string + StreamType StreamType + EntryType EntryType + Definition reflect.Type +} + +func (e EntityDefinition) toString(entity []byte) string { + obj := reflect.NewAt(e.Definition, unsafe.Pointer(&entity[0])) + val := obj.Elem() + stringValue := "" + + for i := 0; i < val.NumField(); i++ { + stringValue += fmt.Sprintf(val.Type().Field(i).Name + ": " + fmt.Sprintf("%v", val.Field(i).Interface())) + if i < val.NumField()-1 { + stringValue += ", " + } + } + return stringValue +} diff --git a/datastreamer/streamfile.go b/datastreamer/streamfile.go index e21315c..8708ef4 100644 --- a/datastreamer/streamfile.go +++ b/datastreamer/streamfile.go @@ -28,11 +28,11 @@ const ( ) type HeaderEntry struct { - packetType uint8 // 1:Header - headLength uint32 // 29 - streamType uint64 // 1:Sequencer - totalLength uint64 // Total bytes used in the file - totalEntries uint64 // Total number of data entries (entry type 2) + packetType uint8 // 1:Header + headLength uint32 // 29 + streamType StreamType // 1:Sequencer + totalLength uint64 // Total bytes used in the file + totalEntries uint64 // Total number of data entries (entry type 2) } type FileEntry struct { @@ -47,13 +47,13 @@ type StreamFile struct { fileName string pageSize uint32 // in bytes file *os.File - streamType uint64 + streamType StreamType maxLength uint64 // File size header HeaderEntry } -func PrepareStreamFile(fn string, st uint64) (StreamFile, error) { +func PrepareStreamFile(fn string, st StreamType) (StreamFile, error) { sf := StreamFile{ fileName: fn, pageSize: pageSize, @@ -275,7 +275,7 @@ func encodeHeaderEntryToBinary(e HeaderEntry) []byte { be := make([]byte, 1) be[0] = e.packetType be = binary.BigEndian.AppendUint32(be, e.headLength) - be = binary.BigEndian.AppendUint64(be, e.streamType) + be = binary.BigEndian.AppendUint64(be, uint64(e.streamType)) be = binary.BigEndian.AppendUint64(be, e.totalLength) be = binary.BigEndian.AppendUint64(be, e.totalEntries) return be @@ -292,7 +292,7 @@ func decodeBinaryToHeaderEntry(b []byte) (HeaderEntry, error) { e.packetType = b[0] e.headLength = binary.BigEndian.Uint32(b[1:5]) - e.streamType = binary.BigEndian.Uint64(b[5:13]) + e.streamType = StreamType(binary.BigEndian.Uint64(b[5:13])) e.totalLength = binary.BigEndian.Uint64(b[13:21]) e.totalEntries = binary.BigEndian.Uint64(b[21:29]) diff --git a/datastreamer/streamserver.go b/datastreamer/streamserver.go index 1bb2cd6..591e087 100644 --- a/datastreamer/streamserver.go +++ b/datastreamer/streamserver.go @@ -17,6 +17,7 @@ type Command uint64 type ClientStatus uint64 type AOStatus uint64 type EntryType uint32 +type StreamType uint64 const ( // Stream type @@ -28,42 +29,33 @@ const ( CmdHeader Command = 3 // Client status - csStarting ClientStatus = 1 - csStarted ClientStatus = 2 - csStopped ClientStatus = 3 - csKilled ClientStatus = 0xff + csStarted ClientStatus = 1 + csStopped ClientStatus = 2 + csKilled ClientStatus = 0xff // Atomic operation status - aoNone AOStatus = 0 - aoStarted AOStatus = 1 - aoCommitting AOStatus = 2 - aoRollbacking AOStatus = 3 + aoNone AOStatus = 0 + aoStarted AOStatus = 1 + aoCommitting AOStatus = 2 // Entry types (events) EtStartL2Block EntryType = 1 EtExecuteL2Tx EntryType = 2 ) -var ( - StrClientStatus = map[ClientStatus]string{ - csStarting: "Starting", - csStarted: "Started", - csStopped: "Stopped", - csKilled: "Killed", - } -) - type StreamServer struct { port uint16 // server stream port fileName string // stream file name - streamType uint64 + streamType StreamType ln net.Listener clients map[string]*client lastEntry uint64 atomicOp streamAO sf StreamFile + + entriesDefinition map[EntryType]EntityDefinition } type streamAO struct { @@ -131,6 +123,10 @@ func (s *StreamServer) Start() error { return nil } +func (s *StreamServer) SetEntriesDefinition(entriesDefinition map[EntryType]EntityDefinition) { + s.entriesDefinition = entriesDefinition +} + func (s *StreamServer) waitConnections() { defer s.ln.Close() @@ -158,25 +154,29 @@ func (s *StreamServer) handleConnection(conn net.Conn) { status: csStopped, } + cli := s.clients[clientId] + reader := bufio.NewReader(conn) for { // Read command command, err := readFullUint64(reader) if err != nil { - s.killClient(clientId) - return + cli.status = csKilled + return //TODO } // Read stream type - st, err := readFullUint64(reader) + stUint64, err := readFullUint64(reader) if err != nil { - s.killClient(clientId) - return + cli.status = csKilled + return //TODO } + st := StreamType(stUint64) + // Check stream type if st != s.streamType { log.Error("Mismatch stream type, killed: ", clientId) - s.killClient(clientId) - return + cli.status = csKilled + return //TODO } // Manage the requested command @@ -184,7 +184,7 @@ func (s *StreamServer) handleConnection(conn net.Conn) { err = s.processCommand(Command(command), clientId) if err != nil { // Kill client connection - s.killClient(clientId) + cli.status = csKilled return } } @@ -197,8 +197,15 @@ func (s *StreamServer) StartAtomicOp() error { return nil } -func (s *StreamServer) AddStreamEntry(etype uint32, data []uint8) (uint64, error) { - log.Debugf("!!!Add entry %d", s.lastEntry+1) +func (s *StreamServer) AddStreamEntry(etype EntryType, data []uint8) (uint64, error) { + // Log entity example + entity := s.entriesDefinition[etype] + if entity.Name != "" { + log.Info(entity.toString(data)) + } else { + log.Warn("entry definition does not exist") + } + // Generate data entry e := FileEntry{ packetType: PtEntry, @@ -223,7 +230,7 @@ func (s *StreamServer) AddStreamEntry(etype uint32, data []uint8) (uint64, error } func (s *StreamServer) CommitAtomicOp() error { - log.Debug("!!!Commit AtomicOp") + log.Debug("!!!Commit Tx") s.atomicOp.status = aoCommitting // Update header in the file (commit new entries) @@ -238,20 +245,11 @@ func (s *StreamServer) CommitAtomicOp() error { return nil } -func (s *StreamServer) RollbackAtomicOp() error { - log.Debug("!!!Rollback AtomicOp") - s.atomicOp.status = aoRollbacking - - // TODO: work - - return nil -} - func (s *StreamServer) broadcastAtomicOp() { // For each connected and started client log.Debug("Broadcast clients length: ", len(s.clients)) for id, cli := range s.clients { - log.Infof("Client %s status %d[%s]", id, cli.status, StrClientStatus[cli.status]) + log.Debugf("Client %s status %d", id, cli.status) if cli.status != csStarted { continue } @@ -260,34 +258,22 @@ func (s *StreamServer) broadcastAtomicOp() { log.Debug("Streaming to: ", id) writer := bufio.NewWriter(cli.conn) for _, entry := range s.atomicOp.entries { - log.Debugf("Sending data entry %d to %s", entry.entryNum, id) + log.Debug("Sending data entry: ", entry.entryNum) binaryEntry := encodeFileEntryToBinary(entry) - // Send the file data entry + // Send the file entry _, err := writer.Write(binaryEntry) if err != nil { - // Kill client connection - log.Errorf("Error sending entry to %s", id) - s.killClient(id) - } - - // Flush buffers - err = writer.Flush() - if err != nil { - log.Errorf("Error flushing socket data to %s", id) - s.killClient(id) + log.Error("Error sending file entry") + // TODO: kill client } } + writer.Flush() } s.atomicOp.status = aoNone } -func (s *StreamServer) killClient(clientId string) { - s.clients[clientId].status = csKilled - s.clients[clientId].conn.Close() -} - func (s *StreamServer) processCommand(command Command, clientId string) error { cli := s.clients[clientId] @@ -301,12 +287,8 @@ func (s *StreamServer) processCommand(command Command, clientId string) error { log.Error("Stream to client already started!") err = errors.New("client already started") } else { - // Perform work of start command - cli.status = csStarting - err = s.processCmdStart(clientId) - if err == nil { - cli.status = csStarted - } + cli.status = csStarted + // TODO } case CmdStop: @@ -339,20 +321,6 @@ func (s *StreamServer) processCommand(command Command, clientId string) error { return err } -func (s *StreamServer) processCmdStart(clientId string) error { - // Read from entry number parameter - reader := bufio.NewReader(s.clients[clientId].conn) - fromEntry, err := readFullUint64(reader) - if err != nil { - s.killClient(clientId) - return err - } - - log.Infof("Starting entry %d for client %s", fromEntry, clientId) - - return nil -} - // Send the response to a command that is a result entry func (s *StreamServer) sendResultEntry(errorNum uint32, errorStr string, clientId string) error { // Prepare the result entry @@ -364,7 +332,7 @@ func (s *StreamServer) sendResultEntry(errorNum uint32, errorStr string, clientI errorNum: errorNum, errorStr: byteSlice, } - // PrintResultEntry(entry) // TODO: remove + PrintResultEntry(entry) // TODO: remove // Convert struct to binary bytes binaryEntry := encodeResultEntryToBinary(entry) @@ -375,12 +343,10 @@ func (s *StreamServer) sendResultEntry(errorNum uint32, errorStr string, clientI writer := bufio.NewWriter(conn) _, err := writer.Write(binaryEntry) if err != nil { - log.Errorf("Error sending result entry to %s", clientId) - s.killClient(clientId) - return err + log.Error("Error sending result entry") } - writer.Flush() + return nil } diff --git a/tool/db/db.go b/tool/db/db.go index 29a1ef9..9586fce 100644 --- a/tool/db/db.go +++ b/tool/db/db.go @@ -112,17 +112,13 @@ func (db *StateDB) GetL2Transactions(ctx context.Context, minL2Block, maxL2Block func scanL2Transaction(row pgx.Row) (*L2Transaction, error) { l2Transaction := L2Transaction{} - var ( - encodedStr string - ) if err := row.Scan( &l2Transaction.EffectiveGasPricePercentage, - &encodedStr, + &l2Transaction.Encoded, &l2Transaction.BatchNumber, ); err != nil { return &l2Transaction, err } - l2Transaction.Encoded = common.Hex2Bytes(encodedStr) l2Transaction.EncodedLength = uint32(len(l2Transaction.Encoded)) l2Transaction.IsValid = 1 return &l2Transaction, nil diff --git a/tool/db/types.go b/tool/db/types.go index f611def..051cfa7 100644 --- a/tool/db/types.go +++ b/tool/db/types.go @@ -4,9 +4,20 @@ import ( "time" "unsafe" + "github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer" "github.com/ethereum/go-ethereum/common" ) +const ( + // StreamTypeSequencer represents a Sequencer stream + StreamTypeSequencer datastreamer.StreamType = 1 + // EntryTypeL2Block represents a L2 block + EntryTypeL2Block datastreamer.EntryType = 1 + // EntryTypeL2Tx represents a L2 transaction + EntryTypeL2Tx datastreamer.EntryType = 2 +) + +// L2Block represents a L2 block type L2Block struct { BatchNumber uint64 L2BlockNumber uint64 @@ -21,6 +32,7 @@ func (b L2Block) Encode() []byte { return (*(*[size]byte)(unsafe.Pointer(&b)))[:] } +// L2Transaction represents a L2 transaction type L2Transaction struct { BatchNumber uint64 EffectiveGasPricePercentage uint8 @@ -34,3 +46,8 @@ func (l L2Transaction) Encode() []byte { const size = int(unsafe.Sizeof(L2Transaction{})) return (*(*[size]byte)(unsafe.Pointer(&l)))[:] } + +// DecodeL2Transaction decodes a byte slice into a L2Transaction +func DecodeL2Transaction(data []byte) L2Transaction { + return *(*L2Transaction)(unsafe.Pointer(&data[0])) +} diff --git a/tool/main.go b/tool/main.go index 237899e..ddf9e53 100644 --- a/tool/main.go +++ b/tool/main.go @@ -2,6 +2,7 @@ package main import ( "os" + "reflect" "github.com/0xPolygonHermez/zkevm-data-streamer" "github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer" @@ -76,6 +77,24 @@ func start(cliCtx *cli.Context) error { log.Fatal(err) } + // Set entities definition + entriesDefinition := map[datastreamer.EntryType]datastreamer.EntityDefinition{ + datastreamer.EtStartL2Block: { + Name: "L2Block", + StreamType: db.StreamTypeSequencer, + EntryType: db.EntryTypeL2Block, + Definition: reflect.TypeOf(db.L2Block{}), + }, + datastreamer.EtExecuteL2Tx: { + Name: "L2Transaction", + StreamType: db.StreamTypeSequencer, + EntryType: db.EntryTypeL2Tx, + Definition: reflect.TypeOf(db.L2Transaction{}), + }, + } + + streamServer.SetEntriesDefinition(entriesDefinition) + // Connect to the database stateSqlDB, err := db.NewSQLDB(c.StateDB) if err != nil { @@ -93,6 +112,10 @@ func start(cliCtx *cli.Context) error { log.Fatal(err) } + if len(l2blocks) == 0 { + log.Fatal("No genesis block found") + } + err = streamServer.StartAtomicOp() if err != nil { log.Fatal(err)