Skip to content

Commit 1907e41

Browse files
authored
feat: implement fluent-bit input plugin for cloudwatch logs (#1)
1 parent c25813f commit 1907e41

File tree

17 files changed

+725
-0
lines changed

17 files changed

+725
-0
lines changed

Dockerfile

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Build stage: compile the plugin as a C shared library.
FROM golang:1.24-bookworm AS build

WORKDIR /src

# Copy only the module files first so the download layer stays cached until
# go.mod or go.sum change.
COPY go.mod go.sum ./
RUN go mod download -x

COPY . .

RUN go build -buildmode=c-shared -o cloudwatch-input.so plugin.go


# Runtime stage: Fluent Bit image with the plugin copied in.
FROM ghcr.io/selectel/fluent-bit:2025-04-22

COPY --from=build /src/cloudwatch-input.so /fluent-bit/plugins/

# -e loads the external plugin; the config path in CMD can be overridden.
ENTRYPOINT ["/fluent-bit/bin/fluent-bit", "-e", "/fluent-bit/plugins/cloudwatch-input.so"]
CMD ["-c", "/fluent-bit/etc/fluent-bit.yaml"]

Makefile

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# None of these targets produce a file named after themselves; declare them
# phony so make never skips them when such a file or directory exists.
.PHONY: build run image

# Compile the plugin as a C shared library.
build:
	go build -buildmode=c-shared -o cloudwatch-input.so plugin.go

# Run Fluent Bit with the plugin from the current directory.
run:
	fluent-bit -e ./cloudwatch-input.so -c config/fluent-bit.yaml

# Build the container image.
image:
	docker build -t ghcr.io/selectel/fluent-bit-cloudwatch-input-plugin:development .

README.md

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Fluent Bit input plugin for CloudWatch logs
2+
3+
## Quick start
4+
5+
```bash
6+
docker run \
7+
--name fluent-bit-cloudwatch \
8+
--rm \
9+
-v ${PWD}/config/fluent-bit.yaml:/fluent-bit/etc/fluent-bit.yaml:ro \
10+
-v ${PWD}/sqlite:/var/lib/fluent-bit/cloudwatch/sqlite:rw \
11+
-e AWS_ACCESS_KEY_ID=your_access_key \
12+
-e AWS_SECRET_ACCESS_KEY=your_secret_key \
13+
ghcr.io/selectel/fluent-bit-cloudwatch-input-plugin:latest
14+
```

config/fluent-bit.yaml

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
service:
2+
log_level: debug
3+
4+
pipeline:
5+
inputs:
6+
- name: cloudwatch-input
7+
region: us-east-1
8+
endpoint: https://logs.us-east-1.amazonaws.com
9+
log_group_name: my-group-name
10+
log_stream_name: my-stream-name
11+
sqlite_path: /var/lib/fluent-bit/cloudwatch/sqlite/db.sqlite
12+
13+
filters:
14+
- name: lua
15+
match: '*'
16+
call: split_events
17+
code: |
18+
function split_events(tag, timestamp, record)
19+
return 2, timestamp, record["events"]
20+
end
21+
22+
outputs:
23+
- name: stdout
24+
match: '*'

go.mod

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
module github.com/selectel/fluent-bit-cloudwatch-input-plugin
2+
3+
go 1.24
4+
5+
require (
6+
github.com/aws/aws-sdk-go-v2 v1.36.3
7+
github.com/aws/aws-sdk-go-v2/config v1.29.14
8+
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.47.3
9+
github.com/mattn/go-sqlite3 v1.14.28
10+
github.com/vmihailenco/msgpack/v5 v5.4.1
11+
)
12+
13+
require (
14+
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect
15+
github.com/aws/aws-sdk-go-v2/credentials v1.17.67 // indirect
16+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect
17+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect
18+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect
19+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
20+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect
21+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect
22+
github.com/aws/aws-sdk-go-v2/service/sso v1.25.3 // indirect
23+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 // indirect
24+
github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 // indirect
25+
github.com/aws/smithy-go v1.22.3 // indirect
26+
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
27+
)

go.sum

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM=
2+
github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg=
3+
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs=
4+
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10/go.mod h1:qqvMj6gHLR/EXWZw4ZbqlPbQUyenf4h82UQUlKc+l14=
5+
github.com/aws/aws-sdk-go-v2/config v1.29.14 h1:f+eEi/2cKCg9pqKBoAIwRGzVb70MRKqWX4dg1BDcSJM=
6+
github.com/aws/aws-sdk-go-v2/config v1.29.14/go.mod h1:wVPHWcIFv3WO89w0rE10gzf17ZYy+UVS1Geq8Iei34g=
7+
github.com/aws/aws-sdk-go-v2/credentials v1.17.67 h1:9KxtdcIA/5xPNQyZRgUSpYOE6j9Bc4+D7nZua0KGYOM=
8+
github.com/aws/aws-sdk-go-v2/credentials v1.17.67/go.mod h1:p3C44m+cfnbv763s52gCqrjaqyPikj9Sg47kUVaNZQQ=
9+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 h1:x793wxmUWVDhshP8WW2mlnXuFrO4cOd3HLBroh1paFw=
10+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30/go.mod h1:Jpne2tDnYiFascUEs2AWHJL9Yp7A5ZVy3TNyxaAjD6M=
11+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q=
12+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34/go.mod h1:p4VfIceZokChbA9FzMbRGz5OV+lekcVtHlPKEO0gSZY=
13+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 h1:SZwFm17ZUNNg5Np0ioo/gq8Mn6u9w19Mri8DnJ15Jf0=
14+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34/go.mod h1:dFZsC0BLo346mvKQLWmoJxT+Sjp+qcVR1tRVHQGOH9Q=
15+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo=
16+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo=
17+
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.47.3 h1:3y0jkGtsaZLCg+n73BoSXOAkLFtgmD/+4prXW1pzovc=
18+
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.47.3/go.mod h1:uo14VBn5cNk/BPGTPz3kyLBxgpgOObgO8lmz+H7Z4Ck=
19+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 h1:eAh2A4b5IzM/lum78bZ590jy36+d/aFLgKF/4Vd1xPE=
20+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3/go.mod h1:0yKJC/kb8sAnmlYa6Zs3QVYqaC8ug2AbnNChv5Ox3uA=
21+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 h1:dM9/92u2F1JbDaGooxTq18wmmFzbJRfXfVfy96/1CXM=
22+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15/go.mod h1:SwFBy2vjtA0vZbjjaFtfN045boopadnoVPhu4Fv66vY=
23+
github.com/aws/aws-sdk-go-v2/service/sso v1.25.3 h1:1Gw+9ajCV1jogloEv1RRnvfRFia2cL6c9cuKV2Ps+G8=
24+
github.com/aws/aws-sdk-go-v2/service/sso v1.25.3/go.mod h1:qs4a9T5EMLl/Cajiw2TcbNt2UNo/Hqlyp+GiuG4CFDI=
25+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 h1:hXmVKytPfTy5axZ+fYbR5d0cFmC3JvwLm5kM83luako=
26+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1/go.mod h1:MlYRNmYu/fGPoxBQVvBYr9nyr948aY/WLUvwBMBJubs=
27+
github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 h1:1XuUZ8mYJw9B6lzAkXhqHlJd/XvaX32evhproijJEZY=
28+
github.com/aws/aws-sdk-go-v2/service/sts v1.33.19/go.mod h1:cQnB8CUnxbMU82JvlqjKR2HBOm3fe9pWorWBza6MBJ4=
29+
github.com/aws/smithy-go v1.22.3 h1:Z//5NuZCSW6R4PhQ93hShNbyBbn8BWCmCVCt+Q8Io5k=
30+
github.com/aws/smithy-go v1.22.3/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI=
31+
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
32+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
33+
github.com/mattn/go-sqlite3 v1.14.28 h1:ThEiQrnbtumT+QMknw63Befp/ce/nUPgBPMlRFEum7A=
34+
github.com/mattn/go-sqlite3 v1.14.28/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
35+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
36+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
37+
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
38+
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
39+
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
40+
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
41+
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
42+
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
43+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
44+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/infra/client/cloudwatch.go

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/aws/aws-sdk-go-v2/aws"
8+
"github.com/aws/aws-sdk-go-v2/config"
9+
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
10+
11+
"github.com/selectel/fluent-bit-cloudwatch-input-plugin/internal/model"
12+
)
13+
14+
type Cloudwatch struct {
15+
client *cloudwatchlogs.Client
16+
}
17+
18+
func NewCloudwatchClient(ctx context.Context, region, endpoint string) (*Cloudwatch, error) {
19+
cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region), config.WithBaseEndpoint(endpoint))
20+
if err != nil {
21+
return nil, fmt.Errorf("failed to load aws config: %w", err)
22+
}
23+
24+
client := &Cloudwatch{
25+
client: cloudwatchlogs.NewFromConfig(cfg),
26+
}
27+
28+
return client, nil
29+
}
30+
31+
func (cw *Cloudwatch) GetLogEvents(
32+
ctx context.Context,
33+
logGroupName, logStreamName, nextToken string,
34+
) ([]model.Event, string, error) {
35+
params := &cloudwatchlogs.GetLogEventsInput{
36+
LogGroupName: aws.String(logGroupName),
37+
LogStreamName: aws.String(logStreamName),
38+
StartFromHead: aws.Bool(true),
39+
}
40+
41+
if nextToken != "" {
42+
params.NextToken = aws.String(nextToken)
43+
}
44+
45+
resp, err := cw.client.GetLogEvents(ctx, params)
46+
if err != nil {
47+
return nil, "", fmt.Errorf("failed to complete request: %w", err)
48+
}
49+
50+
events := make([]model.Event, 0, len(resp.Events))
51+
52+
for _, event := range resp.Events {
53+
events = append(events, model.Event{
54+
IngestionTime: *event.IngestionTime,
55+
Timestamp: *event.Timestamp,
56+
Message: *event.Message,
57+
})
58+
}
59+
60+
return events, *resp.NextForwardToken, nil
61+
}

