Skip to content

Commit

Permalink
chore: make cslb configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Jan 22, 2025
1 parent aaae529 commit cb0224b
Showing 1 changed file with 38 additions and 0 deletions.
38 changes: 38 additions & 0 deletions processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/bufbuild/httplb/health"
"io"
"net"
"net/http"
Expand All @@ -17,6 +18,8 @@ import (
"time"

"github.com/bufbuild/httplb"
"github.com/bufbuild/httplb/conn"
"github.com/bufbuild/httplb/picker"
"github.com/bufbuild/httplb/resolver"
"github.com/cenkalti/backoff"
jsoniter "github.com/json-iterator/go"
Expand Down Expand Up @@ -259,9 +262,11 @@ func NewTransformer(conf *config.Config, log logger.Logger, stat stats.Stats, op
}, config.GetDuration("Transformer.Client.ttl", 120, time.Second))
case "httplb":
trans.httpClient = httplb.NewClient(
httplb.WithPicker(trans.getPicker()),
httplb.WithTransport("http", &HTTPLBTransport{
Transport: transport,
}),
httplb.WithHealthChecks(trans.getChecker()),
httplb.WithResolver(
resolver.NewDNSResolver(
net.DefaultResolver,
Expand Down Expand Up @@ -310,6 +315,39 @@ func (t *HTTPLBTransport) NewRoundTripper(scheme, target string, config httplb.T
return httplb.RoundTripperResult{RoundTripper: t.Transport, Close: t.CloseIdleConnections}
}

func (trans *handle) getPicker() func(prev picker.Picker, allConns conn.Conns) picker.Picker {
pickerType := config.GetString("Transformer.Client.httplb.pickerType", "power_of_two")
switch pickerType {
case "power_of_two":
return picker.NewPowerOfTwo
case "round_robin":
return picker.NewRoundRobin
case "least_loaded_random":
return picker.NewLeastLoadedRandom
case "least_loaded_round_robin":
return picker.NewLeastLoadedRoundRobin
case "random":
return picker.NewRandom
default:
panic(fmt.Sprintf("unknown picker type: %s", pickerType))
}
}

func (trans *handle) getChecker() health.Checker {
checkerType := config.GetString("Transformer.Client.httplb.checkerType", "nop")
switch checkerType {
case "nop":
return health.NopChecker
case "polling":
return health.NewPollingChecker(
health.PollingCheckerConfig{},
health.NewSimpleProber(trans.config.userTransformationURL),
)
default:
panic(fmt.Sprintf("unknown checker type: %s", checkerType))
}
}

func (trans *handle) transform(
ctx context.Context,
clientEvents []TransformerEvent,
Expand Down

0 comments on commit cb0224b

Please sign in to comment.