Skip to content

Commit

Permalink
Debugging a data race
Browse files Browse the repository at this point in the history
  • Loading branch information
grussorusso committed Sep 20, 2024
1 parent 7193b72 commit 4a5aff3
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 37 deletions.
10 changes: 8 additions & 2 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func InvokeFunction(c echo.Context) error {

r := requestsPool.Get().(*function.Request)
defer requestsPool.Put(r)
r.Fun = fun
r.Fun = &fun
f0 := r.Fun.Name
r.Params = invocationRequest.Params
r.Arrival = time.Now()
r.Class = function.ServiceClass(invocationRequest.QoSClass)
Expand All @@ -76,6 +77,11 @@ func InvokeFunction(c echo.Context) error {

err = scheduling.SubmitRequest(r)

f1 := r.Fun.Name
if f0 != f1 {
panic("assert")
}

if errors.Is(err, node.OutOfResourcesErr) {
return c.String(http.StatusTooManyRequests, "")
} else if err != nil {
Expand Down Expand Up @@ -223,7 +229,7 @@ func PrewarmFunction(c echo.Context) error {
return c.String(http.StatusNotFound, "Function unknown")
}

count, err := node.PrewarmInstances(fun, req.Instances, req.ForceImagePull)
count, err := node.PrewarmInstances(&fun, req.Instances, req.ForceImagePull)

if err != nil && !errors.Is(err, node.OutOfResourcesErr) {
log.Printf("Failed prewarming: %v\n", err)
Expand Down
24 changes: 11 additions & 13 deletions internal/function/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ func getEtcdKey(funcName string) string {
}

// GetFunction retrieves a Function given its name.
func GetFunction(name string) (*Function, bool) {
func GetFunction(name string) (Function, bool) {

val, found := getFromCache(name)
if !found {
// cache miss
f, response := getFromEtcd(name)
if !response {
return nil, false
return Function{}, false
}
//insert a new element to the cache
cache.GetCacheInstance().Set(name, f, cache.DefaultExp)
Expand All @@ -53,37 +53,35 @@ func (f *Function) String() string {
return f.Name
}

func getFromCache(name string) (*Function, bool) {
func getFromCache(name string) (Function, bool) {
localCache := cache.GetCacheInstance()
f, found := localCache.Get(name)
if !found {
return nil, false
return Function{}, false
}
//cache hit
//return a safe copy of the function previously obtained
function := *f.(*Function)
return &function, true
return f.(Function), true

}

func getFromEtcd(name string) (*Function, bool) {
func getFromEtcd(name string) (Function, bool) {
cli, err := utils.GetEtcdClient()
if err != nil {
return nil, false
return Function{}, false
}
ctx, _ := context.WithTimeout(context.Background(), 1*time.Second)
getResponse, err := cli.Get(ctx, getEtcdKey(name))
if err != nil || len(getResponse.Kvs) < 1 {
return nil, false
return Function{}, false
}

var f Function
err = json.Unmarshal(getResponse.Kvs[0].Value, &f)
if err != nil {
return nil, false
return f, false
}

return &f, true
return f, true
}

func (f *Function) SaveToEtcd() error {
Expand All @@ -103,7 +101,7 @@ func (f *Function) SaveToEtcd() error {
}

// Add the function to the local cache
cache.GetCacheInstance().Set(f.Name, f, cache.DefaultExp)
cache.GetCacheInstance().Set(f.Name, *f, cache.DefaultExp)

return nil
}
Expand Down
24 changes: 18 additions & 6 deletions internal/node/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ func (fp *ContainerPool) getWarmContainer() (container.ContainerID, bool) {
return "", false
}

fp.ready.Remove(elem)
contID := elem.Value.(warmContainer).contID
fp.putBusyContainer(contID)
wc := fp.ready.Remove(elem).(warmContainer)
fp.putBusyContainer(wc.contID)

return contID, true
return wc.contID, true
}

func (fp *ContainerPool) putBusyContainer(contID container.ContainerID) {
Expand Down Expand Up @@ -127,12 +126,13 @@ func AcquireWarmContainer(f *function.Function) (container.ContainerID, error) {
return "", OutOfResourcesErr
}

//log.Printf("Acquired resources for warm container. Now: %v", Resources)
log.Printf("Using warm %s for %s. Now: %v", contID, f, Resources)
return contID, nil
}

// ReleaseContainer puts a container in the ready pool for a function.
func ReleaseContainer(contID container.ContainerID, f *function.Function) {
log.Printf("I have to remove: %s for %s\n", contID, f)
// setup Expiration as time duration from now
d := time.Duration(config.GetInt(config.CONTAINER_EXPIRATION_TIME, 600)) * time.Second
expTime := time.Now().Add(d).UnixNano()
Expand All @@ -143,14 +143,26 @@ func ReleaseContainer(contID container.ContainerID, f *function.Function) {
fp := getFunctionPool(f)

// we must update the busy list by removing this element
log.Printf("I have to remove: %s for %s\n", contID, f)
elem := fp.busy.Front()
for ok := elem != nil; ok; ok = elem != nil {
log.Printf("I have: %v\n", elem.Value)
elem = elem.Next()
}

var removed container.ContainerID = ""
elem = fp.busy.Front()
for ok := elem != nil; ok; ok = elem != nil {
if elem.Value.(container.ContainerID) == contID {
fp.busy.Remove(elem) // delete the element from the busy list
removed = fp.busy.Remove(elem).(container.ContainerID) // delete the element from the busy list
log.Printf("Removed: %v\n", removed)
break
}
elem = elem.Next()
}
if removed == "" {
panic("nothing was removed")
}

fp.putReadyContainer(contID, expTime)

Expand Down
9 changes: 5 additions & 4 deletions internal/scheduling/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scheduling

import (
"fmt"
"log"
"time"

"github.com/grussorusso/serverledge/internal/container"
Expand All @@ -12,7 +13,7 @@ const HANDLER_DIR = "/app"

// Execute serves a request on the specified container.
func Execute(contID container.ContainerID, r *scheduledRequest) error {
//log.Printf("[%s] Executing on container: %v", r, contID)
log.Printf("[%s] Executing on container: %v", r.Fun, contID)

var req executor.InvocationRequest
if r.Fun.Runtime == container.CUSTOM_RUNTIME {
Expand All @@ -36,13 +37,13 @@ func Execute(contID container.ContainerID, r *scheduledRequest) error {
response, invocationWait, err := container.Execute(contID, &req)
if err != nil {
// notify scheduler
completions <- &completion{scheduledRequest: r, contID: contID}
completions <- &completion{fun: r.Fun, contID: contID}
return fmt.Errorf("[%s] Execution failed: %v", r, err)
}

if !response.Success {
// notify scheduler
completions <- &completion{scheduledRequest: r, contID: contID}
completions <- &completion{fun: r.Fun, contID: contID}
return fmt.Errorf("Function execution failed")
}

Expand All @@ -56,7 +57,7 @@ func Execute(contID container.ContainerID, r *scheduledRequest) error {
r.ExecReport.InitTime += invocationWait.Seconds()

// notify scheduler
completions <- &completion{scheduledRequest: r, contID: contID}
completions <- &completion{fun: r.Fun, contID: contID, duration: r.ExecReport.Duration}

return nil
}
20 changes: 10 additions & 10 deletions internal/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"runtime"
"time"

"github.com/grussorusso/serverledge/internal/metrics"
"github.com/grussorusso/serverledge/internal/node"

"github.com/grussorusso/serverledge/internal/config"
Expand Down Expand Up @@ -63,15 +62,16 @@ func Run(p Policy) {
case r = <-requests:
go p.OnArrival(r)
case c = <-completions:
node.ReleaseContainer(c.contID, c.Fun)
p.OnCompletion(c.scheduledRequest)

if metrics.Enabled {
metrics.AddCompletedInvocation(c.Fun.Name)
if c.ExecReport.SchedAction != SCHED_ACTION_OFFLOAD {
metrics.AddFunctionDurationValue(c.Fun.Name, c.ExecReport.Duration)
}
}
log.Printf("Completed %s on %s\n", c.fun, c.contID)
node.ReleaseContainer(c.contID, c.fun)
//p.OnCompletion(c.scheduledRequest) // TODO: restore!

//if metrics.Enabled {
// metrics.AddCompletedInvocation(c.fun.Name)
// if c.ExecReport.SchedAction != SCHED_ACTION_OFFLOAD {
// metrics.AddFunctionDurationValue(c.fun.Name, c.ExecReport.Duration)
// }
//}
}
}

Expand Down
5 changes: 3 additions & 2 deletions internal/scheduling/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ type scheduledRequest struct {
}

type completion struct {
*scheduledRequest
contID container.ContainerID
fun *function.Function
contID container.ContainerID
duration float64
}

// schedDecision wraps a action made by the scheduler.
Expand Down

0 comments on commit 4a5aff3

Please sign in to comment.