Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PMM-12896 Add limit for actions/jobs executed on the same DB at the same time #2898

Merged
merged 35 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
972bb1c
Add limit for actions/jobs executed on the same DB at the same time
artemgavrilov Mar 13, 2024
5a5a6a7
PMM-12896 Fix
artemgavrilov Mar 13, 2024
4463f1a
PMM-12896 Fixes
artemgavrilov Mar 13, 2024
af550fb
PMM-12896 Fix
artemgavrilov Mar 13, 2024
06743cf
PMM-12896 Fix tests
artemgavrilov Mar 14, 2024
dba0886
PMM-12896 Improvements, fixes, comments, tests
artemgavrilov Mar 14, 2024
4211fad
PMM-12896 Make per DB capacity configurable
artemgavrilov Mar 14, 2024
d5eefa2
PMM-12896 Fix
artemgavrilov Mar 14, 2024
a77e999
PMM-12896 Improve tests
artemgavrilov Mar 14, 2024
e003f5c
PMM-12896 Linter fixes
artemgavrilov Mar 14, 2024
be6e0bf
PMM-12896 Refactoring
artemgavrilov Mar 15, 2024
fb3a35f
PMM-12896 Fix comment
artemgavrilov Mar 15, 2024
2971596
Merge remote-tracking branch 'origin/main' into PMM-12896-limit-conns…
artemgavrilov Mar 19, 2024
e59a244
PMM-12896 Fix DSN method for PT mysql summary action
artemgavrilov Mar 19, 2024
306b2f5
PMM-12896 Fix bug in local semaphores releasing
artemgavrilov Mar 19, 2024
74f18c4
PMM-12896 Refactoring
artemgavrilov Mar 19, 2024
55b7e05
PMM-12896 Refactoring
artemgavrilov Mar 19, 2024
5d35524
PMM-12896 Refactoring
artemgavrilov Mar 19, 2024
206ca7c
PMM-12896 Refactoring
artemgavrilov Mar 19, 2024
5229182
PMM-12896 Fix test
artemgavrilov Mar 19, 2024
477790b
PMM-12896 Fix tests
artemgavrilov Mar 19, 2024
fab890b
Merge remote-tracking branch 'origin/main' into PMM-12896-limit-conns…
artemgavrilov Mar 19, 2024
428e5f1
Revert "PMM-12896 Fix tests"
artemgavrilov Mar 19, 2024
9c3b9f7
PMM-12896 Fix tests
artemgavrilov Mar 19, 2024
9484c03
PMM-12896 Use timeout only for job/action exectuion, not for resource…
artemgavrilov Mar 20, 2024
1156da4
PMM-12896 Refactoring
artemgavrilov Mar 20, 2024
5ab80bd
Update agent/config/config.go
artemgavrilov Mar 21, 2024
a779937
Update agent/runner/actions/mongodb_explain_action.go
artemgavrilov Mar 21, 2024
e9fec6f
Update agent/runner/actions/postgresql_query_select_action.go
artemgavrilov Mar 21, 2024
fb5530f
Merge remote-tracking branch 'origin/main' into PMM-12896-limit-conns…
artemgavrilov Mar 21, 2024
d1c0a9e
PMM-12896 Refactoring
artemgavrilov Mar 21, 2024
be9f781
PMM-12896 Mute linter
artemgavrilov Mar 21, 2024
270c974
Merge branch 'main' into PMM-12896-limit-conns-num-from-jobs-runner
artemgavrilov Mar 25, 2024
154761d
Merge branch 'main' into PMM-12896-limit-conns-num-from-jobs-runner
yurkovychv May 20, 2024
256c8b1
Merge branch 'main' into PMM-12896-limit-conns-num-from-jobs-runner
artemgavrilov May 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion agent/agents/mongodb/internal/profiler/profiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,9 @@ func testProfiler(t *testing.T, url string) {
Query: findBucket.Common.Example,
}

