From b547e01817c6a09e68c58cc6022ab5969bb09e25 Mon Sep 17 00:00:00 2001 From: Elijah Seed-Arita Date: Tue, 19 Sep 2023 09:12:50 -1000 Subject: [PATCH] fix: proper shutdown --- cmd/motion/main.go | 6 ++++++ integration/singularity/store.go | 30 ++++++++++++++++++++++-------- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/cmd/motion/main.go b/cmd/motion/main.go index 9021cba..14a3f4d 100644 --- a/cmd/motion/main.go +++ b/cmd/motion/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "os" "os/signal" @@ -217,6 +218,11 @@ func main() { logger.Errorw("Failed to start Singularity blob store", "err", err) return err } + defer func() { + if err := singularityStore.Shutdown(context.Background()); err != nil { + logger.Errorw("Failed to shut down Singularity blob store", "err", err) + } + }() store = singularityStore } else { store = blob.NewLocalStore(storeDir) diff --git a/integration/singularity/store.go b/integration/singularity/store.go index de7b376..b476106 100644 --- a/integration/singularity/store.go +++ b/integration/singularity/store.go @@ -9,6 +9,7 @@ import ( "path" "strconv" "strings" + "sync" "time" "github.com/data-preservation-programs/singularity/client/swagger/http/deal_schedule" @@ -35,7 +36,7 @@ type SingularityStore struct { sourceName string toPack chan uint64 closing chan struct{} - closed chan struct{} + closed sync.WaitGroup } func NewStore(o ...Option) (*SingularityStore, error) { @@ -50,7 +51,6 @@ func NewStore(o ...Option) (*SingularityStore, error) { sourceName: "source", toPack: make(chan uint64, 16), closing: make(chan struct{}), - closed: make(chan struct{}), }, nil } @@ -247,14 +247,16 @@ func (l *SingularityStore) Start(ctx context.Context) error { } func (l *SingularityStore) runPreparationJobs() { + l.closed.Add(1) + ctx := context.Background() ctx, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + cancel() + l.closed.Done() + }() go func() { - defer func() { - close(l.closed) - }() for { select { case <-ctx.Done(): @@ -281,15 +283,24 @@ func (l *SingularityStore) runPreparationJobs() { } } }() - <-l.closing } func (l *SingularityStore) Shutdown(ctx context.Context) error { close(l.closing) + + done := make(chan struct{}) + go func() { + l.closed.Wait() + close(done) + }() + select { case <-ctx.Done(): - case <-l.closed: + case <-done: } + + logger.Infof("Singularity store shut down") + return nil } @@ -433,6 +444,9 @@ func (s *SingularityStore) Describe(ctx context.Context, id blob.ID) (*blob.Desc } func (s *SingularityStore) runCleanupWorker(ctx context.Context) { + s.closed.Add(1) + defer s.closed.Done() + // Run immediately once before starting ticker s.runCleanupJob()