Skip to content

Commit ec2b183

Browse files
authored
Merge pull request #19848 from vmarcos/ahash_consolidate
Avoid divergence of hash state in consolidation
2 parents 2ab50b3 + 2031fc1 commit ec2b183

File tree

3 files changed

+144
-22
lines changed

3 files changed

+144
-22
lines changed

src/compute/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ rust-version.workspace = true
77
publish = false
88

99
[dependencies]
10-
ahash = "0.8.0"
10+
ahash = { version = "0.8.0", default_features = false }
1111
anyhow = "1.0.66"
1212
async-trait = "0.1.68"
1313
bytesize = "1.1.0"

src/compute/src/extensions/collection.rs

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -60,26 +60,7 @@ where
6060
Arranged<G, TraceAgent<Tr>>: ArrangementSize,
6161
{
6262
if must_consolidate {
63-
// We employ AHash below instead of the default hasher in DD to obtain
64-
// a better distribution of data to workers. AHash claims empirically
65-
// both speed and high quality, according to
66-
// https://github.com/tkaitchuck/aHash/blob/master/compare/readme.md.
67-
// TODO(vmarcos): Consider here if it is worth it to spend the time to
68-
// implement twisted tabulation hashing as proposed in Mihai Patrascu,
69-
// Mikkel Thorup: Twisted Tabulation Hashing. SODA 2013: 209-228, available
70-
// at https://epubs.siam.org/doi/epdf/10.1137/1.9781611973105.16. The latter
71-
// would provide good bounds for balls-into-bins problems when the number of
72-
// bins is small (as is our case), so we'd have a theoretical guarantee.
73-
let random_state = ahash::RandomState::new();
74-
let mut h = random_state.build_hasher();
75-
let exchange = Exchange::new(move |update: &((D1, _), G::Timestamp, R)| {
76-
let data = &(update.0).0;
77-
data.hash(&mut h);
78-
h.finish()
79-
});
80-
self.map(|k| (k, ()))
81-
.mz_arrange_core::<_, Tr>(exchange, name)
82-
.as_collection(|d: &D1, _| d.clone())
63+
self.mz_consolidate(name)
8364
} else {
8465
self.clone()
8566
}
@@ -91,8 +72,36 @@ where
9172
Tr::Batch: Batch,
9273
Arranged<G, TraceAgent<Tr>>: ArrangementSize,
9374
{
75+
// We employ AHash below instead of the default hasher in DD to obtain
76+
// a better distribution of data to workers. AHash claims empirically
77+
// both speed and high quality, according to
78+
// https://github.com/tkaitchuck/aHash/blob/master/compare/readme.md.
79+
// TODO(vmarcos): Consider here if it is worth it to spend the time to
80+
// implement twisted tabulation hashing as proposed in Mihai Patrascu,
81+
// Mikkel Thorup: Twisted Tabulation Hashing. SODA 2013: 209-228, available
82+
// at https://epubs.siam.org/doi/epdf/10.1137/1.9781611973105.16. The latter
83+
// would provide good bounds for balls-into-bins problems when the number of
84+
// bins is small (as is our case), so we'd have a theoretical guarantee.
85+
// NOTE: We fix the seeds of a RandomState instance explicitly with the same
86+
// seeds that would be given by `AHash` via ahash::AHasher::default() so as
87+
// to avoid a different selection due to compile-time features being differently
88+
// selected in other dependencies using `AHash` vis-à-vis cargo's strategy
89+
// of unioning features. This implies that we end up employing the fallback
90+
// hasher of `AHash`, but it should be sufficient for our needs.
91+
let random_state = ahash::RandomState::with_seeds(
92+
0x243f_6a88_85a3_08d3,
93+
0x1319_8a2e_0370_7344,
94+
0xa409_3822_299f_31d0,
95+
0x082e_fa98_ec4e_6c89,
96+
);
97+
let exchange = Exchange::new(move |update: &((D1, _), G::Timestamp, R)| {
98+
let data = &(update.0).0;
99+
let mut h = random_state.build_hasher();
100+
data.hash(&mut h);
101+
h.finish()
102+
});
94103
self.map(|k| (k, ()))
95-
.mz_arrange::<Tr>(name)
104+
.mz_arrange_core::<_, Tr>(exchange, name)
96105
.as_collection(|d: &D1, _| d.clone())
97106
}
98107
}

test/cluster/mzcompose.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
6363
"test-github-17177",
6464
"test-github-17510",
6565
"test-github-17509",
66+
"test-github-19610",
6667
"test-remote-storage",
6768
"test-drop-default-cluster",
6869
"test-upsert",
@@ -973,6 +974,118 @@ def workflow_test_github_17509(c: Composition) -> None:
973974
assert "Negative accumulation in ReduceMinsMaxes" not in c1.stdout
974975

975976

