@@ -12,23 +12,25 @@ import (
1212
1313// Scheduler manages cron jobs for tasks
1414type Scheduler struct {
15- cron * cron.Cron
16- db * db.DB
17- executor * executor.Executor
18- jobs map [int64 ]cron.EntryID
19- mu sync.RWMutex
20- running bool
21- stopSync chan struct {}
15+ cron * cron.Cron
16+ db * db.DB
17+ executor * executor.Executor
18+ jobs map [int64 ]cron.EntryID
19+ cronExprs map [int64 ]string // Track cron expressions to detect changes
20+ mu sync.RWMutex
21+ running bool
22+ stopSync chan struct {}
2223}
2324
2425// New creates a new scheduler
2526func New (database * db.DB ) * Scheduler {
2627 return & Scheduler {
27- cron : cron .New (cron .WithSeconds ()),
28- db : database ,
29- executor : executor .New (database ),
30- jobs : make (map [int64 ]cron.EntryID ),
31- stopSync : make (chan struct {}),
28+ cron : cron .New (cron .WithSeconds ()),
29+ db : database ,
30+ executor : executor .New (database ),
31+ jobs : make (map [int64 ]cron.EntryID ),
32+ cronExprs : make (map [int64 ]string ),
33+ stopSync : make (chan struct {}),
3234 }
3335}
3436
@@ -146,26 +148,38 @@ func (s *Scheduler) scheduleTaskLocked(task *db.Task) error {
146148 delete (s .jobs , task .ID )
147149 }
148150
149- // Create a copy of task for the closure
150- taskCopy := * task
151+ // Create a copy of task ID for the closure
152+ taskID := task . ID
151153
152154 entryID , err := s .cron .AddFunc (task .CronExpr , func () {
153155 // Get fresh task data from DB
154- freshTask , err := s .db .GetTask (taskCopy . ID )
156+ freshTask , err := s .db .GetTask (taskID )
155157 if err != nil {
156- fmt .Printf ("Failed to get task %d: %v\n " , taskCopy . ID , err )
158+ fmt .Printf ("Failed to get task %d: %v\n " , taskID , err )
157159 return
158160 }
159161 if ! freshTask .Enabled {
160162 return
161163 }
162164 s .executor .ExecuteAsync (freshTask )
165+
166+ // Update next run time in DB after execution
167+ s .mu .RLock ()
168+ if eid , ok := s .jobs [taskID ]; ok {
169+ entry := s .cron .Entry (eid )
170+ if ! entry .Next .IsZero () {
171+ freshTask .NextRunAt = & entry .Next
172+ _ = s .db .UpdateTask (freshTask )
173+ }
174+ }
175+ s .mu .RUnlock ()
163176 })
164177 if err != nil {
165178 return fmt .Errorf ("invalid cron expression: %w" , err )
166179 }
167180
168181 s .jobs [task .ID ] = entryID
182+ s .cronExprs [task .ID ] = task .CronExpr
169183
170184 // Update next run time in DB
171185 entry := s .cron .Entry (entryID )
@@ -228,13 +242,15 @@ func (s *Scheduler) SyncTasks() {
228242 if entryID , ok := s .jobs [taskID ]; ok {
229243 s .cron .Remove (entryID )
230244 delete (s .jobs , taskID )
245+ delete (s .cronExprs , taskID )
231246 }
232247 }
233248 }
234249
235250 // Add/update tasks
236251 for _ , task := range tasks {
237252 _ , scheduled := s .jobs [task .ID ]
253+ oldCronExpr := s .cronExprs [task .ID ]
238254
239255 if task .Enabled && ! scheduled {
240256 // Task should be scheduled but isn't
@@ -244,7 +260,11 @@ func (s *Scheduler) SyncTasks() {
244260 if entryID , ok := s .jobs [task .ID ]; ok {
245261 s .cron .Remove (entryID )
246262 delete (s .jobs , task .ID )
263+ delete (s .cronExprs , task .ID )
247264 }
265+ } else if task .Enabled && scheduled && task .CronExpr != oldCronExpr {
266+ // Cron expression changed, reschedule
267+ _ = s .scheduleTaskLocked (task )
248268 }
249269 }
250270}
0 commit comments