diff --git a/README.md b/README.md index 88456dc..5d4416b 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,7 @@ OHLCV data is hosted on Google Drive. Use the `portbench` CLI to download it: ```bash pip install gdown portbench download-data --exchange portfoliobench # crypto + US stocks + global indices -portbench download-data --exchange polymarket # Polymarket prediction-market contracts +portbench download-data --exchange polymarket # Polymarket prediction-market contracts, orderbooks and markets metadata ``` This downloads feather files into the data directory (`user_data/data/portfoliobench/` or `user_data/data/polymarket/`). @@ -264,6 +264,42 @@ PortfolioBench supports backtesting on **Polymarket**, a decentralized predictio bash utils/backtest_polymarket.bash ``` +### Orderbook Dataset Pipeline + +To build a feature-enriched orderbook dataset from raw Polymarket data, use the pipeline script in `dataset/polymarket_orderbook/`: + +```bash +# Run the full pipeline with defaults (2025-10-14 → 2026-03-31, top 70 markets) +python dataset/polymarket_orderbook/run_pipeline.py +``` + +The pipeline runs three stages in sequence: + +| Stage | Script | Output | +|-------|--------|--------| +| 1. Parse | `parser.py` | `markets.parquet`, `tokens.parquet`, `filtered_token_ids.parquet` | +| 2. Fetch | `fetch_orderbook.py` | `raw_orderbook/ob_{token_id}.parquet` per token | +| 3. Features | `orderbook_feature_generation.py` | `feat_orderbook/feat_{token_id}.parquet` per token | + +The pipeline is resumable — already-fetched tokens are skipped automatically. 
+ +**Common options:** + +```bash +# Custom date range +python dataset/polymarket_orderbook/run_pipeline.py --start-date 2025-01-01 --end-date 2025-06-30 + +# Skip stages already completed +python dataset/polymarket_orderbook/run_pipeline.py --skip-parse +python dataset/polymarket_orderbook/run_pipeline.py --skip-parse --skip-fetch + +# Re-fetch all orderbooks even if already saved +python dataset/polymarket_orderbook/run_pipeline.py --skip-parse --force + +# Filter by market count or end date +python dataset/polymarket_orderbook/run_pipeline.py --top-markets 100 --min-end-date 2025-06-01 +``` + --- ## Hyperparameter Optimization @@ -330,7 +366,7 @@ PortfolioBench/ ├── benchmark_all.py # Full benchmark matrix runner ├── cli.py # CLI entry point ├── generate_report.py # Report generation utilities -├── dataset/ # Data management module (placeholder) +├── dataset/polymarket_orderbook/ # Orderbook dataset pipeline (parse → fetch → features) ├── tests/ # Unit and integration tests ├── user_data/data/usstock/ # 357 OHLCV feather files (download from Google Drive) └── utils/ # Bash helpers for backtesting and data generation diff --git a/alpha/OrderbookAlpha.py b/alpha/OrderbookAlpha.py new file mode 100644 index 0000000..e1c08da --- /dev/null +++ b/alpha/OrderbookAlpha.py @@ -0,0 +1,103 @@ +"""Orderbook-derived alpha for Polymarket contracts. 
_DATA_DIR = Path(__file__).resolve().parents[1] / "user_data" / "data" / "polymarket"
_FEATURE_DIR = _DATA_DIR / "feat_orderbook"

# Outcome marker at the very end of a pair base, optionally followed by a
# date stamp, e.g. "SomePairYES20250430".
_OUTCOME_SUFFIX_RE = re.compile(r"(YES|NO)\d*$")

# Cached once per process: pair_base → token_id
_pair_token_map: dict[str, str] | None = None


def _outcome_from_pair_base(pair_base: str) -> str | None:
    """Return "yes"/"no" for a pair base name, or None when undetermined.

    The trailing marker is tried first so that names which merely contain
    the letters (e.g. "YesterdayRallyNO20250430") are not misclassified.
    The original substring heuristic is kept as a fallback for names that
    do not end with the marker.
    """
    suffix = _OUTCOME_SUFFIX_RE.search(pair_base)
    if suffix:
        return suffix.group(1).lower()

    upper = pair_base.upper()
    if "YES" in upper:
        return "yes"
    if "NO" in upper:
        return "no"
    return None


def _build_pair_token_map() -> dict[str, str]:
    """Build the pair_base → token_id lookup from the on-disk mapping files.

    Chain: freqtrade_pair_mapping.csv (condition id, file name)
    → markets.parquet (condition id → market id)
    → tokens.parquet ((market id, outcome) → token id).
    Rows that cannot be resolved at any step are skipped.
    """
    mapping = pd.read_csv(_DATA_DIR / "freqtrade_pair_mapping.csv")
    markets = pd.read_parquet(_DATA_DIR / "markets.parquet", columns=["market_id", "condition_id"])
    tokens = pd.read_parquet(_DATA_DIR / "tokens.parquet", columns=["token_id", "market_id", "outcome"])

    cond_to_market = dict(zip(markets["condition_id"].str.lower(), markets["market_id"].astype(str)))
    # Tokens can carry a null outcome; those cannot be matched to YES/NO and
    # would crash on .strip(), so they are left out of the index.
    tok_idx = {
        (str(r.market_id), r.outcome.strip().lower()): str(r.token_id)
        for r in tokens.itertuples(index=False)
        if isinstance(r.outcome, str)
    }

    result: dict[str, str] = {}
    for cond_raw, filename in zip(mapping["Original_Condition_ID"], mapping["New_Filename"]):
        market_id = cond_to_market.get(str(cond_raw).lower())
        if not market_id:
            continue

        # "SomePairYES20250430_USDC-4h.feather" → pair_base = "SomePairYES20250430"
        stem = re.sub(r"-\d+[mhd]$", "", str(filename).removesuffix(".feather"))
        if "_" not in stem:
            continue
        pair_base = stem.rsplit("_", 1)[0]

        outcome_key = _outcome_from_pair_base(pair_base)
        if not outcome_key:
            continue

        token_id = tok_idx.get((market_id, outcome_key))
        if token_id:
            result[pair_base] = token_id

    return result


def _lookup_token_id(pair: str) -> str:
    """Return the token id for a pair such as "BASE/QUOTE", or "" if unknown.

    The mapping is built lazily on first use and cached for the process.
    """
    global _pair_token_map
    if _pair_token_map is None:
        _pair_token_map = _build_pair_token_map()
    return _pair_token_map.get(pair.split("/")[0], "")


