-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathoptions.go
337 lines (284 loc) · 9.4 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
// Copyright 2024 T-Mobile USA, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// See the LICENSE file for additional language around the disclaimer of warranties.
// Trademark Disclaimer: Neither the name of “T-Mobile, USA” nor the names of
// its contributors may be used to endorse or promote products
package depaginator
import (
"context"
"errors"
)
const (
	// DefaultCapacity is the capacity given to the updates channel
	// when no other capacity is requested.
	DefaultCapacity = 500
)
// options describes options for [Depaginate]. It is the accumulator
// that each [Option] writes into through its apply method; every
// field corresponds to one of the option types declared in this file.
type options struct {
totalItems int // Total number of items (hint)
totalPages int // Total number of pages (hint)
perPage int // Number of items per page
capacity int // Capacity of the update queue
starter Starter // Object with a Start method
updater Updater // Object with an Update method
doner Doner // Object with a Done method
initReq any // Initial request
}
// Option describes an option that may be passed to [Depaginate].
// The interface's only method is unexported, so implementations must
// be declared within this package.
type Option interface {
// apply applies an option by storing its value into opts.
apply(opts *options)
}
// TotalItems carries a count of the total number of items to be
// expected. It is used to indicate an update to that count, and it
// may also be passed to [Depaginate] as a hint to the total number of
// items to be expected.
type TotalItems int

// apply stores the item count hint in the options.
func (t TotalItems) apply(opts *options) {
	count := int(t)
	opts.totalItems = count
}
// TotalPages carries a count of the total number of pages to be
// expected. It is used to indicate an update to that count, and it
// may also be passed to [Depaginate] as a hint to the total number of
// pages to be expected.
type TotalPages int

// apply stores the page count hint in the options.
func (t TotalPages) apply(opts *options) {
	count := int(t)
	opts.totalPages = count
}
// PerPage carries the number of items to be expected in each page.
// It is used to indicate an update to that number, and it may also be
// passed to [Depaginate] as a hint to the number of items per page to
// be expected.
type PerPage int

// apply stores the per-page hint in the options.
func (p PerPage) apply(opts *options) {
	size := int(p)
	opts.perPage = size
}
// Capacity may be passed to [Depaginate] to control the size of the
// updates queue on the [Depaginator]. When it is not given, the
// default is [DefaultCapacity], which is set to a generous size;
// applications should only need this option if that default proves
// insufficient for efficient operation.
type Capacity int

// apply stores the requested queue capacity in the options.
func (c Capacity) apply(opts *options) {
	size := int(c)
	opts.capacity = size
}
// WithStarterOption is the [Option] implementation used to set the
// [Starter] explicitly. Values are produced by [WithStarter].
type WithStarterOption struct {
	starter Starter
}

// apply copies the wrapped [Starter] into the options.
func (s WithStarterOption) apply(opts *options) {
	chosen := s.starter
	opts.starter = chosen
}
// WithStarter returns an [Option] that can be passed to [Depaginate]
// which sets a [Starter] to be called when [Depaginate] begins its
// work. The [Starter.Start] method is called with the initial values
// for total pages, total items, and per-page. The default is the
// [Handler], if it implements [Starter].
func WithStarter(starter Starter) WithStarterOption {
	var opt WithStarterOption
	opt.starter = starter
	return opt
}
// WithUpdaterOption is the [Option] implementation used to set the
// [Updater] explicitly. Values are produced by [WithUpdater].
type WithUpdaterOption struct {
	updater Updater
}

// apply copies the wrapped [Updater] into the options.
func (w WithUpdaterOption) apply(opts *options) {
	chosen := w.updater
	opts.updater = chosen
}
// WithUpdater returns an [Option] that can be passed to [Depaginate]
// which sets an [Updater] to be called when the total pages, total
// items, or per-page values are altered. The default is the
// [Handler], if it implements [Updater].
func WithUpdater(updater Updater) WithUpdaterOption {
	var opt WithUpdaterOption
	opt.updater = updater
	return opt
}
// WithDonerOption is the [Option] implementation used to set the
// [Doner] explicitly. Values are produced by [WithDoner].
type WithDonerOption struct {
	doner Doner
}

// apply copies the wrapped [Doner] into the options.
func (w WithDonerOption) apply(opts *options) {
	chosen := w.doner
	opts.doner = chosen
}
// WithDoner returns an [Option] that can be passed to [Depaginate]
// which sets a [Doner] to be called once all pages are retrieved.
// The default is the [Handler], if it implements [Doner].
func WithDoner(doner Doner) WithDonerOption {
	var opt WithDonerOption
	opt.doner = doner
	return opt
}
// WithRequestOption is the [Option] implementation used to set the
// initial request. Values are produced by [WithRequest].
type WithRequestOption struct {
	req any
}

// apply copies the wrapped request into the options as the initial
// request.
func (w WithRequestOption) apply(opts *options) {
	initial := w.req
	opts.initReq = initial
}
// WithRequest returns an [Option] which sets the request object for
// the initial page load. By default, the request will be set to nil.
func WithRequest(req any) WithRequestOption {
	var opt WithRequestOption
	opt.req = req
	return opt
}
// update describes an update to be processed by the [Depaginator]'s
// daemon. The daemon processes updates to metadata, such as the
// total number of items, as well as issuing new page requests.
// Each implementation in this file mutates or reads fields of the
// [Depaginator] it is handed.
type update[T any] interface {
// applyUpdate applies the update to depag.
applyUpdate(depag *Depaginator[T])
}
// cancelerFor is an [update] implementation that registers the cancel
// function associated with the load of a specific page.
type cancelerFor[T any] struct {
	page     int                // Index of the page
	cancelFn context.CancelFunc // Function to call to cancel page load
}

