From 35f8d0e89904357b30a9d82454e9701c01b60ea4 Mon Sep 17 00:00:00 2001
From: Nikhil Benesch
Date: Sun, 1 Nov 2015 01:08:13 -0400
Subject: [PATCH] Naive iterator

---
 GNUmakefile.in       |   2 +-
 kvrow.hh             |  42 ++++++++
 kvtest.hh            |  81 +++++++++++++++++
 masstree.hh          |   6 ++
 masstree_iterator.hh | 192 +++++++++++++++++++++++++++++++++++++++++++
 masstree_key.hh      |   1 -
 mttest.cc            |  13 +++
 query_masstree.cc    |  42 ++++++++++
 query_masstree.hh    |   1 +
 scantest.cc          |  18 +++-
 10 files changed, 393 insertions(+), 5 deletions(-)
 create mode 100644 masstree_iterator.hh

diff --git a/GNUmakefile.in b/GNUmakefile.in
index 746a1f9..0c75368 100644
--- a/GNUmakefile.in
+++ b/GNUmakefile.in
@@ -88,7 +88,7 @@ stamp-h: config.h.in config.status
 	echo > stamp-h
 
 clean:
-	rm -f mtd mtclient mttest test_string test_atomics *.o libjson.a
+	rm -f mtd mtclient mttest scantest test_string test_atomics *.o libjson.a
 	rm -rf .deps
 
 DEPFILES := $(wildcard $(DEPSDIR)/*.d)
diff --git a/kvrow.hh b/kvrow.hh
index 81e71d8..e0205a5 100644
--- a/kvrow.hh
+++ b/kvrow.hh
@@ -66,6 +66,8 @@ class query {
     void run_scan(T& table, Json& request, threadinfo& ti);
     template
     void run_rscan(T& table, Json& request, threadinfo& ti);
+    template
+    void run_iscan(T& table, Json& request, threadinfo& ti);
 
     const loginfo::query_times& query_times() const {
         return qtimes_;
@@ -318,4 +320,44 @@ void query::run_rscan(T& table, Json& request, threadinfo& ti) {
     table.rscan(scanf.firstkey(), true, scanf, ti);
 }
 
+/** @brief Scan @a table forward through its iterator interface.
+ *
+ * On entry request[2] holds the first key, request[3] the row limit (asserted
+ * positive) and request[4..] integers that are copied into f_. On return
+ * @a request is cut to two elements followed by a key and a fields entry per row. */
+template template
+void query::run_iscan(T& table, Json& request, threadinfo& ti) {
+    assert(request[3].as_i() > 0);
+    f_.clear();
+    for (int i = 4; i != request.size(); ++i)
+        f_.push_back(request[i].as_i());
+    int nleft = request[3].as_i();
+    lcdf::String firstkey;
+    std::swap(request[2].value().as_s(), firstkey);
+    request.resize(2);
+    scankeypos_ = 0;
+    // Fetch the end sentinel once; calling table.end(ti) in the loop condition
+    // would construct a new iterator for every row.
+    typename T::iterator it = table.iterate_from(firstkey, ti), last = table.end(ti);
+    for (; nleft != 0 && it != last; nleft--) {
+        Str key = it->first;
+        R* value = it->second;
+        // NOTE(review): a marker row stops the scan rather than being skipped; confirm.
+        if (row_is_marker(value))
+            break;
+        // NB the `key` is not stable! We must save space for it.
+        while (scankeypos_ + key.length() > scankey_.length()) {
+            scankey_ = lcdf::String::make_uninitialized(scankey_.length() ? scankey_.length() * 2 : 1024);
+            scankeypos_ = 0;
+        }
+        memcpy(const_cast(scankey_.data() + scankeypos_),
+               key.data(), key.length());
+        request.push_back(scankey_.substr(scankeypos_, key.length()));
+        scankeypos_ += key.length();
+        request.push_back(lcdf::Json());
+        emit_fields1(value, request.back(), ti);
+        ++it;
+    }
+}
+
 #endif
diff --git a/kvtest.hh b/kvtest.hh
index a9645b2..e70ca5c 100644
--- a/kvtest.hh
+++ b/kvtest.hh
@@ -1486,6 +1486,87 @@ void kvtest_rscan1(C &client, double writer_quiet)
     client.report(result);
 }
 
