diff --git a/pkg/kbagent/service/action.go b/pkg/kbagent/service/action.go index db5a5d81ebc..9b5a6de5337 100644 --- a/pkg/kbagent/service/action.go +++ b/pkg/kbagent/service/action.go @@ -28,7 +28,6 @@ import ( "sync" "github.com/go-logr/logr" - "github.com/pkg/errors" "golang.org/x/exp/maps" "github.com/apecloud/kubeblocks/pkg/kbagent/proto" @@ -85,17 +84,14 @@ func (s *actionService) HandleRequest(ctx context.Context, payload []byte) ([]by } resp, err := s.handleRequest(ctx, req) result := string(resp) - if err != nil { - result = err.Error() - } - s.logger.Info("Action Executed", "action", req.Action, "result", result) + s.logger.Info("Action Executed", "action", req.Action, "result", result, "err", err) return s.encode(resp, err), nil } func (s *actionService) decode(payload []byte) (*proto.ActionRequest, error) { req := &proto.ActionRequest{} if err := json.Unmarshal(payload, req); err != nil { - return nil, errors.Wrapf(proto.ErrBadRequest, "unmarshal action request error: %s", err.Error()) + return nil, fmt.Errorf("%w: unmarshal action request error: %w", proto.ErrBadRequest, err) } return req, nil } @@ -114,11 +110,11 @@ func (s *actionService) encode(out []byte, err error) []byte { func (s *actionService) handleRequest(ctx context.Context, req *proto.ActionRequest) ([]byte, error) { if _, ok := s.actions[req.Action]; !ok { - return nil, errors.Wrapf(proto.ErrNotDefined, "%s is not defined", req.Action) + return nil, fmt.Errorf("%w: %s is not defined", proto.ErrNotDefined, req.Action) } action := s.actions[req.Action] if action.Exec == nil { - return nil, errors.Wrap(proto.ErrNotImplemented, "only exec action is supported") + return nil, fmt.Errorf("%w: only exec action is supported", proto.ErrNotImplemented) } // HACK: pre-check for the reconfigure action if err := checkReconfigure(ctx, req); err != nil { @@ -154,8 +150,5 @@ func (s *actionService) handleExecActionNonBlocking(ctx context.Context, req *pr return nil, proto.ErrInProgress } delete(s.runningActions, req.Action) - if (*result).err != nil { - return nil, (*result).err - } - return (*result).stdout.Bytes(), nil + return (*result).stdout.Bytes(), wrapExecError((*result).err, (*result).stderr) } diff --git a/pkg/kbagent/service/command.go b/pkg/kbagent/service/command.go index 216cf525be1..23532c91654 100644 --- a/pkg/kbagent/service/command.go +++ b/pkg/kbagent/service/command.go @@ -22,15 +22,15 @@ package service import ( "bytes" "context" + "errors" "fmt" "io" "os" "os/exec" "strings" + "syscall" "time" - "github.com/pkg/errors" - "github.com/apecloud/kubeblocks/pkg/kbagent/proto" "github.com/apecloud/kubeblocks/pkg/kbagent/util" ) @@ -63,19 +63,15 @@ func runCommand(ctx context.Context, action *proto.ExecAction, parameters map[st return nil, err } result := <-resultChan - err = result.err - if err != nil { - var exitErr *exec.ExitError - if errors.As(err, &exitErr) { - errMsg := fmt.Sprintf("exit code: %d", exitErr.ExitCode()) - if stderrMsg := result.stderr.String(); len(stderrMsg) > 0 { - errMsg += fmt.Sprintf(", stderr: %s", stderrMsg) - } - return nil, errors.Wrapf(proto.ErrFailed, errMsg) - } - return nil, err + return result.stdout.Bytes(), wrapExecError(result.err, result.stderr) +} + +func wrapExecError(err error, stderr *bytes.Buffer) error { + var exitErr *exec.ExitError + if errors.As(err, &exitErr) { + return fmt.Errorf("%w: exit code: %d, stderr: %s", proto.ErrFailed, exitErr.ExitCode(), stderr.String()) } - return result.stdout.Bytes(), nil + return err } func runCommandNonBlocking(ctx context.Context, action *proto.ExecAction, parameters map[string]string, timeout *int32) (chan *commandResult, error) { @@ -149,6 +145,11 @@ func runCommandX(ctx context.Context, action *proto.ExecAction, parameters map[s cmd.Stdin = stdinReader cmd.Stdout = stdoutWriter cmd.Stderr = stderrWriter + cmd.WaitDelay = time.Second * 1 + // gracefully terminate, go will kill it after waitDelay + cmd.Cancel = func() error { + return cmd.Process.Signal(syscall.SIGTERM) + } errChan := make(chan error, 1) go func() { @@ -159,7 +160,7 @@ func runCommandX(ctx context.Context, action *proto.ExecAction, parameters map[s if errors.Is(ctx.Err(), context.DeadlineExceeded) { errChan <- proto.ErrTimedOut } else { - errChan <- errors.Wrapf(proto.ErrFailed, "failed to start command: %v", err) + errChan <- fmt.Errorf("%w: failed to start command: %w", proto.ErrFailed, err) } return } diff --git a/pkg/kbagent/service/command_test.go b/pkg/kbagent/service/command_test.go index 2307a2cecab..dcd9db05a54 100644 --- a/pkg/kbagent/service/command_test.go +++ b/pkg/kbagent/service/command_test.go @@ -271,11 +271,10 @@ var _ = Describe("command", func() { action := &proto.ExecAction{ Commands: []string{"/bin/bash", "-c", "command-not-found"}, } - output, err := runCommand(ctx, action, nil, nil) + _, err := runCommand(ctx, action, nil, nil) Expect(err).ShouldNot(BeNil()) Expect(errors.Is(err, proto.ErrFailed)).Should(BeTrue()) Expect(err.Error()).Should(ContainSubstring("command not found")) - Expect(output).Should(BeNil()) }) It("timeout", func() { @@ -283,10 +282,9 @@ var _ = Describe("command", func() { Commands: []string{"/bin/bash", "-c", "sleep 60"}, } timeout := int32(1) - output, err := runCommand(ctx, action, nil, &timeout) + _, err := runCommand(ctx, action, nil, &timeout) Expect(err).ShouldNot(BeNil()) Expect(errors.Is(err, proto.ErrTimedOut)).Should(BeTrue()) - Expect(output).Should(BeNil()) }) }) })