Skip to content
This repository was archived by the owner on Jun 3, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,24 @@ benchmarks/longmemeval/**/*.pyc
benchmarks/longmemeval/data/
benchmarks/longmemeval/results/
benchmarks/longmemeval/outputs/
!benchmarks/common/
!benchmarks/common/**
benchmarks/common/**/__pycache__/
benchmarks/common/**/*.pyc
!benchmarks/locomo/
!benchmarks/locomo/**
benchmarks/locomo/**/__pycache__/
benchmarks/locomo/**/*.pyc
benchmarks/locomo/data/
benchmarks/locomo/results/
benchmarks/locomo/outputs/
!benchmarks/beam/
!benchmarks/beam/**
benchmarks/beam/**/__pycache__/
benchmarks/beam/**/*.pyc
benchmarks/beam/data/
benchmarks/beam/results/
benchmarks/beam/outputs/
backboard/
rust/

Expand Down
6 changes: 4 additions & 2 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
This directory contains benchmark harnesses for XMem.

- `longmemeval/`: Python-only LongMemEval benchmark runner targeting the XMem HTTP API.
- `locomo/`: Python-only LoCoMo benchmark runner targeting the XMem HTTP API.
- `beam/`: Python-only BEAM runner, defaulting to the Hugging Face BEAM 1M split.

Benchmark runs can create large dataset and result artifacts. Keep those files under
`benchmarks/longmemeval/data`, `benchmarks/longmemeval/results`, or
`benchmarks/longmemeval/outputs`; those paths are intentionally ignored by git.
each benchmark's `data`, `results`, or `outputs` directory; those paths are
intentionally ignored by git.
53 changes: 53 additions & 0 deletions benchmarks/beam/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# BEAM 1M Benchmark for XMem Python

This harness benchmarks the Python XMem API on the BEAM dataset, defaulting to
the `1M` split from `Mohammadta/BEAM` on Hugging Face. It does not run or
compare the Go implementation.

BEAM rows contain long `chat` histories and stringified `probing_questions`.
The dataset card lists ten memory ability types: abstention, contradiction
resolution, event ordering, information extraction, instruction following,
knowledge update, multi-session reasoning, preference following, summarization,
and temporal reasoning.

## Dependencies

BEAM is distributed as parquet. Install `pyarrow` before reading the downloaded
dataset:

```bash
pip install pyarrow
```

## Smoke Check

```bash
python -m benchmarks.beam.run \
--split 1M \
--download \
--dry-run \
--limit 1
```

This downloads the BEAM 1M parquet file, validates that it parses, and counts
ingest items without calling XMem.

## Run Against XMem

```bash
export XMEM_API_KEY="..."

python -m benchmarks.beam.run \
--split 1M \
--dataset-path benchmarks/beam/data/1M-00000-of-00001.parquet \
--api-base-url https://api.xmem.in \
--output-dir benchmarks/beam/results/beam-1m
```

Outputs:

- `results.jsonl`: full per-question records with local proxy metrics
- `predictions.jsonl`: `question_id` and `hypothesis`
- `summary.json`: local exact/contains/token-F1 grouped by BEAM question type

Use BEAM's official evaluator (or an equivalent one) for publication-quality
accuracy; the local metrics above are only proxies.
1 change: 1 addition & 0 deletions benchmarks/beam/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""BEAM benchmark harness for the Python XMem API."""
63 changes: 63 additions & 0 deletions benchmarks/beam/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""Configuration for the BEAM benchmark harness."""

from __future__ import annotations

import os
from dataclasses import dataclass
from pathlib import Path


# Defaults shared by the BEAM CLI and BenchmarkConfig.
DEFAULT_API_BASE_URL = "https://api.xmem.in"
DEFAULT_API_KEY_ENV = "XMEM_API_KEY"
DEFAULT_SPLIT = "1M"

# Every split is a single parquet shard under the same Hugging Face root.
_BEAM_RESOLVE_ROOT = "https://huggingface.co/datasets/Mohammadta/BEAM/resolve/main/"

