Skip to content

Commit e6294a2

Browse files
Tyler Smithcopybara-github
authored andcommitted
Make plugin mode sub command available
PiperOrigin-RevId: 716728743
1 parent 40bbc88 commit e6294a2

3 files changed

Lines changed: 234 additions & 5 deletions

File tree

cmd/main.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,20 @@ func main() {
6666
rootCmd.AddCommand(version.NewCommand())
6767
rootCmd.AddCommand(logusage.NewCommand(lp, cloudProps))
6868
d := daemon.NewDaemon(lp, cloudProps)
69+
p := daemon.NewPlugin(d)
70+
daemonCmd := daemon.NewDaemonSubCommand(d)
71+
pluginCmd := daemon.NewPluginSubcommand(p)
72+
daemon.PopulatePluginFlagValues(p, pluginCmd.Flags())
73+
rootCmd.AddCommand(pluginCmd)
6974
// When running on windows, the daemon is started using the winservice subcommand.
7075
// Having both the daemon command and the winservice command will cause an error when the
7176
// winservice tries to start the daemon, cobra will start the parent which is the winservice
7277
// causing a loop.
7378
if lp.OSType != "windows" {
74-
rootCmd.AddCommand(d)
79+
rootCmd.AddCommand(daemonCmd)
7580
}
7681
// Add any additional windows or linux specific subcommands.
77-
rootCmd.AddCommand(additionalSubcommands(ctx, d)...)
82+
rootCmd.AddCommand(additionalSubcommands(ctx, daemonCmd)...)
7883

7984
for _, cmd := range rootCmd.Commands() {
8085
if cmd.Name() != "startdaemon" {

internal/daemon/daemon.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,16 @@ type (
6363
}
6464
)
6565

66-
// NewDaemon creates a new startdaemon command.
67-
func NewDaemon(lp log.Parameters, cloudProps *cpb.CloudProperties) *cobra.Command {
68-
d := &Daemon{
66+
// NewDaemon creates a new Daemon.
67+
func NewDaemon(lp log.Parameters, cloudProps *cpb.CloudProperties) *Daemon {
68+
return &Daemon{
6969
lp: lp,
7070
cloudProps: cloudProps,
7171
}
72+
}
73+
74+
// NewDaemonSubCommand creates a new startdaemon subcommand.
75+
func NewDaemonSubCommand(d *Daemon) *cobra.Command {
7276
cmd := &cobra.Command{
7377
Use: "startdaemon",
7478
Short: "Start daemon mode of the agent",

internal/daemon/plugin.go

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
/*
2+
Copyright 2024 Google LLC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package daemon
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"net"
23+
24+
"github.com/GoogleCloudPlatform/agentcommunication_client"
25+
"github.com/spf13/cobra"
26+
"google.golang.org/api/option"
27+
"google.golang.org/grpc"
28+
"github.com/spf13/pflag"
29+
"google.golang.org/protobuf/encoding/prototext"
30+
pb "github.com/GoogleCloudPlatform/google-guest-agent/internal/plugin/proto/google_guest_agent/plugin"
31+
pbgrpc "github.com/GoogleCloudPlatform/google-guest-agent/internal/plugin/proto/google_guest_agent/plugin"
32+
"github.com/GoogleCloudPlatform/workloadagentplatform/integration/common/shared/log"
33+
)
34+
35+
const (
36+
// Label key for ACS messages to get the type of the contained message.
37+
messageTypeLabel = "message_type"
38+
39+
// Status code for an unhealthy agent -non zero means failure to the Guest Agent.
40+
unhealthyStatusCode = 1
41+
)
42+
43+
// Plugin is a subcommand that wraps the daemon subcommand.
44+
// This subcommand will start listening for messages from the guest agent
45+
// and when instructed to start plugin execution, it will start the daemon.
46+
type Plugin struct {
47+
channelID string
48+
endpoint string
49+
address string
50+
protocol string
51+
errorlogfile string
52+
grpcServer *grpc.Server
53+
daemon *Daemon
54+
daemonCtx context.Context
55+
daemonCancel context.CancelFunc
56+
}
57+
58+
// NewPlugin creates a new Plugin with an underlying daemon instance to delegate to.
59+
func NewPlugin(d *Daemon) *Plugin {
60+
return &Plugin{
61+
daemon: d,
62+
daemonCtx: context.Background(),
63+
}
64+
}
65+
66+
// PopulatePluginFlagValues uses the provided flags to set the plugin's primitive values.
67+
func PopulatePluginFlagValues(p *Plugin, fs *pflag.FlagSet) {
68+
fs.StringVar(&p.channelID, "channel", "", "Channel ID on which to receive application specific messages")
69+
fs.StringVar(&p.endpoint, "endpoint", "", "Endpoint for the agent communication service")
70+
fs.StringVar(&p.protocol, "protocol", "", "TCP or UDP")
71+
fs.StringVar(&p.address, "address", "", "Address on which to listen for messages")
72+
fs.StringVar(&p.errorlogfile, "errorlogfile", "", "plugin error log file")
73+
}
74+
75+
// NewPluginSubcommand creates a new plugin subcommand using the provided daemon as the worker.
76+
func NewPluginSubcommand(p *Plugin) *cobra.Command {
77+
return &cobra.Command{
78+
Use: "plugin",
79+
Short: "Plugin mode of the agent",
80+
RunE: func(cmd *cobra.Command, args []string) error {
81+
return p.Execute(cmd.Context())
82+
},
83+
}
84+
}
85+
86+
// Name returns the name of the plugin subcommand.
87+
func (p *Plugin) Name() string {
88+
return "plugin"
89+
}
90+
91+
// Synopsis returns a short string (less than one line) describing the plugin subcommand.
92+
func (p *Plugin) Synopsis() string { return "Plugin Mode" }
93+
94+
// Usage returns a long string explaining the command and giving usage
95+
// information.
96+
func (p *Plugin) Usage() string {
97+
return "Executes the Workload Agent plugin when instructed to by the guest agent"
98+
}
99+
100+
// Execute binds to an address and starts to listen for control messages
101+
// from the guest agent as well as domain-specific messages from the Agent
102+
// Communication Service. An error is returned if any connection cannot
103+
// be initialized.
104+
func (p *Plugin) Execute(ctx context.Context) error {
105+
log.Logger.Info("Starting plugin")
106+
if p.protocol == "" {
107+
return fmt.Errorf("protocol is required")
108+
}
109+
if p.address == "" {
110+
return fmt.Errorf("address is required")
111+
}
112+
listener, err := net.Listen(p.protocol, p.address)
113+
if err != nil {
114+
return fmt.Errorf("Failed to start listening on %q using %q: %v", p.address, p.protocol, err)
115+
}
116+
defer listener.Close()
117+
118+
// This is used to receive control messages from the Guest Agent.
119+
server := grpc.NewServer()
120+
defer server.GracefulStop()
121+
122+
// Enable the Guest Agent to handle the starting and stopping of the agent execution logic.
123+
pbgrpc.RegisterGuestAgentPluginServer(server, p)
124+
125+
// Enable listening for domain-specific messages from the Agent Communication Service.
126+
var opts []option.ClientOption
127+
if p.endpoint != "" {
128+
opts = append(opts, option.WithEndpoint(p.endpoint))
129+
}
130+
conn, err := client.CreateConnection(ctx, p.channelID, false, opts...)
131+
if err != nil {
132+
return fmt.Errorf("Failed to create ACS connection: %v", err)
133+
}
134+
defer conn.Close()
135+
136+
go func() {
137+
for {
138+
msg, err := conn.Receive()
139+
if err != nil {
140+
log.Logger.Fatalf("Failed to receive message from ACS: %v", err)
141+
}
142+
// The client will set the message type in the "message_type" label, we can key off that.
143+
messageType, ok := msg.GetLabels()[messageTypeLabel]
144+
if !ok {
145+
log.Logger.Warnf("Received message without the %q label: %v", messageType, prototext.Format(msg))
146+
continue
147+
}
148+
switch messageType {
149+
case "your custom message type here":
150+
go func() {
151+
// Unmarshall msg into your proto here and act accordingly. This must be done
152+
// asynchronously so the connection is not affected.
153+
}()
154+
default:
155+
log.Logger.Warnf("Unknown message type: %v", messageType)
156+
}
157+
}
158+
}()
159+
160+
if err = server.Serve(listener); err != nil {
161+
return fmt.Errorf("Failed to listen for GRPC messages: %v", err)
162+
}
163+
164+
return nil
165+
}
166+
167+
// Apply applies the config sent or performs the work defined in the message.
168+
// ApplyRequest is opaque to the agent and is expected to be well known contract
169+
// between Plugin and the server itself. For e.g. service might want to update
170+
// plugin config to enable/disable feature here plugins can react to such requests.
171+
func (p *Plugin) Apply(ctx context.Context, msg *pb.ApplyRequest) (*pb.ApplyResponse, error) {
172+
return &pb.ApplyResponse{}, nil
173+
}
174+
175+
// Start starts the plugin and initiates the plugin functionality.
176+
// Until plugin receives Start request plugin is expected to be not functioning
177+
// and just listening on the address handed off waiting for the request.
178+
func (p *Plugin) Start(ctx context.Context, msg *pb.StartRequest) (*pb.StartResponse, error) {
179+
// [ctx] from request will be scoped to that of the request, when the request
180+
// is finished, the context is cancelled.
181+
// Treat this as the entry point for a plugin and create its own context.
182+
if p.daemonCancel != nil {
183+
log.Logger.Warn("Start called multiple times")
184+
return &pb.StartResponse{}, nil
185+
}
186+
p.daemonCtx, p.daemonCancel = context.WithCancel(context.Background())
187+
err := p.daemon.Execute(p.daemonCtx)
188+
return &pb.StartResponse{}, err
189+
}
190+
191+
// Stop is the stop hook and implements any cleanup if required.
192+
// Stop may be called if plugin revision is being changed.
193+
// For e.g. if plugins want to stop some task it was performing or remove some
194+
// state before exiting it can be done on this request.
195+
func (p *Plugin) Stop(ctx context.Context, msg *pb.StopRequest) (*pb.StopResponse, error) {
196+
if p.daemonCancel == nil {
197+
log.Logger.Warn("Stop called before Start")
198+
return &pb.StopResponse{}, nil
199+
}
200+
err := p.daemonCtx.Err()
201+
if err != nil {
202+
return &pb.StopResponse{}, err
203+
}
204+
p.daemonCancel()
205+
p.daemonCancel = nil
206+
p.daemonCtx = context.Background()
207+
return &pb.StopResponse{}, nil
208+
}
209+
210+
// GetStatus is the health check agent would perform to make sure plugin process
211+
// is alive. If request fails process is considered dead and relaunched. Plugins
212+
// can share any additional information to report it to the service. For e.g. if
213+
// plugins detect some non-fatal errors causing it unable to offer some features
214+
// it can reported in status which is sent back to the service by agent.
215+
func (p *Plugin) GetStatus(ctx context.Context, msg *pb.GetStatusRequest) (*pb.Status, error) {
216+
if err := p.daemonCtx.Err(); err != nil {
217+
return &pb.Status{Code: unhealthyStatusCode, Results: []string{err.Error()}}, nil
218+
}
219+
return &pb.Status{Code: 0, Results: []string{"Plugin is running ok"}}, nil
220+
}

0 commit comments

Comments
 (0)