Skip to content

Commit

Permalink
Set Meta of Restarted Job as Unused
Browse files Browse the repository at this point in the history
This is done because, whenever a new allocation is started (for the same runner), Poseidon already handles it as a fresh runner.
With this PR, we also persist this information to Nomad, which allows a slightly improved runner recovery behavior when such runners are still counted as idle.
  • Loading branch information
mpass99 authored and MrSerth committed Sep 12, 2024
1 parent fe6ed1b commit afa54da
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 101 deletions.
2 changes: 1 addition & 1 deletion internal/api/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func newRunnerWithNotMockedRunnerManager(s *MainTestSuite, apiMock *nomad.Execut
r runner.Runner, wsURL *url.URL, cleanup func(),
) {
s.T().Helper()
apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string"), mock.AnythingOfType("int")).Return(nil)
apiMock.On("SetRunnerMetaUsed", mock.AnythingOfType("string"), mock.AnythingOfType("bool"), mock.AnythingOfType("int")).Return(nil)
apiMock.On("LoadRunnerIDs", mock.AnythingOfType("string")).Return([]string{}, nil)
apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
apiMock.On("RegisterRunnerJob", mock.AnythingOfType("*api.Job")).Return(nil)
Expand Down
90 changes: 7 additions & 83 deletions internal/nomad/executor_api_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 10 additions & 5 deletions internal/nomad/nomad.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ type ExecutorAPI interface {
ExecuteCommand(ctx context.Context, jobID string, command string, tty bool, privilegedExecution bool,
stdin io.Reader, stdout, stderr io.Writer) (int, error)

// MarkRunnerAsUsed marks the runner with the given ID as used. It also stores the timeout duration in the metadata.
MarkRunnerAsUsed(runnerID string, duration int) error
// SetRunnerMetaUsed marks the runner with the given ID as used or unused.
// If used, the timeout duration is also stored in the metadata.
SetRunnerMetaUsed(runnerID string, used bool, duration int) error
}

// APIClient implements the ExecutorAPI interface and can be used to perform different operations on the real
Expand Down Expand Up @@ -174,14 +175,18 @@ func (a *APIClient) LoadRunnerJobs(environmentID dto.EnvironmentID) ([]*nomadApi
return jobs, occurredError
}

func (a *APIClient) MarkRunnerAsUsed(runnerID string, duration int) error {
func (a *APIClient) SetRunnerMetaUsed(runnerID string, used bool, duration int) error {
job, err := a.job(runnerID)
if err != nil {
return fmt.Errorf("couldn't retrieve job info: %w", err)
}
configTaskGroup := FindAndValidateConfigTaskGroup(job)
configTaskGroup.Meta[ConfigMetaUsedKey] = ConfigMetaUsedValue
configTaskGroup.Meta[ConfigMetaTimeoutKey] = strconv.Itoa(duration)
if used {
configTaskGroup.Meta[ConfigMetaUsedKey] = ConfigMetaUsedValue
configTaskGroup.Meta[ConfigMetaTimeoutKey] = strconv.Itoa(duration)
} else {
configTaskGroup.Meta[ConfigMetaUsedKey] = ConfigMetaUnusedValue
}

_, err = a.RegisterNomadJob(job)
if err != nil {
Expand Down
19 changes: 12 additions & 7 deletions internal/runner/nomad_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,24 @@ func (m *NomadRunnerManager) Claim(environmentID dto.EnvironmentID, duration int
}

m.usedRunners.Add(runner.ID(), runner)
go m.markRunnerAsUsed(runner, duration)
go m.setRunnerMetaUsed(runner, true, duration)

runner.SetupTimeout(time.Duration(duration) * time.Second)
return runner, nil
}

func (m *NomadRunnerManager) markRunnerAsUsed(runner Runner, timeoutDuration int) {
func (m *NomadRunnerManager) setRunnerMetaUsed(runner Runner, used bool, timeoutDuration int) {
err := util.RetryExponential(func() (err error) {
if err = m.apiClient.MarkRunnerAsUsed(runner.ID(), timeoutDuration); err != nil {
if err = m.apiClient.SetRunnerMetaUsed(runner.ID(), used, timeoutDuration); err != nil {
err = fmt.Errorf("cannot mark runner as used: %w", err)
}
return
})
if err != nil {
log.WithError(err).WithField(dto.KeyRunnerID, runner.ID()).Error("cannot mark runner as used")
log.WithError(err).WithField(dto.KeyRunnerID, runner.ID()).WithField("used", used).Error("cannot mark runner")
err := m.Return(runner)
if err != nil {
log.WithError(err).WithField(dto.KeyRunnerID, runner.ID()).Error("can't mark runner as used and can't return runner")
log.WithError(err).WithField(dto.KeyRunnerID, runner.ID()).Error("can't mark runner and can't return runner")
}
}
}
Expand Down Expand Up @@ -231,7 +231,7 @@ func (m *NomadRunnerManager) loadSingleJob(ctx context.Context, job *nomadApi.Jo
if err != nil {
log.WithField(dto.KeyRunnerID, newJob.ID()).WithError(err).Warn("failed loading timeout from meta values")
timeout = int(nomad.RunnerTimeoutFallback.Seconds())
go m.markRunnerAsUsed(newJob, timeout)
go m.setRunnerMetaUsed(newJob, true, timeout)
}
newJob.SetupTimeout(time.Duration(timeout) * time.Second)
} else {
Expand Down Expand Up @@ -311,7 +311,12 @@ func (m *NomadRunnerManager) onAllocationAdded(ctx context.Context, alloc *nomad
if alloc.AllocatedResources != nil {
mappedPorts = alloc.AllocatedResources.Shared.Ports
}
environment.AddRunner(NewNomadJob(ctx, alloc.JobID, mappedPorts, m.apiClient, m.onRunnerDestroyed))

r := NewNomadJob(ctx, alloc.JobID, mappedPorts, m.apiClient, m.onRunnerDestroyed)
if alloc.PreviousAllocation != "" {
go m.setRunnerMetaUsed(r, false, 0)
}
environment.AddRunner(r)
go m.checkPrewarmingPoolAlert(ctx, environment, true)
monitorAllocationStartupDuration(startup, alloc.JobID, environmentID)
}
Expand Down
29 changes: 24 additions & 5 deletions internal/runner/nomad_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func mockRunnerQueries(ctx context.Context, apiMock *nomad.ExecutorAPIMock, retu
})
apiMock.On("LoadEnvironmentJobs").Return([]*nomadApi.Job{}, nil)
apiMock.On("LoadRunnerJobs", mock.AnythingOfType("dto.EnvironmentID")).Return([]*nomadApi.Job{}, nil)
apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string"), mock.AnythingOfType("int")).Return(nil)
apiMock.On("SetRunnerMetaUsed", mock.AnythingOfType("string"), mock.AnythingOfType("bool"), mock.AnythingOfType("int")).Return(nil)
apiMock.On("LoadRunnerIDs", tests.DefaultRunnerID).Return(returnedRunnerIDs, nil)
apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
apiMock.On("RegisterRunnerJob", mock.Anything).Return(nil)
Expand Down Expand Up @@ -171,7 +171,7 @@ func (s *ManagerTestSuite) TestClaimRemovesRunnerWhenMarkAsUsedFails() {
s.exerciseEnvironment.On("DeleteRunner", mock.AnythingOfType("string")).Return(nil, false)
s.apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
util.MaxConnectionRetriesExponential = 1
modifyMockedCall(s.apiMock, "MarkRunnerAsUsed", func(call *mock.Call) {
modifyMockedCall(s.apiMock, "SetRunnerMetaUsed", func(call *mock.Call) {
call.Run(func(_ mock.Arguments) {
call.ReturnArguments = mock.Arguments{tests.ErrDefault}
})
Expand Down Expand Up @@ -441,6 +441,25 @@ func (s *ManagerTestSuite) TestOnAllocationAdded() {
s.Require().NoError(err)
})
})
s.nomadRunnerManager.usedRunners.Purge()
s.Run("resets meta used when added allocation has a previous allocation", func() {
environment, ok := s.nomadRunnerManager.environments.Get(tests.DefaultEnvironmentIDAsString)
s.True(ok)
environmentMock, ok := environment.(*ExecutionEnvironmentMock)
s.Require().True(ok)
mockIdleRunners(environmentMock)

alloc := &nomadApi.Allocation{JobID: tests.DefaultRunnerID, PreviousAllocation: tests.DefaultUUID}
s.nomadRunnerManager.onAllocationAdded(s.TestCtx, alloc, 0)

<-time.After(tests.ShortTimeout)
s.apiMock.AssertCalled(s.T(), "SetRunnerMetaUsed", tests.DefaultRunnerID, false, 0)

runner, ok := environment.Sample()
s.True(ok)
err := runner.Destroy(nil)
s.Require().NoError(err)
})
}

func (s *ManagerTestSuite) TestOnAllocationStopped() {
Expand Down Expand Up @@ -571,7 +590,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() {
})

s.Run("Stores used runner", func() {
apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string"), mock.AnythingOfType("int")).Return(nil)
apiMock.On("SetRunnerMetaUsed", mock.AnythingOfType("string"), mock.AnythingOfType("bool"), mock.AnythingOfType("int")).Return(nil)
_, job := helpers.CreateTemplateJob()
jobID := tests.DefaultRunnerID
job.ID = &jobID
Expand Down Expand Up @@ -613,7 +632,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() {
})

s.Run("Don't stop running executions", func() {
apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string"), mock.AnythingOfType("int")).Return(nil).Once()
apiMock.On("SetRunnerMetaUsed", mock.AnythingOfType("string"), mock.AnythingOfType("bool"), mock.AnythingOfType("int")).Return(nil).Once()
_, job := helpers.CreateTemplateJob()
jobID := tests.DefaultRunnerID
job.ID = &jobID
Expand Down Expand Up @@ -667,7 +686,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() {
apiMock.On("LoadRunnerPortMappings", mock.Anything).
Return([]nomadApi.PortMapping{updatedPortMapping}, nil).Once()

apiMock.On("MarkRunnerAsUsed", mock.Anything, mock.Anything).Return(nil).Once()
apiMock.On("SetRunnerMetaUsed", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
_, job := helpers.CreateTemplateJob()
jobID := tests.DefaultRunnerID
job.ID = &jobID
Expand Down

0 comments on commit afa54da

Please sign in to comment.