# Split name -> direct download URL of that split's parquet file.
DEFAULT_DATASET_URLS = {
    "100K": _BEAM_RESOLVE_ROOT + "data/100K-00000-of-00001.parquet",
    "500K": _BEAM_RESOLVE_ROOT + "data/500K-00000-of-00001.parquet",
    "1M": _BEAM_RESOLVE_ROOT + "data/1M-00000-of-00001.parquet",
}


@dataclass(frozen=True)
class BenchmarkConfig:
    """Immutable settings for one run of the BEAM benchmark harness.

    Only ``dataset_path`` and ``output_dir`` are required; every other field
    has a default. The API key itself is never stored on the config: it is
    looked up from the environment variable named by ``api_key_env`` each
    time ``api_key`` is read.
    """

    dataset_path: Path
    output_dir: Path
    api_base_url: str = DEFAULT_API_BASE_URL
    api_key_env: str = DEFAULT_API_KEY_ENV  # name of the env var, not the key
    api_timeout_seconds: float = 120.0
    max_retries: int = 3
    retry_backoff_seconds: float = 2.0
    batch_size: int = 25
    ingest_api_version: str = "v2"
    poll_interval_seconds: float = 2.0
    poll_timeout_seconds: float = 1800.0
    top_k: int = 10
    effort_level: str = "low"
    user_prefix: str = "beam"
    limit: int | None = None
    offset: int = 0
    question_type: str | None = None
    split: str = DEFAULT_SPLIT
    skip_ingest: bool = False
    resume: bool = True
    dry_run: bool = False

    @property
    def api_key(self) -> str:
        """Return the stripped value of the configured env var, or ``""``."""
        raw_value = os.environ.get(self.api_key_env, "")
        return raw_value.strip()

    def require_api_key(self) -> str:
        """Return the API key.

        Raises:
            RuntimeError: If the environment variable is unset or blank.
        """
        key = self.api_key
        if key:
            return key
        raise RuntimeError(
            f"Missing API key. Set {self.api_key_env} before running BEAM."
        )
223 changes: 223 additions & 0 deletions benchmarks/beam/dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
"""Dataset loading and normalization for BEAM."""

from __future__ import annotations

import ast
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable

from benchmarks.common.io import download_file, read_records

from .config import DEFAULT_DATASET_URLS


@dataclass(frozen=True)
class BeamTurn:
    """One message of a BEAM chat session."""

    role: str
    content: str
    time_anchor: str = ""  # empty when the source turn carries no time anchor
    message_id: str = ""  # empty when the source turn carries no id/index


@dataclass(frozen=True)
class BeamConversation:
    """A conversation id together with its sessions of turns."""

    conversation_id: str
    # Outer list: sessions; inner list: the turns of one session.
    chat_sessions: list[list[BeamTurn]] = field(default_factory=list)


@dataclass(frozen=True)
class BeamExample:
    """One probing question plus the chat history it is asked against."""

    question_id: str
    conversation_id: str
    question: str
    answer: str
    question_type: str = ""
    split: str = "1M"
    # Outer list: sessions; inner list: the turns of one session.
    chat_sessions: list[list[BeamTurn]] = field(default_factory=list)

    @property
    def user_id_suffix(self) -> str:
        """Return ``question_id`` reduced to alphanumerics, ``-`` and ``_``.

        Every other character becomes ``_``; leading and trailing underscores
        are then trimmed. If nothing is left, ``"example"`` is returned.
        """

        def _sanitize(ch: str) -> str:
            return ch if (ch.isalnum() or ch in {"-", "_"}) else "_"

        cleaned = "".join(map(_sanitize, self.question_id)).strip("_")
        return cleaned if cleaned else "example"


@dataclass(frozen=True)
class IngestItem:
    """One query/response pair prepared for ingestion on behalf of a user."""

    user_query: str
    agent_response: str
    user_id: str
    session_datetime: str = ""
    effort_level: str = "low"


def download_dataset(split: str, destination: Path) -> Path:
    """Download the parquet file of a known BEAM split to *destination*.

    Args:
        split: A key of ``DEFAULT_DATASET_URLS``.
        destination: Path handed to ``download_file`` as the target.

    Returns:
        Whatever ``download_file`` returns for the requested URL.

    Raises:
        ValueError: If *split* is not a known split name.
    """
    if split in DEFAULT_DATASET_URLS:
        source_url = DEFAULT_DATASET_URLS[split]
        return download_file(source_url, destination, timeout_seconds=300.0)

    known = ", ".join(sorted(DEFAULT_DATASET_URLS))
    raise ValueError(f"Unknown BEAM split '{split}'. Known splits: {known}")


