From cfee43c5169387a4f7d8a6ae40fc71f01914d682 Mon Sep 17 00:00:00 2001 From: Mesut GENEZ Date: Mon, 8 Jul 2024 11:16:32 +0300 Subject: [PATCH] scheduler --- cmd/main.go | 5 +- models/schedule.go | 4 +- schedule/scheduler.go | 141 +++++++++++++++++++++++++++++++++++++++--- 3 files changed, 138 insertions(+), 12 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 0d15f6d..6921a1d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -80,7 +80,7 @@ func Catch(h HttpHandler) http.HandlerFunc { func main() { // test mail with attach - //err := config.App().Mail.SetSubject("cron").SetContent("mail geldi mi?").SetTo("mesutgenez@hotmail.com").SetAttachment(map[string][]byte{"query.sql": []byte("1. dosya içeriği"), "query2.sql": []byte("2. dosya içeriği")}).SendText() + //err := config.App().Mail.SetSubject("cron").SetContent("test mail").SetTo("mesutgenez@hotmail.com").SetAttachment(map[string][]byte{"query.sql": []byte("1. folder content"), "query2.sql": []byte("2. folder content")}).SendText() // Scheduler Call schedule.CallSchedule(config.App().Cron) @@ -290,10 +290,9 @@ func main() { time.Sleep(time.Second * 5) } - config.App().Log.Info("Shutting down gracefully...") - config.App().Cron.Stop() config.App().DB.CloseDatabase() + config.App().Log.Info("Shutting down gracefully...") } func webAuthMiddleware(next http.Handler) http.Handler { diff --git a/models/schedule.go b/models/schedule.go index a8a8165..8dcbed8 100644 --- a/models/schedule.go +++ b/models/schedule.go @@ -24,7 +24,7 @@ type Schedule struct { Group *Group `json:"group,omitempty"` Request *Request `json:"request,omitempty"` Notification *Notification `json:"notification,omitempty"` - Webhook []*Webhook `json:"webhook,omitempty"` + Webhooks []*Webhook `json:"webhooks,omitempty"` CreatedAt *time.Time `json:"created_at,omitempty"` UpdatedAt *time.Time `json:"updated_at,omitempty"` DeletedAt *time.Time `json:"deleted_at,omitempty"` @@ -375,7 +375,7 @@ func (m *Schedule) queryPrepare(query string) []*Schedule { if err := json.Unmarshal([]byte(webhookJson), &webhooks); err != nil { return schedules } - schedule.Webhook = webhooks + schedule.Webhooks = webhooks schedules = append(schedules, schedule) } diff --git a/schedule/scheduler.go b/schedule/scheduler.go index f31c691..c806f70 100644 --- a/schedule/scheduler.go +++ b/schedule/scheduler.go @@ -1,11 +1,16 @@ package schedule import ( + "fmt" + "io" "log" + "net/http" + "strings" "time" _ "time/tzdata" "github.com/mstgnz/cronjob/config" + "github.com/mstgnz/cronjob/models" "github.com/robfig/cron/v3" ) @@ -18,14 +23,136 @@ func CallSchedule(c *cron.Cron) { cron.WithLocation(loc) - // Alotech Call - every night at 23:59 - if _, err = c.AddFunc("* * * * *", func() { - config.ShuttingWrapper(func() { - log.Println("running schedulr every 1 minute") - }) + schedule := &models.Schedule{} + schedules := schedule.WithQueryAll() - }); err != nil { - log.Println("AddFunc AlotechCall", err) + scheduleMap := make(map[int]cron.EntryID) + AddSchedules(c, schedules, scheduleMap) + + // Check for new schedules every minute + c.AddFunc("@every 1m", func() { + newSchedules := schedule.WithQueryAll() + AddSchedules(c, newSchedules, scheduleMap) + }) +} + +func AddSchedules(c *cron.Cron, schedules []*models.Schedule, scheduleMap map[int]cron.EntryID) { + scheduleLog := &models.ScheduleLog{} + for _, schedule := range schedules { + if !schedule.Active { + continue + } + if _, exists := scheduleMap[schedule.ID]; !exists { + id, err := c.AddFunc(schedule.Timing, func() { + defer func() { + if r := recover(); r != nil { + config.App().Log.Warn("Recovered from panic in schedule", fmt.Sprintf("%v", r)) + } + }() + + startAt := time.Now() + + client := &http.Client{ + Timeout: time.Duration(schedule.Timeout) * time.Second, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + } + req, err := http.NewRequest(schedule.Request.Method, schedule.Request.Url, strings.NewReader(string(schedule.Request.Content))) + if err != nil { + config.App().Log.Warn("Schedule Request Error", err.Error()) + return + } + + for _, header := range schedule.Request.RequestHeaders { + req.Header.Set(header.Key, header.Value) + } + + scheduleUpdate(schedule, true) + var resp *http.Response + for retries := 0; retries < schedule.Retries; retries++ { + resp, err = client.Do(req) + if err == nil { + break + } + config.App().Log.Warn("Schedule Do Error, retrying", fmt.Sprintf("Attempt %d/%d: %v", retries+1, schedule.Retries, err.Error())) + time.Sleep(1 * time.Second) + } + scheduleUpdate(schedule, false) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + config.App().Log.Warn("Schedule Body Error", err.Error()) + return + } + notification(schedule, body) + + finishAt := time.Now() + scheduleLog.StartedAt = &startAt + scheduleLog.FinishedAt = &finishAt + scheduleLog.Took = float32(finishAt.Sub(startAt).Seconds()) + scheduleLog.Result = string(body) + scheduleLog.Create(schedule.ID) + + webhooks(schedule) + }) + if err != nil { + config.App().Log.Warn("Schedule Error", err.Error()) + } else { + scheduleMap[schedule.ID] = id + } + } } +} +func scheduleUpdate(schedule *models.Schedule, running bool) { + query := "UPDATE schedules SET running=$1 WHERE id=$2" + err := schedule.Update(query, []any{running, schedule.ID}) + if err != nil { + config.App().Log.Warn("Schedule Update Error", err.Error()) + } +} + +func notification(schedule *models.Schedule, body []byte) { + if schedule.Notification.IsMail { + for _, mail := range schedule.Notification.NotifyEmails { + err := config.App().Mail.SetSubject(schedule.Timing + " is running").SetContent(string(body)).SetTo(mail.Email).SendText() + if err != nil { + config.App().Log.Warn("Schedule Mail Error", err.Error()) + } + } + } + /* if schedule.Notification.IsMessage { + for _, message := range schedule.Notification.NotifyMessages { + // TODO send message + } + } */ +} + +func webhooks(schedule *models.Schedule) { + for _, webhook := range schedule.Webhooks { + go func() { + client := &http.Client{ + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + } + if webhook.Request == nil { + return + } + + req, err := http.NewRequest(webhook.Request.Method, webhook.Request.Url, strings.NewReader(string(webhook.Request.Content))) + if err != nil { + config.App().Log.Warn("Schedule Webhook Error", err.Error()) + return + } + + _, err = client.Do(req) + if err != nil { + config.App().Log.Warn("Schedule Webhook Error", err.Error()) + return + } + }() + } }