
Conversation

@smallinsky
Contributor

@smallinsky smallinsky commented Oct 30, 2025

What

Add PutBatch support for Postgres.

Why not UPSERT: Postgres has no dedicated UPSERT statement; the equivalent is INSERT ... ON CONFLICT DO UPDATE.

@smallinsky smallinsky force-pushed the smallinsky/pg/pug_batch branch 3 times, most recently from ee56fb5 to 131a7ee Compare October 31, 2025 10:54
@smallinsky smallinsky force-pushed the smallinsky/pg/pug_batch branch 4 times, most recently from b98b8b9 to 48e2da7 Compare October 31, 2025 15:09
@smallinsky smallinsky marked this pull request as ready for review October 31, 2025 15:29
}

if _, err := pgcommon.Retry(ctx, b.log, func() (struct{}, error) {
_, err := b.pool.Exec(ctx, putBatchStmt, keys, values, expires, revs)
Contributor

I've liked having the SQL right in the Exec/Query call in the rest of pgbk so it's obvious what the positional parameters are.

Contributor

Any chance of moving the SQL here so we can see that the parameters are in the right order?

)

const (
defaultUpsertBatchChunk = 300
Contributor

This feels a little high; until we switch to a weaker transaction isolation level, every range read that touches one of the items in the batch will cause a serialization failure, I think.

I think this might be the same sort of question that prompted gravitational/teleport.e#4390 in CRDB. cc @dboslee, especially since we're going to want this in crdbbk as well.

@smallinsky smallinsky force-pushed the smallinsky/pg/pug_batch branch from 48e2da7 to cb7dd4c Compare October 31, 2025 18:06
@smallinsky smallinsky force-pushed the smallinsky/pg/pug_batch branch from cb7dd4c to 9e910de Compare October 31, 2025 18:10
@smallinsky smallinsky requested a review from espadolini November 3, 2025 18:07
}

if _, err := pgcommon.Retry(ctx, b.log, func() (struct{}, error) {
_, err := b.pool.Exec(ctx, putBatchStmt, keys, values, expires, revs)
Contributor

Any chance of moving the SQL here so we can see that the parameters are in the right order?

Comment on lines +34 to +36
type PutBatcher interface {
PutBatch(ctx context.Context, items []backend.Item) ([]string, error)
}
Contributor

Isn't this just backend.BatchPutter?

Comment on lines +105 to +106
case <-time.After(watchInitTimeout):
t.Fatal("timed out waiting for init event")
Contributor

I think that waiting for arbitrary timeouts on the execution of steps in the test is an antipattern. There's already a mechanism to set and enforce a timeout in the test harness, and a test runner that's slow enough might fail this for no reason other than needing some more time.

Comment on lines +197 to +198
case <-deadline.C:
t.Fatalf("timed out waiting for events: got=%d want=%d", len(out), wantCount)
Contributor

Comment on lines +34 to +47
putBatchStmt = `
INSERT INTO kv (key, value, expires, revision)
SELECT * FROM UNNEST(
$1::bytea[],
$2::bytea[],
$3::timestamptz[],
$4::uuid[]
)
ON CONFLICT (key) DO UPDATE
SET
value = EXCLUDED.value,
expires = EXCLUDED.expires,
revision = EXCLUDED.revision;
`
Contributor

Putting this in a single line makes pg_stat_activity bearable to look at, and avoids accidental mixed indentation.

Suggested change
putBatchStmt = `
INSERT INTO kv (key, value, expires, revision)
SELECT * FROM UNNEST(
$1::bytea[],
$2::bytea[],
$3::timestamptz[],
$4::uuid[]
)
ON CONFLICT (key) DO UPDATE
SET
value = EXCLUDED.value,
expires = EXCLUDED.expires,
revision = EXCLUDED.revision;
`
putBatchStmt = "INSERT INTO kv (key, value, expires, revision) SELECT * FROM UNNEST($1::bytea[], $2::bytea[], $3::timestamptz[], $4::uuid[]) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, expires = EXCLUDED.expires, revision = EXCLUDED.revision;"

for i := 0; i < itemCount; i++ {
items = append(items, backend.Item{
Key: prefix(fmt.Sprintf("item/%04d", i)),
Value: bytes.Repeat([]byte(fmt.Sprintf("%d", i)), payloadSize),
Contributor

Do we want variable-sized payloads? I think the byte slice can vary from 1-3 bytes * 300 * 1024.

Wouldn't this consume a large amount of memory in order to run this test? Could we perhaps make the value fixed or reduce the number of items while getting the same test coverage?

for _, item := range chunk {
keys = append(keys, nonNilKey(item.Key))
values = append(values, nonNil(item.Value))
expires = append(expires, zeronull.Timestamptz(item.Expires.UTC()))
Contributor

Does this handle as expected if item.Expires is zeroed aka it never expires?

Contributor Author

It should. The same approach is used in the Put flow, where zeronull.Timestamptz handles IsZero.

Contributor

Gotcha. Just make sure we have a test case for this, please 🙏🏾 🙏🏾 🙏🏾

Comment on lines +57 to +69
keys := make([][]byte, 0, len(chunk))
values := make([][]byte, 0, len(chunk))
expires := make([]zeronull.Timestamptz, 0, len(chunk))
revs := make([]revision, 0, len(chunk))

for _, item := range chunk {
keys = append(keys, nonNilKey(item.Key))
values = append(values, nonNil(item.Value))
expires = append(expires, zeronull.Timestamptz(item.Expires.UTC()))

revVal := newRevision()
revs = append(revs, revVal)
revOut = append(revOut, revisionToString(revVal))
Contributor

Suggested change
keys := make([][]byte, 0, len(chunk))
values := make([][]byte, 0, len(chunk))
expires := make([]zeronull.Timestamptz, 0, len(chunk))
revs := make([]revision, 0, len(chunk))
for _, item := range chunk {
keys = append(keys, nonNilKey(item.Key))
values = append(values, nonNil(item.Value))
expires = append(expires, zeronull.Timestamptz(item.Expires.UTC()))
revVal := newRevision()
revs = append(revs, revVal)
revOut = append(revOut, revisionToString(revVal))
keys := make([][]byte, len(chunk))
values := make([][]byte, len(chunk))
expires := make([]zeronull.Timestamptz, len(chunk))
revs := make([]revision, len(chunk))
for i, item := range chunk {
keys[i] = nonNilKey(item.Key)
values[i] = nonNil(item.Value)
expires[i] = zeronull.Timestamptz(item.Expires.UTC())
revVal := newRevision()
revs[i] = revVal
revOut = append(revOut, revisionToString(revVal))

nit:

Direct indexing is slightly more efficient since the runtime can skip bounds checks.

Disclaimer: I haven't actually tested this.

Contributor

I'd really like to see a benchmark for that before we go and use a pattern that's more error-prone.

Contributor

@cthach cthach Nov 5, 2025

I'd really like to see a benchmark for that before we go and use a pattern that's more error-prone.

I gotchu. Here's a simple benchmark.

package teleport_test

import "testing"

func BenchmarkAppendNoPrealloc(b *testing.B) {
	for b.Loop() {
		var s []int

		for i := range 10000 {
			s = append(s, i)
		}
	}
}

func BenchmarkAppendWithPrealloc(b *testing.B) {
	for b.Loop() {
		s := make([]int, 0, 10000)

		for i := range 10000 {
			s = append(s, i)
		}
	}
}

func BenchmarkDirectIndexWithPrealloc(b *testing.B) {
	for b.Loop() {
		s := make([]int, 10000)

		for i := range 10000 {
			s[i] = i
		}
	}
}

Results:

❯ go test -bench=. -benchmem ./benchmark_prealloc_test.go

goos: darwin
goarch: arm64
cpu: Apple M4 Pro
BenchmarkAppendNoPrealloc-12               12636             90313 ns/op          357627 B/op         19 allocs/op
BenchmarkAppendWithPrealloc-12             94554             21342 ns/op           81920 B/op          1 allocs/op
BenchmarkDirectIndexWithPrealloc-12        65499             15631 ns/op           81920 B/op          1 allocs/op
PASS

Contributor

With a chunk size of 1000, copying the types we have in the loop into four slices (but with the revision getting copied as a string), I'm getting

BenchmarkAppendWithPrealloc-14         	   45082	     25864 ns/op	   98113 B/op	    1004 allocs/op
BenchmarkDirectIndexWithPrealloc-14    	   48951	     24134 ns/op	   98113 B/op	    1004 allocs/op

and if we add the generation of the revision, like we have in code

BenchmarkAppendWithPrealloc-14         	    5415	    220249 ns/op	  114116 B/op	    2004 allocs/op
BenchmarkDirectIndexWithPrealloc-14    	    5626	    216876 ns/op	  114113 B/op	    2004 allocs/op

an improvement of 6.6% and 1.5% respectively, which is a lot less worth it compared to the potential for misuse that direct indexing has, especially considering that this is minor preparation for a much larger amount of network I/O. It's definitely worth keeping in mind for very tight loops that mainly deal in memory, though. Thank you for pointing that out.

Contributor

@espadolini espadolini Nov 5, 2025

benchmark code
package foo

import (
	"fmt"
	"testing"
	"time"

	"github.com/google/uuid"
	"github.com/jackc/pgx/v5/pgtype/zeronull"

	"github.com/gravitational/teleport/lib/backend"
)

func BenchmarkAppendWithPrealloc(b *testing.B) {
	var chunk []backend.Item
	for i := range 1000 {
		chunk = append(chunk, backend.Item{
			Key:      backend.KeyFromString(fmt.Sprintf("/%d", i)),
			Value:    []byte("foo"),
			Expires:  time.Now(),
			Revision: uuid.NewString(),
		})
	}

	for b.Loop() {
		keys := make([][]byte, 0, len(chunk))
		values := make([][]byte, 0, len(chunk))
		expires := make([]zeronull.Timestamptz, 0, len(chunk))
		revs := make([]uuid.UUID, 0, len(chunk))
		// revs := make([]string, 0, len(chunk))

		for _, item := range chunk {
			keys = append(keys, []byte(item.Key.String()))
			values = append(values, item.Value)
			expires = append(expires, zeronull.Timestamptz(item.Expires.UTC()))
			revs = append(revs, uuid.New())
			// revs = append(revs, item.Revision)
		}
	}
}

func BenchmarkDirectIndexWithPrealloc(b *testing.B) {
	var chunk []backend.Item
	for i := range 1000 {
		chunk = append(chunk, backend.Item{
			Key:      backend.KeyFromString(fmt.Sprintf("/%d", i)),
			Value:    []byte("foo"),
			Expires:  time.Now(),
			Revision: uuid.NewString(),
		})
	}

	for b.Loop() {
		keys := make([][]byte, len(chunk))
		values := make([][]byte, len(chunk))
		expires := make([]zeronull.Timestamptz, len(chunk))
		revs := make([]uuid.UUID, len(chunk))
		// revs := make([]string, len(chunk))

		for i, item := range chunk {
			keys[i] = []byte(item.Key.String())
			values[i] = item.Value
			expires[i] = zeronull.Timestamptz(item.Expires.UTC())
			revs[i] = uuid.New()
			// revs[i] = item.Revision
		}
	}
}

Contributor

Nice, thanks for providing that! Yep, the efficiency gain is very small and probably only worth the tradeoff in performance-critical code.

That being said, I don't have a preference, so will defer to @smallinsky on which direction we go with.


4 participants