diff --git a/research/telemetry_storage_backend/Makefile b/research/telemetry_storage_backend/Makefile index 81ee72b..690e80c 100644 --- a/research/telemetry_storage_backend/Makefile +++ b/research/telemetry_storage_backend/Makefile @@ -4,7 +4,7 @@ OUT := $(ROOT)/out DATA_DIR ?= $(ROOT)/telemetry_data BATCH ?= 5000 -.PHONY: up down up-compare up-otel schema load query bench bench-compare bench-otlp +.PHONY: up down up-compare up-vm up-otel schema load query bench bench-compare bench-vm bench-otlp up: docker compose up -d doris @@ -15,7 +15,7 @@ up: up-compare: mkdir -p druid_data/var druid_data/shared - docker compose -f docker-compose.yml -f docker-compose.druid.yml -f docker-compose.oceanbase.yml -f docker-compose.loki.yml up -d + docker compose -f docker-compose.yml -f docker-compose.druid.yml -f docker-compose.oceanbase.yml -f docker-compose.loki.yml -f docker-compose.victoriametrics.yml up -d @echo "Waiting for Doris health..." @for i in {1..60}; do \ curl -sf http://localhost:8030/api/health && echo "Doris is healthy" && break || sleep 2; \ @@ -36,9 +36,21 @@ up-compare: @for i in {1..30}; do \ nc -z 127.0.0.1 3100 && echo "Loki is ready" && break || sleep 2; \ done + @echo "Waiting for VictoriaMetrics (port 8428)..." + @for i in {1..30}; do \ + curl -sf http://localhost:8428/-/healthy && echo "VictoriaMetrics is ready" && break || sleep 2; \ + done + @echo "Waiting for VictoriaLogs (port 9428)..." + @for i in {1..30}; do \ + curl -sf http://localhost:9428/-/healthy && echo "VictoriaLogs is ready" && break || sleep 2; \ + done + @echo "Waiting for VictoriaTraces (port 10428)..." + @for i in {1..30}; do \ + curl -sf http://localhost:10428/-/healthy && echo "VictoriaTraces is ready" && break || sleep 2; \ + done up-otel: - docker compose -f docker-compose.yml -f docker-compose.druid.yml -f docker-compose.oceanbase.yml -f docker-compose.otel.yml up -d + docker compose -f docker-compose.yml -f docker-compose.druid.yml -f docker-compose.oceanbase.yml -f docker-compose.loki.yml -f docker-compose.victoriametrics.yml -f docker-compose.otel.yml up -d @echo "Waiting for Doris health..." @for i in {1..60}; do \ curl -sf http://localhost:8030/api/health && echo "Doris is healthy" && break || sleep 2; \ @@ -47,13 +59,40 @@ up-otel: @for i in {1..60}; do \ curl -sf http://localhost:28223/ping && echo "ClickHouse is healthy" && break || sleep 2; \ done + @echo "Waiting for VictoriaMetrics (port 8428)..." + @for i in {1..30}; do \ + curl -sf http://localhost:8428/-/healthy && echo "VictoriaMetrics is ready" && break || sleep 2; \ + done + @echo "Waiting for VictoriaLogs (port 9428)..." + @for i in {1..30}; do \ + curl -sf http://localhost:9428/-/healthy && echo "VictoriaLogs is ready" && break || sleep 2; \ + done + @echo "Waiting for VictoriaTraces (port 10428)..." + @for i in {1..30}; do \ + curl -sf http://localhost:10428/-/healthy && echo "VictoriaTraces is ready" && break || sleep 2; \ + done @echo "Waiting for OTLP collector..." @for i in {1..30}; do \ nc -z 127.0.0.1 4317 && echo "OTLP collector ready" && break || sleep 2; \ done +up-vm: + docker compose -f docker-compose.victoriametrics.yml up -d + @echo "Waiting for VictoriaMetrics (port 8428)..." + @for i in {1..30}; do \ + curl -sf http://localhost:8428/-/healthy && echo "VictoriaMetrics is ready" && break || sleep 2; \ + done + @echo "Waiting for VictoriaLogs (port 9428)..." + @for i in {1..30}; do \ + curl -sf http://localhost:9428/-/healthy && echo "VictoriaLogs is ready" && break || sleep 2; \ + done + @echo "Waiting for VictoriaTraces (port 10428)..." + @for i in {1..30}; do \ + curl -sf http://localhost:10428/-/healthy && echo "VictoriaTraces is ready" && break || sleep 2; \ + done + down: - docker compose -f docker-compose.yml -f docker-compose.druid.yml -f docker-compose.otel.yml -f docker-compose.oceanbase.yml -f docker-compose.loki.yml down -v + docker compose -f docker-compose.yml -f docker-compose.druid.yml -f docker-compose.otel.yml -f docker-compose.oceanbase.yml -f docker-compose.loki.yml -f docker-compose.victoriametrics.yml down -v schema: @echo "Applying Doris schema..." @@ -76,6 +115,7 @@ bench-compare: @echo "Running Doris vs ClickHouse vs Druid comparison..." DORIS_USER="root" DORIS_PASS="" CLICKHOUSE_HTTP="http://localhost:28223" CLICKHOUSE_PASSWORD="changeme" \ LOKI_HTTP="http://localhost:3100" \ + VM_HTTP="http://localhost:8428" VL_HTTP="http://localhost:9428" VT_HTTP="http://localhost:10428" \ python3 runner/bench_compare.py --all --data-dir "$(DATA_DIR)" --out "$(OUT)" --batch $(BATCH) $(SCALE_ARGS) $(STREAMING_ARGS) # Scaling: BATCH=10000 make bench-compare @@ -86,10 +126,18 @@ SCALE_ARGS := $(if $(SCALE_TO),--scale-to $(SCALE_TO),) STREAMING_BATCH ?= STREAMING_ARGS := $(if $(STREAMING_BATCH),--streaming-batch $(STREAMING_BATCH),) +bench-vm: + @mkdir -p "$(OUT)" + @echo "Running VictoriaMetrics-only benchmark..." + VM_HTTP="http://localhost:8428" VL_HTTP="http://localhost:9428" VT_HTTP="http://localhost:10428" \ + python3 runner/bench_compare.py --all --vm-only --data-dir "$(DATA_DIR)" --out "$(OUT)" --batch $(BATCH) $(SCALE_ARGS) $(STREAMING_ARGS) + bench-otlp: @mkdir -p "$(OUT)" @echo "Running benchmark with OTLP ingestion ($(OTLP_COUNT) spans, $(OTLP_COUNT) logs, $(OTLP_COUNT) metrics)..." - DORIS_USER="root" DORIS_PASS="" CLICKHOUSE_HTTP="http://localhost:28223" CLICKHOUSE_PASSWORD="changeme" python3 runner/bench_compare.py \ + DORIS_USER="root" DORIS_PASS="" CLICKHOUSE_HTTP="http://localhost:28223" CLICKHOUSE_PASSWORD="changeme" \ + VM_HTTP="http://localhost:8428" VL_HTTP="http://localhost:9428" VT_HTTP="http://localhost:10428" \ + python3 runner/bench_compare.py \ --data-dir "$(DATA_DIR)" --out "$(OUT)" --all --otlp --otlp-count $(OTLP_COUNT) OTLP_COUNT ?= 1000 diff --git a/research/telemetry_storage_backend/README.md b/research/telemetry_storage_backend/README.md index 2a5e0ba..0d03a1b 100644 --- a/research/telemetry_storage_backend/README.md +++ b/research/telemetry_storage_backend/README.md @@ -33,12 +33,17 @@ Tech Stack: Python (Pandas/NumPy), k8s ## 📂 Repository Structure ```text -├── loeaders/ # logic to load the data +├── loaders/ # logic to load data into each backend ├── out/ # benchmark run test results ├── queries/ # Queries to produce the performance benchmark +│ ├── doris/ # SQL queries for Doris +│ ├── clickhouse/ # SQL queries for ClickHouse +│ ├── druid/ # SQL queries for Druid +│ ├── oceanbase/ # SQL queries for OceanBase +│ └── victoriametrics/ # MetricsQL, LogQL, TraceQL queries ├── runner/ # benchmark run logic ├── docs/ # In-depth documentation and literature review -├── schemas/ # backend storage chemas +├── schemas/ # backend storage schemas ├── telemetry_data/ # static logs, metrics, traces and metadata └── README.md # This file ``` @@ -53,16 +58,20 @@ This harness replays pre-collected OpenTelemetry-like ground-truth (`telemetry_d - `docker-compose.yml` — Doris + ClickHouse for comparison trials - `docker-compose.druid.yml` — Druid (extends main) - `docker-compose.oceanbase.yml` — OceanBase CE (extends main) +- `docker-compose.victoriametrics.yml` — VictoriaMetrics stack (metrics + logs + traces) - `schemas/doris.sql`, `schemas/clickhouse.sql`, `schemas/oceanbase.sql` — database and tables (logs, spans, metrics) - `loaders/replay_doris.py` — Doris replayer using Stream Load HTTP API - `loaders/replay_clickhouse.py` — ClickHouse replayer via HTTP - `loaders/replay_druid.py` — Druid native batch ingestion - `loaders/replay_oceanbase.py` — OceanBase via MySQL protocol (pymysql) +- `loaders/replay_victoriametrics.py` — VictoriaMetrics replayer (Prometheus remote write for metrics, Loki API for logs, OTLP HTTP for traces) +- `loaders/remote_write.py` — Prometheus remote write encoding helper - `queries/{doris,clickhouse,druid,oceanbase}/*.sql` — canonical queries per backend +- `queries/victoriametrics/*.{metricsql,logql,traceql}` — VictoriaMetrics queries (MetricsQL, LogQL, TraceQL) - `runner/bench.py` — Doris-only: schema → load → queries → report -- `runner/bench_compare.py` — Doris vs ClickHouse vs Druid vs OceanBase: same flow on all, combined report +- `runner/bench_compare.py` — Doris vs ClickHouse vs Druid vs OceanBase vs VictoriaMetrics: same flow on all, combined report - `out/` — run outputs (`storage_bench_doris_/`, `storage_bench_compare_/`, `rolling_index.html`) -- `otel-collector-config.yaml` — OTLP receiver → Doris + ClickHouse exporters +- `otel-collector-config.yaml` — OTLP receiver → Doris + ClickHouse + VictoriaMetrics exporters - `docker-compose.otel.yml` — OTLP collector service (extends main compose) - `runner/run_otlp_ingest.py` — sends 1000 spans/logs/metrics via telemetrygen → collector - `runner/map_otlp_to_telemetry.py` — maps `otel.*` → `telemetry.*` so queries use batch + OTLP data @@ -77,18 +86,25 @@ make bench # run benchmark (uses telemetry_data/) make down # stop ``` -**Doris vs ClickHouse vs Druid vs OceanBase comparison:** +**Doris vs ClickHouse vs Druid vs OceanBase vs VictoriaMetrics comparison:** ```bash -make up-compare # start Doris + ClickHouse + Druid + OceanBase +make up-compare # start Doris + ClickHouse + Druid + OceanBase + VictoriaMetrics make bench-compare # run comparison benchmark (file load only, no telemetrygen) make bench-compare SCALE_TO=5000 # scale to 5k rows per type make down ``` +**VictoriaMetrics-only:** +```bash +make up-vm # start VictoriaMetrics + VictoriaLogs + VictoriaTraces +make bench-vm # run VM-only benchmark +make down +``` + **OTLP ingestion (telemetrygen → collector):** ```bash -make up-otel # start stack + OTLP collector -make bench-otlp # file load + telemetrygen (1000 spans, 1000 logs, 1000 metrics) via OTLP → Doris + ClickHouse +make up-otel # start stack + OTLP collector (includes VictoriaMetrics) +make bench-otlp # file load + telemetrygen (1000 spans, 1000 logs, 1000 metrics) via OTLP → Doris + ClickHouse + VictoriaMetrics make down ``` @@ -96,10 +112,11 @@ make down See `docs/DATA_SOURCES.md` for 50k correlated benchmark options. -| Run | Data source | Notes | -|-----|-------------|-------| -| `bench-compare` | `telemetry_data/` | Pre-collected files (logs_*.txt, traces_*.json, metrics_*.json). No telemetrygen. | -| `bench-otlp` | `telemetry_data/` + telemetrygen | Same file load; additionally sends 1000 spans/logs/metrics via telemetrygen → OTLP collector → Doris + ClickHouse. | +| Run | Data source | Notes | +|-----------------|--------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------| +| `bench-compare` | `telemetry_data/` | Pre-collected files (logs_*.txt, traces_*.json, metrics_*.json). No telemetrygen. | +| `bench-vm` | `telemetry_data/` | Same file load, VictoriaMetrics stack only. | +| `bench-otlp` | `telemetry_data/` + telemetrygen | Same file load; additionally sends 1000 spans/logs/metrics via telemetrygen → OTLP collector → Doris + ClickHouse + VictoriaMetrics. | Notes: - Requires Docker + docker compose. @@ -117,33 +134,35 @@ Notes: ## Outputs - `out/storage_bench_doris_/` — Doris-only: `summary.html`, `ingest.json`, `queries.json` -- `out/storage_bench_compare_/` — Doris vs ClickHouse vs Druid vs OceanBase: `compare.html`, `*_queries.json` per backend +- `out/storage_bench_compare_/` — Doris vs ClickHouse vs Druid vs OceanBase vs VictoriaMetrics: `compare.html`, `*_queries.json` per backend - `out/rolling_index.html` — unified index of all runs (newest first) The ingestion comparison table shows: **Backend | Mechanism | Duration (s) | Rows | Rows/sec**. Mechanism describes the ingest method (e.g. `Batch file load (5000 rows)` or `OTLP (1000 spans, 1000 logs, 1000 metrics)`). With `--otlp`, OTLP rows are appended to the table. ## Ingestion benchmark (how it works) -**Batch file load** — Same JSON/JSONL files from `telemetry_data/` are replayed into Doris, ClickHouse, Druid, and OceanBase via each loader: +**Batch file load** — Same JSON/JSONL files from `telemetry_data/` are replayed into Doris, ClickHouse, Druid, OceanBase, and VictoriaMetrics via each loader: - Doris: Stream Load HTTP API (`loaders/replay_doris.py`) - ClickHouse: HTTP INSERT (`loaders/replay_clickhouse.py`) - Druid: Native batch ingestion via Overlord (`loaders/replay_druid.py`) - OceanBase: MySQL protocol via pymysql (`loaders/replay_oceanbase.py`) +- VictoriaMetrics: Prometheus remote write for metrics (`loaders/remote_write.py`), Loki-compatible API for logs, OTLP HTTP for traces (`loaders/replay_victoriametrics.py`) -All four backends get identical data. Ingest duration and rows/sec are measured per backend. **Message size** is capped at 200KB in `loaders/common.py` to avoid huge Druid/OceanBase files when scaling. +All five backends get identical data. Ingest duration and rows/sec are measured per backend. **Message size** is capped at 200KB in `loaders/common.py` to avoid huge Druid/OceanBase files when scaling. -**OTLP ingestion** — When `--otlp` is used, telemetrygen sends N spans, N logs, and N metrics (gRPC) to the OpenTelemetry Collector (`otel-collector-config.yaml`). The collector batches and exports to Doris and ClickHouse only. Rows are counted in `otel.*` tables after a short flush delay; duration is end-to-end (telemetrygen start → last batch exported). The OTLP data is then **mapped into `telemetry.*`** via `runner/map_otlp_to_telemetry.py` (INSERT … SELECT from `otel.*` with column mapping), so canonical queries run against batch + OTLP data combined. +**OTLP ingestion** — When `--otlp` is used, telemetrygen sends N spans, N logs, and N metrics (gRPC) to the OpenTelemetry Collector (`otel-collector-config.yaml`). The collector batches and exports to Doris, ClickHouse, and VictoriaMetrics (VictoriaTraces accepts OTLP natively on port 14317/14318). Rows are counted in `otel.*` tables after a short flush delay; duration is end-to-end (telemetrygen start → last batch exported). The OTLP data is then **mapped into `telemetry.*`** via `runner/map_otlp_to_telemetry.py` (INSERT … SELECT from `otel.*` with column mapping), so canonical queries run against batch + OTLP data combined. **Why Druid is not in OTLP** — The OpenTelemetry Collector has no Druid exporter. Doris and ClickHouse both have official OTLP/contrib exporters; Druid typically ingests OTLP data via Kafka (collector → Kafka → Druid). Adding Druid to the OTLP path would require a Kafka-based pipeline, which this harness does not implement. ## Query benchmark (how it works) -After ingestion (and OTLP mapping if `--otlp`), the runner executes the same set of SQL queries on each backend. Each query file in `queries/{doris,clickhouse,druid,oceanbase}/*.sql` is run once per backend via its native API: +After ingestion (and OTLP mapping if `--otlp`), the runner executes the same set of queries on each backend. Each query file in `queries/{doris,clickhouse,druid,oceanbase}/*.sql` and `queries/victoriametrics/*.{metricsql,logql,traceql}` is run once per backend via its native API: - **Doris** — `mysql` client over Docker (`telemetry.logs`, `telemetry.spans`, `telemetry.metrics`) - **ClickHouse** — HTTP POST to `:8123` with `?query=...` - **Druid** — HTTP POST to `:8888/druid/v2/sql` with JSON body - **OceanBase** — `mysql` client over Docker (port 2881, MySQL-compatible) +- **VictoriaMetrics** — MetricsQL via `:8428/api/v1/query_range`, LogQL via `:9428/select/logsql/query`, TraceQL via `:10428/api/traces` **What is measured** — For each query, the runner records: - **Latency (s)** — Wall-clock time from query start to completion (includes network, parsing, execution) @@ -154,6 +173,18 @@ After ingestion (and OTLP mapping if `--otlp`), the runner executes the same set - **Query comparison** — Bar chart and table with latency, result row count, and error per backend. Includes a `data_volume` query that runs full COUNT on all three tables. The fastest backend and % difference vs. slowest are shown. - All backends query the same data: batch load + (when `--otlp`) mapped OTLP data in `telemetry.*`. +### VictoriaMetrics stack + +VictoriaMetrics uses a split architecture — three separate components handle different telemetry signals: + +| Component | Port | Signal | Ingest API | Query language | +|------------------|-------|---------|------------------------------------|----------------| +| VictoriaMetrics | 8428 | Metrics | Prometheus remote write (`/api/v1/write`) | MetricsQL | +| VictoriaLogs | 9428 | Logs | Loki-compatible (`/insert/loki/api/v1/push`) | LogQL | +| VictoriaTraces | 10428 | Traces | OTLP HTTP/gRPC (ports 14317/14318) | TraceQL | + +Environment variables: `VM_HTTP`, `VL_HTTP`, `VT_HTTP` configure endpoints (defaults: `http://localhost:8428`, `:9428`, `:10428`). + ### Query result differences (row count) Some queries return different row counts across backends: diff --git a/research/telemetry_storage_backend/docker-compose.otel.yml b/research/telemetry_storage_backend/docker-compose.otel.yml index 2410c70..7a499ad 100644 --- a/research/telemetry_storage_backend/docker-compose.otel.yml +++ b/research/telemetry_storage_backend/docker-compose.otel.yml @@ -13,8 +13,16 @@ services: - "4317:4317" # OTLP gRPC - "4318:4318" # OTLP HTTP depends_on: - - doris - - clickhouse + doris: + condition: service_healthy + clickhouse: + condition: service_healthy + victoriametrics: + condition: service_healthy + victorialogs: + condition: service_healthy + victoriatraces: + condition: service_healthy networks: - default restart: unless-stopped diff --git a/research/telemetry_storage_backend/docker-compose.victoriametrics.yml b/research/telemetry_storage_backend/docker-compose.victoriametrics.yml new file mode 100644 index 0000000..70b8d19 --- /dev/null +++ b/research/telemetry_storage_backend/docker-compose.victoriametrics.yml @@ -0,0 +1,78 @@ +# VictoriaMetrics stack: metrics + logs + traces +# Extends main compose: docker compose -f docker-compose.yml -f docker-compose.victoriametrics.yml up -d +# VictoriaMetrics API: http://localhost:8428 +# VictoriaLogs API: http://localhost:9428 +# VictoriaTraces API: http://localhost:10428, OTLP: 14317/14318 + +services: + victoriametrics: + image: victoriametrics/victoria-metrics:v1.142.0 + container_name: tsb-victoriametrics + ports: + - "8428:8428" + volumes: + - vmdata:/storage + command: + - "--storageDataPath=/storage" + - "--httpListenAddr=:8428" + - "--retentionPeriod=100y" + networks: + - default + healthcheck: + test: ["CMD", "wget", "-q", "-O", "-", "http://127.0.0.1:8428/-/healthy"] + interval: 10s + timeout: 5s + retries: 30 + restart: unless-stopped + + victorialogs: + image: victoriametrics/victoria-logs:v1.50.0 + container_name: tsb-victorialogs + ports: + - "9428:9428" + volumes: + - vldata:/vlogs + command: + - "--storageDataPath=/vlogs" + - "--httpListenAddr=:9428" + - "--retentionPeriod=100y" + networks: + - default + healthcheck: + test: ["CMD", "wget", "-q", "-O", "-", "http://127.0.0.1:9428/-/healthy"] + interval: 10s + timeout: 5s + retries: 30 + restart: unless-stopped + + victoriatraces: + image: docker.io/victoriametrics/victoria-traces:latest + container_name: tsb-victoriatraces + ports: + - "10428:10428" + - "14317:4317" + - "14318:4318" + volumes: + - vtdata:/vtraces + command: + - "--storageDataPath=/vtraces" + - "--httpListenAddr=:10428" + - "--retentionPeriod=100y" + networks: + - default + healthcheck: + test: ["CMD", "wget", "-q", "-O", "-", "http://127.0.0.1:10428/-/healthy"] + interval: 10s + timeout: 5s + retries: 30 + restart: unless-stopped + +networks: + default: + name: tsb-net + driver: bridge + +volumes: + vmdata: + vldata: + vtdata: diff --git a/research/telemetry_storage_backend/loaders/remote_write.py b/research/telemetry_storage_backend/loaders/remote_write.py new file mode 100644 index 0000000..827f079 --- /dev/null +++ b/research/telemetry_storage_backend/loaders/remote_write.py @@ -0,0 +1,69 @@ +""" +Minimal Prometheus remote-write encoder (protobuf + snappy). +Implements just enough of the WriteRequest proto to push samples +to VictoriaMetrics /api/v1/write. + +Proto schema (from prometheus/prometheus): + message WriteRequest { repeated TimeSeries timeseries = 1; } + message TimeSeries { repeated Label labels = 1; repeated Sample samples = 2; } + message Label { string name = 1; string value = 2; } + message Sample { double value = 1; int64 timestamp = 2; } +""" +from __future__ import annotations +import struct + +import snappy + + +def _encode_varint(value: int) -> bytes: + bits = value & 0x7F + value >>= 7 + out = b"" + while value: + out += bytes([0x80 | bits]) + bits = value & 0x7F + value >>= 7 + out += bytes([bits]) + return out + + +def _encode_bytes(field_number: int, data: bytes) -> bytes: + tag = _encode_varint((field_number << 3) | 2) + return tag + _encode_varint(len(data)) + data + + +def _encode_double(field_number: int, value: float) -> bytes: + tag = _encode_varint((field_number << 3) | 1) + return tag + struct.pack(" bytes: + tag = _encode_varint((field_number << 3) | 0) + return tag + _encode_varint(value) + + +def _encode_label(name: str, value: str) -> bytes: + inner = _encode_bytes(1, name.encode()) + _encode_bytes(2, value.encode()) + return _encode_bytes(1, inner) + + +def _encode_sample(value: float, timestamp_ms: int) -> bytes: + inner = _encode_double(1, value) + _encode_sint64(2, timestamp_ms) + return _encode_bytes(2, inner) + + +def encode_write_request(timeseries: list[dict]) -> bytes: + """ + Encode a list of time series into a snappy-compressed WriteRequest. + + Each entry in timeseries: {"labels": {"__name__": "x", ...}, "value": float, "timestamp_ms": int} + """ + body = b"" + for ts in timeseries: + labels_bytes = b"" + for k, v in sorted(ts["labels"].items()): + labels_bytes += _encode_label(k, v) + sample_bytes = _encode_sample(ts["value"], ts["timestamp_ms"]) + ts_msg = labels_bytes + sample_bytes + body += _encode_bytes(1, ts_msg) + return snappy.compress(body) \ No newline at end of file diff --git a/research/telemetry_storage_backend/loaders/replay_victoriametrics.py b/research/telemetry_storage_backend/loaders/replay_victoriametrics.py new file mode 100644 index 0000000..dc517f7 --- /dev/null +++ b/research/telemetry_storage_backend/loaders/replay_victoriametrics.py @@ -0,0 +1,246 @@ +#!/usr/bin/env python3 +""" +Replay telemetry files into VictoriaMetrics (metrics), VictoriaLogs (logs), +and VictoriaTraces (traces via OTLP HTTP). +Uses shared extraction from loaders.common. +Environment: + VM_HTTP (default: http://localhost:8428) — VictoriaMetrics + VL_HTTP (default: http://localhost:9428) — VictoriaLogs + VT_HTTP (default: http://localhost:10428) — VictoriaTraces (OTLP via main HTTP) +""" +from __future__ import annotations +import argparse +import json +import os +import sys +import time +from datetime import datetime +from pathlib import Path + +import requests + +sys.path.insert(0, str(Path(__file__).resolve().parent)) +import common +import remote_write + +VM_HTTP = os.environ.get("VM_HTTP", "http://localhost:8428") +VL_HTTP = os.environ.get("VL_HTTP", "http://localhost:9428") +VL_PUSH_URL = f"{VL_HTTP}/insert/loki/api/v1/push" +VT_HTTP = os.environ.get("VT_HTTP", "http://localhost:10428") +PUSH_CHUNK_SIZE = 5 + + +def push_metrics(rows: list[dict]) -> bool: + """Push metric rows to VictoriaMetrics via /api/v1/write (Prometheus remote write).""" + url = f"{VM_HTTP}/api/v1/write" + timeseries = [] + for row in rows: + ts_epoch_ms = _ts_to_epoch_ms(row.get("ts", "")) + labels = {"__name__": row.get("metric_name", "unknown")} + row_labels = row.get("labels", {}) + if isinstance(row_labels, str): + try: + row_labels = json.loads(row_labels) + except Exception: + row_labels = {} + for k, v in row_labels.items(): + if k and v: + labels[k] = str(v) + timeseries.append({ + "labels": labels, + "value": float(row.get("value", 0)), + "timestamp_ms": ts_epoch_ms, + }) + body = remote_write.encode_write_request(timeseries) + try: + r = requests.post(url, data=body, + headers={"Content-Type": "application/x-protobuf", + "Content-Encoding": "snappy", + "X-Prometheus-Remote-Write-Version": "0.1.0"}, + timeout=60) + if r.status_code in (200, 204): + return True + print(f"[vm] metrics push HTTP {r.status_code}: {r.text[:300]}") + except Exception as e: + print(f"[vm] metrics push error: {e}") + return False + + +def _ts_to_ns(ts: str) -> str: + """Convert SQL-style timestamp to nanoseconds since epoch (same as Loki loader).""" + try: + if "." in ts: + dt = datetime.strptime(ts[:26], "%Y-%m-%d %H:%M:%S.%f") + else: + dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S") + return str(int(dt.timestamp() * 1_000_000_000)) + except Exception: + return str(int(time.time() * 1_000_000_000)) + + +def _row_to_stream(row: dict, run_id: str = "") -> dict: + """Convert one log row to a Loki stream (same format as replay_loki.py).""" + service = (row.get("service") or "unknown").replace('"', '\\"')[:64] + level = (row.get("level") or "info").replace('"', '\\"')[:32] + stream = {"job": "telemetry", "service": service or "unknown", "level": level or "info"} + if run_id: + stream["run_id"] = run_id + if row.get("trace_id"): + stream["trace_id"] = str(row["trace_id"])[:64] + if row.get("span_id"): + stream["span_id"] = str(row["span_id"])[:64] + ts_ns = _ts_to_ns(row.get("ts", "")) + line = (row.get("message") or "").replace("\n", " ").replace("\r", "") + if len(line) > 200_000: + line = line[:200_000] + return {"stream": stream, "values": [[ts_ns, line]]} + + +def push_logs(rows: list[dict], run_id: str = "") -> bool: + """Push log rows to VictoriaLogs via Loki-compatible push API.""" + streams = [_row_to_stream(r, run_id) for r in rows] + for i in range(0, len(streams), PUSH_CHUNK_SIZE): + chunk = streams[i : i + PUSH_CHUNK_SIZE] + payload = {"streams": chunk} + try: + r = requests.post(VL_PUSH_URL, json=payload, + headers={"Content-Type": "application/json"}, timeout=60) + if r.status_code not in (200, 204): + print(f"[vl] logs push HTTP {r.status_code}: {r.text[:300]}") + return False + except Exception as e: + print(f"[vl] logs push error: {e}") + return False + return True + + +def push_traces(rows: list[dict]) -> bool: + """Push span rows to VictoriaTraces via OTLP HTTP JSON (/v1/traces).""" + url = f"{VT_HTTP}/insert/opentelemetry/v1/traces" + spans_by_service: dict[str, list] = {} + for row in rows: + svc = row.get("service", "unknown") + spans_by_service.setdefault(svc, []).append(row) + + resource_spans = [] + for svc, svc_rows in spans_by_service.items(): + otlp_spans = [] + for row in svc_rows: + start_ns = _ts_to_epoch_ns(row.get("ts_start", "")) + end_ns = _ts_to_epoch_ns(row.get("ts_end", "")) + attrs = row.get("attributes", {}) + if isinstance(attrs, str): + try: + attrs = json.loads(attrs) + except Exception: + attrs = {} + otlp_attrs = [{"key": k, "value": {"stringValue": str(v)}} for k, v in attrs.items()] + otlp_spans.append({ + "traceId": _hex_pad(row.get("trace_id", ""), 32), + "spanId": _hex_pad(row.get("span_id", ""), 16), + "parentSpanId": _hex_pad(row.get("parent_span_id", ""), 16) if row.get("parent_span_id") else "", + "name": row.get("name", ""), + "kind": 1, + "startTimeUnixNano": str(start_ns), + "endTimeUnixNano": str(end_ns), + "attributes": otlp_attrs, + "status": {}, + }) + resource_spans.append({ + "resource": { + "attributes": [{"key": "service.name", "value": {"stringValue": svc}}], + }, + "scopeSpans": [{"scope": {"name": "telemetry-bench"}, "spans": otlp_spans}], + }) + + payload = {"resourceSpans": resource_spans} + for attempt in range(5): + try: + r = requests.post(url, json=payload, headers={"Content-Type": "application/json"}, timeout=60) + if r.status_code in (200, 202): + return True + print(f"[vt] traces push HTTP {r.status_code}: {r.text[:300]}") + except (requests.exceptions.ConnectionError, ConnectionResetError) as e: + if attempt < 4: + print(f"[vt] traces push retry {attempt + 1}/5 ({e})") + time.sleep(3) + continue + print(f"[vt] traces push error after retries: {e}") + except Exception as e: + print(f"[vt] traces push error: {e}") + break + return False + + +def _ts_to_epoch_ms(ts: str) -> int: + try: + if "." in ts: + from datetime import datetime + dt = datetime.strptime(ts[:26], "%Y-%m-%d %H:%M:%S.%f") + return int(dt.timestamp() * 1000) + elif ts: + from datetime import datetime + dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S") + return int(dt.timestamp() * 1000) + except Exception: + pass + return int(time.time() * 1000) + + +def _ts_to_epoch_ns(ts: str) -> int: + return _ts_to_epoch_ms(ts) * 1_000_000 + + +def _hex_pad(val: str, length: int) -> str: + """Ensure hex string is exactly `length` chars, zero-padded or truncated.""" + if not val: + return "0" * length + cleaned = val.replace("-", "") + if len(cleaned) < length: + cleaned = cleaned.zfill(length) + return cleaned[:length] + + +def main() -> int: + ap = argparse.ArgumentParser(description="Replay telemetry into VictoriaMetrics/Logs/Traces") + ap.add_argument("--data-dir", type=Path, required=True) + ap.add_argument("--batch", type=int, default=500) + ap.add_argument("--scale-to", type=int, default=None, help="Target row count per type") + ap.add_argument("--stats", type=Path, default=None, help="Write ingestion stats JSON") + ap.add_argument("--run-id", type=str, default="", help="Label to isolate this run in VictoriaLogs") + args = ap.parse_args() + data_dir = args.data_dir + assert data_dir.exists(), f"DATA_DIR not found: {data_dir}" + target = args.scale_to + stats = {"logs": 0, "spans": 0, "metrics": 0, + "logs_duration_s": 0, "spans_duration_s": 0, "metrics_duration_s": 0} + + t0 = time.time() + for log_rows in common.extract_log_rows(data_dir, args.batch, target_rows=target): + if not push_logs(log_rows, run_id=args.run_id): + return 1 + stats["logs"] += len(log_rows) + stats["logs_duration_s"] = round(time.time() - t0, 3) + + t0 = time.time() + for span_rows in common.extract_span_rows(data_dir, args.batch, target_rows=target): + if not push_traces(span_rows): + return 1 + stats["spans"] += len(span_rows) + stats["spans_duration_s"] = round(time.time() - t0, 3) + + t0 = time.time() + for met_rows in common.extract_metric_rows(data_dir, args.batch, target_rows=target): + if not push_metrics(met_rows): + return 1 + stats["metrics"] += len(met_rows) + stats["metrics_duration_s"] = round(time.time() - t0, 3) + + if args.stats: + args.stats.write_text(json.dumps(stats)) + print("[replay] victoriametrics done") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/research/telemetry_storage_backend/otel-collector-config.yaml b/research/telemetry_storage_backend/otel-collector-config.yaml index 1d92555..21bde61 100644 --- a/research/telemetry_storage_backend/otel-collector-config.yaml +++ b/research/telemetry_storage_backend/otel-collector-config.yaml @@ -33,18 +33,30 @@ exporters: username: default password: changeme create_schema: true + otlphttp/victoriametrics: + endpoint: http://tsb-victoriametrics:8428/opentelemetry + tls: + insecure: true + otlphttp/victorialogs: + endpoint: http://tsb-victorialogs:9428/insert/opentelemetry + tls: + insecure: true + otlphttp/victoriatraces: + endpoint: http://tsb-victoriatraces:10428/insert/opentelemetry + tls: + insecure: true service: pipelines: traces: receivers: [otlp] processors: [batch] - exporters: [doris, clickhouse] + exporters: [doris, clickhouse, otlphttp/victoriatraces] metrics: receivers: [otlp] processors: [batch] - exporters: [doris, clickhouse] + exporters: [doris, clickhouse, otlphttp/victoriametrics] logs: receivers: [otlp] processors: [batch] - exporters: [doris, clickhouse] + exporters: [doris, clickhouse, otlphttp/victorialogs] diff --git a/research/telemetry_storage_backend/queries/victoriametrics/correlation_by_timestamp.logql b/research/telemetry_storage_backend/queries/victoriametrics/correlation_by_timestamp.logql new file mode 100644 index 0000000..4449d3c --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/correlation_by_timestamp.logql @@ -0,0 +1 @@ +_time:30d * | stats by (_time:1m, service) count() as event_count | sort by (event_count) desc | limit 20 \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/correlation_by_trace_id.logql b/research/telemetry_storage_backend/queries/victoriametrics/correlation_by_trace_id.logql new file mode 100644 index 0000000..8ab136f --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/correlation_by_trace_id.logql @@ -0,0 +1 @@ +_time:30d trace_id:* | stats by (trace_id, service) count() as log_count | sort by (log_count) desc | limit 20 \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/data_volume.logql b/research/telemetry_storage_backend/queries/victoriametrics/data_volume.logql new file mode 100644 index 0000000..38f7ab6 --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/data_volume.logql @@ -0,0 +1 @@ +* | stats count() as total \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/data_volume.metricsql b/research/telemetry_storage_backend/queries/victoriametrics/data_volume.metricsql new file mode 100644 index 0000000..106a556 --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/data_volume.metricsql @@ -0,0 +1 @@ +count({__name__!=""}) \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/data_volume.traceql b/research/telemetry_storage_backend/queries/victoriametrics/data_volume.traceql new file mode 100644 index 0000000..38f7ab6 --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/data_volume.traceql @@ -0,0 +1 @@ +* | stats count() as total \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/logs_errors_by_service.logql b/research/telemetry_storage_backend/queries/victoriametrics/logs_errors_by_service.logql new file mode 100644 index 0000000..a47cb4b --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/logs_errors_by_service.logql @@ -0,0 +1 @@ +_time:1d (level:error OR error) | stats by (service) count() as err_count | sort by (err_count) desc | limit 20 diff --git a/research/telemetry_storage_backend/queries/victoriametrics/logs_recent.logql b/research/telemetry_storage_backend/queries/victoriametrics/logs_recent.logql new file mode 100644 index 0000000..0a46d5b --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/logs_recent.logql @@ -0,0 +1 @@ +* | sort by (_time) desc | limit 100 \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/logs_search_error.logql b/research/telemetry_storage_backend/queries/victoriametrics/logs_search_error.logql new file mode 100644 index 0000000..6fc3bdd --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/logs_search_error.logql @@ -0,0 +1 @@ +_time:30d error | sort by (_time) desc | limit 100 diff --git a/research/telemetry_storage_backend/queries/victoriametrics/metrics_by_service_hourly.metricsql b/research/telemetry_storage_backend/queries/victoriametrics/metrics_by_service_hourly.metricsql new file mode 100644 index 0000000..baacec5 --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/metrics_by_service_hourly.metricsql @@ -0,0 +1 @@ +avg_over_time({__name__!=""}[1h]) diff --git a/research/telemetry_storage_backend/queries/victoriametrics/metrics_p95_latency.metricsql b/research/telemetry_storage_backend/queries/victoriametrics/metrics_p95_latency.metricsql new file mode 100644 index 0000000..15b3356 --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/metrics_p95_latency.metricsql @@ -0,0 +1 @@ +topk(20, quantile_over_time(0.95, {__name__!=""}[365d])) \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/sla_latency_compliance.traceql b/research/telemetry_storage_backend/queries/victoriametrics/sla_latency_compliance.traceql new file mode 100644 index 0000000..1177d81 --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/sla_latency_compliance.traceql @@ -0,0 +1 @@ +_time:30d * | stats count() as total, count() if (duration:<500ms) as fast | math fast * 100 / total as pct_under_500ms \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/spans_error_by_service.traceql b/research/telemetry_storage_backend/queries/victoriametrics/spans_error_by_service.traceql new file mode 100644 index 0000000..7d398dd --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/spans_error_by_service.traceql @@ -0,0 +1 @@ +_time:30d (duration:>5s OR "span_attr:http.status_code":>=500) | stats by ("resource_attr:service.name") count() as error_span_count | sort by (error_span_count) desc | limit 20 \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/trace_by_id.traceql b/research/telemetry_storage_backend/queries/victoriametrics/trace_by_id.traceql new file mode 100644 index 0000000..037042e --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/trace_by_id.traceql @@ -0,0 +1 @@ +trace_id:* | sort by (_time) | limit 1 \ No newline at end of file diff --git a/research/telemetry_storage_backend/queries/victoriametrics/traces_slow_by_service.traceql b/research/telemetry_storage_backend/queries/victoriametrics/traces_slow_by_service.traceql new file mode 100644 index 0000000..221972c --- /dev/null +++ b/research/telemetry_storage_backend/queries/victoriametrics/traces_slow_by_service.traceql @@ -0,0 +1 @@ +_time:1d duration:>500ms | stats by ("resource_attr:service.name") count() as slow_cnt | sort by (slow_cnt) desc | limit 20 \ No newline at end of file diff --git a/research/telemetry_storage_backend/requirements.txt b/research/telemetry_storage_backend/requirements.txt index e5ff456..15a042c 100644 --- a/research/telemetry_storage_backend/requirements.txt +++ b/research/telemetry_storage_backend/requirements.txt @@ -1,2 +1,7 @@ # For OceanBase loader (replay_oceanbase.py) pymysql>=1.0.0 +# For all loaders and bench runner +requests>=2.28.0 +# For VictoriaMetrics remote write (protobuf + snappy) +protobuf>=4.21.0 +python-snappy>=0.6.0 diff --git a/research/telemetry_storage_backend/runner/bench_compare.py b/research/telemetry_storage_backend/runner/bench_compare.py index ce73915..85cdddc 100644 --- a/research/telemetry_storage_backend/runner/bench_compare.py +++ b/research/telemetry_storage_backend/runner/bench_compare.py @@ -22,9 +22,11 @@ CH_QDIR = ROOT / "queries" / "clickhouse" DRUID_QDIR = ROOT / "queries" / "druid" OB_QDIR = ROOT / "queries" / "oceanbase" +VM_QDIR = ROOT / "queries" / "victoriametrics" OB_SCHEMA = ROOT / "schemas" / "oceanbase.sql" DORIS_FE_HTTP = os.getenv("DORIS_FE_HTTP", "http://localhost:8030") +DORIS_MYSQL_PORT = int(os.getenv("DORIS_MYSQL_PORT", "9030")) DORIS_PASS = os.getenv("DORIS_PASS", "") CH_HTTP = os.getenv("CLICKHOUSE_HTTP", "http://localhost:8123") CH_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD", "") @@ -33,8 +35,16 @@ OB_PORT = int(os.getenv("OCEANBASE_PORT", "2881")) OB_CONTAINER = os.getenv("OCEANBASE_CONTAINER", "tsb-oceanbase") LOKI_HTTP = os.getenv("LOKI_HTTP", "http://localhost:3100") +VM_HTTP = os.getenv("VM_HTTP", "http://localhost:8428") +VL_HTTP = os.getenv("VL_HTTP", "http://localhost:9428") +VT_HTTP = os.getenv("VT_HTTP", "http://localhost:10428") DB = "telemetry" +def _port_of(url: str) -> int: + from urllib.parse import urlparse + p = urlparse(url) + return p.port or (443 if p.scheme == "https" else 80) + def _ch_params(extra: dict | None = None) -> dict: params = dict(extra) if extra else {} if CH_PASSWORD: @@ -170,6 +180,45 @@ def truncate_oceanbase_tables() -> None: print(f"[truncate] OceanBase {t}: {e}") print("[truncate] OceanBase tables cleared") +def reset_vm_storage() -> None: + """Stop VM containers, remove volumes, restart with clean storage.""" + compose = ["docker", "compose", "-f", str(ROOT / "docker-compose.yml"), + "-f", str(ROOT / "docker-compose.victoriametrics.yml")] + subprocess.run(compose + ["stop", "victoriametrics", "victorialogs", "victoriatraces"], + capture_output=True, check=False) + subprocess.run(compose + ["rm", "-f", "victoriametrics", "victorialogs", "victoriatraces"], + capture_output=True, check=False) + for vol_suffix in ["vmdata", "vldata", "vtdata"]: + out = subprocess.run(["docker", "volume", "ls", "-q", "--filter", f"name={vol_suffix}"], + capture_output=True, text=True) + for vol in out.stdout.strip().splitlines(): + subprocess.run(["docker", "volume", "rm", "-f", vol], capture_output=True, check=False) + subprocess.run(compose + ["up", "-d", "victoriametrics", "victorialogs", "victoriatraces"], check=True) + print("[truncate] VM storage reset, containers restarted") + +def wait_vm_healthy(timeout_s: int = 120) -> bool: + """Wait until all three VM services are healthy after restart.""" + endpoints = [ + (VM_HTTP, "/-/healthy", "VictoriaMetrics"), + (VL_HTTP, "/health", "VictoriaLogs"), + (VT_HTTP, "/health", "VictoriaTraces"), + ] + for base, path, name in endpoints: + t0 = time.time() + while time.time() - t0 < timeout_s: + try: + r = requests.get(f"{base}{path}", timeout=5) + if r.status_code == 200: + print(f"[wait] {name} healthy") + break + except Exception: + pass + time.sleep(2) + else: + print(f"[wait] {name} not healthy after {timeout_s}s") + return False + return True + def run_doris_query(sql: str) -> dict: pass_arg = f"-p{DORIS_PASS}" if DORIS_PASS else "" sh_script = ( @@ -279,6 +328,82 @@ def bench_loki_logs(run_id: str) -> dict: return res +def run_vm_logql(query: str, run_id: str = "") -> dict: + """Run a LogsQL query against VictoriaLogs native API.""" + url = f"{VL_HTTP}/select/logsql/query" + if run_id: + if " | " in query: + filt, pipes = query.split(" | ", 1) + query = f"run_id:{run_id} {filt} | {pipes}" + else: + query = f"run_id:{run_id} {query}" + params = {"query": query, "limit": 1000} + t0 = time.time() + try: + r = requests.get(url, params=params, timeout=120) + dt = time.time() - t0 + if r.status_code != 200: + return {"latency_s": dt, "rows": 0, "error": r.text[:300]} + rows = len([ln for ln in r.text.strip().splitlines() if ln.strip()]) + return {"latency_s": dt, "rows": rows} + except Exception as e: + return {"latency_s": 0, "rows": 0, "error": str(e)[:200]} + + +def run_vm_metricsql(query: str) -> dict: + """Run a MetricsQL query against VictoriaMetrics.""" + url = f"{VM_HTTP}/api/v1/query" + params = {"query": query, "step": "24h"} + t0 = time.time() + try: + r = requests.get(url, params=params, timeout=120) + dt = time.time() - t0 + if r.status_code != 200: + return {"latency_s": dt, "rows": 0, "error": r.text[:300]} + data = r.json() + results = data.get("data", {}).get("result", []) + return {"latency_s": dt, "rows": len(results)} + except Exception as e: + return {"latency_s": 0, "rows": 0, "error": str(e)[:200]} + + +def run_vm_traceql(query: str) -> dict: + """Run a LogsQL query against VictoriaTraces (uses same query language as VictoriaLogs).""" + url = f"{VT_HTTP}/select/logsql/query" + params = {"query": query, "limit": 1000} + t0 = time.time() + try: + r = requests.get(url, params=params, timeout=120) + dt = time.time() - t0 + if r.status_code != 200: + return {"latency_s": dt, "rows": 0, "error": r.text[:300]} + rows = len([ln for ln in r.text.strip().splitlines() if ln.strip()]) + return {"latency_s": dt, "rows": rows} + except Exception as e: + return {"latency_s": 0, "rows": 0, "error": str(e)[:200]} + + +def bench_vm(run_id: str = "") -> tuple[dict, dict, dict]: + """Run all VM queries. Returns (victorialogs_results, victoriametrics_results, victoriatraces_results).""" + vl, vm, vt = {}, {}, {} + for f in sorted(VM_QDIR.glob("*")): + if f.suffix not in (".logql", ".metricsql", ".traceql"): + continue + name = f.stem + query = f.read_text().strip() + try: + if f.suffix == ".logql": + vl[name] = run_vm_logql(query, run_id) + elif f.suffix == ".metricsql": + vm[name] = run_vm_metricsql(query) + elif f.suffix == ".traceql": + vt[name] = run_vm_traceql(query) + except Exception as e: + target = vl if f.suffix == ".logql" else vm if f.suffix == ".metricsql" else vt + target[name] = {"error": str(e)[:200]} + return vl, vm, vt + + def bench_backend(qdir: Path, run_fn) -> dict: results = {} for f in sorted(qdir.glob("*.sql")): @@ -292,17 +417,15 @@ def bench_backend(qdir: Path, run_fn) -> dict: return results -def get_data_volume(use_doris: bool, use_oceanbase: bool = True) -> tuple: - """Run full-scan COUNT on each backend; return (doris_vol, ch_vol, druid_vol, ob_vol).""" +def get_data_volume(use_doris: bool, use_oceanbase: bool = True, use_vm: bool = False, use_sql: bool = True) -> tuple: + """Run full-scan COUNT on each backend; return (doris_vol, ch_vol, druid_vol, ob_vol, vl_vol, vm_vol, vt_vol).""" doris_vol = {} ch_vol = {} druid_vol = {} ob_vol = {} - sql_doris = (DORIS_QDIR / "data_volume.sql").read_text() - sql_ch = (CH_QDIR / "data_volume.sql").read_text() - sql_druid = (DRUID_QDIR / "data_volume.sql").read_text() - sql_ob = (OB_QDIR / "data_volume.sql").read_text() - + vl_vol = {} + vm_vol = {} + vt_vol = {} def _parse_tsv(lines: list[str]) -> dict: counts = {} for ln in lines: @@ -315,6 +438,7 @@ def _parse_tsv(lines: list[str]) -> dict: if use_doris: try: + sql_doris = (DORIS_QDIR / "data_volume.sql").read_text() pass_arg = f"-p{DORIS_PASS}" if DORIS_PASS else "" sh_script = ( "cat > /tmp/dv.sql <<'EOSQL'\n" + sql_doris + "\nEOSQL\n" @@ -333,45 +457,49 @@ def _parse_tsv(lines: list[str]) -> dict: except Exception as e: doris_vol = {"error": str(e)[:200]} - try: - t0 = time.time() - r = requests.post(CH_HTTP, params=_ch_params({"query": sql_ch}), timeout=120) - dt = time.time() - t0 - if r.status_code == 200: - lines = [ln for ln in r.text.strip().splitlines() if ln.strip()] - ch_vol = _parse_tsv(lines) - ch_vol["latency_s"] = dt - else: - ch_vol = {"error": r.text[:200]} - except Exception as e: - ch_vol = {"error": str(e)[:200]} + if use_sql: + sql_ch = (CH_QDIR / "data_volume.sql").read_text() + sql_druid = (DRUID_QDIR / "data_volume.sql").read_text() + try: + t0 = time.time() + r = requests.post(CH_HTTP, params=_ch_params({"query": sql_ch}), timeout=120) + dt = time.time() - t0 + if r.status_code == 200: + lines = [ln for ln in r.text.strip().splitlines() if ln.strip()] + ch_vol = _parse_tsv(lines) + ch_vol["latency_s"] = dt + else: + ch_vol = {"error": r.text[:200]} + except Exception as e: + ch_vol = {"error": str(e)[:200]} - try: - t0 = time.time() - r = requests.post( - f"{DRUID_HTTP}/druid/v2/sql", - json={"query": sql_druid, "resultFormat": "array"}, - headers={"Content-Type": "application/json"}, - timeout=120, - ) - dt = time.time() - t0 - if r.status_code == 200: - arr = r.json() - counts = {} - for row in (arr or []): - if len(row) >= 2: - tbl = str(row[0]).lower() - cnt = int(row[1]) if isinstance(row[1], (int, float)) else 0 - counts[tbl] = cnt - counts["total"] = counts.get("logs", 0) + counts.get("spans", 0) + counts.get("metrics", 0) - counts["latency_s"] = dt - druid_vol = counts - else: - druid_vol = {"error": r.text[:200]} - except Exception as e: - druid_vol = {"error": str(e)[:200]} + try: + t0 = time.time() + r = requests.post( + f"{DRUID_HTTP}/druid/v2/sql", + json={"query": sql_druid, "resultFormat": "array"}, + headers={"Content-Type": "application/json"}, + timeout=120, + ) + dt = time.time() - t0 + if r.status_code == 200: + arr = r.json() + counts = {} + for row in (arr or []): + if len(row) >= 2: + tbl = str(row[0]).lower() + cnt = int(row[1]) if isinstance(row[1], (int, float)) else 0 + counts[tbl] = cnt + counts["total"] = counts.get("logs", 0) + counts.get("spans", 0) + counts.get("metrics", 0) + counts["latency_s"] = dt + druid_vol = counts + else: + druid_vol = {"error": r.text[:200]} + except Exception as e: + druid_vol = {"error": str(e)[:200]} if use_oceanbase: + sql_ob = (OB_QDIR / "data_volume.sql").read_text() try: t0 = time.time() out = subprocess.run( @@ -389,138 +517,211 @@ def _parse_tsv(lines: list[str]) -> dict: except Exception as e: ob_vol = {"error": str(e)[:200]} - return doris_vol, ch_vol, druid_vol, ob_vol + if use_vm: + # VictoriaLogs: count log entries via stats + try: + t0 = time.time() + r = requests.get(f"{VL_HTTP}/select/logsql/query", + params={"query": "* | stats count() as total", "limit": 1}, + timeout=30) + dt = time.time() - t0 + if r.status_code == 200: + lines = [ln for ln in r.text.strip().splitlines() if ln.strip()] + if lines: + row = json.loads(lines[0]) + vl_vol = {"rows": int(row.get("total", 0)), "latency_s": dt} + else: + vl_vol = {"rows": 0, "latency_s": dt} + except Exception as e: + vl_vol = {"error": str(e)[:200]} + # VictoriaMetrics: count total inserted rows via /metrics internal counter + try: + t0 = time.time() + r = requests.get(f"{VM_HTTP}/metrics", timeout=30) + dt = time.time() - t0 + if r.status_code == 200: + total_rows = 0 + for line in r.text.splitlines(): + if line.startswith("vm_rows_inserted_total{"): + parts = line.rsplit(" ", 1) + if len(parts) == 2: + total_rows += int(float(parts[1])) + vm_vol = {"rows": total_rows, "latency_s": dt} + except Exception as e: + vm_vol = {"error": str(e)[:200]} + # VictoriaTraces: count spans via LogsQL stats + try: + t0 = time.time() + r = requests.get(f"{VT_HTTP}/select/logsql/query", + params={"query": "* | stats count() as total", "limit": 1}, + timeout=30) + dt = time.time() - t0 + if r.status_code == 200: + lines = [ln for ln in r.text.strip().splitlines() if ln.strip()] + if lines: + row = json.loads(lines[0]) + vt_vol = {"rows": int(row.get("total", 0)), "latency_s": dt} + else: + vt_vol = {"rows": 0, "latency_s": dt} + except Exception as e: + vt_vol = {"error": str(e)[:200]} + + return doris_vol, ch_vol, druid_vol, ob_vol, vl_vol, vm_vol, vt_vol def write_combined_report(out_dir: Path, doris_ingest: dict, ch_ingest: dict, druid_ingest: dict, doris_qres: dict, ch_qres: dict, druid_qres: dict, ob_ingest: dict | None = None, ob_qres: dict | None = None, loki_ingest: dict | None = None, loki_qres: dict | None = None, + vl_ingest: dict | None = None, vl_qres: dict | None = None, + vm_ingest: dict | None = None, vm_qres: dict | None = None, + vt_ingest: dict | None = None, vt_qres: dict | None = None, otlp_ingest: dict | None = None, data_vol: tuple | None = None) -> None: out_dir.mkdir(parents=True, exist_ok=True) - all_queries = sorted(set(doris_qres.keys()) | set(ch_qres.keys()) | set(druid_qres.keys()) - | set((ob_qres or {}).keys()) | set((loki_qres or {}).keys())) - rows = [] - chart_data = {"queries": [], "doris": [], "clickhouse": [], "druid": [], "oceanbase": [], "loki": []} backends = [("doris", doris_qres), ("clickhouse", ch_qres), ("druid", druid_qres)] if ob_qres: backends.append(("oceanbase", ob_qres)) if loki_qres: backends.append(("loki", loki_qres)) + if vl_qres: + backends.append(("victorialogs", vl_qres)) + if vm_qres: + backends.append(("victoriametrics", vm_qres)) + if vt_qres: + backends.append(("victoriatraces", vt_qres)) + all_queries = sorted(set().union(*(qr.keys() for _, qr in backends))) + _display = {"doris": "Doris", "clickhouse": "ClickHouse", "druid": "Druid", + "oceanbase": "OceanBase", "loki": "Loki", + "victorialogs": "VictoriaLogs", "victoriametrics": "VictoriaMetrics", + "victoriatraces": "VictoriaTraces"} + rows = [] + chart_data = {"queries": []} + for bname, _ in backends: + chart_data[bname] = [] for q in all_queries: - d = doris_qres.get(q, {}) - c = ch_qres.get(q, {}) - dr = druid_qres.get(q, {}) - ob = (ob_qres or {}).get(q, {}) - loki = (loki_qres or {}).get(q, {}) - d_lat, c_lat, dr_lat = d.get("latency_s", ""), c.get("latency_s", ""), dr.get("latency_s", "") - ob_lat = ob.get("latency_s", "") - loki_lat = loki.get("latency_s", "") - lats = [(d_lat, "Doris"), (c_lat, "ClickHouse"), (dr_lat, "Druid")] - if ob_qres: - lats.append((ob_lat, "OceanBase")) - if loki_qres: - lats.append((loki_lat, "Loki")) + lats = [] + cells = "" + for bname, bqres in backends: + bq = bqres.get(q, {}) + lat = bq.get("latency_s", "") + cells += f"{lat}{bq.get('rows', '')}{bq.get('error', '')}" + lats.append((lat, _display.get(bname, bname))) + chart_data[bname].append(round(lat, 4) if isinstance(lat, (int, float)) else None) valid = [(x, n) for x, n in lats if isinstance(x, (int, float))] winner, pct_diff = "", "" if len(valid) >= 2: fastest = min(valid, key=lambda t: t[0]) slowest = max(valid, key=lambda t: t[0]) winner = fastest[1] + if winner in ("VictoriaLogs", "VictoriaTraces"): + winner = "VictoriaMetrics" if slowest[0] >= 1e-9: pct = (slowest[0] - fastest[0]) / slowest[0] * 100 pct_diff = f"{pct:.1f}%" - ob_cells = f"{ob_lat}{ob.get('rows', '')}{ob.get('error', '')}" if ob_qres else "" - loki_cells = f"{loki_lat}{loki.get('rows', '')}{loki.get('error', '')}" if loki_qres else "" - rows.append(f"{q}{d_lat}{d.get('rows', '')}{d.get('error', '')}" - f"{c_lat}{c.get('rows', '')}{c.get('error', '')}" - f"{dr_lat}{dr.get('rows', '')}{dr.get('error', '')}" - f"{ob_cells}{loki_cells}{pct_diff}{winner}") + rows.append(f"{q}{cells}{pct_diff}{winner}") chart_data["queries"].append(q) - chart_data["doris"].append(round(d_lat, 4) if isinstance(d_lat, (int, float)) else None) - chart_data["clickhouse"].append(round(c_lat, 4) if isinstance(c_lat, (int, float)) else None) - chart_data["druid"].append(round(dr_lat, 4) if isinstance(dr_lat, (int, float)) else None) - chart_data["oceanbase"].append(round(ob_lat, 4) if isinstance(ob_lat, (int, float)) else None) - chart_data["loki"].append(round(loki_lat, 4) if isinstance(loki_lat, (int, float)) else None) - ingest_labels = ["Doris", "ClickHouse", "Druid"] - ingest_dur = [doris_ingest.get("duration_s"), ch_ingest.get("duration_s"), druid_ingest.get("duration_s")] - ingest_rps = [doris_ingest.get("rows_per_sec"), ch_ingest.get("rows_per_sec"), druid_ingest.get("rows_per_sec")] + all_ingests = [("Doris", doris_ingest), ("ClickHouse", ch_ingest), ("Druid", druid_ingest)] if ob_ingest and ob_ingest.get("status") == "ok": - ingest_labels.append("OceanBase") - ingest_dur.append(ob_ingest.get("duration_s")) - ingest_rps.append(ob_ingest.get("rows_per_sec")) + all_ingests.append(("OceanBase", ob_ingest)) if loki_ingest and loki_ingest.get("status") == "ok": - ingest_labels.append("Loki") - ingest_dur.append(loki_ingest.get("duration_s")) - ingest_rps.append(loki_ingest.get("rows_per_sec")) + all_ingests.append(("Loki", loki_ingest)) + if vl_ingest and vl_ingest.get("status") == "ok": + all_ingests.append(("VictoriaLogs", vl_ingest)) + if vm_ingest and vm_ingest.get("status") == "ok": + all_ingests.append(("VictoriaMetrics", vm_ingest)) + if vt_ingest and vt_ingest.get("status") == "ok": + all_ingests.append(("VictoriaTraces", vt_ingest)) + ingest_labels = [n for n, _ in all_ingests] + ingest_dur = [i.get("duration_s") for _, i in all_ingests] + ingest_rps = [i.get("rows_per_sec") for _, i in all_ingests] ingest_chart = {"labels": ingest_labels, "duration_s": ingest_dur, "rows_per_sec": ingest_rps} - otlp_rows = "" - if otlp_ingest: - mech = otlp_ingest.get("mechanism", "OTLP") - dur = otlp_ingest.get("duration_s", "-") - for backend, key in [("Doris", "doris"), ("ClickHouse", "clickhouse")]: - d = otlp_ingest.get(key, {}) - r = d.get("rows", "-") - rps = d.get("rows_per_sec", "-") - otlp_rows += f' {backend}{mech}{dur}{r}{rps}\n' - data_vol_row = "" if data_vol: d_vol, c_vol, dr_vol = data_vol[0], data_vol[1], data_vol[2] - ob_vol = data_vol[3] if len(data_vol) > 3 else {} + ob_vol_d = data_vol[3] if len(data_vol) > 3 else {} + vl_vol_d = data_vol[4] if len(data_vol) > 4 else {} + vm_vol_d = data_vol[5] if len(data_vol) > 5 else {} + vt_vol_d = data_vol[6] if len(data_vol) > 6 else {} def _fmt(v: dict) -> str: if not v or "error" in v: return v.get("error", "-") if v else "-" - total = v.get("total", 0) - return f"{total:,} (logs={v.get('logs',0):,}, spans={v.get('spans',0):,}, metrics={v.get('metrics',0):,})" + if "total" in v: + total = v["total"] + return f"{total:,} (logs={v.get('logs',0):,}, spans={v.get('spans',0):,}, metrics={v.get('metrics',0):,})" + return f"{v.get('rows', 0):,}" def _lat(v: dict) -> str: lat = v.get("latency_s") if v else None return f"{lat:.3f}s" if isinstance(lat, (int, float)) else "-" - ob_row = f"OceanBase{_fmt(ob_vol)}{_lat(ob_vol)}" if ob_vol else "" + vol_rows = [ + ("Doris", d_vol), ("ClickHouse", c_vol), ("Druid", dr_vol), + ] + if ob_vol_d: + vol_rows.append(("OceanBase", ob_vol_d)) + if vl_vol_d: + vol_rows.append(("VictoriaLogs", vl_vol_d)) + if vm_vol_d: + vol_rows.append(("VictoriaMetrics", vm_vol_d)) + if vt_vol_d: + vol_rows.append(("VictoriaTraces", vt_vol_d)) + vol_html = "\n".join(f" {name}{_fmt(v)}{_lat(v)}" for name, v in vol_rows) data_vol_row = f"""

