@@ -38,21 +38,31 @@ func init() {
3838 }
3939}
4040
41- const matchFieldName = "match"
41+ const (
42+ matchFieldName = "match"
43+ typeFieldName = "type"
44+ )
4245
4346func redisScanInputConfig () * service.ConfigSpec {
4447 spec := service .NewConfigSpec ().
45- Summary (`Scans the set of keys in the current selected database and gets their values, using the Scan and Get commands.` ).
48+ Summary (`Scans the set of keys in the current selected database and gets their values, using the Scan and respective, typed, Get commands.` ).
4649 Description (`Optionally, iterates only elements matching a blob-style pattern. For example:
4750
4851- ` + "`*foo*`" + ` iterates only keys which contain ` + "`foo`" + ` in it.
4952- ` + "`foo*`" + ` iterates only keys starting with ` + "`foo`" + `.
5053
51- This input generates a message for each key value pair in the following format:
54+ For string types (default) this input generates a message for each key value pair in the following format:
5255
5356` + "```json" + `
5457{"key":"foo","value":"bar"}
5558` + "```" + `
59+ For hash types this input generates a message for each key fields pair in the following format:
60+
61+ ` + "```json" + `
62+ {"field1":"Hello","field2":"Hi"}
63+ ` + "```" + `
64+
65+ The key is stored in the metadata field ` + "`key`" + `.
5666` ).
5767 Categories ("Services" ).
5868 Version ("4.27.0" )
@@ -70,7 +80,11 @@ This input generates a message for each key value pair in the following format:
7080 Example ("foo*" ).
7181 Example ("foo" ).
7282 Example ("*4*" ).
73- Default ("" ))
83+ Default ("" )).
84+ Field (service .NewStringEnumField (typeFieldName , "hash" , "string" ).
85+ Description ("The type of the Redis keys to scan." ).
86+ Default ("string" ).
87+ Optional ())
7488}
7589
7690func newRedisScanInputFromConfig (conf * service.ParsedConfig , mgr * service.Resources ) (service.Input , error ) {
@@ -82,10 +96,17 @@ func newRedisScanInputFromConfig(conf *service.ParsedConfig, mgr *service.Resour
8296 if err != nil {
8397 return nil , fmt .Errorf ("error retrieving %s: %v" , matchFieldName , err )
8498 }
99+
100+ t , err := conf .FieldString (typeFieldName )
101+ if err != nil {
102+ return nil , fmt .Errorf ("error retrieving %s: %v" , typeFieldName , err )
103+ }
104+
85105 r := & redisScanReader {
86106 client : client ,
87107 match : match ,
88108 log : mgr .Logger (),
109+ t : t ,
89110 }
90111 return r , nil
91112}
@@ -95,31 +116,52 @@ type redisScanReader struct {
95116 client redis.UniversalClient
96117 iter * redis.ScanIterator
97118 log * service.Logger
119+ t string
98120}
99121
100122func (r * redisScanReader ) Connect (ctx context.Context ) error {
101123 _ , err := r .client .Ping (ctx ).Result ()
102124 if err != nil {
103125 return err
104126 }
105- r .iter = r .client .Scan (context .Background (), 0 , r .match , 0 ).Iterator ()
127+ if r .t == "" {
128+ r .iter = r .client .Scan (context .Background (), 0 , r .match , 0 ).Iterator ()
129+ } else {
130+ r .iter = r .client .ScanType (context .Background (), 0 , r .match , 0 , r .t ).Iterator ()
131+ }
106132 return r .iter .Err ()
107133}
108134
109135func (r * redisScanReader ) Read (ctx context.Context ) (* service.Message , service.AckFunc , error ) {
110136 if r .iter .Next (ctx ) {
111137 key := r .iter .Val ()
112138
113- res := r .client .Get (ctx , key )
114- if err := res .Err (); err != nil {
115- return nil , nil , err
116- }
117-
118139 msg := service .NewMessage (nil )
119- msg .SetStructuredMut (map [string ]any {
120- "key" : key ,
121- "value" : res .Val (),
122- })
140+ switch r .t {
141+ case "hash" :
142+ res := r .client .HGetAll (ctx , key )
143+ if err := res .Err (); err != nil {
144+ return nil , nil , err
145+ }
146+
147+ m := make (map [string ]any , len (res .Val ()))
148+ for k , v := range res .Val () {
149+ m [k ] = v
150+ }
151+
152+ msg .SetStructuredMut (m )
153+ msg .MetaSet ("key" , key )
154+ default :
155+ res := r .client .Get (ctx , key )
156+ if err := res .Err (); err != nil {
157+ return nil , nil , err
158+ }
159+
160+ msg .SetStructuredMut (map [string ]any {
161+ "key" : key ,
162+ "value" : res .Val (),
163+ })
164+ }
123165 return msg , func (ctx context.Context , err error ) error {
124166 return err
125167 }, nil
0 commit comments