From e52ebfea4aecf49df55f419d89c0558c02dd5a80 Mon Sep 17 00:00:00 2001 From: dmitryk-dk Date: Mon, 4 May 2026 20:41:06 +0200 Subject: [PATCH 1/4] add victoriamterics stack as telemetry storage --- research/telemetry_storage_backend/Makefile | 58 +- .../docker-compose.otel.yml | 12 +- .../docker-compose.victoriametrics.yml | 73 ++ .../otel-collector-config.yaml | 18 +- .../correlation_by_timestamp.logql | 1 + .../correlation_by_trace_id.logql | 1 + .../queries/victoriametrics/data_volume.logql | 1 + .../victoriametrics/data_volume.metricsql | 1 + .../victoriametrics/data_volume.traceql | 1 + .../logs_errors_by_service.logql | 1 + .../queries/victoriametrics/logs_recent.logql | 1 + .../victoriametrics/logs_search_error.logql | 1 + .../metrics_by_service_hourly.metricsql | 1 + .../metrics_p95_latency.metricsql | 1 + .../sla_latency_compliance.traceql | 1 + .../spans_error_by_service.traceql | 1 + .../victoriametrics/trace_by_id.traceql | 1 + .../traces_slow_by_service.traceql | 1 + .../requirements.txt | 5 + .../runner/bench_compare.py | 701 +++++++++++++----- .../runner/run_otlp_ingest.py | 77 +- 21 files changed, 739 insertions(+), 219 deletions(-) create mode 100644 research/telemetry_storage_backend/docker-compose.victoriametrics.yml create mode 100644 research/telemetry_storage_backend/queries/victoriametrics/correlation_by_timestamp.logql create mode 100644 research/telemetry_storage_backend/queries/victoriametrics/correlation_by_trace_id.logql create mode 100644 research/telemetry_storage_backend/queries/victoriametrics/data_volume.logql create mode 100644 research/telemetry_storage_backend/queries/victoriametrics/data_volume.metricsql create mode 100644 research/telemetry_storage_backend/queries/victoriametrics/data_volume.traceql create mode 100644 research/telemetry_storage_backend/queries/victoriametrics/logs_errors_by_service.logql create mode 100644 research/telemetry_storage_backend/queries/victoriametrics/logs_recent.logql create mode 100644 research/telemetry_storage_backend/queries/victoriametrics/logs_search_error.logql create mode 100644 research/telemetry_storage_backend/queries/victoriametrics/metrics_by_service_hourly.metricsql create mode 100644 research/telemetry_storage_backend/queries/victoriametrics/metrics_p95_latency.metricsql create mode 100644 research/telemetry_storage_backend/queries/victoriametrics/sla_latency_compliance.traceql create mode 100644 research/telemetry_storage_backend/queries/victoriametrics/spans_error_by_service.traceql create mode 100644 research/telemetry_storage_backend/queries/victoriametrics/trace_by_id.traceql create mode 100644 research/telemetry_storage_backend/queries/victoriametrics/traces_slow_by_service.traceql diff --git a/research/telemetry_storage_backend/Makefile b/research/telemetry_storage_backend/Makefile index 81ee72b..ec73512 100644 --- a/research/telemetry_storage_backend/Makefile +++ b/research/telemetry_storage_backend/Makefile @@ -4,7 +4,7 @@ OUT := $(ROOT)/out DATA_DIR ?= $(ROOT)/telemetry_data BATCH ?= 5000 -.PHONY: up down up-compare up-otel schema load query bench bench-compare bench-otlp +.PHONY: up down up-compare up-vm up-otel schema load query bench bench-compare bench-vm bench-otlp up: docker compose up -d doris @@ -15,7 +15,7 @@ up: up-compare: mkdir -p druid_data/var druid_data/shared - docker compose -f docker-compose.yml -f docker-compose.druid.yml -f docker-compose.oceanbase.yml -f docker-compose.loki.yml up -d + docker compose -f docker-compose.yml -f docker-compose.druid.yml -f docker-compose.oceanbase.yml -f docker-compose.loki.yml -f docker-compose.victoriametrics.yml up -d @echo "Waiting for Doris health..." @for i in {1..60}; do \ curl -sf http://localhost:8030/api/health && echo "Doris is healthy" && break || sleep 2; \ @@ -36,9 +36,21 @@ up-compare: @for i in {1..30}; do \ nc -z 127.0.0.1 3100 && echo "Loki is ready" && break || sleep 2; \ done + @echo "Waiting for VictoriaMetrics (port 8428)..." + @for i in {1..30}; do \ + curl -sf http://localhost:8428/-/healthy && echo "VictoriaMetrics is ready" && break || sleep 2; \ + done + @echo "Waiting for VictoriaLogs (port 9428)..." + @for i in {1..30}; do \ + curl -sf http://localhost:9428/health && echo "VictoriaLogs is ready" && break || sleep 2; \ + done + @echo "Waiting for VictoriaTraces (port 10428)..." + @for i in {1..30}; do \ + curl -sf http://localhost:10428/health && echo "VictoriaTraces is ready" && break || sleep 2; \ + done up-otel: - docker compose -f docker-compose.yml -f docker-compose.druid.yml -f docker-compose.oceanbase.yml -f docker-compose.otel.yml up -d + docker compose -f docker-compose.yml -f docker-compose.druid.yml -f docker-compose.oceanbase.yml -f docker-compose.loki.yml -f docker-compose.victoriametrics.yml -f docker-compose.otel.yml up -d @echo "Waiting for Doris health..." @for i in {1..60}; do \ curl -sf http://localhost:8030/api/health && echo "Doris is healthy" && break || sleep 2; \ @@ -47,13 +59,40 @@ up-otel: @for i in {1..60}; do \ curl -sf http://localhost:28223/ping && echo "ClickHouse is healthy" && break || sleep 2; \ done + @echo "Waiting for VictoriaMetrics (port 8428)..." + @for i in {1..30}; do \ + curl -sf http://localhost:8428/-/healthy && echo "VictoriaMetrics is ready" && break || sleep 2; \ + done + @echo "Waiting for VictoriaLogs (port 9428)..." + @for i in {1..30}; do \ + curl -sf http://localhost:9428/health && echo "VictoriaLogs is ready" && break || sleep 2; \ + done + @echo "Waiting for VictoriaTraces (port 10428)..." + @for i in {1..30}; do \ + curl -sf http://localhost:10428/health && echo "VictoriaTraces is ready" && break || sleep 2; \ + done @echo "Waiting for OTLP collector..." @for i in {1..30}; do \ nc -z 127.0.0.1 4317 && echo "OTLP collector ready" && break || sleep 2; \ done +up-vm: + docker compose -f docker-compose.victoriametrics.yml up -d + @echo "Waiting for VictoriaMetrics (port 8428)..." + @for i in {1..30}; do \ + curl -sf http://localhost:8428/-/healthy && echo "VictoriaMetrics is ready" && break || sleep 2; \ + done + @echo "Waiting for VictoriaLogs (port 9428)..." + @for i in {1..30}; do \ + curl -sf http://localhost:9428/health && echo "VictoriaLogs is ready" && break || sleep 2; \ + done + @echo "Waiting for VictoriaTraces (port 10428)..." + @for i in {1..30}; do \ + curl -sf http://localhost:10428/health && echo "VictoriaTraces is ready" && break || sleep 2; \ + done + down: - docker compose -f docker-compose.yml -f docker-compose.druid.yml -f docker-compose.otel.yml -f docker-compose.oceanbase.yml -f docker-compose.loki.yml down -v + docker compose -f docker-compose.yml -f docker-compose.druid.yml -f docker-compose.otel.yml -f docker-compose.oceanbase.yml -f docker-compose.loki.yml -f docker-compose.victoriametrics.yml down -v schema: @echo "Applying Doris schema..." @@ -76,6 +115,7 @@ bench-compare: @echo "Running Doris vs ClickHouse vs Druid comparison..." DORIS_USER="root" DORIS_PASS="" CLICKHOUSE_HTTP="http://localhost:28223" CLICKHOUSE_PASSWORD="changeme" \ LOKI_HTTP="http://localhost:3100" \ + VM_HTTP="http://localhost:8428" VL_HTTP="http://localhost:9428" VT_HTTP="http://localhost:10428" \ python3 runner/bench_compare.py --all --data-dir "$(DATA_DIR)" --out "$(OUT)" --batch $(BATCH) $(SCALE_ARGS) $(STREAMING_ARGS) # Scaling: BATCH=10000 make bench-compare @@ -86,10 +126,18 @@ SCALE_ARGS := $(if $(SCALE_TO),--scale-to $(SCALE_TO),) STREAMING_BATCH ?= STREAMING_ARGS := $(if $(STREAMING_BATCH),--streaming-batch $(STREAMING_BATCH),) +bench-vm: + @mkdir -p "$(OUT)" + @echo "Running VictoriaMetrics-only benchmark..." + VM_HTTP="http://localhost:8428" VL_HTTP="http://localhost:9428" VT_HTTP="http://localhost:10428" \ + python3 runner/bench_compare.py --all --vm-only --data-dir "$(DATA_DIR)" --out "$(OUT)" --batch $(BATCH) $(SCALE_ARGS) $(STREAMING_ARGS) + bench-otlp: @mkdir -p "$(OUT)" @echo "Running benchmark with OTLP ingestion ($(OTLP_COUNT) spans, $(OTLP_COUNT) logs, $(OTLP_COUNT) metrics)..." - DORIS_USER="root" DORIS_PASS="" CLICKHOUSE_HTTP="http://localhost:28223" CLICKHOUSE_PASSWORD="changeme" python3 runner/bench_compare.py \ + DORIS_USER="root" DORIS_PASS="" CLICKHOUSE_HTTP="http://localhost:28223" CLICKHOUSE_PASSWORD="changeme" \ + VM_HTTP="http://localhost:8428" VL_HTTP="http://localhost:9428" VT_HTTP="http://localhost:10428" \ + python3 runner/bench_compare.py \ --data-dir "$(DATA_DIR)" --out "$(OUT)" --all --otlp --otlp-count $(OTLP_COUNT) OTLP_COUNT ?= 1000 diff --git a/research/telemetry_storage_backend/docker-compose.otel.yml b/research/telemetry_storage_backend/docker-compose.otel.yml index 2410c70..7a499ad 100644 --- a/research/telemetry_storage_backend/docker-compose.otel.yml +++ b/research/telemetry_storage_backend/docker-compose.otel.yml @@ -13,8 +13,16 @@ services: - "4317:4317" # OTLP gRPC - "4318:4318" # OTLP HTTP depends_on: - - doris - - clickhouse + doris: + condition: service_healthy + clickhouse: + condition: service_healthy + victoriametrics: + condition: service_healthy + victorialogs: + condition: service_healthy + victoriatraces: + condition: service_healthy networks: - default restart: unless-stopped diff --git a/research/telemetry_storage_backend/docker-compose.victoriametrics.yml b/research/telemetry_storage_backend/docker-compose.victoriametrics.yml new file mode 100644 index 0000000..faf9402 --- /dev/null +++ b/research/telemetry_storage_backend/docker-compose.victoriametrics.yml @@ -0,0 +1,73 @@ +# VictoriaMetrics stack: metrics + logs + traces +# Extends main compose: docker compose -f docker-compose.yml -f docker-compose.victoriametrics.yml up -d +# VictoriaMetrics API: http://localhost:8428 +# VictoriaLogs API: http://localhost:9428 +# VictoriaTraces API: http://localhost:10428, OTLP: 14317/14318 + +services: + victoriametrics: + image: victoriametrics/victoria-metrics:v1.142.0 + container_name: tsb-victoriametrics + ports: + - "8428:8428" + volumes: + - vmdata:/storage + command: + - "--storageDataPath=/storage" + - "--httpListenAddr=:8428" + - "--retentionPeriod=100y" + networks: + - default + healthcheck: + test: ["CMD", "wget", "-q", "-O", "-", "http://127.0.0.1:8428/-/healthy"] + interval: 10s + timeout: 5s + retries: 30 + restart: unless-stopped + + victorialogs: + image: victoriametrics/victoria-logs:v1.50.0 + container_name: tsb-victorialogs + ports: + - "9428:9428" + volumes: + - vldata:/vlogs + command: + - "--storageDataPath=/vlogs" + - "--httpListenAddr=:9428" + - "--retentionPeriod=100y" + networks: + - default + healthcheck: + test: ["CMD", "wget", "-q", "-O", "-", "http://127.0.0.1:9428/health"] + interval: 10s + timeout: 5s + retries: 30 + restart: unless-stopped + + victoriatraces: + image: docker.io/victoriametrics/victoria-traces:latest + container_name: tsb-victoriatraces + ports: + - "10428:10428" + - "14317:4317" + - "14318:4318" + volumes: + - vtdata:/vtraces + command: + - "--storageDataPath=/vtraces" + - "--httpListenAddr=:10428" + - "--retentionPeriod=100y" + networks: + - default + healthcheck: + test: ["CMD", "wget", "-q", "-O", "-", "http://127.0.0.1:10428/health"] + interval: 10s + timeout: 5s + retries: 30 + restart: unless-stopped + +volumes: + vmdata: + vldata: + vtdata: diff --git a/research/telemetry_storage_backend/otel-collector-config.yaml b/research/telemetry_storage_backend/otel-collector-config.yaml index 1d92555..21bde61 100644 --- a/research/telemetry_storage_backend/otel-collector-config.yaml +++ b/research/telemetry_storage_backend/otel-collector-config.yaml @@ -33,18 +33,30 @@ exporters: username: default password: changeme create_schema: true + otlphttp/victoriametrics: + endpoint: http://tsb-victoriametrics:8428/opentelemetry + tls: + insecure: true + otlphttp/victorialogs: + endpoint: http://tsb-victorialogs:9428/insert/opentelemetry + tls: + insecure: true + otlphttp/victoriatraces: + endpoint: http://tsb-victoriatraces:10428/insert/opentelemetry + tls: + insecure: true service: pipelines: traces: receivers: [otlp] processors: [batch] - exporters: [doris, clickhouse] + exporters: [doris, clickhouse, otlphttp/victoriatraces] metrics: receivers: [otlp] processors: [batch] - exporters: [doris, clickhouse] + exporters: [doris, clickhouse, otlphttp/victoriametrics] logs: receivers: [otlp] processors: [batch] - exporters: [doris, clickhouse] + exporters: [doris, clickhouse, otlphttp/victorialogs] diff --git a/research/telemetry_storage_backend/queries/victoriametrics/correlation_by_timestamp.logql b/research/telemetry_storage_backend/queries/victoriametrics/correlation_by_timestamp.logql new file mode 100644 index 0000000..4449d3c --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/correlation_by_timestamp.logql @@ -0,0 +1 @@ +_time:30d * | stats by (_time:1m, service) count() as event_count | sort by (event_count) desc | limit 20 \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/correlation_by_trace_id.logql b/research/telemetry_storage_backend/queries/victoriametrics/correlation_by_trace_id.logql new file mode 100644 index 0000000..8ab136f --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/correlation_by_trace_id.logql @@ -0,0 +1 @@ +_time:30d trace_id:* | stats by (trace_id, service) count() as log_count | sort by (log_count) desc | limit 20 \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/data_volume.logql b/research/telemetry_storage_backend/queries/victoriametrics/data_volume.logql new file mode 100644 index 0000000..38f7ab6 --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/data_volume.logql @@ -0,0 +1 @@ +* | stats count() as total \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/data_volume.metricsql b/research/telemetry_storage_backend/queries/victoriametrics/data_volume.metricsql new file mode 100644 index 0000000..106a556 --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/data_volume.metricsql @@ -0,0 +1 @@ +count({__name__!=""}) \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/data_volume.traceql b/research/telemetry_storage_backend/queries/victoriametrics/data_volume.traceql new file mode 100644 index 0000000..38f7ab6 --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/data_volume.traceql @@ -0,0 +1 @@ +* | stats count() as total \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/logs_errors_by_service.logql b/research/telemetry_storage_backend/queries/victoriametrics/logs_errors_by_service.logql new file mode 100644 index 0000000..a47cb4b --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/logs_errors_by_service.logql @@ -0,0 +1 @@ +_time:1d (level:error OR error) | stats by (service) count() as err_count | sort by (err_count) desc | limit 20 diff --git a/research/telemetry_storage_backend/queries/victoriametrics/logs_recent.logql b/research/telemetry_storage_backend/queries/victoriametrics/logs_recent.logql new file mode 100644 index 0000000..0a46d5b --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/logs_recent.logql @@ -0,0 +1 @@ +* | sort by (_time) desc | limit 100 \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/logs_search_error.logql b/research/telemetry_storage_backend/queries/victoriametrics/logs_search_error.logql new file mode 100644 index 0000000..6fc3bdd --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/logs_search_error.logql @@ -0,0 +1 @@ +_time:30d error | sort by (_time) desc | limit 100 diff --git a/research/telemetry_storage_backend/queries/victoriametrics/metrics_by_service_hourly.metricsql b/research/telemetry_storage_backend/queries/victoriametrics/metrics_by_service_hourly.metricsql new file mode 100644 index 0000000..baacec5 --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/metrics_by_service_hourly.metricsql @@ -0,0 +1 @@ +avg_over_time({__name__!=""}[1h]) diff --git a/research/telemetry_storage_backend/queries/victoriametrics/metrics_p95_latency.metricsql b/research/telemetry_storage_backend/queries/victoriametrics/metrics_p95_latency.metricsql new file mode 100644 index 0000000..15b3356 --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/metrics_p95_latency.metricsql @@ -0,0 +1 @@ +topk(20, quantile_over_time(0.95, {__name__!=""}[365d])) \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/sla_latency_compliance.traceql b/research/telemetry_storage_backend/queries/victoriametrics/sla_latency_compliance.traceql new file mode 100644 index 0000000..1177d81 --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/sla_latency_compliance.traceql @@ -0,0 +1 @@ +_time:30d * | stats count() as total, count() if (duration:<500ms) as fast | math fast * 100 / total as pct_under_500ms \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/spans_error_by_service.traceql b/research/telemetry_storage_backend/queries/victoriametrics/spans_error_by_service.traceql new file mode 100644 index 0000000..7d398dd --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/spans_error_by_service.traceql @@ -0,0 +1 @@ +_time:30d (duration:>5s OR "span_attr:http.status_code":>=500) | stats by ("resource_attr:service.name") count() as error_span_count | sort by (error_span_count) desc | limit 20 \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/trace_by_id.traceql b/research/telemetry_storage_backend/queries/victoriametrics/trace_by_id.traceql new file mode 100644 index 0000000..037042e --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/trace_by_id.traceql @@ -0,0 +1 @@ +trace_id:* | sort by (_time) | limit 1 \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/traces_slow_by_service.traceql b/research/telemetry_storage_backend/queries/victoriametrics/traces_slow_by_service.traceql new file mode 100644 index 0000000..221972c --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/traces_slow_by_service.traceql @@ -0,0 +1 @@ +_time:1d duration:>500ms | stats by ("resource_attr:service.name") count() as slow_cnt | sort by (slow_cnt) desc | limit 20 \ No newline at end of file diff --git a/research/telemetry_storage_backend/requirements.txt b/research/telemetry_storage_backend/requirements.txt index e5ff456..15a042c 100644 --- a/research/telemetry_storage_backend/requirements.txt +++ b/research/telemetry_storage_backend/requirements.txt @@ -1,2 +1,7 @@ # For OceanBase loader (replay_oceanbase.py) pymysql>=1.0.0 +# For all loaders and bench runner +requests>=2.28.0 +# For VictoriaMetrics remote write (protobuf + snappy) +protobuf>=4.21.0 +python-snappy>=0.6.0 diff --git a/research/telemetry_storage_backend/runner/bench_compare.py b/research/telemetry_storage_backend/runner/bench_compare.py index ce73915..85cdddc 100644 --- a/research/telemetry_storage_backend/runner/bench_compare.py +++ b/research/telemetry_storage_backend/runner/bench_compare.py @@ -22,9 +22,11 @@ CH_QDIR = ROOT / "queries" / "clickhouse" DRUID_QDIR = ROOT / "queries" / "druid" OB_QDIR = ROOT / "queries" / "oceanbase" +VM_QDIR = ROOT / "queries" / "victoriametrics" OB_SCHEMA = ROOT / "schemas" / "oceanbase.sql" DORIS_FE_HTTP = os.getenv("DORIS_FE_HTTP", "http://localhost:8030") +DORIS_MYSQL_PORT = int(os.getenv("DORIS_MYSQL_PORT", "9030")) DORIS_PASS = os.getenv("DORIS_PASS", "") CH_HTTP = os.getenv("CLICKHOUSE_HTTP", "http://localhost:8123") CH_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD", "") @@ -33,8 +35,16 @@ OB_PORT = int(os.getenv("OCEANBASE_PORT", "2881")) OB_CONTAINER = os.getenv("OCEANBASE_CONTAINER", "tsb-oceanbase") LOKI_HTTP = os.getenv("LOKI_HTTP", "http://localhost:3100") +VM_HTTP = os.getenv("VM_HTTP", "http://localhost:8428") +VL_HTTP = os.getenv("VL_HTTP", "http://localhost:9428") +VT_HTTP = os.getenv("VT_HTTP", "http://localhost:10428") DB = "telemetry" +def _port_of(url: str) -> int: + from urllib.parse import urlparse + p = urlparse(url) + return p.port or (443 if p.scheme == "https" else 80) + def _ch_params(extra: dict | None = None) -> dict: params = dict(extra) if extra else {} if CH_PASSWORD: @@ -170,6 +180,45 @@ def truncate_oceanbase_tables() -> None: print(f"[truncate] OceanBase {t}: {e}") print("[truncate] OceanBase tables cleared") +def reset_vm_storage() -> None: + """Stop VM containers, remove volumes, restart with clean storage.""" + compose = ["docker", "compose", "-f", str(ROOT / "docker-compose.yml"), + "-f", str(ROOT / "docker-compose.victoriametrics.yml")] + subprocess.run(compose + ["stop", "victoriametrics", "victorialogs", "victoriatraces"], + capture_output=True, check=False) + subprocess.run(compose + ["rm", "-f", "victoriametrics", "victorialogs", "victoriatraces"], + capture_output=True, check=False) + for vol_suffix in ["vmdata", "vldata", "vtdata"]: + out = subprocess.run(["docker", "volume", "ls", "-q", "--filter", f"name={vol_suffix}"], + capture_output=True, text=True) + for vol in out.stdout.strip().splitlines(): + subprocess.run(["docker", "volume", "rm", "-f", vol], capture_output=True, check=False) + subprocess.run(compose + ["up", "-d", "victoriametrics", "victorialogs", "victoriatraces"], check=True) + print("[truncate] VM storage reset, containers restarted") + +def wait_vm_healthy(timeout_s: int = 120) -> bool: + """Wait until all three VM services are healthy after restart.""" + endpoints = [ + (VM_HTTP, "/-/healthy", "VictoriaMetrics"), + (VL_HTTP, "/health", "VictoriaLogs"), + (VT_HTTP, "/health", "VictoriaTraces"), + ] + for base, path, name in endpoints: + t0 = time.time() + while time.time() - t0 < timeout_s: + try: + r = requests.get(f"{base}{path}", timeout=5) + if r.status_code == 200: + print(f"[wait] {name} healthy") + break + except Exception: + pass + time.sleep(2) + else: + print(f"[wait] {name} not healthy after {timeout_s}s") + return False + return True + def run_doris_query(sql: str) -> dict: pass_arg = f"-p{DORIS_PASS}" if DORIS_PASS else "" sh_script = ( @@ -279,6 +328,82 @@ def bench_loki_logs(run_id: str) -> dict: return res +def run_vm_logql(query: str, run_id: str = "") -> dict: + """Run a LogsQL query against VictoriaLogs native API.""" + url = f"{VL_HTTP}/select/logsql/query" + if run_id: + if " | " in query: + filt, pipes = query.split(" | ", 1) + query = f"run_id:{run_id} {filt} | {pipes}" + else: + query = f"run_id:{run_id} {query}" + params = {"query": query, "limit": 1000} + t0 = time.time() + try: + r = requests.get(url, params=params, timeout=120) + dt = time.time() - t0 + if r.status_code != 200: + return {"latency_s": dt, "rows": 0, "error": r.text[:300]} + rows = len([ln for ln in r.text.strip().splitlines() if ln.strip()]) + return {"latency_s": dt, "rows": rows} + except Exception as e: + return {"latency_s": 0, "rows": 0, "error": str(e)[:200]} + + +def run_vm_metricsql(query: str) -> dict: + """Run a MetricsQL query against VictoriaMetrics.""" + url = f"{VM_HTTP}/api/v1/query" + params = {"query": query, "step": "24h"} + t0 = time.time() + try: + r = requests.get(url, params=params, timeout=120) + dt = time.time() - t0 + if r.status_code != 200: + return {"latency_s": dt, "rows": 0, "error": r.text[:300]} + data = r.json() + results = data.get("data", {}).get("result", []) + return {"latency_s": dt, "rows": len(results)} + except Exception as e: + return {"latency_s": 0, "rows": 0, "error": str(e)[:200]} + + +def run_vm_traceql(query: str) -> dict: + """Run a LogsQL query against VictoriaTraces (uses same query language as VictoriaLogs).""" + url = f"{VT_HTTP}/select/logsql/query" + params = {"query": query, "limit": 1000} + t0 = time.time() + try: + r = requests.get(url, params=params, timeout=120) + dt = time.time() - t0 + if r.status_code != 200: + return {"latency_s": dt, "rows": 0, "error": r.text[:300]} + rows = len([ln for ln in r.text.strip().splitlines() if ln.strip()]) + return {"latency_s": dt, "rows": rows} + except Exception as e: + return {"latency_s": 0, "rows": 0, "error": str(e)[:200]} + + +def bench_vm(run_id: str = "") -> tuple[dict, dict, dict]: + """Run all VM queries. Returns (victorialogs_results, victoriametrics_results, victoriatraces_results).""" + vl, vm, vt = {}, {}, {} + for f in sorted(VM_QDIR.glob("*")): + if f.suffix not in (".logql", ".metricsql", ".traceql"): + continue + name = f.stem + query = f.read_text().strip() + try: + if f.suffix == ".logql": + vl[name] = run_vm_logql(query, run_id) + elif f.suffix == ".metricsql": + vm[name] = run_vm_metricsql(query) + elif f.suffix == ".traceql": + vt[name] = run_vm_traceql(query) + except Exception as e: + target = vl if f.suffix == ".logql" else vm if f.suffix == ".metricsql" else vt + target[name] = {"error": str(e)[:200]} + return vl, vm, vt + + def bench_backend(qdir: Path, run_fn) -> dict: results = {} for f in sorted(qdir.glob("*.sql")): @@ -292,17 +417,15 @@ def bench_backend(qdir: Path, run_fn) -> dict: return results -def get_data_volume(use_doris: bool, use_oceanbase: bool = True) -> tuple: - """Run full-scan COUNT on each backend; return (doris_vol, ch_vol, druid_vol, ob_vol).""" +def get_data_volume(use_doris: bool, use_oceanbase: bool = True, use_vm: bool = False, use_sql: bool = True) -> tuple: + """Run full-scan COUNT on each backend; return (doris_vol, ch_vol, druid_vol, ob_vol, vl_vol, vm_vol, vt_vol).""" doris_vol = {} ch_vol = {} druid_vol = {} ob_vol = {} - sql_doris = (DORIS_QDIR / "data_volume.sql").read_text() - sql_ch = (CH_QDIR / "data_volume.sql").read_text() - sql_druid = (DRUID_QDIR / "data_volume.sql").read_text() - sql_ob = (OB_QDIR / "data_volume.sql").read_text() - + vl_vol = {} + vm_vol = {} + vt_vol = {} def _parse_tsv(lines: list[str]) -> dict: counts = {} for ln in lines: @@ -315,6 +438,7 @@ def _parse_tsv(lines: list[str]) -> dict: if use_doris: try: + sql_doris = (DORIS_QDIR / "data_volume.sql").read_text() pass_arg = f"-p{DORIS_PASS}" if DORIS_PASS else "" sh_script = ( "cat > /tmp/dv.sql <<'EOSQL'\n" + sql_doris + "\nEOSQL\n" @@ -333,45 +457,49 @@ def _parse_tsv(lines: list[str]) -> dict: except Exception as e: doris_vol = {"error": str(e)[:200]} - try: - t0 = time.time() - r = requests.post(CH_HTTP, params=_ch_params({"query": sql_ch}), timeout=120) - dt = time.time() - t0 - if r.status_code == 200: - lines = [ln for ln in r.text.strip().splitlines() if ln.strip()] - ch_vol = _parse_tsv(lines) - ch_vol["latency_s"] = dt - else: - ch_vol = {"error": r.text[:200]} - except Exception as e: - ch_vol = {"error": str(e)[:200]} + if use_sql: + sql_ch = (CH_QDIR / "data_volume.sql").read_text() + sql_druid = (DRUID_QDIR / "data_volume.sql").read_text() + try: + t0 = time.time() + r = requests.post(CH_HTTP, params=_ch_params({"query": sql_ch}), timeout=120) + dt = time.time() - t0 + if r.status_code == 200: + lines = [ln for ln in r.text.strip().splitlines() if ln.strip()] + ch_vol = _parse_tsv(lines) + ch_vol["latency_s"] = dt + else: + ch_vol = {"error": r.text[:200]} + except Exception as e: + ch_vol = {"error": str(e)[:200]} - try: - t0 = time.time() - r = requests.post( - f"{DRUID_HTTP}/druid/v2/sql", - json={"query": sql_druid, "resultFormat": "array"}, - headers={"Content-Type": "application/json"}, - timeout=120, - ) - dt = time.time() - t0 - if r.status_code == 200: - arr = r.json() - counts = {} - for row in (arr or []): - if len(row) >= 2: - tbl = str(row[0]).lower() - cnt = int(row[1]) if isinstance(row[1], (int, float)) else 0 - counts[tbl] = cnt - counts["total"] = counts.get("logs", 0) + counts.get("spans", 0) + counts.get("metrics", 0) - counts["latency_s"] = dt - druid_vol = counts - else: - druid_vol = {"error": r.text[:200]} - except Exception as e: - druid_vol = {"error": str(e)[:200]} + try: + t0 = time.time() + r = requests.post( + f"{DRUID_HTTP}/druid/v2/sql", + json={"query": sql_druid, "resultFormat": "array"}, + headers={"Content-Type": "application/json"}, + timeout=120, + ) + dt = time.time() - t0 + if r.status_code == 200: + arr = r.json() + counts = {} + for row in (arr or []): + if len(row) >= 2: + tbl = str(row[0]).lower() + cnt = int(row[1]) if isinstance(row[1], (int, float)) else 0 + counts[tbl] = cnt + counts["total"] = counts.get("logs", 0) + counts.get("spans", 0) + counts.get("metrics", 0) + counts["latency_s"] = dt + druid_vol = counts + else: + druid_vol = {"error": r.text[:200]} + except Exception as e: + druid_vol = {"error": str(e)[:200]} if use_oceanbase: + sql_ob = (OB_QDIR / "data_volume.sql").read_text() try: t0 = time.time() out = subprocess.run( @@ -389,138 +517,211 @@ def _parse_tsv(lines: list[str]) -> dict: except Exception as e: ob_vol = {"error": str(e)[:200]} - return doris_vol, ch_vol, druid_vol, ob_vol + if use_vm: + # VictoriaLogs: count log entries via stats + try: + t0 = time.time() + r = requests.get(f"{VL_HTTP}/select/logsql/query", + params={"query": "* | stats count() as total", "limit": 1}, + timeout=30) + dt = time.time() - t0 + if r.status_code == 200: + lines = [ln for ln in r.text.strip().splitlines() if ln.strip()] + if lines: + row = json.loads(lines[0]) + vl_vol = {"rows": int(row.get("total", 0)), "latency_s": dt} + else: + vl_vol = {"rows": 0, "latency_s": dt} + except Exception as e: + vl_vol = {"error": str(e)[:200]} + # VictoriaMetrics: count total inserted rows via /metrics internal counter + try: + t0 = time.time() + r = requests.get(f"{VM_HTTP}/metrics", timeout=30) + dt = time.time() - t0 + if r.status_code == 200: + total_rows = 0 + for line in r.text.splitlines(): + if line.startswith("vm_rows_inserted_total{"): + parts = line.rsplit(" ", 1) + if len(parts) == 2: + total_rows += int(float(parts[1])) + vm_vol = {"rows": total_rows, "latency_s": dt} + except Exception as e: + vm_vol = {"error": str(e)[:200]} + # VictoriaTraces: count spans via LogsQL stats + try: + t0 = time.time() + r = requests.get(f"{VT_HTTP}/select/logsql/query", + params={"query": "* | stats count() as total", "limit": 1}, + timeout=30) + dt = time.time() - t0 + if r.status_code == 200: + lines = [ln for ln in r.text.strip().splitlines() if ln.strip()] + if lines: + row = json.loads(lines[0]) + vt_vol = {"rows": int(row.get("total", 0)), "latency_s": dt} + else: + vt_vol = {"rows": 0, "latency_s": dt} + except Exception as e: + vt_vol = {"error": str(e)[:200]} + + return doris_vol, ch_vol, druid_vol, ob_vol, vl_vol, vm_vol, vt_vol def write_combined_report(out_dir: Path, doris_ingest: dict, ch_ingest: dict, druid_ingest: dict, doris_qres: dict, ch_qres: dict, druid_qres: dict, ob_ingest: dict | None = None, ob_qres: dict | None = None, loki_ingest: dict | None = None, loki_qres: dict | None = None, + vl_ingest: dict | None = None, vl_qres: dict | None = None, + vm_ingest: dict | None = None, vm_qres: dict | None = None, + vt_ingest: dict | None = None, vt_qres: dict | None = None, otlp_ingest: dict | None = None, data_vol: tuple | None = None) -> None: out_dir.mkdir(parents=True, exist_ok=True) - all_queries = sorted(set(doris_qres.keys()) | set(ch_qres.keys()) | set(druid_qres.keys()) - | set((ob_qres or {}).keys()) | set((loki_qres or {}).keys())) - rows = [] - chart_data = {"queries": [], "doris": [], "clickhouse": [], "druid": [], "oceanbase": [], "loki": []} backends = [("doris", doris_qres), ("clickhouse", ch_qres), ("druid", druid_qres)] if ob_qres: backends.append(("oceanbase", ob_qres)) if loki_qres: backends.append(("loki", loki_qres)) + if vl_qres: + backends.append(("victorialogs", vl_qres)) + if vm_qres: + backends.append(("victoriametrics", vm_qres)) + if vt_qres: + backends.append(("victoriatraces", vt_qres)) + all_queries = sorted(set().union(*(qr.keys() for _, qr in backends))) + _display = {"doris": "Doris", "clickhouse": "ClickHouse", "druid": "Druid", + "oceanbase": "OceanBase", "loki": "Loki", + "victorialogs": "VictoriaLogs", "victoriametrics": "VictoriaMetrics", + "victoriatraces": "VictoriaTraces"} + rows = [] + chart_data = {"queries": []} + for bname, _ in backends: + chart_data[bname] = [] for q in all_queries: - d = doris_qres.get(q, {}) - c = ch_qres.get(q, {}) - dr = druid_qres.get(q, {}) - ob = (ob_qres or {}).get(q, {}) - loki = (loki_qres or {}).get(q, {}) - d_lat, c_lat, dr_lat = d.get("latency_s", ""), c.get("latency_s", ""), dr.get("latency_s", "") - ob_lat = ob.get("latency_s", "") - loki_lat = loki.get("latency_s", "") - lats = [(d_lat, "Doris"), (c_lat, "ClickHouse"), (dr_lat, "Druid")] - if ob_qres: - lats.append((ob_lat, "OceanBase")) - if loki_qres: - lats.append((loki_lat, "Loki")) + lats = [] + cells = "" + for bname, bqres in backends: + bq = bqres.get(q, {}) + lat = bq.get("latency_s", "") + cells += f"{lat}{bq.get('rows', '')}{bq.get('error', '')}" + lats.append((lat, _display.get(bname, bname))) + chart_data[bname].append(round(lat, 4) if isinstance(lat, (int, float)) else None) valid = [(x, n) for x, n in lats if isinstance(x, (int, float))] winner, pct_diff = "", "" if len(valid) >= 2: fastest = min(valid, key=lambda t: t[0]) slowest = max(valid, key=lambda t: t[0]) winner = fastest[1] + if winner in ("VictoriaLogs", "VictoriaTraces"): + winner = "VictoriaMetrics" if slowest[0] >= 1e-9: pct = (slowest[0] - fastest[0]) / slowest[0] * 100 pct_diff = f"{pct:.1f}%" - ob_cells = f"{ob_lat}{ob.get('rows', '')}{ob.get('error', '')}" if ob_qres else "" - loki_cells = f"{loki_lat}{loki.get('rows', '')}{loki.get('error', '')}" if loki_qres else "" - rows.append(f"{q}{d_lat}{d.get('rows', '')}{d.get('error', '')}" - f"{c_lat}{c.get('rows', '')}{c.get('error', '')}" - f"{dr_lat}{dr.get('rows', '')}{dr.get('error', '')}" - f"{ob_cells}{loki_cells}{pct_diff}{winner}") + rows.append(f"{q}{cells}{pct_diff}{winner}") chart_data["queries"].append(q) - chart_data["doris"].append(round(d_lat, 4) if isinstance(d_lat, (int, float)) else None) - chart_data["clickhouse"].append(round(c_lat, 4) if isinstance(c_lat, (int, float)) else None) - chart_data["druid"].append(round(dr_lat, 4) if isinstance(dr_lat, (int, float)) else None) - chart_data["oceanbase"].append(round(ob_lat, 4) if isinstance(ob_lat, (int, float)) else None) - chart_data["loki"].append(round(loki_lat, 4) if isinstance(loki_lat, (int, float)) else None) - ingest_labels = ["Doris", "ClickHouse", "Druid"] - ingest_dur = [doris_ingest.get("duration_s"), ch_ingest.get("duration_s"), druid_ingest.get("duration_s")] - ingest_rps = [doris_ingest.get("rows_per_sec"), ch_ingest.get("rows_per_sec"), druid_ingest.get("rows_per_sec")] + all_ingests = [("Doris", doris_ingest), ("ClickHouse", ch_ingest), ("Druid", druid_ingest)] if ob_ingest and ob_ingest.get("status") == "ok": - ingest_labels.append("OceanBase") - ingest_dur.append(ob_ingest.get("duration_s")) - ingest_rps.append(ob_ingest.get("rows_per_sec")) + all_ingests.append(("OceanBase", ob_ingest)) if loki_ingest and loki_ingest.get("status") == "ok": - ingest_labels.append("Loki") - ingest_dur.append(loki_ingest.get("duration_s")) - ingest_rps.append(loki_ingest.get("rows_per_sec")) + all_ingests.append(("Loki", loki_ingest)) + if vl_ingest and vl_ingest.get("status") == "ok": + all_ingests.append(("VictoriaLogs", vl_ingest)) + if vm_ingest and vm_ingest.get("status") == "ok": + all_ingests.append(("VictoriaMetrics", vm_ingest)) + if vt_ingest and vt_ingest.get("status") == "ok": + all_ingests.append(("VictoriaTraces", vt_ingest)) + ingest_labels = [n for n, _ in all_ingests] + ingest_dur = [i.get("duration_s") for _, i in all_ingests] + ingest_rps = [i.get("rows_per_sec") for _, i in all_ingests] ingest_chart = {"labels": ingest_labels, "duration_s": ingest_dur, "rows_per_sec": ingest_rps} - otlp_rows = "" - if otlp_ingest: - mech = otlp_ingest.get("mechanism", "OTLP") - dur = otlp_ingest.get("duration_s", "-") - for backend, key in [("Doris", "doris"), ("ClickHouse", "clickhouse")]: - d = otlp_ingest.get(key, {}) - r = d.get("rows", "-") - rps = d.get("rows_per_sec", "-") - otlp_rows += f' {backend}{mech}{dur}{r}{rps}\n' - data_vol_row = "" if data_vol: d_vol, c_vol, dr_vol = data_vol[0], data_vol[1], data_vol[2] - ob_vol = data_vol[3] if len(data_vol) > 3 else {} + ob_vol_d = data_vol[3] if len(data_vol) > 3 else {} + vl_vol_d = data_vol[4] if len(data_vol) > 4 else {} + vm_vol_d = data_vol[5] if len(data_vol) > 5 else {} + vt_vol_d = data_vol[6] if len(data_vol) > 6 else {} def _fmt(v: dict) -> str: if not v or "error" in v: return v.get("error", "-") if v else "-" - total = v.get("total", 0) - return f"{total:,} (logs={v.get('logs',0):,}, spans={v.get('spans',0):,}, metrics={v.get('metrics',0):,})" + if "total" in v: + total = v["total"] + return f"{total:,} (logs={v.get('logs',0):,}, spans={v.get('spans',0):,}, metrics={v.get('metrics',0):,})" + return f"{v.get('rows', 0):,}" def _lat(v: dict) -> str: lat = v.get("latency_s") if v else None return f"{lat:.3f}s" if isinstance(lat, (int, float)) else "-" - ob_row = f"OceanBase{_fmt(ob_vol)}{_lat(ob_vol)}" if ob_vol else "" + vol_rows = [ + ("Doris", d_vol), ("ClickHouse", c_vol), ("Druid", dr_vol), + ] + if ob_vol_d: + vol_rows.append(("OceanBase", ob_vol_d)) + if vl_vol_d: + vol_rows.append(("VictoriaLogs", vl_vol_d)) + if vm_vol_d: + vol_rows.append(("VictoriaMetrics", vm_vol_d)) + if vt_vol_d: + vol_rows.append(("VictoriaTraces", vt_vol_d)) + vol_html = "\n".join(f" {name}{_fmt(v)}{_lat(v)}" for name, v in vol_rows) data_vol_row = f"""

