From 2324f2d9dbbe11e5d27c1ff1fbc0cafd4e3049a0 Mon Sep 17 00:00:00 2001 From: Gabriele Russo Russo Date: Mon, 23 Sep 2024 17:22:43 +0200 Subject: [PATCH 1/5] Avoids scheduledRequest data race in scheduler --- internal/node/pool.go | 15 +++++++++------ internal/scheduling/execution.go | 8 ++++---- internal/scheduling/scheduler.go | 27 +++++++++++++-------------- internal/scheduling/types.go | 4 ++-- scripts/start-etcd.sh | 2 ++ 5 files changed, 30 insertions(+), 26 deletions(-) diff --git a/internal/node/pool.go b/internal/node/pool.go index 479d5888..b7cdcc1f 100644 --- a/internal/node/pool.go +++ b/internal/node/pool.go @@ -42,11 +42,10 @@ func (fp *ContainerPool) getWarmContainer() (container.ContainerID, bool) { return "", false } - fp.ready.Remove(elem) - contID := elem.Value.(warmContainer).contID - fp.putBusyContainer(contID) + wc := fp.ready.Remove(elem).(warmContainer) + fp.putBusyContainer(wc.contID) - return contID, true + return wc.contID, true } func (fp *ContainerPool) putBusyContainer(contID container.ContainerID) { @@ -127,7 +126,7 @@ func AcquireWarmContainer(f *function.Function) (container.ContainerID, error) { return "", OutOfResourcesErr } - //log.Printf("Acquired resources for warm container. Now: %v", Resources) + //log.Printf("Using warm %s for %s. Now: %v", contID, f, Resources) return contID, nil } @@ -143,14 +142,18 @@ func ReleaseContainer(contID container.ContainerID, f *function.Function) { fp := getFunctionPool(f) // we must update the busy list by removing this element + var removed container.ContainerID elem := fp.busy.Front() for ok := elem != nil; ok; ok = elem != nil { if elem.Value.(container.ContainerID) == contID { - fp.busy.Remove(elem) // delete the element from the busy list + removed = fp.busy.Remove(elem).(container.ContainerID) break } elem = elem.Next() } + if removed != contID { + panic("Failed to release container") + } fp.putReadyContainer(contID, expTime) diff --git a/internal/scheduling/execution.go b/internal/scheduling/execution.go index d73046f4..dd3045a0 100644 --- a/internal/scheduling/execution.go +++ b/internal/scheduling/execution.go @@ -12,7 +12,7 @@ const HANDLER_DIR = "/app" // Execute serves a request on the specified container. func Execute(contID container.ContainerID, r *scheduledRequest) error { - //log.Printf("[%s] Executing on container: %v", r, contID) + //log.Printf("[%s] Executing on container: %v", r.Fun, contID) var req executor.InvocationRequest if r.Fun.Runtime == container.CUSTOM_RUNTIME { @@ -36,13 +36,13 @@ func Execute(contID container.ContainerID, r *scheduledRequest) error { response, invocationWait, err := container.Execute(contID, &req) if err != nil { // notify scheduler - completions <- &completion{scheduledRequest: r, contID: contID} + completions <- &completionNotification{fun: r.Fun, contID: contID} return fmt.Errorf("[%s] Execution failed: %v", r, err) } if !response.Success { // notify scheduler - completions <- &completion{scheduledRequest: r, contID: contID} + completions <- &completionNotification{fun: r.Fun, contID: contID} return fmt.Errorf("Function execution failed") } @@ -56,7 +56,7 @@ func Execute(contID container.ContainerID, r *scheduledRequest) error { r.ExecReport.InitTime += invocationWait.Seconds() // notify scheduler - completions <- &completion{scheduledRequest: r, contID: contID} + completions <- &completionNotification{fun: r.Fun, contID: contID} return nil } diff --git a/internal/scheduling/scheduler.go b/internal/scheduling/scheduler.go index 9fafdfc7..dd737efb 100644 --- a/internal/scheduling/scheduler.go +++ b/internal/scheduling/scheduler.go @@ -8,7 +8,6 @@ import ( "runtime" "time" - "github.com/grussorusso/serverledge/internal/metrics" "github.com/grussorusso/serverledge/internal/node" "github.com/grussorusso/serverledge/internal/config" @@ -18,16 +17,15 @@ import ( ) var requests chan *scheduledRequest -var completions chan *completion +var completions chan *completionNotification var remoteServerUrl string -var executionLogEnabled bool var offloadingClient *http.Client func Run(p Policy) { requests = make(chan *scheduledRequest, 500) - completions = make(chan *completion, 500) + completions = make(chan *completionNotification, 500) // initialize Resources resources availableCores := runtime.NumCPU() @@ -57,21 +55,22 @@ func Run(p Policy) { log.Println("Scheduler started.") var r *scheduledRequest - var c *completion + var c *completionNotification for { select { case r = <-requests: go p.OnArrival(r) case c = <-completions: - node.ReleaseContainer(c.contID, c.Fun) - p.OnCompletion(c.scheduledRequest) - - if metrics.Enabled { - metrics.AddCompletedInvocation(c.Fun.Name) - if c.ExecReport.SchedAction != SCHED_ACTION_OFFLOAD { - metrics.AddFunctionDurationValue(c.Fun.Name, c.ExecReport.Duration) - } - } + node.ReleaseContainer(c.contID, c.fun) + //p.OnCompletion(c.scheduledRequest) // TODO: restore + + // TODO: restore + //if metrics.Enabled { + // metrics.AddCompletedInvocation(c.Fun.Name) + // if c.ExecReport.SchedAction != SCHED_ACTION_OFFLOAD { + // metrics.AddFunctionDurationValue(c.Fun.Name, c.ExecReport.Duration) + // } + //} } } diff --git a/internal/scheduling/types.go b/internal/scheduling/types.go index 931fd6af..dd9e1ab4 100644 --- a/internal/scheduling/types.go +++ b/internal/scheduling/types.go @@ -12,8 +12,8 @@ type scheduledRequest struct { priority float64 } -type completion struct { - *scheduledRequest +type completionNotification struct { + fun *function.Function contID container.ContainerID } diff --git a/scripts/start-etcd.sh b/scripts/start-etcd.sh index ef5de17e..8cd57dbd 100755 --- a/scripts/start-etcd.sh +++ b/scripts/start-etcd.sh @@ -1,4 +1,6 @@ #!/bin/sh +docker ps 2>&1 > /dev/null || sudo systemctl start docker + docker run -d --rm --name Etcd-server \ --publish 2379:2379 \ --publish 2380:2380 \ From f539e181a28ae7cc742b06c832e9ecd1f63dce42 Mon Sep 17 00:00:00 2001 From: Gabriele Russo Russo Date: Mon, 23 Sep 2024 17:45:20 +0200 Subject: [PATCH 2/5] Move ExecutionReport out of Request object --- internal/api/api.go | 7 ++----- internal/function/request.go | 9 ++++----- internal/scheduling/execution.go | 23 +++++++++++++---------- internal/scheduling/offloading.go | 23 ++++++++++++----------- internal/scheduling/scheduler.go | 28 ++++++++-------------------- internal/scheduling/types.go | 22 +++++++--------------- 6 files changed, 46 insertions(+), 66 deletions(-) diff --git a/internal/api/api.go b/internal/api/api.go index 3fa06c46..c9e96c03 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -65,16 +65,13 @@ func InvokeFunction(c echo.Context) error { r.Async = invocationRequest.Async r.ReturnOutput = invocationRequest.ReturnOutput r.ReqId = fmt.Sprintf("%s-%s%d", fun, node.NodeIdentifier[len(node.NodeIdentifier)-5:], r.Arrival.Nanosecond()) - // init fields if possibly not overwritten later - r.ExecReport.SchedAction = "" - r.ExecReport.OffloadLatency = 0.0 if r.Async { go scheduling.SubmitAsyncRequest(r) return c.JSON(http.StatusOK, function.AsyncResponse{ReqId: r.ReqId}) } - err = scheduling.SubmitRequest(r) + executionReport, err := scheduling.SubmitRequest(r) if errors.Is(err, node.OutOfResourcesErr) { return c.String(http.StatusTooManyRequests, "") @@ -82,7 +79,7 @@ func InvokeFunction(c echo.Context) error { log.Printf("Invocation failed: %v\n", err) return c.String(http.StatusInternalServerError, "") } else { - return c.JSON(http.StatusOK, function.Response{Success: true, ExecutionReport: r.ExecReport}) + return c.JSON(http.StatusOK, function.Response{Success: true, ExecutionReport: executionReport}) } } diff --git a/internal/function/request.go b/internal/function/request.go index e5ed3fbc..54217f5c 100644 --- a/internal/function/request.go +++ b/internal/function/request.go @@ -7,11 +7,10 @@ import ( // Request represents a single function invocation. type Request struct { - ReqId string - Fun *Function - Params map[string]interface{} - Arrival time.Time - ExecReport ExecutionReport + ReqId string + Fun *Function + Params map[string]interface{} + Arrival time.Time RequestQoS CanDoOffloading bool Async bool diff --git a/internal/scheduling/execution.go b/internal/scheduling/execution.go index dd3045a0..085315a3 100644 --- a/internal/scheduling/execution.go +++ b/internal/scheduling/execution.go @@ -2,6 +2,7 @@ package scheduling import ( "fmt" + "github.com/grussorusso/serverledge/internal/function" "time" "github.com/grussorusso/serverledge/internal/container" @@ -11,7 +12,7 @@ import ( const HANDLER_DIR = "/app" // Execute serves a request on the specified container. -func Execute(contID container.ContainerID, r *scheduledRequest) error { +func Execute(contID container.ContainerID, r *scheduledRequest, isWarm bool) (function.ExecutionReport, error) { //log.Printf("[%s] Executing on container: %v", r.Fun, contID) var req executor.InvocationRequest @@ -32,31 +33,33 @@ func Execute(contID container.ContainerID, r *scheduledRequest) error { } t0 := time.Now() + initTime := t0.Sub(r.Arrival).Seconds() response, invocationWait, err := container.Execute(contID, &req) if err != nil { // notify scheduler completions <- &completionNotification{fun: r.Fun, contID: contID} - return fmt.Errorf("[%s] Execution failed: %v", r, err) + return function.ExecutionReport{}, fmt.Errorf("[%s] Execution failed: %v", r, err) } if !response.Success { // notify scheduler completions <- &completionNotification{fun: r.Fun, contID: contID} - return fmt.Errorf("Function execution failed") + return function.ExecutionReport{}, fmt.Errorf("Function execution failed") } - r.ExecReport.Result = response.Result - r.ExecReport.Output = response.Output - r.ExecReport.Duration = time.Now().Sub(t0).Seconds() - invocationWait.Seconds() - r.ExecReport.ResponseTime = time.Now().Sub(r.Arrival).Seconds() + report := function.ExecutionReport{Result: response.Result, + Output: response.Output, + IsWarmStart: isWarm, + Duration: time.Now().Sub(t0).Seconds() - invocationWait.Seconds(), + ResponseTime: time.Now().Sub(t0).Seconds() - invocationWait.Seconds()} // initializing containers may require invocation retries, adding // latency - r.ExecReport.InitTime += invocationWait.Seconds() + report.InitTime = initTime + invocationWait.Seconds() // notify scheduler - completions <- &completionNotification{fun: r.Fun, contID: contID} + completions <- &completionNotification{fun: r.Fun, contID: contID, executionReport: &report} - return nil + return report, nil } diff --git a/internal/scheduling/offloading.go b/internal/scheduling/offloading.go index d8004746..0954e3db 100644 --- a/internal/scheduling/offloading.go +++ b/internal/scheduling/offloading.go @@ -37,13 +37,13 @@ func pickEdgeNodeForOffloading(r *scheduledRequest) (url string) { return "" } -func Offload(r *function.Request, serverUrl string) error { +func Offload(r *function.Request, serverUrl string) (function.ExecutionReport, error) { // Prepare request request := client.InvocationRequest{Params: r.Params, QoSClass: int64(r.Class), QoSMaxRespT: r.MaxRespT} invocationBody, err := json.Marshal(request) if err != nil { log.Print(err) - return err + return function.ExecutionReport{}, err } sendingTime := time.Now() // used to compute latency later on resp, err := offloadingClient.Post(serverUrl+"/invoke/"+r.Fun.Name, "application/json", @@ -51,13 +51,13 @@ func Offload(r *function.Request, serverUrl string) error { if err != nil { log.Print(err) - return err + return function.ExecutionReport{}, err } if resp.StatusCode != http.StatusOK { if resp.StatusCode == http.StatusTooManyRequests { - return node.OutOfResourcesErr + return function.ExecutionReport{}, node.OutOfResourcesErr } - return fmt.Errorf("Remote returned: %v", resp.StatusCode) + return function.ExecutionReport{}, fmt.Errorf("Remote returned: %v", resp.StatusCode) } var response function.Response @@ -69,18 +69,19 @@ func Offload(r *function.Request, serverUrl string) error { }(resp.Body) body, _ := io.ReadAll(resp.Body) if err = json.Unmarshal(body, &response); err != nil { - return err + return function.ExecutionReport{}, err } - r.ExecReport = response.ExecutionReport now := time.Now() - response.ExecutionReport.ResponseTime = now.Sub(r.Arrival).Seconds() + + execReport := &response.ExecutionReport + execReport.ResponseTime = now.Sub(r.Arrival).Seconds() // TODO: check how this is used in the QoSAware policy // It was originially computed as "report.Arrival - sendingTime" - r.ExecReport.OffloadLatency = now.Sub(sendingTime).Seconds() - r.ExecReport.Duration - r.ExecReport.InitTime - r.ExecReport.SchedAction = SCHED_ACTION_OFFLOAD + execReport.OffloadLatency = now.Sub(sendingTime).Seconds() - execReport.Duration - execReport.InitTime + execReport.SchedAction = SCHED_ACTION_OFFLOAD - return nil + return response.ExecutionReport, nil } func OffloadAsync(r *function.Request, serverUrl string) error { diff --git a/internal/scheduling/scheduler.go b/internal/scheduling/scheduler.go index dd737efb..8c9de9b6 100644 --- a/internal/scheduling/scheduler.go +++ b/internal/scheduling/scheduler.go @@ -77,7 +77,7 @@ func Run(p Policy) { } // SubmitRequest submits a newly arrived request for scheduling and execution -func SubmitRequest(r *function.Request) error { +func SubmitRequest(r *function.Request) (function.ExecutionReport, error) { schedRequest := scheduledRequest{ Request: r, decisionChannel: make(chan schedDecision, 1)} @@ -86,27 +86,19 @@ func SubmitRequest(r *function.Request) error { // wait on channel for scheduling action schedDecision, ok := <-schedRequest.decisionChannel if !ok { - return fmt.Errorf("could not schedule the request") + return function.ExecutionReport{}, fmt.Errorf("could not schedule the request") } //log.Printf("[%s] Scheduling decision: %v", r, schedDecision) - var err error if schedDecision.action == DROP { //log.Printf("[%s] Dropping request", r) - return node.OutOfResourcesErr + return function.ExecutionReport{}, node.OutOfResourcesErr } else if schedDecision.action == EXEC_REMOTE { //log.Printf("Offloading request") - err = Offload(r, schedDecision.remoteHost) - if err != nil { - return err - } + return Offload(r, schedDecision.remoteHost) } else { - err = Execute(schedDecision.contID, &schedRequest) - if err != nil { - return err - } + return Execute(schedDecision.contID, &schedRequest, schedDecision.useWarm) } - return nil } // SubmitAsyncRequest submits a newly arrived async request for scheduling and execution @@ -133,11 +125,11 @@ func SubmitAsyncRequest(r *function.Request) { publishAsyncResponse(r.ReqId, function.Response{Success: false}) } } else { - err = Execute(schedDecision.contID, &schedRequest) + report, err := Execute(schedDecision.contID, &schedRequest, schedDecision.useWarm) if err != nil { publishAsyncResponse(r.ReqId, function.Response{Success: false}) } - publishAsyncResponse(r.ReqId, function.Response{Success: true, ExecutionReport: r.ExecReport}) + publishAsyncResponse(r.ReqId, function.Response{Success: true, ExecutionReport: report}) } } @@ -159,11 +151,7 @@ func dropRequest(r *scheduledRequest) { } func execLocally(r *scheduledRequest, c container.ContainerID, warmStart bool) { - initTime := time.Now().Sub(r.Arrival).Seconds() - r.ExecReport.InitTime = initTime - r.ExecReport.IsWarmStart = warmStart - - decision := schedDecision{action: EXEC_LOCAL, contID: c} + decision := schedDecision{action: EXEC_LOCAL, contID: c, useWarm: warmStart} r.decisionChannel <- decision } diff --git a/internal/scheduling/types.go b/internal/scheduling/types.go index dd9e1ab4..7db3089d 100644 --- a/internal/scheduling/types.go +++ b/internal/scheduling/types.go @@ -13,8 +13,9 @@ type scheduledRequest struct { } type completionNotification struct { - fun *function.Function - contID container.ContainerID + fun *function.Function + contID container.ContainerID + executionReport *function.ExecutionReport } // schedDecision wraps a action made by the scheduler. @@ -24,22 +25,13 @@ type schedDecision struct { action action contID container.ContainerID remoteHost string + useWarm bool } type action int64 const ( - DROP action = 0 - EXEC_LOCAL = 1 - EXEC_REMOTE = 2 - BEST_EFFORT_EXECUTION = 3 -) - -type schedulingDecision int64 - -const ( - SCHED_DROP schedulingDecision = 0 - SCHED_REMOTE = 1 - SCHED_LOCAL = 2 - SCHED_BASIC = 3 + DROP action = 0 + EXEC_LOCAL = 1 + EXEC_REMOTE = 2 ) From d79b5b9d75f3265fc7839e38442b3a78ffff42d1 Mon Sep 17 00:00:00 2001 From: Gabriele Russo Russo Date: Mon, 23 Sep 2024 18:57:37 +0200 Subject: [PATCH 3/5] Restores onCompletion() call --- cmd/serverledge/main.go | 2 -- internal/scheduling/cloudonly_policy.go | 4 ++- internal/scheduling/custom1policy.go | 37 ------------------------- internal/scheduling/edgeCloudPolicy.go | 3 +- internal/scheduling/edgeOnlyPolicy.go | 3 +- internal/scheduling/execution.go | 4 +-- internal/scheduling/policy.go | 4 ++- internal/scheduling/policy_default.go | 3 +- internal/scheduling/scheduler.go | 20 ++++++------- 9 files changed, 24 insertions(+), 56 deletions(-) delete mode 100644 internal/scheduling/custom1policy.go diff --git a/cmd/serverledge/main.go b/cmd/serverledge/main.go index 16d3b157..80ee04f3 100644 --- a/cmd/serverledge/main.go +++ b/cmd/serverledge/main.go @@ -157,8 +157,6 @@ func createSchedulingPolicy() scheduling.Policy { return &scheduling.CloudEdgePolicy{} } else if policyConf == "edgeonly" { return &scheduling.EdgePolicy{} - } else if policyConf == "custom1" { - return &scheduling.Custom1Policy{} } else { return &scheduling.DefaultLocalPolicy{} } diff --git a/internal/scheduling/cloudonly_policy.go b/internal/scheduling/cloudonly_policy.go index 69b658c3..abf58759 100644 --- a/internal/scheduling/cloudonly_policy.go +++ b/internal/scheduling/cloudonly_policy.go @@ -1,11 +1,13 @@ package scheduling +import "github.com/grussorusso/serverledge/internal/function" + type CloudOnlyPolicy struct{} func (p *CloudOnlyPolicy) Init() { } -func (p *CloudOnlyPolicy) OnCompletion(_ *scheduledRequest) { +func (p *CloudOnlyPolicy) OnCompletion(_ *function.Function, _ *function.ExecutionReport) { } diff --git a/internal/scheduling/custom1policy.go b/internal/scheduling/custom1policy.go deleted file mode 100644 index 09635d96..00000000 --- a/internal/scheduling/custom1policy.go +++ /dev/null @@ -1,37 +0,0 @@ -package scheduling - -import ( - "github.com/grussorusso/serverledge/internal/function" - "github.com/grussorusso/serverledge/internal/node" -) - -type Custom1Policy struct { -} - -func (p *Custom1Policy) Init() { -} - -func (p *Custom1Policy) OnCompletion(_ *scheduledRequest) { - -} - -func (p *Custom1Policy) OnArrival(r *scheduledRequest) { - - containerID, err := node.AcquireWarmContainer(r.Fun) - if err == nil { - execLocally(r, containerID, true) - } else if handleColdStart(r) { - return - } else if r.CanDoOffloading && r.RequestQoS.Class == function.HIGH_PERFORMANCE { - url := pickEdgeNodeForOffloading(r) - if url != "" { - handleOffload(r, url) - } else { - dropRequest(r) - } - } else if r.CanDoOffloading { - handleCloudOffload(r) - } else { - dropRequest(r) - } -} diff --git a/internal/scheduling/edgeCloudPolicy.go b/internal/scheduling/edgeCloudPolicy.go index 5ca748a8..1bc02afd 100644 --- a/internal/scheduling/edgeCloudPolicy.go +++ b/internal/scheduling/edgeCloudPolicy.go @@ -1,6 +1,7 @@ package scheduling import ( + "github.com/grussorusso/serverledge/internal/function" "github.com/grussorusso/serverledge/internal/node" ) @@ -10,7 +11,7 @@ type CloudEdgePolicy struct{} func (p *CloudEdgePolicy) Init() { } -func (p *CloudEdgePolicy) OnCompletion(_ *scheduledRequest) { +func (p *CloudEdgePolicy) OnCompletion(_ *function.Function, _ *function.ExecutionReport) { } diff --git a/internal/scheduling/edgeOnlyPolicy.go b/internal/scheduling/edgeOnlyPolicy.go index 76138fc1..0cdd1fea 100644 --- a/internal/scheduling/edgeOnlyPolicy.go +++ b/internal/scheduling/edgeOnlyPolicy.go @@ -1,6 +1,7 @@ package scheduling import ( + "github.com/grussorusso/serverledge/internal/function" "log" "github.com/grussorusso/serverledge/internal/node" @@ -12,7 +13,7 @@ type EdgePolicy struct{} func (p *EdgePolicy) Init() { } -func (p *EdgePolicy) OnCompletion(_ *scheduledRequest) { +func (p *EdgePolicy) OnCompletion(_ *function.Function, _ *function.ExecutionReport) { } diff --git a/internal/scheduling/execution.go b/internal/scheduling/execution.go index 085315a3..1e1baa05 100644 --- a/internal/scheduling/execution.go +++ b/internal/scheduling/execution.go @@ -38,13 +38,13 @@ func Execute(contID container.ContainerID, r *scheduledRequest, isWarm bool) (fu response, invocationWait, err := container.Execute(contID, &req) if err != nil { // notify scheduler - completions <- &completionNotification{fun: r.Fun, contID: contID} + completions <- &completionNotification{fun: r.Fun, contID: contID, executionReport: nil} return function.ExecutionReport{}, fmt.Errorf("[%s] Execution failed: %v", r, err) } if !response.Success { // notify scheduler - completions <- &completionNotification{fun: r.Fun, contID: contID} + completions <- &completionNotification{fun: r.Fun, contID: contID, executionReport: nil} return function.ExecutionReport{}, fmt.Errorf("Function execution failed") } diff --git a/internal/scheduling/policy.go b/internal/scheduling/policy.go index d9697e8e..2bc6d279 100644 --- a/internal/scheduling/policy.go +++ b/internal/scheduling/policy.go @@ -1,7 +1,9 @@ package scheduling +import "github.com/grussorusso/serverledge/internal/function" + type Policy interface { Init() - OnCompletion(request *scheduledRequest) + OnCompletion(fun *function.Function, executionReport *function.ExecutionReport) OnArrival(request *scheduledRequest) } diff --git a/internal/scheduling/policy_default.go b/internal/scheduling/policy_default.go index 763c74f1..346721b4 100644 --- a/internal/scheduling/policy_default.go +++ b/internal/scheduling/policy_default.go @@ -2,6 +2,7 @@ package scheduling import ( "errors" + "github.com/grussorusso/serverledge/internal/function" "log" "github.com/grussorusso/serverledge/internal/config" @@ -22,7 +23,7 @@ func (p *DefaultLocalPolicy) Init() { } } -func (p *DefaultLocalPolicy) OnCompletion(_ *scheduledRequest) { +func (p *DefaultLocalPolicy) OnCompletion(_ *function.Function, _ *function.ExecutionReport) { if p.queue == nil { return } diff --git a/internal/scheduling/scheduler.go b/internal/scheduling/scheduler.go index 8c9de9b6..3fbaeefb 100644 --- a/internal/scheduling/scheduler.go +++ b/internal/scheduling/scheduler.go @@ -3,6 +3,7 @@ package scheduling import ( "errors" "fmt" + "github.com/grussorusso/serverledge/internal/metrics" "log" "net/http" "runtime" @@ -27,7 +28,7 @@ func Run(p Policy) { requests = make(chan *scheduledRequest, 500) completions = make(chan *completionNotification, 500) - // initialize Resources resources + // initialize Resources availableCores := runtime.NumCPU() node.Resources.AvailableMemMB = int64(config.GetInt(config.POOL_MEMORY_MB, 1024)) node.Resources.AvailableCPUs = config.GetFloat(config.POOL_CPUS, float64(availableCores)) @@ -62,15 +63,14 @@ func Run(p Policy) { go p.OnArrival(r) case c = <-completions: node.ReleaseContainer(c.contID, c.fun) - //p.OnCompletion(c.scheduledRequest) // TODO: restore - - // TODO: restore - //if metrics.Enabled { - // metrics.AddCompletedInvocation(c.Fun.Name) - // if c.ExecReport.SchedAction != SCHED_ACTION_OFFLOAD { - // metrics.AddFunctionDurationValue(c.Fun.Name, c.ExecReport.Duration) - // } - //} + p.OnCompletion(c.fun, c.executionReport) + + if metrics.Enabled && c.executionReport != nil { + metrics.AddCompletedInvocation(c.fun.Name) + if c.executionReport.SchedAction != SCHED_ACTION_OFFLOAD { + metrics.AddFunctionDurationValue(c.fun.Name, c.executionReport.Duration) + } + } } } From 27d0c9f46f655734559083a8abdf7cc4bb058fce Mon Sep 17 00:00:00 2001 From: Gabriele Russo Russo Date: Mon, 23 Sep 2024 19:00:47 +0200 Subject: [PATCH 4/5] Drops unused priority field --- internal/scheduling/types.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/scheduling/types.go b/internal/scheduling/types.go index 7db3089d..d48c474f 100644 --- a/internal/scheduling/types.go +++ b/internal/scheduling/types.go @@ -9,7 +9,6 @@ import ( type scheduledRequest struct { *function.Request decisionChannel chan schedDecision - priority float64 } type completionNotification struct { From e75a5da94c244db535d375ff1f4bd97726460383 Mon Sep 17 00:00:00 2001 From: Gabriele Russo Russo Date: Mon, 23 Sep 2024 19:06:30 +0200 Subject: [PATCH 5/5] Minor fix --- images/nodejs17ng/executor.js | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/images/nodejs17ng/executor.js b/images/nodejs17ng/executor.js index 6a2eac6f..c36d5d50 100644 --- a/images/nodejs17ng/executor.js +++ b/images/nodejs17ng/executor.js @@ -22,6 +22,7 @@ http.createServer(async (request, response) => { var handler = reqbody["Handler"] var handler_dir = reqbody["HandlerDir"] var params = reqbody["Params"] + var return_output = reqbody["ReturnOutput"] var context = {} if (process.env.CONTEXT !== "undefined") { @@ -35,7 +36,11 @@ http.createServer(async (request, response) => { resp = {} resp["Result"] = JSON.stringify(result); resp["Success"] = true - resp["Output"] = "Output capture not supported for this runtime yet." + if (return_output === true) { + resp["Output"] = "Output capture not supported for this runtime yet." + } else { + resp["Output"] = "" + } response.writeHead(200, { 'Content-Type': contentType });