diff --git a/lib/cache/app.go b/lib/cache/app.go index f745a653332d0..3b29585e26aea 100644 --- a/lib/cache/app.go +++ b/lib/cache/app.go @@ -24,7 +24,7 @@ import ( "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/types" - "github.com/gravitational/teleport/lib/itertools/stream" + "github.com/gravitational/teleport/api/utils/clientutils" "github.com/gravitational/teleport/lib/services" ) @@ -47,12 +47,8 @@ func newAppCollection(upstream services.Applications, w types.WatchKind) (*colle appNameIndex: types.Application.GetName, }), fetcher: func(ctx context.Context, loadSecrets bool) ([]types.Application, error) { - out, err := stream.Collect(upstream.Apps(ctx, "", "")) - // TODO(tross): DELETE IN v21.0.0 - if trace.IsNotImplemented(err) { - apps, err := upstream.GetApps(ctx) - return apps, trace.Wrap(err) - } + // TODO(tross): DELETE IN v21.0.0 replace by regular clientutils.Resources + out, err := clientutils.CollectWithFallback(ctx, upstream.ListApps, upstream.GetApps) return out, trace.Wrap(err) }, headerTransform: func(hdr *types.ResourceHeader) types.Application { diff --git a/lib/cache/database.go b/lib/cache/database.go index 846cbe1007d30..ffb2898f8dcfd 100644 --- a/lib/cache/database.go +++ b/lib/cache/database.go @@ -53,12 +53,8 @@ func newDatabaseCollection(upstream services.Databases, w types.WatchKind) (*col databaseNameIndex: types.Database.GetName, }), fetcher: func(ctx context.Context, loadSecrets bool) ([]types.Database, error) { - out, err := stream.Collect(upstream.RangeDatabases(ctx, "", "")) - // TODO(lokraszewski): DELETE IN v21.0.0 - if trace.IsNotImplemented(err) { - out, err := upstream.GetDatabases(ctx) - return out, trace.Wrap(err) - } + // TODO(lokraszewski): DELETE IN v21.0.0 replace by regular clientutils.Resources + out, err := clientutils.CollectWithFallback(ctx, upstream.ListDatabases, upstream.GetDatabases) return out, trace.Wrap(err) }, headerTransform: func(hdr *types.ResourceHeader) types.Database { diff --git a/lib/cache/windows_desktop.go b/lib/cache/windows_desktop.go index 12924ac39dbdd..27df838c2705b 100644 --- a/lib/cache/windows_desktop.go +++ b/lib/cache/windows_desktop.go @@ -200,33 +200,25 @@ func newWindowsDesktopCollection(upstream services.WindowsDesktops, w types.Watc }, }), fetcher: func(ctx context.Context, loadSecrets bool) ([]types.WindowsDesktop, error) { - var start string - var desktops []types.WindowsDesktop - for { - req := types.ListWindowsDesktopsRequest{ - // A non zero limit is required by older versions. - Limit: defaults.DefaultChunkSize, - StartKey: start, - } - - resp, err := upstream.ListWindowsDesktops(ctx, req) - if err != nil { - // TODO(tross): DELETE in V21.0.0 - if trace.IsNotImplemented(err) { - return upstream.GetWindowsDesktops(ctx, types.WindowsDesktopFilter{}) + // TODO(tross): DELETE in V21.0.0 replace by regular clientutils.Resources + out, err := clientutils.CollectWithFallback( + ctx, + func(ctx context.Context, limit int, start string) ([]types.WindowsDesktop, string, error) { + resp, err := upstream.ListWindowsDesktops(ctx, types.ListWindowsDesktopsRequest{ + Limit: limit, + StartKey: start, + }) + if err != nil { + return nil, "", trace.Wrap(err) } + return resp.Desktops, resp.NextKey, nil + }, + func(ctx context.Context) ([]types.WindowsDesktop, error) { + return upstream.GetWindowsDesktops(ctx, types.WindowsDesktopFilter{}) + }, + ) - return nil, trace.Wrap(err) - } - - desktops = append(desktops, resp.Desktops...) - start = resp.NextKey - if resp.NextKey == "" { - break - } - } - - return desktops, nil + return out, trace.Wrap(err) }, headerTransform: func(hdr *types.ResourceHeader) types.WindowsDesktop { return &types.WindowsDesktopV3{