Skip to content

Commit

Permalink
chore: tests and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcStdt committed Jul 21, 2024
1 parent 21fa37b commit 4e1c000
Show file tree
Hide file tree
Showing 8 changed files with 461 additions and 154 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ mock:
test:
go test -v ./test

test_clean:
go clean -testcache
go test -v ./test

test-integration:
go test -v ./test/integration

Expand Down
8 changes: 8 additions & 0 deletions examples/minecraft/.scroll/scroll.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,11 @@ commands:
- bash
- -c
- echo eula=true > eula.txt
restart:
procedures:
- mode: command
data:
- stop
- mode: command
data:
- start
2 changes: 1 addition & 1 deletion internal/core/ports/services_ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type ScrollServiceInterface interface {
type ProcedureLauchnerInterface interface {
LaunchPlugins() error
RunProcedure(*domain.Procedure, string) (string, *int, error)
Run(cmd string) error
Run(cmd string, runCommandCb func(cmd string) error) error
}

type PluginManagerInterface interface {
Expand Down
22 changes: 11 additions & 11 deletions internal/core/services/process_launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,24 @@ func (sc *ProcedureLauncher) LaunchPlugins() error {
return sc.pluginManager.ParseFromScroll(scroll.Plugins, string(sc.scrollService.GetScrollConfigRawYaml()), sc.scrollService.GetCwd())
}

func (sc *ProcedureLauncher) Run(cmd string) error {
// I am unsure if we should support he command mode in the future as it is an antipattern for the scroll architecture, we try to solve stuff with dependencies
func (sc *ProcedureLauncher) Run(cmd string, runCommandCb func(cmd string) error) error {

command, err := sc.scrollService.GetCommand(cmd)
if err != nil {
return err
}

for _, proc := range command.Procedures {

if proc.Mode == "command" {
if proc.Wait != nil {
return errors.New("command mode does not support wait")
}
err = runCommandCb(proc.Data.(string))
return err
}

var err error
var exitCode *int
logger.Log().Debug("Running procedure",
Expand Down Expand Up @@ -193,16 +203,6 @@ func (sc *ProcedureLauncher) RunProcedure(proc *domain.Procedure, cmd string) (s
}
sc.processManager.WriteStdin(process, stdtIn)

case "command":

logger.Log().Debug("Launching stdin process",
zap.String("cwd", processCwd),
zap.String("instructions", proc.Data.(string)),
)

//err := sc.queueManager.Additem(proc.Data.(string), false)
return "", nil, err

case "scroll-switch":

logger.Log().Debug("Launching scroll-switch process",
Expand Down
181 changes: 105 additions & 76 deletions internal/core/services/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ import (
"go.uber.org/zap"
)

var ErrAlreadyInQueue = fmt.Errorf("command is already in queue")
var ErrCommandNotFound = fmt.Errorf("command not found")
var ErrCommandDoneOnce = fmt.Errorf("command is already done and has run mode once")

type QueueItem struct {
status domain.ScrollLockStatus
changeStatus bool
}

type QueueManager struct {
mu sync.Mutex
runQueueMu sync.Mutex
scrollService ports.ScrollServiceInterface
processLauncher ports.ProcedureLauchnerInterface
commandQueue map[string]*QueueItem
Expand All @@ -41,111 +46,71 @@ func NewQueueManager(
}
}

func (sc *QueueManager) setCommandQueue(commandName string, status domain.ScrollLockStatus, changeStatus bool) {
sc.mu.Lock()
defer sc.mu.Unlock()
sc.commandQueue[commandName] = &QueueItem{
status: status,
changeStatus: changeStatus,
}
}

func (sc *QueueManager) workItem(cmd string) error {

queueItem := sc.commandQueue[cmd]
queueItem := sc.GetQueueItem(cmd)
if queueItem == nil {
return fmt.Errorf("command %s not found", cmd)
}

command, err := sc.scrollService.GetCommand(cmd)
if err != nil {
return err
}

changeStatus := queueItem.changeStatus

logger.Log().Debug("Running command",
zap.String("cmd", cmd),
zap.Bool("changeStatus", changeStatus),
)

lock, err := sc.scrollService.GetLock()
if err != nil {
return err
}

sc.setCommandQueue(cmd, domain.ScrollLockStatusWaiting, changeStatus)
if changeStatus {
lock.SetStatus(cmd, domain.ScrollLockStatusWaiting)
}

status := lock.GetStatus(cmd)

//Functions that run once, should be remembered, but should only have waiting status, when the are called explicitly
if command.Run == domain.RunModeOnce {
changeStatus = true
}

//if done and should be done once, skip
if status == domain.ScrollLockStatusDone && command.Run == domain.RunModeOnce {
sc.setCommandQueue(cmd, domain.ScrollLockStatusDone, changeStatus)
return nil
}

sc.setCommandQueue(cmd, domain.ScrollLockStatusRunning, changeStatus)
if changeStatus {
lock.SetStatus(cmd, domain.ScrollLockStatusRunning)
}

err = sc.processLauncher.Run(cmd)

if err != nil {
sc.setCommandQueue(cmd, domain.ScrollLockStatusError, changeStatus)
return err
}

//restart means we are never done!
if changeStatus && command.Run != domain.RunModeRestart {
lock.SetStatus(cmd, domain.ScrollLockStatusDone)
}
sc.setCommandQueue(cmd, domain.ScrollLockStatusDone, changeStatus)

return nil

return sc.processLauncher.Run(cmd, func(cmd string) error {
return sc.AddItem(cmd, changeStatus)
})
}

func (sc *QueueManager) notify() {
queuedCommands := make([]string, 0)

for cmd, item := range sc.commandQueue {
if item.status != domain.ScrollLockStatusDone {
for cmd, _ := range sc.commandQueue {
if sc.getStatus(cmd) != domain.ScrollLockStatusDone && sc.getStatus(cmd) != domain.ScrollLockStatusError {
queuedCommands = append(queuedCommands, cmd)
}
}

for _, notifier := range sc.notifierChan {
notifier <- queuedCommands
select {
case notifier <- queuedCommands:
// Successfully sent queuedCommands to the notifier channel
default:
// The notifier channel is not ready to receive, handle accordingly
// For example, log a warning or skip this notifier
}
}
}

func (sc *QueueManager) AddItem(cmd string, changeStatus bool) error {
sc.mu.Lock()
defer sc.mu.Unlock()

logger.Log().Debug("Running command",
zap.String("cmd", cmd),
)

_, err := sc.scrollService.GetCommand(cmd)
command, err := sc.scrollService.GetCommand(cmd)

if err != nil {
return err
}

sc.mu.Lock()
defer sc.mu.Unlock()
//Functions that run once, should be remembered, but should only have waiting status, when the are called explicitly
if command.Run == domain.RunModeOnce {
changeStatus = true
}

if value, ok := sc.commandQueue[cmd]; ok {
if value.status != domain.ScrollLockStatusDone {
return fmt.Errorf("command %s is already in queue", cmd)

if value.status != domain.ScrollLockStatusDone && value.status != domain.ScrollLockStatusError {
return ErrAlreadyInQueue
}

if value.status == domain.ScrollLockStatusDone && command.Run == domain.RunModeOnce {
return ErrCommandDoneOnce
}
}

Expand Down Expand Up @@ -197,10 +162,13 @@ func (sc *QueueManager) Work() {
}

func (sc *QueueManager) RunQueue() {
sc.runQueueMu.Lock()
defer sc.runQueueMu.Unlock()

for cmd, item := range sc.commandQueue {

//if already running, skip
if sc.commandQueue[cmd].status == domain.ScrollLockStatusRunning {
if sc.getStatus(cmd) == domain.ScrollLockStatusRunning {
continue
}

Expand All @@ -214,33 +182,52 @@ func (sc *QueueManager) RunQueue() {
}

//if run Mode is restart, we need to run it again
if (item.status == domain.ScrollLockStatusError || item.status == domain.ScrollLockStatusDone) && command.Run != domain.RunModeRestart {
if (sc.getStatus(cmd) == domain.ScrollLockStatusError || sc.getStatus(cmd) == domain.ScrollLockStatusDone) && command.Run != domain.RunModeRestart {
continue
}

dependencies := command.Needs
dependenciesReady := true
for _, dep := range dependencies {
childItem, ok := sc.commandQueue[dep]
_, ok := sc.commandQueue[dep]
//if item not in queue, add it and
if !ok {
dependenciesReady = false
sc.AddItem(dep, item.changeStatus)
continue
}

if childItem.status != domain.ScrollLockStatusDone {
if sc.getStatus(dep) != domain.ScrollLockStatusDone {
dependenciesReady = false
continue
}
}

if dependenciesReady {
err := sc.workItem(cmd)
if err != nil {
logger.Log().Error("Error running command", zap.String("command", cmd), zap.Error(err))
}
sc.taskDoneChan <- struct{}{}
//we only run one process at a time, this is not optimal, but it is simple
sc.setStatus(cmd, domain.ScrollLockStatusRunning, item.changeStatus)
go func(c string, i *QueueItem) {

err := sc.workItem(c)
if err != nil {
sc.setStatus(c, domain.ScrollLockStatusError, i.changeStatus)
logger.Log().Error("Error running command", zap.String("command", c), zap.Error(err))
sc.taskDoneChan <- struct{}{}
return
}

//restart means we are never done!
if i.changeStatus && command.Run != domain.RunModeRestart {
sc.setStatus(c, domain.ScrollLockStatusDone, true)
} else {
if command.Run == domain.RunModeRestart {
sc.setStatus(c, domain.ScrollLockStatusWaiting, false)
} else {
sc.setStatus(c, domain.ScrollLockStatusDone, false)
}
}
sc.taskDoneChan <- struct{}{}
}(cmd, item)
}
}
}
Expand All @@ -256,7 +243,49 @@ func (sc *QueueManager) WaitUntilEmpty() {
for {
cmds := <-notifier
if len(cmds) == 0 {
// remove notifier
for i, n := range sc.notifierChan {
if n == notifier {
sc.notifierChan = append(sc.notifierChan[:i], sc.notifierChan[i+1:]...)
break
}
}
return
}
}
}

func (sc *QueueManager) GetQueueItem(cmd string) *QueueItem {
sc.mu.Lock()
defer sc.mu.Unlock()

if value, ok := sc.commandQueue[cmd]; ok {
return value
}

return nil
}

func (sc *QueueManager) getStatus(cmd string) domain.ScrollLockStatus {
sc.mu.Lock()
defer sc.mu.Unlock()
if value, ok := sc.commandQueue[cmd]; ok {
return value.status
}
return domain.ScrollLockStatusDone
}

func (sc *QueueManager) setStatus(cmd string, status domain.ScrollLockStatus, writeLock bool) {
sc.mu.Lock()
defer sc.mu.Unlock()
if value, ok := sc.commandQueue[cmd]; ok {
value.status = status
}
if writeLock {
lock, err := sc.scrollService.GetLock()
if err != nil {
return
}
lock.SetStatus(cmd, status)
}
}
8 changes: 4 additions & 4 deletions test/mock/services.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4e1c000

Please sign in to comment.