diff --git a/py/server/tests/test_table_data_service.py b/py/server/tests/test_table_data_service.py index 359b7f51652..9c1aa34f726 100644 --- a/py/server/tests/test_table_data_service.py +++ b/py/server/tests/test_table_data_service.py @@ -53,6 +53,7 @@ def __init__(self, gen_pa_table: Generator[pa.Table, None, None], pt_schema: pa. self.partitions_size_subscriptions: Dict[TableLocationKey, bool] = {} self.existing_partitions_called: int = 0 self.partition_size_called: int = 0 + self.is_size_sub_failure_cb_called: bool = False def table_schema(self, table_key: TableKeyImpl, schema_cb: Callable[[pa.Schema, Optional[pa.Schema]], None], @@ -164,10 +165,12 @@ def _th_partition_size_changes(self, table_key: TableKeyImpl, table_location_key rbs.append(pa_table.to_batches()[0]) new_pa_table = pa.Table.from_batches(rbs) self.partitions[table_location_key] = new_pa_table - size_cb(new_pa_table.num_rows) if self.sub_partition_size_fail_test: failure_cb(Exception("table location size subscription failure")) + self.is_size_sub_failure_cb_called = True return + else: + size_cb(new_pa_table.num_rows) time.sleep(0.1) def subscribe_to_table_location_size(self, table_key: TableKeyImpl, @@ -357,9 +360,17 @@ def test_partition_size_sub_failure(self): data_service = TableDataService(backend) backend.sub_partition_size_fail_test = True table = data_service.make_table(TableKeyImpl("test"), refreshing=True) + table.coalesce() + # wait for the size subscription failure callback to be called for 5 seconds + for _ in range(50): + time.sleep(0.1) + if backend.is_size_sub_failure_cb_called: + break + else: + self.fail("size subscription failure callback was not called in 5s") + with self.assertRaises(Exception) as cm: - # failure_cb will be called in the background thread after 2 PUG cycles, 3 seconds timeout should be enough - self.wait_ticking_table_update(table, 600, 3) + self.wait_ticking_table_update(table, 600, 1) self.assertTrue(table.j_table.isFailed())