Skip to content

Commit

Permalink
Tempo support (#249)
Browse files Browse the repository at this point in the history
  • Loading branch information
Avi-Robusta authored Jan 15, 2025
1 parent 109cbd5 commit 542440b
Show file tree
Hide file tree
Showing 8 changed files with 473 additions and 32 deletions.
3 changes: 2 additions & 1 deletion holmes/plugins/toolsets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from holmes.core.supabase_dal import SupabaseDal
from holmes.plugins.toolsets.findings import FindingsToolset
from holmes.plugins.toolsets.grafana_loki import GrafanaConfig, GrafanaLokiToolset
from holmes.plugins.toolsets.grafana_tempo import GrafanaTempoToolset
from holmes.plugins.toolsets.internet import InternetToolset
from pydantic import BaseModel

Expand Down Expand Up @@ -42,7 +43,7 @@ def load_toolsets_from_file(path: str, silent_fail: bool = False) -> List[YAMLTo

def load_python_toolsets(dal:Optional[SupabaseDal], grafana_config:GrafanaConfig) -> List[Toolset]:
    """Instantiate the built-in Python-defined toolsets.

    Args:
        dal: Optional Supabase data access layer, forwarded to the findings toolset.
        grafana_config: Grafana settings; only the Loki section is consumed here
            (the Tempo toolset currently takes no configuration).

    Returns:
        List of toolset instances: internet, findings, Grafana Loki and Grafana Tempo.
    """
    logging.debug("loading python toolsets")
    return [InternetToolset(), FindingsToolset(dal), GrafanaLokiToolset(grafana_config.loki), GrafanaTempoToolset()]


def load_builtin_toolsets(dal:Optional[SupabaseDal] = None, grafana_config:GrafanaConfig = GrafanaConfig()) -> List[Toolset]:
Expand Down
69 changes: 47 additions & 22 deletions holmes/plugins/toolsets/grafana/loki_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import logging
import os
import requests
from typing import Dict, List, Tuple
from typing import Dict, List, Tuple, Optional
import backoff

def headers(api_key:str):
return {
Expand Down Expand Up @@ -46,23 +47,31 @@ def parse_loki_response(results: List[Dict]) -> List[Dict]:
})
return parsed_logs

@backoff.on_exception(
backoff.expo, # Exponential backoff
requests.exceptions.RequestException, # Retry on request exceptions
max_tries=5, # Maximum retries
giveup=lambda e: isinstance(e, requests.exceptions.HTTPError) and e.response.status_code < 500,
)
def execute_loki_query(
loki_datasource_id:str,
loki_datasource_id: str,
query: str,
start: int,
end: int,
limit: int) -> List[Dict]:
limit: int
) -> List[Dict]:
"""
Execute a Loki query through Grafana
Execute a Loki query through Grafana with retry and backoff.
Args:
query: Loki query string
start: Start of the time window to fetch the logs for. Epoch timestamp in seconds
end: End of the time window to fetch the logs for. Epoch timestamp in seconds
limit: Maximum number of log lines to return
loki_datasource_id: The ID of the Loki datasource.
query: Loki query string.
start: Start of the time window to fetch the logs for (Epoch timestamp in seconds).
end: End of the time window to fetch the logs for (Epoch timestamp in seconds).
limit: Maximum number of log lines to return.
Returns:
List of log entries
List of log entries.
"""