+// Iterator-scan stress test. Clients whose id is a multiple of 24 insert keys
+// i*1024 and then repeatedly scan with iscan_sync(), recording an error when a
+// multiple of 1024 is missing or keys come back out of order; every other
+// client inserts and removes keys that never fall on a multiple of 1024.
+template
+void kvtest_iscan1(C &client, double writer_quiet)
+{
+    int n, wq65536 = int(writer_quiet * 65536);
+    if (client.limit() == ~(uint64_t) 0)
+        n = 10000;
+    else
+        n = std::min(client.limit(), (uint64_t) 97655);
+    Json result;
+
+    if (client.id() % 24 == 0) {
+        for (int i = 0; i < n; ++i)
+            client.put_key8(i * 1024, i);
+        client.wait_all();
+
+        int pos = 0, mypos = 0, scansteps = 0;
+        quick_istr key;
+        std::vector keys, values;
+        Json errj;
+        while (!client.timeout(0) && errj.size() < 1000) {
+            key.set(pos, 8);
+            client.iscan_sync(key.string(), 100, keys, values);
+            if (keys.size() == 0) {
+                if (mypos < n * 1024)
+                    errj.push_back("missing " + String(mypos) + " through " + String((n - 1) * 1024));
+                pos = mypos = 0;
+            } else {
+                for (size_t i = 0; i < keys.size(); ++i) {
+                    int val = keys[i].to_i();
+                    if (val < 0) {
+                        errj.push_back("unexpected key " + String(keys[i].s, keys[i].len));
+                        continue;
+                    }
+                    if (val < pos)
+                        errj.push_back("got " + String(keys[i].s, keys[i].len) + ", expected " + String(pos) + " or later");
+                    pos = val + 1;
+                    while (val > mypos) {
+                        errj.push_back("got " + String(keys[i].s, keys[i].len) + ", missing " + String(mypos) + " @" + String(scansteps) + "+" + String(i));
+                        mypos += 1024;
+                    }
+                    if (val == mypos) {
+                        mypos = val + 1024;
+                        ++scansteps;
+                    }
+                }
+            }
+            client.rcu_quiesce();
+        }
+        if (errj.size() >= 1000)
+            errj.push_back("too many errors, giving up");
+        result.set("ok", errj.empty()).set("scansteps", scansteps);
+        if (errj)
+            result.set("errors", errj);
+
+    } else {
+        int delta = 1 + (client.id() % 30) * 32, rounds = 0;
+        while (!client.timeout(0)) {
+            int first = (client.rand.next() % n) * 1024 + delta;
+            int rand = client.rand.next() % 65536;
+            if (rand < wq65536) {
+                for (int d = 0; d < 31; ++d)
+                    relax_fence();
+            } else if (rounds > 100 && (rand % 2) == 1) {
+                for (int d = 0; d < 31; ++d)
+                    client.remove_key8(d + first);
+            } else {
+                for (int d = 0; d < 31; ++d)
+                    client.put_key8(d + first, d + first);
+            }
+            ++rounds;
+            client.rcu_quiesce();
+        }
+    }
+
+    client.report(result);
+}
+
 // test concurrent splits with removes in lower layers
 template
 void kvtest_splitremove1(C &client)