ex := actions.NewMongoDBExplainAction(id, 5*time.Second, params, os.TempDir())
ex, err := actions.NewMongoDBExplainAction(id, 5*time.Second, params, os.TempDir())
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), ex.Timeout())
defer cancel()
res, err := ex.Run(ctx)
Expand Down
31 changes: 18 additions & 13 deletions agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,9 +454,10 @@ func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error {

cfg := c.cfg.Get()
var action actions.Action
var err error
switch params := p.Params.(type) {
case *agentpb.StartActionRequest_MysqlExplainParams:
action = actions.NewMySQLExplainAction(p.ActionId, timeout, params.MysqlExplainParams)
action, err = actions.NewMySQLExplainAction(p.ActionId, timeout, params.MysqlExplainParams)

case *agentpb.StartActionRequest_MysqlShowCreateTableParams:
action = actions.NewMySQLShowCreateTableAction(p.ActionId, timeout, params.MysqlShowCreateTableParams)
Expand All @@ -468,13 +469,13 @@ func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error {
action = actions.NewMySQLShowIndexAction(p.ActionId, timeout, params.MysqlShowIndexParams)

case *agentpb.StartActionRequest_PostgresqlShowCreateTableParams:
action = actions.NewPostgreSQLShowCreateTableAction(p.ActionId, timeout, params.PostgresqlShowCreateTableParams, cfg.Paths.TempDir)
action, err = actions.NewPostgreSQLShowCreateTableAction(p.ActionId, timeout, params.PostgresqlShowCreateTableParams, cfg.Paths.TempDir)

case *agentpb.StartActionRequest_PostgresqlShowIndexParams:
action = actions.NewPostgreSQLShowIndexAction(p.ActionId, timeout, params.PostgresqlShowIndexParams, cfg.Paths.TempDir)
action, err = actions.NewPostgreSQLShowIndexAction(p.ActionId, timeout, params.PostgresqlShowIndexParams, cfg.Paths.TempDir)

case *agentpb.StartActionRequest_MongodbExplainParams:
action = actions.NewMongoDBExplainAction(p.ActionId, timeout, params.MongodbExplainParams, cfg.Paths.TempDir)
action, err = actions.NewMongoDBExplainAction(p.ActionId, timeout, params.MongodbExplainParams, cfg.Paths.TempDir)

case *agentpb.StartActionRequest_MysqlQueryShowParams:
action = actions.NewMySQLQueryShowAction(p.ActionId, timeout, params.MysqlQueryShowParams)
Expand All @@ -483,13 +484,13 @@ func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error {
action = actions.NewMySQLQuerySelectAction(p.ActionId, timeout, params.MysqlQuerySelectParams)

case *agentpb.StartActionRequest_PostgresqlQueryShowParams:
action = actions.NewPostgreSQLQueryShowAction(p.ActionId, timeout, params.PostgresqlQueryShowParams, cfg.Paths.TempDir)
action, err = actions.NewPostgreSQLQueryShowAction(p.ActionId, timeout, params.PostgresqlQueryShowParams, cfg.Paths.TempDir)

case *agentpb.StartActionRequest_PostgresqlQuerySelectParams:
action = actions.NewPostgreSQLQuerySelectAction(p.ActionId, timeout, params.PostgresqlQuerySelectParams, cfg.Paths.TempDir)
action, err = actions.NewPostgreSQLQuerySelectAction(p.ActionId, timeout, params.PostgresqlQuerySelectParams, cfg.Paths.TempDir)

case *agentpb.StartActionRequest_MongodbQueryGetparameterParams:
action = actions.NewMongoDBQueryAdmincommandAction(
action, err = actions.NewMongoDBQueryAdmincommandAction(
p.ActionId,
timeout,
params.MongodbQueryGetparameterParams.Dsn,
Expand All @@ -499,7 +500,7 @@ func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error {
cfg.Paths.TempDir)

case *agentpb.StartActionRequest_MongodbQueryBuildinfoParams:
action = actions.NewMongoDBQueryAdmincommandAction(
action, err = actions.NewMongoDBQueryAdmincommandAction(
p.ActionId,
timeout,
params.MongodbQueryBuildinfoParams.Dsn,
Expand All @@ -509,7 +510,7 @@ func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error {
cfg.Paths.TempDir)

case *agentpb.StartActionRequest_MongodbQueryGetcmdlineoptsParams:
action = actions.NewMongoDBQueryAdmincommandAction(
action, err = actions.NewMongoDBQueryAdmincommandAction(
p.ActionId,
timeout,
params.MongodbQueryGetcmdlineoptsParams.Dsn,
Expand All @@ -519,7 +520,7 @@ func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error {
cfg.Paths.TempDir)

case *agentpb.StartActionRequest_MongodbQueryReplsetgetstatusParams:
action = actions.NewMongoDBQueryAdmincommandAction(
action, err = actions.NewMongoDBQueryAdmincommandAction(
p.ActionId,
timeout,
params.MongodbQueryReplsetgetstatusParams.Dsn,
Expand All @@ -529,7 +530,7 @@ func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error {
cfg.Paths.TempDir)

case *agentpb.StartActionRequest_MongodbQueryGetdiagnosticdataParams:
action = actions.NewMongoDBQueryAdmincommandAction(
action, err = actions.NewMongoDBQueryAdmincommandAction(
p.ActionId,
timeout,
params.MongodbQueryGetdiagnosticdataParams.Dsn,
Expand Down Expand Up @@ -565,6 +566,10 @@ func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error {
return errors.Wrapf(agenterrors.ErrInvalidArgument, "invalid action type request: %T", params)
}

if err != nil {
return errors.Wrap(err, "failed to create action")
}

return c.runner.StartAction(action)
}

Expand Down Expand Up @@ -645,7 +650,7 @@ func (c *Client) handleStartJobRequest(p *agentpb.StartJobRequest) error {
return errors.WithStack(err)
}

job, err = jobs.NewMongoDBBackupJob(p.JobId, timeout, j.MongodbBackup.Name, &dsn, locationConfig,
job, err = jobs.NewMongoDBBackupJob(p.JobId, timeout, j.MongodbBackup.Name, dsn, locationConfig,
j.MongodbBackup.EnablePitr, j.MongodbBackup.DataModel, j.MongodbBackup.Folder)
if err != nil {
return err
Expand Down Expand Up @@ -678,7 +683,7 @@ func (c *Client) handleStartJobRequest(p *agentpb.StartJobRequest) error {
}

job = jobs.NewMongoDBRestoreJob(p.JobId, timeout, j.MongodbRestoreBackup.Name,
j.MongodbRestoreBackup.PitrTimestamp.AsTime(), &dsn, locationConfig,
j.MongodbRestoreBackup.PitrTimestamp.AsTime(), dsn, locationConfig,
c.supervisor, j.MongodbRestoreBackup.Folder, j.MongodbRestoreBackup.PbmMetadata.Name)
default:
return errors.Errorf("unknown job type: %T", j)
Expand Down
4 changes: 2 additions & 2 deletions agent/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestClient(t *testing.T) {
s.On("AgentsList").Return([]*agentlocalpb.AgentInfo{})
s.On("ClearChangesChannel").Return()

r := runner.New(cfgStorage.Get().RunnerCapacity)
r := runner.New(cfgStorage.Get().RunnerCapacity, cfgStorage.Get().RunnerMaxConnectionsPerService)
client := New(cfgStorage, &s, r, nil, nil, nil, connectionuptime.NewService(time.Hour), nil)
err := client.Run(context.Background())
assert.NoError(t, err)
Expand Down Expand Up @@ -281,7 +281,7 @@ func TestUnexpectedActionType(t *testing.T) {
s.On("AgentsList").Return([]*agentlocalpb.AgentInfo{})
s.On("ClearChangesChannel").Return()

r := runner.New(cfgStorage.Get().RunnerCapacity)
r := runner.New(cfgStorage.Get().RunnerCapacity, cfgStorage.Get().RunnerMaxConnectionsPerService)
client := New(cfgStorage, s, r, nil, nil, nil, connectionuptime.NewService(time.Hour), nil)
err := client.Run(context.Background())
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion agent/commands/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func Run() {
supervisor := supervisor.NewSupervisor(ctx, v, configStorage)
connectionChecker := connectionchecker.New(configStorage)
serviceInfoBroker := serviceinfobroker.New(configStorage)
r := runner.New(cfg.RunnerCapacity)
r := runner.New(cfg.RunnerCapacity, cfg.RunnerMaxConnectionsPerService)
client := client.New(configStorage, supervisor, r, connectionChecker, v, serviceInfoBroker, prepareConnectionService(ctx, cfg), logStore)
localServer := agentlocal.NewServer(configStorage, supervisor, client, configFilepath, logStore)

Expand Down
11 changes: 7 additions & 4 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,11 @@ type Setup struct {
type Config struct { //nolint:musttag
// no config file there

ID string `yaml:"id"`
ListenAddress string `yaml:"listen-address"`
ListenPort uint16 `yaml:"listen-port"`
RunnerCapacity uint16 `yaml:"runner-capacity,omitempty"`
ID string `yaml:"id"`
ListenAddress string `yaml:"listen-address"`
ListenPort uint16 `yaml:"listen-port"`
RunnerCapacity uint16 `yaml:"runner-capacity,omitempty"`
RunnerMaxConnectionsPerService uint16 `yaml:"runner-max-connections-per-service,omitempty"`

Server Server `yaml:"server"`
Paths Paths `yaml:"paths"`
Expand Down Expand Up @@ -352,6 +353,8 @@ func Application(cfg *Config) (*kingpin.Application, *string) {
Envar("PMM_AGENT_LISTEN_PORT").Uint16Var(&cfg.ListenPort)
app.Flag("runner-capacity", "Agent internal actions/jobs runner capacity [PMM_AGENT_RUNNER_CAPACITY]").
Envar("PMM_AGENT_RUNNER_CAPACITY").Uint16Var(&cfg.RunnerCapacity)
app.Flag("runner-max-connections-per-service", "Agent internal actions/jobs runner connections limit per DB instance").
artemgavrilov marked this conversation as resolved.
Show resolved Hide resolved
Envar("PMM_AGENT_RUNNER_MAX_CONNECTIONS_PER_SERVICE").Uint16Var(&cfg.RunnerMaxConnectionsPerService)

app.Flag("server-address", "PMM Server address [PMM_AGENT_SERVER_ADDRESS]").
Envar("PMM_AGENT_SERVER_ADDRESS").PlaceHolder("<host:port>").StringVar(&cfg.Server.Address)
Expand Down
2 changes: 2 additions & 0 deletions agent/runner/actions/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type Action interface {
Type() string
// Timeout returns Job timeout.
Timeout() time.Duration
// DSN returns Data Source Name required for the Action.
DSN() string
// Run runs an Action and returns output and error.
Run(ctx context.Context) ([]byte, error)

Expand Down
30 changes: 18 additions & 12 deletions agent/runner/actions/mongodb_explain_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"path/filepath"
"strings"
"time"

"github.com/percona/percona-toolkit/src/go/mongolib/proto"
Expand All @@ -31,23 +30,30 @@ import (
"github.com/percona/pmm/api/agentpb"
)

const mongoDBExplainActionType = "mongodb-explain"

type mongodbExplainAction struct {
id string
timeout time.Duration
params *agentpb.StartActionRequest_MongoDBExplainParams
tempDir string
dsn string
}

var errCannotExplain = fmt.Errorf("cannot explain this type of query")

// NewMongoDBExplainAction creates a MongoDB EXPLAIN query Action.
func NewMongoDBExplainAction(id string, timeout time.Duration, params *agentpb.StartActionRequest_MongoDBExplainParams, tempDir string) Action {
func NewMongoDBExplainAction(id string, timeout time.Duration, params *agentpb.StartActionRequest_MongoDBExplainParams, tempDir string) (Action, error) {
dsn, err := templates.RenderDSN(params.Dsn, params.TextFiles, filepath.Join(tempDir, mongoDBExplainActionType, id))
if err != nil {
return nil, errors.WithStack(err)
}

return &mongodbExplainAction{
id: id,
timeout: timeout,
params: params,
tempDir: tempDir,
}
dsn: dsn,
}, nil
}

// ID returns an Action ID.
Expand All @@ -62,17 +68,17 @@ func (a *mongodbExplainAction) Timeout() time.Duration {

// Type returns an Action type.
func (a *mongodbExplainAction) Type() string {
return "mongodb-explain"
return mongoDBExplainActionType
}

// DSN returns a DSN for the Action.
artemgavrilov marked this conversation as resolved.
Show resolved Hide resolved
func (a *mongodbExplainAction) DSN() string {
return a.dsn
}

// Run runs an action and returns output and error.
func (a *mongodbExplainAction) Run(ctx context.Context) ([]byte, error) {
dsn, err := templates.RenderDSN(a.params.Dsn, a.params.TextFiles, filepath.Join(a.tempDir, strings.ToLower(a.Type()), a.id))
if err != nil {
return nil, errors.WithStack(err)
}

opts, err := mongo_fix.ClientOptionsForDSN(dsn)
opts, err := mongo_fix.ClientOptionsForDSN(a.dsn)
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
8 changes: 6 additions & 2 deletions agent/runner/actions/mongodb_explain_action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func TestMongoDBExplain(t *testing.T) {
Query: `{"ns":"test.coll","op":"query","query":{"k":{"$lte":{"$numberInt":"1"}}}}`,
}

ex := NewMongoDBExplainAction(id, 0, params, os.TempDir())
ex, err := NewMongoDBExplainAction(id, 0, params, os.TempDir())
require.NoError(t, err)

res, err := ex.Run(ctx)
assert.Nil(t, err)

Expand Down Expand Up @@ -130,7 +132,9 @@ func TestNewMongoDBExplain(t *testing.T) {
Query: string(query),
}

ex := NewMongoDBExplainAction(id, 0, params, os.TempDir())
ex, err := NewMongoDBExplainAction(id, 0, params, os.TempDir())
require.NoError(t, err)

res, err := ex.Run(ctx)
assert.NoError(t, err)

Expand Down
38 changes: 24 additions & 14 deletions agent/runner/actions/mongodb_query_admincommand_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package actions
import (
"context"
"path/filepath"
"strings"
"time"

"github.com/pkg/errors"
Expand All @@ -29,27 +28,38 @@ import (
"github.com/percona/pmm/api/agentpb"
)

const mongoDBQueryAdminCommandActionType = "mongodb-query-admincommand"

type mongodbQueryAdmincommandAction struct {
id string
timeout time.Duration
dsn string
files *agentpb.TextFiles
command string
arg interface{}
tempDir string
}

// NewMongoDBQueryAdmincommandAction creates a MongoDB adminCommand query action.
func NewMongoDBQueryAdmincommandAction(id string, timeout time.Duration, dsn string, files *agentpb.TextFiles, command string, arg interface{}, tempDir string) Action {
func NewMongoDBQueryAdmincommandAction(
id string,
timeout time.Duration,
dsn string,
files *agentpb.TextFiles,
command string,
arg interface{},
tempDir string,
) (Action, error) {
dsn, err := templates.RenderDSN(dsn, files, filepath.Join(tempDir, mongoDBQueryAdminCommandActionType, id))
if err != nil {
return nil, errors.WithStack(err)
}

return &mongodbQueryAdmincommandAction{
id: id,
timeout: timeout,
dsn: dsn,
files: files,
command: command,
arg: arg,
tempDir: tempDir,
}
}, nil
}

// ID returns an action ID.
Expand All @@ -64,17 +74,17 @@ func (a *mongodbQueryAdmincommandAction) Timeout() time.Duration {

// Type returns an action type.
func (a *mongodbQueryAdmincommandAction) Type() string {
return "mongodb-query-admincommand"
return mongoDBQueryAdminCommandActionType
}

// DSN returns a DSN for the Action.
func (a *mongodbQueryAdmincommandAction) DSN() string {
return a.dsn
}

// Run runs an action and returns output and error.
func (a *mongodbQueryAdmincommandAction) Run(ctx context.Context) ([]byte, error) {
dsn, err := templates.RenderDSN(a.dsn, a.files, filepath.Join(a.tempDir, strings.ToLower(a.Type()), a.id))
if err != nil {
return nil, errors.WithStack(err)
}

opts, err := mongo_fix.ClientOptionsForDSN(dsn)
opts, err := mongo_fix.ClientOptionsForDSN(a.dsn)
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ func TestMongoDBActionsReplWithSSL(t *testing.T) {

func runAction(t *testing.T, id string, timeout time.Duration, dsn string, files *agentpb.TextFiles, command string, arg interface{}, tempDir string) []byte { //nolint:unparam
t.Helper()
a := NewMongoDBQueryAdmincommandAction(id, timeout, dsn, files, command, arg, tempDir)
a, err := NewMongoDBQueryAdmincommandAction(id, timeout, dsn, files, command, arg, tempDir)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
b, err := a.Run(ctx)
Expand Down Expand Up @@ -227,7 +229,9 @@ func replSetGetStatusAssertionsReplicated(t *testing.T, b []byte) { //nolint:the
}

func replSetGetStatusAssertionsStandalone(t *testing.T, id string, timeout time.Duration, dsn string, files *agentpb.TextFiles, command string, arg interface{}, tempDir string) { //nolint:thelper
a := NewMongoDBQueryAdmincommandAction(id, timeout, dsn, files, command, arg, tempDir)
a, err := NewMongoDBQueryAdmincommandAction(id, timeout, dsn, files, command, arg, tempDir)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
b, err := a.Run(ctx)
Expand Down
Loading
Loading