977+
def workflow_test_github_19610(c: Composition) -> None:
978+
"""
979+
Test that a monotonic one-shot SELECT will perform consolidation without error on valid data.
980+
We introduce data that results in a multiset and compute min/max. In a monotonic one-shot
981+
evaluation strategy, we must consolidate and subsequently assert monotonicity.
982+
983+
This is a regression test for https://github.com/MaterializeInc/materialize/issues/19610, where
984+
we observed a performance regression caused by a correctness issue. Here, we validate that the
985+
underlying correctness issue has been fixed.
986+
"""
987+
988+
c.down(destroy_volumes=True)
989+
with c.override(
990+
Clusterd(
991+
name="clusterd_nopanic",
992+
environment_extra=[
993+
"MZ_PERSIST_COMPACTION_DISABLED=true",
994+
],
995+
),
996+
Testdrive(no_reset=True),
997+
):
998+
c.up("testdrive", persistent=True)
999+
c.up("materialized")
1000+
c.up("clusterd_nopanic")
1001+
1002+
c.sql(
1003+
"ALTER SYSTEM SET enable_unmanaged_cluster_replicas = true;",
1004+
port=6877,
1005+
user="mz_system",
1006+
)
1007+
1008+
c.sql(
1009+
"ALTER SYSTEM SET enable_repeat_row = true;",
1010+
port=6877,
1011+
user="mz_system",
1012+
)
1013+
1014+
c.sql(
1015+
"ALTER SYSTEM SET enable_monotonic_oneshot_selects = true;",
1016+
port=6877,
1017+
user="mz_system",
1018+
)
1019+
1020+
# set up a test cluster and run a testdrive regression script
1021+
c.sql(
1022+
"""
1023+
CREATE CLUSTER cluster1 REPLICAS (
1024+
r1 (
1025+
STORAGECTL ADDRESSES ['clusterd_nopanic:2100'],
1026+
STORAGE ADDRESSES ['clusterd_nopanic:2103'],
1027+
COMPUTECTL ADDRESSES ['clusterd_nopanic:2101'],
1028+
COMPUTE ADDRESSES ['clusterd_nopanic:2102'],
1029+
WORKERS 4
1030+
)
1031+
);
1032+
-- Set up data for the test.
1033+
SET cluster = cluster1;
1034+
CREATE TABLE base (data bigint, diff bigint);
1035+
CREATE MATERIALIZED VIEW data AS SELECT data FROM base, repeat_row(diff);
1036+
INSERT INTO base VALUES (1, 6);
1037+
INSERT INTO base VALUES (1, -3), (1, -2);
1038+
INSERT INTO base VALUES (2, 3), (2, 2);
1039+
INSERT INTO base VALUES (2, -1), (2, -1);
1040+
INSERT INTO base VALUES (3, 3), (3, 2);
1041+
INSERT INTO base VALUES (3, -3), (3, -2);
1042+
INSERT INTO base VALUES (4, 1), (4, 2);
1043+
INSERT INTO base VALUES (4, -1), (4, -2);
1044+
INSERT INTO base VALUES (5, 5), (5, 6);
1045+
INSERT INTO base VALUES (5, -5), (5, -6);
1046+
"""
1047+
)
1048+
c.testdrive(
1049+
dedent(
1050+
"""
1051+
> SET cluster = cluster1;
1052+
1053+
# Computing min/max with a monotonic one-shot SELECT requires
1054+
# consolidation. We test here that consolidation works correctly,
1055+
# since we assert monotonicity right after consolidating.
1056+
# Note that we employ a cursor to avoid testdrive retries.
1057+
# Hash functions used for exchanges in consolidation may be
1058+
# nondeterministic and produce the correct output by chance.
1059+
> BEGIN
1060+
> DECLARE cur CURSOR FOR SELECT min(data), max(data) FROM data;
1061+
> FETCH ALL cur;
1062+
1 2
1063+
> COMMIT;
1064+
1065+
# To reduce the chance of an (un)lucky streak of the hash function,
1066+
# let's do the same a few times.
1067+
> BEGIN
1068+
> DECLARE cur CURSOR FOR SELECT min(data), max(data) FROM data;
1069+
> FETCH ALL cur;
1070+
1 2
1071+
> COMMIT;
1072+
1073+
> BEGIN
1074+
> DECLARE cur CURSOR FOR SELECT min(data), max(data) FROM data;
1075+
> FETCH ALL cur;
1076+
1 2
1077+
> COMMIT;
1078+
1079+
> BEGIN
1080+
> DECLARE cur CURSOR FOR SELECT min(data), max(data) FROM data;
1081+
> FETCH ALL cur;
1082+
1 2
1083+
> COMMIT;
1084+
"""
1085+
)
1086+
)
1087+
1088+
9761089
def workflow_test_upsert(c: Composition) -> None:
9771090
"""Test creating upsert sources and continuing to ingest them after a restart."""
9781091
with c.override(

0 commit comments

Comments
 (0)