Data volume (full scan)

Total rows in telemetry tables at query time. Latency = full COUNT(*) time.

- - - - - {ob_row} + +{vol_html}
BackendTotal rows (logs, spans, metrics)Full-scan latency
Doris{_fmt(d_vol)}{_lat(d_vol)}
ClickHouse{_fmt(c_vol)}{_lat(c_vol)}
Druid{_fmt(dr_vol)}{_lat(dr_vol)}
BackendTotal rowsFull-scan latency
""" chart_json = json.dumps(chart_data) ingest_json = json.dumps(ingest_chart) + report_title = " vs ".join(_display.get(n, n) for n, _ in backends) + " benchmark" + ingest_rows_html = "\n".join( + f' {name}{ing.get("mechanism", "-")}{ing.get("duration_s", "-")}' + f'{ing.get("rows", "-")}{ing.get("rows_per_sec", "-")}' + for name, ing in all_ingests) + if otlp_ingest: + mech = otlp_ingest.get("mechanism", "OTLP") + dur = otlp_ingest.get("duration_s", "-") + otlp_count = otlp_ingest.get("count", 1000) + vm_mech = { + "victorialogs": f"OTLP ({otlp_count} logs)", + "victoriametrics": f"OTLP ({otlp_count} metrics)", + "victoriatraces": f"OTLP ({otlp_count} spans)", + } + for backend, key in [("Doris", "doris"), ("ClickHouse", "clickhouse"), + ("VictoriaLogs", "victorialogs"), ("VictoriaMetrics", "victoriametrics"), + ("VictoriaTraces", "victoriatraces")]: + d = otlp_ingest.get(key, {}) + if d and d.get("rows", 0) > 0: + row_mech = vm_mech.get(key, mech) + ingest_rows_html += f'\n {backend}{row_mech}{dur}{d.get("rows", "-")}{d.get("rows_per_sec", "-")}' + query_th = "".join(f'{_display.get(n,n)} lat{_display.get(n,n)} rows{_display.get(n,n)} err' for n, _ in backends) + colors = ['rgba(54,162,235,0.7)', 'rgba(75,192,192,0.7)', 'rgba(255,159,64,0.7)', + 'rgba(153,102,255,0.7)', 'rgba(255,99,132,0.7)', + 'rgba(0,200,83,0.7)', 'rgba(255,206,86,0.7)', 'rgba(231,76,60,0.7)'] + chart_datasets = ", ".join( + f'{{ label: "{_display.get(n,n)}", data: data["{n}"], backgroundColor: "{colors[i % len(colors)]}" }}' + for i, (n, _) in enumerate(backends)) html = f""" -Doris vs ClickHouse vs Druid benchmark +{report_title} -

