Skip to content

Commit

Permalink
OpenTelemetry integration
Browse files Browse the repository at this point in the history
  • Loading branch information
grussorusso committed Oct 7, 2024
2 parents 02eb4f5 + 8943c93 commit 8b58d71
Show file tree
Hide file tree
Showing 11 changed files with 204 additions and 24 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/makefile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v3
with:
go-version: "1.20.0"
go-version: "1.21.0"

- uses: actions/checkout@v2

Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ bin/
.run/
__pycache__*
images/*/executor
*.backup
default.etcd
20 changes: 20 additions & 0 deletions cmd/serverledge/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/grussorusso/serverledge/internal/metrics"
"github.com/grussorusso/serverledge/internal/registration"
"github.com/grussorusso/serverledge/internal/scheduling"
"github.com/grussorusso/serverledge/internal/telemetry"
"github.com/grussorusso/serverledge/utils"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
Expand Down Expand Up @@ -129,6 +130,25 @@ func main() {

go metrics.Init()

if config.GetBool(config.TRACING_ENABLED, false) {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()

tracesOutfile := config.GetString(config.TRACING_OUTFILE, "")
if len(tracesOutfile) < 1 {
tracesOutfile = fmt.Sprintf("traces-%s.json", time.Now().Format("20060102-150405"))
}
log.Printf("Enabling tracing to %s\n", tracesOutfile)
otelShutdown, err := telemetry.SetupOTelSDK(ctx, tracesOutfile)
if err != nil {
log.Fatal(err)
}
// Handle shutdown properly so nothing leaks.
defer func() {
err = errors.Join(err, otelShutdown(context.Background()))
}()
}

e := echo.New()

// Register a signal handler to cleanup things on termination
Expand Down
15 changes: 15 additions & 0 deletions docs/tracing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Serverledge relies on [OpenTelemetry](https://opentelemetry.io) for optional
request tracing, aimed at performance investigations.

## Enabling tracing

Tracing is disabled by default. It can enabled with the following configuration
line:

tracing.enabled: true

By default, JSON-encoded traces are written to `./traces-<timestamp>.json`.
The following line sets a custom output file:

tracing.outfile: /path/to/file.json

15 changes: 12 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/grussorusso/serverledge

go 1.20
go 1.21

toolchain go1.22.5

require (
github.com/LK4D4/trylock v0.0.0-20191027065348-ff7e133a5c54
Expand All @@ -12,6 +14,9 @@ require (
github.com/spf13/cobra v1.0.0
github.com/spf13/viper v1.4.0
go.etcd.io/etcd/client/v3 v3.5.1
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0
go.opentelemetry.io/otel/sdk v1.28.0
golang.org/x/net v0.0.0-20220225172249-27dd8689420f
)

Expand All @@ -26,10 +31,12 @@ require (
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
Expand Down Expand Up @@ -57,11 +64,13 @@ require (
github.com/valyala/fasttemplate v1.2.1 // indirect
go.etcd.io/etcd/api/v3 v3.5.1 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.1 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.17.0 // indirect
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
Expand Down
31 changes: 25 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-openapi/jsonpointer v0.19.2/go.mod h1:3akKfEdA7DF1sugOqz1dVQHBcuDBPKZGEoHC/NkiQRg=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonreference v0.19.2/go.mod h1:jMjeRr2HHw6nAVajTXJ4eiUwohSTlpa0o73RUL1owJc=
Expand Down Expand Up @@ -357,7 +362,8 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
Expand All @@ -373,8 +379,9 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg=
Expand Down Expand Up @@ -643,8 +650,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
Expand Down Expand Up @@ -698,6 +706,16 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 h1:EVSnY9JbEEW92bEkIYOVMw4q1WJxIAGoFTrtYOzWuRQ=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0/go.mod h1:Ea1N1QQryNXpCD0I1fdLibBAIpQuBkznMmkdKrapk1Y=
go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q=
go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s=
go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE=
go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg=
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
Expand Down Expand Up @@ -902,8 +920,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down Expand Up @@ -1099,8 +1117,9 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
Expand Down
16 changes: 14 additions & 2 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
"github.com/grussorusso/serverledge/internal/function"
"github.com/grussorusso/serverledge/internal/node"
"github.com/grussorusso/serverledge/internal/registration"
"github.com/grussorusso/serverledge/internal/telemetry"
"github.com/grussorusso/serverledge/utils"
"go.opentelemetry.io/otel/attribute"

"github.com/grussorusso/serverledge/internal/scheduling"
"github.com/labstack/echo/v4"
Expand Down Expand Up @@ -64,11 +66,21 @@ func InvokeFunction(c echo.Context) error {
r.CanDoOffloading = invocationRequest.CanDoOffloading
r.Async = invocationRequest.Async
r.ReturnOutput = invocationRequest.ReturnOutput
r.ReqId = fmt.Sprintf("%s-%s%d", fun, node.NodeIdentifier[len(node.NodeIdentifier)-5:], r.Arrival.Nanosecond())

reqId := fmt.Sprintf("%s-%s%d", fun, node.NodeIdentifier[len(node.NodeIdentifier)-5:], r.Arrival.Nanosecond())
r.Ctx = context.WithValue(context.Background(), "ReqId", reqId)

// Tracing
if telemetry.DefaultTracer != nil {
ctx, span := telemetry.DefaultTracer.Start(r.Ctx, "invocation")
r.Ctx = ctx
span.SetAttributes(attribute.String("function", r.Fun.Name))
defer span.End()
}

if r.Async {
go scheduling.SubmitAsyncRequest(r)
return c.JSON(http.StatusOK, function.AsyncResponse{ReqId: r.ReqId})
return c.JSON(http.StatusOK, function.AsyncResponse{ReqId: r.Id()})
}

executionReport, err := scheduling.SubmitRequest(r)
Expand Down
14 changes: 10 additions & 4 deletions internal/config/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package config
// Etcd server hostname
const ETCD_ADDRESS = "etcd.address"

//exposed port for serverledge APIs
// exposed port for serverledge APIs
const API_PORT = "api.port"

//REMOTE SERVER URL
// REMOTE SERVER URL
const CLOUD_URL = "cloud.server.url"

// Forces runtime container images to be pulled the first time they are used,
Expand All @@ -28,10 +28,10 @@ const CONTAINER_EXPIRATION_TIME = "container.expiration"
// cache capacity
const CACHE_SIZE = "cache.size"

//cache janitor interval (Seconds) : deletes expired items
// cache janitor interval (Seconds) : deletes expired items
const CACHE_CLEANUP = "cache.cleanup"

//default expiration time assigned to a cache item (Seconds)
// default expiration time assigned to a cache item (Seconds)
const CACHE_ITEM_EXPIRATION = "cache.expiration"

// true if the current server is a remote cloud server
Expand Down Expand Up @@ -64,3 +64,9 @@ const SCHEDULING_POLICY = "scheduler.policy"

// Capacity of the queue (possibly) used by the scheduler
const SCHEDULER_QUEUE_CAPACITY = "scheduler.queue.capacity"

// Enables tracing
const TRACING_ENABLED = "tracing.enabled"

// Custom output file for traces
const TRACING_OUTFILE = "tracing.outfile"
9 changes: 7 additions & 2 deletions internal/function/request.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package function

import (
"context"
"fmt"
"time"
)

// Request represents a single function invocation.
type Request struct {
ReqId string
Ctx context.Context
Fun *Function
Params map[string]interface{}
Arrival time.Time
Expand Down Expand Up @@ -42,8 +43,12 @@ type AsyncResponse struct {
ReqId string
}

func (r *Request) Id() string {
return r.Ctx.Value("ReqId").(string)
}

func (r *Request) String() string {
return fmt.Sprintf("[%s] Rq-%s", r.Fun.Name, r.ReqId)
return fmt.Sprintf("[%s] Rq-%s", r.Fun.Name, r.Id())
}

type ServiceClass int64
Expand Down
29 changes: 23 additions & 6 deletions internal/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package scheduling
import (
"errors"
"fmt"
"github.com/grussorusso/serverledge/internal/metrics"
"log"
"net/http"
"runtime"
"time"

"github.com/grussorusso/serverledge/internal/metrics"

"github.com/grussorusso/serverledge/internal/node"
"github.com/grussorusso/serverledge/internal/telemetry"
"go.opentelemetry.io/otel/trace"

"github.com/grussorusso/serverledge/internal/config"

Expand Down Expand Up @@ -83,13 +86,21 @@ func SubmitRequest(r *function.Request) (function.ExecutionReport, error) {
decisionChannel: make(chan schedDecision, 1)}
requests <- &schedRequest

if telemetry.DefaultTracer != nil {
trace.SpanFromContext(r.Ctx).AddEvent("Scheduling start")
}

// wait on channel for scheduling action
schedDecision, ok := <-schedRequest.decisionChannel
if !ok {
return function.ExecutionReport{}, fmt.Errorf("could not schedule the request")
}
//log.Printf("[%s] Scheduling decision: %v", r, schedDecision)

if telemetry.DefaultTracer != nil {
trace.SpanFromContext(r.Ctx).AddEvent("Scheduling complete")
}

if schedDecision.action == DROP {
//log.Printf("[%s] Dropping request", r)
return function.ExecutionReport{}, node.OutOfResourcesErr
Expand All @@ -111,36 +122,42 @@ func SubmitAsyncRequest(r *function.Request) {
// wait on channel for scheduling action
schedDecision, ok := <-schedRequest.decisionChannel
if !ok {
publishAsyncResponse(r.ReqId, function.Response{Success: false})
publishAsyncResponse(r.Id(), function.Response{Success: false})
return
}

var err error
if schedDecision.action == DROP {
publishAsyncResponse(r.ReqId, function.Response{Success: false})
publishAsyncResponse(r.Id(), function.Response{Success: false})
} else if schedDecision.action == EXEC_REMOTE {
//log.Printf("Offloading request")
err = OffloadAsync(r, schedDecision.remoteHost)
if err != nil {
publishAsyncResponse(r.ReqId, function.Response{Success: false})
publishAsyncResponse(r.Id(), function.Response{Success: false})
}
} else {
report, err := Execute(schedDecision.contID, &schedRequest, schedDecision.useWarm)
if err != nil {
publishAsyncResponse(r.ReqId, function.Response{Success: false})
publishAsyncResponse(r.Id(), function.Response{Success: false})
}
publishAsyncResponse(r.ReqId, function.Response{Success: true, ExecutionReport: report})
publishAsyncResponse(r.Id(), function.Response{Success: true, ExecutionReport: report})
}
}

func handleColdStart(r *scheduledRequest) (isSuccess bool) {
if telemetry.DefaultTracer != nil {
trace.SpanFromContext(r.Ctx).AddEvent("Container init start")
}
newContainer, err := node.NewContainer(r.Fun)
if errors.Is(err, node.OutOfResourcesErr) {
return false
} else if err != nil {
log.Printf("Cold start failed: %v\n", err)
return false
} else {
if telemetry.DefaultTracer != nil {
trace.SpanFromContext(r.Ctx).AddEvent("Container initialized")
}
execLocally(r, newContainer, false)
return true
}
Expand Down
Loading

0 comments on commit 8b58d71

Please sign in to comment.