Skip to content

Commit

Permalink
Queue up reads from bluetooth devices.
Browse files Browse the repository at this point in the history
  • Loading branch information
xperimental committed Jul 1, 2019
1 parent ab8b9cc commit 6dd209d
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 14 deletions.
5 changes: 4 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@ func main() {
cancel()
}()

reader := newQueuedDataReader()
reader.Run(ctx, wg)

for _, s := range config.Sensors {
log.Infof("Sensor: %s", s)

reader := newDataReader(s.MacAddress, config.Device)
reader := reader.ReadFunc(s.MacAddress, config.Device)
collector := newCollector(reader, config.RefreshDuration, s)

if err := prometheus.Register(collector); err != nil {
Expand Down
100 changes: 87 additions & 13 deletions reader.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package main

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/barnybug/miflora"
Expand All @@ -13,24 +16,95 @@ type sensorData struct {
Sensors miflora.Sensors
}

func newDataReader(macAddress, device string) func() (sensorData, error) {
func readData(macAddress, device string) (sensorData, error) {
f := miflora.NewMiflora(macAddress, device)

firmware, err := f.ReadFirmware()
if err != nil {
return sensorData{}, fmt.Errorf("can not read firmware: %s", err)
}

sensors, err := f.ReadSensors()
if err != nil {
return sensorData{}, fmt.Errorf("can not read sensors: %s", err)
}

return sensorData{
Time: time.Now(),
Firmware: firmware,
Sensors: sensors,
}, nil
}

type query struct {
MacAddress string
Device string
Result chan queryResult
}

type queryResult struct {
Data sensorData
Err error
}

type queuedReader struct {
shutdown bool
queryCh chan query
}

func newQueuedDataReader() *queuedReader {
return &queuedReader{
queryCh: make(chan query, 1),
}
}

func (r *queuedReader) Run(ctx context.Context, wg *sync.WaitGroup) {
wg.Add(1)

go func() {
defer wg.Done()
defer log.Debug("Shutdown reader loop.")

log.Debug("Starting reader loop.")
for {
select {
case <-ctx.Done():
r.shutdown = true
return
case q := <-r.queryCh:
log.Debugf("Reading data for %q on %q", q.MacAddress, q.Device)
data, err := readData(q.MacAddress, q.Device)

q.Result <- queryResult{
Data: data,
Err: err,
}
close(q.Result)
}
}
}()
}

func (r *queuedReader) ReadFunc(macAddress, device string) func() (sensorData, error) {
log.Debugf("Creating reader for %q on %q", macAddress, device)
return func() (sensorData, error) {
f := miflora.NewMiflora(macAddress, device)
if r.shutdown {
return sensorData{}, errors.New("reader shut down")
}

firmware, err := f.ReadFirmware()
if err != nil {
return sensorData{}, fmt.Errorf("can not read firmware: %s", err)
q := query{
MacAddress: macAddress,
Device: device,
Result: make(chan queryResult),
}

sensors, err := f.ReadSensors()
if err != nil {
return sensorData{}, fmt.Errorf("can not read sensors: %s", err)
r.queryCh <- q

res, ok := <-q.Result
if !ok {
return sensorData{}, errors.New("channel closed")
}

return sensorData{
Time: time.Now(),
Firmware: firmware,
Sensors: sensors,
}, nil
return res.Data, res.Err
}
}

0 comments on commit 6dd209d

Please sign in to comment.