Data volume (full scan)

Total rows in telemetry tables at query time. Latency = full COUNT(*) time.

- - - - - {ob_row} + +{vol_html}
BackendTotal rows (logs, spans, metrics)Full-scan latency
Doris{_fmt(d_vol)}{_lat(d_vol)}
ClickHouse{_fmt(c_vol)}{_lat(c_vol)}
Druid{_fmt(dr_vol)}{_lat(dr_vol)}
BackendTotal rowsFull-scan latency
""" chart_json = json.dumps(chart_data) ingest_json = json.dumps(ingest_chart) + report_title = " vs ".join(_display.get(n, n) for n, _ in backends) + " benchmark" + ingest_rows_html = "\n".join( + f' {name}{ing.get("mechanism", "-")}{ing.get("duration_s", "-")}' + f'{ing.get("rows", "-")}{ing.get("rows_per_sec", "-")}' + for name, ing in all_ingests) + if otlp_ingest: + mech = otlp_ingest.get("mechanism", "OTLP") + dur = otlp_ingest.get("duration_s", "-") + otlp_count = otlp_ingest.get("count", 1000) + vm_mech = { + "victorialogs": f"OTLP ({otlp_count} logs)", + "victoriametrics": f"OTLP ({otlp_count} metrics)", + "victoriatraces": f"OTLP ({otlp_count} spans)", + } + for backend, key in [("Doris", "doris"), ("ClickHouse", "clickhouse"), + ("VictoriaLogs", "victorialogs"), ("VictoriaMetrics", "victoriametrics"), + ("VictoriaTraces", "victoriatraces")]: + d = otlp_ingest.get(key, {}) + if d and d.get("rows", 0) > 0: + row_mech = vm_mech.get(key, mech) + ingest_rows_html += f'\n {backend}{row_mech}{dur}{d.get("rows", "-")}{d.get("rows_per_sec", "-")}' + query_th = "".join(f'{_display.get(n,n)} lat{_display.get(n,n)} rows{_display.get(n,n)} err' for n, _ in backends) + colors = ['rgba(54,162,235,0.7)', 'rgba(75,192,192,0.7)', 'rgba(255,159,64,0.7)', + 'rgba(153,102,255,0.7)', 'rgba(255,99,132,0.7)', + 'rgba(0,200,83,0.7)', 'rgba(255,206,86,0.7)', 'rgba(231,76,60,0.7)'] + chart_datasets = ", ".join( + f'{{ label: "{_display.get(n,n)}", data: data["{n}"], backgroundColor: "{colors[i % len(colors)]}" }}' + for i, (n, _) in enumerate(backends)) html = f""" -Doris vs ClickHouse vs Druid benchmark +{report_title} -