internal/infra/storage/sqlite.go

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package storage
2+
3+
import (
4+
"database/sql"
5+
"fmt"
6+
"net/url"
7+
8+
_ "github.com/mattn/go-sqlite3"
9+
)
10+
11+
func NewSQLite(path string) (*sql.DB, error) {
12+
params := url.Values{}
13+
14+
params.Add("cache", "shared")
15+
params.Add("mode", "rwc")
16+
params.Add("_journal", "WAL")
17+
18+
conn, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?%s", path, params.Encode()))
19+
if err != nil {
20+
return nil, fmt.Errorf("failed to create conntection: %w", err)
21+
}
22+
23+
// https://github.com/mattn/go-sqlite3/issues/209
24+
conn.SetMaxOpenConns(1)
25+
26+
err = conn.Ping()
27+
if err != nil {
28+
return nil, fmt.Errorf("failed to ping connection: %w", err)
29+
}
30+
31+
err = migrateSQLite(conn)
32+
if err != nil {
33+
return nil, fmt.Errorf("failed to run migrations: %w", err)
34+
}
35+
36+
return conn, nil
37+
}
38+
39+
// migrateSQLite creates the state table when it does not exist yet.
//
// The table holds one next_token and updated_at value per
// (region, log_group_name, log_stream_name) key. Any execution error is
// wrapped and returned.
func migrateSQLite(db *sql.DB) error {
	const schema = `
		CREATE TABLE IF NOT EXISTS state (
			region TEXT NOT NULL,
			log_group_name TEXT NOT NULL,
			log_stream_name TEXT NOT NULL,
			next_token TEXT NOT NULL,
			updated_at INTEGER NOT NULL,

			PRIMARY KEY (region, log_group_name, log_stream_name)
		);
	`

	if _, err := db.Exec(schema); err != nil {
		return fmt.Errorf("failed to execute sql query: %w", err)
	}

	return nil
}
+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package sqlite
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"errors"
7+
"fmt"
8+
"time"
9+
)
10+
11+
// State reads and writes next tokens in the state table through a *sql.DB.
type State struct {
	// db is the handle every query is executed against.
	db *sql.DB
}
14+
15+
func NewState(db *sql.DB) *State {
16+
return &State{
17+
db: db,
18+
}
19+
}
20+
21+
func (s *State) GetNextToken(ctx context.Context, region, logGroupName, logStreamName string) (string, error) {
22+
const query = `SELECT next_token FROM state WHERE region = ? and log_group_name = ? and log_stream_name = ?`
23+
24+
var nextToken string
25+
26+
err := s.db.QueryRowContext(ctx, query, region, logGroupName, logStreamName).Scan(&nextToken)
27+
if err != nil && !errors.Is(err, sql.ErrNoRows) {
28+
return "", fmt.Errorf("failed to execute sql query: %w", err)
29+
}
30+
31+
return nextToken, nil
32+
}
33+
34+
func (s *State) SetNextToken(ctx context.Context, region, logGroupName, logStreamName, nextToken string) error {
35+
const query = `
36+
INSERT INTO state (region, log_group_name, log_stream_name, next_token, updated_at)
37+
VALUES (?, ?, ?, ?, ?)
38+
ON CONFLICT (region, log_group_name, log_stream_name) DO UPDATE SET
39+
next_token = excluded.next_token,
40+
updated_at = excluded.updated_at
41+
`
42+
43+
_, err := s.db.ExecContext(ctx, query, region, logGroupName, logStreamName, nextToken, time.Now().UTC().Unix())
44+
if err != nil {
45+
return fmt.Errorf("failed to execute sql query: %w", err)
46+
}
47+
48+
return nil
49+
}

