Skip to content

Commit

Permalink
sql: reduce syncv2 conn pool usage and add pool utilization metrics (#…
Browse files Browse the repository at this point in the history
…6596)

## Motivation

`Database.WithConnection` is always used by syncv2 for any probe/sync operations against peers, but the connection it acquires from the pool sometimes goes unused, namely when all the necessary information can be retrieved from the `FPTree`. Also, while holding the connection for the duration of a sync/probe works best in the vast majority of cases, a very slow peer may cause unwanted extra utilization of the connection pool.

Another problem is that there are currently no metrics for the actual database connection pool utilization, and the connection pool wait latency metrics for the main state database pool, the API pool and the local database are not collected separately.
  • Loading branch information
ivan4th committed Jan 14, 2025
1 parent 1c27dca commit 4cf9e0c
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 38 deletions.
8 changes: 5 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type BaseConfig struct {
DatabaseQueryCache bool `mapstructure:"db-query-cache"`
DatabaseQueryCacheSizes DatabaseQueryCacheSizes `mapstructure:"db-query-cache-sizes"`
DatabaseSchemaAllowDrift bool `mapstructure:"db-allow-schema-drift"`
DatabaseConnIdleTimeout time.Duration `mapstructure:"db-conn-idle-timeout"`

PruneActivesetsFrom types.EpochID `mapstructure:"prune-activesets-from"`

Expand Down Expand Up @@ -240,9 +241,10 @@ func defaultBaseConfig() BaseConfig {
ATXBlob: 10000,
ActiveSetBlob: 200,
},
NetworkHRP: "sm",
ATXGradeDelay: 10 * time.Second,
PostValidDelay: 12 * time.Hour,
DatabaseConnIdleTimeout: 10 * time.Millisecond,
NetworkHRP: "sm",
ATXGradeDelay: 10 * time.Second,
PostValidDelay: 12 * time.Hour,

PprofHTTPServerListener: "localhost:6060",
}
Expand Down
17 changes: 9 additions & 8 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,15 @@ func MainnetConfig() Config {

return Config{
BaseConfig: BaseConfig{
DataDirParent: defaultDataDir,
FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"),
MetricsPort: 1010,
DatabaseConnections: 16,
DatabasePruneInterval: 30 * time.Minute,
DatabaseVacuumState: 21,
PruneActivesetsFrom: 12, // starting from epoch 13 activesets below 12 will be pruned
NetworkHRP: "sm",
DataDirParent: defaultDataDir,
FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"),
MetricsPort: 1010,
DatabaseConnections: 16,
DatabasePruneInterval: 30 * time.Minute,
DatabaseVacuumState: 21,
DatabaseConnIdleTimeout: 10 * time.Millisecond,
PruneActivesetsFrom: 12, // starting from epoch 13 activesets below 12 will be pruned
NetworkHRP: "sm",

LayerDuration: 5 * time.Minute,
LayerAvgSize: 50,
Expand Down
1 change: 1 addition & 0 deletions config/presets/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func testnet() config.Config {
DatabaseConnections: 16,
DatabaseSizeMeteringInterval: 10 * time.Minute,
DatabasePruneInterval: 30 * time.Minute,
DatabaseConnIdleTimeout: 10 * time.Millisecond,
NetworkHRP: "stest",

LayerDuration: 5 * time.Minute,
Expand Down
6 changes: 6 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2019,6 +2019,8 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
atxs.CacheKindATXBlob: app.Config.DatabaseQueryCacheSizes.ATXBlob,
activesets.CacheKindActiveSetBlob: app.Config.DatabaseQueryCacheSizes.ActiveSetBlob,
}),
sql.WithConnIdleTimeout(app.Config.DatabaseConnIdleTimeout),
sql.WithDBName("state"),
}
sqlDB, err := statesql.Open("file:"+filepath.Join(dbPath, dbFile), dbopts...)
if err != nil {
Expand All @@ -2033,6 +2035,8 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
sql.WithConnections(app.Config.API.DatabaseConnections),
sql.WithNoCheckSchemaDrift(), // already checked above
sql.WithMigrationsDisabled(),
sql.WithConnIdleTimeout(app.Config.DatabaseConnIdleTimeout),
sql.WithDBName("state-api"),
)
if err != nil {
return fmt.Errorf("open sqlite db: %w", err)
Expand Down Expand Up @@ -2081,6 +2085,8 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
sql.WithDatabaseSchema(lSchema),
sql.WithConnections(app.Config.DatabaseConnections),
sql.WithAllowSchemaDrift(app.Config.DatabaseSchemaAllowDrift),
sql.WithConnIdleTimeout(app.Config.DatabaseConnIdleTimeout),
sql.WithDBName("local"),
)
if err != nil {
return fmt.Errorf("open sqlite db: %w", err)
Expand Down
146 changes: 127 additions & 19 deletions sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/go-llsqlite/crawshaw/sqlitex"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/common/types"
)
Expand Down Expand Up @@ -70,6 +71,8 @@ func defaultConf() *conf {
schema: &Schema{},
checkSchemaDrift: true,
handleIncompleteMigrations: true,
connIdleTimeout: 10 * time.Millisecond,
dbName: "sqlite",
}
}

