Skip to content

Commit

Permalink
make entries configurable and printable
Browse files Browse the repository at this point in the history
  • Loading branch information
ToniRamirezM committed Sep 7, 2023
1 parent f15e597 commit 7c22471
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 18 deletions.
28 changes: 28 additions & 0 deletions datastreamer/entities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package datastreamer

import (
"fmt"
"reflect"
"unsafe"
)

type EntityDefinition struct {
Name string
StreamType StreamType
EntryType EntryType
Definition reflect.Type
}

func (e EntityDefinition) toString(entity []byte) string {
obj := reflect.NewAt(e.Definition, unsafe.Pointer(&entity[0]))
val := obj.Elem()
stringValue := ""

for i := 0; i < val.NumField(); i++ {
stringValue += fmt.Sprintf(val.Type().Field(i).Name + ": " + fmt.Sprintf("%v", val.Field(i).Interface()))
if i < val.NumField()-1 {
stringValue += ", "
}
}
return stringValue
}
18 changes: 9 additions & 9 deletions datastreamer/streamfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ const (
)

type HeaderEntry struct {
packetType uint8 // 1:Header
headLength uint32 // 29
streamType uint64 // 1:Sequencer
totalLength uint64 // Total bytes used in the file
totalEntries uint64 // Total number of data entries (entry type 2)
packetType uint8 // 1:Header
headLength uint32 // 29
streamType StreamType // 1:Sequencer
totalLength uint64 // Total bytes used in the file
totalEntries uint64 // Total number of data entries (entry type 2)
}

