|
32 | 32 | class ProcessingJobManager(object): |
33 | 33 | """Manages the lifecycle of a Spark job.""" |
34 | 34 |
|
| 35 | + _bootstrapping_timeout = 600.0 # all hosts should report as ready within this timeout. |
| 36 | + _wait_for_primary_timeout = 600.0 # then, all workers ask the primary if it's up within this timeout. |
| 37 | + |
35 | 38 | def __init__( |
36 | 39 | self, |
37 | 40 | resource_config: Dict[str, Any] = None, # type: ignore |
@@ -136,7 +139,11 @@ def all_hosts_have_bootstrapped() -> bool: |
136 | 139 | has_bootstrapped = [message.status == Status.WAITING for message in host_statuses.values()] |
137 | 140 | return all(has_bootstrapped) |
138 | 141 |
|
139 | | - self.waiter.wait_for(predicate_fn=all_hosts_have_bootstrapped, timeout=180.0, period=5.0) |
| 142 | + self.waiter.wait_for( |
| 143 | + predicate_fn=all_hosts_have_bootstrapped, |
| 144 | + timeout=ProcessingJobManager._bootstrapping_timeout, |
| 145 | + period=5.0, |
| 146 | + ) |
140 | 147 |
|
141 | 148 | try: |
142 | 149 | subprocess.run(spark_submit_cmd, check=True, shell=True) |
@@ -172,7 +179,7 @@ def primary_is_down() -> bool: |
172 | 179 | return not primary_is_up() |
173 | 180 |
|
174 | 181 | self.logger.info("waiting for the primary to come up") |
175 | | - self.waiter.wait_for(primary_is_up, timeout=60.0, period=1.0) |
| 182 | + self.waiter.wait_for(primary_is_up, timeout=ProcessingJobManager._wait_for_primary_timeout, period=1.0) |
176 | 183 | self.logger.info("waiting for the primary to go down") |
177 | 184 | self.waiter.wait_for(primary_is_down, timeout=float("inf"), period=5.0) |
178 | 185 | self.logger.info("primary is down, worker now exiting") |
|
0 commit comments