Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,11 +797,8 @@ func (fmd *FakeMysqlDaemon) AcquireGlobalReadLock(ctx context.Context) error {
}

// ReleaseGlobalReadLock is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) ReleaseGlobalReadLock(ctx context.Context) error {
func (fmd *FakeMysqlDaemon) ReleaseGlobalReadLock(ctx context.Context) {
if fmd.GlobalReadLock {
fmd.GlobalReadLock = false
return nil
}

return errors.New("no read locks acquired yet")
}
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ type MysqlDaemon interface {
AcquireGlobalReadLock(ctx context.Context) error

// ReleaseGlobalReadLock release a lock acquired with the connection from the above function.
ReleaseGlobalReadLock(ctx context.Context) error
ReleaseGlobalReadLock(ctx context.Context)

// Close will close this instance of Mysqld. It will wait for all dba
// queries to be finished.
Expand Down
10 changes: 6 additions & 4 deletions go/vt/mysqlctl/mysqld.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,12 @@ var (

// Mysqld is the object that represents a mysqld daemon running on this server.
type Mysqld struct {
dbcfgs *dbconfigs.DBConfigs
dbaPool *dbconnpool.ConnectionPool
appPool *dbconnpool.ConnectionPool
lockConn *dbconnpool.PooledDBConnection
dbcfgs *dbconfigs.DBConfigs
dbaPool *dbconnpool.ConnectionPool
appPool *dbconnpool.ConnectionPool

lockConnMutex sync.Mutex
lockConn *dbconnpool.PooledDBConnection

capabilities capabilitySet

Expand Down
9 changes: 2 additions & 7 deletions go/vt/mysqlctl/mysqlshellbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (be *MySQLShellBackupEngine) ExecuteBackup(ctx context.Context, params Back
// we need to release the global read lock in case the backup fails to start and
// the lock wasn't released by releaseReadLock() yet. context might be expired,
// so we pass a new one.
defer func() { _ = params.Mysqld.ReleaseGlobalReadLock(context.Background()) }()
defer func() { params.Mysqld.ReleaseGlobalReadLock(context.Background()) }()

posBeforeBackup, err := params.Mysqld.PrimaryPosition(ctx)
if err != nil {
Expand Down Expand Up @@ -521,12 +521,7 @@ func releaseReadLock(ctx context.Context, reader io.Reader, params BackupParams,
released = true

params.Logger.Infof("mysql shell released its global read lock, doing the same")

err := params.Mysqld.ReleaseGlobalReadLock(ctx)
if err != nil {
params.Logger.Errorf("unable to release global read lock: %v", err)
}

params.Mysqld.ReleaseGlobalReadLock(ctx)
params.Logger.Infof("global read lock released after %v", time.Since(lockAcquired))
}
}
Expand Down
24 changes: 19 additions & 5 deletions go/vt/mysqlctl/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ import (
"vitess.io/vitess/go/vt/log"
)

const (
acquireGlobalReadLockTimeout = time.Minute
releaseGlobalReadLockTimeout = 10 * time.Second
)

// getPoolReconnect gets a connection from a pool, tests it, and reconnects if
// the connection is lost.
func getPoolReconnect(ctx context.Context, pool *dbconnpool.ConnectionPool) (*dbconnpool.PooledDBConnection, error) {
Expand Down Expand Up @@ -229,6 +234,9 @@ func (mysqld *Mysqld) fetchStatuses(ctx context.Context, pattern string) (map[st

// ExecuteSuperQuery allows the user to execute a query as a super user.
func (mysqld *Mysqld) AcquireGlobalReadLock(ctx context.Context) error {
mysqld.lockConnMutex.Lock()
defer mysqld.lockConnMutex.Unlock()

if mysqld.lockConn != nil {
return errors.New("lock already acquired")
}
Expand All @@ -238,6 +246,8 @@ func (mysqld *Mysqld) AcquireGlobalReadLock(ctx context.Context) error {
return err
}

ctx, cancel := context.WithTimeout(ctx, acquireGlobalReadLockTimeout)
defer cancel()
err = mysqld.executeSuperQueryListConn(ctx, conn, []string{"FLUSH TABLES WITH READ LOCK"})
if err != nil {
conn.Recycle()
Expand All @@ -248,19 +258,23 @@ func (mysqld *Mysqld) AcquireGlobalReadLock(ctx context.Context) error {
return nil
}

func (mysqld *Mysqld) ReleaseGlobalReadLock(ctx context.Context) error {
func (mysqld *Mysqld) ReleaseGlobalReadLock(ctx context.Context) {
mysqld.lockConnMutex.Lock()
defer mysqld.lockConnMutex.Unlock()

if mysqld.lockConn == nil {
return errors.New("no read locks acquired yet")
return
}

ctx, cancel := context.WithTimeout(ctx, releaseGlobalReadLockTimeout)
defer cancel()
err := mysqld.executeSuperQueryListConn(ctx, mysqld.lockConn, []string{"UNLOCK TABLES"})
if err != nil {
return err
log.Warningf("release global read lock failed: %v. closing connection", err)
mysqld.lockConn.Close()
}

mysqld.lockConn.Recycle()
mysqld.lockConn = nil
return nil
}

const (
Expand Down
Loading