Skip to content

Implementation of distribution_t data structure #40

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 67 commits into
base: project-sshmidt
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
ed36b7e
add distribution.h
Lana243 Jul 24, 2020
99ce818
add linear constructor
Lana243 Jul 24, 2020
8e04c19
add checks + add infinity bucket
Lana243 Jul 27, 2020
0b57898
add build_distribution_from_bucket_array function
Lana243 Jul 27, 2020
8170a93
add exponential constructor
Lana243 Jul 27, 2020
88137d6
fix possible memory leaks in build_distribution_from_bucket_array
Lana243 Jul 27, 2020
b47f5a2
add custom constructor
Lana243 Jul 27, 2020
edbd83c
add distribution_clone()
Lana243 Jul 27, 2020
33ed94a
add distribution_update()
Lana243 Jul 27, 2020
175687a
add distribution_average()
Lana243 Jul 27, 2020
674b70f
add static keyword to private functions
Lana243 Jul 28, 2020
c645475
add unused attribute
Lana243 Jul 28, 2020
ca2065e
fix attributes
Lana243 Jul 28, 2020
bbec9d9
fixed factor checker
Lana243 Jul 28, 2020
c9e191f
fix code duplication
Lana243 Jul 28, 2020
775fa03
change heap allocation to stack allocation
Lana243 Jul 28, 2020
cb589e2
change custom sizes to custom boundaries
Lana243 Jul 28, 2020
2e75bd5
add percentile calculation
Lana243 Jul 28, 2020
7350276
add license
Lana243 Jul 28, 2020
6707532
add comments about stroing tree as an array
Lana243 Jul 28, 2020
47bd4ab
add comments to constructor functions
Lana243 Jul 28, 2020
535fba9
add comments to header file
Lana243 Jul 28, 2020
15d383a
change exponential constructor
Lana243 Jul 29, 2020
e3cbd05
cosmetical change in linear constructor
Lana243 Jul 29, 2020
258513b
cosmetic changes in custom constructor
Lana243 Jul 29, 2020
a8f8f29
add checker in average function
Lana243 Jul 29, 2020
9774b56
change memory allocation style
Lana243 Jul 29, 2020
782ca67
change counter type from size_t to uint64_t
Lana243 Jul 29, 2020
7295215
fix compilation errors
Lana243 Jul 29, 2020
fa3386c
add distribution_test.c file
Lana243 Jul 29, 2020
4612816
add distribution files to build system
Lana243 Jul 29, 2020
d654246
test ssh commit
Lana243 Jul 29, 2020
117b89f
add unit tests into a build system
Lana243 Jul 30, 2020
d91926f
move bucket_t declaration to the header file
Lana243 Jul 30, 2020
4a9d323
add distribution_num_buckets function
Lana243 Jul 30, 2020
30e6b45
add some unit tests for distribution_new_linear
Lana243 Jul 30, 2020
8a57e8c
add getter for buckets
Lana243 Jul 31, 2020
5605691
add checker to bucket getter
Lana243 Jul 31, 2020
2a86e20
add some unit tests for distribution_new_linear
Lana243 Jul 31, 2020
d58ca9a
add some unit tests for distribution_new_linear
Lana243 Jul 31, 2020
71b1f17
add some test for distribution_new_linear
Lana243 Jul 31, 2020
8978487
add some unit tests for distribution_new_exponential
Lana243 Jul 31, 2020
64adeca
add destroy function for bucket_array_t data structure
Lana243 Jul 31, 2020
9b6695f
add some tests for exponential constructor (test failed because of do…
Lana243 Jul 31, 2020
4346809
fix exponential constructor unit tests
Lana243 Jul 31, 2020
92219cd
remove libdistribution.la and remove -lm from COMMON_LIBS
Lana243 Aug 3, 2020
5446544
change return value from tree getter function
Lana243 Aug 3, 2020
348bdeb
add unit tests for custom constructor + add inf checker to custom con…
Lana243 Aug 3, 2020
6935af1
add unit tests for update function
Lana243 Aug 3, 2020
a9e98cd
add unit tests for average function
Lana243 Aug 3, 2020
648aa5e
add unit tests for percentile calculation
Lana243 Aug 3, 2020
92854a6
add benchmark
Lana243 Aug 6, 2020
fb739ea
benchmark done
Lana243 Aug 7, 2020
1bcd249
add benchmark for million updates
Lana243 Aug 7, 2020
bf6ac1c
removed unnecessary if condition from the building function
Lana243 Aug 11, 2020
609775c
fix codestyle
Lana243 Aug 11, 2020
834ef7a
fix distribution clone function
Lana243 Aug 12, 2020
0ae46c4
add mutexes
Lana243 Aug 12, 2020
cd7c0c1
add unit test for clone function
Lana243 Aug 12, 2020
80ee320
change benchmark
Lana243 Aug 12, 2020
9c04dfe
change includes to collectd.h
Lana243 Aug 13, 2020
347a84b
fix codestyle in distribution data structure
Lana243 Aug 14, 2020
8d2095f
remove distribution benchmark as full benchmark was created in google…
Lana243 Aug 17, 2020
547fd70
remove benchmark csv files
Lana243 Aug 17, 2020
5573bbb
remove benchmark from Makefile
Lana243 Aug 17, 2020
b8611e2
fix Makefile.am
Lana243 Aug 17, 2020
ae97265
fix memory leaks in clone unit test
Lana243 Aug 17, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ sbin_PROGRAMS = \
bin_PROGRAMS = \
collectd-nagios \
collectd-tg \
collectdctl
collectdctl
endif # BUILD_WIN32


Expand Down Expand Up @@ -151,6 +151,7 @@ check_LTLIBRARIES = \

check_PROGRAMS = \
test_common \
test_distribution \
test_format_graphite \
test_meta_data \
test_metric \
Expand Down Expand Up @@ -356,6 +357,11 @@ test_meta_data_SOURCES = \
src/testing.h
test_meta_data_LDADD = libmetadata.la libplugin_mock.la

test_distribution_SOURCES = \
src/daemon/distribution_test.c \
src/testing.h
test_distribution_LDADD = libmetric.la libplugin_mock.la

test_metric_SOURCES = \
src/daemon/metric_test.c \
src/testing.h
Expand Down Expand Up @@ -426,9 +432,11 @@ libmetadata_la_SOURCES = \
src/utils/metadata/meta_data.h

libmetric_la_SOURCES = \
src/daemon/distribution.c \
src/daemon/distribution.h \
src/daemon/metric.c \
src/daemon/metric.h
libmetric_la_LIBADD = libmetadata.la $(COMMON_LIBS)
libmetric_la_LIBADD = libmetadata.la $(COMMON_LIBS) -lm

libplugin_mock_la_SOURCES = \
src/daemon/plugin_mock.c \
Expand Down
306 changes: 306 additions & 0 deletions src/daemon/distribution.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
/**
* collectd - src/daemon/distribution.c
* Copyright (C) 2019-2020 Google LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*
* Authors:
* Svetlana Shmidt <sshmidt at google.com>
**/

#include "distribution.h"

#include <pthread.h>

/* Internal state of a distribution (histogram). */
struct distribution_s {
/* Bucket tree flattened into one array of tree_size(num_buckets) nodes;
 * node 0 is the root and its bucket_counter is the total number of
 * recorded values. */
bucket_t *tree;
/* Number of leaf buckets stored in the tree. */
size_t num_buckets;
/* Running sum of every gauge passed to distribution_update(). */
double total_sum;
/* Held while the tree and total_sum are read or modified. */
pthread_mutex_t mutex;
};

/**
* This code uses an Euler path to avoid gaps in the tree-to-array mapping.
* This way, a tree containing N buckets has 2 * N - 1 nodes.
* Thus, the left subtree has 2 * (mid - left + 1) - 1 nodes, and
* therefore the right subtree starts at node_index + 2 * (mid - left + 1).
* For a detailed explanation, see
* https://docs.google.com/document/d/1ccsg5ffUfqt9-mBDGTymRn8X-9Wk1CuGYeMlRxmxiok/edit?usp=sharing
*/

/* Returns the array slot of the left child of the node stored at node_index.
 * The left child is stored directly after its parent, so the bucket range
 * [left, right] is not needed; the parameters exist only to mirror
 * right_child_index(). */
static size_t left_child_index(size_t node_index,
                               __attribute__((unused)) size_t left,
                               __attribute__((unused)) size_t right) {
  const size_t slot_after_parent = node_index + 1;
  return slot_after_parent;
}

/* Returns the array slot of the right child of the node stored at node_index,
 * where that node covers buckets [left, right].
 * The left subtree covers [left, mid] and is stored first, so the right child
 * comes after the parent slot plus every node of the left subtree. */
static size_t right_child_index(size_t node_index, size_t left, size_t right) {
  const size_t left_leaves = (left + right) / 2 - left + 1;
  const size_t left_subtree_nodes = 2 * left_leaves - 1;
  return node_index + 1 + left_subtree_nodes;
}

/* Number of tree nodes needed for num_buckets leaves: one node per leaf plus
 * num_buckets - 1 inner nodes. */
static size_t tree_size(size_t num_buckets) {
  const size_t inner_nodes = num_buckets - 1;
  return num_buckets + inner_nodes;
}

/* Builds the parent node of two adjacent subtrees: its counter is the sum of
 * both children's counters and its upper bound is that of the right child. */
static bucket_t merge_buckets(bucket_t left_child, bucket_t right_child) {
  bucket_t parent = {
      .maximum = right_child.maximum,
  };
  parent.bucket_counter = left_child.bucket_counter + right_child.bucket_counter;
  return parent;
}

/* Recursively fills d->tree for the subtree rooted at node_index, which
 * covers buckets[left..right]. Leaves are copied from buckets; every inner
 * node is the merge of its two children, so children are built first. */
static void build_tree(distribution_t *d, bucket_t *buckets, size_t node_index,
                       size_t left, size_t right) {
  if (left > right) {
    return; /* empty range, nothing to build */
  }

  bucket_t *node = &d->tree[node_index];
  if (left == right) {
    *node = buckets[left];
    return;
  }

  const size_t split = (left + right) / 2;
  const size_t lhs = left_child_index(node_index, left, right);
  const size_t rhs = right_child_index(node_index, left, right);

  build_tree(d, buckets, lhs, left, split);
  build_tree(d, buckets, rhs, split + 1, right);

  *node = merge_buckets(d->tree[lhs], d->tree[rhs]);
}

/* Allocates a distribution and builds its bucket tree from bucket_array.
 *
 * num_buckets:  number of entries in bucket_array; must be at least 1 and
 *               small enough that tree_size(num_buckets) does not wrap.
 * bucket_array: leaf buckets in ascending order; it is only read, the caller
 *               keeps ownership.
 *
 * Returns the new distribution (release it with distribution_destroy()), or
 * NULL on failure. errno is set to EINVAL for invalid arguments and to the
 * pthread error code when the mutex cannot be initialised. */
static distribution_t *
build_distribution_from_bucket_array(size_t num_buckets,
                                     bucket_t *bucket_array) {
  /* 2 * num_buckets - 1 would wrap around for 0 and for huge counts, which
   * would make the tree allocation smaller than what build_tree() writes. */
  if (num_buckets == 0 || num_buckets > SIZE_MAX / 2 || bucket_array == NULL) {
    errno = EINVAL;
    return NULL;
  }

  distribution_t *new_distribution = calloc(1, sizeof(*new_distribution));
  bucket_t *nodes = calloc(tree_size(num_buckets), sizeof(*nodes));
  if (new_distribution == NULL || nodes == NULL) {
    free(new_distribution);
    free(nodes);
    return NULL;
  }

  /* Initialise the mutex first so a failure leaves nothing half-built. */
  int status = pthread_mutex_init(&new_distribution->mutex, NULL);
  if (status != 0) {
    free(nodes);
    free(new_distribution);
    errno = status;
    return NULL;
  }

  new_distribution->tree = nodes;
  new_distribution->num_buckets = num_buckets;
  build_tree(new_distribution, bucket_array, 0, 0, num_buckets - 1);
  return new_distribution;
}

/* Creates a distribution with num_buckets buckets of equal width.
 * Bucket i has the upper bound (i + 1) * size, except the last bucket, whose
 * upper bound is INFINITY.
 *
 * Returns NULL with errno set to EINVAL when num_buckets is 0 or size is not
 * positive, and NULL when the underlying allocation fails. */
distribution_t *distribution_new_linear(size_t num_buckets, double size) {
  if (num_buckets == 0) {
    errno = EINVAL;
    return NULL;
  }
  if (size <= 0) {
    errno = EINVAL;
    return NULL;
  }

  bucket_t bucket_array[num_buckets];
  const size_t last = num_buckets - 1;

  /* Every bucket but the last gets a finite upper bound. */
  for (size_t i = 0; i < last; i++) {
    bucket_array[i] = (bucket_t){
        .bucket_counter = 0,
        .maximum = (i + 1) * size,
    };
  }
  /* The last bucket catches everything above the previous boundary. */
  bucket_array[last] = (bucket_t){
      .bucket_counter = 0,
      .maximum = INFINITY,
  };

  return build_distribution_from_bucket_array(num_buckets, bucket_array);
}

/* Creates a distribution whose bucket boundaries grow exponentially.
 * Bucket i has the upper bound factor * base^i, except the last bucket, whose
 * upper bound is INFINITY.
 *
 * Returns NULL with errno set to EINVAL when num_buckets is 0, base is not
 * greater than 1 or factor is not positive, and NULL when the underlying
 * allocation fails. */
distribution_t *distribution_new_exponential(size_t num_buckets, double base,
                                             double factor) {
  if (num_buckets == 0) {
    errno = EINVAL;
    return NULL;
  }
  if (base <= 1 || factor <= 0) {
    errno = EINVAL;
    return NULL;
  }

  bucket_t bucket_array[num_buckets];
  const size_t last = num_buckets - 1;

  for (size_t i = 0; i < last; i++) {
    bucket_array[i] = (bucket_t){
        .bucket_counter = 0,
        .maximum = factor * pow(base, i), /* TODO: check if pow() is slow */
    };
  }
  /* The last bucket catches everything above the previous boundary. */
  bucket_array[last] = (bucket_t){
      .bucket_counter = 0,
      .maximum = INFINITY,
  };

  return build_distribution_from_bucket_array(num_buckets, bucket_array);
}

/* Creates a distribution from caller-supplied bucket boundaries.
 *
 * array_size:                number of entries in custom_buckets_boundaries;
 *                            may be 0, which yields a single bucket.
 * custom_buckets_boundaries: strictly increasing, positive, finite upper
 *                            bounds; may be NULL only when array_size is 0.
 *
 * The result has array_size + 1 buckets: one per boundary plus a final bucket
 * with the upper bound INFINITY.
 *
 * Returns NULL with errno set to EINVAL when the boundaries are missing, not
 * strictly increasing, not positive, NaN, or end with INFINITY; returns NULL
 * when the underlying allocation fails. */
distribution_t *distribution_new_custom(size_t array_size,
                                        double *custom_buckets_boundaries) {
  if (array_size > 0 && custom_buckets_boundaries == NULL) {
    errno = EINVAL;
    return NULL;
  }

  for (size_t i = 0; i < array_size; i++) {
    double previous_boundary = (i > 0) ? custom_buckets_boundaries[i - 1] : 0;
    /* Written as a negated '>' so that a NaN boundary is rejected as well;
     * every comparison with NaN is false, so '<=' would let it through. */
    if (!(custom_buckets_boundaries[i] > previous_boundary)) {
      errno = EINVAL;
      return NULL;
    }
  }
  /* INFINITY is reserved for the implicit last bucket. */
  if (array_size > 0 && custom_buckets_boundaries[array_size - 1] == INFINITY) {
    errno = EINVAL;
    return NULL;
  }

  size_t num_buckets = array_size + 1;
  bucket_t bucket_array[num_buckets];
  for (size_t i = 0; i < num_buckets; i++) {
    bucket_array[i] = (bucket_t){
        .bucket_counter = 0,
        .maximum =
            (i == num_buckets - 1) ? INFINITY : custom_buckets_boundaries[i],
    };
  }
  return build_distribution_from_bucket_array(num_buckets, bucket_array);
}

/* Releases a distribution: destroys its mutex and frees the bucket tree and
 * the distribution itself. Passing NULL is a no-op. */
void distribution_destroy(distribution_t *d) {
  if (d != NULL) {
    pthread_mutex_destroy(&d->mutex);
    free(d->tree);
    free(d);
  }
}

/* Creates an independent copy of dist with the same buckets, counters and
 * running sum, and with its own mutex.
 *
 * The source is locked while its tree and sum are copied.
 *
 * Returns the copy (release it with distribution_destroy()), or NULL when
 * dist is NULL, an allocation fails, or the new mutex cannot be initialised.
 * In the last case errno is set to the pthread error code. */
distribution_t *distribution_clone(distribution_t *dist) {
  if (dist == NULL)
    return NULL;

  distribution_t *new_distribution = calloc(1, sizeof(*new_distribution));
  bucket_t *nodes = calloc(tree_size(dist->num_buckets), sizeof(*nodes));
  if (new_distribution == NULL || nodes == NULL) {
    free(new_distribution);
    free(nodes);
    return NULL;
  }

  /* Initialise the mutex before copying so a failure leaves nothing
   * half-built and the source is never locked needlessly. */
  int status = pthread_mutex_init(&new_distribution->mutex, NULL);
  if (status != 0) {
    free(nodes);
    free(new_distribution);
    errno = status;
    return NULL;
  }

  pthread_mutex_lock(&dist->mutex);
  memcpy(nodes, dist->tree, tree_size(dist->num_buckets) * sizeof(bucket_t));
  new_distribution->num_buckets = dist->num_buckets;
  new_distribution->total_sum = dist->total_sum;
  pthread_mutex_unlock(&dist->mutex);

  new_distribution->tree = nodes;
  return new_distribution;
}

/* Records one observation of gauge in the subtree rooted at node_index, which
 * covers buckets [left, right]. Walks from that node down to a single leaf
 * and increments the counter of every node on the way, the leaf included.
 * At each step the left subtree is taken when its upper bound is strictly
 * greater than gauge, otherwise the right one. */
static void update_tree(distribution_t *dist, size_t node_index, size_t left,
                        size_t right, double gauge) {
  while (left <= right) {
    dist->tree[node_index].bucket_counter++;
    if (left == right) {
      break; /* reached the leaf bucket */
    }

    const size_t split = (left + right) / 2;
    const size_t lhs = left_child_index(node_index, left, right);

    if (dist->tree[lhs].maximum > gauge) {
      node_index = lhs;
      right = split;
    } else {
      /* Compute the child slot before narrowing the range it depends on. */
      node_index = right_child_index(node_index, left, right);
      left = split + 1;
    }
  }
}

/* Records one observation of gauge in dist: increments the matching bucket
 * and adds gauge to the running sum, both under the distribution's mutex.
 *
 * A NULL dist is ignored. A negative gauge is rejected with errno set to
 * EINVAL and leaves the distribution unchanged. */
void distribution_update(distribution_t *dist, double gauge) {
  if (dist != NULL) {
    if (gauge < 0) {
      errno = EINVAL;
      return;
    }

    const size_t last_bucket = dist->num_buckets - 1;

    pthread_mutex_lock(&dist->mutex);
    update_tree(dist, 0, 0, last_bucket, gauge);
    dist->total_sum += gauge;
    pthread_mutex_unlock(&dist->mutex);
  }
}

/* Finds the bucket that holds the counter-th recorded value (1-based, in
 * ascending bucket order) within the subtree rooted at node_index, which
 * covers buckets [left, right], and returns that bucket's upper bound.
 * Descends left while the left subtree holds at least counter values;
 * otherwise skips the left subtree's values and descends right.
 * Returns NAN for an empty range. */
static double tree_get_counter(distribution_t *d, size_t node_index,
                               size_t left, size_t right, uint64_t counter) {
  while (left < right) {
    const size_t split = (left + right) / 2;
    const size_t lhs = left_child_index(node_index, left, right);

    if (d->tree[lhs].bucket_counter >= counter) {
      node_index = lhs;
      right = split;
    } else {
      /* Compute the child slot before narrowing the range it depends on. */
      node_index = right_child_index(node_index, left, right);
      counter -= d->tree[lhs].bucket_counter;
      left = split + 1;
    }
  }

  if (left > right) {
    return NAN;
  }
  return d->tree[node_index].maximum;
}

/* Returns the upper bound of the bucket that contains the requested
 * percentile of all recorded values.
 *
 * dist:    distribution to query.
 * percent: percentile in the range (0, 100].
 *
 * Returns NAN with errno set to EINVAL when dist is NULL or percent is out of
 * range (including NaN). Returns NAN without touching errno when no value has
 * been recorded yet. The mutex is released on every path. */
double distribution_percentile(distribution_t *dist, double percent) {
  /* Written as a negated range test so that a NaN percent is rejected too. */
  if (dist == NULL || !(percent > 0 && percent <= 100)) {
    errno = EINVAL;
    return NAN;
  }

  double percentile = NAN;

  pthread_mutex_lock(&dist->mutex);
  if (dist->tree[0].bucket_counter != 0) {
    /* Rank (1-based) of the value that sits at the requested percentile. */
    uint64_t counter = ceil(dist->tree[0].bucket_counter * percent / 100.0);
    percentile = tree_get_counter(dist, 0, 0, dist->num_buckets - 1, counter);
  }
  pthread_mutex_unlock(&dist->mutex);

  return percentile;
}

/* Returns the arithmetic mean of all values recorded in dist: the running
 * sum divided by the total number of recorded values.
 *
 * Returns NAN with errno set to EINVAL when dist is NULL, and NAN without
 * touching errno when no value has been recorded yet. The NULL check happens
 * before the mutex is touched, and the mutex is released on every path. */
double distribution_average(distribution_t *dist) {
  if (dist == NULL) {
    errno = EINVAL;
    return NAN;
  }

  double average = NAN;

  pthread_mutex_lock(&dist->mutex);
  if (dist->tree[0].bucket_counter != 0) {
    average = dist->total_sum / dist->tree[0].bucket_counter;
  }
  pthread_mutex_unlock(&dist->mutex);

  return average;
}

/* Returns the number of buckets in dist, or 0 when dist is NULL. */
size_t distribution_num_buckets(distribution_t *dist) {
  return (dist != NULL) ? dist->num_buckets : 0;
}

/* Copies the leaf buckets of the subtree rooted at node_index, which covers
 * buckets [left, right], into write_ptr[left..right]. Inner nodes are only
 * traversed, never copied. */
static void tree_write_leave_buckets(distribution_t *dist, bucket_t *write_ptr,
                                     size_t node_index, size_t left,
                                     size_t right) {
  if (left > right) {
    return; /* empty range */
  }

  if (left < right) {
    const size_t split = (left + right) / 2;
    const size_t lhs = left_child_index(node_index, left, right);
    const size_t rhs = right_child_index(node_index, left, right);

    tree_write_leave_buckets(dist, write_ptr, lhs, left, split);
    tree_write_leave_buckets(dist, write_ptr, rhs, split + 1, right);
    return;
  }

  /* left == right: this node is the leaf for bucket number left. */
  write_ptr[left] = dist->tree[node_index];
}

/* Returns a snapshot of the leaf buckets of dist in ascending order.
 *
 * The buckets are copied into a newly allocated array while the
 * distribution's mutex is held; release the result with
 * destroy_buckets_array().
 *
 * When dist is NULL or the allocation fails, the result has num_buckets == 0
 * and buckets == NULL, so the reported count never exceeds the memory that
 * is actually available. */
buckets_array_t get_buckets(distribution_t *dist) {
  buckets_array_t bucket_array = {
      .num_buckets = 0,
      .buckets = NULL,
  };
  if (dist == NULL)
    return bucket_array;

  bucket_array.buckets =
      calloc(dist->num_buckets, sizeof(*bucket_array.buckets));
  if (bucket_array.buckets == NULL)
    return bucket_array; /* out of memory: report an empty array */
  bucket_array.num_buckets = dist->num_buckets;

  pthread_mutex_lock(&dist->mutex);
  tree_write_leave_buckets(dist, bucket_array.buckets, 0, 0,
                           dist->num_buckets - 1);
  pthread_mutex_unlock(&dist->mutex);
  return bucket_array;
}

/* Frees the bucket storage of a buckets_array_t. The struct is passed by
 * value, so the caller's copy keeps its now-dangling pointer and must not use
 * it afterwards. */
void destroy_buckets_array(buckets_array_t buckets_array) {
  bucket_t *storage = buckets_array.buckets;
  free(storage);
}
Loading