diff --git a/dynim/__init__.py b/dynim/__init__.py index 55872a5..cb467fd 100644 --- a/dynim/__init__.py +++ b/dynim/__init__.py @@ -8,5 +8,6 @@ from .hdpoint import HDPoint from .hdspace import HDSpace from .sampler import Sampler -from .sampler_importance import SamplerImportance from .sampler_random import SamplerRandom +from .sampler_binned import SamplerBinned +from .sampler_importance import SamplerImportance diff --git a/dynim/binning.py b/dynim/binning.py new file mode 100644 index 0000000..0019091 --- /dev/null +++ b/dynim/binning.py @@ -0,0 +1,265 @@ +# Copyright 2020 Lawrence Livermore National Security, LLC and other +# DynIm Project Developers. See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: MIT + +################################################################################ + +import numpy as np +import logging +LOGGER = logging.getLogger(__name__) + + +# ------------------------------------------------------------------------------ +# ------------------------------------------------------------------------------ +class BinningUtils: + + # -------------------------------------------------------------------------- + @staticmethod + def create_cartesian_bins_1d(nxbins, xrange): + return np.linspace(xrange[0], xrange[1], nxbins + 1) + + @staticmethod + def create_cartesian_bins_2d(nxbins, nybins, xrange, yrange): + xbins = BinningUtils.create_cartesian_bins_1d(nxbins, xrange) + ybins = BinningUtils.create_cartesian_bins_1d(nybins, yrange) + return xbins, ybins + + # -------------------------------------------------------------------------- + @staticmethod + def create_spherical_bins(): + alt_bin_edges = np.deg2rad(np.array([0, 6, 18, 30, 42, 54, 66, 78, 90, + 102, 114, 126, 138, 150, 162, 174, + 180])) + nbins_az = np.array([1, 6, 12, 18, 24, 28, 30, 32, 32, + 30, 28, 24, 18, 12, 6, 1]) + + az_bin_edges = [np.linspace(-np.pi, np.pi, n + 1) for n in nbins_az] + return alt_bin_edges, az_bin_edges + + # -------------------------------------------------------------------------- + @staticmethod + def create_polar_bins(nrad_bins, rad_range, + tht_range=np.array([-np.pi, np.pi]), + kfactor=1, do_verify=False): + + assert isinstance(nrad_bins, int) + assert isinstance(rad_range, np.ndarray) + assert isinstance(tht_range, np.ndarray) + assert isinstance(kfactor, int) + assert rad_range.shape == (2,) + assert tht_range.shape == (2,) + assert nrad_bins > 1 + assert kfactor > 0 + assert tht_range[0] >= -np.pi and tht_range[1] <= np.pi + + # compute radius bins + rad_bin_edges = np.linspace(rad_range[0], rad_range[1], nrad_bins + 1) + + # split r'th radial bin into k*(2*r + 1) theta bins (gives equal area bins) + tht_bin_edges = [np.linspace(tht_range[0], tht_range[1], kfactor*(2*r+1)+1) + for r in range(nrad_bins)] + + if do_verify: + areas = [] + for ridx in range(nrad_bins): + for tidx in range(tht_bin_edges[ridx].shape[0] - 1): + r0, r1 = rad_bin_edges[ridx], rad_bin_edges[ridx + 1] + t0, t1 = tht_bin_edges[ridx][tidx], tht_bin_edges[ridx][tidx + 1] + + # area: 1/2 \theta (r2*r2 - r1*r1) + areas.append(0.5 * (t1 - t0) * (r1 * r1 - r0 * r0)) + assert np.isclose(min(areas), max(areas)) + + return rad_bin_edges, tht_bin_edges + + # -------------------------------------------------------------------------- + @staticmethod + def collect_bins(bin_edges0, bin_edges1): + + assert isinstance(bin_edges0, np.ndarray) + assert isinstance(bin_edges1, list) + assert bin_edges0.shape == (len(bin_edges1)+1,) + assert all([isinstance(b, np.ndarray) for b in bin_edges1]) + + 
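# ---- illustrative aside (not part of the diff) -------------------------------
# A minimal sketch of why create_polar_bins() above produces equal-area bins:
# radial ring r spans [r*dr, (r+1)*dr], so its area is pi*dr^2*(2r+1); splitting
# it into k*(2r+1) angular bins leaves every bin with area pi*dr^2/k. The
# variables below (nrad, k, rmax) are made up for the illustration.
import numpy as np

nrad, k, rmax = 4, 2, 1.0
dr = rmax / nrad
areas = []
for r in range(nrad):
    dtheta = 2.0 * np.pi / (k * (2 * r + 1))
    # area of one annulus sector: 0.5 * dtheta * (r1^2 - r0^2)
    areas.append(0.5 * dtheta * (((r + 1) * dr) ** 2 - (r * dr) ** 2))
assert np.isclose(min(areas), max(areas))   # every bin has the same area
# -------------------------------------------------------------------------------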
# create four arrays to mark 4 corners of the bins + bin_edges = [] + for ridx in range(bin_edges0.shape[0] - 1): + + bedges10 = bin_edges1[ridx] + for tidx in range(bedges10.shape[0] - 1): + bin_edges.append([bin_edges0[ridx], bin_edges0[ridx + 1], + bedges10[tidx], bedges10[tidx + 1]]) + + return np.array(bin_edges) + + # -------------------------------------------------------------------------- + @staticmethod + def get_bin_indices_1d(data, bin_edges, num_out_of_range_bins): + + assert isinstance(data, np.ndarray) + assert isinstance(bin_edges, np.ndarray) + assert len(data.shape) == 1 + assert len(bin_edges.shape) == 1 + assert isinstance(num_out_of_range_bins, int) + assert num_out_of_range_bins in [0, 1, 2] + + LOGGER.info(f'Computing 1d bin indices for {data.shape} data ' + f'(boundary type = {num_out_of_range_bins})') + + # empty data? + if data.shape[0] == 0: + return np.array([]).astype(np.uint8) + + # digitize returns an index for each data value, such that + # bin_edges[i-1] <= x < bin_edges[i] + # for x < bin_edges[0], i = 0 (left out-of-range bin) + # for x >= bin_edges[-1], i = bin_edges.shape + 1 (right out-of-range bin) + bin_idxs = np.digitize(data, bin_edges) + nbins = bin_edges.shape[0] - 1 + + # remove out-of-range bins + if num_out_of_range_bins == 0: + assert bin_idxs.min() > 0 and bin_idxs.max() <= nbins + bin_idxs -= 1 + + # merge the left out-of-range bin into the right one + elif num_out_of_range_bins == 1: + assert bin_idxs.min() >= 0 and bin_idxs.max() <= nbins + 1 + bin_idxs[np.where(bin_idxs == 0)[0]] = nbins + 1 + bin_idxs -= 1 + + # keep both out-of-range bins + elif num_out_of_range_bins == 2: + assert bin_idxs.min() >= 0 and bin_idxs.max() <= nbins + 1 + + return bin_idxs.astype(np.uint8) + + @staticmethod + def get_bin_indices_2d_spherical(data, bins_altitude, bins_azimuth): + + assert len(data.shape) == 2 + assert data.shape[1] == 2 + LOGGER.info(f'Computing 2d-spherical bin indices for {data.shape} data') + + ndata = data.shape[0] + data_altitude, data_azimuthal = data[:, 0], data[:, 1] + + # altitude :: remove both out-of-range bins ([0,90] only) + idxs_t = BinningUtils.get_bin_indices_1d(data_altitude, bins_altitude, 0) + + # azimuthal :: compute the array of bins + idxs_r = np.array([np.nan for _ in range(ndata)]) + for tidx in range(bins_altitude.shape[0] - 1): + # find the bins for data2 "with respect to" the bin in data1 + # these are the elements in bin "idx1" wrt column 1 + eidx = np.where(idxs_t == tidx)[0] + + # idxs_r for these elements should not be set + assert np.all(np.isnan(idxs_r[eidx])) + + # compute the bin idxs + idxs_r[eidx] = BinningUtils.get_bin_indices_1d(data_azimuthal[eidx], + bins_azimuth[tidx], 0) + + idxs_r = idxs_r.astype(np.uint8) + return idxs_t, idxs_r + + # -------------------------------------------------------------------------- + @staticmethod + def compute_histogram_from_bin_indices(bin_idxs, nbins): + LOGGER.info(f'Computing histogram ' + f'for {bin_idxs.shape} bin indices and {nbins} bins') + hist = np.zeros(nbins, dtype=np.uint) + vals, counts = np.unique(bin_idxs, return_counts=True) + hist[vals] = counts + return hist + + # -------------------------------------------------------------------------- + @staticmethod + def plot_polar_histogram(ax, bin_corners, values, cmap='Purples', do_log=False): + + import matplotlib + + assert isinstance(ax, matplotlib.axes.Axes) + assert isinstance(bin_corners, np.ndarray) + assert isinstance(values, np.ndarray) + assert isinstance(do_log, bool) + assert isinstance(cmap, 
(str, matplotlib.colors.Colormap)) + assert len(bin_corners.shape) == 2 + assert len(values.shape) == 1 + assert bin_corners.shape[1] == 4 + assert bin_corners.shape[0] == values.shape[0] + + # create colors + if isinstance(cmap, str): + cmap = matplotlib.cm.get_cmap(cmap) + + if do_log: + norm = matplotlib.colors.LogNorm(1, values.max()) + else: + norm = matplotlib.colors.Normalize(1, values.max()) + + masked_values = np.ma.masked_equal(values, 0) + cmap.set_bad(alpha = 0) + colors = cmap(norm(masked_values)) + + # plot stacked bars + r0, r1 = bin_corners[:,0], bin_corners[:,1] + t0, t1 = bin_corners[:,2], bin_corners[:,3] + dt = t1-t0 + ax.bar(x = t0 + dt, height = r1-r0, width = dt, bottom = r0, + color=colors) #, edgecolor='#bbbbbb', linewidth=0.5) + #color=colors, edgecolor=colors, linewidth=0.5) + + # customize axes + ax.grid(False) + ax.set_theta_zero_location("N") + ax.set_theta_direction(-1) + ax.set_thetagrids([0, 45, 90, 135, 180, 225, 270, 315], + labels=['0', '45', '90', '135', '180', '-135', '-90', '-45']) + + ax.set_rticks([]) + ax.set_rlim([r0[0], r1[-1]]) + ax.set_rlabel_position(180) + + # add colorbar + sm = matplotlib.cm.ScalarMappable(cmap=cmap, norm=norm) + sm.set_array([]) + matplotlib.pyplot.colorbar(sm, orientation="horizontal", pad=0.05) + + # -------------------------------------------------------------------------- + @staticmethod + def plot_polar_scatter(ax, rad, tht, rad_rng = None): + + import matplotlib + + assert isinstance(ax, matplotlib.axes.Axes) + assert isinstance(rad, np.ndarray) + assert isinstance(tht, np.ndarray) + + assert tht.min() >= -np.pi and tht.max() <= np.pi + assert rad.min() >= 0 + + if rad_rng is None: + rad_rng = np.array([rad.min(), rad.max()]) + else: + assert isinstance(rad_rng, np.ndarray) + assert rad_rng.shape == (2,) + assert rad.min() >= rad_rng[0] and rad.max() <= rad_rng[1] + + # now, plot! + ax.scatter(tht, rad, s=1, alpha=0.1) + + # set up the axes! 
+ ax.set_theta_zero_location("N") + ax.set_theta_direction(-1) + ax.set_thetagrids([0, 45, 90, 135, 180, 225, 270, 315], + labels=['0', '45', '90', '135', '180', '-135', '-90', '-45']) + + ax.set_rticks([]) + ax.set_rlim(rad_rng) + ax.set_rlabel_position(180) + +# ------------------------------------------------------------------------------ +# ------------------------------------------------------------------------------ diff --git a/dynim/hdpoint.py b/dynim/hdpoint.py index a9185fd..9bf95bc 100644 --- a/dynim/hdpoint.py +++ b/dynim/hdpoint.py @@ -14,6 +14,8 @@ class HDPoint: """A high-dimensional point.""" + fetch_ids = np.vectorize(lambda d: d.id, otypes=[str]) + def __init__(self, id: (int, str), coords: np.ndarray, rank: np.float32 = np.float32('nan')) -> None: diff --git a/dynim/hdspace.py b/dynim/hdspace.py index bbfa5d9..a11422b 100644 --- a/dynim/hdspace.py +++ b/dynim/hdspace.py @@ -5,6 +5,7 @@ ################################################################################ +import os from typing import List import logging @@ -34,7 +35,7 @@ def __str__(self) -> str: else: assert False - return 'HDSpace '.format(self.dim, _type, self.count()) + return f'HDSpace ' def __repr__(self) -> str: return self.__str__() @@ -55,8 +56,7 @@ def _assert_valid_points(self, points: np.ndarray) -> None: if len(points.shape) != 2: raise ValueError('Need points as a 2-rank vector of size (n,dim)') if points.shape[1] != self.dim: - raise ValueError('Dimensionality mismatch: {} != (,{})' - .format(points.shape, self.dim)) + raise ValueError(f'Dimensionality mismatch: {points.shape} != (,{self.dim})') def _assert_valid_index(self) -> None: assert self.index is not None @@ -79,17 +79,15 @@ def setup(self, dim: int, idx_type: str = 'approx', if idx_type == 'exact': # quantizer is used as the index self.index = quantizer - LOGGER.info('Initialized {}-D Space with {} metric' - .format(self.dim, idx_type)) + LOGGER.info(f'Initialized {self.dim}-D Space with {idx_type} metric') return # else # quantizer is used for mapping points to centers of voronoi cells - self.index = faiss.IndexIVFFlat(quantizer, self.dim, nlist, - faiss.METRIC_L2) + self.index = faiss.IndexIVFFlat(quantizer, self.dim, nlist, faiss.METRIC_L2) self.index.nprobe = nprobes - LOGGER.info('Initialized {}-D Space with {} metric ({}, {})' - .format(self.dim, idx_type, nlist, nprobes)) + LOGGER.info(f'Initialized {self.dim}-D Space with {idx_type} metric ' + f'({nlist}, {nprobes})') # -------------------------------------------------------------------------- # train the data-structure using some data @@ -102,13 +100,11 @@ def train(self, points: np.ndarray) -> None: assert not self.index.is_trained self._assert_valid_points(points) - LOGGER.debug('Training with {} points'.format(points.shape)) + LOGGER.debug(f'Training with {points.shape} points') try: self.index.train(points) - except Exception as e: - LOGGER.error('Failed to train {}-D Space. Error = {}' - .format(self.dim, e)) + LOGGER.error(f'Failed to train {self.dim}-D Space. Error = {e}') assert self.index.is_trained @@ -117,16 +113,20 @@ def train(self, points: np.ndarray) -> None: # -------------------------------------------------------------------------- def restore(self, fname: str) -> bool: + if not os.path.isfile(fname): + LOGGER.warning(f'Failed to find ({fname})!') + return False + try: self.index = faiss.read_index(fname) except Exception as e: - LOGGER.error('Failed to restore HD Space. Error = {}'.format(e)) + LOGGER.error(f'Failed to restore HD Space. 
Error = {e}') return False self.dim = self.index.d - LOGGER.info('Successfully restored {}-D Space with {} points from ({})' - .format(self.dim, self.count(), fname)) + LOGGER.info(f'Successfully restored {self.dim}-D Space ' + f'with {self.count()} points from ({fname})') return True # checkpoint the latent space @@ -136,12 +136,12 @@ def checkpoint(self, fname: str) -> bool: faiss.write_index(self.index, fname) except Exception as e: - LOGGER.error('Failed to checkpoint {}-D Space. Error = {}' - .format(self.dim, e)) + LOGGER.error(f'Failed to checkpoint {self.dim}-D Space. ' + f'Error = {e}') return False - LOGGER.info('Successfully checkpointed {}-D Space with {} points to ({})' - .format(self.dim, self.count(), fname)) + LOGGER.info(f'Successfully checkpointed {self.dim}-D Space ' + f'with {self.count()} points to ({fname})') return True # -------------------------------------------------------------------------- @@ -152,14 +152,12 @@ def add_points(self, points: np.ndarray) -> bool: self._assert_valid_index() self._assert_valid_points(points) - LOGGER.debug('Adding {} points to {}-D Space' - .format(points.shape[0], self.dim)) + LOGGER.debug(f'Adding {points.shape} points to {self.dim}-D Space') try: self.index.add(points) except Exception as e: - LOGGER.error('Failed to add point to {}-D Space (error = {})' - .format(self.dim, e)) + LOGGER.error(f'Failed to add point to {self.dim}-D Space (error = {e})') return False return True @@ -178,44 +176,50 @@ def get_knn(self, points: np.ndarray, k: int, k0: int = 0) -> List[np.ndarray]: # dont make the query if the all the points will be excluded later if k0 >= n or n == 0: - LOGGER.warning('Invalid request (k0={} > total={})'.format(k0, n)) + LOGGER.warning(f'Invalid request (k0={k0} > total={n})') return [] # ---------------------------------------------------------------------- self._assert_valid_points(points) nquery = points.shape[0] - LOGGER.debug('Getting ([{}, {}))-nn for {} query points (total = {})' - .format(k0, k + k0, nquery, n)) + LOGGER.debug(f'Getting ([{k0}, {k+k0}))-nn for {nquery} query points (total = {n})') + + k = min(k0 + k, n) + + # ---------------------------------------------------------------------- try: - k = min(k0 + k, n) - dists, inds = self.index.search(points, k) + chunk_size = 10000 + + res = [self.index.search(points[chunk_start:chunk_start + chunk_size], k) + for chunk_start in range(0, nquery, chunk_size)] + + dists = np.concatenate([_[0] for _ in res]) + inds = np.concatenate([_[1] for _ in res]) + assert dists.shape == (nquery, k) and inds.shape == (nquery, k), 'Invalid aggregation of knn' except Exception as e: - LOGGER.error('Failed to get knn from {}-D Space (error = {})' - .format(self.dim, e)) + LOGGER.error(f'Failed to get knn from {self.dim}-D Space (error = {e})') return [] - assert dists.shape == (nquery, k) and inds.shape == (nquery, k) - # ---------------------------------------------------------------------- # remove any invalid values # although, should not be needed since we adjusted k to be <= n valids = np.bitwise_and(inds != -1, dists != np.finfo(dists.dtype).max) - # pick valid knn after ignoring the first k0 valid points - knn = [] - for i in range(nquery): + def _ignore_k0(i): vi = np.where(valids[i])[0] # check to see if we didn't find anything valid if vi.shape[0] == 0: - knn.append((np.array([]), np.array([]))) - # knn.append((np.array([-1]), np.array([np.nan]))) + return np.array([]), np.array([]) else: vi = vi[k0:] # faiss returns squared distance - knn.append((inds[i][vi], 
np.sqrt(dists[i][vi]))) + return inds[i][vi], np.sqrt(dists[i][vi]) + + # pick valid knn after ignoring the first k0 valid points + knn = [_ignore_k0(i) for i in range(nquery)] # the return value is a list of n tuples (n = number of query points) # each tuple contains (id, dist) @@ -233,8 +237,7 @@ def get_knn_distance(self, points: np.ndarray, k: int, k0: int = 0) -> np.float3 self._assert_valid_points(points) nquery = points.shape[0] - LOGGER.debug('Getting ([{}, {}))-nn dist for {} points (total = {})' - .format(k0, k + k0, nquery, self.count())) + LOGGER.debug(f'Getting ([{k0}, {k0+k}))-nn dist for {nquery} points (total = {self.count()})') knn = self.get_knn(points, k, k0) mean_dists = -1. * np.ones(nquery).astype(np.float32) diff --git a/dynim/sampler.py b/dynim/sampler.py index 21cbf0d..0cbd3c6 100644 --- a/dynim/sampler.py +++ b/dynim/sampler.py @@ -6,14 +6,12 @@ ################################################################################ import os -import sys -import yaml from typing import List, Union import logging import numpy as np from .hdpoint import HDPoint -from .utils import format_time, backup_fname, take_backup, write_history +from .utils import format_time, take_backup, filter_list, write_history LOGGER = logging.getLogger(__name__) @@ -25,7 +23,8 @@ class Sampler(object): # -------------------------------------------------------------------------- def __init__(self, name: str, workspace: str, - min_cands_b4_sel: int = 0, buffer_size: int = 0): + buffer_size: int = 0, + min_cands_b4_sel: int = 0): """ Initialize a Sampler. Args: @@ -43,17 +42,7 @@ def __init__(self, name: str, workspace: str, self.workspace = workspace self.buffer_size = buffer_size self.min_cands_b4_sel = min_cands_b4_sel - - if not os.path.isdir(self.workspace): - os.makedirs(self.workspace) - - # prepare to write the history and checkpoints - self.tag = '{}-{}'.format(self.type, self.name) - self.chkpts = { - 'state': os.path.join(self.workspace, '{}.state.chk'.format(self.tag)), - 'data': os.path.join(self.workspace, '{}.data.npz'.format(self.tag)) - } - self.hists = os.path.join(self.workspace, '{}.history.csv'.format(self.tag)) + os.makedirs(self.workspace, exist_ok=True) # candidates for the next set of selection (sorted list of samples) self.candidates = np.array([]) @@ -67,96 +56,112 @@ def __init__(self, name: str, workspace: str, # samples that have been dropped (maintain for recalculation of weights) self.discarded = [] + # function to explicitly scale the rank + self.rank_scaler = None + + # prepare to write the history and checkpoints + self.schkpt = os.path.join(self.workspace, f'{self.__tag__()}.data.npz') + self.hists = os.path.join(self.workspace, f'{self.__tag__()}.history.csv') + # -------------------------------------------------------------------------- + def __type__(self): + return type(self).__name__ + + def __tag__(self): + return f'{self.__type__()}-{self.name}' + def __str__(self): - s = '{}<{}> '.format(self.type, self.name) - s += '[{} cached, {} candidates, {} selected, {} discarded]'\ - .format(len(self.cached), len(self.candidates), - len(self.selected), len(self.discarded)) - return s + return f'[{self.__type__()}<{self.name}>:' \ + f' {len(self.cached)} cached,' \ + f' {len(self.candidates)} candidates,' \ + f' {len(self.selected)} selected,' \ + f' {len(self.discarded)} discarded]' def __repr__(self): return self.__str__() - # -------------------------------------------------------------------------- def _state_dict(self): return {'ncached': len(self.cached), 
'ncandidates': len(self.candidates), 'nselected': len(self.selected), 'ndiscarded': len(self.discarded)} + # -------------------------------------------------------------------------- def num_candidates(self): return len(self.cached) + len(self.candidates) def num_selected(self): return len(self.selected) - @staticmethod - def _filter(data_to_filter, filter_if_present_in_list): + def set_rank_scaler(self, rank_scaler: callable) -> None: + assert callable(rank_scaler) + LOGGER.info(f'Setting rank scalar ({rank_scaler}) for ({self.__tag__()})') + self.rank_scaler = rank_scaler - if len(data_to_filter) == 0 or len(filter_if_present_in_list) == 0: - return data_to_filter, [] - - # since filter_if_present_in_list is likely very large, - # let's use numpy to find the ones that are already included - ids_in_data = np.array([_.id for _ in data_to_filter]) - ids_in_list = np.array([_.id for _ in filter_if_present_in_list]) - is_present = np.intersect1d(ids_in_data, ids_in_list) + # -------------------------------------------------------------------------- + def _apply_ranks_to_candidates(self, ranks): + assert isinstance(ranks, np.ndarray) + assert ranks.dtype == np.float32 + assert ranks.shape[0] == len(self.candidates) - # no intersection - if is_present.shape[0] == 0: - return data_to_filter, [] + if self.rank_scaler is None: + LOGGER.info(f'Applying {len(self.candidates)} ranks to candidates') + for i, sample in enumerate(self.candidates): + sample.rank = ranks[i] - # slower list comprehension to find the ones not present - is_not_present = [_ for _ in data_to_filter if _.id not in is_present] - return is_not_present, is_present.tolist() + else: + LOGGER.info(f'Scaling {len(self.candidates)} ranks using ({self.rank_scaler})') + for i, sample in enumerate(self.candidates): + sample.rank = np.float32(self.rank_scaler(sample.id, ranks[i])) # -------------------------------------------------------------------------- # add new candidates to the sampler # -------------------------------------------------------------------------- - def add_candidates(self, points: List[HDPoint]) -> None: + def add_candidates(self, points: List[HDPoint], do_filter=True) -> None: - assert isinstance(points, list) + assert isinstance(points, (list, np.ndarray)) assert all([isinstance(p, HDPoint) for p in points]) n = len(points) if n == 0: return - LOGGER.debug('Adding {} candidates to ({})'.format(n, self.__str__())) - - # remove the ones we have already seen - points, discarded = self._filter(points, self.selected) - if len(discarded) > 0: - LOGGER.warning('Rejecting {} already selected points: {}' - .format(len(discarded), discarded)) - write_history(self.hists, 'rejected:add_cands:already_selected', - discarded, self._state_dict()) - - points, discarded = self._filter(points, self.discarded) - if len(discarded) > 0: - LOGGER.warning('Rejecting {} already discarded points: {}' - .format(len(discarded), discarded)) - write_history(self.hists, 'rejected:add_cands:already_discarded', - discarded, self._state_dict()) - - points, discarded = self._filter(points, self.cached) - if len(discarded) > 0: - LOGGER.warning('Rejecting {} already cached points: {}' - .format(len(discarded), discarded)) - write_history(self.hists, 'rejected:add_cands:already_cached', - discarded, self._state_dict()) - - points, discarded = self._filter(points, self.candidates) - if len(discarded) > 0: - LOGGER.warning('Rejecting {} already candidates points: {}' - .format(len(discarded), discarded)) - write_history(self.hists, 
'rejected:add_cands:already_candidates', - discarded, self._state_dict()) + # ---------------------------------------------------------------------- + def _filter(_ids_to_filter, _ids_to_check_against, _tag): + + if len(_ids_to_check_against) == 0: + return np.zeros(len(_ids_to_filter), dtype=bool) + + dup_flags = np.isin(_ids_to_filter, _ids_to_check_against, assume_unique=True) + ndups = dup_flags.sum() + if ndups > 0: + dups = _ids_to_filter[dup_flags] + LOGGER.warning(f'Rejecting {ndups} already {_tag} points: {dups}') + write_history(self.hists, f'rejected:add_cands:already_{_tag}', + dups, self._state_dict()) + + return dup_flags + + # ---------------------------------------------------------------------- + LOGGER.debug(f'Adding {n} candidates to ({self})') + + # filter the ids we have already seen + if do_filter: + pids = HDPoint.fetch_ids(points) + dflags0 = _filter(pids, HDPoint.fetch_ids(self.selected), 'selected') + dflags1 = _filter(pids, HDPoint.fetch_ids(self.discarded), 'discarded') + dflags2 = _filter(pids, HDPoint.fetch_ids(self.cached), 'cached') + dflags3 = _filter(pids, HDPoint.fetch_ids(self.candidates), 'candidates') + + dflags = np.logical_or.reduce((dflags0, dflags1, dflags2, dflags3)) + points = np.array(points) + points = points[np.logical_not(dflags)] + points = points.tolist() # finally, add the ones we have not seen before + n = len(points) + LOGGER.info(f'Added {n} candidates to ({self})') self.cached.extend(points) - LOGGER.info('Added {} candidates to ({})'.format(n, self.__str__())) write_history(self.hists, 'added', points, self._state_dict()) # -------------------------------------------------------------------------- @@ -164,7 +169,7 @@ def add_candidates(self, points: List[HDPoint]) -> None: # -------------------------------------------------------------------------- def update(self): - LOGGER.debug('Updating sampler ({})'.format(self.__str__())) + LOGGER.profile(f'Updating sampler ({self})') # move the cached candidates to actual candidate list self.candidates = np.append(self.candidates, self.cached) @@ -189,7 +194,7 @@ def update(self): # write discards write_history(self.hists, 'discarded', discard, self._state_dict()) - LOGGER.debug('Updated sampler ({})'.format(self.__str__())) + LOGGER.profile(f'Updated sampler ({self})') # -------------------------------------------------------------------------- # select k new candidates @@ -201,24 +206,24 @@ def select(self, k: int, confirm_selection: bool = True) -> List[HDPoint]: if k == 0: return [] - if 0 < self.min_cands_b4_sel and self.num_candidates() < self.min_cands_b4_sel: - LOGGER.debug('Not selecting due to too few candidates: {}' - .format(self.__str__())) + if (self.min_cands_b4_sel > 0) and \ + (self.num_selected() == 0) and \ + (self.num_candidates() < self.min_cands_b4_sel): + LOGGER.debug(f'Not selecting due to too few candidates from {self}') return [] self.update() - LOGGER.debug('Selecting {} samples from ({})'.format(k, self.__str__())) + LOGGER.debug(f'Selecting {k} samples from {self}') # pick the top k candidates k = min(k, len(self.candidates)) selection = list(self.candidates[:k]) - LOGGER.info('Selected {} samples from ({})'.format(k, self.__str__())) + LOGGER.info(f'Selected {k} samples from {self}') if confirm_selection: self._confirm_selections(selection) - LOGGER.info('Confirmed selection of {} samples from ({})' - .format(k, self.__str__())) + LOGGER.info(f'Confirmed selection of {k} samples from {self}') return selection @@ -242,42 +247,37 @@ def confirm_selections(self, 
selection: Union[List[HDPoint], List[str]]) -> bool if k == 0: return False - LOGGER.debug('Confirming selection of {} samples from ({})' - .format(k, self.__str__())) + LOGGER.debug(f'Confirming selection of {k} samples from {self}') # ---------------------------------------------------------------------- # make sure that the given selection are the top candidates if is_samples: for i in range(k): if self.candidates[i] != selection[i]: - raise AttributeError('Cannot confirm selection [{} = {}]; ' - 'not found in candidates.' - .format(i, selection[i])) + raise AttributeError(f'Cannot confirm selection: ' + f'[{i} = {selection[i]}] not a candidate') else: for i in range(k): if self.candidates[i].id != selection[i]: - raise AttributeError('Cannot confirm selection [{} = {}]; ' - 'not found in candidates.' - .format(i, selection[i])) + raise AttributeError(f'Cannot confirm selection: ' + f'[{i} = {selection[i]}] not a candidate') + # ---------------------------------------------------------------------- selection = list(self.candidates[:k]) # now, check that these are not previously selected for i in range(k): if selection[i] in self.selected: - raise AttributeError('Cannot confirm selection [{} = {}]; ' - 'already selected.' - .format(i, selection[i])) + raise AttributeError(f'Cannot confirm selection: ' + f'[{i} = {selection[i]}] already selected') if selection[i] in self.discarded: - raise AttributeError('Cannot confirm selection [{} = {}]; ' - 'already discarded.' - .format(i, selection[i])) + raise AttributeError(f'Cannot confirm selection: ' + f'[{i} = {selection[i]}] already discarded') # ---------------------------------------------------------------------- self._confirm_selections(selection) - LOGGER.info('Confirmed selection of {} samples from ({})' - .format(k, self.__str__())) + LOGGER.info(f'Confirmed selection of {k} samples from {self}') # -------------------------------------------------------------------------- def _confirm_selections(self, selection: List[HDPoint]) -> bool: @@ -289,7 +289,7 @@ def _confirm_selections(self, selection: List[HDPoint]) -> bool: if k == 0: return False - LOGGER.debug('Confirming selection of {} samples'.format(k)) + LOGGER.debug(f'Confirming selection of {k} samples') # mark the selections as sampled (to be implemented by the child class!) 
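# (note, not part of the diff: _add_selections below is the hook overridden by
#  SamplerImportance and SamplerBinned, where it pushes the coordinates of the
#  confirmed selections into the faiss-backed HDSpace used for ranking)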
self._add_selections(selection) @@ -303,69 +303,71 @@ def _confirm_selections(self, selection: List[HDPoint]) -> bool: # write selections write_history(self.hists, 'selected', selection, self._state_dict()) + # -------------------------------------------------------------------------- + def invalidate_candidates(self, validator): + assert callable(validator) + + def _filter_valid(_data, tag): + _ids = [_.id for _ in _data] + _flags = [_ for _ in validator(_ids)] + _valids = [_data[i] for i, _ in enumerate(_flags) if _] + _invalids = [_data[i] for i, _ in enumerate(_flags) if not _] + + LOGGER.info(f'{tag} = {len(_data)}: ' + f'valid = {len(_valids)}, invalid = {len(_invalids)}') + return _valids, _invalids + + LOGGER.info(f'testing for invalid candidates for {self}') + + self.cached, _invalid_cached = _filter_valid(self.cached, 'cached') + self.candidates, _invalid_cands = _filter_valid(self.candidates, 'candidates') + LOGGER.info(f'after invaliding patches: {self}') + + if len(_invalid_cands) > 0: + _hfile = self.hists + write_history(_hfile, 'invalidated:cached', _invalid_cached, self._state_dict()) + write_history(_hfile, 'invalidated:candidates', _invalid_cands, self._state_dict()) + + return len(_invalid_cands) + # -------------------------------------------------------------------------- # checkpoint and restore # -------------------------------------------------------------------------- - def checkpoint(self): + def _checkpoint(self): st = format_time() - LOGGER.info('Checkpointing {} at {}'.format(self.__str__(), st)) - sys.stdout.flush() - - # take backup of previous checkpoints - take_backup(self.chkpts['state']) - take_backup(self.chkpts['data']) - - # save current state ast yaml - with open(self.chkpts['state'], 'w') as outfile: - state = dict(t=st, type=self.type, name=self.name, - ncached=len(self.cached), - nselected=len(self.selected), - ncandidates=len(self.candidates), - ndiscarded=len(self.discarded)) - - yaml.dump(state, outfile) - - # save data as npz - np.savez_compressed(self.chkpts['data'], - t=st, type=self.type, name=self.name, + LOGGER.info(f'Checkpointing Sampler data {self} at {st}') + + # take backup of previous checkpoint + take_backup(self.schkpt) + np.savez_compressed(self.schkpt, + t=st, type=self.__type__(), name=self.name, cached=self.cached, candidates=self.candidates, selected=self.selected, discarded=self.discarded) - LOGGER.debug('Checkpointing done for Sampler') - - def restore(self): - - for k in self.chkpts.keys(): - if not os.path.isfile(self.chkpts[k]): - LOGGER.debug('Checkpoint file {} does not exist!' - .format(self.chkpts[k])) - return False - - # load yaml - with open(self.chkpts['state'], 'r') as infile: - state = yaml.load(infile, Loader=yaml.Loader) + # -------------------------------------------------------------------------- + def _restore(self, filename=None): - # restore data! - LOGGER.debug('Restoring data from {}'.format(state['t'])) - sys.stdout.flush() + if filename is None: + filename = self.schkpt try: - data = np.load(self.chkpts['data'], allow_pickle=True) - assert self.type == data['type'] + LOGGER.debug(f'Restoring from ({filename})') + data = np.load(filename, allow_pickle=True) + assert self.name == data['name'] + assert self.__type__() == data['type'] - except Exception as e: - LOGGER.error('Failed to restore {}. 
Error = {}'.format(self.chkpts['data'], e)) - raise e + self.cached = list(data['cached']) + self.selected = list(data['selected']) + self.candidates = np.array(data['candidates']) + self.discarded = list(data['discarded']) + data.close() - self.cached = list(data['cached']) - self.selected = list(data['selected']) - self.candidates = np.array(data['candidates']) - self.discarded = list(data['discarded']) + except Exception as e: + raise Exception(f'{type(e).__name__}: {e} ({filename})') - assert self.test() - LOGGER.info('Restored {}'.format(self.__str__())) + LOGGER.info(f'Successfully restored {self}') return True # ------------------------------------------------------------------------------ diff --git a/dynim/sampler_binned.py b/dynim/sampler_binned.py new file mode 100644 index 0000000..6866559 --- /dev/null +++ b/dynim/sampler_binned.py @@ -0,0 +1,667 @@ +# Copyright 2020 Lawrence Livermore National Security, LLC and other +# DynIm Project Developers. See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: MIT + +################################################################################ + + +import os +import numpy as np +import logging +from typing import List + +from .hdspace import HDSpace +from .hdpoint import HDPoint +from .sampler import Sampler +from .binning import BinningUtils +from .utils import take_backup, backup_fname, files_exist + +LOGGER = logging.getLogger(__name__) + + +# ------------------------------------------------------------------------------ +# An importance sampler of hd points +# ------------------------------------------------------------------------------ +class SamplerBinned(Sampler): + + KNOWN_BINNING_TYPES = ['1d-nonperiodic', + '2d-spherical', + '3d-spherical_and_cartesian'] + KNOWN_RANKING_TYPES = ['static', 'dynamic'] + + def __init__(self, name: str, workspace: str, + novelty_factor: float, + binning_type: str, + min_cands_b4_sel: int = 0): + + self.ranking_type = 'dynamic' + self.binning_type = binning_type + assert self.ranking_type in SamplerBinned.KNOWN_RANKING_TYPES + assert self.binning_type in SamplerBinned.KNOWN_BINNING_TYPES + + super().__init__(name, workspace, min_cands_b4_sel = min_cands_b4_sel) + + # ---------------------------------------------------------------------- + # --- set up the binned sampler + # novelty_factor = 1: complete novelty (flat curve) + # novelty_factor = 0: complete random (mimic distribution) + # linear interpolation between the two! + assert isinstance(novelty_factor, float) + assert 0. <= novelty_factor <= 1.0 + self.novelty_factor = novelty_factor + + # for hdspace ranking within bins + self.hdspace = None + self.ranking_dim = 0 + self.lchkpt = os.path.join(self.workspace, f'{self.__tag__()}.lspace.idx') + + # ---------------------------------------------------------------------- + self.cand_histo, self.cand_binidxs = None, None + self.sel_histo, self.sel_binidxs = None, None + + # ---------------------------------------------------------------------- + _s = 'unknown' + if self.binning_type == '1d-nonperiodic': + # dimer dists are measured from mid points + # so add some margin + xrange = [0, np.sqrt(2.)*15. + 3.] 
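# ---- illustrative aside (not part of the diff) -------------------------------
# The 1d binning configured below reduces to np.linspace edges plus np.digitize,
# as in BinningUtils.create_cartesian_bins_1d / get_bin_indices_1d. A sketch
# with made-up values (5 bins over [0, 24.2]):
import numpy as np

edges = np.linspace(0.0, 24.2, 5 + 1)            # nbins + 1 edges
data = np.array([0.5, 7.3, 7.3, 23.9])
idxs = np.digitize(data, edges) - 1              # in-range values map to 0..4
hist = np.zeros(5, dtype=np.uint64)
vals, counts = np.unique(idxs, return_counts=True)
hist[vals] = counts                              # hist -> [1, 2, 0, 0, 1]
# -------------------------------------------------------------------------------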
+ self.bins_x = BinningUtils.create_cartesian_bins_1d(5, xrange) + self.nbins_x = self.bins_x.shape[0]-1 + + self.nbins = self.nbins_x + _s = f'x={self.nbins_x}' + self.binning_dim = 1 + + self.distribute_zero_map = {} + for b in range(self.nbins): + self.distribute_zero_map[b] = [] + for db in range(1,self.nbins): + b2a,b2b = b-db,b+db + if b2a >= 0 and b2b < self.nbins: + self.distribute_zero_map[b].append([b2a,b2b]) + elif b2b < self.nbins: + self.distribute_zero_map[b].append([b2b]) + elif b2a >= 0: + self.distribute_zero_map[b].append([b2a]) + + # ---------------------------------------------------------------------- + elif self.binning_type in ['2d-spherical', '3d-spherical_and_cartesian']: + + # spherical bins = (bins_altitude, bins_azimuth) + self.bins_altitude, self.bins_azimuth = BinningUtils.create_spherical_bins() + self.nbins_altitude = self.bins_altitude.shape[0] - 1 + self.nbins_azimuth = [b.shape[0] - 1 for b in self.bins_azimuth] + + # count the number of azimuth bins prior to each altitude bin + self.cbins_azimuth = np.concatenate((np.zeros(1), np.cumsum(self.nbins_azimuth))).astype(np.uint) + assert sum(self.nbins_azimuth) == self.cbins_azimuth[-1] + + # that's it for 2d binning + if self.binning_type == '2d-spherical': + self.binning_dim = 2 + self.nbins = np.uint(self.cbins_azimuth[-1]) + _s = f'altitude={self.nbins_altitude}, azimuth={self.nbins_azimuth}' + + # 3d binning needs a z dimension also + elif self.binning_type == '3d-spherical_and_cartesian': + self.binning_dim = 3 + # then there will be two out of range bins: [< min] and [> max] + self.bins_z = np.array([1.5, 2., 2.5, 3.]) + self.nbins_z = 2 + (self.bins_z.shape[0] - 1) + + self.nbins = np.uint(self.nbins_z * self.cbins_azimuth[-1]) + _s = f'z={self.nbins_z}, altitude={self.nbins_altitude}, azimuth={self.nbins_azimuth}' + + # ---------------------------------------------------------------------- + LOGGER.info(f'Initialized SamplerBinned({self.name}, {self.binning_type}, ' + f'novelty={self.novelty_factor}) with ' + f'{self.nbins} bins: ({_s})') + + # -------------------------------------------------------------------------- + def set_hdspace(self, data: (str, HDSpace)) -> None: + + assert isinstance(data, (str, HDSpace)) + if isinstance(data, HDSpace): + self.hdspace = data + else: + self.hdspace = HDSpace() + self.hdspace.restore(data) + + self.ranking_dim = self.hdspace.dim + LOGGER.info(f'Assigned a {self.ranking_dim}-D hdspace for ranking') + + # -------------------------------------------------------------------------- + def _fetch_coords(self, points: List[HDPoint]): + coords = np.array([p.coords for p in points]) + assert self.binning_dim + self.ranking_dim == coords.shape[1] + if self.hdspace is None: + return None, coords + else: + return np.array(coords[:, :-self.binning_dim]), \ + np.array(coords[:, -self.binning_dim:]) + + # -------------------------------------------------------------------------- + # add these selections to hdspace + def _add_selections(self, points: List[HDPoint]) -> None: + if len(points) == 0 or self.hdspace is None: + return + rcoords, bcoords = self._fetch_coords(points) + self.hdspace.add_points(np.array(rcoords)) + + # -------------------------------------------------------------------------- + def _update_ranks(self, k=10): + + nselections = len(self.selected) + + LOGGER.debug(f'Updating ranks: {self}') + # ---------------------------------------------------------------------- + # Step 1: assign a global rank to all elements + ccoords_ranking, ccoords_binning = 
self._fetch_coords(self.candidates) + if ccoords_ranking is None: + LOGGER.debug(f'fetched coordinates: {ccoords_binning.shape}') + else: + LOGGER.debug(f'fetched coordinates: {ccoords_ranking.shape}, {ccoords_binning.shape}') + + if self.hdspace is None or len(self.selected) == 0: + ncandidates = len(self.candidates) + LOGGER.debug(f'Computing random ranks for {ncandidates} points') + cand_ranks = np.random.rand(ncandidates).astype(np.float32) + else: + LOGGER.debug(f'Computing importance ranks for {ccoords_ranking.shape} points') + cand_ranks = self.hdspace.get_knn_distance(ccoords_ranking, k, 0).astype(np.float32) + assert cand_ranks.max() >= 0, f'{self}: ranks cannot be negative' + + LOGGER.debug(f'cand_ranks: {cand_ranks.shape}, [{cand_ranks.min()}, {cand_ranks.max()}]') + + # ---------------------------------------------------------------------- + # Step 2: compute the histogram and bin_idxs + # for current candidates and previous selections + LOGGER.profile('computing histogram of candidates') + self.cand_histo, self.cand_binidxs = self.compute_histogram(ccoords_binning) + LOGGER.debug(f'histogram of candidates: {self.cand_histo}') + LOGGER.profile('computed histogram of candidates') + + # ---------------------------------------------------------------------- + # Step 3: compute the chunks (static or dynamic) + if self.ranking_type == 'static': + chunk_edges = SamplerBinned.create_chunks_static(self.cand_histo, self.novelty_factor) + + else: + LOGGER.profile('computed histogram') + if nselections == 0: + self.sel_histo = np.zeros_like(self.cand_histo) + self.sel_binidxs = None + else: + LOGGER.profile('computing histogram of selections') + srcoords, sbcoords = self._fetch_coords(self.selected) + self.sel_histo, self.sel_binidxs = self.compute_histogram(sbcoords) + LOGGER.profile('computed histogram of selections') + + LOGGER.profile('computing dynamic chunking') + chunk_edges = SamplerBinned.create_chunks_dynamic(self.cand_histo, + self.sel_histo, + self.novelty_factor) + LOGGER.profile('computed dynamic chunking') + + # ---------------------------------------------------------------------- + # Step 4: update the ranks to incorporate chunk ids + # basically, we will manipulate the ranks so that + # global ranks preserve a chunk ordering + # i.e., all elements in chunk 0 across all bins have higher rank + # than any element in chunk 1 in any bin + nbins, nchunks = chunk_edges.shape[0], chunk_edges.shape[1]-1 + + rank_offset = cand_ranks.max() + cand_ranks.min() + LOGGER.info(f'Updating ranks with offset={rank_offset}') + LOGGER.profile('updating ranks') + for bin_idx in range(nbins): + + elements_in_bin = np.where(self.cand_binidxs == bin_idx)[0] + if elements_in_bin.shape[0] == 0: + continue + + chunks_for_bins = chunk_edges[bin_idx] + nchnks = chunks_for_bins.shape[0] - 1 + + LOGGER.debug(f'bin {bin_idx} has {elements_in_bin.shape[0]} elements') + LOGGER.debug(f'bin {bin_idx} has {chunks_for_bins} chunks') + + # descending sort of the global ranks in this bin + rank_order_in_bin = np.argsort(-1 * cand_ranks[elements_in_bin]) + elements_bin_in_rank_order = elements_in_bin[rank_order_in_bin] + + # now go over each chunk in the rank order and update the ranks + # for a chunk c, the rank becomes rank + (nchunks-c-1)*rank_offset + # this splits the ranks into chunks, + # i.e., all elements in a given chunk (across all bins) + # have higher ranks than the next chunk + for cidx in range(nchnks): + c0,c1 = chunks_for_bins[cidx], chunks_for_bins[cidx+1] + elements_in_chunk = 
elements_bin_in_rank_order[c0:c1] + cand_ranks[elements_in_chunk] += (nchunks-cidx-1)*rank_offset + + LOGGER.debug(f'ranks_in_bins: {cand_ranks[elements_in_bin].shape}, ' + f'[{cand_ranks[elements_in_bin].min()}, ' + f'{cand_ranks[elements_in_bin].max()}]') + + # ---------------------------------------------------------------------- + LOGGER.debug(f'ranks: {cand_ranks.shape}, [{cand_ranks.min()}, {cand_ranks.max()}]') + LOGGER.profile('updated ranks') + + # Step 5: simply assign these ranks to the hdpoints + self._apply_ranks_to_candidates(cand_ranks) + LOGGER.info('assigned all ranks!') + + # -------------------------------------------------------------------------- + def compute_histogram(self, data): + + if self.binning_type == '1d-nonperiodic': + return self.compute_histograms_1d_nonperiodic(data) + + if self.binning_type == '2d-spherical': + return self.compute_histograms_2d_spherical(data) + + elif self.binning_type == '3d-spherical_and_cartesian': + return self.compute_histograms_3d_spherical_and_cartesian(data) + + # -------------------------------------------------------------------------- + def compute_histograms_1d_nonperiodic(self, data): + + assert len(data.shape) == 2 + assert data.shape[1] == 1 + + dmin, dmax = data[:,0].min(), data[:,0].max() + bmin, bmax = self.bins_x[0], self.bins_x[-1] + assert dmin >= self.bins_x[0], f'data ({dmin}) < bin range ({bmin})' + assert dmax <= self.bins_x[-1], f'data ({dmax}) > bin range ({bmax})' + + bin_idxs = BinningUtils.get_bin_indices_1d(data[:,0], self.bins_x, 0) + + # now compute 1-d histogram using these linearized bin idxs + hist = BinningUtils.compute_histogram_from_bin_indices(bin_idxs, self.nbins) + return hist, bin_idxs + + # -------------------------------------------------------------------------- + def compute_histograms_2d_spherical(self, data): + + assert len(data.shape) == 2 + assert data.shape[1] == 2 + + # get the bin indices for spherical binning + idxs_t, idxs_r = \ + BinningUtils.get_bin_indices_2d_spherical(data[:,:2], + self.nbins_altitude, + self.nbins_azimuth) + + # linearize the bin indices across the two dimensions! + def linearize_bins(t, r): + return np.uint(self.cbins_azimuth[t] + r) + + bin_idxs = [linearize_bins(idxs_t[i], idxs_r[i]) for i in range(data.shape[0])] + bin_idxs = np.array(bin_idxs) + assert bin_idxs.max() < self.nbins + + # now compute 1-d histogram using these linearized bin idxs + hist = BinningUtils.compute_histogram_from_bin_indices(bin_idxs, self.nbins) + return hist, bin_idxs + + # -------------------------------------------------------------------------- + def compute_histograms_3d_spherical_and_cartesian(self, data): + + assert len(data.shape) == 2 + assert data.shape[1] == 3 + + # get the bin indices for altitude and azimuth + # using spherical binning + idxs_t, idxs_r = \ + BinningUtils.get_bin_indices_2d_spherical(data[:,:2], + self.bins_altitude, + self.bins_azimuth) + + # get bin indices for height + # maintain the left and right out-of-range bins separately + idxs_d = BinningUtils.get_bin_indices_1d(data[:, 2], self.bins_z, 2) + + # linearize the bin indices across the three dimensions! 
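# (illustrative note, not part of the diff: linearize_bins below flattens the
#  (altitude t, azimuth r, height d) indices into one bin id; each z-slab holds
#  cbins_azimuth[-1] = 302 spherical bins and altitude ring t starts at offset
#  cbins_azimuth[t], so e.g. t=2, r=5, d=1 maps to 1*302 + 7 + 5 = 314)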
+ def linearize_bins(t, r, d): + return np.uint(d * self.cbins_azimuth[-1] + self.cbins_azimuth[t] + r) + + bin_idxs = [linearize_bins(idxs_t[i], idxs_r[i], idxs_d[i]) for i in range(data.shape[0])] + bin_idxs = np.array(bin_idxs) + assert bin_idxs.max() < self.nbins + + # now compute 1-d histogram using these linearized bin idxs + hist = BinningUtils.compute_histogram_from_bin_indices(bin_idxs, self.nbins) + return hist, bin_idxs + + # -------------------------------------------------------------------------- + # -------------------------------------------------------------------------- + def get_weights(self, normalize=True): + + assert '1d-nonperiodic' == self.binning_type and 1 == self.binning_dim,\ + 'Implemented only for 1D' + + assert self.test() + + nsel = len(self.selected) + ncands = len(self.candidates) + if nsel == 0: + return [], [] + + LOGGER.info(f'Computing weights {self}') + + self._update_ranks() + + LOGGER.info(f'cand_binidxs: {self.cand_binidxs}') + LOGGER.info(f'sel_binidxs: {self.sel_binidxs}') + assert nsel == len(self.sel_binidxs) + assert ncands == len(self.cand_binidxs) + + ids = np.array([s.id for s in self.selected]) + + if (self.cand_binidxs is None) or (self.sel_binidxs is None): + weights = np.ones(len(self.selected), dtype=np.float32) + + else: + # compute the number of candidates selected from each bin + def number_in_bins(_, _tag): + _n_in_bins = np.zeros(self.nbins, dtype=np.int) + _u,_n = np.unique(_, return_counts=True) + _n_in_bins[_u] = _n + assert _n_in_bins.sum() == _.shape[0], f'mismatch in {_tag}: {_n_in_bins}, {_.shape[0]}' + return _n_in_bins + + nsel_in_bins = number_in_bins(self.sel_binidxs, 'selected') + ncands_in_bins = number_in_bins(self.cand_binidxs, 'candidates') + LOGGER.debug(f'cands: {ncands_in_bins}') + LOGGER.debug(f'sel: {nsel_in_bins}') + + # ------------------------------------------------------------------ + # for all bins with no selection, + # need to assign their candidates to adjacent bins + for b in range(self.nbins): + if nsel_in_bins[b] != 0: + continue + if ncands_in_bins[b] == 0: + continue + + LOGGER.debug(f'bin {b} has no selections. 
' + f'assigning candidates to {self.distribute_zero_map[b]}') + + for b2 in self.distribute_zero_map[b]: + + # if there are two neigbors + if len(b2) == 2: + b2a,b2b = b2[0],b2[1] + if nsel_in_bins[b2a] > 0 and nsel_in_bins[b2b] > 0: + k1 = ncands_in_bins[b]//2 + k2 = ncands_in_bins[b] - k1 + LOGGER.debug(f'assigning {k1} and {k2} to bins {b2a} and {b2b}') + ncands_in_bins[b2a] += k1 + ncands_in_bins[b2b] += k2 + ncands_in_bins[b] = 0 + break + + elif nsel_in_bins[b2a] > 0 and nsel_in_bins[b2b] == 0: + k1 = ncands_in_bins[b] + LOGGER.debug(f'assigning {k1} to bin {b2a}') + ncands_in_bins[b2a] += k1 + ncands_in_bins[b] = 0 + break + + elif nsel_in_bins[b2a] == 0 and nsel_in_bins[b2b] > 0: + k1 = ncands_in_bins[b] + LOGGER.debug(f'assigning {k1} to bin {b2b}') + ncands_in_bins[b2b] += k1 + ncands_in_bins[b] = 0 + break + + # only one neighbor + elif len(b2) == 1: + b2a = b2[0] + if nsel_in_bins[b2a] > 0: + k1 = ncands_in_bins[b] + LOGGER.debug(f'assigning {k1} to bin {b2a}') + ncands_in_bins[b2a] += k1 + ncands_in_bins[b] = 0 + break + + # ------------------------------------------------------------------ + LOGGER.debug(f'cands: {ncands_in_bins}') + LOGGER.debug(f'sel: {nsel_in_bins}') + + # ------------------------------------------------------------------ + # compute weights + bin_weights = np.zeros(self.nbins_x) + for b in range(self.nbins): + if nsel_in_bins[b] == 0: + # if selected is 0, then candidates must be zero + assert ncands_in_bins[b] == 0, f'Failed to distribute correctly' + else: + bin_weights[b] = (nsel_in_bins[b]+ncands_in_bins[b]) / nsel_in_bins[b] + + LOGGER.debug(f'bin_weights: {bin_weights}') + + # ------------------------------------------------------------------ + # now, assign them to the elements + weights = bin_weights[self.sel_binidxs] + LOGGER.debug(f'weights: {weights}') + + # the sum of bin weights should equal to all elements + if not np.isclose(weights.sum(), nsel+ncands): + LOGGER.error(f'cands={ncands_in_bins}, sel={nsel_in_bins}') + LOGGER.error(f'bin_weights={bin_weights}') + LOGGER.error(f'weights={weights} ({weights.sum()})') + + raise Exception(f'sum of weights should match total elements. 
'\ + f'cands={nsel}, sel={ncands}; wsum={weights.sum()})') + + # ---------------------------------------------------------------------- + if normalize: + LOGGER.info(f'Computed weights: {weights.shape}, sum = {weights.sum()}') + weights *= (weights.shape[0] / weights.sum()) + + LOGGER.info(f'Computed weights: {weights.shape}, sum = {weights.sum()}') + return ids, weights + + # -------------------------------------------------------------------------- + def checkpoint(self): + + super()._checkpoint() + if self.hdspace is not None: + take_backup(self.lchkpt) + self.hdspace.checkpoint(self.lchkpt) + LOGGER.debug(f'Checkpointed {self}') + + def restore(self): + + # ---------------------------------------------------------------------- + def _restore_files(_files): + if not files_exist(_files): + return False + + try: + self._restore(_files[0]) + if self.hdspace is not None: + self.hdspace.restore(_files[1]) + except Exception as e: + LOGGER.error(e) + return False + + if not self.test(): + LOGGER.error('Inconsistent restore!') + return False + return True + + # ---------------------------------------------------------------------- + if self.hdspace is None: + files = [self.schkpt] + else: + files = [self.schkpt, self.lchkpt] + + success = False + for i in range(2): + success = _restore_files(files) + if success: + break + files = [backup_fname(f) for f in files] + continue + + success = success and self.test() + if not success: + LOGGER.info(f'Failed to restore') + return False + + LOGGER.info(f'Restored {self}') + return True + + def test(self): + + if self.hdspace is not None: + h = self.hdspace.count() + if len(self.selected) != h: + LOGGER.error(f'Inconsistent Sampler: lspace={h}; {self}') + return False + return True + + # -------------------------------------------------------------------------- + # -------------------------------------------------------------------------- + @staticmethod + def fetch_first_chunk(N, S, l): + + Nsum = N.sum() + + # if there are no candidates + if Nsum == 0: + return np.zeros(N.shape).astype(np.uint) + + # if there is a single non-zero bin + if (N > 0).sum() == 1: + return N.astype(np.uint) + + Ssum = S.sum() + NS = N + S + + # target distribution (considering N+S) + # cannot pick anything if N+S = 0 + q = (1 - l) * NS / (Nsum + Ssum) + l / N.shape[0] + q[NS == 0] = 0 + q /= q.sum() + + # create a chunk size that will create the target distribution + # --- target = q*NS.sum() (use q to divide N+S) + # --- deficit = q*S.sum() - S (use q to divide S) + # --- chunk = target + deficit + c = q * (Nsum + 2 * Ssum) - S + + # if the past was bad, we may be way far away from the target + # for novelty, we want to have the chunk as small as possible + # for random, as large as possible + cnz = c > 0 + c[cnz] -= l * (c[cnz].min() - 1) + + # finally, a chunk cannot be larger than the input data + c = np.minimum(np.ceil(c), N) + assert c.min() >= 0 + return c.astype(np.uint) + + # -------------------------------------------------------------------------- + @staticmethod + def create_chunks_dynamic(hist_cands, hist_sel, novelty_factor): + + LOGGER.info(f'Creating chunks for dynamic selection: ' + f'novelty = {novelty_factor}, ' + f'cands = {hist_cands.sum()}, selected = {hist_sel.sum()}') + + # compute size of each chunk for each bin + # all bins have the same number of chunks + # but sizes of these chunks will vary across bins + N = np.copy(hist_cands) + S = np.copy(hist_sel) + + # first figure out the size of each chunk + chunk_sizes = [] + while N.sum() > 0: + c = 
SamplerBinned.fetch_first_chunk(N, S, novelty_factor) + chunk_sizes.append(c) + N -= c + S += c + + chunk_sizes = np.swapaxes(np.array(chunk_sizes), 0, 1) + assert chunk_sizes.shape[0] == hist_cands.shape[0] + + # all bins have the same number of chunks + nchunks = chunk_sizes.shape[1] + LOGGER.info(f'number of chunks (per bin) = {nchunks}') + + # compute the edges of these chunk + chunk_edges = np.cumsum(np.insert(chunk_sizes, 0, 0, axis=1), axis=1) + LOGGER.info(f'Created chunks {chunk_edges.shape}') + return chunk_edges + + @staticmethod + def create_chunks_static(hist_cands, novelty_factor): + + LOGGER.info(f'Creating chunks for static selection: ' + f'novelty = {novelty_factor}, cands = {hist_cands.sum()}') + + # depending upon the novelty factor, decide the chunk size + # the same chunk (id) gets equal priority across all bins + # to compute chunk size, we interpolate in the linear/log10 space + # novelty_factor = 1: chunk_sz = 1 + # this will flatten the curve (i.e., novelty sampling) + # novelty_factor = 0: chunk_sz = bin_count + # this will mimic input distribution (i.e., random sampling) + + # compute the chunk size + chunk_sizes = np.zeros_like(hist_cands).astype(np.float) + nchunks = np.zeros_like(hist_cands).astype(np.uint) + + # compute only where bins are nonzero + zidx = np.where(hist_cands == 0)[0] + nzidx = np.where(hist_cands > 0)[0] + + if True: # linear + chunk_sizes[nzidx] = novelty_factor + (1.-novelty_factor) * hist_cands[nzidx] + + else: # logarithmic + chunk_sizes[nzidx] = 10. ** ((1.-novelty_factor) * np.log10(hist_cands[nzidx])) + + # number of chunks + nchunks[nzidx] = np.ceil(hist_cands[nzidx] / chunk_sizes[nzidx]).astype(np.uint) + + # should have zero value if and only if bin_count = 0 + assert chunk_sizes[nzidx].min() > 0 + assert nchunks[nzidx].min() > 0 + if len(zidx) > 0: + assert np.isclose(chunk_sizes[zidx].min(), 0.) 
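# (illustrative note, not part of the diff: the linear rule above gives
#  chunk_size = novelty + (1 - novelty) * bin_count; for a bin of 100 candidates,
#  novelty=1 yields 100 chunks of size 1 (flat, pure novelty sampling),
#  novelty=0 yields a single chunk of 100 (mimics the input distribution),
#  and novelty=0.5 yields 2 chunks of ~50.5)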
+ assert (nchunks[zidx].min() == 0) and (nchunks[zidx].max() == 0) + + # min and max value of chunk + assert np.all(chunk_sizes[nzidx] <= hist_cands[nzidx]) + assert np.all(nchunks[nzidx] <= hist_cands[nzidx]) + + nchunks_all = int(1+nchunks.max()) + chunk_edges = np.zeros((hist_cands.shape[0], nchunks_all), dtype=np.uint) + + # create edges of these chunks + nbins = hist_cands.shape[0] + for bin in range(nbins): + + if nchunks[bin] == 0: + c = np.zeros(1).astype(np.uint) + else: + c = chunk_sizes[bin] * np.arange(0, nchunks[bin] + 1) + c = np.round(c).astype(np.uint) + c[-1] = hist_cands[bin] + + chunk_edges[bin, :c.shape[0]] = c + if c.shape[0] < nchunks_all: + chunk_edges[bin, c.shape[0]:] = np.repeat(c[-1], nchunks_all-c.shape[0]) + + LOGGER.info(f'Created chunks {chunk_edges.shape}') + return chunk_edges + +# ------------------------------------------------------------------------------ +# ------------------------------------------------------------------------------ diff --git a/dynim/sampler_importance.py b/dynim/sampler_importance.py index 72ddb2f..877bb9a 100644 --- a/dynim/sampler_importance.py +++ b/dynim/sampler_importance.py @@ -6,7 +6,6 @@ ################################################################################ import os -import sys import logging import numpy as np from typing import List @@ -14,7 +13,7 @@ from .hdspace import HDSpace from .hdpoint import HDPoint from .sampler import Sampler -from .utils import format_time, take_backup, backup_fname, read_history +from .utils import take_backup, backup_fname, read_history, files_exist LOGGER = logging.getLogger(__name__) @@ -26,7 +25,8 @@ class SamplerImportance(Sampler): # -------------------------------------------------------------------------- def __init__(self, name: str, workspace: str, - min_cands_b4_sel: int = 0, buffer_size: int = 0, + buffer_size: int = 0, + min_cands_b4_sel: int = 0, min_rand_b4_importance: int = 10): """ Initialize a HDSpace Sampler. 
@@ -41,21 +41,20 @@ def __init__(self, name: str, workspace: str, assert isinstance(min_cands_b4_sel, int) and int(min_cands_b4_sel) >= 0 assert isinstance(min_rand_b4_importance, int) and int(min_rand_b4_importance) >= 0 - self.type = 'SamplerImportance' super().__init__(name, workspace, - min_cands_b4_sel = min_cands_b4_sel, - buffer_size = buffer_size) + buffer_size = buffer_size, + min_cands_b4_sel=min_cands_b4_sel) self.min_rand_b4_importance = min_rand_b4_importance self.hdspace = None - self.lchkpt = os.path.join(self.workspace, '{}.lspace.idx'.format(self.tag)) + self.lchkpt = os.path.join(self.workspace, f'{self.__tag__()}.lspace.idx') + LOGGER.info(f'Initialized {self.__tag__()}') # -------------------------------------------------------------------------- # different ways of setting an HD space def set_hdspace(self, data: (str, HDSpace)) -> None: assert isinstance(data, (str, HDSpace)) - if isinstance(data, HDSpace): self.hdspace = data @@ -67,9 +66,10 @@ def set_hdspace(self, data: (str, HDSpace)) -> None: # add these selections to hdspace def _add_selections(self, points: List[HDPoint]) -> None: - if len(points) > 0: - coords = np.array([p.coords for p in points]) - self.hdspace.add_points(coords) + if len(points) == 0: + return + coords = np.array([p.coords for p in points]) + self.hdspace.add_points(coords) # -------------------------------------------------------------------------- # update the ranks of the candidates @@ -84,32 +84,28 @@ def _update_ranks(self, k=10) -> None: # but, if request ~= min_rand_b4_importance # we are probably going to be OK if len(self.selected) < self.min_rand_b4_importance: - LOGGER.debug('Assigning random ranks to {} candidates'.format(n)) - for sample in self.candidates: - sample.rank = np.random.uniform() + LOGGER.debug(f'Computing random ranks for ({n}) points') + ranks = np.random.rand(n).astype(np.float32) # for the rest, rank = avg distance to [0, 0+k] nbrs else: - LOGGER.debug('Assigning distance ranks to {} candidates'.format(n)) - + LOGGER.debug(f'Computing importance ranks for ({n}) points') coords = np.array([sample.coords for sample in self.candidates]) - ranks = self.hdspace.get_knn_distance(coords, k, 0) - assert len(self.candidates) == len(ranks) + ranks = self.hdspace.get_knn_distance(coords, k, 0).astype(np.float32) - for i, sample in enumerate(self.candidates): - sample.rank = ranks[i] + # now apply these ranks to candidates + self._apply_ranks_to_candidates(ranks) # -------------------------------------------------------------------------- # get weights of selected samples # -------------------------------------------------------------------------- - def get_weights(self, normalize: bool= True): + def get_weights(self, normalize=True): assert self.test() - if len(self.selected) == 0: return [], [] - LOGGER.info('Computing weights: {}'.format(self)) + LOGGER.info(f'Computing weights {self}') ids = np.array([s.id for s in self.selected]) weights = np.ones(len(self.selected), dtype=np.float32) @@ -123,38 +119,68 @@ def get_weights(self, normalize: bool= True): weights[n] += 1. 
         if normalize:
-            LOGGER.info('Computed weights: count = {}, sum = {}'
-                        .format(weights.shape, weights.sum()))
+            LOGGER.info(f'Computed weights: {weights.shape}, sum = {weights.sum()}')
             weights *= (weights.shape[0] / weights.sum())
-            LOGGER.info('Computed weights: count = {}, sum = {}'
-                        .format(weights.shape, weights.sum()))
+            LOGGER.info(f'Computed weights: {weights.shape}, sum = {weights.sum()}')
 
         return ids, weights
 
     # --------------------------------------------------------------------------
-    def checkpoint(self) -> None:
-        super().checkpoint()
+    def checkpoint(self):
+
+        super()._checkpoint()
         take_backup(self.lchkpt)
         self.hdspace.checkpoint(self.lchkpt)
+        LOGGER.debug(f'Checkpointed {self}')
+
+    def restore(self):
+
+        # ----------------------------------------------------------------------
+        def _restore_files(_files):
+            if not files_exist(_files):
+                return False
+
+            try:
+                self._restore(_files[0])
+                self.hdspace.restore(_files[1])
+            except Exception as e:
+                LOGGER.error(e)
+                return False
+
+            if not self.test():
+                LOGGER.error('Inconsistent restore!')
+                return False
+            return True
 
-    def restore(self) -> bool:
-        if not super().restore():
-            LOGGER.warning('Failed to restore {}'.format(self.__str__()))
+        # ----------------------------------------------------------------------
+        files = [self.schkpt, self.lchkpt]
+
+        success = False
+        for _ in range(2):
+            success = _restore_files(files)
+            if success:
+                break
+            files = [backup_fname(f) for f in files]
+
+        success = success and self.test()
+        if not success:
+            LOGGER.warning(f'Failed to restore {self}')
             return False
-        self.hdspace.restore(self.lchkpt)
-        assert self.test()
+
+        LOGGER.info(f'Restored {self}')
         return True
 
     # --------------------------------------------------------------------------
-    def test(self) -> bool:
-        # if len(self.selected) != self.hdspace.count():
-        #     LOGGER.error('Inconsistent Sampler: lspace={}; {}'
-        #                  .format(self.hdspace.count(), self.__str__()))
-        #     return True      # TODO: remove this!
-        return len(self.selected) == self.hdspace.count()
+    def test(self):
+
+        h = self.hdspace.count()
+        if len(self.selected) != h:
+            LOGGER.error(f'Inconsistent Sampler: lspace={h}; {self}')
+            return False
+        return True
 
     # --------------------------------------------------------------------------
-    def restore_from_history(self, coords_fetcher) -> None:
+    def restore_from_history(self, coords_fetcher):
 
         assert callable(coords_fetcher)
 
@@ -198,13 +224,14 @@ def _validate(_d, _h, _tag):
             _ab = np.setdiff1d(_d, _h)
             if _ab.shape[0] > 0:
-                LOGGER.error('Inconsistent restore: Found {} {} points in data, '
-                             'but not in history! {}'.format(_ab.shape, _tag, _ab))
+                LOGGER.error(f'Inconsistent restore: '
+                             f'Found {_ab.shape} {_tag} points in data, '
+                             f'but not in history! {_ab}')
 
             _ba = np.setdiff1d(_h, _d)
             if _ba.shape[0] > 0:
-                LOGGER.info('Found {} {} points in history, '
-                            'but not in data! {}'.format(_ba.shape, _tag, _ba))
+                LOGGER.info(f'Found {_ba.shape} {_tag} points in history, '
+                            f'but not in data! {_ba}')
 
         _validate(np.concatenate((self.cached, self.candidates)), cached, 'added')
         _validate(self.discarded, discarded, 'discarded')
 
@@ -219,7 +246,7 @@ def _validate(_d, _h, _tag):
         self._add_selections(self.selected)
 
         # ----------------------------------------------------------------------
-        LOGGER.info('Restored from history: {}'.format(self.__str__()))
+        LOGGER.info(f'Restored from history: {self}')
         assert len(self.selected) == self.hdspace.count()
 
 # ------------------------------------------------------------------------------
diff --git a/dynim/sampler_random.py b/dynim/sampler_random.py
index c553280..9dc3d8b 100644
--- a/dynim/sampler_random.py
+++ b/dynim/sampler_random.py
@@ -6,8 +6,11 @@
 ################################################################################
 
 import numpy as np
+import logging
 from .sampler import Sampler
 
+LOGGER = logging.getLogger(__name__)
+
 
 # ------------------------------------------------------------------------------
 # A random sampler of hd points
@@ -16,12 +19,13 @@ class SamplerRandom(Sampler):
 
     # --------------------------------------------------------------------------
     def __init__(self, name: str, workspace: str,
-                 min_cands_b4_sel: int = 0, buffer_size: int = 0):
+                 buffer_size: int = 0,
+                 min_cands_b4_sel: int = 0):
 
-        self.type = 'SamplerRandom'
         super().__init__(name, workspace,
-                         min_cands_b4_sel = min_cands_b4_sel,
-                         buffer_size = buffer_size)
+                         buffer_size=buffer_size,
+                         min_cands_b4_sel=min_cands_b4_sel)
+        LOGGER.info(f'Initialized {self.__tag__()}')
 
     # --------------------------------------------------------------------------
     # confirm the selection of these points
@@ -31,10 +35,21 @@ def _add_selections(self, points: np.ndarray) -> None:
 
     # --------------------------------------------------------------------------
     # update the ranks of the candidates
     def _update_ranks(self) -> None:
-        for sample in self.candidates:
-            sample.rank = np.random.uniform()
+        n = len(self.candidates)
+
+        LOGGER.debug(f'Computing random ranks for ({n}) points')
+        ranks = np.random.rand(n).astype(np.float32)
+
+        # now apply these ranks to candidates
+        self._apply_ranks_to_candidates(ranks)
 
     # --------------------------------------------------------------------------
+    def checkpoint(self):
+        super()._checkpoint()
+
+    def restore(self):
+        super()._restore()
+
     def test(self) -> bool:
         return True
 
diff --git a/dynim/utils.py b/dynim/utils.py
index bfe52dd..1ab9f4d 100644
--- a/dynim/utils.py
+++ b/dynim/utils.py
@@ -11,7 +11,6 @@
 import datetime
 import numpy as np
 import logging
-
 LOGGER = logging.getLogger(__name__)
 
 
@@ -31,6 +30,86 @@ def take_backup(filename):
         shutil.move(filename, backup_fname(filename))
 
+
+# ------------------------------------------------------------------------------
+def files_exist(files):
+    for f in files:
+        if not os.path.isfile(f):
+            return False
+    return True
+
+
+def files_to_restore(files):
+    if files_exist(files):
+        return files
+    files = [backup_fname(f) for f in files]
+    if files_exist(files):
+        return files
+    return []
+
+# ------------------------------------------------------------------------------
+def filter_list(data_to_filter, filter_if_present_in_list):
+    assert isinstance(data_to_filter, list)
+    assert isinstance(filter_if_present_in_list, (list, np.ndarray))
+
+    if len(data_to_filter) == 0 or len(filter_if_present_in_list) == 0:
+        return data_to_filter, []
+
+    # since filter_if_present_in_list is likely very large,
+    # let's use numpy to find the ones that are already included
+    ids_in_data = np.array([_.id for _ in data_to_filter])
+    ids_in_list = np.array([_.id for _ in filter_if_present_in_list])
+    is_present = np.intersect1d(ids_in_data, ids_in_list)
+
+    # no intersection
+    if is_present.shape[0] == 0:
+        return data_to_filter, []
+
+    # slower list comprehension to find the ones not present
+    is_not_present = [_ for _ in data_to_filter if _.id not in is_present]
+    return is_not_present, is_present.tolist()
+
+
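A small usage sketch of filter_list follows; it is not part of the patch, and the Item namedtuple is a hypothetical stand-in for any object that carries an .id attribute (e.g., HDPoint):

from collections import namedtuple
from dynim.utils import filter_list

Item = namedtuple('Item', ['id'])

candidates = [Item('a'), Item('b'), Item('c')]
already_selected = [Item('b')]

kept, dropped_ids = filter_list(candidates, already_selected)
# kept        -> [Item(id='a'), Item(id='c')]
# dropped_ids -> ['b']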
+# new attempt for faster filtering, but requires a separate list of ids
+def filter_list_v1(data_to_filter, filter_if_present_in_list):
+
+    assert isinstance(data_to_filter, list)
+    assert isinstance(filter_if_present_in_list, (list, np.ndarray))
+
+    if len(data_to_filter) == 0 or len(filter_if_present_in_list) == 0:
+        return data_to_filter, []
+
+    _data = np.unique(data_to_filter)
+    _new_data = np.setdiff1d(_data, filter_if_present_in_list, assume_unique=True)
+
+    if len(_data) == len(_new_data):
+        return _new_data.tolist(), []
+
+    # else
+    _old_data = np.setdiff1d(_data, _new_data, assume_unique=True)
+    return _new_data.tolist(), _old_data.tolist()
+
+# ------------------------------------------------------------------------------
+# @TODO: fix this; added just to make the code work
+def write_history_ids(filename, event_type, data, state):
+
+    if len(data) == 0:
+        return
+
+    ts = format_time()
+
+    # write header if the file does not exist
+    if not os.path.isfile(filename):
+        with open(filename, 'a') as fp:
+            fp.write('tstamp, event, id, rank, '
+                     'nselected, ndiscarded, ncandidates, ncached\n')
+
+    # now, write the data
+    with open(filename, 'a') as fp:
+        for d in data:
+            fp.write('{}, {}, {}, {}, {}, {}, {}, {} \n'
+                     .format(ts, event_type, d, state, "??", "??", "??", "??"))
+
+
 # ------------------------------------------------------------------------------
 def write_history(filename, event_type, data, state):