internal/model/event.go

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package model
2+
3+
// Event is a single log event, tagged for msgpack encoding.
type Event struct {
	// IngestionTime is encoded as "ingestion_time".
	// NOTE(review): presumably milliseconds since the Unix epoch — confirm.
	IngestionTime int64 `msgpack:"ingestion_time"`
	// Timestamp is encoded as "timestamp".
	// NOTE(review): presumably milliseconds since the Unix epoch — confirm.
	Timestamp int64 `msgpack:"timestamp"`
	// Message is the raw event payload, encoded as "message".
	Message string `msgpack:"message"`
}

internal/plugin/adapter/cloudwatch.go

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package adapter
2+
3+
import (
4+
"context"
5+
6+
"github.com/selectel/fluent-bit-cloudwatch-input-plugin/internal/model"
7+
)
8+
9+
type Cloudwatch interface {
10+
GetLogEvents(ctx context.Context, logGroupName, logStreamName, nextToken string) ([]model.Event, string, error)
11+
}

internal/plugin/adapter/storage.go

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package adapter
2+
3+
import "context"
4+
5+
// Storage keeps the next token per region, log group and log stream.
type Storage interface {
	// GetNextToken returns the token stored for the given key.
	GetNextToken(
		ctx context.Context,
		region, logGroupName, logStreamName string,
	) (nextToken string, err error)

	// SetNextToken stores nextToken for the given key.
	SetNextToken(
		ctx context.Context,
		region, logGroupName, logStreamName, nextToken string,
	) error
}

