35 changes: 28 additions & 7 deletions config/config.go
@@ -99,6 +99,9 @@ func (c *Config) SetDefault() {
 	if c.Snapshot.Mode == "" {
 		c.Snapshot.Mode = SnapshotModeNever
 	}
+	if c.Snapshot.ChunkingMode == "" {
+		c.Snapshot.ChunkingMode = SnapshotChunkingModeAuto
+	}
 	if c.Snapshot.ChunkSize == 0 {
 		c.Snapshot.ChunkSize = 8_000
 	}
@@ -240,13 +243,14 @@ func isEmpty(s string) bool {
 }
 
 type SnapshotConfig struct {
-	Mode              SnapshotMode       `json:"mode" yaml:"mode"`
-	InstanceID        string             `json:"instanceId" yaml:"instanceId"`
-	Tables            publication.Tables `json:"tables" yaml:"tables"`
-	ChunkSize         int64              `json:"chunkSize" yaml:"chunkSize"`
-	ClaimTimeout      time.Duration      `json:"claimTimeout" yaml:"claimTimeout"`
-	HeartbeatInterval time.Duration      `json:"heartbeatInterval" yaml:"heartbeatInterval"`
-	Enabled           bool               `json:"enabled" yaml:"enabled"`
+	Mode              SnapshotMode         `json:"mode" yaml:"mode"`
+	InstanceID        string               `json:"instanceId" yaml:"instanceId"`
+	Tables            publication.Tables   `json:"tables" yaml:"tables"`
+	ChunkSize         int64                `json:"chunkSize" yaml:"chunkSize"`
+	ChunkingMode      SnapshotChunkingMode `json:"chunkingMode" yaml:"chunkingMode"`
+	ClaimTimeout      time.Duration        `json:"claimTimeout" yaml:"claimTimeout"`
+	HeartbeatInterval time.Duration        `json:"heartbeatInterval" yaml:"heartbeatInterval"`
+	Enabled           bool                 `json:"enabled" yaml:"enabled"`
 }
 
 func (s *SnapshotConfig) Validate() error {
@@ -282,6 +286,14 @@ func (s *SnapshotConfig) Validate() error {
 		return errors.New("snapshot.tables must be specified for snapshot_only mode")
 	}
 
+	// Validate chunking mode
+	switch s.ChunkingMode {
+	case SnapshotChunkingModeAuto, SnapshotChunkingModeRange, SnapshotChunkingModeKeyset, SnapshotChunkingModeOffset:
+		// ok
+	default:
+		return fmt.Errorf("snapshot chunkingMode must be 'auto', 'range', 'keyset', or 'offset'")
+	}
+
 	return nil
 }
 
@@ -292,3 +304,12 @@ const (
 	SnapshotModeNever        SnapshotMode = "never"
 	SnapshotModeSnapshotOnly SnapshotMode = "snapshot_only"
 )
+
+type SnapshotChunkingMode string
+
+const (
+	SnapshotChunkingModeAuto   SnapshotChunkingMode = "auto"
+	SnapshotChunkingModeRange  SnapshotChunkingMode = "range"
+	SnapshotChunkingModeKeyset SnapshotChunkingMode = "keyset"
+	SnapshotChunkingModeOffset SnapshotChunkingMode = "offset"
+)
Comment on lines +308 to +315
Author

The user might not want the decision made by the decision tree alone. I added this so that, where possible, they can pick a mode themselves and steer the snapshot to the kind of chunking they want.

42 changes: 42 additions & 0 deletions docs/SNAPSHOT_FEATURE.md
@@ -269,6 +269,48 @@ sequenceDiagram
Note over I1: CDC continues from snapshot LSN
```

#### Chunking Strategy (How the Snapshot Picks Range vs Keyset vs Offset)

We keep the decision tree simple for operators (a minimal Go sketch of the selection logic follows this list):

- **Step 1: Primary Key inspection**
- If there is **one integer PK** (`smallint`, `integer`, `bigint`), range/keyset is allowed.
- Otherwise → **Offset mode** (fallback).

- **Step 2: Bounds & row count**
- Read `MIN(pk)`, `MAX(pk)`, `COUNT(*)`.
- If `COUNT(*) == 0` → single chunk (empty table).

- **Step 3: Sparsity check (snowflake-style IDs)**
- `sparsityRatio = (max - min + 1) / rowCount`
- If `sparsityRatio > 100` **or** projected range chunks would exceed `maxChunksPerTable = 1,000,000`:
- Use **Keyset mode** (exact boundaries via `NTILE` when feasible; otherwise sequential keyset).

- **Step 4: Mode selection**
- **Dense PK** (sparsity ok) → **Range mode**
`SELECT ... WHERE pk BETWEEN start AND end ORDER BY pk LIMIT chunkSize`
- **Sparse PK** → **Keyset mode**
`SELECT ... WHERE pk > cursor ORDER BY pk LIMIT chunkSize`
- **No single integer PK** → **Offset mode**
`SELECT ... ORDER BY ctid LIMIT ... OFFSET ...`

- **Safety rails**
- Caps chunk count at `1,000,000` to avoid memory pressure.
- Uses quoted identifiers for schema/table/column names.
- Logs chosen mode at `INFO` level (e.g., `"sparse primary key detected, using keyset pagination"`).
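
A minimal Go sketch of the decision tree above. It is illustrative only: `tableStats`, its field names, and the thresholds mirror the documented defaults (sparsity ratio 100, chunk cap 1,000,000) rather than the snapshotter's actual types.

```go
package snapshot

const (
	maxSparsityRatio  = 100       // documented sparsity threshold
	maxChunksPerTable = 1_000_000 // documented chunk-count cap
)

// tableStats holds the results of the PK inspection and bounds queries
// (Steps 1 and 2); names are illustrative.
type tableStats struct {
	hasSingleIntPK bool  // exactly one smallint/integer/bigint PK column
	minPK, maxPK   int64 // SELECT MIN(pk), MAX(pk)
	rowCount       int64 // SELECT COUNT(*)
}

// chooseChunkingMode walks Steps 1-4: offset without a usable PK, keyset for
// sparse keys or oversized range plans, range otherwise. The real code would
// return the config.SnapshotChunkingMode constants; plain strings keep the
// sketch self-contained.
func chooseChunkingMode(s tableStats, chunkSize int64) string {
	if !s.hasSingleIntPK {
		return "offset" // Step 1: no single integer PK, fall back to offset
	}
	if s.rowCount == 0 {
		return "range" // Step 2: empty table, a single (empty) chunk
	}
	span := s.maxPK - s.minPK + 1
	sparsityRatio := span / s.rowCount                    // Step 3
	projectedChunks := (span + chunkSize - 1) / chunkSize // ceiling division
	if sparsityRatio > maxSparsityRatio || projectedChunks > maxChunksPerTable {
		return "keyset" // sparse PK (e.g. snowflake IDs) or too many range chunks
	}
	return "range" // Step 4: dense PK, independent BETWEEN chunks
}
```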

##### Concurrency Notes
- **Range mode** and **keyset with NTILE boundaries**: chunks are independent; multiple pods process the same table in parallel (see the boundary sketch after these notes).
- **Sequential keyset mode** (used only when sparsity is extreme and NTILE is not feasible): chunks of that table advance one-by-one; adding pods does not speed that table, but other tables still run in parallel.
- **Offset mode**: chunks are independent; multiple pods can process in parallel.
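
The NTILE boundary pass mentioned above can be sketched as follows. This is not the project's code: the table and column names (`public.events`, `id`) are illustrative, `database/sql` with Postgres-style `$1` placeholders is assumed, and the real snapshotter builds the query from the quoted identifiers it inspected.

```go
package snapshot

import (
	"context"
	"database/sql"
	"fmt"
)

// chunkBoundary is one independently claimable slice of a table.
type chunkBoundary struct {
	bucket         int
	startID, endID int64
}

// keysetBoundaries precomputes exact per-chunk boundaries with NTILE so each
// chunk can be claimed and processed by a different pod in parallel.
func keysetBoundaries(ctx context.Context, db *sql.DB, buckets int) ([]chunkBoundary, error) {
	const q = `
SELECT bucket, MIN(id), MAX(id)
FROM (SELECT id, NTILE($1) OVER (ORDER BY id) AS bucket FROM public.events) sub
GROUP BY bucket
ORDER BY bucket`

	rows, err := db.QueryContext(ctx, q, buckets)
	if err != nil {
		return nil, fmt.Errorf("computing NTILE boundaries: %w", err)
	}
	defer rows.Close()

	var boundaries []chunkBoundary
	for rows.Next() {
		var b chunkBoundary
		if err := rows.Scan(&b.bucket, &b.startID, &b.endID); err != nil {
			return nil, err
		}
		boundaries = append(boundaries, b)
	}
	return boundaries, rows.Err()
}
```

Each returned `[startID, endID]` pair becomes one claimable chunk, which is why adding pods speeds up a table in this mode but not in sequential keyset mode.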

##### Operator override
- Config key: `snapshot.chunkingMode` (`auto` | `range` | `keyset` | `offset`); a config sketch follows this list.
- Default is `auto` (decision tree above). Overrides:
- `range`: force range chunking (requires single integer PK; else falls back to offset).
- `keyset`: force keyset (requires single integer PK + bounds + row count; else falls back to offset).
- `offset`: force offset (order by `ctid`, suitable if you want to avoid range/keyset or have no PK).
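
A minimal sketch of the override as an operator might write it, wrapped in Go only so the snippet is runnable. The `snapshot.chunkingMode` key path matches the struct tags in `config/config.go`; the YAML library and the local mirror struct are assumptions, not the project's actual loader.

```go
package main

import (
	"fmt"
	"log"

	"gopkg.in/yaml.v3" // assumption: any YAML decoder honouring the struct tags works
)

// Local mirror of the relevant SnapshotConfig fields; the real struct lives
// in config/config.go.
type snapshotSection struct {
	ChunkingMode string `yaml:"chunkingMode"`
	ChunkSize    int64  `yaml:"chunkSize"`
}

type fileConfig struct {
	Snapshot snapshotSection `yaml:"snapshot"`
}

const operatorConfig = `
snapshot:
  chunkingMode: keyset # force keyset; falls back to offset without a single integer PK
  chunkSize: 8000
`

func main() {
	var c fileConfig
	if err := yaml.Unmarshal([]byte(operatorConfig), &c); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("chunking mode: %s, chunk size: %d\n", c.Snapshot.ChunkingMode, c.Snapshot.ChunkSize)
}
```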

**ASCII Diagram:**