class OrderbookAlpha(IAlpha):
    """Attach orderbook-imbalance columns to an OHLCV candle dataframe.

    Each candle receives the most recent orderbook snapshot at or before its
    timestamp (an as-of join, i.e. the last known snapshot is carried
    forward). Two columns are added to the dataframe:

        ob_imbalance     : value of the ``imbalance_3`` feature, 0.0 when no
                           snapshot precedes the candle or the value is NaN
        ob_imbalance_ema : EMA of ``ob_imbalance`` with span ``ema_span``

    When no token id or feature file can be resolved, both columns are NaN.
    """

    def __init__(self, dataframe: pd.DataFrame, metadata: dict = None, ema_span: int = 8):
        # Set before delegating to the base initializer, as in the original.
        self.ema_span = ema_span
        super().__init__(dataframe, metadata)

    def process(self) -> pd.DataFrame:
        """Add ``ob_imbalance`` and ``ob_imbalance_ema`` and return the frame."""
        df = self.dataframe
        # metadata defaults to None in __init__; treat that as "no hints".
        meta = self.metadata or {}
        token_id = meta.get("token_id") or _lookup_token_id(meta.get("pair") or "")

        feat_path = _FEATURE_DIR / f"feat_{token_id}.parquet"
        if not token_id or not feat_path.exists():
            df["ob_imbalance"] = np.nan
            df["ob_imbalance_ema"] = np.nan
            return df

        feat = pd.read_parquet(feat_path, columns=["snapshot_time", "imbalance_3"])
        feat["snapshot_time"] = (
            pd.to_datetime(feat["snapshot_time"]).dt.tz_localize(None).astype("datetime64[ns]")
        )
        # merge_asof rejects null keys on the right side, so unparseable
        # snapshot times are dropped before sorting.
        feat = feat.dropna(subset=["snapshot_time"]).sort_values("snapshot_time").reset_index(drop=True)

        candle_dates = pd.to_datetime(df["date"]).dt.tz_localize(None)
        # merge_asof needs sorted keys; remember the original row positions so
        # the result can be put back in the dataframe's own order.
        order = np.argsort(candle_dates.values)
        left = pd.DataFrame({"date": candle_dates.iloc[order].values, "_idx": order})
        # Both key columns must share one dtype for merge_asof.
        left["date"] = left["date"].astype("datetime64[ns]")
        merged = (
            pd.merge_asof(left, feat.rename(columns={"snapshot_time": "date"}), on="date", direction="backward")
            .sort_values("_idx")
            .reset_index(drop=True)
        )

        imb = merged["imbalance_3"].fillna(0.0)
        df["ob_imbalance"] = imb.values
        df["ob_imbalance_ema"] = imb.ewm(span=self.ema_span, adjust=False).mean().values
        return df
def parse_args():
    """Parse command-line options for the orderbook fetch stage."""
    parser = argparse.ArgumentParser(
        description="Fetch Polymarket orderbook snapshots from DomeAPI for filtered token IDs."
    )
    parser.add_argument("--filtered-tokens-path", type=Path, default=FILTERED_TOKEN_IDS)
    parser.add_argument("--raw-orderbook-dir", type=Path, default=ORDERBOOK_DIR)
    parser.add_argument("--start-date", type=str, default="2025-10-14")
    parser.add_argument("--end-date", type=str, default="2026-03-31")
    parser.add_argument("--max-concurrent", type=int, default=3)
    parser.add_argument(
        "--batch-size",
        type=int,
        default=10,
        help="Number of tokens to process per batch before freeing memory (default: 10).",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Re-fetch tokens that already have a saved parquet file.",
    )
    return parser.parse_args()


def main():
    """Fetch raw orderbook snapshots for every filtered token not yet on disk.

    A token counts as already fetched when ``ob_<token_id>.parquet`` exists
    in the raw orderbook directory; ``--force`` disables that check.
    """
    args = parse_args()
    ensure_dirs()
    args.raw_orderbook_dir.mkdir(parents=True, exist_ok=True)

    # Token IDs come from the filtered parquet written by the parse stage (parser.py).
    token_ids_df = pd.read_parquet(args.filtered_tokens_path)
    all_token_ids = token_ids_df["token_id"].astype(str).tolist()
    print(f"Loaded {len(all_token_ids)} token IDs from {args.filtered_tokens_path}")

    # Resume: skip tokens whose output file already exists
    if args.force:
        token_ids = all_token_ids
    else:
        already_done = {
            p.stem.removeprefix("ob_") for p in args.raw_orderbook_dir.glob("ob_*.parquet")
        }
        token_ids = [t for t in all_token_ids if t not in already_done]
        # Count only requested tokens that were skipped; the directory may
        # also hold files for tokens outside the current filtered list.
        skipped = len(all_token_ids) - len(token_ids)
        if skipped:
            print(
                f"Skipping {skipped} already-fetched token(s). "
                f"{len(token_ids)} remaining. Use --force to re-fetch all."
            )

    if not token_ids:
        print("Nothing to fetch.")
        return

    print(f"Fetching orderbook for {len(token_ids)} token(s) ({args.start_date} → {args.end_date})")

    fetch_orderbook_from_ids_async(
        token_ids=token_ids,
        start_date=args.start_date,
        end_date=args.end_date,
        output_path=args.raw_orderbook_dir,
        max_concurrent=args.max_concurrent,
        batch_size=args.batch_size,
    )


if __name__ == "__main__":
    main()
def parse_args():
    """Parse command-line options for the feature-generation stage."""
    parser = argparse.ArgumentParser(description="Build per-token orderbook feature parquet files.")
    parser.add_argument("--tokens-path", type=Path, default=DATA_DIR / "tokens.parquet")
    parser.add_argument("--filtered-tokens-path", type=Path, default=FILTERED_TOKEN_IDS)
    parser.add_argument("--raw-orderbook-dir", type=Path, default=ORDERBOOK_DIR)
    parser.add_argument("--feat-orderbook-dir", type=Path, default=FEATURE_DIR)
    parser.add_argument("--depth-n", type=int, default=3)
    # store_true with default=True could never be switched off; the optional
    # action keeps --drop-json-cols working and adds --no-drop-json-cols.
    parser.add_argument(
        "--drop-json-cols",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Drop the raw bids_json/asks_json columns from the output (default: enabled).",
    )
    parser.add_argument(
        "--process-all-files",
        action="store_true",
        help="Ignore filtered token list and process all ob_*.parquet files.",
    )
    return parser.parse_args()


