diff --git a/.gitignore b/.gitignore
index 931d5f5ce..e52a7547d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@
 /docgen-md
 /docgen-openrpc
 /deps.json
+/userschedule
 
 extern/filecoin-ffi/rust/target
 **/*.a
diff --git a/Makefile b/Makefile
index 89242c264..1658fa23b 100644
--- a/Makefile
+++ b/Makefile
@@ -79,6 +79,11 @@ sptool: $(BUILD_DEPS)
 .PHONY: sptool
 BINS+=sptool
 
+userschedule:
+	rm -f userschedule
+	$(GOCC) build $(GOFLAGS) -o userschedule ./cmd/userschedule
+.PHONY: userschedule
+
 ifeq ($(shell uname),Linux)
 batchdep: build/.supraseal-install
diff --git a/cmd/curio/tasks/tasks.go b/cmd/curio/tasks/tasks.go
index 94592c447..47d3ecff4 100644
--- a/cmd/curio/tasks/tasks.go
+++ b/cmd/curio/tasks/tasks.go
@@ -23,6 +23,7 @@ import (
 	"github.com/filecoin-project/curio/deps/config"
 	"github.com/filecoin-project/curio/harmony/harmonydb"
 	"github.com/filecoin-project/curio/harmony/harmonytask"
+	"github.com/filecoin-project/curio/harmony/taskhelp/usertaskmgt"
 	"github.com/filecoin-project/curio/lib/chainsched"
 	"github.com/filecoin-project/curio/lib/curiochain"
 	"github.com/filecoin-project/curio/lib/fastparamfetch"
@@ -208,6 +209,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
 	// (we could have just appended to this list in the reverse order, but defining
 	// tasks in pipeline order is more intuitive)
 
+	usertaskmgt.WrapTasks(activeTasks, dependencies.Cfg.Subsystems.UserScheduler, dependencies.DB, dependencies.ListenAddr)
 	ht, err := harmonytask.New(db, activeTasks, dependencies.ListenAddr)
 	if err != nil {
 		return nil, err
diff --git a/cmd/userschedule/userschedule.go b/cmd/userschedule/userschedule.go
new file mode 100644
index 000000000..574534335
--- /dev/null
+++ b/cmd/userschedule/userschedule.go
@@ -0,0 +1,100 @@
+// This is an example round-robin scheduler.
+// It can be used by pointing Curio's base configuration (Subsystems.UserScheduler)
+// at a machine named myscheduler with the URL:
+//	http://myscheduler:7654/userschedule
+// Be sure to open the selected port on the machine running this scheduler.
+//
+// Usage:
+//
+//	Fork the repo from GitHub and clone it to your local machine,
+//	edit this file as needed to implement your own scheduling logic,
+//	build with 'make userschedule', then run with ./userschedule
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	"net/http"
+	"os"
+	"os/signal"
+	"runtime/debug"
+	"runtime/pprof"
+	"sync"
+	"syscall"
+
+	"golang.org/x/xerrors"
+)
+
+const WorkerBusyTimeout = 60 // Seconds until Curio asks again for this task.
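+
+// For reference, a base-configuration entry pointing Curio at this scheduler could look
+// roughly like the following (assumed TOML layout; adjust host names and ports to your
+// own cluster):
+//
+//	[Subsystems]
+//	  [[Subsystems.UserScheduler]]
+//	  TaskName = "SDR"
+//	  URL = "http://myscheduler:7654/userschedule"
+//	  HaltOnSchedulerDown = false
+//
+// A quick manual check against a running instance (hypothetical task and worker names):
+//
+//	curl -X POST http://localhost:7654/userschedule \
+//	  -d '{"task_id":"12","task_type":"SDR","workers":["machine1:12300","machine2:12300"]}'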
+
+func sched(w http.ResponseWriter, r *http.Request) {
+	var input struct {
+		TaskID   string   `json:"task_id"`
+		TaskType string   `json:"task_type"`
+		Workers  []string `json:"workers"`
+	}
+	// Parse the request
+	if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
+		OrHTTPFail(w, xerrors.Errorf("failed to parse request: %s", err))
+	}
+
+	// Scheduler logic goes here
+	selectedWorker := roundRobin(input.TaskType, input.Workers)
+
+	// Respond to Curio
+	w.WriteHeader(http.StatusOK)
+	err := json.NewEncoder(w).Encode(struct {
+		Worker  string `json:"worker"`
+		Timeout int    `json:"timeout"`
+	}{selectedWorker, WorkerBusyTimeout})
+	if err != nil {
+		OrHTTPFail(w, err)
+	}
+}
+
+// ///////// Round Robin Scheduler ///////// //
+var mx sync.Mutex
+var m = make(map[string]int)
+
+func roundRobin(taskType string, workers []string) string {
+	mx.Lock()
+	defer mx.Unlock()
+	selectedWorker := workers[m[taskType]%len(workers)]
+	m[taskType]++
+	return selectedWorker
+}
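+
+// pinToHost is an alternative policy sketch (not used by default): send a task type to a
+// preferred machine whenever that machine is among the candidates, and fall back to
+// round-robin otherwise. The pinned host name below is only a placeholder; swap this in
+// for roundRobin inside sched() if it fits your cluster.
+func pinToHost(taskType string, workers []string) string {
+	pins := map[string]string{"SDR": "machine1:12300"} // example pin, adjust to your machines
+	if want, ok := pins[taskType]; ok {
+		for _, w := range workers {
+			if w == want {
+				return w
+			}
+		}
+	}
+	return roundRobin(taskType, workers)
+}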
+
+// ///////////////////////////////////
+// Everything below this line is boilerplate code.
+// ///////////////////////////////////
+
+func main() {
+	setupCloseHandler()
+	mux := http.NewServeMux()
+	mux.HandleFunc("/userschedule", func(w http.ResponseWriter, r *http.Request) {
+		defer func() { _ = recover() }()
+		sched(w, r)
+	})
+	fmt.Println(http.ListenAndServe(":7654", mux))
+}
+
+// Intentionally inlined dependencies to make it easy to copy-paste into your own codebase.
+func OrHTTPFail(w http.ResponseWriter, err error) {
+	if err != nil {
+		w.WriteHeader(500)
+		_, _ = w.Write([]byte(err.Error()))
+		log.Printf("http fail. err %s, stack %s", err, string(debug.Stack()))
+		panic(1)
+	}
+}
+
+func setupCloseHandler() {
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
+	go func() {
+		<-c
+		fmt.Println("\r- Ctrl+C pressed in Terminal")
+		_ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
+		panic(1)
+	}()
+}
diff --git a/deps/config/doc_gen.go b/deps/config/doc_gen.go
index fb67ef5cf..bfbd97b42 100644
--- a/deps/config/doc_gen.go
+++ b/deps/config/doc_gen.go
@@ -689,6 +689,24 @@ cache data held on disk after the completion of TreeRC task to 11GiB.`,
 
 			Comment: `The maximum amount of SyntheticPoRep tasks that can run simultaneously. Note that the maximum number of tasks will
 also be bounded by resources available on the machine.`,
+		},
+		{
+			Name: "UserScheduler",
+			Type: "[]UserSchedule",
+
+			Comment: `UserScheduler allows the user to schedule tasks on specific machines of their choice.
+This HTTP endpoint gets a POST request with the following JSON body:
+{
+"task_id": "task_id",
+"task_type": "task_type",
+"workers": ["worker1", "worker2"]
+}
+and expects a 200 response with the following JSON body:
+{
+"worker": "worker1",
+"timeout": 60
+}
+Timeout is the number of seconds after which the task will be rescheduled.`,
 		},
 		{
 			Name: "EnableBatchSeal",
@@ -769,4 +787,25 @@ identifier in the integration page for the service.`,
 Example: https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX`,
 		},
 	},
+	"UserSchedule": {
+		{
+			Name: "TaskName",
+			Type: "string",
+
+			Comment: `TaskName as listed in the GUI. Ex: SDR`,
+		},
+		{
+			Name: "URL",
+			Type: "string",
+
+			Comment: `URL of the http(s) user scheduler`,
+		},
+		{
+			Name: "HaltOnSchedulerDown",
+			Type: "bool",
+
+			Comment: `HaltOnSchedulerDown - if true, tasks will not run while the scheduler URL is not returning a usable response.
+Leaving this false is recommended so scheduling keeps working even if the UserScheduler service is down.`,
+		},
+	},
 }
diff --git a/deps/config/types.go b/deps/config/types.go
index 4e344ce56..9bb1f1001 100644
--- a/deps/config/types.go
+++ b/deps/config/types.go
@@ -264,9 +264,53 @@ type CurioSubsystemsConfig struct {
 	// also be bounded by resources available on the machine.
 	SyntheticPoRepMaxTasks int
 
+	// UserScheduler allows the user to schedule tasks on specific machines of their choice.
+	// This HTTP endpoint gets a POST request with the following JSON body:
+	// {
+	// "task_id": "task_id",
+	// "task_type": "task_type",
+	// "workers": ["worker1", "worker2"]
+	// }
+	// and expects a 200 response with the following JSON body:
+	// {
+	// "worker": "worker1",
+	// "timeout": 60
+	// }
+	// Timeout is the number of seconds after which the task will be rescheduled.
+	UserScheduler []UserSchedule
+
 	// Batch Seal
 	EnableBatchSeal bool
 }
+
+// UserSchedule allows the user to schedule a task on specific machines of their choice.
+// This HTTP endpoint gets a POST request with the following JSON body:
+//
+//	{
+//	  "task_id": "task_id",
+//	  "task_type": "task_type",
+//	  "workers": ["worker1", "worker2"]
+//	}
+//
+// and expects a 200 response with the following JSON body:
+//
+//	{
+//	  "worker": "worker1",
+//	  "timeout": 60
+//	}
+//
+// Timeout is the number of seconds after which the task will be rescheduled.
+type UserSchedule struct {
+	// TaskName as listed in the GUI. Ex: SDR
+	TaskName string
+
+	// URL of the http(s) user scheduler
+	URL string
+
+	// HaltOnSchedulerDown - if true, tasks will not run while the scheduler URL is not returning a usable response.
+	// Leaving this false is recommended so scheduling keeps working even if the UserScheduler service is down.
+	HaltOnSchedulerDown bool
+}
 type CurioFees struct {
 	DefaultMaxFee      types.FIL
 	MaxPreCommitGasFee types.FIL
diff --git a/harmony/harmonydb/sql/20240724-user_sched.sql b/harmony/harmonydb/sql/20240724-user_sched.sql
new file mode 100644
index 000000000..6c7b6df8d
--- /dev/null
+++ b/harmony/harmonydb/sql/20240724-user_sched.sql
@@ -0,0 +1,7 @@
+CREATE TABLE harmony_task_user (
+    task_id INTEGER PRIMARY KEY,
+    owner TEXT NOT NULL,
+    expiration INTEGER NOT NULL,
+    ignore_userscheduler BOOLEAN NOT NULL DEFAULT FALSE,
+    FOREIGN KEY (task_id) REFERENCES harmony_task (id) ON DELETE CASCADE
+);
\ No newline at end of file
diff --git a/harmony/harmonytask/task_type_handler.go b/harmony/harmonytask/task_type_handler.go
index ed56c4d47..1fa0511bd 100644
--- a/harmony/harmonytask/task_type_handler.go
+++ b/harmony/harmonytask/task_type_handler.go
@@ -212,13 +212,15 @@ canAcceptAgain:
 			}
 			return owner == h.TaskEngine.ownerID
 		})
-		if doErr != nil {
+		if doErr != nil && doErr != ErrReturnToPoolPlease {
 			log.Errorw("Do() returned error", "type", h.Name, "id", strconv.Itoa(int(*tID)), "error", doErr)
 		}
 	}()
 	return true
 }
 
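+// ErrReturnToPoolPlease is a sentinel error: when a task's Do() returns it, the task is
+// released back to the queue without a failure being recorded against it.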
err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID) + _, err := tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID) if err != nil { return false, fmt.Errorf("could not log completion: %w", err) @@ -268,9 +264,9 @@ retryRecordCompletion: result = "error: " + doErr.Error() } var deleteTask bool - if h.MaxFailures > 0 { + if doErr != ErrReturnToPoolPlease && h.MaxFailures > 0 { ct := uint(0) - err = tx.QueryRow(`SELECT count(*) FROM harmony_task_history + err := tx.QueryRow(`SELECT count(*) FROM harmony_task_history WHERE task_id=$1 AND result=FALSE`, tID).Scan(&ct) if err != nil { return false, fmt.Errorf("could not read task history: %w", err) @@ -280,7 +276,7 @@ retryRecordCompletion: } } if deleteTask { - _, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID) + _, err := tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID) if err != nil { return false, fmt.Errorf("could not delete failed job: %w", err) } @@ -292,6 +288,15 @@ retryRecordCompletion: } } } + if doErr == ErrReturnToPoolPlease { + return true, nil + } + var postedTime time.Time + err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime) + + if err != nil { + return false, fmt.Errorf("could not log completion: %w ", err) + } _, err = tx.Exec(`INSERT INTO harmony_task_history (task_id, name, posted, work_start, work_end, result, completed_by_host_and_port, err) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, tID, h.Name, postedTime.UTC(), workStart.UTC(), workEnd.UTC(), done, h.TaskEngine.hostAndPort, result) diff --git a/harmony/taskhelp/usertaskmgt/usertaskmgt.go b/harmony/taskhelp/usertaskmgt/usertaskmgt.go new file mode 100644 index 000000000..d5b8eda0d --- /dev/null +++ b/harmony/taskhelp/usertaskmgt/usertaskmgt.go @@ -0,0 +1,170 @@ +/* + Package usertaskmgt provides a way to wrap tasks with a URL that can be called to assign the task to a worker. + Timeline + +- UrlTask accepts everything +- once accepted, UrlTask.Do() finds who should own the task and updates the DB: + - harmony_task_user.owner_id & expiration_time + - harmony_task releases the task (without err) + +- The poller will see the task & call CanAccept() + - CanAccept() will see the owner_id and call the deeper canaccept() if it's us. + - If it's not us, check the expiration time and release the task by deleting the row. + +- The task will be done by the worker who was told to do it, or eventually reassigned. + +Pitfalls: +- If the user's URL is down, the task will be stuck in the DB. +- Turnaround time is slowed by the additional trip through the poller. +- Full task resources are claimed by the URL runner, so the task needs a full capacity. +*/ +package usertaskmgt + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + logging "github.com/ipfs/go-log/v2" + "github.com/samber/lo" + "golang.org/x/xerrors" + + "github.com/filecoin-project/curio/deps/config" + "github.com/filecoin-project/curio/harmony/harmonydb" + "github.com/filecoin-project/curio/harmony/harmonytask" +) + +var log = logging.Logger("userTaskMgt") + +func WrapTasks(tasks []harmonytask.TaskInterface, UserScheduler []config.UserSchedule, db *harmonydb.DB, hostAndPort string) { + m := lo.SliceToMap(UserScheduler, func(s config.UserSchedule) (string, *config.UserSchedule) { + _, err := url.Parse(s.URL) + if err != nil { + log.Errorf("Invalid UserScheduleUrl: %s. 
+func WrapTasks(tasks []harmonytask.TaskInterface, UserScheduler []config.UserSchedule, db *harmonydb.DB, hostAndPort string) {
+	m := lo.SliceToMap(UserScheduler, func(s config.UserSchedule) (string, *config.UserSchedule) {
+		_, err := url.Parse(s.URL)
+		if err != nil {
+			log.Errorf("invalid UserSchedule URL %q for task %s: %s", s.URL, s.TaskName, err)
+			return "", nil
+		}
+		return s.TaskName, &s
+	})
+	for i, task := range tasks {
+		if s, ok := m[task.TypeDetails().Name]; ok {
+			tasks[i] = &UrlTask{
+				TaskInterface:       task,
+				UserScheduleUrl:     s.URL,
+				name:                task.TypeDetails().Name,
+				db:                  db,
+				hostAndPort:         hostAndPort,
+				haltOnSchedulerDown: s.HaltOnSchedulerDown,
+			}
+		}
+	}
+}
+
+type UrlTask struct {
+	harmonytask.TaskInterface
+	db                  *harmonydb.DB
+	UserScheduleUrl     string
+	name                string
+	hostAndPort         string
+	haltOnSchedulerDown bool
+}
+
+// CanAccept accepts everything if no harmony_task_user row exists yet. Otherwise it defers
+// to the wrapped CanAccept() when the row names us, and frees the task again (by deleting
+// the row) once the expiration has passed.
+func (t *UrlTask) CanAccept(tids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
+	id := tids[0]
+	var owner string
+	var expiration int64
+	var ignoreUserScheduler bool
+	// Aggregates guarantee exactly one result row even when no harmony_task_user row exists yet.
+	err := t.db.QueryRow(context.Background(), `SELECT
+		COALESCE(MAX(owner), ''),
+		COALESCE(MAX(expiration), 0),
+		COALESCE(BOOL_OR(ignore_userscheduler), FALSE)
+		FROM harmony_task_user WHERE task_id=$1`, id).Scan(&owner, &expiration, &ignoreUserScheduler)
+	if err != nil {
+		return nil, xerrors.Errorf("could not get owner: %w", err)
+	}
+	if owner != "" {
+		if owner == t.hostAndPort || ignoreUserScheduler {
+			return t.TaskInterface.CanAccept(tids, te)
+		}
+		if expiration < time.Now().Unix() {
+			_, err = t.db.Exec(context.Background(), `DELETE FROM harmony_task_user WHERE task_id=$1`, id)
+			if err != nil {
+				return nil, xerrors.Errorf("could not delete from harmony_task_user: %w", err)
+			}
+		}
+	}
+	return &id, nil
+}
+
+var client = &http.Client{Timeout: time.Second * 10}
+
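+// Do runs the task locally when this machine is the recorded owner. Otherwise it asks the
+// user scheduler which machine should own the task, records that choice (with an expiry)
+// in harmony_task_user, and returns ErrReturnToPoolPlease so the chosen worker can pick
+// the task up on a later poll.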
+func (t *UrlTask) Do(id harmonytask.TaskID, stillMe func() bool) (b bool, err error) {
+	defer func() {
+		// If the user scheduler could not be reached or used and the config says not to
+		// halt, mark the task to ignore the user scheduler and put it back in the pool.
+		if err != nil && err != harmonytask.ErrReturnToPoolPlease && !t.haltOnSchedulerDown {
+			log.Error("Proceeding without user scheduler service running (as configured)")
+			log.Error(err)
+			_, err = t.db.Exec(context.Background(),
+				`INSERT INTO harmony_task_user (task_id, owner, expiration, ignore_userscheduler)
+				VALUES ($1, '-', 0, true)`, id)
+			if err != nil {
+				log.Error("Could not insert into harmony_task_user: ", err)
+				return
+			}
+			err = harmonytask.ErrReturnToPoolPlease
+		}
+	}()
+	var owner string
+	// MAX() guarantees a result row even when this task has not been assigned yet.
+	err = t.db.QueryRow(context.Background(), `SELECT COALESCE(MAX(owner), '') FROM harmony_task_user WHERE task_id=$1`, id).Scan(&owner)
+	if err != nil {
+		return false, xerrors.Errorf("could not get owner: %w", err)
+	}
+	if owner == t.hostAndPort {
+		return t.TaskInterface.Do(id, stillMe)
+	}
+	var workerList []string
+	err = t.db.Select(context.Background(), &workerList, `SELECT host_and_port
+		FROM harmony_machines m JOIN harmony_machine_details d ON d.machine_id=m.id
+		WHERE tasks LIKE $1`, "%,"+t.name+",%")
+	if err != nil {
+		return false, xerrors.Errorf("could not get worker list: %w", err)
+	}
+	if len(workerList) == 0 {
+		return false, xerrors.Errorf("no machines advertise task %s", t.name)
+	}
+
+	resp, err := client.Post(t.UserScheduleUrl, "application/json", bytes.NewReader([]byte(`
+	{
+		"task_type": "`+t.name+`",
+		"task_id": "`+strconv.Itoa(int(id))+`",
+		"workers": ["`+strings.Join(workerList, `","`)+`"]
+	}
+	`)))
+	if err != nil {
+		return false, xerrors.Errorf("could not call user defined URL: %w", err)
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		return false, xerrors.Errorf("user defined URL returned non-200 status code: %d", resp.StatusCode)
+	}
+	var respData struct {
+		Worker  string
+		Timeout int
+	}
+	err = json.NewDecoder(resp.Body).Decode(&respData)
+	if err != nil {
+		return false, xerrors.Errorf("could not decode user defined URL response: %w", err)
+	}
+
+	// Even if the chosen worker is us, we cannot shortcut and run the task here because we
+	// do not have CanAccept's second argument; let the poller hand it to the chosen worker.
+	expires := time.Now().Add(time.Second * time.Duration(respData.Timeout))
+	_, err = t.db.Exec(context.Background(), `INSERT INTO harmony_task_user (task_id, owner, expiration)
+		VALUES ($1, $2, $3)`, id, respData.Worker, expires.Unix())
+	if err != nil {
+		return false, xerrors.Errorf("could not insert into harmony_task_user: %w", err)
+	}
+
+	return false, harmonytask.ErrReturnToPoolPlease
+}
diff --git a/itests/userschedule_test.go b/itests/userschedule_test.go
new file mode 100644
index 000000000..75b48ea91
--- /dev/null
+++ b/itests/userschedule_test.go
@@ -0,0 +1,84 @@
+package itests
+
+import (
+	"os/exec"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/filecoin-project/curio/deps/config"
+	"github.com/filecoin-project/curio/harmony/harmonydb"
+	"github.com/filecoin-project/curio/harmony/harmonytask"
+	"github.com/filecoin-project/curio/harmony/resources"
+	"github.com/filecoin-project/curio/harmony/taskhelp/usertaskmgt"
+)
+
+func TestUserTaskMgt(t *testing.T) {
+	gopath, err := exec.LookPath("go")
+	require.NoError(t, err)
+	// Start the example round-robin scheduler in the background; it listens on :7654.
+	schedCmd := exec.Command(gopath, "run", "../cmd/userschedule")
+	require.NoError(t, schedCmd.Start())
+	t.Cleanup(func() { _ = schedCmd.Process.Kill() })
+
+	db := dbSetup(t)
+	harmonytask.POLL_DURATION = 200 * time.Millisecond
+	output := ""
+	var instances []*ut
+	for a := 0; a < 3; a++ { // make 3 "machines"
+		inst := &ut{db: db, f: func() { output += strconv.Itoa(a) }}
+		instances = append(instances, inst)
+
+		host := "foo:" + strconv.Itoa(a)
+		tasks := []harmonytask.TaskInterface{inst}
+		usertaskmgt.WrapTasks(tasks, []config.UserSchedule{
+			{TaskName: "foo", URL: "http://localhost:7654/userschedule"},
+		}, db, host)
+		_, err := harmonytask.New(db, tasks, host)
+		require.NoError(t, err)
+	}
+
+	for a := 0; a < 5; a++ { // schedule 5 tasks
+		instances[0].myAddTask(func(tID harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
+			return true, nil
+		})
+	}
+	// Wait for all five tasks to run, then verify they were handed out round-robin.
+	require.Eventually(t, func() bool { return len(output) == 5 }, time.Minute, harmonytask.POLL_DURATION)
+	require.Equal(t, "01201", output)
+}
+
+type ut struct {
+	db        *harmonydb.DB
+	f         func()
+	myAddTask harmonytask.AddTaskFunc
+}
+
+func (u *ut) TypeDetails() harmonytask.TaskTypeDetails {
+	return harmonytask.TaskTypeDetails{
+		Name: "foo",
+		Cost: resources.Resources{},
+	}
+}
+func (u *ut) CanAccept(tids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
+	return &tids[0], nil
+}
+func (u *ut) Adder(f harmonytask.AddTaskFunc) {
+	u.myAddTask = f
+}
+func (u *ut) Do(tID harmonytask.TaskID, _ func() bool) (bool, error) {
+	u.f()
+	time.Sleep(time.Second) // so there's no chance that a later task will finish first
+	return true, nil
+}
+
+func dbSetup(t *testing.T) *harmonydb.DB {
+	sharedITestID := harmonydb.ITestNewID()
+	dbConfig := config.HarmonyDB{
+		Hosts:    []string{envElse("CURIO_HARMONYDB_HOSTS", "127.0.0.1")},
+		Database: "yugabyte",
+		Username: "yugabyte",
+		Password: "yugabyte",
+		Port:     "5433",
+	}
+	db, err := harmonydb.NewFromConfigWithITestID(t, dbConfig, sharedITestID)
+	require.NoError(t, err)
+	return db
+}