def load_examples(path: Path, *, split: str = "1M") -> list[BeamExample]:
    """Read BEAM records from *path* and emit one example per probing question.

    Each record's ``chat`` is parsed into sessions once and shared by every
    example produced from that record. Question records without any question
    text are skipped.

    Args:
        path: Dataset file handed to ``read_records``.
        split: Split label copied onto every example.

    Returns:
        The examples in record order, then question order.
    """

    def _has_value(value: Any) -> bool:
        # A numeric 0 is a legitimate identifier; a plain ``or`` chain would
        # discard it. Everything else keeps ordinary truthiness semantics.
        return bool(value) or (
            isinstance(value, (int, float)) and not isinstance(value, bool)
        )

    examples: list[BeamExample] = []
    for conv_index, record in enumerate(read_records(path)):
        raw_conversation_id = record.get("conversation_id")
        if _has_value(raw_conversation_id):
            conversation_id = str(raw_conversation_id)
        else:
            conversation_id = f"conversation-{conv_index}"

        chat_sessions = _parse_chat(record.get("chat") or [])
        questions = _parse_probing_questions(record.get("probing_questions") or [])

        for q_index, question_record in enumerate(questions):
            question = _first_text(
                question_record,
                ("question", "query", "prompt", "user_question"),
            )
            if not question:
                continue
            answer = _first_text(
                question_record,
                ("answer", "gold_answer", "reference"),
            )
            question_type = _first_text(
                question_record,
                ("question_type", "type", "ability", "category"),
            )

            # Prefer an explicit id; otherwise synthesize one from the
            # conversation id and the question's position in the record.
            question_id = f"{conversation_id}-q-{q_index}"
            for id_key in ("question_id", "id"):
                candidate = question_record.get(id_key)
                if _has_value(candidate):
                    question_id = str(candidate)
                    break

            examples.append(
                BeamExample(
                    question_id=question_id,
                    conversation_id=conversation_id,
                    question=question,
                    answer=answer,
                    question_type=question_type or "unknown",
                    split=split,
                    chat_sessions=chat_sessions,
                )
            )
    return examples


def select_examples(
    examples: Iterable[BeamExample],
    *,
    offset: int = 0,
    limit: int | None = None,
    question_type: str | None = None,
) -> list[BeamExample]:
    """Filter by question type, then apply *offset* and *limit*.

    The type comparison is case-insensitive and is skipped when
    *question_type* is empty or ``None``. Slicing happens after filtering, so
    *offset* and *limit* count matching examples. A new list is always
    returned.
    """
    wanted = question_type.lower() if question_type else None
    matching = [
        candidate
        for candidate in examples
        if wanted is None or candidate.question_type.lower() == wanted
    ]
    window = matching[offset:] if offset else matching
    return window if limit is None else window[:limit]


def build_ingest_items(
    example: BeamExample,
    *,
    user_id: str,
    effort_level: str = "low",
) -> list[IngestItem]:
    """Turn an example's chat sessions into ingest items, one per turn.

    Each turn becomes the ``user_query`` of an item whose ``agent_response``
    is the following turn of the same session; the last turn of a session is
    paired with an empty response. ``session_datetime`` is taken from the
    query turn's ``time_anchor``.
    """
    items: list[IngestItem] = []
    for session in example.chat_sessions:
        # Pair every turn with its successor; the final turn has none.
        followers = [*session[1:], None]
        for turn, follower in zip(session, followers):
            response_text = _format_turn(follower) if follower else ""
            items.append(
                IngestItem(
                    user_query=_format_turn(turn),
                    agent_response=response_text,
                    user_id=user_id,
                    session_datetime=turn.time_anchor,
                    effort_level=effort_level,
                )
            )
    return items
Comment thread
greptile-apps[bot] marked this conversation as resolved.