Doris vs ClickHouse vs Druid benchmark

+

{report_title}

Generated at {datetime.now().isoformat()}

Ingestion comparison

- - - - {f'' if ob_ingest and ob_ingest.get('status') == 'ok' else ''} - {f'' if loki_ingest else ''} - {otlp_rows} +{ingest_rows_html}
BackendMechanismDuration (s)RowsRows/sec
Doris{doris_ingest.get('mechanism', '-')}{doris_ingest.get('duration_s', '-')}{doris_ingest.get('rows', '-')}{doris_ingest.get('rows_per_sec', '-')}
ClickHouse{ch_ingest.get('mechanism', '-')}{ch_ingest.get('duration_s', '-')}{ch_ingest.get('rows', '-')}{ch_ingest.get('rows_per_sec', '-')}
Druid{druid_ingest.get('mechanism', '-')}{druid_ingest.get('duration_s', '-')}{druid_ingest.get('rows', '-')}{druid_ingest.get('rows_per_sec', '-')}
OceanBase{ob_ingest.get("mechanism", "-")}{ob_ingest.get("duration_s", "-")}{ob_ingest.get("rows", "-")}{ob_ingest.get("rows_per_sec", "-")}
Loki{loki_ingest.get("mechanism", "-") if loki_ingest and loki_ingest.get("status") == "ok" else ("skipped" if loki_ingest and loki_ingest.get("status") == "skipped" else loki_ingest.get("error", "error"))}{loki_ingest.get("duration_s", "-") if loki_ingest and loki_ingest.get("status") == "ok" else "-"}{loki_ingest.get("rows", "-") if loki_ingest and loki_ingest.get("status") == "ok" else "-"}{loki_ingest.get("rows_per_sec", "-") if loki_ingest and loki_ingest.get("status") == "ok" else "-"}
-

Raw ingest: Doris {json.dumps(doris_ingest)} | CH {json.dumps(ch_ingest)} | Druid {json.dumps(druid_ingest)}

{data_vol_row}

Query latency (seconds)

Query comparison

- - - - {"" if ob_qres else ""} - {"" if loki_qres else ""} - + {query_th} {chr(10).join(rows)}
queryDoris latDoris rowsDoris errCH latCH rowsCH errDruid latDruid rowsDruid errOceanBase latOceanBase rowsOceanBase errLoki latLoki rowsLoki err% difffaster
query% difffaster