Skip to content

Commit

Permalink
FIX: Move ExecutionReport out of Request struct
Browse files Browse the repository at this point in the history
Because of a data race, the scheduler used to receive completion
requests where the fields in the Request object were not valid any more.
The data race was due to the fact that Request objects are allocated
by means of a sync.Pool.
To fix the issue and improve code organization, ExecutionReport is now
allocated and returned during function execution, rather than being
pre-allocated within the Request struct.
  • Loading branch information
grussorusso committed Sep 23, 2024
2 parents 7193b72 + e75a5da commit 208860b
Show file tree
Hide file tree
Showing 16 changed files with 90 additions and 134 deletions.
2 changes: 0 additions & 2 deletions cmd/serverledge/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,6 @@ func createSchedulingPolicy() scheduling.Policy {
return &scheduling.CloudEdgePolicy{}
} else if policyConf == "edgeonly" {
return &scheduling.EdgePolicy{}
} else if policyConf == "custom1" {
return &scheduling.Custom1Policy{}
} else {
return &scheduling.DefaultLocalPolicy{}
}
Expand Down
7 changes: 6 additions & 1 deletion images/nodejs17ng/executor.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ http.createServer(async (request, response) => {
var handler = reqbody["Handler"]
var handler_dir = reqbody["HandlerDir"]
var params = reqbody["Params"]
var return_output = reqbody["ReturnOutput"]

var context = {}
if (process.env.CONTEXT !== "undefined") {
Expand All @@ -35,7 +36,11 @@ http.createServer(async (request, response) => {
resp = {}
resp["Result"] = JSON.stringify(result);
resp["Success"] = true
resp["Output"] = "Output capture not supported for this runtime yet."
if (return_output === true) {
resp["Output"] = "Output capture not supported for this runtime yet."
} else {
resp["Output"] = ""
}


response.writeHead(200, { 'Content-Type': contentType });
Expand Down
7 changes: 2 additions & 5 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,21 @@ func InvokeFunction(c echo.Context) error {
r.Async = invocationRequest.Async
r.ReturnOutput = invocationRequest.ReturnOutput
r.ReqId = fmt.Sprintf("%s-%s%d", fun, node.NodeIdentifier[len(node.NodeIdentifier)-5:], r.Arrival.Nanosecond())
// init fields if possibly not overwritten later
r.ExecReport.SchedAction = ""
r.ExecReport.OffloadLatency = 0.0

if r.Async {
go scheduling.SubmitAsyncRequest(r)
return c.JSON(http.StatusOK, function.AsyncResponse{ReqId: r.ReqId})
}

err = scheduling.SubmitRequest(r)
executionReport, err := scheduling.SubmitRequest(r)

if errors.Is(err, node.OutOfResourcesErr) {
return c.String(http.StatusTooManyRequests, "")
} else if err != nil {
log.Printf("Invocation failed: %v\n", err)
return c.String(http.StatusInternalServerError, "")
} else {
return c.JSON(http.StatusOK, function.Response{Success: true, ExecutionReport: r.ExecReport})
return c.JSON(http.StatusOK, function.Response{Success: true, ExecutionReport: executionReport})
}
}

Expand Down
9 changes: 4 additions & 5 deletions internal/function/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import (

// Request represents a single function invocation.
type Request struct {
ReqId string
Fun *Function
Params map[string]interface{}
Arrival time.Time
ExecReport ExecutionReport
ReqId string
Fun *Function
Params map[string]interface{}
Arrival time.Time
RequestQoS
CanDoOffloading bool
Async bool
Expand Down
15 changes: 9 additions & 6 deletions internal/node/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ func (fp *ContainerPool) getWarmContainer() (container.ContainerID, bool) {
return "", false
}

fp.ready.Remove(elem)
contID := elem.Value.(warmContainer).contID
fp.putBusyContainer(contID)
wc := fp.ready.Remove(elem).(warmContainer)
fp.putBusyContainer(wc.contID)

return contID, true
return wc.contID, true
}

func (fp *ContainerPool) putBusyContainer(contID container.ContainerID) {
Expand Down Expand Up @@ -127,7 +126,7 @@ func AcquireWarmContainer(f *function.Function) (container.ContainerID, error) {
return "", OutOfResourcesErr
}

//log.Printf("Acquired resources for warm container. Now: %v", Resources)
//log.Printf("Using warm %s for %s. Now: %v", contID, f, Resources)
return contID, nil
}

Expand All @@ -143,14 +142,18 @@ func ReleaseContainer(contID container.ContainerID, f *function.Function) {
fp := getFunctionPool(f)

// we must update the busy list by removing this element
var removed container.ContainerID
elem := fp.busy.Front()
for ok := elem != nil; ok; ok = elem != nil {
if elem.Value.(container.ContainerID) == contID {
fp.busy.Remove(elem) // delete the element from the busy list
removed = fp.busy.Remove(elem).(container.ContainerID)
break
}
elem = elem.Next()
}
if removed != contID {
panic("Failed to release container")
}

fp.putReadyContainer(contID, expTime)

Expand Down
4 changes: 3 additions & 1 deletion internal/scheduling/cloudonly_policy.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package scheduling

import "github.com/grussorusso/serverledge/internal/function"

type CloudOnlyPolicy struct{}

func (p *CloudOnlyPolicy) Init() {
}

func (p *CloudOnlyPolicy) OnCompletion(_ *scheduledRequest) {
func (p *CloudOnlyPolicy) OnCompletion(_ *function.Function, _ *function.ExecutionReport) {

}

Expand Down
37 changes: 0 additions & 37 deletions internal/scheduling/custom1policy.go

This file was deleted.

3 changes: 2 additions & 1 deletion internal/scheduling/edgeCloudPolicy.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package scheduling

import (
"github.com/grussorusso/serverledge/internal/function"
"github.com/grussorusso/serverledge/internal/node"
)

Expand All @@ -10,7 +11,7 @@ type CloudEdgePolicy struct{}
func (p *CloudEdgePolicy) Init() {
}

func (p *CloudEdgePolicy) OnCompletion(_ *scheduledRequest) {
func (p *CloudEdgePolicy) OnCompletion(_ *function.Function, _ *function.ExecutionReport) {

}

Expand Down
3 changes: 2 additions & 1 deletion internal/scheduling/edgeOnlyPolicy.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package scheduling

import (
"github.com/grussorusso/serverledge/internal/function"
"log"

"github.com/grussorusso/serverledge/internal/node"
Expand All @@ -12,7 +13,7 @@ type EdgePolicy struct{}
func (p *EdgePolicy) Init() {
}

func (p *EdgePolicy) OnCompletion(_ *scheduledRequest) {
func (p *EdgePolicy) OnCompletion(_ *function.Function, _ *function.ExecutionReport) {

}

Expand Down
29 changes: 16 additions & 13 deletions internal/scheduling/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scheduling

import (
"fmt"
"github.com/grussorusso/serverledge/internal/function"
"time"

"github.com/grussorusso/serverledge/internal/container"
Expand All @@ -11,8 +12,8 @@ import (
const HANDLER_DIR = "/app"

// Execute serves a request on the specified container.
func Execute(contID container.ContainerID, r *scheduledRequest) error {
//log.Printf("[%s] Executing on container: %v", r, contID)
func Execute(contID container.ContainerID, r *scheduledRequest, isWarm bool) (function.ExecutionReport, error) {
//log.Printf("[%s] Executing on container: %v", r.Fun, contID)

var req executor.InvocationRequest
if r.Fun.Runtime == container.CUSTOM_RUNTIME {
Expand All @@ -32,31 +33,33 @@ func Execute(contID container.ContainerID, r *scheduledRequest) error {
}

t0 := time.Now()
initTime := t0.Sub(r.Arrival).Seconds()

response, invocationWait, err := container.Execute(contID, &req)
if err != nil {
// notify scheduler
completions <- &completion{scheduledRequest: r, contID: contID}
return fmt.Errorf("[%s] Execution failed: %v", r, err)
completions <- &completionNotification{fun: r.Fun, contID: contID, executionReport: nil}
return function.ExecutionReport{}, fmt.Errorf("[%s] Execution failed: %v", r, err)
}

if !response.Success {
// notify scheduler
completions <- &completion{scheduledRequest: r, contID: contID}
return fmt.Errorf("Function execution failed")
completions <- &completionNotification{fun: r.Fun, contID: contID, executionReport: nil}
return function.ExecutionReport{}, fmt.Errorf("Function execution failed")
}

r.ExecReport.Result = response.Result
r.ExecReport.Output = response.Output
r.ExecReport.Duration = time.Now().Sub(t0).Seconds() - invocationWait.Seconds()
r.ExecReport.ResponseTime = time.Now().Sub(r.Arrival).Seconds()
report := function.ExecutionReport{Result: response.Result,
Output: response.Output,
IsWarmStart: isWarm,
Duration: time.Now().Sub(t0).Seconds() - invocationWait.Seconds(),
ResponseTime: time.Now().Sub(t0).Seconds() - invocationWait.Seconds()}

// initializing containers may require invocation retries, adding
// latency
r.ExecReport.InitTime += invocationWait.Seconds()
report.InitTime = initTime + invocationWait.Seconds()

// notify scheduler
completions <- &completion{scheduledRequest: r, contID: contID}
completions <- &completionNotification{fun: r.Fun, contID: contID, executionReport: &report}

return nil
return report, nil
}
23 changes: 12 additions & 11 deletions internal/scheduling/offloading.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,27 @@ func pickEdgeNodeForOffloading(r *scheduledRequest) (url string) {
return ""
}

func Offload(r *function.Request, serverUrl string) error {
func Offload(r *function.Request, serverUrl string) (function.ExecutionReport, error) {
// Prepare request
request := client.InvocationRequest{Params: r.Params, QoSClass: int64(r.Class), QoSMaxRespT: r.MaxRespT}
invocationBody, err := json.Marshal(request)
if err != nil {
log.Print(err)
return err
return function.ExecutionReport{}, err
}
sendingTime := time.Now() // used to compute latency later on
resp, err := offloadingClient.Post(serverUrl+"/invoke/"+r.Fun.Name, "application/json",
bytes.NewBuffer(invocationBody))

if err != nil {
log.Print(err)
return err
return function.ExecutionReport{}, err
}
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusTooManyRequests {
return node.OutOfResourcesErr
return function.ExecutionReport{}, node.OutOfResourcesErr
}
return fmt.Errorf("Remote returned: %v", resp.StatusCode)
return function.ExecutionReport{}, fmt.Errorf("Remote returned: %v", resp.StatusCode)
}

var response function.Response
Expand All @@ -69,18 +69,19 @@ func Offload(r *function.Request, serverUrl string) error {
}(resp.Body)
body, _ := io.ReadAll(resp.Body)
if err = json.Unmarshal(body, &response); err != nil {
return err
return function.ExecutionReport{}, err
}
r.ExecReport = response.ExecutionReport
now := time.Now()
response.ExecutionReport.ResponseTime = now.Sub(r.Arrival).Seconds()

execReport := &response.ExecutionReport
execReport.ResponseTime = now.Sub(r.Arrival).Seconds()

// TODO: check how this is used in the QoSAware policy
// It was originially computed as "report.Arrival - sendingTime"
r.ExecReport.OffloadLatency = now.Sub(sendingTime).Seconds() - r.ExecReport.Duration - r.ExecReport.InitTime
r.ExecReport.SchedAction = SCHED_ACTION_OFFLOAD
execReport.OffloadLatency = now.Sub(sendingTime).Seconds() - execReport.Duration - execReport.InitTime
execReport.SchedAction = SCHED_ACTION_OFFLOAD

return nil
return response.ExecutionReport, nil
}

func OffloadAsync(r *function.Request, serverUrl string) error {
Expand Down
4 changes: 3 additions & 1 deletion internal/scheduling/policy.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package scheduling

import "github.com/grussorusso/serverledge/internal/function"

type Policy interface {
Init()
OnCompletion(request *scheduledRequest)
OnCompletion(fun *function.Function, executionReport *function.ExecutionReport)
OnArrival(request *scheduledRequest)
}
3 changes: 2 additions & 1 deletion internal/scheduling/policy_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scheduling

import (
"errors"
"github.com/grussorusso/serverledge/internal/function"
"log"

"github.com/grussorusso/serverledge/internal/config"
Expand All @@ -22,7 +23,7 @@ func (p *DefaultLocalPolicy) Init() {
}
}

func (p *DefaultLocalPolicy) OnCompletion(_ *scheduledRequest) {
func (p *DefaultLocalPolicy) OnCompletion(_ *function.Function, _ *function.ExecutionReport) {
if p.queue == nil {
return
}
Expand Down
Loading

0 comments on commit 208860b

Please sign in to comment.