Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 49 additions & 94 deletions integrations/event-handler/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,42 +25,23 @@ import (

"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/integrations/lib"
"github.com/gravitational/teleport/integrations/lib/logger"
"github.com/gravitational/teleport/integrations/lib/testing/integration"
)

type EventHandlerSuite struct {
suite.Suite
AuthHelper integration.AuthHelper
appConfig StartCmdConfig
fakeFluentd *FakeFluentd

client *client.Client
teleportConfig lib.TeleportConfig
}

func TestEventHandler(t *testing.T) {
suite.Run(t, &EventHandlerSuite{
AuthHelper: &integration.MinimalAuthHelper{},
})
}

// SetupSuite starts a Teleport auth service and creates the event forwarder
// user and role. This runs a once for the whole suite.
func (s *EventHandlerSuite) SetupSuite() {
var err error
ctx, cancel := context.WithCancel(context.Background())
s.T().Cleanup(cancel)
authHelper := &integration.MinimalAuthHelper{}

// starts a Teleport auth service and creates the event forwarder
// user and role.
// Start the Teleport Auth server and get the admin client.
s.client = s.AuthHelper.StartServer(s.T())
_, err = s.client.Ping(ctx)
require.NoError(s.T(), err)
adminClient := authHelper.StartServer(t)
t.Cleanup(func() { require.NoError(t, authHelper.Auth().Close()) })
_, err := adminClient.Ping(t.Context())
require.NoError(t, err)

eventHandlerRole, err := types.NewRole("teleport-event-handler", types.RoleSpecV6{
Allow: types.RoleConditions{
Expand All @@ -73,43 +54,38 @@ func (s *EventHandlerSuite) SetupSuite() {
},
Deny: types.RoleConditions{},
})
require.NoError(s.T(), err)
require.NoError(t, err)

eventHandlerRole, err = s.client.CreateRole(ctx, eventHandlerRole)
require.NoError(s.T(), err)
eventHandlerRole, err = adminClient.CreateRole(t.Context(), eventHandlerRole)
require.NoError(t, err)

eventHandlerUser, err := types.NewUser("teleport-event-handler")
require.NoError(s.T(), err)
require.NoError(t, err)

eventHandlerUser.SetRoles([]string{eventHandlerRole.GetName()})
eventHandlerUser, err = s.client.CreateUser(ctx, eventHandlerUser)
require.NoError(s.T(), err)

s.teleportConfig.Addr = s.AuthHelper.ServerAddr()
s.teleportConfig.Identity = s.AuthHelper.SignIdentityForUser(s.T(), ctx, eventHandlerUser)
}

// SetupTest starts a fake fluentd server.
// This runs before every test from the suite.
func (s *EventHandlerSuite) SetupTest() {
t := s.T()
eventHandlerUser, err = adminClient.CreateUser(t.Context(), eventHandlerUser)
require.NoError(t, err)

// Start fake fluentd
err := logger.Setup(logger.Config{Severity: "debug"})
// Starts a fake fluentd server.
err = logger.Setup(logger.Config{Severity: "debug"})
require.NoError(t, err)

s.fakeFluentd = NewFakeFluentd(t)
s.fakeFluentd.Start()
t.Cleanup(s.fakeFluentd.Close)
fakeFluentd := NewFakeFluentd(t)
fakeFluentd.Start()
t.Cleanup(fakeFluentd.Close)

startTime := time.Now().Add(-time.Minute)

conf := StartCmdConfig{
fluentdConfig := fakeFluentd.GetClientConfig()
fluentdConfig.FluentdURL = fakeFluentd.GetURL()
fluentdConfig.FluentdSessionURL = fluentdConfig.FluentdURL + "/session"

appConfig := StartCmdConfig{
TeleportConfig: TeleportConfig{
TeleportAddr: s.teleportConfig.Addr,
TeleportIdentityFile: s.teleportConfig.Identity,
TeleportAddr: authHelper.ServerAddr(),
TeleportIdentityFile: authHelper.SignIdentityForUser(t, t.Context(), eventHandlerUser),
},
FluentdConfig: s.fakeFluentd.GetClientConfig(),
FluentdConfig: fluentdConfig,
IngestConfig: IngestConfig{
StorageDir: t.TempDir(),
Timeout: time.Second,
Expand All @@ -121,88 +97,67 @@ func (s *EventHandlerSuite) SetupTest() {
},
}

conf.FluentdURL = s.fakeFluentd.GetURL()
conf.FluentdSessionURL = conf.FluentdURL + "/session"

s.appConfig = conf
}

func (s *EventHandlerSuite) startApp() {
s.T().Helper()
t := s.T()
t.Helper()

app, err := NewApp(&s.appConfig, slog.Default())
require.NoError(t, err)

integration.RunAndWaitReady(s.T(), app)
}

// nonce is data produced to uniquely identify an event.
// The nonce is propagated from the event generator to the event checker.
// All events not matching the nonce are skipped.
type nonce any

func (s *EventHandlerSuite) TestEvent() {
ctx, cancel := context.WithCancel(context.Background())
s.T().Cleanup(cancel)

// Original TestEvent
tests := []struct {
name string
generateEvent func(*testing.T, *client.Client) nonce
checkEvent func(*testing.T, string, nonce) bool
generateEvent func(*testing.T, *client.Client) any
checkEvent func(*testing.T, string, any) bool
}{
{
name: "new role",
generateEvent: func(t *testing.T, c *client.Client) nonce {
generateEvent: func(t *testing.T, c *client.Client) any {
roleName := uuid.New().String()
role, err := types.NewRole(roleName, types.RoleSpecV6{
Options: types.RoleOptions{},
Allow: types.RoleConditions{},
Deny: types.RoleConditions{},
})
require.NoError(t, err)
role, err = c.CreateRole(ctx, role)
role, err = c.CreateRole(t.Context(), role)
require.NoError(t, err)
return role.GetName()
},
checkEvent: func(t *testing.T, event string, n nonce) bool {
checkEvent: func(t *testing.T, event string, n any) bool {
roleName, ok := n.(string)
require.True(t, ok)
return strings.Contains(event, roleName)
},
},
{
name: "new token",
generateEvent: func(t *testing.T, c *client.Client) nonce {
generateEvent: func(t *testing.T, c *client.Client) any {
tokenName := uuid.New().String()
token, err := types.NewProvisionToken(tokenName, types.SystemRoles{types.RoleNode}, time.Time{})
require.NoError(t, err)
err = c.CreateToken(ctx, token)
err = c.CreateToken(t.Context(), token)
require.NoError(t, err)
return nil
},
checkEvent: func(t *testing.T, event string, _ nonce) bool {
checkEvent: func(t *testing.T, event string, _ any) bool {
return strings.Contains(event, "join_token.create")
},
},
}

// Start the event forwarder
s.startApp()
app, err := NewApp(&appConfig, slog.Default())
require.NoError(t, err)

t.Cleanup(app.Close)

integration.RunAndWaitReady(t, app)

for _, tt := range tests {
s.T().Run(tt.name, func(t *testing.T) {
nonce := tt.generateEvent(t, s.client)
t.Run(tt.name, func(t *testing.T) {
any := tt.generateEvent(t, adminClient)

waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
s.T().Cleanup(cancel)
waitCtx, cancel := context.WithTimeout(t.Context(), 5*time.Second)
t.Cleanup(cancel)

eventFound := false
for !eventFound {
event, err := s.fakeFluentd.GetMessage(waitCtx)
require.NoError(s.T(), err, "did not receive the event after 5 seconds")
if tt.checkEvent(t, event, nonce) {
for eventFound := false; !eventFound; {
event, err := fakeFluentd.GetMessage(waitCtx)
require.NoError(t, err, "did not receive the event after 5 seconds")
if tt.checkEvent(t, event, any) {
t.Logf("Event matched: %s", event)
eventFound = true
} else {
Expand Down
Loading