// applyUpdate records the cancel function in the depaginator's
// cancelers map, keyed by page index.
func (u cancelerFor[T]) applyUpdate(depag *Depaginator[T]) {
	idx, cancel := u.page, u.cancelFn
	depag.cancelers[idx] = cancel
}
// withdrawCanceler is an [update] that withdraws the canceler
// registered for a specific page; its value is the page index.
type withdrawCanceler[T any] int

// applyUpdate removes the page's entry from the depaginator's
// cancelers map.
func (u withdrawCanceler[T]) applyUpdate(depag *Depaginator[T]) {
	page := int(u)
	delete(depag.cancelers, page)
}
// errorSaver is an [update] implementation that records an error
// along with the page request that produced it.
type errorSaver[T any] struct {
	req PageRequest // The request that caused the error
	err error       // The error that was caused
}

// applyUpdate appends a [PageError] to the depaginator's error list,
// unless the error is context.Canceled or context.DeadlineExceeded
// (or wraps one of them), in which case nothing is recorded.
func (u errorSaver[T]) applyUpdate(depag *Depaginator[T]) {
	switch {
	case errors.Is(u.err, context.Canceled),
		errors.Is(u.err, context.DeadlineExceeded):
		// Context-related errors are not saved
		return
	}

	failure := PageError{
		PageRequest: u.req,
		Err:         u.err,
	}
	depag.errors = append(depag.errors, failure)
}
// itemHandler is an [update] implementation that handles one page of
// items. The items themselves are passed to the handler from a
// separate goroutine.
type itemHandler[T any] struct {
	idx  int // Page index
	page []T // The page of items to handle
}

// applyUpdate inspects the page and then starts a goroutine to handle
// its items. A page holding fewer items than the per-page count is
// treated as the last page: the total page and item counts are set
// from it (unless a smaller nonzero value is already recorded), and
// the cancelers registered for later pages are called.
func (u itemHandler[T]) applyUpdate(depag *Depaginator[T]) {
	// Index of the first item on this page
	base := depag.perPage * u.idx

	if len(u.page) < depag.perPage {
		// A short page fixes both the page count and the item count
		pageCount := u.idx + 1
		itemCount := base + len(u.page)
		if depag.totalPages == 0 || pageCount < depag.totalPages {
			depag.totalPages = pageCount
		}
		if depag.totalItems == 0 || itemCount < depag.totalItems {
			depag.totalItems = itemCount
		}

		// Pages past this one are no longer needed
		for idx, cancel := range depag.cancelers {
			if idx <= u.idx {
				continue
			}
			cancel()
		}
	}

	// The wait group is incremented here and released by handle
	depag.wg.Add(1)
	go u.handle(depag, base)
}

// handle passes each item in the page to the depaginator's handler,
// numbering the items consecutively starting at itemBase, and marks
// the wait group done when it returns.
func (u itemHandler[T]) handle(depag *Depaginator[T], itemBase int) {
	defer depag.wg.Done()

	for offset := range u.page {
		depag.handler.Handle(depag.ctx, itemBase+offset, u.page[offset])
	}
}
// pageDone is a sentinel [update] implementation; it carries no data
// and exists only to decrement the depaginator's wait group.
type pageDone[T any] struct{}

// applyUpdate marks one unit of work done on the wait group.
func (pageDone[T]) applyUpdate(depag *Depaginator[T]) {
	depag.wg.Done()
}
// totalItems is an [update] that changes the total number of items to
// expect.
type totalItems[T any] int

// applyUpdate stores the new item total on the depaginator. Values
// that are zero or negative are ignored.
func (u totalItems[T]) applyUpdate(depag *Depaginator[T]) {
	count := int(u)
	if count <= 0 {
		return
	}
	depag.totalItems = count
}
// totalPages is an [update] that changes the total number of pages to
// expect.
type totalPages[T any] int

// applyUpdate stores the new page total on the depaginator. Values
// that are zero or negative are ignored.
func (u totalPages[T]) applyUpdate(depag *Depaginator[T]) {
	count := int(u)
	if count <= 0 {
		return
	}
	depag.totalPages = count
}
// perPage is an [update] that changes the number of items to expect
// in each page.
type perPage[T any] int

// applyUpdate stores the new per-page count on the depaginator.
// Values that are zero or negative are ignored.
func (u perPage[T]) applyUpdate(depag *Depaginator[T]) {
	size := int(u)
	if size <= 0 {
		return
	}
	depag.perPage = size
}
// bundle is an [update] made up of several other updates.
type bundle[T any] []update[T]

// applyUpdate applies each bundled update to depag, in slice order.
func (u bundle[T]) applyUpdate(depag *Depaginator[T]) {
	for i := range u {
		u[i].applyUpdate(depag)
	}
}
// pageRequest is an [update] implementation that asks for a page to
// be retrieved.
type pageRequest[T any] struct {
	idx int // Page index
	req any // Request-specific data
}

// applyUpdate starts a goroutine to get the requested page. Nothing
// is started if the total page count is known and the index lies at
// or beyond it, or if the page has already been requested.
func (u pageRequest[T]) applyUpdate(depag *Depaginator[T]) {
	// The already-requested check is only made (and the page only
	// marked) when the page is not known to be out of range
	outOfRange := depag.totalPages > 0 && u.idx >= depag.totalPages
	if outOfRange || depag.pages.CheckAndSet(u.idx) {
		return
	}

	// Place the request; the wait group is incremented before the
	// goroutine is started
	request := PageRequest{
		PageIndex: u.idx,
		Request:   u.req,
	}
	depag.wg.Add(1)
	go depag.getPage(request)
}