Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
51b9b5e
test: deduplicate workflow_default boilerplate via Composition.run_al…
def- Feb 15, 2026
c5f8034
test: add exclude and shard parameters to Composition.run_all_workflo…
def- Feb 15, 2026
db3240a
test: deduplicate CDC workflow_default via run_all_workflows() parame…
def- Feb 15, 2026
f3c5580
test: extract test file globbing+sharding into Composition.glob_test_…
def- Feb 15, 2026
1ff0de8
test: extract scenario runner boilerplate into Composition.shard_and_…
def- Feb 15, 2026
e07b636
test: extract shared CDC helpers into mysql_util.py and postgres_util.py
def- Feb 15, 2026
b3f8171
test: deduplicate mysql-cdc-resumption workflow_default via run_all_w…
def- Feb 15, 2026
b45ecaa
test: extract default testdrive size args into Materialized.default_t…
def- Feb 15, 2026
ef9b617
test: deduplicate resumption test workflow_default via run_all_workfl…
def- Feb 15, 2026
e761dcc
test: extract mysql-cdc-resumption shared helpers/scenarios into mysq…
def- Feb 15, 2026
c3a06be
test: extract pg-cdc-resumption shared helpers/scenarios into pg_cdc_…
def- Feb 15, 2026
666de59
test: extract SSH setup/authorize helpers in ssh-connection tests
def- Feb 15, 2026
eb8863e
test: use MySql/SqlServer.default_testdrive_args() across test files
def- Feb 15, 2026
3ba6a2b
test: extract replication slot helpers from pg-cdc tests into postgre…
def- Feb 15, 2026
a2060ec
test: extract shared mysql-cdc workflow functions into mysql_cdc.py
def- Feb 15, 2026
0d78e53
test: extract shared pg-cdc helpers into pg_cdc.py and move get_testd…
def- Feb 15, 2026
8431385
test: deduplicate KAFKA_SCHEMA and remove_target_cluster_from_explain…
def- Feb 15, 2026
e0454d1
test: extract workflow_cdc, workflow_replication_slots, workflow_sile…
def- Feb 15, 2026
dd80697
test: extract enable_unorchestrated_cluster_replicas() into Composition
def- Feb 15, 2026
d085f92
test: add Composition.alter_system_set() to replace ALTER SYSTEM SET …
def- Feb 15, 2026
9522362
test: add Composition.recreate_quickstart_cluster() and replace lefto…
def- Feb 17, 2026
66e9a53
test: add Composition.restart_mz() to replace kill+up boilerplate
def- Feb 17, 2026
041bf2c
test: add Composition.sql_as_mz_system() to replace port=6877/user=mz…
def- Feb 17, 2026
4117d8b
test: extract SSH bastion failure/recovery helpers in ssh-connection
def- Feb 18, 2026
951fb0b
test: add Clusterd.replica_addresses() to replace inline address boil…
def- Feb 18, 2026
256b45e
test: extract workflow_many_inserts into shared mysql_cdc.py module
def- Feb 18, 2026
c23dce6
test: extract repeated mz_new override and promote helpers in 0dt tests
def- Feb 18, 2026
9698b6f
test: extract shared backup-restore workflow into backup_restore.py
def- Feb 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/test/lint-main/checks/check-mzcompose-files.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ check_default_workflow_references_others() {
if (( MATCHES_COUNT > 1 )); then
# mzcompose file contains more than one workflow

LOOP_DETECTED=$(grep "c.workflow(name" "$file" -c)
LOOP_DETECTED=$(grep -c -E "c\.workflow\(name|c\.run_all_workflows\(" "$file")

if (( LOOP_DETECTED < 1 )); then
echo "$file contains more than one workflow but does not seem to loop over the workflows"
Expand Down
71 changes: 71 additions & 0 deletions misc/python/materialize/backup_restore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Shared backup & restore test logic for CockroachDB and Postgres metadata stores."""

from textwrap import dedent

from materialize.mzcompose.composition import Composition, Service


def workflow_default(c: Composition) -> None:
    """Basic Backup & Restore test: create a table, back up, insert more, restore, verify.

    Flow:
      1. Enable bucket versioning, start Materialize, and seed a table.
      2. Take a backup of the metadata/persist state.
      3. Make many further inserts (state that must disappear on restore).
      4. Restore from the backup and confirm the metadata store is readable.
      5. Verify queries answer as of the pre-backup state.
    """

    # Enable versioning for the Persist bucket (required so the restore can
    # roll blob state back to the backed-up point in time)
    c.enable_minio_versioning()

    # Start Materialize, and set up some basic state in it:
    # 1 + 10 + 100 = 111 rows, which is asserted again after the restore.
    c.up("materialized", Service("testdrive", idle=True))
    c.testdrive(
        dedent(
            """
            > DROP TABLE IF EXISTS numbers;
            > CREATE TABLE IF NOT EXISTS numbers (id BIGINT);
            > INSERT INTO numbers SELECT * from generate_series(1, 1);
            > INSERT INTO numbers SELECT * from generate_series(1, 10);
            > INSERT INTO numbers SELECT * from generate_series(1, 100);
            """
        )
    )

    c.backup()

    # Make further updates to Materialize's state. None of these rows may be
    # visible after restoring from the backup taken above.
    # (Loop index was unused, so use `_` and drop the redundant 0 start.)
    for _ in range(100):
        # TODO: This seems to be enough to produce interesting shard state;
        # ie. if we remove the restore-blob step we can see the restore fail.
        # Is there any cheaper or more obvious way to do that?
        c.testdrive(
            dedent(
                """
                > INSERT INTO numbers SELECT * from generate_series(1, 1);
                > INSERT INTO numbers SELECT * from generate_series(1, 10);
                > INSERT INTO numbers SELECT * from generate_series(1, 100);
                """
            )
        )

    # Restore from backup, run persistcli restore-blob and restart Mz
    c.restore()

    # Confirm that the database is readable / has shard data
    c.exec_metadata_store_sql(
        "SELECT shard, min(sequence_number), max(sequence_number) "
        "FROM consensus.consensus GROUP BY 1 ORDER BY 2 DESC, 3 DESC, 1 ASC LIMIT 32;"
    )

    # Check that the cluster is up and that it answers queries as of the old
    # state: exactly the 111 rows inserted before the backup, none of the
    # post-backup inserts.
    c.testdrive(
        dedent(
            """
            > SELECT count(*) FROM numbers;
            111
            """
        )
    )
10 changes: 3 additions & 7 deletions misc/python/materialize/checks/all_checks/alter_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,11 @@

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check, externally_idempotent
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
from materialize.checks.common import KAFKA_SCHEMA
from materialize.checks.executors import Executor
from materialize.mz_version import MzVersion


def schema() -> str:
return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)


class SshChange(Enum):
ADD_SSH = 1
DROP_SSH = 2
Expand Down Expand Up @@ -50,7 +46,7 @@ def initialize(self) -> Testdrive:
i = self.index

return Testdrive(
schema()
KAFKA_SCHEMA
+ dedent(
f"""
$ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
Expand Down Expand Up @@ -95,7 +91,7 @@ def manipulate(self) -> list[Testdrive]:
i = self.index

return [
Testdrive(schema() + dedent(s))
Testdrive(KAFKA_SCHEMA + dedent(s))
for s in [
f"""
$ kafka-ingest topic=alter-connection-{i}a format=bytes
Expand Down
10 changes: 3 additions & 7 deletions misc/python/materialize/checks/all_checks/alter_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,14 @@

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check, externally_idempotent
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD


def schema() -> str:
return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
from materialize.checks.common import KAFKA_SCHEMA


@externally_idempotent(False)
class AlterIndex(Check):
def initialize(self) -> Testdrive:
return Testdrive(
schema()
KAFKA_SCHEMA
+ dedent(
"""
> CREATE TABLE alter_index_table (f1 STRING);
Expand All @@ -46,7 +42,7 @@ def initialize(self) -> Testdrive:

def manipulate(self) -> list[Testdrive]:
return [
Testdrive(schema() + dedent(s))
Testdrive(KAFKA_SCHEMA + dedent(s))
for s in [
"""
$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
Expand Down
18 changes: 7 additions & 11 deletions misc/python/materialize/checks/all_checks/continual_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,11 @@

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
from materialize.checks.common import KAFKA_SCHEMA
from materialize.checks.executors import Executor
from materialize.mz_version import MzVersion


def schemas() -> str:
return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)


class AuditLogCT(Check):
"""Continual Task for audit logging"""

Expand All @@ -27,7 +23,7 @@ def _can_run(self, e: Executor) -> bool:

def initialize(self) -> Testdrive:
return Testdrive(
schemas()
KAFKA_SCHEMA
+ dedent(
"""
> CREATE TABLE t_input (key INT);
Expand All @@ -42,7 +38,7 @@ def initialize(self) -> Testdrive:

def manipulate(self) -> list[Testdrive]:
return [
Testdrive(schemas() + dedent(s))
Testdrive(KAFKA_SCHEMA + dedent(s))
for s in [
"""
> INSERT INTO t_input VALUES (2), (3);
Expand Down Expand Up @@ -74,7 +70,7 @@ def _can_run(self, e: Executor) -> bool:

def initialize(self) -> Testdrive:
return Testdrive(
schemas()
KAFKA_SCHEMA
+ dedent(
"""
> CREATE TABLE big (key INT);
Expand All @@ -94,7 +90,7 @@ def initialize(self) -> Testdrive:

def manipulate(self) -> list[Testdrive]:
return [
Testdrive(schemas() + dedent(s))
Testdrive(KAFKA_SCHEMA + dedent(s))
for s in [
"""
> UPDATE small SET val = 'v' || val;
Expand Down Expand Up @@ -140,7 +136,7 @@ def _can_run(self, e: Executor) -> bool:

def initialize(self) -> Testdrive:
return Testdrive(
schemas()
KAFKA_SCHEMA
+ dedent(
"""
> CREATE TABLE append_only (key INT, val INT);
Expand All @@ -155,7 +151,7 @@ def initialize(self) -> Testdrive:

def manipulate(self) -> list[Testdrive]:
return [
Testdrive(schemas() + dedent(s))
Testdrive(KAFKA_SCHEMA + dedent(s))
for s in [
"""
> INSERT INTO append_only VALUES (1, 3), (2, 4)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import re
from textwrap import dedent

from materialize.checks.actions import Testdrive
Expand Down Expand Up @@ -135,7 +134,3 @@ def validate(self) -> Testdrive:
)

return Testdrive(sql)


def remove_target_cluster_from_explain(sql: str) -> str:
return re.sub(r"\n\s*Target cluster: \w+\n", "", sql)
8 changes: 2 additions & 6 deletions misc/python/materialize/checks/all_checks/identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
from materialize.checks.common import KAFKA_SCHEMA
from materialize.mz_version import MzVersion
from materialize.util import naughty_strings

Expand All @@ -34,10 +34,6 @@ def sq(ident: str) -> Any:
return literal(ident)


def schemas() -> str:
return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)


def cluster() -> str:
return "> CREATE CLUSTER identifiers SIZE 'scale=1,workers=4'\n"

Expand Down Expand Up @@ -126,7 +122,7 @@ def initialize(self) -> Testdrive:
> COMMENT ON COLUMN {dq(self.ident["schema"])}.{dq(self.ident["table"])}.{dq(self.ident["column"])} IS {sq(self.ident["comment_column"])};
"""

return Testdrive(schemas() + cluster() + dedent(cmds))
return Testdrive(KAFKA_SCHEMA + cluster() + dedent(cmds))

def manipulate(self) -> list[Testdrive]:
cmds = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import re
from textwrap import dedent

from materialize.checks.actions import Testdrive
Expand Down Expand Up @@ -384,7 +383,3 @@ def validate(self) -> Testdrive:
"""
)
)


def remove_target_cluster_from_explain(sql: str) -> str:
return re.sub(r"\n\s*Target cluster: \w+\n", "", sql)
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check, externally_idempotent
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD


def schemas() -> str:
return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
from materialize.checks.common import KAFKA_SCHEMA


@externally_idempotent(False)
Expand All @@ -23,7 +19,7 @@ class MultiplePartitions(Check):

def initialize(self) -> Testdrive:
return Testdrive(
schemas()
KAFKA_SCHEMA
+ dedent(
"""
$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
Expand All @@ -49,7 +45,7 @@ def initialize(self) -> Testdrive:

def manipulate(self) -> list[Testdrive]:
return [
Testdrive(schemas() + dedent(s))
Testdrive(KAFKA_SCHEMA + dedent(s))
for s in [
"""
# ingest B-key entries
Expand Down
5 changes: 0 additions & 5 deletions misc/python/materialize/checks/all_checks/mysql_cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import re
from random import Random
from textwrap import dedent
from typing import Any
Expand Down Expand Up @@ -590,7 +589,3 @@ def validate(self) -> Testdrive:
"""
)
)


def remove_target_cluster_from_explain(sql: str) -> str:
return re.sub(r"\n\s*Target cluster: \w+\n", "", sql)
5 changes: 0 additions & 5 deletions misc/python/materialize/checks/all_checks/pg_cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import re
from random import Random
from textwrap import dedent
from typing import Any
Expand Down Expand Up @@ -382,7 +381,3 @@ def validate(self) -> Testdrive:
"""
)
)


def remove_target_cluster_from_explain(sql: str) -> str:
return re.sub(r"\n\s*Target cluster: \w+\n", "", sql)
8 changes: 2 additions & 6 deletions misc/python/materialize/checks/all_checks/ranges.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,13 @@

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD


def schema() -> str:
return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
from materialize.checks.common import KAFKA_SCHEMA


class Range(Check):
def initialize(self) -> Testdrive:
return Testdrive(
schema()
KAFKA_SCHEMA
+ dedent(
"""
> CREATE TABLE range_table (
Expand Down
Loading