Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 89 additions & 27 deletions internal/daemon/oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ import (
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/log"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/recovery"

mrpb "google.golang.org/genproto/googleapis/monitoring/v3"
cpb "github.com/GoogleCloudPlatform/workloadagent/protos/configuration"
odpb "github.com/GoogleCloudPlatform/workloadagent/protos/oraclediscovery"
gapb "github.com/GoogleCloudPlatform/workloadagentplatform/sharedprotos/guestactions"
)

Expand All @@ -43,8 +45,21 @@ const (
// where the agent is running, unlike registered channels that use a producer project.
defaultChannel = "oracle-operations-ephemeral-channel"
defaultLockTimeout = 24 * time.Hour
// discoveryCheckInterval is the interval to check if discovery has found an Oracle process.
discoveryCheckInterval = 5 * time.Second
)

// DiscoveryClient is the interface for Oracle discovery operations.
type DiscoveryClient interface {
Discover(ctx context.Context, cloudProps *cpb.CloudProperties, processes []servicecommunication.ProcessWrapper) (*odpb.Discovery, error)
}

// MetricCollector is the interface for Oracle metrics collection.
type MetricCollector interface {
SendHealthMetricsToCloudMonitoring(ctx context.Context) []*mrpb.TimeSeries
SendDefaultMetricsToCloudMonitoring(ctx context.Context) []*mrpb.TimeSeries
}

