Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow goroutines not launched by the tracer to stop on tracer.Stop() #3025

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
33 changes: 20 additions & 13 deletions contrib/database/sql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,27 @@ var interval = 10 * time.Second

// pollDBStats calls (*DB).Stats on the db at a predetermined interval. It pushes the DBStats off to the statsd client.
// the caller should always ensure that db & statsd are non-nil
func pollDBStats(statsd internal.StatsdClient, db *sql.DB) {
func pollDBStats(statsd internal.StatsdClient, db *sql.DB, stop chan struct{}) {
log.Debug("DB stats will be gathered and sent every %v.", interval)
for range time.NewTicker(interval).C {
log.Debug("Reporting DB.Stats metrics...")
stat := db.Stats()
statsd.Gauge(MaxOpenConnections, float64(stat.MaxOpenConnections), []string{}, 1)
statsd.Gauge(OpenConnections, float64(stat.OpenConnections), []string{}, 1)
statsd.Gauge(InUse, float64(stat.InUse), []string{}, 1)
statsd.Gauge(Idle, float64(stat.Idle), []string{}, 1)
statsd.Gauge(WaitCount, float64(stat.WaitCount), []string{}, 1)
statsd.Timing(WaitDuration, stat.WaitDuration, []string{}, 1)
statsd.Gauge(MaxIdleClosed, float64(stat.MaxIdleClosed), []string{}, 1)
statsd.Gauge(MaxIdleTimeClosed, float64(stat.MaxIdleTimeClosed), []string{}, 1)
statsd.Gauge(MaxLifetimeClosed, float64(stat.MaxLifetimeClosed), []string{}, 1)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
log.Debug("Reporting DB.Stats metrics...")
stat := db.Stats()
statsd.Gauge(MaxOpenConnections, float64(stat.MaxOpenConnections), []string{}, 1)
statsd.Gauge(OpenConnections, float64(stat.OpenConnections), []string{}, 1)
statsd.Gauge(InUse, float64(stat.InUse), []string{}, 1)
statsd.Gauge(Idle, float64(stat.Idle), []string{}, 1)
statsd.Gauge(WaitCount, float64(stat.WaitCount), []string{}, 1)
statsd.Timing(WaitDuration, stat.WaitDuration, []string{}, 1)
statsd.Gauge(MaxIdleClosed, float64(stat.MaxIdleClosed), []string{}, 1)
statsd.Gauge(MaxIdleTimeClosed, float64(stat.MaxIdleTimeClosed), []string{}, 1)
statsd.Gauge(MaxLifetimeClosed, float64(stat.MaxLifetimeClosed), []string{}, 1)
case <-stop:
return
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion contrib/database/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

sqlinternal "gopkg.in/DataDog/dd-trace-go.v1/contrib/database/sql/internal"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/contribroutines"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"
)
Expand Down Expand Up @@ -211,7 +212,7 @@ func OpenDB(c driver.Connector, opts ...Option) *sql.DB {
}
db := sql.OpenDB(tc)
if cfg.dbStats && cfg.statsdClient != nil {
go pollDBStats(cfg.statsdClient, db)
go pollDBStats(cfg.statsdClient, db, contribroutines.GetStopChan())
}
return db
}
Expand Down
39 changes: 23 additions & 16 deletions contrib/jackc/pgx.v5/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,30 @@ const (
var interval = 10 * time.Second

// pollPoolStats calls (*pgxpool).Stats on the pool at a predetermined interval. It pushes the pool Stats off to the statsd client.
func pollPoolStats(statsd internal.StatsdClient, pool *pgxpool.Pool) {
func pollPoolStats(statsd internal.StatsdClient, pool *pgxpool.Pool, stop chan struct{}) {
log.Debug("contrib/jackc/pgx.v5: Traced pool connection found: Pool stats will be gathered and sent every %v.", interval)
for range time.NewTicker(interval).C {
log.Debug("contrib/jackc/pgx.v5: Reporting pgxpool.Stat metrics...")
stat := pool.Stat()
statsd.Gauge(AcquireCount, float64(stat.AcquireCount()), []string{}, 1)
statsd.Timing(AcquireDuration, stat.AcquireDuration(), []string{}, 1)
statsd.Gauge(AcquiredConns, float64(stat.AcquiredConns()), []string{}, 1)
statsd.Gauge(CanceledAcquireCount, float64(stat.CanceledAcquireCount()), []string{}, 1)
statsd.Gauge(ConstructingConns, float64(stat.ConstructingConns()), []string{}, 1)
statsd.Gauge(EmptyAcquireCount, float64(stat.EmptyAcquireCount()), []string{}, 1)
statsd.Gauge(IdleConns, float64(stat.IdleConns()), []string{}, 1)
statsd.Gauge(MaxConns, float64(stat.MaxConns()), []string{}, 1)
statsd.Gauge(TotalConns, float64(stat.TotalConns()), []string{}, 1)
statsd.Gauge(NewConnsCount, float64(stat.NewConnsCount()), []string{}, 1)
statsd.Gauge(MaxLifetimeDestroyCount, float64(stat.MaxLifetimeDestroyCount()), []string{}, 1)
statsd.Gauge(MaxIdleDestroyCount, float64(stat.MaxIdleDestroyCount()), []string{}, 1)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
log.Debug("contrib/jackc/pgx.v5: Reporting pgxpool.Stat metrics...")
stat := pool.Stat()
statsd.Gauge(AcquireCount, float64(stat.AcquireCount()), []string{}, 1)
statsd.Timing(AcquireDuration, stat.AcquireDuration(), []string{}, 1)
statsd.Gauge(AcquiredConns, float64(stat.AcquiredConns()), []string{}, 1)
statsd.Gauge(CanceledAcquireCount, float64(stat.CanceledAcquireCount()), []string{}, 1)
statsd.Gauge(ConstructingConns, float64(stat.ConstructingConns()), []string{}, 1)
statsd.Gauge(EmptyAcquireCount, float64(stat.EmptyAcquireCount()), []string{}, 1)
statsd.Gauge(IdleConns, float64(stat.IdleConns()), []string{}, 1)
statsd.Gauge(MaxConns, float64(stat.MaxConns()), []string{}, 1)
statsd.Gauge(TotalConns, float64(stat.TotalConns()), []string{}, 1)
statsd.Gauge(NewConnsCount, float64(stat.NewConnsCount()), []string{}, 1)
statsd.Gauge(MaxLifetimeDestroyCount, float64(stat.MaxLifetimeDestroyCount()), []string{}, 1)
statsd.Gauge(MaxIdleDestroyCount, float64(stat.MaxIdleDestroyCount()), []string{}, 1)
case <-stop:
return
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion contrib/jackc/pgx.v5/pgxpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"

"github.com/jackc/pgx/v5/pgxpool"
"gopkg.in/DataDog/dd-trace-go.v1/internal/contribroutines"
)

func NewPool(ctx context.Context, connString string, opts ...Option) (*pgxpool.Pool, error) {
Expand All @@ -30,7 +31,7 @@ func NewPoolWithConfig(ctx context.Context, config *pgxpool.Config, opts ...Opti
return nil, err
}
if tracer.cfg.poolStats && tracer.cfg.statsdClient != nil {
go pollPoolStats(tracer.cfg.statsdClient, pool)
go pollPoolStats(tracer.cfg.statsdClient, pool, contribroutines.GetStopChan())
}
return pool, nil
}
2 changes: 2 additions & 0 deletions ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
globalinternal "gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec"
appsecConfig "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/config"
"gopkg.in/DataDog/dd-trace-go.v1/internal/contribroutines"
"gopkg.in/DataDog/dd-trace-go.v1/internal/datastreams"
"gopkg.in/DataDog/dd-trace-go.v1/internal/hostname"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
Expand Down Expand Up @@ -710,6 +711,7 @@ func (t *tracer) Stop() {
if t.logFile != nil {
t.logFile.Close()
}
contribroutines.Stop()
}

// Inject uses the configured or default TextMap Propagator.
Expand Down
27 changes: 27 additions & 0 deletions internal/contribroutines/contribroutines.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.
package contribroutines

import "sync"

var (
stop chan struct{} = make(chan struct{})
once sync.Once
mu sync.Mutex
)

func Stop() {
mu.Lock()
defer mu.Unlock()
once.Do(func() {
close(stop)
})
}

func GetStopChan() chan struct{} {
mu.Lock()
defer mu.Unlock()
return stop
}
74 changes: 74 additions & 0 deletions internal/contribroutines/contribroutines_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.
package contribroutines

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestContribRoutines(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
var done bool
go func() {
doSomething(&wg, &done, GetStopChan())
}()
Stop()
wg.Wait()
assert.True(t, done)
}

func doSomething(wg *sync.WaitGroup, done *bool, stop chan struct{}) {
defer wg.Done()
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
case <-stop:
*done = true
return
}
}
}

func TestStopConcurrency(t *testing.T) {
var wg sync.WaitGroup

for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
Stop()
}()
}

wg.Wait()

select {
case <-stop:
// channel is closed, so Stop() was called successfully
case <-time.After(1 * time.Second):
t.Error("stop channel was not closed within 1 second")
}
}

func TestGetStopChanConcurrency(t *testing.T) {
var wg sync.WaitGroup

for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
GetStopChan()
}()
}

wg.Wait()
}
Loading