Skip to content

Commit 6791c17

Browse files
NETOBSERV-584: Support for writing logs to Loki (distributor) using gRPC (#1086)
* grpc support to write logs to Loki * update vendors * addressed feedback
1 parent 692aa5b commit 6791c17

File tree

12 files changed

+1189
-106
lines changed

12 files changed

+1189
-106
lines changed

docs/api.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,10 @@ Following is the supported API format for writing to loki:
324324
timestampScale: timestamp units scale (e.g. for UNIX = 1s)
325325
format: the format of each line: printf (writes using golang's default map printing), fields (writes one key and value field per line) or json (default)
326326
reorder: reorder json map keys
327+
clientProtocol: type of client protocol to use: 'http' or 'grpc' (default: 'http')
328+
grpcConfig: gRPC client configuration (used only for gRPC client type)
329+
keepAlive: keep alive interval
330+
keepAliveTimeout: keep alive timeout
327331
</pre>
328332
## Write Standard Output
329333
Following is the supported API format for writing to standard output:

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ require (
1717
github.com/minio/minio-go/v7 v7.0.95
1818
github.com/mitchellh/mapstructure v1.5.0
1919
github.com/netobserv/gopipes v0.3.0
20-
github.com/netobserv/loki-client-go v0.0.0-20250425113517-526b43e51847
20+
github.com/netobserv/loki-client-go v0.0.0-20251014110557-40bc8d2e6cf3
2121
github.com/netobserv/netobserv-ebpf-agent v1.9.2-community
2222
github.com/netsampler/goflow2 v1.3.7
2323
github.com/pkg/errors v0.9.1

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,8 @@ github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J
255255
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
256256
github.com/netobserv/gopipes v0.3.0 h1:IYmPnnAVCdSK7VmHmpFhrVBOEm45qpgbZmJz1sSW+60=
257257
github.com/netobserv/gopipes v0.3.0/go.mod h1:N7/Gz05EOF0CQQSKWsv3eof22Cj2PB08Pbttw98YFYU=
258-
github.com/netobserv/loki-client-go v0.0.0-20250425113517-526b43e51847 h1:hjzhVZSSKIOmAzHbGUV4JhVIPkgKs/UtrWDx6JSVKMw=
259-
github.com/netobserv/loki-client-go v0.0.0-20250425113517-526b43e51847/go.mod h1:Zb/jtD3Lnu88Poo+jnhTASzxYnvncmHOoZaT93xQjJ8=
258+
github.com/netobserv/loki-client-go v0.0.0-20251014110557-40bc8d2e6cf3 h1:rxQipq0xpoiao7ifls/82JCcOVALC4n08ppTLCUFGL4=
259+
github.com/netobserv/loki-client-go v0.0.0-20251014110557-40bc8d2e6cf3/go.mod h1:Zb/jtD3Lnu88Poo+jnhTASzxYnvncmHOoZaT93xQjJ8=
260260
github.com/netobserv/netobserv-ebpf-agent v1.9.2-community h1:ghW16OO4QRWj0Uh1gMYX+NjAlgx2sZmCsO3Tkwoj4Do=
261261
github.com/netobserv/netobserv-ebpf-agent v1.9.2-community/go.mod h1:17OaUNAwx0LxoeV/SaHlJIJP6bpN7zSvUP3GtZelESQ=
262262
github.com/netsampler/goflow2 v1.3.7 h1:XZaTy8kkMnGXpJ9hS3KbO1McyrFTpVNhVFEx9rNhMmc=

pkg/api/write_loki.go

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package api
2020
import (
2121
"errors"
2222
"fmt"
23+
"time"
2324

2425
promConfig "github.com/prometheus/common/config"
2526
"github.com/prometheus/common/model"
@@ -46,6 +47,15 @@ type WriteLoki struct {
4647
TimestampScale string `yaml:"timestampScale,omitempty" json:"timestampScale,omitempty" doc:"timestamp units scale (e.g. for UNIX = 1s)"`
4748
Format string `yaml:"format,omitempty" json:"format,omitempty" doc:"the format of each line: printf (writes using golang's default map printing), fields (writes one key and value field per line) or json (default)"`
4849
Reorder bool `yaml:"reorder,omitempty" json:"reorder,omitempty" doc:"reorder json map keys"`
50+
51+
// Client protocol selection
52+
ClientProtocol string `yaml:"clientProtocol,omitempty" json:"clientProtocol,omitempty" doc:"type of client protocol to use: 'http' or 'grpc' (default: 'http')"`
53+
GRPCConfig *GRPCLokiConfig `yaml:"grpcConfig,omitempty" json:"grpcConfig,omitempty" doc:"gRPC client configuration (used only for gRPC client type)"`
54+
}
55+
56+
type GRPCLokiConfig struct {
57+
KeepAlive string `yaml:"keepAlive,omitempty" json:"keepAlive,omitempty" doc:"keep alive interval"`
58+
KeepAliveTimeout string `yaml:"keepAliveTimeout,omitempty" json:"keepAliveTimeout,omitempty" doc:"keep alive timeout"`
4959
}
5060

5161
func (w *WriteLoki) SetDefaults() {
@@ -76,6 +86,26 @@ func (w *WriteLoki) SetDefaults() {
7686
if w.Format == "" {
7787
w.Format = "json"
7888
}
89+
if w.ClientProtocol == "" {
90+
w.ClientProtocol = "http"
91+
}
92+
93+
// Set defaults for gRPC config if gRPC client protocol is selected
94+
if w.ClientProtocol == "grpc" {
95+
if w.GRPCConfig == nil {
96+
w.GRPCConfig = &GRPCLokiConfig{}
97+
}
98+
w.GRPCConfig.SetDefaults()
99+
}
100+
}
101+
102+
func (g *GRPCLokiConfig) SetDefaults() {
103+
if g.KeepAlive == "" {
104+
g.KeepAlive = "30s"
105+
}
106+
if g.KeepAliveTimeout == "" {
107+
g.KeepAliveTimeout = "5s"
108+
}
79109
}
80110

81111
func (w *WriteLoki) Validate() error {
@@ -85,11 +115,51 @@ func (w *WriteLoki) Validate() error {
85115
if w.TimestampScale == "" {
86116
return errors.New("timestampUnit must be a valid Duration > 0 (e.g. 1m, 1s or 1ms)")
87117
}
88-
if w.URL == "" {
89-
return errors.New("url can't be empty")
90-
}
91118
if w.BatchSize <= 0 {
92119
return fmt.Errorf("invalid batchSize: %v. Required > 0", w.BatchSize)
93120
}
121+
122+
// Validate client protocol
123+
if w.ClientProtocol != "" && w.ClientProtocol != "http" && w.ClientProtocol != "grpc" {
124+
return fmt.Errorf("invalid clientProtocol: %s. Must be 'http' or 'grpc'", w.ClientProtocol)
125+
}
126+
127+
// Validate based on client protocol
128+
switch w.ClientProtocol {
129+
case "http", "":
130+
if w.URL == "" {
131+
return errors.New("url can't be empty for HTTP client")
132+
}
133+
case "grpc":
134+
if w.URL == "" {
135+
return errors.New("url can't be empty for gRPC client")
136+
}
137+
if w.GRPCConfig == nil {
138+
return errors.New("grpcConfig is required when using gRPC client protocol")
139+
}
140+
if err := w.GRPCConfig.Validate(); err != nil {
141+
return fmt.Errorf("gRPC config validation failed: %w", err)
142+
}
143+
}
144+
145+
return nil
146+
}
147+
148+
func (g *GRPCLokiConfig) Validate() error {
149+
if g == nil {
150+
return errors.New("gRPC config cannot be nil")
151+
}
152+
// Validate duration fields
153+
if g.KeepAlive != "" {
154+
if _, err := time.ParseDuration(g.KeepAlive); err != nil {
155+
return fmt.Errorf("invalid keepAlive duration: %w", err)
156+
}
157+
}
158+
if g.KeepAliveTimeout != "" {
159+
if _, err := time.ParseDuration(g.KeepAliveTimeout); err != nil {
160+
return fmt.Errorf("invalid keepAliveTimeout duration: %w", err)
161+
}
162+
}
163+
94164
return nil
95165
}

pkg/pipeline/write/write_loki.go

Lines changed: 108 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
3131

3232
logAdapter "github.com/go-kit/kit/log/logrus"
33+
"github.com/netobserv/loki-client-go/grpc"
3334
"github.com/netobserv/loki-client-go/loki"
3435
"github.com/netobserv/loki-client-go/pkg/backoff"
3536
"github.com/netobserv/loki-client-go/pkg/urlutil"
@@ -49,7 +50,6 @@ type emitter interface {
4950

5051
// Loki record writer
5152
type Loki struct {
52-
lokiConfig loki.Config
5353
apiConfig api.WriteLoki
5454
timestampScale float64
5555
saneLabels map[string]model.LabelName
@@ -61,7 +61,46 @@ type Loki struct {
6161
formatter func(config.GenericMap) string
6262
}
6363

64-
func buildLokiConfig(c *api.WriteLoki) (loki.Config, error) {
64+
func createLokiClient(c *api.WriteLoki) (emitter, error) {
65+
switch c.ClientProtocol {
66+
case "grpc":
67+
return createGRPCClient(c)
68+
case "http", "":
69+
return createHTTPClient(c)
70+
default:
71+
return nil, fmt.Errorf("unsupported client protocol: %s", c.ClientProtocol)
72+
}
73+
}
74+
75+
func createHTTPClient(c *api.WriteLoki) (emitter, error) {
76+
cfg, err := buildHTTPLokiConfig(c)
77+
if err != nil {
78+
return nil, err
79+
}
80+
81+
client, err := loki.NewWithLogger(cfg, logAdapter.NewLogger(log.WithField("module", "export/loki")))
82+
if err != nil {
83+
return nil, fmt.Errorf("failed to create HTTP Loki client: %w", err)
84+
}
85+
86+
return client, nil
87+
}
88+
89+
func createGRPCClient(c *api.WriteLoki) (emitter, error) {
90+
cfg, err := buildGRPCLokiConfig(c)
91+
if err != nil {
92+
return nil, err
93+
}
94+
95+
client, err := grpc.NewWithLogger(cfg, logAdapter.NewLogger(log.WithField("module", "export/loki-grpc")))
96+
if err != nil {
97+
return nil, fmt.Errorf("failed to create gRPC Loki client: %w", err)
98+
}
99+
100+
return client, nil
101+
}
102+
103+
func buildHTTPLokiConfig(c *api.WriteLoki) (loki.Config, error) {
65104
batchWait, err := time.ParseDuration(c.BatchWait)
66105
if err != nil {
67106
return loki.Config{}, fmt.Errorf("failed in parsing BatchWait : %w", err)
@@ -105,6 +144,69 @@ func buildLokiConfig(c *api.WriteLoki) (loki.Config, error) {
105144
return cfg, nil
106145
}
107146

147+
func buildGRPCLokiConfig(c *api.WriteLoki) (grpc.Config, error) {
148+
if c.GRPCConfig == nil {
149+
return grpc.Config{}, fmt.Errorf("gRPC config is required for gRPC client protocol")
150+
}
151+
152+
batchWait, err := time.ParseDuration(c.BatchWait)
153+
if err != nil {
154+
return grpc.Config{}, fmt.Errorf("failed in parsing BatchWait: %w", err)
155+
}
156+
157+
timeout, err := time.ParseDuration(c.Timeout)
158+
if err != nil {
159+
return grpc.Config{}, fmt.Errorf("failed in parsing Timeout: %w", err)
160+
}
161+
162+
minBackoff, err := time.ParseDuration(c.MinBackoff)
163+
if err != nil {
164+
return grpc.Config{}, fmt.Errorf("failed in parsing MinBackoff: %w", err)
165+
}
166+
167+
maxBackoff, err := time.ParseDuration(c.MaxBackoff)
168+
if err != nil {
169+
return grpc.Config{}, fmt.Errorf("failed in parsing MaxBackoff: %w", err)
170+
}
171+
172+
keepAlive, err := time.ParseDuration(c.GRPCConfig.KeepAlive)
173+
if err != nil {
174+
return grpc.Config{}, fmt.Errorf("failed in parsing KeepAlive: %w", err)
175+
}
176+
177+
keepAliveTimeout, err := time.ParseDuration(c.GRPCConfig.KeepAliveTimeout)
178+
if err != nil {
179+
return grpc.Config{}, fmt.Errorf("failed in parsing KeepAliveTimeout: %w", err)
180+
}
181+
182+
cfg := grpc.Config{
183+
ServerAddress: c.URL,
184+
TenantID: c.TenantID,
185+
BatchWait: batchWait,
186+
BatchSize: c.BatchSize,
187+
Timeout: timeout,
188+
KeepAlive: keepAlive,
189+
KeepAliveTimeout: keepAliveTimeout,
190+
BackoffConfig: backoff.BackoffConfig{
191+
MinBackoff: minBackoff,
192+
MaxBackoff: maxBackoff,
193+
MaxRetries: c.MaxRetries,
194+
},
195+
}
196+
197+
// Set external labels from static labels
198+
if len(c.StaticLabels) > 0 {
199+
cfg.ExternalLabels.LabelSet = c.StaticLabels
200+
}
201+
202+
// Configure TLS from shared ClientConfig (same as HTTP client)
203+
if c.ClientConfig != nil {
204+
cfg.TLS = c.ClientConfig.TLSConfig
205+
}
206+
207+
return cfg, nil
208+
}
209+
108210
func (l *Loki) ProcessRecord(in config.GenericMap) error {
109211
labels, lines := l.splitLabelsLines(in)
110212

@@ -219,13 +321,10 @@ func NewWriteLoki(opMetrics *operational.Metrics, params config.StageParam) (*Lo
219321
return nil, fmt.Errorf("the provided config is not valid: %w", err)
220322
}
221323

222-
lokiConfig, buildconfigErr := buildLokiConfig(&lokiConfigIn)
223-
if buildconfigErr != nil {
224-
return nil, buildconfigErr
225-
}
226-
client, newWithLoggerErr := loki.NewWithLogger(lokiConfig, logAdapter.NewLogger(log.WithField("module", "export/loki")))
227-
if newWithLoggerErr != nil {
228-
return nil, newWithLoggerErr
324+
// Create the appropriate client based on clientProtocol
325+
client, err := createLokiClient(&lokiConfigIn)
326+
if err != nil {
327+
return nil, fmt.Errorf("failed to create Loki client: %w", err)
229328
}
230329

231330
timestampScale, err := time.ParseDuration(lokiConfigIn.TimestampScale)
@@ -253,7 +352,6 @@ func NewWriteLoki(opMetrics *operational.Metrics, params config.StageParam) (*Lo
253352

254353
f := formatter(lokiConfigIn.Format, lokiConfigIn.Reorder)
255354
l := &Loki{
256-
lokiConfig: lokiConfig,
257355
apiConfig: lokiConfigIn,
258356
timestampScale: float64(timestampScale),
259357
saneLabels: saneLabels,

0 commit comments

Comments
 (0)