def _parse_chat(raw_chat: Any) -> list[list[BeamTurn]]:
    """Normalize a raw ``chat`` value into a list of non-empty sessions.

    String input is first decoded with ``_coerce_literal``. A flat list whose
    first element is a dict is treated as a single session. Entries that are
    not lists, turns that fail to parse, and turns with empty content are
    dropped; sessions left without turns are omitted.
    """
    decoded = _coerce_literal(raw_chat)
    if not isinstance(decoded, list):
        return []

    # A flat list of turn dicts is one session; wrap it so the loop below
    # can treat both layouts the same way.
    is_flat = bool(decoded) and isinstance(decoded[0], dict)
    candidate_sessions = [decoded] if is_flat else decoded

    parsed_sessions: list[list[BeamTurn]] = []
    for candidate in candidate_sessions:
        if isinstance(candidate, list):
            kept = [
                turn for turn in map(_parse_turn, candidate) if turn and turn.content
            ]
            if kept:
                parsed_sessions.append(kept)
    return parsed_sessions


def _parse_turn(raw_turn: Any) -> BeamTurn | None:
    """Build a ``BeamTurn`` from a raw turn dict.

    Returns ``None`` when *raw_turn* is not a dict. ``role`` falls back to
    ``"message"``, ``content`` falls back from ``content`` to ``text`` and is
    stripped, and ``message_id`` comes from ``id`` or, failing that,
    ``index``.
    """
    if not isinstance(raw_turn, dict):
        return None

    # A plain ``or`` chain would discard a numeric id/index of 0, leaving the
    # turn without an identifier; only genuinely empty values are skipped.
    message_id = ""
    for key in ("id", "index"):
        candidate = raw_turn.get(key)
        is_number = isinstance(candidate, (int, float)) and not isinstance(
            candidate, bool
        )
        if candidate or is_number:
            message_id = str(candidate)
            break

    return BeamTurn(
        role=str(raw_turn.get("role") or "message"),
        content=str(raw_turn.get("content") or raw_turn.get("text") or "").strip(),
        time_anchor=str(raw_turn.get("time_anchor") or ""),
        message_id=message_id,
    )


def _parse_probing_questions(raw_questions: Any) -> list[dict[str, Any]]:
    """Normalize a raw ``probing_questions`` value into a list of dicts.

    String input is first decoded with ``_coerce_literal``. Accepted shapes:

    * a list: its dict elements are returned;
    * a dict: each dict value is returned as one question, and each list or
      tuple value contributes its dict elements.

    Anything else yields ``[]``.
    """
    parsed = _coerce_literal(raw_questions)
    if isinstance(parsed, list):
        return [item for item in parsed if isinstance(item, dict)]
    if not isinstance(parsed, dict):
        return []

    # Keys that already carry a question type on a question record.
    type_keys = ("question_type", "type", "ability", "category")

    questions: list[dict[str, Any]] = []
    for group_key, group_value in parsed.items():
        if isinstance(group_value, dict):
            questions.append(group_value)
            continue
        if not isinstance(group_value, (list, tuple)):
            continue
        # Grouped layout: without this branch every question stored in a list
        # under a key would be silently dropped.
        # NOTE(review): presumably the group key is the BEAM ability name;
        # confirm against the dataset card.
        for item in group_value:
            if not isinstance(item, dict):
                continue
            if any(item.get(key) is not None for key in type_keys):
                questions.append(item)
            else:
                # Copy so the decoded input is not mutated.
                questions.append({**item, "question_type": str(group_key)})
    return questions


def _coerce_literal(value: Any) -> Any:
if not isinstance(value, str):
return value
text = value.strip()
if not text:
return []
try:
return ast.literal_eval(text)
except (SyntaxError, ValueError):
try:
return json.loads(text)
except json.JSONDecodeError:
return value
Comment thread
ved015 marked this conversation as resolved.


def _first_text(record: dict[str, Any], keys: tuple[str, ...]) -> str:
for key in keys:
value = record.get(key)
if value is not None:
return str(value).strip()
return ""


def _format_turn(turn: BeamTurn | None) -> str:
if turn is None:
return ""
prefix = turn.role
if turn.message_id:
prefix = f"{prefix} ({turn.message_id})"
if turn.time_anchor:
prefix = f"{prefix} [{turn.time_anchor}]"
return f"{prefix}: {turn.content}"
Loading
Loading