-
Notifications
You must be signed in to change notification settings - Fork 24
/
write_grpc.go
55 lines (49 loc) · 1.57 KB
/
write_grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package write
import (
"context"
"encoding/json"
"fmt"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/anypb"
)
// writeGRPC is the writer that forwards flows to a remote endpoint through a
// gRPC client connection.
type writeGRPC struct {
// hostIP is the target host; NewWriteGRPC fills it from the GRPC TargetHost setting.
hostIP string
// hostPort is the target port; NewWriteGRPC fills it from the GRPC TargetPort setting.
hostPort int
// clientConn is the connection returned by grpc.ConnectClient and used by Write to send flows.
clientConn *grpc.ClientConnection
}
// Write serializes the flow v to JSON and sends it over the gRPC client
// connection, wrapped in a genericmap.Flow message.
//
// The method has no error result, so failures are logged instead of returned:
// a flow that cannot be marshaled is dropped without being sent, and a send
// failure is reported at error level.
func (t *writeGRPC) Write(v config.GenericMap) {
	logrus.Tracef("entering writeGRPC Write %v", v)

	value, err := json.Marshal(v)
	if err != nil {
		// Sending a message with an empty payload would hide the encoding
		// failure from the receiver, so skip the flow entirely.
		logrus.Errorf("writeGRPC marshal error: %v", err)
		return
	}

	flow := &genericmap.Flow{
		GenericMap: &anypb.Any{
			Value: value,
		},
	}
	if _, err = t.clientConn.Client().Send(context.TODO(), flow); err != nil {
		logrus.Errorf("writeGRPC send error: %v", err)
	}
}
// NewWriteGRPC creates a gRPC writer from the stage parameters.
//
// params.Write.GRPC is mandatory and must pass its own Validate check. Once the
// configuration is accepted, a client connection to TargetHost:TargetPort is
// opened with grpc.ConnectClient.
//
// It returns an error when the GRPC section is missing, when validation fails,
// or when the client connection cannot be established; in all of these cases
// the returned Writer is nil.
func NewWriteGRPC(params config.StageParam) (Writer, error) {
	logrus.Debugf("entering NewWriteGRPC")

	// Guard clause: reject a missing configuration before touching any field.
	if params.Write == nil || params.Write.GRPC == nil {
		return nil, fmt.Errorf("write.grpc param is mandatory: %v", params.Write)
	}
	if err := params.Write.GRPC.Validate(); err != nil {
		return nil, fmt.Errorf("the provided config is not valid: %w", err)
	}

	host := params.Write.GRPC.TargetHost
	port := params.Write.GRPC.TargetPort

	logrus.Debugf("NewWriteGRPC ConnectClient %s:%d...", host, port)
	clientConn, err := grpc.ConnectClient(host, port)
	if err != nil {
		// Wrap with the target so the caller can tell which endpoint failed;
		// %w keeps the underlying error available to errors.Is / errors.As.
		return nil, fmt.Errorf("connecting gRPC client to %s:%d: %w", host, port, err)
	}

	// Build the writer only after every fallible step has succeeded.
	return &writeGRPC{
		hostIP:     host,
		hostPort:   port,
		clientConn: clientConn,
	}, nil
}