params = {
Expand Down Expand Up @@ -90,29 +99,45 @@ def execute_loki_query(
except requests.exceptions.RequestException as e:
raise Exception(f"Failed to query Loki logs: {str(e)}")

@backoff.on_exception(
    backoff.expo,  # Exponential backoff
    requests.exceptions.RequestException,  # Retry on request exceptions
    max_tries=5,  # Maximum retries
    # 4xx responses are client errors that will not succeed on retry; only 5xx are retried.
    giveup=lambda e: isinstance(e, requests.exceptions.HTTPError) and e.response.status_code < 500,
)
def list_grafana_datasources(source_name: Optional[str] = None) -> List[Dict]:
    """
    List all configured datasources from a Grafana instance with retry and backoff.

    Args:
        source_name: Optional. Filter for datasources matching this type
            (case-insensitive match on the datasource's "type" field).

    Returns:
        List of datasource configurations.

    Raises:
        Exception: if the request still fails after all retries.
    """
    try:
        grafana_url, api_key = get_connection_info()
        url = f'{grafana_url}/api/datasources'
        headers_ = headers(api_key=api_key)

        logging.info(f"Fetching datasources from: {url}")
        response = requests.get(url, headers=headers_, timeout=10)  # Added timeout
        response.raise_for_status()

        datasources = response.json()
        if not source_name:
            # No filter requested: return everything Grafana knows about.
            return datasources

        relevant_datasources = [
            ds for ds in datasources
            if ds['type'].lower() == source_name.lower()
        ]

        for ds in relevant_datasources:
            logging.info(f"Found datasource: {ds['name']} (type: {ds['type']}, id: {ds['id']})")

        return relevant_datasources
    except requests.exceptions.RequestException as e:
        raise Exception(f"Failed to list datasources: {str(e)}")

Expand Down
214 changes: 214 additions & 0 deletions holmes/plugins/toolsets/grafana/tempo_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
import logging
import os
import requests
from typing import Dict, List, Tuple
import backoff

# Constants for environment variables
GRAFANA_URL_ENV_NAME = "GRAFANA_URL"
GRAFANA_API_KEY_ENV_NAME = "GRAFANA_API_KEY"

def headers(api_key: str):
    """Build the standard JSON request headers for Grafana, with bearer-token auth."""
    base = {'Accept': 'application/json', 'Content-Type': 'application/json'}
    base['Authorization'] = f'Bearer {api_key}'
    return base

def get_connection_info() -> Tuple[str, str]:
    """
    Retrieves Grafana connection details from environment variables.

    Returns:
        Tuple of (grafana_url, api_key).

    Raises:
        Exception: when either required environment variable is unset or empty.
    """
    url = os.environ.get(GRAFANA_URL_ENV_NAME)
    if not url:
        raise Exception(f'Missing env var {GRAFANA_URL_ENV_NAME}')
    key = os.environ.get(GRAFANA_API_KEY_ENV_NAME)
    if not key:
        raise Exception(f'Missing env var {GRAFANA_API_KEY_ENV_NAME}')
    return url, key



def execute_tempo_query_with_retry(tempo_datasource_id: str, query_params: dict, retries: int = 3, timeout: int = 5):
    """
    Execute a Tempo API query through Grafana with retries and timeout.

    POSTs *query_params* to Tempo's ``/api/search`` endpoint via the Grafana
    datasource proxy.

    Args:
        tempo_datasource_id: The ID of the Tempo datasource.
        query_params: Query parameters for the API (sent as the JSON body).
        retries: Number of retries for the request.
        timeout: Timeout for each request in seconds.
    Returns:
        List of trace results.
    Raises:
        Exception: when the request still fails after all retries.
    """
    grafana_url, api_key = get_connection_info()
    url = f'{grafana_url}/api/datasources/proxy/{tempo_datasource_id}/api/search'

    # Defined inline so the decorator can capture the per-call `retries`,
    # `url`, `api_key`, and `timeout` values.
    @backoff.on_exception(
        backoff.expo,  # Exponential backoff
        requests.exceptions.RequestException,  # Retry on request exceptions
        max_tries=retries,  # Maximum retries
        # 4xx responses won't succeed on retry, so give up immediately; 5xx are retried.
        giveup=lambda e: isinstance(e, requests.exceptions.HTTPError) and e.response.status_code < 500,
    )
    def make_request():
        response = requests.post(
            url,
            headers={
                'Authorization': f'Bearer {api_key}',
                'Accept': 'application/json',
                'Content-Type': 'application/json',
            },
            json=query_params,
            timeout=timeout,  # Set timeout for the request
        )
        response.raise_for_status()  # Raise an error for non-2xx responses
        return response.json()

    try:
        return make_request()
    except requests.exceptions.RequestException as e:
        # Wrap the transport-level error so callers see a single exception type.
        raise Exception(f"Request to Tempo API failed after retries: {e}")


def query_tempo_traces_by_duration(
    tempo_datasource_id: str,
    min_duration: str,
    start: int,
    end: int,
    limit: int = 50,
) -> List[Dict]:
    """
    Query Tempo for traces exceeding a minimum duration.

    Args:
        tempo_datasource_id: The ID of the Tempo datasource.
        min_duration: Minimum duration for traces (e.g., "5s").
        start: Start of the time range (epoch in seconds).
        end: End of the time range (epoch in seconds).
        limit: Maximum number of traces to return.
    Returns:
        List of trace results.
    """
    # Tempo's search API expects every parameter as a string.
    return execute_tempo_query_with_retry(
        tempo_datasource_id,
        {
            "minDuration": min_duration,
            "start": str(start),
            "end": str(end),
            "limit": str(limit),
        },
    )


def query_tempo_trace_by_id(
    tempo_datasource_id: str,
    trace_id: str,
    retries: int = 3,
    timeout: int = 5,
) -> Dict:
    """
    Query Tempo for a specific trace by its ID with retries and backoff.

    GETs ``/api/traces/{trace_id}`` via the Grafana datasource proxy and
    condenses the raw payload with ``process_trace_json``.

    Args:
        tempo_datasource_id: The ID of the Tempo datasource.
        trace_id: The trace ID to retrieve.
        retries: Number of retries for the request.
        timeout: Timeout for each request in seconds.
    Returns:
        Trace details (the summary dict produced by ``process_trace_json``).
    Raises:
        Exception: when the request still fails after all retries.
    """
    grafana_url, api_key = get_connection_info()
    url = f'{grafana_url}/api/datasources/proxy/{tempo_datasource_id}/api/traces/{trace_id}'

    # Defined inline so the decorator can capture the per-call `retries`,
    # `url`, `api_key`, and `timeout` values.
    @backoff.on_exception(
        backoff.expo,  # Exponential backoff
        requests.exceptions.RequestException,  # Retry on request exceptions
        max_tries=retries,  # Maximum retries
        # 4xx responses won't succeed on retry, so give up immediately; 5xx are retried.
        giveup=lambda e: isinstance(e, requests.exceptions.HTTPError) and e.response.status_code < 500,
    )
    def make_request():
        response = requests.get(
            url,
            headers={
                'Authorization': f'Bearer {api_key}',
                'Accept': 'application/json',
            },
            timeout=timeout,  # Set timeout for the request
        )
        response.raise_for_status()  # Raise an error for non-2xx responses
        return process_trace_json(response.json())

    try:
        return make_request()
    except requests.exceptions.RequestException as e:
        raise Exception(f"Failed to retrieve trace by ID after retries: {e} \n for URL: {url}")

def process_trace_json(trace_json):
    """
    Condense a raw Tempo trace (OTLP-style JSON) into a per-application timing summary.

    Args:
        trace_json: Trace payload with a top-level "batches" list, each batch
            holding resource attributes and "scopeSpans" -> "spans".

    Returns:
        Dict with:
            total_elapsed_time_ms: elapsed time of the longest span (taken as the root).
            applications: one entry per span whose resource carries an "app"
                attribute, reporting its exclusive time as "elapsed_service_time_ms".
    """
    result = {
        "total_elapsed_time_ms": 0,
        "applications": []
    }

    # First pass: Collect basic details about spans
    spans_info = {}
    for batch in trace_json.get("batches", []):
        attributes = batch.get("resource", {}).get("attributes", [])
        app_name = None
        service_name = None
        for attr in attributes:
            key = attr.get("key")
            value = attr.get("value", {}).get("stringValue")
            if key == "app":
                app_name = value
            elif key == "service.name":
                # Fix: service_name was declared but never populated; read the
                # standard OpenTelemetry resource attribute (stays None if absent).
                service_name = value

        scope_spans = batch.get("scopeSpans", [])
        for scope_span in scope_spans:
            spans = scope_span.get("spans", [])
            for span in spans:
                span_id = span.get("spanId")
                parent_span_id = span.get("parentSpanId")
                start_time = int(span.get("startTimeUnixNano", 0))
                end_time = int(span.get("endTimeUnixNano", 0))
                elapsed_time_ns = end_time - start_time

                spans_info[span_id] = {
                    "app_name": app_name,
                    "service_name": service_name,
                    "parent_span_id": parent_span_id,
                    "elapsed_time_ms": elapsed_time_ns / 1_000_000,
                    # Starts equal to elapsed; child time is subtracted below.
                    "exclusive_time_ms": elapsed_time_ns / 1_000_000,
                    "start_time": start_time,
                    "end_time": end_time,
                    "loki_labels": {"app": app_name}
                }

    # Second pass: Subtract child span times from parent spans so each parent's
    # "exclusive" time reflects only work done in that service itself.
    for span_id, span_data in spans_info.items():
        parent_span_id = span_data["parent_span_id"]
        if parent_span_id in spans_info:
            parent_data = spans_info[parent_span_id]
            parent_data["exclusive_time_ms"] -= span_data["elapsed_time_ms"]

    # Build the result, keeping only spans that belong to a named app
    for span_id, span_data in spans_info.items():
        app_info = {
            "app_name": span_data["app_name"],
            "service_name": span_data["service_name"],
            #"elapsed_time_ms": span_data["elapsed_time_ms"], # this confuses the llm
            "elapsed_service_time_ms": span_data["exclusive_time_ms"],
            "start_time": span_data["start_time"],
            "end_time": span_data["end_time"],
            "loki_labels": span_data["loki_labels"]
        }

        if app_info["app_name"]:
            result["applications"].append(app_info)

    # Approximate the root span as the one with the largest elapsed time
    # (the payload does not flag the root explicitly).
    root_span = max(spans_info.values(), key=lambda x: x["elapsed_time_ms"], default=None)
    if root_span:
        result["total_elapsed_time_ms"] = root_span["elapsed_time_ms"]

    return result

Loading

0 comments on commit 542440b

Please sign in to comment.