Skip to content

Commit f152d8f

Browse files
committed
Columnar merge batcher
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 594bf11 commit f152d8f

File tree

32 files changed

+531
-73
lines changed

32 files changed

+531
-73
lines changed

Cargo.lock

Lines changed: 20 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/adapter/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@ bytesize = "1.3.0"
1818
chrono = { version = "0.4.39", default-features = false, features = ["std"] }
1919
dec = "0.4.8"
2020
derivative = "2.2.0"
21-
differential-dataflow = "0.15.1"
21+
differential-dataflow = "0.15.2"
2222
enum-kinds = "0.5.1"
2323
fail = { version = "0.5.1", features = ["failpoints"] }
2424
futures = "0.3.31"

src/catalog/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@ bytesize = "1.3.0"
1818
chrono = { version = "0.4.39", default-features = false, features = ["std"] }
1919
clap = { version = "4.5.23", features = ["derive"] }
2020
derivative = "2.2.0"
21-
differential-dataflow = "0.15.1"
21+
differential-dataflow = "0.15.2"
2222
futures = "0.3.31"
2323
ipnet = "2.11.0"
2424
itertools = "0.14.0"

src/cluster/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,7 @@ workspace = true
1313
anyhow = "1.0.98"
1414
async-trait = "0.1.88"
1515
crossbeam-channel = "0.5.15"
16-
differential-dataflow = "0.15.1"
16+
differential-dataflow = "0.15.2"
1717
futures = "0.3.31"
1818
lgalloc = "0.5.0"
1919
mz-cluster-client = { path = "../cluster-client" }

src/compute-client/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@ bytesize = "1.3.0"
1616
chrono = { version = "0.4.39", default-features = false, features = ["std"] }
1717
crossbeam-channel = "0.5.15"
1818
derivative = "2.2.0"
19-
differential-dataflow = "0.15.1"
19+
differential-dataflow = "0.15.2"
2020
futures = "0.3.31"
2121
http = "1.2.0"
2222
mz-build-info = { path = "../build-info" }

src/compute-types/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@ workspace = true
1212
[dependencies]
1313
columnar = "0.5.0"
1414
columnation = "0.1.0"
15-
differential-dataflow = "0.15.1"
15+
differential-dataflow = "0.15.2"
1616
itertools = "0.14.0"
1717
mz-dyncfg = { path = "../dyncfg" }
1818
mz-expr = { path = "../expr" }

src/compute/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -16,8 +16,8 @@ bytesize = "1.3.0"
1616
columnar = "0.5.0"
1717
crossbeam-channel = "0.5.15"
1818
dec = { version = "0.4.8", features = ["serde"] }
19-
differential-dataflow = "0.15.1"
20-
differential-dogs3 = "0.1.12"
19+
differential-dataflow = "0.15.2"
20+
differential-dogs3 = "0.1.13"
2121
futures = "0.3.31"
2222
itertools = "0.14.0"
2323
lgalloc = "0.5"

src/compute/src/logging/differential.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@ use crate::logging::{
3636
DifferentialLog, EventQueue, LogCollection, LogVariant, SharedLoggingState,
3737
consolidate_and_pack,
3838
};
39-
use crate::row_spine::RowRowBuilder;
39+
use crate::row_spine::RowRowBuilderColumn;
4040
use crate::typedefs::{KeyBatcher, RowRowSpine};
4141

4242
/// The return type of [`construct`].
@@ -169,7 +169,7 @@ pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
169169
let variant = LogVariant::Differential(variant);
170170
if config.index_logs.contains_key(&variant) {
171171
let trace = collection
172-
.mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
172+
.mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilderColumn<_, _>, RowRowSpine<_, _>>(
173173
ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, mz_repr::Diff>),
174174
&format!("Arrange {variant:?}"),
175175
)

src/compute/src/logging/reachability.rs

Lines changed: 8 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@ use std::convert::TryInto;
1414
use std::rc::Rc;
1515
use std::time::Duration;
1616

17+
use columnar::Columnar;
1718
use mz_compute_client::logging::LoggingConfig;
1819
use mz_ore::cast::CastFrom;
1920
use mz_repr::{Datum, Diff, Row, Timestamp};
@@ -26,7 +27,7 @@ use timely::dataflow::channels::pact::ExchangeCore;
2627
use crate::extensions::arrange::MzArrangeCore;
2728
use crate::logging::initialize::ReachabilityEvent;
2829
use crate::logging::{EventQueue, LogCollection, LogVariant, TimelyLog, consolidate_and_pack};
29-
use crate::row_spine::RowRowBuilder;
30+
use crate::row_spine::RowRowBuilderColumn;
3031
use crate::typedefs::RowRowSpine;
3132

