diff --git a/.travis.yml b/.travis.yml
index 93fbd5de..1e55d349 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -5,7 +5,7 @@ sudo: required
 branches:
   only:
-  - master
+  - test_compss
   - /^release-.*/
 
 services:
@@ -14,24 +14,27 @@ env:
   global:
     - REGISTRY_USER=compss
     - secure: ""
+    - TEST_CASSANDRA_VERSION=3.11.4
 
 before_script:
-  - docker build --tag bscwdc/dislib .
-  - docker run $(bash <(curl -s https://codecov.io/env)) -d --name dislib bscwdc/dislib
-
-script: "docker exec dislib /dislib/run_ci_checks.sh"
-
-after_script:
-  - docker images
-  - docker exec dislib /dislib/bin/print_tests_logs.sh
-
-before_deploy:
-  - docker login -u "$REGISTRY_USER" -p "$REGISTRY_PASS"
-  - docker tag bscwdc/dislib bscwdc/dislib:latest
-deploy:
-  provider: script
-  script: docker push bscwdc/dislib:latest
-  on:
-    branch: master
+  - source launch_cassandra.sh
+  - docker build --tag emebemb/dislib_hecuba_compss_production:0.2 .
+  - docker run -it --network cassandra_bridge -d --name dislib emebemb/dislib_hecuba_compss_production:0.2
+
+script: "docker exec -e CONTACT_NAMES='cassandra_container' -e NODE_PORT=9042 dislib /dislib/run_tests.sh"
+
+#after_script:
+#  - docker images
+#  - docker exec dislib /dislib/bin/print_tests_logs.sh
+#
+#before_deploy:
+#  - docker login -u "$REGISTRY_USER" -p "$REGISTRY_PASS"
+#  - docker tag bscwdc/dislib bscwdc/dislib:latest
+#deploy:
+#  provider: script
+#  script: docker push bscwdc/dislib:latest
+#  on:
+#    branch: master
diff --git a/Dockerfile b/Dockerfile
index e8a72019..589f0905 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,6 +1,17 @@
-FROM bscwdc/dislib-base:latest
+#FROM bscwdc/dislib-base:latest
+FROM adrianespejo/dislib_hecuba:0.1
 MAINTAINER COMPSs Support
 
+#RUN apt-get update -y && apt-get update
+#RUN apt-get install -y cmake python3-dev libpython3-dev gcc-4.8 libtool python3-numpy python3-pip python3-setuptools
+#RUN curl -L https://github.com/bsc-dd/hecuba/archive/NumpyWritePartitions.tar.gz | tar -xz
+
+#WORKDIR hecuba-NumpyWritePartitions
+#RUN python3 -m pip install -r requirements.txt
+#RUN python3 setup.py install
 WORKDIR /
 
+#RUN rm -rf dislib/
 COPY . dislib/
 
 ENV PYTHONPATH=$PYTHONPATH:/dislib
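For local debugging, the stack that this CI configuration brings up can be reproduced with the image tags and network name used above (a sketch; it assumes Docker is available locally and the images are pullable):

    source launch_cassandra.sh    # creates cassandra_bridge, starts cassandra_container
    docker build --tag emebemb/dislib_hecuba_compss_production:0.2 .
    docker run -it --network cassandra_bridge -d --name dislib \
        emebemb/dislib_hecuba_compss_production:0.2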
diff --git a/counter b/counter
new file mode 100644
index 00000000..d8263ee9
--- /dev/null
+++ b/counter
@@ -0,0 +1 @@
+2
\ No newline at end of file
diff --git a/dislib/__init__.py b/dislib/__init__.py
index 8b595b8a..d51173b0 100644
--- a/dislib/__init__.py
+++ b/dislib/__init__.py
@@ -1,7 +1,7 @@
 import os
 
 from dislib.data.array import random_array, apply_along_axis, array, zeros, \
-    full
+    full, load_from_hecuba
 from dislib.data.io import load_svmlight_file, load_npy_file, load_txt_file
 from dislib.math import kron
 
@@ -27,5 +27,6 @@
 __version__ = 'unknown'
 
 __all__ = ['load_txt_file', 'load_svmlight_file', 'random_array',
-           'apply_along_axis', 'array', 'load_npy_file', 'zeros', 'kron',
+           'apply_along_axis', 'array', 'load_from_hecuba', 'load_npy_file',
+           'zeros', 'kron',
            'full']
diff --git a/dislib/cluster/kmeans/base.py b/dislib/cluster/kmeans/base.py
index dc6a18b8..6af0c223 100644
--- a/dislib/cluster/kmeans/base.py
+++ b/dislib/cluster/kmeans/base.py
@@ -1,6 +1,6 @@
 import numpy as np
 from pycompss.api.api import compss_wait_on
-from pycompss.api.parameter import COLLECTION_IN, Depth, Type
+from pycompss.api.parameter import INOUT, COLLECTION_IN, Depth, Type
 from pycompss.api.task import task
 from scipy.sparse import csr_matrix
 from sklearn.base import BaseEstimator
@@ -10,10 +10,8 @@
 from dislib.data.array import Array
 
-
 class KMeans(BaseEstimator):
     """ Perform K-means clustering.
-
     Parameters
     ----------
     n_clusters : int, optional (default=8)
@@ -22,7 +20,6 @@ class KMeans(BaseEstimator):
     init : {'random', nd-array or sparse matrix}, optional (default='random')
         Method of initialization, defaults to 'random', which generates
         random centers at the beginning.
-
         If an nd-array or sparse matrix is passed, it should be of shape
         (n_clusters, n_features) and gives the initial centers.
     max_iter : int, optional (default=10)
@@ -37,14 +34,12 @@ class KMeans(BaseEstimator):
         for centroid initialization.
     verbose: boolean, optional (default=False)
         Whether to print progress information.
-
     Attributes
     ----------
     centers : ndarray
         Computed centroids.
     n_iter : int
         Number of iterations performed.
-
     Examples
     --------
     >>> from dislib.cluster import KMeans
@@ -73,14 +68,12 @@ def __init__(self, n_clusters=8, init='random', max_iter=10, tol=1e-4,
     def fit(self, x, y=None):
         """ Compute K-means clustering.
-
         Parameters
         ----------
         x : ds-array
             Samples to cluster.
         y : ignored
             Not used, present here for API consistency by convention.
-
         Returns
         -------
         self : KMeans
@@ -95,6 +88,7 @@ def fit(self, x, y=None):
             old_centers = self.centers.copy()
             partials = []
+
             for row in x._iterator(axis=0):
                 partial = _partial_sum(row._blocks, old_centers)
                 partials.append(partial)
@@ -108,14 +102,12 @@ def fit(self, x, y=None):
     def fit_predict(self, x, y=None):
         """ Compute cluster centers and predict cluster index for each
         sample.
-
         Parameters
         ----------
         x : ds-array
             Samples to cluster.
         y : ignored
             Not used, present here for API consistency by convention.
-
         Returns
         -------
         labels : ds-array, shape=(n_samples, 1)
@@ -127,12 +119,10 @@ def predict(self, x):
         """ Predict the closest cluster each sample in the data belongs
         to.
-
         Parameters
         ----------
         x : ds-array
             New data to predict.
-
         Returns
         -------
         labels : ds-array, shape=(n_samples, 1)
@@ -190,10 +180,10 @@ def _init_centers(self, n_features, sparse):
 
 @task(blocks={Type: COLLECTION_IN, Depth: 2}, returns=np.array)
+#@task(blocks=INOUT, returns=np.array)
 def _partial_sum(blocks, centers):
     partials = np.zeros((centers.shape[0], 2), dtype=object)
     arr = Array._merge_blocks(blocks)
-
     close_centers = pairwise_distances(arr, centers).argmin(axis=1)
 
     for center_idx, _ in enumerate(centers):
@@ -204,6 +194,8 @@ def _partial_sum(blocks, centers):
 
     return partials
 
+
+
 @task(returns=dict)
 def _merge(*data):
     accum = data[0].copy()
@@ -217,4 +209,4 @@ def _merge(*data):
 @task(blocks={Type: COLLECTION_IN, Depth: 2}, returns=np.array)
 def _predict(blocks, centers):
     arr = Array._merge_blocks(blocks)
-    return pairwise_distances(arr, centers).argmin(axis=1).reshape(-1, 1)
+    return pairwise_distances(arr, centers).argmin(axis=1).reshape(-1, 1)
\ No newline at end of file
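The _partial_sum/_merge tasks implement the reduce step of each K-means iteration: every task emits, per centroid, a [vector_sum, sample_count] pair, the pairs are added elementwise, and the new centroid is vector_sum / sample_count. A minimal local sketch of that arithmetic with plain NumPy (no PyCOMPSs; the data and names are illustrative):

    import numpy as np
    from sklearn.metrics import pairwise_distances

    def partial_sum(block, centers):
        # one [vector_sum, sample_count] pair per centroid, as in _partial_sum
        partials = np.zeros((centers.shape[0], 2), dtype=object)
        closest = pairwise_distances(block, centers).argmin(axis=1)
        for idx in range(centers.shape[0]):
            rows = np.argwhere(closest == idx).flatten()
            partials[idx][0] = np.sum(block[rows], axis=0)
            partials[idx][1] = rows.shape[0]
        return partials

    block1 = np.array([[0.0, 0.0], [0.2, 0.0], [10.0, 10.0]])
    block2 = np.array([[9.9, 10.1], [0.0, 0.1], [10.2, 9.8]])
    centers = np.array([[0.0, 0.0], [10.0, 10.0]])

    # the _merge task adds the object arrays elementwise, exactly like this
    merged = partial_sum(block1, centers) + partial_sum(block2, centers)
    new_centers = np.array([merged[i][0] / merged[i][1]
                            for i in range(len(centers))])
    print(new_centers)  # each centroid moves to the mean of its samples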
diff --git a/dislib/data/__init__.py b/dislib/data/__init__.py
index 4ac95826..7d301aaa 100644
--- a/dislib/data/__init__.py
+++ b/dislib/data/__init__.py
@@ -1,6 +1,7 @@
 from dislib.data.array import array, random_array, apply_along_axis, zeros, \
-    full
+    full, load_from_hecuba
 from dislib.data.io import load_txt_file, load_npy_file, load_svmlight_file
 
 __all__ = ['load_txt_file', 'load_svmlight_file', 'array', 'random_array',
-           'apply_along_axis', 'load_npy_file', 'zeros', 'full']
+           'apply_along_axis', 'load_from_hecuba', 'load_npy_file', 'zeros',
+           'full']
diff --git a/dislib/data/array.py b/dislib/data/array.py
index ceca927e..475394cd 100644
--- a/dislib/data/array.py
+++ b/dislib/data/array.py
@@ -1,7 +1,10 @@
+import itertools
+import uuid
 import operator
 from collections import defaultdict
 
 import numpy as np
+import importlib.util
 from pycompss.api.api import compss_wait_on, compss_delete_object
 from pycompss.api.parameter import Type, COLLECTION_IN, Depth, \
     COLLECTION_INOUT, INOUT
@@ -10,6 +13,11 @@
 from scipy.sparse import issparse, csr_matrix
 from sklearn.utils import check_random_state
 
+if importlib.util.find_spec("hecuba"):
+    try:
+        from hecuba.hnumpy import StorageNumpy
+    except Exception:
+        pass
+
 from math import ceil
@@ -109,6 +118,9 @@ def __matmul__(self, x):
                      reg_shape=reg_shape, shape=shape, sparse=self._sparse)
 
     def __getitem__(self, arg):
+        # a Hecuba-backed array slices directly against the StorageNumpy
+        if getattr(self, "_base_array", None) is not None:
+            return array(x=list(self._base_array[arg]),
+                         block_size=self._reg_shape)
 
         # return a single row
         if isinstance(arg, int):
@@ -205,6 +217,18 @@ def _merge_blocks(blocks):
         a single ndarray / sparse matrix.
         """
         sparse = None
+
+        try:
+            # Hecuba-backed blocks are StorageNumpy instances; rebuild the
+            # full ndarray by concatenating each block row
+            if blocks[0][0].__class__.__name__ == "StorageNumpy":
+                res = []
+                for block in blocks:
+                    res.append(np.concatenate(list(block), axis=1))
+                return np.concatenate(res)
+        except Exception:
+            print("Block size not compatible with np.array shape")
+
         b0 = blocks[0][0]
         if sparse is None:
             sparse = issparse(b0)
@@ -919,6 +944,39 @@ def collect(self, squeeze=True):
             res = np.squeeze(res)
         return res
 
+    def make_persistent(self, name):
+        """
+        Stores data in Hecuba.
+
+        Parameters
+        ----------
+        name : str
+            Name of the data.
+
+        Returns
+        -------
+        dsarray : ds-array
+            A distributed and persistent representation of the data
+            divided in blocks.
+        """
+        if self._sparse:
+            raise ValueError("Data must not be a sparse matrix.")
+
+        x = self.collect()
+        persistent_data = StorageNumpy(input_array=x, name=name)
+        # self._base_array is used for much more efficient slicing.
+        # It does not take up more space since it is a reference to the db.
+        self._base_array = persistent_data
+
+        blocks = []
+        for block in self._blocks:
+            persistent_block = StorageNumpy(input_array=block, name=name,
+                                            storage_id=uuid.uuid4())
+            blocks.append(persistent_block)
+        self._blocks = blocks
+
+        return self
+
 
 def array(x, block_size):
     """
@@ -936,6 +994,8 @@ def array(x, block_size):
     dsarray : ds-array
         A distributed representation of the data divided in blocks.
     """
+    bn, bm = block_size
+
     sparse = issparse(x)
 
     if sparse:
@@ -958,8 +1018,6 @@ def array(x, block_size):
     if x.shape[0] < block_size[0] or x.shape[1] < block_size[1]:
         raise ValueError("Block size is greater than the array")
 
-    bn, bm = block_size
-
     blocks = []
     for i in range(0, x.shape[0], bn):
         row = [x[i: i + bn, j: j + bm] for j in range(0, x.shape[1], bm)]
@@ -972,6 +1030,38 @@ def array(x, block_size):
     return arr
 
 
+def load_from_hecuba(name, block_size):
+    """
+    Loads data from Hecuba.
+
+    Parameters
+    ----------
+    name : str
+        Name of the data.
+    block_size : (int, int)
+        Block sizes in number of samples.
+
+    Returns
+    -------
+    dsarray : ds-array
+        A distributed and persistent representation of the data
+        divided in blocks.
+    """
+    persistent_data = StorageNumpy(name=name)
+
+    bn, bm = block_size
+
+    blocks = []
+    for block in persistent_data.np_split(block_size=(bn, bm)):
+        blocks.append(block)
+
+    arr = Array(blocks=blocks, top_left_shape=block_size,
+                reg_shape=block_size, shape=persistent_data.shape,
+                sparse=False)
+    arr._base_array = persistent_data
+    return arr
+
+
 def random_array(shape, block_size, random_state=None):
     """
     Returns a distributed array of random floats in the open interval [0.0,
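Together, make_persistent and load_from_hecuba give a write/read round trip through Cassandra. A usage sketch (it assumes Hecuba is installed and CONTACT_NAMES points at a reachable Cassandra node; the keyspace/table name is illustrative):

    import numpy as np
    import dislib as ds

    x = np.arange(100).reshape(10, 10)
    data = ds.array(x, block_size=(2, 10))
    data.make_persistent(name="hecuba_dislib.example_array")  # blocks now live in Cassandra

    # later, possibly from a different process, rebuild the ds-array from storage
    loaded = ds.load_from_hecuba(name="hecuba_dislib.example_array",
                                 block_size=(2, 10))
    assert np.array_equal(loaded.collect(), x)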
diff --git a/killcompss.py b/killcompss.py
new file mode 100644
index 00000000..62d18ff4
--- /dev/null
+++ b/killcompss.py
@@ -0,0 +1,25 @@
+#!/usr/bin/python
+import os
+import subprocess
+
+
+def main():
+    p = subprocess.Popen(['ps', '-ef'], stdout=subprocess.PIPE)
+    own_pid = str(os.getpid())
+    killed_count = 0
+    for line in p.stdout.readlines():
+        if 'compss' in line.decode() or 'COMPSs' in line.decode():
+            # the second column of `ps -ef` is the PID
+            candidates = line.decode().split(" ")[1:]
+            pid = next((cand for cand in candidates if cand), None)
+            if pid is None or pid == own_pid:
+                # skip malformed lines and this script's own process
+                # ('compss' is a substring of 'killcompss.py')
+                continue
+            subprocess.Popen(['kill', '-9', pid])
+            killed_count += 1
+    print('%d total processes killed' % killed_count)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/launch_cassandra.sh b/launch_cassandra.sh
new file mode 100644
index 00000000..93c15c55
--- /dev/null
+++ b/launch_cassandra.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+docker network create --attachable --driver bridge cassandra_bridge
+# launch Cassandra
+CASSANDRA_ID=$(docker run --rm --name cassandra_container --expose=22 --network=cassandra_bridge -d cassandra)
+sleep 30
+#CASSANDRA_IP=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' "${CASSANDRA_ID}")
+# add environment variable CONTACT_NAMES needed by Hecuba
+export CONTACT_NAMES="cassandra_container"
+echo "Using Cassandra host: $CONTACT_NAMES"
diff --git a/myfile.txt b/myfile.txt
new file mode 100644
index 00000000..e43703c6
--- /dev/null
+++ b/myfile.txt
@@ -0,0 +1 @@
+init123
\ No newline at end of file
diff --git a/myfile2.txt b/myfile2.txt
new file mode 100644
index 00000000..927f04ed
--- /dev/null
+++ b/myfile2.txt
@@ -0,0 +1 @@
+finish123
\ No newline at end of file
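The fixed `sleep 30` in launch_cassandra.sh can race with slow Cassandra startups. A more robust variant (a sketch; it assumes `cqlsh` is available inside the official cassandra image, which it normally is) polls until the node answers CQL:

    until docker exec cassandra_container cqlsh -e "DESCRIBE KEYSPACES" > /dev/null 2>&1; do
        echo "Waiting for Cassandra to accept connections..."
        sleep 2
    done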
check" -./run_style.sh +#./run_style.sh echo "Running tests" # Run the tests in ./tests with PyCOMPSs diff --git a/run_style.sh b/run_style.sh index 2a00f8a6..c9a17920 100755 --- a/run_style.sh +++ b/run_style.sh @@ -2,4 +2,4 @@ # Runs flake8 code style checks on the dislib. The command output should be # empty which indicates that no style issues were found. -python3 -m flake8 --exclude=docs/scipy-sphinx-theme . +python3 -m flake8 --exclude=docs/scipy-sphinx-theme,tests/test_hecuba.py . diff --git a/run_tests.sh b/run_tests.sh index e68f5ef0..150ec512 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -1,13 +1,12 @@ #!/bin/bash -e # Default process per worker -export ComputingUnits=4 - +#export ComputingUnits=4 +echo "Using Cassandra host $CONTACT_NAMES" +#echo "export CONTACT_NAMES=$CONTACT_NAMES" >> ~/.bashrc +source ~/.bashrc # Run the tests/__main__.py file which calls all the tests named test_*.py -runcompss \ - --pythonpath=$(pwd) \ - --python_interpreter=python3 \ - ./tests/__main__.py &> >(tee output.log) +runcompss --pythonpath="/usr/local/lib/python3.6/dist-packages/Hecuba-0.1.3.post1-py3.6-linux-x86_64.egg/" --python_interpreter=python3 --classpath=/hecuba/storageAPI/storageItf/target/StorageItf-1.0-jar-with-dependencies.jar --storage_conf="/dislib/storage_conf.cfg" /dislib/tests/test_hecuba.py &> >(tee output.log) # Check the unittest output because PyCOMPSs exits with code 0 even if there # are failed tests (the execution itself is successful) diff --git a/storage_conf.cfg b/storage_conf.cfg new file mode 100644 index 00000000..e69de29b diff --git a/tests/model/__init__.py b/tests/model/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/model/classes.py b/tests/model/classes.py new file mode 100644 index 00000000..15b0b1dc --- /dev/null +++ b/tests/model/classes.py @@ -0,0 +1,2 @@ +class hello(object): + pass diff --git a/tests/test_hecuba.py b/tests/test_hecuba.py new file mode 100644 index 00000000..43566fd0 --- /dev/null +++ b/tests/test_hecuba.py @@ -0,0 +1,305 @@ +import gc +import os +import unittest + +import numpy as np + +os.environ["CONTACT_NAMES"] = "cassandra_container" +from hecuba import config +from pycompss.api.api import compss_wait_on +from sklearn.datasets import make_blobs + +from pycompss.api.task import task # Import @task decorator +from pycompss.api.parameter import * # Import parameter metadata for the @task decorator + +import dislib as ds +from dislib.cluster import KMeans +from dislib.decomposition import PCA +from dislib.neighbors import NearestNeighbors +from dislib.regression import LinearRegression +import time + +def equal(arr1, arr2): + equal = not (arr1 != arr2).any() + + if not equal: + print("\nArr1: \n%s" % arr1) + print("Arr2: \n%s" % arr2) + + return equal + + +class HecubaTest(unittest.TestCase): + + def test_iterate_rows(self): + """ Tests iterating through the rows of the Hecuba array """ + config.session.execute("TRUNCATE TABLE hecuba.istorage") + config.session.execute("DROP KEYSPACE IF EXISTS hecuba_dislib") + block_size = (2, 10) + x = np.array([[j for j in range(i * 10, i * 10 + 10)] + for i in range(10)]) + + data = ds.array(x=x, block_size=block_size) + data.make_persistent(name="hecuba_dislib.test_array") + ds_data = ds.array(x=x, block_size=block_size) + + for h_chunk, chunk in zip(data._iterator(axis="rows"), + ds_data._iterator(axis="rows")): + r_data = h_chunk.collect() + should_be = chunk.collect() + self.assertTrue(np.array_equal(r_data, should_be)) + + + def test_iterate_columns(self): + """ + 
diff --git a/tests/test_hecuba.py b/tests/test_hecuba.py
new file mode 100644
index 00000000..43566fd0
--- /dev/null
+++ b/tests/test_hecuba.py
@@ -0,0 +1,305 @@
+import gc
+import os
+import unittest
+
+import numpy as np
+
+# CONTACT_NAMES must be set before importing hecuba so that the session
+# connects to the right Cassandra host
+os.environ["CONTACT_NAMES"] = "cassandra_container"
+from hecuba import config
+from pycompss.api.api import compss_wait_on
+from sklearn.datasets import make_blobs
+
+import dislib as ds
+from dislib.cluster import KMeans
+from dislib.decomposition import PCA
+from dislib.neighbors import NearestNeighbors
+from dislib.regression import LinearRegression
+
+
+def equal(arr1, arr2):
+    equal = not (arr1 != arr2).any()
+
+    if not equal:
+        print("\nArr1: \n%s" % arr1)
+        print("Arr2: \n%s" % arr2)
+
+    return equal
+
+
+class HecubaTest(unittest.TestCase):
+
+    def test_iterate_rows(self):
+        """ Tests iterating through the rows of the Hecuba array """
+        config.session.execute("TRUNCATE TABLE hecuba.istorage")
+        config.session.execute("DROP KEYSPACE IF EXISTS hecuba_dislib")
+        block_size = (2, 10)
+        x = np.array([[j for j in range(i * 10, i * 10 + 10)]
+                      for i in range(10)])
+
+        data = ds.array(x=x, block_size=block_size)
+        data.make_persistent(name="hecuba_dislib.test_array")
+        ds_data = ds.array(x=x, block_size=block_size)
+
+        for h_chunk, chunk in zip(data._iterator(axis="rows"),
+                                  ds_data._iterator(axis="rows")):
+            r_data = h_chunk.collect()
+            should_be = chunk.collect()
+            self.assertTrue(np.array_equal(r_data, should_be))
+
+    def test_iterate_columns(self):
+        """
+        Tests iterating through the columns of the Hecuba array
+        """
+        config.session.execute("TRUNCATE TABLE hecuba.istorage")
+        config.session.execute("DROP KEYSPACE IF EXISTS hecuba_dislib")
+        block_size = (10, 2)
+        x = np.array([[j for j in range(i * 10, i * 10 + 10)]
+                      for i in range(10)])
+
+        data = ds.array(x=x, block_size=block_size)
+        data.make_persistent(name="hecuba_dislib.test_array")
+        ds_data = ds.array(x=x, block_size=block_size)
+
+        for h_chunk, chunk in zip(data._iterator(axis="columns"),
+                                  ds_data._iterator(axis="columns")):
+            r_data = h_chunk.collect()
+            should_be = chunk.collect()
+            self.assertTrue(np.array_equal(r_data, should_be))
+
+    def test_get_slice_dense(self):
+        """ Tests getting a dense slice of the Hecuba array """
+        config.session.execute("TRUNCATE TABLE hecuba.istorage")
+        config.session.execute("DROP KEYSPACE IF EXISTS hecuba_dislib")
+        bn, bm = 5, 5
+        x = np.random.randint(100, size=(30, 30))
+        ds_data = ds.array(x=x, block_size=(bn, bm))
+        data = ds.array(x=x, block_size=(bn, bm))
+        data.make_persistent(name="hecuba_dislib.test_array")
+
+        slice_indices = [(7, 22, 7, 22),  # many row-column
+                         (6, 8, 6, 8),  # single block row-column
+                         (6, 8, None, None),  # single-block rows, all columns
+                         (None, None, 6, 8),  # all rows, single-block columns
+                         (15, 16, 15, 16),  # single element
+                         # (-10, -5, -10, -5),  # out-of-bounds (not
+                         # implemented)
+                         # (-10, 5, -10, 5),  # out-of-bounds (not implemented)
+                         (21, 40, 21, 40)]  # out-of-bounds (correct)
+
+        for top, bot, left, right in slice_indices:
+            got = data[top:bot, left:right].collect()
+            expected = ds_data[top:bot, left:right].collect()
+            self.assertTrue(equal(got, expected))
+
+        # Try slicing with irregular array
+        x = data[1:, 1:]
+        data = ds_data[1:, 1:]
+        for top, bot, left, right in slice_indices:
+            got = x[top:bot, left:right].collect()
+            expected = data[top:bot, left:right].collect()
+
+            self.assertTrue(equal(got, expected))
+
+    def test_index_rows_dense(self):
+        """ Tests getting a slice of rows from the ds.array using lists as
+        index """
+        config.session.execute("TRUNCATE TABLE hecuba.istorage")
+        config.session.execute("DROP KEYSPACE IF EXISTS hecuba_dislib")
+
+        bn, bm = 5, 5
+        x = np.random.randint(100, size=(10, 10))
+        ds_data = ds.array(x=x, block_size=(bn, bm))
+        data = ds.array(x=x, block_size=(bn, bm))
+        data.make_persistent(name="hecuba_dislib.test_array")
+
+        indices_lists = [([0, 5], [0, 5])]
+
+        for rows, cols in indices_lists:
+            got = data[rows].collect()
+            expected = ds_data[rows].collect()
+            self.assertTrue(equal(got, expected))
+
+        # Try slicing with irregular array
+        x = ds_data[1:, 1:]
+        data_sliced = data[1:, 1:]
+
+        for rows, cols in indices_lists:
+            got = data_sliced[rows].collect()
+            expected = x[rows].collect()
+
+            self.assertTrue(equal(got, expected))
+
+    def test_kmeans(self):
+        """ Tests K-means fit_predict and compares the result with
+        regular ds-arrays """
+        config.session.execute("TRUNCATE TABLE hecuba.istorage")
+        config.session.execute("DROP KEYSPACE IF EXISTS hecuba_dislib")
+
+        x, y = make_blobs(n_samples=1500, random_state=170)
+        x_filtered = np.vstack(
+            (x[y == 0][:500], x[y == 1][:100], x[y == 2][:10]))
+
+        block_size = (x_filtered.shape[0] // 10, x_filtered.shape[1])
+
+        x_train = ds.array(x_filtered, block_size=block_size)
+        x_train_hecuba = ds.array(x=x_filtered,
+                                  block_size=block_size)
+        x_train_hecuba.make_persistent(name="hecuba_dislib.test_array")
+
+        kmeans = KMeans(n_clusters=3, random_state=170)
+        labels = kmeans.fit_predict(x_train).collect()
+
+        kmeans2 = KMeans(n_clusters=3, random_state=170)
+        h_labels = kmeans2.fit_predict(x_train_hecuba).collect()
+
+        self.assertTrue(np.allclose(kmeans.centers, kmeans2.centers))
+        self.assertTrue(np.allclose(labels, h_labels))
+
+    def test_already_persistent(self):
+        """ Tests K-means fit_predict and compares the result with regular
+        ds-arrays, using an already persistent Hecuba array """
+        config.session.execute("TRUNCATE TABLE hecuba.istorage")
+        config.session.execute("DROP KEYSPACE IF EXISTS hecuba_dislib")
+
+        x, y = make_blobs(n_samples=1500, random_state=170)
+        x_filtered = np.vstack(
+            (x[y == 0][:500], x[y == 1][:100], x[y == 2][:10]))
+
+        block_size = (x_filtered.shape[0] // 10, x_filtered.shape[1])
+
+        x_train = ds.array(x_filtered, block_size=block_size)
+        x_train_hecuba = ds.array(x=x_filtered,
+                                  block_size=block_size)
+        x_train_hecuba.make_persistent(name="hecuba_dislib.test_array")
+
+        # ensure that all data is released from memory
+        blocks = x_train_hecuba._blocks
+        for block in blocks:
+            del block
+        del x_train_hecuba
+        gc.collect()
+
+        x_train_hecuba = ds.load_from_hecuba(name="hecuba_dislib.test_array",
+                                             block_size=block_size)
+
+        kmeans = KMeans(n_clusters=3, random_state=170)
+        labels = kmeans.fit_predict(x_train).collect()
+
+        kmeans2 = KMeans(n_clusters=3, random_state=170)
+        h_labels = kmeans2.fit_predict(x_train_hecuba).collect()
+
+        self.assertTrue(np.allclose(kmeans.centers, kmeans2.centers))
+        self.assertTrue(np.allclose(labels, h_labels))
+
+    def test_linear_regression(self):
+        """ Tests linear regression fit and predict and compares the result
+        with regular ds-arrays """
+        config.session.execute("TRUNCATE TABLE hecuba.istorage")
+        config.session.execute("DROP KEYSPACE IF EXISTS hecuba_dislib")
+
+        x_data = np.array([1, 2, 3, 4, 5]).reshape(-1, 1)
+        y_data = np.array([2, 1, 1, 2, 4.5]).reshape(-1, 1)
+
+        block_size = (x_data.shape[0] // 3, x_data.shape[1])
+
+        x = ds.array(x=x_data, block_size=block_size)
+        x.make_persistent(name="hecuba_dislib.test_array_x")
+        y = ds.array(x=y_data, block_size=block_size)
+        y.make_persistent(name="hecuba_dislib.test_array_y")
+
+        reg = LinearRegression()
+        reg.fit(x, y)
+        # the fitted line should be y = 0.6 * x + 0.3
+
+        reg.coef_ = compss_wait_on(reg.coef_)
+        reg.intercept_ = compss_wait_on(reg.intercept_)
+        self.assertTrue(np.allclose(reg.coef_, 0.6))
+        self.assertTrue(np.allclose(reg.intercept_, 0.3))
+
+        x_test = np.array([3, 5]).reshape(-1, 1)
+        test_data = ds.array(x=x_test, block_size=block_size)
+        test_data.make_persistent(name="hecuba_dislib.test_array_test")
+        pred = reg.predict(test_data).collect()
+        self.assertTrue(np.allclose(pred, [2.1, 3.3]))
+
+    def test_knn_fit(self):
+        """ Tests NearestNeighbors fit and kneighbors and compares the
+        result with regular ds-arrays """
+        config.session.execute("TRUNCATE TABLE hecuba.istorage")
+        config.session.execute("DROP KEYSPACE IF EXISTS hecuba_dislib")
+
+        x = np.random.random((1500, 5))
+        block_size = (500, 5)
+        block_size2 = (250, 5)
+
+        data = ds.array(x, block_size=block_size)
+        q_data = ds.array(x, block_size=block_size2)
+
+        data_h = ds.array(x, block_size=block_size)
+        data_h.make_persistent(name="hecuba_dislib.test_array")
+        q_data_h = ds.array(x, block_size=block_size2)
+        q_data_h.make_persistent(name="hecuba_dislib.test_array_q")
+
+        knn = NearestNeighbors(n_neighbors=10)
+        knn.fit(data)
+        dist, ind = knn.kneighbors(q_data)
+
+        knn_h = NearestNeighbors(n_neighbors=10)
+        knn_h.fit(data_h)
+        dist_h, ind_h = knn_h.kneighbors(q_data_h)
+
+        self.assertTrue(np.allclose(dist.collect(), dist_h.collect(),
+                                    atol=1e-7))
+        self.assertTrue(np.array_equal(ind.collect(), ind_h.collect()))
+
+    def test_pca_fit_transform(self):
+        """ Tests PCA fit_transform """
+        config.session.execute("TRUNCATE TABLE hecuba.istorage")
+        config.session.execute("DROP KEYSPACE IF EXISTS hecuba_dislib")
+
+        x, _ = make_blobs(n_samples=10, n_features=4, random_state=0)
+        # the block size must fit the (10, 4) data: the original (25, 5)
+        # would make ds.array raise "Block size is greater than the array"
+        bn, bm = 5, 2
+        dataset = ds.array(x=x, block_size=(bn, bm))
+        dataset.make_persistent(name="hecuba_dislib.test_array")
+
+        pca = PCA(n_components=3)
+        transformed = pca.fit_transform(dataset).collect()
+        expected = np.array([
+            [-6.35473531, -2.7164493, -1.56658989],
+            [7.929884, -1.58730182, -0.34880254],
+            [-6.38778631, -2.42507746, -1.14037578],
+            [-3.05289416, 5.17150174, 1.7108992],
+            [-0.04603327, 3.83555442, -0.62579556],
+            [7.40582319, -3.03963075, 0.32414659],
+            [-6.46857295, -4.08706644, 2.32695512],
+            [-1.10626548, 3.28309797, -0.56305687],
+            [0.72446701, 2.41434103, -0.54476492],
+            [7.35611329, -0.84896939, 0.42738466]
+        ])
+
+        self.assertEqual(transformed.shape, (10, 3))
+
+        # principal components are sign-ambiguous, so compare each column
+        # up to a global sign flip
+        for i in range(transformed.shape[1]):
+            features_equal = np.allclose(transformed[:, i], expected[:, i])
+            features_opposite = np.allclose(transformed[:, i],
+                                            -expected[:, i])
+            self.assertTrue(features_equal or features_opposite)
+
+
+def main():
+    unittest.main(verbosity=2)
+
+
+if __name__ == '__main__':
+    main()
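The final per-column check in test_pca_fit_transform exists because principal components are only defined up to sign: v and -v span the same axis, so two correct implementations may disagree by a global sign flip per component. A tiny standalone illustration of the comparison being made:

    import numpy as np

    v = np.array([1.0, -2.0, 0.5])   # a principal axis from one implementation
    w = -v                           # an equally valid axis from another
    assert np.allclose(v, w) or np.allclose(v, -w)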