diff --git a/.gitignore b/.gitignore index 24e5b0a..c295d16 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ .build +build diff --git a/cmd/args.go b/cmd/args.go index efeaa4d..072e6e9 100644 --- a/cmd/args.go +++ b/cmd/args.go @@ -41,6 +41,7 @@ type options struct { flushInterval time.Duration backend backends.StorageBackend backendRoot string + prefix string verbosity log.Level } diff --git a/cmd/root.go b/cmd/root.go index 3391566..dc9bb2a 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -58,6 +58,13 @@ func rootCmd() *cobra.Command { "root path/location for the specified backend (e.g. bucket name for AWS S3)", ) + root.PersistentFlags().StringVar( + &opts.prefix, + prefixFlag, + "", + "directory prefix for saving parquet files", + ) + root.PersistentFlags().VarP( enumflag.New(&opts.verbosity, verbosityFlag, logLevelIDs, enumflag.EnumCaseInsensitive), verbosityFlag, diff --git a/cmd/server.go b/cmd/server.go index f67d961..31b8a90 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -117,8 +117,17 @@ func (self *promserver) sendTimeseries(ctx context.Context, timeserieses []promp var ok bool nameLabel, _ := lo.Find(ts.Labels, func(i prompb.Label) bool { return i.Name == model.MetricNameLabel }) - prefixLabel, _ := lo.Find(ts.Labels, func(i prompb.Label) bool { return i.Name == prefixLabelKey }) - channelName := prefixLabel.Value + "/" + nameLabel.Value + prefixLabel, prefixFound := lo.Find(ts.Labels, func(i prompb.Label) bool { return i.Name == prefixLabelKey }) + + prefix := prefixLabel.Value + if !prefixFound || prefix == "" { + prefix = self.opts.prefix + } + + channelName := prefix + "/" + nameLabel.Value + if prefix == "" { + channelName = nameLabel.Value + } log.Debugf("received timeseries data for %s", channelName) diff --git a/cmd/server_test.go b/cmd/server_test.go index d4053d7..3e38ce8 100644 --- a/cmd/server_test.go +++ b/cmd/server_test.go @@ -121,3 +121,69 @@ func TestSpawnWriter(t *testing.T) { assert.Nil(t, err) assert.Contains(t, srv.channels, channelName) } + +func TestSendTimeseriesWithCLIPrefix(t *testing.T) { + cliPrefix := "cli-prefix" + srv := newServer(&options{prefix: cliPrefix}) + expectedChannelName := cliPrefix + "/" + metricName + srv.channels[expectedChannelName] = make(chan prompb.TimeSeries) + + ts := prompb.TimeSeries{ + Labels: []prompb.Label{ + { + Name: model.MetricNameLabel, + Value: metricName, + }, + { + Name: "foo", + Value: "bar", + }, + }, + Samples: []prompb.Sample{ + { + Value: 1.0, + Timestamp: 0, + }, + }, + } + + go func() { + err := srv.sendTimeseries(context.TODO(), []prompb.TimeSeries{ts}) + assert.Nil(t, err) + }() + + val := <-srv.channels[expectedChannelName] + assert.Equal(t, ts, val) +} + +func TestSendTimeseriesWithoutPrefix(t *testing.T) { + srv := newServer(&options{}) + srv.channels[metricName] = make(chan prompb.TimeSeries) + + ts := prompb.TimeSeries{ + Labels: []prompb.Label{ + { + Name: model.MetricNameLabel, + Value: metricName, + }, + { + Name: "foo", + Value: "bar", + }, + }, + Samples: []prompb.Sample{ + { + Value: 1.0, + Timestamp: 0, + }, + }, + } + + go func() { + err := srv.sendTimeseries(context.TODO(), []prompb.TimeSeries{ts}) + assert.Nil(t, err) + }() + + val := <-srv.channels[metricName] + assert.Equal(t, ts, val) +}