def main():
    """Build one feature parquet per raw orderbook file.

    Files are processed independently: a failure is reported and counted,
    and processing continues with the next file.
    """
    args = parse_args()
    ensure_dirs()
    args.feat_orderbook_dir.mkdir(parents=True, exist_ok=True)

    tokens_df = pd.read_parquet(args.tokens_path)

    if args.process_all_files:
        # The filtered list is ignored entirely, so metadata must stay
        # available for every token that has a raw file.
        relevant_token_ids = None
        print("Processing all ob_*.parquet files; filtered token list ignored")
    else:
        # Load filtered token IDs
        relevant_token_ids = set(pd.read_parquet(args.filtered_tokens_path)["token_id"].astype(str))
        print(f"Loaded {len(relevant_token_ids)} filtered token IDs")
        # Restrict token metadata to the filtered set
        tokens_df = tokens_df[tokens_df["token_id"].astype(str).isin(relevant_token_ids)]
    print(f"Token metadata rows after filter: {len(tokens_df):,}")

    files = sorted(args.raw_orderbook_dir.glob("ob_*.parquet"))
    if not files:
        print(f"No ob_*.parquet files found in {args.raw_orderbook_dir}")
        return

    if relevant_token_ids is not None:
        files = [p for p in files if p.stem.removeprefix("ob_") in relevant_token_ids]

    print(f"Files to process: {len(files)}")

    success = 0
    failed = 0

    for i, file_path in enumerate(files, start=1):
        token_stub = file_path.stem.removeprefix("ob_")
        output_path = args.feat_orderbook_dir / f"feat_{token_stub}.parquet"

        print(f"[{i}/{len(files)}] {file_path.name}")
        try:
            feat_df = build_token_feature_table_from_parquet(
                input_path=file_path,
                output_path=output_path,
                depth_n=args.depth_n,
                drop_json_cols=args.drop_json_cols,
                token_meta_df=tokens_df,
            )
        except Exception as e:
            # Deliberate best effort: one bad file must not stop the batch.
            print(f"  Failed: {e}")
            failed += 1
        else:
            print(f"  Saved {len(feat_df):,} rows -> {output_path.name}")
            success += 1

    print(f"Done. Success: {success}, Failed: {failed}")


if __name__ == "__main__":
    main()
def parse_args():
    """Parse command-line options for the JSONL parse stage."""
    parser = argparse.ArgumentParser(
        description="Parse polymarket JSONL into markets, tokens, and filtered token parquet files."
    )
    parser.add_argument("--market-jsonl", type=Path, default=MARKET_JSONL)
    parser.add_argument("--markets-path", type=Path, default=DATA_DIR / "markets.parquet")
    parser.add_argument("--tokens-path", type=Path, default=DATA_DIR / "tokens.parquet")
    parser.add_argument("--filtered-tokens-path", type=Path, default=FILTERED_TOKEN_IDS)
    parser.add_argument("--min-end-date", type=str, default="2026-01-01")
    parser.add_argument("--top-markets", type=int, default=70)
    args = parser.parse_args()
    # DataFrame.head(-n) means "all but the last n", which is never what a
    # "top N markets" option should do.
    if args.top_markets < 0:
        parser.error("--top-markets must be >= 0")
    return args


def main():
    """Write markets, tokens and the filtered token-id list as parquet files.

    Selected markets are those with question, market_id, end_date and
    closed_time present and an end date after ``--min-end-date``; the
    ``--top-markets`` highest-volume ones are kept, and every token of
    those markets goes into the filtered list.
    """
    args = parse_args()
    ensure_dirs()
    # ensure_dirs() is called without the custom output paths, so create
    # their parent directories explicitly.
    for out_path in (args.markets_path, args.tokens_path, args.filtered_tokens_path):
        out_path.parent.mkdir(parents=True, exist_ok=True)

    # 1) Markets table
    markets_df = read_market_to_df(args.market_jsonl)
    print("markets_df shape:", markets_df.shape)
    markets_df.to_parquet(args.markets_path, index=False)
    print(f"Saved: {args.markets_path}")

    # 2) Select relevant markets
    working = markets_df.dropna(subset=["question", "market_id", "end_date", "closed_time"]).copy()
    working["end_date_ts"] = pd.to_datetime(working["end_date"], errors="coerce", utc=True)
    min_end_ts = pd.Timestamp(args.min_end_date, tz="UTC")

    # Unparseable end dates become NaT and compare False, so they drop out here.
    ends_late_enough = working["end_date_ts"] > min_end_ts
    markets_filtered_df = (
        working[ends_late_enough]
        .sort_values("volume", ascending=False)
        .head(args.top_markets)
    )
    relevant_market_ids = set(markets_filtered_df["market_id"].astype(str))
    print(f"Selected markets: {len(relevant_market_ids)}")

    # 3) Tokens table
    tokens_df = read_token_to_df(args.market_jsonl)
    print("tokens_df shape:", tokens_df.shape)
    tokens_df.to_parquet(args.tokens_path, index=False)
    print(f"Saved: {args.tokens_path}")

    # 4) Filter token IDs by selected markets
    in_selected_market = tokens_df["market_id"].astype(str).isin(relevant_market_ids)
    relevant_token_ids = sorted(set(tokens_df.loc[in_selected_market, "token_id"].astype(str)))

    pd.DataFrame({"token_id": relevant_token_ids}).to_parquet(args.filtered_tokens_path, index=False)
    print(f"Saved token ID list ({len(relevant_token_ids)}): {args.filtered_tokens_path}")


if __name__ == "__main__":
    main()
