Skip to content

Commit 032b406

Browse files
committed
feat: if a job failed more than 10 times is marked as canceled
1 parent cecb6b7 commit 032b406

File tree

3 files changed

+44
-21
lines changed

3 files changed

+44
-21
lines changed

model/model.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,16 @@ func (t TaskEvents) GetByEventId(i int) (*TaskEventType, error) {
189189
func (t TaskEvents) GetLastElement(i int) interface{} {
190190
return t[i]
191191
}
192+
193+
func (t TaskEvents) FilterBy(notification NotificationType, status NotificationStatus) (filteredEvents TaskEvents) {
194+
for _, event := range t {
195+
if event.NotificationType == notification && event.Status == status {
196+
filteredEvents = append(filteredEvents, event)
197+
}
198+
}
199+
return filteredEvents
200+
201+
}
192202
func (v *Job) AddEvent(notificationType NotificationType, notificationStatus NotificationStatus) (newEvent *TaskEventType) {
193203
return v.AddEventComplete(notificationType, notificationStatus, "")
194204
}

server/repository/repository.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ type Repository interface {
2121
PingServerUpdate(ctx context.Context, pingEventType model.PingEventType) error
2222
GetTimeoutJobs(ctx context.Context, timeout time.Duration) ([]*model.TaskEventType, error)
2323
GetJobs(ctx context.Context) (*[]model.Job, error)
24-
GetJobsByStatus(ctx context.Context, status model.NotificationStatus) (jobs []*model.Job, returnError error)
24+
GetJobsByStatus(ctx context.Context, notificationType model.NotificationType, status model.NotificationStatus) (jobs []*model.Job, returnError error)
2525
GetJob(ctx context.Context, uuid string) (*model.Job, error)
2626
GetJobByPath(ctx context.Context, path string) (*model.Job, error)
2727
AddJob(ctx context.Context, video *model.Job) error
@@ -297,16 +297,16 @@ func (s *SQLRepository) RetrieveQueuedJob(ctx context.Context) (video *model.Job
297297
return s.queuedJob(ctx, conn)
298298
}
299299

300-
func (s *SQLRepository) GetJobsByStatus(ctx context.Context, status model.NotificationStatus) (jobs []*model.Job, returnError error) {
300+
func (s *SQLRepository) GetJobsByStatus(ctx context.Context, notificationType model.NotificationType, status model.NotificationStatus) (jobs []*model.Job, returnError error) {
301301
conn, err := s.getConnection()
302302
if err != nil {
303303
return nil, err
304304
}
305-
return s.getJobsByStatus(ctx, conn, status)
305+
return s.getJobsByStatus(ctx, conn, notificationType, status)
306306
}
307307

308-
func (s *SQLRepository) getJobsByStatus(ctx context.Context, tx SQLDBOperations, statusFilter model.NotificationStatus) ([]*model.Job, error) {
309-
rows, err := tx.QueryContext(ctx, "SELECT v.id, v.source_path,v.source_size, v.target_path,v.target_size,vs.event_time, vs.status, vs.notification_type, vs.message FROM jobs v INNER JOIN job_status vs ON v.id = vs.job_id WHERE vs.status = $1;", statusFilter)
308+
func (s *SQLRepository) getJobsByStatus(ctx context.Context, tx SQLDBOperations, notificationTypeFilter model.NotificationType, statusFilter model.NotificationStatus) ([]*model.Job, error) {
309+
rows, err := tx.QueryContext(ctx, "SELECT v.id, v.source_path,v.source_size, v.target_path,v.target_size,vs.event_time, vs.status, vs.notification_type, vs.message FROM jobs v INNER JOIN job_status vs ON v.id = vs.job_id WHERE vs.notification_type = $1 and vs.status = $2 order by event_time desc;", notificationTypeFilter, statusFilter)
310310
if err != nil {
311311
return nil, err
312312
}

server/scheduler/scheduler.go

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,7 @@ func (r *RuntimeScheduler) jobMaintenance(ctx context.Context) error {
526526
}
527527

528528
func (r *RuntimeScheduler) queuedJobMaintenance(ctx context.Context) error {
529-
queuedJobs, err := r.repo.GetJobsByStatus(ctx, model.QueuedNotificationStatus)
529+
queuedJobs, err := r.repo.GetJobsByStatus(ctx, model.JobNotification, model.QueuedNotificationStatus)
530530
if err != nil {
531531
return err
532532
}
@@ -546,22 +546,34 @@ func (r *RuntimeScheduler) queuedJobMaintenance(ctx context.Context) error {
546546
}
547547

548548
func (r *RuntimeScheduler) failedJobMaintenance(ctx context.Context) error {
549-
failedJobs, err := r.repo.GetJobsByStatus(ctx, model.FailedNotificationStatus)
549+
failedJobs, err := r.repo.GetJobsByStatus(ctx, model.JobNotification, model.FailedNotificationStatus)
550550
if err != nil {
551551
return err
552552
}
553553
for _, failedJob := range failedJobs {
554-
if verifyFailureMessage(failedJob.StatusMessage) {
555-
jobRequest := &model.JobRequest{
556-
SourcePath: failedJob.SourcePath,
557-
TargetPath: failedJob.TargetPath,
558-
ForceFailed: true,
559-
}
560-
_, err = r.scheduleJobRequest(ctx, jobRequest)
561-
if err != nil {
554+
if !verifyFailureMessage(failedJob.StatusMessage) {
555+
continue
556+
}
557+
558+
failureJobEvents := failedJob.Events.FilterBy(model.JobNotification, model.FailedNotificationStatus)
559+
if len(failureJobEvents) > 10 {
560+
newEvent := failedJob.AddEventComplete(model.JobNotification, model.CanceledNotificationStatus, "Job canceled by system, because of too many failed attempts")
561+
if err = r.repo.AddNewTaskEvent(ctx, newEvent); err != nil {
562562
return err
563563
}
564+
continue
565+
}
566+
567+
jobRequest := &model.JobRequest{
568+
SourcePath: failedJob.SourcePath,
569+
TargetPath: failedJob.TargetPath,
570+
ForceFailed: true,
564571
}
572+
_, err = r.scheduleJobRequest(ctx, jobRequest)
573+
if err != nil {
574+
return err
575+
}
576+
565577
}
566578
return nil
567579
}
@@ -638,15 +650,10 @@ func verifyFailureMessage(message string) bool {
638650
if simpleRegex(`source File duration `, message) {
639651
return false
640652
}
641-
if simpleRegex(`timeout Waiting for PGS Job Done`, message) {
642-
return true
643-
}
653+
644654
if simpleRegex(`Disk quota exceeded`, message) || simpleRegex(`No space left on device`, message) {
645655
return true
646656
}
647-
if simpleRegex(`error on process PGS.*no such file or directory`, message) {
648-
return true
649-
}
650657
// if simpleRegex(`At least one output file must be specified`, message) {
651658
// return true
652659
//}
@@ -696,6 +703,11 @@ func verifyFailureMessage(message string) bool {
696703
if simpleRegex(`connection refused`, message) {
697704
return true
698705
}
706+
707+
if simpleRegex(`with 0 items`, message) {
708+
return false
709+
}
710+
699711
// if simpleRegex(`srt: Invalid data found when processing input`, message) {
700712
// return true
701713
//}
@@ -720,6 +732,7 @@ func verifyFailureMessage(message string) bool {
720732
if simpleRegex(`probably corrupt input`, message) {
721733
return false
722734
}
735+
723736
return false
724737
}
725738

0 commit comments

Comments
 (0)