diff --git a/test_runner/fixtures/neon_api.py b/test_runner/fixtures/neon_api.py index 91e623c7aff1..c637b3937fcb 100644 --- a/test_runner/fixtures/neon_api.py +++ b/test_runner/fixtures/neon_api.py @@ -106,6 +106,7 @@ def create_project( branch_name: str | None = None, branch_role_name: str | None = None, branch_database_name: str | None = None, + project_settings: dict[str, Any] | None = None, ) -> dict[str, Any]: data: dict[str, Any] = { "project": { @@ -122,6 +123,8 @@ def create_project( data["project"]["branch"]["role_name"] = branch_role_name if branch_database_name: data["project"]["branch"]["database_name"] = branch_database_name + if project_settings: + data["project"]["settings"] = project_settings resp = self.__request( "POST", diff --git a/test_runner/random_ops/test_random_ops.py b/test_runner/random_ops/test_random_ops.py index 3d369683693f..fc9a8d1fa56b 100644 --- a/test_runner/random_ops/test_random_ops.py +++ b/test_runner/random_ops/test_random_ops.py @@ -91,7 +91,13 @@ class NeonBranch: is_reset defines if the branch is a reset one i.e. created as a result of the reset API Call """ - def __init__(self, project, branch: dict[str, Any], is_reset=False): + def __init__( + self, + project, + branch: dict[str, Any], + is_reset=False, + primary_branch: NeonBranch | None = None, + ): self.id: str = branch["branch"]["id"] self.desc = branch self.name: str | None = None @@ -140,12 +146,36 @@ def __init__(self, project, branch: dict[str, Any], is_reset=False): "PGPASSWORD": self.connection_parameters["password"], "PGSSLMODE": "require", } + self.replicas: dict[str, NeonBranch] = {} + self.primary_branch: NeonBranch | None = primary_branch + if primary_branch: + if not self.connection_parameters: + raise ValueError( + "connection_parameters is required when primary_branch is specified" + ) + self.project.replicas[self.id] = self + primary_branch.replicas[self.id] = self + with psycopg2.connect(primary_branch.connstr()) as conn: + with conn.cursor() as cur: + cur.execute(f"CREATE PUBLICATION {self.id} FOR ALL TABLES") + conn.commit() + with psycopg2.connect(self.connstr()) as conn: + with conn.cursor() as cur: + cur.execute( + f"CREATE SUBSCRIPTION {self.id} CONNECTION '{primary_branch.connstr()}' PUBLICATION {self.id}" + ) + conn.commit() def __str__(self): """ Prints the branch's information with all the predecessors """ - return f"{self.id}{f'({self.name})' if self.name and self.name != self.id else ''}{f'(restored_from: {self.restored_from})' if self.restored_from else ''}, parent: {self.parent}" + name = f"({self.name})" if self.name and self.name != self.id else "" + restored_from = f"(restored_from: {self.restored_from})" if self.restored_from else "" + ancestor = ( + f" <- {self.primary_branch}" if self.primary_branch else f", parent: {self.parent}" + ) + return f"{self.id}{name}{restored_from}{ancestor}" def random_time(self) -> datetime: min_time = max( @@ -157,8 +187,10 @@ def random_time(self) -> datetime: log.info("min_time: %s, max_time: %s", min_time, max_time) return (min_time + (max_time - min_time) * random.random()).replace(microsecond=0) - def create_child_branch(self, parent_timestamp: datetime | None = None) -> NeonBranch | None: - return self.project.create_branch(self.id, parent_timestamp) + def create_child_branch( + self, parent_timestamp: datetime | None = None, primary_branch: NeonBranch | None = None + ) -> NeonBranch | None: + return self.project.create_branch(self.id, parent_timestamp, primary_branch=primary_branch) def create_ro_endpoint(self) -> NeonEndpoint | None: if not self.project.check_limit_endpoints(): @@ -249,6 +281,19 @@ def restore( ep.start_benchmark() return res + def create_logical_replica(self) -> NeonBranch | None: + if self.primary_branch is not None: + raise RuntimeError("The primary branch cannot be a logical replica") + if self.id in self.project.reset_branches: + raise RuntimeError("Reset branch cannot be a primary branch") + replica = self.create_child_branch(primary_branch=self) + return replica + + def connstr(self): + if self.connection_parameters is None: + raise RuntimeError("Connection parameters are not defined") + return " ".join([f"{key}={value}" for key, value in self.connection_parameters.items()]) + class NeonProject: """ @@ -260,7 +305,9 @@ def __init__(self, neon_api: NeonAPI, pg_bin: PgBin, pg_version: PgVersion): self.neon_api = neon_api self.pg_bin = pg_bin proj = self.neon_api.create_project( - pg_version, f"Automatic random API test GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}" + pg_version, + f"Automatic random API test GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}", + project_settings={"enable_logical_replication": True}, ) self.id: str = proj["project"]["id"] self.name: str = proj["project"]["name"] @@ -288,6 +335,7 @@ def __init__(self, neon_api: NeonAPI, pg_bin: PgBin, pg_version: PgVersion): self.min_time: datetime = datetime.now(UTC) self.snapshots: dict[str, NeonSnapshot] = {} self.snapshot_num: int = 0 + self.replicas: dict[str, NeonBranch] = {} def get_limits(self) -> dict[str, Any]: return self.neon_api.get_project_limits(self.id) @@ -319,6 +367,7 @@ def create_branch( parent_id: str | None = None, parent_timestamp: datetime | None = None, is_reset: bool = False, + primary_branch: NeonBranch | None = None, ) -> NeonBranch | None: self.wait() if not self.check_limit_branches(): @@ -331,7 +380,7 @@ def create_branch( branch_def = self.neon_api.create_branch( self.id, parent_id=parent_id, parent_timestamp=parent_timestamp_str ) - new_branch = NeonBranch(self, branch_def, is_reset) + new_branch = NeonBranch(self, branch_def, is_reset, primary_branch) self.wait() return new_branch @@ -351,6 +400,17 @@ def delete_branch(self, branch_id: str) -> None: if branch_id not in self.reset_branches: self.terminate_benchmark(branch_id) self.neon_api.delete_branch(self.id, branch_id) + primary_branch = self.branches[branch_id].primary_branch + if primary_branch is not None: + with psycopg2.connect(primary_branch.connstr()) as conn: + with conn.cursor() as cur: + cur.execute(f"DROP PUBLICATION {branch_id}") + conn.commit() + parent.replicas.pop(branch_id) + self.replicas.pop(branch_id) + else: + for replica in self.branches[branch_id].replicas.values(): + replica.delete() if len(parent.children) == 1 and parent.parent is not None: self.leaf_branches[parent.id] = parent parent.children.pop(branch_id) @@ -372,7 +432,11 @@ def get_random_leaf_branch(self) -> NeonBranch | None: return target def get_random_parent_branch(self) -> NeonBranch: - return self.branches[random.choice(list(set(self.branches.keys()) - self.reset_branches))] + return self.branches[ + random.choice( + list(set(self.branches.keys()) - self.reset_branches - set(self.replicas.keys())) + ) + ] def gen_branch_name(self) -> str: self.branch_num += 1 @@ -658,6 +722,14 @@ def do_action(project: NeonProject, action: str) -> bool: return False snapshot_to_delete.delete() log.info("Deleted snapshot %s", snapshot_to_delete) + elif action == "create_logical_replica": + primary: NeonBranch | None = project.get_random_parent_branch() + if primary is None: + return False + replica: NeonBranch | None = primary.create_logical_replica() + if replica is None: + return False + log.info("Created logical replica %s", replica) else: raise ValueError(f"The action {action} is unknown") return True