diff --git a/masstree.hh b/masstree.hh
index ea7d5c0..b08fafc 100644
--- a/masstree.hh
+++ b/masstree.hh
@@ -60,6 +60,7 @@ class basic_table {
     typedef typename P::threadinfo_type threadinfo;
     typedef unlocked_tcursor

unlocked_cursor_type; typedef tcursor

cursor_type;
+    typedef std::pair itvalue_type;
 
     inline basic_table();
 
@@ -76,6 +77,11 @@ class basic_table {
     template
     int rscan(Str firstkey, bool matchfirst, F& scanner, threadinfo& ti) const;
 
+    class iterator;
+    iterator begin(threadinfo& ti);
+    iterator iterate_from(Str firstkey, threadinfo& ti);
+    iterator end(threadinfo& ti);
+
     template
     inline int modify(Str key, F& f, threadinfo& ti);
     template
diff --git a/masstree_iterator.hh b/masstree_iterator.hh
new file mode 100644
index 0000000..44c9c30
--- /dev/null
+++ b/masstree_iterator.hh
@@ -0,0 +1,192 @@
+/* Masstree
+ * Eddie Kohler, Yandong Mao, Robert Morris
+ * Copyright (c) 2012-2014 President and Fellows of Harvard College
+ * Copyright (c) 2012-2014 Massachusetts Institute of Technology
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, subject to the conditions
+ * listed in the Masstree LICENSE file. These conditions include: you must
+ * preserve this copyright notice, and you cannot mention the copyright
+ * holders in advertising related to the Software without their permission.
+ * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
+ * notice is a summary of the Masstree LICENSE file; the license in that file
+ * is legally binding.
+ */
+#ifndef MASSTREE_ITERATOR_HH
+#define MASSTREE_ITERATOR_HH
+#include "masstree_scan.hh"
+
+namespace Masstree {
+template
+class basic_table<P>::iterator
+    : std::iterator {
+    typedef typename P::ikey_type ikey_type;
+    typedef typename node_type::key_type key_type;
+    typedef typename node_type::leaf_type::leafvalue_type leafvalue_type;
+    typedef typename node_type::nodeversion_type nodeversion_type;
+    typedef typename leaf_type::permuter_type permuter_type;
+    typedef typename leaf_type::bound_type bound_type;
+    // Iterator over (key, value) pairs; advance() re-descends from the table root on every step.
+  public:
+    iterator(basic_table<P>* table, threadinfo* ti, Str firstkey = "", bool at_end = false);
+    static iterator make_end(basic_table<P>* table, threadinfo *ti);
+
+    itvalue_type& operator*() { assert(!end_); return pair_; }
+    itvalue_type* operator->() { assert(!end_); return &pair_; }
+    bool operator==(const iterator& rhs) { return (end_ == rhs.end_) && (end_ || ka_.compare(rhs.ka_) == 0); }
+    bool operator!=(const iterator& rhs) { return !(*this == rhs); }
+    iterator& operator++() { advance(); return *this; }  // pre-increment returns *this, not a copy
+    iterator operator++(int) { iterator it = *this; advance(); return it; }
+    // NOTE(review): a copy presumably still refers to the source's keybuf_ through ka_/pair_; prefer ++it.
+  private:
+    basic_table<P>* table_;
+    threadinfo* ti_;
+    key_type ka_;           // current position; constructed over keybuf_
+    value_type value_;      // not referenced anywhere in this header
+    itvalue_type pair_;     // what operator* and operator-> expose
+    bool end_;              // true once advance() has run out of leaves
+    union {
+        ikey_type x[(MASSTREE_MAXKEYLEN + sizeof(ikey_type) - 1)/sizeof(ikey_type)];
+        char s[MASSTREE_MAXKEYLEN];
+    } keybuf_;
+
+    void advance(bool emit_equal = false);
+
+
+    // Debugging support.
+    int id_;
+    static int count_;      // NOTE(review): incremented without synchronization; confirm that is acceptable
+
+    void dprintf(const char *format, ...) {
+        va_list args;
+        va_start(args, format);
+        fprintf(stderr, "it%d: ", id_);
+        vfprintf(stderr, format, args);
+        va_end(args);
+    }
+};
+
+template int basic_table<P>::iterator::count_ = 0;
+// Copies @a firstkey into keybuf_ and, unless @a at_end is set, runs advance(true) to load the first entry.
+template
+basic_table<P>::iterator::iterator(basic_table<P>* table, threadinfo* ti, Str firstkey, bool at_end)
+    : table_(table), ti_(ti), end_(at_end), id_(count_++) {
+    masstree_precondition(firstkey.len <= (int) sizeof(keybuf_));
+    memcpy(keybuf_.s, firstkey.s, firstkey.len);
+    ka_ = key_type(keybuf_.s, firstkey.len);
+    if (!end_)  // an end sentinel never exposes a key, so skip the tree descent
+        advance(true);
+}
+// Returns an end sentinel for @a table.
+template
+typename basic_table<P>::iterator
+basic_table<P>::iterator::make_end(basic_table<P>* table, threadinfo *ti) {
+    // operator== only looks at end_ when either side is a sentinel, so build
+    // it with at_end set instead of descending the tree and discarding the result.
+    return iterator(table, ti, "", true);
+}
+// Loads the next entry into pair_ (the entry at ka_ itself when emit_equal); sets end_ when no leaf remains.
+template
+void
+basic_table<P>::iterator::advance(bool emit_equal) {
+    int ki;
+    bool try_next_index = true;
+    leaf_type* n;
+    nodeversion_type v;
+    node_type* root;
+    permuter_type perm;
+    Str suffix;
+    char suffixbuf[MASSTREE_MAXKEYLEN];
+
+ retry_root:
+    ka_.unshift_all();
+    root = table_->root();
+    n = root->reach_leaf(ka_, v, *ti_);
+    perm = n->permutation();
+    ki = bound_type::lower(ka_, *n).i;
+
+ retry:
+    if (v.deleted())
+        goto retry_root;
+
+    int kp = (unsigned(ki) < unsigned(perm.size())) ? perm[ki] : -1;
+    if (kp < 0) {
+        n = n->safe_next();
+        if (!n) {
+            if (root == table_->root()) {
+                end_ = true;
+                return;
+            }
+            // Out of leaves in a nested layer: pop it, increment the parent key, and rescan from the top.
+            ka_.unshift();
+            while (ka_.increment() && ka_.is_shifted())
+                ka_.unshift();
+            ka_.assign_store_ikey(ka_.ikey());
+            ka_.assign_store_length(ka_.ikey_size);
+            goto retry_root;
+        }
+        perm = n->permutation();
+        v = n->stable();
+        ki = bound_type::lower(ka_, *n).i;
+        goto retry;
+    }
+    // Copy what is needed out of the leaf, then re-check its version before using the copy.
+    int keylenx = n->keylenx_[kp];
+    ikey_type ikey = n->ikey0_[kp];
+    leafvalue_type entry = n->lv_[kp];
+    if (n->keylenx_has_ksuf(keylenx)) {
+        suffix = n->ksuf(kp);
+        memcpy(suffixbuf, suffix.s, suffix.len);
+        suffix.s = suffixbuf;
+    }
+
+    if (n->has_changed(v))
+        goto retry_root;
+
+    ka_.assign_store_ikey(ikey);
+    if (n->keylenx_is_layer(keylenx)) {
+        ka_.shift();
+        root = entry.layer();
+        n = root->reach_leaf(ka_, v, *ti_);
+        perm = n->permutation();
+        ki = bound_type::lower(ka_, *n).i;
+        goto retry;
+    }
+
+    // XXX This condition is suspect.
+    if (!emit_equal && try_next_index &&
+        (!n->keylenx_has_ksuf(keylenx) || suffix.compare(ka_.suffix()) == 0)) {
+        try_next_index = false;
+        ki++;
+        goto retry;
+    }
+
+    int keylen = keylenx;
+    if (n->keylenx_has_ksuf(keylenx)) {
+        keylen = ka_.assign_store_suffix(suffix);
+    }
+    ka_.assign_store_length(keylen);
+    ka_.unshift_all();
+    pair_ = itvalue_type(ka_, entry.value());
+}
+// Returns an iterator started from the empty key (the constructor's default firstkey).
+template
+typename basic_table<P>::iterator
+basic_table<P>::begin(threadinfo& ti) {
+    return iterator(this, &ti);
+}
+// Returns the end sentinel built by iterator::make_end().
+template
+typename basic_table<P>::iterator
+basic_table<P>::end(threadinfo& ti) {
+    return iterator::make_end(this, &ti);
+}
+// Returns an iterator started from @a firstkey.
+template
+typename basic_table<P>::iterator
+basic_table<P>::iterate_from(Str firstkey, threadinfo& ti) {
+    return iterator(this, &ti, firstkey);
+}
+}
+#endif
diff --git a/masstree_key.hh b/masstree_key.hh
index 4d1e8a7..f003348 100644
--- a/masstree_key.hh
+++ b/masstree_key.hh
@@ -185,7 +185,6 @@ class key {
         // Return true iff wrapped.
         if (has_suffix()) {
             ++ikey0_;
-            len_ = 1;
             return unlikely(!ikey0_);
         } else {
             ++len_;
diff --git a/mttest.cc b/mttest.cc
index abd6ae9..7ece1e4 100644
--- a/mttest.cc
+++ b/mttest.cc
@@ -59,6 +59,7 @@
 #include "masstree_tcursor.hh"
 #include "masstree_insert.hh"
 #include "masstree_remove.hh"
+#include "masstree_iterator.hh"
 #include "masstree_scan.hh"
 #include "timestamp.hh"
 #include "json.hh"
@@ -219,6 +220,8 @@ struct kvtest_client {
                    std::vector &keys, std::vector &values);
     void rscan_sync(const Str &firstkey, int n,
                     std::vector &keys, std::vector &values);
+    void iscan_sync(const Str &firstkey, int n,
+                    std::vector &keys, std::vector &values);
 
     void put(const Str &key, const Str &value);
     void put(const char *key, const char *value) {
@@ -402,6 +405,15 @@ void kvtest_client::rscan_sync(const Str &firstkey, int n,
     output_scan(req, keys, values);
 }
 
+template
+void kvtest_client::iscan_sync(const Str &firstkey, int n,
+                               std::vector &keys,
+                               std::vector &values) {
+    Json req = Json::array(0, 0, firstkey, n);
+    q_[0].run_iscan(table_->table(), req, *ti_);
+    output_scan(req, keys, values);
+}
+
 template
 void kvtest_client::output_scan(const Json& req, std::vector& keys,
                                 std::vector& values) const {
@@ -539,6 +551,7 @@
 MAKE_TESTRUNNER(scan1, kvtest_scan1(client, 0));
 MAKE_TESTRUNNER(scan1q80, kvtest_scan1(client, 0.8));
 MAKE_TESTRUNNER(rscan1, kvtest_rscan1(client, 0));
 MAKE_TESTRUNNER(rscan1q80, kvtest_rscan1(client, 0.8));
+MAKE_TESTRUNNER(iscan1, kvtest_iscan1(client, 0));
 MAKE_TESTRUNNER(splitremove1, kvtest_splitremove1(client));
 MAKE_TESTRUNNER(url, kvtest_url(client));
 
diff --git a/query_masstree.cc b/query_masstree.cc
index 5c88402..79f43d6 100644
--- a/query_masstree.cc
+++ 
b/query_masstree.cc
@@ -19,6 +19,7 @@
 #include "masstree_tcursor.hh"
 #include "masstree_get.hh"
 #include "masstree_insert.hh"
+#include "masstree_iterator.hh"
 #include "masstree_split.hh"
 #include "masstree_remove.hh"
 #include "masstree_scan.hh"
@@ -413,6 +414,47 @@ void query_table<P>::test(threadinfo& ti) {
     // XXX destroy tree
 }
 
+// Self-test: insert a fixed key set in random order, then check begin()..end() yields it in the listed order.
+template
+void query_table<P>::iterator_test(threadinfo& ti) {
+    typedef typename basic_table<P>::iterator iterator;
+    query_table<P> t;
+    t.initialize(ti);
+    query q;
+    // Keys in the order the iterator is expected to return them.
+    const char * const values[] = {
+        "", "0", "1", "10", "100000000", // 0-4
+        "1000000001", "1000000002", "2", "20", "200000000", // 5-9
+        "aaaaaaaaaaaaaaaaaaaaaaaaaa", // 10
+        "aaaaaaaaaaaaaaabbbb", "aaaaaaaaaaaaaaabbbc", "aaaaaaaaaxaaaaabbbc", "b", "c", "d", "e", "f", "g", "h", "i", "j",
+        "kkkkkkkk\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF" "a",
+        "kkkkkkkk\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF" "b",
+        "llllllll",
+        "lllllllln",
+        "llllllllo",
+        "xxxxxxxxy"
+    };
+    const char *values_copy[arraysize(values)];
+    memcpy(values_copy, values, sizeof(values));
+    for (int i = arraysize(values); i > 0; --i) {
+        int x = rand() % i;
+        q.run_replace(t.table(), Str(values_copy[x]), Str(values_copy[x]), ti);
+        values_copy[x] = values_copy[i - 1];
+    }
+    // Build end() once; the two bounds asserts catch an iterator that yields extra keys or stops early.
+    const char * const * pos = values;
+    for (iterator it = t.table_.begin(ti), last = t.table_.end(ti); it != last; ++it, ++pos) {
+        assert(pos != values + arraysize(values)); // more keys than were inserted
+        Str key = it->first;
+        if ((int) strlen(*pos) != key.len || memcmp(*pos, key.s, key.len) != 0) {
+            fprintf(stderr, "scan encountered \"%.*s\", expected \"%s\"\n", key.len, key.s, *pos);
+            assert((int) strlen(*pos) == key.len && memcmp(*pos, key.s, key.len) == 0);
+        }
+        fprintf(stderr, "scan %.*s\n", key.len, key.s);
+    }
+    assert(pos == values + arraysize(values)); // every inserted key was visited
+}
+
 template
 void query_table<P>::print(FILE *f, int indent) const {
     table_.print(f, indent);
diff --git a/query_masstree.hh b/query_masstree.hh
index eb99899..98434e6 100644
--- a/query_masstree.hh
+++ b/query_masstree.hh
@@ -61,6 +61,7 @@ class query_table {
     void print(FILE* f, int indent) const;
 
     static void test(threadinfo& ti);
+    static void iterator_test(threadinfo& ti);
 
     static const char* name() {
         return "mb";
diff --git a/scantest.cc b/scantest.cc
index 7a30549..04b3afe 100644
--- a/scantest.cc
+++ b/scantest.cc
@@ -10,9 +10,21 @@ kvtimestamp_t initial_timestamp;
 int
 main(int argc, char *argv[])
 {
-    (void) argc;
-    (void) argv;
+    if (argc != 2) {
+        fprintf(stderr, "usage: %s callback|iterator\n", argv[0]);
+        return 1;
+    }
 
     threadinfo* ti = threadinfo::make(threadinfo::TI_MAIN, -1);
-    default_table::test(*ti);
+
+    if (strcmp(argv[1], "callback") == 0)
+        default_table::test(*ti);
+    else if (strcmp(argv[1], "iterator") == 0)
+        default_table::iterator_test(*ti);
+    else {
+        fprintf(stderr, "fatal: unknown test\n");
+        return 1;
+    }
+
+    return 0;
 }