diff --git a/managed/cmd/pmm-managed/main.go b/managed/cmd/pmm-managed/main.go index a5a5cf3c0c..91e47207d2 100644 --- a/managed/cmd/pmm-managed/main.go +++ b/managed/cmd/pmm-managed/main.go @@ -137,7 +137,7 @@ const ( var pprofSemaphore = semaphore.NewWeighted(1) -func addLogsHandler(mux *http.ServeMux, logs *supervisord.Logs) { +func addLogsHandler(mux *http.ServeMux, logs *server.Logs) { l := logrus.WithField("component", "logs.zip") mux.HandleFunc("/logs.zip", func(rw http.ResponseWriter, req *http.Request) { @@ -147,7 +147,7 @@ func addLogsHandler(mux *http.ServeMux, logs *supervisord.Logs) { if err != nil { l.Debug("Unable to read 'pprof' query param. Using default: pprof=false") } - var pprofConfig *supervisord.PprofConfig + var pprofConfig *server.PprofConfig if pprofQueryParameter { if !pprofSemaphore.TryAcquire(1) { rw.WriteHeader(http.StatusLocked) @@ -160,7 +160,7 @@ func addLogsHandler(mux *http.ServeMux, logs *supervisord.Logs) { defer pprofSemaphore.Release(1) contextTimeout += pProfProfileDuration + pProfTraceDuration - pprofConfig = &supervisord.PprofConfig{ + pprofConfig = &server.PprofConfig{ ProfileDuration: pProfProfileDuration, TraceDuration: pProfTraceDuration, } @@ -334,7 +334,7 @@ func runGRPCServer(ctx context.Context, deps *gRPCServerDeps) { } type http1ServerDeps struct { - logs *supervisord.Logs + logs *server.Logs authServer *grafana.AuthServer } @@ -875,13 +875,12 @@ func main() { //nolint:cyclop,maintidx connectionCheck := agents.NewConnectionChecker(agentsRegistry) serviceInfoBroker := agents.NewServiceInfoBroker(agentsRegistry) - pmmUpdateCheck := supervisord.NewPMMUpdateChecker(logrus.WithField("component", "supervisord/pmm-update-checker")) + updater := server.NewUpdater(*watchtowerHostF, gRPCMessageMaxSize) - logs := supervisord.NewLogs(version.FullInfo(), pmmUpdateCheck, vmParams) + logs := server.NewLogs(version.FullInfo(), updater, vmParams) supervisord := supervisord.New( *supervisordConfigDirF, - pmmUpdateCheck, &models.Params{ VMParams: vmParams, PGParams: &models.PGParams{ @@ -895,8 +894,7 @@ func main() { //nolint:cyclop,maintidx SSLCertPath: *postgresSSLCertPathF, }, HAParams: haParams, - }, - gRPCMessageMaxSize) + }) haService.AddLeaderService(ha.NewStandardService("pmm-agent-runner", func(ctx context.Context) error { return supervisord.StartSupervisedService("pmm-agent") @@ -971,8 +969,6 @@ func main() { //nolint:cyclop,maintidx dumpService := dump.New(db) - updater := server.NewUpdater(supervisord, *watchtowerHostF) - serverParams := &server.Params{ DB: db, VMDB: vmdb, diff --git a/managed/services/server/deps.go b/managed/services/server/deps.go index b675fba85d..20bdca6661 100644 --- a/managed/services/server/deps.go +++ b/managed/services/server/deps.go @@ -17,11 +17,11 @@ package server import ( "context" + "net/url" "time" "github.com/percona/pmm/api/serverpb" "github.com/percona/pmm/managed/models" - "github.com/percona/pmm/version" ) // healthChecker interface wraps all services that implements the IsReady method to report the @@ -73,13 +73,13 @@ type vmAlertExternalRules interface { // supervisordService is a subset of methods of supervisord.Service used by this package. // We use it instead of real type for testing and to avoid dependency cycle. type supervisordService interface { - InstalledPMMVersion(ctx context.Context) *version.PackageInfo - LastCheckUpdatesResult(ctx context.Context) (*version.UpdateCheckResult, time.Time) - ForceCheckUpdates(ctx context.Context) error - - StartUpdate() (uint32, error) - UpdateRunning() bool - UpdateLog(offset uint32) ([]string, uint32, error) + //InstalledPMMVersion(ctx context.Context) *version.PackageInfo + //LastCheckUpdatesResult(ctx context.Context) (*version.UpdateCheckResult, time.Time) + //ForceCheckUpdates(ctx context.Context) error + // + //StartUpdate() (uint32, error) + //UpdateRunning() bool + //UpdateLog(offset uint32) ([]string, uint32, error) UpdateConfiguration(settings *models.Settings, ssoDetails *models.PerconaSSODetails) error } @@ -108,3 +108,10 @@ type templatesService interface { type haService interface { IsLeader() bool } + +// victoriaMetricsParams is a subset of methods of models.VMParams used by this package. +// We use it instead of real type to avoid dependency cycle. +type victoriaMetricsParams interface { + ExternalVM() bool + URLFor(path string) (*url.URL, error) +} diff --git a/managed/services/supervisord/logs.go b/managed/services/server/logs.go similarity index 97% rename from managed/services/supervisord/logs.go rename to managed/services/server/logs.go index 207a69947d..a32735e503 100644 --- a/managed/services/supervisord/logs.go +++ b/managed/services/server/logs.go @@ -13,7 +13,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package supervisord +package server import ( "archive/zip" @@ -56,12 +56,12 @@ type fileContent struct { // Logs is responsible for interactions with logs. type Logs struct { pmmVersion string - pmmUpdateChecker *PMMUpdateChecker + pmmUpdateChecker *Updater vmParams victoriaMetricsParams } // NewLogs creates a new Logs service. -func NewLogs(pmmVersion string, pmmUpdateChecker *PMMUpdateChecker, vmParams victoriaMetricsParams) *Logs { +func NewLogs(pmmVersion string, pmmUpdateChecker *Updater, vmParams victoriaMetricsParams) *Logs { return &Logs{ pmmVersion: pmmVersion, pmmUpdateChecker: pmmUpdateChecker, @@ -204,7 +204,7 @@ func (l *Logs) files(ctx context.Context, pprofConfig *PprofConfig) []fileConten }) // update checker installed info - b, err = json.Marshal(l.pmmUpdateChecker.Installed(ctx)) + b, err = json.Marshal(l.pmmUpdateChecker.InstalledPMMVersion()) files = append(files, fileContent{ Name: "installed.json", Data: b, diff --git a/managed/services/supervisord/logs_test.go b/managed/services/server/logs_test.go similarity index 95% rename from managed/services/supervisord/logs_test.go rename to managed/services/server/logs_test.go index 8fb6f2a7aa..2085f632cb 100644 --- a/managed/services/supervisord/logs_test.go +++ b/managed/services/server/logs_test.go @@ -13,7 +13,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package supervisord +package server import ( "archive/zip" @@ -27,7 +27,6 @@ import ( "testing" "time" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -143,10 +142,10 @@ func TestAddAdminSummary(t *testing.T) { } func TestFiles(t *testing.T) { - checker := NewPMMUpdateChecker(logrus.WithField("test", t.Name())) + updater := &Updater{} params, err := models.NewVictoriaMetricsParams(models.BasePrometheusConfigPath, models.VMBaseURL) require.NoError(t, err) - l := NewLogs("2.4.5", checker, params) + l := NewLogs("2.4.5", updater, params) ctx := logger.Set(context.Background(), t.Name()) files := l.files(ctx, nil) @@ -184,10 +183,10 @@ func TestFiles(t *testing.T) { func TestZip(t *testing.T) { t.Skip("FIXME") - checker := NewPMMUpdateChecker(logrus.WithField("test", t.Name())) + updater := &Updater{} params, err := models.NewVictoriaMetricsParams(models.BasePrometheusConfigPath, models.VMBaseURL) require.NoError(t, err) - l := NewLogs("2.4.5", checker, params) + l := NewLogs("2.4.5", updater, params) ctx := logger.Set(context.Background(), t.Name()) var buf bytes.Buffer diff --git a/managed/services/supervisord/pmm_update_checker.go b/managed/services/server/pmm_update_checker.go similarity index 65% rename from managed/services/supervisord/pmm_update_checker.go rename to managed/services/server/pmm_update_checker.go index fe062da7fe..5cc2b4ff93 100644 --- a/managed/services/supervisord/pmm_update_checker.go +++ b/managed/services/server/pmm_update_checker.go @@ -13,7 +13,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package supervisord +package server import ( "bytes" @@ -24,7 +24,6 @@ import ( "sync" "time" - "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/sys/unix" @@ -55,28 +54,9 @@ type PMMUpdateChecker struct { // NewPMMUpdateChecker returns a PMMUpdateChecker instance that can be shared across different parts of the code. // Since this is used inside this package, it could be a singleton, but it would make things mode difficult to test. -func NewPMMUpdateChecker(l *logrus.Entry) *PMMUpdateChecker { +func NewPMMUpdateChecker() *PMMUpdateChecker { return &PMMUpdateChecker{ - l: l, - } -} - -// run runs check for updates loop until ctx is canceled. -func (p *PMMUpdateChecker) run(ctx context.Context) { - p.l.Info("Starting...") - ticker := time.NewTicker(updateCheckInterval) - defer ticker.Stop() - - for { - _ = p.check(ctx) - - select { - case <-ticker.C: - // continue with next loop iteration - case <-ctx.Done(): - p.l.Info("Done.") - return - } + l: logrus.WithField("component", "pmm-update-checker"), } } @@ -126,45 +106,3 @@ func (p *PMMUpdateChecker) cmdRun(ctx context.Context, cmdLine string) ([]byte, p.cmdMutex.Unlock() return b, stderr, err } - -// checkResult returns last `pmm-update -check` result and check time. -// It may force re-check if last result is empty or too old. -func (p *PMMUpdateChecker) checkResult(ctx context.Context) (*version.UpdateCheckResult, time.Time) { - p.checkRW.RLock() - defer p.checkRW.RUnlock() - - if time.Since(p.lastCheckTime) > updateCheckResultFresh { - p.checkRW.RUnlock() - _ = p.check(ctx) - p.checkRW.RLock() - } - - return p.lastCheckResult, p.lastCheckTime -} - -// check calls `pmm-update -check` and fills lastInstalledPackageInfo/lastCheckResult/lastCheckTime on success. -func (p *PMMUpdateChecker) check(ctx context.Context) error { - p.checkRW.Lock() - defer p.checkRW.Unlock() - - cmdLine := "pmm-update -check" - b, stderr, err := p.cmdRun(ctx, cmdLine) - if err != nil { - p.l.Errorf("%s output: %s. Error: %s", cmdLine, stderr.Bytes(), err) - return errors.WithStack(err) - } - - var res version.UpdateCheckResult - if err = json.Unmarshal(b, &res); err != nil { - p.l.Errorf("%s output: %s", cmdLine, stderr.Bytes()) - return errors.WithStack(err) - } - - p.l.Debugf("%s output: %s", cmdLine, stderr.Bytes()) - p.installedRW.Lock() - p.lastInstalledPackageInfo = &res.Installed - p.installedRW.Unlock() - p.lastCheckResult = &res - p.lastCheckTime = time.Now() - return nil -} diff --git a/managed/services/supervisord/pprof_config.go b/managed/services/server/pprof_config.go similarity index 97% rename from managed/services/supervisord/pprof_config.go rename to managed/services/server/pprof_config.go index 52ab238389..9697cb8108 100644 --- a/managed/services/supervisord/pprof_config.go +++ b/managed/services/server/pprof_config.go @@ -13,7 +13,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package supervisord +package server import "time" diff --git a/managed/services/server/server.go b/managed/services/server/server.go index dc4d9634d1..1e2828e9d0 100644 --- a/managed/services/server/server.go +++ b/managed/services/server/server.go @@ -189,15 +189,14 @@ func (s *Server) Version(ctx context.Context, req *serverpb.VersionRequest) (*se res.Managed.Timestamp = timestamppb.New(t) } - if v := s.supervisord.InstalledPMMVersion(ctx); v != nil { - res.Version = v.Version - res.Server = &serverpb.VersionInfo{ - Version: v.Version, - FullVersion: v.FullVersion, - } - if v.BuildTime != nil { - res.Server.Timestamp = timestamppb.New(*v.BuildTime) - } + v := s.updater.InstalledPMMVersion() + res.Version = v.Version + res.Server = &serverpb.VersionInfo{ + Version: v.Version, + FullVersion: v.FullVersion, + } + if v.BuildTime != nil { + res.Server.Timestamp = timestamppb.New(*v.BuildTime) } return res, nil @@ -298,7 +297,17 @@ func (s *Server) StartUpdate(ctx context.Context, req *serverpb.StartUpdateReque return nil, status.Error(codes.FailedPrecondition, "Updates are disabled via DISABLE_UPDATES environment variable.") } - err := s.updater.StartUpdate(ctx, req.GetNewImage()) + newImage := req.GetNewImage() + if newImage == "" { + latest, err := s.updater.latest(ctx) + if err != nil { + s.l.WithError(err).Error("Failed to get latest version") + newImage = defaultLatestPMMImage + } else { + newImage = fmt.Sprintf("%s:%s", latest.Repo, latest.Version) + } + } + err := s.updater.StartUpdate(ctx, newImage) if err != nil { return nil, err } @@ -331,13 +340,13 @@ func (s *Server) UpdateStatus(ctx context.Context, req *serverpb.UpdateStatusReq ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() for ctx.Err() == nil { - done = !s.supervisord.UpdateRunning() + done = !s.updater.IsRunning() if done { // give supervisord a second to flush logs to file time.Sleep(time.Second) } - lines, newOffset, err = s.supervisord.UpdateLog(req.LogOffset) + lines, newOffset, err = s.updater.UpdateLog(req.LogOffset) if err != nil { s.l.Warn(err) } diff --git a/managed/services/server/updater.go b/managed/services/server/updater.go index 3d374a6e87..08735eaaa4 100644 --- a/managed/services/server/updater.go +++ b/managed/services/server/updater.go @@ -16,13 +16,15 @@ package server import ( + "bufio" "context" "encoding/json" - "fmt" + "io" "net/http" "net/url" "os" "strings" + "sync" "time" "github.com/pkg/errors" @@ -34,30 +36,57 @@ import ( // defaultLatestPMMImage is the default image name to use when the latest version cannot be determined. const defaultLatestPMMImage = "perconalab/pmm-server:3-dev-latest" +const pmmUpdatePerformLog = "/srv/logs/pmm-update-perform-init.log" // Updater is a service to check for updates and trigger the update process. type Updater struct { - l *logrus.Entry - supervisord supervisordService - watchtowerHost *url.URL + l *logrus.Entry + watchtowerHost *url.URL + gRPCMessageMaxSize uint32 + + performM sync.Mutex + running bool + + checkRW sync.RWMutex + lastCheckResult *version.PackageInfo + lastCheckTime time.Time } // NewUpdater creates a new Updater service. -func NewUpdater(supervisord supervisordService, watchtowerHost *url.URL) *Updater { +func NewUpdater(watchtowerHost *url.URL, gRPCMessageMaxSize uint32) *Updater { return &Updater{ - l: logrus.WithField("service", "updater"), - supervisord: supervisord, - watchtowerHost: watchtowerHost, + l: logrus.WithField("service", "updater"), + watchtowerHost: watchtowerHost, + gRPCMessageMaxSize: gRPCMessageMaxSize, + } +} + +// run runs check for updates loop until ctx is canceled. +func (up *Updater) run(ctx context.Context) { + up.l.Info("Starting...") + ticker := time.NewTicker(updateCheckInterval) + defer ticker.Stop() + + for { + _ = up.check(ctx) + + select { + case <-ticker.C: + // continue with next loop iteration + case <-ctx.Done(): + up.l.Info("Done.") + return + } } } -func (s *Updater) sendRequestToWatchtower(ctx context.Context, newImageName string) error { +func (up *Updater) sendRequestToWatchtower(ctx context.Context, newImageName string) error { hostname, err := os.Hostname() if err != nil { return errors.Wrap(err, "failed to get hostname") } - u, err := s.watchtowerHost.Parse("/v1/update") + u, err := up.watchtowerHost.Parse("/v1/update") if err != nil { return errors.Wrap(err, "failed to parse URL") } @@ -91,69 +120,73 @@ func (s *Updater) sendRequestToWatchtower(ctx context.Context, newImageName stri return errors.Errorf("received non-OK response: %v", resp.StatusCode) } - s.l.Info("Successfully triggered update") + up.l.Info("Successfully triggered update") return nil } -func (s *Updater) currentVersion() string { +func (up *Updater) currentVersion() string { return version.Version } // StartUpdate triggers the update process. -func (s *Updater) StartUpdate(ctx context.Context, newImageName string) error { +func (up *Updater) StartUpdate(ctx context.Context, newImageName string) error { + up.performM.Lock() + defer up.performM.Unlock() + if up.running { + return errors.New("update already in progress") + } + up.running = true + up.performM.Unlock() if newImageName == "" { - latest, err := s.latest(ctx) - if err != nil { - s.l.WithError(err).Error("Failed to get latest version") - newImageName = defaultLatestPMMImage - } else { - newImageName = fmt.Sprintf("%s:%s", latest.Repo, latest.Version) - } + return errors.New("newImageName is empty") } - if err := s.sendRequestToWatchtower(ctx, newImageName); err != nil { - s.l.WithError(err).Error("Failed to trigger update") + if err := up.sendRequestToWatchtower(ctx, newImageName); err != nil { + up.l.WithError(err).Error("Failed to trigger update") return errors.Wrap(err, "failed to trigger update") } return nil } -func (s *Updater) onlyInstalledVersionResponse() *serverpb.CheckUpdatesResponse { +func (up *Updater) onlyInstalledVersionResponse() *serverpb.CheckUpdatesResponse { return &serverpb.CheckUpdatesResponse{ Installed: &serverpb.VersionInfo{ - Version: s.currentVersion(), + Version: up.currentVersion(), }, } } // ForceCheckUpdates forces an update check. -func (s *Updater) ForceCheckUpdates(_ context.Context) error { +func (up *Updater) ForceCheckUpdates(_ context.Context) error { // TODO: PMM-11261 Implement this method return nil } +type result struct { + Name string `json:"name"` +} + +type versionInfo struct { + Version version.Parsed `json:"version"` + DockerImage string `json:"docker_image"` +} + // TagsResponse is a response from DockerHub. type TagsResponse struct { - Results []struct { - Name string `json:"name"` - } `json:"results"` + Results []result `json:"results"` } // LastCheckUpdatesResult returns the result of the last update check. -func (s *Updater) LastCheckUpdatesResult(ctx context.Context) (*version.UpdateCheckResult, time.Time) { +func (up *Updater) LastCheckUpdatesResult(ctx context.Context) (*version.UpdateCheckResult, time.Time) { buildTime, err := version.Time() if err != nil { - s.l.WithError(err).Error("Failed to get build time") - return nil, time.Now() - } - latest, err := s.latest(ctx) - if err != nil { - s.l.WithError(err).Error("Failed to get latest version") + up.l.WithError(err).Error("Failed to get build time") return nil, time.Now() } + latest, lastCheckTime := up.checkResult(ctx) return &version.UpdateCheckResult{ Installed: version.PackageInfo{ - Version: s.currentVersion(), + Version: up.currentVersion(), FullVersion: version.PMMVersion, BuildTime: &buildTime, Repo: "local", @@ -161,10 +194,10 @@ func (s *Updater) LastCheckUpdatesResult(ctx context.Context) (*version.UpdateCh Latest: *latest, UpdateAvailable: true, LatestNewsURL: "", - }, time.Now() + }, lastCheckTime } -func (s *Updater) latest(ctx context.Context) (*version.PackageInfo, error) { +func (up *Updater) latest(ctx context.Context) (*version.PackageInfo, error) { fileName := "/etc/pmm-server-update-version.json" content, err := os.ReadFile(fileName) switch { @@ -172,45 +205,55 @@ func (s *Updater) latest(ctx context.Context) (*version.PackageInfo, error) { info := version.PackageInfo{} err = json.Unmarshal(content, &info) if err != nil { - s.l.WithError(err).Error("Failed to unmarshal file") + up.l.WithError(err).Error("Failed to unmarshal file") return nil, errors.Wrap(err, "failed to unmarshal file") } return &info, nil case err != nil && !os.IsNotExist(err): - s.l.WithError(err).Error("Failed to read file") + up.l.WithError(err).Error("Failed to read file") return nil, errors.Wrap(err, "failed to read file") case os.Getenv("PMM_SERVER_UPDATE_VERSION") != "": - return s.parseDockerTag(os.Getenv("PMM_SERVER_UPDATE_VERSION")), nil + return up.parseDockerTag(os.Getenv("PMM_SERVER_UPDATE_VERSION")), nil default: // os.IsNotExist(err) // File does not exist, get the latest tag from DockerHub u := "https://registry.hub.docker.com/v2/repositories/percona/pmm-server/tags/" req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) if err != nil { - s.l.WithError(err).Error("Failed to create request") + up.l.WithError(err).Error("Failed to create request") return nil, errors.Wrap(err, "failed to create request") } resp, err := http.DefaultClient.Do(req) if err != nil { - s.l.WithError(err).Error("Failed to get tags from DockerHub") + up.l.WithError(err).Error("Failed to get tags from DockerHub") return nil, errors.Wrap(err, "failed to get tags from DockerHub") } defer resp.Body.Close() //nolint:errcheck var tagsResponse TagsResponse if err := json.NewDecoder(resp.Body).Decode(&tagsResponse); err != nil { - s.l.WithError(err).Error("Failed to decode response") + up.l.WithError(err).Error("Failed to decode response") return nil, errors.Wrap(err, "failed to decode response") } if len(tagsResponse.Results) != 0 { - // Assuming the first tag is the latest - return s.parseDockerTag(tagsResponse.Results[0].Name), nil + currentVersion, err := version.Parse(up.currentVersion()) + if err != nil { + up.l.WithError(err).Error("Failed to parse current version") + return nil, errors.Wrap(err, "failed to parse current version") + } + + update, err := up.next(*currentVersion, tagsResponse.Results) + if err != nil { + up.l.WithError(err).Error("Failed to get latest minor version") + return nil, errors.Wrap(err, "failed to get latest minor version") + } + return up.parseDockerTag(update.DockerImage), nil } return nil, errors.New("no tags found") } } -func (s *Updater) parseDockerTag(tag string) *version.PackageInfo { +func (up *Updater) parseDockerTag(tag string) *version.PackageInfo { splitTag := strings.Split(tag, ":") if len(splitTag) != 2 { return nil @@ -221,3 +264,111 @@ func (s *Updater) parseDockerTag(tag string) *version.PackageInfo { Repo: splitTag[0], } } + +func (up *Updater) next(currentVersion version.Parsed, results []result) (*versionInfo, error) { + latest := versionInfo{ + Version: currentVersion, + } + for _, result := range results { + splitTag := strings.Split(result.Name, ":") + if len(splitTag) != 2 { + continue + } + v, err := version.Parse(splitTag[1]) + if err != nil { + up.l.Debugf("Failed to parse version: %s", splitTag[1]) + continue + } + if v.Major == currentVersion.Major && v.Minor > currentVersion.Minor { + latest = versionInfo{ + Version: *v, + DockerImage: result.Name, + } + } else if v.Major > currentVersion.Major && v.Major < latest.Version.Major { + latest = versionInfo{ + Version: *v, + DockerImage: result.Name, + } + } + } + return &latest, nil +} + +func (up *Updater) InstalledPMMVersion() version.PackageInfo { + t, _ := version.Time() + return version.PackageInfo{ + Version: up.currentVersion(), + FullVersion: version.PMMVersion, + BuildTime: &t, + Repo: "local", + } +} + +func (up *Updater) IsRunning() bool { + up.performM.Lock() + defer up.performM.Unlock() + return up.running +} + +func (up *Updater) UpdateLog(offset uint32) ([]string, uint32, error) { + up.performM.Lock() + defer up.performM.Unlock() + + f, err := os.Open(pmmUpdatePerformLog) + if err != nil { + return nil, 0, errors.WithStack(err) + } + defer f.Close() //nolint:errcheck,gosec,nolintlint + + if _, err = f.Seek(int64(offset), io.SeekStart); err != nil { + return nil, 0, errors.WithStack(err) + } + + lines := make([]string, 0, 10) + reader := bufio.NewReader(f) + newOffset := offset + for { + line, err := reader.ReadString('\n') + if err == nil { + newOffset += uint32(len(line)) + if newOffset-offset > up.gRPCMessageMaxSize { + return lines, newOffset - uint32(len(line)), nil + } + lines = append(lines, strings.TrimSuffix(line, "\n")) + continue + } + if err == io.EOF { + err = nil + } + return lines, newOffset, errors.WithStack(err) + } +} + +// checkResult returns last `pmm-update -check` result and check time. +// It may force re-check if last result is empty or too old. +func (up *Updater) checkResult(ctx context.Context) (*version.PackageInfo, time.Time) { + up.checkRW.RLock() + defer up.checkRW.RUnlock() + + if time.Since(up.lastCheckTime) > updateCheckResultFresh { + up.checkRW.RUnlock() + _ = up.check(ctx) + up.checkRW.RLock() + } + + return up.lastCheckResult, up.lastCheckTime +} + +// check calls `pmm-update -check` and fills lastInstalledPackageInfo/lastCheckResult/lastCheckTime on success. +func (up *Updater) check(ctx context.Context) error { + up.checkRW.Lock() + defer up.checkRW.Unlock() + + latest, err := up.latest(ctx) + if err != nil { + return errors.Wrap(err, "failed to get latest version") + } + up.lastCheckResult = latest + up.lastCheckTime = time.Now() + return nil +} diff --git a/managed/services/server/updater_test.go b/managed/services/server/updater_test.go new file mode 100644 index 0000000000..f56e2d6c2f --- /dev/null +++ b/managed/services/server/updater_test.go @@ -0,0 +1,269 @@ +package server + +import ( + "context" + "github.com/percona/pmm/version" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "net/url" + "strings" + "testing" + "time" +) + +func TestUpdater(t *testing.T) { + + gRPCMessageMaxSize := uint32(100 * 1024 * 1024) + gaReleaseDate := time.Date(2019, 9, 18, 0, 0, 0, 0, time.UTC) + watchtowerURL, _ := url.Parse("http://watchtower:8080") + + t.Run("TestNextVersion", func(t *testing.T) { + type args struct { + currentVersion string + results []result + } + type versionInfo struct { + Version string + DockerImage string + } + tests := []struct { + name string + args args + want *versionInfo + wantErr assert.ErrorAssertionFunc + }{ + { + name: "no results", + args: args{ + currentVersion: "3.0.0", + results: nil, + }, + want: &versionInfo{ + Version: "3.0.0", + DockerImage: "", + }, + wantErr: nil, + }, + { + name: "no new version", + args: args{ + currentVersion: "3.0.0", + results: []result{ + {Name: "percona/pmm-server:3.0.0"}, + }, + }, + want: &versionInfo{ + Version: "3.0.0", + DockerImage: "", + }, + wantErr: nil, + }, + { + name: "new minor version", + args: args{ + currentVersion: "3.0.0", + results: []result{ + {Name: "percona/pmm-server:3.1.0"}, + {Name: "percona/pmm-server:3.0.0"}, + }, + }, + want: &versionInfo{ + Version: "3.1.0", + DockerImage: "percona/pmm-server:3.1.0", + }, + wantErr: nil, + }, + { + name: "new major version", + args: args{ + currentVersion: "3.0.0", + results: []result{ + {Name: "percona/pmm-server:4.0.0"}, + {Name: "percona/pmm-server:3.0.0"}, + }, + }, + want: &versionInfo{ + Version: "4.0.0", + DockerImage: "percona/pmm-server:4.0.0", + }, + wantErr: nil, + }, + { + name: "new major version with minor version", + args: args{ + currentVersion: "3.0.0", + results: []result{ + {Name: "percona/pmm-server:4.1.0"}, + {Name: "percona/pmm-server:4.0.0"}, + {Name: "percona/pmm-server:3.0.0"}, + {Name: "percona/pmm-server:3.1.0"}, + }, + }, + want: &versionInfo{ + Version: "3.1.0", + DockerImage: "percona/pmm-server:3.1.0", + }, + wantErr: nil, + }, + { + name: "invalid version", + args: args{ + currentVersion: "3.0.0", + results: []result{ + {Name: "percona/pmm-server:3.0.0"}, + {Name: "percona/pmm-server:3.1.0"}, + {Name: "percona/pmm-server:invalid"}, + }, + }, + want: &versionInfo{ + Version: "3.1.0", + DockerImage: "percona/pmm-server:3.1.0", + }, + wantErr: nil, + }, + { + name: "non semver version", + args: args{ + currentVersion: "3.0.0", + results: []result{ + {Name: "percona/pmm-server:3.0.0"}, + {Name: "percona/pmm-server:3.1"}, + }, + }, + want: &versionInfo{ + Version: "3.0.0", + DockerImage: "", + }, + wantErr: nil, + }, + { + name: "rc version", + args: args{ + currentVersion: "3.0.0", + results: []result{ + {Name: "percona/pmm-server:3.0.0"}, + {Name: "percona/pmm-server:3.1.0-rc"}, + {Name: "percona/pmm-server:3.1.0-rc757"}, + }, + }, + want: &versionInfo{ + Version: "3.1.0-rc757", + DockerImage: "percona/pmm-server:3.1.0-rc757", + }, + wantErr: nil, + }, + { + name: "rc version and release version", + args: args{ + currentVersion: "3.0.0", + results: []result{ + {Name: "percona/pmm-server:3.0.0"}, + {Name: "percona/pmm-server:3.1.0-rc"}, + {Name: "percona/pmm-server:3.1.0-rc757"}, + {Name: "percona/pmm-server:3.1.0"}, + }, + }, + want: &versionInfo{ + Version: "3.1.0", + DockerImage: "percona/pmm-server:3.1.0", + }, + wantErr: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + u := NewUpdater(nil, gRPCMessageMaxSize) + parsed, err := version.Parse(tt.args.currentVersion) + require.NoError(t, err) + next, err := u.next(*parsed, tt.args.results) + if tt.wantErr != nil { + tt.wantErr(t, err) + return + } + require.NoError(t, err) + assert.Equal(t, tt.want.Version, next.Version.String()) + assert.Equal(t, tt.want.DockerImage, next.DockerImage) + }) + } + }) + + t.Run("Installed", func(t *testing.T) { + checker := NewUpdater(watchtowerURL, gRPCMessageMaxSize) + + info := checker.InstalledPMMVersion() + require.NotNil(t, info) + + assert.True(t, strings.HasPrefix(info.Version, "3."), "version should start with `3.`. Actual value is: %s", info.Version) + fullVersion, _ := normalizeFullversion(&info) + assert.True(t, strings.HasPrefix(fullVersion, "3."), "full version should start with `3.`. Actual value is: %s", fullVersion) + require.NotEmpty(t, info.BuildTime) + assert.True(t, info.BuildTime.After(gaReleaseDate), "BuildTime = %s", info.BuildTime) + }) + + t.Run("Check", func(t *testing.T) { + t.Skip("This test is to be deprecated or completely rewritten") + + ctx := context.TODO() + checker := NewUpdater(watchtowerURL, gRPCMessageMaxSize) + + res, resT := checker.LastCheckUpdatesResult(ctx) + assert.WithinDuration(t, time.Now(), resT, time.Second) + + assert.True(t, strings.HasPrefix(res.Installed.Version, "3."), "installed version should start with `3.`. Actual value is: %s", res.Installed.Version) + installedFullVersion, _ := normalizeFullversion(&res.Installed) + assert.True(t, strings.HasPrefix(installedFullVersion, "3."), "installed full version should start with `3.`. Actual value is: %s", installedFullVersion) + require.NotEmpty(t, res.Installed.BuildTime) + assert.True(t, res.Installed.BuildTime.After(gaReleaseDate), "Installed.BuildTime = %s", res.Installed.BuildTime) + assert.Equal(t, "local", res.Installed.Repo) + + assert.True(t, strings.HasPrefix(res.Latest.Version, "3."), "The latest available version should start with `3.`. Actual value is: %s", res.Latest.Version) + latestFullVersion, isFeatureBranch := normalizeFullversion(&res.Latest) + if isFeatureBranch { + t.Skip("Skipping check latest version.") + } + assert.True(t, strings.HasPrefix(latestFullVersion, "3."), "The latest available versions full value should start with `3.`. Actual value is: %s", latestFullVersion) + require.NotEmpty(t, res.Latest.BuildTime) + assert.True(t, res.Latest.BuildTime.After(gaReleaseDate), "Latest.BuildTime = %s", res.Latest.BuildTime) + assert.NotEmpty(t, res.Latest.Repo) + + // We assume that the latest perconalab/pmm-server:3-dev-latest image + // always contains the latest pmm-update package versions. + // If this test fails, re-pull them and recreate devcontainer. + t.Log("Assuming the latest pmm-update version.") + assert.False(t, res.UpdateAvailable, "update should not be available") + assert.Empty(t, res.LatestNewsURL, "latest_news_url should be empty") + assert.Equal(t, res.Installed, res.Latest, "version should be the same (latest)") + assert.Equal(t, *res.Installed.BuildTime, *res.Latest.BuildTime, "build times should be the same") + assert.Equal(t, "local", res.Latest.Repo) + + // cached result + res2, resT2 := checker.checkResult(ctx) + assert.Equal(t, res, res2) + assert.Equal(t, resT, resT2) + + time.Sleep(100 * time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), updateDefaultTimeout) + defer cancel() + go checker.run(ctx) + time.Sleep(100 * time.Millisecond) + + // should block and wait for run to finish one iteration + res3, resT3 := checker.checkResult(ctx) + assert.Equal(t, res2, res3) + assert.NotEqual(t, resT2, resT3, "%s", resT2) + assert.WithinDuration(t, resT2, resT3, 10*time.Second) + }) +} + +func normalizeFullversion(info *version.PackageInfo) (version string, isFeatureBranch bool) { + fullVersion := info.FullVersion + + epochPrefix := "1:" // set by RPM_EPOCH in PMM Server build scripts + isFeatureBranch = strings.HasPrefix(fullVersion, epochPrefix) + if isFeatureBranch { + fullVersion = strings.TrimPrefix(fullVersion, epochPrefix) + } + + return fullVersion, isFeatureBranch +} diff --git a/managed/services/supervisord/deps.go b/managed/services/supervisord/deps.go index 9de477c334..9176f8b1bb 100644 --- a/managed/services/supervisord/deps.go +++ b/managed/services/supervisord/deps.go @@ -14,12 +14,3 @@ // along with this program. If not, see . package supervisord - -import "net/url" - -// victoriaMetricsParams is a subset of methods of models.VMParams used by this package. -// We use it instead of real type to avoid dependency cycle. -type victoriaMetricsParams interface { - ExternalVM() bool - URLFor(path string) (*url.URL, error) -} diff --git a/managed/services/supervisord/devcontainer_test.go b/managed/services/supervisord/devcontainer_test.go index e7b688c011..2c71f50445 100644 --- a/managed/services/supervisord/devcontainer_test.go +++ b/managed/services/supervisord/devcontainer_test.go @@ -19,105 +19,23 @@ import ( "context" "os" "path/filepath" - "strconv" - "strings" "testing" "time" - "github.com/sirupsen/logrus" + "github.com/percona/pmm/managed/models" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - - "github.com/percona/pmm/managed/models" - "github.com/percona/pmm/version" ) // TODO move tests to other files and remove this one. func TestDevContainer(t *testing.T) { - gRPCMessageMaxSize := uint32(100 * 1024 * 1024) - gaReleaseDate := time.Date(2019, 9, 18, 0, 0, 0, 0, time.UTC) - - t.Run("Installed", func(t *testing.T) { - ctx := context.TODO() - checker := NewPMMUpdateChecker(logrus.WithField("test", t.Name())) - - info := checker.Installed(ctx) - require.NotNil(t, info) - - assert.True(t, strings.HasPrefix(info.Version, "3."), "version should start with `3.`. Actual value is: %s", info.Version) - fullVersion, _ := normalizeFullversion(info) - assert.True(t, strings.HasPrefix(fullVersion, "3."), "full version should start with `3.`. Actual value is: %s", fullVersion) - require.NotEmpty(t, info.BuildTime) - assert.True(t, info.BuildTime.After(gaReleaseDate), "BuildTime = %s", info.BuildTime) - assert.Equal(t, "local", info.Repo) - - info2 := checker.Installed(ctx) - assert.Equal(t, info, info2) - }) - - t.Run("Check", func(t *testing.T) { - t.Skip("This test is to be deprecated or completely rewritten") - - ctx := context.TODO() - checker := NewPMMUpdateChecker(logrus.WithField("test", t.Name())) - - res, resT := checker.checkResult(ctx) - assert.WithinDuration(t, time.Now(), resT, time.Second) - - assert.True(t, strings.HasPrefix(res.Installed.Version, "3."), "installed version should start with `3.`. Actual value is: %s", res.Installed.Version) - installedFullVersion, _ := normalizeFullversion(&res.Installed) - assert.True(t, strings.HasPrefix(installedFullVersion, "3."), "installed full version should start with `3.`. Actual value is: %s", installedFullVersion) - require.NotEmpty(t, res.Installed.BuildTime) - assert.True(t, res.Installed.BuildTime.After(gaReleaseDate), "Installed.BuildTime = %s", res.Installed.BuildTime) - assert.Equal(t, "local", res.Installed.Repo) - - assert.True(t, strings.HasPrefix(res.Latest.Version, "3."), "The latest available version should start with `3.`. Actual value is: %s", res.Latest.Version) - latestFullVersion, isFeatureBranch := normalizeFullversion(&res.Latest) - if isFeatureBranch { - t.Skip("Skipping check latest version.") - } - assert.True(t, strings.HasPrefix(latestFullVersion, "3."), "The latest available versions full value should start with `3.`. Actual value is: %s", latestFullVersion) - require.NotEmpty(t, res.Latest.BuildTime) - assert.True(t, res.Latest.BuildTime.After(gaReleaseDate), "Latest.BuildTime = %s", res.Latest.BuildTime) - assert.NotEmpty(t, res.Latest.Repo) - - // We assume that the latest perconalab/pmm-server:3-dev-latest image - // always contains the latest pmm-update package versions. - // If this test fails, re-pull them and recreate devcontainer. - t.Log("Assuming the latest pmm-update version.") - assert.False(t, res.UpdateAvailable, "update should not be available") - assert.Empty(t, res.LatestNewsURL, "latest_news_url should be empty") - assert.Equal(t, res.Installed, res.Latest, "version should be the same (latest)") - assert.Equal(t, *res.Installed.BuildTime, *res.Latest.BuildTime, "build times should be the same") - assert.Equal(t, "local", res.Latest.Repo) - - // cached result - res2, resT2 := checker.checkResult(ctx) - assert.Equal(t, res, res2) - assert.Equal(t, resT, resT2) - - time.Sleep(100 * time.Millisecond) - ctx, cancel := context.WithTimeout(context.Background(), updateDefaultTimeout) - defer cancel() - go checker.run(ctx) - time.Sleep(100 * time.Millisecond) - - // should block and wait for run to finish one iteration - res3, resT3 := checker.checkResult(ctx) - assert.Equal(t, res2, res3) - assert.NotEqual(t, resT2, resT3, "%s", resT2) - assert.WithinDuration(t, resT2, resT3, 10*time.Second) - }) t.Run("UpdateConfiguration", func(t *testing.T) { // logrus.SetLevel(logrus.DebugLevel) - checker := NewPMMUpdateChecker(logrus.WithField("test", t.Name())) vmParams, err := models.NewVictoriaMetricsParams(models.BasePrometheusConfigPath, models.VMBaseURL) require.NoError(t, err) - s := New("/etc/supervisord.d", checker, &models.Params{VMParams: vmParams, PGParams: &models.PGParams{}, HAParams: &models.HAParams{}}, gRPCMessageMaxSize) + s := New("/etc/supervisord.d", &models.Params{VMParams: vmParams, PGParams: &models.PGParams{}, HAParams: &models.HAParams{}}) require.NotEmpty(t, s.supervisorctlPath) ctx, cancel := context.WithCancel(context.Background()) @@ -160,87 +78,75 @@ func TestDevContainer(t *testing.T) { require.NoError(t, err) }) - t.Run("Update", func(t *testing.T) { - // This test can be run only once as it breaks assumptions of other tests. - // It also should be the last test in devcontainer. - if ok, _ := strconv.ParseBool(os.Getenv("PMM_TEST_RUN_UPDATE")); !ok { - t.Skip("skipping update test") - } - - // logrus.SetLevel(logrus.DebugLevel) - checker := NewPMMUpdateChecker(logrus.WithField("test", t.Name())) - vmParams := &models.VictoriaMetricsParams{} - s := New("/etc/supervisord.d", checker, &models.Params{VMParams: vmParams, PGParams: &models.PGParams{}, HAParams: &models.HAParams{}}, gRPCMessageMaxSize) - require.NotEmpty(t, s.supervisorctlPath) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go s.Run(ctx) - - require.Equal(t, false, s.UpdateRunning()) - - offset, err := s.StartUpdate() - require.NoError(t, err) - assert.Zero(t, offset) - - assert.True(t, s.UpdateRunning()) - - _, err = s.StartUpdate() - assert.Equal(t, status.Errorf(codes.FailedPrecondition, "Update is already running."), err) - - // get logs as often as possible to increase a chance for race detector to spot something - var lastLine string - for { - done := s.UpdateRunning() - if done { - // give supervisord a second to flush logs to file - time.Sleep(time.Second) - } - - lines, newOffset, err := s.UpdateLog(offset) - require.NoError(t, err) - if newOffset == offset { - assert.Empty(t, lines, "lines:\n%s", strings.Join(lines, "\n")) - if done { - continue - } - break - } - - assert.NotEmpty(t, lines) - t.Logf("%s", strings.Join(lines, "\n")) - lastLine = lines[len(lines)-1] - - assert.NotZero(t, newOffset) - assert.True(t, newOffset > offset, "expected newOffset = %d > offset = %d", newOffset, offset) - offset = newOffset - } - - t.Logf("lastLine = %q", lastLine) - assert.Contains(t, lastLine, "Waiting for Grafana dashboards update to finish...") - - // extra checks that we did not miss `pmp-update -perform` self-update and restart by supervisord - const wait = 3 * time.Second - const delay = 200 * time.Millisecond - for i := 0; i < int(wait/delay); i++ { - time.Sleep(delay) - require.False(t, s.UpdateRunning()) - lines, newOffset, err := s.UpdateLog(offset) - require.NoError(t, err) - require.Empty(t, lines, "lines:\n%s", strings.Join(lines, "\n")) - require.Equal(t, offset, newOffset, "offset = %d, newOffset = %d", offset, newOffset) - } - }) -} - -func normalizeFullversion(info *version.PackageInfo) (version string, isFeatureBranch bool) { - fullVersion := info.FullVersion - - epochPrefix := "1:" // set by RPM_EPOCH in PMM Server build scripts - isFeatureBranch = strings.HasPrefix(fullVersion, epochPrefix) - if isFeatureBranch { - fullVersion = strings.TrimPrefix(fullVersion, epochPrefix) - } - - return fullVersion, isFeatureBranch + //t.Run("Update", func(t *testing.T) { + // // This test can be run only once as it breaks assumptions of other tests. + // // It also should be the last test in devcontainer. + // if ok, _ := strconv.ParseBool(os.Getenv("PMM_TEST_RUN_UPDATE")); !ok { + // t.Skip("skipping update test") + // } + // + // // logrus.SetLevel(logrus.DebugLevel) + // checker := NewPMMUpdateChecker() + // vmParams := &models.VictoriaMetricsParams{} + // s := New("/etc/supervisord.d", &models.Params{VMParams: vmParams, PGParams: &models.PGParams{}, HAParams: &models.HAParams{}}, gRPCMessageMaxSize) + // require.NotEmpty(t, s.supervisorctlPath) + // + // ctx, cancel := context.WithCancel(context.Background()) + // defer cancel() + // go s.Run(ctx) + // + // require.Equal(t, false, s.UpdateRunning()) + // + // offset, err := s.StartUpdate() + // require.NoError(t, err) + // assert.Zero(t, offset) + // + // assert.True(t, s.UpdateRunning()) + // + // _, err = s.StartUpdate() + // assert.Equal(t, status.Errorf(codes.FailedPrecondition, "Update is already running."), err) + // + // // get logs as often as possible to increase a chance for race detector to spot something + // var lastLine string + // for { + // done := s.UpdateRunning() + // if done { + // // give supervisord a second to flush logs to file + // time.Sleep(time.Second) + // } + // + // lines, newOffset, err := s.UpdateLog(offset) + // require.NoError(t, err) + // if newOffset == offset { + // assert.Empty(t, lines, "lines:\n%s", strings.Join(lines, "\n")) + // if done { + // continue + // } + // break + // } + // + // assert.NotEmpty(t, lines) + // t.Logf("%s", strings.Join(lines, "\n")) + // lastLine = lines[len(lines)-1] + // + // assert.NotZero(t, newOffset) + // assert.True(t, newOffset > offset, "expected newOffset = %d > offset = %d", newOffset, offset) + // offset = newOffset + // } + // + // t.Logf("lastLine = %q", lastLine) + // assert.Contains(t, lastLine, "Waiting for Grafana dashboards update to finish...") + // + // // extra checks that we did not miss `pmp-update -perform` self-update and restart by supervisord + // const wait = 3 * time.Second + // const delay = 200 * time.Millisecond + // for i := 0; i < int(wait/delay); i++ { + // time.Sleep(delay) + // require.False(t, s.UpdateRunning()) + // lines, newOffset, err := s.UpdateLog(offset) + // require.NoError(t, err) + // require.Empty(t, lines, "lines:\n%s", strings.Join(lines, "\n")) + // require.Equal(t, offset, newOffset, "offset = %d, newOffset = %d", offset, newOffset) + // } + //}) } diff --git a/managed/services/supervisord/supervisord.go b/managed/services/supervisord/supervisord.go index 2d94414418..2036914ca7 100644 --- a/managed/services/supervisord/supervisord.go +++ b/managed/services/supervisord/supervisord.go @@ -21,7 +21,6 @@ import ( "bytes" "context" "fmt" - "io" "io/fs" "net/url" "os" @@ -35,16 +34,12 @@ import ( "time" "github.com/AlekSi/pointer" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "golang.org/x/sys/unix" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "github.com/percona/pmm/managed/models" "github.com/percona/pmm/managed/utils/envvars" "github.com/percona/pmm/utils/pdeathsig" - "github.com/percona/pmm/version" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "golang.org/x/sys/unix" ) const ( @@ -63,18 +58,15 @@ const ( // Service is responsible for interactions with Supervisord via supervisorctl. type Service struct { - configDir string - supervisorctlPath string - gRPCMessageMaxSize uint32 - l *logrus.Entry - pmmUpdateCheck *PMMUpdateChecker + configDir string + supervisorctlPath string + l *logrus.Entry eventsM sync.Mutex subs map[chan *event]sub lastEvents map[string]eventType - pmmUpdatePerformLogM sync.Mutex - supervisordConfigsM sync.Mutex + supervisordConfigsM sync.Mutex vmParams *models.VictoriaMetricsParams pgParams *models.PGParams @@ -88,25 +80,21 @@ type sub struct { // values from supervisord configuration. const ( - pmmUpdatePerformProgram = "pmm-update-perform" - pmmUpdatePerformLog = "/srv/logs/pmm-update-perform.log" - pmmConfig = "/etc/supervisord.d/pmm.ini" + pmmConfig = "/etc/supervisord.d/pmm.ini" ) // New creates new service. -func New(configDir string, pmmUpdateCheck *PMMUpdateChecker, params *models.Params, gRPCMessageMaxSize uint32) *Service { +func New(configDir string, params *models.Params, ) *Service { path, _ := exec.LookPath("supervisorctl") return &Service{ - configDir: configDir, - supervisorctlPath: path, - gRPCMessageMaxSize: gRPCMessageMaxSize, - l: logrus.WithField("component", "supervisord"), - pmmUpdateCheck: pmmUpdateCheck, - subs: make(map[chan *event]sub), - lastEvents: make(map[string]eventType), - vmParams: params.VMParams, - pgParams: params.PGParams, - haParams: params.HAParams, + configDir: configDir, + supervisorctlPath: path, + l: logrus.WithField("component", "supervisord"), + subs: make(map[chan *event]sub), + lastEvents: make(map[string]eventType), + vmParams: params.VMParams, + pgParams: params.PGParams, + haParams: params.HAParams, } } @@ -114,24 +102,6 @@ func New(configDir string, pmmUpdateCheck *PMMUpdateChecker, params *models.Para func (s *Service) Run(ctx context.Context) { var wg sync.WaitGroup wg.Add(1) - go func() { - defer wg.Done() - - // pre-set installed packages info to cache it. - s.pmmUpdateCheck.Installed(ctx) - - // Do not check for updates for the first 10 minutes. - // That solves PMM Server building problems when we start pmm-managed. - // TODO https://jira.percona.com/browse/PMM-4429 - sleepCtx, sleepCancel := context.WithTimeout(ctx, 10*time.Minute) - <-sleepCtx.Done() - sleepCancel() - if ctx.Err() != nil { - return - } - - s.pmmUpdateCheck.run(ctx) - }() defer wg.Wait() if s.supervisorctlPath == "" { @@ -210,21 +180,6 @@ func (s *Service) Run(ctx context.Context) { } } -// InstalledPMMVersion returns currently installed PMM version information. -func (s *Service) InstalledPMMVersion(ctx context.Context) *version.PackageInfo { - return s.pmmUpdateCheck.Installed(ctx) -} - -// LastCheckUpdatesResult returns last PMM update check result and last check time. -func (s *Service) LastCheckUpdatesResult(ctx context.Context) (*version.UpdateCheckResult, time.Time) { - return s.pmmUpdateCheck.checkResult(ctx) -} - -// ForceCheckUpdates forces check for PMM updates. Result can be obtained via LastCheckUpdatesResult. -func (s *Service) ForceCheckUpdates(ctx context.Context) error { - return s.pmmUpdateCheck.check(ctx) -} - func (s *Service) subscribe(program string, eventTypes ...eventType) chan *event { ch := make(chan *event, 1) s.eventsM.Lock() @@ -249,66 +204,6 @@ func (s *Service) supervisorctl(args ...string) ([]byte, error) { return b, errors.Wrapf(err, "%s failed", cmdLine) } -// StartUpdate starts pmm-update-perform supervisord program with some preparations. -// It returns initial log file offset. -func (s *Service) StartUpdate() (uint32, error) { - if s.UpdateRunning() { - return 0, status.Errorf(codes.FailedPrecondition, "Update is already running.") - } - - // We need to remove and reopen log file for UpdateStatus API to be able to read it without it being rotated. - // Additionally, SIGUSR2 is expected by our Ansible playbook. - - s.pmmUpdatePerformLogM.Lock() - defer s.pmmUpdatePerformLogM.Unlock() - - // remove existing log file - err := os.Remove(pmmUpdatePerformLog) - if err != nil && errors.Is(err, fs.ErrNotExist) { - err = nil - } - if err != nil { - s.l.Warn(err) - } - - // send SIGUSR2 to supervisord and wait for it to reopen log file - ch := s.subscribe("supervisord", logReopen) - b, err := s.supervisorctl("pid") - if err != nil { - return 0, err - } - pid, err := strconv.Atoi(strings.TrimSpace(string(b))) - if err != nil { - return 0, errors.WithStack(err) - } - p, err := os.FindProcess(pid) - if err != nil { - return 0, errors.WithStack(err) - } - if err = p.Signal(unix.SIGUSR2); err != nil { - s.l.Warnf("Failed to send SIGUSR2: %s", err) - } - s.l.Debug("Waiting for log reopen...") - <-ch - - var offset uint32 - fi, err := os.Stat(pmmUpdatePerformLog) - switch { - case err == nil: - if fi.Size() != 0 { - s.l.Warnf("Unexpected log file size: %+v", fi) - } - offset = uint32(fi.Size()) - case errors.Is(err, fs.ErrNotExist): - // that's expected as we remove this file above - default: - s.l.Warn(err) - } - - _, err = s.supervisorctl("start", pmmUpdatePerformProgram) - return offset, err -} - // parseStatus parses `supervisorctl status ` output, returns true if is running, // false if definitely not, and nil if status can't be determined. func parseStatus(status string) *bool { @@ -327,11 +222,6 @@ func parseStatus(status string) *bool { return nil } -// UpdateRunning returns true if pmm-update-perform is not done yet. -func (s *Service) UpdateRunning() bool { - return s.programRunning(pmmUpdatePerformProgram) -} - // UpdateRunning returns true if given supervisord program is running or being restarted, // false if it is not running / failed. func (s *Service) programRunning(program string) bool { @@ -367,42 +257,6 @@ func (s *Service) programRunning(program string) bool { } } -// UpdateLog returns some lines and a new offset from pmm-update-perform log starting from the given offset. -// It may return zero lines and the same offset. Caller is expected to handle this. -func (s *Service) UpdateLog(offset uint32) ([]string, uint32, error) { - s.pmmUpdatePerformLogM.Lock() - defer s.pmmUpdatePerformLogM.Unlock() - - f, err := os.Open(pmmUpdatePerformLog) - if err != nil { - return nil, 0, errors.WithStack(err) - } - defer f.Close() //nolint:errcheck,gosec,nolintlint - - if _, err = f.Seek(int64(offset), io.SeekStart); err != nil { - return nil, 0, errors.WithStack(err) - } - - lines := make([]string, 0, 10) - reader := bufio.NewReader(f) - newOffset := offset - for { - line, err := reader.ReadString('\n') - if err == nil { - newOffset += uint32(len(line)) - if newOffset-offset > s.gRPCMessageMaxSize { - return lines, newOffset - uint32(len(line)), nil - } - lines = append(lines, strings.TrimSuffix(line, "\n")) - continue - } - if err == io.EOF { - err = nil - } - return lines, newOffset, errors.WithStack(err) - } -} - // reload asks supervisord to reload configuration. func (s *Service) reload(name string) error { if _, err := s.supervisorctl("reread"); err != nil { diff --git a/managed/services/supervisord/supervisord_test.go b/managed/services/supervisord/supervisord_test.go index b0fe8ef8b3..c576042826 100644 --- a/managed/services/supervisord/supervisord_test.go +++ b/managed/services/supervisord/supervisord_test.go @@ -22,19 +22,15 @@ import ( "time" "github.com/AlekSi/pointer" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/percona/pmm/managed/models" ) -const gRPCMessageMaxSize = uint32(100 * 1024 * 1024) - func TestConfig(t *testing.T) { t.Parallel() - pmmUpdateCheck := NewPMMUpdateChecker(logrus.WithField("component", "supervisord/pmm-update-checker_logs")) configDir := filepath.Join("..", "..", "testdata", "supervisord.d") vmParams, err := models.NewVictoriaMetricsParams(models.BasePrometheusConfigPath, models.VMBaseURL) require.NoError(t, err) @@ -48,7 +44,7 @@ func TestConfig(t *testing.T) { SSLKeyPath: "path-to-key", SSLCertPath: "path-to-cert", } - s := New(configDir, pmmUpdateCheck, &models.Params{VMParams: vmParams, PGParams: pgParams, HAParams: &models.HAParams{}}, gRPCMessageMaxSize) + s := New(configDir, &models.Params{VMParams: vmParams, PGParams: pgParams, HAParams: &models.HAParams{}}) settings := &models.Settings{ DataRetention: 30 * 24 * time.Hour, PMMPublicAddress: "192.168.0.42:8443", @@ -73,7 +69,6 @@ func TestConfig(t *testing.T) { } func TestConfigVictoriaMetricsEnvvars(t *testing.T) { - pmmUpdateCheck := NewPMMUpdateChecker(logrus.WithField("component", "supervisord/pmm-update-checker_logs")) configDir := filepath.Join("..", "..", "testdata", "supervisord.d") vmParams, err := models.NewVictoriaMetricsParams(models.BasePrometheusConfigPath, models.VMBaseURL) require.NoError(t, err) @@ -87,7 +82,7 @@ func TestConfigVictoriaMetricsEnvvars(t *testing.T) { SSLKeyPath: "path-to-key", SSLCertPath: "path-to-cert", } - s := New(configDir, pmmUpdateCheck, &models.Params{VMParams: vmParams, PGParams: pgParams, HAParams: &models.HAParams{}}, gRPCMessageMaxSize) + s := New(configDir, &models.Params{VMParams: vmParams, PGParams: pgParams, HAParams: &models.HAParams{}}) settings := &models.Settings{ DataRetention: 30 * 24 * time.Hour, PMMPublicAddress: "192.168.0.42:8443",