3233
/// The return type of [`construct`].
@@ -83,14 +84,14 @@ pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
8384
TimelyLog::Reachability,
8485
move |((datum, ()), time, diff), packer, session| {
8586
let (update_type, operator_id, source, port, ts) = datum;
86-
let update_type = if *update_type { "source" } else { "target" };
87+
let update_type = if update_type { "source" } else { "target" };
8788
let data = packer.pack_slice(&[
88-
Datum::UInt64(u64::cast_from(*operator_id)),
89+
Datum::UInt64(u64::cast_from(operator_id)),
8990
Datum::UInt64(u64::cast_from(worker_id)),
90-
Datum::UInt64(u64::cast_from(*source)),
91-
Datum::UInt64(u64::cast_from(*port)),
91+
Datum::UInt64(u64::cast_from(source)),
92+
Datum::UInt64(u64::cast_from(port)),
9293
Datum::String(update_type),
93-
Datum::from(*ts),
94+
Datum::from(Timestamp::into_owned(ts)),
9495
]);
9596
session.give((data, time, diff));
9697
}
@@ -100,7 +101,7 @@ pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
100101
for variant in logs_active {
101102
if config.index_logs.contains_key(&variant) {
102103
let trace = updates
103-
.mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
104+
.mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilderColumn<_, _>, RowRowSpine<_, _>>(
104105
ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, Timestamp, Diff>),
105106
&format!("Arrange {variant:?}"),
106107
)

src/compute/src/logging/timely.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ use crate::extensions::arrange::MzArrangeCore;
3939
use crate::logging::compute::{ComputeEvent, DataflowShutdown};
4040
use crate::logging::{EventQueue, LogVariant, TimelyLog};
4141
use crate::logging::{LogCollection, SharedLoggingState, consolidate_and_pack};
42-
use crate::row_spine::RowRowBuilder;
42+
use crate::row_spine::RowRowBuilderColumn;
4343
use crate::typedefs::{KeyBatcher, KeyValBatcher, RowRowSpine};
4444

4545
/// The return type of [`construct`].
@@ -312,7 +312,7 @@ pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
312312
let variant = LogVariant::Timely(variant);
313313
if config.index_logs.contains_key(&variant) {
314314
let trace = collection
315-
.mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
315+
.mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilderColumn<_, _>, RowRowSpine<_, _>>(
316316
ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, Diff>),
317317
&format!("Arrange {variant:?}"),
318318
)

src/compute/src/render/context.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -48,7 +48,7 @@ use crate::compute_state::{ComputeState, HydrationEvent};
4848
use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
4949
use crate::render::errors::ErrorLogger;
5050
use crate::render::{LinearJoinSpec, RenderTimestamp};
51-
use crate::row_spine::{DatumSeq, RowRowBuilder};
51+
use crate::row_spine::{DatumSeq, RowRowBuilderColumn};
5252
use crate::typedefs::{
5353
ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine,
5454
};
@@ -942,7 +942,7 @@ where
942942
},
943943
);
944944
let oks = oks
945-
.mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
945+
.mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilderColumn<_, _>, RowRowSpine<_, _>>(
946946
ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),name
947947
);
948948
(oks, errs.as_collection())

src/compute/src/render/join/linear_join.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ use crate::extensions::arrange::MzArrangeCore;
3939
use crate::render::RenderTimestamp;
4040
use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownToken};
4141
use crate::render::join::mz_join_core::mz_join_core;
42-
use crate::row_spine::{RowRowBuilder, RowRowSpine};
42+
use crate::row_spine::{RowRowBuilderColumn, RowRowSpine};
4343
use crate::typedefs::{RowRowAgent, RowRowEnter};
4444

4545
/// Available linear join implementations.
@@ -400,7 +400,7 @@ where
400400
errors.push(errs.as_collection());
401401

402402
let arranged = keyed
403-
.mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
403+
.mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilderColumn<_, _>, RowRowSpine<_, _>>(
404404
ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),"JoinStage"
405405
);
406406
joined = JoinedFlavor::Local(arranged);

src/compute/src/row_spine.rs

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -11,8 +11,8 @@ pub use self::container::DatumContainer;
1111
pub use self::container::DatumSeq;
1212
pub use self::offset_opt::OffsetOptimized;
1313
pub use self::spines::{
14-
RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowRowSpine, RowSpine, RowValBatcher,
15-
RowValBuilder, RowValSpine,
14+
RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowRowBuilderColumn, RowRowSpine,
15+
RowSpine, RowValBatcher, RowValBuilder, RowValSpine,
1616
};
1717
use differential_dataflow::trace::implementations::OffsetList;
1818

