Skip to content

Commit

Permalink
First release for internal (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpunish3r authored Sep 19, 2023
1 parent 479e8cf commit e574f2e
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 67 deletions.
11 changes: 9 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,13 @@ func runClient(*cli.Context) error {
return err
}

// Get header status (execute command Header)
// Command header: Get status
err = c.ExecCommand(datastreamer.CmdHeader)
if err != nil {
return err
}

// Start streaming receive (execute command Start)
// Command start: Sync and start streaming receive
if c.Header.TotalEntries > 10 {
c.FromEntry = c.Header.TotalEntries - 10
} else {
Expand All @@ -214,5 +214,12 @@ func runClient(*cli.Context) error {
return err
}

// Command stop: Stop streaming receive
// time.Sleep(10 * time.Second)
// err = c.ExecCommand(datastreamer.CmdStop)
// if err != nil {
// return err
// }

return nil
}
29 changes: 2 additions & 27 deletions datastreamer/entities.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package datastreamer

import (
"encoding/binary"
"fmt"
"reflect"
"unsafe"
Expand All @@ -16,39 +15,15 @@ type EntityDefinition struct {
}

func (e EntityDefinition) toString(entity []byte) string {
var bigEndian = (*(*[2]uint8)(unsafe.Pointer(&[]uint16{1}[0])))[0] == 0
obj := reflect.NewAt(e.Definition, unsafe.Pointer(&entity[0]))
val := obj.Elem()
stringValue := ""
accumulatedSize := 0

for i := 0; i < val.NumField(); i++ {
name := val.Type().Field(i).Name
value := ""

if val.Type().Field(i).Type.Kind() == reflect.Uint64 {
if bigEndian {
value = fmt.Sprintf("%v", binary.BigEndian.Uint64(entity))
} else {
value = fmt.Sprintf("%v", binary.LittleEndian.Uint64(entity))
}
accumulatedSize += 8
} else if val.Type().Field(i).Type.Kind() == reflect.Uint32 {
if bigEndian {
value = fmt.Sprintf("%v", binary.BigEndian.Uint32(entity))
} else {
value = fmt.Sprintf("%v", binary.LittleEndian.Uint32(entity))
}
accumulatedSize += 4
} else if val.Type().Field(i).Type.Kind() == reflect.Uint8 {
value = fmt.Sprintf("%v", entity[0])
accumulatedSize += 1
} else if val.Type().Field(i).Type.Kind() == reflect.Slice {
if len(entity) > accumulatedSize {
value = fmt.Sprintf("%v", entity[accumulatedSize:len(entity)-accumulatedSize])
} else {
value = "[slice]"
}
if val.Type().Field(i).Type.Kind() == reflect.Slice {
value = "[slice]"
} else {
value = fmt.Sprintf("%v", val.Field(i).Interface())
}
Expand Down
122 changes: 98 additions & 24 deletions datastreamer/streamclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type StreamClient struct {
streamType StreamType
conn net.Conn
id string // Client id
streaming bool // Is syncing/streaming receive?

FromEntry uint64 // Set starting entry data (for Start command)
Header HeaderEntry // Header info received (from Header command)
Expand All @@ -28,6 +29,7 @@ func NewClient(server string, streamType StreamType) (StreamClient, error) {
server: server,
streamType: streamType,
id: "",
streaming: false,
FromEntry: 0,
}
return c, nil
Expand All @@ -45,22 +47,22 @@ func (c *StreamClient) Start() error {
c.id = c.conn.LocalAddr().String()
log.Infof("%s Connected to server: %s", c.id, c.server)

// Receiving stream
// go c.receivingStreaming()

return nil
}

// func (c *StreamClient) receivingStreaming() {
// defer c.conn.Close()
// c.streamingRead()
// }

func (c *StreamClient) SetEntriesDefinition(entriesDef map[EntryType]EntityDefinition) {
c.entriesDefinition = entriesDef
}

func (c *StreamClient) ExecCommand(cmd Command) error {
log.Infof("%s Executing command %d[%s]...", c.id, cmd, StrCommand[cmd])

// Check valid command
if cmd < CmdStart || cmd > CmdHeader {
log.Errorf("%s Invalid command %d", c.id, cmd)
return errors.New("invalid command")
}

// Send command
err := writeFullUint64(uint64(cmd), c.conn)
if err != nil {
Expand All @@ -84,20 +86,34 @@ func (c *StreamClient) ExecCommand(cmd Command) error {
}
}

if cmd == CmdStop {
c.streaming = false
} else {

// Receive command result
err = c.receiveResult(cmd)
if err != nil {
return err
}

// Manage each command type
err = c.manageCommand(cmd)
if err != nil {
return err
}
}

return nil
}

func (c *StreamClient) receiveResult(cmd Command) error {
// Read server result entry for the command
r, err := c.readResultEntry()
r, err := c.readResultEntry(cmd == CmdUnknown)
if err != nil {
log.Errorf("%s %v", c.id, err)
return err
}
log.Infof("%s Result %d[%s] received for command %d[%s]", c.id, r.errorNum, r.errorStr, cmd, StrCommand[cmd])

// Manage each command type
err = c.manageCommand(cmd)
if err != nil {
return err
}

return nil
}

Expand All @@ -112,9 +128,11 @@ func (c *StreamClient) manageCommand(cmd Command) error {

case CmdStart:
// Streaming receive goroutine
c.streamingRead() // TODO: work / call as goroutine?
c.streaming = true
c.streamingRead() // TODO: goroutine

case CmdStop:
c.streaming = false

default:
return errors.New("unknown command")
Expand All @@ -125,6 +143,11 @@ func (c *StreamClient) manageCommand(cmd Command) error {
func (c *StreamClient) streamingRead() {
defer c.conn.Close()
for {
// Stop receiving stream
if !c.streaming {
return
}

// Wait next data entry streamed
_, err := c.readDataEntry()
if err != nil {
Expand All @@ -136,9 +159,33 @@ func (c *StreamClient) streamingRead() {
func (c *StreamClient) readDataEntry() (FileEntry, error) {
d := FileEntry{}

// Read fixed size fields
buffer := make([]byte, FixedSizeFileEntry)
_, err := io.ReadFull(c.conn, buffer)
// Read packet type
packet := make([]byte, 1)
_, err := io.ReadFull(c.conn, packet)
if err != nil {
if err == io.EOF {
log.Warnf("%s Server close connection", c.id)
} else {
log.Errorf("%s Error reading from server: %v", c.id, err)
}
return d, err
}

// Check packet type
if packet[0] == PtResult {
err = c.receiveResult(CmdUnknown)
if err != nil {
return d, err
}
return d, nil
} else if packet[0] != PtData {
log.Errorf("%s Error expecting data packet type %d and received %d", c.id, PtData, packet[0])
return d, errors.New("error expecting data packet type")
}

// Read the rest of fixed size fields
buffer := make([]byte, FixedSizeFileEntry-1)
_, err = io.ReadFull(c.conn, buffer)
if err != nil {
if err == io.EOF {
log.Warnf("%s Server close connection", c.id)
Expand All @@ -147,6 +194,7 @@ func (c *StreamClient) readDataEntry() (FileEntry, error) {
}
return d, err
}
buffer = append(packet, buffer...)

// Read variable field (data)
length := binary.BigEndian.Uint32(buffer[1:5])
Expand Down Expand Up @@ -230,12 +278,37 @@ func writeFullUint64(value uint64, conn net.Conn) error {
return nil
}

func (c *StreamClient) readResultEntry() (ResultEntry, error) {
func (c *StreamClient) readResultEntry(skipPacket bool) (ResultEntry, error) {
e := ResultEntry{}

// Read fixed size fields
buffer := make([]byte, FixedSizeResultEntry)
_, err := io.ReadFull(c.conn, buffer)
var err error
packet := make([]byte, 1)

// Skip read the packet type because read previosuly in readDataEntry
if skipPacket {
packet[0] = PtResult
} else {
// Read packet type
_, err = io.ReadFull(c.conn, packet)
if err != nil {
if err == io.EOF {
log.Warnf("%s Server close connection", c.id)
} else {
log.Errorf("%s Error reading from server: %v", c.id, err)
}
return e, err
}

// Check packet type
if packet[0] != PtResult {
log.Errorf("%s Error expecting result packet type %d and received %d", c.id, PtResult, packet[0])
return e, errors.New("error expecting result packet type")
}
}

// Read the rest of fixed size fields
buffer := make([]byte, FixedSizeResultEntry-1)
_, err = io.ReadFull(c.conn, buffer)
if err != nil {
if err == io.EOF {
log.Warnf("%s Server close connection", c.id)
Expand All @@ -244,6 +317,7 @@ func (c *StreamClient) readResultEntry() (ResultEntry, error) {
}
return e, err
}
buffer = append(packet, buffer...)

// Read variable field (errStr)
length := binary.BigEndian.Uint32(buffer[1:5])
Expand Down
17 changes: 14 additions & 3 deletions datastreamer/streamfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

var (
magicNumbers = []byte("ZKSTREAM.." + "\x00\x00\x00\x04\x04\x04")
magicNumbers = []byte("polygonDATSTREAM")
)

const (
Expand All @@ -20,8 +20,8 @@ const (
headerSize = 29 // Header data size
pageHeaderSize = 4096 // 4K size header page
pageDataSize = 1024 * 1024 // 1 MB size data page
initPages = 80 // Initial number of data pages
nextPages = 8 // Number of data pages to add when file is full
initPages = 100 // Initial number of data pages
nextPages = 10 // Number of data pages to add when file is full

// Packet types
PtPadding = 0
Expand Down Expand Up @@ -302,6 +302,16 @@ func printHeaderEntry(e HeaderEntry) {
log.Infof("streamType: [%d]", e.streamType)
log.Infof("totalLength: [%d]", e.TotalLength)
log.Infof("totalEntries: [%d]", e.TotalEntries)

var usedPages uint64
if e.TotalLength == 0 {
usedPages = 0
} else if (e.TotalLength-pageHeaderSize)%pageDataSize == 0 {
usedPages = (e.TotalLength - pageHeaderSize) / pageDataSize
} else {
usedPages = (e.TotalLength-pageHeaderSize)/pageDataSize + 1
}
log.Infof("usedDataPages=[%d]", usedPages)
}

// Write the memory header struct into the file header
Expand Down Expand Up @@ -543,6 +553,7 @@ func printStreamFile(f StreamFile) {
log.Infof("pageSize: [%d]", f.pageSize)
log.Infof("streamType: [%d]", f.streamType)
log.Infof("maxLength: [%d]", f.maxLength)
log.Infof("numDataPages=[%d]", (f.maxLength-pageHeaderSize)/pageDataSize)
printHeaderEntry(f.header)
}

Expand Down
Loading

0 comments on commit e574f2e

Please sign in to comment.