Skip to content

Commit 6c76c5d

Browse files
Feature: Add metrics and uPlot chart for viewing metrics
1 parent 87534cb commit 6c76c5d

File tree

39 files changed

+1038
-142
lines changed

39 files changed

+1038
-142
lines changed

.local.env

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
CLOUDNESS_TRACE=true
22
CLOUDNESS_ENVIRONMENT=local
33
CLOUDNESS_ACME_USE_STAGING=true
4-
# CLOUDNESS_PUBSUB_PROVIDER=redis
4+
# CLOUDNESS_PUBSUB_PROVIDER=redis
5+
6+
# CLOUDNESS_DATABASE_DRIVER=postgres
7+
# CLOUDNESS_DATABASE_DATASOURCE=postgres://postgres:postgres@localhost:5432/cloudness?sslmode=disable

app/controller/application/controller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type Controller struct {
2727
schemaSvc *schema.Service
2828
specSvc *spec.Service
2929
applicationStore store.ApplicationStore
30+
metricsStore store.MetricsStore
3031
serverCtrl *server.Controller
3132
varCtrl *variable.Controller
3233
gitPublicCtrl *gitpublic.Controller
@@ -42,6 +43,7 @@ func NewController(
4243
schemaSvc *schema.Service,
4344
specSvc *spec.Service,
4445
applicationStore store.ApplicationStore,
46+
metricsStore store.MetricsStore,
4547
serverCtrl *server.Controller,
4648
varCtrl *variable.Controller,
4749
gitPublicCtrl *gitpublic.Controller,
@@ -56,6 +58,7 @@ func NewController(
5658
schemaSvc: schemaSvc,
5759
specSvc: specSvc,
5860
applicationStore: applicationStore,
61+
metricsStore: metricsStore,
5962
serverCtrl: serverCtrl,
6063
varCtrl: varCtrl,
6164
gitPublicCtrl: gitPublicCtrl,
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package application
2+
3+
import (
	"context"
	"fmt"
	"slices"
	"time"

	"github.com/cloudness-io/cloudness/types"
	"github.com/cloudness-io/cloudness/types/enum"
)
11+
12+
func (c *Controller) GetMetrics(ctx context.Context, application *types.Application, span enum.MetricsSpan) (*types.AppMetricsViewModel, error) {
13+
from, to, bucket := getTimeFromSpan(span)
14+
15+
metrics, err := c.metricsStore.ListByApplicationUID(ctx, application.UID, from, to, bucket)
16+
if err != nil {
17+
return nil, err
18+
}
19+
20+
return convertToMetricsView(application, metrics), nil
21+
}
22+
23+
func convertToMetricsView(application *types.Application, metrics []*types.AppMetricsAggregate) *types.AppMetricsViewModel {
24+
cpuInstanceData := make(map[string]map[int64]float64)
25+
memInstanceData := make(map[string]map[int64]float64)
26+
allTimestamps := make(map[int64]bool)
27+
instanceNameMap := make(map[string]string)
28+
var bytesToMB float64 = 1024 * 1024
29+
30+
for _, db := range metrics {
31+
if _, ok := instanceNameMap[db.InstanceName]; !ok {
32+
instanceNameMap[db.InstanceName] = fmt.Sprintf("%s-%d", application.Name, len(instanceNameMap))
33+
}
34+
instanceName, _ := instanceNameMap[db.InstanceName]
35+
if _, ok := cpuInstanceData[instanceName]; !ok {
36+
cpuInstanceData[instanceName] = make(map[int64]float64)
37+
memInstanceData[instanceName] = make(map[int64]float64)
38+
}
39+
cpuInstanceData[instanceName][db.BucketTimestamp] = db.CPU
40+
memInstanceData[instanceName][db.BucketTimestamp] = db.Memory
41+
allTimestamps[db.BucketTimestamp] = true
42+
}
43+
44+
// Create sorted timestamp array
45+
timestamps := make([]int64, 0, len(allTimestamps))
46+
for t := range allTimestamps {
47+
timestamps = append(timestamps, t)
48+
}
49+
50+
// Sort timestamps
51+
for i := 0; i < len(timestamps)-1; i++ {
52+
for j := i + 1; j < len(timestamps); j++ {
53+
if timestamps[i] > timestamps[j] {
54+
timestamps[i], timestamps[j] = timestamps[j], timestamps[i]
55+
}
56+
}
57+
}
58+
59+
// Build CPU series
60+
cpuSeries := make([]*types.MetricsSeriesViewModel, 0, len(cpuInstanceData))
61+
for instanceName, data := range cpuInstanceData {
62+
seriesValue := make([]float64, len(timestamps))
63+
for i, ts := range timestamps {
64+
if val, ok := data[ts]; ok {
65+
seriesValue[i] = val
66+
} else {
67+
seriesValue[i] = 0
68+
}
69+
}
70+
71+
cpuSeries = append(cpuSeries, &types.MetricsSeriesViewModel{
72+
Label: instanceName,
73+
Timestamps: timestamps,
74+
Values: seriesValue,
75+
})
76+
}
77+
78+
// Memory series
79+
memSeries := make([]*types.MetricsSeriesViewModel, 0, len(memInstanceData))
80+
for instanceName, data := range memInstanceData {
81+
seriesValue := make([]float64, len(timestamps))
82+
for i, ts := range timestamps {
83+
if val, ok := data[ts]; ok {
84+
seriesValue[i] = val / float64(bytesToMB)
85+
} else {
86+
seriesValue[i] = 0
87+
}
88+
}
89+
90+
memSeries = append(memSeries, &types.MetricsSeriesViewModel{
91+
Label: instanceName,
92+
Timestamps: timestamps,
93+
Values: seriesValue,
94+
})
95+
}
96+
97+
return &types.AppMetricsViewModel{
98+
CPU: cpuSeries,
99+
Memory: memSeries,
100+
}
101+
}
102+
103+
func getTimeFromSpan(span enum.MetricsSpan) (from time.Time, to time.Time, bucket int64) {
104+
to = time.Now().UTC()
105+
switch span {
106+
case enum.MetricsSpan1h:
107+
from = to.Add(-1 * time.Hour)
108+
bucket = 60 // 1 min
109+
case enum.MetricsSpan6h:
110+
from = to.Add(-6 * time.Hour)
111+
bucket = 2 * 60 // 2 min
112+
case enum.MetricsSpan1d:
113+
from = to.Add(-24 * time.Hour)
114+
bucket = 5 * 60 // 5 min
115+
case enum.MetricsSpan7d:
116+
from = to.Add(-7 * 24 * time.Hour)
117+
bucket = 15 * 60 // 15 min
118+
// case enum.MetricsSpan30d:
119+
// from = to - 30*24*60*60
120+
// bucket = 30 * 24 * 300
121+
}
122+
return
123+
}

app/controller/application/wire.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ func ProvideController(
2828
schemaSvc *schema.Service,
2929
specSvc *spec.Service,
3030
applicationStore store.ApplicationStore,
31+
metricsStore store.MetricsStore,
3132
serverCtrl *server.Controller,
3233
varCtrl *variable.Controller,
3334
gitPublicCtrl *gitpublic.Controller,
@@ -42,6 +43,7 @@ func ProvideController(
4243
schemaSvc,
4344
specSvc,
4445
applicationStore,
46+
metricsStore,
4547
serverCtrl,
4648
varCtrl,
4749
gitPublicCtrl,

app/controller/project/create.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/cloudness-io/cloudness/errors"
1010
"github.com/cloudness-io/cloudness/helpers"
1111
"github.com/cloudness-io/cloudness/types"
12+
"github.com/cloudness-io/cloudness/types/enum"
1213

1314
"github.com/rs/zerolog/log"
1415
)
@@ -58,6 +59,12 @@ func (c *Controller) Create(ctx context.Context, session *auth.Session, tenant *
5859
return err
5960
}
6061

62+
if err := c.AddMember(ctx, session, tenant.ID, project.ID, &ProjectMembershipAddModel{
63+
Email: session.Principal.Email,
64+
Role: enum.ProjectRoleOwner,
65+
}); err != nil {
66+
return err
67+
}
6168
_, err = c.envCtrl.Create(ctx, session, tenant, project, &environment.CreateEnvironmentInput{
6269
Name: "Development",
6370
})

app/pipeline/agent/agent.go

Lines changed: 23 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,15 @@ package agent
33
import (
44
"context"
55
"fmt"
6-
"os"
76
"runtime/debug"
8-
"sync"
97
"time"
108

119
"github.com/cloudness-io/cloudness/app/pipeline/manager/client"
12-
"github.com/cloudness-io/cloudness/app/pipeline/runner"
1310
"github.com/cloudness-io/cloudness/app/pipeline/runner/engine"
1411
"github.com/cloudness-io/cloudness/app/pipeline/runner/engine/kubernetes"
12+
"github.com/cloudness-io/cloudness/app/services/manager"
1513
"github.com/cloudness-io/cloudness/types"
14+
"github.com/cloudness-io/cloudness/types/enum"
1615

1716
"github.com/rs/zerolog/log"
1817
)
@@ -24,13 +23,20 @@ var engines = []engine.Engine{
2423
type Agent struct {
2524
serverUID int64
2625
client client.RunnerClient
26+
serverManager manager.ServerManager
2727
stopChan <-chan bool
2828
configUpdated int64
2929
}
3030

31-
func New(client client.RunnerClient) *Agent {
31+
func New(client client.RunnerClient, factory manager.ManagerFactory) *Agent {
32+
serverManager, err := factory.GetServerManagerByType(enum.ServerTypeK8s)
33+
if err != nil {
34+
panic(err)
35+
}
36+
3237
return &Agent{
33-
client: client,
38+
client: client,
39+
serverManager: serverManager,
3440
}
3541
}
3642

@@ -69,6 +75,8 @@ func (a *Agent) Start(ctx context.Context) error {
6975
rCtxCancel() // cancels previous run
7076
}
7177
rCtx, rCtxCancel = context.WithCancel(ctx)
78+
79+
//Build Runner
7280
if config.EnableRunner {
7381
go func() {
7482
if err := a.run(rCtx, *config); err != nil {
@@ -77,6 +85,16 @@ func (a *Agent) Start(ctx context.Context) error {
7785
}
7886
}()
7987
}
88+
89+
//Metrics Scrapper
90+
// if config.EnableMetricsScrapper{
91+
go func() {
92+
if err := a.runMetricsScrapper(rCtx, *config); err != nil {
93+
log.Ctx(ctx).Error().Err(err).Msg("agent: error running metrics scrapper")
94+
return
95+
}
96+
}()
97+
// }
8098
}
8199
}
82100
}()
@@ -107,128 +125,3 @@ func (a *Agent) checkConfigChange(ctx context.Context, configChan chan<- *types.
107125
}
108126
}
109127
}
110-
111-
func (a *Agent) run(ctx context.Context, config types.RunnerConfig) error {
112-
engine, err := engine.FindEngine(engines, config)
113-
if err != nil {
114-
log.Error().Err(err).Msg("agent: error finding engine")
115-
return err
116-
}
117-
118-
_, err = engine.Load(ctx, config)
119-
if err != nil {
120-
log.Ctx(ctx).Error().Err(err).Msgf("agent: error loading %s engine", engine.Type())
121-
return err
122-
}
123-
124-
hostname := config.Hostname
125-
if len(hostname) == 0 {
126-
hostname, _ = os.Hostname()
127-
}
128-
129-
log.Info().Msgf("agent: starting agent with %s engine", engine.Type())
130-
131-
var wg sync.WaitGroup
132-
jobChan := make(chan *types.Deployment)
133-
defer close(jobChan)
134-
// number of runners based on job
135-
for i := range config.ParallelWorkers {
136-
wg.Go(func() {
137-
for {
138-
select {
139-
case <-ctx.Done():
140-
return
141-
case deployment := <-jobChan:
142-
log := log.With().
143-
Int64("agent.thread", i).
144-
Int64("deployment.id", deployment.ID).
145-
Int64("deployment.uid", deployment.UID).
146-
Logger()
147-
deployment.Machine = hostname
148-
if err := a.client.Accept(ctx, deployment); err != nil {
149-
log.Error().Err(err).Msg("agent: unable to accept deployment")
150-
continue
151-
}
152-
153-
log.Trace().Msg("agent: deployment accepted for processing")
154-
155-
runnerContext, err := a.client.Init(ctx, deployment.ID)
156-
if err != nil {
157-
log.Error().Err(err).Msg("agent: unable to initialize deployment")
158-
continue
159-
}
160-
log.Trace().Msg("agent: deployment initialized for processing")
161-
log = log.With().
162-
Int64("application.uid", runnerContext.ApplicationUID).
163-
Int64("deployment.uid", runnerContext.Deployment.UID).
164-
Str("runner.id", runnerContext.RunnerName).
165-
Logger()
166-
167-
runnerCtx, runnerCancel := context.WithCancel(log.WithContext(ctx))
168-
runner := runner.NewRunner(a.client, config, engine)
169-
170-
go func() {
171-
log.Debug().Msg("agent: watch for deployment cancellation")
172-
done, _ := a.client.Watch(ctx, deployment.ID)
173-
if done {
174-
log.Debug().Msg("agent: received cancellation event")
175-
runnerCancel()
176-
} else {
177-
log.Debug().Msg("agent: done listening for cancellation event")
178-
}
179-
}()
180-
181-
runner.Start(runnerCtx, runnerContext)
182-
}
183-
}
184-
})
185-
}
186-
187-
//listing incomplete
188-
runner := runner.NewRunner(a.client, config, engine)
189-
if incomplete, err := runner.ListPending(ctx); err != nil {
190-
log.Ctx(ctx).Error().Err(err).Any("Incomplete", incomplete).Msg("agent: error listing pending jobs, skipping...")
191-
} else {
192-
if len(incomplete) == 0 {
193-
log.Ctx(ctx).Info().Msg("agent: no pending jobs found")
194-
} else {
195-
log.Ctx(ctx).Info().Any("pending", len(incomplete)).Msg("agent: found pending jobs, TODO: yet to be done")
196-
}
197-
}
198-
199-
a.startPolling(ctx, config, jobChan)
200-
wg.Wait()
201-
202-
return nil
203-
}
204-
205-
func (a *Agent) startPolling(ctx context.Context, config types.RunnerConfig, jobChan chan<- *types.Deployment) {
206-
ticker := time.NewTicker(time.Duration(config.PollingInterval) * time.Second)
207-
defer ticker.Stop()
208-
209-
for {
210-
select {
211-
case <-ctx.Done():
212-
return
213-
case <-ticker.C:
214-
deployment, err := a.client.Request(ctx)
215-
if err == context.Canceled || err == context.DeadlineExceeded {
216-
log.Trace().Err(err).Msg("agent: no deployment received")
217-
continue
218-
}
219-
if err != nil {
220-
log.Error().Err(err).Msg("agent: unable to request deployment")
221-
//TODO: max retries?
222-
continue
223-
}
224-
225-
log.Trace().Msg("agent: deployment received")
226-
227-
if deployment == nil || deployment.ID == 0 {
228-
continue
229-
}
230-
231-
jobChan <- deployment
232-
}
233-
}
234-
}

0 commit comments

Comments
 (0)