Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions docs/modules/components/pages/inputs/redis_scan.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
component_type_dropdown::[]


Scans the set of keys in the current selected database and gets their values, using the Scan and Get commands.
Scans the set of keys in the current selected database and gets their values, using the Scan and respective, typed, Get commands.

Introduced in version 4.27.0.

Expand All @@ -42,6 +42,7 @@ input:
url: redis://:6379 # No default (required)
auto_replay_nacks: true
match: ""
type: string
```

--
Expand All @@ -66,6 +67,7 @@ input:
client_certs: []
auto_replay_nacks: true
match: ""
type: string
```

--
Expand All @@ -76,11 +78,26 @@ Optionally, iterates only elements matching a blob-style pattern. For example:
- `*foo*` iterates only keys which contain `foo` in it.
- `foo*` iterates only keys starting with `foo`.

This input generates a message for each key value pair in the following format:
With no type specified (default) this input generates a message for each key value pair in the legacy format:

```json
{"key":"foo","value":"bar"}
```
For `type` set to `string` this input generates a string for each key value pair in the following format:

```json
"foovalue"
```

The key is stored in the metadata field `key`.

For `type` set to `hash` this input generates a message for each key fields pair in the following format:

```json
{"field1":"Hello","field2":"Hi"}
```

The key is stored in the metadata field `key`.


== Fields
Expand Down Expand Up @@ -335,4 +352,18 @@ match: foo
match: '*4*'
```

=== `type`

The type of the Redis keys to scan.


*Type*: `string`

*Default*: `"string"`

Options:
`hash`
, `string`
.


87 changes: 73 additions & 14 deletions internal/impl/redis/input_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,39 @@ func init() {
}
}

const matchFieldName = "match"
const (
matchFieldName = "match"
typeFieldName = "type"
)

func redisScanInputConfig() *service.ConfigSpec {
spec := service.NewConfigSpec().
Summary(`Scans the set of keys in the current selected database and gets their values, using the Scan and Get commands.`).
Summary(`Scans the set of keys in the current selected database and gets their values, using the Scan and respective, typed, Get commands.`).
Description(`Optionally, iterates only elements matching a blob-style pattern. For example:

- ` + "`*foo*`" + ` iterates only keys which contain ` + "`foo`" + ` in it.
- ` + "`foo*`" + ` iterates only keys starting with ` + "`foo`" + `.

This input generates a message for each key value pair in the following format:
With no type specified (default) this input generates a message for each key value pair in the legacy format:

` + "```json" + `
{"key":"foo","value":"bar"}
` + "```" + `
For ` + "`type`" + ` set to ` + "`string`" + ` this input generates a string for each key value pair in the following format:

` + "```json" + `
"foovalue"
` + "```" + `

The key is stored in the metadata field ` + "`key`" + `.

For ` + "`type`" + ` set to ` + "`hash`" + ` this input generates a message for each key fields pair in the following format:

` + "```json" + `
{"field1":"Hello","field2":"Hi"}
` + "```" + `

The key is stored in the metadata field ` + "`key`" + `.
`).
Categories("Services").
Version("4.27.0")
Expand All @@ -70,7 +88,11 @@ This input generates a message for each key value pair in the following format:
Example("foo*").
Example("foo").
Example("*4*").
Default(""))
Default("")).
Field(service.NewStringEnumField(typeFieldName, "hash", "string").
Description("The type of the Redis keys to scan.").
Default("string").
Optional())
}

func newRedisScanInputFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) {
Expand All @@ -82,10 +104,17 @@ func newRedisScanInputFromConfig(conf *service.ParsedConfig, mgr *service.Resour
if err != nil {
return nil, fmt.Errorf("error retrieving %s: %v", matchFieldName, err)
}

t, err := conf.FieldString(typeFieldName)
if err != nil {
return nil, fmt.Errorf("error retrieving %s: %v", typeFieldName, err)
}

r := &redisScanReader{
client: client,
match: match,
log: mgr.Logger(),
t: t,
}
return r, nil
}
Expand All @@ -95,31 +124,61 @@ type redisScanReader struct {
client redis.UniversalClient
iter *redis.ScanIterator
log *service.Logger
t string
}

func (r *redisScanReader) Connect(ctx context.Context) error {
_, err := r.client.Ping(ctx).Result()
if err != nil {
return err
}
r.iter = r.client.Scan(context.Background(), 0, r.match, 0).Iterator()
if r.t == "" {
r.iter = r.client.Scan(context.Background(), 0, r.match, 0).Iterator()
} else {
r.iter = r.client.ScanType(context.Background(), 0, r.match, 0, r.t).Iterator()
}
return r.iter.Err()
}

func (r *redisScanReader) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
if r.iter.Next(ctx) {
key := r.iter.Val()

res := r.client.Get(ctx, key)
if err := res.Err(); err != nil {
return nil, nil, err
}

msg := service.NewMessage(nil)
msg.SetStructuredMut(map[string]any{
"key": key,
"value": res.Val(),
})
switch r.t {
case "hash":
res := r.client.HGetAll(ctx, key)
if err := res.Err(); err != nil {
return nil, nil, err
}

m := make(map[string]any, len(res.Val()))
for k, v := range res.Val() {
m[k] = v
}

msg.SetStructuredMut(m)
msg.MetaSet("key", key)
case "string":
res := r.client.Get(ctx, key)
if err := res.Err(); err != nil {
return nil, nil, err
}

msg.SetStructuredMut(res.Val())
msg.MetaSet("key", key)
default:
// legacy behavior
res := r.client.Get(ctx, key)
if err := res.Err(); err != nil {
return nil, nil, err
}

msg.SetStructuredMut(map[string]any{
"key": key,
"value": res.Val(),
})
}
return msg, func(ctx context.Context, err error) error {
return err
}, nil
Expand Down
Loading