-
Notifications
You must be signed in to change notification settings - Fork 47
/
main.go
189 lines (169 loc) · 5.68 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package main
import (
"context"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"syscall"
"time"
"github.com/alecthomas/kingpin/v2"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"github.com/zalando-incubator/es-operator/operator"
"github.com/zalando-incubator/es-operator/pkg/clientset"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport"
)
// Default values for the command-line flags registered in main.
const (
	// Flag defaults that are parsed as durations by the flag library.
	defaultInterval           = "10s"
	defaultAutoscalerInterval = "30s"

	// Timeout default for the Kubernetes client.
	defaultClientGoTimeout = 30 * time.Second

	// Address and DNS defaults.
	defaultMetricsAddress = ":7979"
	defaultClusterDNSZone = "cluster.local."
)
// config holds the values of the command-line flags registered in main.
var config struct {
	// Interval is the interval between syncs (--interval).
	Interval time.Duration
	// AutoscalerInterval is the interval between autoscaling checks
	// (--autoscaler-interval).
	AutoscalerInterval time.Duration
	// APIServer is the API server URL (--apiserver); nil when the flag is
	// not given.
	APIServer *url.URL
	// PodSelectors is the label selector set via --pod-selector.
	PodSelectors Labels
	// PriorityNodeSelectors is the label selector set via
	// --priority-node-selector.
	PriorityNodeSelectors Labels
	// MetricsAddress is the address the metrics endpoint is served on
	// (--metrics-address).
	MetricsAddress string
	// ClientGoTimeout is the timeout for the Kubernetes client
	// (--client-go-timeout).
	ClientGoTimeout time.Duration
	// Debug enables debug logging (--debug).
	Debug bool
	// OperatorID is used to determine ownership of EDS resources
	// (--operator-id).
	OperatorID string
	// Namespace limits the operator to a single namespace (--namespace).
	Namespace string
	// ClusterDNSZone is the zone of the cluster-internal DNS
	// (--cluster-dns-zone).
	ClusterDNSZone string
	// ElasticsearchEndpoint optionally overrides the endpoint used to reach
	// the Elasticsearch API (--elasticsearch-endpoint).
	ElasticsearchEndpoint *url.URL
}
// main parses the command-line flags into config, builds the Kubernetes
// client, starts the SIGTERM handler and the metrics server in the
// background, and then runs the Elasticsearch operator until it returns.
// Any setup or run failure is logged and terminates the process.
func main() {
	config.PodSelectors = Labels(map[string]string{})
	config.PriorityNodeSelectors = Labels(map[string]string{})

	kingpin.Flag("debug", "Enable debug logging.").BoolVar(&config.Debug)
	kingpin.Flag("interval", "Interval between syncing.").
		Default(defaultInterval).DurationVar(&config.Interval)
	kingpin.Flag("autoscaler-interval", "Interval between checking if autoscaling is needed.").
		Default(defaultAutoscalerInterval).DurationVar(&config.AutoscalerInterval)
	kingpin.Flag("apiserver", "API server url.").URLVar(&config.APIServer)
	// NOTE(review): PodSelectors is parsed here but is not passed to the
	// operator anywhere in this function — confirm whether that is intended.
	kingpin.Flag("pod-selector", "Operator will manage all pods selected by this label selector. <key>=<value>,+.").
		SetValue(&config.PodSelectors)
	kingpin.Flag("priority-node-selector", "Specify a label selector for finding nodes with the highest priority. Common use case for this is to priorize nodes that are ready over nodes that are about to be terminated.").
		SetValue(&config.PriorityNodeSelectors)
	kingpin.Flag("metrics-address", "defines where to serve metrics").
		Default(defaultMetricsAddress).StringVar(&config.MetricsAddress)
	kingpin.Flag("client-go-timeout", "Set the timeout used for the Kubernetes client").
		Default(defaultClientGoTimeout.String()).DurationVar(&config.ClientGoTimeout)
	kingpin.Flag("operator-id", "ID of the operator used to determine ownership of EDS resources").
		StringVar(&config.OperatorID)
	kingpin.Flag("cluster-dns-zone", "The zone used for the cluster internal DNS. Used when generating ES service endpoint").
		Default(defaultClusterDNSZone).StringVar(&config.ClusterDNSZone)
	kingpin.Flag("elasticsearch-endpoint", "The Elasticsearch endpoint to use for reaching Elasticsearch API. By default the service endpoint for the EDS is used").
		URLVar(&config.ElasticsearchEndpoint)
	kingpin.Flag("namespace", "Limit operator to a certain namespace").
		Default(v1.NamespaceAll).StringVar(&config.Namespace)
	kingpin.Parse()

	if config.Debug {
		log.SetLevel(log.DebugLevel)
	}

	ctx, cancel := context.WithCancel(context.Background())

	// Use the parsed --client-go-timeout value. Passing the default constant
	// here would make the flag a no-op.
	kubeConfig, err := configureKubeConfig(config.APIServer, config.ClientGoTimeout, ctx.Done())
	if err != nil {
		log.Fatalf("Failed to setup Kubernetes config: %v", err)
	}

	client, err := clientset.NewClientset(kubeConfig)
	if err != nil {
		log.Fatalf("Failed to setup Kubernetes client: %v", err)
	}

	// Named esOperator so the local does not shadow the operator package.
	esOperator := operator.NewElasticsearchOperator(
		client,
		config.PriorityNodeSelectors,
		config.Interval,
		config.AutoscalerInterval,
		config.OperatorID,
		config.Namespace,
		config.ClusterDNSZone,
		config.ElasticsearchEndpoint,
	)

	go handleSigterm(cancel)
	go serveMetrics(config.MetricsAddress)

	if err := esOperator.Run(ctx); err != nil {
		// Cancel explicitly: log.Fatalf exits the process without running
		// deferred calls.
		cancel()
		log.Fatalf("Failed to run operator: %v", err)
	}
}
// handleSigterm blocks until the process receives SIGTERM, logs the event and
// then invokes cancelFunc. It returns after the first signal has been handled.
func handleSigterm(cancelFunc func()) {
	termCh := make(chan os.Signal, 1)
	signal.Notify(termCh, syscall.SIGTERM)

	// Wait for the first SIGTERM to arrive.
	<-termCh

	log.Info("Received Term signal. Terminating...")
	cancelFunc()
}
// configureKubeConfig builds the rest.Config used to talk to the Kubernetes
// API server with a custom http.Transport.
//
// timeout is used as dial timeout, TLS handshake timeout and overall client
// timeout. A background goroutine closes the transport's idle connections
// every 20 seconds until stopCh is closed.
//
// If apiServerURL is non-nil, the returned config points at that URL and no
// TLS client configuration is applied. Otherwise the in-cluster configuration
// is loaded and its TLS settings are moved onto the custom transport. An error
// is returned if the in-cluster configuration or its TLS settings cannot be
// obtained.
func configureKubeConfig(apiServerURL *url.URL, timeout time.Duration, stopCh <-chan struct{}) (*rest.Config, error) {
	tr := &http.Transport{
		DialContext: (&net.Dialer{
			Timeout:   timeout,
			KeepAlive: 30 * time.Second,
			DualStack: false, // K8s do not work well with IPv6
		}).DialContext,
		TLSHandshakeTimeout:   timeout,
		ResponseHeaderTimeout: 10 * time.Second,
		MaxIdleConns:          10,
		MaxIdleConnsPerHost:   2,
		IdleConnTimeout:       20 * time.Second,
	}

	// Periodically drop idle connections so that DNS changes are picked up;
	// IdleConnTimeout on the http.Transport alone does not reliably do this.
	// https://github.com/golang/go/issues/23427
	go func(d time.Duration) {
		// A single ticker avoids allocating a new timer on every iteration
		// and is released as soon as stopCh fires.
		ticker := time.NewTicker(d)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				tr.CloseIdleConnections()
			case <-stopCh:
				return
			}
		}
	}(20 * time.Second)

	if apiServerURL != nil {
		return &rest.Config{
			Host:      apiServerURL.String(),
			Timeout:   timeout,
			Transport: tr,
			QPS:       100.0,
			Burst:     500,
		}, nil
	}

	config, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}

	// patch TLS config: derive the TLS settings from the in-cluster config and
	// install them on the custom transport.
	restTransportConfig, err := config.TransportConfig()
	if err != nil {
		return nil, err
	}
	restTLSConfig, err := transport.TLSConfigFor(restTransportConfig)
	if err != nil {
		return nil, err
	}
	tr.TLSClientConfig = restTLSConfig

	config.Timeout = timeout
	config.Transport = tr
	config.QPS = 100.0
	config.Burst = 500
	// disable TLSClientConfig to make the custom Transport work
	config.TLSClientConfig = rest.TLSClientConfig{}
	return config, nil
}
// serveMetrics registers the Prometheus handler under /metrics on the default
// HTTP mux and serves it on address. It blocks while the server is running and
// terminates the process via log.Fatal when the server stops.
func serveMetrics(address string) {
	http.Handle("/metrics", promhttp.Handler())

	// A nil Handler makes the server fall back to http.DefaultServeMux.
	srv := &http.Server{Addr: address}
	err := srv.ListenAndServe()
	log.Fatal(err)
}