Doris vs ClickHouse vs Druid benchmark

+

{report_title}

Generated at {datetime.now().isoformat()}

Ingestion comparison

- - - - {f'' if ob_ingest and ob_ingest.get('status') == 'ok' else ''} - {f'' if loki_ingest else ''} - {otlp_rows} +{ingest_rows_html}
BackendMechanismDuration (s)RowsRows/sec
Doris{doris_ingest.get('mechanism', '-')}{doris_ingest.get('duration_s', '-')}{doris_ingest.get('rows', '-')}{doris_ingest.get('rows_per_sec', '-')}
ClickHouse{ch_ingest.get('mechanism', '-')}{ch_ingest.get('duration_s', '-')}{ch_ingest.get('rows', '-')}{ch_ingest.get('rows_per_sec', '-')}
Druid{druid_ingest.get('mechanism', '-')}{druid_ingest.get('duration_s', '-')}{druid_ingest.get('rows', '-')}{druid_ingest.get('rows_per_sec', '-')}
OceanBase{ob_ingest.get("mechanism", "-")}{ob_ingest.get("duration_s", "-")}{ob_ingest.get("rows", "-")}{ob_ingest.get("rows_per_sec", "-")}
Loki{loki_ingest.get("mechanism", "-") if loki_ingest and loki_ingest.get("status") == "ok" else ("skipped" if loki_ingest and loki_ingest.get("status") == "skipped" else loki_ingest.get("error", "error"))}{loki_ingest.get("duration_s", "-") if loki_ingest and loki_ingest.get("status") == "ok" else "-"}{loki_ingest.get("rows", "-") if loki_ingest and loki_ingest.get("status") == "ok" else "-"}{loki_ingest.get("rows_per_sec", "-") if loki_ingest and loki_ingest.get("status") == "ok" else "-"}
-

Raw ingest: Doris {json.dumps(doris_ingest)} | CH {json.dumps(ch_ingest)} | Druid {json.dumps(druid_ingest)}

{data_vol_row}

Query latency (seconds)

Query comparison

- - - - {"" if ob_qres else ""} - {"" if loki_qres else ""} - + {query_th} {chr(10).join(rows)}
queryDoris latDoris rowsDoris errCH latCH rowsCH errDruid latDruid rowsDruid errOceanBase latOceanBase rowsOceanBase errLoki latLoki rowsLoki err% difffaster
query% difffaster