Skip to content

Commit

Permalink
add process init fail status
Browse files Browse the repository at this point in the history
  • Loading branch information
GuanqunYang193 committed Mar 29, 2024
1 parent 6500db7 commit 7ca52dc
Show file tree
Hide file tree
Showing 45 changed files with 1,200 additions and 664 deletions.
54 changes: 39 additions & 15 deletions agent/agents/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package process

import (
"context"
"errors"
"fmt"
"os/exec"
"strings"
Expand Down Expand Up @@ -53,12 +54,14 @@ const (
// implements its own logic, and then switches to the next state via "go toXXX()". "go" statement is used
// only to avoid stack overflow; there are no extra goroutines for states.
type Process struct {
params *Params
l *logrus.Entry
pl *processLogger
changes chan inventorypb.AgentStatus
backoff *backoff.Backoff
ctxDone chan struct{}
params *Params
l *logrus.Entry
pl *processLogger
changes chan inventorypb.AgentStatus
backoff *backoff.Backoff
ctxDone chan struct{}
err chan error
initialized chan bool

// recreated on each restart
cmd *exec.Cmd
Expand Down Expand Up @@ -88,15 +91,25 @@ func (p *Params) String() string {
// New creates new process.
func New(params *Params, redactWords []string, l *logrus.Entry) *Process {
return &Process{
params: params,
l: l,
pl: newProcessLogger(l, keepLogLines, redactWords),
changes: make(chan inventorypb.AgentStatus, 10),
backoff: backoff.New(backoffMinDelay, backoffMaxDelay),
ctxDone: make(chan struct{}),
params: params,
l: l,
pl: newProcessLogger(l, keepLogLines, redactWords),
changes: make(chan inventorypb.AgentStatus, 10),
backoff: backoff.New(backoffMinDelay, backoffMaxDelay),
ctxDone: make(chan struct{}),
err: make(chan error),
initialized: make(chan bool, 2),
}
}

// IsInitialized returns the receive-only side of the p.initialized channel.
// toStarting sends true on it once the start timer fires before the process exits;
// toFailing sends false on it when the process fails to start or exits early.
func (p *Process) IsInitialized() <-chan bool {
return p.initialized
}

// GetError returns the receive-only side of the p.err channel.
// toFailing sends the initialization error on it after sending false on p.initialized.
// The channel is created unbuffered in New, so that send completes only once a caller receives from here.
func (p *Process) GetError() <-chan error {
return p.err
}

// Run starts process and runs until ctx is canceled.
func (p *Process) Run(ctx context.Context) {
go p.toStarting()
Expand All @@ -107,7 +120,7 @@ func (p *Process) Run(ctx context.Context) {
}

// STARTING -> RUNNING.
// STARTING -> WAITING.
// STARTING -> FAILING.
func (p *Process) toStarting() {
p.l.Tracef("Process: starting.")
p.changes <- inventorypb.AgentStatus_STARTING
Expand All @@ -128,7 +141,7 @@ func (p *Process) toStarting() {

if err := p.cmd.Start(); err != nil {
p.l.Warnf("Process: failed to start: %s.", err)
go p.toWaiting()
go p.toFailing(err)
return
}

Expand All @@ -142,10 +155,11 @@ func (p *Process) toStarting() {
defer t.Stop()
select {
case <-t.C:
p.initialized <- true
go p.toRunning()
case <-p.cmdDone:
p.l.Warnf("Process: exited early: %s.", p.cmd.ProcessState)
go p.toWaiting()
go p.toFailing(errors.New("exited early"))
}
}

Expand Down Expand Up @@ -192,6 +206,16 @@ func (p *Process) toWaiting() {
}
}

// FAILING -> DONE.
//
// toFailing handles a failed initialization (cmd.Start error or early exit, see toStarting):
// it reports AgentStatus_INITIALIZATION_ERROR on p.changes, switches to the DONE state,
// and then publishes the outcome (false on p.initialized, err on p.err).
func (p *Process) toFailing(err error) {
p.l.Tracef("Process: failing")
p.changes <- inventorypb.AgentStatus_INITIALIZATION_ERROR
// NOTE(review): on the cmd.Start failure path the process never ran, so ProcessState is
// presumably nil here and the "exited" wording may be misleading — confirm.
p.l.Infof("Process: exited: %s.", p.cmd.ProcessState)
// "go" is used for the state switch only to avoid growing the stack (see Process doc).
go p.toDone()
p.initialized <- false
// p.err is created unbuffered in New: this send blocks until someone receives from GetError().
// NOTE(review): if no caller ever reads GetError(), this goroutine stays blocked — confirm that is acceptable.
p.err <- err
}

// STOPPING -> DONE.
func (p *Process) toStopping() {
p.l.Tracef("Process: stopping (sending SIGTERM)...")
Expand Down
25 changes: 6 additions & 19 deletions agent/agents/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,35 +80,22 @@ func TestProcess(t *testing.T) {
})

t.Run("FailedToStart", func(t *testing.T) {
ctx, cancel, l := setup(t)
ctx, _, l := setup(t)
p := New(&Params{Path: "no_such_command"}, nil, l)
go p.Run(ctx)

assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING)
cancel()
assertStates(t, p, inventorypb.AgentStatus_DONE, inventorypb.AgentStatus_AGENT_STATUS_INVALID)
assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_INITIALIZATION_ERROR,
inventorypb.AgentStatus_DONE, inventorypb.AgentStatus_AGENT_STATUS_INVALID)
})

t.Run("ExitedEarly", func(t *testing.T) {
sleep := strconv.FormatFloat(runningT.Seconds()-0.5, 'f', -1, 64)
ctx, cancel, l := setup(t)
p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l)
go p.Run(ctx)

assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING)
cancel()
assertStates(t, p, inventorypb.AgentStatus_DONE, inventorypb.AgentStatus_AGENT_STATUS_INVALID)
})

t.Run("CancelStarting", func(t *testing.T) {
sleep := strconv.FormatFloat(runningT.Seconds()-0.5, 'f', -1, 64)
ctx, cancel, l := setup(t)
ctx, _, l := setup(t)
p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l)
go p.Run(ctx)

assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_STARTING)
cancel()
assertStates(t, p, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_DONE, inventorypb.AgentStatus_AGENT_STATUS_INVALID)
assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_INITIALIZATION_ERROR,
inventorypb.AgentStatus_DONE, inventorypb.AgentStatus_AGENT_STATUS_INVALID)
})