@@ -28,6 +28,7 @@ mod spines {
2828
use differential_dataflow::trace::implementations::spine_fueled::Spine;
2929
use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
3030
use mz_repr::Row;
31+
use mz_timely_util::containers::ColumnValBuilder;
3132

3233
use crate::row_spine::{DatumContainer, OffsetOptimized};
3334
use crate::typedefs::{KeyBatcher, KeyValBatcher};
@@ -36,6 +37,8 @@ mod spines {
3637
pub type RowRowBatcher<T, R> = KeyValBatcher<Row, Row, T, R>;
3738
pub type RowRowBuilder<T, R> =
3839
RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, TimelyStack<((Row, Row), T, R)>>>;
40+
pub type RowRowBuilderColumn<T, R> =
41+
RcBuilder<ColumnValBuilder<RowRowLayout<((Row, Row), T, R)>>>;
3942

4043
pub type RowValSpine<V, T, R> = Spine<Rc<OrdValBatch<RowValLayout<((Row, V), T, R)>>>>;
4144
pub type RowValBatcher<V, T, R> = KeyValBatcher<Row, V, T, R>;

src/durable-cache/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@ workspace = true
1212
[dependencies]
1313
async-trait = "0.1.88"
1414
bytes = { version = "1.10.1" }
15-
differential-dataflow = "0.15.1"
15+
differential-dataflow = "0.15.2"
1616
futures = "0.3.31"
1717
itertools = { version = "0.14.0" }
1818
mz-ore = { path = "../ore", features = ["process"] }

src/expr/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@ chrono = { version = "0.4.39", default-features = false, features = ["std"] }
2626
chrono-tz = { version = "0.8.1", features = ["serde", "case-insensitive"] }
2727
crc32fast = "1.4.2"
2828
csv = "1.3.1"
29-
differential-dataflow = "0.15.1"
29+
differential-dataflow = "0.15.2"
3030
dec = "0.4.8"
3131
derivative = "2.2.0"
3232
encoding = "0.2.0"

src/interchange/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,7 @@ byteorder = "1.4.3"
2020
bytes = "1.10.1"
2121
chrono = { version = "0.4.39", default-features = false, features = ["std"] }
2222
clap = { version = "4.5.23", features = ["derive"] }
23-
differential-dataflow = "0.15.1"
23+
differential-dataflow = "0.15.2"
2424
itertools = "0.14.0"
2525
maplit = "1.0.2"
2626
mz-avro = { path = "../avro", features = ["snappy"] }

src/ore/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@ columnation = { version = "0.1.0", optional = true }
2929
columnar = { version = "0.5.0", optional = true }
3030
compact_bytes = { version = "0.1.4", optional = true }
3131
ctor = { version = "0.4.2", optional = true }
32-
differential-dataflow = { version = "0.15.1", optional = true }
32+
differential-dataflow = { version = "0.15.2", optional = true }
3333
derivative = { version = "2.2.0" }
3434
either = "1.15.0"
3535
futures = { version = "0.3.31", optional = true }

src/persist-cli/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@ async-trait = "0.1.88"
2323
axum = "0.7.5"
2424
bytes = { version = "1.10.1", features = ["serde"] }
2525
clap = { version = "4.5.23", features = ["derive", "env"] }
26-
differential-dataflow = "0.15.1"
26+
differential-dataflow = "0.15.2"
2727
futures = "0.3.31"
2828
humantime = "2.2.0"
2929
mz-dyncfg = { path = "../dyncfg" }

src/persist-client/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,7 @@ async-stream = "0.3.6"
3535
async-trait = "0.1.88"
3636
bytes = { version = "1.10.1", features = ["serde"] }
3737
clap = { version = "4.5.23", features = ["derive"] }
38-
differential-dataflow = "0.15.1"
38+
differential-dataflow = "0.15.2"
3939
futures = "0.3.31"
4040
futures-util = "0.3"
4141
h2 = "0.4.10"

src/persist/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,7 @@ azure_core = "0.21.0"
3737
base64 = "0.22.1"
3838
bytes = "1.10.1"
3939
deadpool-postgres = "0.10.3"
40-
differential-dataflow = "0.15.1"
40+
differential-dataflow = "0.15.2"
4141
fail = { version = "0.5.1", features = ["failpoints"] }
4242
futures-util = "0.3.31"
4343
itertools = "0.14.0"

src/repr/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@ columnation = "0.1.0"
3636
chrono = { version = "0.4.39", default-features = false, features = ["serde", "std"] }
3737
compact_bytes = "0.1.4"
3838
dec = "0.4.8"
39-
differential-dataflow = "0.15.1"
39+
differential-dataflow = "0.15.2"
4040
enum-kinds = "0.5.1"
4141
hex = "0.4.3"
4242
itertools = "0.14.0"
@@ -79,6 +79,7 @@ tracing = { version = "0.1.37" }
7979
# for the tracing feature
8080
tracing-subscriber = { version = "0.3.19", default-features = false, optional = true }
8181
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }
82+
bytemuck = { version = "1.23.0", features = ["derive"] }
8283

8384
[dev-dependencies]
8485
criterion = { version = "0.6.0" }

0 commit comments

Comments
 (0)