Skip to content

Commit

Permalink
feat: queue handler
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcStdt committed Jul 21, 2024
1 parent 24bfcef commit a0fa185
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 25 deletions.
3 changes: 2 additions & 1 deletion cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ to interact and monitor the Scroll Application`,
processHandler := handler.NewProcessHandler(processManager)
scrollLogHandler := handler.NewScrollLogHandler(scrollService, logManager, processManager)
scrollMetricHandler := handler.NewScrollMetricHandler(scrollService, processMonitor)
queueHandler := handler.NewQueueHandler(queueManager)

var annotationHandler *handler.AnnotationHandler = nil

Expand All @@ -103,7 +104,7 @@ to interact and monitor the Scroll Application`,

websocketHandler := handler.NewWebsocketHandler(authorizer, scrollService, consoleService)

s := web.NewServer(jwksUrl, scrollHandler, scrollLogHandler, scrollMetricHandler, annotationHandler, processHandler, websocketHandler, authorizer, cwd)
s := web.NewServer(jwksUrl, scrollHandler, scrollLogHandler, scrollMetricHandler, annotationHandler, processHandler, queueHandler, websocketHandler, authorizer, cwd)

a := s.Initialize()

Expand Down
5 changes: 5 additions & 0 deletions cmd/server/web/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Server struct {
scrollMetricHandler ports.ScrollMetricHandlerInterface
annotationHandler ports.AnnotationHandlerInterface
processHandler ports.ProcessHandlerInterface
queueHandler ports.QueueHandlerInterface
websocketHandler ports.WebsocketHandlerInterface
webdavPath string
}
Expand All @@ -43,6 +44,7 @@ func NewServer(
scrollMetricHandler ports.ScrollMetricHandlerInterface,
annotationHandler ports.AnnotationHandlerInterface,
processHandler ports.ProcessHandlerInterface,
queueHandler ports.QueueHandlerInterface,
websocketHandler ports.WebsocketHandlerInterface,
authorizerService ports.AuthorizerServiceInterface,
webdavPath string,
Expand All @@ -59,6 +61,7 @@ func NewServer(
scrollMetricHandler: scrollMetricHandler,
annotationHandler: annotationHandler,
processHandler: processHandler,
queueHandler: queueHandler,
websocketHandler: websocketHandler,
tokenAuthenticationMiddleware: middlewares.TokenAuthentication(authorizerService),
webdavPath: webdavPath,
Expand Down Expand Up @@ -132,6 +135,8 @@ func (s *Server) SetAPI(app *fiber.App) *fiber.App {
//Processes Group
apiRoutes.Get("/processes", s.processHandler.Processes).Name("processes.list")

apiRoutes.Get("/queue", s.queueHandler.Queue).Name("queue.list")

//Websocket Group
apiRoutes.Get("/consoles", s.websocketHandler.Consoles).Name("consoles.list")

Expand Down
43 changes: 43 additions & 0 deletions docs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,34 @@ const docTemplate = `{
}
}
},
"/api/v1/queue": {
"get": {
"consumes": [
"*/*"
],
"produces": [
"application/json"
],
"tags": [
"queue",
"druid",
"daemon"
],
"summary": "Get current scroll",
"operationId": "getQueue",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/domain.ScrollLockStatus"
}
}
}
}
}
},
"/api/v1/scroll": {
"get": {
"consumes": [
Expand Down Expand Up @@ -601,6 +629,21 @@ const docTemplate = `{
"RunModeRestart"
]
},
"domain.ScrollLockStatus": {
"type": "string",
"enum": [
"running",
"done",
"error",
"waiting"
],
"x-enum-varnames": [
"ScrollLockStatusRunning",
"ScrollLockStatusDone",
"ScrollLockStatusError",
"ScrollLockStatusWaiting"
]
},
"handler.ProcessesBody": {
"type": "object",
"properties": {
Expand Down
43 changes: 43 additions & 0 deletions docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,34 @@
}
}
},
"/api/v1/queue": {
"get": {
"consumes": [
"*/*"
],
"produces": [
"application/json"
],
"tags": [
"queue",
"druid",
"daemon"
],
"summary": "Get current scroll",
"operationId": "getQueue",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/domain.ScrollLockStatus"
}
}
}
}
}
},
"/api/v1/scroll": {
"get": {
"consumes": [
Expand Down Expand Up @@ -590,6 +618,21 @@
"RunModeRestart"
]
},
"domain.ScrollLockStatus": {
"type": "string",
"enum": [
"running",
"done",
"error",
"waiting"
],
"x-enum-varnames": [
"ScrollLockStatusRunning",
"ScrollLockStatusDone",
"ScrollLockStatusError",
"ScrollLockStatusWaiting"
]
},
"handler.ProcessesBody": {
"type": "object",
"properties": {
Expand Down
31 changes: 31 additions & 0 deletions docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,18 @@ definitions:
- RunModeAlways
- RunModeOnce
- RunModeRestart
domain.ScrollLockStatus:
enum:
- running
- done
- error
- waiting
type: string
x-enum-varnames:
- ScrollLockStatusRunning
- ScrollLockStatusDone
- ScrollLockStatusError
- ScrollLockStatusWaiting
handler.ProcessesBody:
properties:
processes:
Expand Down Expand Up @@ -385,6 +397,25 @@ paths:
- metrics
- druid
- daemon
/api/v1/queue:
get:
consumes:
- '*/*'
operationId: getQueue
produces:
- application/json
responses:
"200":
description: OK
schema:
additionalProperties:
$ref: '#/definitions/domain.ScrollLockStatus'
type: object
summary: Get current scroll
tags:
- queue
- druid
- daemon
/api/v1/scroll:
get:
consumes:
Expand Down
6 changes: 6 additions & 0 deletions internal/core/domain/queue_item.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package domain

type QueueItem struct {
Status ScrollLockStatus
ChangeStatus bool
}
4 changes: 4 additions & 0 deletions internal/core/ports/handler_ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,7 @@ type WebsocketHandlerInterface interface {
type ProcessHandlerInterface interface {
Processes(c *fiber.Ctx) error
}

type QueueHandlerInterface interface {
Queue(c *fiber.Ctx) error
}
1 change: 1 addition & 0 deletions internal/core/ports/services_ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,5 @@ type CronManagerInterface interface {

type QueueManagerInterface interface {
AddItem(cmd string, changeStatus bool) error
GetQueue() map[string]domain.ScrollLockStatus
}
54 changes: 30 additions & 24 deletions internal/core/services/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,12 @@ var ErrAlreadyInQueue = fmt.Errorf("command is already in queue")
var ErrCommandNotFound = fmt.Errorf("command not found")
var ErrCommandDoneOnce = fmt.Errorf("command is already done and has run mode once")

type QueueItem struct {
status domain.ScrollLockStatus
changeStatus bool
}

type QueueManager struct {
mu sync.Mutex
runQueueMu sync.Mutex
scrollService ports.ScrollServiceInterface
processLauncher ports.ProcedureLauchnerInterface
commandQueue map[string]*QueueItem
commandQueue map[string]*domain.QueueItem
taskChan chan string
taskDoneChan chan struct{}
shutdownChan chan struct{}
Expand All @@ -39,7 +34,7 @@ func NewQueueManager(
return &QueueManager{
scrollService: scrollService,
processLauncher: processLauncher,
commandQueue: make(map[string]*QueueItem),
commandQueue: make(map[string]*domain.QueueItem),
taskChan: make(chan string),
taskDoneChan: make(chan struct{}),
shutdownChan: make(chan struct{}),
Expand All @@ -53,7 +48,7 @@ func (sc *QueueManager) workItem(cmd string) error {
if queueItem == nil {
return fmt.Errorf("command %s not found", cmd)
}
changeStatus := queueItem.changeStatus
changeStatus := queueItem.ChangeStatus

logger.Log().Debug("Running command",
zap.String("cmd", cmd),
Expand Down Expand Up @@ -106,18 +101,18 @@ func (sc *QueueManager) AddItem(cmd string, changeStatus bool) error {

if value, ok := sc.commandQueue[cmd]; ok {

if value.status != domain.ScrollLockStatusDone && value.status != domain.ScrollLockStatusError {
if value.Status != domain.ScrollLockStatusDone && value.Status != domain.ScrollLockStatusError {
return ErrAlreadyInQueue
}

if value.status == domain.ScrollLockStatusDone && command.Run == domain.RunModeOnce {
if value.Status == domain.ScrollLockStatusDone && command.Run == domain.RunModeOnce {
return ErrCommandDoneOnce
}
}

sc.commandQueue[cmd] = &QueueItem{
status: domain.ScrollLockStatusWaiting,
changeStatus: changeStatus,
sc.commandQueue[cmd] = &domain.QueueItem{
Status: domain.ScrollLockStatusWaiting,
ChangeStatus: changeStatus,
}
sc.taskChan <- cmd

Expand Down Expand Up @@ -149,9 +144,9 @@ func (sc *QueueManager) QueueLockFile() error {
}
}

sc.commandQueue[cmd] = &QueueItem{
status: status,
changeStatus: true,
sc.commandQueue[cmd] = &domain.QueueItem{
Status: status,
ChangeStatus: true,
}
}

Expand Down Expand Up @@ -211,7 +206,7 @@ func (sc *QueueManager) RunQueue() {
//if item not in queue, add it and
if !ok {
dependenciesReady = false
sc.AddItem(dep, item.changeStatus)
sc.AddItem(dep, item.ChangeStatus)
continue
}

Expand All @@ -223,19 +218,19 @@ func (sc *QueueManager) RunQueue() {

if dependenciesReady {
//we only run one process at a time, this is not optimal, but it is simple
sc.setStatus(cmd, domain.ScrollLockStatusRunning, item.changeStatus)
go func(c string, i *QueueItem) {
sc.setStatus(cmd, domain.ScrollLockStatusRunning, item.ChangeStatus)
go func(c string, i *domain.QueueItem) {

err := sc.workItem(c)
if err != nil {
sc.setStatus(c, domain.ScrollLockStatusError, i.changeStatus)
sc.setStatus(c, domain.ScrollLockStatusError, i.ChangeStatus)
logger.Log().Error("Error running command", zap.String("command", c), zap.Error(err))
sc.taskDoneChan <- struct{}{}
return
}

//restart means we are never done!
if i.changeStatus && command.Run != domain.RunModeRestart {
if i.ChangeStatus && command.Run != domain.RunModeRestart {
sc.setStatus(c, domain.ScrollLockStatusDone, true)
} else {
if command.Run == domain.RunModeRestart {
Expand Down Expand Up @@ -273,7 +268,7 @@ func (sc *QueueManager) WaitUntilEmpty() {
}
}

func (sc *QueueManager) GetQueueItem(cmd string) *QueueItem {
func (sc *QueueManager) GetQueueItem(cmd string) *domain.QueueItem {
sc.mu.Lock()
defer sc.mu.Unlock()

Expand All @@ -288,7 +283,7 @@ func (sc *QueueManager) getStatus(cmd string) domain.ScrollLockStatus {
sc.mu.Lock()
defer sc.mu.Unlock()
if value, ok := sc.commandQueue[cmd]; ok {
return value.status
return value.Status
}
return domain.ScrollLockStatusDone
}
Expand All @@ -297,7 +292,7 @@ func (sc *QueueManager) setStatus(cmd string, status domain.ScrollLockStatus, wr
sc.mu.Lock()
defer sc.mu.Unlock()
if value, ok := sc.commandQueue[cmd]; ok {
value.status = status
value.Status = status
}
if writeLock {
lock, err := sc.scrollService.GetLock()
Expand All @@ -307,3 +302,14 @@ func (sc *QueueManager) setStatus(cmd string, status domain.ScrollLockStatus, wr
lock.SetStatus(cmd, status)
}
}

func (sc *QueueManager) GetQueue() map[string]domain.ScrollLockStatus {
sc.mu.Lock()
defer sc.mu.Unlock()

queue := make(map[string]domain.ScrollLockStatus)
for cmd, item := range sc.commandQueue {
queue[cmd] = item.Status
}
return queue
}
Loading

0 comments on commit a0fa185

Please sign in to comment.