Skip to content

Commit 5f060b7

Browse files
authored
Implement stream connection for remote write (#6580)
* Implement stream connection for remote write Signed-off-by: Alex Le <[email protected]> * fix lint Signed-off-by: Alex Le <[email protected]> * addressed comments Signed-off-by: Alex Le <[email protected]> * Added test for draining logic in closableHealthAndIngesterClient Signed-off-by: Alex Le <[email protected]> * addressed comments Signed-off-by: Alex Le <[email protected]> * Fixed stream push error response and add integration test Signed-off-by: Alex Le <[email protected]> * fix lint Signed-off-by: Alex Le <[email protected]> * update CHANGELOG Signed-off-by: Alex Le <[email protected]> * updated WriteResponse to have code and message field Signed-off-by: Alex Le <[email protected]> --------- Signed-off-by: Alex Le <[email protected]>
1 parent ee387c5 commit 5f060b7

File tree

15 files changed

+1157
-256
lines changed

15 files changed

+1157
-256
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
* [FEATURE] Ruler: Add support for percentage based sharding for rulers. #6680
1212
* [FEATURE] Ruler: Add support for group labels. #6665
1313
* [FEATURE] Support Parquet format: Implement parquet converter service to convert a TSDB block into Parquet. #6716
14+
* [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
1415
* [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715
1516
* [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. #6681
1617
* [ENHANCEMENT] Ingester: Add a `cortex_ingester_active_native_histogram_series` metric to track # of active NH series. #6695

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2690,6 +2690,11 @@ ha_tracker:
26902690
# CLI flag: -distributor.sign-write-requests
26912691
[sign_write_requests: <boolean> | default = false]
26922692
2693+
# EXPERIMENTAL: If enabled, the distributor will use a stream connection to send
2694+
# requests to ingesters.
2695+
# CLI flag: -distributor.use-stream-push
2696+
[use_stream_push: <boolean> | default = false]
2697+
26932698
ring:
26942699
kvstore:
26952700
# Backend storage to use for the ring. Supported values are: consul, etcd,

docs/configuration/v1-guarantees.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,6 @@ Currently experimental features are:
127127
- `-ingester.instance-limits.cpu-utilization`
128128
- `-ingester.instance-limits.heap-utilization`
129129
- `-store-gateway.instance-limits.cpu-utilization`
130-
- `-store-gateway.instance-limits.heap-utilization`
130+
- `-store-gateway.instance-limits.heap-utilization`
131+
- Distributor/Ingester: Stream push connection
132+
- Enable stream push connection between distributor and ingester by setting `-distributor.use-stream-push=true` on Distributor.
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
//go:build requires_docker
2+
// +build requires_docker
3+
4+
package integration
5+
6+
import (
7+
"fmt"
8+
"math/rand"
9+
"strconv"
10+
"testing"
11+
"time"
12+
13+
"github.com/prometheus/prometheus/model/labels"
14+
"github.com/prometheus/prometheus/prompb"
15+
"github.com/stretchr/testify/assert"
16+
"github.com/stretchr/testify/require"
17+
18+
"github.com/cortexproject/cortex/integration/e2e"
19+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
20+
"github.com/cortexproject/cortex/integration/e2ecortex"
21+
)
22+
23+
func TestIngesterStreamPushConnection(t *testing.T) {
24+
25+
s, err := e2e.NewScenario(networkName)
26+
require.NoError(t, err)
27+
defer s.Close()
28+
29+
maxGlobalSeriesPerMetric := 300
30+
maxGlobalSeriesPerTenant := 1000
31+
32+
flags := BlocksStorageFlags()
33+
flags["-distributor.use-stream-push"] = "true"
34+
flags["-distributor.replication-factor"] = "1"
35+
flags["-distributor.shard-by-all-labels"] = "true"
36+
flags["-distributor.sharding-strategy"] = "shuffle-sharding"
37+
flags["-distributor.ingestion-tenant-shard-size"] = "1"
38+
flags["-ingester.max-series-per-user"] = "0"
39+
flags["-ingester.max-series-per-metric"] = "0"
40+
flags["-ingester.max-global-series-per-user"] = strconv.Itoa(maxGlobalSeriesPerTenant)
41+
flags["-ingester.max-global-series-per-metric"] = strconv.Itoa(maxGlobalSeriesPerMetric)
42+
flags["-ingester.heartbeat-period"] = "1s"
43+
44+
// Start dependencies.
45+
consul := e2edb.NewConsul()
46+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
47+
require.NoError(t, s.StartAndWaitReady(consul, minio))
48+
49+
// Start Cortex components.
50+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
51+
ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
52+
ingester2 := e2ecortex.NewIngester("ingester-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
53+
ingester3 := e2ecortex.NewIngester("ingester-3", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
54+
require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3))
55+
56+
// Wait until distributor has updated the ring.
57+
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
58+
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
59+
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
60+
61+
// Wait until ingesters have heartbeated the ring after all ingesters were active,
62+
// in order to update the number of instances. Since we have no metric, we have to
63+
// rely on a ugly sleep.
64+
time.Sleep(2 * time.Second)
65+
66+
now := time.Now()
67+
client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userID)
68+
require.NoError(t, err)
69+
70+
numSeriesWithSameMetricName := 0
71+
numSeriesTotal := 0
72+
maxErrorsBeforeStop := 100
73+
74+
// Try to push as many series with the same metric name as we can.
75+
for i, errs := 0, 0; i < 10000; i++ {
76+
series, _ := generateSeries("test_limit_per_metric", now, prompb.Label{
77+
Name: "cardinality",
78+
Value: strconv.Itoa(rand.Int()),
79+
})
80+
81+
res, err := client.Push(series)
82+
require.NoError(t, err)
83+
84+
if res.StatusCode == 200 {
85+
numSeriesTotal++
86+
numSeriesWithSameMetricName++
87+
} else if errs++; errs >= maxErrorsBeforeStop {
88+
break
89+
}
90+
}
91+
92+
// Try to push as many series with the different metric name as we can.
93+
for i, errs := 0, 0; i < 10000; i++ {
94+
series, _ := generateSeries(fmt.Sprintf("test_limit_per_tenant_%d", rand.Int()), now)
95+
res, err := client.Push(series)
96+
require.NoError(t, err)
97+
98+
if res.StatusCode == 200 {
99+
numSeriesTotal++
100+
} else if errs++; errs >= maxErrorsBeforeStop {
101+
break
102+
}
103+
}
104+
105+
// We expect the number of series we've been successfully pushed to be around
106+
// the limit. Due to how the global limit implementation works (lack of centralised
107+
// coordination) the actual number of written series could be slightly different
108+
// than the global limit, so we allow a 10% difference.
109+
delta := 0.1
110+
assert.InDelta(t, maxGlobalSeriesPerMetric, numSeriesWithSameMetricName, float64(maxGlobalSeriesPerMetric)*delta)
111+
assert.InDelta(t, maxGlobalSeriesPerTenant, numSeriesTotal, float64(maxGlobalSeriesPerTenant)*delta)
112+
113+
// Ensure no service-specific metrics prefix is used by the wrong service.
114+
assertServiceMetricsPrefixes(t, Distributor, distributor)
115+
assertServiceMetricsPrefixes(t, Ingester, ingester1)
116+
assertServiceMetricsPrefixes(t, Ingester, ingester2)
117+
assertServiceMetricsPrefixes(t, Ingester, ingester3)
118+
}

0 commit comments

Comments
 (0)