Skip to content

Commit

Permalink
make entries configurable and printable (#8)
Browse files Browse the repository at this point in the history
* make entries configurable and printable

* fix config

* fix

* fix

* fix
  • Loading branch information
ToniRamirezM authored Sep 7, 2023
1 parent 799d061 commit eb5d71e
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 95 deletions.
28 changes: 28 additions & 0 deletions datastreamer/entities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package datastreamer

import (
"fmt"
"reflect"
"unsafe"
)

type EntityDefinition struct {
Name string
StreamType StreamType
EntryType EntryType
Definition reflect.Type
}

func (e EntityDefinition) toString(entity []byte) string {
obj := reflect.NewAt(e.Definition, unsafe.Pointer(&entity[0]))
val := obj.Elem()
stringValue := ""

for i := 0; i < val.NumField(); i++ {
stringValue += fmt.Sprintf(val.Type().Field(i).Name + ": " + fmt.Sprintf("%v", val.Field(i).Interface()))
if i < val.NumField()-1 {
stringValue += ", "
}
}
return stringValue
}
18 changes: 9 additions & 9 deletions datastreamer/streamfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ const (
)

type HeaderEntry struct {
packetType uint8 // 1:Header
headLength uint32 // 29
streamType uint64 // 1:Sequencer
totalLength uint64 // Total bytes used in the file
totalEntries uint64 // Total number of data entries (entry type 2)
packetType uint8 // 1:Header
headLength uint32 // 29
streamType StreamType // 1:Sequencer
totalLength uint64 // Total bytes used in the file
totalEntries uint64 // Total number of data entries (entry type 2)
}

type FileEntry struct {
Expand All @@ -47,13 +47,13 @@ type StreamFile struct {
fileName string
pageSize uint32 // in bytes
file *os.File
streamType uint64
streamType StreamType
maxLength uint64 // File size

header HeaderEntry
}

func PrepareStreamFile(fn string, st uint64) (StreamFile, error) {
func PrepareStreamFile(fn string, st StreamType) (StreamFile, error) {
sf := StreamFile{
fileName: fn,
pageSize: pageSize,
Expand Down Expand Up @@ -275,7 +275,7 @@ func encodeHeaderEntryToBinary(e HeaderEntry) []byte {
be := make([]byte, 1)
be[0] = e.packetType
be = binary.BigEndian.AppendUint32(be, e.headLength)
be = binary.BigEndian.AppendUint64(be, e.streamType)
be = binary.BigEndian.AppendUint64(be, uint64(e.streamType))
be = binary.BigEndian.AppendUint64(be, e.totalLength)
be = binary.BigEndian.AppendUint64(be, e.totalEntries)
return be
Expand All @@ -292,7 +292,7 @@ func decodeBinaryToHeaderEntry(b []byte) (HeaderEntry, error) {

e.packetType = b[0]
e.headLength = binary.BigEndian.Uint32(b[1:5])
e.streamType = binary.BigEndian.Uint64(b[5:13])
e.streamType = StreamType(binary.BigEndian.Uint64(b[5:13]))
e.totalLength = binary.BigEndian.Uint64(b[13:21])
e.totalEntries = binary.BigEndian.Uint64(b[21:29])

Expand Down
128 changes: 47 additions & 81 deletions datastreamer/streamserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Command uint64
type ClientStatus uint64
type AOStatus uint64
type EntryType uint32
type StreamType uint64

const (
// Stream type
Expand All @@ -28,42 +29,33 @@ const (
CmdHeader Command = 3

// Client status
csStarting ClientStatus = 1
csStarted ClientStatus = 2
csStopped ClientStatus = 3
csKilled ClientStatus = 0xff
csStarted ClientStatus = 1
csStopped ClientStatus = 2
csKilled ClientStatus = 0xff

// Atomic operation status
aoNone AOStatus = 0
aoStarted AOStatus = 1
aoCommitting AOStatus = 2
aoRollbacking AOStatus = 3
aoNone AOStatus = 0
aoStarted AOStatus = 1
aoCommitting AOStatus = 2

// Entry types (events)
EtStartL2Block EntryType = 1
EtExecuteL2Tx EntryType = 2
)

var (
StrClientStatus = map[ClientStatus]string{
csStarting: "Starting",
csStarted: "Started",
csStopped: "Stopped",
csKilled: "Killed",
}
)

type StreamServer struct {
port uint16 // server stream port
fileName string // stream file name

streamType uint64
streamType StreamType
ln net.Listener
clients map[string]*client

lastEntry uint64
atomicOp streamAO
sf StreamFile

entriesDefinition map[EntryType]EntityDefinition
}

type streamAO struct {
Expand Down Expand Up @@ -131,6 +123,10 @@ func (s *StreamServer) Start() error {
return nil
}

func (s *StreamServer) SetEntriesDefinition(entriesDefinition map[EntryType]EntityDefinition) {
s.entriesDefinition = entriesDefinition
}

func (s *StreamServer) waitConnections() {
defer s.ln.Close()

Expand Down Expand Up @@ -158,33 +154,37 @@ func (s *StreamServer) handleConnection(conn net.Conn) {
status: csStopped,
}

cli := s.clients[clientId]

reader := bufio.NewReader(conn)
for {
// Read command
command, err := readFullUint64(reader)
if err != nil {
s.killClient(clientId)
return
cli.status = csKilled
return //TODO
}
// Read stream type
st, err := readFullUint64(reader)
stUint64, err := readFullUint64(reader)
if err != nil {
s.killClient(clientId)
return
cli.status = csKilled
return //TODO
}
st := StreamType(stUint64)

// Check stream type
if st != s.streamType {
log.Error("Mismatch stream type, killed: ", clientId)
s.killClient(clientId)
return
cli.status = csKilled
return //TODO
}

// Manage the requested command
log.Infof("Command %d received from %s", command, clientId)
err = s.processCommand(Command(command), clientId)
if err != nil {
// Kill client connection
s.killClient(clientId)
cli.status = csKilled
return
}
}
Expand All @@ -197,8 +197,15 @@ func (s *StreamServer) StartAtomicOp() error {
return nil
}

func (s *StreamServer) AddStreamEntry(etype uint32, data []uint8) (uint64, error) {
log.Debugf("!!!Add entry %d", s.lastEntry+1)
func (s *StreamServer) AddStreamEntry(etype EntryType, data []uint8) (uint64, error) {
// Log entity example
entity := s.entriesDefinition[etype]
if entity.Name != "" {
log.Info(entity.toString(data))
} else {
log.Warn("entry definition does not exist")
}

// Generate data entry
e := FileEntry{
packetType: PtEntry,
Expand All @@ -223,7 +230,7 @@ func (s *StreamServer) AddStreamEntry(etype uint32, data []uint8) (uint64, error
}

func (s *StreamServer) CommitAtomicOp() error {
log.Debug("!!!Commit AtomicOp")
log.Debug("!!!Commit Tx")
s.atomicOp.status = aoCommitting

// Update header in the file (commit new entries)
Expand All @@ -238,20 +245,11 @@ func (s *StreamServer) CommitAtomicOp() error {
return nil
}

func (s *StreamServer) RollbackAtomicOp() error {
log.Debug("!!!Rollback AtomicOp")
s.atomicOp.status = aoRollbacking

// TODO: work

return nil
}

func (s *StreamServer) broadcastAtomicOp() {
// For each connected and started client
log.Debug("Broadcast clients length: ", len(s.clients))
for id, cli := range s.clients {
log.Infof("Client %s status %d[%s]", id, cli.status, StrClientStatus[cli.status])
log.Debugf("Client %s status %d", id, cli.status)
if cli.status != csStarted {
continue
}
Expand All @@ -260,34 +258,22 @@ func (s *StreamServer) broadcastAtomicOp() {
log.Debug("Streaming to: ", id)
writer := bufio.NewWriter(cli.conn)
for _, entry := range s.atomicOp.entries {
log.Debugf("Sending data entry %d to %s", entry.entryNum, id)
log.Debug("Sending data entry: ", entry.entryNum)
binaryEntry := encodeFileEntryToBinary(entry)

// Send the file data entry
// Send the file entry
_, err := writer.Write(binaryEntry)
if err != nil {
// Kill client connection
log.Errorf("Error sending entry to %s", id)
s.killClient(id)
}

// Flush buffers
err = writer.Flush()
if err != nil {
log.Errorf("Error flushing socket data to %s", id)
s.killClient(id)
log.Error("Error sending file entry")
// TODO: kill client
}
}
writer.Flush()
}

s.atomicOp.status = aoNone
}

func (s *StreamServer) killClient(clientId string) {
s.clients[clientId].status = csKilled
s.clients[clientId].conn.Close()
}

func (s *StreamServer) processCommand(command Command, clientId string) error {
cli := s.clients[clientId]

Expand All @@ -301,12 +287,8 @@ func (s *StreamServer) processCommand(command Command, clientId string) error {
log.Error("Stream to client already started!")
err = errors.New("client already started")
} else {
// Perform work of start command
cli.status = csStarting
err = s.processCmdStart(clientId)
if err == nil {
cli.status = csStarted
}
cli.status = csStarted
// TODO
}

case CmdStop:
Expand Down Expand Up @@ -339,20 +321,6 @@ func (s *StreamServer) processCommand(command Command, clientId string) error {
return err
}

func (s *StreamServer) processCmdStart(clientId string) error {
// Read from entry number parameter
reader := bufio.NewReader(s.clients[clientId].conn)
fromEntry, err := readFullUint64(reader)
if err != nil {
s.killClient(clientId)
return err
}

log.Infof("Starting entry %d for client %s", fromEntry, clientId)

return nil
}

// Send the response to a command that is a result entry
func (s *StreamServer) sendResultEntry(errorNum uint32, errorStr string, clientId string) error {
// Prepare the result entry
Expand All @@ -364,7 +332,7 @@ func (s *StreamServer) sendResultEntry(errorNum uint32, errorStr string, clientI
errorNum: errorNum,
errorStr: byteSlice,
}
// PrintResultEntry(entry) // TODO: remove
PrintResultEntry(entry) // TODO: remove

// Convert struct to binary bytes
binaryEntry := encodeResultEntryToBinary(entry)
Expand All @@ -375,12 +343,10 @@ func (s *StreamServer) sendResultEntry(errorNum uint32, errorStr string, clientI
writer := bufio.NewWriter(conn)
_, err := writer.Write(binaryEntry)
if err != nil {
log.Errorf("Error sending result entry to %s", clientId)
s.killClient(clientId)
return err
log.Error("Error sending result entry")
}

writer.Flush()

return nil
}

Expand Down
6 changes: 1 addition & 5 deletions tool/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,13 @@ func (db *StateDB) GetL2Transactions(ctx context.Context, minL2Block, maxL2Block

func scanL2Transaction(row pgx.Row) (*L2Transaction, error) {
l2Transaction := L2Transaction{}
var (
encodedStr string
)
if err := row.Scan(
&l2Transaction.EffectiveGasPricePercentage,
&encodedStr,
&l2Transaction.Encoded,
&l2Transaction.BatchNumber,
); err != nil {
return &l2Transaction, err
}
l2Transaction.Encoded = common.Hex2Bytes(encodedStr)
l2Transaction.EncodedLength = uint32(len(l2Transaction.Encoded))
l2Transaction.IsValid = 1
return &l2Transaction, nil
Expand Down
Loading

0 comments on commit eb5d71e

Please sign in to comment.