Skip to content

Commit

Permalink
redis: added endpoint filtering
Browse files Browse the repository at this point in the history
Signed-off-by: Cagri Yonca <[email protected]>
  • Loading branch information
CagriYonca committed Jan 15, 2025
1 parent 254e6a5 commit ad0ad74
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 30 deletions.
53 changes: 48 additions & 5 deletions src/instana/collector/helpers/runtime.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# (c) Copyright IBM Corp. 2021
# (c) Copyright Instana Inc. 2020

""" Collection helper for the Python runtime """
"""Collection helper for the Python runtime"""

import gc
import importlib.metadata
import os
Expand All @@ -19,16 +20,58 @@

PATH_OF_DEPRECATED_INSTALLATION_VIA_HOST_AGENT = "/tmp/.instana/python"

PATH_OF_AUTOTRACE_WEBHOOK_SITEDIR = '/opt/instana/instrumentation/python/'
PATH_OF_AUTOTRACE_WEBHOOK_SITEDIR = "/opt/instana/instrumentation/python/"


def is_autowrapt_instrumented():
    """Tell whether "instana" occurs in the AUTOWRAPT_BOOTSTRAP environment variable."""
    # An empty tuple is the fallback so the membership test is simply False
    # when the variable is not set.
    bootstrap_value = os.environ.get("AUTOWRAPT_BOOTSTRAP", ())
    return "instana" in bootstrap_value


def is_webhook_instrumented():
    """Tell whether any sys.path entry contains the AutoTrace webhook site directory."""
    for path_entry in sys.path:
        if PATH_OF_AUTOTRACE_WEBHOOK_SITEDIR in path_entry:
            return True
    return False


def parse_ignored_endpoints(option_string):
    """
    Parse the INSTANA_IGNORE_ENDPOINTS value into a flat list of ignore keys.

    Entries are separated by ";". Each entry is either a bare service name,
    which yields the key "service", or "service:endpoint1,endpoint2", which
    yields one "service.endpoint" key per endpoint.
    Format: "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2"

    Service and endpoint names are lower-cased, so the setting is
    case-insensitive. Surrounding whitespace is ignored. Entries with an empty
    service name are skipped, and so are entries that have a ":" but no
    endpoint after it.

    @param option_string [String] The string the user entered with the
        INSTANA_IGNORE_ENDPOINTS variable
    @return [List] The ignore keys in the order given; an empty list when
        option_string is empty or is not a string
    """
    if not option_string or not isinstance(option_string, str):
        return []

    ignored_endpoints = []
    for entry in option_string.split(";"):
        # Only the first ":" separates the service from its endpoint list.
        service, has_endpoints, endpoints = entry.partition(":")
        service = service.strip().lower()

        if not service:
            # A blank entry, or ":endpoint" with no service, would otherwise
            # produce a meaningless key such as ".endpoint".
            continue

        if not has_endpoints:
            # Only the service name was provided: ignore the whole service.
            ignored_endpoints.append(service)
            continue

        ignored_endpoints.extend(
            f"{service}.{endpoint}"
            for endpoint in (ep.strip().lower() for ep in endpoints.split(","))
            if endpoint
        )

    return ignored_endpoints


class RuntimeHelper(BaseHelper):
"""Helper class to collect snapshot and metrics for this Python runtime"""

Expand Down Expand Up @@ -316,9 +359,9 @@ def _collect_runtime_snapshot(self, plugin_data):
snapshot_payload["iv"] = VERSION

if is_autowrapt_instrumented():
snapshot_payload['m'] = 'Autowrapt'
snapshot_payload["m"] = "Autowrapt"
elif is_webhook_instrumented():
snapshot_payload['m'] = 'AutoTrace'
snapshot_payload["m"] = "AutoTrace"
else:
snapshot_payload["m"] = "Manual"

Expand Down
4 changes: 3 additions & 1 deletion src/instana/instrumentation/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from instana.log import logger
from instana.span.span import InstanaSpan
from instana.util.traceutils import get_tracer_tuple, tracing_is_off
from instana.util.traceutils import check_if_ignored, get_tracer_tuple, tracing_is_off

