Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions test_runner/fixtures/neon_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def create_project(
branch_name: str | None = None,
branch_role_name: str | None = None,
branch_database_name: str | None = None,
project_settings: dict[str, Any] | None = None,
) -> dict[str, Any]:
data: dict[str, Any] = {
"project": {
Expand All @@ -122,6 +123,8 @@ def create_project(
data["project"]["branch"]["role_name"] = branch_role_name
if branch_database_name:
data["project"]["branch"]["database_name"] = branch_database_name
if project_settings:
data["project"]["settings"] = project_settings

resp = self.__request(
"POST",
Expand Down
86 changes: 79 additions & 7 deletions test_runner/random_ops/test_random_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,13 @@ class NeonBranch:
is_reset defines if the branch is a reset one i.e. created as a result of the reset API Call
"""

def __init__(self, project, branch: dict[str, Any], is_reset=False):
def __init__(
self,
project,
branch: dict[str, Any],
is_reset=False,
primary_branch: NeonBranch | None = None,
):
self.id: str = branch["branch"]["id"]
self.desc = branch
self.name: str | None = None
Expand Down Expand Up @@ -140,12 +146,36 @@ def __init__(self, project, branch: dict[str, Any], is_reset=False):
"PGPASSWORD": self.connection_parameters["password"],
"PGSSLMODE": "require",
}
self.replicas: dict[str, NeonBranch] = {}
self.primary_branch: NeonBranch | None = primary_branch
if primary_branch:
if not self.connection_parameters:
raise ValueError(
"connection_parameters is required when primary_branch is specified"
)
self.project.replicas[self.id] = self
primary_branch.replicas[self.id] = self
with psycopg2.connect(primary_branch.connstr()) as conn:
with conn.cursor() as cur:
cur.execute(f"CREATE PUBLICATION {self.id} FOR ALL TABLES")
conn.commit()
with psycopg2.connect(self.connstr()) as conn:
with conn.cursor() as cur:
cur.execute(
f"CREATE SUBSCRIPTION {self.id} CONNECTION '{primary_branch.connstr()}' PUBLICATION {self.id}"
)
conn.commit()

def __str__(self):
"""
Prints the branch's information with all the predecessors
"""
return f"{self.id}{f'({self.name})' if self.name and self.name != self.id else ''}{f'(restored_from: {self.restored_from})' if self.restored_from else ''}, parent: {self.parent}"
name = f"({self.name})" if self.name and self.name != self.id else ""
restored_from = f"(restored_from: {self.restored_from})" if self.restored_from else ""
ancestor = (
f" <- {self.primary_branch}" if self.primary_branch else f", parent: {self.parent}"
)
return f"{self.id}{name}{restored_from}{ancestor}"

def random_time(self) -> datetime:
min_time = max(
Expand All @@ -157,8 +187,10 @@ def random_time(self) -> datetime:
log.info("min_time: %s, max_time: %s", min_time, max_time)
return (min_time + (max_time - min_time) * random.random()).replace(microsecond=0)

def create_child_branch(self, parent_timestamp: datetime | None = None) -> NeonBranch | None:
return self.project.create_branch(self.id, parent_timestamp)
def create_child_branch(
self, parent_timestamp: datetime | None = None, primary_branch: NeonBranch | None = None
) -> NeonBranch | None:
return self.project.create_branch(self.id, parent_timestamp, primary_branch=primary_branch)

def create_ro_endpoint(self) -> NeonEndpoint | None:
if not self.project.check_limit_endpoints():
Expand Down Expand Up @@ -249,6 +281,19 @@ def restore(
ep.start_benchmark()
return res

def create_logical_replica(self) -> NeonBranch | None:
if self.primary_branch is not None:
raise RuntimeError("The primary branch cannot be a logical replica")
if self.id in self.project.reset_branches:
raise RuntimeError("Reset branch cannot be a primary branch")
replica = self.create_child_branch(primary_branch=self)
return replica

def connstr(self):
if self.connection_parameters is None:
raise RuntimeError("Connection parameters are not defined")
return " ".join([f"{key}={value}" for key, value in self.connection_parameters.items()])


class NeonProject:
"""
Expand All @@ -260,7 +305,9 @@ def __init__(self, neon_api: NeonAPI, pg_bin: PgBin, pg_version: PgVersion):
self.neon_api = neon_api
self.pg_bin = pg_bin
proj = self.neon_api.create_project(
pg_version, f"Automatic random API test GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
pg_version,
f"Automatic random API test GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}",
project_settings={"enable_logical_replication": True},
)
self.id: str = proj["project"]["id"]
self.name: str = proj["project"]["name"]
Expand Down Expand Up @@ -288,6 +335,7 @@ def __init__(self, neon_api: NeonAPI, pg_bin: PgBin, pg_version: PgVersion):
self.min_time: datetime = datetime.now(UTC)
self.snapshots: dict[str, NeonSnapshot] = {}
self.snapshot_num: int = 0
self.replicas: dict[str, NeonBranch] = {}

def get_limits(self) -> dict[str, Any]:
return self.neon_api.get_project_limits(self.id)
Expand Down Expand Up @@ -319,6 +367,7 @@ def create_branch(
parent_id: str | None = None,
parent_timestamp: datetime | None = None,
is_reset: bool = False,
primary_branch: NeonBranch | None = None,
) -> NeonBranch | None:
self.wait()
if not self.check_limit_branches():
Expand All @@ -331,7 +380,7 @@ def create_branch(
branch_def = self.neon_api.create_branch(
self.id, parent_id=parent_id, parent_timestamp=parent_timestamp_str
)
new_branch = NeonBranch(self, branch_def, is_reset)
new_branch = NeonBranch(self, branch_def, is_reset, primary_branch)
self.wait()
return new_branch

Expand All @@ -351,6 +400,17 @@ def delete_branch(self, branch_id: str) -> None:
if branch_id not in self.reset_branches:
self.terminate_benchmark(branch_id)
self.neon_api.delete_branch(self.id, branch_id)
primary_branch = self.branches[branch_id].primary_branch
if primary_branch is not None:
with psycopg2.connect(primary_branch.connstr()) as conn:
with conn.cursor() as cur:
cur.execute(f"DROP PUBLICATION {branch_id}")
conn.commit()
parent.replicas.pop(branch_id)
self.replicas.pop(branch_id)
else:
for replica in self.branches[branch_id].replicas.values():
replica.delete()
if len(parent.children) == 1 and parent.parent is not None:
self.leaf_branches[parent.id] = parent
parent.children.pop(branch_id)
Expand All @@ -372,7 +432,11 @@ def get_random_leaf_branch(self) -> NeonBranch | None:
return target

def get_random_parent_branch(self) -> NeonBranch:
return self.branches[random.choice(list(set(self.branches.keys()) - self.reset_branches))]
return self.branches[
random.choice(
list(set(self.branches.keys()) - self.reset_branches - set(self.replicas.keys()))
)
]

def gen_branch_name(self) -> str:
self.branch_num += 1
Expand Down Expand Up @@ -658,6 +722,14 @@ def do_action(project: NeonProject, action: str) -> bool:
return False
snapshot_to_delete.delete()
log.info("Deleted snapshot %s", snapshot_to_delete)
elif action == "create_logical_replica":
primary: NeonBranch | None = project.get_random_parent_branch()
if primary is None:
return False
replica: NeonBranch | None = primary.create_logical_replica()
if replica is None:
return False
log.info("Created logical replica %s", replica)
else:
raise ValueError(f"The action {action} is unknown")
return True
Expand Down
Loading