From da675686fc96d788a987193c6c33b27c771617ee Mon Sep 17 00:00:00 2001 From: Andreas Bergmeier Date: Fri, 7 Feb 2025 10:55:42 +0100 Subject: [PATCH] Add support for scanning Hashes Extends redis_scan with new optional type field. Setting type to hash enables Hash type support. Without type field set falls back to previous string type processing. --- .../components/pages/inputs/redis_scan.adoc | 35 +++++++- internal/impl/redis/input_scan.go | 87 ++++++++++++++++--- 2 files changed, 106 insertions(+), 16 deletions(-) diff --git a/docs/modules/components/pages/inputs/redis_scan.adoc b/docs/modules/components/pages/inputs/redis_scan.adoc index dd1a3bf8a3..8f1ad389b2 100644 --- a/docs/modules/components/pages/inputs/redis_scan.adoc +++ b/docs/modules/components/pages/inputs/redis_scan.adoc @@ -23,7 +23,7 @@ component_type_dropdown::[] -Scans the set of keys in the current selected database and gets their values, using the Scan and Get commands. +Scans the set of keys in the current selected database and gets their values, using the Scan and respective, typed, Get commands. Introduced in version 4.27.0. @@ -42,6 +42,7 @@ input: url: redis://:6379 # No default (required) auto_replay_nacks: true match: "" + type: string ``` -- @@ -66,6 +67,7 @@ input: client_certs: [] auto_replay_nacks: true match: "" + type: string ``` -- @@ -76,11 +78,26 @@ Optionally, iterates only elements matching a blob-style pattern. For example: - `*foo*` iterates only keys which contain `foo` in it. - `foo*` iterates only keys starting with `foo`. -This input generates a message for each key value pair in the following format: +With no type specified (default) this input generates a message for each key value pair in the legacy format: ```json {"key":"foo","value":"bar"} ``` +For `type` set to `string` this input generates a string for each key value pair in the following format: + +```json +"foovalue" +``` + +The key is stored in the metadata field `key`. + +For `type` set to `hash` this input generates a message for each key fields pair in the following format: + +```json +{"field1":"Hello","field2":"Hi"} +``` + +The key is stored in the metadata field `key`. == Fields @@ -335,4 +352,18 @@ match: foo match: '*4*' ``` +=== `type` + +The type of the Redis keys to scan. + + +*Type*: `string` + +*Default*: `"string"` + +Options: +`hash` +, `string` +. + diff --git a/internal/impl/redis/input_scan.go b/internal/impl/redis/input_scan.go index 818271e79b..e83edb12f5 100644 --- a/internal/impl/redis/input_scan.go +++ b/internal/impl/redis/input_scan.go @@ -38,21 +38,39 @@ func init() { } } -const matchFieldName = "match" +const ( + matchFieldName = "match" + typeFieldName = "type" +) func redisScanInputConfig() *service.ConfigSpec { spec := service.NewConfigSpec(). - Summary(`Scans the set of keys in the current selected database and gets their values, using the Scan and Get commands.`). + Summary(`Scans the set of keys in the current selected database and gets their values, using the Scan and respective, typed, Get commands.`). Description(`Optionally, iterates only elements matching a blob-style pattern. For example: - ` + "`*foo*`" + ` iterates only keys which contain ` + "`foo`" + ` in it. - ` + "`foo*`" + ` iterates only keys starting with ` + "`foo`" + `. -This input generates a message for each key value pair in the following format: +With no type specified (default) this input generates a message for each key value pair in the legacy format: ` + "```json" + ` {"key":"foo","value":"bar"} ` + "```" + ` +For ` + "`type`" + ` set to ` + "`string`" + ` this input generates a string for each key value pair in the following format: + +` + "```json" + ` +"foovalue" +` + "```" + ` + +The key is stored in the metadata field ` + "`key`" + `. + +For ` + "`type`" + ` set to ` + "`hash`" + ` this input generates a message for each key fields pair in the following format: + +` + "```json" + ` +{"field1":"Hello","field2":"Hi"} +` + "```" + ` + +The key is stored in the metadata field ` + "`key`" + `. `). Categories("Services"). Version("4.27.0") @@ -70,7 +88,11 @@ This input generates a message for each key value pair in the following format: Example("foo*"). Example("foo"). Example("*4*"). - Default("")) + Default("")). + Field(service.NewStringEnumField(typeFieldName, "hash", "string"). + Description("The type of the Redis keys to scan."). + Default("string"). + Optional()) } func newRedisScanInputFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { @@ -82,10 +104,17 @@ func newRedisScanInputFromConfig(conf *service.ParsedConfig, mgr *service.Resour if err != nil { return nil, fmt.Errorf("error retrieving %s: %v", matchFieldName, err) } + + t, err := conf.FieldString(typeFieldName) + if err != nil { + return nil, fmt.Errorf("error retrieving %s: %v", typeFieldName, err) + } + r := &redisScanReader{ client: client, match: match, log: mgr.Logger(), + t: t, } return r, nil } @@ -95,6 +124,7 @@ type redisScanReader struct { client redis.UniversalClient iter *redis.ScanIterator log *service.Logger + t string } func (r *redisScanReader) Connect(ctx context.Context) error { @@ -102,7 +132,11 @@ func (r *redisScanReader) Connect(ctx context.Context) error { if err != nil { return err } - r.iter = r.client.Scan(context.Background(), 0, r.match, 0).Iterator() + if r.t == "" { + r.iter = r.client.Scan(context.Background(), 0, r.match, 0).Iterator() + } else { + r.iter = r.client.ScanType(context.Background(), 0, r.match, 0, r.t).Iterator() + } return r.iter.Err() } @@ -110,16 +144,41 @@ func (r *redisScanReader) Read(ctx context.Context) (*service.Message, service.A if r.iter.Next(ctx) { key := r.iter.Val() - res := r.client.Get(ctx, key) - if err := res.Err(); err != nil { - return nil, nil, err - } - msg := service.NewMessage(nil) - msg.SetStructuredMut(map[string]any{ - "key": key, - "value": res.Val(), - }) + switch r.t { + case "hash": + res := r.client.HGetAll(ctx, key) + if err := res.Err(); err != nil { + return nil, nil, err + } + + m := make(map[string]any, len(res.Val())) + for k, v := range res.Val() { + m[k] = v + } + + msg.SetStructuredMut(m) + msg.MetaSet("key", key) + case "string": + res := r.client.Get(ctx, key) + if err := res.Err(); err != nil { + return nil, nil, err + } + + msg.SetStructuredMut(res.Val()) + msg.MetaSet("key", key) + default: + // legacy behavior + res := r.client.Get(ctx, key) + if err := res.Err(); err != nil { + return nil, nil, err + } + + msg.SetStructuredMut(map[string]any{ + "key": key, + "value": res.Val(), + }) + } return msg, func(ctx context.Context, err error) error { return err }, nil