try:
import redis
Expand Down Expand Up @@ -43,6 +43,8 @@ def execute_command_with_instana(
args: Tuple[object, ...],
kwargs: Dict[str, Any],
) -> object:
if check_if_ignored("redis", args[0]):
return
tracer, parent_span, operation_name = get_tracer_tuple()
parent_context = parent_span.get_span_context() if parent_span else None

Expand Down
68 changes: 45 additions & 23 deletions src/instana/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,53 +13,67 @@
- AWSFargateOptions - Options class for AWS Fargate. Holds settings specific to AWS Fargate.
- GCROptions - Options class for Google cloud Run. Holds settings specific to GCR.
"""

import os
import logging

from .log import logger
from .util.runtime import determine_service_name
from instana.collector.helpers.runtime import parse_ignored_endpoints

from instana.log import logger
from instana.util.runtime import determine_service_name


class BaseOptions(object):
    """Base class for all option classes. Holds items common to all"""

    def __init__(self, **kwds):
        """
        Populate the common options from defaults and environment variables.

        @param kwds Arbitrary attribute overrides; applied last, so they win
            over both the defaults and the environment-derived values.
        """
        self.debug = False
        self.log_level = logging.WARN
        self.service_name = determine_service_name()
        self.extra_http_headers = None
        self.allow_exit_as_root = False
        # A list, matching what parse_ignored_endpoints() is assigned below.
        self.ignored_endpoints = []

        if "INSTANA_DEBUG" in os.environ:
            self.log_level = logging.DEBUG
            self.debug = True

        if "INSTANA_EXTRA_HTTP_HEADERS" in os.environ:
            # Semicolon-separated header names, stored lower-cased.
            self.extra_http_headers = (
                str(os.environ["INSTANA_EXTRA_HTTP_HEADERS"]).lower().split(";")
            )

        if "INSTANA_IGNORE_ENDPOINTS" in os.environ:
            self.ignored_endpoints = parse_ignored_endpoints(
                os.environ["INSTANA_IGNORE_ENDPOINTS"]
            )

        if os.environ.get("INSTANA_ALLOW_EXIT_AS_ROOT", None) == "1":
            self.allow_exit_as_root = True

        # Defaults
        self.secrets_matcher = "contains-ignore-case"
        self.secrets_list = ["key", "pass", "secret"]

        # Env var format: <matcher>:<secret>[,<secret>]
        self.secrets = os.environ.get("INSTANA_SECRETS", None)

        if self.secrets is not None:
            parts = self.secrets.split(":")
            if len(parts) == 2:
                self.secrets_matcher = parts[0]
                self.secrets_list = parts[1].split(",")
            else:
                # Keep the defaults when the value is not exactly "<matcher>:<list>".
                logger.warning(
                    "Couldn't parse INSTANA_SECRETS env var: %s", self.secrets
                )

        self.__dict__.update(kwds)


class StandardOptions(BaseOptions):
""" The options class used when running directly on a host/node with an Instana agent """
"""The options class used when running directly on a host/node with an Instana agent"""

AGENT_DEFAULT_HOST = "localhost"
AGENT_DEFAULT_PORT = 42699

Expand All @@ -74,7 +88,7 @@ def __init__(self, **kwds):


class ServerlessOptions(BaseOptions):
""" Base class for serverless environments. Holds settings common to all serverless environments. """
"""Base class for serverless environments. Holds settings common to all serverless environments."""

def __init__(self, **kwds):
super(ServerlessOptions, self).__init__()
Expand All @@ -86,7 +100,7 @@ def __init__(self, **kwds):
if self.endpoint_url is not None and self.endpoint_url[-1] == "/":
self.endpoint_url = self.endpoint_url[:-1]

if 'INSTANA_DISABLE_CA_CHECK' in os.environ:
if "INSTANA_DISABLE_CA_CHECK" in os.environ:
self.ssl_verify = False
else:
self.ssl_verify = True
Expand All @@ -95,7 +109,7 @@ def __init__(self, **kwds):
if proxy is None:
self.endpoint_proxy = {}
else:
self.endpoint_proxy = {'https': proxy}
self.endpoint_proxy = {"https": proxy}

timeout_in_ms = os.environ.get("INSTANA_TIMEOUT", None)
if timeout_in_ms is None:
Expand All @@ -105,9 +119,14 @@ def __init__(self, **kwds):
try:
self.timeout = int(timeout_in_ms) / 1000
except ValueError:
logger.warning("Likely invalid INSTANA_TIMEOUT=%s value. Using default.", timeout_in_ms)
logger.warning("INSTANA_TIMEOUT should specify timeout in milliseconds. See "
"https://www.instana.com/docs/reference/environment_variables/#serverless-monitoring")
logger.warning(
"Likely invalid INSTANA_TIMEOUT=%s value. Using default.",
timeout_in_ms,
)
logger.warning(
"INSTANA_TIMEOUT should specify timeout in milliseconds. See "
"https://www.instana.com/docs/reference/environment_variables/#serverless-monitoring"
)
self.timeout = 0.8

value = os.environ.get("INSTANA_LOG_LEVEL", None)
Expand All @@ -129,14 +148,14 @@ def __init__(self, **kwds):


class AWSLambdaOptions(ServerlessOptions):
    """Options class for AWS Lambda. Holds settings specific to AWS Lambda."""

    def __init__(self, **kwds):
        # kwds is accepted but not forwarded to the parent initializer.
        super().__init__()


class AWSFargateOptions(ServerlessOptions):
""" Options class for AWS Fargate. Holds settings specific to AWS Fargate. """
"""Options class for AWS Fargate. Holds settings specific to AWS Fargate."""

def __init__(self, **kwds):
super(AWSFargateOptions, self).__init__()
Expand All @@ -146,9 +165,9 @@ def __init__(self, **kwds):
if tag_list is not None:
try:
self.tags = dict()
tags = tag_list.split(',')
tags = tag_list.split(",")
for tag_and_value in tags:
parts = tag_and_value.split('=')
parts = tag_and_value.split("=")
length = len(parts)
if length == 1:
self.tags[parts[0]] = None
Expand All @@ -159,13 +178,16 @@ def __init__(self, **kwds):

self.zone = os.environ.get("INSTANA_ZONE", None)


class EKSFargateOptions(AWSFargateOptions):
    """Options class for EKS Pods on AWS Fargate. Holds settings specific to EKS Pods on AWS Fargate."""

    def __init__(self, **kwds):
        # kwds is accepted but not forwarded to the parent initializer.
        super().__init__()


class GCROptions(ServerlessOptions):
    """Options class for Google Cloud Run. Holds settings specific to Google Cloud Run."""

    def __init__(self, **kwds):
        # kwds is accepted but not forwarded to the parent initializer.
        super().__init__()
7 changes: 7 additions & 0 deletions src/instana/util/traceutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,10 @@ def get_tracer_tuple() -> (

def tracing_is_off() -> bool:
return not (bool(get_active_tracer()) or agent.options.allow_exit_as_root)


def check_if_ignored(instrumentation, command) -> bool:
    """
    Tell whether a command of an instrumentation is configured to be ignored.

    A match is either the bare instrumentation name or the key
    "<instrumentation>.<command lower-cased>" being present in
    agent.options.ignored_endpoints.
    """
    if instrumentation in agent.options.ignored_endpoints:
        # The whole instrumentation is ignored; the command is not inspected.
        return True

    endpoint_key = f"{instrumentation}.{command.lower()}"
    return endpoint_key in agent.options.ignored_endpoints
72 changes: 71 additions & 1 deletion tests/clients/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@


import logging
import os
from typing import Generator
from unittest.mock import patch

import pytest
import redis

from instana.options import StandardOptions
from instana.singletons import agent, tracer
from instana.span.span import get_current_span
from tests.helpers import testenv
from instana.singletons import agent, tracer


class TestRedis:
Expand All @@ -21,6 +24,8 @@ def _resource(self) -> Generator[None, None, None]:
self.recorder.clear_spans()
self.client = redis.Redis(host=testenv["redis_host"], db=testenv["redis_db"])
yield
if os.environ.get("INSTANA_IGNORE_ENDPOINTS"):
del os.environ["INSTANA_IGNORE_ENDPOINTS"]
agent.options.allow_exit_as_root = False

def test_set_get(self) -> None:
Expand Down Expand Up @@ -454,3 +459,68 @@ def test_execute_with_instana_exception(
pipe.get("foox")
pipe.execute()
assert "Error collecting pipeline commands" in caplog.messages

def test_ignore_redis(self) -> None:
    """Ignoring "redis" as a whole leaves only the span named "sdk"."""
    # Scope the env var and the swapped-in options to this test so the
    # ignore configuration cannot leak into tests that run afterwards.
    with patch.dict(os.environ, {"INSTANA_IGNORE_ENDPOINTS": "redis"}):
        # The options read the environment when constructed, so build them
        # only once the variable is in place.
        with patch.object(agent, "options", StandardOptions()):
            with tracer.start_as_current_span("test"):
                self.client.set("foox", "barX")
                self.client.get("foox")

            spans = self.recorder.queued_spans()

    assert len(spans) == 1

    sdk_span = spans[0]

    assert sdk_span.n == "sdk"

def test_ignore_redis_single_command(self) -> None:
    """Ignoring only "redis:set" still records the GET span and the "sdk" span."""
    # Scope the env var and the swapped-in options to this test so the
    # ignore configuration cannot leak into tests that run afterwards.
    with patch.dict(os.environ, {"INSTANA_IGNORE_ENDPOINTS": "redis:set"}):
        # The options read the environment when constructed, so build them
        # only once the variable is in place.
        with patch.object(agent, "options", StandardOptions()):
            with tracer.start_as_current_span("test"):
                self.client.set("foox", "barX")
                self.client.get("foox")

            spans = self.recorder.queued_spans()

    assert len(spans) == 2

    redis_get_span = spans[0]
    sdk_span = spans[1]

    assert redis_get_span.n == "redis"
    assert redis_get_span.data["redis"]["command"] == "GET"

    assert sdk_span.n == "sdk"

def test_ignore_redis_multiple_commands(self) -> None:
    """Ignoring "redis:set,get" leaves only the span named "sdk"."""
    # Scope the env var and the swapped-in options to this test so the
    # ignore configuration cannot leak into tests that run afterwards.
    with patch.dict(os.environ, {"INSTANA_IGNORE_ENDPOINTS": "redis:set,get"}):
        # The options read the environment when constructed, so build them
        # only once the variable is in place.
        with patch.object(agent, "options", StandardOptions()):
            with tracer.start_as_current_span("test"):
                self.client.set("foox", "barX")
                self.client.get("foox")

            spans = self.recorder.queued_spans()

    assert len(spans) == 1

    sdk_span = spans[0]

    assert sdk_span.n == "sdk"

def test_ignore_redis_with_another_instrumentation(self) -> None:
    """An entry for another instrumentation does not change the redis result."""
    ignore_setting = "redis:set;something_else:something"
    # Scope the env var and the swapped-in options to this test so the
    # ignore configuration cannot leak into tests that run afterwards.
    with patch.dict(os.environ, {"INSTANA_IGNORE_ENDPOINTS": ignore_setting}):
        # The options read the environment when constructed, so build them
        # only once the variable is in place.
        with patch.object(agent, "options", StandardOptions()):
            with tracer.start_as_current_span("test"):
                self.client.set("foox", "barX")
                self.client.get("foox")

            spans = self.recorder.queued_spans()

    assert len(spans) == 2

    redis_get_span = spans[0]
    sdk_span = spans[1]

    assert redis_get_span.n == "redis"
    assert redis_get_span.data["redis"]["command"] == "GET"

    assert sdk_span.n == "sdk"

0 comments on commit ad0ad74

Please sign in to comment.