Draft

67 commits
04372be
initial commit, add the kv-cache crate
ParkMyCar May 20, 2025
40efb6c
less generic KV read cache, add deps on mz-repr and mz-storage-types
aljoscha May 20, 2025
7bba2d0
Merge pull request #3 from aljoscha/hackweek/upsert-in-s3
ParkMyCar May 20, 2025
717f1e6
add bloom filter
ptravers May 20, 2025
92c6830
Merge pull request #4 from ptravers/hackweek/upsert-in-s3
ParkMyCar May 20, 2025
e9335c8
fix the build, plumb through an API when fetching a batch to ask what…
ParkMyCar May 20, 2025
fb87b34
Set up bloom filters + row groups + primary key pushdown for arrow an…
DAlperin May 20, 2025
1e8e385
Merge pull request #5 from DAlperin/dov/begin-row-groups
ParkMyCar May 20, 2025
d5ea8ea
wire up most of the kv-cache multi-get
ParkMyCar May 20, 2025
fe8f0b3
add bloom filter
ptravers May 20, 2025
bdbd023
add bloom filter serialization
ptravers May 20, 2025
fef968b
Support decoding rowgroups and pass bloom filter up
DAlperin May 21, 2025
5982dfb
Fix datadriven
DAlperin May 21, 2025
11e7940
Merge pull request #6 from DAlperin/dov/rg-with-bloom
ParkMyCar May 21, 2025
4df9869
add a get_range API to the Blob trait
ParkMyCar May 21, 2025
610aedf
small tweaks
ParkMyCar May 21, 2025
6581b91
Add parquet footer support in HollowBatchPart
DAlperin May 21, 2025
e8560d7
Merge pull request #7 from DAlperin/dov/footer-offsets
ParkMyCar May 21, 2025
63e89b8
add serializable bloom filter
ptravers May 21, 2025
1a08392
Merge branch 'hackweek/upsert-in-s3' into hackweek/upsert-in-s3
ptravers May 21, 2025
91e8026
Add a RowGroupsReader which lies about the shape of the underlying ro…
DAlperin May 21, 2025
ff06578
less confusing len
DAlperin May 21, 2025
45375cd
Merge pull request #9 from DAlperin/dov/row-group-chunked-reader
ParkMyCar May 21, 2025
a674fc4
Merge pull request #8 from ptravers/hackweek/upsert-in-s3
ParkMyCar May 21, 2025
ed65bab
flee dep hell
ptravers May 21, 2025
4acb3d7
I'm free
ptravers May 21, 2025
a01ee68
Merge branch 'hackweek/upsert-in-s3' into hackweek/upsert-in-s3
ParkMyCar May 21, 2025
5a15efa
Merge pull request #10 from ptravers/hackweek/upsert-in-s3
ParkMyCar May 21, 2025
26dda0e
some progress on fetching multiple row groups
ParkMyCar May 21, 2025
03b5076
a few build fixes
ParkMyCar May 21, 2025
627d841
tweak the bloomfilter impl to use a new AsParquetBytes trait
ParkMyCar May 21, 2025
6dcc894
re-add original impl of fetch_batch_part_blob so we can do an end-to-…
ParkMyCar May 21, 2025
cdc4ae1
fix Bazel build
ParkMyCar May 21, 2025
dceea28
Support reading bloom filters out of the parquet buffer
DAlperin May 21, 2025
8346023
fix persist doesn't roundtrip state diff
ParkMyCar May 21, 2025
b454cf8
Merge branch 'hackweek/upsert-in-s3' into dov/deser-bloom-filters
DAlperin May 21, 2025
004418c
small bit of progress
ParkMyCar May 21, 2025
8bb6294
fix
DAlperin May 21, 2025
80df518
Merge pull request #11 from DAlperin/dov/deser-bloom-filters
ParkMyCar May 21, 2025
73438e7
add stuff
ptravers May 21, 2025
f3aaff6
fix roundtripping
ParkMyCar May 21, 2025
9e4959c
possibly fix bug
ParkMyCar May 21, 2025
2998df2
change default sizes
DAlperin May 21, 2025
f2f017b
Merge pull request #12 from DAlperin/actually-use-row-groups
ParkMyCar May 21, 2025
efa1465
don't roundtrip metadata through parquet
DAlperin May 21, 2025
954bc1f
Merge pull request #13 from DAlperin/dov/dont-round-trip-metadata-to-…
ParkMyCar May 21, 2025
2f26ae2
add decoding for chunked persist row groups
ParkMyCar May 21, 2025
6a33212
add get_range for FileBlob
ParkMyCar May 21, 2025
20b6b20
actually initialize KeyValueReadHandle, massage API
aljoscha May 20, 2025
fff0462
add proto serialize and deserialize
ptravers May 21, 2025
fc4e87f
Merge remote-tracking branch 'parker/hackweek/upsert-in-s3' into hack…
ptravers May 21, 2025
7180f5e
Merge pull request #14 from ptravers/hackweek/upsert-in-s3
ParkMyCar May 21, 2025
65912e9
turn feedback upsert into "stateless" upsert, initial commit
aljoscha May 20, 2025
07b421d
Merge pull request #15 from aljoscha/hackweek/upsert-in-s3
ParkMyCar May 21, 2025
2439647
wire things together, hopefully
ParkMyCar May 21, 2025
ac42f13
the fixes to make things work
DAlperin May 21, 2025
4d782d7
Apply suggestions from code review
DAlperin May 21, 2025
c959ca5
Merge pull request #16 from DAlperin/dov/make-it-work
ParkMyCar May 21, 2025
362671d
add cache
ParkMyCar May 21, 2025
4e3bc23
add an LRU cache and describe further improvements
ParkMyCar May 22, 2025
6005a84
misc fixes
DAlperin May 22, 2025
e5870c4
Merge pull request #17 from DAlperin/dov/misc-fixes
ParkMyCar May 22, 2025
7285a68
solve over fetching
ParkMyCar May 22, 2025
87412d9
comment out stuff
DAlperin May 22, 2025
04bc8d1
Merge pull request #18 from DAlperin/comment-out-code
ParkMyCar May 22, 2025
cbbbf3b
massage log statements
aljoscha May 22, 2025
5542cbf
re-add feedback UPSERT impl, make configurable via dyncfg
aljoscha May 23, 2025
1,906 changes: 1,038 additions & 868 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 14 additions & 2 deletions Cargo.toml
@@ -46,6 +46,7 @@ members = [
"src/http-util",
"src/interchange",
"src/kafka-util",
"src/kv-cache",
"src/license-keys",
"src/lowertest",
"src/lowertest-derive",
@@ -56,7 +57,6 @@ members = [
"src/metrics",
"src/mysql-util",
"src/mz",
"src/mz-debug",
"src/npm",
"src/orchestrator",
"src/orchestrator-kubernetes",
@@ -316,6 +315,19 @@ postgres-types = { git = "https://github.com/MaterializeInc/rust-postgres" }
"src/metrics",
"src/mysql-util",
"src/mz",
"src/mz-debug",
"src/npm",
"src/orchestrator",
"src/orchestrator-kubernetes",
Expand Down Expand Up @@ -316,6 +315,19 @@ postgres-types = { git = "https://github.com/MaterializeInc/rust-postgres" }
postgres-openssl = { git = "https://github.com/MaterializeInc/rust-postgres" }
postgres_array = { git = "https://github.com/MaterializeInc/rust-postgres-array" }

parquet = { git = "https://github.com/ptravers/arrow-rs", branch = "hackweek/upsert-in-s3" }
arrow-data = { git = "https://github.com/ptravers/arrow-rs", branch = "hackweek/upsert-in-s3" }
arrow-buffer = { git = "https://github.com/ptravers/arrow-rs", branch = "hackweek/upsert-in-s3" }
arrow-cast = { git = "https://github.com/ptravers/arrow-rs", branch = "hackweek/upsert-in-s3" }
arrow-ipc = { git = "https://github.com/ptravers/arrow-rs", branch = "hackweek/upsert-in-s3" }
arrow-select = { git = "https://github.com/ptravers/arrow-rs", branch = "hackweek/upsert-in-s3" }
arrow-string = { git = "https://github.com/ptravers/arrow-rs", branch = "hackweek/upsert-in-s3" }
arrow-ord = { git = "https://github.com/ptravers/arrow-rs", branch = "hackweek/upsert-in-s3" }
arrow-arith = { git = "https://github.com/ptravers/arrow-rs", branch = "hackweek/upsert-in-s3" }
arrow = { git = "https://github.com/ptravers/arrow-rs", branch = "hackweek/upsert-in-s3" }
arrow-schema = { git = "https://github.com/ptravers/arrow-rs", branch = "hackweek/upsert-in-s3" }
arrow-array = { git = "https://github.com/ptravers/arrow-rs", branch = "hackweek/upsert-in-s3" }

# Waiting on https://github.com/MaterializeInc/serde-value/pull/35.
serde-value = { git = "https://github.com/MaterializeInc/serde-value.git" }

14 changes: 7 additions & 7 deletions WORKSPACE
@@ -502,12 +502,12 @@ crates_repository(
gen_build_script = False,
deps = ["@bzip2"],
)],
"lzma-sys": [crate.annotation(
additive_build_file = "@//misc/bazel/c_deps:rust-sys/BUILD.lzma-sys.bazel",
gen_build_script = False,
# Note: This is a target we add from the additive build file above.
deps = [":xz"],
)],
# "lzma-sys": [crate.annotation(
# additive_build_file = "@//misc/bazel/c_deps:rust-sys/BUILD.lzma-sys.bazel",
# gen_build_script = False,
# # Note: This is a target we add from the additive build file above.
# deps = [":xz"],
# )],
"openssl-sys": [crate.annotation(
build_script_data = [
"@openssl//:openssl_lib",
@@ -634,6 +634,7 @@ crates_repository(
"//:src/http-util/Cargo.toml",
"//:src/interchange/Cargo.toml",
"//:src/kafka-util/Cargo.toml",
"//:src/kv-cache/Cargo.toml",
"//:src/license-keys/Cargo.toml",
"//:src/lowertest-derive/Cargo.toml",
"//:src/lowertest/Cargo.toml",
@@ -643,7 +644,6 @@
"//:src/metrics/Cargo.toml",
"//:src/mysql-util/Cargo.toml",
"//:src/mz/Cargo.toml",
"//:src/mz-debug/Cargo.toml",
"//:src/npm/Cargo.toml",
"//:src/orchestrator-kubernetes/Cargo.toml",
"//:src/orchestrator-process/Cargo.toml",
4 changes: 2 additions & 2 deletions misc/bazel/c_deps/rust-sys/BUILD.protobuf-native.bazel
@@ -38,7 +38,7 @@ cc_library(
"@com_google_absl//absl/strings",
"@com_google_protobuf//src/google/protobuf/compiler:code_generator",
"@com_google_protobuf//src/google/protobuf/compiler:importer",
"@crates_io__cxx-1.0.122//:cxx_cc",
"@crates_io__cxx-1.0.158//:cxx_cc",
],
)

@@ -82,6 +82,6 @@ cc_library(
include_prefix = "protobuf-native",
deps = [
"@com_google_absl//absl/strings",
"@crates_io__cxx-1.0.122//:cxx_cc",
"@crates_io__cxx-1.0.158//:cxx_cc",
],
)
97 changes: 97 additions & 0 deletions src/kv-cache/BUILD.bazel
@@ -0,0 +1,97 @@
# Code generated by cargo-gazelle DO NOT EDIT

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

load("@crates_io//:defs.bzl", "aliases", "all_crate_deps")
load("@rules_rust//cargo:defs.bzl", "extract_cargo_lints")
load("@rules_rust//rust:defs.bzl", "rust_doc_test", "rust_library", "rust_test")

package(default_visibility = ["//visibility:public"])

rust_library(
name = "mz_kv_cache",
srcs = glob(["src/**/*.rs"]),
aliases = aliases(
normal = True,
proc_macro = True,
),
compile_data = [],
crate_features = [],
data = [],
lint_config = ":lints",
proc_macro_deps = [] + all_crate_deps(proc_macro = True),
rustc_env = {},
rustc_flags = [],
version = "0.1.0",
deps = [
"//src/persist-client:mz_persist_client",
"//src/persist-types:mz_persist_types",
"//src/repr:mz_repr",
"//src/storage-types:mz_storage_types",
] + all_crate_deps(normal = True),
)

alias(
name = "kv-cache",
actual = "mz_kv_cache",
)

rust_test(
name = "mz_kv_cache_lib_tests",
size = "medium",
aliases = aliases(
normal = True,
normal_dev = True,
proc_macro = True,
proc_macro_dev = True,
),
compile_data = [],
crate = ":mz_kv_cache",
crate_features = [],
data = [],
env = {},
lint_config = ":lints",
proc_macro_deps = [] + all_crate_deps(
proc_macro = True,
proc_macro_dev = True,
),
rustc_env = {},
rustc_flags = [],
version = "0.1.0",
deps = [
"//src/persist-client:mz_persist_client",
"//src/persist-types:mz_persist_types",
"//src/repr:mz_repr",
"//src/storage-types:mz_storage_types",
] + all_crate_deps(
normal = True,
normal_dev = True,
),
)

rust_doc_test(
name = "mz_kv_cache_doc_test",
crate = ":mz_kv_cache",
deps = [
"//src/persist-client:mz_persist_client",
"//src/persist-types:mz_persist_types",
"//src/repr:mz_repr",
"//src/storage-types:mz_storage_types",
] + all_crate_deps(
normal = True,
normal_dev = True,
),
)

extract_cargo_lints(
name = "lints",
manifest = "Cargo.toml",
workspace = "@//:Cargo.toml",
)
18 changes: 18 additions & 0 deletions src/kv-cache/Cargo.toml
@@ -0,0 +1,18 @@
[package]
name = "mz-kv-cache"
version = "0.1.0"
edition.workspace = true
rust-version.workspace = true

[dependencies]
anyhow = "1.0.98"
differential-dataflow = "0.15.1"
mz-persist-types = { path = "../persist-types" }
mz-persist-client = { path = "../persist-client" }
mz-repr = { path = "../repr" }
mz-storage-types = { path = "../storage-types" }
timely = "0.21.0"
tracing = "0.1.37"

[lints]
workspace = true
170 changes: 170 additions & 0 deletions src/kv-cache/src/lib.rs
@@ -0,0 +1,170 @@
use std::collections::BTreeSet;

use differential_dataflow::lattice::Lattice;
use mz_persist_client::lru::Lru;
use mz_persist_client::read::ReadHandle;
use mz_persist_types::Codec64;
use mz_persist_types::bloom_filter::BloomFilter;
use mz_repr::{DatumVec, Row, TimestampManipulation};
use mz_storage_types::StorageDiff;
use mz_storage_types::sources::SourceData;
use timely::progress::{Antichain, Timestamp};

/// TODO(upsert-in-persist), make this configurable.
///
/// 20 MiB
const LRU_CACHE_SIZE: usize = 20 * 1024 * 1024;

pub struct KeyValueReadHandle<T> {
handle: ReadHandle<SourceData, (), T, StorageDiff>,
cache: Lru<Row, Row>,
cache_upper: T,
}

impl<T> KeyValueReadHandle<T>
where
T: Timestamp + Lattice + Codec64 + Sync + TimestampManipulation,
{
pub fn new(handle: ReadHandle<SourceData, (), T, StorageDiff>) -> Self {
Self {
handle,
cache: Lru::new(LRU_CACHE_SIZE, move |_, _, _| {}),
cache_upper: T::minimum(),
}
}

pub async fn get_multi(
&mut self,
key_columns: &[usize],
keys: Vec<Row>,
ts: T,
) -> Vec<(Row, Row)> {
// Check the cache first.
// let mut cached_values = Vec::new();
// let mut obtained_keys = BTreeSet::new();
// for key in &keys {
// if let Some((_key, cached_val)) = self.cache.get(key) {
// tracing::info!(?key, ?ts, ?self.cache_upper, "found in cache");
// // if self.cache_upper.step_back().unwrap_or(T::minimum()) == ts {
// if self.cache_upper == ts {
// tracing::info!(?key, ?ts, "returning cached value");
// cached_values.push((key.clone(), cached_val.clone()));
// obtained_keys.insert(key.clone());
// }
// }
// }

// // Skip querying for keys that have already been obtained from the cache.
// let keys: Vec<_> = keys
// .into_iter()
// .filter(|key| !obtained_keys.contains(key))
// .collect();

// // If there isn't anything else to obtain then return early!
// if keys.is_empty() {
// // return Vec::new();
// return cached_values;
// }

let as_of = Antichain::from_elem(ts);
let batch_parts = self.handle.snapshot(as_of).await.expect("OH NO");
assert_eq!(key_columns.len(), 1, "support composite keys");
let key_col = key_columns[0];

// We should fetch a RowGroup in a Part if it contains any of our keys.
let mut datum_vec_a = DatumVec::new();
let mut encode_buffer = Vec::new();
let mut should_fetch = |bloom_filter: &BloomFilter| {
keys.iter().any(|row| {
let datums = datum_vec_a.borrow_with(row);
assert_eq!(datums.len(), 1, "composite keys");
let key = datums[0];
let contains = bloom_filter.contains(key, &mut encode_buffer);
// let contains = true;
if contains {
tracing::info!("matched bloom filter for key {key}");
} else {
tracing::info!("did not match bloom filter for key {key}");
}
contains
})
};

let mut filtered_values = Vec::new();
let mut datum_vec_b = DatumVec::new();
let mut datum_vec_c = DatumVec::new();

// TODO(upsert-in-persist)
//
// There are two more things we can do to make this faster:
// 1. Check the statistics for the primary key column on each `Part`
// before even looking at the bloom filters.
// 2. Sort the list of `Part`s by their upper in a descending order.
// If a key exists in an upsert source it is guaranteed to have a
// diff of 1 or -1, meaning the latest value for a key is the
// correct value. There is no need to scan for all previous
// instances of the key and then consolidate.

for part in batch_parts {
// Check if this part could possibly match any of the bloom filters.
// let could_match = if let Some(mut bloom_filters) = part.pkey_bloom_filters() {
// bloom_filters.any(|filter| should_fetch(filter))
// } else {
// // If there are no bloom filters then we could always match.
// true
// };

// If we can't possibly match this part then don't fetch it.
// if !could_match {
// continue;
// }

let values = self.handle.fetch_values(&part, &mut should_fetch).await;
for ((source_data, _unit_type), _ts, diff) in values {
let source_data = source_data.expect("HACK WEEK");
let candidate_row = source_data.0.expect("HACK WEEK");

let maybe_matching_key = {
let candidate_datums = datum_vec_b.borrow_with(&candidate_row);
keys.iter().find(|wanted_row| {
let wanted_datums = datum_vec_c.borrow_with(wanted_row);
assert_eq!(wanted_datums.len(), 1, "composite keys");
wanted_datums[0] == candidate_datums[key_col]
})
};
if let Some(matching_key) = maybe_matching_key {
tracing::debug!(?matching_key, ?candidate_row, ?diff, "found matching key");
filtered_values.push(((matching_key.clone(), candidate_row), diff));
}
}
}

differential_dataflow::consolidation::consolidate(&mut filtered_values);
if !filtered_values.iter().all(|(_x, diff)| *diff == 1) {
tracing::warn!("filtered values: {filtered_values:?}");
}
// filtered_values.iter().all(|(_x, diff)| *diff == 1);

filtered_values
.into_iter()
.map(|(payload, _diff)| payload)
// .chain(cached_values)
.collect()
}

pub fn apply_changes(&mut self, mut changes: Vec<((Row, Row), T, StorageDiff)>, upper: T) {
changes.sort_by(|a, b| (a.1.clone(), a.2).cmp(&(b.1.clone(), b.2)));
self.cache_upper = upper;

for ((key, val), _ts, diff) in changes {
if diff == 1 {
let weight = val.byte_len();
self.cache.insert(key, val, weight);
} else if diff == -1 {
self.cache.remove(&key);
} else {
panic!("unexpected diff value {diff}");
}
}
}
}
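
The `TODO(upsert-in-persist)` comment in `get_multi` names two optimizations that the hackweek code does not yet implement: check the primary-key column statistics on each `Part` before touching bloom filters, and scan parts newest-first so that the first hit for a key is its current value. The following is a minimal, self-contained sketch of that control flow; `Part`, `KeyStats`, and this `get_multi` are simplified stand-ins for illustration, not the real persist types or API.

```rust
use std::collections::BTreeSet;

/// Min/max statistics for the primary-key column of a part
/// (a stand-in for persist's real column statistics).
struct KeyStats {
    min: u64,
    max: u64,
}

/// A simplified part: its upper frontier, optional key statistics,
/// and the decoded (key, value) pairs it contains.
struct Part {
    upper: u64,
    stats: Option<KeyStats>,
    entries: Vec<(u64, String)>,
}

impl Part {
    /// Optimization 1: a part can only contain `key` if `key` falls
    /// within the [min, max] range of its key-column statistics.
    fn might_contain(&self, key: u64) -> bool {
        match &self.stats {
            Some(s) => s.min <= key && key <= s.max,
            None => true, // no statistics: must assume a possible match
        }
    }
}

/// Resolve each wanted key to its latest value, newest parts first.
fn get_multi(mut parts: Vec<Part>, wanted: &[u64]) -> Vec<(u64, String)> {
    // Optimization 2: sort by upper, descending. In an upsert shard the
    // newest instance of a key is the correct value, so once a key is
    // found, older parts need not be consulted for it.
    parts.sort_by(|a, b| b.upper.cmp(&a.upper));

    let mut unresolved: BTreeSet<u64> = wanted.iter().copied().collect();
    let mut out = Vec::new();

    for part in &parts {
        if unresolved.is_empty() {
            break; // every key was resolved by a newer part
        }
        // Skip the (expensive) fetch if statistics rule out all keys.
        if !unresolved.iter().any(|k| part.might_contain(*k)) {
            continue;
        }
        for (k, v) in &part.entries {
            if unresolved.remove(k) {
                out.push((*k, v.clone()));
            }
        }
    }
    out
}

fn main() {
    let parts = vec![
        Part { upper: 1, stats: Some(KeyStats { min: 0, max: 9 }), entries: vec![(7, "old".into())] },
        Part { upper: 2, stats: Some(KeyStats { min: 5, max: 9 }), entries: vec![(7, "new".into())] },
    ];
    // The part with the larger upper wins: key 7 resolves to "new".
    assert_eq!(get_multi(parts, &[7]), vec![(7, "new".to_string())]);
}
```

Because the newest-first scan yields at most one value per key, the `consolidate` pass and the `diff == 1` check in the current code would become unnecessary under this scheme.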
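`apply_changes` keeps the LRU coherent with the shard: a `+1` diff inserts the value at a weight of its byte length, a `-1` diff evicts, and `cache_upper` records the frontier up to which the cache has seen changes. Below is a toy byte-weighted LRU that illustrates the `insert(key, value, weight)` / `remove(&key)` shape used above; it assumes nothing about `mz_persist_client::lru::Lru` beyond what is visible in this diff, and uses a linear scan for eviction that a real implementation would avoid.

```rust
use std::collections::HashMap;

/// A toy byte-weighted LRU; entries are evicted oldest-first once the
/// total weight exceeds the byte budget. (Illustration only.)
struct WeightedLru<K, V> {
    capacity: usize,                      // budget in bytes
    weight: usize,                        // current total weight
    clock: u64,                           // logical insertion clock
    entries: HashMap<K, (V, usize, u64)>, // value, weight, insert time
}

impl<K: std::hash::Hash + Eq + Clone, V> WeightedLru<K, V> {
    fn new(capacity: usize) -> Self {
        Self { capacity, weight: 0, clock: 0, entries: HashMap::new() }
    }

    fn insert(&mut self, key: K, value: V, weight: usize) {
        self.remove(&key); // replace any previous entry for this key
        self.clock += 1;
        self.weight += weight;
        self.entries.insert(key, (value, weight, self.clock));
        // Evict the least-recently-inserted entries until under budget.
        while self.weight > self.capacity {
            let oldest = self
                .entries
                .iter()
                .min_by_key(|(_, (_, _, t))| *t)
                .map(|(k, _)| k.clone())
                .expect("over budget implies non-empty");
            self.remove(&oldest);
        }
    }

    fn remove(&mut self, key: &K) {
        if let Some((_, w, _)) = self.entries.remove(key) {
            self.weight -= w;
        }
    }
}

fn main() {
    // Mirrors the apply_changes loop: +1 inserts at the value's byte
    // size, -1 removes, anything else is a logic error upstream.
    let mut cache = WeightedLru::new(16);
    for (key, val, diff) in [("a", "xxxx", 1i64), ("b", "yyyy", 1), ("a", "xxxx", -1)] {
        match diff {
            1 => cache.insert(key, val, val.len()),
            -1 => cache.remove(&key),
            d => panic!("unexpected diff value {d}"),
        }
    }
    assert_eq!(cache.entries.len(), 1); // only "b" remains cached
}
```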