diff --git a/.gitignore b/.gitignore index 900c3b1..7a3570b 100644 --- a/.gitignore +++ b/.gitignore @@ -108,3 +108,6 @@ ENV/ *.h5ad mydask.png *.zarr + +# Concat on disk tutorial +tmpdata \ No newline at end of file diff --git a/concat-on-disk.ipynb b/concat-on-disk.ipynb new file mode 100644 index 0000000..5527cd1 --- /dev/null +++ b/concat-on-disk.ipynb @@ -0,0 +1,881 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "1446b800-7ded-4a4c-b54c-8510100c3bda", + "metadata": {}, + "source": [ + "# On-Disk Concatenation of AnnData Files" + ] + }, + { + "cell_type": "markdown", + "id": "aa4bd620", + "metadata": {}, + "source": [ + "**Author:** Selman Ă–zleyen" + ] + }, + { + "cell_type": "markdown", + "id": "cb9b747f-4384-4b16-8f4f-806edfdc0b06", + "metadata": {}, + "source": [ + "\n", + "## Initializing\n", + "\n", + "Let's begin by importing the necessary libraries and modules. This notebook also uses the [memray](https://pypi.org/project/memray/) module. Ensure you've installed it using `pip install memray` before proceeding.\n", + "\n", + "For all dependencies, do `pip install anndata zarr dask[array,distributed] pytest memray`." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "f65fb557", + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2\n", + "import gc\n", + "import shutil\n", + "import logging\n", + "import tempfile\n", + "import itertools\n", + "from pathlib import Path\n", + "from typing import Literal, Callable\n", + "\n", + "import zarr\n", + "import numpy as np\n", + "import pandas as pd\n", + "import memray\n", + "from scipy import sparse\n", + "from dask.distributed import Client, LocalCluster\n", + "\n", + "import anndata\n", + "from anndata.tests.helpers import gen_typed_df\n", + "from anndata.experimental import write_dispatched\n", + "from anndata.experimental import concat_on_disk\n", + "from dask.distributed.diagnostics.memray import memray_scheduler, memray_workers" + ] + }, + { + "cell_type": "markdown", + "id": "9f991e3b", + "metadata": {}, + "source": [ + "## Data Creation and Analysis\n", + "\n", + "In this section, we'll demonstrate the core functionality of the `concat_on_disk` method. We'll create datasets and analyze how this method performs in terms of memory usage. This will help us understand its efficiency and benefits, especially when working with large datasets.\n", + "\n", + "We will define parameters that will influence the structure of our datasets:\n", + "\n", + "- **Shapes**: Defines the shape of array (e.g., \"fat\", \"tall\", \"square\").\n", + "- **Sizes**: The size of the array, indicating the number of elements.\n", + "- **Densities**: Specifies the data density. 1 means dense numpy array.\n", + "\n", + "These parameters will be utilized in subsequent sections to generate and analyze datasets.\n", + "\n", + "### Ignoring Logs or Not\n", + "\n", + "By default we will ignore logs for the sake of readability. These are mostly reports given from dask distributed. However if one would like to see what is happening behind the dask distributed system, they can change the parameter dedicated to this below. These logs usually also refer to a dashboard link in order to monitor the workers." 
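If you do enable the logs, the dashboard address can also be obtained directly from the Dask client once one exists. The snippet below is an optional, minimal sketch: it starts a throwaway `LocalCluster` purely to show where the link lives; the clusters created later in this notebook expose the same attribute.

```python
# Optional sketch: where to find the Dask dashboard link for monitoring workers.
cluster = LocalCluster(n_workers=1)
client = Client(cluster)
print(client.dashboard_link)  # open this URL in a browser to watch the workers
client.close()
cluster.close()
```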
+ ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "29f0bd5a", + "metadata": {}, + "outputs": [], + "source": [ + "# Directory where the data will be stored\n", + "TMPDIR = tempfile.TemporaryDirectory()\n", + "OUTDIR = Path(TMPDIR.name)\n", + "\n", + "# Parameters that will influence the structure and size of our datasets:\n", + "\n", + "# Shapes of the arrays: \"fat\", \"tall\", or \"square\"\n", + "shapes = [\"fat\", \"tall\", \"square\"]\n", + "\n", + "# Sizes of the dataset, indicating the number of elements\n", + "sizes = [10_000]\n", + "\n", + "# Densities: Specifies the data density. A higher value means more non-zero elements\n", + "densities = [0.1, 1]\n", + "\n", + "# Number of times each array type will be created\n", + "num_runs = 3\n", + "\n", + "# Set to False to see the logs and warnings\n", + "ignore_logs = True\n", + "\n", + "dask_log = logging.CRITICAL\n", + "if not ignore_logs:\n", + " dask_log = logging.DEBUG" + ] + }, + { + "cell_type": "markdown", + "id": "d6dbc5a4", + "metadata": {}, + "source": [ + "### create_adata\n", + "\n", + "This function creates an `AnnData` object, a foundational data structure used in bioinformatics to store high-dimensional data such as gene expression matrices. Given a data matrix `X`, the function constructs the `AnnData` object complete with observation (`obs`) and variable (`var`) metadata.\n", + "\n", + "- `X`: The data matrix (dense or sparse); its shape determines the number of observations and variables.\n", + "\n", + "Returns: An `AnnData` object constructed from the input data and metadata.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "6e919ecc", + "metadata": {}, + "outputs": [], + "source": [ + "def create_adata(X):\n", + " # Shape of the data matrix\n", + " M, N = X.shape\n", + "\n", + " # Generating observation and variable names\n", + " obs_names = pd.Index(f\"cell{i}\" for i in range(M))\n", + " var_names = pd.Index(f\"gene{i}\" for i in range(N))\n", + "\n", + " # Creating observation and variable dataframes\n", + " obs = gen_typed_df(M, obs_names)\n", + " var = gen_typed_df(N, var_names)\n", + "\n", + " # Renaming columns to ensure uniqueness\n", + " obs.rename(columns=dict(cat=\"obs_cat\"), inplace=True)\n", + " var.rename(columns=dict(cat=\"var_cat\"), inplace=True)\n", + "\n", + " # Constructing the AnnData object\n", + " adata = anndata.AnnData(X, obs=obs, var=var)\n", + "\n", + " return adata" + ] + }, + { + "cell_type": "markdown", + "id": "9449622e", + "metadata": {}, + "source": [ + "### array_creators\n", + "\n", + "This function returns a `dict` mapping an array-format name (e.g., \"np\", \"csc\", \"csr\") to a function that creates an array of that format. Which formats are included depends on the provided `density` parameter.\n", + "\n", + "- `density`: The density of the dataset. 
If the density is 1, the dataset is dense; otherwise, it's sparse.\n", + "\n", + "Returns: A dict mapping format names to their array creator functions.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "c2eb98ac", + "metadata": {}, + "outputs": [], + "source": [ + "def array_creators(\n", + " density: Literal[1] | float,\n", + ") -> dict[str, Callable[[np.ndarray | sparse.spmatrix], np.ndarray | sparse.spmatrix]]:\n", + " \"\"\"Returns a dictionary of array creators for the given density\"\"\"\n", + " array_funcs = {}\n", + "\n", + " # Check if dataset is dense\n", + " if density == 1:\n", + " array_funcs[\"np\"] = lambda x: x.toarray()\n", + " else:\n", + " # For sparse datasets, consider both csc and csr formats\n", + " array_funcs[\"csc\"] = sparse.csc_matrix\n", + " array_funcs[\"csr\"] = sparse.csr_matrix\n", + " return array_funcs" + ] + }, + { + "cell_type": "markdown", + "id": "d9786c59", + "metadata": {}, + "source": [ + "### generate_dimensions\n", + "\n", + "Given a shape description (like \"fat\", \"tall\", or \"square\") and a base size, this function computes the exact dimensions \\(M\\) and \\(N\\) of the dataset.\n", + "\n", + "- `shape_type`: String describing the desired shape of the dataset (\"fat\", \"tall\", or \"square\").\n", + "- `size`: Base size for the dataset.\n", + "\n", + "Returns: The dimensions \\(M\\) and \\(N\\) of the dataset.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "3dd5068e", + "metadata": {}, + "outputs": [], + "source": [ + "def generate_dimensions(shape_type, size):\n", + " # Default dimensions\n", + " M = size\n", + " N = size\n", + "\n", + " # If the shape isn't square, adjust the dimensions\n", + " if shape_type != \"square\":\n", + " other_size = size + int(size * np.random.uniform(0.2, 0.4))\n", + " if shape_type == \"fat\":\n", + " M = other_size\n", + " elif shape_type == \"tall\":\n", + " N = other_size\n", + "\n", + " return M, N" + ] + }, + { + "cell_type": "markdown", + "id": "96a00c66", + "metadata": {}, + "source": [ + "## Writing The Arrays To Disk\n", + "\n", + "We will use the functions defined below to write the AnnData objects to disk. There is no need to understand them all in detail; they are explained below for users who would like to create their own datasets for these measurements.\n", + "\n", + "### Functions Overview\n", + "\n", + "#### 1. `write_data_to_zarr`\n", + "\n", + "This function is responsible for writing a given dataset `X` to a Zarr format file. Zarr is a format for the storage of chunked, compressed, N-dimensional arrays, which is useful for efficient on-disk storage and retrieval of large datasets.\n", + "\n", + "- **Parameters**:\n", + " - `X`: The dataset to be written.\n", + " - `shape_type`: Descriptive shape type of the dataset.\n", + " - `array_name`: Name representing the type of array (e.g., \"np\", \"csc\", \"csr\").\n", + " - `outdir`: Directory where the Zarr file should be stored.\n", + " - `file_id`: Identifier for the file, used in naming.\n", + "\n", + "- **Returns**: A string report detailing the writing operation.\n", + "\n", + "#### 2. `write_temp_data`\n", + "\n", + "This function writes temporary data based on the specified parameters to the output directory. 
It iteratively generates datasets based on the shapes, sizes, densities, and number of runs, and writes each dataset to a Zarr file using the `write_data_to_zarr` function.\n", + "\n", + "- **Parameters**:\n", + " - `shapes`: List of dataset shapes (e.g., \"fat\", \"tall\", \"square\").\n", + " - `sizes`: List of dataset sizes.\n", + " - `densities`: List of dataset densities.\n", + " - `num_runs`: Number of iterations for data generation.\n", + " - `outdir`: Directory where the Zarr files should be stored.\n", + " - `rewrite`: Boolean flag; if True, any existing data in the output directory will be overwritten.\n", + "\n", + "This function not only writes the datasets but also maintains a log of the datasets written in a file named \"done.txt\".\n", + "\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "134c147f", + "metadata": {}, + "outputs": [], + "source": [ + "# `write_chunked` is a callback for `write_dispatched` that stores X and layers\n", + "# in (1000, 1000) chunks. Chunked storage is not required for `concat_on_disk`,\n", + "# but it lets large dense arrays be read and written piece by piece rather than\n", + "# all at once, which keeps memory usage bounded.\n", + "\n", + "def write_chunked(func, store, k, elem, dataset_kwargs, iospec):\n", + " \"\"\"Write callback that chunks X and layers\"\"\"\n", + "\n", + " def set_chunks(d, chunks=None):\n", + " \"\"\"Helper function for setting dataset_kwargs. Makes a copy of d.\"\"\"\n", + " d = dict(d)\n", + " if chunks is not None:\n", + " d[\"chunks\"] = chunks\n", + " else:\n", + " d.pop(\"chunks\", None)\n", + " return d\n", + "\n", + " if iospec.encoding_type == \"array\":\n", + " if 'layers' in k or k.endswith('X'):\n", + " dataset_kwargs = set_chunks(dataset_kwargs, (1000, 1000))\n", + " else:\n", + " dataset_kwargs = set_chunks(dataset_kwargs, None)\n", + "\n", + " func(store, k, elem, dataset_kwargs=dataset_kwargs)\n", + "\n", + "def write_data_to_zarr(X, shape_type, array_name, outdir, file_id):\n", + " outfile = outdir / f\"{file_id:02d}_{shape_type}_{array_name}.zarr\"\n", + " adata = create_adata(X)\n", + " z = zarr.open_group(outfile, mode=\"w\")\n", + "\n", + " write_dispatched(z, \"/\", adata, callback=write_chunked)\n", + " zarr.consolidate_metadata(z.store)\n", + " return f\"wrote {X.shape[0]}x{X.shape[1]}_{array_name} -> {str(outfile)}\\n\"\n", + "\n", + "\n", + "def write_temp_data(shapes, sizes, densities, num_runs, outdir, rewrite=False):\n", + " outdir.mkdir(exist_ok=True)\n", + " if rewrite:\n", + " (outdir / \"done.txt\").unlink(missing_ok=True)\n", + " if (outdir / \"done.txt\").exists():\n", + " print(\"already done\")\n", + " with open(outdir / \"done.txt\", \"r\") as f:\n", + " for line in f.readlines():\n", + " print(line)\n", + " return\n", + "\n", + " saved = []\n", + " file_id = 1\n", + " for _, shape_type, size, density in itertools.product(\n", + " range(num_runs), shapes, sizes, densities\n", + " ):\n", + " array_funcs = array_creators(density)\n", + " M, N = generate_dimensions(shape_type, size)\n", + "\n", + " X_base = sparse.random(M, N, density=density, format=\"csc\")\n", + "\n", + " for array_name, array_func in array_funcs.items():\n", + " X = array_func(X_base)\n", + " report = write_data_to_zarr(X, shape_type, array_name, outdir, file_id)\n", + " del X\n", + " print(report, end=\"\")\n", + " saved.append(report)\n", + " file_id += 1\n", + " with open(outdir / \"done.txt\", \"w\") as f:\n", + " f.writelines(saved)" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "fbf50dbe", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text":
[ + "wrote 12454x10000_csc -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/01_fat_csc.zarr\n", + "wrote 12454x10000_csr -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/02_fat_csr.zarr\n", + "wrote 13757x10000_np -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/03_fat_np.zarr\n", + "wrote 10000x12370_csc -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/04_tall_csc.zarr\n", + "wrote 10000x12370_csr -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/05_tall_csr.zarr\n", + "wrote 10000x13235_np -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/06_tall_np.zarr\n", + "wrote 10000x10000_csc -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/07_square_csc.zarr\n", + "wrote 10000x10000_csr -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/08_square_csr.zarr\n", + "wrote 10000x10000_np -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/09_square_np.zarr\n", + "wrote 12558x10000_csc -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/10_fat_csc.zarr\n", + "wrote 12558x10000_csr -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/11_fat_csr.zarr\n", + "wrote 13957x10000_np -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/12_fat_np.zarr\n", + "wrote 10000x12236_csc -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/13_tall_csc.zarr\n", + "wrote 10000x12236_csr -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/14_tall_csr.zarr\n", + "wrote 10000x12851_np -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/15_tall_np.zarr\n", + "wrote 10000x10000_csc -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/16_square_csc.zarr\n", + "wrote 10000x10000_csr -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/17_square_csr.zarr\n", + "wrote 10000x10000_np -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/18_square_np.zarr\n", + "wrote 12888x10000_csc -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/19_fat_csc.zarr\n", + "wrote 12888x10000_csr -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/20_fat_csr.zarr\n", + "wrote 13955x10000_np -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/21_fat_np.zarr\n", + "wrote 10000x12558_csc -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/22_tall_csc.zarr\n", + "wrote 10000x12558_csr -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/23_tall_csr.zarr\n", + "wrote 10000x13348_np -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/24_tall_np.zarr\n", + "wrote 10000x10000_csc -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/25_square_csc.zarr\n", + "wrote 10000x10000_csr -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/26_square_csr.zarr\n", + "wrote 10000x10000_np -> /var/folders/w4/rlbyb2md7y50tspf85v1lc440000gn/T/tmp6_ga523e/27_square_np.zarr\n" + ] + } + ], + "source": [ + "# You can call the function like this:\n", + "write_temp_data(shapes, sizes, densities, num_runs, OUTDIR)" + ] + }, + { + "cell_type": "markdown", + "id": "3b8c7897", + "metadata": {}, + "source": [ + "### Putting our arrays in categories\n", + "\n", + "The `create_datasets` function constructs a dictionary that maps dataset types (dense or sparse) and their axis (0 or 1) to a set of corresponding file paths. 
The function processes different file sets and, based on conditions like `requires_reindexing`, refines the set of file paths to be associated with each dataset type and axis combination. If reindexing is required (i.e., the datasets do not all have the same size along the other axis, `1 - axis`), a more costly concatenation strategy has to be used than in the case without reindexing. For this reason, we will separate the tests that require reindexing from the ones that do not.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "3f71e387", + "metadata": {}, + "outputs": [], + "source": [ + "# files by properties\n", + "filesets = {\n", + " \"nps\": set(OUTDIR.glob(\"*np*\")),\n", + " \"csrs\": set(OUTDIR.glob(\"*csr*\")),\n", + " \"cscs\": set(OUTDIR.glob(\"*csc*\")),\n", + " \"fats\": set(OUTDIR.glob(\"*fat*\")),\n", + " \"talls\": set(OUTDIR.glob(\"*tall*\")),\n", + " \"squares\": set(OUTDIR.glob(\"*square*\")),\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "cfbc8bd5", + "metadata": {}, + "outputs": [], + "source": [ + "def create_datasets(filesets, requires_reindexing=False):\n", + " data = dict()\n", + " for fileset, axis in ((\"cscs\", 1), (\"csrs\", 0), (\"nps\", 0), (\"nps\", 1)):\n", + " filepaths = filesets[fileset].copy()\n", + " if not requires_reindexing:\n", + " tall_or_fat = filesets[\"talls\"] if axis == 1 else filesets[\"fats\"]\n", + " filepaths = filepaths.intersection(tall_or_fat.union(filesets[\"squares\"]))\n", + " fileset_name = \"dense\" if fileset == \"nps\" else \"sparse\"\n", + " data[fileset_name, axis] = filepaths\n", + " return data" + ] + }, + { + "cell_type": "markdown", + "id": "ee43c851", + "metadata": {}, + "source": [ + "Below we build both the group of AnnData files that would require reindexing when concatenated (i.e., their sizes along the other axis don't match) and the group that would not." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "a7c4769d", + "metadata": {}, + "outputs": [], + "source": [ + "datasets_aligned = create_datasets(filesets, requires_reindexing=False)\n", + "datasets_unaligned = create_datasets(filesets, requires_reindexing=True)" + ] + }, + { + "cell_type": "markdown", + "id": "03e833a1", + "metadata": {}, + "source": [ + "## Measuring Performance" + ] + }, + { + "cell_type": "markdown", + "id": "f418d1b7", + "metadata": {}, + "source": [ + "### `get_arr_sizes`\n", + "\n", + "This function calculates the size of the data arrays for a list of given file paths. It can accommodate both sparse and dense formats, adjusting the computation method accordingly.\n", + "\n", + "---\n", + "\n", + "### `get_mem_usage`\n", + "\n", + "The function `get_mem_usage` evaluates the memory usage of an on-disk concatenation performed with `concat_on_disk`. For dense data it starts a local Dask cluster with a memory limit; for sparse data it calls `concat_on_disk` directly, passing `max_loaded_elems`. It returns the peak memory used during the concatenation, as reported by memray.\n", + "\n", + "---\n", + "\n", + "### `dataset_max_mem`\n", + "\n", + "The `dataset_max_mem` function profiles and prints the maximum memory usage when concatenating datasets of different types (sparse or dense) and along different axes. For each dataset and axis combination, it determines the files to concatenate, calculates their sizes, and then measures the memory usage during the concatenation process, using the memray pattern sketched just below. 
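Here is that memray measurement pattern in isolation. This is a minimal sketch: the profiled workload is just a placeholder NumPy allocation, and the profile file name is arbitrary.

```python
# Minimal sketch of the memray pattern used by get_mem_usage below.
profile_path = OUTDIR / "example-profile.memray"
profile_path.unlink(missing_ok=True)  # start from a fresh profile file

with memray.Tracker(profile_path, trace_python_allocators=True, native_traces=True):
    _ = np.ones((2_000, 2_000))  # placeholder workload, roughly 30 MiB of float64

with memray.FileReader(profile_path) as reader:
    print(reader.metadata.peak_memory // 2**20, "MiB peak")
```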
The results are stored in a dictionary that maps the dataset type and axis to the corresponding memory usage metrics.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "ae86ae46", + "metadata": {}, + "outputs": [], + "source": [ + "def get_arr_sizes(filepaths, is_sparse):\n", + " def get_arr_size(g):\n", + " if is_sparse:\n", + " size = (\n", + " g.store.getsize(\"X/data\")\n", + " + g.store.getsize(\"X/indices\")\n", + " + g.store.getsize(\"X/indptr\")\n", + " )\n", + " else:\n", + " size = g.store.getsize(\"X\")\n", + " return size\n", + "\n", + " return [get_arr_size(zarr.open_group(filepath)) for filepath in filepaths]\n", + "\n", + "\n", + "def get_mem_usage(filepaths, writepth, axis, max_arg, is_sparse):\n", + " global dask_log\n", + " concat_kwargs = {\n", + " \"in_files\": filepaths,\n", + " \"out_file\": writepth,\n", + " \"axis\": axis,\n", + " }\n", + " tracer_kwargs = dict(trace_python_allocators=True, native_traces=True, follow_fork=True)\n", + " if not is_sparse:\n", + " cluster = LocalCluster(\n", + " memory_limit=max_arg,\n", + " silence_logs=dask_log,\n", + " )\n", + " client = Client(cluster)\n", + " else:\n", + " concat_kwargs[\"max_loaded_elems\"] = max_arg\n", + "\n", + " for stat_file in OUTDIR.glob(\"*.memray\"):\n", + " stat_file.unlink()\n", + "\n", + " if not is_sparse:\n", + " with (\n", + " memray_workers(OUTDIR, report_args=False, **tracer_kwargs),\n", + " memray_scheduler(OUTDIR, report_args=False, **tracer_kwargs),\n", + " ):\n", + " concat_on_disk(**concat_kwargs)\n", + " else:\n", + " with memray.Tracker(OUTDIR / \"test-profile.memray\", **tracer_kwargs):\n", + " concat_on_disk(**concat_kwargs)\n", + "\n", + " max_mem = 0\n", + " for stat_file in OUTDIR.glob(\"*.memray\"):\n", + " with memray.FileReader(stat_file) as reader:\n", + " max_mem += reader.metadata.peak_memory\n", + " \n", + " if not is_sparse:\n", + " client.shutdown()\n", + " client.close()\n", + " cluster.close()\n", + "\n", + " return max_mem\n", + "\n", + "\n", + "def dataset_max_mem(max_arg, datasets, array_type):\n", + " results = {}\n", + " is_sparse = array_type == \"sparse\"\n", + " for filepaths, axis in [(datasets[array_type, axis], axis) for axis in [0, 1]]:\n", + " writepth = OUTDIR / f\"{array_type}_{axis}.zarr\"\n", + " if writepth.exists():\n", + " shutil.rmtree(writepth)\n", + "\n", + " # print the files we are concatenating\n", + " print(\"Dataset:\", array_type, axis)\n", + " print(f\"Concatenating {len(filepaths)} files with sizes:\")\n", + " sizes = get_arr_sizes(filepaths, is_sparse)\n", + " print([str(s // (2**20)) + \"MiB\" for s in sizes])\n", + " print(f\"Total size: {sum(sizes)//(2**20)}MiB\")\n", + "\n", + " # force garbage collection\n", + " gc.collect()\n", + " # perform profiling\n", + " mem_increment = get_mem_usage(filepaths, writepth, axis, max_arg, is_sparse)\n", + " # force garbage collection again\n", + " gc.collect()\n", + "\n", + " print(\"Concatenation finished\")\n", + " print(\"Peak Memory:\", int(mem_increment) // (2**20), \"MiB\")\n", + " print(\"--------------------------------------------------\")\n", + " results[array_type, axis] = mem_increment\n", + " return results" + ] + }, + { + "cell_type": "markdown", + "id": "e73c95fd", + "metadata": {}, + "source": [ + "## Results of concatenation without reindexing\n", + "\n", + "In this section, we evaluate the memory performance of the `concat_on_disk` function when concatenating datasets **without** the need for reindexing. 
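If you only want the core call without the profiling harness, a direct invocation looks like the sketch below. It uses two of the aligned sparse files; the output file name is illustrative and not used elsewhere in this notebook.

```python
# Sketch: a plain concat_on_disk call on two of the aligned sparse files, along axis 0.
two_sparse = sorted(datasets_aligned[("sparse", 0)])[:2]
concat_on_disk(
    in_files=two_sparse,
    out_file=OUTDIR / "two_sparse_demo.zarr",  # illustrative output path
    axis=0,
)
```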
The printed reports provide details about the individual file sizes, the total dataset size, and the maximum memory increment during the concatenation.\n", + "\n", + "\n", + "### Sparse Datasets\n", + "\n", + "For sparse datasets:\n", + "\n", + "- We can observe that the function has been called multiple times with different memory constraints (`max_arg` values), and each time the datasets were concatenated successfully.\n", + "- It's crucial to note that even when the combined size of the files exceeds the allocated memory, the concatenation still proceeds efficiently. This behavior highlights the primary advantage of the `concat_on_disk` function: it performs the concatenation **on disk**, ensuring that memory consumption remains low, even for large datasets.\n", + " \n", + "However, it's also worth noting that if one has sufficient memory to fit the files, performing the concatenation in memory would be faster.\n", + "\n", + "### Dense Datasets\n", + "\n", + "The results for dense datasets follow a similar pattern:\n", + "\n", + "- The datasets are concatenated successfully under memory constraints.\n", + "- The total size of the dataset is much larger than the memory increment, reinforcing the efficiency of on-disk concatenation.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "ad448529", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Dataset: sparse 0\n", + "Concatenating 6 files with sizes:\n", + "['78MiB', '78MiB', '101MiB', '98MiB', '97MiB', '78MiB']\n", + "Total size: 533MiB\n", + "Concatenation finished\n", + "Peak Memory: 16 MiB\n", + "--------------------------------------------------\n", + "Dataset: sparse 1\n", + "Concatenating 6 files with sizes:\n", + "['97MiB', '98MiB', '78MiB', '78MiB', '96MiB', '78MiB']\n", + "Total size: 527MiB\n", + "Concatenation finished\n", + "Peak Memory: 16 MiB\n", + "--------------------------------------------------\n" + ] + } + ], + "source": [ + "dataset_max_mem(max_arg=1_000_000_000, datasets=datasets_aligned, array_type=\"sparse\");" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "f3b74ee3", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Dataset: dense 0\n", + "Concatenating 6 files with sizes:\n", + "['668MiB', '668MiB', '919MiB', '668MiB', '932MiB', '932MiB']\n", + "Total size: 4789MiB\n", + "Concatenation finished\n", + "Peak Memory: 388 MiB\n", + "--------------------------------------------------\n", + "Dataset: dense 1\n", + "Concatenating 6 files with sizes:\n", + "['859MiB', '668MiB', '885MiB', '668MiB', '892MiB', '668MiB']\n", + "Total size: 4641MiB\n", + "Concatenation finished\n", + "Peak Memory: 344 MiB\n", + "--------------------------------------------------\n" + ] + } + ], + "source": [ + "dataset_max_mem(max_arg=\"4000MiB\", datasets=datasets_aligned, array_type=\"dense\");" + ] + }, + { + "cell_type": "markdown", + "id": "7a0bbd89", + "metadata": {}, + "source": [ + "## Results of concatenation with reindexing\n", + "\n", + "This section presents the results of the `concat_on_disk` function when concatenating datasets that **require** reindexing.\n", + "\n", + "The observations and interpretations for this section are similar to the ones mentioned for the \"without reindexing\" section. The primary difference is the datasets used for the concatenation. 
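As a quick aside, any of the outputs produced by the runs above can be read back to inspect the result. This is a hedged sketch: `read_zarr` loads the whole object into memory, so only do this when the concatenated data comfortably fits in RAM.

```python
# Sketch: read one of the concatenated outputs back into memory for inspection.
result = anndata.read_zarr(OUTDIR / "sparse_0.zarr")
print(result)
```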
Once again, the on-disk concatenation allows for efficient memory usage, even when the datasets need reindexing.\n", + "\n", + "One can also see the effect of the memory constraint on the measurements." + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "781b2ce9", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Dataset: sparse 0\n", + "Concatenating 9 files with sizes:\n", + "['78MiB', '78MiB', '101MiB', '98MiB', '98MiB', '97MiB', '96MiB', '97MiB', '78MiB']\n", + "Total size: 825MiB\n", + "Concatenation finished\n", + "Peak Memory: 273 MiB\n", + "--------------------------------------------------\n", + "Dataset: sparse 1\n", + "Concatenating 9 files with sizes:\n", + "['97MiB', '101MiB', '97MiB', '98MiB', '98MiB', '78MiB', '78MiB', '96MiB', '78MiB']\n", + "Total size: 825MiB\n", + "Concatenation finished\n", + "Peak Memory: 425 MiB\n", + "--------------------------------------------------\n" + ] + } + ], + "source": [ + "dataset_max_mem(max_arg=1_000_000_000, datasets=datasets_unaligned, array_type=\"sparse\");" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "28426915", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Dataset: dense 0\n", + "Concatenating 9 files with sizes:\n", + "['859MiB', '668MiB', '885MiB', '668MiB', '919MiB', '892MiB', '668MiB', '932MiB', '932MiB']\n", + "Total size: 7426MiB\n", + "Concatenation finished\n", + "Peak Memory: 662 MiB\n", + "--------------------------------------------------\n", + "Dataset: dense 1\n", + "Concatenating 9 files with sizes:\n", + "['859MiB', '668MiB', '885MiB', '668MiB', '919MiB', '892MiB', '668MiB', '932MiB', '932MiB']\n", + "Total size: 7426MiB\n", + "Concatenation finished\n", + "Peak Memory: 389 MiB\n", + "--------------------------------------------------\n" + ] + } + ], + "source": [ + "dataset_max_mem(max_arg=\"4000MiB\", datasets=datasets_unaligned, array_type=\"dense\");" + ] + }, + { + "cell_type": "markdown", + "id": "9cbeca28", + "metadata": {}, + "source": [ + "## The effect of `max_loaded_elems` on performance\n", + "The parameter `max_loaded_elems` is used in very specific cases when the data is sparse and the concatenation requires reindexing. Ideally, for each concatenation element (i.e., file), the function would load the entire file into memory, reindex it, and then write it to disk. However, this is not always possible due to memory constraints. In such cases, the `max_loaded_elems` parameter is used to specify the maximum number of elements that can be loaded into memory at once. The function then iteratively loads the data, reindexes it, and writes it to disk. This process is repeated until all the data has been processed.\n", + "\n", + "Given the dataset we have created, to observe the effect of this parameter, we would need to set `max_loaded_elems` to a very small number. However, this would result in a very long concatenation process. Therefore, we will use a subset of the dataset to demonstrate the effect of this parameter.\n", + "\n", + "Ideally, one would see the full benefits of this feature when the dataset has dissimilar sizes (e.g., a list consisting of 100 x 10 MB arrays plus 2 x 1 GB arrays). However, for the sake of simplicity, we will use a dataset with similar sizes. 
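For reference, a direct call that caps the number of loaded sparse elements looks like the sketch below. It mirrors what the measurement cells that follow do internally, using three of the unaligned sparse files; the output name is illustrative.

```python
# Sketch: concat_on_disk with a small max_loaded_elems cap on three unaligned sparse files.
three_sparse = sorted(datasets_unaligned[("sparse", 0)])[:3]
concat_on_disk(
    in_files=three_sparse,
    out_file=OUTDIR / "sparse_small_mem_demo.zarr",  # illustrative output path
    axis=0,
    max_loaded_elems=10_000_000,  # cap on sparse elements held in memory at once
)
```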
" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "7ba971e1", + "metadata": {}, + "outputs": [], + "source": [ + "subset = {\n", + " (\"sparse\", 0): list(datasets_unaligned[(\"sparse\", 0)])[:3],\n", + " (\"sparse\", 1): list(datasets_unaligned[(\"sparse\", 1)])[:3],\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "f2b1afa8", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Dataset: sparse 0\n", + "Concatenating 3 files with sizes:\n", + "['78MiB', '78MiB', '101MiB']\n", + "Total size: 258MiB\n", + "Concatenation finished\n", + "Peak Memory: 11 MiB\n", + "--------------------------------------------------\n", + "Dataset: sparse 1\n", + "Concatenating 3 files with sizes:\n", + "['97MiB', '101MiB', '97MiB']\n", + "Total size: 296MiB\n", + "Concatenation finished\n", + "Peak Memory: 39 MiB\n", + "--------------------------------------------------\n" + ] + } + ], + "source": [ + "dataset_max_mem(max_arg=10_000_000, datasets=subset, array_type=\"sparse\");" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "57823bff", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Dataset: sparse 0\n", + "Concatenating 3 files with sizes:\n", + "['78MiB', '78MiB', '101MiB']\n", + "Total size: 258MiB\n", + "Concatenation finished\n", + "Peak Memory: 11 MiB\n", + "--------------------------------------------------\n", + "Dataset: sparse 1\n", + "Concatenating 3 files with sizes:\n", + "['97MiB', '101MiB', '97MiB']\n", + "Total size: 296MiB\n", + "Concatenation finished\n", + "Peak Memory: 416 MiB\n", + "--------------------------------------------------\n" + ] + } + ], + "source": [ + "dataset_max_mem(max_arg=1_000_000_000, datasets=subset, array_type=\"sparse\");" + ] + }, + { + "cell_type": "markdown", + "id": "710508a7", + "metadata": {}, + "source": [ + "## (Optional) Cleaning Up Temporary Files\n", + "After all is done with your tests on this notebook you can cleanup the created files." + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "05125e6e", + "metadata": {}, + "outputs": [], + "source": [ + "TMPDIR.cleanup()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "dask", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}