Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

time stats in atomic op #27

Merged
merged 1 commit into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
The first page is the header.
Header page size = 4096 bytes

#### Magic numbers
At the beginning of the file there are the following magic bytes (file signature): `polygonDATSTREAM`

#### Header entry format

>u8 packetType=1
Expand Down
7 changes: 4 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func runServer(*cli.Context) error {

rand.Seed(time.Now().UnixNano())

for n := 1; n <= 1000; n++ {
for n := 1; n <= 1000000; n++ {
// Start atomic operation
err = s.StartAtomicOp()
if err != nil {
Expand All @@ -128,7 +128,7 @@ func runServer(*cli.Context) error {
return
}
// Tx
numTx := rand.Intn(20) + 1
numTx := 1 //rand.Intn(20) + 1
for i := 1; i <= numTx; i++ {
_, err = s.AddStreamEntry(2, dataTx)
if err != nil {
Expand All @@ -153,7 +153,7 @@ func runServer(*cli.Context) error {
latestRollback = entryBlock
}

time.Sleep(2000 * time.Millisecond)
// time.Sleep(200 * time.Millisecond)
}
}()
// ------------------------------------------------------------
Expand All @@ -172,6 +172,7 @@ func runServer(*cli.Context) error {
func runClient(*cli.Context) error {
// Create client
c, err := datastreamer.NewClient("127.0.0.1:6900", StSequencer)
// c, err := datastreamer.NewClient("stream.internal.zkevm-test.net:6900", StSequencer)
if err != nil {
return err
}
Expand Down
4 changes: 0 additions & 4 deletions datastreamer/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"fmt"
"reflect"
"unsafe"

"github.com/0xPolygonHermez/zkevm-data-streamer/log"
)

type EntityDefinition struct {
Expand All @@ -32,8 +30,6 @@ func (e EntityDefinition) toString(entity []byte) string {
if i < val.NumField()-1 {
stringValue += ", "
}

log.Debug(stringValue)
}
return stringValue
}
172 changes: 156 additions & 16 deletions datastreamer/streamfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type StreamFile struct {
streamType StreamType
maxLength uint64 // File size in bytes

fileHeader *os.File // File descriptor just for read/write the header
header HeaderEntry // Current header in memory (atomic operation in progress)
writtenHead HeaderEntry // Current header written in the file
}
Expand All @@ -75,6 +76,7 @@ func PrepareStreamFile(fn string, st StreamType) (StreamFile, error) {
streamType: st,
maxLength: 0,

fileHeader: nil,
header: HeaderEntry{
packetType: PtHeader,
headLength: headerSize,
Expand All @@ -86,7 +88,11 @@ func PrepareStreamFile(fn string, st StreamType) (StreamFile, error) {

// Open (or create) the data stream file
err := sf.openCreateFile()
if err != nil {
return sf, err
}

// Print file info
printStreamFile(sf)

return sf, err
Expand All @@ -104,6 +110,10 @@ func (f *StreamFile) openCreateFile() error {
if err != nil {
log.Errorf("Error creating datastream file %s: %v", f.fileName, err)
} else {
err = f.openFileForHeader()
if err != nil {
return err
}
err = f.initializeFile()
}

Expand All @@ -113,7 +123,10 @@ func (f *StreamFile) openCreateFile() error {
f.file, err = os.OpenFile(f.fileName, os.O_RDWR, 0666)
if err != nil {
log.Errorf("Error opening datastream file %s: %v", f.fileName, err)
return err
}

err = f.openFileForHeader()
} else {
log.Errorf("Unable to check datastream file status %s: %v", f.fileName, err)
}
Expand Down Expand Up @@ -154,6 +167,17 @@ func (f *StreamFile) openCreateFile() error {
return nil
}

func (f *StreamFile) openFileForHeader() error {
// Get another file descriptor to use just for read/write the header
var err error
f.fileHeader, err = os.OpenFile(f.fileName, os.O_RDWR, 0666)
if err != nil {
log.Errorf("Error opening file for read/write header: %v", err)
return err
}
return nil
}

func (f *StreamFile) initializeFile() error {
// Create the header page
err := f.createHeaderPage()
Expand Down Expand Up @@ -261,15 +285,15 @@ func (f *StreamFile) extendFile() error {
// Read header from file to restore the header struct
func (f *StreamFile) readHeaderEntry() error {
// Position at the beginning of the file
_, err := f.file.Seek(magicNumSize, io.SeekStart)
_, err := f.fileHeader.Seek(magicNumSize, io.SeekStart)
if err != nil {
log.Errorf("Error seeking the start of the file: %v", err)
return err
}

// Read header stream bytes
binaryHeader := make([]byte, headerSize)
n, err := f.file.Read(binaryHeader)
n, err := f.fileHeader.Read(binaryHeader)
if err != nil {
log.Errorf("Error reading the header: %v", err)
return err
Expand Down Expand Up @@ -317,7 +341,7 @@ func printHeaderEntry(e HeaderEntry) {
// Write the memory header struct into the file header
func (f *StreamFile) writeHeaderEntry() error {
// Position at the beginning of the file
_, err := f.file.Seek(magicNumSize, io.SeekStart)
_, err := f.fileHeader.Seek(magicNumSize, io.SeekStart)
if err != nil {
log.Errorf("Error seeking the start of the file: %v", err)
return err
Expand All @@ -326,12 +350,12 @@ func (f *StreamFile) writeHeaderEntry() error {
// Write after convert header struct to binary stream
binaryHeader := encodeHeaderEntryToBinary(f.header)
log.Debugf("writing header entry: %v", binaryHeader)
_, err = f.file.Write(binaryHeader)
_, err = f.fileHeader.Write(binaryHeader)
if err != nil {
log.Errorf("Error writing the header: %v", err)
return err
}
err = f.file.Sync()
err = f.fileHeader.Sync()
if err != nil {
log.Errorf("Error flushing header data to disk: %v", err)
return err
Expand Down Expand Up @@ -605,7 +629,7 @@ func (f *StreamFile) iteratorFrom(entryNum uint64) (*iteratorFile, error) {
}

// TODO: Locate file start streaming point using dichotomic search
// f.seekEntry(&iterator)
// err = f.seekEntry(&iterator)
if entryNum > 0 {
for {
end, err := f.iteratorNext(&iterator)
Expand Down Expand Up @@ -687,15 +711,24 @@ func (f *StreamFile) iteratorNext(iterator *iteratorFile) (bool, error) {
}
buffer = append(packet, buffer...)

// Read variable data
// Check length
length := binary.BigEndian.Uint32(buffer[1:5])
bufferAux := make([]byte, length-FixedSizeFileEntry)
_, err = iterator.file.Read(bufferAux)
if err != nil {
log.Errorf("Error reading data for iterator: %v", err)
if length < FixedSizeFileEntry {
log.Errorf("Error decoding length data entry")
err = errors.New("error decoding length data entry")
return true, err
}
buffer = append(buffer, bufferAux...)

// Read variable data
if length > FixedSizeFileEntry {
bufferAux := make([]byte, length-FixedSizeFileEntry)
_, err = iterator.file.Read(bufferAux)
if err != nil {
log.Errorf("Error reading data for iterator: %v", err)
return true, err
}
buffer = append(buffer, bufferAux...)
}

// Convert to data entry struct
iterator.entry, err = DecodeBinaryToFileEntry(buffer)
Expand Down Expand Up @@ -743,24 +776,47 @@ func (f *StreamFile) iteratorEnd(iterator *iteratorFile) {
// // Decode packet type
// packetType := buffer[0]
// if packetType != PtData {
// log.Errorf("Error data page (%d) not starting with packet of type data. Type: %d", avg, packetType)
// log.Errorf("Error data page %d not starting with packet of type data. Type: %d", avg, packetType)
// return errors.New("page not starting with entry data")
// }

// // Decode entry number and compare with target value
// // Decode entry number and compare it with the one we are looking for
// entryNum := binary.BigEndian.Uint64(buffer[9:17])
// if entryNum == iterator.fromEntry {
// // Found! the first of the page
// break
// } else if entryNum > iterator.fromEntry {
// // Bigger value, cut half the search pages
// end = avg - 1
// } else if entryNum < iterator.fromEntry {
// beg = avg
// // Smaller value but could be inside the page, let's check it
// nextPageEntryNum, err := f.getFirstEntryOnNextPage(iterator)
// if err != nil {
// return err
// }

// // Smaller value committed, cut half the search pages
// if nextPageEntryNum <= iterator.fromEntry {
// beg = avg + 1
// } else {
// // Should be found in this page
// err = f.locateEntry(iterator)
// if err != nil {
// return err
// }
// break
// }
// } else if beg == end {
// // Should be found in this page
// err = f.locateEntry(iterator)
// if err != nil {
// return err
// }
// break
// }
// }

// // Back to the start of the current data page (where the entry number is present)
// // Back to the start of the data entry
// _, err := iterator.file.Seek(-FixedSizeFileEntry, io.SeekCurrent)
// if err != nil {
// log.Errorf("Error seeking page for iterator seek entry: %v", err)
Expand All @@ -770,3 +826,87 @@ func (f *StreamFile) iteratorEnd(iterator *iteratorFile) {
// log.Debugf("Entry number %d is in the data page %d", iterator.fromEntry, avg)
// return nil
// }

// func (f *StreamFile) getFirstEntryOnNextPage(iterator *iteratorFile) (uint64, error) {
// // Current file position
// curpos, err := iterator.file.Seek(0, io.SeekCurrent)
// if err != nil {
// log.Errorf("Error seeking current pos: %v", err)
// return 0, err
// }

// // Check if it is valid the current file position
// if curpos < pageHeaderSize || curpos > int64(f.writtenHead.TotalLength) {
// log.Errorf("Error current file position outside a data page")
// return 0, errors.New("current position outside data page")
// }

// // Seek for the start of next data page
// forward := pageDataSize - (curpos-pageHeaderSize)%pageDataSize
// _, err = iterator.file.Seek(int64(forward), io.SeekCurrent)
// if err != nil {
// log.Errorf("Error seeking next data page: %v", err)
// return 0, err
// }

// // Read fixed data entry bytes
// buffer := make([]byte, FixedSizeFileEntry)
// _, err = iterator.file.Read(buffer)
// if err != nil {
// log.Errorf("Error reading entry: %v", err)
// return 0, err
// }

// // Decode packet type
// if buffer[0] != PtData {
// log.Errorf("Error data page not starting with packet of type data(%d). Type: %d", PtData, buffer[0])
// return 0, errors.New("page not starting with entry data")
// }

// // Decode entry number
// entryNum := binary.BigEndian.Uint64(buffer[9:17])

// // Restore file position
// _, err = iterator.file.Seek(-int64(forward), io.SeekCurrent)
// if err != nil {
// log.Errorf("Error seeking current pos: %v", err)
// return 0, err
// }

// return entryNum, nil
// }

// func (f *StreamFile) locateEntry(iterator *iteratorFile) error {
// // Seek backward to the start of data entry
// _, err := iterator.file.Seek(-FixedSizeFileEntry, io.SeekCurrent)
// if err != nil {
// log.Errorf("Error in file seeking: %v", err)
// return err
// }

// for {
// end, err := f.iteratorNext(iterator)
// if err != nil {
// return err
// }

// // Not found
// if end || iterator.entry.entryNum > iterator.fromEntry {
// log.Errorf("Error can not locate the data entry number: %d", iterator.fromEntry)
// return errors.New("entry not found")
// }

// // Found!
// if iterator.entry.entryNum == iterator.fromEntry {
// // Seek backward to the end of fixed data
// backward := iterator.entry.length - FixedSizeFileEntry
// _, err = iterator.file.Seek(-int64(backward), io.SeekCurrent)
// if err != nil {
// log.Errorf("Error in file seeking: %v", err)
// return err
// }
// break
// }
// }
// return nil
// }
Loading