@@ -38,21 +38,39 @@ func init() {
3838 }
3939}
4040
41- const matchFieldName = "match"
41+ const (
42+ matchFieldName = "match"
43+ typeFieldName = "type"
44+ )
4245
4346func redisScanInputConfig () * service.ConfigSpec {
4447 spec := service .NewConfigSpec ().
45- Summary (`Scans the set of keys in the current selected database and gets their values, using the Scan and Get commands.` ).
48+ Summary (`Scans the set of keys in the current selected database and gets their values, using the Scan and respective, typed, Get commands.` ).
4649 Description (`Optionally, iterates only elements matching a blob-style pattern. For example:
4750
4851- ` + "`*foo*`" + ` iterates only keys which contain ` + "`foo`" + ` in it.
4952- ` + "`foo*`" + ` iterates only keys starting with ` + "`foo`" + `.
5053
51- This input generates a message for each key value pair in the following format:
54+ With no type specified (default) this input generates a message for each key value pair in the legacy format:
5255
5356` + "```json" + `
5457{"key":"foo","value":"bar"}
5558` + "```" + `
59+ For ` + "`type`" + ` set to ` + "`string`" + ` this input generates a string for each key value pair in the following format:
60+
61+ ` + "```json" + `
62+ "foovalue"
63+ ` + "```" + `
64+
65+ The key is stored in the metadata field ` + "`key`" + `.
66+
67+ For ` + "`type`" + ` set to ` + "`hash`" + ` this input generates a message for each key fields pair in the following format:
68+
69+ ` + "```json" + `
70+ {"field1":"Hello","field2":"Hi"}
71+ ` + "```" + `
72+
73+ The key is stored in the metadata field ` + "`key`" + `.
5674` ).
5775 Categories ("Services" ).
5876 Version ("4.27.0" )
@@ -70,7 +88,11 @@ This input generates a message for each key value pair in the following format:
7088 Example ("foo*" ).
7189 Example ("foo" ).
7290 Example ("*4*" ).
73- Default ("" ))
91+ Default ("" )).
92+ Field (service .NewStringEnumField (typeFieldName , "hash" , "string" ).
93+ Description ("The type of the Redis keys to scan." ).
94+ Default ("string" ).
95+ Optional ())
7496}
7597
7698func newRedisScanInputFromConfig (conf * service.ParsedConfig , mgr * service.Resources ) (service.Input , error ) {
@@ -82,10 +104,17 @@ func newRedisScanInputFromConfig(conf *service.ParsedConfig, mgr *service.Resour
82104 if err != nil {
83105 return nil , fmt .Errorf ("error retrieving %s: %v" , matchFieldName , err )
84106 }
107+
108+ t , err := conf .FieldString (typeFieldName )
109+ if err != nil {
110+ return nil , fmt .Errorf ("error retrieving %s: %v" , typeFieldName , err )
111+ }
112+
85113 r := & redisScanReader {
86114 client : client ,
87115 match : match ,
88116 log : mgr .Logger (),
117+ t : t ,
89118 }
90119 return r , nil
91120}
@@ -95,31 +124,61 @@ type redisScanReader struct {
95124 client redis.UniversalClient
96125 iter * redis.ScanIterator
97126 log * service.Logger
127+ t string
98128}
99129
100130func (r * redisScanReader ) Connect (ctx context.Context ) error {
101131 _ , err := r .client .Ping (ctx ).Result ()
102132 if err != nil {
103133 return err
104134 }
105- r .iter = r .client .Scan (context .Background (), 0 , r .match , 0 ).Iterator ()
135+ if r .t == "" {
136+ r .iter = r .client .Scan (context .Background (), 0 , r .match , 0 ).Iterator ()
137+ } else {
138+ r .iter = r .client .ScanType (context .Background (), 0 , r .match , 0 , r .t ).Iterator ()
139+ }
106140 return r .iter .Err ()
107141}
108142
109143func (r * redisScanReader ) Read (ctx context.Context ) (* service.Message , service.AckFunc , error ) {
110144 if r .iter .Next (ctx ) {
111145 key := r .iter .Val ()
112146
113- res := r .client .Get (ctx , key )
114- if err := res .Err (); err != nil {
115- return nil , nil , err
116- }
117-
118147 msg := service .NewMessage (nil )
119- msg .SetStructuredMut (map [string ]any {
120- "key" : key ,
121- "value" : res .Val (),
122- })
148+ switch r .t {
149+ case "hash" :
150+ res := r .client .HGetAll (ctx , key )
151+ if err := res .Err (); err != nil {
152+ return nil , nil , err
153+ }
154+
155+ m := make (map [string ]any , len (res .Val ()))
156+ for k , v := range res .Val () {
157+ m [k ] = v
158+ }
159+
160+ msg .SetStructuredMut (m )
161+ msg .MetaSet ("key" , key )
162+ case "string" :
163+ res := r .client .Get (ctx , key )
164+ if err := res .Err (); err != nil {
165+ return nil , nil , err
166+ }
167+
168+ msg .SetStructuredMut (res .Val ())
169+ msg .MetaSet ("key" , key )
170+ default :
171+ // legacy behavior
172+ res := r .client .Get (ctx , key )
173+ if err := res .Err (); err != nil {
174+ return nil , nil , err
175+ }
176+
177+ msg .SetStructuredMut (map [string ]any {
178+ "key" : key ,
179+ "value" : res .Val (),
180+ })
181+ }
123182 return msg , func (ctx context.Context , err error ) error {
124183 return err
125184 }, nil
0 commit comments