Skip to content

Commit f21d832

Browse files
authored
Request count in ConsumedCapacity (#241)
* add request count in ConsumedCapacity * refactor addConsumedCapacity, add a lil test
1 parent 7a00ba8 commit f21d832

File tree

10 files changed

+62
-38
lines changed

10 files changed

+62
-38
lines changed

batchget.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,14 +341,15 @@ redo:
341341
itr.err = itr.bg.batch.table.db.retry(ctx, func() error {
342342
var err error
343343
itr.output, err = itr.bg.batch.table.db.client.BatchGetItem(ctx, itr.input)
344+
itr.bg.cc.incRequests()
344345
return err
345346
})
346347
if itr.err != nil {
347348
return false
348349
}
349350
if itr.bg.cc != nil {
350-
for _, cc := range itr.output.ConsumedCapacity {
351-
addConsumedCapacity(itr.bg.cc, &cc)
351+
for i := range itr.output.ConsumedCapacity {
352+
itr.bg.cc.add(&itr.output.ConsumedCapacity[i])
352353
}
353354
}
354355

batchwrite.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,14 +139,15 @@ func (bw *BatchWrite) Run(ctx context.Context) (wrote int, err error) {
139139
err := bw.batch.table.db.retry(ctx, func() error {
140140
var err error
141141
res, err = bw.batch.table.db.client.BatchWriteItem(ctx, req)
142+
bw.cc.incRequests()
142143
return err
143144
})
144145
if err != nil {
145146
return wrote, err
146147
}
147148
if bw.cc != nil {
148-
for _, cc := range res.ConsumedCapacity {
149-
addConsumedCapacity(bw.cc, &cc)
149+
for i := range res.ConsumedCapacity {
150+
bw.cc.add(&res.ConsumedCapacity[i])
150151
}
151152
}
152153

delete.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,11 @@ func (d *Delete) run(ctx context.Context) (*dynamodb.DeleteItemOutput, error) {
108108
err := d.table.db.retry(ctx, func() error {
109109
var err error
110110
output, err = d.table.db.client.DeleteItem(ctx, input)
111+
d.cc.incRequests()
111112
return err
112113
})
113-
if d.cc != nil && output != nil {
114-
addConsumedCapacity(d.cc, output.ConsumedCapacity)
114+
if output != nil {
115+
d.cc.add(output.ConsumedCapacity)
115116
}
116117
return output, err
117118
}

put.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,11 @@ func (p *Put) run(ctx context.Context) (output *dynamodb.PutItemOutput, err erro
8181
req := p.input()
8282
p.table.db.retry(ctx, func() error {
8383
output, err = p.table.db.client.PutItem(ctx, req)
84+
p.cc.incRequests()
8485
return err
8586
})
86-
if p.cc != nil && output != nil {
87-
addConsumedCapacity(p.cc, output.ConsumedCapacity)
87+
if output != nil {
88+
p.cc.add(output.ConsumedCapacity)
8889
}
8990
return
9091
}

query.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ func (q *Query) One(ctx context.Context, out interface{}) error {
221221
err := q.table.db.retry(ctx, func() error {
222222
var err error
223223
res, err = q.table.db.client.GetItem(ctx, req)
224+
q.cc.incRequests()
224225
if err != nil {
225226
return err
226227
}
@@ -232,9 +233,7 @@ func (q *Query) One(ctx context.Context, out interface{}) error {
232233
if err != nil {
233234
return err
234235
}
235-
if q.cc != nil {
236-
addConsumedCapacity(q.cc, res.ConsumedCapacity)
237-
}
236+
q.cc.add(res.ConsumedCapacity)
238237

239238
return unmarshalItem(res.Item, out)
240239
}
@@ -246,6 +245,7 @@ func (q *Query) One(ctx context.Context, out interface{}) error {
246245
err := q.table.db.retry(ctx, func() error {
247246
var err error
248247
res, err = q.table.db.client.Query(ctx, req)
248+
q.cc.incRequests()
249249
if err != nil {
250250
return err
251251
}
@@ -264,9 +264,7 @@ func (q *Query) One(ctx context.Context, out interface{}) error {
264264
if err != nil {
265265
return err
266266
}
267-
if q.cc != nil {
268-
addConsumedCapacity(q.cc, res.ConsumedCapacity)
269-
}
267+
q.cc.add(res.ConsumedCapacity)
270268

271269
return unmarshalItem(res.Items[0], out)
272270
}
@@ -288,6 +286,7 @@ func (q *Query) Count(ctx context.Context) (int, error) {
288286
err := q.table.db.retry(ctx, func() error {
289287
var err error
290288
res, err = q.table.db.client.Query(ctx, input)
289+
q.cc.incRequests()
291290
if err != nil {
292291
return err
293292
}
@@ -301,9 +300,7 @@ func (q *Query) Count(ctx context.Context) (int, error) {
301300
if err != nil {
302301
return 0, err
303302
}
304-
if q.cc != nil {
305-
addConsumedCapacity(q.cc, res.ConsumedCapacity)
306-
}
303+
q.cc.add(res.ConsumedCapacity)
307304

308305
q.startKey = res.LastEvaluatedKey
309306
if res.LastEvaluatedKey == nil ||
@@ -392,15 +389,14 @@ func (itr *queryIter) Next(ctx context.Context, out interface{}) bool {
392389
itr.err = itr.query.table.db.retry(ctx, func() error {
393390
var err error
394391
itr.output, err = itr.query.table.db.client.Query(ctx, itr.input)
392+
itr.query.cc.incRequests()
395393
return err
396394
})
397395

398396
if itr.err != nil {
399397
return false
400398
}
401-
if itr.query.cc != nil {
402-
addConsumedCapacity(itr.query.cc, itr.output.ConsumedCapacity)
403-
}
399+
itr.query.cc.add(itr.output.ConsumedCapacity)
404400
if len(itr.output.LastEvaluatedKey) > len(itr.exLEK) {
405401
itr.exLEK = itr.output.LastEvaluatedKey
406402
}

scan.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ func (s *Scan) Count(ctx context.Context) (int, error) {
254254
err := s.table.db.retry(ctx, func() error {
255255
var err error
256256
out, err = s.table.db.client.Scan(ctx, input)
257+
s.cc.incRequests()
257258
return err
258259
})
259260
if err != nil {
@@ -263,10 +264,7 @@ func (s *Scan) Count(ctx context.Context) (int, error) {
263264

264265
count += int(out.Count)
265266
scanned += out.ScannedCount
266-
267-
if s.cc != nil {
268-
addConsumedCapacity(s.cc, out.ConsumedCapacity)
269-
}
267+
s.cc.add(out.ConsumedCapacity)
270268

271269
if out.LastEvaluatedKey == nil ||
272270
(s.limit > 0 && count >= s.limit) ||
@@ -399,15 +397,14 @@ redo:
399397
itr.err = itr.scan.table.db.retry(ctx, func() error {
400398
var err error
401399
itr.output, err = itr.scan.table.db.client.Scan(ctx, itr.input)
400+
itr.scan.cc.incRequests()
402401
return err
403402
})
404403

405404
if itr.err != nil {
406405
return false
407406
}
408-
if itr.scan.cc != nil {
409-
addConsumedCapacity(itr.scan.cc, itr.output.ConsumedCapacity)
410-
}
407+
itr.scan.cc.add(itr.output.ConsumedCapacity)
411408
if len(itr.output.LastEvaluatedKey) > len(itr.exLEK) {
412409
itr.exLEK = itr.output.LastEvaluatedKey
413410
}

table.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ type ConsumedCapacity struct {
207207
// Write is the total number of write capacity units consumed during this operation.
208208
// This seems to be only set for transactions.
209209
Write float64
210+
210211
// GSI is a map of Global Secondary Index names to total consumed capacity units.
211212
GSI map[string]float64
212213
// GSIRead is a map of Global Secondary Index names to consumed read capacity units.
@@ -215,6 +216,7 @@ type ConsumedCapacity struct {
215216
// GSIWrite is a map of Global Secondary Index names to consumed write capacity units.
216217
// This seems to be only set for transactions.
217218
GSIWrite map[string]float64
219+
218220
// LSI is a map of Local Secondary Index names to total consumed capacity units.
219221
LSI map[string]float64
220222
// LSIRead is a map of Local Secondary Index names to consumed read capacity units.
@@ -223,6 +225,7 @@ type ConsumedCapacity struct {
223225
// LSIWrite is a map of Local Secondary Index names to consumed write capacity units.
224226
// This seems to be only set for transactions.
225227
LSIWrite map[string]float64
228+
226229
// Table is the amount of total throughput consumed by the table.
227230
Table float64
228231
// TableRead is the amount of read throughput consumed by the table.
@@ -233,9 +236,12 @@ type ConsumedCapacity struct {
233236
TableWrite float64
234237
// TableName is the name of the table affected by this operation.
235238
TableName string
239+
240+
// Requests is the number of SDK requests made against DynamoDB's API.
241+
Requests int
236242
}
237243

238-
func addConsumedCapacity(cc *ConsumedCapacity, raw *types.ConsumedCapacity) {
244+
func (cc *ConsumedCapacity) add(raw *types.ConsumedCapacity) {
239245
if cc == nil || raw == nil {
240246
return
241247
}
@@ -302,6 +308,13 @@ func addConsumedCapacity(cc *ConsumedCapacity, raw *types.ConsumedCapacity) {
302308
}
303309
}
304310

311+
func (cc *ConsumedCapacity) incRequests() {
312+
if cc == nil {
313+
return
314+
}
315+
cc.Requests++
316+
}
317+
305318
func mergeConsumedCapacity(dst, src *ConsumedCapacity) {
306319
if dst == nil || src == nil {
307320
return
@@ -363,4 +376,5 @@ func mergeConsumedCapacity(dst, src *ConsumedCapacity) {
363376
if dst.TableName == "" && src.TableName != "" {
364377
dst.TableName = src.TableName
365378
}
379+
dst.Requests += src.Requests
366380
}

table_test.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,11 +170,20 @@ func TestAddConsumedCapacity(t *testing.T) {
170170
}
171171

172172
var cc = new(ConsumedCapacity)
173-
addConsumedCapacity(cc, raw)
173+
cc.add(raw)
174174

175175
if !reflect.DeepEqual(cc, expected) {
176176
t.Error("bad ConsumedCapacity:", cc, "≠", expected)
177177
}
178+
179+
t.Run("request count", func(t *testing.T) {
180+
const expectedReqs = 2
181+
cc.incRequests()
182+
cc.incRequests()
183+
if cc.Requests != expectedReqs {
184+
t.Error("bad Requests count:", cc.Requests, "≠", expectedReqs)
185+
}
186+
})
178187
}
179188

180189
func normalizeDesc(desc *Description) {

tx.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,10 @@ func (tx *GetTx) Run(ctx context.Context) error {
7070
err = tx.db.retry(ctx, func() error {
7171
var err error
7272
resp, err = tx.db.client.TransactGetItems(ctx, input)
73+
tx.cc.incRequests()
7374
if tx.cc != nil && resp != nil {
74-
for _, cc := range resp.ConsumedCapacity {
75-
addConsumedCapacity(tx.cc, &cc)
75+
for i := range resp.ConsumedCapacity {
76+
tx.cc.add(&resp.ConsumedCapacity[i])
7677
}
7778
}
7879
return err
@@ -110,9 +111,10 @@ func (tx *GetTx) All(ctx context.Context, out interface{}) error {
110111
err = tx.db.retry(ctx, func() error {
111112
var err error
112113
resp, err = tx.db.client.TransactGetItems(ctx, input)
114+
tx.cc.incRequests()
113115
if tx.cc != nil && resp != nil {
114-
for _, cc := range resp.ConsumedCapacity {
115-
addConsumedCapacity(tx.cc, &cc)
116+
for i := range resp.ConsumedCapacity {
117+
tx.cc.add(&resp.ConsumedCapacity[i])
116118
}
117119
}
118120
return err
@@ -256,9 +258,10 @@ func (tx *WriteTx) Run(ctx context.Context) error {
256258
}
257259
err = tx.db.retry(ctx, func() error {
258260
out, err := tx.db.client.TransactWriteItems(ctx, input)
259-
if tx.cc != nil && out != nil {
260-
for _, cc := range out.ConsumedCapacity {
261-
addConsumedCapacity(tx.cc, &cc)
261+
tx.cc.incRequests()
262+
if out != nil {
263+
for i := range out.ConsumedCapacity {
264+
tx.cc.add(&out.ConsumedCapacity[i])
262265
}
263266
}
264267
return err

update.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,10 +347,11 @@ func (u *Update) run(ctx context.Context) (*dynamodb.UpdateItemOutput, error) {
347347
err := u.table.db.retry(ctx, func() error {
348348
var err error
349349
output, err = u.table.db.client.UpdateItem(ctx, input)
350+
u.cc.incRequests()
350351
return err
351352
})
352-
if u.cc != nil && output != nil {
353-
addConsumedCapacity(u.cc, output.ConsumedCapacity)
353+
if output != nil {
354+
u.cc.add(output.ConsumedCapacity)
354355
}
355356
return output, err
356357
}

0 commit comments

Comments
 (0)