orderbook_feature_generation.py — build per-token feature parquet files + +Run with defaults: + python dataset/polymarket_orderbook/run_pipeline.py + +Skip a stage you've already completed: + python dataset/polymarket_orderbook/run_pipeline.py --skip-parse --skip-fetch +""" + +import argparse +import subprocess +import sys +from pathlib import Path + +DATASET_DIR = Path(__file__).resolve().parent +PYTHON = sys.executable + + +def run(cmd: list[str], label: str) -> None: + print(f"\n{'='*60}") + print(f"STAGE: {label}") + print(f"{'='*60}") + result = subprocess.run(cmd) + if result.returncode != 0: + print(f"\nERROR: stage '{label}' failed (exit {result.returncode}). Aborting.") + sys.exit(result.returncode) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Run the full Polymarket dataset pipeline end-to-end." + ) + + # Stage skips + parser.add_argument("--skip-parse", action="store_true", help="Skip parser.py") + parser.add_argument("--skip-fetch", action="store_true", help="Skip fetch_orderbook.py") + parser.add_argument("--skip-features", action="store_true", help="Skip orderbook_feature_generation.py") + + # Parser args + parser.add_argument("--market-jsonl", type=str, help="Path to polymarket_markets_1y.jsonl") + parser.add_argument("--min-end-date", type=str, default="2026-01-01") + parser.add_argument("--top-markets", type=int, default=70) + + # Fetch args + parser.add_argument("--start-date", type=str, default="2025-10-14") + parser.add_argument("--end-date", type=str, default="2026-03-31") + parser.add_argument("--max-concurrent", type=int, default=3) + parser.add_argument("--batch-size", type=int, default=10) + parser.add_argument("--force", action="store_true", help="Re-fetch already-saved orderbook files") + + # Feature pipeline args + parser.add_argument("--depth-n", type=int, default=3) + parser.add_argument("--process-all-files", action="store_true") + + return parser.parse_args() + + +def main() -> None: 
+ args = parse_args() + + # --- Stage 1: Parse --- + if not args.skip_parse: + cmd = [PYTHON, str(DATASET_DIR / "parser.py"), + "--min-end-date", args.min_end_date, + "--top-markets", str(args.top_markets)] + if args.market_jsonl: + cmd += ["--market-jsonl", args.market_jsonl] + run(cmd, "parser.py") + + # --- Stage 2: Fetch orderbooks --- + if not args.skip_fetch: + cmd = [PYTHON, str(DATASET_DIR / "fetch_orderbook.py"), + "--start-date", args.start_date, + "--end-date", args.end_date, + "--max-concurrent", str(args.max_concurrent), + "--batch-size", str(args.batch_size)] + if args.force: + cmd.append("--force") + run(cmd, "fetch_orderbook.py") + + # --- Stage 3: Build features --- + if not args.skip_features: + cmd = [PYTHON, str(DATASET_DIR / "orderbook_feature_generation.py"), + "--depth-n", str(args.depth_n)] + if args.process_all_files: + cmd.append("--process-all-files") + run(cmd, "orderbook_feature_generation.py") + + print("\nPipeline complete.") + + +if __name__ == "__main__": + main() diff --git a/dataset/polymarket_orderbook/utils/__init__.py b/dataset/polymarket_orderbook/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dataset/polymarket_orderbook/utils/fetch_orderbook.py b/dataset/polymarket_orderbook/utils/fetch_orderbook.py new file mode 100644 index 0000000..568fcb8 --- /dev/null +++ b/dataset/polymarket_orderbook/utils/fetch_orderbook.py @@ -0,0 +1,156 @@ +import json +import random +import time +import numpy as np +import pandas as pd +import requests +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path + +API_KEY = "beb79777a5762ef81b41fbaae1dbb75d23fcee28" +DOME_HEADERS = { + "Authorization": f"Bearer {API_KEY}", + "x-api-key": API_KEY, + "Accept-Encoding": "identity", +} + +DOME_URL = "https://api.domeapi.io/v1/polymarket/orderbooks" +RETRYABLE_STATUS = {429, 500, 502, 503, 504} + + +def fetch_orderbook_from_ids_async( + token_ids, + start_date, + end_date, + output_path, + 
def fetch_orderbook_from_ids_async(
    token_ids,
    start_date,
    end_date,
    output_path,
    max_concurrent=3,
    max_retries=6,
    batch_size=10,
):
    """Fetch orderbook snapshots for a list of token IDs and save as parquet files.

    Despite the name, concurrency comes from a thread pool, not asyncio.
    Tokens are processed in batches of ``batch_size``; within a batch up to
    ``max_concurrent`` tokens are fetched in parallel. Each token with at
    least one snapshot is written to ``output_path / "ob_<token_id>.parquet"``.
    A token whose fetch fails is reported and skipped (no file is written),
    so a later run can pick it up again.

    Args:
        token_ids: Iterable of token identifiers.
        start_date: Start of the time window (anything ``pd.Timestamp``
            accepts; naive values are taken as UTC). Values before the
            Dome history start (2025-10-14 UTC) are moved up to it.
        end_date: End of the time window, same conventions as ``start_date``.
        output_path: Directory that receives the parquet files.
        max_concurrent: Worker threads per batch (>= 1).
        max_retries: Attempts per page request (>= 1).
        batch_size: Tokens per batch (>= 1).

    Raises:
        ValueError: If a date is missing/invalid, the window is reversed, or
            one of the numeric limits is below 1.
    """
    import os  # local import: only needed for the optional API-key override

    output_path = Path(output_path)
    token_ids = list(token_ids)

    # max_retries < 1 would leave the page payload unassigned, and
    # batch_size < 1 would break the batching range below.
    if max_concurrent < 1 or max_retries < 1 or batch_size < 1:
        raise ValueError("max_concurrent, max_retries and batch_size must all be >= 1")

    dome_history_start = pd.Timestamp("2025-10-14", tz="UTC")

    def _to_utc_timestamp(value):
        if value is None or pd.isna(value):
            return None
        ts = pd.Timestamp(value)
        if pd.isna(ts):
            return None
        return ts.tz_localize("UTC") if ts.tzinfo is None else ts.tz_convert("UTC")

    start_ts = _to_utc_timestamp(start_date)
    end_ts = _to_utc_timestamp(end_date)

    if start_ts is None or end_ts is None:
        raise ValueError("start_date and end_date must be valid timestamps")

    if start_ts < dome_history_start:
        print(f"Warning: start_date {start_ts} is before Dome history start {dome_history_start}. Adjusting.")
        start_ts = dome_history_start

    if end_ts < start_ts:
        raise ValueError(f"end_date {end_ts} is before start_date {start_ts}")

    # Timestamp.value is nanoseconds since the epoch; the API takes milliseconds.
    start_ms = int(start_ts.value // 1_000_000)
    end_ms = int(end_ts.value // 1_000_000)

    def _request_headers():
        # SECURITY NOTE(review): the module-level API key is committed in
        # source control and should be rotated and removed. Until then a
        # DOME_API_KEY environment variable, when set, takes precedence.
        key = os.environ.get("DOME_API_KEY")
        if not key:
            return DOME_HEADERS
        return {**DOME_HEADERS, "Authorization": f"Bearer {key}", "x-api-key": key}

    def _fetch_page(session, params, token_id):
        """Return the decoded JSON payload for one page, retrying transient failures."""
        for attempt in range(1, max_retries + 1):
            try:
                resp = session.get(DOME_URL, params=params, headers=_request_headers(), timeout=60)
            except requests.RequestException as exc:
                error = exc  # transport problem (timeout, connection reset, ...): retry
            else:
                status = resp.status_code
                if status in RETRYABLE_STATUS:
                    error = requests.HTTPError(f"retryable status={status}", response=resp)
                elif status >= 400:
                    # Permanent client/server errors (401, 404, ...) will not
                    # improve with backoff, so fail this token right away.
                    raise RuntimeError(f"Failed token_id={token_id}: non-retryable status={status}")
                else:
                    try:
                        return resp.json()
                    except ValueError as exc:
                        error = exc  # malformed body: retried, as before

            if attempt == max_retries:
                raise RuntimeError(
                    f"Failed token_id={token_id} after {max_retries} retries: {error}"
                ) from error
            backoff = min(30, (2 ** attempt) + random.random())
            print(f"Retry {attempt}/{max_retries} token_id={token_id} in {backoff:.1f}s ({error})")
            time.sleep(backoff)

    def _fetch_all_snapshots(session, token_id):
        params = {
            "token_id": str(token_id),
            "start_time": int(start_ms),
            "end_time": int(end_ms),
            "limit": 200,
        }

        all_snaps = []

        while True:
            payload = _fetch_page(session, params, token_id)
            all_snaps.extend(payload.get("snapshots", []))

            pagination = payload.get("pagination", {}) or {}
            has_more = pagination.get("has_more", False)
            pagination_key = pagination.get("pagination_key") or pagination.get("paginationKey")

            if not has_more or not pagination_key:
                break
            # A server that keeps handing back the same key would loop forever.
            if pagination_key == params.get("pagination_key"):
                break

            params["pagination_key"] = pagination_key
            time.sleep(0.05)

        return all_snaps

    def _snapshot_to_row(token_id, snap):
        tick_size = snap.get("tickSize")
        min_order_size = snap.get("minOrderSize")
        return {
            "token_id": token_id,
            "snapshot_time": pd.to_datetime(snap.get("timestamp"), unit="ms", utc=True, errors="coerce"),
            "snapshot_timestamp_ms": snap.get("timestamp"),
            "indexed_at_time": pd.to_datetime(snap.get("indexedAt"), unit="ms", utc=True, errors="coerce"),
            "indexed_at_ms": snap.get("indexedAt"),
            "market_hash": snap.get("market"),
            "asset_id": snap.get("assetId"),
            "tick_size": float(tick_size) if tick_size is not None else np.nan,
            "min_order_size": float(min_order_size) if min_order_size is not None else np.nan,
            "orderbook_neg_risk": snap.get("negRisk"),
            "bids_json": json.dumps(snap.get("bids", []) or []),
            "asks_json": json.dumps(snap.get("asks", []) or []),
        }

    def _save_orderbook_of_token(i, token_id):
        # Each thread gets its own requests.Session for connection isolation
        with requests.Session() as session:
            print(f"Processing token_id={token_id}, {i}/{len(token_ids)}")
            try:
                snapshots = _fetch_all_snapshots(session, token_id)
            except Exception as e:
                # Best effort per token: report and move on to the next one.
                print(f"Skipping token_id={token_id}: {e}")
                return 0

        if not snapshots:
            return 0

        # "or 0" also covers explicit nulls, which cannot be compared with ints.
        snapshots = sorted(
            snapshots,
            key=lambda s: (s.get("timestamp") or 0, s.get("indexedAt") or 0),
        )

        rows = [_snapshot_to_row(token_id, snap) for snap in snapshots]
        token_df = pd.DataFrame(rows).sort_values(["snapshot_time"], kind="stable").reset_index(drop=True)
        token_df.to_parquet(output_path / f"ob_{token_id}.parquet", index=False)
        n = len(token_df)
        print(f"Saved {n:,} rows for token_id={token_id}")
        return n

    all_counts = []
    for batch_start in range(0, len(token_ids), batch_size):
        batch = list(enumerate(token_ids[batch_start : batch_start + batch_size], start=batch_start + 1))
        with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
            futures = {pool.submit(_save_orderbook_of_token, i, tid): tid for i, tid in batch}
            for fut in as_completed(futures):
                try:
                    all_counts.append(fut.result())
                except Exception as e:
                    print(f"Unexpected error for token_id={futures[fut]}: {e}")
                    all_counts.append(0)
        print(f"Batch done: {batch_start + len(batch)}/{len(token_ids)} tokens processed.")

    print(f"Done processing all tokens. Total rows: {sum(all_counts):,}")
# Keys returned by extract_snapshot_features, in output order.
# NOTE(review): the "_3" suffix is fixed even when depth_n differs from 3;
# the names are kept because consumers of the feature files read
# "imbalance_3" by name.
_SNAPSHOT_FEATURE_COLUMNS = (
    "best_bid",
    "best_bid_size",
    "best_ask",
    "best_ask_size",
    "mid_price",
    "spread",
    "bid_depth_3",
    "ask_depth_3",
    "imbalance_3",
    "bid_depth_total",
    "ask_depth_total",
    "imbalance_total",
    "n_bid_levels",
    "n_ask_levels",
)


def parse_orderbook_side(side_json):
    """
    Parse a bids_json / asks_json value into a list of dicts:
        [{"price": float, "size": float}, ...]

    Accepts a JSON string or an already-decoded list. Anything else
    (None, NaN, undecodable text, JSON that is not a list) yields [].
    Levels lacking a numeric price or size are skipped.
    """
    if side_json is None:
        return []

    if isinstance(side_json, float) and np.isnan(side_json):
        return []

    if isinstance(side_json, str):
        try:
            levels = json.loads(side_json)
        except ValueError:
            return []
    elif isinstance(side_json, list):
        levels = side_json
    else:
        return []

    # Valid JSON need not be a list ("null", "5", "{}"): nothing to iterate.
    if not isinstance(levels, list):
        return []

    out = []
    for level in levels:
        try:
            out.append({
                "price": float(level["price"]),
                "size": float(level["size"]),
            })
        except (KeyError, TypeError, ValueError, OverflowError):
            continue

    return out


def get_best_bid(levels):
    """Return (price, size) of the highest-priced level, or (nan, 0.0) if empty."""
    if not levels:
        return np.nan, 0.0
    best = max(levels, key=lambda x: x["price"])
    return best["price"], best["size"]


def get_best_ask(levels):
    """Return (price, size) of the lowest-priced level, or (nan, 0.0) if empty."""
    if not levels:
        return np.nan, 0.0
    best = min(levels, key=lambda x: x["price"])
    return best["price"], best["size"]


def get_top_n_bid_depth(levels, n=3):
    """Return the summed size of the n highest-priced levels."""
    if not levels:
        return 0.0
    top = sorted(levels, key=lambda x: x["price"], reverse=True)[:n]
    return float(sum(x["size"] for x in top))


def get_top_n_ask_depth(levels, n=3):
    """Return the summed size of the n lowest-priced levels."""
    if not levels:
        return 0.0
    top = sorted(levels, key=lambda x: x["price"])[:n]
    return float(sum(x["size"] for x in top))


def get_total_depth(levels):
    """Return the summed size of all levels."""
    if not levels:
        return 0.0
    return float(sum(x["size"] for x in levels))


def _imbalance(bid_depth, ask_depth):
    """Return (bid - ask) / (bid + ask), or nan when both sides are empty."""
    denom = bid_depth + ask_depth
    return (bid_depth - ask_depth) / denom if denom > 0 else np.nan


def extract_snapshot_features(bids_json, asks_json, depth_n=3):
    """Compute top-of-book, depth and imbalance features for one snapshot.

    Args:
        bids_json: Bid levels, as accepted by ``parse_orderbook_side``.
        asks_json: Ask levels, as accepted by ``parse_orderbook_side``.
        depth_n: Number of best levels per side used for the "*_3" depth
            and imbalance features (the key names do not change with it).

    Returns:
        Dict keyed by ``_SNAPSHOT_FEATURE_COLUMNS``. ``mid_price`` and
        ``spread`` are nan unless both sides have at least one level; an
        imbalance is nan when the depths it is built from sum to zero.
    """
    bids = parse_orderbook_side(bids_json)
    asks = parse_orderbook_side(asks_json)

    best_bid, best_bid_size = get_best_bid(bids)
    best_ask, best_ask_size = get_best_ask(asks)

    if np.isnan(best_bid) or np.isnan(best_ask):
        mid_price = np.nan
        spread = np.nan
    else:
        mid_price = (best_bid + best_ask) / 2.0
        spread = best_ask - best_bid

    bid_depth_n = get_top_n_bid_depth(bids, n=depth_n)
    ask_depth_n = get_top_n_ask_depth(asks, n=depth_n)

    bid_depth_total = get_total_depth(bids)
    ask_depth_total = get_total_depth(asks)

    return {
        "best_bid": best_bid,
        "best_bid_size": best_bid_size,
        "best_ask": best_ask,
        "best_ask_size": best_ask_size,
        "mid_price": mid_price,
        "spread": spread,
        "bid_depth_3": bid_depth_n,
        "ask_depth_3": ask_depth_n,
        "imbalance_3": _imbalance(bid_depth_n, ask_depth_n),
        "bid_depth_total": bid_depth_total,
        "ask_depth_total": ask_depth_total,
        "imbalance_total": _imbalance(bid_depth_total, ask_depth_total),
        "n_bid_levels": len(bids),
        "n_ask_levels": len(asks),
    }


def _utc_datetime_column(frame, name):
    """Return frame[name] parsed as UTC datetimes, or an all-NaT series if absent."""
    if name in frame.columns:
        return pd.to_datetime(frame[name], utc=True, errors="coerce")
    return pd.Series(pd.NaT, index=frame.index, dtype="datetime64[ns, UTC]")


def attach_token_metadata(
    feat_df: pd.DataFrame,
    token_meta_df: pd.DataFrame,
    metadata_cols=None,
    add_time_features=True,
) -> pd.DataFrame:
    """Left-join per-token metadata onto the feature rows.

    Args:
        feat_df: Feature rows with ``token_id`` and ``snapshot_time`` columns.
        token_meta_df: Token metadata; must contain ``token_id``.
        metadata_cols: Metadata columns to bring over (those missing from
            ``token_meta_df`` are ignored). Defaults to a fixed list.
        add_time_features: When true, also add ``effective_end_time``
            (``closed_time`` if known, else ``end_date``),
            ``time_to_expiry_hours`` and ``log_time_to_expiry_hours``.

    Returns:
        A new dataframe; metadata columns replace same-named feature columns.
    """
    if metadata_cols is None:
        metadata_cols = [
            "token_id",
            "market_id",
            "outcome_index",
            "outcome",
            "end_date",
            "closed_time",
            "fee_decimal",
            "neg_risk",
            "volume",
            "volume_clob",
        ]

    cols_to_use = [c for c in metadata_cols if c in token_meta_df.columns]
    meta_small = token_meta_df[cols_to_use].copy()
    if "token_id" in meta_small.columns:
        # One metadata row per token, otherwise the join multiplies snapshots.
        meta_small = meta_small.drop_duplicates(subset="token_id", keep="first")

    overlapping = [c for c in cols_to_use if c != "token_id" and c in feat_df.columns]
    if overlapping:
        feat_df = feat_df.drop(columns=overlapping)

    out = feat_df.merge(meta_small, on="token_id", how="left")

    if add_time_features:
        out["snapshot_time"] = pd.to_datetime(out["snapshot_time"], utc=True, errors="coerce")
        # Absent columns become all-NaT so the arithmetic below stays valid.
        out["closed_time"] = _utc_datetime_column(out, "closed_time")
        out["end_date"] = _utc_datetime_column(out, "end_date")

        out["effective_end_time"] = out["closed_time"].where(
            out["closed_time"].notna(),
            out["end_date"]
        )

        delta = out["effective_end_time"] - out["snapshot_time"]

        out["time_to_expiry_hours"] = delta.dt.total_seconds() / 3600.0
        # Snapshots taken after expiry are clipped to 0 before the log.
        out["log_time_to_expiry_hours"] = np.log1p(
            out["time_to_expiry_hours"].clip(lower=0)
        )

    return out


def add_token_time_series_features(feat_df: pd.DataFrame) -> pd.DataFrame:
    """Add previous-snapshot values and deltas, computed within each token.

    Returns a sorted copy (by token, then snapshot time) with ``prev_*``
    columns, ``delta_best_bid`` / ``delta_best_ask`` / ``delta_mid_price``
    and ``seconds_since_prev_snapshot``. The input is not modified.
    """
    out = feat_df.copy()
    out["snapshot_time"] = pd.to_datetime(out["snapshot_time"], utc=True, errors="coerce")

    # Sort so "previous row" means the previous snapshot in time per token.
    out = out.sort_values(["token_id", "snapshot_time"], kind="stable").reset_index(drop=True)

    by_token = out.groupby("token_id")
    price_cols = ("best_bid", "best_ask", "mid_price")

    out["prev_snapshot_time"] = by_token["snapshot_time"].shift(1)
    for col in price_cols:
        out[f"prev_{col}"] = by_token[col].shift(1)

    for col in price_cols:
        out[f"delta_{col}"] = out[col] - out[f"prev_{col}"]

    out["seconds_since_prev_snapshot"] = (
        out["snapshot_time"] - out["prev_snapshot_time"]
    ).dt.total_seconds()

    return out


def build_token_feature_table_from_parquet(
    input_path,
    output_path=None,
    depth_n=3,
    drop_json_cols=True,
    token_meta_df=None,
):
    """
    Read one raw orderbook parquet for a single token,
    compute per-snapshot features, optionally attach token metadata,
    add time-series features, and optionally save to parquet.

    Args:
        input_path: Raw orderbook parquet with ``token_id``,
            ``snapshot_time``, ``bids_json`` and ``asks_json`` columns.
        output_path: Where to write the feature parquet; ``None`` skips saving.
        depth_n: Levels per side used for the top-of-book depth features.
        drop_json_cols: Drop the raw ``bids_json`` / ``asks_json`` columns.
        token_meta_df: Optional token metadata to join (see
            ``attach_token_metadata``).

    Returns:
        The feature dataframe.
    """
    input_path = Path(input_path)

    df = pd.read_parquet(input_path)

    rows = []
    for row in df.to_dict("records"):
        feats = extract_snapshot_features(
            bids_json=row["bids_json"],
            asks_json=row["asks_json"],
            depth_n=depth_n,
        )

        out_row = dict(row)
        out_row.update(feats)
        rows.append(out_row)

    # Explicit columns keep an empty input from producing a column-less frame
    # (which would fail on the sort); for non-empty input the order is the
    # same as the row dicts': raw columns first, then the new features.
    columns = list(df.columns) + [c for c in _SNAPSHOT_FEATURE_COLUMNS if c not in df.columns]
    feat_df = pd.DataFrame(rows, columns=columns).sort_values(
        ["token_id", "snapshot_time"],
        kind="stable"
    ).reset_index(drop=True)

    if drop_json_cols:
        drop_cols = [c for c in ["bids_json", "asks_json"] if c in feat_df.columns]
        feat_df = feat_df.drop(columns=drop_cols)

    if token_meta_df is not None:
        feat_df = attach_token_metadata(feat_df, token_meta_df)
    feat_df = add_token_time_series_features(feat_df)
    # Helper and identifier columns are not part of the published table.
    feat_df = feat_df.drop(
        columns=[
            "market_hash",
            "asset_id",
            "prev_snapshot_time",
            "prev_best_bid",
            "prev_best_ask",
            "prev_mid_price",
            "indexed_at_time",
            "outcome",
            "end_date",
            "closed_time",
        ],
        errors="ignore",
    )
    if output_path is not None:
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        feat_df.to_parquet(output_path, index=False)

    return feat_df
import json

import numpy as np
import pandas as pd


def parse_json_list(value):
    """Return *value* as a list, decoding it from JSON text when necessary.

    Lists are returned unchanged.  Strings are stripped and decoded with
    ``json.loads``.  Everything that does not yield a list -- ``None``, blank
    or malformed strings, JSON that decodes to a non-list (``"null"``,
    ``"5"``, an object), or any other type -- produces an empty list, so
    callers can always take ``len()`` of / iterate over the result.
    """
    if isinstance(value, list):
        return value
    if not isinstance(value, str):
        return []

    text = value.strip()
    if not text:
        return []

    try:
        decoded = json.loads(text)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return []
    return decoded if isinstance(decoded, list) else []


def parse_fee_decimal(value):
    """Convert a fee field to a decimal fraction, or NaN when unusable.

    Text containing ``.`` or an exponent marker is read as a plain float.
    Integer-like text greater than 1 is treated as scaled by 1e18 and divided
    down; 0 and 1 (and negative integers) are returned as floats unchanged.
    ``None``, blank text and anything unparsable give ``np.nan``.
    """
    if value is None:
        return np.nan
    try:
        text = str(value).strip()
        if text == "":
            return np.nan

        # Already a decimal string such as "0.02" or "2e-2".
        if "." in text or "e" in text.lower():
            return float(text)

        # Integer-like: values above 1 are assumed to be 1e18-scaled.
        scaled = int(text)
        return scaled / 1e18 if scaled > 1 else float(scaled)
    except (TypeError, ValueError, OverflowError):
        return np.nan


def safe_float(value):
    """Return ``float(value)``, or ``np.nan`` for ``None`` / unconvertible input."""
    if value is None:
        return np.nan
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        return np.nan


def _iter_jsonl(path):
    """Yield the decoded JSON value of every non-blank line in *path*."""
    with open(path, "r", encoding="utf-8") as handle:
        for line in handle:
            # Blank lines (e.g. a trailing empty line) carry no record.
            if line.strip():
                yield json.loads(line)


def _shared_market_fields(market):
    """Return the market-level columns common to the market and token tables."""
    return {
        "question": market.get("question"),
        "slug": market.get("slug"),
        "description": market.get("description"),
        "start_date": market.get("startDate"),
        "end_date": market.get("endDate"),
        "closed_time": market.get("closedTime"),
        "active": market.get("active"),
        "closed": market.get("closed"),
        "archived": market.get("archived"),
        "accepting_orders": market.get("acceptingOrders"),
        "enable_order_book": market.get("enableOrderBook"),
        "neg_risk": market.get("negRisk"),
        "fee_decimal": parse_fee_decimal(market.get("fee")),
        "volume": safe_float(market.get("volume")),
        "volume_clob": safe_float(market.get("volumeClob")),
    }


def read_market_to_df(market_path) -> pd.DataFrame:
    """Load a markets JSONL file into a DataFrame with one row per market.

    ``outcomes`` and ``outcomePrices`` are JSON-encoded strings in the input;
    they are decoded with ``parse_json_list`` and stored re-serialised in
    ``outcomes_raw`` / ``outcome_prices_raw`` together with ``n_outcomes``.
    Blank lines are skipped; a line holding malformed JSON still raises.
    """
    rows = []
    for market in _iter_jsonl(market_path):
        outcomes = parse_json_list(market.get("outcomes"))
        outcome_prices = parse_json_list(market.get("outcomePrices"))

        rows.append({
            "market_id": market.get("id"),
            "condition_id": market.get("conditionId"),
            **_shared_market_fields(market),
            "market_best_bid": safe_float(market.get("bestBid")),
            "market_best_ask": safe_float(market.get("bestAsk")),
            "last_trade_price": safe_float(market.get("lastTradePrice")),
            "resolution_source": market.get("resolutionSource"),
            "uma_resolution_status": market.get("umaResolutionStatus"),
            "created_at": market.get("createdAt"),
            "updated_at": market.get("updatedAt"),
            "n_outcomes": len(outcomes),
            "outcomes_raw": json.dumps(outcomes),
            "outcome_prices_raw": json.dumps(outcome_prices),
        })

    return pd.DataFrame(rows)


def read_token_to_df(token_path) -> pd.DataFrame:
    """Load a markets JSONL file into a DataFrame with one row per CLOB token.

    Every entry of a market's ``clobTokenIds`` becomes a row carrying the
    token id (as ``str``), its position, the outcome label and initial price
    at the same position (``None`` / NaN when those lists are shorter), and
    the shared market-level columns.  Blank lines are skipped.
    """
    rows = []
    for market in _iter_jsonl(token_path):
        outcomes = parse_json_list(market.get("outcomes"))
        outcome_prices = parse_json_list(market.get("outcomePrices"))
        token_ids = parse_json_list(market.get("clobTokenIds"))

        # Identical for every token of this market, so build it once.
        shared = _shared_market_fields(market)

        for position, token_id in enumerate(token_ids):
            has_outcome = position < len(outcomes)
            has_price = position < len(outcome_prices)
            rows.append({
                "market_id": market.get("id"),
                "token_id": str(token_id),
                "outcome_index": position,
                "outcome": outcomes[position] if has_outcome else None,
                "outcome_price_initial": (
                    safe_float(outcome_prices[position]) if has_price else np.nan
                ),
                **shared,
            })

    return pd.DataFrame(rows)
"polymarket_markets_1y.jsonl" + +# User-defined token filter output/input shared by both notebooks +FILTERED_TOKEN_IDS = DATA_DIR / "filtered_token_ids.parquet" + +def ensure_dirs() -> None: + for p in [DATA_DIR, ORDERBOOK_DIR, FEATURE_DIR]: + p.mkdir(parents=True, exist_ok=True) \ No newline at end of file diff --git a/freqtrade b/freqtrade index ab093ff..5fb0011 160000 --- a/freqtrade +++ b/freqtrade @@ -1 +1 @@ -Subproject commit ab093ff0e1af445f0b8491ea1168c46e1a51b2c0 +Subproject commit 5fb00116889f8d1d67c817f8aa1eaeecfbbfbfbe diff --git a/strategy/OrderbookImbalanceStrategy.py b/strategy/OrderbookImbalanceStrategy.py new file mode 100644 index 0000000..26acb68 --- /dev/null +++ b/strategy/OrderbookImbalanceStrategy.py @@ -0,0 +1,57 @@ +"""Orderbook Imbalance Strategy — baseline proof-of-concept for Polymarket CLOB features. + +Signal: when the EMA-smoothed top-3 orderbook imbalance is positive and rising, +enter long. Exit when it turns negative. + +Works with any pair that has a corresponding feat_orderbook parquet file. +Token-id resolution is automatic via freqtrade_pair_mapping.csv in data.zip. 
class OrderbookImbalanceStrategy(IStrategy):
    """Long-only baseline that trades on EMA-smoothed orderbook imbalance.

    Entry: ``ob_imbalance_ema`` is above ``entry_imbalance_min``, higher than
    on the previous candle, and ``close`` is below ``entry_price_max``.
    Exit: ``ob_imbalance_ema`` drops below ``exit_imbalance_max``.

    The three thresholds are class attributes so a subclass can tune them
    without re-implementing the signal methods; their defaults are the values
    that were previously hard-coded in the conditions.
    """

    INTERFACE_VERSION = 3

    can_short = False
    minimal_roi = {"0": 0.40}
    stoploss = -0.20
    trailing_stop = False
    process_only_new_candles = True
    use_exit_signal = True
    startup_candle_count = 10

    # Signal thresholds (see class docstring).
    entry_imbalance_min = 0.10
    entry_price_max = 0.95
    exit_imbalance_max = -0.05

    def populate_indicators(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """Attach the orderbook columns produced by ``OrderbookAlpha``."""
        # OrderbookAlpha resolves the token_id automatically from metadata["pair"]
        dataframe = OrderbookAlpha(dataframe, metadata).process()
        return dataframe

    def populate_entry_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """Set ``enter_long`` to 1 on candles that satisfy the entry rule."""
        imbalance = dataframe["ob_imbalance_ema"]
        strong = imbalance > self.entry_imbalance_min
        rising = imbalance > imbalance.shift(1)
        below_cap = dataframe["close"] < self.entry_price_max
        dataframe.loc[strong & rising & below_cap, "enter_long"] = 1
        return dataframe

    def populate_exit_trend(self, dataframe: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """Set ``exit_long`` to 1 on candles where the imbalance EMA is too low."""
        dataframe.loc[
            dataframe["ob_imbalance_ema"] < self.exit_imbalance_max,
            "exit_long",
        ] = 1
        return dataframe
"26875704435144560123124814164931171497339462799728449796809868985717551034984" + + +# --------------------------------------------------------------------------- +# Fixtures / helpers +# --------------------------------------------------------------------------- + +def _make_ohlcv(n=100, start="2025-10-15", freq="4h"): + """Synthetic OHLCV frame in Polymarket probability range [0, 1].""" + np.random.seed(42) + close = np.clip(0.01 + np.cumsum(np.random.randn(n) * 0.002), 0.001, 0.99) + return pd.DataFrame({ + "date": pd.date_range(start, periods=n, freq=freq), + "open": close - 0.001, + "high": close + 0.002, + "low": close - 0.002, + "close": close, + "volume": np.ones(n) * 100.0, + }) + + +def _make_feat_orderbook(tmp_path, token_id="TEST_TOKEN", n=300, start="2025-10-14", + imbalance_values=None): + """Write a minimal feat_.parquet into tmp_path/feat_orderbook/. + + Parameters + ---------- + imbalance_values : array-like, optional + Fixed imbalance_3 values (length n). Defaults to random uniform [-1, 1]. 
+ """ + feat_dir = tmp_path / "feat_orderbook" + feat_dir.mkdir(exist_ok=True) + + np.random.seed(7) + times = pd.date_range(start, periods=n, freq="1h", tz="UTC") + imb = imbalance_values if imbalance_values is not None else np.random.uniform(-1, 1, n) + + df = pd.DataFrame({"snapshot_time": times, "imbalance_3": imb}) + path = feat_dir / f"feat_{token_id}.parquet" + df.to_parquet(path, index=False) + return path + + +@pytest.fixture() +def patched_feat_dir(tmp_path): + """Fixture: patch _FEATURE_DIR to a temp directory and restore after test.""" + original = ob_mod._FEATURE_DIR + ob_mod._FEATURE_DIR = tmp_path / "feat_orderbook" + yield tmp_path + ob_mod._FEATURE_DIR = original + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +class TestOrderbookAlpha: + + def test_adds_ob_columns(self, patched_feat_dir): + _make_feat_orderbook(patched_feat_dir) + result = OrderbookAlpha(_make_ohlcv(), {"token_id": "TEST_TOKEN"}).process() + assert "ob_imbalance" in result.columns + assert "ob_imbalance_ema" in result.columns + + def test_unknown_token_returns_nan(self): + """Missing feat file → both columns are NaN, no crash.""" + result = OrderbookAlpha(_make_ohlcv(), {"token_id": "does_not_exist"}).process() + assert result["ob_imbalance"].isna().all() + assert result["ob_imbalance_ema"].isna().all() + + def test_empty_metadata_returns_nan(self): + result = OrderbookAlpha(_make_ohlcv(), {}).process() + assert result["ob_imbalance"].isna().all() + + def test_imbalance_values_are_correct(self, patched_feat_dir): + """Candles fully inside the snapshot range should reflect real imbalance data.""" + # Use a constant imbalance so we know the expected value + _make_feat_orderbook(patched_feat_dir, imbalance_values=np.full(300, 0.5)) + df = _make_ohlcv() # starts 2025-10-15, snapshots start 2025-10-14 → all covered + result = OrderbookAlpha(df, {"token_id": 
"TEST_TOKEN"}).process() + # Every candle has a prior snapshot → imbalance should be 0.5 throughout + assert (result["ob_imbalance"] == 0.5).all() + + def test_imbalance_in_range(self, patched_feat_dir): + _make_feat_orderbook(patched_feat_dir) + result = OrderbookAlpha(_make_ohlcv(), {"token_id": "TEST_TOKEN"}).process() + assert result["ob_imbalance"].between(-1, 1).all() + + def test_ema_is_smoother_than_raw(self, patched_feat_dir): + _make_feat_orderbook(patched_feat_dir) + result = OrderbookAlpha(_make_ohlcv(), {"token_id": "TEST_TOKEN"}).process() + assert result["ob_imbalance_ema"].std() <= result["ob_imbalance"].std() + + def test_no_future_leakage(self, patched_feat_dir): + """Candles before the first snapshot should get 0.0, not a future value.""" + # Snapshots start 2025-11-01; candles start 2025-10-01 (4 weeks earlier) + _make_feat_orderbook(patched_feat_dir, start="2025-11-01", + imbalance_values=np.full(300, 0.9)) + df = _make_ohlcv(n=80, start="2025-10-01") + result = OrderbookAlpha(df, {"token_id": "TEST_TOKEN"}).process() + pre_snapshot = result[result["date"] < pd.Timestamp("2025-11-01")] + assert (pre_snapshot["ob_imbalance"] == 0.0).all() + + def test_lookup_known_pair(self): + assert _lookup_token_id(PANTHERS_PAIR) == PANTHERS_TOKEN + + def test_lookup_unknown_pair(self): + assert _lookup_token_id("BTC/USDT") == ""