Expand All @@ -91,6 +94,8 @@ type conf struct {
handleIncompleteMigrations bool
exclusive bool
readOnly bool
dbName string
connIdleTimeout time.Duration
}

// WithConnections overwrites number of pooled connections.
Expand Down Expand Up @@ -194,6 +199,23 @@ func WithTemp() Opt {
}
}

// WithDBName returns an option that records name as the database name
// in the configuration; the name is used to label metrics.
func WithDBName(name string) Opt {
	setName := func(cfg *conf) {
		cfg.dbName = name
	}
	return setName
}

// WithConnIdleTimeout returns an option that stores timeout as the idle
// timeout for pooled connections used through Database.WithConnection.
// Such a connection is acquired when the first statement is executed
// against the Connection passed to the callback; once the timeout elapses
// it is handed back to the pool until the next statement is executed.
func WithConnIdleTimeout(timeout time.Duration) Opt {
	setTimeout := func(cfg *conf) {
		cfg.connIdleTimeout = timeout
	}
	return setTimeout
}

func withDisableIncompleteMigrationHandling() Opt {
return func(c *conf) {
c.handleIncompleteMigrations = false
Expand Down Expand Up @@ -307,7 +329,7 @@ func openDB(config *conf) (db *sqliteDatabase, err error) {
return nil, fmt.Errorf("create db %s: %w", config.uri, err)
}
}
db = &sqliteDatabase{pool: pool}
db = &sqliteDatabase{pool: pool, connIdleTimeout: config.connIdleTimeout}
defer func() {
// If something goes wrong, close the database even in case of a
// panic. This is important for tests that verify incomplete migration.
Expand All @@ -323,6 +345,8 @@ func openDB(config *conf) (db *sqliteDatabase, err error) {
db.Close()
return nil, err
}
actualDB.connWaitLatency = ConnWaitLatency.WithLabelValues(config.dbName)
actualDB.poolUsage = PoolUsage.WithLabelValues(config.dbName)
return actualDB, nil
}

Expand Down Expand Up @@ -654,19 +678,32 @@ type sqliteDatabase struct {

interceptMtx sync.Mutex
interceptors map[string]Interceptor

connWaitLatency prometheus.Observer
poolUsage prometheus.Gauge

connIdleTimeout time.Duration
}

var _ Database = &sqliteDatabase{}

func (db *sqliteDatabase) getConn(ctx context.Context) *sqlite.Conn {
start := time.Now()
conn := db.pool.Get(ctx)
if conn != nil {
connWaitLatency.Observe(time.Since(start).Seconds())
if conn != nil && db.connWaitLatency != nil {
db.connWaitLatency.Observe(time.Since(start).Seconds())
db.poolUsage.Inc()
}
return conn
}

// putConn hands conn back to the pool and, when the pool usage gauge has
// been set up, decrements it.
func (db *sqliteDatabase) putConn(conn *sqlite.Conn) {
	db.pool.Put(conn)
	if db.poolUsage == nil {
		// Metrics are not configured for this database; nothing to update.
		return
	}
	db.poolUsage.Dec()
}

func (db *sqliteDatabase) getTx(ctx context.Context, initstmt string) (*sqliteTx, error) {
if db.closed {
return nil, ErrClosed
Expand All @@ -680,7 +717,7 @@ func (db *sqliteDatabase) getTx(ctx context.Context, initstmt string) (*sqliteTx
tx := &sqliteTx{queryCache: db.queryCache, db: db, conn: conn, freeConn: cancel}
if err := tx.begin(initstmt); err != nil {
cancel()
db.pool.Put(conn)
db.putConn(conn)
return nil, err
}
return tx, nil
Expand All @@ -707,7 +744,7 @@ func (db *sqliteDatabase) startExclusive() error {
if conn == nil {
return ErrNoConnection
}
defer db.pool.Put(conn)
defer db.putConn(conn)
// We don't need to wait for long if the database is busy
conn.SetBusyTimeout(1 * time.Millisecond)
// From SQLite docs:
Expand Down Expand Up @@ -787,7 +824,7 @@ func (db *sqliteDatabase) Exec(query string, encoder Encoder, decoder Decoder) (
if conn == nil {
return 0, ErrNoConnection
}
defer db.pool.Put(conn)
defer db.putConn(conn)
if db.latency != nil {
start := time.Now()
defer func() {
Expand All @@ -812,18 +849,15 @@ func (db *sqliteDatabase) Close() error {
}

// WithConnection implements Database.
func (db *sqliteDatabase) WithConnection(ctx context.Context, exec func(Executor) error) error {
func (db *sqliteDatabase) WithConnection(ctx context.Context, toCall func(Executor) error) error {
if db.closed {
return ErrClosed
}
conCtx, cancel := context.WithCancel(ctx)
defer cancel()
conn := db.getConn(conCtx)
if conn == nil {
return ErrNoConnection
}
defer db.pool.Put(conn)
return exec(&sqliteConn{queryCache: db.queryCache, db: db, conn: conn})
c := newLazyConn(conCtx, db)
defer c.release()
return toCall(c)
}

// Intercept adds an interceptor function to the database. The interceptor functions
Expand Down Expand Up @@ -1120,7 +1154,7 @@ func (tx *sqliteTx) Commit() error {

// Release transaction. Every transaction that was created must be released.
func (tx *sqliteTx) Release() error {
defer tx.db.pool.Put(tx.conn)
defer tx.db.putConn(tx.conn)
if tx.committed {
tx.freeConn()
return nil
Expand All @@ -1147,13 +1181,83 @@ func (tx *sqliteTx) Exec(query string, encoder Encoder, decoder Decoder) (int, e
return exec(tx.conn, query, encoder, decoder)
}

type sqliteConn struct {
// lazyConn is a connection that is acquired lazily from the pool, that is, upon the first
// query, and released after a certain period of inactivity.
type lazyConn struct {
*queryCache
db *sqliteDatabase
conn *sqlite.Conn
db *sqliteDatabase
getConn func() *sqlite.Conn
eg errgroup.Group
conn *sqlite.Conn
timer *time.Timer
doneCh chan struct{}
connMtx sync.Mutex
}

// newLazyConn builds a lazyConn bound to db. No pool connection is taken
// here: the returned value only carries a closure that calls db.getConn
// with ctx when a connection is actually needed.
func newLazyConn(ctx context.Context, db *sqliteDatabase) *lazyConn {
	acquire := func() *sqlite.Conn {
		return db.getConn(ctx)
	}
	lc := &lazyConn{
		queryCache: db.queryCache,
		db:         db,
		getConn:    acquire,
	}
	return lc
}

// ensureConn returns the connection currently held by c, obtaining one via
// c.getConn when none is held. On the first acquisition it also creates the
// idle timer and starts a goroutine that releases the connection when the
// timer fires; on later acquisitions it only re-arms the existing timer.
//
// The caller visible in this file (Exec) invokes it with c.connMtx held.
//
// NOTE(review): the result of c.getConn is stored and returned without a nil
// check, while other getConn call sites treat nil as ErrNoConnection —
// confirm that callers handle a nil return.
func (c *lazyConn) ensureConn() *sqlite.Conn {
if c.conn != nil {
return c.conn
}

c.conn = c.getConn()
if c.timer != nil {
// The timer and its goroutine already exist from an earlier
// acquisition, so only the idle countdown needs restarting.
c.timer.Reset(c.db.connIdleTimeout)
return c.conn
}
// First acquisition: set up the idle timer, the stop channel and the
// goroutine that reacts to both.
c.timer = time.NewTimer(c.db.connIdleTimeout)
c.doneCh = make(chan struct{})
c.eg.Go(func() error {
for {
select {
case <-c.timer.C:
// Although TryLock docs say that it's not recommended to use it
// in most cases, this use case is justified.
// If the mutex is already locked here, this means that an SQL
// statement is being executed on the connection, after which
// the idle timer will be restarted, or the connection is currently
// being released.
if c.connMtx.TryLock() {
c.releaseConn()
c.connMtx.Unlock()
}
case <-c.doneCh:
// doneCh is closed by release(); stop the goroutine.
return nil
}
}
})
return c.conn
}

func (c *sqliteConn) Exec(query string, encoder Encoder, decoder Decoder) (int, error) {
// releaseConn gives the held connection, if any, back to the database pool
// after stopping the idle timer. It does nothing when no connection is held.
func (c *lazyConn) releaseConn() {
	if c.conn == nil {
		return
	}
	c.timer.Stop()
	c.db.putConn(c.conn)
	c.conn = nil
}

// release returns any held connection to the pool and, if the idle-timer
// goroutine was started, signals it to stop and waits for it to finish.
func (c *lazyConn) release() {
	// Hold the mutex so the timer handler cannot run releaseConn() concurrently.
	c.connMtx.Lock()
	defer c.connMtx.Unlock()

	c.releaseConn()
	if c.doneCh == nil {
		// No connection was ever acquired, so no goroutine is running.
		return
	}
	close(c.doneCh)
	c.eg.Wait()
}

func (c *lazyConn) Exec(query string, encoder Encoder, decoder Decoder) (int, error) {
if err := c.db.runInterceptors(query); err != nil {
return 0, fmt.Errorf("running query interceptors: %w", err)
}
Expand All @@ -1165,7 +1269,11 @@ func (c *sqliteConn) Exec(query string, encoder Encoder, decoder Decoder) (int,
c.db.latency.WithLabelValues(query).Observe(float64(time.Since(start)))
}()
}
return exec(c.conn, query, encoder, decoder)
c.connMtx.Lock()
defer c.connMtx.Unlock()
conn := c.ensureConn()
defer c.timer.Reset(c.db.connIdleTimeout)
return exec(conn, query, encoder, decoder)
}

func mapSqliteError(err error) error {
Expand Down
30 changes: 30 additions & 0 deletions sql/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"go.uber.org/zap"
Expand Down Expand Up @@ -659,3 +660,32 @@ func TestConnection(t *testing.T) {
return errors.New("error")
}))
}

func TestConnection_Idle(t *testing.T) {
dbName := t.Name()
numConns := func() int {
return int(testutil.ToFloat64(PoolUsage.WithLabelValues(dbName)))
}
require.Zero(t, numConns())
db := InMemoryTest(t, WithDBName(dbName))
require.NoError(t, db.WithConnection(context.Background(), func(ex Executor) error {
for range 3 {
for range 3 {
_, err := ex.Exec("select 1", nil, func(stmt *Statement) bool {
require.Equal(t, 1, numConns())
return true
})
require.NoError(t, err)
// The connection should still be in use right after the query
require.Equal(t, 1, numConns())
}
// The connection should be released after idle interval,
// but reacquired on the next query
require.Eventually(t, func() bool { return numConns() == 0 },
time.Second, 10*time.Millisecond)
}
return nil
}))

require.Zero(t, numConns())
}
Loading

0 comments on commit 4cf9e0c

Please sign in to comment.