Skip to content

Commit 5e67e31

Browse files
Add support for scanning Hashes
Extends redis_scan with new optional type field. Setting type to hash enables Hash type support. Without type field set falls back to previous string type processing.
1 parent 8706c1e commit 5e67e31

File tree

1 file changed

+56
-14
lines changed

1 file changed

+56
-14
lines changed

internal/impl/redis/input_scan.go

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,21 +38,31 @@ func init() {
3838
}
3939
}
4040

41-
const matchFieldName = "match"
41+
const (
42+
matchFieldName = "match"
43+
typeFieldName = "type"
44+
)
4245

4346
func redisScanInputConfig() *service.ConfigSpec {
4447
spec := service.NewConfigSpec().
45-
Summary(`Scans the set of keys in the current selected database and gets their values, using the Scan and Get commands.`).
48+
Summary(`Scans the set of keys in the current selected database and gets their values, using the Scan and respective, typed, Get commands.`).
4649
Description(`Optionally, iterates only elements matching a blob-style pattern. For example:
4750
4851
- ` + "`*foo*`" + ` iterates only keys which contain ` + "`foo`" + ` in it.
4952
- ` + "`foo*`" + ` iterates only keys starting with ` + "`foo`" + `.
5053
51-
This input generates a message for each key value pair in the following format:
54+
For string types (default) this input generates a message for each key value pair in the following format:
5255
5356
` + "```json" + `
5457
{"key":"foo","value":"bar"}
5558
` + "```" + `
59+
For hash types this input generates a message for each key fields pair in the following format:
60+
61+
` + "```json" + `
62+
{"field1":"Hello","field2":"Hi"}
63+
` + "```" + `
64+
65+
The key is stored in the metadata field ` + "`key`" + `.
5666
`).
5767
Categories("Services").
5868
Version("4.27.0")
@@ -70,7 +80,11 @@ This input generates a message for each key value pair in the following format:
7080
Example("foo*").
7181
Example("foo").
7282
Example("*4*").
73-
Default(""))
83+
Default("")).
84+
Field(service.NewStringEnumField(typeFieldName, "hash", "string").
85+
Description("The type of the Redis keys to scan.").
86+
Default("string").
87+
Optional())
7488
}
7589

7690
func newRedisScanInputFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) {
@@ -82,10 +96,17 @@ func newRedisScanInputFromConfig(conf *service.ParsedConfig, mgr *service.Resour
8296
if err != nil {
8397
return nil, fmt.Errorf("error retrieving %s: %v", matchFieldName, err)
8498
}
99+
100+
t, err := conf.FieldString(typeFieldName)
101+
if err != nil {
102+
return nil, fmt.Errorf("error retrieving %s: %v", typeFieldName, err)
103+
}
104+
85105
r := &redisScanReader{
86106
client: client,
87107
match: match,
88108
log: mgr.Logger(),
109+
t: t,
89110
}
90111
return r, nil
91112
}
@@ -95,31 +116,52 @@ type redisScanReader struct {
95116
client redis.UniversalClient
96117
iter *redis.ScanIterator
97118
log *service.Logger
119+
t string
98120
}
99121

100122
func (r *redisScanReader) Connect(ctx context.Context) error {
101123
_, err := r.client.Ping(ctx).Result()
102124
if err != nil {
103125
return err
104126
}
105-
r.iter = r.client.Scan(context.Background(), 0, r.match, 0).Iterator()
127+
if r.t == "" {
128+
r.iter = r.client.Scan(context.Background(), 0, r.match, 0).Iterator()
129+
} else {
130+
r.iter = r.client.ScanType(context.Background(), 0, r.match, 0, r.t).Iterator()
131+
}
106132
return r.iter.Err()
107133
}
108134

109135
func (r *redisScanReader) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
110136
if r.iter.Next(ctx) {
111137
key := r.iter.Val()
112138

113-
res := r.client.Get(ctx, key)
114-
if err := res.Err(); err != nil {
115-
return nil, nil, err
116-
}
117-
118139
msg := service.NewMessage(nil)
119-
msg.SetStructuredMut(map[string]any{
120-
"key": key,
121-
"value": res.Val(),
122-
})
140+
switch r.t {
141+
case "hash":
142+
res := r.client.HGetAll(ctx, key)
143+
if err := res.Err(); err != nil {
144+
return nil, nil, err
145+
}
146+
147+
m := make(map[string]any, len(res.Val()))
148+
for k, v := range res.Val() {
149+
m[k] = v
150+
}
151+
152+
msg.SetStructuredMut(m)
153+
msg.MetaSet("key", key)
154+
default:
155+
res := r.client.Get(ctx, key)
156+
if err := res.Err(); err != nil {
157+
return nil, nil, err
158+
}
159+
160+
msg.SetStructuredMut(map[string]any{
161+
"key": key,
162+
"value": res.Val(),
163+
})
164+
}
123165
return msg, func(ctx context.Context, err error) error {
124166
return err
125167
}, nil

0 commit comments

Comments
 (0)