Skip to content

Commit

Permalink
feat: make batch type configurable on cassandra output (redpanda-data…
Browse files Browse the repository at this point in the history
…#1498)

* feat: make batch type configurable on cassandra output

* docs: add loged_batch to cassandra output docs

* fix: set batch type on newCassandraWriter
  • Loading branch information
eduardodbr authored Oct 7, 2022
1 parent a119c09 commit e013a51
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 1 deletion.
2 changes: 2 additions & 0 deletions internal/component/output/config_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type CassandraConfig struct {
ArgsMapping string `json:"args_mapping" yaml:"args_mapping"`
Consistency string `json:"consistency" yaml:"consistency"`
Timeout string `json:"timeout" yaml:"timeout"`
LoggedBatch bool `json:"logged_batch" yaml:"logged_batch"`
// TODO: V4 Remove this and replace with explicit values.
retries.Config `json:",inline" yaml:",inline"`
MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
Expand Down Expand Up @@ -54,5 +55,6 @@ func NewCassandraConfig() CassandraConfig {
Config: rConf,
MaxInFlight: 64,
Batching: batchconfig.NewConfig(),
LoggedBatch: true,
}
}
11 changes: 10 additions & 1 deletion internal/impl/cassandra/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ output:
).HasOptions(
"ANY", "ONE", "TWO", "THREE", "QUORUM", "ALL", "LOCAL_QUORUM", "EACH_QUORUM", "LOCAL_ONE",
).Advanced(),
docs.FieldBool(
"logged_batch",
"If enabled the driver will perform a logged batch.",
).Advanced(),
docs.FieldInt("max_retries", "The maximum number of retries before giving up on a request.").Advanced(),
docs.FieldObject("backoff", "Control time intervals between retry attempts.").WithChildren(
docs.FieldString("initial_interval", "The initial period to wait between retry attempts."),
Expand Down Expand Up @@ -145,6 +149,7 @@ type cassandraWriter struct {
connLock sync.RWMutex

argsMapping *mapping.Executor
batchType gocql.BatchType
}

func newCassandraWriter(conf output.CassandraConfig, mgr bundle.NewManagement) (*cassandraWriter, error) {
Expand All @@ -169,6 +174,10 @@ func newCassandraWriter(conf output.CassandraConfig, mgr bundle.NewManagement) (
if err = c.parseArgs(mgr); err != nil {
return nil, fmt.Errorf("parsing args: %w", err)
}
c.batchType = gocql.UnloggedBatch
if c.conf.LoggedBatch {
c.batchType = gocql.LoggedBatch
}

return &c, nil
}
Expand Down Expand Up @@ -255,7 +264,7 @@ func (c *cassandraWriter) writeRow(session *gocql.Session, msg message.Batch) er
}

func (c *cassandraWriter) writeBatch(session *gocql.Session, msg message.Batch) error {
batch := session.NewBatch(gocql.LoggedBatch)
batch := session.NewBatch(c.batchType)

if err := msg.Iter(func(i int, p *message.Part) error {
values, err := c.mapArgs(msg, i)
Expand Down
9 changes: 9 additions & 0 deletions website/docs/components/outputs/cassandra.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ output:
query: ""
args_mapping: ""
consistency: QUORUM
logged_batch: true
max_retries: 3
backoff:
initial_interval: 1s
Expand Down Expand Up @@ -368,6 +369,14 @@ Type: `string`
Default: `"QUORUM"`
Options: `ANY`, `ONE`, `TWO`, `THREE`, `QUORUM`, `ALL`, `LOCAL_QUORUM`, `EACH_QUORUM`, `LOCAL_ONE`.

### `logged_batch`

If enabled the driver will perform a logged batch.


Type: `bool`
Default: `true`

### `max_retries`

The maximum number of retries before giving up on a request.
Expand Down

0 comments on commit e013a51

Please sign in to comment.