Skip to content

Commit

Permalink
scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
mstgnz committed Jul 8, 2024
1 parent cbf450c commit cfee43c
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 12 deletions.
5 changes: 2 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func Catch(h HttpHandler) http.HandlerFunc {
func main() {

// test mail with attach
//err := config.App().Mail.SetSubject("cron").SetContent("mail geldi mi?").SetTo("[email protected]").SetAttachment(map[string][]byte{"query.sql": []byte("1. dosya içeriği"), "query2.sql": []byte("2. dosya içeriği")}).SendText()
//err := config.App().Mail.SetSubject("cron").SetContent("test mail").SetTo("[email protected]").SetAttachment(map[string][]byte{"query.sql": []byte("1. folder content"), "query2.sql": []byte("2. folder content")}).SendText()

// Scheduler Call
schedule.CallSchedule(config.App().Cron)
Expand Down Expand Up @@ -290,10 +290,9 @@ func main() {
time.Sleep(time.Second * 5)
}

config.App().Log.Info("Shutting down gracefully...")

config.App().Cron.Stop()
config.App().DB.CloseDatabase()
config.App().Log.Info("Shutting down gracefully...")
}

func webAuthMiddleware(next http.Handler) http.Handler {
Expand Down
4 changes: 2 additions & 2 deletions models/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Schedule struct {
Group *Group `json:"group,omitempty"`
Request *Request `json:"request,omitempty"`
Notification *Notification `json:"notification,omitempty"`
Webhook []*Webhook `json:"webhook,omitempty"`
Webhooks []*Webhook `json:"webhooks,omitempty"`
CreatedAt *time.Time `json:"created_at,omitempty"`
UpdatedAt *time.Time `json:"updated_at,omitempty"`
DeletedAt *time.Time `json:"deleted_at,omitempty"`
Expand Down Expand Up @@ -375,7 +375,7 @@ func (m *Schedule) queryPrepare(query string) []*Schedule {
if err := json.Unmarshal([]byte(webhookJson), &webhooks); err != nil {
return schedules
}
schedule.Webhook = webhooks
schedule.Webhooks = webhooks

schedules = append(schedules, schedule)
}
Expand Down
141 changes: 134 additions & 7 deletions schedule/scheduler.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package schedule

import (
"fmt"
"io"
"log"
"net/http"
"strings"
"time"
_ "time/tzdata"

"github.com/mstgnz/cronjob/config"
"github.com/mstgnz/cronjob/models"
"github.com/robfig/cron/v3"
)

Expand All @@ -18,14 +23,136 @@ func CallSchedule(c *cron.Cron) {

cron.WithLocation(loc)

// Alotech Call - every night at 23:59
if _, err = c.AddFunc("* * * * *", func() {
config.ShuttingWrapper(func() {
log.Println("running schedulr every 1 minute")
})
schedule := &models.Schedule{}
schedules := schedule.WithQueryAll()

}); err != nil {
log.Println("AddFunc AlotechCall", err)
scheduleMap := make(map[int]cron.EntryID)
AddSchedules(c, schedules, scheduleMap)

// Check for new schedules every minute
c.AddFunc("@every 1m", func() {
newSchedules := schedule.WithQueryAll()
AddSchedules(c, newSchedules, scheduleMap)
})
}

func AddSchedules(c *cron.Cron, schedules []*models.Schedule, scheduleMap map[int]cron.EntryID) {
scheduleLog := &models.ScheduleLog{}
for _, schedule := range schedules {
if !schedule.Active {
continue
}
if _, exists := scheduleMap[schedule.ID]; !exists {
id, err := c.AddFunc(schedule.Timing, func() {
defer func() {
if r := recover(); r != nil {
config.App().Log.Warn("Recovered from panic in schedule", fmt.Sprintf("%v", r))
}
}()

startAt := time.Now()

client := &http.Client{
Timeout: time.Duration(schedule.Timeout) * time.Second,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
req, err := http.NewRequest(schedule.Request.Method, schedule.Request.Url, strings.NewReader(string(schedule.Request.Content)))
if err != nil {
config.App().Log.Warn("Schedule Request Error", err.Error())
return
}

for _, header := range schedule.Request.RequestHeaders {
req.Header.Set(header.Key, header.Value)
}

scheduleUpdate(schedule, true)
var resp *http.Response
for retries := 0; retries < schedule.Retries; retries++ {
resp, err = client.Do(req)
if err == nil {
break
}
config.App().Log.Warn("Schedule Do Error, retrying", fmt.Sprintf("Attempt %d/%d: %v", retries+1, schedule.Retries, err.Error()))
time.Sleep(1 * time.Second)
}
scheduleUpdate(schedule, false)
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
config.App().Log.Warn("Schedule Body Error", err.Error())
return
}
notification(schedule, body)

finishAt := time.Now()
scheduleLog.StartedAt = &startAt
scheduleLog.FinishedAt = &finishAt
scheduleLog.Took = float32(finishAt.Sub(startAt).Seconds())
scheduleLog.Result = string(body)
scheduleLog.Create(schedule.ID)

webhooks(schedule)
})
if err != nil {
config.App().Log.Warn("Schedule Error", err.Error())
} else {
scheduleMap[schedule.ID] = id
}
}
}
}

func scheduleUpdate(schedule *models.Schedule, running bool) {
query := "UPDATE schedules SET running=$1 WHERE id=$2"
err := schedule.Update(query, []any{running, schedule.ID})
if err != nil {
config.App().Log.Warn("Schedule Update Error", err.Error())
}
}

func notification(schedule *models.Schedule, body []byte) {
if schedule.Notification.IsMail {
for _, mail := range schedule.Notification.NotifyEmails {
err := config.App().Mail.SetSubject(schedule.Timing + " is running").SetContent(string(body)).SetTo(mail.Email).SendText()
if err != nil {
config.App().Log.Warn("Schedule Mail Error", err.Error())
}
}
}
/* if schedule.Notification.IsMessage {
for _, message := range schedule.Notification.NotifyMessages {
// TODO send message
}
} */
}

func webhooks(schedule *models.Schedule) {
for _, webhook := range schedule.Webhooks {
go func() {
client := &http.Client{
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
if webhook.Request == nil {
return
}

req, err := http.NewRequest(webhook.Request.Method, webhook.Request.Url, strings.NewReader(string(webhook.Request.Content)))
if err != nil {
config.App().Log.Warn("Schedule Webhook Error", err.Error())
return
}

_, err = client.Do(req)
if err != nil {
config.App().Log.Warn("Schedule Webhook Error", err.Error())
return
}
}()
}
}

0 comments on commit cfee43c

Please sign in to comment.