diff --git a/dcrpool.go b/dcrpool.go
index 00dae4e2..c5980c63 100644
--- a/dcrpool.go
+++ b/dcrpool.go
@@ -16,6 +16,7 @@ import (
 	"path/filepath"
 	"runtime"
 	"strings"
+	"sync"
 
 	"github.com/decred/dcrd/rpcclient/v8"
 	"github.com/decred/dcrpool/internal/gui"
@@ -188,45 +189,43 @@ func realMain() error {
 		}
 	}()
 
-	// Create a hub and GUI instance.
+	// Create a hub instance and attempt to perform initial connection and work
+	// acquisition.
 	hub, err := newHub(cfg, db, cancel)
 	if err != nil {
 		mpLog.Errorf("unable to initialize hub: %v", err)
 		return err
 	}
+	if err := hub.Connect(ctx); err != nil {
+		mpLog.Errorf("unable to establish node connections: %v", err)
+		return err
+	}
+	if err := hub.FetchWork(ctx); err != nil {
+		mpLog.Errorf("unable to get work from consensus daemon: %v", err)
+		return err
+	}
+
+	// Create a gui instance.
 	gui, err := newGUI(cfg, hub)
 	if err != nil {
 		mpLog.Errorf("unable to initialize GUI: %v", err)
 		return err
 	}
 
-	// Run the GUI in the background.
-	go gui.Run(ctx)
-
-	// Run the hub. This will block until the context is cancelled.
-	runHub := func(ctx context.Context, h *pool.Hub) error {
-		// Ideally these would go into hub.Run, but the tests don't work
-		// properly with this code there due to their tight coupling.
-		if err := h.Connect(ctx); err != nil {
-			return fmt.Errorf("unable to establish node connections: %w", err)
-		}
-
-		if err := h.FetchWork(ctx); err != nil {
-			return fmt.Errorf("unable to get work from consensus daemon: %w", err)
-		}
-
-		h.Run(ctx)
-		return nil
-	}
-	if err := runHub(ctx, hub); err != nil {
-		// Ensure the GUI is signaled to shutdown.
-		cancel()
-		mpLog.Errorf("unable to run pool hub: %v", err)
-		return err
-	}
+	// Run the GUI and hub in the background.
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go func() {
+		hub.Run(ctx)
+		wg.Done()
+	}()
+	go func() {
+		gui.Run(ctx)
+		wg.Done()
+	}()
+	wg.Wait()
 
-	// hub.Run() blocks until the pool is fully shut down. When it returns,
-	// write a backup of the DB (if not using postgres), and then close the DB.
+	// Write a backup of the DB (if not using postgres) once the hub shuts down.
 	if !cfg.UsePostgres {
 		mpLog.Info("Backing up database.")
 		err = db.Backup(pool.BoltBackupFile)
diff --git a/internal/gui/gui.go b/internal/gui/gui.go
index aeeaee82..591f3e0c 100644
--- a/internal/gui/gui.go
+++ b/internal/gui/gui.go
@@ -345,81 +345,79 @@ func (ui *GUI) Run(ctx context.Context) {
 
 	// Use a ticker to periodically update cached data and push updates through
 	// any established websockets
-	go func(ctx context.Context) {
-		signalCh := ui.cfg.FetchCacheChannel()
-		ticker := time.NewTicker(15 * time.Second)
-		defer ticker.Stop()
-
-		for {
-			select {
-			case <-ticker.C:
-				hashData, err := ui.cfg.FetchHashData()
+	signalCh := ui.cfg.FetchCacheChannel()
+	ticker := time.NewTicker(15 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			hashData, err := ui.cfg.FetchHashData()
+			if err != nil {
+				log.Error(err)
+				continue
+			}
+
+			ui.cache.updateHashData(hashData)
+			ui.websocketServer.send(payload{
+				PoolHashRate: ui.cache.getPoolHash(),
+			})
+
+		case msg := <-signalCh:
+			switch msg {
+			case pool.Confirmed, pool.Unconfirmed:
+				work, err := ui.cfg.FetchMinedWork()
 				if err != nil {
 					log.Error(err)
 					continue
 				}
 
-				ui.cache.updateHashData(hashData)
+				ui.cache.updateMinedWork(work)
 				ui.websocketServer.send(payload{
-					PoolHashRate: ui.cache.getPoolHash(),
+					LastWorkHeight: ui.cfg.FetchLastWorkHeight(),
 				})
 
-			case msg := <-signalCh:
-				switch msg {
-				case pool.Confirmed, pool.Unconfirmed:
-					work, err := ui.cfg.FetchMinedWork()
-					if err != nil {
-						log.Error(err)
-						continue
-					}
-
-					ui.cache.updateMinedWork(work)
-					ui.websocketServer.send(payload{
-						LastWorkHeight: ui.cfg.FetchLastWorkHeight(),
-					})
-
-				case pool.ClaimedShare:
-					quotas, err := ui.cfg.FetchWorkQuotas()
-					if err != nil {
-						log.Error(err)
-						continue
-					}
-
-					ui.cache.updateRewardQuotas(quotas)
-
-				case pool.DividendsPaid:
-					pendingPayments, err := ui.cfg.FetchPendingPayments()
-					if err != nil {
-						log.Error(err)
-						continue
-					}
-
-					archivedPayments, err := ui.cfg.FetchArchivedPayments()
-					if err != nil {
-						log.Error(err)
-						continue
-					}
-
-					ui.cache.updatePayments(pendingPayments, archivedPayments)
-
-					lastPmtHeight, lastPmtPaidOn, lastPmtCreatedOn, err := ui.cfg.FetchLastPaymentInfo()
-					if err != nil {
-						log.Error(err)
-						continue
-					}
-					ui.cache.updateLastPaymentInfo(lastPmtHeight, lastPmtPaidOn, lastPmtCreatedOn)
-
-					ui.websocketServer.send(payload{
-						LastPaymentHeight: lastPmtHeight,
-					})
-
-				default:
-					log.Errorf("unknown cache signal received: %v", msg)
+			case pool.ClaimedShare:
+				quotas, err := ui.cfg.FetchWorkQuotas()
+				if err != nil {
+					log.Error(err)
+					continue
 				}
 
-			case <-ctx.Done():
-				return
+				ui.cache.updateRewardQuotas(quotas)
+
+			case pool.DividendsPaid:
+				pendingPayments, err := ui.cfg.FetchPendingPayments()
+				if err != nil {
+					log.Error(err)
+					continue
+				}
+
+				archivedPayments, err := ui.cfg.FetchArchivedPayments()
+				if err != nil {
+					log.Error(err)
+					continue
+				}
+
+				ui.cache.updatePayments(pendingPayments, archivedPayments)
+
+				lastPmtHeight, lastPmtPaidOn, lastPmtCreatedOn, err := ui.cfg.FetchLastPaymentInfo()
+				if err != nil {
+					log.Error(err)
+					continue
+				}
+				ui.cache.updateLastPaymentInfo(lastPmtHeight, lastPmtPaidOn, lastPmtCreatedOn)
+
+				ui.websocketServer.send(payload{
+					LastPaymentHeight: lastPmtHeight,
+				})
+
+			default:
+				log.Errorf("unknown cache signal received: %v", msg)
 			}
+
+		case <-ctx.Done():
+			return
 		}
-	}(ctx)
+	}
 }