internal/plugin/plugin.go

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package plugin
2+
3+
import (
4+
"context"
5+
6+
"github.com/selectel/fluent-bit-cloudwatch-input-plugin/internal/model"
7+
"github.com/selectel/fluent-bit-cloudwatch-input-plugin/internal/plugin/adapter"
8+
)
9+
10+
type Plugin struct {
11+
region string
12+
endpoint string
13+
logGroupName string
14+
logStreamName string
15+
16+
cloudwatch adapter.Cloudwatch
17+
storage adapter.Storage
18+
}
19+
20+
func NewPlugin(
21+
region, endpoint, logGroupName, logStreamName string,
22+
cloudwatch adapter.Cloudwatch,
23+
storage adapter.Storage,
24+
) *Plugin {
25+
return &Plugin{
26+
region: region,
27+
endpoint: endpoint,
28+
logGroupName: logGroupName,
29+
logStreamName: logStreamName,
30+
cloudwatch: cloudwatch,
31+
storage: storage,
32+
}
33+
}
34+
35+
func (p *Plugin) GetLogEvents(ctx context.Context, nextToken string) ([]model.Event, string, error) {
36+
return p.cloudwatch.GetLogEvents(ctx, p.logGroupName, p.logStreamName, nextToken)
37+
}
38+
39+
func (p *Plugin) GetNextToken(ctx context.Context) (string, error) {
40+
return p.storage.GetNextToken(ctx, p.region, p.logGroupName, p.logStreamName)
41+
}
42+
43+
func (p *Plugin) SetNextToken(ctx context.Context, nextToken string) error {
44+
return p.storage.SetNextToken(ctx, p.region, p.logGroupName, p.logStreamName, nextToken)
45+
}

0 commit comments

Comments
 (0)