
Commit 678a74b

feat: transfer supports log clustering cleaning

1 parent b6b918e

29 files changed (+941 -352 lines)

pkg/transfer/config/metadata_options.go (6 additions, 0 deletions)

@@ -131,6 +131,12 @@ const (
 	// PipelineConfigOptIsLogCluster whether log clustering is enabled
 	PipelineConfigOptIsLogCluster = "is_log_cluster"
+
+	// PipelineConfigOptBackendFields the storage fields the cleaning backend must be configured with for log clustering
+	PipelineConfigOptBackendFields = "backend_fields"
+
+	// PipelineConfigOptLogClusterConfig log clustering configuration
+	PipelineConfigOptLogClusterConfig = "log_cluster_config"
 )

 // MetaResultTableConfig only
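The struct that consumes backend_fields (ETLRecordFields, added in pkg/transfer/define/etl.go below) carries mapstructure tags, which suggests the option value is decoded from the result table option map. A minimal sketch under that assumption, using github.com/mitchellh/mapstructure and the hypothetical field name log_signature:

package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// Mirror of the ETLRecordFields added in this commit (mapstructure tags only).
type ETLRecordFields struct {
	KeepMetrics    []string `mapstructure:"keep_metrics"`
	DropMetrics    []string `mapstructure:"drop_metrics"`
	KeepDimensions []string `mapstructure:"keep_dimensions"`
	DropDimensions []string `mapstructure:"drop_dimensions"`
	GroupKeys      []string `mapstructure:"group_keys"`
}

func main() {
	// Hypothetical result table option map carrying the new backend_fields key.
	option := map[string]interface{}{
		"backend_fields": map[string]interface{}{
			"keep_dimensions": []string{"log_signature"},
			"group_keys":      []string{"log_signature"},
		},
	}

	var fields ETLRecordFields
	if raw, ok := option["backend_fields"]; ok {
		if err := mapstructure.Decode(raw, &fields); err != nil {
			panic(err)
		}
	}
	fmt.Printf("%+v\n", fields)
}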

pkg/transfer/define/data_processor.go (11 additions, 0 deletions)

@@ -10,6 +10,8 @@
 package define

 import (
+	"time"
+
 	"github.com/pkg/errors"
 )

@@ -18,6 +20,7 @@ type BaseDataProcessor struct {
 	Name           string
 	DisabledBizIDs map[string]struct{}
 	baseIndex      int
+	poll           time.Duration
 }

 // String : return frontend name

@@ -42,6 +45,14 @@ func (f *BaseDataProcessor) Index() int {
 	return f.baseIndex
 }

+func (f *BaseDataProcessor) SetPoll(poll time.Duration) {
+	f.poll = poll
+}
+
+func (f *BaseDataProcessor) Poll() time.Duration {
+	return f.poll
+}
+
 // NewBaseDataProcessor :
 func NewBaseDataProcessor(name string) *BaseDataProcessor {
 	return &BaseDataProcessor{
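The commit wires poll through the DataProcessor interface (see pkg/transfer/define/interface.go below) but does not show a consumer. A minimal usage sketch of the accessor pair, with a hypothetical ticker-based consumer:

package main

import (
	"fmt"
	"time"

	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/transfer/define"
)

func main() {
	p := define.NewBaseDataProcessor("demo")
	p.SetPoll(10 * time.Second)

	// Hypothetical consumer: drive a periodic task off the configured interval.
	ticker := time.NewTicker(p.Poll())
	defer ticker.Stop()
	fmt.Println("polling every", p.Poll())
}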

pkg/transfer/define/etl.go (85 additions, 0 deletions)

@@ -10,10 +10,14 @@
 package define

 import (
+	"fmt"
+	"maps"
 	"time"

+	"github.com/cespare/xxhash/v2"
 	"github.com/pkg/errors"

+	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/transfer/bufferpool"
 	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/transfer/types"
 )

@@ -65,6 +69,87 @@ type ETLRecord struct {
 	Exemplar map[string]interface{} `json:"exemplar"`
 }

+type ETLRecordFields struct {
+	KeepMetrics    []string `json:"keep_metrics" mapstructure:"keep_metrics"`
+	DropMetrics    []string `json:"drop_metrics" mapstructure:"drop_metrics"`
+	KeepDimensions []string `json:"keep_dimensions" mapstructure:"keep_dimensions"`
+	DropDimensions []string `json:"drop_dimensions" mapstructure:"drop_dimensions"`
+	GroupKeys      []string `json:"group_keys" mapstructure:"group_keys"`
+}
+
+// Filter filters an ETLRecord.
+//
+// Whitelist rules take precedence over blacklist rules: a blacklist takes
+// effect only when no whitelist is configured.
+func (f *ETLRecordFields) Filter(record ETLRecord) ETLRecord {
+	newRecord := ETLRecord{
+		Time:       record.Time,
+		Exemplar:   record.Exemplar,
+		Dimensions: record.Dimensions,
+		Metrics:    record.Metrics,
+	}
+
+	if len(f.KeepMetrics) > 0 {
+		// metrics whitelist
+		newMetrics := make(map[string]interface{})
+		for _, k := range f.KeepMetrics {
+			if v, ok := record.Metrics[k]; ok {
+				newMetrics[k] = v
+			}
+		}
+		newRecord.Metrics = newMetrics
+	} else {
+		// metrics blacklist
+		if len(f.DropMetrics) > 0 {
+			cloned := maps.Clone(record.Metrics)
+			for _, k := range f.DropMetrics {
+				if _, ok := cloned[k]; ok {
+					delete(cloned, k)
+				}
+			}
+			newRecord.Metrics = cloned
+		}
+	}
+
+	if len(f.KeepDimensions) > 0 {
+		// dimensions whitelist
+		newDimensions := make(map[string]interface{})
+		for _, k := range f.KeepDimensions {
+			v, ok := record.Dimensions[k]
+			if ok {
+				newDimensions[k] = v
+			}
+		}
+		newRecord.Dimensions = newDimensions
+	} else {
+		// dimensions blacklist
+		if len(f.DropDimensions) > 0 {
+			cloned := maps.Clone(record.Dimensions)
+			for _, k := range f.DropDimensions {
+				if _, ok := cloned[k]; ok {
+					delete(cloned, k)
+				}
+			}
+			newRecord.Dimensions = cloned
+		}
+	}
+	return newRecord
+}
+
+func (f *ETLRecordFields) GroupID(document map[string]interface{}) uint64 {
+	buf := bufferpool.Get()
+	defer bufferpool.Put(buf)
+
+	for _, key := range f.GroupKeys {
+		v, ok := document[key]
+		if !ok {
+			continue
+		}
+		buf.WriteString(key + "/")
+		fmt.Fprintf(buf, "%s/", v)
+	}
+	return xxhash.Sum64(buf.Bytes())
+}
+
 type GroupETLRecord struct {
 	*ETLRecord
 	GroupInfo []map[string]interface{} `json:"group_info"`
pkg/transfer/define/interface.go (3 additions, 1 deletion)

@@ -75,6 +75,8 @@ type DataProcessor interface {
 	Finish(outputChan chan<- Payload, killChan chan<- error)
 	SetIndex(i int)
 	Index() int
+	Poll() time.Duration
+	SetPoll(t time.Duration)
 }

 // Frontend : Processor to pull data

@@ -90,7 +92,7 @@ type Backend interface {
 	Stringer
 	SavePoint
 	Push(d Payload, killChan chan<- error)
-	SetFilter(opts map[string]interface{})
+	SetETLRecordFields(f *ETLRecordFields)
 }

 // Pipeline : pipeline to process data
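This interface change is why every registered backend below has to grow a SetETLRecordFields method: the elasticsearch backend stores and applies the fields, while the filesystem and influxdb backends add empty no-op implementations just to keep satisfying the Backend interface.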

pkg/transfer/elasticsearch/backend.go (35 additions, 5 deletions)

@@ -40,6 +40,8 @@ type BulkHandler struct {
 	writer       BulkWriter
 	indexRender  IndexRenderFn
 	transformers map[string]etl.TransformFn
+
+	fields *define.ETLRecordFields
 }

 func (b *BulkHandler) makeRecordID(values map[string]interface{}) string {

@@ -100,6 +102,9 @@ func (b *BulkHandler) Handle(ctx context.Context, payload define.Payload, killCh
 		}
 	}

+	if b.fields != nil {
+		etlRecord = b.fields.Filter(etlRecord)
+	}
 	return &etlRecord, utils.ParseTimeStamp(*etlRecord.Time), true
 }

@@ -170,6 +175,24 @@ func (b *BulkHandler) flush(ctx context.Context, index string, records Records)
 	return count, errs.AsError()
 }

+func (b *BulkHandler) grouping(records Records) Records {
+	if b.fields == nil || len(b.fields.GroupKeys) <= 0 {
+		return records
+	}
+
+	uniq := make(map[uint64]*Record)
+	for _, record := range records {
+		uid := b.fields.GroupID(record.Document)
+		uniq[uid] = record
+	}
+
+	dst := make(Records, 0, len(uniq))
+	for _, item := range uniq {
+		dst = append(dst, item)
+	}
+	return dst
+}
+
 // Flush :
 func (b *BulkHandler) Flush(ctx context.Context, results []interface{}) (count int, err error) {
 	lastIndex := ""

@@ -193,9 +216,12 @@
 	logging.Debugf("backend %v ready to flush record %#v to index %s", b, record, index)

+	// TODO(mando): grouping makes the actual write count lower than the
+	// number of results, but that does not mean the writes failed
+
 	// handle records crossing a time interval
 	if index != lastIndex && lastIndex != "" {
-		cnt, err := b.flush(ctx, lastIndex, records)
+		cnt, err := b.flush(ctx, lastIndex, b.grouping(records))
 		records = records[:0]
 		count += cnt
 		errs.Add(err)

@@ -205,14 +231,18 @@
 	}

 	if len(records) > 0 {
-		cnt, err := b.flush(ctx, lastIndex, records)
+		cnt, err := b.flush(ctx, lastIndex, b.grouping(records))
 		count += cnt
 		errs.Add(err)
 	}

 	return count, errs.AsError()
 }

+func (b *BulkHandler) SetETLRecordFields(f *define.ETLRecordFields) {
+	b.fields = f
+}
+
 // Close :
 func (b *BulkHandler) Close() error {
 	return b.writer.Close()

@@ -268,7 +298,7 @@ func NewBulkHandler(cluster *config.ElasticSearchMetaClusterInfo, table *config.
 }

 // NewBackend :
-func NewBackend(ctx context.Context, name string, maxQps int) (define.Backend, error) {
+func NewBackend(ctx context.Context, name string, options *utils.MapHelper) (define.Backend, error) {
 	conf := config.FromContext(ctx)
 	resultTable := config.ResultTableConfigFromContext(ctx)

@@ -300,6 +330,7 @@
 		return nil, err
 	}

+	maxQps, _ := options.GetInt(config.PipelineConfigOptMaxQps)
 	return pipeline.NewBulkBackendDefaultAdapter(ctx, name, bulk, maxQps), nil
 }

@@ -318,7 +349,6 @@ func init() {
 	}

 	options := utils.NewMapHelper(rt.Option)
-	maxQps, _ := options.GetInt(config.PipelineConfigOptMaxQps)
-	return NewBackend(ctx, rt.FormatName(name), maxQps)
+	return NewBackend(ctx, rt.FormatName(name), options)
 })
}
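The grouping step dedupes a batch by hashed group keys before flushing. A self-contained sketch of its semantics, with local stand-in types for the package's Record/Records and a hypothetical signature key; within a group the last inserted record wins, and the output order is undefined because it comes from map iteration:

package main

import (
	"bytes"
	"fmt"

	"github.com/cespare/xxhash/v2"
)

// Local stand-ins for the elasticsearch package's types, for illustration only.
type Record struct{ Document map[string]interface{} }
type Records []*Record

// groupID mirrors ETLRecordFields.GroupID: concatenate "key/value/" pairs and hash.
func groupID(groupKeys []string, document map[string]interface{}) uint64 {
	var buf bytes.Buffer
	for _, key := range groupKeys {
		v, ok := document[key]
		if !ok {
			continue
		}
		buf.WriteString(key + "/")
		fmt.Fprintf(&buf, "%s/", v)
	}
	return xxhash.Sum64(buf.Bytes())
}

func main() {
	groupKeys := []string{"signature"} // hypothetical group key
	records := Records{
		{Document: map[string]interface{}{"signature": "A", "count": 1}},
		{Document: map[string]interface{}{"signature": "A", "count": 2}},
		{Document: map[string]interface{}{"signature": "B", "count": 3}},
	}

	// Mirrors BulkHandler.grouping: map insertion keeps the last record per group.
	uniq := make(map[uint64]*Record)
	for _, r := range records {
		uniq[groupID(groupKeys, r.Document)] = r
	}
	for _, r := range uniq {
		fmt.Println(r.Document) // two records remain: signature A (count 2) and B (count 3)
	}
}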

pkg/transfer/esb/client.go (1 addition, 1 deletion)

@@ -19,7 +19,7 @@ import (
 type Client struct {
 	commonArgs *CommonArgs
 	agent      *sling.Sling
-	conf define.Configuration
+	conf       define.Configuration
 }

 // CommonArgs :

pkg/transfer/etl/transformer.go (10 additions, 12 deletions)

@@ -316,18 +316,16 @@ func ParseDbmSlowQuery(url, content string, retry int) (*DbmResponse, error) {
 	return nil, err
 }

-var (
-	httpClient = &http.Client{
-		Transport: &http.Transport{
-			DialContext: (&net.Dialer{
-				Timeout: time.Minute,
-			}).DialContext,
-			MaxIdleConns:        200,
-			MaxIdleConnsPerHost: 100,
-			IdleConnTimeout:     2 * time.Minute,
-		},
-	}
-)
+var httpClient = &http.Client{
+	Transport: &http.Transport{
+		DialContext: (&net.Dialer{
+			Timeout: time.Minute,
+		}).DialContext,
+		MaxIdleConns:        200,
+		MaxIdleConnsPerHost: 100,
+		IdleConnTimeout:     2 * time.Minute,
+	},
+}

 func parseDbmSlowQuery(url, content string) (*DbmResponse, error) {
 	req := DbmRequest{Content: content}
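(Cosmetic change only: the one-element var (...) block is flattened into a single var declaration; the HTTP client configuration itself is untouched.)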

pkg/transfer/filesystem/processor/backend.go (2 additions, 0 deletions)

@@ -80,6 +80,8 @@ func (b *Backend) Close() error {
 	return b.file.Close()
 }

+func (b *Backend) SetETLRecordFields(f *define.ETLRecordFields) {}
+
 func init() {
 	define.RegisterBackend("file", func(ctx context.Context, name string) (define.Backend, error) {
 		return NewBackend(ctx, name), nil

pkg/transfer/go.mod (1 addition, 1 deletion)

@@ -49,6 +49,7 @@ require (
 	golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0
 	gopkg.in/yaml.v2 v2.4.0
+	gopkg.in/yaml.v3 v3.0.1
 )

 require (

@@ -114,7 +115,6 @@ require (
 	gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect
 	gopkg.in/jcmturner/gokrb5.v7 v7.5.0 // indirect
 	gopkg.in/jcmturner/rpc.v1 v1.1.0 // indirect
-	gopkg.in/yaml.v3 v3.0.1 // indirect
 )

 // Background: the official JMESPath SDK does not support extending with custom functions
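(gopkg.in/yaml.v3 moves from the indirect to the direct require block, presumably because the new log-cluster configuration code imports it directly; this commit's visible hunks do not show that import.)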

pkg/transfer/influxdb/backend.go (2 additions, 0 deletions)

@@ -198,6 +198,8 @@ func (b *BulkHandler) Close() error {
 	return b.cli.Close()
 }

+func (b *BulkHandler) SetETLRecordFields(f *define.ETLRecordFields) {}
+
 // NewBulkBackend
 func NewBulkHandler(rt *config.MetaResultTableConfig, shipper *config.MetaClusterInfo) (*BulkHandler, error) {
 	cluster := shipper.AsInfluxCluster()
