Skip to content

Commit

Permalink
multi: Make GUI run blocking and wait for shutdown.
Browse files Browse the repository at this point in the history
This updates the gui.Run method to block until the context is canceled
as expected and then adds a waitgroup across both it and the hub to
allow for clean shutdown of both subsystems prior to shutdown.

It also moves the initial connection and work acquisition prior to
launching the subsystems to avoid launching them at all in the event of
an error that would prevent the hub from initializing.
  • Loading branch information
davecgh committed Sep 14, 2023
1 parent 7b1fff0 commit 4315c33
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 92 deletions.
53 changes: 26 additions & 27 deletions dcrpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"path/filepath"
"runtime"
"strings"
"sync"

"github.com/decred/dcrd/rpcclient/v8"
"github.com/decred/dcrpool/internal/gui"
Expand Down Expand Up @@ -188,45 +189,43 @@ func realMain() error {
}
}()

// Create a hub and GUI instance.
// Create a hub instance and attempt to perform initial connection and work
// acquisition.
hub, err := newHub(cfg, db, cancel)
if err != nil {
mpLog.Errorf("unable to initialize hub: %v", err)
return err
}
if err := hub.Connect(ctx); err != nil {
mpLog.Errorf("unable to establish node connections: %v", err)
return err
}
if err := hub.FetchWork(ctx); err != nil {
mpLog.Errorf("unable to get work from consensus daemon: %v", err)
return err
}

// Create a gui instance.
gui, err := newGUI(cfg, hub)
if err != nil {
mpLog.Errorf("unable to initialize GUI: %v", err)
return err
}

// Run the GUI in the background.
go gui.Run(ctx)

// Run the hub. This will block until the context is cancelled.
runHub := func(ctx context.Context, h *pool.Hub) error {
// Ideally these would go into hub.Run, but the tests don't work
// properly with this code there due to their tight coupling.
if err := h.Connect(ctx); err != nil {
return fmt.Errorf("unable to establish node connections: %w", err)
}

if err := h.FetchWork(ctx); err != nil {
return fmt.Errorf("unable to get work from consensus daemon: %w", err)
}

h.Run(ctx)
return nil
}
if err := runHub(ctx, hub); err != nil {
// Ensure the GUI is signaled to shutdown.
cancel()
mpLog.Errorf("unable to run pool hub: %v", err)
return err
}
// Run the GUI and hub in the background.
var wg sync.WaitGroup
wg.Add(2)
go func() {
hub.Run(ctx)
wg.Done()
}()
go func() {
gui.Run(ctx)
wg.Done()
}()
wg.Wait()

// hub.Run() blocks until the pool is fully shut down. When it returns,
// write a backup of the DB (if not using postgres), and then close the DB.
// Write a backup of the DB (if not using postgres) once the hub shuts down.
if !cfg.UsePostgres {
mpLog.Info("Backing up database.")
err = db.Backup(pool.BoltBackupFile)
Expand Down
128 changes: 63 additions & 65 deletions internal/gui/gui.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,81 +345,79 @@ func (ui *GUI) Run(ctx context.Context) {

// Use a ticker to periodically update cached data and push updates through
// any established websockets
go func(ctx context.Context) {
signalCh := ui.cfg.FetchCacheChannel()
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
hashData, err := ui.cfg.FetchHashData()
signalCh := ui.cfg.FetchCacheChannel()
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
hashData, err := ui.cfg.FetchHashData()
if err != nil {
log.Error(err)
continue
}

ui.cache.updateHashData(hashData)
ui.websocketServer.send(payload{
PoolHashRate: ui.cache.getPoolHash(),
})

case msg := <-signalCh:
switch msg {
case pool.Confirmed, pool.Unconfirmed:
work, err := ui.cfg.FetchMinedWork()
if err != nil {
log.Error(err)
continue
}

ui.cache.updateHashData(hashData)
ui.cache.updateMinedWork(work)
ui.websocketServer.send(payload{
PoolHashRate: ui.cache.getPoolHash(),
LastWorkHeight: ui.cfg.FetchLastWorkHeight(),
})

case msg := <-signalCh:
switch msg {
case pool.Confirmed, pool.Unconfirmed:
work, err := ui.cfg.FetchMinedWork()
if err != nil {
log.Error(err)
continue
}

ui.cache.updateMinedWork(work)
ui.websocketServer.send(payload{
LastWorkHeight: ui.cfg.FetchLastWorkHeight(),
})

case pool.ClaimedShare:
quotas, err := ui.cfg.FetchWorkQuotas()
if err != nil {
log.Error(err)
continue
}

ui.cache.updateRewardQuotas(quotas)

case pool.DividendsPaid:
pendingPayments, err := ui.cfg.FetchPendingPayments()
if err != nil {
log.Error(err)
continue
}

archivedPayments, err := ui.cfg.FetchArchivedPayments()
if err != nil {
log.Error(err)
continue
}

ui.cache.updatePayments(pendingPayments, archivedPayments)

lastPmtHeight, lastPmtPaidOn, lastPmtCreatedOn, err := ui.cfg.FetchLastPaymentInfo()
if err != nil {
log.Error(err)
continue
}
ui.cache.updateLastPaymentInfo(lastPmtHeight, lastPmtPaidOn, lastPmtCreatedOn)

ui.websocketServer.send(payload{
LastPaymentHeight: lastPmtHeight,
})

default:
log.Errorf("unknown cache signal received: %v", msg)
case pool.ClaimedShare:
quotas, err := ui.cfg.FetchWorkQuotas()
if err != nil {
log.Error(err)
continue
}

case <-ctx.Done():
return
ui.cache.updateRewardQuotas(quotas)

case pool.DividendsPaid:
pendingPayments, err := ui.cfg.FetchPendingPayments()
if err != nil {
log.Error(err)
continue
}

archivedPayments, err := ui.cfg.FetchArchivedPayments()
if err != nil {
log.Error(err)
continue
}

ui.cache.updatePayments(pendingPayments, archivedPayments)

lastPmtHeight, lastPmtPaidOn, lastPmtCreatedOn, err := ui.cfg.FetchLastPaymentInfo()
if err != nil {
log.Error(err)
continue
}
ui.cache.updateLastPaymentInfo(lastPmtHeight, lastPmtPaidOn, lastPmtCreatedOn)

ui.websocketServer.send(payload{
LastPaymentHeight: lastPmtHeight,
})

default:
log.Errorf("unknown cache signal received: %v", msg)
}

case <-ctx.Done():
return
}
}(ctx)
}
}

0 comments on commit 4315c33

Please sign in to comment.