forked from allegro/mesos-executor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
command.go
187 lines (158 loc) · 4.91 KB
/
command.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
// +build !windows

package executor
import (
"errors"
"fmt"
"os"
"os/exec"
"strings"
"syscall"
"time"
log "github.com/sirupsen/logrus"
osutil "github.com/allegro/mesos-executor/os"
"github.com/allegro/mesos-executor/servicelog"
"github.com/allegro/mesos-executor/servicelog/appender"
"github.com/allegro/mesos-executor/servicelog/scraper"
"github.com/mesos/mesos-go/api/v1/lib"
)
// TaskExitState is a type describing the reason of program execution
// interruption: a code classifying the outcome plus the related error, if any.
type TaskExitState struct {
	// Code classifies how the task ended (see the TaskExitCode constants).
	Code TaskExitCode
	// Err is the error reported for the task. Wait in this file fills it only
	// together with FailedCode.
	Err error
}
// TaskExitCode is an enum of the ways a task can end; its values are
// SuccessCode, FailedCode and KilledCode.
type TaskExitCode int8
const (
	// SuccessCode means task exited successfully. It is the zero value of
	// TaskExitCode.
	SuccessCode TaskExitCode = iota
	// FailedCode means task exited with error.
	FailedCode
	// KilledCode means task was killed and its code was ignored.
	KilledCode
)
// Command is an interface to abstract command running on a system.
type Command interface {
	// Start launches the command and returns an error when it could not be
	// started.
	Start() error
	// Wait returns a channel on which the TaskExitState of the command is
	// delivered once it has finished.
	Wait() <-chan TaskExitState
	// Stop asks the command to terminate; gracePeriod is the time it is given
	// to exit before it is killed forcefully.
	Stop(gracePeriod time.Duration)
}
// cancellableCommand is the Command implementation defined in this file. It
// wraps an exec.Cmd and stops it by signalling its process tree.
type cancellableCommand struct {
	// cmd is the wrapped command; NewCommand sets it.
	cmd *exec.Cmd
	// doneChan is created by Start. waitForCommand sends the result of
	// cmd.Wait on it once and then closes it.
	doneChan chan error
	// killing is set by Stop and read by the goroutine started in Wait to
	// decide whether to report KilledCode.
	// NOTE(review): the flag is read and written without synchronization —
	// confirm that callers never race Stop against Wait.
	killing bool
}
// Start launches the wrapped command and begins waiting for it in a background
// goroutine.
//
// It returns an error when no command was configured or when the process could
// not be started. On success the result of the process is published on
// doneChan, which Wait consumes.
func (c *cancellableCommand) Start() error {
	if c.cmd == nil {
		return errors.New("missing command to run")
	}
	if err := c.cmd.Start(); err != nil {
		return err
	}
	// A buffer of one lets waitForCommand publish the result and finish even
	// when nobody ever calls Wait; with an unbuffered channel that goroutine
	// would stay blocked on the send forever.
	c.doneChan = make(chan error, 1)
	go c.waitForCommand()
	return nil
}
// Wait returns a channel that delivers the TaskExitState of the command once
// it has finished.
//
// The delivered state is:
//   - SuccessCode when cmd.Wait reported no error and the process succeeded,
//   - KilledCode when the process did not succeed and Stop had been called,
//   - FailedCode, carrying the error from cmd.Wait, otherwise.
//
// Exactly one value is sent on the returned channel; the channel is not closed.
func (c *cancellableCommand) Wait() <-chan TaskExitState {
	// Buffered so the goroutine below can deliver the state and exit even if
	// the caller abandons the returned channel.
	exitChan := make(chan TaskExitState, 1)
	go func() {
		err := <-c.doneChan
		state := c.cmd.ProcessState
		log.Infof("Command exited with state: %s", state.String())

		switch {
		// The nil check matters when doneChan is already closed (a repeated
		// Wait): err is then nil even though cmd.Wait may have failed without
		// recording a ProcessState.
		case err == nil && state != nil && state.Success():
			exitChan <- TaskExitState{Code: SuccessCode}
		case c.killing:
			// The exit status is ignored here because Stop asked for the kill.
			exitChan <- TaskExitState{Code: KilledCode}
		default:
			exitChan <- TaskExitState{Code: FailedCode, Err: err}
		}
	}()
	return exitChan
}
// waitForCommand blocks until the process finishes, publishes the result of
// cmd.Wait on doneChan and finally closes the channel. Start runs it in its
// own goroutine.
func (c *cancellableCommand) waitForCommand() {
	defer close(c.doneChan)
	c.doneChan <- c.cmd.Wait()
}
// Stop terminates the command's process tree.
//
// It sends SIGTERM to the tree, sleeps for gracePeriod and then sends SIGKILL.
// If SIGTERM cannot be delivered the error is logged and no SIGKILL is
// attempted. Calls after the first one are no-ops, and so is a call made for a
// command that has not been started.
func (c *cancellableCommand) Stop(gracePeriod time.Duration) {
	// Return if Stop was already called.
	if c.killing {
		return
	}
	// Without a started process there is nothing to signal; bail out instead
	// of dereferencing a nil Process. killing stays false so a later Stop,
	// after a successful Start, still works.
	if c.cmd == nil || c.cmd.Process == nil {
		log.Warn("Stop called for a command that has not been started")
		return
	}
	c.killing = true

	pid := c.cmd.Process.Pid
	if err := osutil.KillTree(syscall.SIGTERM, int32(pid)); err != nil {
		log.WithError(err).Errorf("There was a problem with sending %s to %d children", syscall.SIGTERM, pid)
		return
	}

	// The whole grace period is always waited out before escalating.
	time.Sleep(gracePeriod)

	if err := osutil.KillTree(syscall.SIGKILL, int32(pid)); err != nil {
		log.WithError(err).Warnf("There was a problem with sending %s to %d tree", syscall.SIGKILL, pid)
	}
}
// NewCommand builds a Command from the given CommandInfo.
//
// The command value is always run through `sh -c`. Its environment is the
// executor's environment without executor-specific entries, followed by env.
// Every option is applied to the underlying exec.Cmd in order; the first one
// that fails aborts construction with an "invalid config option" error.
//
// TODO(janisz): Implement shell policy
// From: https://github.com/apache/mesos/blob/1.1.3/include/mesos/mesos.proto#L509-L521
// There are two ways to specify the command:
//  1. If 'shell == true', the command will be launched via shell
//     (i.e., /bin/sh -c 'value'). The 'value' specified will be
//     treated as the shell command. The 'arguments' will be ignored.
//  2. If 'shell == false', the command will be launched by passing
//     arguments to an executable. The 'value' specified will be
//     treated as the filename of the executable. The 'arguments'
//     will be treated as the arguments to the executable. This is
//     similar to how POSIX exec families launch processes (i.e.,
//     execlp(value, arguments(0), arguments(1), ...)).
func NewCommand(commandInfo mesos.CommandInfo, env []string, options ...func(*exec.Cmd) error) (Command, error) {
	command := exec.Command("sh", "-c", commandInfo.GetValue()) // #nosec
	command.Env = append(envWithoutExecutorConfig(), env...)

	for i := range options {
		if optErr := options[i](command); optErr != nil {
			return nil, fmt.Errorf("invalid config option: %s", optErr)
		}
	}

	// Run the command in its own process group. This is assigned after the
	// options have run, so it replaces any SysProcAttr an option may have set.
	command.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

	return &cancellableCommand{cmd: command}, nil
}
// ForwardCmdOutput returns an option that connects the command's stdout and
// stderr directly to os.Stdout and os.Stderr of the current process. The
// returned option never fails.
func ForwardCmdOutput() func(*exec.Cmd) error {
	forward := func(target *exec.Cmd) error {
		target.Stderr, target.Stdout = os.Stderr, os.Stdout
		return nil
	}
	return forward
}
// ScrapCmdOutput returns an option that scrapes the command's output: both
// stdout and stderr are written to a pipe created from the scraper s, the
// resulting log entries are passed through the extenders and then handed to
// the appender a, which runs in its own goroutine. The returned option never
// fails.
func ScrapCmdOutput(s scraper.Scraper, a appender.Appender, extenders ...servicelog.Extender) func(*exec.Cmd) error {
	scrape := func(target *exec.Cmd) error {
		rawEntries, sink := scraper.Pipe(s)
		extended := servicelog.Extend(rawEntries, extenders...)

		// One writer serves both streams.
		target.Stderr, target.Stdout = sink, sink

		go a.Append(extended)
		return nil
	}
	return scrape
}
// envWithoutExecutorConfig returns os.Environ without executor specific
// entries, i.e. without variables whose name starts with the upper-cased
// EnvironmentPrefix.
//
// Marathon does not support custom executor env and all task env are passed
// as executor env. This means the environment is set up before executor
// startup, so the executor's own settings have to be filtered out here.
// The result is nil when no variable is kept.
func envWithoutExecutorConfig() (env []string) {
	// The prefix is the same for every variable, so upper-case it only once.
	prefix := strings.ToUpper(EnvironmentPrefix)
	for _, variable := range os.Environ() {
		if strings.HasPrefix(variable, prefix) {
			continue
		}
		env = append(env, variable)
	}
	return env
}