// guestActionsManager is an interface satisfied by guestactions.GuestActions
// to allow for mocking in tests.
type guestActionsManager interface {
Expand Down Expand Up @@ -80,11 +95,20 @@ type Service struct {
CloudProps *cpb.CloudProperties
metricCollectionRoutine *recovery.RecoverableRoutine
discoveryRoutine *recovery.RecoverableRoutine
guestActionsRoutine *recovery.RecoverableRoutine
currentSIDs []string
CommonCh <-chan *servicecommunication.Message
isProcessPresent bool
processes []servicecommunication.ProcessWrapper
processesMutex sync.Mutex

// discovery performs Oracle discovery operations.
discovery DiscoveryClient
// newMetricCollector creates a new metrics collector.
// This is a factory function because the metrics collector is stateful (holds DB connections)
// and runs inside a recoverable routine. If the routine crashes and restarts, we need
// to create a fresh collector with new connections rather than reusing a potentially broken one.
newMetricCollector func(context.Context, *cpb.Configuration) (MetricCollector, error)
}

type runGuestActionsArgs struct {
Expand All @@ -104,29 +128,72 @@ var oraProcessPrefixes = []string{"ora_pmon_", "db_pmon_"}

// Start initiates the Oracle workload agent service
func (s *Service) Start(ctx context.Context, a any) {
s.initializeDependencies()

go (func() {
for {
s.checkServiceCommunication(ctx)
}
})()

if !s.waitForWorkload(ctx) {
return
}

if runtime.GOOS != "linux" {
log.CtxLogger(ctx).Error("Oracle service is only supported on Linux")
return
}

s.startDiscoveryRoutine(ctx)
s.startMetricCollectionRoutine(ctx)
s.startGuestActionsRoutine(ctx)

select {
case <-ctx.Done():
log.CtxLogger(ctx).Info("Oracle workload agent service cancellation requested")
return
}
}

func (s *Service) initializeDependencies() {
if s.discovery == nil {
s.discovery = oraclediscovery.New()
}
if s.newMetricCollector == nil {
s.newMetricCollector = func(ctx context.Context, cfg *cpb.Configuration) (MetricCollector, error) {
return oraclemetrics.New(ctx, cfg)
}
}
}

// waitForWorkload checks if the service should be enabled.
// If enabled field is unset, it waits for the workload to be present.
// Returns true if the service should proceed, false otherwise.
func (s *Service) waitForWorkload(ctx context.Context) bool {
// Check if the enabled field is unset. If it is, then the service is still enabled if the workload is present.
if s.Config.GetOracleConfiguration().Enabled == nil {
log.CtxLogger(ctx).Info("Oracle service enabled field is not set, will check for workload presence to determine if service should be enabled.")
// If the workload is present, proceed with starting the service even if it is not enabled.
for !s.isProcessPresent {
time.Sleep(5 * time.Second)
select {
case <-ctx.Done():
return false
case <-time.After(discoveryCheckInterval):
continue
}
}
log.CtxLogger(ctx).Info("Oracle workload is present. Starting service.")
} else if !s.Config.GetOracleConfiguration().GetEnabled() {
log.CtxLogger(ctx).Info("Oracle service is disabled")
return
return true
}

if runtime.GOOS != "linux" {
log.CtxLogger(ctx).Error("Oracle service is only supported on Linux")
return
if !s.Config.GetOracleConfiguration().GetEnabled() {
log.CtxLogger(ctx).Info("Oracle service is disabled")
return false
}
return true
}

func (s *Service) startDiscoveryRoutine(ctx context.Context) {
if s.Config.GetOracleConfiguration().GetOracleDiscovery().GetEnabled() {
dCtx := log.SetCtx(ctx, "context", "OracleDiscovery")
s.discoveryRoutine = &recovery.RecoverableRoutine{
Expand All @@ -138,7 +205,9 @@ func (s *Service) Start(ctx context.Context, a any) {
}
s.discoveryRoutine.StartRoutine(dCtx)
}
}

func (s *Service) startMetricCollectionRoutine(ctx context.Context) {
if s.Config.GetOracleConfiguration().GetOracleMetrics().GetEnabled() {
mcCtx := log.SetCtx(ctx, "context", "OracleMetricCollection")
s.metricCollectionRoutine = &recovery.RecoverableRoutine{
Expand All @@ -150,23 +219,19 @@ func (s *Service) Start(ctx context.Context, a any) {
}
s.metricCollectionRoutine.StartRoutine(mcCtx)
}
}

func (s *Service) startGuestActionsRoutine(ctx context.Context) {
gaCtx := log.SetCtx(ctx, "context", "OracleGuestActions")
guestActionsRoutine := &recovery.RecoverableRoutine{
s.guestActionsRoutine = &recovery.RecoverableRoutine{
Routine: runGuestActions,
RoutineArg: runGuestActionsArgs{s: s, handlers: guestActionHandlers()},
ErrorCode: usagemetrics.GuestActionsFailure,
UsageLogger: *usagemetrics.UsageLogger,
ExpectedMinDuration: 10 * time.Second,
}
log.CtxLogger(ctx).Info("Starting guest actions routine")
guestActionsRoutine.StartRoutine(gaCtx)

select {
case <-ctx.Done():
log.CtxLogger(ctx).Info("Oracle workload agent service cancellation requested")
return
}
s.guestActionsRoutine.StartRoutine(gaCtx)
}

func guestActionHandlers() map[string]guestactions.GuestActionHandler {
Expand Down Expand Up @@ -230,12 +295,12 @@ func runDiscovery(ctx context.Context, a any) {
log.CtxLogger(ctx).Error("args is not of type runDiscoveryArgs")
return
}
s := args.s

ticker := time.NewTicker(args.s.Config.GetOracleConfiguration().GetOracleDiscovery().GetUpdateFrequency().AsDuration())
s := args.s
ticker := time.NewTicker(s.Config.GetOracleConfiguration().GetOracleDiscovery().GetUpdateFrequency().AsDuration())
defer ticker.Stop()

ds := oraclediscovery.New()
ds := s.discovery

for {
// Discovery data is not used yet.
Expand All @@ -244,18 +309,15 @@ func runDiscovery(ctx context.Context, a any) {
s.processesMutex.Unlock()
// Don't start discovery until processes are populated.
for processes == nil {
time.Sleep(5 * time.Second)
s.processesMutex.Lock()
processes = s.processes
s.processesMutex.Unlock()
// Respect context cancellation.
select {
case <-ctx.Done():
log.CtxLogger(ctx).Info("Oracle Discovery cancellation requested")
return
default:
continue
case <-time.After(discoveryCheckInterval):
}
s.processesMutex.Lock()
processes = s.processes
s.processesMutex.Unlock()
}
_, err := ds.Discover(ctx, s.CloudProps, processes)
if err != nil {
Expand Down Expand Up @@ -284,7 +346,7 @@ func runMetricCollection(ctx context.Context, a any) {
ticker := time.NewTicker(args.s.Config.GetOracleConfiguration().GetOracleMetrics().GetCollectionFrequency().AsDuration())
defer ticker.Stop()

metricCollector, err := oraclemetrics.New(ctx, args.s.Config)
metricCollector, err := args.s.newMetricCollector(ctx, args.s.Config)
if err != nil {
log.CtxLogger(ctx).Errorw("Failed to initialize metric collector", "error", err)
return
Expand Down
Loading