Skip to content

Commit

Permalink
make configWatcher a method instead of an anonymous function
Browse files Browse the repository at this point in the history
  • Loading branch information
benjirewis committed Dec 19, 2024
1 parent a2d8bd2 commit 9e68fd6
Showing 1 changed file with 102 additions and 85 deletions.
187 changes: 102 additions & 85 deletions web/server/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.viam.com/rdk/config"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/robot"
robotimpl "go.viam.com/rdk/robot/impl"
"go.viam.com/rdk/robot/web"
weboptions "go.viam.com/rdk/robot/web/options"
Expand Down Expand Up @@ -249,6 +250,92 @@ func (s *robotServer) createWebOptions(cfg *config.Config) (weboptions.Options,
return options, nil
}

// A wrapper around actual config processing that also applies options from the
// robot server.
func (s *robotServer) processConfig(in *config.Config) (*config.Config, error) {
out, err := config.ProcessConfig(in)
if err != nil {
return nil, err
}
out.Debug = s.args.Debug || in.Debug
out.EnableWebProfile = s.args.WebProfile || in.EnableWebProfile
out.FromCommand = true
out.AllowInsecureCreds = s.args.AllowInsecureCreds
out.UntrustedEnv = s.args.UntrustedEnv
out.PackagePath = path.Join(viamDotDir, "packages")
return out, nil
}

// A function to be started as a goroutine that watches for changes, either
// from disk or from cloud, to the robot's config. Starts comparisons based on
// `currCfg`. Reconfigures the robot when config changes are received from the
// watcher.
func (s *robotServer) configWatcher(ctx context.Context, currCfg *config.Config, r robot.LocalRobot,
watcher config.Watcher,
) {
// Reconfigure robot to have passed-in config before listening for any config
// changes.
r.Reconfigure(ctx, currCfg)

// Once reconfigure with initial config is complete; set initializing to
// false. Robot is now fully running and can indicate this through the
// MachineStatus endpoint.
r.SetInitializing(false)

for {
select {
case <-ctx.Done():
return
default:
}
select {
case <-ctx.Done():
return
case cfg := <-watcher.Config():
processedConfig, err := s.processConfig(cfg)
if err != nil {
s.logger.Errorw("reconfiguration aborted: error processing config", "error", err)
continue
}

// flag to restart web service if necessary
diff, err := config.DiffConfigs(*currCfg, *processedConfig, s.args.RevealSensitiveConfigDiffs)
if err != nil {
s.logger.Errorw("reconfiguration aborted: error diffing config", "error", err)
continue
}
var options weboptions.Options

if !diff.NetworkEqual {
// TODO(RSDK-2694): use internal web service reconfiguration instead
r.StopWeb()
options, err = s.createWebOptions(processedConfig)
if err != nil {
s.logger.Errorw("reconfiguration aborted: error creating weboptions", "error", err)
continue
}
}

// Update logger registry if log patterns may have changed.
//
// This functionality is tested in `TestLogPropagation` in `local_robot_test.go`.
if !diff.LogEqual {
s.logger.Debug("Detected potential changes to log patterns; updating logger levels")
config.UpdateLoggerRegistryFromConfig(s.registry, processedConfig, s.logger)
}

r.Reconfigure(ctx, processedConfig)

if !diff.NetworkEqual {
if err := r.StartWeb(ctx, options); err != nil {
s.logger.Errorw("reconfiguration failed: error starting web service while reconfiguring", "error", err)
}
}
currCfg = processedConfig
}
}
}

func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err error) {
ctx, cancel := context.WithCancel(ctx)

Expand Down Expand Up @@ -324,21 +411,7 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err
defer cancel()
ctx = rpc.ContextWithDialer(ctx, rpcDialer)

processConfig := func(in *config.Config) (*config.Config, error) {
out, err := config.ProcessConfig(in)
if err != nil {
return nil, err
}
out.Debug = s.args.Debug || in.Debug
out.EnableWebProfile = s.args.WebProfile || in.EnableWebProfile
out.FromCommand = true
out.AllowInsecureCreds = s.args.AllowInsecureCreds
out.UntrustedEnv = s.args.UntrustedEnv
out.PackagePath = path.Join(viamDotDir, "packages")
return out, nil
}

fullProcessedConfig, err := processConfig(cfg)
fullProcessedConfig, err := s.processConfig(cfg)
if err != nil {
return err
}
Expand Down Expand Up @@ -434,80 +507,24 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err
defer func() {
err = multierr.Combine(err, watcher.Close())
}()
onWatchDone := make(chan struct{})
// Use `fullProcessedConfig` as the initial `oldCfg` for the config watcher
// goroutine, as we want incoming config changes to be compared to the full
// config.
oldCfg := fullProcessedConfig
utils.ManagedGo(func() {
// Reconfigure robot to have full processed config before listening for any
// config changes.
myRobot.Reconfigure(ctx, fullProcessedConfig)

// Once reconfigure with full processed config is complete; set initializing
// to false. Robot is now fully running and can indicate this through the
// MachineStatus endpoint.
myRobot.SetInitializing(false)

for {
select {
case <-ctx.Done():
return
default:
}
select {
case <-ctx.Done():
return
case cfg := <-watcher.Config():
processedConfig, err := processConfig(cfg)
if err != nil {
s.logger.Errorw("reconfiguration aborted: error processing config", "error", err)
continue
}

// flag to restart web service if necessary
diff, err := config.DiffConfigs(*oldCfg, *processedConfig, s.args.RevealSensitiveConfigDiffs)
if err != nil {
s.logger.Errorw("reconfiguration aborted: error diffing config", "error", err)
continue
}
var options weboptions.Options

if !diff.NetworkEqual {
// TODO(RSDK-2694): use internal web service reconfiguration instead
myRobot.StopWeb()
options, err = s.createWebOptions(processedConfig)
if err != nil {
s.logger.Errorw("reconfiguration aborted: error creating weboptions", "error", err)
continue
}
}

// Update logger registry if log patterns may have changed.
//
// This functionality is tested in `TestLogPropagation` in `local_robot_test.go`.
if !diff.LogEqual {
s.logger.Debug("Detected potential changes to log patterns; updating logger levels")
config.UpdateLoggerRegistryFromConfig(s.registry, processedConfig, s.logger)
}

myRobot.Reconfigure(ctx, processedConfig)

if !diff.NetworkEqual {
if err := myRobot.StartWeb(ctx, options); err != nil {
s.logger.Errorw("reconfiguration failed: error starting web service while reconfiguring", "error", err)
}
}
oldCfg = processedConfig
}
}
}, func() {
close(onWatchDone)
})
onWatchDone := make(chan struct{})
go func() {
defer func() {
close(onWatchDone)
}()

// Use `fullProcessedConfig` as the initial config for the config watcher
// goroutine, as we want incoming config changes to be compared to the full
// config.
s.configWatcher(ctx, fullProcessedConfig, myRobot, watcher)
}()
// At end of this function, cancel context and wait for watcher goroutine
// to complete.
defer func() {
cancel()
<-onWatchDone
}()
defer cancel()

// Create initial web options with `minimalProcessedConfig`.
options, err := s.createWebOptions(minimalProcessedConfig)
Expand Down

0 comments on commit 9e68fd6

Please sign in to comment.