diff --git a/internal/api/api.go b/internal/api/api.go index 3fa06c46..5505dfc5 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -56,7 +56,8 @@ func InvokeFunction(c echo.Context) error { r := requestsPool.Get().(*function.Request) defer requestsPool.Put(r) - r.Fun = fun + r.Fun = &fun + f0 := r.Fun.Name r.Params = invocationRequest.Params r.Arrival = time.Now() r.Class = function.ServiceClass(invocationRequest.QoSClass) @@ -76,6 +77,11 @@ func InvokeFunction(c echo.Context) error { err = scheduling.SubmitRequest(r) + f1 := r.Fun.Name + if f0 != f1 { + panic("assert") + } + if errors.Is(err, node.OutOfResourcesErr) { return c.String(http.StatusTooManyRequests, "") } else if err != nil { @@ -223,7 +229,7 @@ func PrewarmFunction(c echo.Context) error { return c.String(http.StatusNotFound, "Function unknown") } - count, err := node.PrewarmInstances(fun, req.Instances, req.ForceImagePull) + count, err := node.PrewarmInstances(&fun, req.Instances, req.ForceImagePull) if err != nil && !errors.Is(err, node.OutOfResourcesErr) { log.Printf("Failed prewarming: %v\n", err) diff --git a/internal/function/function.go b/internal/function/function.go index fad3bd2e..3c9f8202 100644 --- a/internal/function/function.go +++ b/internal/function/function.go @@ -31,14 +31,14 @@ func getEtcdKey(funcName string) string { } // GetFunction retrieves a Function given its name. -func GetFunction(name string) (*Function, bool) { +func GetFunction(name string) (Function, bool) { val, found := getFromCache(name) if !found { // cache miss f, response := getFromEtcd(name) if !response { - return nil, false + return Function{}, false } //insert a new element to the cache cache.GetCacheInstance().Set(name, f, cache.DefaultExp) @@ -53,37 +53,35 @@ func (f *Function) String() string { return f.Name } -func getFromCache(name string) (*Function, bool) { +func getFromCache(name string) (Function, bool) { localCache := cache.GetCacheInstance() f, found := localCache.Get(name) if !found { - return nil, false + return Function{}, false } //cache hit - //return a safe copy of the function previously obtained - function := *f.(*Function) - return &function, true + return f.(Function), true } -func getFromEtcd(name string) (*Function, bool) { +func getFromEtcd(name string) (Function, bool) { cli, err := utils.GetEtcdClient() if err != nil { - return nil, false + return Function{}, false } ctx, _ := context.WithTimeout(context.Background(), 1*time.Second) getResponse, err := cli.Get(ctx, getEtcdKey(name)) if err != nil || len(getResponse.Kvs) < 1 { - return nil, false + return Function{}, false } var f Function err = json.Unmarshal(getResponse.Kvs[0].Value, &f) if err != nil { - return nil, false + return f, false } - return &f, true + return f, true } func (f *Function) SaveToEtcd() error { @@ -103,7 +101,7 @@ func (f *Function) SaveToEtcd() error { } // Add the function to the local cache - cache.GetCacheInstance().Set(f.Name, f, cache.DefaultExp) + cache.GetCacheInstance().Set(f.Name, *f, cache.DefaultExp) return nil } diff --git a/internal/node/pool.go b/internal/node/pool.go index 479d5888..4592ac09 100644 --- a/internal/node/pool.go +++ b/internal/node/pool.go @@ -42,11 +42,10 @@ func (fp *ContainerPool) getWarmContainer() (container.ContainerID, bool) { return "", false } - fp.ready.Remove(elem) - contID := elem.Value.(warmContainer).contID - fp.putBusyContainer(contID) + wc := fp.ready.Remove(elem).(warmContainer) + fp.putBusyContainer(wc.contID) - return contID, true + return wc.contID, true } func (fp *ContainerPool) putBusyContainer(contID container.ContainerID) { @@ -127,12 +126,13 @@ func AcquireWarmContainer(f *function.Function) (container.ContainerID, error) { return "", OutOfResourcesErr } - //log.Printf("Acquired resources for warm container. Now: %v", Resources) + log.Printf("Using warm %s for %s. Now: %v", contID, f, Resources) return contID, nil } // ReleaseContainer puts a container in the ready pool for a function. func ReleaseContainer(contID container.ContainerID, f *function.Function) { + log.Printf("I have to remove: %s for %s\n", contID, f) // setup Expiration as time duration from now d := time.Duration(config.GetInt(config.CONTAINER_EXPIRATION_TIME, 600)) * time.Second expTime := time.Now().Add(d).UnixNano() @@ -143,14 +143,26 @@ func ReleaseContainer(contID container.ContainerID, f *function.Function) { fp := getFunctionPool(f) // we must update the busy list by removing this element + log.Printf("I have to remove: %s for %s\n", contID, f) elem := fp.busy.Front() + for ok := elem != nil; ok; ok = elem != nil { + log.Printf("I have: %v\n", elem.Value) + elem = elem.Next() + } + + var removed container.ContainerID = "" + elem = fp.busy.Front() for ok := elem != nil; ok; ok = elem != nil { if elem.Value.(container.ContainerID) == contID { - fp.busy.Remove(elem) // delete the element from the busy list + removed = fp.busy.Remove(elem).(container.ContainerID) // delete the element from the busy list + log.Printf("Removed: %v\n", removed) break } elem = elem.Next() } + if removed == "" { + panic("nothing was removed") + } fp.putReadyContainer(contID, expTime) diff --git a/internal/scheduling/execution.go b/internal/scheduling/execution.go index d73046f4..c366ad08 100644 --- a/internal/scheduling/execution.go +++ b/internal/scheduling/execution.go @@ -2,6 +2,7 @@ package scheduling import ( "fmt" + "log" "time" "github.com/grussorusso/serverledge/internal/container" @@ -12,7 +13,7 @@ const HANDLER_DIR = "/app" // Execute serves a request on the specified container. func Execute(contID container.ContainerID, r *scheduledRequest) error { - //log.Printf("[%s] Executing on container: %v", r, contID) + log.Printf("[%s] Executing on container: %v", r.Fun, contID) var req executor.InvocationRequest if r.Fun.Runtime == container.CUSTOM_RUNTIME { @@ -36,13 +37,13 @@ func Execute(contID container.ContainerID, r *scheduledRequest) error { response, invocationWait, err := container.Execute(contID, &req) if err != nil { // notify scheduler - completions <- &completion{scheduledRequest: r, contID: contID} + completions <- &completion{fun: r.Fun, contID: contID} return fmt.Errorf("[%s] Execution failed: %v", r, err) } if !response.Success { // notify scheduler - completions <- &completion{scheduledRequest: r, contID: contID} + completions <- &completion{fun: r.Fun, contID: contID} return fmt.Errorf("Function execution failed") } @@ -56,7 +57,7 @@ func Execute(contID container.ContainerID, r *scheduledRequest) error { r.ExecReport.InitTime += invocationWait.Seconds() // notify scheduler - completions <- &completion{scheduledRequest: r, contID: contID} + completions <- &completion{fun: r.Fun, contID: contID, duration: r.ExecReport.Duration} return nil } diff --git a/internal/scheduling/scheduler.go b/internal/scheduling/scheduler.go index 9fafdfc7..b86a6330 100644 --- a/internal/scheduling/scheduler.go +++ b/internal/scheduling/scheduler.go @@ -8,7 +8,6 @@ import ( "runtime" "time" - "github.com/grussorusso/serverledge/internal/metrics" "github.com/grussorusso/serverledge/internal/node" "github.com/grussorusso/serverledge/internal/config" @@ -63,15 +62,16 @@ func Run(p Policy) { case r = <-requests: go p.OnArrival(r) case c = <-completions: - node.ReleaseContainer(c.contID, c.Fun) - p.OnCompletion(c.scheduledRequest) - - if metrics.Enabled { - metrics.AddCompletedInvocation(c.Fun.Name) - if c.ExecReport.SchedAction != SCHED_ACTION_OFFLOAD { - metrics.AddFunctionDurationValue(c.Fun.Name, c.ExecReport.Duration) - } - } + log.Printf("Completed %s on %s\n", c.fun, c.contID) + node.ReleaseContainer(c.contID, c.fun) + //p.OnCompletion(c.scheduledRequest) // TODO: restore! + + //if metrics.Enabled { + // metrics.AddCompletedInvocation(c.fun.Name) + // if c.ExecReport.SchedAction != SCHED_ACTION_OFFLOAD { + // metrics.AddFunctionDurationValue(c.fun.Name, c.ExecReport.Duration) + // } + //} } } diff --git a/internal/scheduling/types.go b/internal/scheduling/types.go index 931fd6af..5114df54 100644 --- a/internal/scheduling/types.go +++ b/internal/scheduling/types.go @@ -13,8 +13,9 @@ type scheduledRequest struct { } type completion struct { - *scheduledRequest - contID container.ContainerID + fun *function.Function + contID container.ContainerID + duration float64 } // schedDecision wraps a action made by the scheduler.