Skip to content

Commit cdf0064

Browse files
committed
[PoC] Dynamic routing connector for routing based on estimated counts
1 parent e8474a2 commit cdf0064

File tree

14 files changed

+1228
-0
lines changed

14 files changed

+1228
-0
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
include ../../Makefile.Common
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package dynamicroutingconnector // import "github.com/elastic/opentelemetry-collector-components/connector/dynamicroutingconnector"
19+
20+
import (
21+
"errors"
22+
"sort"
23+
"time"
24+
25+
"go.opentelemetry.io/collector/pipeline"
26+
)
27+
28+
type Config struct {
29+
// TODO(lahsivjar): Revisit the decision to route to default pipeline
30+
// if NO metadata key results in empty str OR if primary key doesn't exist.
31+
DefaultPipelines []pipeline.ID `mapstructure:"default_pipelines"`
32+
EvaluationInterval time.Duration `mapstructure:"evalaution_interval"`
33+
Pipelines [][]pipeline.ID `mapstructure:"pipelines"`
34+
Thresholds []int `mapstructure:"thresholds"`
35+
PrimaryMetadataKey string `mapstructure:"primary_metadata_key"`
36+
MetadataKeys []string `mapstructure:"metadata_keys"`
37+
}
38+
39+
func (c *Config) Validate() error {
40+
if len(c.Pipelines) == 0 {
41+
return errors.New("atleast one pipeline needs to be defined")
42+
}
43+
if len(c.Pipelines)+1 != len(c.Thresholds) {
44+
return errors.New("pipelines need to be defined for each threshold bucket, including +inf")
45+
}
46+
if !sort.IntsAreSorted(c.Thresholds) {
47+
return errors.New("thresolds is expected to be in increasing order")
48+
}
49+
50+
for i := 1; i < len(c.Thresholds); i++ {
51+
if c.Thresholds[i] == c.Thresholds[i-1] {
52+
return errors.New("thresholds are expected to be unique")
53+
}
54+
}
55+
return nil
56+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package dynamicroutingconnector // import "github.com/elastic/opentelemetry-collector-components/connector/dynamicroutingconnector"
19+
20+
import (
21+
"context"
22+
23+
"go.opentelemetry.io/collector/component"
24+
"go.opentelemetry.io/collector/connector"
25+
"go.opentelemetry.io/collector/consumer"
26+
27+
"github.com/elastic/opentelemetry-collector-components/connector/dynamicroutingconnector/internal/metadata"
28+
)
29+
30+
// NewFactory returns a connector.Factory.
31+
func NewFactory() connector.Factory {
32+
return connector.NewFactory(
33+
metadata.Type,
34+
createDefaultConfig,
35+
connector.WithTracesToTraces(createTracesToTraces, metadata.TracesToTracesStability),
36+
connector.WithLogsToLogs(createLogsToLogs, metadata.LogsToLogsStability),
37+
connector.WithMetricsToMetrics(createMetricsToMetrics, metadata.MetricsToMetricsStability),
38+
)
39+
}
40+
41+
func createTracesToTraces(
42+
_ context.Context,
43+
set connector.Settings,
44+
cfg component.Config,
45+
traces consumer.Traces,
46+
) (connector.Traces, error) {
47+
return newTracesConnector(set, cfg, traces)
48+
}
49+
50+
func createLogsToLogs(
51+
_ context.Context,
52+
set connector.Settings,
53+
cfg component.Config,
54+
logs consumer.Logs,
55+
) (connector.Logs, error) {
56+
return newLogsConnector(set, cfg, logs)
57+
}
58+
59+
func createMetricsToMetrics(
60+
_ context.Context,
61+
set connector.Settings,
62+
cfg component.Config,
63+
metrics consumer.Metrics,
64+
) (connector.Metrics, error) {
65+
return newMetricsConnector(set, cfg, metrics)
66+
}
67+
68+
func createDefaultConfig() component.Config {
69+
return &Config{}
70+
}

connector/dynamicroutingconnector/generated_component_test.go

Lines changed: 107 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

connector/dynamicroutingconnector/generated_package_test.go

Lines changed: 30 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
module github.com/elastic/opentelemetry-collector-components/connector/dynamicroutingconnector
2+
3+
go 1.24.0
4+
5+
toolchain go1.24.2
6+
7+
require (
8+
github.com/axiomhq/hyperloglog v0.2.5
9+
github.com/cespare/xxhash/v2 v2.3.0
10+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.135.0
11+
github.com/stretchr/testify v1.11.1
12+
go.opentelemetry.io/collector/client v1.41.0
13+
go.opentelemetry.io/collector/component v1.41.0
14+
go.opentelemetry.io/collector/component/componenttest v0.135.0
15+
go.opentelemetry.io/collector/confmap v1.41.0
16+
go.opentelemetry.io/collector/connector v0.135.0
17+
go.opentelemetry.io/collector/connector/connectortest v0.135.0
18+
go.opentelemetry.io/collector/consumer v1.41.0
19+
go.opentelemetry.io/collector/consumer/consumertest v0.135.0
20+
go.opentelemetry.io/collector/pdata v1.41.0
21+
go.opentelemetry.io/collector/pipeline v1.41.0
22+
go.uber.org/goleak v1.3.0
23+
go.uber.org/zap v1.27.0
24+
)
25+
26+
require (
27+
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
28+
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect
29+
github.com/go-logr/logr v1.4.3 // indirect
30+
github.com/go-logr/stdr v1.2.2 // indirect
31+
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
32+
github.com/gobwas/glob v0.2.3 // indirect
33+
github.com/gogo/protobuf v1.3.2 // indirect
34+
github.com/google/uuid v1.6.0 // indirect
35+
github.com/hashicorp/go-version v1.7.0 // indirect
36+
github.com/json-iterator/go v1.1.12 // indirect
37+
github.com/kamstrup/intmap v0.5.1 // indirect
38+
github.com/knadh/koanf/maps v0.1.2 // indirect
39+
github.com/knadh/koanf/providers/confmap v1.0.0 // indirect
40+
github.com/knadh/koanf/v2 v2.2.2 // indirect
41+
github.com/mitchellh/copystructure v1.2.0 // indirect
42+
github.com/mitchellh/reflectwalk v1.0.2 // indirect
43+
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
44+
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
45+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.135.0 // indirect
46+
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
47+
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
48+
go.opentelemetry.io/collector/connector/xconnector v0.135.0 // indirect
49+
go.opentelemetry.io/collector/consumer/xconsumer v0.135.0 // indirect
50+
go.opentelemetry.io/collector/featuregate v1.41.0 // indirect
51+
go.opentelemetry.io/collector/internal/fanoutconsumer v0.135.0 // indirect
52+
go.opentelemetry.io/collector/internal/telemetry v0.135.0 // indirect
53+
go.opentelemetry.io/collector/pdata/pprofile v0.135.0 // indirect
54+
go.opentelemetry.io/collector/pipeline/xpipeline v0.135.0 // indirect
55+
go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 // indirect
56+
go.opentelemetry.io/otel v1.38.0 // indirect
57+
go.opentelemetry.io/otel/log v0.14.0 // indirect
58+
go.opentelemetry.io/otel/metric v1.38.0 // indirect
59+
go.opentelemetry.io/otel/sdk v1.38.0 // indirect
60+
go.opentelemetry.io/otel/sdk/metric v1.38.0 // indirect
61+
go.opentelemetry.io/otel/trace v1.38.0 // indirect
62+
go.uber.org/multierr v1.11.0 // indirect
63+
go.yaml.in/yaml/v3 v3.0.4 // indirect
64+
golang.org/x/net v0.43.0 // indirect
65+
golang.org/x/sys v0.35.0 // indirect
66+
golang.org/x/text v0.28.0 // indirect
67+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect
68+
google.golang.org/grpc v1.75.0 // indirect
69+
google.golang.org/protobuf v1.36.8 // indirect
70+
gopkg.in/yaml.v3 v3.0.1 // indirect
71+
)
72+
73+
replace github.com/elastic/opentelemetry-collector-components/internal/sharedcomponent => ../../internal/sharedcomponent
74+
75+
replace github.com/elastic/opentelemetry-collector-components/processor/elastictraceprocessor => ../../processor/elastictraceprocessor
76+
77+
replace github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor => ../../processor/lsmintervalprocessor

0 commit comments

Comments
 (0)