type FileEntry struct {
Expand All @@ -47,13 +47,13 @@ type StreamFile struct {
fileName string
pageSize uint32 // in bytes
file *os.File
streamType uint64
streamType StreamType
maxLength uint64 // File size

header HeaderEntry
}

func PrepareStreamFile(fn string, st uint64) (StreamFile, error) {
func PrepareStreamFile(fn string, st StreamType) (StreamFile, error) {
sf := StreamFile{
fileName: fn,
pageSize: pageSize,
Expand Down Expand Up @@ -275,7 +275,7 @@ func encodeHeaderEntryToBinary(e HeaderEntry) []byte {
be := make([]byte, 1)
be[0] = e.packetType
be = binary.BigEndian.AppendUint32(be, e.headLength)
be = binary.BigEndian.AppendUint64(be, e.streamType)
be = binary.BigEndian.AppendUint64(be, uint64(e.streamType))
be = binary.BigEndian.AppendUint64(be, e.totalLength)
be = binary.BigEndian.AppendUint64(be, e.totalEntries)
return be
Expand All @@ -292,7 +292,7 @@ func decodeBinaryToHeaderEntry(b []byte) (HeaderEntry, error) {

e.packetType = b[0]
e.headLength = binary.BigEndian.Uint32(b[1:5])
e.streamType = binary.BigEndian.Uint64(b[5:13])
e.streamType = StreamType(binary.BigEndian.Uint64(b[5:13]))
e.totalLength = binary.BigEndian.Uint64(b[13:21])
e.totalEntries = binary.BigEndian.Uint64(b[21:29])

Expand Down
19 changes: 16 additions & 3 deletions datastreamer/streamserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Command uint64
type ClientStatus uint64
type AOStatus uint64
type EntryType uint32
type StreamType uint64

const (
// Stream type
Expand Down Expand Up @@ -46,13 +47,15 @@ type StreamServer struct {
port uint16 // server stream port
fileName string // stream file name

streamType uint64
streamType StreamType
ln net.Listener
clients map[string]*client

lastEntry uint64
atomicOp streamAO
sf StreamFile

entriesDefinition map[EntryType]EntityDefinition
}

type streamAO struct {
Expand Down Expand Up @@ -120,6 +123,10 @@ func (s *StreamServer) Start() error {
return nil
}

func (s *StreamServer) SetEntriesDefinition(entriesDefinition map[EntryType]EntityDefinition) {
s.entriesDefinition = entriesDefinition
}

func (s *StreamServer) waitConnections() {
defer s.ln.Close()

Expand Down Expand Up @@ -158,11 +165,13 @@ func (s *StreamServer) handleConnection(conn net.Conn) {
return //TODO
}
// Read stream type
st, err := readFullUint64(reader)
stUint64, err := readFullUint64(reader)
if err != nil {
cli.status = csKilled
return //TODO
}
st := StreamType(stUint64)

// Check stream type
if st != s.streamType {
log.Error("Mismatch stream type, killed: ", clientId)
Expand All @@ -188,7 +197,11 @@ func (s *StreamServer) StartAtomicOp() error {
return nil
}

func (s *StreamServer) AddStreamEntry(etype uint32, data []uint8) (uint64, error) {
func (s *StreamServer) AddStreamEntry(etype EntryType, data []uint8) (uint64, error) {
// Log entity example
entity := s.entriesDefinition[etype]
log.Info(entity.toString(data))

log.Debug("!!!Add entry")
// Generate data entry
e := FileEntry{
Expand Down
2 changes: 1 addition & 1 deletion tool/config/tool.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ User = "state_user"
Password = "state_password"
Name = "state_db"
Host = "localhost"
Port = "5432"
Port = "5433"
EnableLog = false
MaxConns = 200

Expand Down
6 changes: 1 addition & 5 deletions tool/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,13 @@ func (db *StateDB) GetL2Transactions(ctx context.Context, minL2Block, maxL2Block

func scanL2Transaction(row pgx.Row) (*L2Transaction, error) {
l2Transaction := L2Transaction{}
var (
encodedStr string
)
if err := row.Scan(
&l2Transaction.EffectiveGasPricePercentage,
&encodedStr,
&l2Transaction.Encoded,
&l2Transaction.BatchNumber,
); err != nil {
return &l2Transaction, err
}
l2Transaction.Encoded = common.Hex2Bytes(encodedStr)
l2Transaction.EncodedLength = uint32(len(l2Transaction.Encoded))
l2Transaction.IsValid = 1
return &l2Transaction, nil
Expand Down
17 changes: 17 additions & 0 deletions tool/db/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,20 @@ import (
"time"
"unsafe"

"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
"github.com/ethereum/go-ethereum/common"
)

const (
// StreamTypeSequencer represents a Sequencer stream
StreamTypeSequencer datastreamer.StreamType = 1
// EntryTypeL2Block represents a L2 block
EntryTypeL2Block datastreamer.EntryType = 1
// EntryTypeL2Tx represents a L2 transaction
EntryTypeL2Tx datastreamer.EntryType = 2
)

// L2Block represents a L2 block
type L2Block struct {
BatchNumber uint64
L2BlockNumber uint64
Expand All @@ -21,6 +32,7 @@ func (b L2Block) Encode() []byte {
return (*(*[size]byte)(unsafe.Pointer(&b)))[:]
}

// L2Transaction represents a L2 transaction
type L2Transaction struct {
BatchNumber uint64
EffectiveGasPricePercentage uint8
Expand All @@ -34,3 +46,8 @@ func (l L2Transaction) Encode() []byte {
const size = int(unsafe.Sizeof(L2Transaction{}))
return (*(*[size]byte)(unsafe.Pointer(&l)))[:]
}

// DecodeL2Transaction decodes a byte slice into a L2Transaction
func DecodeL2Transaction(data []byte) L2Transaction {
return *(*L2Transaction)(unsafe.Pointer(&data[0]))
}
23 changes: 23 additions & 0 deletions tool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"os"
"reflect"

"github.com/0xPolygonHermez/zkevm-data-streamer"
"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
Expand Down Expand Up @@ -76,6 +77,24 @@ func start(cliCtx *cli.Context) error {
log.Fatal(err)
}

// Set entities definition
entriesDefinition := map[datastreamer.EntryType]datastreamer.EntityDefinition{
datastreamer.EtStartL2Block: {
Name: "L2Block",
StreamType: db.StreamTypeSequencer,
EntryType: db.EntryTypeL2Block,
Definition: reflect.TypeOf(db.L2Block{}),
},
datastreamer.EtExecuteL2Tx: {
Name: "L2Transaction",
StreamType: db.StreamTypeSequencer,
EntryType: db.EntryTypeL2Tx,
Definition: reflect.TypeOf(db.L2Transaction{}),
},
}

streamServer.SetEntriesDefinition(entriesDefinition)

// Connect to the database
stateSqlDB, err := db.NewSQLDB(c.StateDB)
if err != nil {
Expand All @@ -93,6 +112,10 @@ func start(cliCtx *cli.Context) error {
log.Fatal(err)
}

if len(l2blocks) == 0 {
log.Fatal("No genesis block found")
}

err = streamServer.StartAtomicOp()
if err != nil {
log.Fatal(err)
Expand Down

0 comments on commit 7c22471

Please sign in to comment.