Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements ShutdownCode option and ShutdownSignal os.Signal wrapper #912

Closed
wants to merge 9 commits into from
128 changes: 105 additions & 23 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,9 @@ type App struct {
clock fxclock.Clock
lifecycle *lifecycleWrapper

stopch chan struct{} // closed when Stop is called
stopChLock sync.RWMutex // mutex for init and closing of stopch

container *dig.Container
root *module
modules []*module
Expand All @@ -286,10 +289,16 @@ type App struct {
// Decides how we react to errors when building the graph.
errorHooks []ErrorHandler
validate bool

// Used to signal shutdowns.
donesMu sync.Mutex // guards dones and shutdownSig
dones []chan os.Signal
shutdownSig os.Signal
shutdownMu sync.Mutex
shutdownSig *ShutdownSignal
sigReceivers []signalReceiver
signalOnce sync.Once

// Used to make sure Start/Stop is called only once.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you need this to be included in this PR? I'm asking because this stuff was supposed to ship with the previous version (v1.17.2) and it broke some users: see #945 and #950 for context.

runStart sync.Once
runStop sync.Once

osExit func(code int) // os.Exit override; used for testing only
}
Expand Down Expand Up @@ -394,6 +403,7 @@ func New(opts ...Option) *App {
startTimeout: DefaultTimeout,
stopTimeout: DefaultTimeout,
}

app.root = &module{
app: app,
// We start with a logger that writes to stderr. One of the
Expand Down Expand Up @@ -544,27 +554,32 @@ func (app *App) Run() {
// Historically, we do not os.Exit(0) even though most applications
// cede control to Fx with they call app.Run. To avoid a breaking
// change, never os.Exit for success.
if code := app.run(app.Done()); code != 0 {
if code := app.run(app.Wait()); code != 0 {
app.exit(code)
}
}

func (app *App) run(done <-chan os.Signal) (exitCode int) {
func (app *App) run(done <-chan ShutdownSignal) (exitCode int) {
startCtx, cancel := app.clock.WithTimeout(context.Background(), app.StartTimeout())
defer cancel()

if err := app.Start(startCtx); err != nil {
app.closeStopChannel()
return 1
}

sig := <-done
app.log().LogEvent(&fxevent.Stopping{Signal: sig})
app.log().LogEvent(&fxevent.Stopping{Signal: sig.Signal})

stopCtx, cancel := app.clock.WithTimeout(context.Background(), app.StopTimeout())
defer cancel()

if err := app.Stop(stopCtx); err != nil {
return 1
// if we encounter a timeout during stop, force exit code 1
if errors.Is(err, context.DeadlineExceeded) {
return 1
}
return sig.ExitCode
}

return 0
Expand Down Expand Up @@ -605,14 +620,18 @@ var (
// encountered any errors in application initialization.
func (app *App) Start(ctx context.Context) (err error) {
defer func() {
app.log().LogEvent(&fxevent.Started{Err: err})
app.runStart.Do(func() {
app.log().LogEvent(&fxevent.Started{Err: err})
})
Comment on lines +623 to +625
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the LogEvent need to be guarded in the Once? We're allowing multiple actual Start invocations, so they should be able to log multiple times?

}()

if app.err != nil {
// Some provides failed, short-circuit immediately.
return app.err
}

app.initStopChannel()

return withTimeout(ctx, &withTimeoutParams{
hook: _onStartHook,
callback: app.start,
Expand All @@ -638,6 +657,30 @@ func (app *App) start(ctx context.Context) error {
return nil
}

func (app *App) initStopChannel() {
app.stopChLock.Lock()
defer app.stopChLock.Unlock()
if app.stopch == nil {
app.stopch = make(chan struct{})
}
}

func (app *App) stopChannel() chan struct{} {
app.stopChLock.RLock()
defer app.stopChLock.RUnlock()
ch := app.stopch
return ch
}

func (app *App) closeStopChannel() {
app.stopChLock.Lock()
defer app.stopChLock.Unlock()
if app.stopch != nil {
close(app.stopch)
app.stopch = nil
}
}

// Stop gracefully stops the application. It executes any registered OnStop
// hooks in reverse order, so that each constructor's stop hooks are called
// before its dependencies' stop hooks.
Expand All @@ -646,16 +689,23 @@ func (app *App) start(ctx context.Context) error {
// called are executed. However, all those hooks are executed, even if some
// fail.
func (app *App) Stop(ctx context.Context) (err error) {

defer func() {
app.log().LogEvent(&fxevent.Stopped{Err: err})
// Protect the Stop hooks from being called multiple times.
app.runStop.Do(func() {
app.log().LogEvent(&fxevent.Stopped{Err: err})
app.closeStopChannel()
})
Comment on lines +695 to +698
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same. I'm not sure we need this guard? If there's a stop channel because of Start, then there should be a defer close.

}()

return withTimeout(ctx, &withTimeoutParams{
err = withTimeout(ctx, &withTimeoutParams{
hook: _onStopHook,
callback: app.lifecycle.Stop,
lifecycle: app.lifecycle,
log: app.log(),
})

return
}

// Done returns a channel of signals to block on after starting the
Expand All @@ -666,21 +716,53 @@ func (app *App) Stop(ctx context.Context) (err error) {
// Alternatively, a signal can be broadcast to all done channels manually by
// using the Shutdown functionality (see the Shutdowner documentation for details).
func (app *App) Done() <-chan os.Signal {
c := make(chan os.Signal, 1)
rcv, ch := newOSSignalReceiver()
app.appendSignalReceiver(rcv)
return ch
}

app.donesMu.Lock()
defer app.donesMu.Unlock()
// If shutdown signal has been received already
// send it and return. If not, wait for user to send a termination
// signal.
if app.shutdownSig != nil {
c <- app.shutdownSig
return c
}
func (app *App) Wait() <-chan ShutdownSignal {
rcv, ch := newShutdownSignalReceiver()
app.appendSignalReceiver(rcv)
return ch
}

func (app *App) appendSignalReceiver(r signalReceiver) {
app.shutdownMu.Lock()
defer app.shutdownMu.Unlock()

signal.Notify(c, os.Interrupt, _sigINT, _sigTERM)
app.dones = append(app.dones, c)
return c
// If shutdown signal has been received already
// send it and return.
// If not, wait for user to send a termination signal.
if sig := app.shutdownSig; sig != nil {
// Ignore the error from ReceiveSignal.
// This is a newly created channel and can't possibly be
// blocked.
_ = r.ReceiveShutdownSignal(*sig)
return
}

app.sigReceivers = append(app.sigReceivers, r)

// The first time either Wait or Done is called,
// register an OS signal handler
// and make that broadcast the signal to all sigReceivers
// regardless of whether they're Wait or Done based.
app.signalOnce.Do(func() {
sigch := make(chan os.Signal, 1)
signal.Notify(sigch, os.Interrupt, _sigINT, _sigTERM)
go func() {
// if the stop channel is nil; that means that the app was never started
// thus, do not broadcast any signals
if stopch := app.stopChannel(); stopch != nil {
select {
case sig := <-sigch:
app.broadcastSignal(sig, 1)
case <-stopch:
}
}
}()
})
}

// StartTimeout returns the configured startup timeout. Apps default to using
Expand Down
5 changes: 2 additions & 3 deletions app_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package fx

import (
"fmt"
"os"
"sync"
"testing"

Expand All @@ -41,7 +40,7 @@ func TestAppRun(t *testing.T) {
app := New(
WithLogger(func() fxevent.Logger { return spy }),
)
done := make(chan os.Signal)
done := make(chan ShutdownSignal)

var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -50,7 +49,7 @@ func TestAppRun(t *testing.T) {
app.run(done)
}()

done <- _sigINT
done <- ShutdownSignal{Signal: _sigINT}
wg.Wait()

assert.Equal(t, []string{
Expand Down
2 changes: 1 addition & 1 deletion app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ func TestAppRunTimeout(t *testing.T) {

err, _ := errv.Interface().(error)
assert.ErrorIs(t, err, context.DeadlineExceeded,
"should fail because of a timeout")
"should fail because of a timeout: %v", err)
})
}
}
Expand Down
Loading