t.Run("Exited", func(t *testing.T) {
Expand Down
44 changes: 32 additions & 12 deletions agent/agents/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (s *Supervisor) RestartAgents() {
agent.cancel()
<-agent.done

if err := s.startProcess(id, agent.requestedState, agent.listenPort); err != nil {
if err := s.tryStartProcess(id, agent.requestedState, agent.listenPort); err != nil {
s.l.Errorf("Failed to restart Agent: %s.", err)
}
}
Expand Down Expand Up @@ -310,22 +310,15 @@ func (s *Supervisor) setAgentProcesses(agentProcesses map[string]*agentpb.SetSta
agent.cancel()
<-agent.done

if err := s.startProcess(agentID, agentProcesses[agentID], agent.listenPort); err != nil {
if err := s.tryStartProcess(agentID, agentProcesses[agentID], agent.listenPort); err != nil {
s.l.Errorf("Failed to start Agent: %s.", err)
// TODO report that error to server
}
}

// start new agents
for _, agentID := range toStart {
port, err := s.portsRegistry.Reserve()
if err != nil {
s.l.Errorf("Failed to reserve port: %s.", err)
// TODO report that error to server
continue
}

if err := s.startProcess(agentID, agentProcesses[agentID], port); err != nil {
if err := s.tryStartProcess(agentID, agentProcesses[agentID], 0); err != nil {
s.l.Errorf("Failed to start Agent: %s.", err)
// TODO report that error to server
}
Expand Down Expand Up @@ -427,10 +420,32 @@ func filter(existing, ap map[string]agentpb.AgentParams) ([]string, []string, []

//nolint:golint,stylecheck,revive
const (
type_TEST_SLEEP inventorypb.AgentType = 998 // process
type_TEST_NOOP inventorypb.AgentType = 999 // built-in
type_TEST_SLEEP inventorypb.AgentType = 998 // process
type_TEST_NOOP inventorypb.AgentType = 999 // built-in
process_Retry_Time int = 3
)

// tryStartProcess starts Agent's process, making up to process_Retry_Time attempts.
//
// A port of 0 means "no port assigned yet": a port is reserved from s.portsRegistry before
// the attempt. After a failed start the port is reset to 0, so the next attempt reserves a
// fresh one. It returns nil as soon as one attempt succeeds; otherwise it returns the error
// of the last failed step (port reservation or process start).
//
// NOTE(review): startProcess must be called with s.rw held for writing, so presumably the
// same requirement applies to callers of this method — confirm.
func (s *Supervisor) tryStartProcess(agentID string, agentProcess *agentpb.SetStateRequest_AgentProcess, port uint16) error {
	var err error
	for i := 0; i < process_Retry_Time; i++ {
		if port == 0 {
			reserved, reserveErr := s.portsRegistry.Reserve()
			if reserveErr != nil {
				// Record the failure in the function-level err (no shadowing), so that
				// running out of attempts here is reported to the caller instead of
				// silently returning nil.
				err = reserveErr
				s.l.Errorf("Failed to reserve port: %s.", err)
				continue
			}
			port = reserved
		}

		if err = s.startProcess(agentID, agentProcess, port); err == nil {
			return nil
		}

		// Force a new port reservation on the next attempt.
		port = 0
	}
	return err
}

// startProcess starts Agent's process.
// Must be called with s.rw held for writing.
func (s *Supervisor) startProcess(agentID string, agentProcess *agentpb.SetStateRequest_AgentProcess, port uint16) error {
Expand Down Expand Up @@ -473,6 +488,11 @@ func (s *Supervisor) startProcess(agentID string, agentProcess *agentpb.SetState
close(done)
}()

if !<-process.IsInitialized() {
defer cancel()
return <-process.GetError()
}

//nolint:forcetypeassert
s.agentProcesses[agentID] = &agentProcessInfo{
cancel: cancel,
Expand Down
58 changes: 48 additions & 10 deletions agent/agents/supervisor/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,16 @@ func TestSupervisor(t *testing.T) {

assertChanges(t, s,
&agentpb.StateChangedRequest{AgentId: "noop3", Status: inventorypb.AgentStatus_STARTING},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65000, ProcessExecPath: "sleep"})
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"})
expectedList = []*agentlocalpb.AgentInfo{
{AgentType: type_TEST_NOOP, AgentId: "noop3", Status: inventorypb.AgentStatus_STARTING},
{AgentType: type_TEST_SLEEP, AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65000, ProcessExecPath: "sleep"},
{AgentType: type_TEST_SLEEP, AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"},
}
assert.Equal(t, expectedList, s.AgentsList())

assertChanges(t, s,
&agentpb.StateChangedRequest{AgentId: "noop3", Status: inventorypb.AgentStatus_RUNNING},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"})
&agentpb.StateChangedRequest{AgentId: "noop3", Status: inventorypb.AgentStatus_RUNNING})
expectedList = []*agentlocalpb.AgentInfo{
{AgentType: type_TEST_NOOP, AgentId: "noop3", Status: inventorypb.AgentStatus_RUNNING},
{AgentType: type_TEST_SLEEP, AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"},
Expand Down Expand Up @@ -114,17 +114,17 @@ func TestSupervisor(t *testing.T) {

assertChanges(t, s,
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep2", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65001, ProcessExecPath: "sleep"})
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep2", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65001, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep2", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65001, ProcessExecPath: "sleep"},
)
expectedList = []*agentlocalpb.AgentInfo{
{AgentType: type_TEST_NOOP, AgentId: "noop3", Status: inventorypb.AgentStatus_RUNNING},
{AgentType: type_TEST_SLEEP, AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65000, ProcessExecPath: "sleep"},
{AgentType: type_TEST_SLEEP, AgentId: "sleep2", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65001, ProcessExecPath: "sleep"},
{AgentType: type_TEST_SLEEP, AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"},
{AgentType: type_TEST_SLEEP, AgentId: "sleep2", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65001, ProcessExecPath: "sleep"},
}
assert.Equal(t, expectedList, s.AgentsList())

assertChanges(t, s,
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep2", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65001, ProcessExecPath: "sleep"})
expectedList = []*agentlocalpb.AgentInfo{
{AgentType: type_TEST_NOOP, AgentId: "noop3", Status: inventorypb.AgentStatus_RUNNING},
{AgentType: type_TEST_SLEEP, AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"},
Expand Down Expand Up @@ -259,6 +259,44 @@ func TestSupervisor(t *testing.T) {
})
}

// TestStartProcessFail checks the supervisor's behavior when an agent process fails to
// initialize: the expected changes show three start attempts, each on a new port
// (65000, 65001, 65002) and each going STARTING -> INITIALIZATION_ERROR -> DONE,
// after which the agents list is still empty.
func TestStartProcessFail(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tempDir := t.TempDir()
cfgStorage := config.NewStorage(&config.Config{
Paths: config.Paths{TempDir: tempDir},
Ports: config.Ports{Min: 65000, Max: 65099},
Server: config.Server{Address: "localhost:443"},
LogLinesCount: 1,
})
s := NewSupervisor(ctx, nil, cfgStorage)
go s.Run(ctx)

t.Run("Start", func(t *testing.T) {
// Nothing is running before the state is set.
expectedList := []*agentlocalpb.AgentInfo{}
require.Equal(t, expectedList, s.AgentsList())

// "wrong format" is presumably rejected by the sleep command, making it exit
// right away — TODO confirm on all supported platforms.
s.SetState(&agentpb.SetStateRequest{
AgentProcesses: map[string]*agentpb.SetStateRequest_AgentProcess{
"sleep1": {Type: type_TEST_SLEEP, Args: []string{"wrong format"}},
},
})

// One STARTING / INITIALIZATION_ERROR / DONE triple per attempt, with the port advancing each time.
assertChanges(t, s,
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_INITIALIZATION_ERROR, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_DONE, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65001, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_INITIALIZATION_ERROR, ListenPort: 65001, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_DONE, ListenPort: 65001, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65002, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_INITIALIZATION_ERROR, ListenPort: 65002, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_DONE, ListenPort: 65002, ProcessExecPath: "sleep"})
// The failed agent must not be listed.
expectedList = []*agentlocalpb.AgentInfo{}
require.Equal(t, expectedList, s.AgentsList())
})
}

func TestFilter(t *testing.T) {
t.Parallel()

Expand Down
10 changes: 6 additions & 4 deletions api/agentlocalpb/json/agentlocalpb.json
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@
"x-order": 4
},
"status": {
"description": "AgentStatus represents actual Agent status.\n\n - STARTING: Agent is starting.\n - RUNNING: Agent is running.\n - WAITING: Agent encountered error and will be restarted automatically soon.\n - STOPPING: Agent is stopping.\n - DONE: Agent finished.\n - UNKNOWN: Agent is not connected, we don't know anything about it's state.",
"description": "AgentStatus represents actual Agent status.\n\n - STARTING: Agent is starting.\n - RUNNING: Agent is running.\n - WAITING: Agent will be restarted automatically soon.\n - STOPPING: Agent is stopping.\n - DONE: Agent finished.\n - UNKNOWN: Agent is not connected, we don't know anything about its state.\n - INITIALIZATION_ERROR: Agent encountered an error when starting.",
"type": "string",
"default": "AGENT_STATUS_INVALID",
"enum": [
Expand All @@ -159,7 +159,8 @@
"WAITING",
"STOPPING",
"DONE",
"UNKNOWN"
"UNKNOWN",
"INITIALIZATION_ERROR"
],
"x-order": 2
}
Expand Down Expand Up @@ -342,7 +343,7 @@
"x-order": 4
},
"status": {
"description": "AgentStatus represents actual Agent status.\n\n - STARTING: Agent is starting.\n - RUNNING: Agent is running.\n - WAITING: Agent encountered error and will be restarted automatically soon.\n - STOPPING: Agent is stopping.\n - DONE: Agent finished.\n - UNKNOWN: Agent is not connected, we don't know anything about it's state.",
"description": "AgentStatus represents actual Agent status.\n\n - STARTING: Agent is starting.\n - RUNNING: Agent is running.\n - WAITING: Agent will be restarted automatically soon.\n - STOPPING: Agent is stopping.\n - DONE: Agent finished.\n - UNKNOWN: Agent is not connected, we don't know anything about its state.\n - INITIALIZATION_ERROR: Agent encountered an error when starting.",
"type": "string",
"default": "AGENT_STATUS_INVALID",
"enum": [
Expand All @@ -352,7 +353,8 @@
"WAITING",
"STOPPING",
"DONE",
"UNKNOWN"
"UNKNOWN",
"INITIALIZATION_ERROR"
],
"x-order": 2
}
Expand Down
10 changes: 7 additions & 3 deletions api/agentlocalpb/json/client/agent_local/status2_responses.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7ca52dc

Please sign in to comment.