@@ -27,15 +27,19 @@ import (
2727
2828 "go.uber.org/net/metrics"
2929 "go.uber.org/net/metrics/bucket"
30+ "go.uber.org/yarpc/api/metrics/metricstagdecorator"
3031 "go.uber.org/yarpc/api/transport"
3132 "go.uber.org/yarpc/internal/digester"
33+ "go.uber.org/yarpc/internal/sampledlogger"
3234 "go.uber.org/zap"
3335 "go.uber.org/zap/zapcore"
3436)
3537
3638var (
3739 _timeNow = time .Now // for tests
3840 _defaultGraphSize = 128
41+ _logInterval = time .Duration (5 * time .Minute )
42+
3943 // Latency buckets for histograms. At some point, we may want to make these
4044 // configurable.
4145 _bucketsMs = bucket .NewRPCLatency ()
@@ -72,9 +76,10 @@ type graph struct {
7276 logger * zap.Logger
7377 extract ContextExtractor
7478 ignoreMetricsTag * metricsTagIgnore
75-
76- edgesMu sync.RWMutex
77- edges map [string ]* edge
79+ decoratorTags []metricstagdecorator.MetricsTagDecorator
80+ sampledLogger * sampledlogger.SampledLogger
81+ edgesMu sync.RWMutex
82+ edges map [string ]* edge
7883
7984 inboundLevels , outboundLevels levels
8085}
@@ -164,13 +169,15 @@ func (m *metricsTagIgnore) tags(req *transport.Request, direction string, rpcTyp
164169 return tags
165170}
166171
167- func newGraph (meter * metrics.Scope , logger * zap.Logger , extract ContextExtractor , metricTagsIgnore []string ) graph {
172+ func newGraph (meter * metrics.Scope , logger * zap.Logger , extract ContextExtractor , metricTagsIgnore []string , metricsTagsDecorators []metricstagdecorator. MetricsTagDecorator ) graph {
168173 return graph {
169174 edges : make (map [string ]* edge , _defaultGraphSize ),
170175 meter : meter ,
171176 logger : logger ,
172177 extract : extract ,
173178 ignoreMetricsTag : newMetricsTagIgnore (metricTagsIgnore ),
179+ decoratorTags : metricsTagsDecorators ,
180+ sampledLogger : sampledlogger .NewSampledLogger (_logInterval , logger ),
174181 inboundLevels : levels {
175182 success : zapcore .DebugLevel ,
176183 failure : zapcore .ErrorLevel ,
@@ -220,7 +227,7 @@ func (g *graph) begin(ctx context.Context, rpcType transport.Type, direction dir
220227 if ! g .ignoreMetricsTag .rpcType {
221228 d .Add (rpcType .String ())
222229 }
223- e := g .getOrCreateEdge (d .Digest (), req , string (direction ), rpcType )
230+ e := g .getOrCreateEdge (ctx , d .Digest (), req , string (direction ), rpcType )
224231 d .Free ()
225232
226233 levels := & g .inboundLevels
@@ -240,11 +247,11 @@ func (g *graph) begin(ctx context.Context, rpcType transport.Type, direction dir
240247 }
241248}
242249
243- func (g * graph ) getOrCreateEdge (key []byte , req * transport.Request , direction string , rpcType transport.Type ) * edge {
250+ func (g * graph ) getOrCreateEdge (ctx context. Context , key []byte , req * transport.Request , direction string , rpcType transport.Type ) * edge {
244251 if e := g .getEdge (key ); e != nil {
245252 return e
246253 }
247- return g .createEdge (key , req , direction , rpcType )
254+ return g .createEdge (ctx , key , req , direction , rpcType )
248255}
249256
250257func (g * graph ) getEdge (key []byte ) * edge {
@@ -254,7 +261,7 @@ func (g *graph) getEdge(key []byte) *edge {
254261 return e
255262}
256263
257- func (g * graph ) createEdge (key []byte , req * transport.Request , direction string , rpcType transport.Type ) * edge {
264+ func (g * graph ) createEdge (ctx context. Context , key []byte , req * transport.Request , direction string , rpcType transport.Type ) * edge {
258265 g .edgesMu .Lock ()
259266 // Since we'll rarely hit this code path, the overhead of defer is acceptable.
260267 defer g .edgesMu .Unlock ()
@@ -264,7 +271,7 @@ func (g *graph) createEdge(key []byte, req *transport.Request, direction string,
264271 return e
265272 }
266273
267- e := newEdge (g .logger , g .meter , g .ignoreMetricsTag , req , direction , rpcType )
274+ e := newEdge (ctx , g .logger , g .sampledLogger , g . meter , g .ignoreMetricsTag , g . decoratorTags , req , direction , rpcType )
268275 g .edges [string (key )] = e
269276 return e
270277}
@@ -309,9 +316,17 @@ type streamEdge struct {
309316
310317// newEdge constructs a new edge. Since Registries enforce metric uniqueness,
311318// edges should be cached and re-used for each RPC.
312- func newEdge (logger * zap.Logger , meter * metrics.Scope , tagToIgnore * metricsTagIgnore , req * transport.Request , direction string , rpcType transport.Type ) * edge {
319+ func newEdge (ctx context. Context , logger * zap.Logger , sampledLogger * sampledlogger. SampledLogger , meter * metrics.Scope , tagToIgnore * metricsTagIgnore , decoratorTags []metricstagdecorator. MetricsTagDecorator , req * transport.Request , direction string , rpcType transport.Type ) * edge {
313320 tags := tagToIgnore .tags (req , direction , rpcType )
314321
322+ // Merge custom decorator tags into the YARPC metrics base tag set.
323+ customDecoratorTags := getDecoratorTags (ctx , sampledLogger , decoratorTags )
324+ if customDecoratorTags != nil {
325+ for key , value := range * customDecoratorTags {
326+ tags [key ] = value
327+ }
328+ }
329+
315330 // metrics for all RPCs
316331 calls , err := meter .Counter (metrics.Spec {
317332 Name : "calls" ,
@@ -603,3 +618,23 @@ func unknownIfEmpty(t string) string {
603618 }
604619 return t
605620}
621+
622+ // getDecoratorTags collects tags from all provided MetricsTagDecorators.
623+ func getDecoratorTags (ctx context.Context , sampledLogger * sampledlogger.SampledLogger , metricsTagsDecorators []metricstagdecorator.MetricsTagDecorator ) * metrics.Tags {
624+ if len (metricsTagsDecorators ) == 0 {
625+ return nil
626+ }
627+
628+ decoratorTags := metrics.Tags {}
629+ for _ , decorator := range metricsTagsDecorators {
630+ tags := decorator .ProvideTags (ctx )
631+ for key , value := range tags {
632+ if oldValue , exists := decoratorTags [key ]; exists {
633+ sampledLogger .Warn ("MetricsTagsDecorators is overwriting metric tag" , zap .String ("key" , key ), zap .String ("old" , oldValue ), zap .String ("new" , value ))
634+ }
635+ decoratorTags [key ] = value
636+ }
637+ }
638+
639+ return & decoratorTags
640+ }
0 commit comments