2020#include " envoy/registry/registry.h"
2121#include " envoy/server/factory_context.h"
2222#include " envoy/singleton/manager.h"
23+ #include " envoy/thread_local/thread_local.h"
2324#include " extensions/common/metadata_object.h"
2425#include " parser/parser.h"
2526#include " source/common/grpc/common.h"
@@ -313,7 +314,7 @@ struct Context : public Singleton::Instance {
313314
314315using ContextSharedPtr = std::shared_ptr<Context>;
315316
316- SINGLETON_MANAGER_REGISTRATION (Context )
317+ SINGLETON_MANAGER_REGISTRATION (istio_stats_filter_context )
317318
318319using google::api::expr::runtime::CelValue;
319320
@@ -418,21 +419,21 @@ struct MetricOverrides : public Logger::Loggable<Logger::Id::filter> {
418419// periodically to replace the current scope.
419420//
420421// The replaced stats scope is deleted gracefully after a minimum of 1s delay
421- // for two reasons:
422- //
423- // 1. Stats flushing is asynchronous and the data may be lost if not flushed
424- // before the deletion (see stats_flush_interval).
425- //
426- // 2. The implementation avoids locking by releasing a raw pointer to workers.
427- // When the rotation happens on the main, the raw pointer may still be in-use
428- // by workers for a short duration.
422+ // because stats flushing is asynchronous and the data may be lost if not
423+ // flushed before the deletion (see stats_flush_interval).
429424class RotatingScope : public Logger ::Loggable<Logger::Id::filter> {
430425public:
431426 RotatingScope (Server::Configuration::FactoryContext& factory_context, uint64_t rotate_interval_ms,
432427 uint64_t delete_interval_ms)
433428 : parent_scope_(factory_context.scope()), active_scope_(parent_scope_.createScope(" " )),
434- raw_scope_ (active_scope_.get()), rotate_interval_ms_(rotate_interval_ms),
429+ tls_scope_ (factory_context.serverFactoryContext().threadLocal()),
430+ rotate_interval_ms_(rotate_interval_ms),
435431 delete_interval_ms_(delete_interval_ms) {
432+
433+ tls_scope_.set ([&scope = *active_scope_](Event::Dispatcher&){
434+ return std::make_shared<TlsCachedScope>(scope);
435+ });
436+
436437 if (rotate_interval_ms_ > 0 ) {
437438 ASSERT (delete_interval_ms_ < rotate_interval_ms_);
438439 ASSERT (delete_interval_ms_ >= 1000 );
@@ -452,36 +453,58 @@ class RotatingScope : public Logger::Loggable<Logger::Id::filter> {
452453 delete_timer_.reset ();
453454 }
454455 }
455- Stats::Scope* scope () { return raw_scope_. load () ; }
456+ Stats::Scope& scope () { return tls_scope_-> _scope ; }
456457
457458private:
459+ struct TlsCachedScope :ThreadLocal::ThreadLocalObject{
460+ TlsCachedScope (Stats::Scope& scope):_scope(scope) {};
461+ std::reference_wrapper<Stats::Scope> _scope;
462+ };
463+
458464 void onRotate () {
459465 ENVOY_LOG (info, " Rotating active Istio stats scope after {}ms." , rotate_interval_ms_);
460466 draining_scope_ = active_scope_;
461- delete_timer_->enableTimer (std::chrono::milliseconds (delete_interval_ms_));
462467 active_scope_ = parent_scope_.createScope (" " );
463- raw_scope_.store (active_scope_.get ());
464- rotate_timer_->enableTimer (std::chrono::milliseconds (rotate_interval_ms_));
468+ tls_scope_.runOnAllThreads (
469+ [&scope = *active_scope_](OptRef<TlsCachedScope> tls_cache) {
470+ tls_cache->_scope = scope;
471+ },
472+ // Start the delete and rotate timers only after the new scope has been propagated to all worker threads.
473+ // The RotatingScope instance can go away before the dispatcher has a chance to execute the callback,
474+ // in which case the still_alive_ shared_ptr is deallocated together with the instance.
475+ // We rely on a weak_ptr to the still_alive_ sentinel to determine whether the instance is still valid.
476+ [this , maybe_still_alive = std::weak_ptr<bool >(still_alive_)]() -> void {
477+ if (!maybe_still_alive.expired ()){
478+ delete_timer_->enableTimer (std::chrono::milliseconds (delete_interval_ms_));
479+ rotate_timer_->enableTimer (std::chrono::milliseconds (rotate_interval_ms_));
480+ }
481+ });
465482 }
466483 void onDelete () {
467484 ENVOY_LOG (info, " Deleting draining Istio stats scope after {}ms." , delete_interval_ms_);
468485 draining_scope_.reset ();
469486 }
487+
470488 Stats::Scope& parent_scope_;
471489 Stats::ScopeSharedPtr active_scope_;
472- std::atomic<Stats::Scope*> raw_scope_;
473490 Stats::ScopeSharedPtr draining_scope_{nullptr };
491+ ThreadLocal::TypedSlot<TlsCachedScope> tls_scope_;
474492 const uint64_t rotate_interval_ms_;
475493 const uint64_t delete_interval_ms_;
476494 Event::TimerPtr rotate_timer_{nullptr };
477495 Event::TimerPtr delete_timer_{nullptr };
496+
497+ // A sentinel shared_ptr used for keeping track of whether the RotatingScope is still alive.
498+ // It is only held by a weak reference in the callback that will be invoked after the new active
499+ // scope has been propagated to all worker threads.
500+ std::shared_ptr<bool > still_alive_{std::make_shared<bool >(true )};
478501};
479502
480503struct Config : public Logger ::Loggable<Logger::Id::filter> {
481504 Config (const stats::PluginConfig& proto_config,
482505 Server::Configuration::FactoryContext& factory_context)
483506 : context_(factory_context.serverFactoryContext().singletonManager().getTyped<Context>(
484- SINGLETON_MANAGER_REGISTERED_NAME (Context ),
507+ SINGLETON_MANAGER_REGISTERED_NAME (istio_stats_filter_context ),
485508 [&factory_context] {
486509 return std::make_shared<Context>(
487510 factory_context.serverFactoryContext ().scope ().symbolTable (),
@@ -514,7 +537,7 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
514537 break ;
515538 }
516539 if (proto_config.metrics_size () > 0 || proto_config.definitions_size () > 0 ) {
517- metric_overrides_ = std::make_unique<MetricOverrides>(context_, scope ()-> symbolTable ());
540+ metric_overrides_ = std::make_unique<MetricOverrides>(context_, scope (). symbolTable ());
518541 for (const auto & definition : proto_config.definitions ()) {
519542 const auto & it = context_->all_metrics_ .find (definition.name ());
520543 if (it != context_->all_metrics_ .end ()) {
@@ -698,12 +721,12 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
698721 return ;
699722 }
700723 auto new_tags = parent_.metric_overrides_ ->overrideTags (metric, tags, expr_values_);
701- Stats::Utility::counterFromStatNames (* parent_.scope (),
724+ Stats::Utility::counterFromStatNames (parent_.scope (),
702725 {parent_.context_ ->stat_namespace_ , metric}, new_tags)
703726 .add (amount);
704727 return ;
705728 }
706- Stats::Utility::counterFromStatNames (* parent_.scope (),
729+ Stats::Utility::counterFromStatNames (parent_.scope (),
707730 {parent_.context_ ->stat_namespace_ , metric}, tags)
708731 .add (amount);
709732 }
@@ -717,12 +740,12 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
717740 }
718741 auto new_tags = parent_.metric_overrides_ ->overrideTags (metric, tags, expr_values_);
719742 Stats::Utility::histogramFromStatNames (
720- * parent_.scope (), {parent_.context_ ->stat_namespace_ , metric}, unit, new_tags)
743+ parent_.scope (), {parent_.context_ ->stat_namespace_ , metric}, unit, new_tags)
721744 .recordValue (value);
722745 return ;
723746 }
724747 Stats::Utility::histogramFromStatNames (
725- * parent_.scope (), {parent_.context_ ->stat_namespace_ , metric}, unit, tags)
748+ parent_.scope (), {parent_.context_ ->stat_namespace_ , metric}, unit, tags)
726749 .recordValue (value);
727750 }
728751
@@ -735,17 +758,17 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
735758 switch (metric.type_ ) {
736759 case MetricOverrides::MetricType::Counter:
737760 Stats::Utility::counterFromStatNames (
738- * parent_.scope (), {parent_.context_ ->stat_namespace_ , metric.name_ }, tags)
761+ parent_.scope (), {parent_.context_ ->stat_namespace_ , metric.name_ }, tags)
739762 .add (amount);
740763 break ;
741764 case MetricOverrides::MetricType::Histogram:
742765 Stats::Utility::histogramFromStatNames (
743- * parent_.scope (), {parent_.context_ ->stat_namespace_ , metric.name_ },
766+ parent_.scope (), {parent_.context_ ->stat_namespace_ , metric.name_ },
744767 Stats::Histogram::Unit::Bytes, tags)
745768 .recordValue (amount);
746769 break ;
747770 case MetricOverrides::MetricType::Gauge:
748- Stats::Utility::gaugeFromStatNames (* parent_.scope (),
771+ Stats::Utility::gaugeFromStatNames (parent_.scope (),
749772 {parent_.context_ ->stat_namespace_ , metric.name_ },
750773 Stats::Gauge::ImportMode::Accumulate, tags)
751774 .set (amount);
@@ -769,14 +792,14 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
769792 tags.push_back ({context_->tag_ , context_->istio_version_ .empty () ? context_->unknown_
770793 : context_->istio_version_ });
771794
772- Stats::Utility::gaugeFromStatNames (* scope (),
795+ Stats::Utility::gaugeFromStatNames (scope (),
773796 {context_->stat_namespace_ , context_->istio_build_ },
774797 Stats::Gauge::ImportMode::Accumulate, tags)
775798 .set (1 );
776799 }
777800
778801 Reporter reporter () const { return reporter_; }
779- Stats::Scope* scope () { return scope_.scope (); }
802+ Stats::Scope& scope () { return scope_.scope (); }
780803
781804 ContextSharedPtr context_;
782805 RotatingScope scope_;
@@ -795,7 +818,7 @@ class IstioStatsFilter : public Http::PassThroughFilter,
795818 public Network::ConnectionCallbacks {
796819public:
797820 IstioStatsFilter (ConfigSharedPtr config)
798- : config_(config), context_(*config->context_), pool_(config->scope ()-> symbolTable()),
821+ : config_(config), context_(*config->context_), pool_(config->scope (). symbolTable()),
799822 stream_(*config_, pool_) {
800823 tags_.reserve (25 );
801824 switch (config_->reporter ()) {
0 commit comments